4. Concurrent fan-out
Section 3 set up httpx.AsyncClient for one request at a time. This page is where the request count grows past one and the structured-concurrency primitives earn their keep. Two primitives do the bulk of the work: asyncio.TaskGroup (Python 3.11+) is the modern default, with automatic cancellation and clean exception aggregation; asyncio.gather is the older API you will still meet in existing codebases. A semaphore bounds the parallelism so a fifty-source fan-out does not burst through API rate limits, and per-request timeouts keep one slow source from setting the wall-clock time for the whole batch.
asyncio.TaskGroup: structured concurrency, fail-fast by default
asyncio.TaskGroup (Python 3.11+) is the structured-concurrency primitive: tasks are owned by the async with block, exceptions surface as an ExceptionGroup when the block exits, and a single failure cancels the still-running siblings automatically. The lifetime is the block. That is what makes it the primary fan-out tool when fail-fast is the failure mode you want.
Save this as concurrent_taskgroup.py. It fetches weather for three cities concurrently using a TaskGroup and reads the results out after the block closes:
import asyncio
import httpx


async def fetch_weather(client, city):
    response = await client.get(
        f"https://api.openweathermap.org/data/2.5/weather?q={city}&appid=YOUR_KEY"
    )
    response.raise_for_status()
    return response.json()


async def main():
    cities = ["London", "Paris", "Tokyo"]
    async with httpx.AsyncClient(timeout=10.0) as client:
        async with asyncio.TaskGroup() as group:
            tasks = [
                group.create_task(fetch_weather(client, city))
                for city in cities
            ]
        # The async with block has exited; every task is done.
        results = [task.result() for task in tasks]
    for city, weather in zip(cities, results):
        print(f"{city}: {weather['main']['temp']}K")


if __name__ == "__main__":
    asyncio.run(main())
Three properties of TaskGroup carry the weight of this code:
- Task lifetime equals block lifetime. Once the async with block exits, every task it spawned has either completed or been cancelled. There is no orphan-task category to worry about.
- Exception aggregation as ExceptionGroup. If two tasks fail concurrently, both exceptions surface together when the block exits (PEP 654's structured exception type). You catch them with except* httpx.HTTPError as eg: and iterate eg.exceptions; you never lose a failure.
- Fail-fast cancellation. When any task raises, the others get cancelled automatically. If the first weather API to return is a 404, the other two stop on the next await.
This is the right shape when the fan-out is a single unit of work (the dashboard, the aggregator, the multi-source query). It is the wrong shape when you want to collect every result regardless of failures; that case belongs to gather(..., return_exceptions=True), which is next.
asyncio.gather: the legacy-compat alternative
asyncio.gather predates TaskGroup and stays in service for two reasons. It works on Python 3.10 and older, and its return_exceptions=True mode collects results and exceptions into one list rather than raising. One further behaviour is worth keeping in view: as Section 3 showed, default gather does not cancel the siblings when one task raises; the survivors keep running, which is why TaskGroup's cancel-on-failure contract is the modern default. Same fan-out, looser lifecycle: pass coroutines as positional arguments, get back a list of results (or exceptions, if you asked for them).
import asyncio
import httpx


async def fetch_weather(client, city):
    response = await client.get(
        f"https://api.openweathermap.org/data/2.5/weather?q={city}&appid=YOUR_KEY"
    )
    response.raise_for_status()
    return response.json()


async def main():
    cities = ["London", "Paris", "Tokyo"]
    async with httpx.AsyncClient(timeout=10.0) as client:
        tasks = [fetch_weather(client, city) for city in cities]
        # First exception propagates immediately; siblings keep running
        # in the background and any later exceptions are discarded.
        results = await asyncio.gather(*tasks)
        # Collect-everything: failures become elements of the result list
        # results = await asyncio.gather(*tasks, return_exceptions=True)
    for city, weather in zip(cities, results):
        print(f"{city}: {weather['main']['temp']}K")


if __name__ == "__main__":
    asyncio.run(main())
The two modes are different tools. Default gather carries the no-sibling-cancellation gotcha from Section 3: the first exception raises, the survivors keep running. If you reach for default gather, you need to handle that asymmetry: either ensure your inner coroutines catch their own exceptions, or wrap the call so the surviving tasks are cancelled explicitly. return_exceptions=True sidesteps the issue entirely: every task runs to completion, every position in the result list is either a value or an exception object, and nothing raises.
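With return_exceptions=True the caller has to separate values from exception objects itself. One way to do that is sketched below, assuming simulated sources; fetch and collect_all are hypothetical names, and the sleep stands in for an HTTP call.

```python
import asyncio


async def fetch(source, fail=False):
    # Simulated request; a real version would await client.get(...).
    await asyncio.sleep(0.01)
    if fail:
        raise RuntimeError(f"{source} unavailable")
    return {"source": source, "temp": 280.0}


async def collect_all(sources):
    """sources is a list of (name, should_fail) pairs."""
    outcomes = await asyncio.gather(
        *(fetch(name, fail) for name, fail in sources),
        return_exceptions=True,
    )
    successes, failures = {}, {}
    # gather preserves argument order, so zip lines outcomes up with inputs.
    for (name, _), outcome in zip(sources, outcomes):
        if isinstance(outcome, BaseException):
            failures[name] = outcome
        else:
            successes[name] = outcome
    return successes, failures


if __name__ == "__main__":
    ok, bad = asyncio.run(collect_all([("a", False), ("b", True), ("c", False)]))
    print(sorted(ok), sorted(bad))
```

Checking against BaseException rather than Exception also catches a CancelledError that ends up in the result list.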
For new code where the failure shape is fail-fast with structured cancellation, reach for TaskGroup. When the failure shape is collect-everything, reach for gather(..., return_exceptions=True). The third shape -- inner coroutines that catch and return structured-response dicts -- is the keystone's choice in Section 6: failures arrive at the reducer as values, not as exception objects, so default gather is safe to use. Section 5 covers when each pattern is the right call.
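The third shape can be sketched as follows. This is an illustration only: the dict keys (source, ok, data, error) and the function names are assumptions, not the structure Section 6 actually uses, and flaky_source simulates a request.

```python
import asyncio


async def flaky_source(name):
    # Simulated request; the source named "broken" always fails.
    await asyncio.sleep(0)
    if name == "broken":
        raise ConnectionError("upstream refused connection")
    return {"temp": 281.5}


async def fetch_structured(name):
    # Catch at the edge so the caller always receives a dict.
    # CancelledError derives from BaseException, so cancellation still propagates.
    try:
        data = await flaky_source(name)
    except Exception as exc:
        return {"source": name, "ok": False, "error": repr(exc)}
    return {"source": name, "ok": True, "data": data}


async def main():
    names = ["alpha", "broken", "gamma"]
    # Default gather is safe here because no inner coroutine raises.
    return await asyncio.gather(*(fetch_structured(n) for n in names))


if __name__ == "__main__":
    for row in asyncio.run(main()):
        print(row)
```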
Limiting concurrency with semaphores
Running hundreds of concurrent requests is fast, but it can overwhelm the server or trigger rate limits. A semaphore lets you limit how many tasks run at the same time. Think of it as a bouncer at a club: only N people can be inside at once, and new people must wait until someone leaves.
import asyncio
import httpx


async def fetch_with_semaphore(client, url, semaphore):
    async with semaphore:
        # Only this many requests run concurrently
        response = await client.get(url)
        response.raise_for_status()
        return response.json()


async def main():
    urls = [f"https://api.example.com/item/{i}" for i in range(100)]
    # Allow at most 10 concurrent requests
    semaphore = asyncio.Semaphore(10)
    async with httpx.AsyncClient(timeout=10.0) as client:
        tasks = [fetch_with_semaphore(client, url, semaphore) for url in urls]
        results = await asyncio.gather(*tasks)
    return results


if __name__ == "__main__":
    asyncio.run(main())
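Even though gather starts all 100 tasks at once, only 10 hold the semaphore, and therefore have a request in flight, at any given moment. The rest wait at async with semaphore. As each request finishes, the semaphore releases a slot and the next waiting task starts. This keeps your code fast while respecting server limits.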
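The bound is easy to check without a server. The sketch below counts how many workers are inside the guarded block at once; worker and measure_peak are hypothetical names, and the sleep stands in for the HTTP round trip.

```python
import asyncio


async def worker(semaphore, stats):
    async with semaphore:
        stats["active"] += 1
        stats["peak"] = max(stats["peak"], stats["active"])
        try:
            await asyncio.sleep(0.01)  # stands in for the HTTP round trip
        finally:
            stats["active"] -= 1


async def measure_peak(total=100, limit=10):
    semaphore = asyncio.Semaphore(limit)
    stats = {"active": 0, "peak": 0}
    await asyncio.gather(*(worker(semaphore, stats) for _ in range(total)))
    return stats["peak"]


if __name__ == "__main__":
    print("peak concurrency:", asyncio.run(measure_peak()))
```

The recorded peak never exceeds the semaphore's limit, regardless of how many tasks are queued behind it.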
Timeout and cancellation handling
In production, some requests will be slow or hang indefinitely. You should always set timeouts to prevent your application from waiting forever. Both httpx and asyncio support timeouts.
import asyncio
import httpx


async def fetch_with_timeout(url):
    try:
        async with httpx.AsyncClient(timeout=5.0) as client:
            response = await client.get(url)
            response.raise_for_status()
            return response.json()
    except httpx.TimeoutException:
        print(f"Request to {url} timed out")
        return None
    except httpx.HTTPStatusError as exc:
        print(f"HTTP error {exc.response.status_code} for {url}")
        return None


async def main():
    urls = ["https://api.example.com/slow", "https://api.example.com/fast"]
    # Set a timeout for the entire gather operation
    try:
        results = await asyncio.wait_for(
            asyncio.gather(*[fetch_with_timeout(url) for url in urls]),
            timeout=10.0
        )
    except asyncio.TimeoutError:
        print("Overall operation timed out")
        results = []
    return results


if __name__ == "__main__":
    asyncio.run(main())
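This code has two layers of timeout protection. The httpx timeout of 5 seconds applies to each phase of a request (connecting, reading, writing, and waiting for a pooled connection) rather than to the request as a whole, so it catches an endpoint that stalls. The asyncio.wait_for wrapper bounds the entire gather() operation at 10 seconds and cancels the requests still in flight when it expires. Together they keep one slow endpoint from blocking your entire application.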
Rate limiting in async applications
Many APIs have rate limits: for example, "100 requests per minute" or "10 requests per second." If you send too many requests too quickly, the API will reject them. A semaphore limits concurrency but does not spread requests over time. For rate limiting, you need to add delays.
import asyncio
import httpx
import time


class RateLimiter:
    def __init__(self, max_calls, period):
        """Allow max_calls requests per period (in seconds)."""
        self.max_calls = max_calls
        self.period = period
        self.calls = []

    async def acquire(self):
        # Monotonic clock: unaffected by system clock adjustments
        now = time.monotonic()
        # Remove calls outside the current period
        self.calls = [call_time for call_time in self.calls if now - call_time < self.period]
        if len(self.calls) >= self.max_calls:
            # Wait until the oldest call expires
            sleep_time = self.period - (now - self.calls[0])
            await asyncio.sleep(sleep_time)
            self.calls.pop(0)
        # Record the time the call actually proceeds, not the time it arrived
        self.calls.append(time.monotonic())


async def fetch_with_rate_limit(client, url, rate_limiter):
    await rate_limiter.acquire()
    response = await client.get(url)
    response.raise_for_status()
    return response.json()


async def main():
    urls = [f"https://api.example.com/item/{i}" for i in range(50)]
    # Allow 10 requests per second
    rate_limiter = RateLimiter(max_calls=10, period=1.0)
    async with httpx.AsyncClient(timeout=10.0) as client:
        tasks = [fetch_with_rate_limit(client, url, rate_limiter) for url in urls]
        results = await asyncio.gather(*tasks)
    return results


if __name__ == "__main__":
    asyncio.run(main())
This rate limiter spreads requests over time to stay close to the allowed rate. The pattern teaches the shape (sliding-window timestamps with an await-sleep on the oldest entry), but it has a flaw under high concurrency: every coroutine that finds the window full computes its sleep from the same oldest entry, so the waiters all wake together and proceed without re-checking the window, and the rate can be exceeded. For production use, aiolimiter handles that edge case (and a few others around token refill and bursts) more gracefully. The teach-the-pattern version above is the right shape to read; the production version is the right shape to pip-install.
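One way to close that gap using only the standard library is to serialize waiters behind a lock and re-check the window after every sleep. The class below is a hypothetical sketch of that idea, not aiolimiter's implementation, and it trades some throughput for strictness because waiters queue one at a time.

```python
import asyncio
import time
from collections import deque


class SerializedRateLimiter:
    """Sliding window: at most max_calls acquisitions per period seconds."""

    def __init__(self, max_calls, period):
        self.max_calls = max_calls
        self.period = period
        self.calls = deque()
        self._lock = asyncio.Lock()

    async def acquire(self):
        # Holding the lock while sleeping makes waiters queue up in FIFO order.
        async with self._lock:
            while True:
                now = time.monotonic()
                # Drop timestamps that have left the window.
                while self.calls and now - self.calls[0] >= self.period:
                    self.calls.popleft()
                if len(self.calls) < self.max_calls:
                    self.calls.append(now)
                    return
                # Window is full: sleep until the oldest entry expires, then re-check.
                await asyncio.sleep(self.period - (now - self.calls[0]))


async def demo(total=12, max_calls=5, period=0.2):
    limiter = SerializedRateLimiter(max_calls, period)
    stamps = []

    async def hit():
        await limiter.acquire()
        stamps.append(time.monotonic())

    await asyncio.gather(*(hit() for _ in range(total)))
    return stamps


if __name__ == "__main__":
    stamps = asyncio.run(demo())
    print([round(s - stamps[0], 2) for s in stamps])
```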
Progress tracking for multiple concurrent operations
When you run many concurrent tasks, it is useful to track progress. You can use asyncio.as_completed() to process results as they finish, rather than waiting for all tasks to complete.
import asyncio
import httpx


async def fetch_data(client, url):
    response = await client.get(url)
    response.raise_for_status()
    return response.json()


async def main():
    urls = [f"https://api.example.com/item/{i}" for i in range(20)]
    async with httpx.AsyncClient(timeout=10.0) as client:
        tasks = [fetch_data(client, url) for url in urls]
        # Process results as they complete
        results = []
        for i, coro in enumerate(asyncio.as_completed(tasks), start=1):
            result = await coro
            results.append(result)
            print(f"Completed {i}/{len(tasks)} requests")
    return results


if __name__ == "__main__":
    asyncio.run(main())
This pattern is useful for long-running batch operations where you want to show progress to the user or log intermediate results. Each task completes independently, and you handle them in the order they finish rather than the order you started them.
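Because results arrive in completion order, the loop above cannot tell which URL a given result belongs to. One common workaround, sketched here with simulated requests, is to have each coroutine return its key alongside its payload; fetch_item and tagged are hypothetical names.

```python
import asyncio


async def fetch_item(item_id):
    # Simulated request; higher ids finish sooner, so completion
    # order differs from start order.
    await asyncio.sleep(0.08 - item_id * 0.02)
    return {"id": item_id}


async def tagged(item_id):
    # Return the key with the payload so the caller knows which request finished.
    return item_id, await fetch_item(item_id)


async def main():
    ids = [0, 1, 2, 3]
    results, order = {}, []
    pending = [tagged(i) for i in ids]
    for done, next_result in enumerate(asyncio.as_completed(pending), start=1):
        item_id, payload = await next_result
        results[item_id] = payload
        order.append(item_id)
        print(f"Completed {done}/{len(ids)}: item {item_id}")
    return results, order


if __name__ == "__main__":
    asyncio.run(main())
```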
Section 5 picks up the partial-failure patterns this page set aside, the retry-with-backoff shape for transient errors, and the pytest-asyncio testing discipline that keeps the fan-out honest in CI.