Back to articles

Agent-to-Agent Communication: Async Messaging, Handoff Protocols, and Conflict Resolution

Production-grade communication primitives for multi-agent systems: async message buses with backpressure, handoff protocols with real acknowledgment tracking, and conflict resolution including LLM arbitration.

Agents that can't talk to each other aren't a system — they're a collection of independent programs. Communication is the connective tissue of multi-agent architectures: it determines whether your agents can coordinate on a refund workflow, negotiate conflicting decisions, or hand off tasks without dropping context.

This post builds three production-grade primitives from scratch: an async message bus with backpressure and dead letter queues, a handoff protocol with real acknowledgment tracking and exponential backoff, and a conflict resolver that includes actual LLM arbitration. We'll wire them together through a customer support refund workflow where a TriageAgent, BillingAgent, and ApprovalAgent coordinate end-to-end.

Prerequisites

This post builds on concepts from Orchestration Architectures and State Management for Agents. If you haven't read those, the orchestration patterns and shared state concepts referenced here are covered in depth there.

Before building anything, understand the tradeoffs. Each pattern fits different coordination needs:

markdown
| Pattern          | Coupling | Latency | Scalability | Best For                          |
|------------------|----------|---------|-------------|-----------------------------------|
| Direct Message   | High     | Low     | Poor        | 1:1 request/response, RPC-style   |
| Pub/Sub          | Low      | Medium  | Excellent   | Event broadcasting, fan-out       |
| Shared State     | Medium   | Low     | Moderate    | Coordination data, consensus      |
| Event Bus        | Low      | Medium  | Excellent   | Decoupled workflows, audit trails |
| Request/Reply    | Medium   | Medium  | Moderate    | Task delegation with ack          |

Direct messaging is simple but creates tight coupling — every agent needs to know every other agent's address. Pub/sub decouples publishers from subscribers but loses request/reply semantics. Shared state (covered in the state management post) works well for coordination data but not for task handoffs. An event bus with topic-based routing hits the sweet spot for most multi-agent workflows.

Every message in the system needs a consistent envelope. This schema supports routing, tracing, and idempotency:

messages.py
python
from __future__ import annotations
import uuid
from datetime import datetime, timezone
from dataclasses import dataclass, field
from enum import Enum
from typing import Any


class MessageType(Enum):
    TASK = "task"
    RESPONSE = "response"
    ACK = "ack"
    NACK = "nack"
    EVENT = "event"
    HANDOFF = "handoff"


class Priority(Enum):
    LOW = 0
    NORMAL = 1
    HIGH = 2
    CRITICAL = 3


@dataclass
class Message:
    topic: str
    payload: dict[str, Any]
    msg_type: MessageType = MessageType.TASK
    priority: Priority = Priority.NORMAL
    sender: str = ""
    recipient: str | None = None
    msg_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    correlation_id: str | None = None
    idempotency_key: str | None = None
    timestamp: str = field(
        default_factory=lambda: datetime.now(timezone.utc).isoformat()
    )
    ttl_seconds: float = 300.0
    attempt: int = 0

    def is_expired(self) -> bool:
        created = datetime.fromisoformat(self.timestamp)
        elapsed = (datetime.now(timezone.utc) - created).total_seconds()
        return elapsed > self.ttl_seconds

    def reply(self, payload: dict[str, Any]) -> Message:
        """Create a reply message preserving correlation chain."""
        return Message(
            topic=f"{self.sender}.inbox",
            payload=payload,
            msg_type=MessageType.RESPONSE,
            sender=self.recipient or "",
            recipient=self.sender,
            correlation_id=self.correlation_id or self.msg_id,
        )

The idempotency_key prevents duplicate processing when retries occur. The correlation_id chains request/reply pairs across multiple hops. The reply() factory method inverts sender/recipient and routes to the original sender's inbox topic.

A real message bus needs three things most tutorials skip: backpressure so fast producers don't overwhelm slow consumers, a dead letter queue for messages that repeatedly fail processing, and proper async subscriber management.

message_bus.py
python
import asyncio
import logging
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Any, Callable, Coroutine

from messages import Message, MessageType

logger = logging.getLogger(__name__)

SubscriberFn = Callable[[Message], Coroutine[Any, Any, None]]


@dataclass
class DeadLetter:
    message: Message
    error: str
    failed_subscriber: str
    attempts: int


class MessageBus:
    """Async message bus with per-topic queues, backpressure, and DLQ."""

    def __init__(
        self,
        max_queue_size: int = 1000,
        max_retries: int = 3,
    ):
        self._subscribers: dict[str, list[tuple[str, SubscriberFn]]] = defaultdict(list)
        self._queues: dict[str, asyncio.Queue[Message]] = {}
        self._max_queue_size = max_queue_size
        self._max_retries = max_retries
        self._dead_letters: list[DeadLetter] = []
        self._processors: dict[str, asyncio.Task] = {}
        self._running = False
        self._seen_idempotency_keys: set[str] = set()

    def subscribe(self, topic: str, name: str, handler: SubscriberFn) -> None:
        """Register a named subscriber for a topic."""
        self._subscribers[topic].append((name, handler))
        if topic not in self._queues:
            self._queues[topic] = asyncio.Queue(maxsize=self._max_queue_size)

    async def publish(self, message: Message) -> bool:
        """Publish a message. Returns False if queue is full (backpressure)."""
        topic = message.topic
        if topic not in self._queues:
            self._queues[topic] = asyncio.Queue(maxsize=self._max_queue_size)

        # Idempotency check
        if message.idempotency_key:
            if message.idempotency_key in self._seen_idempotency_keys:
                logger.info(f"Duplicate idempotency key: {message.idempotency_key}")
                return True
            self._seen_idempotency_keys.add(message.idempotency_key)

        try:
            self._queues[topic].put_nowait(message)
            return True
        except asyncio.QueueFull:
            logger.warning(
                f"Backpressure on topic '{topic}': queue full "
                f"({self._max_queue_size} messages)"
            )
            return False

    async def _process_topic(self, topic: str) -> None:
        """Process messages for a single topic, dispatching to subscribers."""
        queue = self._queues[topic]
        while self._running:
            try:
                message = await asyncio.wait_for(queue.get(), timeout=1.0)
            except asyncio.TimeoutError:
                continue

            if message.is_expired():
                logger.info(f"Dropping expired message {message.msg_id}")
                self._dead_letters.append(
                    DeadLetter(message, "TTL expired", "", 0)
                )
                continue

            for sub_name, handler in self._subscribers.get(topic, []):
                # Filter by recipient if specified
                if message.recipient and message.recipient != sub_name:
                    continue

                success = await self._dispatch_with_retry(
                    message, sub_name, handler
                )
                if not success:
                    self._dead_letters.append(
                        DeadLetter(
                            message,
                            f"Failed after {self._max_retries} attempts",
                            sub_name,
                            self._max_retries,
                        )
                    )

    async def _dispatch_with_retry(
        self, message: Message, sub_name: str, handler: SubscriberFn
    ) -> bool:
        """Dispatch to a subscriber with retry and exponential backoff."""
        for attempt in range(self._max_retries):
            try:
                message.attempt = attempt + 1
                await handler(message)
                return True
            except Exception as e:
                wait = min(2 ** attempt * 0.1, 5.0)
                logger.error(
                    f"Subscriber '{sub_name}' failed on {message.msg_id} "
                    f"(attempt {attempt + 1}): {e}"
                )
                if attempt < self._max_retries - 1:
                    await asyncio.sleep(wait)
        return False

    async def start(self) -> None:
        """Start processing all topic queues."""
        self._running = True
        for topic in self._queues:
            self._processors[topic] = asyncio.create_task(
                self._process_topic(topic)
            )

    async def stop(self) -> None:
        """Gracefully stop all processors."""
        self._running = False
        for task in self._processors.values():
            task.cancel()
        await asyncio.gather(*self._processors.values(), return_exceptions=True)
        self._processors.clear()

    @property
    def dead_letters(self) -> list[DeadLetter]:
        return list(self._dead_letters)

    def queue_depth(self, topic: str) -> int:
        if topic in self._queues:
            return self._queues[topic].qsize()
        return 0

Backpressure matters

Without maxsize on the queue, a runaway producer (e.g., an agent stuck in a retry loop generating messages) will consume unbounded memory. The publish() method returns False when the queue is full, letting the caller decide whether to drop, buffer, or wait.

Task handoff between agents requires guaranteed delivery. The HandoffProtocol tracks outgoing handoffs, waits for real acknowledgments using asyncio.Event, and retries with exponential backoff on timeout.

handoff.py
python
import asyncio
import logging
from dataclasses import dataclass, field
from enum import Enum
from typing import Any

from messages import Message, MessageType, Priority
from message_bus import MessageBus

logger = logging.getLogger(__name__)


class HandoffStatus(Enum):
    PENDING = "pending"
    ACKNOWLEDGED = "acknowledged"
    REJECTED = "rejected"
    TIMED_OUT = "timed_out"


@dataclass
class PendingHandoff:
    message: Message
    ack_event: asyncio.Event = field(default_factory=asyncio.Event)
    status: HandoffStatus = HandoffStatus.PENDING
    response_payload: dict[str, Any] = field(default_factory=dict)


class HandoffProtocol:
    """Reliable task handoff with ack tracking and exponential backoff."""

    def __init__(
        self,
        bus: MessageBus,
        agent_name: str,
        ack_timeout: float = 10.0,
        max_retries: int = 3,
    ):
        self._bus = bus
        self._agent_name = agent_name
        self._ack_timeout = ack_timeout
        self._max_retries = max_retries
        self._pending: dict[str, PendingHandoff] = {}

        # Subscribe to our own inbox for ack/nack messages
        bus.subscribe(
            f"{agent_name}.inbox",
            agent_name,
            self._handle_ack,
        )

    async def _handle_ack(self, message: Message) -> None:
        """Process incoming ACK/NACK for a pending handoff."""
        corr_id = message.correlation_id
        if not corr_id or corr_id not in self._pending:
            return

        pending = self._pending[corr_id]
        if message.msg_type == MessageType.ACK:
            pending.status = HandoffStatus.ACKNOWLEDGED
            pending.response_payload = message.payload
        elif message.msg_type == MessageType.NACK:
            pending.status = HandoffStatus.REJECTED
            pending.response_payload = message.payload

        pending.ack_event.set()

    async def handoff(
        self,
        target_agent: str,
        task_payload: dict[str, Any],
        priority: Priority = Priority.NORMAL,
    ) -> HandoffStatus:
        """Send a task to another agent and wait for acknowledgment."""
        message = Message(
            topic=f"{target_agent}.inbox",
            payload=task_payload,
            msg_type=MessageType.HANDOFF,
            priority=priority,
            sender=self._agent_name,
            recipient=target_agent,
            idempotency_key=f"{self._agent_name}:{task_payload.get('task_id', message.msg_id)}",
        )

        for attempt in range(self._max_retries):
            pending = PendingHandoff(message=message)
            self._pending[message.msg_id] = pending

            published = await self._bus.publish(message)
            if not published:
                logger.warning(f"Backpressure: cannot deliver to {target_agent}")
                await asyncio.sleep(2 ** attempt * 0.5)
                continue

            try:
                await asyncio.wait_for(
                    pending.ack_event.wait(),
                    timeout=self._ack_timeout * (1.5 ** attempt),
                )
                return pending.status
            except asyncio.TimeoutError:
                logger.warning(
                    f"Handoff to {target_agent} timed out "
                    f"(attempt {attempt + 1}/{self._max_retries})"
                )
                del self._pending[message.msg_id]

        return HandoffStatus.TIMED_OUT

    async def acknowledge(
        self, original_message: Message, payload: dict[str, Any] | None = None
    ) -> None:
        """Send an ACK for a received handoff."""
        ack = Message(
            topic=f"{original_message.sender}.inbox",
            payload=payload or {},
            msg_type=MessageType.ACK,
            sender=self._agent_name,
            recipient=original_message.sender,
            correlation_id=original_message.msg_id,
        )
        await self._bus.publish(ack)

    async def reject(
        self, original_message: Message, reason: str
    ) -> None:
        """Send a NACK for a received handoff."""
        nack = Message(
            topic=f"{original_message.sender}.inbox",
            payload={"reason": reason},
            msg_type=MessageType.NACK,
            sender=self._agent_name,
            recipient=original_message.sender,
            correlation_id=original_message.msg_id,
        )
        await self._bus.publish(nack)

The key difference from toy implementations: ack_event.wait() blocks until the receiving agent explicitly calls acknowledge() or reject(). No fake sleeps, no polling. The asyncio.Event is the right primitive — it's zero-cost when waiting and instant when signaled.

When multiple agents propose conflicting actions — two agents both want to set a refund amount, or disagree on whether to escalate — you need a resolution strategy. The three strategies here are majority vote, priority-based, and LLM arbitration.

conflict_resolver.py
python
import asyncio
import json
import logging
from dataclasses import dataclass
from enum import Enum
from typing import Any

from openai import AsyncOpenAI

logger = logging.getLogger(__name__)


class ConflictStrategy(Enum):
    MAJORITY_VOTE = "majority_vote"
    PRIORITY = "priority"
    LLM_ARBITRATION = "llm_arbitration"


@dataclass
class Proposal:
    agent_name: str
    value: Any
    priority: int = 0
    reasoning: str = ""


@dataclass
class Resolution:
    winning_value: Any
    strategy_used: ConflictStrategy
    votes: dict[str, int] | None = None
    llm_reasoning: str | None = None


class ConflictResolver:
    def __init__(
        self,
        strategy: ConflictStrategy = ConflictStrategy.MAJORITY_VOTE,
        llm_client: AsyncOpenAI | None = None,
        llm_model: str = "gpt-4o",
    ):
        self._strategy = strategy
        self._client = llm_client
        self._model = llm_model

    async def resolve(self, proposals: list[Proposal]) -> Resolution:
        if not proposals:
            raise ValueError("No proposals to resolve")
        if len(proposals) == 1:
            return Resolution(
                winning_value=proposals[0].value,
                strategy_used=self._strategy,
            )

        match self._strategy:
            case ConflictStrategy.MAJORITY_VOTE:
                return self._majority_vote(proposals)
            case ConflictStrategy.PRIORITY:
                return self._priority_based(proposals)
            case ConflictStrategy.LLM_ARBITRATION:
                return await self._llm_arbitrate(proposals)

    def _majority_vote(self, proposals: list[Proposal]) -> Resolution:
        """Resolve by counting votes. Uses structural equality, not string coercion."""
        # Group by value using repr for hashability while preserving types
        vote_groups: dict[str, tuple[Any, int]] = {}
        for p in proposals:
            key = repr(p.value)  # Preserves type: repr(42) != repr("42")
            if key in vote_groups:
                existing_val, count = vote_groups[key]
                vote_groups[key] = (existing_val, count + 1)
            else:
                vote_groups[key] = (p.value, 1)

        # Find the value with most votes
        winner_key = max(vote_groups, key=lambda k: vote_groups[k][1])
        winner_value, _ = vote_groups[winner_key]

        votes = {k: v[1] for k, v in vote_groups.items()}
        return Resolution(
            winning_value=winner_value,
            strategy_used=ConflictStrategy.MAJORITY_VOTE,
            votes=votes,
        )

    def _priority_based(self, proposals: list[Proposal]) -> Resolution:
        """Highest priority proposal wins."""
        winner = max(proposals, key=lambda p: p.priority)
        return Resolution(
            winning_value=winner.value,
            strategy_used=ConflictStrategy.PRIORITY,
        )

    async def _llm_arbitrate(self, proposals: list[Proposal]) -> Resolution:
        """Use an LLM to evaluate proposals and pick a winner with reasoning."""
        if not self._client:
            raise RuntimeError(
                "LLM arbitration requires an AsyncOpenAI client"
            )

        proposals_text = "\n".join(
            f"- Agent '{p.agent_name}' proposes: {json.dumps(p.value)}\n"
            f"  Reasoning: {p.reasoning or 'none provided'}"
            for p in proposals
        )

        response = await self._client.chat.completions.create(
            model=self._model,
            messages=[
                {
                    "role": "system",
                    "content": (
                        "You are an arbitrator resolving conflicts between AI agents. "
                        "Evaluate each proposal on its merits and reasoning. "
                        "Respond with JSON: {\"chosen_index\": <0-based index>, "
                        "\"reasoning\": \"<your analysis>\"}"
                    ),
                },
                {
                    "role": "user",
                    "content": f"Resolve this conflict:\n{proposals_text}",
                },
            ],
            response_format={"type": "json_object"},
            temperature=0.1,
        )

        result = json.loads(response.choices[0].message.content)
        chosen_idx = int(result["chosen_index"])
        chosen = proposals[min(chosen_idx, len(proposals) - 1)]

        return Resolution(
            winning_value=chosen.value,
            strategy_used=ConflictStrategy.LLM_ARBITRATION,
            llm_reasoning=result.get("reasoning", ""),
        )

Why repr() instead of str()?

The majority_vote method uses repr() as the grouping key, not str(). This preserves type information: repr(42) is '42' while repr('42') is "'42'". Using str() would incorrectly group the integer 42 and string "42" as the same vote.

Here's the full message flow for a customer support refund. The TriageAgent receives the request, hands off to BillingAgent for amount calculation, and BillingAgent escalates to ApprovalAgent if the amount exceeds a threshold.

Here's how agents actually use the bus and handoff protocol. Each agent subscribes to its inbox, processes messages, and sends responses back through the bus. This is the complete wiring — not pseudocode.

agents.py
python
import asyncio
import logging
from dataclasses import dataclass
from typing import Any

from messages import Message, MessageType, Priority
from message_bus import MessageBus
from handoff import HandoffProtocol, HandoffStatus
from conflict_resolver import ConflictResolver, ConflictStrategy, Proposal

logger = logging.getLogger(__name__)


class BillingAgent:
    """Processes refund calculations and escalates high-value refunds."""

    ESCALATION_THRESHOLD = 100.00

    def __init__(self, bus: MessageBus):
        self._bus = bus
        self._handoff = HandoffProtocol(bus, "billing")
        bus.subscribe("billing.inbox", "billing", self.handle_message)

    async def handle_message(self, message: Message) -> None:
        if message.msg_type == MessageType.HANDOFF:
            # Acknowledge receipt immediately
            await self._handoff.acknowledge(message)

            # Process the refund
            result = await self._calculate_refund(message.payload)

            if result["amount"] > self.ESCALATION_THRESHOLD:
                # Escalate to approval agent
                status = await self._handoff.handoff(
                    target_agent="approval",
                    task_payload={
                        "task_id": message.payload.get("task_id"),
                        "amount": result["amount"],
                        "reason": message.payload.get("reason"),
                        "customer_id": message.payload.get("customer_id"),
                    },
                    priority=Priority.HIGH,
                )
                if status == HandoffStatus.ACKNOWLEDGED:
                    # Wait for approval decision via response message
                    logger.info("Escalation acknowledged, awaiting decision")
                    return
                else:
                    result["status"] = "escalation_failed"

            # Send result back to triage
            reply = message.reply(result)
            await self._bus.publish(reply)

        elif message.msg_type == MessageType.RESPONSE:
            # Decision from approval agent
            logger.info(f"Approval decision: {message.payload}")

    async def _calculate_refund(self, payload: dict[str, Any]) -> dict:
        order_amount = payload.get("order_amount", 0)
        reason = payload.get("reason", "")

        # Refund logic: full for defective, 80% for dissatisfaction
        if reason == "defective":
            amount = order_amount
        elif reason == "dissatisfied":
            amount = round(order_amount * 0.8, 2)
        else:
            amount = round(order_amount * 0.5, 2)

        return {"amount": amount, "reason": reason, "status": "calculated"}


class TriageAgent:
    """Routes incoming customer requests to the appropriate agent."""

    def __init__(self, bus: MessageBus):
        self._bus = bus
        self._handoff = HandoffProtocol(bus, "triage")
        bus.subscribe("triage.inbox", "triage", self.handle_message)

    async def process_refund_request(
        self, customer_id: str, order_amount: float, reason: str
    ) -> HandoffStatus:
        """Entry point: hand off a refund to the billing agent."""
        status = await self._handoff.handoff(
            target_agent="billing",
            task_payload={
                "task_id": f"refund-{customer_id}",
                "customer_id": customer_id,
                "order_amount": order_amount,
                "reason": reason,
            },
        )
        return status

    async def handle_message(self, message: Message) -> None:
        if message.msg_type == MessageType.RESPONSE:
            logger.info(
                f"Refund result for {message.payload}: {message.payload}"
            )


class ApprovalAgent:
    """Reviews high-value refunds and approves or rejects them."""

    def __init__(self, bus: MessageBus, auto_approve_limit: float = 500.0):
        self._bus = bus
        self._handoff = HandoffProtocol(bus, "approval")
        self._auto_approve_limit = auto_approve_limit
        bus.subscribe("approval.inbox", "approval", self.handle_message)

    async def handle_message(self, message: Message) -> None:
        if message.msg_type == MessageType.HANDOFF:
            await self._handoff.acknowledge(message)

            amount = message.payload.get("amount", 0)
            decision = "approved" if amount <= self._auto_approve_limit else "needs_review"

            reply = message.reply({
                "decision": decision,
                "amount": amount,
                "reviewer": "approval",
            })
            await self._bus.publish(reply)
run_refund.py
python
import asyncio
from message_bus import MessageBus
from agents import TriageAgent, BillingAgent, ApprovalAgent


async def main():
    bus = MessageBus(max_queue_size=100, max_retries=3)

    triage = TriageAgent(bus)
    billing = BillingAgent(bus)
    approval = ApprovalAgent(bus)

    await bus.start()

    # Process a high-value refund (will trigger escalation)
    status = await triage.process_refund_request(
        customer_id="cust-42",
        order_amount=250.00,
        reason="defective",
    )
    print(f"Handoff status: {status}")

    # Let messages propagate
    await asyncio.sleep(2)

    # Check dead letters
    for dl in bus.dead_letters:
        print(f"Dead letter: {dl.message.msg_id} -> {dl.error}")

    print(f"Billing queue depth: {bus.queue_depth('billing.inbox')}")
    await bus.stop()


asyncio.run(main())

When two agents disagree on a refund amount, the conflict resolver picks a winner. Here's the LLM arbitration path — the resolver sends both proposals to an LLM with their reasoning and gets back a structured decision.

resolve_conflict.py
python
import asyncio
from openai import AsyncOpenAI
from conflict_resolver import ConflictResolver, ConflictStrategy, Proposal


async def resolve_refund_dispute():
    client = AsyncOpenAI()

    resolver = ConflictResolver(
        strategy=ConflictStrategy.LLM_ARBITRATION,
        llm_client=client,
    )

    proposals = [
        Proposal(
            agent_name="billing",
            value={"amount": 250.00, "type": "full_refund"},
            reasoning="Product was defective on arrival. Policy requires full refund.",
        ),
        Proposal(
            agent_name="retention",
            value={"amount": 200.00, "type": "partial_refund_with_credit"},
            reasoning="Customer has high lifetime value. Partial refund plus store credit retains relationship.",
        ),
    ]

    resolution = await resolver.resolve(proposals)
    print(f"Winner: {resolution.winning_value}")
    print(f"LLM reasoning: {resolution.llm_reasoning}")


asyncio.run(resolve_refund_dispute())

You'll notice this post doesn't include a SharedStateManager. That's deliberate — shared state coordination is a distinct problem with its own concurrency challenges, and it's covered thoroughly in the State Management for Agents post. The primitives here (message bus, handoff protocol, conflict resolver) compose with shared state but don't duplicate it.

Shared state integration

To combine these patterns: agents read/write shared state for coordination data (session context, accumulated results) and use the message bus for task routing and handoffs. See State Management for Agents for the SharedStateManager implementation.
  • Persistence: The in-memory asyncio.Queue loses messages on crash. For production, back the bus with Redis Streams, RabbitMQ, or Kafka. The subscriber interface stays the same.
  • Observability: Log every message publish, delivery, and ack with correlation IDs. This is your debugging lifeline when three agents are exchanging messages at speed.
  • Idempotency key storage: The in-memory set for idempotency keys grows unbounded. Use a TTL-based cache (Redis with SETEX) or periodically prune keys older than the max message TTL.
  • Dead letter processing: Don't just log dead letters — alert on them. A growing DLQ means your consumers are failing and messages are being lost.
  • LLM arbitration cost: Every conflict that goes to LLM arbitration costs an API call. Use it as a fallback after majority vote fails to reach consensus, not as the default strategy.

Agent communication breaks down into three concerns: routing messages between agents (the bus), guaranteeing task delivery (the handoff protocol), and resolving disagreements (the conflict resolver). Each primitive is independent and composable — you can use the bus without the handoff protocol, or the conflict resolver without either.

The critical implementation details that tutorials skip: backpressure via bounded queues, real ack tracking via asyncio.Event instead of sleep-based polling, type-preserving vote counting, and actual LLM calls for arbitration instead of stub methods. These details determine whether your multi-agent system works under load or only in demos.

Related Articles