5. Errors, timeouts, and limits

Section 4 set up the fan-out. This page hardens it with four pieces: partial-failure handling, so one dead source does not poison the whole batch; per-request and per-batch timeouts, so one slow source does not define the wall-clock; retry-with-backoff, so a transient blip does not look like a hard failure; and the testing patterns that keep the whole shape honest under pytest-asyncio. The chapter's standing rule, that an async claim is only as good as the number behind it, applies here twice over: every wall-clock claim derives from a number you can point at, and TaskGroup's fail-fast contract is not the same as the collect-everything contract of gather(..., return_exceptions=True).

Try-except patterns in async functions

Error handling in async code works the same as in synchronous code: you use try-except blocks. The difference is that you must handle errors at the right level to avoid aborting concurrent tasks.

basic_error_handling.py
import asyncio
import httpx

async def fetch_with_error_handling(client, url):
    try:
        response = await client.get(url, timeout=5.0)
        response.raise_for_status()
        return {"success": True, "data": response.json()}
    except httpx.TimeoutException:
        return {"success": False, "error": "timeout"}
    except httpx.HTTPStatusError as exc:
        return {"success": False, "error": f"HTTP {exc.response.status_code}"}
    except Exception as exc:
        return {"success": False, "error": str(exc)}

async def main():
    urls = [
        "https://api.example.com/working",
        "https://api.example.com/slow",
        "https://api.example.com/broken",
    ]
    
    async with httpx.AsyncClient() as client:
        tasks = [fetch_with_error_handling(client, url) for url in urls]
        results = await asyncio.gather(*tasks)
    
    # All tasks complete, some may have failed
    for url, result in zip(urls, results):
        if result["success"]:
            print(f"{url}: Success")
        else:
            print(f"{url}: Failed ({result['error']})")

if __name__ == "__main__":
    asyncio.run(main())

By handling errors inside each async function, you prevent one failure from aborting the entire batch. Every task completes with either a success or error result, and you can inspect them all after gather() finishes.

Handling partial failures: some succeed, some fail

In production, you will often fetch from dozens of endpoints where some succeed and others fail. Your code should handle this gracefully: collect the successful results, log the failures, and continue working with whatever data you have.

graceful_degradation.py
import asyncio
import httpx

async def fetch_news_source(client, source_name, url):
    try:
        response = await client.get(url, timeout=5.0)
        response.raise_for_status()
        data = response.json()
        return {
            "source": source_name,
            "success": True,
            "articles": data.get("articles", []),
        }
    except Exception as exc:
        print(f"Failed to fetch {source_name}: {exc}")
        return {
            "source": source_name,
            "success": False,
            "articles": [],
        }

async def aggregate_news():
    sources = {
        "NewsAPI": "https://newsapi.org/v2/top-headlines?country=us&apiKey=...",
        "Guardian": "https://content.guardianapis.com/search?api-key=...",
        "HackerNews": "https://hacker-news.firebaseio.com/v0/topstories.json",
    }
    
    async with httpx.AsyncClient() as client:
        tasks = [
            fetch_news_source(client, name, url)
            for name, url in sources.items()
        ]
        results = await asyncio.gather(*tasks)
    
    # Collect all successful articles
    all_articles = []
    failed_sources = []
    
    for result in results:
        if result["success"]:
            all_articles.extend(result["articles"])
        else:
            failed_sources.append(result["source"])
    
    print(f"Fetched {len(all_articles)} articles from {len(results) - len(failed_sources)} sources")
    if failed_sources:
        print(f"Failed sources: {', '.join(failed_sources)}")
    
    return all_articles

if __name__ == "__main__":
    asyncio.run(aggregate_news())

This pattern is called graceful degradation. If one news source is down, you still show articles from the other sources. The user gets a slightly degraded experience rather than a complete failure.
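Graceful degradation also applies to time: a per-batch deadline lets you keep whatever arrived and drop the stragglers. The following is a minimal sketch of that idea, not the chapter's implementation. It assumes fetchers that return the structured dict shape used above and never raise; fetch_source, aggregate_with_deadline, and the delays are hypothetical stand-ins for real HTTP calls.

```python
import asyncio


async def fetch_source(name, delay):
    # Hypothetical stand-in for an HTTP fetch: sleeps, then returns the
    # structured per-source dict used in this section.
    await asyncio.sleep(delay)
    return {"source": name, "success": True, "articles": [f"{name}-article"]}


async def aggregate_with_deadline(sources, batch_timeout):
    # One task per source, remembered in insertion order.
    tasks = {
        name: asyncio.create_task(fetch_source(name, delay))
        for name, delay in sources.items()
    }
    # Wait for the whole batch, but no longer than batch_timeout seconds.
    done, pending = await asyncio.wait(tasks.values(), timeout=batch_timeout)

    # Cancel stragglers and let the cancellations settle before returning.
    for task in pending:
        task.cancel()
    await asyncio.gather(*pending, return_exceptions=True)

    articles, timed_out = [], []
    for name, task in tasks.items():
        if task in done:
            articles.extend(task.result()["articles"])
        else:
            timed_out.append(name)
    return articles, timed_out


if __name__ == "__main__":
    demo_sources = {"fast": 0.01, "slow": 5.0}
    print(asyncio.run(aggregate_with_deadline(demo_sources, batch_timeout=0.2)))
```

asyncio.wait() does not cancel anything on timeout, which is why the sketch cancels the pending tasks itself. The per-request timeout inside each fetcher still applies; the batch deadline is an outer bound on top of it.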

The asyncio.gather(return_exceptions=True) pattern

Default asyncio.gather() still carries the no-sibling-cancellation gotcha from Section 3: the first exception raises, the survivors keep running. If you want gather to collect every result regardless of failures, pass return_exceptions=True: every position in the result list is either a value or an exception object, and nothing raises.

gather_with_exceptions.py
import asyncio
import httpx

async def fetch_data(client, url):
    response = await client.get(url)
    response.raise_for_status()
    return response.json()

async def main():
    urls = [
        "https://api.example.com/good",
        "https://api.example.com/broken",
        "https://api.example.com/also-good",
    ]
    
    async with httpx.AsyncClient(timeout=5.0) as client:
        tasks = [fetch_data(client, url) for url in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)
    
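    # results holds return values and exception objects side by side.
    # BaseException also catches asyncio.CancelledError, which is not an
    # Exception subclass on Python 3.8+.
    for url, result in zip(urls, results):
        if isinstance(result, BaseException):
            print(f"{url}: Failed with {type(result).__name__}: {result}")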
        else:
            print(f"{url}: Success, got {len(result)} items")

if __name__ == "__main__":
    asyncio.run(main())

This is useful when you want to inspect each failure individually. You can log detailed error information, retry specific failures, or decide how to handle each case based on the exception type.

When to use each pattern

The two shapes (errors handled inside vs collected at the gather boundary) are not interchangeable. Handle errors inside the coroutine when you want structured responses every task agrees on, like a {"success": True/False, "data": ...} dict that downstream code can iterate without isinstance checks. Use return_exceptions=True when you want to inspect every exception centrally -- when the failure mode is "log the exception type, then carry on with the successes."

The keystone in Section 6 picks the structured-response shape: each fetcher returns a dict with a success boolean. That keeps the aggregator's reducer flat (no isinstance branches) and makes the success/failure ratio trivially observable. return_exceptions=True is the right shape when you do not control the inner function signature, or when the work each task does is heterogeneous.
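When you do not control the inner function but still want the structured shape downstream, the two patterns can be bridged at the gather boundary. This is one possible sketch under that assumption: third_party_fetch is a hypothetical function that raises on failure, and to_structured is a hypothetical adapter that normalizes each slot into a success dict.

```python
import asyncio


async def third_party_fetch(name):
    # Hypothetical function whose signature we do not control: it returns a
    # payload on success and raises on failure.
    await asyncio.sleep(0)
    if name == "broken":
        raise ConnectionError("upstream unavailable")
    return {"articles": [f"{name}-1", f"{name}-2"]}


def to_structured(name, outcome):
    # Normalize one gather() slot into the {"success": ...} shape.
    if isinstance(outcome, BaseException):
        return {"source": name, "success": False, "error": type(outcome).__name__}
    return {"source": name, "success": True, "data": outcome}


async def fetch_all(names):
    outcomes = await asyncio.gather(
        *(third_party_fetch(name) for name in names),
        return_exceptions=True,
    )
    # gather() preserves input order, so zip() pairs each name with its slot.
    return [to_structured(name, outcome) for name, outcome in zip(names, outcomes)]


if __name__ == "__main__":
    for row in asyncio.run(fetch_all(["good", "broken"])):
        print(row)
```

The isinstance check happens once, in the adapter, so the code that consumes the rows can stay a flat loop over success flags.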

Retry logic with exponential backoff (async version)

You learned retry patterns in Chapter 9. The async version is nearly identical, but uses asyncio.sleep() instead of time.sleep() so that you do not block the event loop.

async_retry.py
import asyncio
import httpx
import random

async def fetch_with_retry(client, url, max_retries=3):
    for attempt in range(max_retries):
        try:
            response = await client.get(url, timeout=5.0)
            response.raise_for_status()
            return response.json()
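        except (httpx.TimeoutException, httpx.HTTPStatusError) as exc:
            # Only timeouts, 429, and 5xx responses are worth retrying;
            # other 4xx statuses will fail the same way on every attempt.
            status = exc.response.status_code if isinstance(exc, httpx.HTTPStatusError) else None
            retryable = status is None or status == 429 or status >= 500
            if not retryable or attempt == max_retries - 1:
                raise  # Not retryable, or out of attempts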
            
            # Exponential backoff with jitter
            wait_time = (2 ** attempt) + random.uniform(0, 1)
            print(f"Retry {attempt + 1}/{max_retries} after {wait_time:.2f}s")
            await asyncio.sleep(wait_time)

async def main():
    async with httpx.AsyncClient() as client:
        data = await fetch_with_retry(client, "https://api.example.com/unreliable")
        print("Success:", data)

if __name__ == "__main__":
    asyncio.run(main())

The pattern is the same as synchronous retry logic, but using await asyncio.sleep() instead of a blocking sleep keeps the event loop responsive. You can run many retry loops concurrently, and one task's backoff sleep never blocks another.
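The claim that retry loops do not block each other can be checked without a network. The sketch below is an illustration under stated assumptions, not the chapter's code: TransientError, backoff_delay, retry_async, and make_flaky are hypothetical names, the delay is capped (a common addition to plain exponential backoff), and the base delay is shrunk so the demo finishes quickly.

```python
import asyncio
import random


class TransientError(Exception):
    """Hypothetical stand-in for a timeout or a 5xx response."""


def backoff_delay(attempt, base=1.0, cap=30.0):
    # Exponential growth (base * 2**attempt), capped, plus random jitter so
    # many clients do not retry in lockstep.
    return min(cap, base * 2 ** attempt) + random.uniform(0, base)


async def retry_async(operation, max_attempts=3, base=1.0, cap=30.0):
    for attempt in range(max_attempts):
        try:
            return await operation()
        except TransientError:
            if attempt == max_attempts - 1:
                raise  # out of attempts
            await asyncio.sleep(backoff_delay(attempt, base, cap))


def make_flaky(failures_before_success):
    # Builds an operation that fails a fixed number of times, then returns
    # the number of calls it took to succeed.
    calls = {"count": 0}

    async def operation():
        calls["count"] += 1
        if calls["count"] <= failures_before_success:
            raise TransientError("try again")
        return calls["count"]

    return operation


async def main():
    # Three independent retry loops; their backoff sleeps overlap.
    operations = [make_flaky(n) for n in (0, 1, 2)]
    return await asyncio.gather(*(retry_async(op, base=0.01) for op in operations))


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Each entry in the result is the number of attempts that operation needed, so a quick look at the list shows which sources were flaky.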

Testing async code with pytest-asyncio

Testing async code requires some special setup. The pytest-asyncio plugin lets you write async test functions that pytest will run properly.

Install pytest-asyncio:

Terminal
pip install pytest-asyncio

test_async_basic.py
import os
import pytest
import httpx

OPENWEATHER_KEY = os.environ.get("OPENWEATHER_KEY", "")


async def fetch_weather(city):
    async with httpx.AsyncClient() as client:
        response = await client.get(
            "https://api.openweathermap.org/data/2.5/weather",
            params={"q": city, "appid": OPENWEATHER_KEY},
        )
        response.raise_for_status()
        return response.json()


@pytest.mark.skipif(not OPENWEATHER_KEY, reason="OPENWEATHER_KEY not set")
@pytest.mark.asyncio
async def test_fetch_weather():
    # Live network test; skipped automatically when OPENWEATHER_KEY is absent.
    result = await fetch_weather("London")
    assert "main" in result
    assert "temp" in result["main"]

The @pytest.mark.asyncio decorator tells pytest that this is an async test function; pytest runs it in an event loop and awaits the coroutine. The skipif guard above it keeps the test honest when the API key is not set: the assertion runs against a real OpenWeather response if you have OPENWEATHER_KEY in your environment, and the test is skipped (rather than failing with a 401) when you do not. That is the standard shape for a live-network test: enable it on machines that have the credentials, skip it everywhere else.

Sync vs async tests: key differences

  • Sync test. Call the function directly with no special markers.
  • Async test. Mark with @pytest.mark.asyncio and use await when calling the coroutine.
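  • Common error. If pytest skips or fails an async test with a message such as "async def functions are not natively supported", the @pytest.mark.asyncio marker is missing. RuntimeError: no running event loop is a different problem: synchronous code called something that needs a running loop, such as asyncio.create_task(), so sync and async are mixed at the call boundary.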

Mocking async API calls with pytest-mock

For unit tests, mock the API calls rather than making real network requests. There is one async-specific gotcha to name up front: the attributes of an AsyncMock are themselves AsyncMock objects, so calling any method on one returns a coroutine. If you set response = AsyncMock() and then write response.json() inside fetch_weather, the call returns a coroutine object, not a dict, and result["main"] blows up with TypeError: 'coroutine' object is not subscriptable. The fix is to split the two roles: MagicMock for the response object (its methods are plain sync calls), AsyncMock only for the awaited transport call (httpx.AsyncClient.get). The patch below shows the shape.

test_async_mock.py
import pytest
from unittest.mock import AsyncMock, MagicMock
import httpx


async def fetch_weather(city):
    async with httpx.AsyncClient() as client:
        response = await client.get(
            "https://api.openweathermap.org/data/2.5/weather",
            params={"q": city, "appid": "fake-key-for-tests"},
        )
        response.raise_for_status()
        return response.json()


@pytest.mark.asyncio
async def test_fetch_weather_mocked(mocker):
    # The response object is a regular MagicMock: .json() returns a dict
    # synchronously (not a coroutine), and .raise_for_status() returns None.
    mock_response = MagicMock()
    mock_response.json.return_value = {
        "main": {"temp": 280.15},
        "weather": [{"description": "clear sky"}],
    }
    mock_response.raise_for_status.return_value = None

    # httpx.AsyncClient.get is a coroutine function, so the patch needs to
    # be an AsyncMock that returns the (sync) response object when awaited.
    mocker.patch(
        "httpx.AsyncClient.get",
        new=AsyncMock(return_value=mock_response),
    )

    result = await fetch_weather("London")

    assert result["main"]["temp"] == 280.15
    assert result["weather"][0]["description"] == "clear sky"

By mocking the HTTP call, your test runs instantly and does not depend on external services. This makes your test suite fast and reliable.

AsyncMock is in the standard library unittest.mock from Python 3.8 onwards, so you do not need an extra dependency for it. pytest-mock handles the patching ergonomics, but the standard-library import works the same way.
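The split between the two mock roles can be seen in isolation with nothing but the standard library. This is a small sketch, with a hypothetical client object standing in for httpx.AsyncClient and a placeholder URL; it is meant to show the mechanics, not to replace the patched test above.

```python
import asyncio
import inspect
from unittest.mock import AsyncMock, MagicMock


async def demo():
    # Wrong split: on an AsyncMock, child methods are AsyncMock too, so
    # .json() hands back a coroutine instead of a dict.
    all_async_response = AsyncMock()
    pending = all_async_response.json()
    json_is_coroutine = inspect.iscoroutine(pending)
    await pending  # awaited only to avoid a "never awaited" warning

    # Right split: sync response object, async transport call.
    response = MagicMock()
    response.json.return_value = {"main": {"temp": 280.15}}
    client = MagicMock()  # hypothetical stand-in for httpx.AsyncClient
    client.get = AsyncMock(return_value=response)

    received = await client.get("https://api.example.com/weather")
    return json_is_coroutine, received.json()


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The first half reproduces the gotcha; the second half shows that awaiting the AsyncMock transport call yields the MagicMock response, whose .json() is an ordinary synchronous call.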

Testing concurrent operations and race conditions

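When testing async code that runs multiple tasks concurrently, verify that every task completes and that results come back in the order you submitted them. gather() preserves input order even when tasks finish out of order, so an ordering assertion is a cheap guard against code that accidentally depends on completion order.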

test_concurrent.py
import pytest
import asyncio

async def fetch_multiple(urls):
    async def fetch_one(url):
        await asyncio.sleep(0.1)  # Simulate network delay
        return {"url": url, "data": "response"}
    
    results = await asyncio.gather(*[fetch_one(url) for url in urls])
    return results

@pytest.mark.asyncio
async def test_concurrent_fetches():
    urls = [f"https://api.example.com/{i}" for i in range(5)]
    
    results = await fetch_multiple(urls)
    
    # Verify all requests completed
    assert len(results) == 5
    
    # Verify results are in correct order
    for i, result in enumerate(results):
        assert result["url"] == urls[i]

This test verifies that concurrent operations complete successfully and return results in the expected order. For more complex tests, you might verify that a semaphore correctly limits concurrency or that rate limiting spreads requests over time.
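As one way to check that a semaphore really limits concurrency, the sketch below counts how many workers are inside the guarded region at once and records the peak. run_limited and its counters are hypothetical names; in a test suite you would call it from a function marked with @pytest.mark.asyncio and assert on the returned peak.

```python
import asyncio


async def run_limited(n_tasks, limit):
    semaphore = asyncio.Semaphore(limit)
    active = 0  # workers currently inside the semaphore
    peak = 0    # highest value 'active' ever reached

    async def worker(index):
        nonlocal active, peak
        async with semaphore:
            active += 1
            peak = max(peak, active)
            await asyncio.sleep(0.01)  # simulated network delay
            active -= 1
            return index

    results = await asyncio.gather(*(worker(i) for i in range(n_tasks)))
    return results, peak


if __name__ == "__main__":
    results, peak = asyncio.run(run_limited(n_tasks=10, limit=3))
    print(f"results={results} peak={peak}")
```

The counters are safe without a lock because asyncio tasks only switch at an await, and the increments and decrements contain none.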

Measuring async performance improvements

To prove that async provides value, you should measure actual performance improvements. Compare the time taken for sync versus async implementations.

benchmark.py
import time
import asyncio
import pytest

def sync_fetch(url):
    time.sleep(0.5)  # Simulate network delay
    return f"result from {url}"

async def async_fetch(url):
    await asyncio.sleep(0.5)  # Simulate network delay
    return f"result from {url}"

def test_sync_performance():
    urls = [f"https://api.example.com/{i}" for i in range(10)]
    
    start = time.perf_counter()  # monotonic, high-resolution timer
    results = [sync_fetch(url) for url in urls]
    elapsed = time.perf_counter() - start
    
    assert len(results) == 10
    print(f"Sync: {elapsed:.2f} seconds")  # ~5 seconds

@pytest.mark.asyncio
async def test_async_performance():
    urls = [f"https://api.example.com/{i}" for i in range(10)]
    
    start = time.perf_counter()  # monotonic, high-resolution timer
    results = await asyncio.gather(*[async_fetch(url) for url in urls])
    elapsed = time.perf_counter() - start
    
    assert len(results) == 10
    print(f"Async: {elapsed:.2f} seconds")  # ~0.5 seconds

Run these tests with pytest -s to see the printed times. The async version should be roughly 10x faster: the ten 0.5-second waits overlap, so the batch takes about 0.5 seconds instead of about 5.
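Printed times are easy to ignore, so it can help to turn the comparison into a number that a test can assert on. The following sketch measures a sequential and a concurrent run of the same simulated fetches with time.perf_counter(); simulated_fetch, timed, and compare are hypothetical helpers, and the delays are kept short so the check runs quickly.

```python
import asyncio
import time


async def simulated_fetch(delay):
    await asyncio.sleep(delay)  # stands in for network latency
    return delay


async def sequential(n, delay):
    # Each fetch starts only after the previous one has finished.
    return [await simulated_fetch(delay) for _ in range(n)]


async def concurrent(n, delay):
    # All fetches wait at the same time.
    return await asyncio.gather(*(simulated_fetch(delay) for _ in range(n)))


async def timed(make_awaitable):
    # Returns (result, elapsed_seconds) for one awaitable.
    start = time.perf_counter()
    result = await make_awaitable()
    return result, time.perf_counter() - start


async def compare(n=5, delay=0.05):
    _, seq_elapsed = await timed(lambda: sequential(n, delay))
    _, con_elapsed = await timed(lambda: concurrent(n, delay))
    return seq_elapsed, con_elapsed


if __name__ == "__main__":
    seq, con = asyncio.run(compare())
    print(f"sequential={seq:.3f}s concurrent={con:.3f}s speedup={seq / con:.1f}x")
```

Asserting that the concurrent run is faster than the sequential one is more robust than asserting an exact duration, since absolute timings vary with machine load.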

Section 6 puts the patterns from this page together in the news aggregator: structured per-source responses (so the reducer is a flat for-loop with no isinstance branches), per-request timeouts (so one slow source cannot define the wall-clock), and the partial-failure shape running end to end across three real news APIs.