Agent-to-Agent Communication: Async Messaging, Handoff Protocols, and Conflict Resolution
Production-grade communication primitives for multi-agent systems: async message buses with backpressure, handoff protocols with real acknowledgment tracking, and conflict resolution including LLM arbitration.
Agents that can't talk to each other aren't a system — they're a collection of independent programs. Communication is the connective tissue of multi-agent architectures: it determines whether your agents can coordinate on a refund workflow, negotiate conflicting decisions, or hand off tasks without dropping context.
This post builds three production-grade primitives from scratch: an async message bus with backpressure and dead letter queues, a handoff protocol with real acknowledgment tracking and exponential backoff, and a conflict resolver that includes actual LLM arbitration. We'll wire them together through a customer support refund workflow where a TriageAgent, BillingAgent, and ApprovalAgent coordinate end-to-end.
Prerequisites
Before building anything, understand the tradeoffs. Each pattern fits different coordination needs:
| Pattern | Coupling | Latency | Scalability | Best For |
|------------------|----------|---------|-------------|-----------------------------------|
| Direct Message | High | Low | Poor | 1:1 request/response, RPC-style |
| Pub/Sub | Low | Medium | Excellent | Event broadcasting, fan-out |
| Shared State | Medium | Low | Moderate | Coordination data, consensus |
| Event Bus | Low | Medium | Excellent | Decoupled workflows, audit trails |
| Request/Reply    | Medium   | Medium  | Moderate    | Task delegation with ack          |

Direct messaging is simple but creates tight coupling: every agent needs to know every other agent's address. Pub/sub decouples publishers from subscribers but loses request/reply semantics. Shared state (covered in the state management post) works well for coordination data but not for task handoffs. An event bus with topic-based routing hits the sweet spot for most multi-agent workflows.
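Here is what the request/reply point looks like in code. This is a minimal sketch that assumes a toy in-process bus. TinyBus and Requester are hypothetical names used only for illustration. The requester does three things:

- stamps each request with a fresh correlation ID;
- parks an asyncio.Future under that ID;
- completes the future when a reply carrying the same ID arrives on its inbox topic.

The correlation_id field in the envelope below exists for exactly this job.

```python
import asyncio
import uuid
from collections import defaultdict
from typing import Any, Awaitable, Callable

Handler = Callable[[dict[str, Any]], Awaitable[None]]


class TinyBus:
    """Hypothetical minimal pub/sub: every subscriber on a topic gets every message."""

    def __init__(self) -> None:
        self._subs: dict[str, list[Handler]] = defaultdict(list)

    def subscribe(self, topic: str, handler: Handler) -> None:
        self._subs[topic].append(handler)

    async def publish(self, topic: str, msg: dict[str, Any]) -> None:
        # The publisher never learns who (if anyone) is listening.
        for handler in list(self._subs[topic]):
            await handler(msg)


class Requester:
    """Recovers request/reply on top of pub/sub by matching correlation IDs."""

    def __init__(self, bus: TinyBus, name: str) -> None:
        self._bus = bus
        self._inbox = f"{name}.inbox"
        self._waiting: dict[str, asyncio.Future] = {}
        bus.subscribe(self._inbox, self._on_reply)

    async def _on_reply(self, msg: dict[str, Any]) -> None:
        # Only a reply carrying a correlation ID we issued completes a request.
        fut = self._waiting.pop(msg.get("correlation_id", ""), None)
        if fut is not None and not fut.done():
            fut.set_result(msg["payload"])

    async def request(
        self, topic: str, payload: dict[str, Any], timeout: float = 1.0
    ) -> Any:
        corr_id = str(uuid.uuid4())
        fut = asyncio.get_running_loop().create_future()
        self._waiting[corr_id] = fut
        try:
            await self._bus.publish(
                topic,
                {"correlation_id": corr_id, "reply_to": self._inbox, "payload": payload},
            )
            return await asyncio.wait_for(fut, timeout)
        finally:
            # Forget the request whether it succeeded or timed out.
            self._waiting.pop(corr_id, None)


async def demo() -> Any:
    bus = TinyBus()

    async def billing(msg: dict[str, Any]) -> None:
        refund = round(msg["payload"]["order_amount"] * 0.8, 2)
        await bus.publish(
            msg["reply_to"],
            {"correlation_id": msg["correlation_id"], "payload": {"refund": refund}},
        )

    bus.subscribe("billing.requests", billing)
    triage = Requester(bus, "triage")
    return await triage.request("billing.requests", {"order_amount": 100.0})


result = asyncio.run(demo())
print(result)  # {'refund': 80.0}
```

The publisher still doesn't know who handles billing.requests. Only the reply_to topic and the correlation ID tie the answer back to the caller. A reply that arrives after the timeout finds no waiting future, and this sketch silently drops it.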
Every message in the system needs a consistent envelope. This schema supports routing, tracing, and idempotency:
```python
# messages.py
from __future__ import annotations

import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
from typing import Any


class MessageType(Enum):
    TASK = "task"
    RESPONSE = "response"
    ACK = "ack"
    NACK = "nack"
    EVENT = "event"
    HANDOFF = "handoff"


class Priority(Enum):
    LOW = 0
    NORMAL = 1
    HIGH = 2
    CRITICAL = 3


@dataclass
class Message:
    topic: str
    payload: dict[str, Any]
    msg_type: MessageType = MessageType.TASK
    priority: Priority = Priority.NORMAL
    sender: str = ""
    recipient: str | None = None
    msg_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    correlation_id: str | None = None
    idempotency_key: str | None = None
    timestamp: str = field(
        default_factory=lambda: datetime.now(timezone.utc).isoformat()
    )
    ttl_seconds: float = 300.0
    attempt: int = 0

    def is_expired(self) -> bool:
        created = datetime.fromisoformat(self.timestamp)
        elapsed = (datetime.now(timezone.utc) - created).total_seconds()
        return elapsed > self.ttl_seconds

    def reply(self, payload: dict[str, Any]) -> Message:
        """Create a reply message preserving the correlation chain."""
        return Message(
            topic=f"{self.sender}.inbox",
            payload=payload,
            msg_type=MessageType.RESPONSE,
            sender=self.recipient or "",
            recipient=self.sender,
            correlation_id=self.correlation_id or self.msg_id,
        )
```

The idempotency_key lets the bus drop duplicate publishes when a sender retries. The correlation_id ties every reply in a chain back to the message that started it, across multiple hops. The reply() factory swaps sender and recipient and routes the reply to the original sender's inbox topic. The priority field travels with each message, but the bus below uses plain FIFO queues, so priority does not change delivery order.
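The bus below remembers every idempotency key in a plain set, and that set never shrinks. The production notes at the end come back to this problem.

Here is a minimal sketch of a TTL-bounded alternative. It assumes single-process use. IdempotencyCache and FakeClock are hypothetical helpers, and the 300-second TTL mirrors the default Message.ttl_seconds.

```python
import time
from typing import Callable


class IdempotencyCache:
    """Hypothetical TTL-bounded store of seen idempotency keys.

    Dicts keep insertion order and timestamps come from a non-decreasing
    clock, so the oldest entries are always at the front.
    """

    def __init__(self, ttl_seconds: float, clock: Callable[[], float] = time.monotonic):
        self._ttl = ttl_seconds
        self._clock = clock
        self._seen: dict[str, float] = {}  # key -> time first seen

    def _prune(self, now: float) -> None:
        # Drop expired keys from the front until one is still inside the TTL.
        while self._seen:
            oldest_key = next(iter(self._seen))
            if now - self._seen[oldest_key] < self._ttl:
                break
            del self._seen[oldest_key]

    def check_and_add(self, key: str) -> bool:
        """Return True and record the key if it is new; False if seen within the TTL."""
        now = self._clock()
        self._prune(now)
        if key in self._seen:
            return False
        self._seen[key] = now
        return True

    def __len__(self) -> int:
        return len(self._seen)


class FakeClock:
    """Manually advanced clock so the example is deterministic."""

    def __init__(self) -> None:
        self.now = 0.0

    def __call__(self) -> float:
        return self.now


clock = FakeClock()
cache = IdempotencyCache(ttl_seconds=300.0, clock=clock)
first = cache.check_and_add("triage:refund-cust-42")      # new key
duplicate = cache.check_and_add("triage:refund-cust-42")  # same key, within TTL
clock.now = 301.0
after_ttl = cache.check_and_add("triage:refund-cust-42")  # old entry pruned first
print(first, duplicate, after_ttl, len(cache))  # True False True 1
```

The cache forgets keys older than the TTL, so a retry that arrives after that window counts as new. Pick the TTL to cover your longest retry window. Across processes, the check-and-record step has to be atomic in a shared store.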
A real message bus needs three things most tutorials skip: backpressure so fast producers don't overwhelm slow consumers, a dead letter queue for messages that repeatedly fail processing, and proper async subscriber management.
```python
# message_bus.py
from __future__ import annotations

import asyncio
import logging
from collections import defaultdict
from dataclasses import dataclass
from typing import Any, Callable, Coroutine

from messages import Message

logger = logging.getLogger(__name__)

SubscriberFn = Callable[[Message], Coroutine[Any, Any, None]]


@dataclass
class DeadLetter:
    message: Message
    error: str
    failed_subscriber: str
    attempts: int


class MessageBus:
    """Async message bus with per-topic queues, backpressure, and DLQ."""

    def __init__(self, max_queue_size: int = 1000, max_retries: int = 3):
        self._subscribers: dict[str, list[tuple[str, SubscriberFn]]] = defaultdict(list)
        self._queues: dict[str, asyncio.Queue[Message]] = {}
        self._max_queue_size = max_queue_size
        self._max_retries = max_retries
        self._dead_letters: list[DeadLetter] = []
        self._processors: dict[str, asyncio.Task] = {}
        self._deliveries: set[asyncio.Task] = set()  # in-flight handler calls
        self._running = False
        self._seen_idempotency_keys: set[str] = set()

    def _ensure_queue(self, topic: str) -> asyncio.Queue[Message]:
        """Create the topic's bounded queue (and processor, if running) on first use."""
        if topic not in self._queues:
            self._queues[topic] = asyncio.Queue(maxsize=self._max_queue_size)
            if self._running:
                # Topics first seen after start() still need a processor.
                self._processors[topic] = asyncio.create_task(
                    self._process_topic(topic)
                )
        return self._queues[topic]

    def subscribe(self, topic: str, name: str, handler: SubscriberFn) -> None:
        """Register a named subscriber for a topic."""
        self._subscribers[topic].append((name, handler))
        self._ensure_queue(topic)

    async def publish(self, message: Message) -> bool:
        """Publish a message. Returns False if the queue is full (backpressure)."""
        key = message.idempotency_key
        if key and key in self._seen_idempotency_keys:
            logger.info(f"Duplicate idempotency key: {key}")
            return True
        try:
            self._ensure_queue(message.topic).put_nowait(message)
        except asyncio.QueueFull:
            logger.warning(
                f"Backpressure on topic '{message.topic}': queue full "
                f"({self._max_queue_size} messages)"
            )
            return False
        # Record the key only once the message is actually enqueued, so a
        # retry after backpressure is not mistaken for a duplicate.
        if key:
            self._seen_idempotency_keys.add(key)
        return True

    async def _process_topic(self, topic: str) -> None:
        """Pull messages for one topic and start a delivery task per subscriber."""
        queue = self._queues[topic]
        while self._running:
            try:
                message = await asyncio.wait_for(queue.get(), timeout=1.0)
            except asyncio.TimeoutError:
                continue
            if message.is_expired():
                logger.info(f"Dropping expired message {message.msg_id}")
                self._dead_letters.append(DeadLetter(message, "TTL expired", "", 0))
                continue
            for sub_name, handler in self._subscribers.get(topic, []):
                # Filter by recipient if specified
                if message.recipient and message.recipient != sub_name:
                    continue
                # Deliver concurrently: a handler that awaits a reply arriving on
                # this same topic must not block the loop that delivers the reply.
                task = asyncio.create_task(self._deliver(message, sub_name, handler))
                self._deliveries.add(task)
                task.add_done_callback(self._deliveries.discard)

    async def _deliver(
        self, message: Message, sub_name: str, handler: SubscriberFn
    ) -> None:
        """Run one delivery; dead-letter it if every retry fails."""
        if not await self._dispatch_with_retry(message, sub_name, handler):
            self._dead_letters.append(
                DeadLetter(
                    message,
                    f"Failed after {self._max_retries} attempts",
                    sub_name,
                    self._max_retries,
                )
            )

    async def _dispatch_with_retry(
        self, message: Message, sub_name: str, handler: SubscriberFn
    ) -> bool:
        """Dispatch to a subscriber with retry and exponential backoff."""
        for attempt in range(self._max_retries):
            try:
                message.attempt = attempt + 1
                await handler(message)
                return True
            except Exception as e:
                wait = min(2 ** attempt * 0.1, 5.0)
                logger.error(
                    f"Subscriber '{sub_name}' failed on {message.msg_id} "
                    f"(attempt {attempt + 1}): {e}"
                )
                if attempt < self._max_retries - 1:
                    await asyncio.sleep(wait)
        return False

    async def start(self) -> None:
        """Start processing all topic queues."""
        self._running = True
        for topic in self._queues:
            if topic not in self._processors:
                self._processors[topic] = asyncio.create_task(
                    self._process_topic(topic)
                )

    async def stop(self) -> None:
        """Stop all processors and cancel in-flight deliveries."""
        self._running = False
        tasks = [*self._processors.values(), *self._deliveries]
        for task in tasks:
            task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)
        self._processors.clear()
        self._deliveries.clear()

    @property
    def dead_letters(self) -> list[DeadLetter]:
        return list(self._dead_letters)

    def queue_depth(self, topic: str) -> int:
        if topic in self._queues:
            return self._queues[topic].qsize()
        return 0
```

Each delivery runs in its own task. The price is strict in-order processing per topic. The gain is that a handler awaiting an ack on its own inbox, as a handoff does, cannot block the loop that has to deliver that ack.

Backpressure matters
Without maxsize on the queue, a runaway producer (for example, an agent stuck in a retry loop generating messages) will consume unbounded memory. The publish() method returns False when the queue is full, letting the caller decide whether to drop, buffer, or wait.

Task handoff between agents requires confirmed delivery. The HandoffProtocol does three things:

- tracks outgoing handoffs;
- waits for real acknowledgments using asyncio.Event;
- retries when an ack doesn't arrive, waiting 1.5 times longer on each attempt and backing off exponentially when the bus reports backpressure.
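Before the full class, it helps to know how long a sender can block. The protocol below waits ack_timeout * 1.5 ** attempt on each attempt. The hypothetical helper handoff_wait_budget computes that schedule for the protocol's defaults of a 10-second timeout and 3 retries. It assumes every publish succeeds and no ack ever arrives.

```python
def handoff_wait_budget(
    ack_timeout: float = 10.0, max_retries: int = 3, growth: float = 1.5
) -> tuple[list[float], float]:
    """Per-attempt ack timeouts and the worst-case total wait before TIMED_OUT."""
    # Same formula as the protocol: ack_timeout * (growth ** attempt)
    timeouts = [ack_timeout * growth ** attempt for attempt in range(max_retries)]
    return timeouts, sum(timeouts)


timeouts, worst_case = handoff_wait_budget()
print(timeouts, worst_case)  # [10.0, 15.0, 22.5] 47.5
```

With the defaults, a caller that awaits handoff() directly can block for up to 47.5 seconds when the receiver is down. TriageAgent.process_refund_request is one such caller. Under backpressure, the protocol sleeps 0.5, 1 and 2 seconds instead of waiting for an ack. For interactive flows, a smaller ack_timeout is probably the better trade.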
```python
# handoff.py
from __future__ import annotations

import asyncio
import logging
import uuid
from dataclasses import dataclass, field
from enum import Enum
from typing import Any

from message_bus import MessageBus
from messages import Message, MessageType, Priority

logger = logging.getLogger(__name__)


class HandoffStatus(Enum):
    PENDING = "pending"
    ACKNOWLEDGED = "acknowledged"
    REJECTED = "rejected"
    TIMED_OUT = "timed_out"


@dataclass
class PendingHandoff:
    message: Message
    ack_event: asyncio.Event = field(default_factory=asyncio.Event)
    status: HandoffStatus = HandoffStatus.PENDING
    response_payload: dict[str, Any] = field(default_factory=dict)


class HandoffProtocol:
    """Reliable task handoff with ack tracking and growing retry timeouts."""

    def __init__(
        self,
        bus: MessageBus,
        agent_name: str,
        ack_timeout: float = 10.0,
        max_retries: int = 3,
    ):
        self._bus = bus
        self._agent_name = agent_name
        self._ack_timeout = ack_timeout
        self._max_retries = max_retries
        self._pending: dict[str, PendingHandoff] = {}
        # Subscribe to our own inbox for ACK/NACK messages
        bus.subscribe(f"{agent_name}.inbox", agent_name, self._handle_ack)

    async def _handle_ack(self, message: Message) -> None:
        """Resolve a pending handoff when its ACK or NACK arrives."""
        # The inbox also carries HANDOFF and RESPONSE messages; only an ACK or
        # NACK may release a waiting sender.
        if message.msg_type not in (MessageType.ACK, MessageType.NACK):
            return
        pending = self._pending.get(message.correlation_id or "")
        if pending is None:
            return
        if message.msg_type == MessageType.ACK:
            pending.status = HandoffStatus.ACKNOWLEDGED
        else:
            pending.status = HandoffStatus.REJECTED
        pending.response_payload = message.payload
        pending.ack_event.set()

    async def handoff(
        self,
        target_agent: str,
        task_payload: dict[str, Any],
        priority: Priority = Priority.NORMAL,
    ) -> HandoffStatus:
        """Send a task to another agent and wait for acknowledgment."""
        # Generate the ID up front so it can seed the idempotency key.
        msg_id = str(uuid.uuid4())
        task_id = task_payload.get("task_id") or msg_id
        message = Message(
            topic=f"{target_agent}.inbox",
            payload=task_payload,
            msg_type=MessageType.HANDOFF,
            priority=priority,
            sender=self._agent_name,
            recipient=target_agent,
            msg_id=msg_id,
            idempotency_key=f"{self._agent_name}:{task_id}",
        )
        try:
            for attempt in range(self._max_retries):
                pending = PendingHandoff(message=message)
                self._pending[msg_id] = pending
                # If an earlier attempt was already enqueued, the bus drops this
                # publish as a duplicate, so the retry simply waits longer.
                if not await self._bus.publish(message):
                    logger.warning(f"Backpressure: cannot deliver to {target_agent}")
                    await asyncio.sleep(2 ** attempt * 0.5)
                    continue
                try:
                    await asyncio.wait_for(
                        pending.ack_event.wait(),
                        timeout=self._ack_timeout * (1.5 ** attempt),
                    )
                    return pending.status
                except asyncio.TimeoutError:
                    logger.warning(
                        f"Handoff to {target_agent} timed out "
                        f"(attempt {attempt + 1}/{self._max_retries})"
                    )
            return HandoffStatus.TIMED_OUT
        finally:
            # Stop tracking this handoff whatever the outcome, so entries don't leak.
            self._pending.pop(msg_id, None)

    async def acknowledge(
        self, original_message: Message, payload: dict[str, Any] | None = None
    ) -> None:
        """Send an ACK for a received handoff."""
        ack = Message(
            topic=f"{original_message.sender}.inbox",
            payload=payload or {},
            msg_type=MessageType.ACK,
            sender=self._agent_name,
            recipient=original_message.sender,
            correlation_id=original_message.msg_id,
        )
        await self._bus.publish(ack)

    async def reject(self, original_message: Message, reason: str) -> None:
        """Send a NACK for a received handoff."""
        nack = Message(
            topic=f"{original_message.sender}.inbox",
            payload={"reason": reason},
            msg_type=MessageType.NACK,
            sender=self._agent_name,
            recipient=original_message.sender,
            correlation_id=original_message.msg_id,
        )
        await self._bus.publish(nack)
```

The key difference from toy implementations is that ack_event.wait() suspends the sender until the receiving agent explicitly calls acknowledge() or reject(). There are no fake sleeps and no polling. asyncio.Event is the right primitive here: a waiting coroutine uses no CPU, and it resumes on the next pass of the event loop after set() is called.
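Here is the same primitive in isolation. This minimal sketch shows two things: the waiter resumes as soon as set() is called, well before its timeout, and an event that nobody sets ends in a timeout instead of hanging. wait_for_ack and the simulated receiver are illustrative names, not part of HandoffProtocol.

```python
import asyncio
import time


async def wait_for_ack(ack_event: asyncio.Event, timeout: float) -> bool:
    """Suspend until ack_event is set; return False if the timeout expires first."""
    try:
        await asyncio.wait_for(ack_event.wait(), timeout=timeout)
        return True
    except asyncio.TimeoutError:
        return False


async def demo() -> tuple[bool, float, bool]:
    acked = asyncio.Event()

    async def receiver() -> None:
        # Simulated receiving agent: 50 ms of work, then acknowledge.
        await asyncio.sleep(0.05)
        acked.set()

    start = time.perf_counter()
    receiver_task = asyncio.create_task(receiver())
    got_ack = await wait_for_ack(acked, timeout=5.0)  # resumes right after set()
    elapsed = time.perf_counter() - start
    await receiver_task

    # Nobody ever sets this event, so the wait ends at the timeout.
    timed_out = not await wait_for_ack(asyncio.Event(), timeout=0.05)
    return got_ack, elapsed, timed_out


got_ack, elapsed, timed_out = asyncio.run(demo())
print(got_ack, elapsed < 1.0, timed_out)  # True True True
```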
When multiple agents propose conflicting actions — two agents both want to set a refund amount, or disagree on whether to escalate — you need a resolution strategy. The three strategies here are majority vote, priority-based, and LLM arbitration.
```python
# conflict_resolver.py
from __future__ import annotations

import json
import logging
from dataclasses import dataclass
from enum import Enum
from typing import Any

from openai import AsyncOpenAI

logger = logging.getLogger(__name__)


class ConflictStrategy(Enum):
    MAJORITY_VOTE = "majority_vote"
    PRIORITY = "priority"
    LLM_ARBITRATION = "llm_arbitration"


@dataclass
class Proposal:
    agent_name: str
    value: Any
    priority: int = 0
    reasoning: str = ""


@dataclass
class Resolution:
    winning_value: Any
    strategy_used: ConflictStrategy
    votes: dict[str, int] | None = None
    llm_reasoning: str | None = None


class ConflictResolver:
    def __init__(
        self,
        strategy: ConflictStrategy = ConflictStrategy.MAJORITY_VOTE,
        llm_client: AsyncOpenAI | None = None,
        llm_model: str = "gpt-4o",
    ):
        self._strategy = strategy
        self._client = llm_client
        self._model = llm_model

    async def resolve(self, proposals: list[Proposal]) -> Resolution:
        if not proposals:
            raise ValueError("No proposals to resolve")
        if len(proposals) == 1:
            return Resolution(
                winning_value=proposals[0].value,
                strategy_used=self._strategy,
            )
        match self._strategy:
            case ConflictStrategy.MAJORITY_VOTE:
                return self._majority_vote(proposals)
            case ConflictStrategy.PRIORITY:
                return self._priority_based(proposals)
            case ConflictStrategy.LLM_ARBITRATION:
                return await self._llm_arbitrate(proposals)
            case _:
                raise ValueError(f"Unknown strategy: {self._strategy}")

    def _majority_vote(self, proposals: list[Proposal]) -> Resolution:
        """Count votes, grouping values by repr() so 42 and "42" stay distinct."""
        # repr() key -> (first value seen, vote count)
        vote_groups: dict[str, tuple[Any, int]] = {}
        for p in proposals:
            key = repr(p.value)  # repr(42) != repr("42")
            if key in vote_groups:
                existing_val, count = vote_groups[key]
                vote_groups[key] = (existing_val, count + 1)
            else:
                vote_groups[key] = (p.value, 1)
        # Most votes wins; on a tie, max() keeps the value proposed first
        winner_key = max(vote_groups, key=lambda k: vote_groups[k][1])
        winner_value, _ = vote_groups[winner_key]
        votes = {k: v[1] for k, v in vote_groups.items()}
        return Resolution(
            winning_value=winner_value,
            strategy_used=ConflictStrategy.MAJORITY_VOTE,
            votes=votes,
        )

    def _priority_based(self, proposals: list[Proposal]) -> Resolution:
        """Highest priority proposal wins."""
        winner = max(proposals, key=lambda p: p.priority)
        return Resolution(
            winning_value=winner.value,
            strategy_used=ConflictStrategy.PRIORITY,
        )

    async def _llm_arbitrate(self, proposals: list[Proposal]) -> Resolution:
        """Use an LLM to evaluate proposals and pick a winner with reasoning."""
        if not self._client:
            raise RuntimeError("LLM arbitration requires an AsyncOpenAI client")
        # Number each proposal so chosen_index has something to refer to
        proposals_text = "\n".join(
            f"[{i}] Agent '{p.agent_name}' proposes: {json.dumps(p.value, default=str)}\n"
            f"    Reasoning: {p.reasoning or 'none provided'}"
            for i, p in enumerate(proposals)
        )
        response = await self._client.chat.completions.create(
            model=self._model,
            messages=[
                {
                    "role": "system",
                    "content": (
                        "You are an arbitrator resolving conflicts between AI agents. "
                        "Evaluate each proposal on its merits and reasoning. "
                        "Respond with JSON: {\"chosen_index\": <index shown in brackets>, "
                        "\"reasoning\": \"<your analysis>\"}"
                    ),
                },
                {
                    "role": "user",
                    "content": f"Resolve this conflict:\n{proposals_text}",
                },
            ],
            response_format={"type": "json_object"},
            temperature=0.1,
        )
        content = response.choices[0].message.content
        if not content:
            raise RuntimeError("Arbitrator returned an empty response")
        result = json.loads(content)
        chosen_idx = int(result["chosen_index"])
        # Reject out-of-range (including negative) indexes instead of guessing
        if not 0 <= chosen_idx < len(proposals):
            raise ValueError(f"Arbitrator chose invalid index {chosen_idx}")
        chosen = proposals[chosen_idx]
        return Resolution(
            winning_value=chosen.value,
            strategy_used=ConflictStrategy.LLM_ARBITRATION,
            llm_reasoning=result.get("reasoning", ""),
        )
```

Why repr() instead of str()?
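The majority vote uses repr() as the grouping key, not str(). This preserves type information: repr(42) is '42' while repr('42') is "'42'". Using str() would incorrectly group the integer 42 and the string "42" as the same vote. The tradeoff is that values that compare equal can still have different reprs. Examples are 1 and 1.0, or two dicts holding the same items inserted in a different order. If agents might produce equal values in different forms, normalize proposal values before voting.

The refund workflow runs as follows: the TriageAgent receives the request and hands off to the BillingAgent for the amount calculation, and the BillingAgent escalates to the ApprovalAgent if the amount exceeds a threshold.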
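Before wiring real agents, it can help to check the routing rules on their own. The hypothetical trace_refund helper below restates the rules the agents implement as a pure function:

- the 100/80/50 percent refund split;
- BillingAgent.ESCALATION_THRESHOLD of 100.00;
- ApprovalAgent's default auto_approve_limit of 500.0.

It returns the agents a request visits, the refund amount and the outcome. The first case is the same request main() sends.

```python
def trace_refund(
    order_amount: float,
    reason: str,
    escalation_threshold: float = 100.0,
    auto_approve_limit: float = 500.0,
) -> tuple[list[str], float, str]:
    """Hypothetical helper: which agents a refund visits, the amount, and the outcome."""
    # Same refund rules as BillingAgent._calculate_refund
    if reason == "defective":
        amount = order_amount
    elif reason == "dissatisfied":
        amount = round(order_amount * 0.8, 2)
    else:
        amount = round(order_amount * 0.5, 2)

    hops = ["triage", "billing"]
    if amount <= escalation_threshold:
        return hops, amount, "calculated"
    # Above the threshold, billing escalates; approval auto-approves up to its limit
    hops.append("approval")
    outcome = "approved" if amount <= auto_approve_limit else "needs_review"
    return hops, amount, outcome


for args in [(250.00, "defective"), (90.00, "dissatisfied"), (1200.00, "other")]:
    print(args, "->", trace_refund(*args))
```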
Here's how agents actually use the bus and handoff protocol. Each agent subscribes to its inbox, processes messages, and sends responses back through the bus. This is the complete wiring — not pseudocode.
```python
# agents.py
import logging
from typing import Any

from handoff import HandoffProtocol, HandoffStatus
from message_bus import MessageBus
from messages import Message, MessageType, Priority

logger = logging.getLogger(__name__)


class BillingAgent:
    """Processes refund calculations and escalates high-value refunds."""

    ESCALATION_THRESHOLD = 100.00

    def __init__(self, bus: MessageBus):
        self._bus = bus
        self._handoff = HandoffProtocol(bus, "billing")
        # task_id -> original request from triage, held until approval decides
        self._escalations: dict[Any, Message] = {}
        bus.subscribe("billing.inbox", "billing", self.handle_message)

    async def handle_message(self, message: Message) -> None:
        if message.msg_type == MessageType.HANDOFF:
            # Acknowledge receipt immediately
            await self._handoff.acknowledge(message)
            # Process the refund
            task_id = message.payload.get("task_id")
            result = await self._calculate_refund(message.payload)
            result["task_id"] = task_id
            if result["amount"] > self.ESCALATION_THRESHOLD:
                # Register first: the decision can arrive right after approval acks
                self._escalations[task_id] = message
                status = await self._handoff.handoff(
                    target_agent="approval",
                    task_payload={
                        "task_id": task_id,
                        "amount": result["amount"],
                        "reason": message.payload.get("reason"),
                        "customer_id": message.payload.get("customer_id"),
                    },
                    priority=Priority.HIGH,
                )
                if status == HandoffStatus.ACKNOWLEDGED:
                    # The decision arrives later as a RESPONSE (handled below)
                    logger.info("Escalation acknowledged, awaiting decision")
                    return
                self._escalations.pop(task_id, None)
                result["status"] = "escalation_failed"
            # Send result back to triage
            await self._bus.publish(message.reply(result))
        elif message.msg_type == MessageType.RESPONSE:
            # Decision from approval agent: forward it to the original requester
            logger.info(f"Approval decision: {message.payload}")
            original = self._escalations.pop(message.payload.get("task_id"), None)
            if original is not None:
                await self._bus.publish(original.reply(message.payload))

    async def _calculate_refund(self, payload: dict[str, Any]) -> dict:
        order_amount = payload.get("order_amount", 0)
        reason = payload.get("reason", "")
        # Refund logic: full for defective, 80% for dissatisfied, 50% otherwise
        if reason == "defective":
            amount = order_amount
        elif reason == "dissatisfied":
            amount = round(order_amount * 0.8, 2)
        else:
            amount = round(order_amount * 0.5, 2)
        return {"amount": amount, "reason": reason, "status": "calculated"}


class TriageAgent:
    """Routes incoming customer requests to the appropriate agent."""

    def __init__(self, bus: MessageBus):
        self._bus = bus
        self._handoff = HandoffProtocol(bus, "triage")
        bus.subscribe("triage.inbox", "triage", self.handle_message)

    async def process_refund_request(
        self, customer_id: str, order_amount: float, reason: str
    ) -> HandoffStatus:
        """Entry point: hand off a refund to the billing agent."""
        return await self._handoff.handoff(
            target_agent="billing",
            task_payload={
                "task_id": f"refund-{customer_id}",
                "customer_id": customer_id,
                "order_amount": order_amount,
                "reason": reason,
            },
        )

    async def handle_message(self, message: Message) -> None:
        if message.msg_type == MessageType.RESPONSE:
            logger.info(
                f"Refund result for {message.payload.get('task_id')}: {message.payload}"
            )


class ApprovalAgent:
    """Reviews high-value refunds and approves or rejects them."""

    def __init__(self, bus: MessageBus, auto_approve_limit: float = 500.0):
        self._bus = bus
        self._handoff = HandoffProtocol(bus, "approval")
        self._auto_approve_limit = auto_approve_limit
        bus.subscribe("approval.inbox", "approval", self.handle_message)

    async def handle_message(self, message: Message) -> None:
        if message.msg_type == MessageType.HANDOFF:
            await self._handoff.acknowledge(message)
            amount = message.payload.get("amount", 0)
            decision = "approved" if amount <= self._auto_approve_limit else "needs_review"
            reply = message.reply({
                "task_id": message.payload.get("task_id"),  # lets billing match it
                "decision": decision,
                "amount": amount,
                "reviewer": "approval",
            })
            await self._bus.publish(reply)
```

```python
import asyncio
import logging

from agents import ApprovalAgent, BillingAgent, TriageAgent
from message_bus import MessageBus


async def main():
    bus = MessageBus(max_queue_size=100, max_retries=3)
    triage = TriageAgent(bus)
    billing = BillingAgent(bus)
    approval = ApprovalAgent(bus)
    await bus.start()
    # Process a high-value refund (will trigger escalation)
    status = await triage.process_refund_request(
        customer_id="cust-42",
        order_amount=250.00,
        reason="defective",
    )
    print(f"Handoff status: {status}")
    # Let the escalation and the forwarded decision propagate
    await asyncio.sleep(2)
    # Check dead letters
    for dl in bus.dead_letters:
        print(f"Dead letter: {dl.message.msg_id} -> {dl.error}")
    print(f"Billing queue depth: {bus.queue_depth('billing.inbox')}")
    await bus.stop()


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)  # show the agents' log output
    asyncio.run(main())
```

When two agents disagree on a refund amount, the conflict resolver picks a winner. Here's the LLM arbitration path: the resolver sends both proposals to an LLM along with their reasoning and gets back a structured decision.
```python
import asyncio

from openai import AsyncOpenAI

from conflict_resolver import ConflictResolver, ConflictStrategy, Proposal


async def resolve_refund_dispute():
    client = AsyncOpenAI()  # reads OPENAI_API_KEY from the environment
    resolver = ConflictResolver(
        strategy=ConflictStrategy.LLM_ARBITRATION,
        llm_client=client,
    )
    proposals = [
        Proposal(
            agent_name="billing",
            value={"amount": 250.00, "type": "full_refund"},
            reasoning="Product was defective on arrival. Policy requires full refund.",
        ),
        Proposal(
            agent_name="retention",
            value={"amount": 200.00, "type": "partial_refund_with_credit"},
            reasoning="Customer has high lifetime value. Partial refund plus store credit retains relationship.",
        ),
    ]
    resolution = await resolver.resolve(proposals)
    print(f"Winner: {resolution.winning_value}")
    print(f"LLM reasoning: {resolution.llm_reasoning}")


if __name__ == "__main__":
    asyncio.run(resolve_refund_dispute())
```

Shared state integration

You'll notice this post doesn't include a SharedStateManager. That's deliberate: shared state coordination is a distinct problem with its own concurrency challenges, and the State Management for Agents post covers it thoroughly. The primitives here (message bus, handoff protocol, conflict resolver) compose with shared state but don't duplicate it.
Production considerations
- Persistence: The in-memory asyncio.Queue loses messages on crash. For production, back the bus with Redis Streams, RabbitMQ, or Kafka; the subscriber interface can stay the same.
- Observability: Log every publish, delivery, and ack with its correlation ID. That trail is your debugging lifeline when three agents are exchanging messages at speed.
- Idempotency key storage: The in-memory set of idempotency keys grows without bound. Use a TTL-based store or periodically prune keys older than the maximum message TTL. In Redis, SET with the NX and EX options records a key with an expiry and refuses to overwrite an existing one in a single command.
- Dead letter processing: Don't just log dead letters; alert on them. A growing DLQ means your consumers are failing and messages are not being delivered.
- LLM arbitration cost: Every conflict sent to LLM arbitration costs an API call and adds latency. Use it as a fallback when majority vote fails to reach consensus, not as the default strategy.
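The last point suggests a tiered policy: vote first, and call the model only on a tie. Below is a minimal sketch that assumes an injected async arbiter callback. In it:

- resolve_tiered and fake_arbiter are hypothetical names;
- Proposal is re-declared in trimmed form so the block runs standalone;
- fake_arbiter stands in for the LLM call that ConflictResolver._llm_arbitrate makes.

Votes are grouped by repr(), as in _majority_vote, and only a tie for first place reaches the arbiter.

```python
import asyncio
from collections import Counter
from dataclasses import dataclass
from typing import Any, Awaitable, Callable


@dataclass
class Proposal:
    agent_name: str
    value: Any
    reasoning: str = ""


# Hypothetical arbiter signature: receives the tied proposals, returns the chosen index.
Arbiter = Callable[[list[Proposal]], Awaitable[int]]


async def resolve_tiered(proposals: list[Proposal], arbiter: Arbiter) -> tuple[Any, str]:
    """Majority vote first; call the (expensive) arbiter only on a tie for first place."""
    if not proposals:
        raise ValueError("No proposals to resolve")
    counts = Counter(repr(p.value) for p in proposals)  # repr keeps 42 and "42" apart
    top_count = max(counts.values())
    leaders = [key for key, count in counts.items() if count == top_count]
    if len(leaders) == 1:
        winner = next(p for p in proposals if repr(p.value) == leaders[0])
        return winner.value, "majority_vote"
    # Deadlock: send one representative proposal per tied value to the arbiter.
    candidates = [next(p for p in proposals if repr(p.value) == key) for key in leaders]
    idx = await arbiter(candidates)
    if not 0 <= idx < len(candidates):
        raise ValueError(f"Arbiter returned invalid index {idx}")
    return candidates[idx].value, "arbitration"


async def demo() -> tuple[tuple[Any, str], tuple[Any, str], list[int]]:
    calls: list[int] = []

    async def fake_arbiter(candidates: list[Proposal]) -> int:
        # Stand-in for an LLM call: record how many candidates it saw, pick the first.
        calls.append(len(candidates))
        return 0

    clear = [Proposal("billing", 250.0), Proposal("approval", 250.0), Proposal("retention", 200.0)]
    tied = [Proposal("billing", 250.0), Proposal("retention", 200.0)]
    clear_result = await resolve_tiered(clear, fake_arbiter)  # no arbiter call
    tied_result = await resolve_tiered(tied, fake_arbiter)    # one arbiter call
    return clear_result, tied_result, calls


clear_result, tied_result, arbiter_calls = asyncio.run(demo())
print(clear_result, tied_result, arbiter_calls)
# (250.0, 'majority_vote') (250.0, 'arbitration') [2]
```

Only the tied values reach the arbiter, which also keeps the prompt short. Whether a plurality, rather than a strict majority, should be enough to skip arbitration is a policy choice. This sketch accepts any unique leader.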
Agent communication breaks down into three concerns: routing messages between agents (the bus), guaranteeing task delivery (the handoff protocol), and resolving disagreements (the conflict resolver). Each primitive is independent and composable — you can use the bus without the handoff protocol, or the conflict resolver without either.
The critical implementation details that tutorials skip: backpressure via bounded queues, real ack tracking via asyncio.Event instead of sleep-based polling, type-preserving vote counting, and actual LLM calls for arbitration instead of stub methods. These details determine whether your multi-agent system works under load or only in demos.
Related Articles
State Management for Multi-Agent Systems: Redis, PostgreSQL, LangGraph & Checkpointing
Production state management for multi-agent workflows — Redis for ephemeral coordination, PostgreSQL for durable records, LangGraph for typed state graphs with conditional routing, and checkpoint/resume patterns that actually survive crashes.
Intent Classification for Agent Routing: LLM-Based, Embedding-Based & Hybrid Approaches
Learn intent classification for agent routing in a detailed, easy-to-understand way. This guide explains LLM-based routing, embedding similarity, hybrid classifiers, confidence thresholds, fallback logic, and multi-intent detection with a practical example.
Orchestration Architectures: Supervisor, Router & Hierarchical Patterns for Multi-Agent Systems
Build production orchestration for multi-agent systems — supervisor routing with LLM classification, parallel fan-out with error recovery, event-driven coordination, and hierarchical delegation. Includes comparison matrix and combined architecture example.