6. Dashboard queries and concurrent testing

The dashboard is where the PostgreSQL migration becomes visible. Queries that once treated the cached track payload as inert text can now extract release dates, explicit flags, and nested identifiers directly from raw_json. Then a concurrent-write test proves the app has moved past SQLite's single-file locking limits.

The dashboard route, now JSONB-aware

This route keeps the existing dashboard cards and adds two JSONB-derived summaries: release years from raw_json.album.release_date, and an explicit-content breakdown from raw_json.explicit.

app.py
@app.route('/dashboard')
def dashboard():
    """Render the dashboard with row counts and two JSONB-derived summaries.

    Runs four read-only queries on a single pooled connection:

    * total number of rows in ``tracks``
    * number of distinct ``snapshot_date`` values in ``snapshots``
    * the ten latest release years, taken from
      ``raw_json.album.release_date``
    * explicit vs. clean track counts, taken from ``raw_json.explicit``

    The connection is handed back to the pool whether or not a query fails.
    """
    conn = get_db_connection()
    try:
        with conn.cursor() as cursor:
            cursor.execute("SELECT COUNT(*) FROM tracks")
            track_count = cursor.fetchone()[0]

            cursor.execute("SELECT COUNT(DISTINCT snapshot_date) FROM snapshots")
            snapshot_count = cursor.fetchone()[0]

            # The `?` operator only tests that a key exists. A payload with
            # "release_date": null (or an empty string) would pass that check
            # and show up as a NULL/blank year, which DESC ordering sorts
            # first. The regex keeps only values that start with four digits.
            cursor.execute("""
                SELECT
                    LEFT(raw_json->'album'->>'release_date', 4) AS release_year,
                    COUNT(*) AS track_count
                FROM tracks
                WHERE raw_json ? 'album'
                  AND raw_json->'album' ? 'release_date'
                  AND raw_json->'album'->>'release_date' ~ '^[0-9]{4}'
                GROUP BY release_year
                ORDER BY release_year DESC
                LIMIT 10
            """)
            release_years = cursor.fetchall()

            # Restrict to real JSON booleans: a JSON null would otherwise be
            # counted as 'Clean', and any other non-boolean value could make
            # the ::boolean cast raise and fail the whole page.
            cursor.execute("""
                SELECT
                    CASE
                        WHEN (raw_json->>'explicit')::boolean THEN 'Explicit'
                        ELSE 'Clean'
                    END AS explicit_label,
                    COUNT(*) AS track_count
                FROM tracks
                WHERE raw_json ? 'explicit'
                  AND jsonb_typeof(raw_json->'explicit') = 'boolean'
                GROUP BY explicit_label
                ORDER BY track_count DESC
            """)
            explicit_breakdown = cursor.fetchall()

        return render_template(
            'dashboard.html',
            track_count=track_count,
            snapshot_count=snapshot_count,
            release_years=release_years,
            explicit_breakdown=explicit_breakdown,
        )
    finally:
        return_db_connection(conn)

This demonstrates three PostgreSQL advantages: nested JSONB extraction, type casting, and aggregation over extracted values. The important part is not the specific dashboard widget; it is that the database can now answer questions about the cached provider payload without Python parsing every row.

Chart endpoint

Chart.js wants a simple JSON response. PostgreSQL does the grouping; Flask only reshapes rows into labels and values.

app.py
@app.route('/api/release-decade-distribution')
def api_release_decade_distribution():
    """Return track counts per release decade as JSON for the chart.

    The response has two parallel lists: ``labels`` (e.g. ``"1990s"``) and
    ``data`` (the track count for that decade), ordered by decade ascending.
    The grouping happens in PostgreSQL; this function only reshapes the rows.

    The connection is handed back to the pool whether or not the query fails.
    """
    conn = get_db_connection()
    try:
        with conn.cursor() as cursor:
            # The `?` operator only tests that a key exists. Without the
            # regex filter, an empty or non-numeric release_date makes the
            # ::integer cast raise, and a JSON null produces a NULL decade
            # that would be rendered as the label "Nones".
            cursor.execute("""
                SELECT
                    (LEFT(raw_json->'album'->>'release_date', 4)::integer / 10 * 10) AS decade,
                    COUNT(*) AS count
                FROM tracks
                WHERE raw_json ? 'album'
                  AND raw_json->'album' ? 'release_date'
                  AND raw_json->'album'->>'release_date' ~ '^[0-9]{4}'
                GROUP BY decade
                ORDER BY decade
            """)
            rows = cursor.fetchall()

        return jsonify({
            'labels': [f"{decade}s" for decade, _ in rows],
            'data': [count for _, count in rows],
        })
    finally:
        return_db_connection(conn)
static/js/dashboard.js
// Fetch the per-decade track counts and draw them as a bar chart on the
// #releaseDecadeChart canvas.
fetch('/api/release-decade-distribution')
  .then(response => {
    // fetch() only rejects on network failure, so HTTP errors are surfaced
    // explicitly; otherwise an error page would be parsed as chart data.
    if (!response.ok) {
      throw new Error(`Release decade request failed: ${response.status}`);
    }
    return response.json();
  })
  .then(data => {
    const ctx = document.getElementById('releaseDecadeChart').getContext('2d');
    new Chart(ctx, {
      type: 'bar',
      data: {
        labels: data.labels,
        datasets: [{
          label: 'Tracks',
          data: data.data,
          backgroundColor: '#1DB954'
        }]
      },
      options: {
        responsive: true,
        scales: { y: { beginAtZero: true } }
      }
    });
  })
  .catch(error => {
    // Log instead of leaving an unhandled promise rejection.
    console.error('Could not render release decade chart:', error);
  });

Concurrent-write test

The JSONB queries prove the data model works. The concurrency test proves the deployment reason for PostgreSQL: overlapping writes should complete through the pool without file-lock errors.

test_concurrent.py
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import date
import time

from database import get_db_connection, return_db_connection, init_db_pool


def write_snapshot(rank):
    """Upsert today's short_term snapshot for track_1 with the given rank.

    Borrows a connection, inserts the snapshot row (or overwrites its rank
    when the row already exists), commits, and returns True. The connection
    is passed to return_db_connection whether or not the write succeeds.
    """
    connection = get_db_connection()
    try:
        # Every call addresses the same (track_id, snapshot_date, time_range)
        # key; only the rank differs between calls.
        row = ('track_1', date.today(), 'short_term', rank)
        with connection.cursor() as cur:
            cur.execute("""
                INSERT INTO snapshots (track_id, snapshot_date, time_range, rank)
                VALUES (%s, %s, %s, %s)
                ON CONFLICT (track_id, snapshot_date, time_range)
                DO UPDATE SET rank = EXCLUDED.rank
            """, row)
        connection.commit()
    finally:
        return_db_connection(connection)
    return True


def ensure_track():
    """Create the track_1 row that the snapshot writes point at.

    Snapshots carry a foreign key to tracks, so this row has to exist before
    any snapshot is written. An existing row is left untouched.
    """
    connection = get_db_connection()
    try:
        track_row = ('track_1', 'Concurrency Test Track', 'Test Artist')
        with connection.cursor() as cur:
            cur.execute("""
                INSERT INTO tracks (track_id, name, artist_name)
                VALUES (%s, %s, %s)
                ON CONFLICT (track_id) DO NOTHING
            """, track_row)
        connection.commit()
    finally:
        return_db_connection(connection)


def main():
    """Run 500 overlapping snapshot writes through the pool and report results.

    Initialises the pool, makes sure the referenced track exists, then
    submits write_snapshot for ranks 1..500 to a 10-thread executor. Prints
    how many writes completed and how long the run took.

    Raises:
        SystemExit: if any write raised, after the summary has been printed,
            so the script still exits with a non-zero status.
    """
    init_db_pool()
    ensure_track()
    # perf_counter is monotonic, so the measured duration cannot be skewed by
    # a system clock adjustment during the run.
    started = time.perf_counter()
    successes = 0
    failures = []

    with ThreadPoolExecutor(max_workers=10) as executor:
        futures = [executor.submit(write_snapshot, rank) for rank in range(1, 501)]
        for future in as_completed(futures):
            # Collect failures instead of letting the first one abort the
            # loop, so the summary below always reflects the whole run.
            try:
                succeeded = future.result()
            except Exception as exc:
                failures.append(exc)
            else:
                if succeeded:
                    successes += 1

    elapsed = time.perf_counter() - started
    print(f"{successes} writes completed in {elapsed:.2f}s")

    if failures:
        raise SystemExit(
            f"{len(failures)} writes failed; first error: {failures[0]!r}"
        )


# Run the concurrency test only when this file is executed as a script,
# not when it is imported.
if __name__ == '__main__':
    main()

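The repeated writes all target the same logical snapshot row, so ON CONFLICT ... DO UPDATE turns every attempt after the first into an update of that one row instead of a duplicate-key error; the rank left in the table is simply whichever write committed last. The point of the test is not to create 500 distinct rows; it is to prove that 500 overlapping write attempts can borrow connections, execute, and return them cleanly.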

Next, in section 7, we run the same schema and migration on Railway's PostgreSQL service and deploy the updated Flask app.