System Prompt Templates
Well-crafted system prompts are the agent's "job description." They define identity, purpose, boundaries, and capabilities.
The Four-Part Prompt Structure
Template Formula
# Part 1: Identity (Who you are)
You are a [Role] Agent.
# Part 2: Purpose (What you do)
Your sole purpose is to [specific task/responsibility].
# Part 3: Boundaries (What you don't do)
You do NOT [actions outside scope].
# Part 4: Capabilities (What tools you have)
You ONLY have access to: [list of tools].

Agent Types & Templates
You are a Data Validation Agent.
Your sole purpose is to check incoming user data against predefined schemas
and return validation results. You verify data types, required fields,
format constraints, and business rules.
You do NOT:
- Perform calculations or transformations on the data
- Interact with external APIs or databases
- Make decisions about what to do with invalid data
- Store or cache validation results
You ONLY have access to the validate_schema tool.
When you receive data:
1. Identify the appropriate schema
2. Validate against all constraints
3. Return clear validation results with specific error messages
4. Do not attempt to fix or modify the data

You are a Task Routing Agent.
Your purpose is to analyze incoming requests and route them to the most
appropriate specialist agent based on content, urgency, and complexity.
You do NOT:
- Execute tasks yourself
- Modify the original request
- Make decisions outside of routing logic
- Interact directly with external systems
You have access to these tools:
- analyze_request_type: Categorizes request content
- check_agent_availability: Verifies specialist agent status
- route_to_agent: Sends request to chosen agent
Routing logic:
- Technical questions → Technical Support Agent
- Account issues → Account Management Agent
- Billing questions → Billing Agent
- Urgent issues (keywords: "urgent", "emergency") → Priority queue
- Complex issues requiring multiple steps → Orchestrator Agent

You are a Research Agent specialized in gathering and synthesizing information.
Your purpose is to:
1. Query multiple information sources
2. Extract relevant facts and data
3. Synthesize findings into coherent summaries
4. Cite sources for all claims
You have access to:
- search_knowledge_base: Query internal documents
- search_web: Search external sources
- extract_data: Parse structured data from sources
- summarize_findings: Create synthesis reports
Research process:
1. Analyze the research question
2. Identify key information needs
3. Query multiple sources in parallel
4. Cross-reference findings
5. Synthesize into a clear answer with citations
You do NOT:
- Make claims without source verification
- Generate fictional information
- Make business decisions based on research
- Modify or store source documents

You are an Orchestrator Agent managing a team of specialist agents.
Your purpose is to:
1. Break down complex requests into subtasks
2. Delegate subtasks to appropriate specialist agents
3. Coordinate the sequence and timing of agent activities
4. Synthesize results from multiple agents
5. Handle errors and retry failed operations
You manage these specialist agents:
- DataAgent: Data retrieval and validation
- AnalysisAgent: Data analysis and calculations
- ReportAgent: Report generation
- NotificationAgent: User communications
Orchestration patterns:
- Sequential: Execute tasks one after another (use when order matters)
- Parallel: Execute independent tasks simultaneously (use for speed)
- Conditional: Branch based on results (use for decision trees)
Tools available:
- delegate_to_agent: Send task to specialist
- wait_for_completion: Block until agent finishes
- gather_results: Collect outputs from multiple agents
- handle_failure: Implement fallback logic
Always maintain workflow state and provide clear status updates.

Prompt Engineering Best Practices
Tool Design Patterns
Tools give agents the ability to perform actions beyond language processing. Well-designed tools are robust, validated, and logged.
Tool Anatomy
Complete Tool Template
from typing import Optional
import logging


def tool_name(
    param1: str,
    param2: int,
    optional_param: Optional[str] = None
) -> dict:
    """
    Clear description of what this tool does.

    Args:
        param1: Description of parameter 1
        param2: Description of parameter 2
        optional_param: Description of optional parameter

    Returns:
        dict with 'success' (bool) and 'data' or 'error' fields

    Example:
        result = tool_name("example", 42)
        # Returns: {"success": True, "data": {...}}
    """
    try:
        # 1. Validate inputs
        if not param1 or param2 < 0:
            return {
                "success": False,
                "error": "Invalid input parameters"
            }

        # 2. Log the operation
        logging.info(f"Executing tool_name with param1={param1}")

        # 3. Perform the operation (perform_operation is a placeholder
        #    for the tool's real work)
        result = perform_operation(param1, param2)

        # 4. Return structured response
        return {
            "success": True,
            "data": result
        }
    except Exception as e:
        # 5. Handle errors gracefully
        logging.error(f"Tool error: {e}")
        return {
            "success": False,
            "error": f"Operation failed: {e}"
        }

Common Tool Patterns
import logging
import sqlite3
from contextlib import closing
from typing import Any, Dict


def query_database(
    query: str,
    params: tuple = ()
) -> Dict[str, Any]:
    """
    Execute a SELECT query on the database.

    Args:
        query: SQL SELECT query (validated for safety)
        params: Query parameters for safe parameterization

    Returns:
        dict with results or error
    """
    try:
        # Validation: Only allow SELECT statements
        if not query.strip().upper().startswith('SELECT'):
            return {
                "success": False,
                "error": "Only SELECT queries allowed"
            }

        # closing() releases the connection; sqlite3's own context
        # manager only commits or rolls back the transaction
        with closing(sqlite3.connect('database.db')) as conn:
            conn.row_factory = sqlite3.Row
            cursor = conn.execute(query, params)
            # Convert to list of dicts
            results = [dict(row) for row in cursor.fetchall()]

        return {
            "success": True,
            "data": results,
            "count": len(results)
        }
    except sqlite3.Error as e:
        logging.error(f"Database error: {e}")
        return {
            "success": False,
            "error": "Database query failed"
        }

import requests
from typing import Any, Dict, Optional


def call_external_api(
    endpoint: str,
    method: str = "GET",
    data: Optional[Dict] = None,
    timeout: int = 30
) -> Dict[str, Any]:
    """
    Make HTTP request to external API.

    Args:
        endpoint: API endpoint URL
        method: HTTP method (GET, POST, PUT, DELETE)
        data: Request body data
        timeout: Request timeout in seconds

    Returns:
        dict with response data or error
    """
    try:
        # Validate endpoint
        if not endpoint.startswith('https://'):
            return {
                "success": False,
                "error": "Only HTTPS endpoints allowed"
            }

        # Validate method against the documented set
        if method.upper() not in ("GET", "POST", "PUT", "DELETE"):
            return {
                "success": False,
                "error": f"Unsupported method: {method}"
            }

        # Make request with timeout
        response = requests.request(
            method=method.upper(),
            url=endpoint,
            json=data,
            timeout=timeout,
            headers={"User-Agent": "MultiAgentSystem/1.0"}
        )

        # Check status code (raises HTTPError on 4xx/5xx)
        response.raise_for_status()
        return {
            "success": True,
            "data": response.json(),
            "status_code": response.status_code
        }
    except requests.Timeout:
        return {
            "success": False,
            "error": "Request timed out"
        }
    except requests.HTTPError as e:
        return {
            "success": False,
            "error": f"HTTP error: {e.response.status_code}"
        }
    except Exception as e:
        # Covers connection errors and non-JSON response bodies
        return {
            "success": False,
            "error": f"Request failed: {e}"
        }

from pydantic import BaseModel, ValidationError
from typing import Any, Dict, Type


def validate_data(
    data: Dict[str, Any],
    schema: Type[BaseModel]
) -> Dict[str, Any]:
    """
    Validate data against a Pydantic schema.

    Args:
        data: Dictionary of data to validate
        schema: Pydantic model class (the class itself, not an instance)

    Returns:
        dict with validation results
    """
    try:
        # Attempt validation
        validated = schema(**data)
        return {
            "success": True,
            # model_dump() is the Pydantic v2 name; on v1 use .dict()
            "data": validated.model_dump(),
            "message": "Validation passed"
        }
    except ValidationError as e:
        # Extract validation errors
        errors = []
        for error in e.errors():
            errors.append({
                "field": ".".join(str(x) for x in error['loc']),
                "message": error['msg'],
                "type": error['type']
            })
        return {
            "success": False,
            "errors": errors,
            "message": "Validation failed"
        }

Tool Design Principles
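One principle the tool examples above already share is a uniform response envelope: every tool returns `success` plus either `data` or `error`, and logs what it did. A minimal sketch of factoring that into a decorator follows; `tool_envelope` and `divide` are hypothetical names used only for illustration.

```python
import functools
import logging
from typing import Any, Callable, Dict


def tool_envelope(func: Callable[..., Any]) -> Callable[..., Dict[str, Any]]:
    """Wrap a plain function so it always returns the success/data/error dict."""
    @functools.wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> Dict[str, Any]:
        logging.info("Executing %s", func.__name__)
        try:
            return {"success": True, "data": func(*args, **kwargs)}
        except Exception as exc:
            # Convert any failure into a structured error instead of raising
            logging.error("%s failed: %s", func.__name__, exc)
            return {"success": False, "error": f"Operation failed: {exc}"}
    return wrapper


@tool_envelope
def divide(numerator: float, denominator: float) -> float:
    return numerator / denominator


ok = divide(10, 4)      # succeeds
failed = divide(1, 0)   # ZeroDivisionError becomes a structured error
print(ok, failed)
```

The trade-off is that input validation still has to live inside each tool, since only the tool knows which arguments are acceptable.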
Orchestration Patterns
Orchestration coordinates multiple agents to accomplish complex workflows. Different patterns suit different scenarios.
The Four Core Patterns
1. Sequential Orchestration
Use when order matters
Execute agents one after another, passing results forward. Each step depends on the previous one.
def sequential_workflow(user_request: str) -> dict:
    """Execute agents in sequence, passing data forward."""
    # Step 1: Validate input
    validation_result = validation_agent.run(user_request)
    if not validation_result["success"]:
        return validation_result

    # Step 2: Process validated data
    processing_result = processing_agent.run(validation_result["data"])
    if not processing_result["success"]:
        return processing_result

    # Step 3: Generate final output
    final_result = output_agent.run(processing_result["data"])
    return final_result

Best for: Data pipelines, multi-step validations, document processing workflows
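The sequential pattern generalizes to a loop over steps that stops at the first failure. The sketch below assumes each step is a plain function returning the `success`/`data` dict used throughout this guide; `run_pipeline`, `validate`, and `shout` are hypothetical stand-ins for real agents.

```python
from typing import Any, Callable, Dict, List

Step = Callable[[Any], Dict[str, Any]]


def run_pipeline(steps: List[Step], payload: Any) -> Dict[str, Any]:
    """Run steps in order, feeding each step the previous step's data."""
    result: Dict[str, Any] = {"success": True, "data": payload}
    for step in steps:
        result = step(result["data"])
        if not result["success"]:
            return result  # stop early and surface the failing step's result
    return result


def validate(text: str) -> Dict[str, Any]:
    cleaned = text.strip()
    if not cleaned:
        return {"success": False, "error": "Empty request"}
    return {"success": True, "data": cleaned}


def shout(text: str) -> Dict[str, Any]:
    return {"success": True, "data": text.upper()}


good = run_pipeline([validate, shout], "  hello ")
bad = run_pipeline([validate, shout], "   ")
print(good, bad)
```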
2. Parallel Orchestration
Use for independent tasks
Execute multiple agents simultaneously to improve speed. Tasks don't depend on each other.
import asyncio
from typing import Any, Dict


async def parallel_workflow(user_request: str) -> Dict[str, Any]:
    """Execute multiple agents in parallel."""
    # Define tasks to run concurrently
    tasks = [
        research_agent.run_async(user_request),
        data_agent.run_async(user_request),
        analytics_agent.run_async(user_request)
    ]

    # Wait for all to complete; with return_exceptions=True a failed
    # task contributes its exception object instead of raising
    results = await asyncio.gather(*tasks, return_exceptions=True)

    # Combine results (same order as the tasks list)
    combined = {
        "research": results[0],
        "data": results[1],
        "analytics": results[2]
    }

    # Synthesize final output
    final_result = synthesis_agent.run(combined)
    return final_result

Best for: Research tasks, multi-source data gathering, independent analyses
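Because `asyncio.gather(..., return_exceptions=True)` returns exception objects in place of results, the combined dict should be checked before it reaches a synthesis step. The sketch below shows one way to do that with two stub coroutines; `research`, `analytics`, and `gather_named` are hypothetical names standing in for real agents.

```python
import asyncio
from typing import Any, Dict


async def research(request: str) -> str:
    await asyncio.sleep(0)
    return f"research notes for {request}"


async def analytics(request: str) -> str:
    await asyncio.sleep(0)
    raise RuntimeError("analytics backend unavailable")


async def gather_named(request: str) -> Dict[str, Dict[str, Any]]:
    """Run both stubs concurrently and wrap each outcome in a result dict."""
    names = ["research", "analytics"]
    outcomes = await asyncio.gather(
        research(request), analytics(request), return_exceptions=True
    )
    combined: Dict[str, Dict[str, Any]] = {}
    for name, outcome in zip(names, outcomes):
        if isinstance(outcome, BaseException):
            combined[name] = {"success": False, "error": str(outcome)}
        else:
            combined[name] = {"success": True, "data": outcome}
    return combined


combined = asyncio.run(gather_named("pricing"))
print(combined)
```

One failing agent then degrades the answer instead of aborting the whole workflow, and the synthesis step can decide whether partial results are enough.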
3. Conditional Orchestration
Use for branching logic
Route to different agents based on conditions or results. Creates decision trees.
def conditional_workflow(user_request: str) -> dict:
    """Route based on request classification."""
    # Step 1: Classify the request
    classification = classifier_agent.run(user_request)
    request_type = classification["type"]

    # Step 2: Route to appropriate handler
    if request_type == "technical":
        result = technical_agent.run(user_request)
    elif request_type == "billing":
        result = billing_agent.run(user_request)
    elif request_type == "account":
        result = account_agent.run(user_request)
    else:
        # Complex request needs orchestrator
        result = orchestrator_agent.run(user_request)

    # Step 3: Post-process if needed
    if result.get("requires_followup"):
        result = followup_agent.run(result)
    return result

Best for: Customer support routing, approval workflows, multi-tier escalation
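When the number of branches grows, the `if`/`elif` chain can be replaced by a lookup table with a default handler. This is a sketch under the assumption that handlers are plain callables; `make_router` and the lambda handlers are hypothetical stand-ins for real agents.

```python
from typing import Callable, Dict

Handler = Callable[[str], dict]


def make_router(handlers: Dict[str, Handler], default: Handler) -> Callable[[str, str], dict]:
    """Build a routing function from a type-to-handler table."""
    def route(request_type: str, request: str) -> dict:
        # Unknown types fall through to the default handler
        handler = handlers.get(request_type, default)
        return handler(request)
    return route


route = make_router(
    handlers={
        "technical": lambda req: {"handled_by": "technical", "request": req},
        "billing": lambda req: {"handled_by": "billing", "request": req},
    },
    default=lambda req: {"handled_by": "orchestrator", "request": req},
)

billing = route("billing", "Refund my invoice")
unknown = route("legal", "Review this contract")
print(billing, unknown)
```

Adding a new branch then means adding one table entry, and the table itself can be loaded from configuration.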
4. Hybrid Orchestration
Use for complex workflows
Combine sequential, parallel, and conditional patterns for sophisticated workflows.
import asyncio


async def hybrid_workflow(user_request: str) -> dict:
    """Complex workflow combining all patterns."""
    # Phase 1: Sequential validation
    validation = await validation_agent.run_async(user_request)
    if not validation["success"]:
        return validation

    # Phase 2: Parallel research
    research_tasks = [
        internal_search_agent.run_async(validation["data"]),
        external_search_agent.run_async(validation["data"]),
        database_agent.run_async(validation["data"])
    ]
    research_results = await asyncio.gather(*research_tasks)

    # Phase 3: Conditional processing
    # (combine_results is a placeholder that merges the three outputs
    # and attaches a "confidence" score)
    combined_data = combine_results(research_results)
    if combined_data["confidence"] > 0.8:
        # High confidence: direct response
        result = response_agent.run(combined_data)
    else:
        # Low confidence: escalate to human
        result = escalation_agent.run(combined_data)
    return result

Best for: Automated sales systems, complex decision engines, intelligent routing systems
Orchestrator Implementation
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Dict, List, Tuple
import logging


class WorkflowOrchestrator:
    """Orchestrates multi-agent workflows with state tracking."""

    def __init__(self, agents: Dict[str, Any]):
        # Each agent is any object exposing run(data) -> dict
        self.agents = agents
        self.state = {}
        self.history = []

    def execute_workflow(self, request: str) -> dict:
        """Execute the complete workflow."""
        try:
            # Initialize state
            self.state = {
                "request": request,
                "status": "in_progress",
                "steps_completed": []
            }

            # Step 1: Validation (sequential)
            validation_result = self._run_agent("validator", request)
            if not validation_result["success"]:
                return self._handle_failure("validation", validation_result)

            # Step 2: Parallel processing
            parallel_results = self._run_parallel([
                ("researcher", validation_result["data"]),
                ("analyzer", validation_result["data"])
            ])

            # Step 3: Synthesis (sequential)
            final_result = self._run_agent("synthesizer", parallel_results)
            self.state["status"] = "completed"
            return final_result
        except Exception as e:
            return self._handle_failure("orchestration", str(e))

    def _run_agent(self, agent_name: str, data: Any) -> dict:
        """Run a single agent with tracking."""
        logging.info(f"Running agent: {agent_name}")
        agent = self.agents.get(agent_name)
        if agent is None:
            raise ValueError(f"Unknown agent: {agent_name}")
        result = agent.run(data)

        # Track execution
        self.history.append({
            "agent": agent_name,
            "result": result
        })
        self.state["steps_completed"].append(agent_name)
        return result

    def _run_parallel(self, tasks: List[Tuple[str, Any]]) -> dict:
        """Run multiple agents in parallel using a thread pool."""
        with ThreadPoolExecutor(max_workers=max(1, len(tasks))) as pool:
            futures = {
                agent_name: pool.submit(self._run_agent, agent_name, data)
                for agent_name, data in tasks
            }
            # result() re-raises any exception from the worker thread
            return {name: future.result() for name, future in futures.items()}

    def _handle_failure(self, stage: str, error: Any) -> dict:
        """Handle workflow failures."""
        logging.error(f"Workflow failed at {stage}: {error}")
        self.state["status"] = "failed"
        self.state["error_stage"] = stage
        return {
            "success": False,
            "error": error,
            "state": self.state
        }

Routing Patterns
Routing determines which agent handles which request. Choose the right routing strategy for your use case.
Routing Strategies
1. Content-Based Routing
Route by message content
Analyze the request content and route to the most appropriate agent based on keywords, intent, or classification.
def content_based_routing(request: str) -> str:
    """Route based on request content analysis."""
    # Normalize case before matching keywords
    request_lower = request.lower()

    # Define routing rules
    if any(word in request_lower for word in
           ['bug', 'error', 'broken', 'not working']):
        return "technical_support_agent"
    elif any(word in request_lower for word in
             ['bill', 'payment', 'charge', 'invoice']):
        return "billing_agent"
    elif any(word in request_lower for word in
             ['account', 'password', 'login', 'access']):
        return "account_agent"
    else:
        # Default to general support
        return "general_support_agent"


# Advanced: Use ML classification
def ml_based_routing(request: str) -> str:
    """Use ML model to classify and route requests."""
    # Use a classifier agent
    classification = classifier_agent.run(request)
    intent = classification["intent"]
    confidence = classification["confidence"]

    if confidence < 0.7:
        # Low confidence: route to human
        return "human_agent"
    # intent_to_agent_mapping is a dict from intent label to agent name
    return intent_to_agent_mapping[intent]

2. Priority-Based Routing
Route by urgency
Route high-priority requests to faster agents or escalate immediately to humans.
from enum import Enum


class Priority(Enum):
    LOW = 1
    MEDIUM = 2
    HIGH = 3
    URGENT = 4


def priority_based_routing(request: str) -> dict:
    """Route based on request priority."""
    # Determine priority
    priority = determine_priority(request)

    if priority == Priority.URGENT:
        # Urgent: immediate human escalation
        return {
            "agent": "human_agent",
            "queue": "urgent",
            "sla": "15_minutes"
        }
    elif priority == Priority.HIGH:
        # High: premium agent with fast response
        return {
            "agent": "premium_agent",
            "queue": "high_priority",
            "sla": "1_hour"
        }
    elif priority == Priority.MEDIUM:
        # Medium: standard agent
        return {
            "agent": "standard_agent",
            "queue": "normal",
            "sla": "4_hours"
        }
    else:
        # Low: batch processing
        return {
            "agent": "batch_agent",
            "queue": "low_priority",
            "sla": "24_hours"
        }


def determine_priority(request: str) -> Priority:
    """Analyze request to determine priority level."""
    urgent_keywords = ['urgent', 'emergency', 'critical', 'down']
    high_keywords = ['asap', 'important', 'escalate']
    request_lower = request.lower()

    if any(word in request_lower for word in urgent_keywords):
        return Priority.URGENT
    elif any(word in request_lower for word in high_keywords):
        return Priority.HIGH
    else:
        # Note: no rule here yields Priority.LOW, so the batch branch
        # above is reached only once a LOW rule is added
        return Priority.MEDIUM

3. Round-Robin Routing
Distribute load evenly
Distribute requests evenly across multiple agents to balance load.
from typing import List


class RoundRobinRouter:
    """Distributes requests evenly across agents."""

    def __init__(self, agents: List[str]):
        self.agents = agents
        self.current_index = 0

    def route(self, request: str) -> str:
        """Get the name of the next agent in rotation."""
        agent = self.agents[self.current_index]
        # Move to next agent
        self.current_index = (self.current_index + 1) % len(self.agents)
        return agent


# Usage
router = RoundRobinRouter([
    "agent_1",
    "agent_2",
    "agent_3"
])

# route() returns a name, so look the agent object up before calling it.
# incoming_requests and agents_by_name are placeholders for your own
# request source and name-to-agent mapping.
for request in incoming_requests:
    agent_name = router.route(request)
    agents_by_name[agent_name].process(request)

4. Capability-Based Routing
Route by agent capabilities
Match requests to agents based on their specific capabilities and current load.
from typing import Optional


class CapabilityRouter:
    """Routes based on agent capabilities and availability."""

    def __init__(self):
        self.agent_registry = {
            "sql_agent": {
                "capabilities": ["database", "sql", "queries"],
                "max_concurrent": 5,
                "current_load": 2
            },
            "api_agent": {
                "capabilities": ["api", "rest", "external"],
                "max_concurrent": 10,
                "current_load": 7
            },
            "analysis_agent": {
                "capabilities": ["analytics", "statistics", "reporting"],
                "max_concurrent": 3,
                "current_load": 1
            }
        }

    def route(self, required_capability: str) -> Optional[str]:
        """Find best agent for required capability."""
        candidates = []
        # Find agents with required capability
        for agent_name, info in self.agent_registry.items():
            if required_capability in info["capabilities"]:
                # Check if agent has capacity
                if info["current_load"] < info["max_concurrent"]:
                    load_ratio = info["current_load"] / info["max_concurrent"]
                    candidates.append((agent_name, load_ratio))

        if not candidates:
            return None  # No available agents

        # Select agent with lowest load
        best_agent = min(candidates, key=lambda x: x[1])[0]
        # Increment load
        self.agent_registry[best_agent]["current_load"] += 1
        return best_agent

    def release_agent(self, agent_name: str):
        """Release agent after task completion."""
        info = self.agent_registry.get(agent_name)
        # Guard against a double release driving the load below zero
        if info and info["current_load"] > 0:
            info["current_load"] -= 1

Routing Best Practices
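A caveat for the keyword rules in content-based routing: plain substring checks also match inside longer words, so "bill" matches "billion". The sketch below shows one possible fix using word-boundary regular expressions; the keyword lists are copied from the content-based example, while `ROUTES` and `route_by_whole_words` are hypothetical names.

```python
import re
from typing import Dict, List

ROUTES: Dict[str, List[str]] = {
    "technical_support_agent": ["bug", "error", "broken", "not working"],
    "billing_agent": ["bill", "payment", "charge", "invoice"],
    "account_agent": ["account", "password", "login", "access"],
}


def route_by_whole_words(request: str, default: str = "general_support_agent") -> str:
    """Return the first agent whose keyword appears as a whole word or phrase."""
    text = request.lower()
    for agent, keywords in ROUTES.items():
        # \b anchors the match at word boundaries on both sides
        pattern = r"\b(?:" + "|".join(re.escape(k) for k in keywords) + r")\b"
        if re.search(pattern, text):
            return agent
    return default


a = route_by_whole_words("We serve a billion users, how do I reset my password?")
b = route_by_whole_words("There is a wrong charge on my bill")
c = route_by_whole_words("What are your opening hours?")
print(a, b, c)
```

The cost of whole-word matching is that inflected forms such as "billing" or "errors" no longer match, so the keyword lists would need to include them explicitly.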
State Management Patterns
State management determines what information persists across agent interactions and how conflicts are handled.
Types of State
1. Ephemeral (Short-term) State
In-memory, temporary
State that exists only during a single workflow execution. Lost when workflow completes.
from datetime import datetime
from typing import Any


class EphemeralState:
    """Temporary state for single workflow execution."""

    def __init__(self):
        self.data = {}
        self.created_at = datetime.now()

    def set(self, key: str, value: Any):
        """Store temporary data."""
        self.data[key] = value

    def get(self, key: str) -> Any:
        """Retrieve temporary data (None if the key is missing)."""
        return self.data.get(key)

    def clear(self):
        """Clear all temporary data."""
        self.data = {}


# Usage in workflow
def process_request(request: str):
    state = EphemeralState()
    # Store intermediate results
    state.set("original_request", request)
    state.set("validation_passed", True)

    # Use in subsequent steps
    result = None
    if state.get("validation_passed"):
        result = process_agent.run(request)

    # State automatically discarded after function returns
    return result

Best for: Workflow context, intermediate results, temporary calculations
2. Persistent (Long-term) State
Database-backed, permanent
State that survives across multiple workflows and system restarts.
import json
import sqlite3
from typing import Any, Optional


class PersistentState:
    """Long-term state stored in database."""

    def __init__(self, db_path: str = "state.db"):
        self.conn = sqlite3.connect(db_path)
        self._initialize_db()

    def _initialize_db(self):
        """Create state table if not exists."""
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS state (
                key TEXT PRIMARY KEY,
                value TEXT,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """)
        self.conn.commit()

    def set(self, key: str, value: Any):
        """Store persistent data (value must be JSON-serializable)."""
        value_json = json.dumps(value)
        self.conn.execute("""
            INSERT OR REPLACE INTO state (key, value, updated_at)
            VALUES (?, ?, CURRENT_TIMESTAMP)
        """, (key, value_json))
        self.conn.commit()

    def get(self, key: str) -> Optional[Any]:
        """Retrieve persistent data."""
        cursor = self.conn.execute(
            "SELECT value FROM state WHERE key = ?",
            (key,)
        )
        row = cursor.fetchone()
        if row:
            return json.loads(row[0])
        return None


# Usage for user sessions (user_id, user_preferences, and workflow_id
# come from the surrounding application)
state = PersistentState()
state.set(f"user:{user_id}:preferences", user_preferences)
state.set(f"workflow:{workflow_id}:status", "completed")

Best for: User data, workflow history, configuration, audit logs
State Coordination Techniques
Pessimistic locking: lock resources before access to prevent concurrent modifications.
import sqlite3


def update_with_lock(user_id: int, amount: float):
    """Update account balance with database lock."""
    conn = sqlite3.connect("accounts.db")
    try:
        # Start an exclusive transaction; SQLite locks the whole
        # database file rather than a single row
        conn.execute("BEGIN EXCLUSIVE")

        # Read current balance while holding the lock
        cursor = conn.execute(
            "SELECT balance FROM accounts WHERE user_id = ?",
            (user_id,)
        )
        row = cursor.fetchone()
        if row is None:
            conn.rollback()
            return {"success": False, "error": "Account not found"}
        current_balance = row[0]

        # Calculate new balance
        new_balance = current_balance + amount

        # Update balance
        conn.execute(
            "UPDATE accounts SET balance = ? WHERE user_id = ?",
            (new_balance, user_id)
        )

        # Commit (releases lock)
        conn.commit()
        return {"success": True, "new_balance": new_balance}
    except Exception as e:
        conn.rollback()
        return {"success": False, "error": str(e)}
    finally:
        conn.close()

Optimistic locking: allow concurrent access, detect conflicts on write, and retry if needed.
import json
import logging
import sqlite3


def update_with_version_check(
    user_id: int,
    new_data: dict,
    max_retries: int = 3
):
    """Update with optimistic concurrency control."""
    for attempt in range(max_retries):
        conn = sqlite3.connect("data.db")
        try:
            # Read current version
            cursor = conn.execute(
                "SELECT data, version FROM records WHERE user_id = ?",
                (user_id,)
            )
            row = cursor.fetchone()
            if row is None:
                return {"success": False, "error": "Record not found"}
            current_version = row[1]

            # Attempt update with version check
            result = conn.execute("""
                UPDATE records
                SET data = ?, version = version + 1
                WHERE user_id = ? AND version = ?
            """, (json.dumps(new_data), user_id, current_version))
            conn.commit()

            # Check if update succeeded
            if result.rowcount > 0:
                return {"success": True, "version": current_version + 1}

            # Version mismatch: someone else updated, so loop and re-read
            logging.warning(f"Version conflict, retry {attempt + 1}")
        except Exception as e:
            conn.rollback()
            return {"success": False, "error": str(e)}
        finally:
            conn.close()
    return {"success": False, "error": "Max retries exceeded"}

Event sourcing: store all state changes as events, and reconstruct current state by replaying events.
from datetime import datetime


class EventStore:
    """Store and replay state changes as events."""

    def __init__(self):
        self.events = []

    def append_event(self, event: dict):
        """Add new event to the log."""
        event["timestamp"] = datetime.now()
        event["id"] = len(self.events) + 1
        self.events.append(event)

    def get_current_state(self, entity_id: str) -> dict:
        """Reconstruct current state by replaying events."""
        state = {}
        for event in self.events:
            if event["entity_id"] == entity_id:
                # Apply event to state
                if event["type"] == "created":
                    # Copy so later updates do not mutate the stored event
                    state = dict(event["data"])
                elif event["type"] == "updated":
                    state.update(event["data"])
                elif event["type"] == "deleted":
                    state = {}
        return state


# Usage
store = EventStore()

# Record events
store.append_event({
    "entity_id": "order_123",
    "type": "created",
    "data": {"status": "pending", "amount": 100}
})
store.append_event({
    "entity_id": "order_123",
    "type": "updated",
    "data": {"status": "completed"}
})

# Reconstruct current state
current_state = store.get_current_state("order_123")
# Result: {"status": "completed", "amount": 100}

State Management Best Practices
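The optimistic-locking example hinges on one detail: the `UPDATE` matches zero rows when the version has moved on, and `rowcount` reports that. The sketch below demonstrates the conflict with an in-memory SQLite database; the `records` schema mirrors the columns used above, and `versioned_update` is a hypothetical helper.

```python
import sqlite3


def versioned_update(conn: sqlite3.Connection, user_id: int,
                     new_data: str, expected_version: int) -> bool:
    """Apply the update only if the row still has expected_version."""
    cursor = conn.execute(
        "UPDATE records SET data = ?, version = version + 1 "
        "WHERE user_id = ? AND version = ?",
        (new_data, user_id, expected_version),
    )
    conn.commit()
    return cursor.rowcount == 1


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE records (user_id INTEGER PRIMARY KEY, data TEXT, version INTEGER)"
)
conn.execute("INSERT INTO records VALUES (1, 'a', 1)")
conn.commit()

# Two writers both read version 1; only the first write can succeed
first = versioned_update(conn, 1, "b", expected_version=1)
second = versioned_update(conn, 1, "c", expected_version=1)
final_row = conn.execute(
    "SELECT data, version FROM records WHERE user_id = 1"
).fetchone()
print(first, second, final_row)
conn.close()
```

The second writer has to re-read the row and decide whether its change still applies, which is what the retry loop in the earlier example does.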
Error Handling Patterns
Robust error handling prevents cascading failures and maintains system reliability.
Core Error Handling Strategies
1. Retry with Exponential Backoff
For transient failures
Retry failed operations with increasing delays to handle temporary issues.
import logging
import time
from typing import Callable


def retry_with_backoff(
    func: Callable,
    max_retries: int = 3,
    initial_delay: float = 1.0,
    backoff_factor: float = 2.0
) -> dict:
    """
    Retry function with exponential backoff.

    Args:
        func: Function to retry
        max_retries: Maximum number of attempts (including the first)
        initial_delay: Initial delay in seconds
        backoff_factor: Multiplier for each retry
    """
    delay = initial_delay
    for attempt in range(max_retries):
        try:
            result = func()
            return {"success": True, "data": result}
        except Exception as e:
            if attempt == max_retries - 1:
                # Last attempt failed
                logging.error(f"All retries exhausted: {e}")
                return {
                    "success": False,
                    "error": f"Failed after {max_retries} attempts"
                }
            # Wait before retrying
            logging.warning(f"Attempt {attempt + 1} failed, retrying in {delay}s")
            time.sleep(delay)
            delay *= backoff_factor


# Usage
result = retry_with_backoff(
    lambda: api_call_that_might_fail(),
    max_retries=5,
    initial_delay=1.0,
    backoff_factor=2.0
)

2. Fallback Pattern
For graceful degradation
Provide alternative paths when primary operation fails.
def get_data_with_fallback(query: str) -> dict:
    """Try multiple data sources with fallback."""
    # Try primary source
    try:
        result = primary_database.query(query)
        return {
            "success": True,
            "data": result,
            "source": "primary"
        }
    except Exception as e:
        logging.warning(f"Primary source failed: {e}")

    # Fallback to cache
    try:
        cached_result = cache.get(query)
        if cached_result:
            logging.info("Using cached data")
            return {
                "success": True,
                "data": cached_result,
                "source": "cache",
                "warning": "Data may be stale"
            }
    except Exception as e:
        logging.warning(f"Cache failed: {e}")

    # Fallback to backup database
    try:
        result = backup_database.query(query)
        logging.info("Using backup database")
        return {
            "success": True,
            "data": result,
            "source": "backup"
        }
    except Exception as e:
        logging.error(f"All sources failed: {e}")

    # All fallbacks failed
    return {
        "success": False,
        "error": "All data sources unavailable"
    }

3. Compensation Pattern
For reversing partial failures
Undo completed operations when later steps fail.
def process_order_with_compensation(order_data: dict) -> dict:
    """Process order with compensation on failure."""
    completed_steps = []
    try:
        # Step 1: Reserve inventory
        inventory_result = reserve_inventory(order_data["items"])
        completed_steps.append(("inventory", inventory_result))

        # Step 2: Process payment
        payment_result = process_payment(order_data["payment"])
        completed_steps.append(("payment", payment_result))

        # Step 3: Create shipment
        shipment_result = create_shipment(order_data["address"])
        completed_steps.append(("shipment", shipment_result))

        return {"success": True, "order_id": shipment_result["id"]}
    except Exception as e:
        logging.error(f"Order processing failed: {e}")

        # Compensate completed steps in reverse order
        compensated = True
        for step_name, step_result in reversed(completed_steps):
            try:
                if step_name == "inventory":
                    release_inventory(step_result["reservation_id"])
                    logging.info("Inventory released")
                elif step_name == "payment":
                    refund_payment(step_result["transaction_id"])
                    logging.info("Payment refunded")
                elif step_name == "shipment":
                    cancel_shipment(step_result["shipment_id"])
                    logging.info("Shipment canceled")
            except Exception as comp_error:
                # Report partial rollback honestly instead of claiming success
                compensated = False
                logging.error(f"Compensation failed for {step_name}: {comp_error}")

        return {
            "success": False,
            "error": str(e),
            "compensated": compensated
        }

4. Circuit Breaker Pattern
For preventing cascading failures
Stop calling failing services to allow recovery time.
import logging
from datetime import datetime, timedelta
from enum import Enum
from typing import Callable


class CircuitState(Enum):
    CLOSED = "closed"        # Normal operation
    OPEN = "open"            # Failures detected, reject requests
    HALF_OPEN = "half_open"  # Testing if service recovered


class CircuitBreaker:
    """Prevent cascading failures by stopping calls to failing services."""

    def __init__(
        self,
        failure_threshold: int = 5,
        timeout_seconds: int = 60
    ):
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.failure_threshold = failure_threshold
        self.timeout_seconds = timeout_seconds
        self.last_failure_time = None

    def call(self, func: Callable) -> dict:
        """Execute function with circuit breaker protection."""
        # Check if circuit is open
        if self.state == CircuitState.OPEN:
            # Check if timeout expired
            if datetime.now() - self.last_failure_time > timedelta(seconds=self.timeout_seconds):
                self.state = CircuitState.HALF_OPEN
                logging.info("Circuit half-open, testing service")
            else:
                return {
                    "success": False,
                    "error": "Circuit breaker OPEN"
                }

        # Attempt the call
        try:
            result = func()
            self._on_success()
            return {"success": True, "data": result}
        except Exception as e:
            self._on_failure()
            return {"success": False, "error": str(e)}

    def _on_success(self):
        """Reset on successful call."""
        if self.state != CircuitState.CLOSED:
            logging.info("Circuit closed")
        self.failure_count = 0
        self.state = CircuitState.CLOSED

    def _on_failure(self):
        """Track failures and open circuit if threshold reached."""
        self.failure_count += 1
        self.last_failure_time = datetime.now()
        # A failed half-open probe also lands here and reopens the circuit
        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN
            logging.warning("Circuit breaker OPEN")


# Usage
breaker = CircuitBreaker(failure_threshold=3, timeout_seconds=30)
result = breaker.call(lambda: external_api_call())

5. Human-in-the-Loop Escalation
For complex failures
Escalate to humans when automated recovery isn't possible.
import logging
from datetime import datetime
from typing import Optional


def process_with_escalation(request: dict) -> dict:
    """Process request with human escalation for failures."""
    try:
        # Attempt automated processing
        result = automated_agent.process(request)

        # Check confidence level
        if result.get("confidence", 0) < 0.7:
            # Low confidence: escalate to human
            return escalate_to_human(
                request,
                reason="Low confidence",
                automated_result=result
            )
        return result
    except Exception as e:
        # Automated processing failed: escalate
        return escalate_to_human(
            request,
            reason=f"Processing failed: {e}",
            automated_result=None
        )


def escalate_to_human(
    request: dict,
    reason: str,
    automated_result: Optional[dict] = None
) -> dict:
    """Create escalation ticket for human review."""
    ticket = {
        "id": generate_ticket_id(),
        "request": request,
        "escalation_reason": reason,
        "automated_result": automated_result,
        "created_at": datetime.now(),
        "status": "pending_human_review"
    }

    # Store in escalation queue
    escalation_queue.add(ticket)

    # Notify human operators
    notify_operators(ticket)
    logging.info(f"Escalated to human: {ticket['id']}")

    return {
        "success": False,
        "escalated": True,
        "ticket_id": ticket["id"],
        "message": "Request escalated for human review"
    }

Error Handling Best Practices
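When tuning the retry pattern, it helps to know the worst-case wait before the caller sees a failure. The sketch below computes the sleep schedule implied by the `retry_with_backoff` loop shown earlier (no sleep follows the final attempt); `backoff_delays` is a hypothetical helper, not part of that function.

```python
from typing import List


def backoff_delays(
    max_retries: int = 3,
    initial_delay: float = 1.0,
    backoff_factor: float = 2.0,
) -> List[float]:
    """Delays slept between attempts: initial_delay * backoff_factor ** i."""
    return [initial_delay * backoff_factor ** i for i in range(max_retries - 1)]


schedule = backoff_delays(max_retries=5, initial_delay=1.0, backoff_factor=2.0)
total_wait = sum(schedule)
print(schedule, total_wait)  # [1.0, 2.0, 4.0, 8.0] 15.0
```

With five attempts the caller can wait 15 seconds on top of the time spent in the calls themselves, which is worth comparing against any upstream timeout.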
Framework Comparisons
Choose the right framework based on your use case, complexity, and team expertise.
Framework Overview
| Framework | Best For | Learning Curve | Flexibility | Community |
|---|---|---|---|---|
| LangGraph | Complex workflows, state graphs | Steep | High | Large |
| Swarm | Simple multi-agent, quick prototypes | Gentle | Medium | Medium |
| Pydantic AI | Type-safe agents, data validation | Medium | Medium | Growing |
Framework Deep Dives
LangGraph models agent workflows as state graphs with nodes and edges.
from langgraph.graph import StateGraph, END
from typing import TypedDict


# Define state structure
class AgentState(TypedDict):
    messages: list
    next_agent: str


# Create graph
workflow = StateGraph(AgentState)

# Add agent nodes (each node is a callable that takes the state
# and returns an update to it)
workflow.add_node("researcher", research_agent)
workflow.add_node("analyzer", analysis_agent)
workflow.add_node("writer", writer_agent)

# Define edges (transitions)
workflow.add_edge("researcher", "analyzer")
workflow.add_edge("analyzer", "writer")
workflow.add_edge("writer", END)  # mark where the graph finishes

# Set entry point
workflow.set_entry_point("researcher")

# Compile and run
app = workflow.compile()
result = app.invoke({"messages": [user_input]})

Key Features:
- Visual graph representation of workflows
- Built-in state management and persistence
- Conditional routing between agents
- Checkpointing for long-running workflows
- Time travel debugging
Use When: Building complex, multi-stage workflows with conditional logic and state persistence needs.
Swarm focuses on simplicity and ease of use for basic multi-agent coordination.
from swarm import Agent, Swarm

# Define agents (search_web, read_documents, draft_content, and
# edit_content are ordinary Python functions defined elsewhere)
research_agent = Agent(
    name="Researcher",
    instructions="You research topics thoroughly.",
    functions=[search_web, read_documents]
)
writer_agent = Agent(
    name="Writer",
    instructions="You write clear, engaging content.",
    functions=[draft_content, edit_content]
)

# Create swarm and run
client = Swarm()
response = client.run(
    agent=research_agent,
    messages=[{"role": "user", "content": "Research AI trends"}],
    context_variables={"topic": "AI trends 2024"}
)

# Hand off to writer
response = client.run(
    agent=writer_agent,
    messages=response.messages,
    context_variables={"research_data": response.context_variables}
)

Key Features:
- Minimal boilerplate code
- Easy agent handoffs
- Shared context variables
- Function calling support
- Quick prototyping
Use When: Building simple multi-agent systems, prototyping ideas, or learning multi-agent concepts.
Pydantic AI emphasizes type safety and data validation using Pydantic models.
from pydantic import BaseModel
from pydantic_ai import Agent


# Define typed data models
class UserData(BaseModel):
    name: str
    age: int
    email: str


class AnalysisResult(BaseModel):
    summary: str
    confidence: float
    recommendations: list[str]


# Create the agent first; tools are registered on the agent instance.
# API names here follow recent Pydantic AI releases, so check them
# against the version you have installed.
agent = Agent(
    "openai:gpt-4",
    name="analyzer"
)


# Define tool with type hints
@agent.tool_plain
def analyze_user(data: UserData) -> AnalysisResult:
    # Pydantic validates input automatically
    return AnalysisResult(
        summary=f"Analysis for {data.name}",
        confidence=0.85,
        recommendations=["rec1", "rec2"]
    )


# Run with automatic validation (run_sync is the blocking variant)
result = agent.run_sync("Analyze user: John, age 30, john@example.com")

Key Features:
- Full type safety with Pydantic
- Automatic input/output validation
- IDE autocomplete support
- Clear data contracts between agents
- Reduces runtime errors
Use When: Type safety is critical, working in large teams, or building production systems where data validation is essential.
Framework Selection Guide
Choose LangGraph if you need:
- Complex conditional workflows
- State persistence across restarts
- Visual workflow representation
- Long-running processes with checkpoints
- Human-in-the-loop capabilities
Choose Swarm if you need:
- Quick prototype development
- Simple agent handoffs
- Minimal learning curve
- Lightweight deployment
- Educational projects
Choose Pydantic AI if you need:
- Strong type safety
- Data validation guarantees
- Large team development
- IDE support and autocomplete
- Clear data contracts