5. Updating the application code

The data now lives in PostgreSQL. The Flask app needs three changes: use a connection pool, switch SQL placeholders from ? to %s, and treat raw_json as PostgreSQL JSONB instead of SQLite text.

Create a connection module

SQLite code often opens a new connection for each operation. In production PostgreSQL, use a small pool so requests can borrow and return connections efficiently.

database.py
import os

from psycopg2.pool import ThreadedConnectionPool
from psycopg2.extras import RealDictCursor

pool = None


def init_db_pool():
    global pool
    if pool is None:
        pool = ThreadedConnectionPool(
            minconn=2,
            maxconn=10,
            dsn=os.getenv(
                'DATABASE_URL', 'postgresql://localhost/music_time_machine_pg'
            ),
        )


def get_db_connection():
    if pool is None:
        init_db_pool()
    return pool.getconn()


def return_db_connection(conn):
    if pool is not None and conn is not None:
        pool.putconn(conn)


def close_db_pool():
    if pool is not None:
        pool.closeall()

Placeholder changes

SQLite uses ? placeholders. psycopg2 uses %s. The value still travels separately from the SQL string, so the query remains parameterised.

SQLite PostgreSQL
WHERE track_id = ? WHERE track_id = %s
INSERT OR IGNORE ON CONFLICT DO NOTHING
date('now', '-30 days') CURRENT_DATE - INTERVAL '30 days'
json.loads(raw_json) in Python raw_json->'album'->>'release_date' in SQL

Saving tracks with raw_json

When a fresh Spotify track object arrives, keep the normal columns and send the whole object through psycopg2's Json adapter for the JSONB column.

tracks_db.py
from psycopg2.extras import Json

from database import get_db_connection, return_db_connection


def save_track(track):
    conn = get_db_connection()
    try:
        with conn.cursor() as cursor:
            cursor.execute("""
                INSERT INTO tracks (
                    track_id, name, artist_name, album_name,
                    duration_ms, spotify_url, raw_json
                )
                VALUES (%s, %s, %s, %s, %s, %s, %s)
                ON CONFLICT (track_id) DO UPDATE SET
                    name = EXCLUDED.name,
                    artist_name = EXCLUDED.artist_name,
                    album_name = EXCLUDED.album_name,
                    duration_ms = EXCLUDED.duration_ms,
                    spotify_url = EXCLUDED.spotify_url,
                    raw_json = EXCLUDED.raw_json,
                    updated_at = NOW()
            """, (
                track['id'],
                track['name'],
                track['artists'][0]['name'],
                track['album']['name'],
                track['duration_ms'],
                track.get('external_urls', {}).get('spotify'),
                Json(track),
            ))
        conn.commit()
    finally:
        return_db_connection(conn)

Reading JSONB fields

JSONB lets the dashboard ask provider-payload questions without parsing every row in Python. For example, this query returns display columns plus fields extracted from the cached raw object:

app.py
def get_track_detail(track_id):
    conn = get_db_connection()
    try:
        with conn.cursor() as cursor:
            cursor.execute("""
                SELECT
                    track_id,
                    name,
                    artist_name,
                    album_name,
                    duration_ms,
                    spotify_url,
                    raw_json->'album'->>'release_date' AS release_date,
                    raw_json->'external_ids'->>'isrc' AS isrc
                FROM tracks
                WHERE track_id = %s
            """, (track_id,))
            return cursor.fetchone()
    finally:
        return_db_connection(conn)
  • -> returns a JSON value, useful when chaining into nested objects.
  • ->> returns text, useful when rendering or casting.
  • raw_json ? 'album' checks whether the JSON object has a key.
  • raw_json @> '{"explicit": true}'::jsonb checks containment.

Update the snapshot insert

The snapshot insert keeps the same idempotent semantics as SQLite. PostgreSQL expresses them with ON CONFLICT.

snapshots.py
def save_snapshot_row(conn, track_id, snapshot_date, time_range, rank):
    with conn.cursor() as cursor:
        cursor.execute("""
            INSERT INTO snapshots (track_id, snapshot_date, time_range, rank)
            VALUES (%s, %s, %s, %s)
            ON CONFLICT (track_id, snapshot_date, time_range)
            DO UPDATE SET rank = EXCLUDED.rank
        """, (track_id, snapshot_date, time_range, rank))

If those paths run end-to-end without errors, the app's read and write paths are on PostgreSQL. Next, in section 6, we update dashboard queries and test concurrent writes.