
State Management for Multi-Agent Systems: Redis, PostgreSQL, LangGraph & Checkpointing
Production state management for multi-agent workflows — Redis for ephemeral coordination, PostgreSQL for durable records, LangGraph for typed state graphs with conditional routing, and checkpoint/resume patterns that actually survive crashes.
State is what separates a multi-agent system from a collection of stateless function calls. Without it, agents can't remember what another agent already discovered, can't resume after a crash, and can't coordinate updates to shared data without overwriting each other. Most reliability problems in multi-agent systems (lost progress, inconsistent answers, duplicated work) trace back to state management.
This post covers four approaches: Redis for fast ephemeral state, PostgreSQL for durable transactional records, LangGraph for typed state graphs with conditional routing across agent nodes, and checkpoint/resume patterns that survive process crashes. We'll build each with production-grade code and wire them through a running example: a customer support workflow handling "I was charged twice, and I can't log in" — a request that spans billing and account recovery and forces real state coordination.
Not all state deserves the same storage. Picking the wrong backend for a given state category is the most common architectural mistake in multi-agent systems.
- Ephemeral state — session context, in-progress variables, temporary caches. Lives in Redis or memory. Lost on restart is acceptable.
- Durable state — transactions, approvals, audit logs. Must survive crashes. Lives in PostgreSQL or equivalent RDBMS.
- Shared state — data multiple agents read and write during a workflow. Needs concurrency control regardless of backend.
- Private agent state — scratchpad, reasoning traces, tool call history. Owned by one agent, never shared.
- Checkpoint state — frozen snapshots of workflow progress at specific boundaries. Enables resume-from-failure.
A user says: "I was charged twice, and I still can't log in." The system routes to a BillingAgent and an AccountAgent in parallel. Both write findings to shared state. A PolicyAgent reads those findings and decides whether to approve the refund. A ResponseAgent reads everything and composes the final answer. The workflow ID is case_8842.
The state that accumulates across this workflow includes: the original request, conversation history, billing findings (duplicate charge confirmed, transaction IDs), account recovery status (password reset sent), policy decision (refund approved), retry counts, and the final resolution. Every piece needs to be stored somewhere, readable by the right agents, and recoverable if the process dies mid-workflow.
Redis is the right tool for state that needs to be fast, shared, and temporary — active workflow coordination, session context, rate limiting, and distributed locks. It's the wrong tool for anything that must survive a Redis restart without persistence configured.
import json
import uuid
from datetime import datetime, timezone
from typing import Any, Optional

import redis


class RedisStateManager:
    """Ephemeral state management with Redis.

    Handles workflow state, distributed locks, and atomic updates.
    Does NOT own durable records; use PostgreSQL for those.
    """

    def __init__(self, redis_url: str = "redis://localhost:6379", default_ttl: int = 3600):
        self._redis = redis.from_url(redis_url, decode_responses=True)
        self._default_ttl = default_ttl

    # --- Workflow state ---

    def get_workflow_state(self, workflow_id: str) -> Optional[dict]:
        """Read current workflow state. Returns None if expired or missing."""
        data = self._redis.get(f"workflow:{workflow_id}")
        return json.loads(data) if data else None

    def set_workflow_state(self, workflow_id: str, state: dict, ttl: Optional[int] = None):
        """Write workflow state with TTL. Overwrites entirely."""
        self._redis.setex(
            f"workflow:{workflow_id}",
            ttl or self._default_ttl,
            json.dumps(state),
        )

    def update_workflow_field(self, workflow_id: str, field: str, value: Any) -> bool:
        """Atomically update a single top-level field using Lua.

        Returns False if the workflow doesn't exist.
        """
        # KEEPTTL (Redis 6.0+) preserves the key's remaining TTL on overwrite.
        # Caveat: cjson re-encodes an empty JSON array as {} on the round-trip.
        lua = """
        local key = KEYS[1]
        local field = ARGV[1]
        local value = ARGV[2]
        local current = redis.call('GET', key)
        if not current then return 0 end
        local state = cjson.decode(current)
        state[field] = cjson.decode(value)
        redis.call('SET', key, cjson.encode(state), 'KEEPTTL')
        return 1
        """
        result = self._redis.eval(lua, 1, f"workflow:{workflow_id}", field, json.dumps(value))
        return bool(result)

    def update_workflow_section(self, workflow_id: str, section: str, updates: dict) -> bool:
        """Atomically merge updates into a nested section.

        Example: update_workflow_section('case_8842', 'billing', {'refund_approved': True})
        Merges into state['billing'] without touching other sections.
        """
        lua = """
        local key = KEYS[1]
        local section = ARGV[1]
        local updates = cjson.decode(ARGV[2])
        local current = redis.call('GET', key)
        if not current then return 0 end
        local state = cjson.decode(current)
        if not state[section] then state[section] = {} end
        for k, v in pairs(updates) do
            state[section][k] = v
        end
        state['updated_at'] = ARGV[3]
        redis.call('SET', key, cjson.encode(state), 'KEEPTTL')
        return 1
        """
        result = self._redis.eval(
            lua, 1, f"workflow:{workflow_id}",
            section, json.dumps(updates), datetime.now(timezone.utc).isoformat(),
        )
        return bool(result)

    # --- Distributed locks ---

    def acquire_lock(self, resource: str, timeout: int = 10) -> Optional[str]:
        """Acquire a distributed lock. Returns lock token or None."""
        token = str(uuid.uuid4())
        # NX: set only if the key is absent. EX: auto-expire so a dead holder can't block forever.
        if self._redis.set(f"lock:{resource}", token, nx=True, ex=timeout):
            return token
        return None

    def release_lock(self, resource: str, token: str) -> bool:
        """Release lock only if we still own it (compare-and-delete)."""
        lua = """
        if redis.call('GET', KEYS[1]) == ARGV[1] then
            return redis.call('DEL', KEYS[1])
        end
        return 0
        """
        return bool(self._redis.eval(lua, 1, f"lock:{resource}", token))

Two design decisions are worth noting. First, the Redis client is private (_redis), so no external code should reach into it directly. Second, section-level updates use Lua scripts for atomicity. Without Lua, a read-modify-write cycle between two agents can silently drop one agent's update.
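To make the lost-update risk concrete, here is a minimal sketch that replays the interleaving by hand against a plain dict standing in for Redis. FakeStore and its method names are illustrative assumptions and are not part of redis-py; merge_section only mimics the guarantee the Lua script provides on a real server.

```python
import json

KEY = "workflow:case_8842"


class FakeStore:
    """In-memory stand-in for Redis string keys (illustration only)."""

    def __init__(self):
        self._data = {}

    def get(self, key):
        return self._data.get(key)

    def set(self, key, value):
        self._data[key] = value

    def merge_section(self, key, section, updates):
        """Mimics the Lua script: read, merge, and write as one step."""
        state = json.loads(self._data[key])
        state.setdefault(section, {}).update(updates)
        self._data[key] = json.dumps(state)


def unsafe_interleaving() -> dict:
    store = FakeStore()
    store.set(KEY, json.dumps({"billing": {}, "account": {}}))

    # Both agents read the same snapshot before either one writes.
    billing_view = json.loads(store.get(KEY))
    account_view = json.loads(store.get(KEY))

    billing_view["billing"]["duplicate_detected"] = True
    store.set(KEY, json.dumps(billing_view))

    # The account agent writes back its stale copy and erases the billing result.
    account_view["account"]["reset_sent"] = True
    store.set(KEY, json.dumps(account_view))
    return json.loads(store.get(KEY))


def atomic_merges() -> dict:
    store = FakeStore()
    store.set(KEY, json.dumps({"billing": {}, "account": {}}))
    store.merge_section(KEY, "billing", {"duplicate_detected": True})
    store.merge_section(KEY, "account", {"reset_sent": True})
    return json.loads(store.get(KEY))
```

In the unsafe variant the billing finding disappears without any error being raised, which is why the failure tends to surface only later as an inconsistent answer.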
State infrastructure is useless if agents don't use it consistently. Here's a base class that standardizes how agents load and save state, with a concrete BillingAgent that demonstrates the pattern:
import logging
from abc import ABC, abstractmethod
from collections import Counter

logger = logging.getLogger(__name__)


class StatefulAgent(ABC):
    """Base class for agents that read/write workflow state.

    Subclasses implement process() with their domain logic.
    State loading, saving, and error handling are standardized.
    """

    def __init__(self, agent_id: str, state_manager: RedisStateManager, section: str):
        self.agent_id = agent_id
        self._state_manager = state_manager
        self._section = section  # which part of workflow state this agent owns

    async def run(self, workflow_id: str, task: dict) -> dict:
        """Execute agent with automatic state management."""
        # Note: RedisStateManager is synchronous, so these calls block the event loop.
        workflow_state = self._state_manager.get_workflow_state(workflow_id)
        if not workflow_state:
            return {"success": False, "error": f"No workflow state for {workflow_id}"}

        # Extract this agent's section (may be empty on first run)
        my_state = workflow_state.get(self._section, {})
        try:
            # Run domain logic
            result = await self.process(task, my_state, workflow_state)

            # Save results back to this agent's section
            self._state_manager.update_workflow_section(
                workflow_id, self._section, result
            )
            # Record which agent touched the workflow last
            self._state_manager.update_workflow_field(
                workflow_id, "current_owner", self.agent_id
            )
            logger.info(f"{self.agent_id} completed on {workflow_id}")
            return {"success": True, "result": result}
        except Exception as e:
            logger.error(f"{self.agent_id} failed on {workflow_id}: {e}")
            self._state_manager.update_workflow_section(
                workflow_id, self._section,
                {"error": str(e), "status": "failed"},
            )
            return {"success": False, "error": str(e)}

    @abstractmethod
    async def process(self, task: dict, my_state: dict, workflow_state: dict) -> dict:
        """Domain-specific processing. Returns updates for this agent's section."""
        ...


class BillingAgent(StatefulAgent):
    """Checks for duplicate charges and determines refund eligibility."""

    def __init__(self, state_manager: RedisStateManager):
        super().__init__("billing-agent", state_manager, section="billing")

    async def process(self, task: dict, my_state: dict, workflow_state: dict) -> dict:
        transaction_ids = task.get("transaction_ids", [])
        customer_id = task.get("customer_id")

        # Simulate billing system lookup
        charges = await self._fetch_charges(customer_id, transaction_ids)

        # Simplified rule: two charges with the same amount count as a duplicate.
        counts = Counter(c["amount"] for c in charges)
        duplicated = [amount for amount, n in counts.items() if n > 1]
        has_duplicate = bool(duplicated)
        duplicate_amount = max(duplicated) if duplicated else None

        return {
            "duplicate_detected": has_duplicate,
            "duplicate_amount": duplicate_amount,
            "transaction_ids": transaction_ids,
            "charges": charges,
            "refund_eligible": has_duplicate,
            "status": "completed",
        }

    async def _fetch_charges(self, customer_id: str, txn_ids: list) -> list:
        """Call billing service API. Simplified for illustration: returns fixed data."""
        return [
            {"txn_id": "txn_100", "amount": 49.99, "date": "2026-05-28"},
            {"txn_id": "txn_101", "amount": 49.99, "date": "2026-05-28"},
        ]

The key pattern: agents never touch the state manager's internals. The base class calls update_workflow_section with the agent's own section name, and the state manager handles atomicity. An AccountAgent would own the account section, a PolicyAgent the policy section. As long as each agent writes only to its own section, no agent can accidentally overwrite another's data. The one shared field, current_owner, is last-write-wins by design.
Redis handles the hot path. PostgreSQL handles everything that must survive: approved refunds, audit trails, compliance records, and state history for debugging. If your workflow involves money, approvals, or regulatory requirements, the final decisions must land in a durable store.
from typing import Optional

import psycopg2
from psycopg2.extras import Json, RealDictCursor


class OptimisticLockError(Exception):
    """Raised when the stored version differs from the caller's expected_version."""


class PostgreSQLStateManager:
    """Durable state with full audit history and optimistic locking."""

    def __init__(self, dsn: str):
        # A single connection is not safe to share across threads; use a pool for concurrent agents.
        self.conn = psycopg2.connect(dsn)
        self._init_schema()

    def _init_schema(self):
        with self.conn.cursor() as cur:
            cur.execute("""
                CREATE TABLE IF NOT EXISTS workflow_state (
                    workflow_id VARCHAR(255) PRIMARY KEY,
                    state JSONB NOT NULL DEFAULT '{}',
                    version INTEGER NOT NULL DEFAULT 1,
                    status VARCHAR(50) NOT NULL DEFAULT 'created',
                    created_at TIMESTAMPTZ DEFAULT NOW(),
                    updated_at TIMESTAMPTZ DEFAULT NOW()
                );
                CREATE TABLE IF NOT EXISTS state_audit_log (
                    id BIGSERIAL PRIMARY KEY,
                    workflow_id VARCHAR(255) NOT NULL,
                    agent_id VARCHAR(255) NOT NULL,
                    section VARCHAR(100) NOT NULL,
                    old_value JSONB,
                    new_value JSONB NOT NULL,
                    version INTEGER NOT NULL,
                    created_at TIMESTAMPTZ DEFAULT NOW()
                );
                CREATE INDEX IF NOT EXISTS idx_audit_workflow
                    ON state_audit_log(workflow_id, created_at DESC);
            """)
        self.conn.commit()

    def create_workflow(self, workflow_id: str, initial_state: dict) -> int:
        """Create a new workflow record. Returns version 1, or 0 if it already exists."""
        with self.conn.cursor() as cur:
            cur.execute("""
                INSERT INTO workflow_state (workflow_id, state, status)
                VALUES (%s, %s, 'active')
                ON CONFLICT (workflow_id) DO NOTHING
                RETURNING version
            """, (workflow_id, Json(initial_state)))
            row = cur.fetchone()
        self.conn.commit()
        return row[0] if row else 0

    def update_section(
        self, workflow_id: str, agent_id: str, section: str,
        updates: dict, expected_version: Optional[int] = None
    ) -> int:
        """Update a section with optimistic locking and audit trail.

        Returns new version number. Raises OptimisticLockError on version mismatch.
        """
        try:
            with self.conn.cursor(cursor_factory=RealDictCursor) as cur:
                # Lock the row for the duration of the transaction
                cur.execute("""
                    SELECT state, version FROM workflow_state
                    WHERE workflow_id = %s FOR UPDATE
                """, (workflow_id,))
                row = cur.fetchone()
                if not row:
                    raise ValueError(f"Workflow {workflow_id} not found")
                # Compare against None so that an explicit version is never skipped
                if expected_version is not None and row["version"] != expected_version:
                    raise OptimisticLockError(
                        f"Version mismatch: expected {expected_version}, got {row['version']}"
                    )

                old_section = row["state"].get(section, {})
                new_state = {**row["state"], section: {**old_section, **updates}}
                new_version = row["version"] + 1
                cur.execute("""
                    UPDATE workflow_state
                    SET state = %s, version = %s, updated_at = NOW()
                    WHERE workflow_id = %s
                """, (Json(new_state), new_version, workflow_id))

                # Audit log: previous section value plus the updates that were applied
                cur.execute("""
                    INSERT INTO state_audit_log
                        (workflow_id, agent_id, section, old_value, new_value, version)
                    VALUES (%s, %s, %s, %s, %s, %s)
                """, (workflow_id, agent_id, section, Json(old_section), Json(updates), new_version))
            self.conn.commit()
            return new_version
        except Exception:
            # Release the row lock and leave the connection usable for the next call
            self.conn.rollback()
            raise

    def get_audit_trail(self, workflow_id: str, limit: int = 50) -> list:
        """Full audit trail for a workflow: who changed what, and when."""
        with self.conn.cursor(cursor_factory=RealDictCursor) as cur:
            cur.execute("""
                SELECT agent_id, section, old_value, new_value, version, created_at
                FROM state_audit_log
                WHERE workflow_id = %s
                ORDER BY created_at DESC
                LIMIT %s
            """, (workflow_id, limit))
            return [dict(r) for r in cur.fetchall()]

The critical addition here is the audit log. Every state mutation records who made the change, the section's previous value, the updates applied, and the resulting version number. When a refund goes wrong three weeks later, you can reconstruct what each agent wrote and in what order.
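The expected_version check only helps if callers react to a mismatch. One common reaction is to re-read and retry a bounded number of times. The sketch below shows that loop against an in-memory stand-in so it runs without a database. VersionedStore, VersionConflict, update_with_retry, and the before_write hook are all illustrative assumptions; VersionConflict plays the role of OptimisticLockError.

```python
class VersionConflict(Exception):
    """Raised when the stored version differs from the caller's expectation."""


class VersionedStore:
    """In-memory stand-in for the workflow_state row (illustration only)."""

    def __init__(self):
        self.state = {}
        self.version = 1

    def read(self):
        return dict(self.state), self.version

    def write(self, new_state, expected_version):
        if self.version != expected_version:
            raise VersionConflict(f"expected {expected_version}, got {self.version}")
        self.state = new_state
        self.version += 1
        return self.version


def update_with_retry(store, section, updates, max_attempts=3, before_write=None):
    """Re-read and retry when another writer bumped the version first."""
    for attempt in range(1, max_attempts + 1):
        state, version = store.read()
        merged = {**state, section: {**state.get(section, {}), **updates}}
        if before_write:
            before_write(attempt)  # demo hook: lets a rival writer sneak in
        try:
            return store.write(merged, expected_version=version)
        except VersionConflict:
            if attempt == max_attempts:
                raise  # give up and let the caller decide


store = VersionedStore()


def rival(attempt):
    # Simulate the account agent committing between our read and our write.
    if attempt == 1:
        state, version = store.read()
        store.write({**state, "account": {"reset_sent": True}}, version)


new_version = update_with_retry(
    store, "billing", {"refund_eligible": True}, before_write=rival
)
```

The first attempt fails because the rival write moved the version forward; the second attempt re-reads, merges on top of the rival's change, and succeeds, so both sections survive.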
| Dimension           | Redis                         | PostgreSQL                                      | LangGraph State         | In-Memory (dict)           |
|---------------------|-------------------------------|-------------------------------------------------|-------------------------|----------------------------|
| Latency             | Sub-ms reads/writes           | 1-5 ms typical                                  | Depends on backend      | In-process, no network hop |
| Durability          | Optional (RDB/AOF)            | Full ACID                                       | Backend-dependent       | None                       |
| Concurrency control | Lua scripts, WATCH            | Row locks, MVCC                                 | Reducer functions       | Manual locks               |
| Audit trail         | Manual                        | Audit tables or triggers                        | Checkpoint history      | None                       |
| Max state size      | 512 MB per string value       | Bounded by field-size limits, far above typical state | Serialization-dependent | Process memory limit       |
| Crash recovery      | Only with persistence enabled | Automatic                                       | Checkpoint-based        | Lost                       |
| Multi-agent safe    | With Lua atomics              | With transactions                               | With typed reducers     | With asyncio.Lock          |
| Best for            | Hot path, coordination        | Durable records, compliance                     | Workflow state flow     | Prototyping, tests         |

Most production systems use both: Redis for the active workflow (fast reads during agent execution) and PostgreSQL for durable decisions (the refund was approved, the audit trail). Sync the final decision to PostgreSQL when the workflow completes or at critical state transitions.
LangGraph models multi-agent workflows as directed graphs where typed state flows through nodes. Each node is an agent function that reads state, does work, and returns updates. Conditional edges route state to different agents based on the current values. This makes workflow logic explicit and inspectable — you can see exactly which agent runs next and why.
Here's the refund + account recovery workflow as a LangGraph implementation with typed state, four agent nodes, and state accumulation. To keep the example readable, the nodes are wired in a fixed sequence with plain edges:
import asyncio
import operator
from typing import Annotated, Optional, TypedDict

from langgraph.checkpoint.sqlite.aio import AsyncSqliteSaver
from langgraph.graph import END, StateGraph
from langgraph.graph.message import add_messages


# --- Typed workflow state ---

class BillingResult(TypedDict, total=False):
    duplicate_detected: bool
    duplicate_amount: Optional[float]
    refund_eligible: bool


class AccountResult(TypedDict, total=False):
    issue_found: bool
    reset_sent: bool
    account_locked: bool


class PolicyResult(TypedDict, total=False):
    approved: bool
    reason: str
    requires_human_review: bool


class WorkflowState(TypedDict):
    messages: Annotated[list, add_messages]
    customer_id: str
    request_text: str
    billing: BillingResult
    account: AccountResult
    policy: PolicyResult
    current_step: str
    completed_steps: Annotated[list[str], operator.add]  # accumulates across nodes
    final_response: str
    error: Optional[str]


# --- Agent nodes ---

async def billing_agent(state: WorkflowState) -> dict:
    """Check for duplicate charges."""
    # In production: call billing API, parse response
    request = state["request_text"].lower()
    has_billing_issue = "charged" in request or "billing" in request
    return {
        "billing": {
            "duplicate_detected": has_billing_issue,
            "duplicate_amount": 49.99 if has_billing_issue else None,
            "refund_eligible": has_billing_issue,
        },
        "current_step": "billing_complete",
        "completed_steps": ["billing"],
    }


async def account_agent(state: WorkflowState) -> dict:
    """Handle login/account recovery."""
    request = state["request_text"].lower()
    has_account_issue = "log in" in request or "login" in request or "password" in request
    return {
        "account": {
            "issue_found": has_account_issue,
            "reset_sent": has_account_issue,
            "account_locked": False,
        },
        "current_step": "account_complete",
        "completed_steps": ["account"],
    }


async def policy_agent(state: WorkflowState) -> dict:
    """Review billing and account findings against policy rules."""
    billing = state.get("billing", {})
    account = state.get("account", {})

    # Business rules
    refund_approved = billing.get("refund_eligible", False)
    needs_human = account.get("account_locked", False)
    # duplicate_amount can be present but None, so coerce before comparing
    amount = billing.get("duplicate_amount") or 0
    if refund_approved and amount > 500:
        needs_human = True
        refund_approved = False

    reason = []
    if billing.get("duplicate_detected"):
        reason.append("duplicate charge confirmed")
    if account.get("reset_sent"):
        reason.append("password reset initiated")
    return {
        "policy": {
            "approved": refund_approved,
            "reason": "; ".join(reason) or "no action needed",
            "requires_human_review": needs_human,
        },
        "current_step": "policy_complete",
        "completed_steps": ["policy"],
    }


async def response_agent(state: WorkflowState) -> dict:
    """Compose final user-facing response from all findings."""
    billing = state.get("billing", {})
    account = state.get("account", {})
    policy = state.get("policy", {})

    parts = []
    if billing.get("duplicate_detected"):
        amt = billing.get("duplicate_amount", "N/A")
        if policy.get("approved"):
            parts.append(f"We confirmed a duplicate charge of ${amt}. Your refund has been approved.")
        else:
            parts.append(f"We found a duplicate charge of ${amt}. This requires manual review.")
    if account.get("reset_sent"):
        parts.append("We've sent a password reset link to your email.")
    return {
        "final_response": " ".join(parts) or "We reviewed your request but found no issues.",
        "current_step": "complete",
        "completed_steps": ["response"],
    }


# --- Build the graph ---

def build_workflow() -> StateGraph:
    graph = StateGraph(WorkflowState)

    # Add nodes
    graph.add_node("billing_agent", billing_agent)
    graph.add_node("account_agent", account_agent)
    graph.add_node("policy_agent", policy_agent)
    graph.add_node("response_agent", response_agent)

    # Fixed sequence: billing -> account -> policy -> response
    graph.set_entry_point("billing_agent")
    graph.add_edge("billing_agent", "account_agent")
    graph.add_edge("account_agent", "policy_agent")
    graph.add_edge("policy_agent", "response_agent")
    graph.add_edge("response_agent", END)
    return graph


# --- Compile with checkpointing and run ---

async def main():
    config = {"configurable": {"thread_id": "case_8842"}}

    # ainvoke needs an async-capable saver. In recent langgraph-checkpoint-sqlite
    # releases the sync SqliteSaver rejects async calls, so this uses
    # AsyncSqliteSaver (requires aiosqlite). Adjust to your installed version.
    async with AsyncSqliteSaver.from_conn_string("workflow_checkpoints.db") as checkpointer:
        app = build_workflow().compile(checkpointer=checkpointer)

        result = await app.ainvoke(
            {
                "messages": [],
                "customer_id": "cust_123",
                "request_text": "I was charged twice, and I can't log in",
                "billing": {},
                "account": {},
                "policy": {},
                "current_step": "intake",
                "completed_steps": [],
                "final_response": "",
                "error": None,
            },
            config=config,
        )
        print(f"Response: {result['final_response']}")
        print(f"Steps completed: {result['completed_steps']}")

        # Re-invoke with None to continue the thread from its last checkpoint.
        # This run already finished, so there is nothing left to execute.
        resumed = await app.ainvoke(None, config=config)
        print(f"Resumed state matches: {resumed['final_response'] == result['final_response']}")


asyncio.run(main())

Three things make this different from a toy LangGraph example. First, the state is typed with domain-specific sub-structures (BillingResult, AccountResult, PolicyResult) rather than a generic dict. Second, completed_steps uses an accumulator reducer (operator.add) so each node appends to the list without overwriting previous entries. Third, the policy agent decides based on accumulated state from previous nodes, including a threshold rule that escalates high-value refunds to human review.
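If the policy decision should change where the workflow goes next, one option is a routing function attached with LangGraph's add_conditional_edges. The sketch below is an assumption layered on top of the example: route_after_policy, add_policy_routing, and the human_review node name are hypothetical, and the caller would need to register a human_review node and remove the fixed policy_agent to response_agent edge.

```python
from typing import Any, Mapping


def route_after_policy(state: Mapping[str, Any]) -> str:
    """Pick the next node name from the accumulated policy decision."""
    if state.get("error"):
        return "response_agent"  # surface the failure instead of escalating
    policy = state.get("policy") or {}
    if policy.get("requires_human_review"):
        return "human_review"
    return "response_agent"


def add_policy_routing(graph) -> None:
    """Attach the router to a StateGraph built elsewhere.

    Assumes `graph` already has policy_agent, response_agent, and a
    hypothetical human_review node registered.
    """
    graph.add_conditional_edges(
        "policy_agent",
        route_after_policy,
        # Maps the router's return value to the node that should run next.
        {"human_review": "human_review", "response_agent": "response_agent"},
    )
```

Keeping the router a pure function of state means it can be unit-tested without compiling a graph, and the mapping makes the possible next steps visible at the point where the edge is declared.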
LangGraph checkpointing
app.ainvoke(None, config=config) resumes after the last completed node, with no manual checkpoint code required.

LangGraph handles checkpointing automatically if you use its built-in savers. But many systems use custom orchestration where you need manual checkpoint control. The key design requirement: O(1) lookup of the latest checkpoint for any workflow. Scanning keys or iterating lists to find the latest checkpoint is a production bug waiting to happen.
import hashlib
import json
from datetime import datetime, timezone
from typing import Optional

import redis


class CheckpointManager:
    """Checkpoint/resume for custom orchestration workflows.

    Key design: O(1) latest checkpoint lookup via a 'latest' pointer key.
    No scanning, no iteration, no list traversal.
    """

    def __init__(self, redis_url: str = "redis://localhost:6379", ttl: int = 86400):
        self._redis = redis.from_url(redis_url, decode_responses=True)
        self._ttl = ttl

    def create_checkpoint(
        self,
        workflow_id: str,
        step_name: str,
        state: dict,
        metadata: Optional[dict] = None
    ) -> str:
        """Save a checkpoint and update the 'latest' pointer in one transaction."""
        # Take the timestamp once so the ID and created_at agree
        created_at = datetime.now(timezone.utc).isoformat()
        checkpoint_id = hashlib.md5(
            f"{workflow_id}:{step_name}:{created_at}".encode()
        ).hexdigest()[:16]

        checkpoint_data = {
            "checkpoint_id": checkpoint_id,
            "workflow_id": workflow_id,
            "step_name": step_name,
            "state": state,
            "metadata": metadata or {},
            "created_at": created_at,
        }

        checkpoint_key = f"ckpt:{workflow_id}:{checkpoint_id}"  # the checkpoint itself
        latest_key = f"ckpt:{workflow_id}:latest"               # pointer for O(1) lookup
        history_key = f"ckpt:{workflow_id}:history"

        # redis-py pipelines run as MULTI/EXEC by default, so these land together.
        pipe = self._redis.pipeline()
        pipe.setex(checkpoint_key, self._ttl, json.dumps(checkpoint_data))
        pipe.setex(latest_key, self._ttl, checkpoint_key)  # points to the actual key
        pipe.lpush(history_key, checkpoint_id)
        pipe.expire(history_key, self._ttl)
        pipe.execute()
        return checkpoint_id

    def get_latest_checkpoint(self, workflow_id: str) -> Optional[dict]:
        """O(1) lookup of the latest checkpoint. No scanning."""
        checkpoint_key = self._redis.get(f"ckpt:{workflow_id}:latest")
        if not checkpoint_key:
            return None
        data = self._redis.get(checkpoint_key)
        return json.loads(data) if data else None

    def resume_from_checkpoint(self, workflow_id: str) -> Optional[dict]:
        """Load latest checkpoint and return resumption context."""
        checkpoint = self.get_latest_checkpoint(workflow_id)
        if not checkpoint:
            return None
        return {
            "resume_from_step": checkpoint["step_name"],
            "state": checkpoint["state"],
            "checkpoint_id": checkpoint["checkpoint_id"],
            "checkpointed_at": checkpoint["created_at"],
        }

    def get_checkpoint_history(self, workflow_id: str, limit: int = 10) -> list:
        """List recent checkpoints (newest first) for debugging."""
        ids = self._redis.lrange(f"ckpt:{workflow_id}:history", 0, limit - 1)
        results = []
        for cid in ids:
            data = self._redis.get(f"ckpt:{workflow_id}:{cid}")
            if data:
                cp = json.loads(data)
                results.append({
                    "checkpoint_id": cp["checkpoint_id"],
                    "step_name": cp["step_name"],
                    "created_at": cp["created_at"],
                })
        return results

    def clear_checkpoints(self, workflow_id: str):
        """Remove all checkpoints for a completed workflow."""
        ids = self._redis.lrange(f"ckpt:{workflow_id}:history", 0, -1)
        keys = [f"ckpt:{workflow_id}:{cid}" for cid in ids]
        # Always drop the pointer and the history list, even if the history is empty
        keys.extend([
            f"ckpt:{workflow_id}:latest",
            f"ckpt:{workflow_id}:history",
        ])
        self._redis.delete(*keys)

The original version of this code had a broken get_latest_checkpoint that stored the checkpoint ID in a list, then did a scan_iter across all keys to find the matching one, which is O(n) where n is the number of checkpoint keys for that workflow. The fix is simple: store a latest pointer key that contains the direct Redis key of the most recent checkpoint. Two GETs (pointer, then checkpoint) and no scan.
Checkpoint at natural workflow boundaries — not after every line of code, and not only at the end.
- After expensive operations — LLM calls, external API lookups, database queries. These cost time and money to repeat.
- Before side effects — refund issuance, email sends, account changes. If the process dies after the side effect but before the checkpoint, resumption will repeat the side effect. Checkpoint before so you can detect and skip on resume.
- At human-in-the-loop boundaries — before waiting for approval, after receiving it. Humans are slow; don't make them repeat themselves.
- After each agent completes — in a multi-agent pipeline, checkpoint between agent handoffs.
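The "before side effects" rule is usually paired with an idempotency record, so that a resumed run can tell whether the side effect already happened. The sketch below uses a plain dict in place of the checkpoint store. run_side_effect_once, the key format, the issue_refund callable, and the refund_txn_1 return value are illustrative assumptions.

```python
from typing import Callable, Dict


def run_side_effect_once(
    ledger: Dict[str, dict],
    idempotency_key: str,
    effect: Callable[[], str],
) -> str:
    """Run `effect` at most once per completed key.

    Records intent before acting and the result afterwards.
    """
    entry = ledger.get(idempotency_key)
    if entry and entry["status"] == "done":
        return entry["result"]  # resumed run: skip the side effect

    # Checkpoint the intent BEFORE acting, so a crash leaves a visible trace.
    ledger[idempotency_key] = {"status": "pending", "result": None}
    result = effect()
    ledger[idempotency_key] = {"status": "done", "result": result}
    return result


calls = []


def issue_refund() -> str:
    calls.append("refund")   # stands in for the real payment API call
    return "refund_txn_1"    # hypothetical refund reference


ledger: Dict[str, dict] = {}
first = run_side_effect_once(ledger, "case_8842:refund", issue_refund)
second = run_side_effect_once(ledger, "case_8842:refund", issue_refund)
```

A "pending" entry found on resume means the outcome is unknown. This sketch simply runs the effect again in that case, so it is only safe when the downstream service accepts the same idempotency key and deduplicates on its side.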
State grows. Conversation histories accumulate. Agent reasoning traces get verbose. Retrieved documents get attached. Without size management, your state eventually hits Redis's 512 MB string value limit, causes serialization timeouts, or makes checkpoint/resume painfully slow.
import gzip
import json


class StateSizeManager:
    """Strategies for keeping workflow state manageable."""

    # Thresholds
    WARN_SIZE_BYTES = 1_000_000      # 1 MB: log a warning and prune
    COMPRESS_SIZE_BYTES = 500_000    # 500 KB: start compressing
    PRUNE_MESSAGE_COUNT = 50         # keep last N messages
    MAX_REASONING_CHARS = 2000       # truncate agent reasoning traces

    @staticmethod
    def _encoded_size(value) -> int:
        """Bytes of the UTF-8 JSON encoding, which is what actually gets stored.

        sys.getsizeof would report the in-memory size of the Python str instead.
        """
        return len(json.dumps(value).encode("utf-8"))

    @staticmethod
    def measure(state: dict) -> dict:
        """Measure state size and identify large sections."""
        total = StateSizeManager._encoded_size(state)
        sections = {
            key: StateSizeManager._encoded_size(value) if value else 0
            for key, value in state.items()
        }
        return {
            "total_bytes": total,
            "sections": dict(sorted(sections.items(), key=lambda x: x[1], reverse=True)),
            "needs_pruning": total > StateSizeManager.WARN_SIZE_BYTES,
            "needs_compression": total > StateSizeManager.COMPRESS_SIZE_BYTES,
        }

    @staticmethod
    def prune_messages(state: dict, max_messages: int = 50) -> dict:
        """Keep only the last N messages and record how many were dropped."""
        messages = state.get("messages", [])
        if len(messages) <= max_messages:
            return state
        pruned_count = len(messages) - max_messages
        state["messages"] = messages[-max_messages:]
        state.setdefault("metadata", {})["pruned_messages"] = pruned_count
        return state

    @staticmethod
    def compress_state(state: dict) -> bytes:
        """Gzip compress for storage. Use for checkpoints of large state."""
        return gzip.compress(json.dumps(state).encode())

    @staticmethod
    def decompress_state(data: bytes) -> dict:
        """Inverse of compress_state."""
        return json.loads(gzip.decompress(data).decode())

    @staticmethod
    def truncate_reasoning(state: dict, max_chars: int = 2000) -> dict:
        """Truncate verbose agent reasoning traces."""
        for section in ["billing", "account", "policy"]:
            if section in state and "reasoning" in state[section]:
                reasoning = state[section]["reasoning"]
                if len(reasoning) > max_chars:
                    state[section]["reasoning"] = reasoning[:max_chars] + "... [truncated]"
        return state

    @staticmethod
    def archive_completed_sections(state: dict, archive_store) -> dict:
        """Move completed agent sections to cold storage.

        Keeps a reference in the state so the section can be retrieved if needed.
        archive_store is any object with a save(key, value) method.
        """
        workflow_id = state.get("workflow_id", "unknown")
        for section in ["billing", "account", "policy"]:
            if state.get(section, {}).get("status") == "completed":
                archive_key = f"{workflow_id}:{section}"
                archive_store.save(archive_key, state[section])
                # The ref holds the exact key passed to save(), so lookups resolve.
                state[section] = {"status": "archived", "ref": archive_key}
        return state

The most common state bloat sources are conversation message histories (which grow linearly with turns), agent reasoning traces (which can be thousands of tokens each), and retrieved document chunks. Prune messages to a rolling window, truncate reasoning after the decision is made, and store large retrievals by reference rather than inlining them in state.
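For a sense of what these strategies buy, the following sketch measures a synthetic state by the length of its UTF-8 JSON encoding, prunes the message history to a rolling window, and round-trips the result through gzip. The sample state and the helper name encoded_size are made up for illustration, and real savings depend heavily on the content.

```python
import gzip
import json


def encoded_size(state: dict) -> int:
    """Bytes the state occupies once serialized as UTF-8 JSON."""
    return len(json.dumps(state).encode("utf-8"))


# Synthetic state: 200 repetitive messages, the kind of history that bloats.
state = {
    "workflow_id": "case_8842",
    "messages": [
        {"role": "user", "content": f"message {i}: I was charged twice"}
        for i in range(200)
    ],
}
raw_bytes = encoded_size(state)

# Rolling window: keep only the most recent 50 messages.
pruned = {**state, "messages": state["messages"][-50:]}
pruned_bytes = encoded_size(pruned)

# The compression round-trip must return an identical structure.
blob = gzip.compress(json.dumps(pruned).encode("utf-8"))
restored = json.loads(gzip.decompress(blob).decode("utf-8"))

print(raw_bytes, pruned_bytes, len(blob))
```

Pruning changes what the agents can see, while compression only changes how the bytes are stored, so it is worth applying them in that order and checking that downstream agents tolerate the shorter history.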
In practice, most production multi-agent systems use Redis and PostgreSQL together. Redis handles the hot path — fast reads during agent execution, distributed locks, ephemeral coordination. PostgreSQL handles the cold path — durable decisions, audit trails, compliance. Here's how they connect:
async def run_workflow(workflow_id: str, request: dict):
    """Execute a multi-agent workflow with dual-store state management."""
    redis_state = RedisStateManager()
    pg_state = PostgreSQLStateManager(dsn="postgresql://localhost/agents")
    checkpointer = CheckpointManager()

    # 1. Check for an existing checkpoint first (crash recovery)
    resume = checkpointer.resume_from_checkpoint(workflow_id)
    if resume:
        print(f"Resuming from step: {resume['resume_from_step']}")
        state = resume["state"]
    else:
        state = {
            "workflow_id": workflow_id,
            "customer_id": request["customer_id"],
            "request_text": request["text"],
            "status": "active",
            "billing": {},
            "account": {},
            "policy": {},
        }

    # 2. Initialize workflow in both stores
    redis_state.set_workflow_state(workflow_id, state)
    pg_state.create_workflow(workflow_id, state)  # no-op if the row already exists

    # 3. Run agents with checkpointing, skipping steps the checkpoint already covers
    if state.get("billing", {}).get("status") != "completed":
        billing = BillingAgent(redis_state)
        result = await billing.run(workflow_id, {
            "customer_id": request["customer_id"],
            "transaction_ids": request.get("transaction_ids", []),
        })
        if result["success"]:
            # 4. Persist the durable decision, then checkpoint. In this order a crash
            #    between the two steps repeats the agent instead of losing the record.
            pg_state.update_section(
                workflow_id, "billing-agent", "billing", result["result"]
            )
            checkpointer.create_checkpoint(
                workflow_id, "billing_complete",
                redis_state.get_workflow_state(workflow_id),
            )

    # 5. Continue with remaining agents...
    # (account_agent, policy_agent, response_agent follow same pattern)

    # 6. Cleanup ephemeral state after completion
    checkpointer.clear_checkpoints(workflow_id)
    final_state = redis_state.get_workflow_state(workflow_id) or state
    redis_state.set_workflow_state(
        workflow_id,
        {**final_state, "status": "completed"},
        ttl=300,  # keep for 5 min for late reads, then expire
    )

- Lost progress after crash: mitigated by checkpointing at agent boundaries. Cost: one Redis write per checkpoint.
- Stale reads: Agent B reads state before Agent A's write lands. Mitigated by reading state inside the agent's run method, not before dispatch.
- Concurrent overwrites: two agents write to the same state field. Mitigated by section ownership (each agent owns its section) and Lua atomic updates.
- State bloat: messages and reasoning traces grow unbounded. Mitigated by StateSizeManager pruning and compression.
- Schema drift: state structure changes between deploys, breaking in-flight workflows. Mitigated by version fields and migration functions on resume.
- Redis TTL expiry during long workflows: state disappears mid-workflow if the TTL is too short. Set TTLs based on worst-case workflow duration, and refresh the TTL at each checkpoint.
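The schema-drift mitigation in the list above can be sketched as a chain of small migration functions keyed by version. The schema_version field name, the two example migrations, and migrate_state are hypothetical; the point is only that a resumed checkpoint is upgraded step by step before any agent reads it.

```python
from typing import Callable, Dict

CURRENT_SCHEMA_VERSION = 3


def _v1_to_v2(state: dict) -> dict:
    # Hypothetical change: a flat refund flag moved into the billing section.
    billing = dict(state.get("billing", {}))
    billing["refund_eligible"] = state.pop("refund_eligible", False)
    return {**state, "billing": billing}


def _v2_to_v3(state: dict) -> dict:
    # Hypothetical change: a retry counter was added with a default of 0.
    return {**state, "retry_count": state.get("retry_count", 0)}


# Each entry upgrades a state FROM the keyed version to the next one.
MIGRATIONS: Dict[int, Callable[[dict], dict]] = {1: _v1_to_v2, 2: _v2_to_v3}


def migrate_state(state: dict) -> dict:
    """Upgrade a checkpointed state to CURRENT_SCHEMA_VERSION, one step at a time."""
    state = dict(state)  # shallow copy so the caller's checkpoint is untouched
    version = state.get("schema_version", 1)  # unversioned state is treated as v1
    if version > CURRENT_SCHEMA_VERSION:
        raise ValueError(f"State version {version} is newer than this deploy")
    while version < CURRENT_SCHEMA_VERSION:
        state = MIGRATIONS[version](state)
        version += 1
        state["schema_version"] = version
    return state


old = {
    "workflow_id": "case_8842",
    "refund_eligible": True,
    "billing": {"status": "completed"},
}
new = migrate_state(old)
```

Running migrations one version at a time keeps each function small and testable, and refusing states newer than the running deploy avoids silently dropping fields after a rollback.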
- Prototyping a single-agent workflow? — Use in-memory dicts. Don't add infrastructure until you need it.
- Multi-agent with <10s workflows? — Redis only. Checkpoint at each agent boundary. Persist final result to PostgreSQL.
- Multi-agent with human-in-the-loop? — Redis + PostgreSQL + checkpointing. Humans are slow; you need durable resume.
- Using LangGraph? — Use its built-in checkpointer (SQLite for dev, PostgreSQL for prod). Don't build your own unless you need custom checkpoint logic.
- Compliance/audit requirements? — PostgreSQL with audit log table. Every state mutation gets a row.