5. The aggregation pipeline
Three normalizers, one orchestrator that fetches independently, contains failures per source, dedupes, and sorts by recency. When NewsAPI is rate-limited and Guardian is down, you still serve articles from HackerNews, and the caller knows exactly what failed.
Orchestrating multiple sources
You have three normalizers that transform API responses into canonical articles. Now build the aggregator that orchestrates everything: fetching from multiple sources, handling failures gracefully when one API is down, combining results, and presenting unified output.
The aggregation pipeline coordinates multiple sources with independent failure handling. When one source fails, the others continue.
This graceful degradation is what separates production systems from prototypes. Users get results even when individual components fail. The system tracks what succeeded and what failed for diagnostics.
Building the aggregator in passes
You will build aggregator.py in four passes: four runnable versions of the file, each extending the last. Pass 1 has no error handling at all and aborts on the first failure -- yet it already coordinates three sources end-to-end. Each subsequent pass adds one capability: per-source failure containment, then deduplication and sorting, then tagged failure categories for operational monitoring.
The arc is the architecture coming into focus. Pass 1 returns a bare list of articles. Pass 2 returns a list plus a minimal stats dict. Pass 3 returns deduplicated, sorted articles plus dedup counts. Pass 4 returns the rich stats dict the monitoring layer downstream consumes. Each pass is a complete runnable file the reader can save, verify, and reason about before the next capability lands.
Pass 1: fetch every source in sequence
The first pass solves one problem: coordinating three API calls into one combined result. The aggregator owns the loop, each source has its own fetch method, and the normalizers from the previous pages convert raw JSON into Article objects. There's no error handling yet -- if any fetch raises, the whole search aborts. That's deliberate; pass 2 introduces the containment, and showing the unguarded version first makes the design decision visible. Save this at the project root as aggregator.py:
import os
import requests
from typing import List
from dotenv import load_dotenv
from models import Article
from normalize_newsapi import normalize_newsapi
from normalize_guardian import normalize_guardian
from normalize_hackernews import normalize_hackernews
load_dotenv()


class NewsAggregator:
    """
    Pass 1: coordinate three sources end-to-end.
    No error handling yet; any fetch failure aborts the whole search.
    """

    def __init__(self):
        self.newsapi_key = os.environ.get("NEWSAPI_KEY")
        self.guardian_key = os.environ.get("GUARDIAN_KEY")

    def fetch_newsapi(self, query: str, max_results: int = 10) -> dict:
        if not self.newsapi_key:
            raise ValueError("NewsAPI key not configured")
        url = "https://newsapi.org/v2/everything"
        params = {
            "q": query,
            "pageSize": max_results,
            "apiKey": self.newsapi_key,
            "language": "en",
            "sortBy": "publishedAt",
        }
        response = requests.get(url, params=params, timeout=10)
        response.raise_for_status()
        return response.json()

    def fetch_guardian(self, query: str, max_results: int = 10) -> dict:
        if not self.guardian_key:
            raise ValueError("Guardian API key not configured")
        url = "https://content.guardianapis.com/search"
        params = {
            "q": query,
            "page-size": max_results,
            "show-fields": "all",
            "api-key": self.guardian_key,
            "order-by": "newest",
        }
        response = requests.get(url, params=params, timeout=10)
        response.raise_for_status()
        return response.json()

    def fetch_hackernews(self, query: str, max_results: int = 10) -> dict:
        url = "https://hn.algolia.com/api/v1/search"
        params = {"query": query, "hitsPerPage": max_results, "tags": "story"}
        response = requests.get(url, params=params, timeout=10)
        response.raise_for_status()
        return response.json()

    def search(self, query: str, max_per_source: int = 10) -> List[Article]:
        """Search every source and return the combined list."""
        all_articles: List[Article] = []
        sources = [
            ("NewsAPI", self.fetch_newsapi, normalize_newsapi),
            ("Guardian", self.fetch_guardian, normalize_guardian),
            ("HackerNews", self.fetch_hackernews, normalize_hackernews),
        ]
        for source_name, fetch_fn, normalize_fn in sources:
            response = fetch_fn(query, max_per_source)
            articles, _meta = normalize_fn(response)
            all_articles.extend(articles)
            print(f"{source_name}: {len(articles)} articles")
        return all_articles
The three fetch methods are parameterised variants of one capability (HTTP GET with API-specific params), so they ship together in this pass rather than being split across three passes. What distinguishes the next passes is new capabilities -- failure containment, deduplication, tagged categorisation -- not new sources.
Save this as verify_pass1.py and run it. All three of your API keys need to be set in .env for this pass to complete cleanly; if any key is missing or any source is unreachable, the search aborts mid-loop and that's expected pass 1 behaviour:
from aggregator import NewsAggregator
aggregator = NewsAggregator()
articles = aggregator.search("python programming", max_per_source=3)
print(f"\nTotal articles: {len(articles)}")
print(f"First article: {articles[0].title if articles else '(none)'}")
NewsAPI: 3 articles
Guardian: 3 articles
HackerNews: 3 articles
Total articles: 9
First article: Python 3.13 Performance Improvements
If you see all three source lines printed and a total of 9 articles, pass 1 is working: each fetch reached its API, each normalizer converted the response into Article objects, and the loop concatenated them. If the print stops after one or two sources and you see a traceback from raise_for_status() or a ValueError about a missing key, that's pass 1 behaviour by design -- one failing source aborts the whole search. Pass 2 fixes that. If you see an ImportError on any of the normalize_* functions, those files weren't saved at the project root in the previous pages.
Pass 2: contain failures per source
Pass 2 introduces the single most important architectural decision in this file: each source is fetched inside its own try/except, so one source's failure no longer kills the others. The aggregator's contract changes too -- instead of returning just a list of articles, it returns a tuple of (articles, stats) where the stats dict records which sources succeeded and which failed. The list of failures is still bare strings (e.g. "Guardian") at this pass; pass 4 adds the parenthetical tags that the monitoring layer matches on.
# Update the import line:
from typing import List, Tuple, Dict
# Replace the search method body with this:
    def search(self, query: str, max_per_source: int = 10) -> Tuple[List[Article], Dict]:
        """Search every source; one failure no longer aborts the others."""
        all_articles: List[Article] = []
        stats = {
            "sources_succeeded": [],
            "sources_failed": [],
        }
        sources = [
            ("NewsAPI", self.fetch_newsapi, normalize_newsapi),
            ("Guardian", self.fetch_guardian, normalize_guardian),
            ("HackerNews", self.fetch_hackernews, normalize_hackernews),
        ]
        for source_name, fetch_fn, normalize_fn in sources:
            try:
                response = fetch_fn(query, max_per_source)
                articles, _meta = normalize_fn(response)
                all_articles.extend(articles)
                stats["sources_succeeded"].append(source_name)
                print(f"{source_name}: {len(articles)} articles")
            except Exception as e:
                stats["sources_failed"].append(source_name)
                print(f"{source_name}: failed ({type(e).__name__})")
        return all_articles, stats
Full aggregator.py after pass 2 (about 80 lines, with the fetch methods unchanged):
import os
import requests
from typing import List, Tuple, Dict
from dotenv import load_dotenv
from models import Article
from normalize_newsapi import normalize_newsapi
from normalize_guardian import normalize_guardian
from normalize_hackernews import normalize_hackernews
load_dotenv()


class NewsAggregator:
    """Pass 2: per-source try/except so one failure doesn't kill the others."""

    def __init__(self):
        self.newsapi_key = os.environ.get("NEWSAPI_KEY")
        self.guardian_key = os.environ.get("GUARDIAN_KEY")

    def fetch_newsapi(self, query: str, max_results: int = 10) -> dict:
        if not self.newsapi_key:
            raise ValueError("NewsAPI key not configured")
        url = "https://newsapi.org/v2/everything"
        params = {
            "q": query, "pageSize": max_results, "apiKey": self.newsapi_key,
            "language": "en", "sortBy": "publishedAt",
        }
        response = requests.get(url, params=params, timeout=10)
        response.raise_for_status()
        return response.json()

    def fetch_guardian(self, query: str, max_results: int = 10) -> dict:
        if not self.guardian_key:
            raise ValueError("Guardian API key not configured")
        url = "https://content.guardianapis.com/search"
        params = {
            "q": query, "page-size": max_results, "show-fields": "all",
            "api-key": self.guardian_key, "order-by": "newest",
        }
        response = requests.get(url, params=params, timeout=10)
        response.raise_for_status()
        return response.json()

    def fetch_hackernews(self, query: str, max_results: int = 10) -> dict:
        url = "https://hn.algolia.com/api/v1/search"
        params = {"query": query, "hitsPerPage": max_results, "tags": "story"}
        response = requests.get(url, params=params, timeout=10)
        response.raise_for_status()
        return response.json()

    def search(self, query: str, max_per_source: int = 10) -> Tuple[List[Article], Dict]:
        all_articles: List[Article] = []
        stats = {"sources_succeeded": [], "sources_failed": []}
        sources = [
            ("NewsAPI", self.fetch_newsapi, normalize_newsapi),
            ("Guardian", self.fetch_guardian, normalize_guardian),
            ("HackerNews", self.fetch_hackernews, normalize_hackernews),
        ]
        for source_name, fetch_fn, normalize_fn in sources:
            try:
                response = fetch_fn(query, max_per_source)
                articles, _meta = normalize_fn(response)
                all_articles.extend(articles)
                stats["sources_succeeded"].append(source_name)
                print(f"{source_name}: {len(articles)} articles")
            except Exception as e:
                stats["sources_failed"].append(source_name)
                print(f"{source_name}: failed ({type(e).__name__})")
        return all_articles, stats
The verify for pass 2 has two scripts. The first is the happy path -- all three sources up -- which confirms the new return shape and the stats dict. The second is the deliberate break: invalidate Guardian's key, watch the other two sources still ship articles, and confirm sources_failed records Guardian without taking the whole search down. The break is what makes the try/except earn its keep; with sequential calls and no containment, one outage would force users to retry their query from scratch.
from aggregator import NewsAggregator
aggregator = NewsAggregator()
articles, stats = aggregator.search("python programming", max_per_source=3)
print(f"\nArticles: {len(articles)}")
print(f"Succeeded: {stats['sources_succeeded']}")
print(f"Failed: {stats['sources_failed']}")
NewsAPI: 3 articles
Guardian: 3 articles
HackerNews: 3 articles
Articles: 9
Succeeded: ['NewsAPI', 'Guardian', 'HackerNews']
Failed: []
Now force the failure mode. Save this as verify_pass2_break.py and run it -- it swaps in an invalid Guardian key for one search, then restores the original. The other two sources should continue returning articles, and Guardian should land in sources_failed rather than crashing the run:
from aggregator import NewsAggregator
aggregator = NewsAggregator()
original_key = aggregator.guardian_key
aggregator.guardian_key = "invalid_key_for_testing"
articles, stats = aggregator.search("python", max_per_source=3)
print(f"\nArticles still retrieved: {len(articles)}")
print(f"Succeeded: {stats['sources_succeeded']}")
print(f"Failed: {stats['sources_failed']}")
aggregator.guardian_key = original_key
NewsAPI: 3 articles
Guardian: failed (HTTPError)
HackerNews: 3 articles
Articles still retrieved: 6
Succeeded: ['NewsAPI', 'HackerNews']
Failed: ['Guardian']
If you see Articles still retrieved: 6 with NewsAPI and HackerNews in sources_succeeded and Guardian alone in sources_failed, pass 2 is working: the containment held, one source failed loudly in the print line but the loop continued, and the caller has both articles and a structured record of what went wrong. If the script crashes with a traceback instead, the try/except didn't catch the exception -- almost always because the indentation was lost during paste, dropping the fetch_fn call outside the try block. The HTTPError tag in the print line is just the exception class name; pass 4 replaces it with a richer parenthetical tag like (HTTP 401) that the monitoring layer matches on.
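The containment behaviour can also be checked without touching the network. The sketch below is a stripped-down stand-in for the pass 2 loop, not the aggregator itself: `search_with_containment`, `ok_source`, and `broken_source` are hypothetical names, and the sources are plain functions returning lists of strings rather than Article objects.

```python
from typing import Callable, Dict, List, Tuple


def ok_source(query: str) -> List[str]:
    """Hypothetical stand-in for a fetch + normalize pair that succeeds."""
    return [f"{query} story {i}" for i in range(3)]


def broken_source(query: str) -> List[str]:
    """Hypothetical stand-in for a source whose API is down."""
    raise ConnectionError("simulated outage")


def search_with_containment(
    query: str, sources: List[Tuple[str, Callable[[str], List[str]]]]
) -> Tuple[List[str], Dict[str, List[str]]]:
    """Same shape as the pass 2 loop: one try/except per source."""
    results: List[str] = []
    stats: Dict[str, List[str]] = {"sources_succeeded": [], "sources_failed": []}
    for name, fetch in sources:
        try:
            results.extend(fetch(query))
            stats["sources_succeeded"].append(name)
        except Exception:
            # The failure is recorded and the loop moves on to the next source.
            stats["sources_failed"].append(name)
    return results, stats


results, stats = search_with_containment(
    "python",
    [("A", ok_source), ("B", broken_source), ("C", ok_source)],
)
print(len(results), stats)
```

Because the failing source sits in the middle of the list, the run only produces results from source C if the loop really does continue past the exception.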
Pass 3: deduplicate by URL and sort by recency
Pass 3 handles two capabilities the combined list needs before it's useful: deduplication (the same story may appear in multiple sources, especially for high-profile news) and recency sorting (users want newest first). Deduplication keys on URL because URLs are stable across sources -- if two sources link to the same canonical article, the first occurrence wins. The stats dict grows three new fields so the caller can reason about cross-source overlap.
# Add at module level, above the NewsAggregator class:
def deduplicate_articles(articles: List[Article]) -> List[Article]:
    """
    Drop duplicate articles, keyed on URL.

    When multiple sources cover the same story they typically link to
    the same canonical URL, so URL is the cheapest reliable key. The
    first occurrence wins; later occurrences are dropped.
    """
    seen_urls = set()
    unique_articles = []
    for article in articles:
        if article.url not in seen_urls:
            seen_urls.add(article.url)
            unique_articles.append(article)
    return unique_articles

# Inside search, expand the stats dict:
        stats = {
            "sources_succeeded": [],
            "sources_failed": [],
            "total_articles": 0,
            "unique_articles": 0,
            "duplicates_removed": 0,
        }

# After the for-loop, replace the existing return with:
        stats["total_articles"] = len(all_articles)
        unique_articles = deduplicate_articles(all_articles)
        stats["unique_articles"] = len(unique_articles)
        stats["duplicates_removed"] = stats["total_articles"] - stats["unique_articles"]
        unique_articles.sort(key=lambda a: a.published_at, reverse=True)
        return unique_articles, stats
Full aggregator.py after pass 3 (about 95 lines, with the fetch methods unchanged):
import os
import requests
from typing import List, Tuple, Dict
from dotenv import load_dotenv
from models import Article
from normalize_newsapi import normalize_newsapi
from normalize_guardian import normalize_guardian
from normalize_hackernews import normalize_hackernews
load_dotenv()


def deduplicate_articles(articles: List[Article]) -> List[Article]:
    """Drop duplicate articles, keyed on URL. First occurrence wins."""
    seen_urls = set()
    unique_articles = []
    for article in articles:
        if article.url not in seen_urls:
            seen_urls.add(article.url)
            unique_articles.append(article)
    return unique_articles


class NewsAggregator:
    """Pass 3: deduplicate by URL and sort by recency."""

    def __init__(self):
        self.newsapi_key = os.environ.get("NEWSAPI_KEY")
        self.guardian_key = os.environ.get("GUARDIAN_KEY")

    # ... (fetch_newsapi / fetch_guardian / fetch_hackernews unchanged from pass 2)

    def search(self, query: str, max_per_source: int = 10) -> Tuple[List[Article], Dict]:
        all_articles: List[Article] = []
        stats = {
            "sources_succeeded": [],
            "sources_failed": [],
            "total_articles": 0,
            "unique_articles": 0,
            "duplicates_removed": 0,
        }
        sources = [
            ("NewsAPI", self.fetch_newsapi, normalize_newsapi),
            ("Guardian", self.fetch_guardian, normalize_guardian),
            ("HackerNews", self.fetch_hackernews, normalize_hackernews),
        ]
        for source_name, fetch_fn, normalize_fn in sources:
            try:
                response = fetch_fn(query, max_per_source)
                articles, _meta = normalize_fn(response)
                all_articles.extend(articles)
                stats["sources_succeeded"].append(source_name)
                print(f"{source_name}: {len(articles)} articles")
            except Exception as e:
                stats["sources_failed"].append(source_name)
                print(f"{source_name}: failed ({type(e).__name__})")
        stats["total_articles"] = len(all_articles)
        unique_articles = deduplicate_articles(all_articles)
        stats["unique_articles"] = len(unique_articles)
        stats["duplicates_removed"] = stats["total_articles"] - stats["unique_articles"]
        unique_articles.sort(key=lambda a: a.published_at, reverse=True)
        return unique_articles, stats
Save this as verify_pass3.py. The verify confirms dedup counts and that the returned list is sorted newest-first:
from aggregator import NewsAggregator
aggregator = NewsAggregator()
articles, stats = aggregator.search("python programming", max_per_source=5)
print(f"\nFetched total: {stats['total_articles']}")
print(f"After dedup: {stats['unique_articles']}")
print(f"Duplicates: {stats['duplicates_removed']}")
# Confirm sort order: each article's published_at >= the next one's
ok = all(
    articles[i].published_at >= articles[i + 1].published_at
    for i in range(len(articles) - 1)
)
print(f"Sorted newest-first: {ok}")
# Guard against an empty list so the script doesn't crash when every source fails
print(f"First headline: {articles[0].title if articles else '(none)'}")
NewsAPI: 5 articles
Guardian: 5 articles
HackerNews: 5 articles
Fetched total: 15
After dedup: 12
Duplicates: 3
Sorted newest-first: True
First headline: Python 3.13 Performance Improvements
If Duplicates is non-zero and Sorted newest-first prints True, pass 3 is working: the URL-keyed set caught cross-source overlap, and the recency sort put the freshest article at the top. If Duplicates: 0 shows up consistently, it just means the three sources didn't overlap for your query that run -- not a bug; try a high-traffic query like "python 3.13" to force overlap. If Sorted newest-first: False appears, the sort line was lost during paste, or reverse=True was dropped (default sort is oldest-first, which is the opposite of what users expect).
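To see first-occurrence-wins in isolation, the sketch below runs the same URL-keyed loop on hand-built records. `FakeArticle` is a hypothetical stand-in for the real Article model (only the fields the dedup and sort touch), `dedupe_by_url` mirrors `deduplicate_articles`, and the URLs are made up. It also illustrates a limitation of the approach: URLs are compared as exact strings, so a trailing slash or a tracking parameter makes two links to the same story look distinct.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import List


@dataclass
class FakeArticle:
    """Hypothetical stand-in for models.Article."""
    title: str
    url: str
    published_at: datetime


def dedupe_by_url(articles: List[FakeArticle]) -> List[FakeArticle]:
    """Keep the first article seen for each exact URL string."""
    seen = set()
    unique = []
    for article in articles:
        if article.url not in seen:
            seen.add(article.url)
            unique.append(article)
    return unique


articles = [
    FakeArticle("From source A", "https://example.com/story", datetime(2025, 1, 14)),
    FakeArticle("From source B", "https://example.com/story", datetime(2025, 1, 15)),
    FakeArticle("Trailing slash", "https://example.com/story/", datetime(2025, 1, 16)),
]

unique = dedupe_by_url(articles)
unique.sort(key=lambda a: a.published_at, reverse=True)
print([a.title for a in unique])
```

The second record is dropped even though it is newer than the first, because dedup runs before the sort; the third survives because its URL differs by one character. Whether to normalize URLs before comparing is a design decision this chapter leaves open.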
Pass 4: tag failures by category and surface per-source counts
Pass 4 is the production polish: the bare except Exception splits into typed handlers, each appending a parenthetical-tagged string to sources_failed so the monitoring layer downstream can match on the category ((HTTP 401) means auth, (HTTP 429) means rate-limit, (timeout) means slow API) without re-parsing exceptions. The stats dict also grows two new fields: articles_per_source for the counts that succeeded, and total_available_per_source from each normalizer's meta dict, which surfaces upstream coverage (a query that used to return 12,000 matches and now returns 50 is a signal even if the per-page count looks healthy). This is the canonical version every downstream module imports from -- it stays inline rather than collapsed because, like the assembled orchestrator in chapter 13, it's the file the operational layer and the display layer both depend on.
import os
import requests
from typing import List, Tuple, Dict
from dotenv import load_dotenv
from models import Article
from normalize_newsapi import normalize_newsapi
from normalize_guardian import normalize_guardian
from normalize_hackernews import normalize_hackernews
load_dotenv()


def deduplicate_articles(articles: List[Article]) -> List[Article]:
    """
    Drop duplicate articles, keyed on URL.

    When multiple sources cover the same story they typically link to
    the same canonical URL, so URL is the cheapest reliable key. The
    first occurrence wins; later occurrences are dropped.
    """
    seen_urls = set()
    unique_articles = []
    for article in articles:
        if article.url not in seen_urls:
            seen_urls.add(article.url)
            unique_articles.append(article)
    return unique_articles


class NewsAggregator:
    """
    Aggregates news from multiple sources with graceful failure handling.
    Each source can fail independently without affecting others.
    """

    def __init__(self):
        """Initialize with API credentials from environment."""
        self.newsapi_key = os.environ.get("NEWSAPI_KEY")
        self.guardian_key = os.environ.get("GUARDIAN_KEY")

    def fetch_newsapi(self, query: str, max_results: int = 10) -> dict:
        """Fetch articles from NewsAPI."""
        if not self.newsapi_key:
            raise ValueError("NewsAPI key not configured")
        url = "https://newsapi.org/v2/everything"
        params = {
            "q": query, "pageSize": max_results, "apiKey": self.newsapi_key,
            "language": "en", "sortBy": "publishedAt",
        }
        response = requests.get(url, params=params, timeout=10)
        response.raise_for_status()
        return response.json()

    def fetch_guardian(self, query: str, max_results: int = 10) -> dict:
        """Fetch articles from Guardian API."""
        if not self.guardian_key:
            raise ValueError("Guardian API key not configured")
        url = "https://content.guardianapis.com/search"
        params = {
            "q": query, "page-size": max_results, "show-fields": "all",
            "api-key": self.guardian_key, "order-by": "newest",
        }
        response = requests.get(url, params=params, timeout=10)
        response.raise_for_status()
        return response.json()

    def fetch_hackernews(self, query: str, max_results: int = 10) -> dict:
        """Fetch articles from HackerNews via Algolia API."""
        url = "https://hn.algolia.com/api/v1/search"
        params = {"query": query, "hitsPerPage": max_results, "tags": "story"}
        response = requests.get(url, params=params, timeout=10)
        response.raise_for_status()
        return response.json()

    def search(self, query: str, max_per_source: int = 10) -> Tuple[List[Article], Dict]:
        """
        Search every source, dedupe across them, sort by recency, and
        return the combined results plus a stats dict for diagnostics.
        """
        all_articles = []
        stats = {
            "query": query,
            "sources_attempted": [],
            "sources_succeeded": [],
            "sources_failed": [],
            "articles_per_source": {},
            "total_available_per_source": {},
            "total_articles": 0,
            "unique_articles": 0,
            "duplicates_removed": 0,
        }
        sources = [
            ("NewsAPI", self.fetch_newsapi, normalize_newsapi),
            ("Guardian", self.fetch_guardian, normalize_guardian),
            ("HackerNews", self.fetch_hackernews, normalize_hackernews),
        ]

        # Try each source independently; one failure doesn't stop others.
        for source_name, fetch_fn, normalize_fn in sources:
            stats["sources_attempted"].append(source_name)
            try:
                response = fetch_fn(query, max_per_source)
                articles, meta = normalize_fn(response)
                all_articles.extend(articles)
                total = meta.get("total")
                stats["sources_succeeded"].append(source_name)
                stats["articles_per_source"][source_name] = len(articles)
                stats["total_available_per_source"][source_name] = total
                suffix = f" of {total} matching" if total else ""
                print(f"{source_name}: {len(articles)}{suffix} articles")
            except ValueError as e:
                # Configuration error (missing API key)
                stats["sources_failed"].append(f"{source_name} (not configured)")
                print(f"{source_name}: {e}")
            except requests.HTTPError as e:
                # Distinguish HTTP status codes (401 auth, 429 rate-limit, etc.)
                status = e.response.status_code if e.response is not None else "unknown"
                stats["sources_failed"].append(f"{source_name} (HTTP {status})")
                print(f"{source_name}: HTTP {status}")
            except requests.Timeout:
                stats["sources_failed"].append(f"{source_name} (timeout)")
                print(f"{source_name}: request timed out")
            except requests.RequestException as e:
                # Other network errors (DNS, connection, etc.)
                stats["sources_failed"].append(f"{source_name} (network error)")
                print(f"{source_name}: {e}")
            except Exception as e:
                stats["sources_failed"].append(f"{source_name} (error: {type(e).__name__})")
                print(f"{source_name}: unexpected error - {e}")

        # Combine, deduplicate by URL, sort by recency.
        stats["total_articles"] = len(all_articles)
        unique_articles = deduplicate_articles(all_articles)
        stats["unique_articles"] = len(unique_articles)
        stats["duplicates_removed"] = stats["total_articles"] - stats["unique_articles"]
        unique_articles.sort(key=lambda a: a.published_at, reverse=True)
        return unique_articles, stats
Seven design choices the final aggregator now embodies:
- Independent failures: each source wrapped in try/except; one failure doesn't stop others
- Error categorization: configuration errors, HTTP status codes, timeouts, and other network errors all land in sources_failed with a tag ((not configured), (HTTP 401), (HTTP 429), (timeout), (network error)) so the operational layer can match on it without re-parsing
- Per-source counts: articles_per_source records how many each successful source returned, separate from the totals
- Per-source totals available: total_available_per_source records how many matching articles each API has (from each normalizer's returned meta dict), so the reader can see "5 of 12,847" and reason about coverage versus what was fetched
- Cross-source dedup: URL-based via deduplicate_articles; the same canonical URL from two sources collapses to one entry, with the first occurrence kept
- Sort by recency: the merged list is ordered newest-first by published_at, so the canonical timestamp from the normalizers earns its keep here
- Partial results: returns whatever articles were successfully fetched and deduped, even if one or two sources failed
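One subtlety in the handler order is worth knowing about. The `except ValueError` branch comes first and is meant for the missing-key check, but Python's `json.JSONDecodeError` is itself a subclass of `ValueError`, so a malformed JSON body (or a `ValueError` raised inside a normalizer) would plausibly be tagged `(not configured)` as well. The sketch below demonstrates the effect with the standard library only. `MissingKeyError`, `tag_failure`, and the `(bad JSON)` and `(invalid data)` tags are hypothetical, offered as one possible way to keep the categories apart rather than as part of the aggregator above.

```python
import json


class MissingKeyError(ValueError):
    """Hypothetical dedicated exception for a missing API key."""


def tag_failure(exc: Exception) -> str:
    """Map an exception to a parenthetical tag, most specific class first."""
    if isinstance(exc, MissingKeyError):
        return "(not configured)"
    if isinstance(exc, json.JSONDecodeError):
        return "(bad JSON)"
    if isinstance(exc, ValueError):
        return "(invalid data)"
    return f"(error: {type(exc).__name__})"


def capture(fn):
    """Run fn and return the exception it raises, or None if it succeeds."""
    try:
        fn()
    except Exception as exc:
        return exc
    return None


def raise_missing_key():
    raise MissingKeyError("NewsAPI key not configured")


bad_json = capture(lambda: json.loads("<html>"))
missing = capture(raise_missing_key)

# A plain `except ValueError` would catch both of these.
print(isinstance(bad_json, ValueError), isinstance(missing, ValueError))
print(tag_failure(bad_json))
print(tag_failure(missing))
```

If you adopt something like this, the fetch methods would raise the dedicated exception instead of a bare `ValueError`, and the health check would need a branch for any new tag.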
Save this as verify_pass4.py. The verify exercises the full pipeline with a real query and prints the rich stats dict so you can see what the operational layer downstream consumes:
from aggregator import NewsAggregator
aggregator = NewsAggregator()
print("=== NEWS AGGREGATOR TEST ===\n")
print("Searching for 'python programming'...\n")
articles, stats = aggregator.search("python programming", max_per_source=5)
print("\n--- Search Statistics ---")
print(f"Sources attempted: {len(stats['sources_attempted'])}")
print(f"Sources succeeded: {len(stats['sources_succeeded'])}")
print(f"Sources failed: {len(stats['sources_failed'])}")
if stats["sources_failed"]:
    print(f"Failed sources: {', '.join(stats['sources_failed'])}")
print(
    f"Articles: {stats['unique_articles']} "
    f"({stats['duplicates_removed']} duplicates removed from {stats['total_articles']})"
)
print("Coverage per source:")
for source, count in stats["articles_per_source"].items():
    available = stats["total_available_per_source"].get(source)
    suffix = f" of {available}" if available else ""
    print(f"  {source}: {count}{suffix}")
print("\n--- Sample Results ---")
for i, article in enumerate(articles[:3], 1):
    print(f"\n{i}. {article.title}")
    print(f"   {article.source_name} | {article.format_timestamp()}")
    if article.author:
        print(f"   {article.author}")
    print(f"   {article.url}")
$ python verify_pass4.py
=== NEWS AGGREGATOR TEST ===
Searching for 'python programming'...
NewsAPI: 5 of 12847 matching articles
Guardian: 5 of 8453 matching articles
HackerNews: 5 of 583201 matching articles
--- Search Statistics ---
Sources attempted: 3
Sources succeeded: 3
Sources failed: 0
Articles: 12 (3 duplicates removed from 15)
Coverage per source:
NewsAPI: 5 of 12847
Guardian: 5 of 8453
HackerNews: 5 of 583201
--- Sample Results ---
1. Python 3.13 Performance Improvements
TechCrunch | January 15, 2025 at 02:30 PM
Sarah Chen
https://techcrunch.com/2025/01/15/python-3-13-performance
2. Python 3.13 is out
HackerNews | January 15, 2025 at 08:45 AM
throwaway2025
https://www.python.org/downloads/release/python-3130/
3. Python programming language gains popularity in schools
Technology | January 14, 2025 at 03:15 PM
Maya Patel
https://www.theguardian.com/technology/2025/jan/14/python-schools
If you see "of NNN matching" suffixes on each source line and Coverage per source printing counts-of-totals, pass 4 is working: the typed exception handlers fell through to the success branch, the meta.get("total") reads picked up upstream totals from each normalizer, and the rich stats dict carries everything the monitoring layer needs. If the "of NNN matching" suffix is missing for a source, that normalizer isn't returning meta["total"] -- check the normalizer's return shape against what pass 4's meta.get("total") call expects.
Operational monitoring: using aggregation statistics
Your aggregator already tracks success and failure statistics for each source. The next step is using them: detect API issues, rate limiting, authentication problems, and service degradation before users complain.
The schema you have to work with is the one NewsAggregator.search() returns. The fields that matter for monitoring:
- sources_succeeded: list of source names that responded without error (possibly with zero articles)
- sources_failed: list of strings like "Guardian (HTTP 401)", "NewsAPI (HTTP 429)", "HackerNews (timeout)", or "Guardian (not configured)". The parenthetical tag is the pattern monitoring matches on
- articles_per_source: per-source counts for the sources that succeeded; a zero here means the API responded but returned nothing
- total_available_per_source: per-source totals from each meta dict; useful for spotting drops in upstream coverage (a query that used to return 12,000 matches now returning 50 is a signal even if the per-page count looks healthy)
- unique_articles / duplicates_removed: post-dedup counts, useful for tracking cross-source overlap over time
Below are the patterns to recognise in those fields and what to do about each. The JSON examples show only the keys relevant to the pattern at hand; the full stats dict your aggregator returns has additional fields for query metadata (query, sources_attempted) and dedup counts (total_articles, unique_articles, duplicates_removed).
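Since every pattern below keys off the parenthetical tag, it can help to split a `sources_failed` entry into its two parts once rather than substring-matching everywhere. This is a minimal sketch assuming the `"<source> (<tag>)"` shape produced by pass 4; `parse_failure` and `FAILURE_PATTERN` are hypothetical names, not part of the aggregator.

```python
import re
from typing import Optional, Tuple

# Matches "<source name> (<tag>)", e.g. "Guardian (HTTP 401)".
FAILURE_PATTERN = re.compile(r"^(?P<source>.+?) \((?P<tag>.+)\)$")


def parse_failure(entry: str) -> Tuple[str, Optional[str]]:
    """Split a sources_failed entry into (source, tag); tag is None if absent."""
    match = FAILURE_PATTERN.match(entry)
    if match is None:
        # Bare source name, as produced by passes 2 and 3.
        return entry, None
    return match.group("source"), match.group("tag")


for entry in ["Guardian (HTTP 401)", "HackerNews (timeout)", "NewsAPI"]:
    print(parse_failure(entry))
```

The fallback branch keeps the helper usable against the untagged strings from the earlier passes.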
Pattern: consistent failures from one source
If Guardian consistently fails while NewsAPI and HackerNews succeed, you have a Guardian-specific problem.
{
"sources_succeeded": ["NewsAPI", "HackerNews"],
"sources_failed": ["Guardian (HTTP 401)"],
"articles_per_source": {"NewsAPI": 5, "HackerNews": 5}
}
Diagnosis: HTTP 401 means authentication failure. Your Guardian API key is invalid, expired, or not being sent correctly.
Action:
- Verify your GUARDIAN_KEY environment variable is set
- Check that the key hasn't been revoked (Guardian keys are valid indefinitely but can be revoked)
- Test the key directly: curl "https://content.guardianapis.com/search?api-key=YOUR_KEY"
- If the key has been revoked, register for a new one at open-platform.theguardian.com
Pattern: rate limiting (HTTP 429)
If you see HTTP 429 errors, you've exceeded the API's request limits.
{
"sources_succeeded": ["Guardian", "HackerNews"],
"sources_failed": ["NewsAPI (HTTP 429)"],
"articles_per_source": {"Guardian": 5, "HackerNews": 5}
}
Diagnosis: You've exceeded NewsAPI's rate limit (500 requests/day on the free tier).
Action:
- Implement request caching to avoid re-fetching the same queries
- Add rate limiting to your client (e.g., max 1 request per minute during testing)
- Consider upgrading to a paid tier if you need higher limits
- For development, cache responses to disk and work with cached data
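The caching suggestion above can be sketched as a small in-memory cache with a time-to-live. Everything here is an assumption for illustration: `TTLCache` and `get_or_fetch` are hypothetical names, the 300-second TTL is arbitrary, and a real deployment might prefer an on-disk cache so results survive restarts.

```python
import time
from typing import Any, Callable, Dict, Hashable, Tuple


class TTLCache:
    """Hypothetical in-memory cache that reuses a result for ttl_seconds."""

    def __init__(self, ttl_seconds: float, clock: Callable[[], float] = time.monotonic):
        self.ttl_seconds = ttl_seconds
        self.clock = clock  # injectable so expiry can be tested without sleeping
        self._entries: Dict[Hashable, Tuple[float, Any]] = {}

    def get_or_fetch(self, key: Hashable, fetch: Callable[[], Any]) -> Any:
        """Return the cached value for key if still fresh, else call fetch()."""
        now = self.clock()
        cached = self._entries.get(key)
        if cached is not None and now - cached[0] < self.ttl_seconds:
            return cached[1]  # fresh hit: no API request is made
        value = fetch()
        self._entries[key] = (now, value)
        return value


calls = []


def fake_search():
    """Stand-in for aggregator.search(); counts how often it is invoked."""
    calls.append(1)
    return ["article"]


cache = TTLCache(ttl_seconds=300)
cache.get_or_fetch(("python", 5), fake_search)
cache.get_or_fetch(("python", 5), fake_search)
print(len(calls))
```

With the real aggregator the call would look something like `cache.get_or_fetch((query, max_per_source), lambda: aggregator.search(query, max_per_source))`. Note that this caches whatever `search` returns, including partial results from a run where a source failed, so you may want to skip caching when `sources_failed` is non-empty.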
Pattern: timeouts from one source
Consistent timeouts suggest network issues or an overloaded API server.
{
"sources_succeeded": ["NewsAPI", "HackerNews"],
"sources_failed": ["Guardian (timeout)"],
"articles_per_source": {"NewsAPI": 5, "HackerNews": 5}
}
Diagnosis: Guardian's API is slow or unreachable. Could be temporary server issues, network problems, or your timeout is too aggressive.
Action:
- Check Guardian's status page for reported outages
- Increase timeout from 10 seconds to 30 seconds and retry
- If persistent, implement retry logic with exponential backoff
- Keep in mind that the aggregator still provides value with 2 out of 3 sources
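The retry-with-backoff suggestion can be sketched as a wrapper around any fetch callable. This is an illustrative helper under stated assumptions, not part of the aggregator: `retry_with_backoff` is a hypothetical name, the defaults (3 attempts, 1-second base delay) are arbitrary, and the built-in `TimeoutError` and `ConnectionError` stand in for the `requests` exceptions so the example runs without a network.

```python
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def retry_with_backoff(
    fetch: Callable[[], T],
    max_attempts: int = 3,
    base_delay: float = 1.0,
    retry_on: Tuple[Type[BaseException], ...] = (TimeoutError, ConnectionError),
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fetch(), waiting base_delay * 2**n between failed attempts."""
    for attempt in range(max_attempts):
        try:
            return fetch()
        except retry_on:
            if attempt == max_attempts - 1:
                raise  # out of attempts: let the caller's handler tag the failure
            sleep(base_delay * (2 ** attempt))
    raise RuntimeError("max_attempts must be at least 1")


attempts = []
delays = []


def flaky():
    """Simulated source that times out twice, then succeeds."""
    attempts.append(1)
    if len(attempts) < 3:
        raise TimeoutError("simulated slow API")
    return "ok"


result = retry_with_backoff(flaky, sleep=delays.append)
print(result, delays)
```

Against the real fetch methods you would likely pass `retry_on=(requests.Timeout, requests.ConnectionError)` and wrap the call as `lambda: fetch_fn(query, max_per_source)`. Retrying on HTTP 401 or 429 is usually counterproductive, which is why only transient failure types are listed.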
Pattern: success but zero articles
API responds successfully but returns no results. Whether this is legitimate or a normalizer regression depends on what the API itself reports about matches available.
{
"sources_succeeded": ["NewsAPI", "Guardian", "HackerNews"],
"sources_failed": [],
"articles_per_source": {"NewsAPI": 5, "Guardian": 0, "HackerNews": 5},
"total_available_per_source": {"NewsAPI": 12847, "Guardian": 8453, "HackerNews": 583201}
}
Diagnosis: read articles_per_source[Guardian] against total_available_per_source[Guardian]:
- Both are 0: Guardian genuinely has no matches for this query. Try a more common query (e.g., "politics") to confirm.
- Total non-zero, fetched 0 (the case shown above; Guardian reports 8,453 matches but the normalizer kept none): Guardian's API found thousands but every item failed the normalizer's validation. Almost always a structural change in the API; verify the normalizer's field mappings against Guardian's current response shape.
Action:
- Check normalizer logs for skipped articles due to missing required fields
- Inspect raw API response to verify the items actually have the field shapes the normalizer expects
- If the API recently changed, update field mappings and add a test fixture for the new shape
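The comparison described in the diagnosis can be automated. The sketch below assumes the stats dict carries the articles_per_source and total_available_per_source keys shown in the example above; the function name find_suspect_zero_results is hypothetical.

```python
from typing import List


def find_suspect_zero_results(stats: dict) -> List[str]:
    """Flag sources where the API reported matches but the normalizer kept none."""
    fetched = stats.get("articles_per_source", {})
    available = stats.get("total_available_per_source", {})
    suspects = []
    for source, count in fetched.items():
        total = available.get(source, 0)
        # Zero kept with a non-zero total points at the normalizer,
        # not at the query.
        if count == 0 and total > 0:
            suspects.append(
                f"{source}: API reports {total} matches but the normalizer kept 0"
            )
    return suspects
```

A source with zero fetched and zero available is left out on purpose: that is the legitimate "no matches" case.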
Automated health monitoring
Rather than manually inspecting statistics after each search, implement automated health checks that flag problems:
from typing import List


def check_aggregator_health(stats: dict) -> List[str]:
    """
    Inspect the stats dict returned by NewsAggregator.search() and
    return a list of warning strings. An empty list means everything
    is healthy.
    """
    warnings = []

    # Per-source failures: match on the parenthetical tag in sources_failed.
    for failure in stats["sources_failed"]:
        if "(HTTP 401)" in failure:
            warnings.append(f"[CRITICAL] {failure}: authentication failed. Check API key.")
        elif "(HTTP 429)" in failure:
            warnings.append(f"[CRITICAL] {failure}: rate limited. Reduce request frequency.")
        elif "(timeout)" in failure:
            warnings.append(f"[WARN] {failure}: API may be slow or down.")
        elif "(not configured)" in failure:
            warnings.append(f"[WARN] {failure}: environment variable not set.")
        else:
            warnings.append(f"[WARN] {failure}: unexpected error.")

    # Per-source zero-result: succeeded but returned nothing.
    for source, count in stats.get("articles_per_source", {}).items():
        if count == 0:
            warnings.append(f"[INFO] {source}: no articles found. Query may be too specific.")

    # Overall health.
    if not stats["sources_succeeded"]:
        warnings.append("[CRITICAL] All sources failed. Check network connectivity.")
    elif len(stats["sources_succeeded"]) < len(stats["sources_attempted"]):
        warnings.append(
            f"[WARN] Partial failure. Working: {', '.join(stats['sources_succeeded'])}"
        )

    return warnings
Integrate health checks into your CLI to display warnings automatically:
from aggregator import NewsAggregator
from display import display_results
from health_check import check_aggregator_health


def search_command(aggregator: NewsAggregator, query: str) -> None:
    """Execute a search and display results plus any health warnings."""
    print(f"\nSearching for '{query}'...\n")
    articles, stats = aggregator.search(query, max_per_source=5)

    warnings = check_aggregator_health(stats)
    if warnings:
        print("HEALTH WARNINGS:")
        for warning in warnings:
            print(f"  {warning}")
        print()

    if articles:
        display_results(articles, stats)
    else:
        print("No articles. Try a different query or check the source status above.")
Searching for 'python programming'...
HEALTH WARNINGS:
[CRITICAL] Guardian (HTTP 401): authentication failed. Check API key.
[WARN] Partial failure. Working: NewsAPI, HackerNews
======================================================================
RESULTS
======================================================================
Query: 'python programming'
Sources: NewsAPI, HackerNews
Articles: 8 (2 duplicates removed)
1. Python 3.13 Performance Improvements
TechCrunch | January 15, 2025 at 02:30 PM
...
Production monitoring practices
In production systems, you'd extend this monitoring to:
- Log statistics: Write stats to a file or logging service for historical analysis
- Track trends: Monitor failure rates over time (e.g., "Guardian has failed 15 times in the last hour")
- Alert on thresholds: Send notifications when failure rate exceeds acceptable levels
- Dashboard visualization: Display source health, success rates, and response times in real-time
For this learning project, the CLI warnings provide immediate feedback. The pattern scales to sophisticated monitoring infrastructure as your applications grow.
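As a rough sketch of the first two practices, logging statistics and tracking trends, you could append each search's stats to a JSON Lines file and count recent failures per source. The function names, the log file, and the timestamp field are assumptions of this sketch; the only thing it takes from the aggregator is the sources_failed list with its "Name (tag)" entries.

```python
import json
import time
from collections import Counter
from pathlib import Path
from typing import Optional


def log_stats(stats: dict, log_path: Path, now: Optional[float] = None) -> None:
    """Append one search's stats as a single timestamped JSON line."""
    record = {"timestamp": time.time() if now is None else now, **stats}
    with log_path.open("a", encoding="utf-8") as f:
        f.write(json.dumps(record) + "\n")


def failures_in_window(
    log_path: Path, window_seconds: float, now: Optional[float] = None
) -> Counter:
    """Count failures per source among records inside the time window."""
    cutoff = (time.time() if now is None else now) - window_seconds
    counts: Counter = Counter()
    with log_path.open(encoding="utf-8") as f:
        for line in f:
            record = json.loads(line)
            if record["timestamp"] < cutoff:
                continue  # too old to count
            for failure in record.get("sources_failed", []):
                # "Guardian (timeout)" -> "Guardian"
                counts[failure.split(" (")[0]] += 1
    return counts
```

Calling failures_in_window(path, 3600) gives the raw numbers behind a statement like "Guardian has failed 15 times in the last hour"; an alerting threshold is then a comparison against those counts.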
Decision framework: when to act
Not every failure requires immediate action. Use this framework to decide when to investigate versus when to accept graceful degradation:
| Situation | Urgency | Action |
|---|---|---|
| 1 of 3 sources failing intermittently | Low | Monitor for pattern. Investigate if persists >1 hour. |
| 1 of 3 sources consistently failing | Medium | Check API key, rate limits, and API status page. Fix within 24 hours. |
| 2 of 3 sources failing | High | Investigate immediately. Aggregator value is severely degraded. |
| All sources failing | Critical | Check network connectivity, DNS resolution, and firewall rules immediately. |
| Rate limiting (HTTP 429) | Medium | Implement caching and rate limiting. Upgrade API tier if needed. |
| Authentication failures (HTTP 401) | High | Verify environment variables and API key validity. Fix immediately. |
The benefit of graceful degradation is that the aggregator still provides value to users with one or even two sources failing. Your monitoring helps you decide which issues need immediate attention and which can wait for the next maintenance window.
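Parts of the table can be encoded directly against the stats dict. The sketch below is one possible reading, not a definitive policy: urgency_for is a hypothetical helper, it looks at a single search only, and because one snapshot cannot tell intermittent from consistent failure, it treats a lone untagged failure as Low.

```python
from typing import List

# Ordered from least to most urgent so max() can compare by position.
LEVELS: List[str] = ["None", "Low", "Medium", "High", "Critical"]


def urgency_for(stats: dict) -> str:
    """Return the highest urgency from the table that applies to one search."""
    failed = stats["sources_failed"]
    attempted = stats["sources_attempted"]
    levels = ["None"]

    if failed:
        levels.append("Low")  # assumption: a single snapshot starts at Low
    if any("(HTTP 429)" in f for f in failed):
        levels.append("Medium")  # rate limiting row
    if any("(HTTP 401)" in f for f in failed):
        levels.append("High")  # authentication row
    if len(failed) >= 2:
        levels.append("High")  # "2 of 3 sources failing" row
    if attempted and len(failed) == len(attempted):
        levels.append("Critical")  # all sources failing

    return max(levels, key=LEVELS.index)
```

Distinguishing the first two rows of the table (intermittent versus consistent failure) needs history across searches, which this single-search function deliberately does not attempt.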
URL-based dedup: edge cases and limits
The class deduplicates by URL, which works for the common case where multiple sources link to the same canonical story. A piece about Python 3.13's release might appear via NewsAPI (TechCrunch), Guardian (technology section), and HackerNews (submitted link), all pointing at the same upstream URL; the dedup pass collapses them to one. The cases where this approach falls short are worth knowing about before you ship.
Why not just match on titles?
Your first instinct might be comparing titles. If two articles have the same title, they're duplicates. But titles vary: "Python 3.13 Released" vs. "Python 3.13 Now Available" vs. "New Python 3.13 Release" are all the same story with different headlines. Title matching produces false negatives (misses real duplicates) because of phrasing variations.
URLs are more reliable: if two articles link to https://blog.python.org/2025/01/python-3-13-released.html, they're discussing the same content regardless of how each source titled it. The set-of-seen-URLs implementation in deduplicate_articles at the top of aggregator.py is O(1) per lookup, so it scales to hundreds of articles without issue.
URL-based deduplication works well for most cases, but four edge cases are worth understanding:
Tracking parameters create false negatives
The same article with different tracking parameters appears as different URLs:
https://techcrunch.com/article?utm_source=newsapi&utm_medium=feed
https://techcrunch.com/article?utm_source=guardian&utm_medium=api
These URLs point to identical content but differ in query parameters. Basic URL comparison treats them as different articles. To handle this, you'd need to normalize URLs by stripping tracking parameters before comparison.
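A minimal sketch of that normalization, using only the standard library, might look like the following. The function name strip_tracking_params and the choice to treat every utm_-prefixed parameter as tracking are assumptions; a real deployment would likely need a longer list.

```python
from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit

# Assumption: only utm_* parameters are treated as tracking here.
TRACKING_PREFIXES = ("utm_",)


def strip_tracking_params(url: str) -> str:
    """Return the URL with tracking query parameters removed."""
    parts = urlsplit(url)
    kept = [
        (key, value)
        for key, value in parse_qsl(parts.query, keep_blank_values=True)
        if not key.lower().startswith(TRACKING_PREFIXES)
    ]
    # Rebuild the URL with only the parameters that identify the content.
    return urlunsplit(parts._replace(query=urlencode(kept)))
```

The dedup pass would then compare strip_tracking_params(url) values instead of raw URLs, while still storing the original URL on the article.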
Protocol and subdomain variations
The same resource might appear with minor URL variations:
http://example.com/article
https://example.com/article
https://www.example.com/article
URL normalization could unify these (lowercase domains, remove www, enforce https) but adds complexity. For this aggregator, we accept that these variations might appear as separate articles.
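If you did decide to unify these variants, the three rules named above could be sketched as follows. The function name is hypothetical, and forcing https assumes the site serves the same content on both schemes, which is common but not guaranteed.

```python
from urllib.parse import urlsplit, urlunsplit


def normalize_scheme_and_host(url: str) -> str:
    """Lowercase the host, drop a leading 'www.', and force the https scheme."""
    parts = urlsplit(url)
    host = parts.netloc.lower()
    if host.startswith("www."):
        host = host[len("www."):]
    # Path, query, and fragment are left untouched: they can be case-sensitive.
    return urlunsplit(parts._replace(scheme="https", netloc=host))
```

This sketch lowercases the whole network location, so it is not suitable for URLs that embed credentials; news article links rarely do.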
Different URLs for the same story
News outlets often write their own coverage of the same event, producing genuinely different URLs:
https://techcrunch.com/python-3-13-released
https://theverge.com/tech/python-new-version
https://arstechnica.com/programming-language-update
These are three different articles about the same event. URL deduplication correctly treats them as distinct; each outlet wrote their own coverage. This is expected behavior, not a flaw.
First-seen bias
The algorithm keeps whichever instance appears first in the combined list. If NewsAPI returns before Guardian, NewsAPI's version (with its metadata) is retained. This means source ordering affects which metadata survives deduplication.
Alternative: implement "best source" selection. If you prefer Guardian's descriptions over NewsAPI's, keep Guardian's instance when URLs match. This requires explicit preference rules.
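One way to express such preference rules is a ranked list of source names. The sketch below uses plain dicts with "url" and "source" keys as stand-ins for the chapter's Article objects; those keys, the function name, and the preference order are all assumptions for illustration.

```python
from typing import Dict, List

# Hypothetical preference order: earlier names win when URLs collide.
SOURCE_PREFERENCE = ["Guardian", "NewsAPI", "HackerNews"]


def dedupe_preferring_source(articles: List[dict]) -> List[dict]:
    """Keep one article per URL, choosing the most-preferred source."""
    rank = {name: position for position, name in enumerate(SOURCE_PREFERENCE)}
    unknown = len(rank)  # sources not in the list rank last
    best: Dict[str, dict] = {}
    for article in articles:
        current = best.get(article["url"])
        if current is None or (
            rank.get(article["source"], unknown)
            < rank.get(current["source"], unknown)
        ):
            # Reassigning an existing key keeps its original position,
            # so output order still follows first appearance of each URL.
            best[article["url"]] = article
    return list(best.values())
```

Compared with first-seen dedup, the result no longer depends on which source happened to be fetched first, at the cost of maintaining the preference list.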
When to enhance deduplication
For this news aggregator, simple URL comparison is sufficient. The edge cases above rarely cause problems in practice; most duplicate articles have identical URLs, and tracking parameter variations are uncommon in news APIs.
Consider URL normalization if you notice frequent false negatives in your specific domain. For e-commerce product aggregators or academic paper indexers, more sophisticated deduplication (fuzzy matching, content hashing, title similarity) might be necessary.
The principle remains: start with the simplest approach that solves 90% of cases, then add complexity only when real-world data demonstrates the need.
Takeaways
- Four passes, one orchestrator: sequential fetch (pass 1), per-source containment (pass 2), dedup and sort (pass 3), tagged categories (pass 4). Each pass adds one capability, and each is a complete runnable file
- Graceful degradation: each source is wrapped in its own try/except, so one failure doesn't take the others down -- the deliberate-break verify on pass 2 forces this design to earn its keep
- Tagged failures: sources_failed entries carry a parenthetical tag ((not configured), (HTTP 401), (HTTP 429), (timeout), (network error)) that check_aggregator_health matches against without re-parsing exceptions
- URL dedup with edges named: simple set lookup catches the common case; tracking parameters, protocol variants, and different-URL-same-story are flagged as known limits, not silent ones
The next page shows the display layer that presents these unified results to users. The limits page then examines what can still go wrong despite all the defensive programming, motivating Chapter 12's validation approach.