
Orchestration Architectures: Supervisor, Router & Hierarchical Patterns for Multi-Agent Systems
Build production orchestration for multi-agent systems — supervisor routing with LLM classification, parallel fan-out with error recovery, event-driven coordination, and hierarchical delegation. Includes comparison matrix and combined architecture example.
Building one capable agent is useful. Building multiple specialized agents that work together reliably is a fundamentally different problem. Once you have a billing agent, a technical support agent, an account agent, and a fraud agent, the question stops being 'how does each agent work?' and becomes 'how does work move through the system?' That coordination layer is called orchestration, and getting it wrong means your multi-agent system is just a collection of isolated specialists with no teamwork.
This guide covers six orchestration patterns — supervisor, router, sequential pipeline, parallel fan-out, event-driven, and hierarchical — with production-grade code for each. We'll use one running example throughout: a customer service platform where a user says "My API calls are failing with a 429 error, I was charged twice, and I can't log into my account." This request spans three domains and forces the orchestration layer to make real decisions about routing, parallelism, and result aggregation.
Before you build this
Our customer service system has five specialized agents. BillingAgent handles invoices, refunds, and subscription changes. TechnicalSupportAgent handles API errors, bugs, and troubleshooting. AccountAgent handles login, password resets, and profile updates. PolicyAgent checks whether actions comply with company rules. ResponseAgent turns internal results into a user-facing answer.
When the user says "My API calls are failing with a 429 error, I was charged twice, and I can't log into my account," three of these agents need to be involved. The orchestration layer must decide: does it route to one agent at a time? Run all three in parallel? Use a supervisor to coordinate? The answer depends on the pattern you choose.
The supervisor pattern uses one central orchestrator that receives every request, classifies the intent using an LLM, dispatches tasks to specialized workers, waits for results, and aggregates them into a final response. It has the big picture and makes all coordination decisions.
The critical part of a supervisor is the classification step. A naive implementation checks for keywords, for example `if "charged twice" in request`, and that breaks on anything remotely creative the user might type. A production supervisor uses an LLM to classify intent, detect multi-intent requests, and reformulate queries for each specialist.

```python
import json
import asyncio
from openai import AsyncOpenAI
from dataclasses import dataclass
from typing import Callable, Any

client = AsyncOpenAI()


@dataclass
class WorkerAgent:
    name: str
    description: str
    system_prompt: str
    model: str = "gpt-4o-mini"
    tools: list[dict] | None = None
    tool_fns: dict[str, Callable] | None = None


class SupervisorOrchestrator:
    """Supervisor with LLM-based intent classification and
    parallel dispatch to specialized workers."""

    def __init__(
        self,
        workers: list[WorkerAgent],
        supervisor_model: str = "gpt-4o"
    ):
        self.workers = {w.name: w for w in workers}
        self.supervisor_model = supervisor_model

    async def _classify(self, user_message: str) -> dict:
        """LLM-based multi-intent classification."""
        worker_desc = "\n".join(
            f"- {w.name}: {w.description}"
            for w in self.workers.values()
        )
        response = await client.chat.completions.create(
            model=self.supervisor_model,
            messages=[
                {
                    "role": "system",
                    "content": (
                        "Classify the user's request. It may contain "
                        "MULTIPLE intents; detect all of them.\n\n"
                        f"Available workers:\n{worker_desc}\n\n"
                        "Return JSON:\n"
                        '{"intents": [{"worker": "name", '
                        '"confidence": 0.0-1.0, '
                        '"task": "reformulated query for this worker"}], '
                        '"can_parallelize": true/false}'
                    )
                },
                {"role": "user", "content": user_message}
            ],
            response_format={"type": "json_object"}
        )
        return json.loads(response.choices[0].message.content)

    async def _run_worker(
        self,
        worker: WorkerAgent,
        task: str,
        timeout: float = 30.0
    ) -> dict:
        """Run a single worker with its own ReAct loop."""
        messages = [
            {"role": "system", "content": worker.system_prompt},
            {"role": "user", "content": task}
        ]
        try:
            for _ in range(8):
                kwargs = {"model": worker.model, "messages": messages}
                if worker.tools:
                    kwargs["tools"] = worker.tools
                    kwargs["tool_choice"] = "auto"
                # The timeout applies to each LLM call, not the whole loop
                response = await asyncio.wait_for(
                    client.chat.completions.create(**kwargs),
                    timeout=timeout
                )
                msg = response.choices[0].message
                messages.append(msg)
                if not msg.tool_calls:
                    return {
                        "worker": worker.name,
                        "success": True,
                        "result": msg.content or ""
                    }
                for tc in msg.tool_calls:
                    fn_name = tc.function.name
                    fn_args = json.loads(tc.function.arguments)
                    if worker.tool_fns and fn_name in worker.tool_fns:
                        result = worker.tool_fns[fn_name](**fn_args)
                    else:
                        result = f"Tool {fn_name} not available"
                    messages.append({
                        "role": "tool",
                        "tool_call_id": tc.id,
                        "content": str(result)
                    })
            return {"worker": worker.name, "success": False,
                    "result": "Max iterations reached"}
        except asyncio.TimeoutError:
            return {"worker": worker.name, "success": False,
                    "result": "Worker timed out"}
        except Exception as e:
            return {"worker": worker.name, "success": False,
                    "result": f"Error: {e}"}

    async def _aggregate(
        self,
        user_message: str,
        worker_results: list[dict]
    ) -> str:
        """Synthesize multiple worker outputs into one response."""
        successful = [r for r in worker_results if r["success"]]
        failed = [r for r in worker_results if not r["success"]]
        results_text = "\n\n".join(
            f"[{r['worker']}]: {r['result']}" for r in successful
        )
        failure_note = ""
        if failed:
            names = ", ".join(r["worker"] for r in failed)
            failure_note = (
                f"\n\nNote: {names} failed to complete. "
                f"Address what you can and note what's unresolved."
            )
        response = await client.chat.completions.create(
            model=self.supervisor_model,
            messages=[
                {
                    "role": "system",
                    "content": (
                        "Combine specialist outputs into one coherent "
                        "response for the user. Don't mention internal "
                        "routing or agent names. If some specialists "
                        "failed, acknowledge what couldn't be resolved."
                    )
                },
                {
                    "role": "user",
                    "content": (
                        f"User asked: {user_message}\n\n"
                        f"Specialist results:\n{results_text}"
                        f"{failure_note}"
                    )
                }
            ]
        )
        return response.choices[0].message.content

    async def handle(self, user_message: str) -> str:
        """Full pipeline: classify → dispatch → aggregate."""
        # Step 1: Classify (detect all intents)
        routing = await self._classify(user_message)
        intents = routing.get("intents", [])
        can_parallel = routing.get("can_parallelize", False)

        # Filter by confidence
        qualified = [
            i for i in intents if i.get("confidence", 0) >= 0.4
        ]
        if not qualified:
            return (
                "I'm not sure how to help with that. "
                "Could you provide more details?"
            )

        # Step 2: Dispatch
        if can_parallel and len(qualified) > 1:
            # Run workers concurrently
            tasks = [
                self._run_worker(
                    self.workers[i["worker"]],
                    i["task"]
                )
                for i in qualified
                if i["worker"] in self.workers
            ]
            results = await asyncio.gather(*tasks)
        else:
            # Run sequentially
            results = []
            for i in qualified:
                if i["worker"] in self.workers:
                    r = await self._run_worker(
                        self.workers[i["worker"]], i["task"]
                    )
                    results.append(r)

        # Step 3: Aggregate
        return await self._aggregate(user_message, list(results))
```

Notice what this supervisor handles that a naive implementation doesn't: multi-intent detection (the user's request spans three domains), confidence filtering (low-confidence intents get dropped), parallel vs sequential dispatch (the LLM decides whether tasks are independent), worker timeouts, partial failure in aggregation (if one worker dies, the others still contribute), and tool execution within each worker's own ReAct loop.
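The filtering step in `handle` can also be pulled out into a pure function, which makes it testable without calling an LLM. The sketch below is one way to do that, not part of the original implementation: the name `select_intents` and the hand-written `routing` dict are illustrative assumptions, and the dict is not real model output. Unlike the code above, it drops unknown worker names before the empty check, so a request whose only intents name unknown workers gets the clarification message instead of reaching aggregation with no results.

```python
def select_intents(
    routing: dict,
    known_workers: set[str],
    min_confidence: float = 0.4,
) -> list[dict]:
    """Keep intents that meet the confidence bar and name a known worker."""
    selected = []
    for intent in routing.get("intents", []):
        if intent.get("worker") not in known_workers:
            continue  # the classifier named a worker we don't have
        if intent.get("confidence", 0) < min_confidence:
            continue  # too uncertain to act on
        selected.append(intent)
    return selected


# Hand-written example of the classifier's JSON shape (not real model output)
routing = {
    "intents": [
        {"worker": "technical", "confidence": 0.95,
         "task": "Diagnose HTTP 429 errors on API calls"},
        {"worker": "billing", "confidence": 0.9,
         "task": "Investigate a duplicate charge"},
        {"worker": "account", "confidence": 0.85,
         "task": "Help the user regain login access"},
        {"worker": "shipping", "confidence": 0.8,
         "task": "Names a worker that does not exist"},
        {"worker": "billing", "confidence": 0.2,
         "task": "Low-confidence guess"},
    ],
    "can_parallelize": True,
}

selected = select_intents(routing, {"technical", "billing", "account"})
print([i["worker"] for i in selected])  # ['technical', 'billing', 'account']
```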
Use a supervisor when requests frequently span multiple domains and need coordination: the supervisor's big-picture view is essential for decomposing complex requests and merging results. Avoid it when most requests go to a single agent, because every request pays for two extra LLM calls (classification and aggregation) even when routing is obvious. The supervisor also becomes a single point of failure and a latency bottleneck as traffic grows.
The router is the supervisor's lighter sibling. It classifies intent and sends the request to one specialist, then gets out of the way. There is no multi-agent coordination and no result merging; the selected agent handles the full request independently. This is the right pattern when roughly 80% or more of requests map cleanly to a single domain.

```python
import json
from openai import AsyncOpenAI

client = AsyncOpenAI()


class RouterOrchestrator:
    """Lightweight router: classify once, dispatch once."""

    def __init__(self, agents: dict, model: str = "gpt-4o-mini"):
        self.agents = agents
        self.model = model

    async def _classify(self, request: str) -> dict:
        agent_list = "\n".join(
            f"- {name}" for name in self.agents.keys()
        )
        response = await client.chat.completions.create(
            model=self.model,
            messages=[
                {
                    "role": "system",
                    "content": (
                        f"Route this request to ONE agent.\n\n"
                        f"Agents:\n{agent_list}\n\n"
                        f"Set is_multi_domain to true if the request "
                        f"spans more than one agent's domain.\n\n"
                        f"Return JSON: "
                        f'{{"agent": "name", "confidence": 0.0-1.0, '
                        f'"is_multi_domain": true/false}}'
                    )
                },
                {"role": "user", "content": request}
            ],
            response_format={"type": "json_object"}
        )
        return json.loads(response.choices[0].message.content)

    async def handle(self, request: str) -> dict:
        routing = await self._classify(request)
        agent_name = routing.get("agent")
        confidence = routing.get("confidence", 0)
        is_multi = routing.get("is_multi_domain", False)

        # Flag multi-domain requests so the caller can escalate to a supervisor
        if is_multi:
            return {
                "escalated": True,
                "reason": "Multi-domain request detected",
                "original_routing": routing
            }

        if confidence < 0.5:
            return {
                "route": "fallback",
                "response": "Could you provide more details "
                            "so I can route you to the right team?"
            }

        agent = self.agents.get(agent_name)
        if not agent:
            return {"route": "error", "response": f"Unknown agent: {agent_name}"}

        result = await agent.handle(request)
        return {
            "route": agent_name,
            "confidence": confidence,
            "result": result
        }
```

The key design choice here: when the router detects a multi-domain request, it doesn't try to handle it. It returns an escalation marker so the caller can hand the request to a supervisor. This lets you compose patterns: the router handles the common case fast, and the supervisor handles the complex case thoroughly.
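One way to wire that composition together is a thin front door that tries the router first and falls back to the supervisor when it sees the escalation marker. This is a sketch under assumptions: `front_door`, `StubRouter`, and `StubSupervisor` are hypothetical names introduced here, and the stubs stand in for the LLM-backed classes so the control flow can run without an API key. The stub's string check is only a placeholder for the real classifier.

```python
import asyncio


async def front_door(router, supervisor, request: str) -> dict:
    """Try the cheap router first; hand off to the supervisor on escalation."""
    routed = await router.handle(request)
    if routed.get("escalated"):
        answer = await supervisor.handle(request)
        return {"path": "supervisor", "response": answer}
    return {"path": "router", **routed}


class StubRouter:
    """Stand-in for RouterOrchestrator (placeholder logic, no LLM call)."""

    async def handle(self, request: str) -> dict:
        if " and " in request:
            return {"escalated": True, "reason": "Multi-domain request detected"}
        return {"route": "billing", "confidence": 0.9, "result": "refund issued"}


class StubSupervisor:
    """Stand-in for SupervisorOrchestrator (no LLM call)."""

    async def handle(self, request: str) -> str:
        return "combined answer"


async def main() -> list[dict]:
    router, supervisor = StubRouter(), StubSupervisor()
    return [
        await front_door(router, supervisor, "I was charged twice"),
        await front_door(router, supervisor,
                         "I was charged twice and I can't log in"),
    ]


results = asyncio.run(main())
print([r["path"] for r in results])  # ['router', 'supervisor']
```

Keeping the hand-off in the caller rather than inside the router means the router never needs a reference to the supervisor, so either side can be swapped or tested alone.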
A pipeline passes work through a fixed sequence of stages, each transforming the output of the previous stage. This works when tasks have a natural order — you need to verify identity before changing account settings, or validate policy before issuing a refund. The implementation needs to handle validation between stages and retry logic when a stage produces insufficient output.

```python
import asyncio
import time
from dataclasses import dataclass
from typing import Callable
from openai import AsyncOpenAI

client = AsyncOpenAI()


@dataclass
class PipelineStage:
    name: str
    system_prompt: str
    model: str = "gpt-4o-mini"
    validate: Callable[[str], bool] | None = None
    max_retries: int = 1
    timeout_seconds: float = 30.0


@dataclass
class StageResult:
    stage: str
    output: str
    success: bool
    retries_used: int
    elapsed_ms: float
    error: str | None = None


class SequentialPipeline:
    """Pipeline with validation, retries, and stage-level metrics."""

    def __init__(self, stages: list[PipelineStage]):
        self.stages = stages

    async def _run_stage(
        self,
        stage: PipelineStage,
        input_text: str
    ) -> StageResult:
        start = time.time()
        for attempt in range(stage.max_retries + 1):
            try:
                response = await asyncio.wait_for(
                    client.chat.completions.create(
                        model=stage.model,
                        messages=[
                            {"role": "system", "content": stage.system_prompt},
                            {"role": "user", "content": input_text}
                        ]
                    ),
                    timeout=stage.timeout_seconds
                )
                output = response.choices[0].message.content or ""

                # Validate output if a validator exists
                if stage.validate and not stage.validate(output):
                    if attempt < stage.max_retries:
                        # Retry with feedback
                        input_text = (
                            f"{input_text}\n\n"
                            "[Previous attempt was insufficient. "
                            "Be more thorough.]"
                        )
                        continue
                    return StageResult(
                        stage=stage.name, output=output,
                        success=False, retries_used=attempt,
                        elapsed_ms=(time.time() - start) * 1000,
                        error="Validation failed after retries"
                    )

                return StageResult(
                    stage=stage.name, output=output,
                    success=True, retries_used=attempt,
                    elapsed_ms=(time.time() - start) * 1000
                )
            except asyncio.TimeoutError:
                if attempt == stage.max_retries:
                    return StageResult(
                        stage=stage.name, output="",
                        success=False, retries_used=attempt,
                        elapsed_ms=(time.time() - start) * 1000,
                        error="Stage timed out"
                    )

        # Defensive fallback; every final attempt above already returns
        return StageResult(
            stage=stage.name, output="",
            success=False, retries_used=stage.max_retries,
            elapsed_ms=(time.time() - start) * 1000,
            error="Exhausted retries"
        )

    async def run(
        self,
        initial_input: str,
        abort_on_failure: bool = True
    ) -> dict:
        current_input = initial_input
        results: list[StageResult] = []

        for stage in self.stages:
            result = await self._run_stage(stage, current_input)
            results.append(result)

            if not result.success and abort_on_failure:
                return {
                    "success": False,
                    "failed_at": stage.name,
                    "error": result.error,
                    "completed_stages": [
                        r.stage for r in results if r.success
                    ],
                    # Output of the last stage that succeeded
                    "final_output": current_input
                }

            if result.success:
                current_input = result.output

        return {
            "success": True,
            "final_output": current_input,
            "stages": [
                {
                    "name": r.stage,
                    "success": r.success,
                    "retries": r.retries_used,
                    "ms": round(r.elapsed_ms)
                }
                for r in results
            ]
        }


# ── Example: Refund processing pipeline ──────────────────────
refund_pipeline = SequentialPipeline([
    PipelineStage(
        name="extract_details",
        system_prompt=(
            "Extract: customer ID, transaction IDs, charge amounts, "
            "and dates from this request. Return structured JSON."
        ),
        validate=lambda out: len(out) > 50 and "{" in out
    ),
    PipelineStage(
        name="verify_charges",
        system_prompt=(
            "Given the extracted transaction details, verify which "
            "charges are duplicates. Explain your reasoning."
        ),
        model="gpt-4o"  # Stronger model for analysis
    ),
    PipelineStage(
        name="check_policy",
        system_prompt=(
            "Given the verified duplicate charges, check against "
            "refund policy: within 90-day window, not previously "
            "refunded, account in good standing. Return a decision."
        ),
        validate=lambda out: "approved" in out.lower() or "denied" in out.lower()
    ),
    PipelineStage(
        name="compose_response",
        system_prompt=(
            "Write a customer-facing response about the refund "
            "decision. Be clear, empathetic, and include next steps."
        ),
        validate=lambda out: len(out) > 100
    )
])
```

This pipeline handles what a bare `for stage in stages` loop doesn't: per-stage validation (did the extraction actually produce JSON?), retries with feedback (the retry tells the model its previous attempt was insufficient), per-stage timeouts, abort-on-failure with partial results, and timing metrics for observability.
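The `extract_details` validator above only checks length and the presence of a brace, so malformed JSON could still pass. A stricter validator might parse the output and check for required keys. The sketch below assumes the key names `customer_id` and `transactions`, which are hypothetical since the extraction schema is not defined here, and the helper name `require_json_keys` is also introduced for illustration.

```python
import json
from typing import Callable


def require_json_keys(*keys: str) -> Callable[[str], bool]:
    """Build a validator that accepts only a JSON object containing `keys`."""

    def validate(output: str) -> bool:
        try:
            data = json.loads(output)
        except json.JSONDecodeError:
            return False  # not parseable as JSON at all
        return isinstance(data, dict) and all(k in data for k in keys)

    return validate


# Hypothetical schema: the real extraction keys are an assumption
validate_extraction = require_json_keys("customer_id", "transactions")

print(validate_extraction('{"customer_id": "c_1", "transactions": []}'))  # True
print(validate_extraction('{"customer_id": "c_1"}'))                      # False
print(validate_extraction('Here are the details: {customer_id: c_1}'))    # False
```

It would plug into a stage as `validate=require_json_keys("customer_id", "transactions")`. A validator this strict rejects output wrapped in extra prose, so the stage prompt would need to ask for JSON only.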
Fan-out sends independent tasks to multiple agents concurrently. The hard part isn't the parallelism — that's just asyncio.gather. The hard part is what happens when some agents succeed and others fail, and how you merge heterogeneous results into one coherent response.
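To see the partial-failure problem in isolation before the full implementation, here is a minimal sketch that swaps the LLM calls for stub coroutines: one succeeds, one raises, and one exceeds its timeout. The names `call_agent`, `guarded`, and `fan_out`, along with the delays and thresholds, are illustrative assumptions.

```python
import asyncio


async def call_agent(name: str, delay: float, fail: bool = False) -> str:
    """Stub agent: sleeps instead of calling an LLM."""
    await asyncio.sleep(delay)
    if fail:
        raise RuntimeError(f"{name} backend unavailable")
    return f"{name}: done"


async def guarded(name: str, coro, timeout: float) -> dict:
    """Turn timeouts and exceptions into result dicts so gather never raises."""
    try:
        output = await asyncio.wait_for(coro, timeout)
        return {"agent": name, "ok": True, "output": output}
    except asyncio.TimeoutError:
        return {"agent": name, "ok": False, "output": "timed out"}
    except Exception as exc:
        return {"agent": name, "ok": False, "output": str(exc)}


async def fan_out(min_success_ratio: float = 0.5) -> dict:
    results = await asyncio.gather(
        guarded("billing", call_agent("billing", 0.01), timeout=0.2),
        guarded("technical", call_agent("technical", 0.01, fail=True), timeout=0.2),
        guarded("account", call_agent("account", 1.0), timeout=0.05),
    )
    ok = [r for r in results if r["ok"]]
    return {
        # Only merge if enough agents came back with usable output
        "proceed": len(ok) / len(results) >= min_success_ratio,
        "succeeded": [r["agent"] for r in ok],
        "failed": {r["agent"]: r["output"] for r in results if not r["ok"]},
    }


summary = asyncio.run(fan_out())
print(summary["succeeded"])  # ['billing']
print(summary["proceed"])    # False: 1 of 3 is below the 0.5 threshold
```

The success threshold is a product decision as much as a technical one: with one of three agents responding, a ratio of 0.5 refuses to answer while a ratio of 0.3 would return a partial answer.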

```python
import asyncio
import time
from dataclasses import dataclass
from openai import AsyncOpenAI

client = AsyncOpenAI()


@dataclass
class FanOutResult:
    agent_name: str
    response: str
    success: bool
    elapsed_ms: float
    error: str | None = None


async def run_agent_with_timeout(
    name: str,
    system_prompt: str,
    task: str,
    model: str = "gpt-4o-mini",
    timeout: float = 30.0
) -> FanOutResult:
    """Run one agent with timeout and error capture."""
    start = time.time()
    try:
        response = await asyncio.wait_for(
            client.chat.completions.create(
                model=model,
                messages=[
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": task}
                ],
                max_tokens=1500
            ),
            timeout=timeout
        )
        return FanOutResult(
            agent_name=name,
            response=response.choices[0].message.content or "",
            success=True,
            elapsed_ms=(time.time() - start) * 1000
        )
    except asyncio.TimeoutError:
        return FanOutResult(
            agent_name=name, response="",
            success=False,
            elapsed_ms=(time.time() - start) * 1000,
            error=f"{name} timed out after {timeout}s"
        )
    except Exception as e:
        return FanOutResult(
            agent_name=name, response="",
            success=False,
            elapsed_ms=(time.time() - start) * 1000,
            error=f"{name} error: {type(e).__name__}: {e}"
        )


async def fan_out_and_merge(
    user_message: str,
    agent_configs: list[dict],
    merge_model: str = "gpt-4o",
    min_success_ratio: float = 0.5
) -> dict:
    """Dispatch to all agents in parallel, then merge results."""
    # Guard against an empty config list (avoids division by zero below)
    if not agent_configs:
        return {"success": False, "error": "No agents configured", "failed": []}

    # Fan out
    tasks = [
        run_agent_with_timeout(
            name=cfg["name"],
            system_prompt=cfg["system_prompt"],
            task=user_message,
            model=cfg.get("model", "gpt-4o-mini"),
            timeout=cfg.get("timeout", 30.0)
        )
        for cfg in agent_configs
    ]
    results = await asyncio.gather(*tasks)
    successful = [r for r in results if r.success]
    failed = [r for r in results if not r.success]

    # Check minimum success threshold
    if len(successful) / len(results) < min_success_ratio:
        return {
            "success": False,
            "error": "Too many agents failed",
            "failed": [
                {"agent": r.agent_name, "error": r.error}
                for r in failed
            ]
        }

    # Merge successful results
    perspectives = "\n\n".join(
        f"=== {r.agent_name} ({r.elapsed_ms:.0f}ms) ===\n{r.response}"
        for r in successful
    )
    failure_note = ""
    if failed:
        names = ", ".join(r.agent_name for r in failed)
        failure_note = f"\n\nNote: {names} failed to respond."

    merge_response = await client.chat.completions.create(
        model=merge_model,
        messages=[
            {
                "role": "system",
                "content": (
                    "Synthesize these specialist responses into one "
                    "coherent answer. Resolve contradictions by noting "
                    "the disagreement. Don't mention agent names."
                )
            },
            {
                "role": "user",
                "content": (
                    f"Question: {user_message}\n\n"
                    f"Responses:\n{perspectives}{failure_note}"
                )
            }
        ]
    )
    return {
        "success": True,
        "response": merge_response.choices[0].message.content,
        "agents_used": [r.agent_name for r in successful],
        "agents_failed": [r.agent_name for r in failed],
        "timings": {
            r.agent_name: round(r.elapsed_ms)
            for r in results
        }
    }
```

In event-driven orchestration, agents react to events instead of being told what to do by a central controller. One agent publishes an event such as charge_verified, and other agents that subscribe to that event kick off their own work. This creates loose coupling: agents don't need to know about each other, only about the events they care about. The trade-off is that control flow becomes implicit and harder to trace.

```python
import asyncio
import time
import uuid
from dataclasses import dataclass, field
from datetime import datetime
from typing import Callable


@dataclass
class Event:
    name: str
    payload: dict
    source: str
    event_id: str = field(default_factory=lambda: str(uuid.uuid4())[:8])
    timestamp: str = field(default_factory=lambda: datetime.now().isoformat())
    correlation_id: str = ""


@dataclass
class EventRecord:
    event: Event
    handlers_invoked: list[str]
    results: list[dict]
    elapsed_ms: float


class EventBus:
    """Event bus with dead letter queue, handler timeouts,
    and full event history for tracing."""

    def __init__(self, handler_timeout: float = 15.0):
        self._handlers: dict[str, list[tuple[str, Callable]]] = {}
        self._history: list[EventRecord] = []
        self._dead_letters: list[dict] = []
        self._timeout = handler_timeout

    def subscribe(
        self,
        event_name: str,
        handler_name: str,
        handler: Callable
    ):
        """Subscribe a named handler to an event."""
        self._handlers.setdefault(event_name, []).append(
            (handler_name, handler)
        )

    async def _invoke(
        self,
        event: Event,
        handler_name: str,
        handler: Callable
    ) -> dict:
        """Run one handler; record failures in the dead letter queue."""
        try:
            result = await asyncio.wait_for(
                handler(event),
                timeout=self._timeout
            )
            return {
                "handler": handler_name,
                "success": True,
                "result": result
            }
        except asyncio.TimeoutError:
            error = "timeout"
        except Exception as e:
            error = str(e)
        self._dead_letters.append({
            "event": event.name,
            "handler": handler_name,
            "error": error,
            "event_id": event.event_id
        })
        return {
            "handler": handler_name,
            "success": False,
            "error": error
        }

    async def publish(self, event: Event) -> EventRecord:
        """Publish an event and run all subscribers concurrently."""
        start = time.time()
        handlers = self._handlers.get(event.name, [])
        # gather keeps results in subscription order
        results = await asyncio.gather(*(
            self._invoke(event, name, handler)
            for name, handler in handlers
        ))
        record = EventRecord(
            event=event,
            handlers_invoked=[name for name, _ in handlers],
            results=list(results),
            elapsed_ms=(time.time() - start) * 1000
        )
        self._history.append(record)
        return record

    def trace(self, correlation_id: str) -> list[EventRecord]:
        """Get all events in a workflow by correlation ID."""
        return [
            r for r in self._history
            if r.event.correlation_id == correlation_id
        ]

    @property
    def dead_letters(self) -> list[dict]:
        return list(self._dead_letters)


# ── Usage: refund workflow as events ──────────────────────────
bus = EventBus(handler_timeout=10.0)


async def on_charge_verified(event: Event):
    """PolicyAgent reacts to billing verification."""
    if event.payload.get("is_duplicate"):
        await bus.publish(Event(
            name="refund_eligible",
            payload={"amount": event.payload["amount"]},
            source="policy_agent",
            correlation_id=event.correlation_id
        ))
    return {"policy": "approved"}


async def on_refund_eligible(event: Event):
    """FraudAgent reacts to refund eligibility."""
    # Run fraud check
    return {"risk": "low", "approved": True}


async def on_refund_eligible_notify(event: Event):
    """NotificationAgent reacts to refund eligibility."""
    return {"notification": "queued"}


bus.subscribe("charge_verified", "policy_agent", on_charge_verified)
bus.subscribe("refund_eligible", "fraud_agent", on_refund_eligible)
bus.subscribe("refund_eligible", "notification_agent", on_refund_eligible_notify)
```

This event bus handles what a bare-bones event bus doesn't: handler timeouts (a slow handler is cancelled instead of stalling the system), dead letter tracking (failed deliveries are captured for debugging), named handlers (you can see which subscriber failed), full event history with correlation IDs (you can trace an entire workflow), and concurrent delivery to all subscribers of an event.
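Because control flow is implicit in this pattern, the correlation ID is what lets you reconstruct a workflow after the fact. The sketch below shows the idea on plain dictionaries rather than the `EventBus` above, so it runs standalone. The function name `reconstruct_flows`, the record fields, and the sample history are hand-written assumptions.

```python
from collections import defaultdict


def reconstruct_flows(history: list[dict]) -> dict[str, list[str]]:
    """Group event records by correlation ID, ordered by timestamp."""
    flows: dict[str, list[tuple[str, str]]] = defaultdict(list)
    for record in history:
        step = f'{record["source"]} -> {record["name"]}'
        flows[record["correlation_id"]].append((record["timestamp"], step))
    # ISO 8601 timestamps in the same format sort correctly as strings
    return {
        cid: [step for _, step in sorted(steps)]
        for cid, steps in flows.items()
    }


# Interleaved records from two workflows, deliberately out of order
history = [
    {"correlation_id": "req-2", "timestamp": "2024-01-01T10:00:03",
     "source": "billing_agent", "name": "charge_verified"},
    {"correlation_id": "req-1", "timestamp": "2024-01-01T10:00:02",
     "source": "policy_agent", "name": "refund_eligible"},
    {"correlation_id": "req-1", "timestamp": "2024-01-01T10:00:01",
     "source": "billing_agent", "name": "charge_verified"},
]

flows = reconstruct_flows(history)
print(flows["req-1"])
# ['billing_agent -> charge_verified', 'policy_agent -> refund_eligible']
```

This only works if every handler copies the incoming correlation ID onto the events it publishes, as `on_charge_verified` does above.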
When your system has 15+ agents across multiple domains, a single supervisor becomes unwieldy, because its classification prompt has to describe every worker. Hierarchical orchestration adds layers: a meta-orchestrator delegates to domain supervisors, each of which manages its own team of specialist workers. This mirrors how large organizations work: a CEO delegates to VPs, and VPs delegate to managers.

```python
import json
import asyncio
from openai import AsyncOpenAI
from dataclasses import dataclass

client = AsyncOpenAI()


@dataclass
class DomainSupervisor:
    """Mid-level supervisor managing a team of specialists."""
    name: str
    domain: str
    description: str
    workers: list[dict]  # [{name, description, system_prompt, model}]
    model: str = "gpt-4o-mini"

    async def handle(self, task: str) -> dict:
        """Route task to the best specialist in this domain."""
        worker_list = "\n".join(
            f"- {w['name']}: {w.get('description', '')}"
            for w in self.workers
        )
        # Internal routing within this domain
        routing = await client.chat.completions.create(
            model=self.model,
            messages=[
                {"role": "system", "content":
                    f"You manage the {self.domain} team.\n"
                    f"Workers:\n{worker_list}\n\n"
                    f"Select the best worker and reformulate the task. "
                    f'Return JSON: {{"worker": "name", "task": "..."}}'},
                {"role": "user", "content": task}
            ],
            response_format={"type": "json_object"}
        )
        decision = json.loads(routing.choices[0].message.content)
        # Fall back to the first worker if the LLM names an unknown one
        worker = next(
            (w for w in self.workers if w["name"] == decision.get("worker")),
            self.workers[0]
        )
        response = await client.chat.completions.create(
            model=worker.get("model", "gpt-4o-mini"),
            messages=[
                {"role": "system", "content": worker["system_prompt"]},
                {"role": "user", "content": decision.get("task", task)}
            ]
        )
        return {
            "domain": self.domain,
            # Report the worker that actually ran, not the requested name
            "worker": worker["name"],
            "result": response.choices[0].message.content
        }


class MetaOrchestrator:
    """Top-level orchestrator managing domain supervisors."""

    def __init__(
        self,
        supervisors: list[DomainSupervisor],
        model: str = "gpt-4o"
    ):
        self.supervisors = {s.name: s for s in supervisors}
        self.model = model

    async def handle(self, user_request: str) -> str:
        sup_desc = "\n".join(
            f"- {s.name} ({s.domain}): {s.description}"
            for s in self.supervisors.values()
        )
        # Decompose into sub-tasks per domain
        plan_response = await client.chat.completions.create(
            model=self.model,
            messages=[
                {"role": "system", "content":
                    f"You manage these teams:\n{sup_desc}\n\n"
                    f"Decompose the request into sub-tasks per team. "
                    f'Return JSON: {{"subtasks": [{{"team": "name", '
                    f'"task": "description"}}], '
                    f'"can_parallelize": true/false}}'},
                {"role": "user", "content": user_request}
            ],
            response_format={"type": "json_object"}
        )
        plan = json.loads(plan_response.choices[0].message.content)
        can_parallel = plan.get("can_parallelize", False)
        # Keep only subtasks that name a known team
        subtasks = [
            st for st in plan.get("subtasks", [])
            if st.get("team") in self.supervisors
        ]

        # Execute subtasks
        if can_parallel:
            coros = [
                self.supervisors[st["team"]].handle(st["task"])
                for st in subtasks
            ]
            raw = await asyncio.gather(*coros, return_exceptions=True)
            # Convert exceptions to error dicts, keeping the team name
            results = [
                r if isinstance(r, dict)
                else {"domain": st["team"], "worker": "unknown",
                      "result": f"Error: {r}"}
                for st, r in zip(subtasks, raw)
            ]
        else:
            results = []
            for st in subtasks:
                try:
                    r = await self.supervisors[st["team"]].handle(st["task"])
                    results.append(r)
                except Exception as e:
                    results.append({"domain": st["team"],
                                    "worker": "unknown",
                                    "result": f"Error: {e}"})

        # Synthesize
        results_text = "\n\n".join(
            f"[{r.get('domain', '?')} → {r.get('worker', '?')}]\n"
            f"{r.get('result', '')}"
            for r in results
        )
        synthesis = await client.chat.completions.create(
            model=self.model,
            messages=[
                {"role": "system", "content":
                    "Synthesize these team results into one coherent "
                    "response. Don't mention teams or internal routing."},
                {"role": "user", "content":
                    f"Request: {user_request}\n\nResults:\n{results_text}"}
            ]
        )
        return synthesis.choices[0].message.content


# ── Build a hierarchical system ───────────────────────────────
billing_team = DomainSupervisor(
    name="billing",
    domain="Billing & Payments",
    description="Invoices, refunds, subscriptions, charges",
    workers=[
        {"name": "refund_specialist",
         "description": "Processes refunds and duplicate charges",
         "system_prompt": "You handle refunds. Verify charges, check policy, calculate amounts."},
        {"name": "subscription_specialist",
         "description": "Plan changes, upgrades, cancellations",
         "system_prompt": "You handle subscription changes. Compare plans, process upgrades."}
    ]
)

tech_team = DomainSupervisor(
    name="technical",
    domain="Technical Support",
    description="API errors, bugs, integration issues",
    workers=[
        {"name": "api_specialist",
         "description": "API errors, rate limits, authentication",
         "system_prompt": "You diagnose API issues. Check error codes, rate limits, auth."},
        {"name": "integration_specialist",
         "description": "SDK setup, webhooks, third-party integrations",
         "system_prompt": "You help with integrations. Guide setup, debug webhooks."}
    ]
)

meta = MetaOrchestrator([billing_team, tech_team])
```

Choosing the right pattern depends on your specific constraints. This matrix compares all six across the dimensions that matter most in production.
| Pattern | Latency | Complexity | Multi-Domain | Debuggability | Best When |
|---|---|---|---|---|---|
| Supervisor | Medium (extra LLM calls) | Medium | Excellent | High (centralized) | Requests frequently span multiple agents |
| Router | Low (classify + one agent) | Low | Poor (escalates) | High | 80%+ requests go to one specialist |
| Pipeline | High (sequential stages) | Low-Medium | N/A (fixed flow) | Very High | Tasks have strict ordering requirements |
| Fan-Out | Low (parallel execution) | Medium | Good | Medium | Independent tasks that can run concurrently |
| Event-Driven | Variable | High | Good | Low (implicit flow) | Loosely coupled, extensible systems |
| Hierarchical | Medium-High | High | Excellent | Medium | 15+ agents across multiple domains |
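The "Best When" column can be read as a rough decision procedure. The sketch below encodes it as a function. The function name, the parameter names, and the order of the checks are assumptions made for illustration, while the thresholds (15 agents, 80% single-domain requests) come from the matrix. Treat it as a starting point for discussion rather than a rule.

```python
def suggest_pattern(
    agent_count: int,
    single_domain_ratio: float,
    strict_ordering: bool = False,
    independent_subtasks: bool = False,
    loosely_coupled: bool = False,
) -> str:
    """Map workload traits to the pattern the matrix lists as the best fit."""
    if agent_count >= 15:
        return "hierarchical"   # too many workers for one supervisor
    if strict_ordering:
        return "pipeline"       # steps must run in a fixed order
    if loosely_coupled:
        return "event-driven"   # agents react to events independently
    if single_domain_ratio >= 0.8:
        return "router"         # most requests need one specialist
    if independent_subtasks:
        return "fan-out"        # subtasks can run concurrently
    return "supervisor"         # requests often span several agents


print(suggest_pattern(agent_count=5, single_domain_ratio=0.9))   # router
print(suggest_pattern(agent_count=5, single_domain_ratio=0.4))   # supervisor
print(suggest_pattern(agent_count=20, single_domain_ratio=0.4))  # hierarchical
```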
Production systems rarely use one pattern in isolation. For our running example — "My API calls are failing, I was charged twice, and I can't log in" — a realistic architecture combines a router at the front door (fast path for simple requests), a supervisor for multi-domain requests (the router escalates to it), parallel fan-out within the supervisor for independent subtasks, a pipeline within each domain for ordered steps like policy-then-refund, and event-driven coordination for side effects like notifications and audit logging.
The key insight: patterns are composable building blocks, not mutually exclusive choices. Start with the simplest pattern that handles your most common case (usually a router), then add complexity only where the workload demands it.