4. Migrating data from SQLite to PostgreSQL

The schema exists. Now the data has to move without changing meaning. Tracks go first because snapshots reference them. The key transformation is tracks.raw_json: SQLite stores it as text, while PostgreSQL stores it as queryable JSONB.

Migration script shape

The script opens both databases, migrates tracks, migrates snapshots, then verifies row counts and JSONB queryability. It is safe to rerun because inserts use ON CONFLICT. The default DSN points at the local music_time_machine_pg database; setting DATABASE_URL (as .env.production does in section 7) points the same script at production.

migrate_to_postgresql.py
import json
import os
import sqlite3

import psycopg2
from psycopg2.extras import Json
from dotenv import load_dotenv

load_dotenv()

SQLITE_PATH = os.getenv('SQLITE_PATH', 'music_time_machine.db')
POSTGRES_DSN = os.getenv(
    'DATABASE_URL', 'postgresql://localhost/music_time_machine_pg'
)


def connect_sqlite():
    conn = sqlite3.connect(SQLITE_PATH)
    conn.row_factory = sqlite3.Row
    return conn


def connect_postgres():
    return psycopg2.connect(POSTGRES_DSN)


def parse_raw_json(raw_json):
    """Convert SQLite TEXT into a Python object suitable for JSONB."""
    if not raw_json:
        return None
    try:
        return json.loads(raw_json)
    except json.JSONDecodeError:
        return None

Migrating tracks

The load-bearing line is Json(parse_raw_json(row['raw_json'])). psycopg2's Json adapter serialises the Python object as JSON for PostgreSQL, and PostgreSQL stores it as JSONB because the target column has that type.

migrate_to_postgresql.py (continued)
def migrate_tracks(sqlite_conn, pg_conn):
    sqlite_cursor = sqlite_conn.cursor()
    pg_cursor = pg_conn.cursor()

    sqlite_cursor.execute("""
        SELECT track_id, name, artist_name, album_name,
               duration_ms, spotify_url, raw_json
        FROM tracks
    """)
    rows = sqlite_cursor.fetchall()

    for row in rows:
        pg_cursor.execute("""
            INSERT INTO tracks (
                track_id, name, artist_name, album_name,
                duration_ms, spotify_url, raw_json
            )
            VALUES (%s, %s, %s, %s, %s, %s, %s)
            ON CONFLICT (track_id) DO UPDATE SET
                name = EXCLUDED.name,
                artist_name = EXCLUDED.artist_name,
                album_name = EXCLUDED.album_name,
                duration_ms = EXCLUDED.duration_ms,
                spotify_url = EXCLUDED.spotify_url,
                raw_json = EXCLUDED.raw_json,
                updated_at = NOW()
        """, (
            row['track_id'],
            row['name'],
            row['artist_name'],
            row['album_name'],
            row['duration_ms'],
            row['spotify_url'],
            Json(parse_raw_json(row['raw_json'])),
        ))

    pg_conn.commit()
    print(f"Migrated {len(rows)} tracks")

Migrating snapshots

Snapshots preserve the same composite key as SQLite. If a migration dies halfway through, rerunning it updates missing rows without duplicating existing ones.

migrate_to_postgresql.py (continued)
def migrate_snapshots(sqlite_conn, pg_conn):
    sqlite_cursor = sqlite_conn.cursor()
    pg_cursor = pg_conn.cursor()

    sqlite_cursor.execute("""
        SELECT track_id, snapshot_date, time_range, rank
        FROM snapshots
    """)
    rows = sqlite_cursor.fetchall()

    for row in rows:
        pg_cursor.execute("""
            INSERT INTO snapshots (
                track_id, snapshot_date, time_range, rank
            )
            VALUES (%s, %s, %s, %s)
            ON CONFLICT (track_id, snapshot_date, time_range)
            DO UPDATE SET rank = EXCLUDED.rank
        """, (
            row['track_id'],
            row['snapshot_date'],
            row['time_range'],
            row['rank'],
        ))

    pg_conn.commit()
    print(f"Migrated {len(rows)} snapshots")

Verification

Counting rows is the minimum check. The second check proves raw_json arrived as a real JSONB object, not as a JSON string wrapped inside JSONB.

migrate_to_postgresql.py (continued)
def verify_migration(sqlite_conn, pg_conn):
    sqlite_cursor = sqlite_conn.cursor()
    pg_cursor = pg_conn.cursor()

    for table in ('tracks', 'snapshots'):
        sqlite_cursor.execute(f"SELECT COUNT(*) FROM {table}")
        sqlite_count = sqlite_cursor.fetchone()[0]

        pg_cursor.execute(f"SELECT COUNT(*) FROM {table}")
        pg_count = pg_cursor.fetchone()[0]

        print(f"{table}: SQLite={sqlite_count}, PostgreSQL={pg_count}")
        if sqlite_count != pg_count:
            raise RuntimeError(f"Row count mismatch for {table}")

    pg_cursor.execute("""
        SELECT COUNT(*)
        FROM tracks
        WHERE raw_json IS NOT NULL
          AND jsonb_typeof(raw_json) = 'object'
    """)
    object_count = pg_cursor.fetchone()[0]
    print(f"{object_count} tracks have queryable raw_json objects")

    pg_cursor.execute("""
        SELECT name,
               raw_json->'album'->>'release_date' AS release_date,
               raw_json->>'explicit' AS explicit
        FROM tracks
        WHERE raw_json ? 'album'
        LIMIT 1
    """)
    sample = pg_cursor.fetchone()
    if sample:
        print(
            f"Sample track '{sample[0]}': "
            f"release_date={sample[1]}, explicit={sample[2]}"
        )

Run the migration

migrate_to_postgresql.py (continued)
def main():
    sqlite_conn = connect_sqlite()
    pg_conn = connect_postgres()
    try:
        migrate_tracks(sqlite_conn, pg_conn)
        migrate_snapshots(sqlite_conn, pg_conn)
        verify_migration(sqlite_conn, pg_conn)
    finally:
        sqlite_conn.close()
        pg_conn.close()


if __name__ == '__main__':
    main()

If verification passes, the PostgreSQL database has the same rows as SQLite, plus a better representation of the cached track payload. Next, in section 5, we update the Flask app to read and write through PostgreSQL.