7. Migrating the weather cache

Schema, data, application code, tests. You'll execute the four-phase workflow end to end on the weather cache from Chapter 15: create the PostgreSQL schema, migrate the rows with progress tracking and verification, port the application code to psycopg2, and test that everything works identically.

Let's migrate a real database. This weather cache stores API responses from OpenWeatherMap. It's simple enough to understand every step, but realistic enough to demonstrate actual migration challenges.

You'll see the complete process: creating the PostgreSQL schema, writing the data migration script, converting application code, and testing the results. Every step here transfers to larger migrations.

Creating the PostgreSQL schema

First, create the table structure in PostgreSQL. This SQL creates the same schema as our SQLite database, but with PostgreSQL-specific types. The script reads its credentials from the .env file you created in Section 4. Save this as create_schema.py at the project root:

create_schema.py
import os
import psycopg2
from dotenv import load_dotenv

load_dotenv()

# Connect to PostgreSQL with credentials from .env
conn = psycopg2.connect(
    host=os.getenv('DB_HOST', 'localhost'),
    database=os.getenv('DB_NAME', 'weather_db'),
    user=os.getenv('DB_USER'),
    password=os.getenv('DB_PASSWORD')
)

try:
    with conn.cursor() as cursor:
        # Create weather_history table with PostgreSQL types
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS weather_history (
                id SERIAL PRIMARY KEY,
                location VARCHAR(100) NOT NULL,
                temperature NUMERIC(5, 2),
                conditions TEXT,
                timestamp TIMESTAMP WITH TIME ZONE NOT NULL,
                created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
            )
        """)

        # Create index for common queries
        cursor.execute("""
            CREATE INDEX IF NOT EXISTS idx_location_timestamp
            ON weather_history(location, timestamp DESC)
        """)

    conn.commit()
    print("PostgreSQL schema created successfully")

except psycopg2.Error as e:
    conn.rollback()
    print(f"Error creating schema: {e}")

finally:
    conn.close()

What changed from SQLite

Six things in this schema are different from the SQLite original. Each one is a deliberate choice that PostgreSQL gives you for free:

  • SERIAL instead of INTEGER PRIMARY KEY. PostgreSQL's way of auto-incrementing IDs.
  • VARCHAR(100) instead of TEXT. Enforces a reasonable length limit on location names.
  • NUMERIC(5, 2) instead of REAL. Precise decimals for temperatures (max 999.99).
  • TIMESTAMP WITH TIME ZONE. Proper timestamp type that stores timezone information.
  • Added created_at column. Tracks when records were inserted (a common pattern in production).
  • Added an index. Optimises queries that filter by location and timestamp.

Writing the migration script

Now write a script that reads data from SQLite and writes it to PostgreSQL. This script handles type conversions and provides progress feedback for large datasets. Save it as migrate_weather.py at the project root:

migrate_weather.py
import os
import sqlite3
import psycopg2
from datetime import datetime
from dotenv import load_dotenv

load_dotenv()

def migrate_weather_data():
    # Connect to both databases
    sqlite_conn = sqlite3.connect('weather.db')
    pg_conn = psycopg2.connect(
        host=os.getenv('DB_HOST', 'localhost'),
        database=os.getenv('DB_NAME', 'weather_db'),
        user=os.getenv('DB_USER'),
        password=os.getenv('DB_PASSWORD')
    )

    try:
        # Count total rows to migrate
        sqlite_cursor = sqlite_conn.cursor()
        sqlite_cursor.execute("SELECT COUNT(*) FROM weather_history")
        total_rows = sqlite_cursor.fetchone()[0]
        print(f"Migrating {total_rows} rows from SQLite to PostgreSQL...")

        # Fetch all data from SQLite
        sqlite_cursor.execute("""
            SELECT id, location, temperature, conditions, timestamp
            FROM weather_history
            ORDER BY id
        """)

        # Insert into PostgreSQL in batches
        pg_cursor = pg_conn.cursor()
        batch_size = 1000
        migrated = 0

        while True:
            rows = sqlite_cursor.fetchmany(batch_size)
            if not rows:
                break

            # Prepare batch insert
            for row in rows:
                # Convert timestamp string to datetime object
                # SQLite stores as TEXT, PostgreSQL needs proper timestamp
                timestamp_str = row[4]
                timestamp_dt = datetime.fromisoformat(timestamp_str)

                pg_cursor.execute("""
                    INSERT INTO weather_history
                    (location, temperature, conditions, timestamp)
                    VALUES (%s, %s, %s, %s)
                """, (row[1], row[2], row[3], timestamp_dt))

                migrated += 1
                if migrated % 100 == 0:
                    print(f"  Migrated {migrated}/{total_rows} rows...")

            # Commit batch
            pg_conn.commit()

        print(f"\nMigration complete! {migrated} rows transferred.")

        # Verify counts match
        pg_cursor.execute("SELECT COUNT(*) FROM weather_history")
        pg_count = pg_cursor.fetchone()[0]

        if pg_count == total_rows:
            print(f"\u2713 Verification passed: {pg_count} rows in PostgreSQL")
        else:
            print(f"\u2717 Count mismatch: SQLite={total_rows}, PostgreSQL={pg_count}")

    except Exception as e:
        pg_conn.rollback()
        print(f"Migration failed: {e}")
        raise

    finally:
        sqlite_conn.close()
        pg_conn.close()

if __name__ == '__main__':
    migrate_weather_data()

Run it from the project root:

Terminal
python migrate_weather.py

You'll see output like:

Terminal
Migrating 2543 rows from SQLite to PostgreSQL...
  Migrated 100/2543 rows...
  Migrated 200/2543 rows...
  Migrated 300/2543 rows...
  ...
  Migrated 2500/2543 rows...

Migration complete! 2543 rows transferred.
✓ Verification passed: 2543 rows in PostgreSQL

Making the migration re-runnable

If anything goes wrong partway through and you re-run migrate_weather.py, the second run will fail or insert duplicates: every row in SQLite gets inserted again, even the ones that already made it across on the first try.

PostgreSQL's ON CONFLICT clause solves this. Add it to the INSERT statement so PostgreSQL skips rows that would create a conflict on a constraint you nominate. The smallest change that makes the migration idempotent is:

pg_cursor.execute("""
    INSERT INTO weather_history
    (id, location, temperature, conditions, timestamp)
    VALUES (%s, %s, %s, %s, %s)
    ON CONFLICT (id) DO NOTHING
""", (row[0], row[1], row[2], row[3], timestamp_dt))

This preserves the original SQLite id values (which is why the INSERT now passes row[0] too) and asks PostgreSQL to ignore any row whose id already exists. Re-running the script becomes safe: completed rows are skipped, only missing rows are inserted. ON CONFLICT is a PostgreSQL feature SQLite doesn't offer, and it shows up again later when you want UPSERT semantics (ON CONFLICT (key) DO UPDATE SET ...).

Preserving the original ids has one consequence to clean up: inserting explicit values into a SERIAL column does not advance its underlying sequence. After the migration the sequence still points at 1, so the next save_weather() call, which omits id and lets PostgreSQL assign one, collides with the migrated id = 1 and raises a UniqueViolation. Reset the sequence to the current maximum once, right after the migration finishes:

pg_cursor.execute("""
    SELECT setval(
        pg_get_serial_sequence('weather_history', 'id'),
        COALESCE((SELECT MAX(id) FROM weather_history), 1)
    )
""")

With the sequence advanced past the highest migrated id, auto-generated ids resume cleanly and never collide with the rows you carried over.

Migration script best practices

Five things the migration script does deliberately, each one a habit worth carrying forward:

  • Batch processing. Fetching 1,000 rows at a time and committing once per batch is faster, and lighter on memory, than committing after every row.
  • Progress reporting. For large migrations, progress updates prevent you from wondering if the script is frozen.
  • Type conversion. The script explicitly converts SQLite's TEXT timestamps to Python datetime objects that PostgreSQL expects.
  • Verification. Always compare row counts after migration to catch data loss.
  • Error handling. If anything fails, rollback() prevents partial data in PostgreSQL.

Converting application code

With data migrated, update your application code to use PostgreSQL. The changes are minimal but critical. Here's the original SQLite version. Save it as weather_app.py (SQLite version) for reference, or if you've been carrying it forward from Chapter 15, just open it ready to edit:

weather_app.py (SQLite version)
import sqlite3
from datetime import datetime, timezone

def save_weather(location, temperature, conditions):
    with sqlite3.connect('weather.db') as conn:
        conn.execute("""
            INSERT INTO weather_history
            (location, temperature, conditions, timestamp)
            VALUES (?, ?, ?, ?)
        """, (location, temperature, conditions, datetime.now(timezone.utc)))

def get_recent_weather(location, days=7):
    with sqlite3.connect('weather.db') as conn:
        cursor = conn.execute("""
            SELECT temperature, conditions, timestamp
            FROM weather_history
            WHERE location = ?
            AND timestamp >= datetime('now', ? || ' days')
            ORDER BY timestamp DESC
        """, (location, -days))
        return cursor.fetchall()

Here's the PostgreSQL version. Save it as weather_app.py at the project root (replacing the SQLite version):

weather_app.py
import psycopg2
from datetime import datetime, timezone, timedelta
import os

def get_connection():
    """Get database connection - change this one function to switch databases"""
    return psycopg2.connect(
        host=os.getenv('DB_HOST', 'localhost'),
        database=os.getenv('DB_NAME', 'weather_db'),
        user=os.getenv('DB_USER', 'your_username'),
        password=os.getenv('DB_PASSWORD', '')
    )

def save_weather(location, temperature, conditions):
    conn = get_connection()
    try:
        with conn.cursor() as cursor:
            cursor.execute("""
                INSERT INTO weather_history
                (location, temperature, conditions, timestamp)
                VALUES (%s, %s, %s, %s)
            """, (location, temperature, conditions, datetime.now(timezone.utc)))
        conn.commit()  # MUST commit - PostgreSQL doesn't auto-commit
    except psycopg2.Error as e:
        conn.rollback()
        print(f"Database error: {e}")
        raise
    finally:
        conn.close()

def get_recent_weather(location, days=7):
    conn = get_connection()
    try:
        with conn.cursor() as cursor:
            cutoff_date = datetime.now(timezone.utc) - timedelta(days=days)
            cursor.execute("""
                SELECT temperature, conditions, timestamp
                FROM weather_history
                WHERE location = %s
                AND timestamp >= %s
                ORDER BY timestamp DESC
            """, (location, cutoff_date))
            return cursor.fetchall()
    finally:
        conn.close()

Key code changes

Six things changed in the move from sqlite3 to psycopg2. Each one is a small edit, and each one will bite if you miss it:

  • Import changed. sqlite3psycopg2.
  • Connection centralised. A get_connection() function makes it easy to switch databases or configure from environment variables.
  • Placeholders changed. ?%s in all SQL queries.
  • Explicit commits required. Must call conn.commit() after write operations.
  • Date math changed. SQLite's datetime('now', '-7 days') becomes Python's timedelta calculation.
  • Connections must be closed. Use finally blocks to ensure cleanup.

Testing the migration

After converting code, test thoroughly. Run your existing tests (if you have them) and manually verify critical operations. Save the following as test_migration.py at the project root:

test_migration.py
from numbers import Number

from weather_app import save_weather, get_recent_weather

def test_migration():
    """Test that PostgreSQL version works identically to SQLite"""

    print("Testing write operations...")
    save_weather('Dublin', 12.5, 'Cloudy')
    save_weather('Dublin', 13.2, 'Partly Cloudy')
    save_weather('London', 15.0, 'Rainy')
    print("\u2713 Writes successful")

    print("\nTesting read operations...")
    dublin_weather = get_recent_weather('Dublin', days=7)
    assert len(dublin_weather) >= 2, "Should have at least 2 Dublin records"
    print(f"\u2713 Retrieved {len(dublin_weather)} Dublin records")

    print("\nTesting data integrity...")
    temp, conditions, timestamp = dublin_weather[0]
    assert isinstance(temp, Number), "Temperature should be numeric"
    assert isinstance(conditions, str), "Conditions should be string"
    assert timestamp is not None, "Timestamp should exist"
    print("\u2713 Data types correct")

    print("\nTesting location filtering...")
    london_weather = get_recent_weather('London', days=7)
    assert len(london_weather) >= 1, "Should have London records"
    assert 'London' not in str(dublin_weather), "Dublin query shouldn't return London"
    print("\u2713 Location filtering works")

    print("\n\u2713 All tests passed! Migration successful.")

if __name__ == '__main__':
    test_migration()

Run it:

Terminal
python test_migration.py

You'll see:

Terminal
Testing write operations...
✓ Writes successful

Testing read operations...
✓ Retrieved 2 Dublin records

Testing data integrity...
✓ Data types correct

Testing location filtering...
✓ Location filtering works

✓ All tests passed! Migration successful.

If all tests pass, your migration is complete. The application works identically, but now runs on PostgreSQL with support for concurrent writes, network access, and advanced features.

Next, you'll meet three of those advanced features: JSONB for storing API responses without losing their shape, full-text search for querying that data in human-readable ways, and connection pooling for handling real production traffic.