5. Updating the application code
The data now lives in PostgreSQL. The Flask app needs three changes: use a connection pool, switch SQL placeholders from ? to %s, and treat raw_json as PostgreSQL JSONB instead of SQLite text.
Create a connection module
SQLite code often opens a new connection for each operation. In production PostgreSQL, use a small pool so requests can borrow and return connections efficiently.
import os
from psycopg2.pool import ThreadedConnectionPool
from psycopg2.extras import RealDictCursor
pool = None
def init_db_pool():
global pool
if pool is None:
pool = ThreadedConnectionPool(
minconn=2,
maxconn=10,
dsn=os.getenv(
'DATABASE_URL', 'postgresql://localhost/music_time_machine_pg'
),
)
def get_db_connection():
if pool is None:
init_db_pool()
return pool.getconn()
def return_db_connection(conn):
if pool is not None and conn is not None:
pool.putconn(conn)
def close_db_pool():
if pool is not None:
pool.closeall()
Placeholder changes
SQLite uses ? placeholders. psycopg2 uses %s. The value still travels separately from the SQL string, so the query remains parameterised.
| SQLite | PostgreSQL |
|---|---|
WHERE track_id = ? |
WHERE track_id = %s |
INSERT OR IGNORE |
ON CONFLICT DO NOTHING |
date('now', '-30 days') |
CURRENT_DATE - INTERVAL '30 days' |
json.loads(raw_json) in Python |
raw_json->'album'->>'release_date' in SQL |
Saving tracks with raw_json
When a fresh Spotify track object arrives, keep the normal columns and send the whole object through psycopg2's Json adapter for the JSONB column.
from psycopg2.extras import Json
from database import get_db_connection, return_db_connection
def save_track(track):
conn = get_db_connection()
try:
with conn.cursor() as cursor:
cursor.execute("""
INSERT INTO tracks (
track_id, name, artist_name, album_name,
duration_ms, spotify_url, raw_json
)
VALUES (%s, %s, %s, %s, %s, %s, %s)
ON CONFLICT (track_id) DO UPDATE SET
name = EXCLUDED.name,
artist_name = EXCLUDED.artist_name,
album_name = EXCLUDED.album_name,
duration_ms = EXCLUDED.duration_ms,
spotify_url = EXCLUDED.spotify_url,
raw_json = EXCLUDED.raw_json,
updated_at = NOW()
""", (
track['id'],
track['name'],
track['artists'][0]['name'],
track['album']['name'],
track['duration_ms'],
track.get('external_urls', {}).get('spotify'),
Json(track),
))
conn.commit()
finally:
return_db_connection(conn)
Reading JSONB fields
JSONB lets the dashboard ask provider-payload questions without parsing every row in Python. For example, this query returns display columns plus fields extracted from the cached raw object:
def get_track_detail(track_id):
conn = get_db_connection()
try:
with conn.cursor() as cursor:
cursor.execute("""
SELECT
track_id,
name,
artist_name,
album_name,
duration_ms,
spotify_url,
raw_json->'album'->>'release_date' AS release_date,
raw_json->'external_ids'->>'isrc' AS isrc
FROM tracks
WHERE track_id = %s
""", (track_id,))
return cursor.fetchone()
finally:
return_db_connection(conn)
->returns a JSON value, useful when chaining into nested objects.->>returns text, useful when rendering or casting.raw_json ? 'album'checks whether the JSON object has a key.raw_json @> '{"explicit": true}'::jsonbchecks containment.
Update the snapshot insert
The snapshot insert keeps the same idempotent semantics as SQLite. PostgreSQL expresses them with ON CONFLICT.
def save_snapshot_row(conn, track_id, snapshot_date, time_range, rank):
with conn.cursor() as cursor:
cursor.execute("""
INSERT INTO snapshots (track_id, snapshot_date, time_range, rank)
VALUES (%s, %s, %s, %s)
ON CONFLICT (track_id, snapshot_date, time_range)
DO UPDATE SET rank = EXCLUDED.rank
""", (track_id, snapshot_date, time_range, rank))
If those paths run end-to-end without errors, the app's read and write paths are on PostgreSQL. Next, in section 6, we update dashboard queries and test concurrent writes.