Orchestration Architectures: Supervisor, Router & Hierarchical Patterns for Multi-Agent Systems

Build production orchestration for multi-agent systems — supervisor routing with LLM classification, parallel fan-out with error recovery, event-driven coordination, and hierarchical delegation. Includes comparison matrix and combined architecture example.

Building one capable agent is useful. Building multiple specialized agents that work together reliably is a fundamentally different problem. Once you have a billing agent, a technical support agent, an account agent, and a fraud agent, the question stops being 'how does each agent work?' and becomes 'how does work move through the system?' That coordination layer is called orchestration, and getting it wrong means your multi-agent system is just a collection of isolated specialists with no teamwork.

This guide covers six orchestration patterns — supervisor, router, sequential pipeline, parallel fan-out, event-driven, and hierarchical — with production-grade code for each. We'll use one running example throughout: a customer service platform where a user says "My API calls are failing with a 429 error, I was charged twice, and I can't log into my account." This request spans three domains and forces the orchestration layer to make real decisions about routing, parallelism, and result aggregation.

Before you build this

Make sure you've built a single-agent ReAct loop from scratch first (see Phase 2: Agent Architecture). If you don't understand how one agent reasons and acts, you'll struggle to debug a system of agents. Orchestration adds latency, cost, and failure modes — use it when you genuinely need specialization or parallelism, not because it sounds impressive.

Our customer service system has five specialized agents. BillingAgent handles invoices, refunds, and subscription changes. TechnicalSupportAgent handles API errors, bugs, and troubleshooting. AccountAgent handles login, password resets, and profile updates. PolicyAgent checks whether actions comply with company rules. ResponseAgent turns internal results into a user-facing answer.

When the user says "My API calls are failing with a 429 error, I was charged twice, and I can't log into my account," three of these agents need to be involved. The orchestration layer must decide: does it route to one agent at a time? Run all three in parallel? Use a supervisor to coordinate? The answer depends on the pattern you choose.

The supervisor pattern uses one central orchestrator that receives every request, classifies the intent using an LLM, dispatches tasks to specialized workers, waits for results, and aggregates them into a final response. It has the big picture and makes all coordination decisions.

The critical part of a supervisor is the classification step. A naive implementation checks for keywords like if "charged twice" in request — that breaks on anything remotely creative the user might type. A production supervisor uses an LLM to classify intent, detect multi-intent requests, and reformulate queries for each specialist.
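
To make that failure mode concrete, here is a minimal sketch of the naive keyword approach. The keyword table and the keyword_route helper are hypothetical, written only for illustration; they show how a paraphrase of the same complaint slips past exact-phrase matching.

```python
# Hypothetical keyword table; not part of the supervisor below.
KEYWORDS = {
    "billing": ["charged twice", "refund", "invoice"],
    "technical": ["429", "api error", "timeout"],
    "account": ["log in", "password", "locked out"],
}

def keyword_route(request: str) -> list[str]:
    """Return every domain whose keywords appear verbatim in the request."""
    text = request.lower()
    return [
        domain
        for domain, phrases in KEYWORDS.items()
        if any(phrase in text for phrase in phrases)
    ]

# The exact phrase is matched...
print(keyword_route("I was charged twice this month"))
# ...but a paraphrase of the same problem matches no domain at all.
print(keyword_route("You billed my card two times for one order"))
```

The second request describes the same billing problem, yet nothing routes it. That gap is what the LLM classification step is meant to close.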

supervisor_orchestrator.py
python
import json
import asyncio
from openai import AsyncOpenAI
from dataclasses import dataclass
from typing import Callable, Any

client = AsyncOpenAI()

@dataclass
class WorkerAgent:
    name: str
    description: str
    system_prompt: str
    model: str = "gpt-4o-mini"
    tools: list[dict] | None = None
    tool_fns: dict[str, Callable] | None = None

class SupervisorOrchestrator:
    """Supervisor with LLM-based intent classification and
    parallel dispatch to specialized workers."""

    def __init__(
        self,
        workers: list[WorkerAgent],
        supervisor_model: str = "gpt-4o"
    ):
        self.workers = {w.name: w for w in workers}
        self.supervisor_model = supervisor_model

    async def _classify(self, user_message: str) -> dict:
        """LLM-based multi-intent classification."""
        worker_desc = "\n".join(
            f"- {w.name}: {w.description}"
            for w in self.workers.values()
        )

        response = await client.chat.completions.create(
            model=self.supervisor_model,
            messages=[
                {
                    "role": "system",
                    "content": (
                        "Classify the user's request. It may contain "
                        "MULTIPLE intents — detect all of them.\n\n"
                        f"Available workers:\n{worker_desc}\n\n"
                        "Return JSON:\n"
                        '{"intents": [{"worker": "name", '
                        '"confidence": 0.0-1.0, '
                        '"task": "reformulated query for this worker"}], '
                        '"can_parallelize": true/false}'
                    )
                },
                {"role": "user", "content": user_message}
            ],
            response_format={"type": "json_object"}
        )

        return json.loads(response.choices[0].message.content)

    async def _run_worker(
        self,
        worker: WorkerAgent,
        task: str,
        timeout: float = 30.0
    ) -> dict:
        """Run a single worker with its own ReAct loop.

        `timeout` bounds each LLM call, not the loop as a whole.
        """
        messages = [
            {"role": "system", "content": worker.system_prompt},
            {"role": "user", "content": task}
        ]

        try:
            for _ in range(8):
                kwargs = {"model": worker.model, "messages": messages}
                if worker.tools:
                    kwargs["tools"] = worker.tools
                    kwargs["tool_choice"] = "auto"

                response = await asyncio.wait_for(
                    client.chat.completions.create(**kwargs),
                    timeout=timeout
                )

                msg = response.choices[0].message
                messages.append(msg)

                if not msg.tool_calls:
                    return {
                        "worker": worker.name,
                        "success": True,
                        "result": msg.content or ""
                    }

                for tc in msg.tool_calls:
                    fn_name = tc.function.name
                    fn_args = json.loads(tc.function.arguments)
                    if worker.tool_fns and fn_name in worker.tool_fns:
                        result = worker.tool_fns[fn_name](**fn_args)
                    else:
                        result = f"Tool {fn_name} not available"
                    messages.append({
                        "role": "tool",
                        "tool_call_id": tc.id,
                        "content": str(result)
                    })

            return {"worker": worker.name, "success": False,
                    "result": "Max iterations reached"}

        except asyncio.TimeoutError:
            return {"worker": worker.name, "success": False,
                    "result": "Worker timed out"}
        except Exception as e:
            return {"worker": worker.name, "success": False,
                    "result": f"Error: {e}"}

    async def _aggregate(
        self,
        user_message: str,
        worker_results: list[dict]
    ) -> str:
        """Synthesize multiple worker outputs into one response."""
        successful = [r for r in worker_results if r["success"]]
        failed = [r for r in worker_results if not r["success"]]

        results_text = "\n\n".join(
            f"[{r['worker']}]: {r['result']}" for r in successful
        )

        failure_note = ""
        if failed:
            names = ", ".join(r["worker"] for r in failed)
            failure_note = (
                f"\n\nNote: {names} failed to complete. "
                f"Address what you can and note what's unresolved."
            )

        response = await client.chat.completions.create(
            model=self.supervisor_model,
            messages=[
                {
                    "role": "system",
                    "content": (
                        "Combine specialist outputs into one coherent "
                        "response for the user. Don't mention internal "
                        "routing or agent names. If some specialists "
                        "failed, acknowledge what couldn't be resolved."
                    )
                },
                {
                    "role": "user",
                    "content": (
                        f"User asked: {user_message}\n\n"
                        f"Specialist results:\n{results_text}"
                        f"{failure_note}"
                    )
                }
            ]
        )

        return response.choices[0].message.content

    async def handle(self, user_message: str) -> str:
        """Full pipeline: classify → dispatch → aggregate."""

        # Step 1: Classify (detect all intents)
        routing = await self._classify(user_message)
        intents = routing.get("intents", [])
        can_parallel = routing.get("can_parallelize", False)

        # Filter by confidence
        qualified = [
            i for i in intents if i.get("confidence", 0) >= 0.4
        ]

        if not qualified:
            return (
                "I'm not sure how to help with that. "
                "Could you provide more details?"
            )

        # Step 2: Dispatch
        if can_parallel and len(qualified) > 1:
            # Run workers concurrently
            tasks = [
                self._run_worker(
                    self.workers[i["worker"]],
                    i["task"]
                )
                for i in qualified
                if i["worker"] in self.workers
            ]
            results = await asyncio.gather(*tasks)
        else:
            # Run sequentially
            results = []
            for i in qualified:
                if i["worker"] in self.workers:
                    r = await self._run_worker(
                        self.workers[i["worker"]], i["task"]
                    )
                    results.append(r)

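        # Step 3: Aggregate (bail out if no intent mapped to a known worker)
        if not results:
            return (
                "I'm not sure how to help with that. "
                "Could you provide more details?"
            )
        return await self._aggregate(user_message, list(results))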

Notice what this supervisor handles that a naive implementation doesn't: multi-intent detection (the user's request spans three domains), confidence filtering (low-confidence intents get dropped), parallel vs sequential dispatch (the LLM decides whether tasks are independent), a timeout on each worker LLM call (the limit applies per call, so a worker that loops through several tool calls can run longer than 30 seconds in total), partial failure in aggregation (if one worker dies, the others still contribute), and tool execution within each worker's own ReAct loop.
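
The filtering step inside handle can also be pulled out into a pure function, which makes it easy to unit test without calling a model. The sketch below assumes the classifier returns the JSON shape requested in the prompt. The select_intents helper and the sample payload are illustrative assumptions, not output captured from a real model.

```python
def select_intents(
    routing: dict,
    known_workers: set[str],
    min_confidence: float = 0.4,
) -> list[dict]:
    """Keep intents that name a known worker and clear the threshold."""
    selected = []
    for intent in routing.get("intents", []):
        if not isinstance(intent, dict):
            continue  # tolerate malformed entries from the model
        worker = intent.get("worker")
        if not isinstance(worker, str) or worker not in known_workers:
            continue  # the model named a worker that does not exist
        try:
            confidence = float(intent.get("confidence", 0))
        except (TypeError, ValueError):
            continue  # confidence was not numeric
        if confidence >= min_confidence and intent.get("task"):
            selected.append(intent)
    return selected

# Hand-written payload for the running example (hypothetical values).
sample_routing = {
    "intents": [
        {"worker": "technical", "confidence": 0.95,
         "task": "Diagnose repeated HTTP 429 responses on API calls"},
        {"worker": "billing", "confidence": 0.9,
         "task": "Investigate a possible duplicate charge"},
        {"worker": "account", "confidence": 0.85,
         "task": "Help the user regain access to their account"},
        {"worker": "legal", "confidence": 0.8, "task": "Unknown worker"},
        {"worker": "billing", "confidence": 0.2, "task": "Low confidence"},
    ],
    "can_parallelize": True,
}

qualified = select_intents(sample_routing, {"technical", "billing", "account"})
print([i["worker"] for i in qualified])
```

Only the three well-formed, high-confidence intents survive. The unknown worker and the low-confidence entry are dropped before any worker is dispatched.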

Use a supervisor when requests frequently span multiple domains and need coordination — the supervisor's big-picture view is essential for decomposing complex requests and merging results. Avoid it when most requests go to a single agent, because you're paying for an extra LLM call (the classification step) on every request even when routing is obvious. The supervisor also becomes a single point of failure and a latency bottleneck as traffic grows.

The router is the supervisor's lighter sibling. It classifies intent and sends the request to one specialist, then gets out of the way. There is no multi-agent coordination and no result merging: the selected agent handles the full request independently. This is the right pattern when the large majority of requests (roughly 80% or more) map cleanly to a single domain.

router_orchestrator.py
python
import json
from openai import AsyncOpenAI
from dataclasses import dataclass

client = AsyncOpenAI()

class RouterOrchestrator:
    """Lightweight router: classify once, dispatch once."""

    def __init__(self, agents: dict, model: str = "gpt-4o-mini"):
        self.agents = agents
        self.model = model

    async def _classify(self, request: str) -> dict:
        agent_list = "\n".join(
            f"- {name}" for name in self.agents.keys()
        )

        response = await client.chat.completions.create(
            model=self.model,
            messages=[
                {
                    "role": "system",
                    "content": (
                        f"Route this request to ONE agent. If it spans "
                        f"several domains, set is_multi_domain to true.\n\n"
                        f"Agents:\n{agent_list}\n\n"
                        f"Return JSON: "
                        f'{{"agent": "name", "confidence": 0.0-1.0, '
                        f'"is_multi_domain": true/false}}'
                    )
                },
                {"role": "user", "content": request}
            ],
            response_format={"type": "json_object"}
        )

        return json.loads(response.choices[0].message.content)

    async def handle(self, request: str) -> dict:
        routing = await self._classify(request)
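        # Use .get so a malformed classifier reply falls through to the
        # "Unknown agent" branch instead of raising KeyError
        agent_name = routing.get("agent", "")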
        confidence = routing.get("confidence", 0)
        is_multi = routing.get("is_multi_domain", False)

        # Escalate multi-domain requests to supervisor
        if is_multi:
            return {
                "escalated": True,
                "reason": "Multi-domain request detected",
                "original_routing": routing
            }

        if confidence < 0.5:
            return {
                "route": "fallback",
                "response": "Could you provide more details "
                            "so I can route you to the right team?"
            }

        agent = self.agents.get(agent_name)
        if not agent:
            return {"route": "error", "response": f"Unknown agent: {agent_name}"}

        result = await agent.handle(request)
        return {
            "route": agent_name,
            "confidence": confidence,
            "result": result
        }

The key design choice here: when the router detects a multi-domain request, it doesn't try to handle it. It returns an escalation marker, and the caller is expected to forward the request to a supervisor. This lets you compose patterns: the router handles the common case fast, and the supervisor handles the complex case thoroughly.
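
One way to wire the two together is a thin dispatcher that tries the router first and hands off on escalation. The sketch below uses stub classes in place of RouterOrchestrator and SupervisorOrchestrator so it runs without any model calls. The handle_request function and both stubs are assumptions made for illustration, and the stub router's " and " check is only a stand-in for the LLM classification.

```python
import asyncio

class StubRouter:
    """Stands in for RouterOrchestrator; flags requests joined by ' and '."""
    async def handle(self, request: str) -> dict:
        if " and " in request:
            return {"escalated": True, "reason": "Multi-domain request detected"}
        return {"route": "billing", "confidence": 0.9, "result": "refund issued"}

class StubSupervisor:
    """Stands in for SupervisorOrchestrator."""
    async def handle(self, request: str) -> str:
        return f"coordinated answer for: {request}"

async def handle_request(router, supervisor, request: str) -> str:
    """Try the cheap router first; hand off to the supervisor on escalation."""
    routed = await router.handle(request)
    if routed.get("escalated"):
        return await supervisor.handle(request)
    if routed.get("route") in ("fallback", "error"):
        return routed["response"]
    return str(routed["result"])

print(asyncio.run(
    handle_request(StubRouter(), StubSupervisor(), "Refund my last invoice")
))
```

With this arrangement, single-domain traffic pays for one cheap classification call, and only escalated requests incur the supervisor's classify, dispatch, and aggregate steps.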

A pipeline passes work through a fixed sequence of stages, each transforming the output of the previous stage. This works when tasks have a natural order — you need to verify identity before changing account settings, or validate policy before issuing a refund. The implementation needs to handle validation between stages and retry logic when a stage produces insufficient output.

pipeline_orchestrator.py
python
import asyncio
import time
from dataclasses import dataclass
from typing import Callable, Any, Optional
from openai import AsyncOpenAI

client = AsyncOpenAI()

@dataclass
class PipelineStage:
    name: str
    system_prompt: str
    model: str = "gpt-4o-mini"
    validate: Callable[[str], bool] | None = None
    max_retries: int = 1
    timeout_seconds: float = 30.0

@dataclass
class StageResult:
    stage: str
    output: str
    success: bool
    retries_used: int
    elapsed_ms: float
    error: str | None = None

class SequentialPipeline:
    """Pipeline with validation, retries, and stage-level metrics."""

    def __init__(self, stages: list[PipelineStage]):
        self.stages = stages

    async def _run_stage(
        self,
        stage: PipelineStage,
        input_text: str
    ) -> StageResult:
        start = time.time()

        for attempt in range(stage.max_retries + 1):
            try:
                response = await asyncio.wait_for(
                    client.chat.completions.create(
                        model=stage.model,
                        messages=[
                            {"role": "system", "content": stage.system_prompt},
                            {"role": "user", "content": input_text}
                        ]
                    ),
                    timeout=stage.timeout_seconds
                )

                output = response.choices[0].message.content or ""

                # Validate output if a validator exists
                if stage.validate and not stage.validate(output):
                    if attempt < stage.max_retries:
                        # Retry with feedback
                        input_text = (
                            f"{input_text}\n\n"
                            f"[Previous attempt was insufficient. "
                            f"Be more thorough.]"
                        )
                        continue
                    return StageResult(
                        stage=stage.name, output=output,
                        success=False, retries_used=attempt,
                        elapsed_ms=(time.time() - start) * 1000,
                        error="Validation failed after retries"
                    )

                return StageResult(
                    stage=stage.name, output=output,
                    success=True, retries_used=attempt,
                    elapsed_ms=(time.time() - start) * 1000
                )

            except asyncio.TimeoutError:
                if attempt == stage.max_retries:
                    return StageResult(
                        stage=stage.name, output="",
                        success=False, retries_used=attempt,
                        elapsed_ms=(time.time() - start) * 1000,
                        error="Stage timed out"
                    )

        return StageResult(
            stage=stage.name, output="",
            success=False, retries_used=stage.max_retries,
            elapsed_ms=(time.time() - start) * 1000,
            error="Exhausted retries"
        )

    async def run(
        self,
        initial_input: str,
        abort_on_failure: bool = True
    ) -> dict:
        current_input = initial_input
        results: list[StageResult] = []

        for stage in self.stages:
            result = await self._run_stage(stage, current_input)
            results.append(result)

            if not result.success and abort_on_failure:
                return {
                    "success": False,
                    "failed_at": stage.name,
                    "error": result.error,
                    "completed_stages": [
                        r.stage for r in results if r.success
                    ],
                    "final_output": current_input
                }

            if result.success:
                current_input = result.output

        return {
            # With abort_on_failure=False, a failed stage must still be reported
            "success": all(r.success for r in results),
            "final_output": current_input,
            "stages": [
                {
                    "name": r.stage,
                    "success": r.success,
                    "retries": r.retries_used,
                    "ms": round(r.elapsed_ms)
                }
                for r in results
            ]
        }

# ── Example: Refund processing pipeline ──────────────────────

refund_pipeline = SequentialPipeline([
    PipelineStage(
        name="extract_details",
        system_prompt=(
            "Extract: customer ID, transaction IDs, charge amounts, "
            "and dates from this request. Return structured JSON."
        ),
        validate=lambda out: len(out) > 50 and "{" in out
    ),
    PipelineStage(
        name="verify_charges",
        system_prompt=(
            "Given the extracted transaction details, verify which "
            "charges are duplicates. Explain your reasoning."
        ),
        model="gpt-4o"  # Stronger model for analysis
    ),
    PipelineStage(
        name="check_policy",
        system_prompt=(
            "Given the verified duplicate charges, check against "
            "refund policy: within 90-day window, not previously "
            "refunded, account in good standing. Return a decision."
        ),
        validate=lambda out: "approved" in out.lower() or "denied" in out.lower()
    ),
    PipelineStage(
        name="compose_response",
        system_prompt=(
            "Write a customer-facing response about the refund "
            "decision. Be clear, empathetic, and include next steps."
        ),
        validate=lambda out: len(out) > 100
    )
])

This pipeline handles what the simple for stage in stages version doesn't: per-stage validation (did the extraction actually produce JSON?), retries with feedback (the retry tells the model its previous attempt was insufficient), per-stage timeouts, abort-on-failure with partial results, and timing metrics for observability.
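
The extract_details validator above only checks length and the presence of a brace, so malformed output can still pass. A stricter validator might parse the JSON and check for expected keys. This is a sketch: validate_extraction is a hypothetical helper, and the field names customer_id and transaction_ids are assumed, since the prompt does not fix a schema.

```python
import json

REQUIRED_FIELDS = {"customer_id", "transaction_ids"}  # assumed field names

def validate_extraction(output: str) -> bool:
    """Accept only output containing a JSON object with the required fields."""
    # Models often wrap JSON in prose, so look at the outermost braces.
    start, end = output.find("{"), output.rfind("}")
    if start == -1 or end <= start:
        return False
    try:
        data = json.loads(output[start:end + 1])
    except json.JSONDecodeError:
        return False
    return isinstance(data, dict) and REQUIRED_FIELDS.issubset(data)

good = 'Here you go: {"customer_id": "c_1", "transaction_ids": ["t1", "t2"]}'
bad = "{" + "x" * 60 + "}"  # long and contains a brace, but is not JSON

print(validate_extraction(good), validate_extraction(bad))
```

Passing validate=validate_extraction to the first PipelineStage would let the existing retry logic trigger on unparseable output instead of handing it to verify_charges.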

Fan-out sends independent tasks to multiple agents concurrently. The hard part isn't the parallelism — that's just asyncio.gather. The hard part is what happens when some agents succeed and others fail, and how you merge heterogeneous results into one coherent response.

parallel_fanout.py
python
import asyncio
import time
from dataclasses import dataclass
from openai import AsyncOpenAI

client = AsyncOpenAI()

@dataclass
class FanOutResult:
    agent_name: str
    response: str
    success: bool
    elapsed_ms: float
    error: str | None = None

async def run_agent_with_timeout(
    name: str,
    system_prompt: str,
    task: str,
    model: str = "gpt-4o-mini",
    timeout: float = 30.0
) -> FanOutResult:
    """Run one agent with timeout and error capture."""
    start = time.time()

    try:
        response = await asyncio.wait_for(
            client.chat.completions.create(
                model=model,
                messages=[
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": task}
                ],
                max_tokens=1500
            ),
            timeout=timeout
        )

        return FanOutResult(
            agent_name=name,
            response=response.choices[0].message.content or "",
            success=True,
            elapsed_ms=(time.time() - start) * 1000
        )

    except asyncio.TimeoutError:
        return FanOutResult(
            agent_name=name, response="",
            success=False,
            elapsed_ms=(time.time() - start) * 1000,
            error=f"{name} timed out after {timeout}s"
        )
    except Exception as e:
        return FanOutResult(
            agent_name=name, response="",
            success=False,
            elapsed_ms=(time.time() - start) * 1000,
            error=f"{name} error: {type(e).__name__}: {e}"
        )

async def fan_out_and_merge(
    user_message: str,
    agent_configs: list[dict],
    merge_model: str = "gpt-4o",
    min_success_ratio: float = 0.5
) -> dict:
    """Dispatch to all agents in parallel, then merge results."""

    # Fan out
    tasks = [
        run_agent_with_timeout(
            name=cfg["name"],
            system_prompt=cfg["system_prompt"],
            task=user_message,
            model=cfg.get("model", "gpt-4o-mini"),
            timeout=cfg.get("timeout", 30.0)
        )
        for cfg in agent_configs
    ]

    results = await asyncio.gather(*tasks)

    successful = [r for r in results if r.success]
    failed = [r for r in results if not r.success]

    # Check minimum success threshold (an empty agent list counts as failure)
    if not results or len(successful) / len(results) < min_success_ratio:
        return {
            "success": False,
            "error": "Too many agents failed",
            "failed": [
                {"agent": r.agent_name, "error": r.error}
                for r in failed
            ]
        }

    # Merge successful results
    perspectives = "\n\n".join(
        f"=== {r.agent_name} ({r.elapsed_ms:.0f}ms) ===\n{r.response}"
        for r in successful
    )

    failure_note = ""
    if failed:
        names = ", ".join(r.agent_name for r in failed)
        failure_note = f"\n\nNote: {names} failed to respond."

    merge_response = await client.chat.completions.create(
        model=merge_model,
        messages=[
            {
                "role": "system",
                "content": (
                    "Synthesize these specialist responses into one "
                    "coherent answer. Resolve contradictions by noting "
                    "the disagreement. Don't mention agent names."
                )
            },
            {
                "role": "user",
                "content": (
                    f"Question: {user_message}\n\n"
                    f"Responses:\n{perspectives}{failure_note}"
                )
            }
        ]
    )

    return {
        "success": True,
        "response": merge_response.choices[0].message.content,
        "agents_used": [r.agent_name for r in successful],
        "agents_failed": [r.agent_name for r in failed],
        "timings": {
            r.agent_name: round(r.elapsed_ms)
            for r in results
        }
    }
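
The partial-failure logic can be exercised without any model calls by swapping the agents for stubs that simply sleep. The sketch below is a stripped-down illustration of the same threshold check. The names call_with_timeout and fan_out, and the delays, are assumptions chosen for the example.

```python
import asyncio

async def call_with_timeout(name: str, delay: float, timeout: float) -> dict:
    """Stub agent: sleeps for `delay` seconds instead of calling a model."""
    try:
        await asyncio.wait_for(asyncio.sleep(delay), timeout=timeout)
        return {"agent": name, "success": True, "response": f"{name} done"}
    except asyncio.TimeoutError:
        return {"agent": name, "success": False, "error": "timeout"}

async def fan_out(
    delays: dict[str, float],
    timeout: float = 0.05,
    min_success_ratio: float = 0.5,
) -> dict:
    """Run all stub agents concurrently and apply the success threshold."""
    results = await asyncio.gather(
        *(call_with_timeout(n, d, timeout) for n, d in delays.items())
    )
    ok = [r["agent"] for r in results if r["success"]]
    failed = [r["agent"] for r in results if not r["success"]]
    # Guard against an empty agent list before dividing.
    ratio = len(ok) / len(results) if results else 0.0
    return {"success": ratio >= min_success_ratio, "ok": ok, "failed": failed}

# The account stub is slower than the timeout, so it fails; the others succeed.
outcome = asyncio.run(
    fan_out({"billing": 0.0, "technical": 0.0, "account": 1.0})
)
print(outcome)
```

Because each stub catches its own timeout and returns a result dict, asyncio.gather never raises, and the caller decides what to do with a partial set of answers.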

In event-driven orchestration, agents react to events instead of being told what to do by a central controller. One agent publishes duplicate_charge_confirmed, and other agents that subscribe to that event kick off their own work. This creates loose coupling — agents don't need to know about each other, only about the events they care about. The trade-off: control flow becomes implicit and harder to trace.

event_orchestration.py
python
import asyncio
import time
from dataclasses import dataclass, field
from typing import Callable, Any
from datetime import datetime
import uuid
import json

@dataclass
class Event:
    name: str
    payload: dict
    source: str
    event_id: str = field(default_factory=lambda: str(uuid.uuid4())[:8])
    timestamp: str = field(default_factory=lambda: datetime.now().isoformat())
    correlation_id: str = ""

@dataclass
class EventRecord:
    event: Event
    handlers_invoked: list[str]
    results: list[dict]
    elapsed_ms: float

class EventBus:
    """Event bus with dead letter queue, handler timeouts,
    and full event history for tracing."""

    def __init__(self, handler_timeout: float = 15.0):
        self._handlers: dict[str, list[tuple[str, Callable]]] = {}
        self._history: list[EventRecord] = []
        self._dead_letters: list[dict] = []
        self._timeout = handler_timeout

    def subscribe(
        self,
        event_name: str,
        handler_name: str,
        handler: Callable
    ):
        """Subscribe a named handler to an event."""
        self._handlers.setdefault(event_name, []).append(
            (handler_name, handler)
        )

    async def publish(self, event: Event) -> EventRecord:
        """Publish an event and run all subscribers."""
        start = time.time()
        handlers = self._handlers.get(event.name, [])
        results = []
        invoked = []

        for handler_name, handler in handlers:
            invoked.append(handler_name)
            try:
                result = await asyncio.wait_for(
                    handler(event),
                    timeout=self._timeout
                )
                results.append({
                    "handler": handler_name,
                    "success": True,
                    "result": result
                })
            except asyncio.TimeoutError:
                self._dead_letters.append({
                    "event": event.name,
                    "handler": handler_name,
                    "error": "timeout",
                    "event_id": event.event_id
                })
                results.append({
                    "handler": handler_name,
                    "success": False,
                    "error": "timeout"
                })
            except Exception as e:
                self._dead_letters.append({
                    "event": event.name,
                    "handler": handler_name,
                    "error": str(e),
                    "event_id": event.event_id
                })
                results.append({
                    "handler": handler_name,
                    "success": False,
                    "error": str(e)
                })

        record = EventRecord(
            event=event,
            handlers_invoked=invoked,
            results=results,
            elapsed_ms=(time.time() - start) * 1000
        )
        self._history.append(record)
        return record

    def trace(self, correlation_id: str) -> list[EventRecord]:
        """Get all events in a workflow by correlation ID."""
        return [
            r for r in self._history
            if r.event.correlation_id == correlation_id
        ]

    @property
    def dead_letters(self) -> list[dict]:
        return list(self._dead_letters)

# ── Usage: refund workflow as events ──────────────────────────

bus = EventBus(handler_timeout=10.0)

async def on_charge_verified(event: Event):
    """PolicyAgent reacts to billing verification."""
    if not event.payload.get("is_duplicate"):
        # Nothing to refund, so no follow-up event is published
        return {"policy": "no_action"}
    await bus.publish(Event(
        name="refund_eligible",
        payload={"amount": event.payload["amount"]},
        source="policy_agent",
        correlation_id=event.correlation_id
    ))
    return {"policy": "approved"}

async def on_refund_eligible(event: Event):
    """FraudAgent reacts to refund eligibility."""
    # Run fraud check
    return {"risk": "low", "approved": True}

async def on_refund_eligible_notify(event: Event):
    """NotificationAgent reacts to refund eligibility."""
    return {"notification": "queued"}

bus.subscribe("charge_verified", "policy_agent", on_charge_verified)
bus.subscribe("refund_eligible", "fraud_agent", on_refund_eligible)
bus.subscribe("refund_eligible", "notification_agent", on_refund_eligible_notify)

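This event bus handles what a minimal event bus doesn't: handler timeouts (a slow handler is cut off after the configured limit), dead letter tracking (failed deliveries are captured for debugging), named handlers (you can see which subscriber failed), full event history with correlation IDs (you can trace an entire workflow), and chained propagation (a handler can publish follow-up events). Note that publish awaits the handlers for a single event one at a time, so a slow handler still delays the ones after it until its timeout fires; dispatch them with asyncio.gather if you need them to run concurrently.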
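
If subscribers to the same event are independent, they can be dispatched concurrently instead of in a loop. The following is a minimal sketch of that variation, separate from the EventBus class. The dispatch_concurrently function and the two stub handlers are assumptions for illustration.

```python
import asyncio

async def dispatch_concurrently(handlers, event, timeout: float = 1.0) -> list[dict]:
    """Run all (name, handler) pairs for one event at the same time."""
    async def run_one(name, handler):
        try:
            result = await asyncio.wait_for(handler(event), timeout=timeout)
            return {"handler": name, "success": True, "result": result}
        except asyncio.TimeoutError:
            return {"handler": name, "success": False, "error": "timeout"}
        except Exception as exc:
            return {"handler": name, "success": False, "error": str(exc)}

    # gather preserves the order of the handlers in its result list.
    return await asyncio.gather(*(run_one(n, h) for n, h in handlers))

log: list[str] = []  # records when each stub handler starts and ends

async def fraud_check(event):
    log.append("fraud:start")
    await asyncio.sleep(0.01)
    log.append("fraud:end")
    return {"risk": "low"}

async def notify(event):
    log.append("notify:start")
    await asyncio.sleep(0.01)
    log.append("notify:end")
    return {"notification": "queued"}

results = asyncio.run(dispatch_concurrently(
    [("fraud_agent", fraud_check), ("notification_agent", notify)],
    {"name": "refund_eligible"},
))
print(log)
```

In the log, both handlers start before either one finishes, which is the behavior the sequential loop cannot provide. The cost is that handlers can no longer rely on running in subscription order.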

When your system has 15+ agents across multiple domains, a single supervisor becomes unwieldy. Hierarchical orchestration adds layers: a meta-orchestrator delegates to domain supervisors, each of which manages its own team of specialist workers. This mirrors how large organizations work: a CEO delegates to VPs, and VPs delegate to managers.
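
One way to picture the benefit is to lay the hierarchy out as data and count how many options each routing decision has to choose between. The team and worker names below are hypothetical and exist only for this sketch.

```python
# Hypothetical team layout; names are illustrative only.
HIERARCHY = {
    "billing_team": ["refund_specialist", "invoice_specialist"],
    "technical_team": ["api_specialist", "sdk_specialist"],
    "account_team": ["login_specialist", "profile_specialist"],
}

def delegation_path(team: str, worker: str) -> list[str]:
    """Return the chain of delegation from the top level down to a worker."""
    if team not in HIERARCHY:
        raise KeyError(f"Unknown team: {team}")
    if worker not in HIERARCHY[team]:
        raise KeyError(f"{worker} is not on {team}")
    return ["meta_orchestrator", team, worker]

def choices_per_level() -> tuple[int, int]:
    """Options seen by the top level and by the largest domain supervisor."""
    return len(HIERARCHY), max(len(ws) for ws in HIERARCHY.values())

print(delegation_path("technical_team", "api_specialist"))
print(choices_per_level())
```

A flat supervisor over this layout would pick among all six workers in one prompt. With two levels, each routing call picks among at most three options, at the price of one extra LLM call per request.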

hierarchical_orchestrator.py
python
import json
import asyncio
from openai import AsyncOpenAI
from dataclasses import dataclass

client = AsyncOpenAI()

@dataclass
class DomainSupervisor:
    """Mid-level supervisor managing a team of specialists."""
    name: str
    domain: str
    description: str
    workers: list[dict]  # [{name, description, system_prompt, model}]
    model: str = "gpt-4o-mini"

    async def handle(self, task: str) -> dict:
        """Route task to the best specialist in this domain."""
        worker_list = "\n".join(
            f"- {w['name']}: {w.get('description', '')}"
            for w in self.workers
        )

        # Internal routing within this domain
        routing = await client.chat.completions.create(
            model=self.model,
            messages=[
                {"role": "system", "content":
                    f"You manage the {self.domain} team.\n"
                    f"Workers:\n{worker_list}\n\n"
                    f"Select the best worker and reformulate the task. "
                    f'Return JSON: {{"worker": "name", "task": "..."}}'},
                {"role": "user", "content": task}
            ],
            response_format={"type": "json_object"}
        )

        decision = json.loads(routing.choices[0].message.content)
        # Fall back to the first worker if the model names an unknown one
        worker = next(
            (w for w in self.workers if w["name"] == decision.get("worker")),
            self.workers[0]
        )

        response = await client.chat.completions.create(
            model=worker.get("model", "gpt-4o-mini"),
            messages=[
                {"role": "system", "content": worker["system_prompt"]},
                {"role": "user", "content": decision.get("task", task)}
            ]
        )

        return {
            "domain": self.domain,
            # Report the worker that actually ran, not the model's pick
            "worker": worker["name"],
            "result": response.choices[0].message.content
        }

class MetaOrchestrator:
    """Top-level orchestrator managing domain supervisors."""

    def __init__(
        self,
        supervisors: list[DomainSupervisor],
        model: str = "gpt-4o"
    ):
        self.supervisors = {s.name: s for s in supervisors}
        self.model = model

    async def handle(self, user_request: str) -> str:
        sup_desc = "\n".join(
            f"- {s.name} ({s.domain}): {s.description}"
            for s in self.supervisors.values()
        )

        # Decompose into sub-tasks per domain
        plan_response = await client.chat.completions.create(
            model=self.model,
            messages=[
                {"role": "system", "content":
                    f"You manage these teams:\n{sup_desc}\n\n"
                    f"Decompose the request into sub-tasks per team. "
                    f'Return JSON: {{"subtasks": [{{"team": "name", '
                    f'"task": "description"}}], '
                    f'"can_parallelize": true/false}}'},
                {"role": "user", "content": user_request}
            ],
            response_format={"type": "json_object"}
        )

        plan = json.loads(plan_response.choices[0].message.content)
        subtasks = plan.get("subtasks", [])
        can_parallel = plan.get("can_parallelize", False)

        # Execute subtasks
        if can_parallel:
            # Keep the runnable subtasks so failures can be attributed to a team
            runnable = [
                st for st in subtasks
                if st.get("team") in self.supervisors
            ]
            coros = [
                self.supervisors[st["team"]].handle(st["task"])
                for st in runnable
            ]
            results = await asyncio.gather(*coros, return_exceptions=True)
            # Convert exceptions to error dicts, preserving the team name
            results = [
                r if isinstance(r, dict)
                else {"domain": st["team"], "worker": "unknown",
                      "result": f"Error: {r}"}
                for st, r in zip(runnable, results)
            ]
        else:
            results = []
            for st in subtasks:
                if st["team"] in self.supervisors:
                    try:
                        r = await self.supervisors[st["team"]].handle(st["task"])
                        results.append(r)
                    except Exception as e:
                        results.append({"domain": st["team"],
                                        "worker": "unknown",
                                        "result": f"Error: {e}"})

        # Synthesize
        results_text = "\n\n".join(
            f"[{r.get('domain', '?')} → {r.get('worker', '?')}]\n"
            f"{r.get('result', '')}"
            for r in results
        )

        synthesis = await client.chat.completions.create(
            model=self.model,
            messages=[
                {"role": "system", "content":
                    "Synthesize these team results into one coherent "
                    "response. Don't mention teams or internal routing."},
                {"role": "user", "content":
                    f"Request: {user_request}\n\nResults:\n{results_text}"}
            ]
        )

        return synthesis.choices[0].message.content

# ── Build a hierarchical system ───────────────────────────────

billing_team = DomainSupervisor(
    name="billing",
    domain="Billing & Payments",
    description="Invoices, refunds, subscriptions, charges",
    workers=[
        {"name": "refund_specialist",
         "description": "Processes refunds and duplicate charges",
         "system_prompt": "You handle refunds. Verify charges, check policy, calculate amounts."},
        {"name": "subscription_specialist",
         "description": "Plan changes, upgrades, cancellations",
         "system_prompt": "You handle subscription changes. Compare plans, process upgrades."}
    ]
)

tech_team = DomainSupervisor(
    name="technical",
    domain="Technical Support",
    description="API errors, bugs, integration issues",
    workers=[
        {"name": "api_specialist",
         "description": "API errors, rate limits, authentication",
         "system_prompt": "You diagnose API issues. Check error codes, rate limits, auth."},
        {"name": "integration_specialist",
         "description": "SDK setup, webhooks, third-party integrations",
         "system_prompt": "You help with integrations. Guide setup, debug webhooks."}
    ]
)

meta = MetaOrchestrator([billing_team, tech_team])
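
The orchestrator above skips any subtask whose team it does not recognize. With only the billing and technical teams registered, the login part of the running example would be dropped without a trace. One way to make that visible is to validate the plan before dispatch. The sketch below is a minimal illustration, not part of the original orchestrator: `validate_plan` and the shape of its rejection records are hypothetical, and it assumes the plan follows the `{"subtasks": [{"team": ..., "task": ...}]}` format requested in the decomposition prompt.

```python
from typing import Any


def validate_plan(
    plan: Any, known_teams: set[str]
) -> tuple[list[dict], list[dict]]:
    """Split a decomposition plan into runnable and rejected subtasks.

    Hypothetical helper: the orchestrator above does not define it.
    """
    runnable: list[dict] = []
    rejected: list[dict] = []

    # Tolerate a plan that is not a dict or has a malformed "subtasks" field
    subtasks = plan.get("subtasks", []) if isinstance(plan, dict) else []
    if not isinstance(subtasks, list):
        subtasks = []

    for st in subtasks:
        ok = (
            isinstance(st, dict)
            and isinstance(st.get("team"), str)
            and st["team"] in known_teams
            and isinstance(st.get("task"), str)
            and st["task"].strip() != ""
        )
        if ok:
            runnable.append({"team": st["team"], "task": st["task"].strip()})
        else:
            rejected.append(
                {"subtask": st, "reason": "unknown team or missing task"}
            )
    return runnable, rejected


# Example plan for the running request, as the planner might return it
plan = {
    "subtasks": [
        {"team": "technical", "task": "Diagnose the 429 errors"},
        {"team": "billing", "task": "Investigate the duplicate charge"},
        {"team": "account", "task": "Restore login access"},
    ],
    "can_parallelize": True,
}

# Only two teams are registered, so the account subtask is rejected
runnable, rejected = validate_plan(plan, {"billing", "technical"})
```

The rejected list could be logged or fed into the synthesis prompt so the final answer acknowledges what was not handled, rather than silently omitting it.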

Choosing the right pattern depends on your specific constraints. This matrix compares all six across the dimensions that matter most in production.

| Pattern | Latency | Complexity | Multi-Domain | Debuggability | Best When |
|---|---|---|---|---|---|
| Supervisor | Medium (extra LLM call) | Medium | Excellent | High (centralized) | Requests frequently span multiple agents |
| Router | Low (classify + one agent) | Low | Poor (escalates) | High | 80%+ of requests go to one specialist |
| Pipeline | High (sequential stages) | Low-Medium | N/A (fixed flow) | Very High | Tasks have strict ordering requirements |
| Fan-Out | Low (parallel execution) | Medium | Good | Medium | Independent tasks that can run concurrently |
| Event-Driven | Variable | High | Good | Low (implicit flow) | Loosely coupled, extensible systems |
| Hierarchical | Medium-High | High | Excellent | Medium | 15+ agents across multiple domains |

Production systems rarely use one pattern in isolation. For our running example — "My API calls are failing, I was charged twice, and I can't log in" — a realistic architecture combines a router at the front door (fast path for simple requests), a supervisor for multi-domain requests (the router escalates to it), parallel fan-out within the supervisor for independent subtasks, a pipeline within each domain for ordered steps like policy-then-refund, and event-driven coordination for side effects like notifications and audit logging.
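
The composition described above can be sketched in a few dozen lines. This is a simplified illustration under stated assumptions, not a production implementation: the three agent functions are hypothetical stubs standing in for LLM-backed agents, keyword matching stands in for the LLM classifier, the event layer is reduced to an in-memory audit log, and the per-domain pipeline (policy-then-refund) is omitted for brevity.

```python
import asyncio


# Hypothetical stand-ins for LLM-backed agents; names are illustrative only.
async def billing_agent(task: str) -> str:
    return f"billing handled: {task}"


async def technical_agent(task: str) -> str:
    return f"technical handled: {task}"


async def account_agent(task: str) -> str:
    return f"account handled: {task}"


AGENTS = {
    "billing": billing_agent,
    "technical": technical_agent,
    "account": account_agent,
}

# Assumption: keyword matching replaces the LLM classifier in this sketch.
KEYWORDS = {
    "billing": ("charged", "refund", "invoice"),
    "technical": ("api", "429", "error"),
    "account": ("log in", "login", "password"),
}

AUDIT_LOG: list[dict] = []


def classify(request: str) -> list[str]:
    """Return every domain whose keywords appear in the request."""
    text = request.lower()
    return [d for d, words in KEYWORDS.items() if any(w in text for w in words)]


def emit(event: str, **payload) -> None:
    """Event-driven side effect, reduced here to an in-memory audit log."""
    AUDIT_LOG.append({"event": event, **payload})


async def supervise(request: str, domains: list[str]) -> dict[str, str]:
    """Supervisor path: fan out to each domain in parallel, collect results."""
    results = await asyncio.gather(
        *(AGENTS[d](request) for d in domains), return_exceptions=True
    )
    # A failed agent becomes an error string instead of aborting the others
    return {
        d: r if isinstance(r, str) else f"Error: {r}"
        for d, r in zip(domains, results)
    }


async def handle(request: str) -> dict[str, str]:
    domains = classify(request)
    if not domains:
        emit("unroutable", request=request)
        return {}
    if len(domains) == 1:
        # Router fast path: one domain, one agent, no supervisor overhead
        emit("routed", path="router", domains=domains)
        return {domains[0]: await AGENTS[domains[0]](request)}
    # Multi-domain request: escalate to the supervisor
    emit("routed", path="supervisor", domains=domains)
    return await supervise(request, domains)


result = asyncio.run(handle(
    "My API calls are failing with a 429 error, I was charged twice, "
    "and I can't log into my account."
))
```

In this sketch the running example matches all three domains and takes the supervisor path, while a request such as "I need a refund" matches only billing and stays on the router fast path. The single-domain check is what keeps the common case cheap.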

The key insight: patterns are composable building blocks, not mutually exclusive choices. Start with the simplest pattern that handles your most common case (usually a router), then add complexity only where the workload demands it.

Where to go next

This post covered how work moves through a multi-agent system. The companion posts cover how agents talk to each other (Agent-to-Agent Communication) and how they share and persist state (State Management for Agents). For the foundational single-agent patterns these orchestrators coordinate, see Phase 2: Agent Architecture.
