6. Business logic layer: workflow orchestration
In this section you build orchestrator.py, the weather dashboard's business logic layer. The orchestrator coordinates the multi-step workflow (validate the input, geocode it, process the locations, fetch the weather, process the weather): it calls each step in turn, watches what each one returns, and packages the combined result for whoever asked. The orchestrator does none of the work itself. It makes no HTTP calls (those belong to §4's API client) and validates no individual fields (those belong to §5's processors). Its job is sequencing and outcome judgement: folding every possible outcome from every step into a single return shape that the presentation layer can render without re-reading any errors.
If you're landing here directly
The configuration pattern this section references later is just python-dotenv plus a small dataclass: load .env, read values with os.getenv(), and raise a clear startup error if the required API key is missing.
How the orchestrator works
After each step, the orchestrator checks what came back. Where the workflow breaks determines the final result: SUCCESS, PARTIAL_SUCCESS, or FAILURE.
Every path returns the same WorkflowResult shape. Nothing raises exceptions. This is what allows the presentation layer to switch on result.status without needing a single try/except block.
To do this, the orchestrator calls four peer modules:
-
input_validator.py: Business logic layer, introduced in §6 -
weather_api_client.py: API client layer (§4), called twice -
geocoding_processor.py: Data processing layer (§5) -
weather_processor.py: Data processing layer (§5)
WorkflowResult returns to WeatherDashboardApp, which is what hands it to display.py.
The WorkflowResult shape
Up to now you've seen the orchestrator described in terms of steps and outcomes. This is what those outcomes actually look like in code.
No matter what happens in the workflow, the orchestrator always returns the same outer structure (WorkflowResult). Only the status and the populated fields differ:
SUCCESS
WorkflowResult(
status=WorkflowStatus.SUCCESS,
weather_data=WeatherData(...),
locations=[Location(...), Location(...)],
error_message=None,
error_category=None,
warnings=[],
suggestions=[],
processing_metadata={...},
)
All five steps completed. The result contains both the resolved locations and the processed weather data.
PARTIAL_SUCCESS
WorkflowResult(
status=WorkflowStatus.PARTIAL_SUCCESS,
weather_data=None,
locations=[Location(...), Location(...)],
error_message="Weather data unavailable",
error_category="transient",
warnings=[],
suggestions=[
"Location data is still available",
"Try again in a few moments",
],
processing_metadata={...},
)
The location half succeeded but the weather half failed. The user still gets useful data (the resolved place), along with a clear explanation and retry guidance.
FAILURE
WorkflowResult(
status=WorkflowStatus.FAILURE,
weather_data=None,
locations=[],
error_message="Could not process location data",
error_category="not_found",
warnings=[],
suggestions=[
"Check the spelling of the city name",
"Try a more specific location",
],
processing_metadata={...},
)
The workflow failed before a usable location was produced. There is no weather data and no locations, but the result still carries a structured explanation and suggestions for the user.
How this works with the presentation layer
The presentation layer can always ask the same questions: What is the status? Is there weather data? Are there locations? Is there an error message? Are there suggestions?
Because nothing raises exceptions, the UI never needs try/except. It simply switches on result.status and renders what is present.
Update models.py
WorkflowResult goes in models.py alongside Location, CurrentWeather, and WeatherData. They're all there for the same reason: every layer agrees on the shape before passing data across the boundary. §5 added the first three for the data-processing layer; the orchestrator's return type goes in beside them.
Append this to the models.py you wrote in §5:
from enum import Enum
# (dataclass, field, Optional, List, Dict, Any are already imported at the
# top of models.py from section 5.)
class WorkflowStatus(Enum):
"""Status of a workflow execution."""
SUCCESS = "success"
PARTIAL_SUCCESS = "partial_success" # Got location but not weather
FAILURE = "failure"
@dataclass
class WorkflowResult:
"""One return shape for every orchestrator outcome."""
status: WorkflowStatus
# Primary results
weather_data: Optional[WeatherData] = None
locations: List[Location] = field(default_factory=list)
# Error information (populated when status != SUCCESS)
error_message: Optional[str] = None
error_category: Optional[str] = None # Chapter 9's four categories
# User-facing communication
warnings: List[str] = field(default_factory=list)
suggestions: List[str] = field(default_factory=list)
# Diagnostic metadata for operators (populated in pass 4)
processing_metadata: Dict[str, Any] = field(default_factory=dict)
@property
def succeeded(self) -> bool:
return self.status == WorkflowStatus.SUCCESS
@property
def has_weather(self) -> bool:
return self.weather_data is not None
@property
def has_locations(self) -> bool:
return len(self.locations) > 0
Three statuses, not two, for one specific reason: graceful degradation. If geocoding succeeds but the weather endpoint times out, the user should not see "Cannot get weather". They should see "We found London, GB, but could not load conditions. Try again in a moment." A two-state success/failure model cannot carry that nuance, so the presentation layer ends up reinventing the missing third state by string-matching the error message.
Confirm the new dataclass loads before moving on:
python -c "from models import WorkflowResult, WorkflowStatus; print(WorkflowResult(status=WorkflowStatus.FAILURE).status.value)"
failure
One word of output, but it confirms two things at once: the imports resolve and the enum prints the string the presentation layer will eventually branch on.
InputValidator: a small module the orchestrator delegates to
Before any HTTP fires, the orchestrator has to confirm the user's input is something a geocoder can act on. That work is self-contained (no API calls, no shared state), so it lives in its own module. Save this at the project root as input_validator.py:
"""Input validator for user-provided location queries.
Recognises three input shapes and assigns a confidence score the orchestrator
uses when deciding how to handle ambiguous matches:
"London" -> plain city name, confidence 0.65
"San Francisco, CA" -> city + state/country, confidence 0.85
"51.5074,-0.1278" -> coordinate pair, confidence 0.95
"""
import re
from dataclasses import dataclass
class InputValidationError(Exception):
"""Raised when user input cannot be normalised into a usable query."""
@dataclass
class ValidationResult:
"""Successful validation outcome (failures raise InputValidationError)."""
normalized_query: str
confidence: float
class InputValidator:
"""Validates and normalises user-provided location queries."""
MIN_LENGTH = 2
MAX_LENGTH = 100
# Coordinate pattern: optional sign, digits, optional decimal, comma, same again
COORD_PATTERN = re.compile(r"^\s*-?\d+(?:\.\d+)?\s*,\s*-?\d+(?:\.\d+)?\s*$")
def validate_location_input(self, query: str) -> ValidationResult:
if not query or not query.strip():
raise InputValidationError("Location query cannot be empty.")
normalized = query.strip()
if len(normalized) < self.MIN_LENGTH:
raise InputValidationError(
f"Location query is too short ({len(normalized)} chars; "
f"minimum {self.MIN_LENGTH})."
)
if len(normalized) > self.MAX_LENGTH:
raise InputValidationError(
f"Location query is too long ({len(normalized)} chars; "
f"maximum {self.MAX_LENGTH})."
)
# Coordinate pair: highest confidence, no geocoding ambiguity to resolve
if self.COORD_PATTERN.match(normalized):
return ValidationResult(normalized_query=normalized, confidence=0.95)
# City + state/country: comma-separated, second part at least 2 chars
if "," in normalized:
parts = [p.strip() for p in normalized.split(",", 1)]
if all(parts) and len(parts[1]) >= 2:
return ValidationResult(normalized_query=normalized, confidence=0.85)
# Plain city name: usable but ambiguity is the geocoder's problem
return ValidationResult(normalized_query=normalized, confidence=0.65)
Three input shapes, three confidence levels. A coordinate pair leaves the geocoder no work to do (highest confidence). A city with state or country narrows the matches dramatically. A bare city name is usable but the geocoder may return multiple candidates; the orchestrator will pick the highest-confidence match and surface the rest as alternatives.
Save this as verify_input_validator.py and run it. Four lines of test cover the whole API surface:
from input_validator import InputValidator, InputValidationError
v = InputValidator()
print(f"plain city: {v.validate_location_input('London').confidence}")
print(f"city + country: {v.validate_location_input('San Francisco, CA').confidence}")
print(f"coordinate pair: {v.validate_location_input('51.5074,-0.1278').confidence}")
try:
v.validate_location_input("")
except InputValidationError as e:
print(f"empty rejected: {e}")
plain city: 0.65
city + country: 0.85
coordinate pair: 0.95
empty rejected: Location query cannot be empty.
Three confidence levels and one rejection: the validator's whole API surface, exercised in four lines. The orchestrator now has its two prerequisites; the four passes below assemble the workflow on top of them.
Building the orchestrator in passes
You will build the orchestrator in four passes: four runnable versions of orchestrator.py, each extending the last.
This approach makes the architecture visible as it grows. Pass 1 has no API client at all, yet it already handles input-validation errors correctly. Each pass adds one piece of behaviour, so you can see exactly where each responsibility belongs.
Pass 1: the skeleton (input validation only)
The first version of the orchestrator does one thing: validate the user's input. That's enough to be runnable, enough to test, and enough to expose what can't yet happen: the file's only collaborator at this point is InputValidator. The API client and processors land in pass 2. Save this at the project root as orchestrator.py:
"""Weather dashboard orchestrator -- workflow sequencing and outcome judgement.
Built in four passes:
Pass 1 (this file): validate input, return failure on bad input.
Pass 2: add geocoding fetch + processing.
Pass 3: add weather fetch + processing, with PARTIAL_SUCCESS on weather failure.
Pass 4: telemetry, low-confidence warnings, and a failure-result helper.
"""
import time
from models import WorkflowResult, WorkflowStatus
from input_validator import InputValidator, InputValidationError
class WeatherOrchestrator:
def __init__(self, api_key: str):
# Unused in pass 1; stored to keep the constructor signature stable.
self.api_key = api_key
self.input_validator = InputValidator()
def get_weather_for_city(self, city_name: str) -> WorkflowResult:
start_time = time.monotonic()
# STEP 1: validate input
try:
validated = self.input_validator.validate_location_input(city_name)
except InputValidationError as e:
return WorkflowResult(
status=WorkflowStatus.FAILURE,
error_message=str(e),
error_category="user_input",
suggestions=[
"Please enter a city name",
"Examples: London; Paris, FR; 51.5074,-0.1278",
],
processing_metadata={'total_duration': time.monotonic() - start_time},
)
# STUB: steps 2-5 land in subsequent passes.
return WorkflowResult(
status=WorkflowStatus.FAILURE,
error_message=(
f"Validation OK for '{validated.normalized_query}' "
f"(confidence {validated.confidence:.2f}); "
f"downstream steps land in pass 2"
),
error_category='unknown',
processing_metadata={'total_duration': time.monotonic() - start_time},
)
Save this as verify_pass1.py and run it. No API key needed yet; pass 1 makes no network calls:
from orchestrator import WeatherOrchestrator
o = WeatherOrchestrator(api_key="not_used_yet")
result = o.get_weather_for_city("")
print(f"empty input -> {result.status.value}, category={result.error_category}")
result = o.get_weather_for_city("London")
print(f"valid input -> {result.status.value}, category={result.error_category}")
print(f" message: {result.error_message}")
empty input -> failure, category=user_input
valid input -> failure, category=unknown
message: Validation OK for 'London' (confidence 0.65); downstream steps land in pass 2
Step 1 catches user-input errors with the right category before any HTTP fires. Valid input falls through to a deliberate FAILURE with category 'unknown': the message admits it's a scaffold, and the status admits no useful work has happened yet. The shape that matters is already in place (the orchestrator returns WorkflowResult on every path and never raises), and as capabilities accrete across the next three passes, the status arc moves up: FAILURE (here) → PARTIAL_SUCCESS (after pass 2's geocoding) → SUCCESS (after pass 3's weather).
Pass 2: geocode the input and process the response
Pass 2 adds the geocoding fetch and the geocoding processor. After this pass, a valid city resolves to a real Location dataclass; not-found and transient API failures each return FAILURE with the right category; the orchestrator still doesn't know about weather. The additions, in shorthand: three new imports, two new collaborators in __init__, and steps 2 and 3 in place of pass 1's stub return:
# Add to imports at the top of orchestrator.py:
from weather_api_client import WeatherAPIClient
from geocoding_processor import GeocodingProcessor
from validators import ValidationError
# Add these collaborators inside __init__:
self.api_client = WeatherAPIClient(api_key)
self.geocoding_processor = GeocodingProcessor()
# Replace the STUB return at the bottom of get_weather_for_city
# with these two new steps and a new STUB for steps 4-5:
# STEP 2: fetch geocoding data. The API client already retried on transient
# failures internally; if we still have an error, it's terminal at this layer.
# Read the categorised error context directly -- no string parsing, exactly
# the architectural promise §4 made when it surfaced the category as a
# first-class field on APIResult.
geocoding_result = self.api_client.fetch_geocoding(city_name, limit=5)
if not geocoding_result.success:
category = geocoding_result.error_context.get('category', 'unknown')
if category == 'not_found':
message = f"No location found for '{city_name}'"
suggestions = [
"Check the spelling",
"Try adding a country code: e.g. London, GB",
]
elif category == 'transient':
message = f"Could not reach the geocoding service for '{city_name}'"
suggestions = [
"Check your internet connection",
"Try again in a moment",
]
else: # 'unknown' (auth, unexpected 4xx, malformed JSON)
message = f"Geocoding request failed for '{city_name}'"
suggestions = ["Try again or try a different city"]
return WorkflowResult(
status=WorkflowStatus.FAILURE,
error_message=message,
error_category=category,
suggestions=suggestions,
processing_metadata={'total_duration': time.monotonic() - start_time},
)
# STEP 3: process the geocoding response. A ValidationError here means the
# API returned something we can't act on -- treat as 'not found' so the user
# gets actionable guidance.
try:
locations = self.geocoding_processor.process_response(
geocoding_result.data, city_name
)
except ValidationError:
return WorkflowResult(
status=WorkflowStatus.FAILURE,
error_message=f"Could not process location data for '{city_name}'",
error_category='not_found',
suggestions=[
"Check the spelling of the city name",
"Try a more specific name (include country or state)",
],
processing_metadata={'total_duration': time.monotonic() - start_time},
)
# STUB: steps 4-5 (weather) land in pass 3. For now, return what we have
# as PARTIAL_SUCCESS so the test below can confirm the geocoding path works
# end-to-end.
return WorkflowResult(
status=WorkflowStatus.PARTIAL_SUCCESS,
locations=locations,
error_message="Weather steps not yet implemented",
processing_metadata={'total_duration': time.monotonic() - start_time},
)
Full orchestrator.py after pass 2 (about 95 lines, with step 1 unchanged): expand to view or copy fresh
"""Weather dashboard orchestrator -- pass 2: input + geocoding."""
import time
from models import WorkflowResult, WorkflowStatus
from input_validator import InputValidator, InputValidationError
from weather_api_client import WeatherAPIClient
from geocoding_processor import GeocodingProcessor
from validators import ValidationError
class WeatherOrchestrator:
def __init__(self, api_key: str):
self.input_validator = InputValidator()
self.api_client = WeatherAPIClient(api_key)
self.geocoding_processor = GeocodingProcessor()
def get_weather_for_city(self, city_name: str) -> WorkflowResult:
start_time = time.monotonic()
# STEP 1: validate input (unchanged from pass 1)
try:
validated = self.input_validator.validate_location_input(city_name)
except InputValidationError as e:
return WorkflowResult(
status=WorkflowStatus.FAILURE,
error_message=str(e),
error_category="user_input",
suggestions=[
"Please enter a city name",
"Examples: London; Paris, FR; 51.5074,-0.1278",
],
processing_metadata={'total_duration': time.monotonic() - start_time},
)
city_name = validated.normalized_query
# STEP 2: fetch geocoding data (NEW). The API client already retried
# on transient failures internally; if we still have an error, it's
# terminal at this layer. Read the categorised error context directly --
# no string parsing, exactly the architectural promise §4 made when it
# surfaced the category as a first-class field on APIResult.
geocoding_result = self.api_client.fetch_geocoding(city_name, limit=5)
if not geocoding_result.success:
category = geocoding_result.error_context.get('category', 'unknown')
if category == 'not_found':
message = f"No location found for '{city_name}'"
suggestions = [
"Check the spelling",
"Try adding a country code: e.g. London, GB",
]
elif category == 'transient':
message = f"Could not reach the geocoding service for '{city_name}'"
suggestions = [
"Check your internet connection",
"Try again in a moment",
]
else: # 'unknown' (auth, unexpected 4xx, malformed JSON)
message = f"Geocoding request failed for '{city_name}'"
suggestions = ["Try again or try a different city"]
return WorkflowResult(
status=WorkflowStatus.FAILURE,
error_message=message,
error_category=category,
suggestions=suggestions,
processing_metadata={'total_duration': time.monotonic() - start_time},
)
# STEP 3: process the geocoding response (NEW). A ValidationError here
# means the API returned something we can't act on -- treat it the same
# as 'not found' so the user gets actionable guidance.
try:
locations = self.geocoding_processor.process_response(
geocoding_result.data, city_name
)
except ValidationError:
return WorkflowResult(
status=WorkflowStatus.FAILURE,
error_message=f"Could not process location data for '{city_name}'",
error_category='not_found',
suggestions=[
"Check the spelling of the city name",
"Try a more specific name (include country or state)",
],
processing_metadata={'total_duration': time.monotonic() - start_time},
)
# STUB: steps 4-5 (weather) land in pass 3. For now, return what we have
# as PARTIAL_SUCCESS so the test below can confirm the geocoding path
# works end-to-end.
return WorkflowResult(
status=WorkflowStatus.PARTIAL_SUCCESS,
locations=locations,
error_message="Weather steps not yet implemented",
processing_metadata={'total_duration': time.monotonic() - start_time},
)
Save this as verify_pass2.py. Pass 2 makes real network calls, so make sure OPENWEATHER_API_KEY is in your environment (via .env or shell export, see §2). The script reads the key straight from os.environ; §8's config.py adds the fail-fast credential loader: it reads configuration once at startup and raises a clear ValueError immediately when the key is missing, rather than letting a downstream KeyError or confusing 401 appear later.
import os
from orchestrator import WeatherOrchestrator
o = WeatherOrchestrator(api_key=os.environ["OPENWEATHER_API_KEY"])
# Regression: empty input still rejected at step 1
result = o.get_weather_for_city("")
print(f"empty -> {result.status.value}, {result.error_category}")
# New: valid city geocodes
result = o.get_weather_for_city("London")
print(f"London -> {result.status.value}, {len(result.locations)} matches")
print(f" top match: {result.locations[0].display_name()}")
# New: unfindable city -> not_found
result = o.get_weather_for_city("Xqzthlptv99999")
print(f"garbage -> {result.status.value}, {result.error_category}")
empty -> failure, user_input
London -> partial_success, 5 matches
top match: London, England, GB
garbage -> failure, not_found
Three different code paths exercised in three lines: input rejected, valid city resolved to coordinates with five matches, garbage city refused with the right category. The orchestrator now turns a city name into Location objects and surfaces the alternatives the geocoder returned. What's still stubbed: actually fetching the weather. That's pass 3.
Pass 3: fetch weather, surface partial success on failure
Pass 3 adds steps 4 and 5: fetch weather for the highest-confidence location, process the response into a WeatherData, and turn the orchestrator's three-outcome promise into reality. The new branch worth attention is what happens when geocoding succeeds but weather fails: PARTIAL_SUCCESS, with the located coordinates returned to the user so they know the spelling was fine and the retry should target the weather call, not the input. The additions: one new import, one new collaborator in __init__, and steps 4 and 5 in place of pass 2's stub return:
# Add to imports at the top of orchestrator.py:
from weather_processor import WeatherProcessor
# Add this collaborator inside __init__:
self.weather_processor = WeatherProcessor()
# Right after step 3, before the previous STUB return, add this:
primary_location = locations[0]
# Replace the pass-2 STUB return with steps 4-5 and the SUCCESS return:
# STEP 4: fetch weather data. On failure here we return PARTIAL_SUCCESS,
# not FAILURE -- geocoding succeeded, so the user has actionable information
# (the resolved location) and the right next step is "retry the weather
# call", not "retype the city".
weather_result = self.api_client.fetch_weather(
primary_location.latitude, primary_location.longitude
)
if not weather_result.success:
return WorkflowResult(
status=WorkflowStatus.PARTIAL_SUCCESS,
locations=locations,
error_message="Weather data unavailable",
error_category='transient',
suggestions=[
"Weather service is temporarily unavailable",
"Location data is still available above",
"Try again in a few moments",
],
processing_metadata={'total_duration': time.monotonic() - start_time},
)
# STEP 5: process weather response. A processor failure here is also
# PARTIAL_SUCCESS -- the location is good, the weather payload was
# malformed; the user can act on the location and retry.
try:
weather_data = self.weather_processor.process_response(
weather_result.data, primary_location
)
except ValidationError:
return WorkflowResult(
status=WorkflowStatus.PARTIAL_SUCCESS,
locations=locations,
error_message="Weather data validation failed",
error_category='unknown',
suggestions=["Try again shortly"],
processing_metadata={'total_duration': time.monotonic() - start_time},
)
# SUCCESS: every step landed.
return WorkflowResult(
status=WorkflowStatus.SUCCESS,
weather_data=weather_data,
locations=locations,
processing_metadata={'total_duration': time.monotonic() - start_time},
)
Full orchestrator.py after pass 3 (about 120 lines, with steps 1-3 unchanged): expand to view or copy fresh
"""Weather dashboard orchestrator -- pass 3: full five-step workflow."""
import time
from models import WorkflowResult, WorkflowStatus
from input_validator import InputValidator, InputValidationError
from weather_api_client import WeatherAPIClient
from geocoding_processor import GeocodingProcessor
from weather_processor import WeatherProcessor
from validators import ValidationError
class WeatherOrchestrator:
def __init__(self, api_key: str):
self.input_validator = InputValidator()
self.api_client = WeatherAPIClient(api_key)
self.geocoding_processor = GeocodingProcessor()
self.weather_processor = WeatherProcessor()
def get_weather_for_city(self, city_name: str) -> WorkflowResult:
start_time = time.monotonic()
# STEP 1: validate input (unchanged from pass 1)
try:
validated = self.input_validator.validate_location_input(city_name)
except InputValidationError as e:
return WorkflowResult(
status=WorkflowStatus.FAILURE,
error_message=str(e),
error_category="user_input",
suggestions=[
"Please enter a city name",
"Examples: London; Paris, FR; 51.5074,-0.1278",
],
processing_metadata={'total_duration': time.monotonic() - start_time},
)
city_name = validated.normalized_query
# STEP 2: fetch geocoding data (unchanged from pass 2)
geocoding_result = self.api_client.fetch_geocoding(city_name, limit=5)
if not geocoding_result.success:
category = geocoding_result.error_context.get('category', 'unknown')
if category == 'not_found':
message = f"No location found for '{city_name}'"
suggestions = ["Check the spelling", "Try adding a country code: e.g. London, GB"]
elif category == 'transient':
message = f"Could not reach the geocoding service for '{city_name}'"
suggestions = ["Check your internet connection", "Try again in a moment"]
else:
message = f"Geocoding request failed for '{city_name}'"
suggestions = ["Try again or try a different city"]
return WorkflowResult(
status=WorkflowStatus.FAILURE, error_message=message,
error_category=category, suggestions=suggestions,
processing_metadata={'total_duration': time.monotonic() - start_time},
)
# STEP 3: process geocoding response (unchanged from pass 2)
try:
locations = self.geocoding_processor.process_response(
geocoding_result.data, city_name
)
except ValidationError:
return WorkflowResult(
status=WorkflowStatus.FAILURE,
error_message=f"Could not process location data for '{city_name}'",
error_category='not_found',
suggestions=[
"Check the spelling of the city name",
"Try a more specific name (include country or state)",
],
processing_metadata={'total_duration': time.monotonic() - start_time},
)
primary_location = locations[0]
# STEP 4: fetch weather data (NEW). On failure here we return
# PARTIAL_SUCCESS, not FAILURE -- geocoding succeeded, so the user has
# actionable information (the resolved location) and the right next
# step is "retry the weather call", not "retype the city".
weather_result = self.api_client.fetch_weather(
primary_location.latitude, primary_location.longitude
)
if not weather_result.success:
return WorkflowResult(
status=WorkflowStatus.PARTIAL_SUCCESS,
locations=locations,
error_message="Weather data unavailable",
error_category='transient',
suggestions=[
"Weather service is temporarily unavailable",
"Location data is still available above",
"Try again in a few moments",
],
processing_metadata={'total_duration': time.monotonic() - start_time},
)
# STEP 5: process weather response (NEW). A processor failure here is
# also PARTIAL_SUCCESS -- the location is good, the weather payload was
# malformed; the user can act on the location and retry.
try:
weather_data = self.weather_processor.process_response(
weather_result.data, primary_location
)
except ValidationError:
return WorkflowResult(
status=WorkflowStatus.PARTIAL_SUCCESS,
locations=locations,
error_message="Weather data validation failed",
error_category='unknown',
suggestions=["Try again shortly"],
processing_metadata={'total_duration': time.monotonic() - start_time},
)
# SUCCESS: every step landed.
return WorkflowResult(
status=WorkflowStatus.SUCCESS,
weather_data=weather_data,
locations=locations,
processing_metadata={'total_duration': time.monotonic() - start_time},
)
The verify script for pass 3 has two cases. The first is the full-success path. The second is a deliberate break: change the API client's weather_url to a host that doesn't exist, force the weather call to fail, and confirm the orchestrator returns PARTIAL_SUCCESS with the located coordinates rather than throwing the whole request away.
import os
from orchestrator import WeatherOrchestrator
o = WeatherOrchestrator(api_key=os.environ["OPENWEATHER_API_KEY"])
# Full success path
result = o.get_weather_for_city("London")
print(f"London (normal) -> {result.status.value}")
print(f" {result.weather_data.current.temperature:.1f}°C, "
f"{result.weather_data.current.description}")
# Force PARTIAL_SUCCESS by pointing the weather endpoint at a host that
# doesn't exist. The geocoding call still hits the real service.
o.api_client.weather_url = "https://api.openweathermapXX.org/data/2.5/weather"
result = o.get_weather_for_city("London")
print(f"London (broken weather) -> {result.status.value}")
print(f" has_locations={result.has_locations}, has_weather={result.has_weather}")
print(f" top match: {result.locations[0].display_name()}")
The retry loop in the API client will burn a few seconds before giving up on the broken URL, so this run takes longer than the others. That's expected.
London (normal) -> success
12.4°C, Light Rain
London (broken weather) -> partial_success
has_locations=True, has_weather=False
top match: London, England, GB
The break-it run is what makes PARTIAL_SUCCESS earn its keep. The user sees London, England, GB was found correctly, so they know the spelling is fine and a retry won't waste their time. A two-state success/failure model would force them to retype London after every transient weather outage. With three outcomes wired up, the architecture is functionally complete; pass 4 is production polish.
Pass 4: telemetry, low-confidence warnings, and a failure helper
The orchestrator works. Pass 4 adds the metadata production needs: per-step duration so the operator can tell which step is slow, low-confidence and low-quality warnings that surface to the user, and a _create_failure_result helper that consolidates the three FAILURE returns. Pass 4 also adds a third entry to step 3's not_found suggestions list: an Examples: row that completes Chapter 9's three-part error template (what happened, what to do, an example). The step-2 not_found path keeps two rows because its second suggestion already carries the example inline (e.g. London, GB). Earlier passes left the explicit examples row off to keep the structural concerns centre-stage; the Pass 4 orchestrator is the canonical version §7's display layer renders verbatim. The architecture doesn't change (the orchestrator still owns sequencing and judgement, never HTTP and never validation), but the result objects now carry the observability metadata downstream tooling needs.
Add the helper method first. Drop it inside WeatherOrchestrator, after get_weather_for_city:
def _create_failure_result(self, error_message, error_category,
suggestions, processing_steps, start_time):
"""Standardised FAILURE result -- consolidates the three FAILURE returns."""
return WorkflowResult(
status=WorkflowStatus.FAILURE,
error_message=error_message,
error_category=error_category,
suggestions=suggestions,
processing_metadata={
'steps': processing_steps,
'total_duration': time.monotonic() - start_time,
},
)
Then thread per-step telemetry through get_weather_for_city. Each step gets bracketed by a step_start = time.monotonic() at the top and a processing_steps.append({...}) at the bottom. The pattern, applied to step 2:
# At the top of get_weather_for_city, alongside start_time:
processing_steps = []
warnings = []
# Around each step (example shown for step 2):
step_start = time.monotonic()
geocoding_result = self.api_client.fetch_geocoding(city_name, limit=5)
processing_steps.append({
'step': 'geocoding_api',
'duration': time.monotonic() - step_start,
'status': 'success' if geocoding_result.success else 'failed',
})
# After step 3 (location processing), surface a low-confidence warning:
if locations[0].confidence_score < 0.7:
warnings.append(
f"Location match for '{city_name}' has low confidence. "
"Results may be imprecise."
)
# After step 5 (weather processing), surface a low-quality warning:
if weather_data.data_quality_score < 0.8:
warnings.append(
"Weather data quality is lower than normal. "
"Information may be less accurate."
)
warnings.extend(weather_data.validation_warnings)
Apply the pattern to all five steps, replace the three inline FAILURE returns with calls to _create_failure_result, and pass the warnings and processing_steps lists into the final SUCCESS and PARTIAL_SUCCESS returns. The full assembled file is the canonical orchestrator.py the rest of the chapter (and the tests in §9) imports from.
Confirm the metadata is now populated:
import os
from orchestrator import WeatherOrchestrator
o = WeatherOrchestrator(api_key=os.environ["OPENWEATHER_API_KEY"])
result = o.get_weather_for_city("London")
print(f"London -> {result.status.value}")
print(f" steps tracked: {len(result.processing_metadata['steps'])}")
for step in result.processing_metadata['steps']:
print(f" {step['step']:25s} {step['duration']*1000:6.0f}ms ({step['status']})")
print(f" total: {result.processing_metadata['total_duration']:.2f}s")
London -> success
steps tracked: 5
input_validation 0ms (success)
geocoding_api 412ms (success)
location_processing 8ms (success)
weather_api 387ms (success)
weather_processing 5ms (success)
total: 0.81s
Five steps, the geocoding and weather calls dominating the budget (as expected: they're the network), and a single WorkflowResult object carrying it all. The presentation layer in §7 will mostly ignore the per-step durations (the user doesn't care that geocoding took 412ms); the operator-facing dashboards and post-mortem traces depend on them. The same object also carries the user-facing warnings; one return shape, two audiences.
The full assembled orchestrator.py after pass 4: every step wrapped in telemetry, both warnings wired up, and every FAILURE return routed through _create_failure_result. Unlike earlier passes, this one stays inline rather than collapsed behind a details block; it's the canonical version §7 and the test suite in §9 import from, so it earns the screen real estate.
"""Weather dashboard orchestrator -- the assembled five-step workflow.
Owns sequencing and outcome judgement: never HTTP, never validation.
Returns one WorkflowResult on every path -- never raises.
"""
import time
from models import WorkflowResult, WorkflowStatus
from input_validator import InputValidator, InputValidationError
from weather_api_client import WeatherAPIClient
from geocoding_processor import GeocodingProcessor
from weather_processor import WeatherProcessor
from validators import ValidationError
class WeatherOrchestrator:
def __init__(self, api_key: str):
self.input_validator = InputValidator()
self.api_client = WeatherAPIClient(api_key)
self.geocoding_processor = GeocodingProcessor()
self.weather_processor = WeatherProcessor()
def get_weather_for_city(self, city_name: str) -> WorkflowResult:
start_time = time.monotonic()
processing_steps = []
warnings = []
# STEP 1: validate input
step_start = time.monotonic()
try:
validated = self.input_validator.validate_location_input(city_name)
except InputValidationError as e:
return self._create_failure_result(
error_message=str(e),
error_category="user_input",
suggestions=[
"Please enter a city name",
"Examples: London; Paris, FR; 51.5074,-0.1278",
],
processing_steps=processing_steps,
start_time=start_time,
)
city_name = validated.normalized_query
processing_steps.append({
'step': 'input_validation',
'duration': time.monotonic() - step_start,
'status': 'success',
})
# STEP 2: fetch geocoding data
step_start = time.monotonic()
geocoding_result = self.api_client.fetch_geocoding(city_name, limit=5)
processing_steps.append({
'step': 'geocoding_api',
'duration': time.monotonic() - step_start,
'status': 'success' if geocoding_result.success else 'failed',
})
if not geocoding_result.success:
category = geocoding_result.error_context.get('category', 'unknown')
if category == 'not_found':
message = f"No location found for '{city_name}'"
suggestions = [
"Check the spelling",
"Try adding a country code: e.g. London, GB",
]
elif category == 'transient':
message = f"Could not reach the geocoding service for '{city_name}'"
suggestions = [
"Check your internet connection",
"Try again in a moment",
]
else: # 'unknown'
message = f"Geocoding request failed for '{city_name}'"
suggestions = ["Try again or try a different city"]
return self._create_failure_result(
error_message=message,
error_category=category,
suggestions=suggestions,
processing_steps=processing_steps,
start_time=start_time,
)
# STEP 3: process geocoding response
step_start = time.monotonic()
try:
locations = self.geocoding_processor.process_response(
geocoding_result.data, city_name
)
processing_steps.append({
'step': 'location_processing',
'duration': time.monotonic() - step_start,
'status': 'success',
'locations_found': len(locations),
})
if locations[0].confidence_score < 0.7:
warnings.append(
f"Location match for '{city_name}' has low confidence. "
"Results may be imprecise."
)
except ValidationError:
processing_steps.append({
'step': 'location_processing',
'duration': time.monotonic() - step_start,
'status': 'failed',
})
return self._create_failure_result(
error_message=f"Could not process location data for '{city_name}'",
error_category='not_found',
suggestions=[
"Check the spelling of the city name",
"Try a more specific name (include country or state)",
"Examples: 'London, UK' or 'Paris, France'",
],
processing_steps=processing_steps,
start_time=start_time,
)
primary_location = locations[0]
# STEP 4: fetch weather data
step_start = time.monotonic()
weather_result = self.api_client.fetch_weather(
primary_location.latitude, primary_location.longitude
)
processing_steps.append({
'step': 'weather_api',
'duration': time.monotonic() - step_start,
'status': 'success' if weather_result.success else 'failed',
})
if not weather_result.success:
return WorkflowResult(
status=WorkflowStatus.PARTIAL_SUCCESS,
locations=locations,
error_message="Weather data unavailable",
error_category='transient',
warnings=warnings,
suggestions=[
"Weather service is temporarily unavailable",
"Location data is still available above",
"Try again in a few moments",
],
processing_metadata={
'steps': processing_steps,
'total_duration': time.monotonic() - start_time,
},
)
# STEP 5: process weather response
step_start = time.monotonic()
try:
weather_data = self.weather_processor.process_response(
weather_result.data, primary_location
)
processing_steps.append({
'step': 'weather_processing',
'duration': time.monotonic() - step_start,
'status': 'success',
'quality_score': weather_data.data_quality_score,
})
if weather_data.data_quality_score < 0.8:
warnings.append(
"Weather data quality is lower than normal. "
"Information may be less accurate."
)
warnings.extend(weather_data.validation_warnings)
except ValidationError:
processing_steps.append({
'step': 'weather_processing',
'duration': time.monotonic() - step_start,
'status': 'failed',
})
return WorkflowResult(
status=WorkflowStatus.PARTIAL_SUCCESS,
locations=locations,
error_message="Weather data validation failed",
error_category='unknown',
warnings=warnings,
suggestions=["Try again shortly"],
processing_metadata={
'steps': processing_steps,
'total_duration': time.monotonic() - start_time,
},
)
# SUCCESS: every step landed.
return WorkflowResult(
status=WorkflowStatus.SUCCESS,
weather_data=weather_data,
locations=locations,
warnings=warnings,
processing_metadata={
'steps': processing_steps,
'total_duration': time.monotonic() - start_time,
},
)
def _create_failure_result(self, error_message, error_category,
suggestions, processing_steps, start_time):
"""Standardised FAILURE result -- consolidates the three FAILURE returns."""
return WorkflowResult(
status=WorkflowStatus.FAILURE,
error_message=error_message,
error_category=error_category,
suggestions=suggestions,
processing_metadata={
'steps': processing_steps,
'total_duration': time.monotonic() - start_time,
},
)
End-to-end verification
The four-pass build is complete. The test below uses the live OpenWeatherMap API to confirm the assembled orchestrator handles valid and invalid input the way the chapter has described. PARTIAL_SUCCESS isn't re-tested here. Pass 3 already covered it with the deliberate-break run, and re-triggering it would mean temporarily breaking the API client, which doesn't fit a verify-the-finished-system check. Save this at the project root as test_orchestrator.py and run it with python test_orchestrator.py:
"""Test complete workflow orchestration."""
import os
from orchestrator import WeatherOrchestrator
o = WeatherOrchestrator(os.environ["OPENWEATHER_API_KEY"])
print("=== Testing Complete Workflow ===\n")
print("Test 1: Valid city - full success")
result = o.get_weather_for_city("London")
print(f" Status: {result.status.value}")
if result.has_weather:
w = result.weather_data
print(f" Location: {w.location.display_name()}")
print(f" Temperature: {w.current.temperature}°C")
print(f" Conditions: {w.current.description}")
print(f" Quality: {w.data_quality_score:.2f}")
print(f" Processing time: {result.processing_metadata['total_duration']:.2f}s")
print()
print("Test 2: Empty input - validation failure")
result = o.get_weather_for_city("")
print(f" Status: {result.status.value}")
print(f" Error category: {result.error_category}")
print()
print("Test 3: Explicit state disambiguation")
result = o.get_weather_for_city("Springfield, Illinois, US")
print(f" Status: {result.status.value}")
if result.has_locations:
print(f" Locations found: {len(result.locations)}")
print(f" Primary starts with Springfield: {result.locations[0].name == 'Springfield'}")
print(f" Primary country: {result.locations[0].country}")
print(f" Confidence: {result.locations[0].confidence_score:.2f}")
print()
print("Test 4: Non-existent city")
result = o.get_weather_for_city("Xqzthlptv99999")
print(f" Status: {result.status.value}")
print(f" Error category: {result.error_category}")
print(f" Suggestions ({len(result.suggestions)}):")
for s in result.suggestions:
print(f" - {s}")
=== Testing Complete Workflow ===
Test 1: Valid city - full success
Status: success
Location: London, England, GB
Temperature: 15.2°C
Conditions: Light Rain
Quality: 1.00
Processing time: 1.23s
Test 2: Empty input - validation failure
Status: failure
Error category: user_input
Test 3: Explicit state disambiguation
Status: success
Locations found: 5
Primary starts with Springfield: True
Primary country: US
Confidence: 1.00
Test 4: Non-existent city
Status: failure
Error category: not_found
Suggestions (3):
- Check the spelling of the city name
- Try a more specific name (include country or state)
- Examples: 'London, UK' or 'Paris, France'
Four cases land where they should. Five could-fail steps fold into three predictable outcomes. Everything below the orchestrator stays decoupled (the API client doesn't know about workflow, the processors don't know about the API client's error categories), and everything above it gets a uniform WorkflowResult to render. What makes this an orchestrator and not a service is exactly that boundary: it owns sequencing and outcome judgement, but never HTTP and never validation. Section 7 builds the presentation layer, which is the only place in the project that branches on WorkflowResult.status.