4. Migrating data from SQLite to PostgreSQL
The schema exists. Now the data has to move without changing meaning. Tracks go first because snapshots reference them. The key transformation is tracks.raw_json: SQLite stores it as text, while PostgreSQL stores it as queryable JSONB.
Migration script shape
The script opens both databases, migrates tracks, migrates snapshots, then verifies row counts and JSONB queryability. It is safe to rerun because inserts use ON CONFLICT. The default DSN points at the local music_time_machine_pg database; setting DATABASE_URL (as .env.production does in section 7) points the same script at production.
import json
import os
import sqlite3
import psycopg2
from psycopg2.extras import Json
from dotenv import load_dotenv
load_dotenv()
SQLITE_PATH = os.getenv('SQLITE_PATH', 'music_time_machine.db')
POSTGRES_DSN = os.getenv(
'DATABASE_URL', 'postgresql://localhost/music_time_machine_pg'
)
def connect_sqlite():
conn = sqlite3.connect(SQLITE_PATH)
conn.row_factory = sqlite3.Row
return conn
def connect_postgres():
return psycopg2.connect(POSTGRES_DSN)
def parse_raw_json(raw_json):
"""Convert SQLite TEXT into a Python object suitable for JSONB."""
if not raw_json:
return None
try:
return json.loads(raw_json)
except json.JSONDecodeError:
return None
Migrating tracks
The load-bearing line is Json(parse_raw_json(row['raw_json'])). psycopg2's Json adapter serialises the Python object as JSON for PostgreSQL, and PostgreSQL stores it as JSONB because the target column has that type.
def migrate_tracks(sqlite_conn, pg_conn):
sqlite_cursor = sqlite_conn.cursor()
pg_cursor = pg_conn.cursor()
sqlite_cursor.execute("""
SELECT track_id, name, artist_name, album_name,
duration_ms, spotify_url, raw_json
FROM tracks
""")
rows = sqlite_cursor.fetchall()
for row in rows:
pg_cursor.execute("""
INSERT INTO tracks (
track_id, name, artist_name, album_name,
duration_ms, spotify_url, raw_json
)
VALUES (%s, %s, %s, %s, %s, %s, %s)
ON CONFLICT (track_id) DO UPDATE SET
name = EXCLUDED.name,
artist_name = EXCLUDED.artist_name,
album_name = EXCLUDED.album_name,
duration_ms = EXCLUDED.duration_ms,
spotify_url = EXCLUDED.spotify_url,
raw_json = EXCLUDED.raw_json,
updated_at = NOW()
""", (
row['track_id'],
row['name'],
row['artist_name'],
row['album_name'],
row['duration_ms'],
row['spotify_url'],
Json(parse_raw_json(row['raw_json'])),
))
pg_conn.commit()
print(f"Migrated {len(rows)} tracks")
Migrating snapshots
Snapshots preserve the same composite key as SQLite. If a migration dies halfway through, rerunning it updates missing rows without duplicating existing ones.
def migrate_snapshots(sqlite_conn, pg_conn):
sqlite_cursor = sqlite_conn.cursor()
pg_cursor = pg_conn.cursor()
sqlite_cursor.execute("""
SELECT track_id, snapshot_date, time_range, rank
FROM snapshots
""")
rows = sqlite_cursor.fetchall()
for row in rows:
pg_cursor.execute("""
INSERT INTO snapshots (
track_id, snapshot_date, time_range, rank
)
VALUES (%s, %s, %s, %s)
ON CONFLICT (track_id, snapshot_date, time_range)
DO UPDATE SET rank = EXCLUDED.rank
""", (
row['track_id'],
row['snapshot_date'],
row['time_range'],
row['rank'],
))
pg_conn.commit()
print(f"Migrated {len(rows)} snapshots")
Verification
Counting rows is the minimum check. The second check proves raw_json arrived as a real JSONB object, not as a JSON string wrapped inside JSONB.
def verify_migration(sqlite_conn, pg_conn):
sqlite_cursor = sqlite_conn.cursor()
pg_cursor = pg_conn.cursor()
for table in ('tracks', 'snapshots'):
sqlite_cursor.execute(f"SELECT COUNT(*) FROM {table}")
sqlite_count = sqlite_cursor.fetchone()[0]
pg_cursor.execute(f"SELECT COUNT(*) FROM {table}")
pg_count = pg_cursor.fetchone()[0]
print(f"{table}: SQLite={sqlite_count}, PostgreSQL={pg_count}")
if sqlite_count != pg_count:
raise RuntimeError(f"Row count mismatch for {table}")
pg_cursor.execute("""
SELECT COUNT(*)
FROM tracks
WHERE raw_json IS NOT NULL
AND jsonb_typeof(raw_json) = 'object'
""")
object_count = pg_cursor.fetchone()[0]
print(f"{object_count} tracks have queryable raw_json objects")
pg_cursor.execute("""
SELECT name,
raw_json->'album'->>'release_date' AS release_date,
raw_json->>'explicit' AS explicit
FROM tracks
WHERE raw_json ? 'album'
LIMIT 1
""")
sample = pg_cursor.fetchone()
if sample:
print(
f"Sample track '{sample[0]}': "
f"release_date={sample[1]}, explicit={sample[2]}"
)
Run the migration
def main():
sqlite_conn = connect_sqlite()
pg_conn = connect_postgres()
try:
migrate_tracks(sqlite_conn, pg_conn)
migrate_snapshots(sqlite_conn, pg_conn)
verify_migration(sqlite_conn, pg_conn)
finally:
sqlite_conn.close()
pg_conn.close()
if __name__ == '__main__':
main()
If verification passes, the PostgreSQL database has the same rows as SQLite, plus a better representation of the cached track payload. Next, in section 5, we update the Flask app to read and write through PostgreSQL.