Phase 2: Agent Architecture — ReAct, Planning, Memory & Frameworks

A comprehensive 8-week deep dive into building AI agents from scratch — ReAct loops, planning patterns, memory systems, and frameworks like LangGraph and AutoGen. Build it yourself before you abstract it away.

Phase 1 taught you how to call LLMs, craft prompts, wire up tools, and retrieve context with RAG. But those are all single-turn patterns — you send a request and get a response. Real-world AI applications need something fundamentally different: agents that can reason about problems, take actions, observe results, and adapt their strategy across multiple steps. Phase 2 is where you learn to build those agents — and the non-negotiable rule is that you build the core loop yourself first, with raw API calls, before touching any framework.

The Cardinal Rule of Phase 2

Build the ReAct loop yourself first with raw API calls. Only then use LangGraph or any other framework. Engineers who skip this step tend to stay dependent on the framework's abstractions and struggle to debug, customize, or optimize their agents when things go wrong in production.

The ReAct pattern (Reasoning + Acting) is the foundational architecture behind virtually every modern AI agent. Introduced by Yao et al. in their 2022 paper 'ReAct: Synergizing Reasoning and Acting in Language Models', it interleaves thinking (chain-of-thought reasoning) with acting (calling external tools) in a loop. The model generates a thought about what to do, takes an action, observes the result, then thinks again — repeating until it has enough information to produce a final answer.

At its core, the ReAct loop is deceptively simple. The LLM receives a system prompt that defines available tools and the expected output format. On each iteration, it produces a Thought (its reasoning about what to do next), an Action (a tool call with specific arguments), and then you — the orchestrator — execute that action and feed back an Observation (the tool's response). The loop continues until the model emits a Final Answer instead of another action.
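To make the Thought / Action / Final Answer format concrete, here is a minimal sketch of parsing a single model turn in the classic text-based ReAct style. The labels (`Thought:`, `Action:`, `Final Answer:`) and the `tool_name[argument]` action syntax are assumptions chosen for illustration, and `ReActStep` and `parse_react_step` are hypothetical names. The loop built later in this article relies on structured tool calls instead of text parsing.

```python
import re
from dataclasses import dataclass
from typing import Optional


@dataclass
class ReActStep:
    thought: str
    action: Optional[str] = None        # tool name, if the model wants to act
    action_input: Optional[str] = None  # raw argument string for the tool
    final_answer: Optional[str] = None  # set when the model is done


# Assumed (hypothetical) output format:
#   Thought: <reasoning>
#   Action: <tool_name>[<argument>]
# or
#   Thought: <reasoning>
#   Final Answer: <answer>
THOUGHT_RE = re.compile(r"Thought:\s*(.*?)(?=\n(?:Action|Final Answer):|\Z)", re.S)
ACTION_RE = re.compile(r"Action:\s*(\w+)\[(.*?)\]", re.S)
FINAL_RE = re.compile(r"Final Answer:\s*(.*)", re.S)


def parse_react_step(text: str) -> ReActStep:
    """Split one model turn into a thought plus an action or a final answer."""
    thought_match = THOUGHT_RE.search(text)
    thought = thought_match.group(1).strip() if thought_match else ""

    final_match = FINAL_RE.search(text)
    if final_match:
        return ReActStep(thought=thought, final_answer=final_match.group(1).strip())

    action_match = ACTION_RE.search(text)
    if action_match:
        return ReActStep(
            thought=thought,
            action=action_match.group(1),
            action_input=action_match.group(2).strip(),
        )

    # Neither an action nor a final answer: the orchestrator has to decide
    # whether to re-prompt the model or stop.
    return ReActStep(thought=thought)
```

The orchestrator would call `parse_react_step` on each model reply, run the named tool when `action` is set, append the tool output as an `Observation:` line, and stop once `final_answer` is set.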

Let's build this from absolute zero. No LangChain, no LangGraph, no frameworks — just Python, the OpenAI SDK, and your own loop control logic. This is the single most important exercise in this entire curriculum.

Before writing the loop, you need tools for the agent to call. We'll define them both as executable Python functions and as JSON schemas that the LLM understands. This dual definition — the schema for the model, the implementation for the runtime — is a pattern you'll use in every agent you build.

tools.py
python
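import json
import math
import os
import httpx
from datetime import datetime

# Read the search API key from the environment (variable name is an assumption)
TAVILY_API_KEY = os.environ.get("TAVILY_API_KEY", "")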

# ── Tool implementations ──────────────────────────────────────

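def calculator(expression: str) -> str:
    """Evaluate a mathematical expression safely."""
    # Whitelist digits, operators, parentheses, and spaces. Letters are
    # rejected, so names (including `math` functions) cannot be referenced.
    allowed = set('0123456789+-*/.() ')
    if not all(c in allowed for c in expression):
        return "Error: Invalid characters in expression"
    try:
        # Empty builtins act as a second line of defense around eval
        result = eval(expression, {"__builtins__": {}}, {"math": math})
        return str(result)
    except Exception as e:
        return f"Error: {e}"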

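def search_web(query: str) -> str:
    """Search the web and return top results."""
    # Illustrative request shape; check your search provider's docs
    # (Tavily, Serper, etc.) for the exact endpoint and parameters.
    try:
        response = httpx.get(
            "https://api.tavily.com/search",
            params={"query": query, "max_results": 3},
            headers={"Authorization": f"Bearer {TAVILY_API_KEY}"},
            timeout=10.0,
        )
        response.raise_for_status()
    except httpx.HTTPError as e:
        # Return the failure as an observation instead of crashing the loop
        return f"Error: search request failed: {e}"
    results = response.json().get("results", [])
    return "\n".join(
        f"- {r['title']}: {r['content'][:200]}" for r in results
    )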

def get_current_time() -> str:
    """Get the current date and time."""
    return datetime.now().strftime("%Y-%m-%d %H:%M:%S")

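def read_file(filepath: str) -> str:
    """Read the contents of a local file."""
    try:
        with open(filepath, 'r', encoding='utf-8') as f:
            return f.read()[:5000]  # Limit to 5k chars
    except FileNotFoundError:
        return f"Error: File '{filepath}' not found"
    except (OSError, UnicodeDecodeError) as e:
        # Permission problems, directories, binary files, etc.
        return f"Error: Could not read '{filepath}': {e}"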

# ── Tool registry ─────────────────────────────────────────────
# Maps tool names to their implementations

TOOL_REGISTRY = {
    "calculator": calculator,
    "search_web": search_web,
    "get_current_time": get_current_time,
    "read_file": read_file,
}

# ── JSON schemas for the LLM ─────────────────────────────────
# These tell the model what tools exist and how to call them

TOOL_SCHEMAS = [
    {
        "type": "function",
        "function": {
            "name": "calculator",
            "description": "Evaluate a mathematical expression. Use for any arithmetic, unit conversions, or numerical calculations.",
            "parameters": {
                "type": "object",
                "properties": {
                    "expression": {
                        "type": "string",
                        "description": "The math expression to evaluate, e.g. '(23 * 47) + 156'"
                    }
                },
                "required": ["expression"]
            }
        }
    },
    {
        "type": "function",
        "function": {
            "name": "search_web",
            "description": "Search the web for current information. Use when you need facts, recent events, or data you don't have.",
            "parameters": {
                "type": "object",
                "properties": {
                    "query": {
                        "type": "string",
                        "description": "The search query"
                    }
                },
                "required": ["query"]
            }
        }
    },
    {
        "type": "function",
        "function": {
            "name": "get_current_time",
            "description": "Get the current date and time.",
            "parameters": {
                "type": "object",
                "properties": {}
            }
        }
    },
    {
        "type": "function",
        "function": {
            "name": "read_file",
            "description": "Read the contents of a local file by its path.",
            "parameters": {
                "type": "object",
                "properties": {
                    "filepath": {
                        "type": "string",
                        "description": "Path to the file to read"
                    }
                },
                "required": ["filepath"]
            }
        }
    }
]
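
Because each tool is defined twice (schema and implementation), the two definitions can drift apart. The following is a minimal sketch of a consistency check, assuming the OpenAI-style schema layout used in tools.py; `find_schema_mismatches` and the `add` demo tool are hypothetical names introduced here for illustration.

```python
import inspect
from typing import Callable


def find_schema_mismatches(
    registry: dict[str, Callable],
    schemas: list[dict],
) -> list[str]:
    """Return human-readable problems; an empty list means both sides agree."""
    problems: list[str] = []
    schema_by_name = {s["function"]["name"]: s["function"] for s in schemas}

    for name in registry.keys() - schema_by_name.keys():
        problems.append(f"'{name}' is implemented but has no schema")
    for name in schema_by_name.keys() - registry.keys():
        problems.append(f"'{name}' has a schema but no implementation")

    # For tools present on both sides, compare declared vs. actual parameters
    for name in registry.keys() & schema_by_name.keys():
        declared = set(schema_by_name[name]["parameters"].get("properties", {}))
        actual = set(inspect.signature(registry[name]).parameters)
        if declared != actual:
            problems.append(
                f"'{name}' parameters differ: schema={sorted(declared)}, "
                f"function={sorted(actual)}"
            )
    return sorted(problems)


# Demo with a deliberately wrong schema (parameter "x" instead of "b")
def add(a: float, b: float) -> str:
    return str(a + b)


demo_registry = {"add": add}
demo_schemas = [{
    "type": "function",
    "function": {
        "name": "add",
        "parameters": {
            "type": "object",
            "properties": {"a": {"type": "number"}, "x": {"type": "number"}},
        },
    },
}]
demo_problems = find_schema_mismatches(demo_registry, demo_schemas)
```

Running a check like `find_schema_mismatches(TOOL_REGISTRY, TOOL_SCHEMAS)` at startup or in a unit test is one way to catch a renamed parameter before the model trips over it.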

Now the main event — the ReAct loop itself. This is roughly 60 lines of Python that replicate what frameworks like LangChain wrap in thousands of lines of abstraction. Read every line carefully. Understand what the loop does on each iteration, how it manages conversation history, and how it decides when to stop.

react_agent.py
python
import json
from openai import OpenAI
from tools import TOOL_REGISTRY, TOOL_SCHEMAS

client = OpenAI()

SYSTEM_PROMPT = """You are a helpful assistant with access to tools.
Always think step-by-step before acting.
When you have enough information to answer, respond directly to the user.
Do not call tools unnecessarily — if you already know the answer, just say it."""

def run_agent(
    user_message: str,
    model: str = "gpt-4o",
    max_steps: int = 10,
    verbose: bool = True
) -> str:
    """Run the ReAct agent loop."""
    
    messages = [
        {"role": "system", "content": SYSTEM_PROMPT},
        {"role": "user", "content": user_message}
    ]
    
    for step in range(max_steps):
        if verbose:
            print(f"\n{'='*60}")
            print(f"Step {step + 1}/{max_steps}")
            print(f"{'='*60}")
        
        # ── Call the LLM ──────────────────────────────────────
        response = client.chat.completions.create(
            model=model,
            messages=messages,
            tools=TOOL_SCHEMAS,
            tool_choice="auto"  # Let the model decide
        )
        
        assistant_message = response.choices[0].message
        messages.append(assistant_message)  # Add to history
        
        # ── Check: did the model want to call tools? ──────────
        if not assistant_message.tool_calls:
            # No tool calls → the model is giving its final answer
            if verbose:
                print(f"\nFinal Answer: {assistant_message.content}")
            return assistant_message.content
        
        # ── Execute each tool call ────────────────────────────
        for tool_call in assistant_message.tool_calls:
            fn_name = tool_call.function.name
            fn_args = json.loads(tool_call.function.arguments)
            
            if verbose:
                print(f"\nAction: {fn_name}({fn_args})")
            
            # Look up and execute the tool
            if fn_name in TOOL_REGISTRY:
                result = TOOL_REGISTRY[fn_name](**fn_args)
            else:
                result = f"Error: Unknown tool '{fn_name}'"
            
            if verbose:
                print(f"Observation: {result[:200]}")
            
            # Feed the observation back to the model
            messages.append({
                "role": "tool",
                "tool_call_id": tool_call.id,
                "content": str(result)
            })
    
    # ── Safety: max steps reached ─────────────────────────────
    return "Agent reached maximum steps without completing. Last state: " + (
        assistant_message.content or "No final response generated."
    )

# ── Run it ────────────────────────────────────────────────────
if __name__ == "__main__":
    answer = run_agent(
        "What's the population of Tokyo, and what's that number "
        "divided by the population of Iceland?"
    )
    print(f"\nResult: {answer}")

What Just Happened?

In ~60 lines, you built a complete agent. For the example question, the model will typically decide it needs population data, call the search tool, read the results, call the calculator to do the division, and then synthesize a final answer. Conceptually, this is the same loop that framework agent executors such as LangChain's AgentExecutor run; you just built it without the framework code.
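
One way to study the control flow without spending API credits is to drive the same loop with a scripted stand-in for the model. The sketch below is an illustration under simplifying assumptions: messages are plain dicts with a flattened tool-call shape (not the SDK's response objects), and `run_loop`, `scripted_llm`, and the `multiply` tool are hypothetical names.

```python
import json
from typing import Callable


def multiply(a: float, b: float) -> str:
    return str(a * b)


TOOLS: dict[str, Callable[..., str]] = {"multiply": multiply}


def run_loop(
    llm: Callable[[list[dict]], dict],
    question: str,
    max_steps: int = 5,
) -> tuple[str, list[dict]]:
    """Same think/act/observe loop, with the model passed in as a function."""
    messages = [{"role": "user", "content": question}]
    for _ in range(max_steps):
        reply = llm(messages)
        messages.append(reply)
        calls = reply.get("tool_calls") or []
        if not calls:
            # No tool calls: treat the reply as the final answer
            return reply["content"], messages
        for call in calls:
            args = json.loads(call["arguments"])
            result = TOOLS[call["name"]](**args)
            messages.append(
                {"role": "tool", "tool_call_id": call["id"], "content": result}
            )
    return "Max steps reached.", messages


def scripted_llm(messages: list[dict]) -> dict:
    """Stand-in for the model: act once, then answer from the observation."""
    if messages[-1]["role"] == "tool":
        return {
            "role": "assistant",
            "content": f"The result is {messages[-1]['content']}.",
        }
    return {
        "role": "assistant",
        "content": None,
        "tool_calls": [{
            "id": "call_1",
            "name": "multiply",
            "arguments": json.dumps({"a": 6, "b": 7}),
        }],
    }


answer, history = run_loop(scripted_llm, "What is 6 times 7?")
```

After the run, `history` holds four messages in order: user, assistant (tool call), tool (observation), assistant (final answer). That ordering is exactly what the real loop sends back to the API on each iteration.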

The naive loop above works for happy paths, but production agents face many failure modes. The model might call a tool that doesn't exist. It might get stuck in an infinite loop calling the same tool repeatedly. It might generate malformed JSON arguments. It might pass arguments the tool doesn't accept. You need to handle all of these before your agent touches production traffic.

robust_agent.py
python
import json
import time
from openai import OpenAI
from react_agent import SYSTEM_PROMPT  # Reuse the prompt defined in react_agent.py
from tools import TOOL_REGISTRY, TOOL_SCHEMAS

client = OpenAI()

class AgentError(Exception):
    """Custom exception for agent failures."""
    pass

def run_robust_agent(
    user_message: str,
    model: str = "gpt-4o",
    max_steps: int = 10,
    max_retries: int = 2,
    loop_threshold: int = 3,  # Max times same tool+args can repeat
    timeout_seconds: float = 120.0,
    verbose: bool = True
) -> str:
    """Production-grade ReAct loop with comprehensive error handling."""
    
    messages = [
        {"role": "system", "content": SYSTEM_PROMPT},
        {"role": "user", "content": user_message}
    ]
    
    start_time = time.time()
    call_history: list[str] = []  # Track tool calls for loop detection
    
    for step in range(max_steps):
        # ── Timeout check ─────────────────────────────────────
        elapsed = time.time() - start_time
        if elapsed > timeout_seconds:
            raise AgentError(
                f"Agent timed out after {elapsed:.1f}s "
                f"({step} steps completed)"
            )
        
        # ── LLM call with retry logic ─────────────────────────
        last_error = None
        for attempt in range(max_retries + 1):
            try:
                response = client.chat.completions.create(
                    model=model,
                    messages=messages,
                    tools=TOOL_SCHEMAS,
                    tool_choice="auto",
                    timeout=30.0
                )
                break
            except Exception as e:
                last_error = e
                if attempt < max_retries:
                    time.sleep(2 ** attempt)  # Exponential backoff
                    continue
                raise AgentError(
                    f"LLM call failed after {max_retries + 1} attempts: {e}"
                )
        
        assistant_message = response.choices[0].message
        messages.append(assistant_message)
        
        # ── No tool calls → final answer ──────────────────────
        if not assistant_message.tool_calls:
            return assistant_message.content or ""
        
        # ── Process tool calls ─────────────────────────────────
        for tool_call in assistant_message.tool_calls:
            fn_name = tool_call.function.name
            
            # Parse arguments safely
            try:
                fn_args = json.loads(tool_call.function.arguments)
            except json.JSONDecodeError as e:
                messages.append({
                    "role": "tool",
                    "tool_call_id": tool_call.id,
                    "content": f"Error: Malformed arguments — {e}. "
                               f"Please provide valid JSON."
                })
                continue
            
            # Validate tool exists
            if fn_name not in TOOL_REGISTRY:
                messages.append({
                    "role": "tool",
                    "tool_call_id": tool_call.id,
                    "content": f"Error: Tool '{fn_name}' does not exist. "
                               f"Available tools: {list(TOOL_REGISTRY.keys())}"
                })
                continue
            
            # ── Loop detection ─────────────────────────────────
            call_signature = f"{fn_name}:{json.dumps(fn_args, sort_keys=True)}"
            call_history.append(call_signature)
            
            recent_calls = call_history[-loop_threshold:]
            if (len(recent_calls) == loop_threshold 
                    and len(set(recent_calls)) == 1):
                messages.append({
                    "role": "tool",
                    "tool_call_id": tool_call.id,
                    "content": f"Warning: You've called {fn_name} with the "
                               f"same arguments {loop_threshold} times. "
                               f"Please try a different approach or provide "
                               f"your final answer."
                })
                continue
            
            # ── Execute the tool ───────────────────────────────
            try:
                result = TOOL_REGISTRY[fn_name](**fn_args)
            except TypeError as e:
                result = f"Error: Wrong arguments for {fn_name} — {e}"
            except Exception as e:
                result = f"Error executing {fn_name}: {type(e).__name__}: {e}"
            
            if verbose:
                print(f"  Step {step+1}: {fn_name}({fn_args}) → {str(result)[:100]}")
            
            messages.append({
                "role": "tool",
                "tool_call_id": tool_call.id,
                "content": str(result)
            })
    
    # ── Max steps exhausted ───────────────────────────────────
    # Force the model to give a final answer
    messages.append({
        "role": "user",
        "content": "You've used all available steps. Please provide "
                   "your best answer with the information gathered so far."
    })
    
    response = client.chat.completions.create(
        model=model,
        messages=messages
        # No tools parameter → forces a text response
    )
    
    return response.choices[0].message.content or "Agent failed to produce an answer."

A well-designed agent needs multiple termination conditions, not just 'the model stopped calling tools.' Here's the complete set you should implement in every production agent:

| Condition | What Triggers It | How to Handle |
| --- | --- | --- |
| Natural completion | Model returns text with no tool calls | Return the response; this is the happy path |
| Max steps reached | Loop counter hits the limit | Force a final answer by removing tools from the last call |
| Timeout exceeded | Wall-clock time exceeds threshold | Raise an error or force a partial answer |
| Loop detected | Same tool+args called N times consecutively | Inject a warning message, then force termination if it persists |
| Budget exhausted | Token usage exceeds a set budget | Track cumulative tokens and stop when the limit is hit |
| Error cascade | N consecutive tool errors | Stop and report the errors rather than burning through retries |
| User cancellation | External signal (e.g., cancelled HTTP request) | Use asyncio.Event or threading.Event to signal the loop to stop |
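
The robust loop above covers completion, max steps, timeout, and loop detection. The last three rows (budget, error cascade, cancellation) could be tracked with a small helper like the sketch below. `StopGuard`, its default thresholds, and the convention that failed tool results start with "Error" are assumptions made for illustration.

```python
import threading
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class StopGuard:
    """Tracks termination conditions that live outside the model's control."""
    max_total_tokens: int = 50_000   # assumed budget; tune per use case
    max_consecutive_errors: int = 3  # assumed threshold
    cancel_event: threading.Event = field(default_factory=threading.Event)
    total_tokens: int = 0
    consecutive_errors: int = 0

    def record_usage(self, tokens: int) -> None:
        """Add the token count reported for one LLM call."""
        self.total_tokens += tokens

    def record_tool_result(self, result: str) -> None:
        """Count consecutive failures; any success resets the streak."""
        if result.startswith("Error"):
            self.consecutive_errors += 1
        else:
            self.consecutive_errors = 0

    def stop_reason(self) -> Optional[str]:
        """Return why the loop should stop, or None to keep going."""
        if self.cancel_event.is_set():
            return "cancelled"
        if self.total_tokens >= self.max_total_tokens:
            return "budget_exhausted"
        if self.consecutive_errors >= self.max_consecutive_errors:
            return "error_cascade"
        return None
```

In the agent loop, you would call `guard.record_usage(response.usage.total_tokens)` after each LLM call, `guard.record_tool_result(str(result))` after each tool call, and check `guard.stop_reason()` at the top of every iteration. Another thread (for example, a request handler that sees the client disconnect) can call `guard.cancel_event.set()` to stop the loop.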

Anthropic's tool use API differs from OpenAI's in important ways. Tool definitions use a different schema format, tool results are sent as tool_result content blocks, and the model uses a stop_reason field to signal when it wants to use tools. Let's build the same agent using Claude.

react_agent_anthropic.py
python
import json
from anthropic import Anthropic
from tools import TOOL_REGISTRY

client = Anthropic()

# Anthropic uses a slightly different tool schema format
ANTHROPIC_TOOLS = [
    {
        "name": "calculator",
        "description": "Evaluate a mathematical expression.",
        "input_schema": {
            "type": "object",
            "properties": {
                "expression": {
                    "type": "string",
                    "description": "Math expression to evaluate"
                }
            },
            "required": ["expression"]
        }
    },
    {
        "name": "search_web",
        "description": "Search the web for current information.",
        "input_schema": {
            "type": "object",
            "properties": {
                "query": {
                    "type": "string",
                    "description": "The search query"
                }
            },
            "required": ["query"]
        }
    }
]

def run_anthropic_agent(
    user_message: str,
    model: str = "claude-sonnet-4-20250514",
    max_steps: int = 10
) -> str:
    """ReAct loop using Anthropic's Claude API."""
    
    messages = [{"role": "user", "content": user_message}]
    
    for step in range(max_steps):
        response = client.messages.create(
            model=model,
            max_tokens=4096,
            system="You are a helpful assistant. Think step-by-step.",
            tools=ANTHROPIC_TOOLS,
            messages=messages
        )
        
        # Append the full assistant response
        messages.append({
            "role": "assistant",
            "content": response.content
        })
        
        # Check stop reason: 'tool_use' means the model wants to call a
        # tool; anything else ('end_turn', 'max_tokens', ...) ends the loop
        if response.stop_reason != "tool_use":
            # Extract the final text from content blocks
            text_blocks = [
                block.text for block in response.content
                if block.type == "text"
            ]
            return "\n".join(text_blocks)
        
        # Process tool calls
        tool_results = []
        for block in response.content:
            if block.type != "tool_use":
                continue
            
            fn_name = block.name
            fn_args = block.input  # Already a dict (no JSON parsing needed)
            
            if fn_name in TOOL_REGISTRY:
                result = TOOL_REGISTRY[fn_name](**fn_args)
            else:
                result = f"Error: Unknown tool '{fn_name}'"
            
            tool_results.append({
                "type": "tool_result",
                "tool_use_id": block.id,
                "content": str(result)
            })
        
        # Send all tool results back in a single 'user' message
        messages.append({
            "role": "user",
            "content": tool_results
        })
    
    return "Max steps reached."

Key Difference: OpenAI vs Anthropic Tool Use

OpenAI uses a dedicated tool role for tool results and sends tool_calls on the assistant message. Anthropic embeds tool_use blocks inside the assistant's content array and expects tool_result blocks inside a follow-up user message. The stop_reason field ('end_turn' vs 'tool_use') tells you whether the model is done. Both achieve the same ReAct loop — the wire format just differs.
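
Since the two schema formats carry the same information, you can keep one set of definitions and convert between them. Here is a minimal sketch based only on the two layouts shown in this article; `to_anthropic_tool` is a hypothetical helper name.

```python
def to_anthropic_tool(openai_schema: dict) -> dict:
    """Convert an OpenAI-style tool schema into the Anthropic layout."""
    fn = openai_schema["function"]
    return {
        "name": fn["name"],
        "description": fn.get("description", ""),
        # OpenAI calls this "parameters"; Anthropic calls it "input_schema"
        "input_schema": fn.get(
            "parameters", {"type": "object", "properties": {}}
        ),
    }


openai_calculator = {
    "type": "function",
    "function": {
        "name": "calculator",
        "description": "Evaluate a mathematical expression.",
        "parameters": {
            "type": "object",
            "properties": {
                "expression": {
                    "type": "string",
                    "description": "Math expression to evaluate",
                }
            },
            "required": ["expression"],
        },
    },
}

anthropic_calculator = to_anthropic_tool(openai_calculator)
```

With this in place, something like `[to_anthropic_tool(s) for s in TOOL_SCHEMAS]` would expose all four tools from tools.py to Claude instead of maintaining a second hand-written list.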

The basic ReAct loop is reactive — the model decides what to do one step at a time. For complex tasks, this leads to inefficient wandering. Planning patterns solve this by separating the strategy from the execution. The agent first creates a plan, then executes each step, and optionally revises the plan based on what it learns along the way.

The Plan-and-Execute pattern uses two separate LLM calls with different roles. A planner agent (usually a stronger model like GPT-4o or Claude Opus) decomposes the user's request into a numbered list of subtasks. An executor agent (which can be a cheaper model) then works through each subtask sequentially, reporting results back. If a step fails or reveals new information, the planner can be invoked again to revise the remaining steps.

plan_and_execute.py
python
import json
from openai import OpenAI
from tools import TOOL_REGISTRY, TOOL_SCHEMAS

client = OpenAI()

def create_plan(user_request: str) -> list[str]:
    """Use a strong model to decompose a request into steps."""
    
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {
                "role": "system",
                "content": (
                    "You are a planning agent. Given a user request, "
                    "decompose it into a list of concrete steps "
                    "that a simpler agent can execute one at a time. "
                    "Each step should be self-contained and actionable. "
                    'Return ONLY a JSON object of the form '
                    '{"steps": ["first step", "second step"]}.'
                )
            },
            {"role": "user", "content": user_request}
        ],
        response_format={"type": "json_object"}
    )
    
    data = json.loads(response.choices[0].message.content)
    return data.get("steps", data.get("plan", []))

def execute_step(
    step: str,
    context: str,
    model: str = "gpt-4o-mini"
) -> str:
    """Execute a single step using a cheaper model with tools."""
    
    messages = [
        {
            "role": "system",
            "content": (
                "You are an execution agent. Complete the given step "
                "using available tools. Be concise in your response.\n\n"
                f"Context from previous steps:\n{context}"
            )
        },
        {"role": "user", "content": f"Execute this step: {step}"}
    ]
    
    # Mini ReAct loop for this single step
    for _ in range(5):
        response = client.chat.completions.create(
            model=model,
            messages=messages,
            tools=TOOL_SCHEMAS,
            tool_choice="auto"
        )
        
        msg = response.choices[0].message
        messages.append(msg)
        
        if not msg.tool_calls:
            return msg.content or ""
        
        for tc in msg.tool_calls:
            fn = TOOL_REGISTRY.get(tc.function.name)
            args = json.loads(tc.function.arguments)
            result = fn(**args) if fn else f"Unknown tool: {tc.function.name}"
            messages.append({
                "role": "tool",
                "tool_call_id": tc.id,
                "content": str(result)
            })
    
    return "Step did not complete within allowed iterations."

def plan_and_execute(user_request: str) -> str:
    """Full Plan-and-Execute pipeline."""
    
    print("📋 Creating plan...")
    steps = create_plan(user_request)
    
    for i, step in enumerate(steps, 1):
        print(f"  Step {i}/{len(steps)}: {step}")
    
    context_parts = []
    
    for i, step in enumerate(steps, 1):
        print(f"\n▶ Executing step {i}: {step}")
        
        context = "\n".join(context_parts) if context_parts else "No previous context."
        result = execute_step(step, context)
        
        context_parts.append(f"Step {i} ({step}): {result}")
        print(f"  ✓ Result: {result[:150]}")
    
    # Final synthesis
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {
                "role": "system",
                "content": "Synthesize the results of all steps into a final, coherent answer."
            },
            {
                "role": "user",
                "content": f"Original request: {user_request}\n\nStep results:\n" + "\n".join(context_parts)
            }
        ]
    )
    
    return response.choices[0].message.content

# Example usage
if __name__ == "__main__":
    result = plan_and_execute(
        "Compare the GDP per capita of Japan and South Korea, "
        "convert both to EUR, and tell me which country has a "
        "higher cost of living adjusted income."
    )
    print(f"\nFinal: {result}")
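
The pipeline above never revises its plan. A minimal sketch of the revision step might look like the following, with the planner and executor passed in as plain functions so the control flow can be tested without API calls. The names `plan_execute_replan`, `stub_planner`, and `stub_executor`, and the rule "replan when a result starts with Error", are assumptions for illustration.

```python
from typing import Callable

# (request, results so far) -> remaining steps
Planner = Callable[[str, list[str]], list[str]]
# (step, results so far) -> result text
Executor = Callable[[str, list[str]], str]


def plan_execute_replan(
    request: str,
    planner: Planner,
    executor: Executor,
    max_total_steps: int = 10,
    max_replans: int = 2,
) -> list[str]:
    """Execute a plan step by step, asking for a new plan after a failure."""
    results: list[str] = []
    remaining = planner(request, results)
    replans = 0

    while remaining and len(results) < max_total_steps:
        step = remaining.pop(0)
        outcome = executor(step, results)
        results.append(f"{step}: {outcome}")

        # On failure, let the planner see what happened and revise the rest
        if outcome.startswith("Error") and replans < max_replans:
            replans += 1
            remaining = planner(request, results)

    return results


def stub_planner(request: str, results: list[str]) -> list[str]:
    if not results:
        return ["fetch data", "summarize"]
    # After a failure, swap in a fallback plan
    return ["fetch data from backup", "summarize"]


def stub_executor(step: str, results: list[str]) -> str:
    return "Error: source unavailable" if step == "fetch data" else "ok"


log = plan_execute_replan("demo request", stub_planner, stub_executor)
```

In a real agent, `planner` would wrap an LLM call that receives the original request plus the results so far, and `executor` would wrap `execute_step`. The `max_replans` and `max_total_steps` caps keep a failing plan from looping forever.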

Tree of Thoughts (ToT) extends chain-of-thought reasoning by exploring multiple reasoning paths instead of following a single chain. Think of it as a tree search over possible thought sequences. At each step, the model generates several candidate 'thoughts,' evaluates which ones are most promising, and expands only the best branches (a beam search, when a fixed number of branches is kept per level). This is especially useful for problems with multiple valid approaches: mathematical proofs, creative writing, strategic planning, and puzzle solving.

tree_of_thoughts.py
python
import json
from openai import OpenAI

client = OpenAI()

def generate_thoughts(
    problem: str,
    current_path: list[str],
    n_candidates: int = 3
) -> list[str]:
    """Generate n candidate next-thoughts given the current path."""
    
    path_text = "\n".join(
        f"Step {i+1}: {t}" for i, t in enumerate(current_path)
    ) or "(Starting fresh)"
    
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {
                "role": "system",
                "content": (
                    f"You are solving this problem: {problem}\n\n"
                    f"Current reasoning path:\n{path_text}\n\n"
                    f"Generate exactly {n_candidates} distinct next steps. "
                    f"Each should take a DIFFERENT approach or angle. "
                    'Return a JSON object with a "thoughts" key '
                    "holding an array of strings."
                )
            },
            {"role": "user", "content": "Generate the next possible thoughts."}
        ],
        response_format={"type": "json_object"}
    )
    
    data = json.loads(response.choices[0].message.content)
    return data.get("thoughts", [])[:n_candidates]

def evaluate_thought(
    problem: str,
    path: list[str],
    thought: str
) -> float:
    """Score a thought on how promising it is (0-1)."""
    
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {
                "role": "system",
                "content": (
                    f"Problem: {problem}\n\n"
                    f"Evaluate this reasoning step on a scale of 0.0 to 1.0:\n"
                    f"- Is it logically sound?\n"
                    f"- Does it make progress toward solving the problem?\n"
                    f"- Is it a promising direction?\n\n"
                    f"Return ONLY a JSON object with a 'score' field."
                )
            },
            {"role": "user", "content": f"Path so far: {path}\n\nNew thought: {thought}"}
        ],
        response_format={"type": "json_object"}
    )
    
    data = json.loads(response.choices[0].message.content)
    return float(data.get("score", 0.5))

def tree_of_thoughts(
    problem: str,
    max_depth: int = 3,
    beam_width: int = 2,
    n_candidates: int = 3
) -> str:
    """Solve a problem using Tree of Thoughts with beam search."""
    
    # Each beam is a (path, cumulative_score) tuple
    beams: list[tuple[list[str], float]] = [([], 0.0)]
    
    for depth in range(max_depth):
        all_candidates = []
        
        for path, path_score in beams:
            thoughts = generate_thoughts(problem, path, n_candidates)
            
            for thought in thoughts:
                score = evaluate_thought(problem, path, thought)
                new_path = path + [thought]
                all_candidates.append((new_path, path_score + score))
        
        if not all_candidates:
            break  # No thoughts were generated; keep the previous beams

        # Keep only the top beam_width paths
        all_candidates.sort(key=lambda x: x[1], reverse=True)
        beams = all_candidates[:beam_width]
        
        print(f"Depth {depth+1}: {len(all_candidates)} candidates → "
              f"kept top {beam_width}")
    
    # Return the best path's final synthesis
    best_path = beams[0][0]
    
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {
                "role": "system",
                "content": "Synthesize the reasoning path into a final answer."
            },
            {
                "role": "user",
                "content": f"Problem: {problem}\n\nBest reasoning path:\n" + 
                           "\n".join(f"{i+1}. {s}" for i, s in enumerate(best_path))
            }
        ]
    )
    
    return response.choices[0].message.content
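
To see the pruning behavior in isolation, the same beam search can be run on a toy problem with a deterministic generator and scorer in place of the LLM calls. This is a sketch under stated assumptions: `beam_search`, `toy_expand`, and `toy_score` are hypothetical names, and the "thoughts" are single letters that score 1.0 when they match the next letter of a target word.

```python
from typing import Callable

Path = list[str]


def beam_search(
    expand: Callable[[Path], list[str]],
    score: Callable[[Path, str], float],
    max_depth: int = 3,
    beam_width: int = 2,
) -> list[tuple[Path, float]]:
    """Keep the beam_width best paths at each depth, ranked by total score."""
    beams: list[tuple[Path, float]] = [([], 0.0)]

    for _ in range(max_depth):
        candidates: list[tuple[Path, float]] = []
        for path, path_score in beams:
            for thought in expand(path):
                candidates.append(
                    (path + [thought], path_score + score(path, thought))
                )
        if not candidates:
            break  # nothing left to expand; keep the current beams
        candidates.sort(key=lambda c: c[1], reverse=True)
        beams = candidates[:beam_width]

    return beams


TARGET = "cat"


def toy_expand(path: Path) -> list[str]:
    # Every node has the same three candidate "thoughts"
    return ["a", "c", "t"]


def toy_score(path: Path, thought: str) -> float:
    # 1.0 if the thought matches the target letter at this depth, else 0.0
    position = len(path)
    return 1.0 if position < len(TARGET) and TARGET[position] == thought else 0.0


best_path, best_score = beam_search(toy_expand, toy_score)[0]
```

With three candidates per node and a beam width of 2, each level evaluates at most six candidates rather than the full tree, which is the cost-saving idea behind the LLM version. In this toy run the surviving best path spells the target word.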

Reflexion is a pattern where the agent generates an output, then critiques its own output and uses that critique to produce an improved version. It's inspired by how humans revise their work: write a draft, review it, identify weaknesses, and rewrite. The key insight is that LLMs are often better at evaluating outputs than at generating perfect ones on the first try. Giving the model a chance to reflect often improves results on tasks like code generation, writing, and analysis, at the cost of extra LLM calls per iteration.

reflexion_agent.py
python
from openai import OpenAI

client = OpenAI()

def reflexion(
    task: str,
    max_iterations: int = 3,
    model: str = "gpt-4o"
) -> str:
    """Generate, critique, and improve iteratively."""
    
    # Initial generation
    response = client.chat.completions.create(
        model=model,
        messages=[
            {"role": "system", "content": "Complete the following task to the best of your ability."},
            {"role": "user", "content": task}
        ]
    )
    current_output = response.choices[0].message.content
    
    for iteration in range(max_iterations):
        print(f"\n--- Reflexion iteration {iteration + 1} ---")
        
        # ── Self-critique ─────────────────────────────────────
        critique_response = client.chat.completions.create(
            model=model,
            messages=[
                {
                    "role": "system",
                    "content": (
                        "You are a harsh but fair critic. Analyze the "
                        "following output for a given task. Identify:\n"
                        "1. Factual errors or inaccuracies\n"
                        "2. Missing information or gaps\n"
                        "3. Logical flaws or weak reasoning\n"
                        "4. Areas that could be more precise or clear\n\n"
                        "If the output is already excellent and has no "
                        "significant issues, respond with ONLY: 'PASS'"
                    )
                },
                {
                    "role": "user",
                    "content": f"Task: {task}\n\nOutput to critique:\n{current_output}"
                }
            ]
        )
        
        critique = critique_response.choices[0].message.content
        
        # Check if the critic thinks it's good enough
        if "PASS" in critique.upper() and len(critique) < 50:
            print("  ✓ Critic says: PASS — output is good enough")
            break
        
        print(f"  Critique: {critique[:200]}...")
        
        # ── Revision ──────────────────────────────────────────
        revision_response = client.chat.completions.create(
            model=model,
            messages=[
                {
                    "role": "system",
                    "content": (
                        "You are revising your previous output based on "
                        "a critique. Address EVERY point raised in the "
                        "critique while preserving what was already good. "
                        "Produce a complete revised version — not just the changes."
                    )
                },
                {
                    "role": "user",
                    "content": (
                        f"Original task: {task}\n\n"
                        f"Your previous output:\n{current_output}\n\n"
                        f"Critique:\n{critique}\n\n"
                        f"Please produce an improved version."
                    )
                }
            ]
        )
        
        current_output = revision_response.choices[0].message.content
        print(f"  ✓ Revised output generated")
    
    return current_output
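
Because every step in the loop above is an API call, the control flow itself is awkward to test. One option, sketched here on the assumption that you are willing to pass the three model calls in as plain functions, is to separate the loop from the client. The names `reflexion_loop`, `fake_generate`, `fake_critique`, and `fake_revise` are hypothetical and exist only for this illustration.

```python
from typing import Callable

def reflexion_loop(
    task: str,
    generate: Callable[[str], str],
    critique: Callable[[str, str], str],
    revise: Callable[[str, str, str], str],
    max_iterations: int = 3,
) -> tuple[str, int]:
    """Run generate -> critique -> revise until the critic passes or the budget runs out.

    Returns the final output and the number of revisions performed.
    """
    output = generate(task)
    revisions = 0
    for _ in range(max_iterations):
        feedback = critique(task, output)
        # Exact match: a critique that merely contains the word "pass" is not approval
        if feedback.strip().upper() == "PASS":
            break
        output = revise(task, output, feedback)
        revisions += 1
    return output, revisions


# Stub "models" for a dry run: the critic passes once the draft mentions units.
def fake_generate(task: str) -> str:
    return "The answer is 60"

def fake_critique(task: str, output: str) -> str:
    return "PASS" if "items" in output else "Missing units."

def fake_revise(task: str, output: str, feedback: str) -> str:
    return output + " items"

final, revisions = reflexion_loop(
    "How many items are in 5 boxes of 12?", fake_generate, fake_critique, fake_revise
)
print(final, revisions)  # The answer is 60 items 1
```

In a real agent the three callables would wrap the `client.chat.completions.create` calls shown above; the stubs only make the stopping condition and iteration budget visible.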

Chain-of-Thought and Program-of-Thought represent fundamentally different ways an agent can solve problems. Chain-of-Thought (CoT) asks the model to reason in natural language: step by step, in words. Program-of-Thought (PoT) asks the model to generate executable code that solves the problem, then runs that code. For anything involving math, data manipulation, or precise logic, PoT is usually the more reliable choice. The model still has to write correct code, but the arithmetic is carried out by the interpreter, which executes deterministically, rather than by the model itself.

| Aspect | Chain-of-Thought (CoT) | Program-of-Thought (PoT) |
| --- | --- | --- |
| Reasoning medium | Natural language | Executable code (Python) |
| Execution | LLM does all reasoning internally | Code runs in a sandbox; results are exact |
| Best for | Commonsense reasoning, qualitative analysis, ambiguous problems | Math, data processing, algorithmic problems, precise calculations |
| Failure mode | Arithmetic errors, logical drift over long chains | Syntax errors, runtime exceptions (but these are visible and fixable) |
| Verifiability | Hard to verify: you trust the model's words | Easy to verify: run the code and check the output |
| Token cost | Moderate (reasoning in text) | Lower (concise code) + compute cost for execution |
| Example | "Step 1: If each box has 12 items and there are 5 boxes, that's 12 × 5 = 60 items..." | "boxes = 5; items_per_box = 12; total = boxes * items_per_box # 60" |
program_of_thought.py
python
import ast
import traceback
from openai import OpenAI

client = OpenAI()

def program_of_thought(problem: str) -> str:
    """Solve a problem by generating and executing Python code."""
    
    # Ask the model to write code that solves the problem
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {
                "role": "system",
                "content": (
                    "You solve problems by writing Python code. "
                    "Write a complete Python script that computes the answer. "
                    "The last line should be: print(f'ANSWER: {result}')\n\n"
                    "Return ONLY the Python code, no explanations."
                )
            },
            {"role": "user", "content": problem}
        ]
    )
    
    code = response.choices[0].message.content
    # Strip markdown code fences if present
    code = code.replace("```python", "").replace("```", "").strip()
    
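    # Safety check: parse the AST and reject imports outside the allowlist.
    # This is a guardrail, not a sandbox; isolate untrusted code at the
    # process or container level in production.
    allowed = {'math', 'statistics', 'datetime', 'collections', 'itertools'}
    try:
        tree = ast.parse(code)
        for node in ast.walk(tree):
            if isinstance(node, ast.Import):
                # Check every name: "import math, os" imports two modules
                modules = [alias.name for alias in node.names]
            elif isinstance(node, ast.ImportFrom):
                # node.module is None for relative imports such as "from . import x"
                modules = [node.module or ""]
            else:
                continue
            for module in modules:
                if module.split(".")[0] not in allowed:
                    return f"Blocked: attempted to import '{module}'"
    except SyntaxError as e:
        return f"Generated code has syntax error: {e}"
    
    # Execute in a restricted namespace
    import builtins, math, statistics, collections, itertools
    
    def limited_import(name, globals=None, locals=None, fromlist=(), level=0):
        """Import hook for the generated code. Without an __import__ entry in
        __builtins__, even the allowed import statements would fail."""
        if level != 0 or name.split(".")[0] not in allowed:
            raise ImportError(f"import of '{name}' is not allowed")
        return builtins.__import__(name, globals, locals, fromlist, level)
    
    safe_globals = {
        "__builtins__": {"print": print, "range": range, "len": len,
                         "sum": sum, "min": min, "max": max, "abs": abs,
                         "round": round, "sorted": sorted, "zip": zip,
                         "enumerate": enumerate, "map": map, "filter": filter,
                         "__import__": limited_import},
        "math": math,
        "statistics": statistics,
        "collections": collections,
        "itertools": itertools
    }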
    
    import io, contextlib
    output_buffer = io.StringIO()
    
    try:
        with contextlib.redirect_stdout(output_buffer):
            exec(code, safe_globals)
        output = output_buffer.getvalue()
        # Extract the answer line
        for line in output.strip().split('\n'):
            if line.startswith('ANSWER:'):
                return line.split('ANSWER:')[1].strip()
        return output.strip()
    except Exception as e:
        return f"Execution error: {traceback.format_exc()}"
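
The restricted `exec` above limits which names the generated code can see, but the code still runs inside your agent's process, so an infinite loop in generated code hangs the agent with it. A minimal sketch of one alternative, assuming the interpreter at `sys.executable` is acceptable for running generated scripts: execute the code in a child process with a timeout. `run_in_subprocess` is a hypothetical helper, and a child process is still not a security sandbox; it only adds a time limit and keeps crashes out of the parent.

```python
import subprocess
import sys

def run_in_subprocess(code: str, timeout_seconds: float = 5.0) -> str:
    """Run generated code in a separate Python process and return its answer."""
    try:
        completed = subprocess.run(
            # -I runs Python in isolated mode (ignores PYTHON* env vars and user site-packages)
            [sys.executable, "-I", "-c", code],
            capture_output=True,
            text=True,
            timeout=timeout_seconds,
        )
    except subprocess.TimeoutExpired:
        return f"Execution error: timed out after {timeout_seconds} seconds"

    if completed.returncode != 0:
        # Keep only the last traceback line, e.g. "ZeroDivisionError: division by zero"
        error_lines = completed.stderr.strip().splitlines()
        return f"Execution error: {error_lines[-1] if error_lines else 'unknown error'}"

    # Same convention as above: the script prints a line starting with "ANSWER:"
    for line in completed.stdout.splitlines():
        if line.startswith("ANSWER:"):
            return line.split("ANSWER:", 1)[1].strip()
    return completed.stdout.strip()


script = "boxes = 5\nitems_per_box = 12\nprint(f'ANSWER: {boxes * items_per_box}')"
print(run_in_subprocess(script))  # 60
```

The short error string is deliberate: feeding the last traceback line back to the model is usually enough for it to attempt a fix without spending tokens on the full stack trace.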

A single-turn agent is stateless — it handles one request and forgets everything. A useful agent needs memory: the ability to recall what happened earlier in the conversation, what it learned in previous sessions, and what knowledge it has accumulated over time. Memory is what turns a tool into a colleague. There are five distinct types of memory that production agents use, each serving a different purpose.

The simplest form of memory is just passing the entire conversation history in the messages array on every LLM call. This is what you've been doing in every agent so far. But context windows are finite (even 128K or 200K tokens fill up fast when agents make many tool calls), so you need strategies for managing this. The three main approaches are sliding window, summarization, and smart truncation.

conversation_memory.py
python
from openai import OpenAI
import tiktoken

client = OpenAI()
encoding = tiktoken.encoding_for_model("gpt-4o")

def count_tokens(messages: list[dict]) -> int:
    """Estimate token count for a message list."""
    total = 0
    for msg in messages:
        content = msg.get("content", "")
        if isinstance(content, str):
            total += len(encoding.encode(content))
        elif isinstance(content, list):
            for block in content:
                if isinstance(block, dict) and "text" in block:
                    total += len(encoding.encode(block["text"]))
        total += 4  # Overhead per message (role, formatting)
    return total

# ── Strategy 1: Sliding Window ────────────────────────────────

def sliding_window(
    messages: list[dict],
    max_tokens: int = 8000,
    keep_system: bool = True
) -> list[dict]:
    """Keep the most recent messages that fit in the token budget."""
    
    system_msgs = [m for m in messages if m["role"] == "system"] if keep_system else []
    non_system = [m for m in messages if m["role"] != "system"]
    
    system_tokens = count_tokens(system_msgs)
    remaining_budget = max_tokens - system_tokens
    
    # Walk backwards from the most recent message
    kept = []
    for msg in reversed(non_system):
        msg_tokens = count_tokens([msg])
        if remaining_budget - msg_tokens < 0:
            break
        kept.append(msg)
        remaining_budget -= msg_tokens
    
    return system_msgs + list(reversed(kept))

# ── Strategy 2: Summarize-and-Compress ────────────────────────

def summarize_history(
    messages: list[dict],
    max_tokens: int = 8000
) -> list[dict]:
    """When history is too long, summarize older messages."""
    
    if count_tokens(messages) <= max_tokens:
        return messages  # No compression needed
    
    system_msgs = [m for m in messages if m["role"] == "system"]
    conversation = [m for m in messages if m["role"] != "system"]
    
    # Split into old (to summarize) and recent (to keep verbatim)
    split_point = len(conversation) // 2
    old_messages = conversation[:split_point]
    recent_messages = conversation[split_point:]
    
    # Summarize the old part
    old_text = "\n".join(
        f"{m['role']}: {m.get('content', '[tool call]')}" 
        for m in old_messages
        if isinstance(m.get('content'), str)
    )
    
    summary_response = client.chat.completions.create(
        model="gpt-4o-mini",  # Cheap model for summarization
        messages=[
            {
                "role": "system",
                "content": (
                    "Summarize this conversation history concisely. "
                    "Preserve key facts, decisions, and context that "
                    "would be needed to continue the conversation."
                )
            },
            {"role": "user", "content": old_text}
        ],
        max_tokens=500
    )
    
    summary = summary_response.choices[0].message.content
    
    # Rebuild with summary + recent messages
    return system_msgs + [
        {"role": "system", "content": f"Summary of earlier conversation: {summary}"},
        *recent_messages
    ]

# ── Strategy 3: Smart Truncation ──────────────────────────────

def smart_truncate(
    messages: list[dict],
    max_tokens: int = 8000,
    priority_roles: tuple[str, ...] = ("system", "user")  # tuple avoids a mutable default argument
) -> list[dict]:
    """Truncate by priority: always keep system & user messages,
    drop assistant/tool messages first when over budget."""
    
    high_priority = [m for m in messages if m["role"] in priority_roles]
    low_priority = [m for m in messages if m["role"] not in priority_roles]
    
    result = list(high_priority)  # Always keep these
    remaining = max_tokens - count_tokens(result)
    
    # Add low-priority messages from most recent
    for msg in reversed(low_priority):
        msg_tokens = count_tokens([msg])
        if remaining - msg_tokens >= 0:
            result.append(msg)
            remaining -= msg_tokens
    
    # Re-sort by original order
    original_order = {id(m): i for i, m in enumerate(messages)}
    result.sort(key=lambda m: original_order.get(id(m), 0))
    
    return result
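
One pitfall applies to all three strategies: they treat messages as independent, but a `tool` message is only valid when the assistant message that requested it is still in the list. If trimming removes that assistant message, the Chat Completions API will typically reject the request. The sketch below is one way to clean up after trimming, assuming messages are plain dicts in the OpenAI chat format; `drop_orphaned_tool_messages` is a hypothetical helper, and it handles only this one direction (tool results without their request).

```python
def drop_orphaned_tool_messages(messages: list[dict]) -> list[dict]:
    """Remove tool results whose originating assistant tool call was trimmed away."""
    known_call_ids: set[str] = set()
    cleaned: list[dict] = []
    for msg in messages:
        if msg["role"] == "assistant":
            # Remember every tool call this assistant turn requested
            for call in msg.get("tool_calls") or []:
                known_call_ids.add(call["id"])
        if msg["role"] == "tool" and msg.get("tool_call_id") not in known_call_ids:
            continue  # The request for this result is gone, so drop the result too
        cleaned.append(msg)
    return cleaned


# After a sliding window, the oldest kept message may be a tool result
trimmed = [
    {"role": "tool", "tool_call_id": "call_a", "content": "orphaned result"},
    {"role": "user", "content": "And what about Berlin?"},
    {"role": "assistant", "content": None, "tool_calls": [
        {"id": "call_b", "type": "function",
         "function": {"name": "get_weather", "arguments": "{\"city\": \"Berlin\"}"}}
    ]},
    {"role": "tool", "tool_call_id": "call_b", "content": "12°C, cloudy"},
]
print([m["role"] for m in drop_orphaned_tool_messages(trimmed)])
# ['user', 'assistant', 'tool']
```

Running this as the last step of `sliding_window` or `smart_truncate` keeps the trimmed history structurally valid at the cost of a few extra dropped messages.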

In-context memory disappears when the conversation ends. For agents that need to maintain state across multiple API calls or even across sessions (but not forever), you need external short-term storage. Redis and DynamoDB are the two most common choices. Redis is faster and simpler for session state. DynamoDB is better when you need durability and don't want to manage infrastructure.

session_memory_redis.py
python
import json
import redis
from datetime import timedelta

class RedisSessionMemory:
    """Store agent conversation history in Redis with auto-expiry."""
    
    def __init__(
        self,
        redis_url: str = "redis://localhost:6379",
        ttl: timedelta = timedelta(hours=24)
    ):
        self.client = redis.from_url(redis_url)
        self.ttl = ttl
    
    def _key(self, session_id: str) -> str:
        return f"agent:session:{session_id}"
    
    def get_messages(self, session_id: str) -> list[dict]:
        """Retrieve all messages for a session."""
        data = self.client.get(self._key(session_id))
        if data is None:
            return []
        return json.loads(data)
    
    def add_message(self, session_id: str, message: dict) -> None:
        """Append a message and refresh the TTL."""
        messages = self.get_messages(session_id)
        messages.append(message)
        self.client.setex(
            self._key(session_id),
            self.ttl,
            json.dumps(messages)
        )
    
    def add_messages(self, session_id: str, new_messages: list[dict]) -> None:
        """Append multiple messages at once."""
        messages = self.get_messages(session_id)
        messages.extend(new_messages)
        self.client.setex(
            self._key(session_id),
            self.ttl,
            json.dumps(messages)
        )
    
    def clear_session(self, session_id: str) -> None:
        """Delete a session's history."""
        self.client.delete(self._key(session_id))
    
    def get_session_metadata(self, session_id: str) -> dict:
        """Get metadata about the session."""
        messages = self.get_messages(session_id)
        ttl_remaining = self.client.ttl(self._key(session_id))
        return {
            "message_count": len(messages),
            "ttl_seconds": ttl_remaining,
            "roles": [m["role"] for m in messages]
        }

# ── Usage with the agent ──────────────────────────────────────

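from openai import OpenAI

client = OpenAI()
# Placeholder prompt (an assumption for this example); substitute your agent's own
SYSTEM_PROMPT = "You are a helpful assistant."
memory = RedisSessionMemory(ttl=timedelta(hours=4))

def run_agent_with_session(
    session_id: str,
    user_message: str
) -> str:
    """Agent that persists conversation across calls."""
    
    # Load existing conversation. Only user/assistant turns are persisted below,
    # so the system prompt is prepended on every call, not just the first one.
    history = memory.get_messages(session_id)
    messages = [
        {"role": "system", "content": SYSTEM_PROMPT},
        *history,
        {"role": "user", "content": user_message}
    ]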
    
    # Run the agent loop (simplified)
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=messages
    )
    
    assistant_msg = response.choices[0].message
    messages.append({"role": "assistant", "content": assistant_msg.content})
    
    # Persist the updated conversation
    memory.add_messages(session_id, [
        {"role": "user", "content": user_message},
        {"role": "assistant", "content": assistant_msg.content}
    ])
    
    return assistant_msg.content
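
The class above reads the whole history, appends to it in Python, and writes it back. If two requests for the same session overlap, the later write can silently discard the earlier one. A sketch of one alternative, assuming you are free to change the storage layout: keep each message as one element of a Redis list so the append happens on the server. `RedisListSessionMemory` is a hypothetical name; `rpush`, `lrange`, and `expire` are standard redis-py commands.

```python
import json
from datetime import timedelta

class RedisListSessionMemory:
    """Session history stored as a Redis list, one JSON-encoded message per element."""

    def __init__(self, client, ttl: timedelta = timedelta(hours=24)):
        # `client` is expected to be a redis.Redis instance, e.g. redis.from_url(...)
        self.client = client
        self.ttl = ttl

    def _key(self, session_id: str) -> str:
        return f"agent:session:{session_id}"

    def add_messages(self, session_id: str, new_messages: list[dict]) -> None:
        """Append messages and refresh the TTL."""
        if not new_messages:
            return  # RPUSH needs at least one value
        key = self._key(session_id)
        # RPUSH appends on the server, so concurrent writers cannot overwrite each other
        self.client.rpush(key, *(json.dumps(m) for m in new_messages))
        self.client.expire(key, self.ttl)

    def get_messages(self, session_id: str) -> list[dict]:
        """Return the full history in insertion order (empty list if none)."""
        # LRANGE 0 -1 returns every element of the list
        return [json.loads(item) for item in self.client.lrange(self._key(session_id), 0, -1)]
```

Usage mirrors the original class: construct it with `RedisListSessionMemory(redis.from_url("redis://localhost:6379"))` and call `add_messages` after each turn. The trade-off is that editing or truncating history in place now takes list commands rather than a single overwrite.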

Episodic memory stores past interactions that might be relevant to future conversations. When a user says 'remember, I prefer Python over JavaScript' or 'like we discussed last week,' the agent needs to retrieve those past episodes. This is built on top of a vector store — you embed summaries of past interactions, and at the start of each new conversation, retrieve the most relevant ones.

episodic_memory.py
python
import json
import hashlib
from datetime import datetime
from openai import OpenAI
from qdrant_client import QdrantClient
from qdrant_client.models import (
    VectorParams, Distance, PointStruct, Filter, FieldCondition, MatchValue
)

client = OpenAI()
qdrant = QdrantClient(url="http://localhost:6333")

COLLECTION = "episodic_memory"

def init_episodic_store():
    """Create the vector collection if it doesn't exist."""
    collections = [c.name for c in qdrant.get_collections().collections]
    if COLLECTION not in collections:
        qdrant.create_collection(
            collection_name=COLLECTION,
            vectors_config=VectorParams(
                size=1536,  # text-embedding-3-small dimensions
                distance=Distance.COSINE
            )
        )

def embed(text: str) -> list[float]:
    """Generate an embedding for text."""
    response = client.embeddings.create(
        model="text-embedding-3-small",
        input=text
    )
    return response.data[0].embedding

def store_episode(
    user_id: str,
    session_id: str,
    summary: str,
    key_facts: list[str],
    emotional_tone: str = "neutral"
) -> None:
    """Store a conversation episode for future retrieval."""
    
    # Create a rich text representation for embedding
    episode_text = f"{summary}\nKey facts: {', '.join(key_facts)}"
    vector = embed(episode_text)
    
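    # Qdrant point IDs must be unsigned integers or UUIDs. The full 32-character
    # MD5 hex digest is a valid UUID string; a 16-character slice is not.
    point_id = hashlib.md5(
        f"{user_id}:{session_id}:{datetime.now().isoformat()}".encode()
    ).hexdigest()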
    
    qdrant.upsert(
        collection_name=COLLECTION,
        points=[PointStruct(
            id=point_id,
            vector=vector,
            payload={
                "user_id": user_id,
                "session_id": session_id,
                "summary": summary,
                "key_facts": key_facts,
                "emotional_tone": emotional_tone,
                "timestamp": datetime.now().isoformat(),
            }
        )]
    )

def recall_episodes(
    user_id: str,
    current_context: str,
    top_k: int = 5
) -> list[dict]:
    """Retrieve relevant past episodes for the current context."""
    
    query_vector = embed(current_context)
    
    results = qdrant.search(
        collection_name=COLLECTION,
        query_vector=query_vector,
        query_filter=Filter(
            must=[FieldCondition(
                key="user_id",
                match=MatchValue(value=user_id)
            )]
        ),
        limit=top_k
    )
    
    return [
        {
            "summary": r.payload["summary"],
            "key_facts": r.payload["key_facts"],
            "timestamp": r.payload["timestamp"],
            "relevance": r.score
        }
        for r in results
    ]

def summarize_conversation(messages: list[dict]) -> tuple[str, list[str]]:
    """Auto-generate a summary and key facts from conversation history."""
    
    conversation_text = "\n".join(
        f"{m['role']}: {m.get('content', '')}" 
        for m in messages 
        if isinstance(m.get('content'), str)
    )
    
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {
                "role": "system",
                "content": (
                    "Extract from this conversation:\n"
                    "1. A concise summary (2-3 sentences)\n"
                    "2. Key facts and preferences mentioned by the user\n\n"
                    "Return JSON: {\"summary\": \"...\", \"key_facts\": [...]}"
                )
            },
            {"role": "user", "content": conversation_text}
        ],
        response_format={"type": "json_object"}
    )
    
    data = json.loads(response.choices[0].message.content)
    return data["summary"], data.get("key_facts", [])

# ── Build the system prompt with memories ─────────────────────

def build_memory_prompt(user_id: str, user_message: str) -> str:
    """Inject relevant episodic memories into the system prompt."""
    
    episodes = recall_episodes(user_id, user_message, top_k=3)
    
    if not episodes:
        return "You are a helpful assistant."
    
    memory_text = "\n".join(
        f"- [{ep['timestamp'][:10]}] {ep['summary']} "
        f"(Facts: {', '.join(ep['key_facts'])})"
        for ep in episodes
    )
    
    return (
        "You are a helpful assistant with memory of past interactions.\n\n"
        f"Relevant memories about this user:\n{memory_text}\n\n"
        "Use these memories naturally — don't explicitly say 'I remember' "
        "unless the user asks. Just use the context to be more helpful."
    )
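
Stripped of the Qdrant and embedding calls, `recall_episodes` does two things: it filters episodes by user, then ranks what remains by vector similarity. The toy version below makes that visible with hand-made 3-dimensional vectors standing in for real embeddings. `recall_in_memory` and the sample data are hypothetical, and a real vector store uses approximate indexes rather than this linear scan.

```python
import math

def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine of the angle between two vectors; 0.0 if either has zero length."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0

def recall_in_memory(
    episodes: list[dict],
    user_id: str,
    query_vector: list[float],
    top_k: int = 5,
) -> list[dict]:
    """Filter by user, score by cosine similarity, return the top_k matches."""
    candidates = [ep for ep in episodes if ep["user_id"] == user_id]
    scored = [
        {"summary": ep["summary"],
         "relevance": cosine_similarity(ep["vector"], query_vector)}
        for ep in candidates
    ]
    scored.sort(key=lambda ep: ep["relevance"], reverse=True)
    return scored[:top_k]


# Pretend axis 0 means "programming languages" and axis 2 means "travel"
episodes = [
    {"user_id": "u1", "summary": "Prefers Python over JavaScript", "vector": [0.9, 0.1, 0.0]},
    {"user_id": "u1", "summary": "Planning a trip to Lisbon", "vector": [0.0, 0.2, 0.9]},
    {"user_id": "u2", "summary": "Prefers Rust", "vector": [1.0, 0.0, 0.0]},
]

top = recall_in_memory(episodes, "u1", query_vector=[1.0, 0.0, 0.0], top_k=1)
print(top[0]["summary"])  # Prefers Python over JavaScript
```

Note that the `u2` episode matches the query best but is never returned: the user filter runs before ranking, which is the role `query_filter` plays in the Qdrant call above.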

Semantic memory is your agent's knowledge base — facts, documents, and reference material that the agent can consult. This is essentially the RAG pipeline from Phase 1, but integrated into the agent as a tool rather than a standalone retrieval step. The agent decides when it needs to look something up, queries the vector store, and uses the results in its reasoning.

semantic_memory_tool.py
python
from qdrant_client import QdrantClient
from qdrant_client.models import Filter, FieldCondition, MatchValue
from openai import OpenAI

client = OpenAI()
qdrant = QdrantClient(url="http://localhost:6333")

def knowledge_search(query: str, collection: str = "knowledge_base", top_k: int = 5) -> str:
    """Search the knowledge base. Use as an agent tool."""
    
    query_vector = client.embeddings.create(
        model="text-embedding-3-small",
        input=query
    ).data[0].embedding
    
    results = qdrant.search(
        collection_name=collection,
        query_vector=query_vector,
        limit=top_k
    )
    
    if not results:
        return "No relevant documents found in the knowledge base."
    
    return "\n\n---\n\n".join(
        f"[Source: {r.payload.get('source', 'unknown')}]\n{r.payload['text']}"
        for r in results
    )

# Register as an agent tool
KNOWLEDGE_TOOL_SCHEMA = {
    "type": "function",
    "function": {
        "name": "knowledge_search",
        "description": (
            "Search the internal knowledge base for relevant information. "
            "Use when you need to look up company policies, product docs, "
            "technical references, or any domain-specific information."
        ),
        "parameters": {
            "type": "object",
            "properties": {
                "query": {
                    "type": "string",
                    "description": "The search query — be specific and descriptive"
                }
            },
            "required": ["query"]
        }
    }
}
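
The schema tells the model that `knowledge_search` exists; your loop still has to route the model's tool call to the function. A minimal sketch of that routing step, assuming the tool name and a JSON argument string arrive as they do in `tool_call.function.name` and `tool_call.function.arguments`. `dispatch_tool_call` and `fake_knowledge_search` are hypothetical; the stub stands in for the real function so the example runs without Qdrant or an API key.

```python
import json
from typing import Callable

def dispatch_tool_call(
    name: str,
    arguments_json: str,
    registry: dict[str, Callable[..., str]],
) -> str:
    """Look up a tool by name, parse its JSON arguments, and run it.

    Problems are returned as strings so the model can read them and retry.
    """
    tool = registry.get(name)
    if tool is None:
        return f"Error: unknown tool '{name}'"
    try:
        arguments = json.loads(arguments_json or "{}")
    except json.JSONDecodeError as e:
        return f"Error: arguments were not valid JSON ({e})"
    if not isinstance(arguments, dict):
        return "Error: arguments must be a JSON object"
    try:
        return str(tool(**arguments))
    except TypeError as e:
        # Usually a missing or unexpected argument name from the model
        return f"Error: bad arguments for '{name}': {e}"


def fake_knowledge_search(query: str) -> str:
    return f"[Source: handbook]\nResults for '{query}'"

registry = {"knowledge_search": fake_knowledge_search}

print(dispatch_tool_call("knowledge_search", '{"query": "refund policy"}', registry))
print(dispatch_tool_call("knowledge_search", '{"q": "refund policy"}', registry))
```

Returning errors as observations instead of raising keeps the agent loop alive: the model sees that it used `q` instead of `query` and can correct itself on the next turn.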

Procedural memory stores how to do things: it is a dynamic library of tools and skills that the agent can draw from. Instead of hardcoding a fixed set of tools, the agent has access to a skill store where it can discover, load, and use tools dynamically. This is, broadly, the approach taken by sophisticated agent systems such as Claude Code or AutoGPT-style agents: they select tools from a registry based on what the current task requires.

procedural_memory.py
python
import json
from dataclasses import dataclass, field
from typing import Callable, Any
from openai import OpenAI

client = OpenAI()

@dataclass
class Skill:
    """A reusable agent skill/tool."""
    name: str
    description: str
    parameters_schema: dict
    implementation: Callable[..., str]
    tags: list[str] = field(default_factory=list)
    usage_count: int = 0
    success_rate: float = 1.0

class SkillStore:
    """Dynamic tool registry with semantic search."""
    
    def __init__(self):
        self.skills: dict[str, Skill] = {}
    
    def register(self, skill: Skill) -> None:
        """Add a skill to the store."""
        self.skills[skill.name] = skill
    
    def search(self, query: str, top_k: int = 5) -> list[Skill]:
        """Find relevant skills for a task description."""
        # In production, use embeddings. Here we use keyword matching.
        scored = []
        query_lower = query.lower()
        for skill in self.skills.values():
            score = 0
            if any(tag in query_lower for tag in skill.tags):
                score += 3
            if any(word in skill.description.lower() for word in query_lower.split()):
                score += 1
            # Boost frequently successful skills
            score *= skill.success_rate
            if score > 0:
                scored.append((skill, score))
        
        scored.sort(key=lambda x: x[1], reverse=True)
        return [s for s, _ in scored[:top_k]]
    
    def get_schemas(self, skill_names: list[str]) -> list[dict]:
        """Get OpenAI-format tool schemas for selected skills."""
        return [
            {
                "type": "function",
                "function": {
                    "name": self.skills[name].name,
                    "description": self.skills[name].description,
                    "parameters": self.skills[name].parameters_schema
                }
            }
            for name in skill_names
            if name in self.skills
        ]
    
    def execute(self, skill_name: str, **kwargs) -> str:
        """Execute a skill and track metrics."""
        skill = self.skills.get(skill_name)
        if not skill:
            return f"Error: Skill '{skill_name}' not found"
        
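        try:
            result = skill.implementation(**kwargs)
            succeeded = True
        except Exception as e:
            result = f"Error in {skill_name}: {e}"
            succeeded = False
        
        skill.usage_count += 1
        # Exponential moving average of outcomes (1.0 = success, 0.0 = failure),
        # so the rate recovers after successes instead of only ever decaying
        skill.success_rate = 0.9 * skill.success_rate + 0.1 * (1.0 if succeeded else 0.0)
        return result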

# ── Dynamic tool selection agent ──────────────────────────────

def run_adaptive_agent(task: str, skill_store: SkillStore) -> str:
    """Agent that selects tools dynamically based on the task."""
    
    # Step 1: Select relevant tools for this task
    relevant_skills = skill_store.search(task, top_k=5)
    skill_names = [s.name for s in relevant_skills]
    tool_schemas = skill_store.get_schemas(skill_names)
    
    print(f"Selected tools for task: {skill_names}")
    
    # Step 2: Run the agent with only the relevant tools
    messages = [
        {"role": "system", "content": f"You have these tools: {skill_names}. Use them to complete the task."},
        {"role": "user", "content": task}
    ]
    
    for _ in range(10):
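        # Pass `tools` only when at least one schema was selected; otherwise
        # omit the parameter entirely instead of sending None.
        request_kwargs = {"model": "gpt-4o", "messages": messages}
        if tool_schemas:
            request_kwargs["tools"] = tool_schemas
        response = client.chat.completions.create(**request_kwargs)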
        
        msg = response.choices[0].message
        messages.append(msg)
        
        if not msg.tool_calls:
            return msg.content
        
        for tc in msg.tool_calls:
            result = skill_store.execute(
                tc.function.name,
                **json.loads(tc.function.arguments)
            )
            messages.append({
                "role": "tool",
                "tool_call_id": tc.id,
                "content": str(result)
            })
    
    return "Max iterations reached."
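
The skill store ranks tools partly by `success_rate`, which is meant to behave as an exponential moving average. The arithmetic is worth seeing in isolation: each new outcome (1.0 for success, 0.0 for failure) is blended into the running rate with a small weight. `update_success_rate` is a hypothetical helper, and the weight `alpha=0.1` is an assumption chosen to match the 0.9 decay factor used in the store.

```python
def update_success_rate(current_rate: float, succeeded: bool, alpha: float = 0.1) -> float:
    """Blend the newest outcome into the running success rate."""
    outcome = 1.0 if succeeded else 0.0
    return (1 - alpha) * current_rate + alpha * outcome


rate = 1.0
for succeeded in [False, False, True, True, True]:
    rate = update_success_rate(rate, succeeded)
    label = "ok  " if succeeded else "fail"
    print(f"{label} -> {rate:.3f}")
```

Two failures pull the rate from 1.0 down to 0.81, and each later success moves it a tenth of the way back toward 1.0. A larger `alpha` makes the store react faster to recent outcomes but also makes a single fluke failure more costly.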

Now that you've built every core pattern from scratch — the ReAct loop, planning, memory — you're ready to use frameworks. The key mindset shift: you're not learning these frameworks because you can't build agents without them. You're using them because they handle the boring parts (state persistence, streaming, deployment) while you focus on the interesting parts (agent logic, tool design, evaluation). You understand what's underneath, so you can debug anything.

Framework Selection Rule of Thumb

Use LangGraph when you need complex, stateful workflows with conditional branching and cycles. Use LangChain for simple chains and quick prototypes with lots of integrations. Use AutoGen/crewAI when you need multiple agents collaborating. Use no framework when your agent is simple enough that adding one would be over-engineering.

LangGraph models agent workflows as state machines — directed graphs where nodes are processing steps and edges are transitions. The state is an explicit object that flows through the graph, and you define exactly how each node reads and writes to that state. This makes complex agent behaviors predictable and debuggable because the state is always visible and the transitions are always explicit.

langgraph_react_agent.py
python
from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, END
from langgraph.graph.message import add_messages
from langgraph.prebuilt import ToolNode
from langchain_openai import ChatOpenAI
from langchain_core.tools import tool

# ── Define the state ──────────────────────────────────────────

class AgentState(TypedDict):
    messages: Annotated[list, add_messages]  # Auto-appends new messages
    steps_taken: int

# ── Define tools ──────────────────────────────────────────────

@tool
def calculator(expression: str) -> str:
    """Evaluate a mathematical expression."""
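    try:
        # Demo only: eval() with empty builtins is not a real sandbox and can be
        # escaped; use a proper expression parser for untrusted input.
        return str(eval(expression, {"__builtins__": {}}))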
    except Exception as e:
        return f"Error: {e}"

@tool
def search_web(query: str) -> str:
    """Search the web for information."""
    return f"Search results for: {query} (mock results here)"

tools = [calculator, search_web]
llm = ChatOpenAI(model="gpt-4o").bind_tools(tools)

# ── Define graph nodes ────────────────────────────────────────

def agent_node(state: AgentState) -> dict:
    """The thinking/deciding node — calls the LLM."""
    response = llm.invoke(state["messages"])
    return {
        "messages": [response],
        "steps_taken": state["steps_taken"] + 1
    }

def should_continue(state: AgentState) -> str:
    """Conditional edge: decide whether to use tools or finish."""
    last_message = state["messages"][-1]
    
    # Safety: stop after too many steps
    if state["steps_taken"] >= 10:
        return "end"
    
    # If the LLM returned tool calls, route to the tool node
    if last_message.tool_calls:
        return "tools"
    
    return "end"

# ── Build the graph ───────────────────────────────────────────

graph = StateGraph(AgentState)

# Add nodes
graph.add_node("agent", agent_node)
graph.add_node("tools", ToolNode(tools))

# Add edges
graph.set_entry_point("agent")
graph.add_conditional_edges(
    "agent",
    should_continue,
    {
        "tools": "tools",  # If tools needed → go to tool node
        "end": END          # If done → finish
    }
)
graph.add_edge("tools", "agent")  # After tools → back to agent

# Compile
app = graph.compile()

# ── Run it ────────────────────────────────────────────────────

result = app.invoke({
    "messages": [("user", "What is 23 * 47 + the current population of Mars?")],
    "steps_taken": 0
})

print(result["messages"][-1].content)

Notice the structure: the graph has explicit nodes (agent, tools), explicit edges (conditional routing based on whether tool calls exist), and explicit state (messages + step count). Compare this to the raw ReAct loop you built earlier — the logic is identical, but now the control flow is a visible, inspectable graph rather than a Python for-loop. This matters when your agent has 10+ nodes with complex branching.
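
To make the comparison concrete, here is a framework-free sketch of the same idea: named nodes that return partial state updates, and router functions that pick the next node. This is not how LangGraph is implemented, and `run_graph`, `agent`, `tools`, and the toy state are hypothetical. Updates here simply overwrite keys, whereas LangGraph lets you attach reducers such as `add_messages` to individual fields.

```python
from typing import Callable

State = dict
Node = Callable[[State], dict]    # returns a partial state update
Router = Callable[[State], str]   # returns the name of the next node

END = "__end__"

def run_graph(
    nodes: dict[str, Node],
    routers: dict[str, Router],
    entry: str,
    state: State,
    max_steps: int = 50,
) -> State:
    """Run nodes starting at `entry`, following routers until one returns END."""
    current = entry
    for _ in range(max_steps):
        state = {**state, **nodes[current](state)}  # merge the node's update into the state
        current = routers[current](state)           # the edge decides where to go next
        if current == END:
            return state
    raise RuntimeError("graph did not finish within max_steps")


# Toy nodes: the "agent" counts its steps, the "tools" node records an observation
def agent(state: State) -> dict:
    return {"steps_taken": state["steps_taken"] + 1}

def tools(state: State) -> dict:
    return {"observations": state["observations"] + [f"result {state['steps_taken']}"]}

routers = {
    "agent": lambda s: "tools" if s["steps_taken"] < 3 else END,  # conditional edge
    "tools": lambda s: "agent",                                   # unconditional edge
}

final = run_graph(
    {"agent": agent, "tools": tools},
    routers,
    entry="agent",
    state={"steps_taken": 0, "observations": []},
)
print(final)  # {'steps_taken': 3, 'observations': ['result 1', 'result 2']}
```

Everything a framework adds on top of this (checkpointing, streaming, visualization) works because the state and transitions are data rather than local variables buried in a loop.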

Let's implement the Plan-and-Execute pattern from earlier using LangGraph's state machine. This shows the real power of graph-based workflows — you can model the planner and executor as separate nodes with state flowing between them.

langgraph_plan_execute.py
python
from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, END
from langgraph.graph.message import add_messages
from langchain_openai import ChatOpenAI
import json

# ── State includes the plan ───────────────────────────────────

class PlanExecuteState(TypedDict):
    messages: Annotated[list, add_messages]
    plan: list[str]          # The current plan
    current_step: int         # Which step we're on
    step_results: list[str]   # Results from completed steps
    final_answer: str         # The synthesized answer

planner_llm = ChatOpenAI(model="gpt-4o")       # Strong model plans
executor_llm = ChatOpenAI(model="gpt-4o-mini")  # Cheap model executes

# ── Planner node ──────────────────────────────────────────────

def planner(state: PlanExecuteState) -> dict:
    """Create or revise the plan."""
    user_msg = state["messages"][-1].content if state["messages"] else ""
    
    response = planner_llm.invoke([
        ("system", 
         "Decompose this request into 3-7 concrete steps. "
         "Return a JSON array of strings. Nothing else."),
        ("user", user_msg)
    ])
    
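    # The model may wrap the JSON in Markdown code fences despite the prompt;
    # strip them before parsing
    raw = response.content.strip()
    if raw.startswith("```"):
        raw = raw.strip("`").removeprefix("json").strip()
    steps = json.loads(raw)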
    return {"plan": steps, "current_step": 0, "step_results": []}

# ── Executor node ─────────────────────────────────────────────

def executor(state: PlanExecuteState) -> dict:
    """Execute the current step in the plan."""
    step_idx = state["current_step"]
    step = state["plan"][step_idx]
    
    context = "\n".join(
        f"Step {i+1} result: {r}" 
        for i, r in enumerate(state["step_results"])
    )
    
    response = executor_llm.invoke([
        ("system", f"Execute this step. Context:\n{context}"),
        ("user", step)
    ])
    
    new_results = state["step_results"] + [response.content]
    return {
        "step_results": new_results,
        "current_step": step_idx + 1
    }

# ── Synthesizer node ──────────────────────────────────────────

def synthesizer(state: PlanExecuteState) -> dict:
    """Combine all step results into a final answer."""
    all_results = "\n".join(
        f"Step {i+1} ({state['plan'][i]}): {r}"
        for i, r in enumerate(state["step_results"])
    )
    
    response = planner_llm.invoke([
        ("system", "Synthesize these results into a clear final answer."),
        ("user", all_results)
    ])
    
    return {"final_answer": response.content}

# ── Routing logic ─────────────────────────────────────────────

def check_plan_progress(state: PlanExecuteState) -> str:
    if state["current_step"] >= len(state["plan"]):
        return "synthesize"  # All steps done
    return "execute"          # More steps to do

# ── Build the graph ───────────────────────────────────────────

graph = StateGraph(PlanExecuteState)

graph.add_node("planner", planner)
graph.add_node("executor", executor)
graph.add_node("synthesizer", synthesizer)

graph.set_entry_point("planner")
graph.add_edge("planner", "executor")
graph.add_conditional_edges(
    "executor",
    check_plan_progress,
    {"execute": "executor", "synthesize": "synthesizer"}
)
graph.add_edge("synthesizer", END)

app = graph.compile()
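
The planner node above hands the model's reply straight to `json.loads`, which fails as soon as the model wraps the array in a Markdown fence or adds a sentence around it. A minimal sketch of a more forgiving parser follows. The name `parse_plan`, the `max_steps` cap, and the choice to raise on an empty plan are illustrative assumptions, not part of LangGraph or LangChain.

```python
import json
import re


def parse_plan(raw: str, max_steps: int = 7) -> list[str]:
    """Extract a list of step strings from a planner reply (hypothetical helper).

    Accepts a bare JSON array or one embedded in surrounding text.
    Raises ValueError if no usable plan is found.
    """
    # Grab the outermost [...] span so fences and stray prose are ignored.
    match = re.search(r"\[.*\]", raw, flags=re.DOTALL)
    if match is None:
        raise ValueError("planner reply contains no JSON array")

    try:
        steps = json.loads(match.group(0))
    except json.JSONDecodeError as exc:
        raise ValueError(f"planner reply is not valid JSON: {exc}") from exc

    # Keep only non-empty strings; anything else is not an executable step.
    cleaned = [s.strip() for s in steps if isinstance(s, str) and s.strip()]
    if not cleaned:
        raise ValueError("planner returned an empty plan")
    return cleaned[:max_steps]
```

In the planner node, `steps = parse_plan(response.content)` could stand in for the bare `json.loads` call. Raising on an empty plan also keeps the executor from indexing into an empty list.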

LangChain's AgentExecutor is the original high-level agent abstraction. It wraps the ReAct loop into a single class that handles tool execution, memory, and output parsing. While LangGraph is now recommended for complex workflows, AgentExecutor is still the fastest way to spin up a simple agent. Here's the full pattern including custom tools and memory.

langchain_agent.py
python
from langchain_openai import ChatOpenAI
from langchain.agents import AgentExecutor, create_openai_tools_agent
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.tools import tool
from langchain.memory import ConversationBufferWindowMemory
import httpx

# ── Define tools ──────────────────────────────────────────────

@tool
def calculator(expression: str) -> str:
    """Evaluate a mathematical expression.
    Use for arithmetic, conversions, or any numerical calculation.
    """
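    try:
        # NOTE: emptying __builtins__ does not make eval() safe. Treat this as
        # demo-only and never expose it to untrusted input.
        return str(eval(expression, {"__builtins__": {}}))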
    except Exception as e:
        return f"Error: {e}"

@tool
def web_search(query: str) -> str:
    """Search the web for current information.
    Use when you need recent facts, news, or data.
    """
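    import os  # local import; TAVILY_API_KEY is an assumed environment variable name

    # Tavily's search endpoint takes a POST with a JSON body and requires an API
    # key; check Tavily's documentation for the current authentication scheme.
    try:
        response = httpx.post(
            "https://api.tavily.com/search",
            headers={"Authorization": f"Bearer {os.environ['TAVILY_API_KEY']}"},
            json={"query": query, "max_results": 3},
            timeout=10.0,
        )
        response.raise_for_status()
    except (httpx.HTTPError, KeyError) as e:
        return f"Error: {e}"
    results = response.json().get("results", [])
    return "\n".join(f"- {r['title']}: {r['content'][:200]}" for r in results)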

@tool  
def python_executor(code: str) -> str:
    """Execute Python code and return the output.
    Use for data processing, analysis, or complex calculations.
    """
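    # WARNING: exec() runs arbitrary model-generated code with full builtins in
    # this process. Outside of demos, run it in a sandbox (container or subprocess).
    import io, contextlib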
    buf = io.StringIO()
    try:
        with contextlib.redirect_stdout(buf):
            exec(code, {"__builtins__": __builtins__})
        return buf.getvalue() or "Code executed successfully (no output)"
    except Exception as e:
        return f"Error: {e}"

tools = [calculator, web_search, python_executor]

# ── Create the agent ──────────────────────────────────────────

llm = ChatOpenAI(model="gpt-4o", temperature=0)

prompt = ChatPromptTemplate.from_messages([
    ("system", 
     "You are a helpful assistant. Think step-by-step. "
     "Use tools when needed, but answer directly if you can."),
    MessagesPlaceholder(variable_name="chat_history"),
    ("user", "{input}"),
    MessagesPlaceholder(variable_name="agent_scratchpad")
])

agent = create_openai_tools_agent(llm, tools, prompt)

# Memory keeps the last 10 exchanges
memory = ConversationBufferWindowMemory(
    memory_key="chat_history",
    return_messages=True,
    k=10
)

agent_executor = AgentExecutor(
    agent=agent,
    tools=tools,
    memory=memory,
    verbose=True,
    max_iterations=10,
    handle_parsing_errors=True  # Gracefully handle malformed LLM output
)

# ── Use it ────────────────────────────────────────────────────

# First question
result = agent_executor.invoke({"input": "What's 15% of 847?"})
print(result["output"])

# Follow-up — memory is automatic
result = agent_executor.invoke({"input": "Now double that number"})
print(result["output"])
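
The `calculator` tool above relies on `eval` with emptied builtins, which is not a real sandbox. One common alternative is to parse the expression with the standard `ast` module and evaluate only whitelisted arithmetic nodes. The sketch below illustrates that idea; `safe_eval` and the exponent cap of 100 are assumptions chosen for the example, not LangChain features.

```python
import ast
import operator

# Whitelisted operators; anything else is rejected.
_BINARY_OPS = {
    ast.Add: operator.add,
    ast.Sub: operator.sub,
    ast.Mult: operator.mul,
    ast.Div: operator.truediv,
    ast.FloorDiv: operator.floordiv,
    ast.Mod: operator.mod,
    ast.Pow: operator.pow,
}
_UNARY_OPS = {ast.UAdd: operator.pos, ast.USub: operator.neg}


def safe_eval(expression: str) -> float:
    """Evaluate an arithmetic expression without calling eval()."""

    def _eval(node: ast.AST) -> float:
        if isinstance(node, ast.Expression):
            return _eval(node.body)
        if (
            isinstance(node, ast.Constant)
            and isinstance(node.value, (int, float))
            and not isinstance(node.value, bool)
        ):
            return node.value
        if isinstance(node, ast.BinOp) and type(node.op) in _BINARY_OPS:
            left, right = _eval(node.left), _eval(node.right)
            # Cap exponents so "9 ** 9 ** 9" cannot stall the process.
            if isinstance(node.op, ast.Pow) and abs(right) > 100:
                raise ValueError("exponent too large")
            return _BINARY_OPS[type(node.op)](left, right)
        if isinstance(node, ast.UnaryOp) and type(node.op) in _UNARY_OPS:
            return _UNARY_OPS[type(node.op)](_eval(node.operand))
        # Names, calls, attribute access, and so on all end up here.
        raise ValueError(f"unsupported expression element: {type(node).__name__}")

    return _eval(ast.parse(expression, mode="eval"))
```

Inside the tool, `return str(safe_eval(expression))` could replace the `eval` call; the existing `try`/`except` still turns syntax errors and rejected nodes into an error string for the model.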

AutoGen (by Microsoft) models agent systems as conversations between multiple agents. Instead of a single agent with tools, you create specialized agents that talk to each other: an 'assistant' agent generates plans and code, a 'user proxy' agent executes code on behalf of the human and returns results, and a 'critic' agent reviews outputs. This conversational architecture suits complex tasks because each agent can have its own model, tools, and system prompt. The example uses the AutoGen 0.2-style API (`AssistantAgent`, `UserProxyAgent`, `GroupChat`).

autogen_agents.py
python
from autogen import AssistantAgent, UserProxyAgent, GroupChat, GroupChatManager

# ── Configuration ─────────────────────────────────────────────

llm_config = {
    "model": "gpt-4o",
    "temperature": 0,
    "api_key": "your-api-key"
}

# ── Define specialized agents ─────────────────────────────────

planner = AssistantAgent(
    name="Planner",
    system_message=(
        "You are a planning specialist. Given a task, break it down "
        "into clear steps. You do NOT execute steps — you only plan. "
        "Hand off execution to the Coder or Researcher."
    ),
    llm_config=llm_config
)

coder = AssistantAgent(
    name="Coder",
    system_message=(
        "You are a Python expert. Write clean, correct code to solve "
        "problems. Always include error handling. When your code is "
        "ready, ask the Executor to run it."
    ),
    llm_config=llm_config
)

critic = AssistantAgent(
    name="Critic",
    system_message=(
        "You review code and plans for correctness, edge cases, and "
        "potential issues. Be thorough but constructive. If the output "
        "is good, say APPROVED. If not, explain what needs fixing."
    ),
    llm_config=llm_config
)

executor = UserProxyAgent(
    name="Executor",
    human_input_mode="NEVER",
    code_execution_config={
        "work_dir": "workspace",
        "use_docker": False  # Set True in production!
    },
    max_consecutive_auto_reply=5
)

# ── Group chat: all agents collaborate ────────────────────────

groupchat = GroupChat(
    agents=[planner, coder, critic, executor],
    messages=[],
    max_round=20,
    speaker_selection_method="auto"  # LLM decides who speaks next
)

manager = GroupChatManager(
    groupchat=groupchat,
    llm_config=llm_config
)

# ── Run the multi-agent system ────────────────────────────────

executor.initiate_chat(
    manager,
    message=(
        "Analyze the top 10 programming languages by GitHub stars in 2024. "
        "Create a bar chart comparing them and save it as 'languages.png'. "
        "Include error bars for the margin."
    )
)
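
The Critic above is told to say APPROVED, but nothing in the listing stops the conversation when it does, so the chat can keep going until `max_round` is reached. A small predicate like the one below could serve as the stop condition. AutoGen's conversable agents accept an `is_termination_msg` callable, but where it takes effect in a group chat depends on the AutoGen version, so treat the wiring as an assumption to verify. The name `is_approved` is hypothetical.

```python
def is_approved(message: dict) -> bool:
    """Return True when a chat message carries a standalone APPROVED verdict."""
    content = message.get("content")
    if not isinstance(content, str):
        return False  # tool calls or non-text payloads carry no verdict

    # Require APPROVED on a line of its own (ignoring light punctuation or
    # Markdown emphasis) so "NOT APPROVED" or "unapproved" do not end the chat.
    for line in content.splitlines():
        if line.strip().strip(".!*").strip() == "APPROVED":
            return True
    return False
```

Passing it as `is_termination_msg=is_approved` when constructing the `GroupChatManager` (or the `Executor`) is the usual hook in the 0.2-style API.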

crewAI takes a different approach to multi-agent orchestration. Instead of free-form conversations, it defines roles, goals, and tasks explicitly. Each agent has a specific role (like 'Senior Data Analyst' or 'Technical Writer'), and tasks are assigned to agents with defined expected outputs. This structure makes it easier to reason about what each agent does and makes the workflow more predictable.

crewai_example.py
python
from crewai import Agent, Task, Crew, Process
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model="gpt-4o")

# ── Define agents with roles ─────────────────────────────────

researcher = Agent(
    role="Senior Research Analyst",
    goal="Uncover cutting-edge developments in AI agents",
    backstory=(
        "You work at a leading AI research firm. Your expertise "
        "is in identifying emerging trends and synthesizing complex "
        "technical information into clear insights."
    ),
    verbose=True,
    llm=llm,
    allow_delegation=False
)

writer = Agent(
    role="Technical Content Strategist",
    goal="Craft compelling technical content about AI agents",
    backstory=(
        "You are a renowned content strategist known for transforming "
        "complex technical concepts into engaging narratives. You "
        "write for an audience of senior engineers."
    ),
    verbose=True,
    llm=llm,
    allow_delegation=False
)

reviewer = Agent(
    role="Technical Reviewer",
    goal="Ensure technical accuracy and completeness",
    backstory=(
        "You are a meticulous reviewer with deep expertise in AI. "
        "You catch factual errors, identify gaps, and ensure the "
        "content meets the highest standards of accuracy."
    ),
    verbose=True,
    llm=llm,
    allow_delegation=False
)

# ── Define tasks ──────────────────────────────────────────────

research_task = Task(
    description=(
        "Research the current state of AI agent frameworks in 2024-2025. "
        "Compare LangGraph, AutoGen, crewAI, and others. Focus on "
        "architecture patterns, production readiness, and community adoption."
    ),
    expected_output="A detailed research report with comparisons and analysis.",
    agent=researcher
)

writing_task = Task(
    description=(
        "Using the research report, write a technical blog post about "
        "choosing the right agent framework. Include code examples "
        "and architectural diagrams."
    ),
    expected_output="A polished blog post of 2000+ words with code examples.",
    agent=writer,
    context=[research_task]  # This task depends on the research
)

review_task = Task(
    description=(
        "Review the blog post for technical accuracy. Check all code "
        "examples, verify claims, and suggest improvements."
    ),
    expected_output="A reviewed and approved blog post with corrections applied.",
    agent=reviewer,
    context=[writing_task]
)

# ── Create and run the crew ───────────────────────────────────

crew = Crew(
    agents=[researcher, writer, reviewer],
    tasks=[research_task, writing_task, review_task],
    process=Process.sequential,  # Tasks run in order
    verbose=True
)

result = crew.kickoff()
print(result)
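
To make the data flow concrete, here is a library-free model of what the `context=[...]` arguments in the listing express: tasks run in list order, and each one receives the outputs of the tasks it names as context. This is a conceptual sketch only, not crewAI's implementation; `SketchTask` and `run_sequential` are hypothetical names.

```python
from dataclasses import dataclass, field
from typing import Callable


@dataclass
class SketchTask:
    """Hypothetical stand-in for a task: a name, a worker, and upstream tasks."""
    name: str
    run: Callable[[list[str]], str]          # receives upstream outputs
    context: list["SketchTask"] = field(default_factory=list)


def run_sequential(tasks: list[SketchTask]) -> dict[str, str]:
    """Run tasks in list order, feeding each the outputs of its context tasks."""
    outputs: dict[str, str] = {}
    for task in tasks:
        missing = [t.name for t in task.context if t.name not in outputs]
        if missing:
            raise ValueError(
                f"{task.name} depends on tasks that have not run: {missing}"
            )
        upstream = [outputs[t.name] for t in task.context]
        outputs[task.name] = task.run(upstream)
    return outputs


research = SketchTask("research", lambda _: "report")
writing = SketchTask("writing", lambda up: f"post based on {up[0]}", [research])
review = SketchTask("review", lambda up: f"reviewed {up[0]}", [writing])

results = run_sequential([research, writing, review])
print(results["review"])  # reviewed post based on report
```

Listing a task before the tasks in its context raises an error here, which mirrors why the order of `tasks=[...]` matters in a sequential process.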

IBM watsonx Orchestrate is an enterprise agent platform that takes a constraints-first approach. Rather than giving agents unlimited freedom, it defines strict skill flows — sequences of actions that agents can take, with guardrails at every step. This is the right model for enterprise environments where agents need to comply with regulations, audit requirements, and governance policies. Understanding Orchestrate's constraints as a design philosophy — not a limitation — makes you a better agent architect.

watsonx_orchestrate_pattern.py
python
# watsonx Orchestrate skill definition pattern
# This shows the conceptual model — actual deployment uses 
# the Orchestrate UI or OpenAPI skill imports

from dataclasses import dataclass
from typing import Optional
from enum import Enum

class SkillType(Enum):
    AUTOMATION = "automation"     # Execute a fixed workflow
    CONVERSATIONAL = "conversational"  # Free-form LLM interaction
    COMPOSITE = "composite"      # Chain of sub-skills

@dataclass
class OrchestrateSkill:
    """A skill in the watsonx Orchestrate model."""
    name: str
    description: str
    skill_type: SkillType
    input_schema: dict
    output_schema: dict
    guardrails: list[str]       # Constraints the skill must respect
    requires_approval: bool      # Human-in-the-loop gate
    audit_log: bool = True       # Log all invocations

# Example: A constrained enterprise skill
customer_lookup = OrchestrateSkill(
    name="customer_lookup",
    description="Look up a customer's account details by ID or email.",
    skill_type=SkillType.AUTOMATION,
    input_schema={
        "type": "object",
        "properties": {
            "customer_id": {"type": "string"},
            "email": {"type": "string", "format": "email"}
        },
        "oneOf": [
            {"required": ["customer_id"]},
            {"required": ["email"]}
        ]
    },
    output_schema={
        "type": "object",
        "properties": {
            "name": {"type": "string"},
            "account_status": {"type": "string"},
            "tier": {"type": "string"}
            # Note: NO sensitive data like SSN or payment info
        }
    },
    guardrails=[
        "Never expose PII beyond name and account status",
        "Rate limit: max 10 lookups per minute per user",
        "Log all queries for compliance audit",
        "Require manager approval for tier changes"
    ],
    requires_approval=False,
    audit_log=True
)

# Composite skill: chains multiple skills with gates
order_refund = OrchestrateSkill(
    name="process_refund",
    description="Process a customer refund for a given order.",
    skill_type=SkillType.COMPOSITE,
    input_schema={
        "type": "object",
        "properties": {
            "order_id": {"type": "string"},
            "reason": {"type": "string"},
            "amount": {"type": "number"}
        },
        "required": ["order_id", "reason"]
    },
    output_schema={
        "type": "object",
        "properties": {
            "refund_id": {"type": "string"},
            "status": {"type": "string"},
            "approved_by": {"type": "string"}
        }
    },
    guardrails=[
        "Refunds over $500 require manager approval",
        "Maximum 3 refunds per customer per month",
        "Must verify order exists and is within return window",
        "All refunds logged to financial audit system"
    ],
    requires_approval=True,  # Human-in-the-loop!
    audit_log=True
)

print(f"Skill: {order_refund.name}")
print(f"Type: {order_refund.skill_type.value}")
print(f"Requires approval: {order_refund.requires_approval}")
print(f"Guardrails: {len(order_refund.guardrails)}")
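
The guardrails in the listing are plain strings, so nothing enforces them at runtime. As a sketch of how one of them ("Refunds over $500 require manager approval") might become an executable gate with an audit trail, consider the wrapper below. It is not the watsonx Orchestrate API: `GatedSkill`, `needs_approval`, and the `pending_approval` status are hypothetical names invented for illustration.

```python
from dataclasses import dataclass, field
from typing import Callable, Optional


@dataclass
class GatedSkill:
    """Hypothetical runtime wrapper: a handler, an approval rule, an audit log."""
    name: str
    handler: Callable[[dict], dict]
    needs_approval: Callable[[dict], bool]      # executable form of a guardrail
    audit: list[dict] = field(default_factory=list)

    def invoke(self, payload: dict, approved_by: Optional[str] = None) -> dict:
        gated = self.needs_approval(payload)
        if gated and approved_by is None:
            # Human-in-the-loop gate: park the request instead of running it.
            outcome = {"status": "pending_approval"}
        else:
            outcome = self.handler(payload)
            outcome["approved_by"] = approved_by if gated else "auto"
        # Every invocation is recorded, including blocked ones.
        self.audit.append(
            {"skill": self.name, "input": payload, "outcome": outcome["status"]}
        )
        return outcome


refund = GatedSkill(
    name="process_refund",
    handler=lambda p: {"refund_id": f"rf-{p['order_id']}", "status": "refunded"},
    needs_approval=lambda p: p.get("amount", 0) > 500,
)

print(refund.invoke({"order_id": "A1", "amount": 120})["status"])  # refunded
print(refund.invoke({"order_id": "A2", "amount": 900})["status"])  # pending_approval
```

Keeping the rule as a function next to the handler means the approval threshold is tested code rather than a sentence the model is expected to honor.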

| Framework | Architecture | Best For | Learning Curve | Production Ready |
|---|---|---|---|---|
| Raw Python + API | Custom loop | Understanding fundamentals, simple agents, full control | Low (you write everything) | Yes (you own every line) |
| LangGraph | State machine / directed graph | Complex stateful workflows, conditional branching, cycles | Medium-High | Yes, built for production |
| LangChain AgentExecutor | Prebuilt ReAct loop | Quick prototypes, lots of integrations, simple chains | Low-Medium | Yes, but less flexible than LangGraph |
| AutoGen | Multi-agent conversation | Collaborative agents, code generation, research tasks | Medium | Maturing rapidly |
| crewAI | Role-based task assignment | Team-based workflows, content pipelines, structured output | Low | Good for defined workflows |
| watsonx Orchestrate | Constrained skill flows | Enterprise compliance, audit trails, governed automation | Medium | Enterprise-grade |

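Let's build a complete agent system that combines the core pieces of Phase 2 (the ReAct loop, optional planning, an optional reflexion pass, and error handling with retries, a timeout, and loop detection) into a single architecture you can use as a starting point for production. Memory appears here only as the `memory_window` setting, which the listing defines but does not enforce.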

complete_agent_system.py
python
import json
import time
from dataclasses import dataclass, field
from typing import Optional, Callable
from openai import OpenAI

client = OpenAI()

@dataclass
class AgentConfig:
    """Configuration for the agent system."""
    model: str = "gpt-4o"
    max_steps: int = 15
    timeout_seconds: float = 180.0
    max_retries: int = 2
    loop_threshold: int = 3
    memory_window: int = 20      # Keep last N messages in context
    planning_enabled: bool = True
    reflexion_enabled: bool = False
    verbose: bool = True

@dataclass
class AgentResult:
    """The result of an agent run."""
    answer: str
    steps_taken: int
    tools_called: list[str]
    total_tokens: int
    elapsed_seconds: float
    plan: Optional[list[str]] = None
    errors: list[str] = field(default_factory=list)

class ProductionAgent:
    """A complete agent combining ReAct, planning, and memory."""
    
    def __init__(
        self,
        config: AgentConfig,
        tools: dict[str, Callable],
        tool_schemas: list[dict],
        system_prompt: str = "You are a helpful assistant."
    ):
        self.config = config
        self.tools = tools
        self.tool_schemas = tool_schemas
        self.system_prompt = system_prompt
        self.total_tokens = 0
    
    def _call_llm(self, messages: list[dict], use_tools: bool = True):
        """Make an LLM call with retry logic; returns the raw ChatCompletion."""
        kwargs = {
            "model": self.config.model,
            "messages": messages,
        }
        if use_tools and self.tool_schemas:
            kwargs["tools"] = self.tool_schemas
            kwargs["tool_choice"] = "auto"
        
        for attempt in range(self.config.max_retries + 1):
            try:
                response = client.chat.completions.create(**kwargs)
                self.total_tokens += response.usage.total_tokens
                return response
            except Exception as e:
                if attempt == self.config.max_retries:
                    raise
                time.sleep(2 ** attempt)
    
    def _create_plan(self, user_message: str) -> list[str]:
        """Generate a plan for complex tasks."""
        response = self._call_llm(
            [
                {"role": "system", "content": 
                    "Analyze this task. If it requires multiple steps, "
                    "return a JSON object with a 'steps' array. "
                    "If it's simple, return {\"steps\": []}."},
                {"role": "user", "content": user_message}
            ],
            use_tools=False
        )
        try:
            data = json.loads(response.choices[0].message.content)
            return data.get("steps", [])
        except (json.JSONDecodeError, AttributeError, TypeError):
            # TypeError covers a reply whose content is None.
            return []
    
    def run(self, user_message: str) -> AgentResult:
        """Execute the full agent pipeline."""
        start_time = time.time()
        self.total_tokens = 0
        tools_called = []
        errors = []
        call_history = []
        plan = None
        
        # ── Optional planning phase ───────────────────────────
        if self.config.planning_enabled:
            plan = self._create_plan(user_message)
            if plan and self.config.verbose:
                for i, step in enumerate(plan, 1):
                    print(f"  Plan step {i}: {step}")
        
        # ── Build initial messages ────────────────────────────
        messages = [{"role": "system", "content": self.system_prompt}]
        
        if plan:
            plan_text = "\n".join(f"{i+1}. {s}" for i, s in enumerate(plan))
            messages.append({
                "role": "system",
                "content": f"Follow this plan:\n{plan_text}"
            })
        
        messages.append({"role": "user", "content": user_message})
        
        # ── ReAct loop ────────────────────────────────────────
        for step in range(self.config.max_steps):
            elapsed = time.time() - start_time
            if elapsed > self.config.timeout_seconds:
                errors.append(f"Timeout after {elapsed:.1f}s")
                break
            
            response = self._call_llm(messages)
            msg = response.choices[0].message
            messages.append(msg)
            
            if not msg.tool_calls:
                # Done!
                result_text = msg.content or ""
                
                # Optional reflexion
                if self.config.reflexion_enabled and step > 0:
                    result_text = self._reflect(user_message, result_text)
                
                return AgentResult(
                    answer=result_text,
                    steps_taken=step + 1,
                    tools_called=tools_called,
                    total_tokens=self.total_tokens,
                    elapsed_seconds=time.time() - start_time,
                    plan=plan,
                    errors=errors
                )
            
            for tc in msg.tool_calls:
                fn_name = tc.function.name
                
                try:
                    fn_args = json.loads(tc.function.arguments)
                except json.JSONDecodeError:
                    messages.append({
                        "role": "tool",
                        "tool_call_id": tc.id,
                        "content": "Error: malformed JSON arguments"
                    })
                    errors.append(f"Malformed args for {fn_name}")
                    continue
                
                # Loop detection
                sig = f"{fn_name}:{json.dumps(fn_args, sort_keys=True)}"
                call_history.append(sig)
                recent = call_history[-self.config.loop_threshold:]
                if len(recent) == self.config.loop_threshold and len(set(recent)) == 1:
                    messages.append({
                        "role": "tool",
                        "tool_call_id": tc.id,
                        "content": "Loop detected. Try a different approach."
                    })
                    continue
                
                # Execute
                if fn_name in self.tools:
                    try:
                        result = self.tools[fn_name](**fn_args)
                    except Exception as e:
                        result = f"Error: {e}"
                        errors.append(f"{fn_name}: {e}")
                else:
                    result = f"Unknown tool: {fn_name}"
                
                tools_called.append(fn_name)
                messages.append({
                    "role": "tool",
                    "tool_call_id": tc.id,
                    "content": str(result)
                })
        
        # Exceeded max steps — force a final answer
        messages.append({
            "role": "user",
            "content": "Provide your best answer with available information."
        })
        response = self._call_llm(messages, use_tools=False)
        
        return AgentResult(
            answer=response.choices[0].message.content or "",
            steps_taken=self.config.max_steps,
            tools_called=tools_called,
            total_tokens=self.total_tokens,
            elapsed_seconds=time.time() - start_time,
            plan=plan,
            errors=errors
        )
    
    def _reflect(self, task: str, output: str) -> str:
        """Reflexion pass — critique and improve the output."""
        critique_resp = self._call_llm(
            [
                {"role": "system", "content": 
                    "Critique this output. If it's good, say PASS. "
                    "Otherwise, list specific issues."},
                {"role": "user", "content": f"Task: {task}\nOutput: {output}"}
            ],
            use_tools=False
        )
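        critique = critique_resp.choices[0].message.content or ""

        # Match a leading PASS verdict only; a substring test would also
        # accept critiques such as "does not pass" or "password".
        if critique.strip().upper().startswith("PASS"):
            return output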
        
        revision_resp = self._call_llm(
            [
                {"role": "system", "content": "Revise the output based on the critique."},
                {"role": "user", "content": 
                    f"Task: {task}\nOutput: {output}\nCritique: {critique}"}
            ],
            use_tools=False
        )
        return revision_resp.choices[0].message.content or output
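
`AgentConfig` defines `memory_window`, but the listing never applies it, so the message list grows with every step. A minimal sliding-window sketch follows. It assumes every message is a plain dict with a `role` key (the listing appends SDK message objects, which would need converting first); `trim_messages` is a hypothetical helper.

```python
def trim_messages(messages: list[dict], window: int) -> list[dict]:
    """Keep all system messages plus the most recent `window` other messages.

    The cut is moved forward past any leading "tool" messages so a tool
    result is never kept without the assistant message that requested it.
    """
    system = [m for m in messages if m["role"] == "system"]
    rest = [m for m in messages if m["role"] != "system"]

    tail = rest[-window:] if window > 0 else []
    # Drop orphaned tool results at the start of the window.
    while tail and tail[0]["role"] == "tool":
        tail = tail[1:]
    return system + tail
```

Calling `messages = trim_messages(messages, self.config.memory_window)` at the top of each loop iteration would bound the context size. The orphan check matters because chat APIs generally reject a tool result whose originating tool call is no longer in the history.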

Phase 2 Checklist

Before moving to Phase 3, make sure you can do ALL of these from scratch:
  1. Build a ReAct loop with raw API calls — both OpenAI and Anthropic
  2. Implement all termination conditions (max steps, timeout, loop detection, budget)
  3. Build Plan-and-Execute with separate planner and executor models
  4. Implement at least two memory strategies (sliding window + summarization)
  5. Set up episodic memory with a vector store
  6. Build the same agent in LangGraph and explain how the state machine maps to your raw loop
  7. Explain when to use each framework — and when to use none
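
Item 2 names a budget as a termination condition. The complete listing tracks `total_tokens` but never caps it, so here is one way such a cap might look. `TokenBudget` and `BudgetExceeded` are hypothetical names, and counting tokens (rather than dollars or calls) is an assumption made for the sketch.

```python
from dataclasses import dataclass


class BudgetExceeded(RuntimeError):
    """Raised when a run goes past its token allowance."""


@dataclass
class TokenBudget:
    """Hypothetical per-run token allowance."""
    max_tokens: int
    used: int = 0

    def charge(self, tokens: int) -> None:
        """Record usage from one LLM call and stop the run if over budget."""
        self.used += tokens
        if self.used > self.max_tokens:
            raise BudgetExceeded(f"used {self.used} of {self.max_tokens} tokens")

    @property
    def remaining(self) -> int:
        """Tokens left before the cap, never negative."""
        return max(self.max_tokens - self.used, 0)


budget = TokenBudget(max_tokens=1000)
budget.charge(400)
budget.charge(500)
print(budget.remaining)  # 100
```

In a raw loop, calling `budget.charge(response.usage.total_tokens)` after each completion and catching `BudgetExceeded` alongside the timeout check gives a fourth exit path next to max steps, timeout, and loop detection.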

Phase 2 transforms you from someone who can call APIs into someone who can build autonomous systems. The patterns here — ReAct, planning, memory, self-critique — are the building blocks of every production agent. In Phase 3, we'll scale these patterns into multi-agent systems, add evaluation and observability, and tackle the hardest problem in agent engineering: making agents reliable enough to trust in production.
