3. Phase 1: core implementation

Phase 1 is the heads-down build. Models, endpoints, auth, validation, tests, all running locally. By the end of this phase you've got a working API on localhost.

External API integration

  • API credentials. All three external APIs need credentials in hand before any code runs. The setup is 15-20 minutes total. The keys you obtain here are the same ones the deployed system will use; production swaps the storage from a local .env file to AWS Secrets Manager (covered in Phase 2), but the credentials themselves don't change.

NewsAPI Setup.

  1. Visit newsapi.org and register for a free account
  2. Verify your email address
  3. Navigate to your account page to find your API key
  4. Free tier provides: 100 requests/day, articles from last 30 days
  5. Note: Production use requires paid tier ($449/month), but free tier works for this project

The Guardian API Setup.

  1. Visit open-platform.theguardian.com and register
  2. Create a new application in your dashboard
  3. Copy your API key (called "API Key" or "Developer Key")
  4. Free tier provides: 500 requests/day and full archive access; a per-second rate limit also applies (check the developer-key terms for the current figure)
  5. Much more generous than NewsAPI for learning purposes

Reddit API Setup.

  1. Visit reddit.com/prefs/apps (requires Reddit account)
  2. Click "Create App" or "Create Another App"
  3. Choose type: "web app" (enables OAuth redirect)
  4. Set redirect URI: http://localhost:8000/auth/reddit/callback (for local dev)
  5. Note your client ID (under the app name) and client secret
  6. Free tier: 60 requests/minute per OAuth token
  • Store credentials securely. Create a .env file in your project root and add it to .gitignore straight away, before your first commit, so that secrets are never committed to Git by accident.
.env (gitignored)
# .env - NEVER COMMIT THIS FILE
NEWSAPI_KEY=your_newsapi_key_here
GUARDIAN_API_KEY=your_guardian_key_here
REDDIT_CLIENT_ID=your_reddit_client_id
REDDIT_CLIENT_SECRET=your_reddit_secret
REDDIT_REDIRECT_URI=http://localhost:8000/auth/reddit/callback
DATABASE_URL=postgresql://postgres:postgres@localhost:5432/news_platform
REDIS_URL=redis://localhost:6379
Never commit secrets

Committing API keys to GitHub is a common mistake with a slow recovery. Public-repo scrapers find committed keys within minutes; deleting the commit doesn't help because the keys stay in the repository's history until you rewrite it. Add .env to .gitignore before the first commit, not after.
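A small startup check can catch a missing or placeholder credential before the first request fails. The sketch below assumes the variable names from the .env file above; missing_settings is a hypothetical helper written for illustration, not part of python-dotenv or pydantic-settings.

```python
import os
from typing import List, Mapping

# Names taken from the .env file above
REQUIRED_SETTINGS = (
    "NEWSAPI_KEY",
    "GUARDIAN_API_KEY",
    "REDDIT_CLIENT_ID",
    "REDDIT_CLIENT_SECRET",
    "REDDIT_REDIRECT_URI",
    "DATABASE_URL",
    "REDIS_URL",
)


def missing_settings(env: Mapping[str, str] = os.environ) -> List[str]:
    """Return required names that are unset, blank, or still a your_... placeholder."""
    missing = []
    for name in REQUIRED_SETTINGS:
        value = env.get(name, "").strip()
        if not value or value.startswith("your_"):
            missing.append(name)
    return missing
```

One way to use it is to call missing_settings() right after the .env file has been loaded and stop the application with a clear message if the returned list is not empty.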

  • Project dependencies. The Python environment needs a fixed set of packages before the API clients can run. The versions below are the ones the chapter was tested against; pinning them keeps the development environment reproducible across machines and matches what the Docker image installs in Phase 2.
requirements.txt
# Core Framework
fastapi==0.104.1
uvicorn[standard]==0.24.0

# Async HTTP Client
httpx==0.25.1

# Database
sqlalchemy==2.0.23
alembic==1.12.1
psycopg2-binary==2.9.9

# Caching
redis==5.0.1

# Configuration & Validation
pydantic==2.5.0
pydantic-settings==2.1.0
python-dotenv==1.0.0

# Testing
pytest==7.4.3
pytest-asyncio==0.21.1
pytest-cov==4.1.0

# Optional: For Extensions
# textblob==0.17.1          # Sentiment analysis
# celery==5.3.4             # Background jobs
# sendgrid==6.10.0          # Email digests

Install dependencies with pip install -r requirements.txt. Use a virtual environment (python -m venv venv) to isolate project dependencies from your system Python.

  • Database schema, in one place. This is the complete schema for Phase 1, plus the sentiment-analysis extension that Phase 3 walks through. SQLAlchemy models and Alembic migrations come later in this section; the raw DDL is what the rest of the chapter reads against.
Complete database schema (5 tables with relationships)
schema.sql (complete)
-- Users Table: OAuth authentication and user accounts
CREATE TABLE users (
    id SERIAL PRIMARY KEY,
    reddit_id VARCHAR(100) UNIQUE,          -- Reddit user ID from OAuth
    username VARCHAR(100) NOT NULL,
    email VARCHAR(255),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    last_login TIMESTAMP
);

CREATE INDEX idx_users_reddit_id ON users(reddit_id);

-- Articles Table: Aggregated news from all sources
CREATE TABLE articles (
    id SERIAL PRIMARY KEY,
    source VARCHAR(50) NOT NULL,            -- 'newsapi', 'guardian', 'reddit'
    external_id VARCHAR(255) NOT NULL,      -- Source's article ID
    title TEXT NOT NULL,
    description TEXT,
    content TEXT,
    author VARCHAR(255),
    url TEXT NOT NULL,
    image_url TEXT,
    published_at TIMESTAMP NOT NULL,
    fetched_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    UNIQUE(source, external_id)             -- Prevent duplicates
);

-- Performance indexes
CREATE INDEX idx_articles_source ON articles(source);
CREATE INDEX idx_articles_published ON articles(published_at DESC);
CREATE INDEX idx_articles_fetched ON articles(fetched_at DESC);

-- Full-text search index
CREATE INDEX idx_articles_title_search ON articles USING GIN(to_tsvector('english', title));
CREATE INDEX idx_articles_content_search ON articles USING GIN(to_tsvector('english', content));

-- User Preferences: Personalization settings
CREATE TABLE user_preferences (
    id SERIAL PRIMARY KEY,
    user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
    preferred_sources TEXT[],               -- Array: ['newsapi', 'guardian']
    keywords TEXT[],                        -- Array: ['python', 'ai', 'climate']
    categories TEXT[],                      -- Array: ['technology', 'science']
    notification_enabled BOOLEAN DEFAULT FALSE,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE INDEX idx_user_preferences_user_id ON user_preferences(user_id);

-- Search History: Track user searches for analytics
CREATE TABLE search_history (
    id SERIAL PRIMARY KEY,
    user_id INTEGER REFERENCES users(id) ON DELETE SET NULL,  -- NULL for anonymous
    query TEXT NOT NULL,
    results_count INTEGER DEFAULT 0,
    searched_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE INDEX idx_search_history_user_id ON search_history(user_id);
CREATE INDEX idx_search_history_searched_at ON search_history(searched_at DESC);

-- Article Sentiments: Extension feature for sentiment analysis
CREATE TABLE article_sentiments (
    id SERIAL PRIMARY KEY,
    article_id INTEGER NOT NULL REFERENCES articles(id) ON DELETE CASCADE,
    polarity FLOAT NOT NULL,                -- -1.0 (negative) to 1.0 (positive)
    subjectivity FLOAT NOT NULL,            -- 0.0 (objective) to 1.0 (subjective)
    sentiment_label VARCHAR(20),            -- 'positive', 'negative', 'neutral'
    analyzed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    UNIQUE(article_id)
);

CREATE INDEX idx_article_sentiments_article_id ON article_sentiments(article_id);
CREATE INDEX idx_article_sentiments_label ON article_sentiments(sentiment_label);

-- Relationships Summary:
-- users (1) ←→ (many) user_preferences
-- users (1) ←→ (many) search_history
-- articles (1) ←→ (1) article_sentiments
Schema design decisions

Source deduplication: The UNIQUE(source, external_id) constraint prevents the same article from being stored twice if fetched multiple times.

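Full-text search: GIN indexes on title and content enable fast PostgreSQL full-text search using to_tsvector(). These are expression indexes, so a query only benefits from them when it repeats the same expression, including the 'english' configuration.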

Cascading deletes: ON DELETE CASCADE means that deleting a user removes their preferences, and deleting an article removes its sentiment row. ON DELETE SET NULL on search_history keeps a deleted user's searches as anonymous rows for analytics.

Performance indexes: Indexes on published_at DESC and fetched_at DESC optimize the common query pattern of "most recent articles first."

Array columns: PostgreSQL arrays for preferred_sources and keywords avoid a many-to-many junction table for this simple use case.
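The deduplication rule can be tried out without a running PostgreSQL instance. The sketch below is an illustration under stated assumptions: it uses an in-memory SQLite database as a stand-in, a trimmed-down articles table, and a hypothetical save_article helper with made-up article IDs. The ON CONFLICT ... DO NOTHING clause shown here is also accepted by PostgreSQL.

```python
import sqlite3

# In-memory SQLite as a stand-in for PostgreSQL, for illustration only
conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE articles (
        id INTEGER PRIMARY KEY,
        source TEXT NOT NULL,
        external_id TEXT NOT NULL,
        title TEXT NOT NULL,
        UNIQUE (source, external_id)
    )
    """
)


def save_article(source: str, external_id: str, title: str) -> bool:
    """Insert an article; return False when (source, external_id) is already stored."""
    cursor = conn.execute(
        """
        INSERT INTO articles (source, external_id, title)
        VALUES (?, ?, ?)
        ON CONFLICT (source, external_id) DO NOTHING
        """,
        (source, external_id, title),
    )
    # rowcount is 0 when the conflict clause skipped the insert
    return cursor.rowcount == 1
```

Fetching the same Guardian article twice stores it once, while the same external ID from a different source is treated as a separate article because the constraint covers the pair.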

  • Three per-source clients, one internal shape. One client class per external API: NewsAPIClient, GuardianAPIClient, RedditAPIClient. Each owns its own auth scheme, request format, response parsing, and error handling, and each normalises into the same internal article shape (title, description, url, source, published_at, image_url). The rest of the application sees one shape; the source-shape mess stays bounded inside the client classes.

The build order goes from simplest auth to hardest: NewsAPI (API key in a header), then the Guardian (API key in a query parameter, plus a nested response envelope), then Reddit (OAuth 2.0 authorisation-code flow plus bearer token).
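Before reading the full clients, it may help to see the shared internal shape in isolation. The sketch below is one possible way to express it, assuming a TypedDict named Article and two hypothetical mapping functions; the raw field names (publishedAt, urlToImage, webTitle, webUrl, webPublicationDate) are the ones the clients in this section read, and any sample payloads passed in are invented.

```python
from typing import Optional, TypedDict


class Article(TypedDict):
    """Internal article shape shared by every client (illustrative)."""
    title: Optional[str]
    description: Optional[str]
    url: Optional[str]
    source: str
    published_at: Optional[str]
    image_url: Optional[str]


def from_newsapi(raw: dict) -> Article:
    """Map one entry of NewsAPI's articles[] list to the internal shape."""
    return {
        "title": raw.get("title"),
        "description": raw.get("description"),
        "url": raw.get("url"),
        "source": "newsapi",
        "published_at": raw.get("publishedAt"),
        "image_url": raw.get("urlToImage"),
    }


def from_guardian(raw: dict) -> Article:
    """Map one entry of Guardian's response.results[] list to the internal shape."""
    fields = raw.get("fields") or {}
    return {
        "title": raw.get("webTitle"),
        # Simplified: first 300 characters of the body text, or None if absent
        "description": (fields.get("bodyText") or "")[:300] or None,
        "url": raw.get("webUrl"),
        "source": "guardian",
        "published_at": raw.get("webPublicationDate"),
        "image_url": fields.get("thumbnail"),
    }
```

Whatever the source, code downstream of the clients only ever sees the keys of Article, which is what keeps the per-source differences contained.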

  • NewsAPI client. Async HTTP via httpx, API key in the X-Api-Key header, response normalised into the internal article shape. NewsAPI's response wraps the article list as articles[]; the client unwraps it and maps each field.
Complete implementation: NewsAPI client
app/services/newsapi.py
"""
NewsAPI client for fetching articles.
API Documentation: https://newsapi.org/docs/endpoints/everything
"""
import httpx
from typing import List


class NewsAPIClient:
    """Client for NewsAPI.org external API."""

    BASE_URL = "https://newsapi.org/v2"

    def __init__(self, api_key: str):
        self.api_key = api_key
        self.client = httpx.AsyncClient(timeout=30.0)

    async def search_articles(
        self, 
        query: str, 
        language: str = "en",
        sort_by: str = "relevancy",
        page_size: int = 20
    ) -> List[dict]:
        """
        Search articles using NewsAPI everything endpoint.

        Args:
            query: Search keywords
            language: Two-letter language code
            sort_by: 'relevancy', 'popularity', or 'publishedAt'
            page_size: Results per page (max 100)

        Returns:
            List of article dictionaries in standardized format

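        Raises:
            httpx.HTTPError: On request failure
            Exception: If NewsAPI reports a non-ok status
        """
        url = f"{self.BASE_URL}/everything"
        params = {
            "q": query,
            "language": language,
            "sortBy": sort_by,
            "pageSize": min(page_size, 100),  # API maximum is 100
        }
        # Send the key in the X-Api-Key header so it stays out of URLs and access logs
        headers = {"X-Api-Key": self.api_key}

        response = await self.client.get(url, params=params, headers=headers)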
        response.raise_for_status()  # Raises exception for 4xx/5xx

        data = response.json()

        # Check API-specific error format
        if data.get("status") != "ok":
            raise Exception(f"NewsAPI error: {data.get('message', 'Unknown error')}")

        # Transform to standardized format
        articles = []
        for article in data.get("articles", []):
            articles.append({
                "title": article.get("title"),
                "description": article.get("description"),
                "url": article.get("url"),
                "source": "newsapi",
                "source_name": article.get("source", {}).get("name"),
                "author": article.get("author"),
                "published_at": article.get("publishedAt"),
                "image_url": article.get("urlToImage"),
                "content": article.get("content")
            })

        return articles

    async def close(self):
        """Close the HTTP client."""
        await self.client.aclose()

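Async HTTP client: httpx.AsyncClient lets the application issue requests concurrently later on. When all three APIs are queried at once, the total wait is roughly the slowest single call rather than the sum of the three, which at a few hundred milliseconds per call saves several hundred milliseconds per search.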

Standardized response format: External APIs return different structures. Your client transforms NewsAPI's format into a standard schema all clients share. This makes your FastAPI endpoints simpler because they work with one format regardless of source.

Error handling: The raise_for_status() call converts HTTP errors into exceptions. Your endpoint error handlers catch these and return appropriate user-facing errors (Chapter 9's patterns).

Timeout configuration: 30-second timeout prevents hanging requests. NewsAPI typically responds in 200-400ms, so 30s is generous while protecting against indefinite hangs.
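The concurrency benefit mentioned above comes from awaiting all sources together. The sketch below shows the general pattern with asyncio.gather(); search_all and the three fake_* fetchers are hypothetical stand-ins that simulate latency with asyncio.sleep(), not the real clients.

```python
import asyncio
from typing import Awaitable, Callable, Dict, List

Fetcher = Callable[[str], Awaitable[List[dict]]]


async def search_all(query: str, fetchers: Dict[str, Fetcher]) -> List[dict]:
    """Run every source's search concurrently and merge whatever succeeds."""
    names = list(fetchers)
    results = await asyncio.gather(
        *(fetchers[name](query) for name in names),
        return_exceptions=True,  # one failing source should not sink the others
    )
    merged: List[dict] = []
    for name, result in zip(names, results):
        if isinstance(result, Exception):
            print(f"{name} failed: {result!r}")  # a real app would log this
            continue
        merged.extend(result)
    return merged


# Hypothetical stand-in fetchers that simulate network latency
async def fake_newsapi(query: str) -> List[dict]:
    await asyncio.sleep(0.05)
    return [{"title": f"{query} from newsapi", "source": "newsapi"}]


async def fake_guardian(query: str) -> List[dict]:
    await asyncio.sleep(0.05)
    return [{"title": f"{query} from guardian", "source": "guardian"}]


async def fake_reddit(query: str) -> List[dict]:
    await asyncio.sleep(0.05)
    raise RuntimeError("simulated outage")
```

With the real clients, a bound method such as a NewsAPIClient instance's search_articles could be passed as a fetcher directly. Reddit's search_subreddit takes an access token first, so it would need wrapping (for example with functools.partial) to fit the same signature.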

  • Guardian client. No auth header this time: the API key goes in as the api-key query parameter. The response, however, is wrapped one layer deeper: response.results[] rather than a top-level articles[]. The client unwraps the envelope and maps Guardian's field names (webTitle, webUrl, webPublicationDate) into the same internal shape.
Complete implementation: Guardian client
app/services/guardian.py
"""
Guardian API client for fetching articles.
API Documentation: https://open-platform.theguardian.com/documentation/
"""
import httpx
from typing import List


class GuardianAPIClient:
    """Client for The Guardian Open Platform API."""

    BASE_URL = "https://content.guardianapis.com"

    def __init__(self, api_key: str):
        self.api_key = api_key
        self.client = httpx.AsyncClient(timeout=30.0)

    async def search_articles(
        self,
        query: str,
        page_size: int = 20,
        order_by: str = "relevance"
    ) -> List[dict]:
        """
        Search articles using Guardian content search endpoint.

        Args:
            query: Search keywords
            page_size: Results per page (max 50)
            order_by: 'newest', 'oldest', or 'relevance'

        Returns:
            List of article dictionaries in standardized format
        """
        url = f"{self.BASE_URL}/search"
        params = {
            "q": query,
            "page-size": min(page_size, 50),  # Guardian's max
            "order-by": order_by,
            "show-fields": "bodyText,thumbnail,byline",  # Request extra fields
            "api-key": self.api_key
        }

        response = await self.client.get(url, params=params)
        response.raise_for_status()

        data = response.json()

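        # Check Guardian-specific response format; error details sit inside the envelope
        envelope = data.get("response", {})
        if envelope.get("status") != "ok":
            message = envelope.get("message") or data.get("message", "Unknown error")
            raise Exception(f"Guardian API error: {message}")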

        # Transform to standardized format
        articles = []
        for article in data.get("response", {}).get("results", []):
            fields = article.get("fields", {})

            articles.append({
                "title": article.get("webTitle"),
                "description": self._truncate_text(fields.get("bodyText", ""), 300),
                "url": article.get("webUrl"),
                "source": "guardian",
                "source_name": "The Guardian",
                "author": fields.get("byline"),
                "published_at": article.get("webPublicationDate"),
                "image_url": fields.get("thumbnail"),
                "content": fields.get("bodyText")
            })

        return articles

    def _truncate_text(self, text: str, max_length: int) -> str:
        """Truncate text to max_length, ending at word boundary."""
        if len(text) <= max_length:
            return text

        truncated = text[:max_length]
        # Find last space to avoid cutting mid-word
        last_space = truncated.rfind(' ')
        if last_space > 0:
            truncated = truncated[:last_space]

        return truncated + "..."

    async def close(self):
        """Close the HTTP client."""
        await self.client.aclose()

Response nesting: Guardian wraps results in response.results[] instead of top-level articles[]. Your client handles this so endpoints don't need to.

Field mapping: Guardian calls the title webTitle and publication date webPublicationDate. Your standardized format uses title and published_at consistently.

Description generation: Guardian's search results carry no description field by default. The client generates one by truncating bodyText to 300 characters at a word boundary, which reads better than cutting mid-word.

Optional fields: Some Guardian articles lack thumbnails or bylines. Your code handles missing fields gracefully with .get() default values.
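To see what the word-boundary rule produces, the truncation logic can be exercised on its own. The function below is a standalone copy of the client's _truncate_text method, renamed truncate_text purely for experimentation.

```python
def truncate_text(text: str, max_length: int) -> str:
    """Standalone copy of GuardianAPIClient._truncate_text, for experimentation."""
    if len(text) <= max_length:
        return text
    truncated = text[:max_length]
    # Back up to the last space so the cut does not land mid-word
    last_space = truncated.rfind(" ")
    if last_space > 0:
        truncated = truncated[:last_space]
    return truncated + "..."


print(truncate_text("The quick brown fox jumps", 12))  # prints: The quick...
print(truncate_text("Supercalifragilistic", 5))        # prints: Super...
```

The second call shows an edge case worth knowing about: when the cut text contains no space, the result is max_length characters plus the three-character ellipsis, so a database column sized to exactly 300 characters would be too small.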

  • Reddit client with OAuth 2.0. Four moving pieces rather than one: build the authorisation URL, exchange the returned authorisation code for an access token and a refresh token, refresh the access token when it expires, then issue authenticated requests. The class below implements the full authorisation-code flow so the chapter has the OAuth path on file rather than abstracted away in a library.
Complete implementation: Reddit OAuth client
app/services/reddit.py
"""
Reddit API client with OAuth 2.0 authentication.
API Documentation: https://www.reddit.com/dev/api/
OAuth Documentation: https://github.com/reddit-archive/reddit/wiki/OAuth2
"""
import httpx
import base64
from typing import List, Optional, Dict
from datetime import datetime, timedelta


class RedditAPIClient:
    """Client for Reddit API with OAuth 2.0 support."""

    BASE_URL = "https://oauth.reddit.com"
    AUTH_URL = "https://www.reddit.com/api/v1/authorize"
    TOKEN_URL = "https://www.reddit.com/api/v1/access_token"

    def __init__(self, client_id: str, client_secret: str, redirect_uri: str):
        self.client_id = client_id
        self.client_secret = client_secret
        self.redirect_uri = redirect_uri
        self.client = httpx.AsyncClient(timeout=30.0)

    def get_authorization_url(self, state: str) -> str:
        """
        Generate OAuth authorization URL for user to visit.

        Args:
            state: Random string for CSRF protection (validate on callback)

        Returns:
            URL to redirect user to for Reddit authorization
        """
        params = {
            "client_id": self.client_id,
            "response_type": "code",
            "state": state,
            "redirect_uri": self.redirect_uri,
            "duration": "permanent",  # Request refresh token
            "scope": "identity read"  # Permissions we need
        }

        # URL-encode the values: redirect_uri contains ':' and '/', and scope contains a space
        query = str(httpx.QueryParams(params))
        return f"{self.AUTH_URL}?{query}"

    async def exchange_code_for_token(self, code: str) -> dict:
        """
        Exchange authorization code for access token.

        Args:
            code: Authorization code from OAuth callback

        Returns:
            Dictionary with 'access_token', 'refresh_token', 'expires_in'

        Raises:
            Exception: If token exchange fails
        """
        # Reddit requires Basic auth with client credentials
        auth_string = f"{self.client_id}:{self.client_secret}"
        auth_bytes = auth_string.encode('utf-8')
        auth_b64 = base64.b64encode(auth_bytes).decode('utf-8')

        headers = {
            "Authorization": f"Basic {auth_b64}",
            "User-Agent": "NewsIntelligencePlatform/1.0"  # Required by Reddit
        }

        data = {
            "grant_type": "authorization_code",
            "code": code,
            "redirect_uri": self.redirect_uri
        }

        response = await self.client.post(
            self.TOKEN_URL,
            headers=headers,
            data=data
        )
        response.raise_for_status()

        token_data = response.json()

        # Calculate expiration time
        expires_in = token_data.get("expires_in", 3600)  # Usually 1 hour
        token_data["expires_at"] = datetime.utcnow() + timedelta(seconds=expires_in)

        return token_data

    async def refresh_access_token(self, refresh_token: str) -> dict:
        """
        Use refresh token to get new access token.

        Args:
            refresh_token: Refresh token from initial authorization

        Returns:
            Dictionary with new 'access_token' and 'expires_in'
        """
        auth_string = f"{self.client_id}:{self.client_secret}"
        auth_bytes = auth_string.encode('utf-8')
        auth_b64 = base64.b64encode(auth_bytes).decode('utf-8')

        headers = {
            "Authorization": f"Basic {auth_b64}",
            "User-Agent": "NewsIntelligencePlatform/1.0"
        }

        data = {
            "grant_type": "refresh_token",
            "refresh_token": refresh_token
        }

        response = await self.client.post(
            self.TOKEN_URL,
            headers=headers,
            data=data
        )
        response.raise_for_status()

        token_data = response.json()
        expires_in = token_data.get("expires_in", 3600)
        token_data["expires_at"] = datetime.utcnow() + timedelta(seconds=expires_in)

        return token_data

    async def search_subreddit(
        self,
        access_token: str,
        subreddit: str = "news",
        query: str = "",
        limit: int = 20
    ) -> List[dict]:
        """
        Search a subreddit for posts matching query.

        Args:
            access_token: Valid OAuth access token
            subreddit: Subreddit to search (without r/ prefix)
            query: Search keywords
            limit: Number of results (max 100)

        Returns:
            List of post dictionaries in standardized format
        """
        url = f"{self.BASE_URL}/r/{subreddit}/search"

        headers = {
            "Authorization": f"Bearer {access_token}",
            "User-Agent": "NewsIntelligencePlatform/1.0"
        }

        params = {
            "q": query,
            "limit": min(limit, 100),
            "sort": "relevance",
            "restrict_sr": "true",  # Search only this subreddit
            "type": "link"  # Return posts rather than subreddits or users; self posts are skipped below
        }

        response = await self.client.get(url, headers=headers, params=params)
        response.raise_for_status()

        data = response.json()

        # Transform Reddit's complex structure
        articles = []
        for child in data.get("data", {}).get("children", []):
            post = child.get("data", {})

            # Skip if not a valid article
            if post.get("is_self") or not post.get("url"):
                continue

            articles.append({
                "title": post.get("title"),
                "description": post.get("selftext", "")[:300] or None,
                "url": post.get("url"),
                "source": "reddit",
                "source_name": f"r/{subreddit}",
                "author": f"u/{post.get('author')}",
                # created_utc is epoch seconds in UTC; fromtimestamp() without a tz would shift it to local time
                "published_at": (datetime(1970, 1, 1) + timedelta(seconds=post.get("created_utc", 0))).isoformat() + "Z",
                "image_url": post.get("thumbnail") if (post.get("thumbnail") or "").startswith("http") else None,
                "content": None  # Reddit doesn't provide full content
            })

        return articles

    async def close(self):
        """Close the HTTP client."""
        await self.client.aclose()

Step 1 - Authorization: Your app generates an authorization URL and redirects the user to it. The user logs in to Reddit and grants permissions, and Reddit redirects back to your redirect_uri with an authorization code and the state value you sent.

Step 2 - Token Exchange: Your backend exchanges the authorization code for an access token and a refresh token. The access token is short-lived: Reddit reports its lifetime in expires_in, and the code falls back to 1 hour when that field is missing. The refresh token stays valid until the user revokes it.

Step 3 - Token Refresh: When the access token expires, use the refresh token to get a new access token without requiring user interaction. This enables seamless long-term access.

Basic Authentication: Reddit requires HTTP Basic auth (Base64-encoded client_id:client_secret) for token endpoints. The base64.b64encode() call creates this header.

User-Agent Requirement: Reddit requires a descriptive User-Agent header on every request. Requests without one, or with a generic library default, are typically throttled or rejected with 429 Too Many Requests.
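Two pieces of glue around the client are easy to get wrong: validating the state value on the callback and deciding when to refresh. The helpers below are a minimal sketch using only the standard library; new_state, state_matches, and needs_refresh are hypothetical names, and the 60-second safety margin is an assumption rather than a Reddit requirement.

```python
import hmac
import secrets
from datetime import datetime, timedelta


def new_state() -> str:
    """Random, URL-safe value to pass to get_authorization_url(state)."""
    return secrets.token_urlsafe(32)


def state_matches(expected: str, received: str) -> bool:
    """Constant-time comparison of the stored state with the one on the callback."""
    return hmac.compare_digest(expected.encode("utf-8"), received.encode("utf-8"))


def needs_refresh(expires_at: datetime, now: datetime, skew_seconds: int = 60) -> bool:
    """True when the access token has expired or will expire within skew_seconds."""
    return now >= expires_at - timedelta(seconds=skew_seconds)
```

In this sketch the state would be stored server-side (for example in the user's session) before the redirect and compared on the callback. needs_refresh expects now and expires_at to be the same kind of datetime; the client above stores expires_at as a naive UTC value, so the caller would pass a naive UTC now to match.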

The three client classes are the structural backbone of Phase 1. The remaining Phase 1 pieces -- SQLAlchemy models and Alembic migrations against the schema above; the FastAPI endpoints that call the clients in parallel via asyncio.gather(); OAuth callback handling that closes the Reddit flow; Redis caching wired into the search endpoint; the pytest suite -- compose the same primitives already covered in detail in Chapters 24-26 (SQLAlchemy and Alembic), 26 (FastAPI, Pydantic, dependency injection), 9 (error handling), and 26 (testing patterns). The chapter trusts you to compose them rather than rewalking each.

Next, in section 4, we take the working local app and put it on AWS: Docker Compose for the local stack, then the Ch27-29 pipeline -- multi-stage Dockerfile, ECR, ECS Fargate, RDS, ALB, CloudWatch, GitHub Actions -- targeting the capstone's repo and service names.