8. Building the news aggregator
The pieces from the previous pages combine into one hosted, service-shaped API. The news aggregator from Chapter 11 becomes an HTTP service with auth, rate limiting, and docs.
Multi-source integration strategy
The News Aggregator API fetches articles from NewsAPI and The Guardian, normalises their different response formats, caches results in PostgreSQL, and returns a unified response. This section brings together authentication, rate limiting, database operations, and external API integration.
- Different response shapes. NewsAPI returns articles in a top-level articles array with a publishedAt field; the Guardian wraps results in response.results and dates them with webPublicationDate. Our API has to translate both into one shape before either reaches the response.
- A cache in front of the fan-out. External calls are slow and metered. Articles get written to PostgreSQL with an updated_at timestamp; subsequent reads inside the freshness window return from the database without touching either external API.
- Failure is normal. Either upstream can be down, slow, or rate-limiting us. The aggregator treats a failed source as an empty list rather than an exception, so one bad source can't take the endpoint down. If both fail, the last known cache stands in.
The same shape appears wherever an API stitches together services it doesn't own: fan out to multiple upstreams, normalise, cache, degrade gracefully. Stripe in front of payment processors, travel sites in front of airlines, news aggregators in front of publisher feeds.
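The "failure is normal" rule can be sketched in isolation before any real HTTP call is involved. The snippet below is a minimal illustration, not the chapter's implementation: safe_fetch, fan_out, healthy_source, and broken_source are hypothetical names, and the two sources are stubs standing in for upstream APIs.

```python
from collections.abc import Callable

Article = dict[str, str]


def safe_fetch(name: str, fetch: Callable[[], list[Article]]) -> list[Article]:
    """Run one source; any failure becomes an empty list, not an exception."""
    try:
        return fetch()
    except Exception as exc:  # deliberately broad for this sketch
        print(f"{name} failed: {exc}")
        return []


def fan_out(sources: dict[str, Callable[[], list[Article]]]) -> list[Article]:
    """Call every source and merge whatever each one managed to return."""
    merged: list[Article] = []
    for name, fetch in sources.items():
        merged.extend(safe_fetch(name, fetch))
    return merged


def healthy_source() -> list[Article]:
    # Stub standing in for an upstream that answers normally.
    return [{"title": "Example headline", "source": "healthy"}]


def broken_source() -> list[Article]:
    # Stub standing in for an upstream that is down or too slow.
    raise TimeoutError("upstream timed out")


articles = fan_out({"healthy": healthy_source, "broken": broken_source})
print(articles)
```

Because every source returns a list no matter what happened, the merge step never needs its own error handling; the real fetchers in the next section follow the same contract with narrower exception types.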
Implementing the source integrations
Create sources.py to handle external API calls and response normalisation.
import os
import requests
from datetime import datetime, timezone


def fetch_newsapi(category: str | None = None, limit: int = 20) -> list[dict]:
    """
    Fetch articles from NewsAPI.
    Returns normalized article dictionaries.
    """
    api_key = os.getenv("NEWSAPI_KEY")
    if not api_key:
        print("NEWSAPI_KEY not configured")
        return []
    url = "https://newsapi.org/v2/top-headlines"
    params = {
        "apiKey": api_key,
        "country": "us",
        "pageSize": min(limit, 100)
    }
    if category:
        params["category"] = category
    try:
        response = requests.get(url, params=params, timeout=10)
        response.raise_for_status()
        data = response.json()
        # Normalize NewsAPI response format
        articles = []
        for article in data.get("articles", []):
            normalized = {
                "title": article.get("title", ""),
                "description": article.get("description") or "",
                "url": article.get("url", ""),
                "source": "newsapi",
                "category": category or "general",
                "published_at": parse_timestamp(article.get("publishedAt")),
            }
            articles.append(normalized)
        return articles
    except requests.RequestException as e:
        print(f"NewsAPI error: {e}")
        return []  # Graceful degradation

def fetch_guardian(category: str | None = None, limit: int = 20) -> list[dict]:
    """
    Fetch articles from The Guardian.
    Returns normalized article dictionaries.
    """
    api_key = os.getenv("GUARDIAN_KEY")
    if not api_key:
        print("GUARDIAN_KEY not configured")
        return []
    url = "https://content.guardianapis.com/search"
    params = {
        "api-key": api_key,
        "page-size": min(limit, 50),
        "show-fields": "headline,trailText,shortUrl"
    }
    if category:
        params["section"] = category
    try:
        response = requests.get(url, params=params, timeout=10)
        response.raise_for_status()
        data = response.json()
        # Normalize Guardian response format
        articles = []
        results = data.get("response", {}).get("results", [])
        for article in results:
            fields = article.get("fields", {})
            normalized = {
                "title": fields.get("headline", article.get("webTitle", "")),
                "description": fields.get("trailText", ""),
                "url": fields.get("shortUrl", article.get("webUrl", "")),
                "source": "guardian",
                # Store the requested category so cache lookups match.
                # Guardian's sectionName uses display casing ("Technology")
                # while requests use lowercase ("technology"); normalising
                # to the request value keeps the cache key consistent.
                "category": category or article.get("sectionName", "general").lower(),
                "published_at": parse_timestamp(article.get("webPublicationDate")),
            }
            articles.append(normalized)
        return articles
    except requests.RequestException as e:
        print(f"Guardian API error: {e}")
        return []

def parse_timestamp(timestamp_str: str | None) -> datetime:
    """
    Parse ISO 8601 timestamp from various formats.
    Returns a timezone-aware datetime, or current time if parsing fails.
    """
    if not timestamp_str:
        return datetime.now(timezone.utc)
    try:
        # Handle ISO 8601 with timezone
        if "T" in timestamp_str:
            clean_ts = timestamp_str.replace("Z", "+00:00")
            parsed = datetime.fromisoformat(clean_ts)
            # Treat offset-less timestamps as UTC so the merged sort never
            # compares naive and aware datetimes (that raises TypeError).
            if parsed.tzinfo is None:
                parsed = parsed.replace(tzinfo=timezone.utc)
            return parsed
        return datetime.now(timezone.utc)
    except (ValueError, AttributeError):
        return datetime.now(timezone.utc)

def fetch_from_all_sources(
    category: str | None = None,
    source: str | None = None,
    limit: int = 20
) -> list[dict]:
    """
    Fetch from multiple sources and merge results.
    If source specified, fetch only from that source.
    """
    articles = []
    if source == "newsapi" or source is None:
        newsapi_articles = fetch_newsapi(category, limit)
        articles.extend(newsapi_articles)
    if source == "guardian" or source is None:
        guardian_articles = fetch_guardian(category, limit)
        articles.extend(guardian_articles)
    # Sort by published date (newest first)
    articles.sort(key=lambda x: x["published_at"], reverse=True)
    # Apply limit after merging
    return articles[:limit]
sources.py has four functions, and between them they handle the fan-out, the normalisation, and the graceful degradation from the lead; caching is layered on top in the next section. fetch_newsapi hits NewsAPI's /v2/top-headlines, walks data["articles"], and rewrites each entry into the project's canonical shape (title, description, url, source, category, published_at). If the upstream errors or times out, the function logs and returns []; the rest of the pipeline is built to accept an empty list, so a single bad source can't break the request. fetch_guardian is the same pattern with a different unwrap: the Guardian's articles live under response.results, and most of the useful fields are tucked inside a nested fields object.
parse_timestamp absorbs the small inconsistencies the upstreams have around dates: ISO 8601 with a trailing Z, with an offset, or missing entirely. Each shape lands as a real datetime rather than a string, so the downstream merge can sort by it. fetch_from_all_sources is the orchestrator: dispatch to one source if source is set, otherwise both; merge, sort by published_at newest-first, then slice to limit.
NewsAPI returns publishedAt. Guardian returns webPublicationDate. Without normalisation, your API consumers need to know which source they're dealing with and handle each differently. Normalisation means consumers see published_at consistently, regardless of source.
The normalisation is the value the aggregator adds. A consumer integrates once, against one shape; the variance between upstreams stays on this side of the wall. Add a third source later and consumers see no change.
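The effect is easiest to see with two sample payloads side by side. The sketch below uses trimmed, made-up payloads that only mimic each upstream's layout; normalise_newsapi and normalise_guardian are hypothetical pure functions, and timestamps are left as strings here to keep the example short.

```python
CANONICAL_KEYS = {"title", "description", "url", "source", "category", "published_at"}

# Made-up payloads: only the nesting and field names follow the real APIs.
newsapi_payload = {
    "articles": [
        {
            "title": "Example A",
            "description": None,
            "url": "https://example.com/a",
            "publishedAt": "2024-05-01T12:30:00Z",
        }
    ]
}
guardian_payload = {
    "response": {
        "results": [
            {
                "webTitle": "Example B",
                "webUrl": "https://example.com/b",
                "sectionName": "Technology",
                "webPublicationDate": "2024-05-01T09:00:00Z",
            }
        ]
    }
}


def normalise_newsapi(payload: dict, category: str = "general") -> list[dict]:
    """Map NewsAPI's top-level "articles" array onto the canonical shape."""
    return [
        {
            "title": item.get("title") or "",
            "description": item.get("description") or "",
            "url": item.get("url", ""),
            "source": "newsapi",
            "category": category,
            "published_at": item.get("publishedAt"),
        }
        for item in payload.get("articles", [])
    ]


def normalise_guardian(payload: dict) -> list[dict]:
    """Map the Guardian's nested response.results onto the canonical shape."""
    return [
        {
            "title": item.get("webTitle", ""),
            "description": item.get("fields", {}).get("trailText", ""),
            "url": item.get("webUrl", ""),
            "source": "guardian",
            "category": item.get("sectionName", "general").lower(),
            "published_at": item.get("webPublicationDate"),
        }
        for item in payload.get("response", {}).get("results", [])
    ]


unified = normalise_newsapi(newsapi_payload) + normalise_guardian(guardian_payload)
for article in unified:
    print(sorted(article))
```

Both entries print the same set of keys, which is all a consumer ever has to code against. A third source would need one more function of this kind and nothing else.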
Database-backed caching with TTL
Every call to an external API costs time and counts against rate limits. The cache stores recent articles in PostgreSQL: if the cached data is fresh (less than one hour old), serve it immediately; if it is stale, fetch fresh data and update the cache. Create cache.py to hold this logic.
from datetime import datetime, timedelta, timezone
from sqlalchemy.orm import Session
from database import Article as DBArticle
from sources import fetch_from_all_sources

# Cache freshness: 1 hour
CACHE_TTL_MINUTES = 60


def get_or_fetch_articles(
    db: Session,
    category: str | None = None,
    source: str | None = None,
    limit: int = 20
) -> tuple[list[DBArticle], str]:
    """
    Return (articles, cache_status), where cache_status is:
    - "hit" for fresh cached rows
    - "miss" for rows fetched from upstream APIs
    - "stale" for stale fallback rows when upstream APIs fail
    - "empty" when no rows are available
    """
    # Check cache freshness
    cutoff_time = datetime.now(timezone.utc) - timedelta(minutes=CACHE_TTL_MINUTES)
    # Build query for cached articles
    query = db.query(DBArticle).filter(DBArticle.updated_at >= cutoff_time)
    if category:
        query = query.filter(DBArticle.category == category)
    if source:
        query = query.filter(DBArticle.source == source)
    # Get cached articles
    cached = query.order_by(DBArticle.published_at.desc()).limit(limit).all()
    # If we have any fresh articles for this query, return from cache
    if cached:
        print(f"Cache hit: {category}/{source}")
        return cached, "hit"
    # Cache miss: no fresh rows for this query - fetch fresh data
    print(f"Cache miss: {category}/{source} - fetching from sources")
    fresh_articles = fetch_from_all_sources(category, source, limit)
    if not fresh_articles:
        # External APIs failed - return stale cache if it exists
        print("External APIs failed - returning stale cache")
        stale_query = db.query(DBArticle)
        if category:
            stale_query = stale_query.filter(DBArticle.category == category)
        if source:
            stale_query = stale_query.filter(DBArticle.source == source)
        stale = stale_query.order_by(DBArticle.published_at.desc()).limit(limit).all()
        return stale, ("stale" if stale else "empty")
    # Cache fresh articles
    cached_articles = []
    for article_data in fresh_articles:
        # Check if article already exists (by URL)
        existing = db.query(DBArticle).filter(
            DBArticle.url == article_data["url"]
        ).first()
        if existing:
            # Refresh every upstream-owned field on the existing cached row
            existing.updated_at = datetime.now(timezone.utc)
            existing.title = article_data["title"]
            existing.description = article_data["description"]
            existing.source = article_data["source"]
            existing.category = article_data["category"]
            existing.published_at = article_data["published_at"]
            cached_articles.append(existing)
        else:
            # Insert new article
            new_article = DBArticle(
                title=article_data["title"],
                description=article_data["description"],
                url=article_data["url"],
                source=article_data["source"],
                category=article_data["category"],
                published_at=article_data["published_at"]
            )
            db.add(new_article)
            cached_articles.append(new_article)
    db.commit()
    # Refresh objects to get generated IDs
    for article in cached_articles:
        db.refresh(article)
    return cached_articles, "miss"
The helper returns both the articles and the reason they were returned. That explicit status matters: a freshly fetched cache miss writes rows with current timestamps, so the endpoint should not try to infer hit vs miss from article age later.
The first path is the normal cache hit. If any rows have been updated within the freshness window, the function returns them directly: no external call, no waiting on NewsAPI or the Guardian. The trade-off is that a sparse category (six articles when the caller asked for twenty) still returns six and reports "hit". The cache promises freshness for this query, not a full page of results; a caller that needs exactly limit rows has to ask the source rather than the cache.
The second path is the cache miss. The function fetches from external APIs, stores the result, and reports "miss". If the external APIs fail, it falls back to stale cache and reports "stale" instead.
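The four outcomes can be laid out as a small decision function over plain dictionaries, with no database involved. This is only a sketch of the branching described above: cache_status is a hypothetical helper, and the fetched argument stands in for whatever the upstream call would have returned.

```python
from datetime import datetime, timedelta, timezone

TTL = timedelta(minutes=60)


def cache_status(
    rows: list[dict],
    fetched: list[dict],
    now: datetime,
) -> tuple[list[dict], str]:
    """Mirror the hit / miss / stale / empty decision with plain dicts.

    rows    -- rows already cached (each carries an "updated_at" datetime)
    fetched -- what the upstream fetch would return if it were called
    """
    fresh = [row for row in rows if row["updated_at"] >= now - TTL]
    if fresh:
        return fresh, "hit"      # upstream is never consulted
    if fetched:
        return fetched, "miss"   # nothing fresh, upstream answered
    if rows:
        return rows, "stale"     # upstream failed, old rows stand in
    return [], "empty"


now = datetime(2024, 5, 1, 12, 0, tzinfo=timezone.utc)
recent = {"url": "https://example.com/a", "updated_at": now - timedelta(minutes=5)}
old = {"url": "https://example.com/b", "updated_at": now - timedelta(hours=3)}
upstream = [{"url": "https://example.com/c", "updated_at": now}]

statuses = [
    cache_status([recent, old], upstream, now)[1],
    cache_status([old], upstream, now)[1],
    cache_status([old], [], now)[1],
    cache_status([], [], now)[1],
]
print(statuses)
```

Note that the first case returns only the fresh row even though an older one exists, which matches the "freshness, not a full page" promise described above.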
The upsert step prevents duplicates by checking article URLs. Existing rows get every upstream-owned field refreshed, including source, category, and published_at, so a row cannot return stale classification data after a later fetch. This tutorial keeps one current cached classification per URL; an API that must retain the same article in several simultaneous category result sets would model cached query results separately.
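The same "one row per URL" rule can also be pushed into the database as a single statement. The sketch below is an alternative to the tutorial's select-then-update loop, not a description of it: it uses an in-memory SQLite database as a stand-in for PostgreSQL (both accept this INSERT ... ON CONFLICT form), and the trimmed articles table with a UNIQUE constraint on url is an assumption made for the example.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE articles (
        id INTEGER PRIMARY KEY,
        url TEXT NOT NULL UNIQUE,   -- assumed constraint; the upsert relies on it
        title TEXT NOT NULL,
        category TEXT NOT NULL
    )
    """
)

# Insert a new row, or overwrite the upstream-owned fields if the URL exists.
UPSERT = """
    INSERT INTO articles (url, title, category)
    VALUES (:url, :title, :category)
    ON CONFLICT (url) DO UPDATE SET
        title = excluded.title,
        category = excluded.category
"""

first_fetch = {"url": "https://example.com/a", "title": "First title", "category": "general"}
later_fetch = {"url": "https://example.com/a", "title": "Revised title", "category": "technology"}

conn.execute(UPSERT, first_fetch)
conn.execute(UPSERT, later_fetch)
conn.commit()

rows = conn.execute("SELECT url, title, category FROM articles").fetchall()
print(rows)
```

After both statements the table holds one row carrying the later title and category, the same end state the loop in cache.py produces.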
One hour is the default here, picked as a compromise between freshness and external API cost. APIs that need to surface this choice to callers often expose it as a query parameter, e.g. ?max_age=300 for "data no older than five minutes." For this chapter the constant in code is enough.
The complete articles endpoint
Now update main.py with the complete app surface: root and health endpoints, admin key management behind X-Admin-Key, and the article endpoints that integrate authentication, rate limiting, caching, and multi-source fetching.
from contextlib import asynccontextmanager
import os
from fastapi import FastAPI, HTTPException, Depends, Query, Header
from sqlalchemy.orm import Session
from pydantic import BaseModel
from datetime import datetime, timezone
from database import get_db, init_db, Article as DBArticle, APIKey as DBAPIKey
from auth import create_api_key
from rate_limit import require_api_key_with_rate_limit
from cache import get_or_fetch_articles


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Create database tables on startup; nothing to clean up on shutdown."""
    init_db()
    print("Database tables initialized")
    yield


app = FastAPI(
    title="News Aggregator API",
    description="Unified interface for multiple news sources",
    version="1.0.0",
    lifespan=lifespan,
)


def require_admin_key(x_admin_key: str | None = Header(None)) -> str:
    """Require the server-side admin key for key-management endpoints."""
    expected_key = os.getenv("ADMIN_API_KEY")
    if not expected_key:
        raise HTTPException(status_code=500, detail="ADMIN_API_KEY is not configured")
    if x_admin_key != expected_key:
        raise HTTPException(status_code=403, detail="Invalid admin key")
    return x_admin_key


@app.get("/")
def root():
    return {
        "message": "News Aggregator API",
        "version": "1.0.0",
        "docs_url": "/docs"
    }


@app.get("/health")
def health_check():
    return {
        "status": "healthy",
        "timestamp": datetime.now(timezone.utc).isoformat()
    }

# Pydantic models
class ArticleResponse(BaseModel):
    """Individual article in API responses."""
    id: int
    title: str
    description: str
    url: str
    source: str
    category: str
    published_at: datetime

    class Config:
        from_attributes = True


class ArticleListResponse(BaseModel):
    """Article list response: up to `limit` articles for the query."""
    articles: list[ArticleResponse]
    count: int
    cache_status: str


class APIKeyCreate(BaseModel):
    """Request body for creating API keys."""
    name: str
    tier: str = "basic"


class APIKeyResponse(BaseModel):
    """Response with API key (shown once)."""
    api_key: str
    key_id: int
    name: str
    tier: str
    created_at: datetime

@app.post("/admin/api-keys", response_model=APIKeyResponse, status_code=201)
def create_api_key_endpoint(
    key_data: APIKeyCreate,
    admin_key: str = Depends(require_admin_key),
    db: Session = Depends(get_db)
):
    """Generate a new API key. Returns key once - user must save it."""
    return create_api_key(db=db, name=key_data.name, tier=key_data.tier)


@app.delete("/admin/api-keys/{key_id}", status_code=204)
def revoke_api_key(
    key_id: int,
    admin_key: str = Depends(require_admin_key),
    db: Session = Depends(get_db)
):
    """Revoke an API key (mark as inactive)."""
    db_key = db.query(DBAPIKey).filter(DBAPIKey.id == key_id).first()
    if not db_key:
        raise HTTPException(status_code=404, detail="API key not found")
    db_key.is_active = False
    db.commit()
    return None

@app.get("/articles", response_model=ArticleListResponse)
def list_articles(
    category: str | None = None,
    source: str | None = None,
    limit: int = Query(20, ge=1, le=100),
    api_key: DBAPIKey = Depends(require_api_key_with_rate_limit),
    db: Session = Depends(get_db)
):
    """
    List articles from multiple news sources with smart caching.

    Query Parameters:
    - category: Filter by category (technology, business, etc.)
    - source: Filter by source (newsapi, guardian, or omit for all)
    - limit: Results per page (1-100, default 20)

    Requires: Valid API key in Authorization: Bearer header
    Rate Limits: Basic tier = 100 requests/hour
    """
    # Get articles and explicit cache status
    articles, cache_status = get_or_fetch_articles(db, category, source, limit)
    return {
        "articles": articles,
        "count": len(articles),
        "cache_status": cache_status
    }


@app.get("/articles/{article_id}", response_model=ArticleResponse)
def get_article(
    article_id: int,
    api_key: DBAPIKey = Depends(require_api_key_with_rate_limit),
    db: Session = Depends(get_db)
):
    """Get a specific article by ID."""
    article = db.query(DBArticle).filter(DBArticle.id == article_id).first()
    if not article:
        raise HTTPException(status_code=404, detail="Article not found")
    return article
The final main.py includes every route the rest of the chapter tests and deploys. The root and health endpoints stay public, admin key generation is protected by ADMIN_API_KEY, and the article routes require normal API-key authentication plus rate limiting.
The /articles endpoint stays thin. Authentication and rate limiting happen through dependencies; source fetching and cache decisions happen in get_or_fetch_articles(); the route packages the final response shape.
The cache_status value comes directly from the cache helper, so a first request that fetched from upstream APIs reports "miss", while a later request served from fresh cached rows reports "hit".
The single-article endpoint stays simple: one database lookup, a 404 if the article does not exist, and the same authentication/rate-limit dependency as the list endpoint.
Test the complete system:
# First request - cache miss, fetches from external APIs
curl -H "Authorization: Bearer YOUR_KEY" \
"http://localhost:8000/articles?category=technology&limit=10"
# Response shows cache_status: "miss"
# Server logs: "Cache miss: technology/None - fetching from sources"
# Second request within 1 hour - cache hit
curl -H "Authorization: Bearer YOUR_KEY" \
"http://localhost:8000/articles?category=technology&limit=10"
# Response shows cache_status: "hit"
# Server logs: "Cache hit: technology/None"
# Response is instant - no external API calls
# Filter by specific source
curl -H "Authorization: Bearer YOUR_KEY" \
"http://localhost:8000/articles?source=guardian&limit=5"
# Get single article
curl -H "Authorization: Bearer YOUR_KEY" \
"http://localhost:8000/articles/1"
The two requests look identical from the outside but feel different end-to-end: the cache-miss path round-trips to NewsAPI and the Guardian before responding, while the cache-hit path is a single SELECT against PostgreSQL. The order-of-magnitude gap between those two paths is the reason the cache is in front of the fan-out at all.
At this point the API is feature-complete: /articles authenticates, rate-limits, serves from cache when it can, fans out when it must, normalises both sources into one shape, and degrades to stale data when upstreams fail. What's missing is the safety net that catches regressions when any of these pieces change. That is what the next page builds.
Next, in section 9, we write the pytest suite: fixtures for the database and TestClient, mocked NewsAPI and Guardian responses so the tests don't burn external quota, and per-endpoint contracts for auth, rate limits, and the article surface.