6. Async news aggregator

Chapter 11 built a synchronous news aggregator that pulled headlines from NewsAPI, the Guardian, and Hacker News one source at a time; the sequential cost was the sum of three latencies. This keystone rebuilds the same aggregator with async fan-out so the cost is roughly the slowest single source. The domain is intentionally familiar (same three accounts, same three response shapes, same downstream "merge and present" job), so the comparison stays honest. What changes is the wall-clock, the failure-handling shape, and the rate-limit discipline; the API keys you set up in Chapter 11 carry over unchanged.

What the async version changes

The Chapter 11 aggregator returned a unified feed; this version returns the same unified feed. What is different is what happens in the middle:

  • Concurrent fetching. The three sources fire concurrently via plain asyncio.gather, so wall-clock is roughly the slowest source, not the sum.
  • Partial-failure handling, structured-response style. Each fetcher catches its own exceptions and returns a {"success": True/False, ...} dict. Failures arrive at gather as success-false values rather than as exception objects, so the reducer is a flat for-loop with no isinstance branches -- this is one of the two partial-failure shapes Section 5 named, picked deliberately for the keystone scale. (The other shape, gather(..., return_exceptions=True), is the right call when you do not control the inner coroutines.)
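  • Per-request timeouts. Every request carries its own timeout=5.0, so a hung source fails and reports success-false instead of stalling the batch indefinitely. (httpx applies the timeout to each phase of a request, such as connect and read, rather than as one total deadline, so a source that makes several round trips can take longer than five seconds overall.)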
  • Sync persistence, by design. The SQLite write at the end uses the sync sqlite3 module the book has been using since Chapter 15. aiosqlite would let you write async, but the database is not the bottleneck for a three-source aggregator -- the HTTP fan-out is. Section 1's "do not async for the sake of async" framing is the call here.
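The two partial-failure shapes differ mainly in what reaches the reducer. Below is a minimal offline sketch in which asyncio.sleep stands in for network calls. flaky and structured are hypothetical names, not functions from the aggregator.

```python
import asyncio


async def flaky(name, fail):
    # Stand-in for a fetcher we do not control: it raises on failure.
    await asyncio.sleep(0.01)
    if fail:
        raise ConnectionError(f"{name} unreachable")
    return [f"{name} headline"]


async def structured(name, fail):
    # Structured-response style: the wrapper owns the exception handling.
    try:
        articles = await flaky(name, fail)
        return {"success": True, "source": name, "articles": articles}
    except Exception:
        return {"success": False, "source": name, "articles": []}


async def main():
    # Shape 1: every result is a dict, so the reducer is a flat loop.
    results = await asyncio.gather(
        structured("A", False), structured("B", True), structured("C", False)
    )
    succeeded = [r["source"] for r in results if r["success"]]

    # Shape 2: exceptions come back as values, so the reducer needs isinstance.
    raw = await asyncio.gather(
        flaky("A", False), flaky("B", True), flaky("C", False),
        return_exceptions=True,
    )
    errors = [r for r in raw if isinstance(r, BaseException)]
    return succeeded, errors


succeeded, errors = asyncio.run(main())
print(succeeded)  # ['A', 'C']
print(errors)     # [ConnectionError('B unreachable')]
```

Both shapes report the same outcome. The difference is where the exception handling lives: inside each coroutine in the first shape, or in the reducer in the second.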

By the end of the section you will have an aggregator that puts concrete numbers on the chapter's opening claim: about 1.5 seconds sequential against about 0.5 seconds concurrent for three sources that each answer in roughly half a second. That is the 3x speedup the demo numbers in Sections 1 and 2 set up, now landing on a real keystone build, and the gap widens in proportion as the source count rises.

Fetching from multiple sources concurrently

Start by building async functions for each news source. Each function should handle its own errors and return a consistent data structure.

news_fetchers.py
import asyncio
import os
from datetime import datetime, timezone

import httpx

NEWSAPI_KEY = os.environ.get("NEWSAPI_KEY", "")
GUARDIAN_KEY = os.environ.get("GUARDIAN_KEY", "")

async def fetch_newsapi(client):
    try:
        response = await client.get(
            "https://newsapi.org/v2/top-headlines",
            params={"country": "us", "apiKey": NEWSAPI_KEY},
            timeout=5.0,
        )
        response.raise_for_status()
        data = response.json()
        
        articles = []
        for item in data.get("articles", [])[:10]:
            articles.append({
                "source": "NewsAPI",
                "title": item.get("title", "No title"),
                "url": item.get("url"),
                "published_at": item.get("publishedAt"),
            })
        return {"success": True, "source": "NewsAPI", "articles": articles}
    except Exception as exc:
        print(f"Failed to fetch NewsAPI: {exc}")
        return {"success": False, "source": "NewsAPI", "articles": []}

async def fetch_guardian(client):
    try:
        response = await client.get(
            "https://content.guardianapis.com/search",
            params={"api-key": GUARDIAN_KEY, "page-size": 10},
            timeout=5.0,
        )
        response.raise_for_status()
        data = response.json()
        
        articles = []
        for item in data.get("response", {}).get("results", []):
            articles.append({
                "source": "Guardian",
                "title": item.get("webTitle", "No title"),
                "url": item.get("webUrl"),
                "published_at": item.get("webPublicationDate"),
            })
        return {"success": True, "source": "Guardian", "articles": articles}
    except Exception as exc:
        print(f"Failed to fetch Guardian: {exc}")
        return {"success": False, "source": "Guardian", "articles": []}

async def fetch_hackernews(client):
    try:
        # First get top story IDs
        response = await client.get(
            "https://hacker-news.firebaseio.com/v0/topstories.json",
            timeout=5.0,
        )
        response.raise_for_status()
        story_ids = response.json()[:10]
        
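        # Fetch each story concurrently. A failed story becomes None and is
        # skipped below, instead of failing the whole Hacker News source.
        async def fetch_story(story_id):
            try:
                resp = await client.get(
                    f"https://hacker-news.firebaseio.com/v0/item/{story_id}.json",
                    timeout=5.0,
                )
                resp.raise_for_status()
                return resp.json()
            except Exception as exc:
                print(f"Failed to fetch Hacker News story {story_id}: {exc}")
                return None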
        
        stories = await asyncio.gather(*[fetch_story(sid) for sid in story_ids])
        
        articles = []
        for story in stories:
            if story and story.get("url"):
                articles.append({
                    "source": "HackerNews",
                    "title": story.get("title", "No title"),
                    "url": story.get("url"),
                    "published_at": datetime.fromtimestamp(story.get("time", 0), tz=timezone.utc).isoformat(),
                })
        
        return {"success": True, "source": "HackerNews", "articles": articles}
    except Exception as exc:
        print(f"Failed to fetch Hacker News: {exc}")
        return {"success": False, "source": "HackerNews", "articles": []}

Each fetcher handles errors internally and returns a consistent structure with success, source, and articles fields. This makes it easy to aggregate results from multiple sources.
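The three try/except blocks are nearly identical. Once the fetcher count grows, one option is to factor the structured-response wrapping into a decorator. The sketch below assumes that approach. structured_response and fetch_demo are hypothetical names, and fetch_demo's body stands in for a real client.get call.

```python
import asyncio
import functools


def structured_response(source):
    """Hypothetical decorator: wrap a fetcher that returns a list of
    articles (or raises) in the {"success", "source", "articles"} shape."""
    def decorate(fetch):
        @functools.wraps(fetch)
        async def wrapper(*args, **kwargs):
            try:
                articles = await fetch(*args, **kwargs)
                return {"success": True, "source": source, "articles": articles}
            except Exception as exc:
                print(f"Failed to fetch {source}: {exc}")
                return {"success": False, "source": source, "articles": []}
        return wrapper
    return decorate


@structured_response("Demo")
async def fetch_demo(fail=False):
    # Stand-in body; a real fetcher would await client.get(...) here.
    await asyncio.sleep(0)
    if fail:
        raise TimeoutError("simulated timeout")
    return [{"source": "Demo", "title": "Hello",
             "url": "https://example.com", "published_at": None}]


good = asyncio.run(fetch_demo())
bad = asyncio.run(fetch_demo(fail=True))
print(good["success"], len(good["articles"]))  # True 1
print(bad["success"], bad["articles"])         # False []
```

The decorated function keeps the same {"success", "source", "articles"} contract, so the reducer in aggregate_news would not need to change.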

Aggregating results across sources

Combine the three fetchers into a single aggregation function that runs them concurrently and collects the results. Section 4 introduced the semaphore for bounded parallelism on hundred-task fan-outs, and this build does not need one. At most twelve requests are in flight at once: one each to NewsAPI and the Guardian, plus the ten Hacker News item fetches. The only same-provider fan-out is those ten item fetches, and the Hacker News API currently documents no rate limit. The Section 4 pattern applies the moment you scale the aggregator past ten or so concurrent requests against a single rate-limited provider. Here, plain gather over three sources is the right call.
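If the Hacker News fan-out grew from ten items to a few hundred, the Section 4 pattern would wrap each per-item call in a semaphore. Below is a minimal sketch that assumes a cap of three in-flight calls. The cap is arbitrary, so use whatever the provider documents. bounded_fetch_all and fake_fetch are hypothetical. fake_fetch only sleeps and records peak concurrency.

```python
import asyncio

MAX_IN_FLIGHT = 3  # assumption: an arbitrary per-provider cap


async def bounded_fetch_all(ids, fetch, limit=MAX_IN_FLIGHT):
    """Run fetch(id) for every id, with at most `limit` calls in flight."""
    semaphore = asyncio.Semaphore(limit)

    async def one(item_id):
        async with semaphore:  # waits here once `limit` calls are running
            return await fetch(item_id)

    # gather still preserves input order in its results
    return await asyncio.gather(*(one(i) for i in ids))


# Offline stand-in for a per-item HTTP call that records peak concurrency.
in_flight = 0
peak = 0


async def fake_fetch(item_id):
    global in_flight, peak
    in_flight += 1
    peak = max(peak, in_flight)
    await asyncio.sleep(0.01)
    in_flight -= 1
    return {"id": item_id}


results = asyncio.run(bounded_fetch_all(range(10), fake_fetch))
print(len(results), peak)  # 10 3
```

In fetch_hackernews, the equivalent change would pass story_ids and fetch_story to a helper like this in place of the bare gather.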

aggregator.py
import asyncio
import time

import httpx

from news_fetchers import fetch_newsapi, fetch_guardian, fetch_hackernews


async def aggregate_news():
    # perf_counter is a monotonic clock intended for measuring intervals
    start = time.perf_counter()
    
    async with httpx.AsyncClient() as client:
        # Fetch all sources concurrently
        results = await asyncio.gather(
            fetch_newsapi(client),
            fetch_guardian(client),
            fetch_hackernews(client),
        )
    
    elapsed = time.perf_counter() - start
    
    # Collect all articles from successful sources
    all_articles = []
    successful_sources = []
    failed_sources = []
    
    for result in results:
        if result["success"]:
            all_articles.extend(result["articles"])
            successful_sources.append(result["source"])
        else:
            failed_sources.append(result["source"])
    
    print(f"Fetched {len(all_articles)} articles from {len(successful_sources)} sources in {elapsed:.2f}s")
    if failed_sources:
        print(f"Failed sources: {', '.join(failed_sources)}")
    
    return all_articles

if __name__ == "__main__":
    articles = asyncio.run(aggregate_news())
    for article in articles[:5]:
        print(f"{article['source']}: {article['title']}")

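Wall-clock time now tracks the slowest of the three sources instead of their sum. When each source answers in roughly half a second, that is about 0.5 seconds against about 1.5 seconds for the sequential Chapter 11 version. This is the 3x ratio Section 1 framed and Section 2 demoed, now landing end-to-end on real news APIs. Real latencies vary from run to run. Hacker News needs two round trips (the ID list, then the stories), so it is a likely candidate for the slowest source.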
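To see which source sets the wall-clock in your own runs, you could wrap each fetcher call in a small timer. The sketch below uses timed, a hypothetical helper. asyncio.sleep with made-up latencies stands in for the three fetchers.

```python
import asyncio
import time


async def timed(name, coro):
    """Hypothetical helper: await coro and report how long it took."""
    start = time.perf_counter()
    result = await coro
    return name, time.perf_counter() - start, result


async def main():
    # asyncio.sleep stands in for each source's latency (assumed values).
    start = time.perf_counter()
    timings = await asyncio.gather(
        timed("NewsAPI", asyncio.sleep(0.10)),
        timed("Guardian", asyncio.sleep(0.20)),
        timed("HackerNews", asyncio.sleep(0.30)),
    )
    total = time.perf_counter() - start
    for name, elapsed, _ in timings:
        print(f"{name:<10} {elapsed:.2f}s")
    print(f"{'batch':<10} {total:.2f}s")
    return timings, total


timings, total = asyncio.run(main())
```

In the aggregator you would pass fetch_newsapi(client) and the other fetcher calls in place of the sleeps. The batch line should sit close to the largest per-source line, not the sum of all three.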

Persistence: sync sqlite3 by design

The aggregator hands its article list to a synchronous SQLite writer. The book has used stdlib sqlite3 from Chapter 15 onwards, and the keystone keeps that pattern deliberately. The HTTP fan-out is the bottleneck of this build, while the database write is a millisecond-scale batch of inserts at the end. Switching to aiosqlite would let the database call yield while it waits, but there is nothing else to do during that wait, because the async work has already completed. Section 1's "use async where the cost profile justifies it" framing is the call here.
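The "millisecond-scale" claim is easy to check locally. The sketch below runs against a throwaway file-backed database. The trimmed schema and fake rows are assumptions, not the real articles table. It inserts thirty rows one execute at a time, the way save_articles below does, then inserts them again to show UNIQUE(url) with INSERT OR IGNORE skipping duplicates.

```python
import os
import sqlite3
import tempfile
import time

with tempfile.TemporaryDirectory() as tmp:
    conn = sqlite3.connect(os.path.join(tmp, "bench.db"))
    conn.execute(
        "CREATE TABLE articles ("
        " id INTEGER PRIMARY KEY AUTOINCREMENT,"
        " title TEXT NOT NULL,"
        " url TEXT UNIQUE)"
    )
    # Thirty fake rows: roughly one run's worth at ten articles per source.
    rows = [(f"Headline {i}", f"https://example.com/story/{i}") for i in range(30)]

    def insert_all():
        # Same shape as save_articles: one execute per article, one commit.
        before = conn.total_changes
        for row in rows:
            conn.execute(
                "INSERT OR IGNORE INTO articles (title, url) VALUES (?, ?)", row
            )
        conn.commit()
        return conn.total_changes - before  # rows actually inserted

    start = time.perf_counter()
    first_pass = insert_all()
    elapsed_ms = (time.perf_counter() - start) * 1000
    second_pass = insert_all()  # same URLs again, so every row is ignored
    conn.close()

print(f"First pass: {first_pass} rows in {elapsed_ms:.1f} ms; "
      f"second pass: {second_pass} rows")
```

Exact timings depend on your disk. The point is the order of magnitude compared with an HTTP fan-out measured in hundreds of milliseconds.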

Save this alongside news_fetchers.py and aggregator.py:

store_articles.py
import asyncio
import sqlite3
from datetime import datetime, timezone

from aggregator import aggregate_news


def init_database():
    conn = sqlite3.connect("news.db")
    conn.execute(
        """
        CREATE TABLE IF NOT EXISTS articles (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            source TEXT NOT NULL,
            title TEXT NOT NULL,
            url TEXT UNIQUE,
            published_at TEXT,
            fetched_at TEXT NOT NULL
        )
        """
    )
    conn.commit()
    conn.close()


def save_articles(articles):
    conn = sqlite3.connect("news.db")
    fetched_at = datetime.now(timezone.utc).isoformat()

    for article in articles:
        try:
            conn.execute(
                """
                INSERT OR IGNORE INTO articles
                    (source, title, url, published_at, fetched_at)
                VALUES (?, ?, ?, ?, ?)
                """,
                (
                    article["source"],
                    article["title"],
                    article["url"],
                    article["published_at"],
                    fetched_at,
                ),
            )
        except sqlite3.Error as exc:
            print(f"Failed to save article: {exc}")

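    conn.commit()
    # total_changes counts rows actually inserted on this connection; rows
    # skipped by INSERT OR IGNORE (duplicate URLs) are not counted
    saved = conn.total_changes
    conn.close()
    print(f"Saved {saved} of {len(articles)} articles to database")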


async def main():
    init_database()
    articles = await aggregate_news()
    save_articles(articles)


if __name__ == "__main__":
    asyncio.run(main())

The shape is the same Chapter 15 pattern -- open, execute, commit, close -- with one new beat: datetime.now(timezone.utc).isoformat() rather than datetime.utcnow(). The utcnow form is deprecated as of Python 3.12 because the returned value is timezone-naive and easy to misinterpret; the timezone-aware version makes the UTC intent explicit. Use this shape for any new timestamp code from this chapter onwards.
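Here is a quick way to see what the timezone-aware form changes. This is a standalone sketch. It builds the naive value with replace(tzinfo=None) rather than calling the deprecated utcnow.

```python
from datetime import datetime, timezone

aware = datetime.now(timezone.utc)
print(aware.tzinfo)            # UTC
print(aware.isoformat()[-6:])  # +00:00

# A naive value carries no offset, so nothing in its string says "UTC".
naive = aware.replace(tzinfo=None)
print(naive.tzinfo)            # None

# Parsing the aware string back recovers the offset.
parsed = datetime.fromisoformat(aware.isoformat())
print(parsed.utcoffset())      # 0:00:00
```

The stored fetched_at strings therefore carry their own +00:00 offset. Anyone reading news.db later does not have to guess which timezone they were recorded in.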

aiosqlite exists and is genuinely useful when the database is the bottleneck: a web application doing dozens of writes per request while fetching from upstream APIs in the same handler. For this aggregator, the database is one cheap insert per article at the tail of a much more expensive fetch; the async wrapper would buy you nothing and add a dependency. The keystone's persistence stays sync deliberately. Chapter 24 picks up where this stops: when the database itself starts mattering for performance, you graduate from SQLite to PostgreSQL.

Measuring performance: sync vs async

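To measure the difference, run a sequential and a concurrent version side by side and compare the execution times. The harness swaps the news APIs for httpbin's delay endpoint, so the comparison needs no API keys: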

benchmark_aggregator.py
import time
import asyncio
import requests
import httpx

# Three endpoints that each take about half a second, so the sync-vs-async
# difference is visible and reproducible without any API keys.
URLS = [
    "https://httpbin.org/delay/0.5",
    "https://httpbin.org/delay/0.5",
    "https://httpbin.org/delay/0.5",
]


def sync_aggregator():
    start = time.perf_counter()
    results = []
    for url in URLS:
        try:
            response = requests.get(url, timeout=5.0)
            results.append(response.json())
        except Exception as exc:
            print(f"Request failed: {exc}")
    elapsed = time.perf_counter() - start
    print(f"Synchronous: {elapsed:.2f} seconds")
    return results


async def async_aggregator():
    start = time.perf_counter()
    async with httpx.AsyncClient(timeout=5.0) as client:
        tasks = [client.get(url) for url in URLS]
        results = await asyncio.gather(*tasks, return_exceptions=True)
    elapsed = time.perf_counter() - start
    print(f"Asynchronous: {elapsed:.2f} seconds")
    return results


if __name__ == "__main__":
    print("Testing synchronous version...")
    sync_aggregator()

    print("\nTesting async version...")
    asyncio.run(async_aggregator())
Terminal
Testing synchronous version...
Synchronous: 1.52 seconds

Testing async version...
Asynchronous: 0.51 seconds

~1.5 seconds sequential, ~0.5 seconds concurrent: the 3x ratio Section 1 framed, now measured on a keyless harness anyone can run, with three endpoints that each take about half a second. Your own numbers will vary with network latency and httpbin's load. The gap grows linearly with the source count: ten APIs at ~500ms each take ~5 seconds sync but still about 0.5 seconds async (the 10x scenario from Section 2). The shape of the win holds at three sources or fifty, as long as connection-pool limits and provider rate limits stay out of the way. Async time tracks the slowest single fetch, while sync time is the sum of all of them.
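You can also check the linear-growth claim offline, without httpbin or network noise. Below is a minimal sketch that assumes a fixed 50 ms per source. fake_source is a hypothetical stand-in for one HTTP round trip.

```python
import asyncio
import time

DELAY = 0.05  # assumed per-source latency; real APIs are slower and noisier


async def fake_source(i):
    await asyncio.sleep(DELAY)  # stands in for one HTTP round trip
    return i


async def sequential(n):
    # Each await finishes before the next source starts: cost is the sum.
    return [await fake_source(i) for i in range(n)]


async def concurrent(n):
    # All sources wait at the same time: cost is roughly the slowest one.
    return await asyncio.gather(*(fake_source(i) for i in range(n)))


def measure(runner, n):
    start = time.perf_counter()
    asyncio.run(runner(n))
    return time.perf_counter() - start


timings = {}
for n in (3, 10):
    timings[n] = (measure(sequential, n), measure(concurrent, n))
    seq, conc = timings[n]
    print(f"{n:>2} sources: sequential {seq:.2f}s, concurrent {conc:.2f}s")
```

Sequential time should land near n * DELAY, and concurrent time near a single DELAY. Real APIs add jitter on top of both.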