🔧 Multi-Agent Implementation Patterns Library

Your tactical reference for building production-ready multi-agent systems. Real code, real patterns, real solutions.

📝 System Prompt Templates

Well-crafted system prompts are the agent's "job description." They define identity, purpose, boundaries, and capabilities.

The Four-Part Prompt Structure

Template Formula

```
# Part 1: Identity (Who you are)
You are a [Role] Agent.

# Part 2: Purpose (What you do)
Your sole purpose is to [specific task/responsibility].

# Part 3: Boundaries (What you don't do)
You do NOT [actions outside scope].

# Part 4: Capabilities (What tools you have)
You ONLY have access to: [list of tools].
```
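The four parts can also be assembled programmatically so every agent prompt follows the same skeleton. This is a minimal sketch; `build_system_prompt` and its parameters are hypothetical names for illustration, not part of any library.

```python
from typing import List


def build_system_prompt(
    role: str,
    purpose: str,
    boundaries: List[str],
    tools: List[str],
) -> str:
    """Assemble a system prompt from the four-part structure (hypothetical helper)."""
    # Part 1: Identity
    lines = [f"You are a {role} Agent."]
    # Part 2: Purpose
    lines.append(f"Your sole purpose is to {purpose}.")
    # Part 3: Boundaries, one bullet per excluded action
    lines.append("You do NOT:")
    lines.extend(f"- {item}" for item in boundaries)
    # Part 4: Capabilities, an explicit allow-list of tools
    lines.append("You ONLY have access to: " + ", ".join(tools) + ".")
    return "\n".join(lines)


prompt = build_system_prompt(
    role="Data Validation",
    purpose="check incoming user data against predefined schemas",
    boundaries=["Modify the data", "Call external APIs"],
    tools=["validate_schema"],
)
print(prompt)
```

Generating prompts from structured fields makes it harder to forget the boundaries or the tool allow-list when a new agent is added.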

Agent Types & Templates

🔍 Data Validation Agent
```
You are a Data Validation Agent.

Your sole purpose is to check incoming user data against predefined schemas and return validation results. You verify data types, required fields, format constraints, and business rules.

You do NOT:
- Perform calculations or transformations on the data
- Interact with external APIs or databases
- Make decisions about what to do with invalid data
- Store or cache validation results

You ONLY have access to the validate_schema tool.

When you receive data:
1. Identify the appropriate schema
2. Validate against all constraints
3. Return clear validation results with specific error messages
4. Do not attempt to fix or modify the data
```
✓ Best Practice: Always specify what the agent should NOT do. This prevents scope creep and unexpected behavior.
🎯 Task Routing Agent
```
You are a Task Routing Agent.

Your purpose is to analyze incoming requests and route them to the most appropriate specialist agent based on content, urgency, and complexity.

You do NOT:
- Execute tasks yourself
- Modify the original request
- Make decisions outside of routing logic
- Interact directly with external systems

You have access to these tools:
- analyze_request_type: Categorizes request content
- check_agent_availability: Verifies specialist agent status
- route_to_agent: Sends request to chosen agent

Routing logic:
- Technical questions → Technical Support Agent
- Account issues → Account Management Agent
- Billing questions → Billing Agent
- Urgent issues (keywords: "urgent", "emergency") → Priority queue
- Complex issues requiring multiple steps → Orchestrator Agent
```
🔬 Research Agent
```
You are a Research Agent specialized in gathering and synthesizing information.

Your purpose is to:
1. Query multiple information sources
2. Extract relevant facts and data
3. Synthesize findings into coherent summaries
4. Cite sources for all claims

You have access to:
- search_knowledge_base: Query internal documents
- search_web: Search external sources
- extract_data: Parse structured data from sources
- summarize_findings: Create synthesis reports

Research process:
1. Analyze the research question
2. Identify key information needs
3. Query multiple sources in parallel
4. Cross-reference findings
5. Synthesize into a clear answer with citations

You do NOT:
- Make claims without source verification
- Generate fictional information
- Make business decisions based on research
- Modify or store source documents
```
🎭 Orchestrator Agent
```
You are an Orchestrator Agent managing a team of specialist agents.

Your purpose is to:
1. Break down complex requests into subtasks
2. Delegate subtasks to appropriate specialist agents
3. Coordinate the sequence and timing of agent activities
4. Synthesize results from multiple agents
5. Handle errors and retry failed operations

You manage these specialist agents:
- DataAgent: Data retrieval and validation
- AnalysisAgent: Data analysis and calculations
- ReportAgent: Report generation
- NotificationAgent: User communications

Orchestration patterns:
- Sequential: Execute tasks one after another (use when order matters)
- Parallel: Execute independent tasks simultaneously (use for speed)
- Conditional: Branch based on results (use for decision trees)

Tools available:
- delegate_to_agent: Send task to specialist
- wait_for_completion: Block until agent finishes
- gather_results: Collect outputs from multiple agents
- handle_failure: Implement fallback logic

Always maintain workflow state and provide clear status updates.
```

Prompt Engineering Best Practices

✓ Be explicit and unambiguous: Use clear, definitive language. Avoid hedges such as "maybe", "sometimes", and "usually".
✓ Define clear boundaries: Explicitly state what the agent does NOT do to prevent scope creep.
✓ Specify expected behaviors: Include step-by-step processes when relevant.
✓ Include examples: Show input/output examples for complex scenarios.
✓ Keep prompts focused: One clear purpose per agent. Don't overload a single agent with multiple responsibilities.
✓ Test and refine: Iterate based on actual agent behavior in production.
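Some of these guidelines can be checked automatically before a prompt ships. The sketch below is a hypothetical lint function (not from any library) that flags the hedge words listed above and a missing "do NOT" boundary section. It is a heuristic only and does not replace testing the agent's actual behavior.

```python
import re
from typing import List

# Hedge words called out in the guidelines above
HEDGE_WORDS = ("maybe", "sometimes", "usually")


def lint_prompt(prompt: str) -> List[str]:
    """Return warnings for a system prompt (simple heuristic checks)."""
    warnings = []
    lowered = prompt.lower()
    for word in HEDGE_WORDS:
        # \b word boundaries stop 'maybe' matching inside longer words
        if re.search(rf"\b{word}\b", lowered):
            warnings.append(f"ambiguous wording: '{word}'")
    # Boundaries section: the templates use "You do NOT"
    if "do not" not in lowered:
        warnings.append("no explicit boundaries ('You do NOT ...')")
    return warnings


print(lint_prompt("You are a Billing Agent. You usually answer invoices."))
# ["ambiguous wording: 'usually'", "no explicit boundaries ('You do NOT ...')"]
```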

🔧 Tool Design Patterns

Tools give agents the ability to perform actions beyond language processing. Well-designed tools are robust, validated, and logged.

Tool Anatomy

Complete Tool Template

```python
import logging
from typing import Optional


def tool_name(
    param1: str,
    param2: int,
    optional_param: Optional[str] = None
) -> dict:
    """
    Clear description of what this tool does.

    Args:
        param1: Description of parameter 1
        param2: Description of parameter 2
        optional_param: Description of optional parameter

    Returns:
        dict with 'success' (bool) and 'data' or 'error' fields

    Example:
        result = tool_name("example", 42)
        # Returns: {"success": True, "data": {...}}
    """
    try:
        # 1. Validate inputs
        if not param1 or param2 < 0:
            return {
                "success": False,
                "error": "Invalid input parameters"
            }

        # 2. Log the operation
        logging.info(f"Executing tool_name with param1={param1}")

        # 3. Perform the operation
        result = perform_operation(param1, param2)

        # 4. Return structured response
        return {
            "success": True,
            "data": result
        }

    except Exception as e:
        # 5. Handle errors gracefully
        logging.error(f"Tool error: {str(e)}")
        return {
            "success": False,
            "error": f"Operation failed: {str(e)}"
        }
```

Common Tool Patterns

💾 Database Query Tool
```python
import logging
import sqlite3
from contextlib import closing
from typing import Any, Dict


def query_database(
    query: str,
    params: tuple = ()
) -> Dict[str, Any]:
    """
    Execute a SELECT query on the database.

    Args:
        query: SQL SELECT query (validated for safety)
        params: Query parameters for safe parameterization

    Returns:
        dict with results or error
    """
    try:
        # Validation: Only allow SELECT statements
        # (a coarse guard; parameterization is the real SQL-injection defense)
        if not query.strip().upper().startswith('SELECT'):
            return {
                "success": False,
                "error": "Only SELECT queries allowed"
            }

        # closing() actually closes the connection; sqlite3's own context
        # manager only commits or rolls back and leaves it open
        with closing(sqlite3.connect('database.db')) as conn:
            conn.row_factory = sqlite3.Row
            cursor = conn.execute(query, params)

            # Convert to list of dicts
            results = [dict(row) for row in cursor.fetchall()]

        return {
            "success": True,
            "data": results,
            "count": len(results)
        }

    except sqlite3.Error as e:
        logging.error(f"Database error: {e}")
        return {
            "success": False,
            "error": "Database query failed"
        }
```
⚠ Security Warning: Always validate and parameterize queries to prevent SQL injection. Never concatenate user input directly into queries.
🌐 API Client Tool
```python
from typing import Any, Dict, Optional

import requests


def call_external_api(
    endpoint: str,
    method: str = "GET",
    data: Optional[Dict] = None,
    timeout: int = 30
) -> Dict[str, Any]:
    """
    Make HTTP request to external API.

    Args:
        endpoint: API endpoint URL
        method: HTTP method (GET, POST, PUT, DELETE)
        data: Request body data
        timeout: Request timeout in seconds

    Returns:
        dict with response data or error
    """
    try:
        # Validate endpoint
        if not endpoint.startswith('https://'):
            return {
                "success": False,
                "error": "Only HTTPS endpoints allowed"
            }

        # Make request with timeout
        response = requests.request(
            method=method.upper(),
            url=endpoint,
            json=data,
            timeout=timeout,
            headers={"User-Agent": "MultiAgentSystem/1.0"}
        )

        # Raise requests.HTTPError for 4xx/5xx responses
        response.raise_for_status()

        return {
            "success": True,
            "data": response.json(),  # raises on a non-JSON body; caught below
            "status_code": response.status_code
        }

    except requests.Timeout:
        return {
            "success": False,
            "error": "Request timed out"
        }
    except requests.HTTPError as e:
        return {
            "success": False,
            "error": f"HTTP error: {e.response.status_code}"
        }
    except Exception as e:
        return {
            "success": False,
            "error": f"Request failed: {str(e)}"
        }
```
✅ Data Validation Tool
```python
from typing import Any, Dict, Type

from pydantic import BaseModel, ValidationError


def validate_data(
    data: Dict[str, Any],
    schema: Type[BaseModel]
) -> Dict[str, Any]:
    """
    Validate data against a Pydantic schema.

    Args:
        data: Dictionary of data to validate
        schema: Pydantic model class (the class itself, not an instance)

    Returns:
        dict with validation results
    """
    try:
        # Attempt validation
        validated = schema(**data)
        return {
            "success": True,
            # Pydantic v2 API; on Pydantic v1 use validated.dict()
            "data": validated.model_dump(),
            "message": "Validation passed"
        }
    except ValidationError as e:
        # Extract validation errors
        errors = []
        for error in e.errors():
            errors.append({
                "field": ".".join(str(x) for x in error['loc']),
                "message": error['msg'],
                "type": error['type']
            })
        return {
            "success": False,
            "errors": errors,
            "message": "Validation failed"
        }
```
✓ Best Practice: Use Pydantic for structured validation. It provides clear, field-level error messages and, in its default mode, coerces compatible types (for example, the string "42" to an int).

Tool Design Principles

✓ Single Responsibility: Each tool does one thing well. Don't create Swiss Army knife tools.
✓ Never Crash: Always return a structured response, even on error. Use try-except blocks.
✓ Validate Inputs: Check parameters before processing. Fail fast with clear messages.
✓ Log Everything: Record tool execution for debugging and auditing.
✓ Consistent Interface: Return similar structures across all tools (success/error pattern).
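The last four principles can be enforced in one place instead of repeating try/except and logging inside every tool. A minimal sketch, assuming tools are plain Python functions that return their payload and raise on bad input; the `agent_tool` decorator and the `divide` example tool are hypothetical.

```python
import functools
import logging
import time
from typing import Any, Callable, Dict


def agent_tool(func: Callable[..., Any]) -> Callable[..., Dict[str, Any]]:
    """Wrap a tool so it logs every call and always returns a success/error dict."""

    @functools.wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> Dict[str, Any]:
        start = time.perf_counter()
        logging.info("tool=%s args=%r kwargs=%r", func.__name__, args, kwargs)
        try:
            data = func(*args, **kwargs)
            return {"success": True, "data": data}
        except Exception as exc:
            # Never crash: validation errors and bugs become structured failures
            logging.exception("tool=%s failed", func.__name__)
            return {"success": False, "error": f"{type(exc).__name__}: {exc}"}
        finally:
            logging.info("tool=%s took %.3fs", func.__name__, time.perf_counter() - start)

    return wrapper


@agent_tool
def divide(a: float, b: float) -> float:
    """Example tool: fails fast on invalid input."""
    if b == 0:
        raise ValueError("b must be non-zero")
    return a / b


print(divide(6, 3))  # {'success': True, 'data': 2.0}
print(divide(1, 0))  # {'success': False, 'error': 'ValueError: b must be non-zero'}
```

The trade-off is that tools signal invalid input by raising rather than returning an error dict, which keeps each tool body short.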

🎯 Orchestration Patterns

Orchestration coordinates multiple agents to accomplish complex workflows. Different patterns suit different scenarios.

The Four Core Patterns

1. Sequential Orchestration

Use when order matters

Execute agents one after another, passing results forward. Each step depends on the previous one.

```python
def sequential_workflow(user_request: str) -> dict:
    """Execute agents in sequence, passing data forward."""
    # Step 1: Validate input
    validation_result = validation_agent.run(user_request)
    if not validation_result["success"]:
        return validation_result

    # Step 2: Process validated data
    processing_result = processing_agent.run(validation_result["data"])
    if not processing_result["success"]:
        return processing_result

    # Step 3: Generate final output
    final_result = output_agent.run(processing_result["data"])
    return final_result
```

Best for: Data pipelines, multi-step validations, document processing workflows

2. Parallel Orchestration

Use for independent tasks

Execute multiple agents simultaneously to improve speed. Tasks don't depend on each other.

```python
import asyncio
from typing import Any, Dict


async def parallel_workflow(user_request: str) -> Dict[str, Any]:
    """Execute multiple agents in parallel."""
    # Define tasks to run concurrently
    tasks = [
        research_agent.run_async(user_request),
        data_agent.run_async(user_request),
        analytics_agent.run_async(user_request)
    ]

    # Wait for all to complete; with return_exceptions=True a failed task
    # yields its exception object instead of raising out of gather()
    results = await asyncio.gather(*tasks, return_exceptions=True)

    # Combine results, turning exceptions into structured errors
    names = ["research", "data", "analytics"]
    combined = {
        name: {"success": False, "error": str(r)} if isinstance(r, BaseException) else r
        for name, r in zip(names, results)
    }

    # Synthesize final output
    final_result = synthesis_agent.run(combined)
    return final_result
```

Best for: Research tasks, multi-source data gathering, independent analyses

3. Conditional Orchestration

Use for branching logic

Route to different agents based on conditions or results. Creates decision trees.

```python
def conditional_workflow(user_request: str) -> dict:
    """Route based on request classification."""
    # Step 1: Classify the request
    classification = classifier_agent.run(user_request)
    request_type = classification["type"]

    # Step 2: Route to appropriate handler
    if request_type == "technical":
        result = technical_agent.run(user_request)
    elif request_type == "billing":
        result = billing_agent.run(user_request)
    elif request_type == "account":
        result = account_agent.run(user_request)
    else:
        # Complex request needs orchestrator
        result = orchestrator_agent.run(user_request)

    # Step 3: Post-process if needed
    if result.get("requires_followup"):
        result = followup_agent.run(result)

    return result
```

Best for: Customer support routing, approval workflows, multi-tier escalation

4. Hybrid Orchestration

Use for complex workflows

Combine sequential, parallel, and conditional patterns for sophisticated workflows.

```python
import asyncio


async def hybrid_workflow(user_request: str) -> dict:
    """Complex workflow combining all patterns."""
    # Phase 1: Sequential validation
    validation = await validation_agent.run_async(user_request)
    if not validation["success"]:
        return validation

    # Phase 2: Parallel research
    research_tasks = [
        internal_search_agent.run_async(validation["data"]),
        external_search_agent.run_async(validation["data"]),
        database_agent.run_async(validation["data"])
    ]
    research_results = await asyncio.gather(*research_tasks)

    # Phase 3: Conditional processing
    combined_data = combine_results(research_results)
    if combined_data["confidence"] > 0.8:
        # High confidence: direct response
        result = response_agent.run(combined_data)
    else:
        # Low confidence: escalate to human
        result = escalation_agent.run(combined_data)

    return result
```

Best for: Automated sales systems, complex decision engines, intelligent routing systems

Orchestrator Implementation

Complete Orchestrator Class Example
```python
import logging
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Dict, List, Protocol, Tuple


class Agent(Protocol):
    """Any object with a run(data) -> dict method counts as an agent."""
    def run(self, data: Any) -> dict: ...


class WorkflowOrchestrator:
    """Orchestrates multi-agent workflows with state tracking."""

    def __init__(self, agents: Dict[str, Agent]):
        self.agents = agents
        self.state = {}
        self.history = []

    def execute_workflow(self, request: str) -> dict:
        """Execute the complete workflow."""
        try:
            # Initialize state
            self.state = {
                "request": request,
                "status": "in_progress",
                "steps_completed": []
            }

            # Step 1: Validation (sequential)
            validation_result = self._run_agent("validator", request)
            if not validation_result["success"]:
                return self._handle_failure("validation", validation_result)

            # Step 2: Parallel processing
            parallel_results = self._run_parallel([
                ("researcher", validation_result["data"]),
                ("analyzer", validation_result["data"])
            ])

            # Step 3: Synthesis (sequential)
            final_result = self._run_agent("synthesizer", parallel_results)

            self.state["status"] = "completed"
            return final_result

        except Exception as e:
            return self._handle_failure("orchestration", str(e))

    def _run_agent(self, agent_name: str, data: Any) -> dict:
        """Run a single agent with tracking."""
        logging.info(f"Running agent: {agent_name}")
        agent = self.agents.get(agent_name)
        if agent is None:
            raise KeyError(f"Unknown agent: {agent_name}")
        result = agent.run(data)

        # Track execution (list.append is atomic under CPython's GIL)
        self.history.append({"agent": agent_name, "result": result})
        self.state["steps_completed"].append(agent_name)
        return result

    def _run_parallel(self, tasks: List[Tuple[str, Any]]) -> dict:
        """Run multiple agents concurrently in worker threads."""
        with ThreadPoolExecutor(max_workers=max(1, len(tasks))) as pool:
            futures = {
                name: pool.submit(self._run_agent, name, data)
                for name, data in tasks
            }
            # .result() re-raises any exception from the worker thread
            return {name: future.result() for name, future in futures.items()}

    def _handle_failure(self, stage: str, error: Any) -> dict:
        """Handle workflow failures."""
        logging.error(f"Workflow failed at {stage}: {error}")
        self.state["status"] = "failed"
        self.state["error_stage"] = stage
        return {
            "success": False,
            "error": error,
            "state": self.state
        }
```
✓ Track State: Maintain workflow state throughout execution for debugging and resume capabilities.
✓ Log Everything: Record each agent execution, timing, and results.
✓ Handle Partial Failures: Don't let one agent failure crash the entire workflow.
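For partial failures in a parallel step, one option with asyncio is to give each agent a timeout and collect failures alongside successes, so the synthesis step can decide whether the partial results are good enough. A sketch only: `gather_partial` is a hypothetical helper and the three agent coroutines are stand-ins, not a real agent API.

```python
import asyncio
from typing import Any, Awaitable, Dict


async def gather_partial(
    tasks: Dict[str, Awaitable[Any]], timeout: float = 5.0
) -> Dict[str, Dict[str, Any]]:
    """Run named awaitables concurrently; one failure or timeout doesn't sink the rest."""

    async def guarded(name: str, aw: Awaitable[Any]) -> Dict[str, Any]:
        try:
            return {"success": True, "data": await asyncio.wait_for(aw, timeout)}
        except asyncio.TimeoutError:
            return {"success": False, "error": f"{name} timed out after {timeout}s"}
        except Exception as exc:
            return {"success": False, "error": f"{name} failed: {exc}"}

    results = await asyncio.gather(*(guarded(n, aw) for n, aw in tasks.items()))
    return dict(zip(tasks.keys(), results))


# Stand-in agents for illustration
async def fast_agent() -> str:
    await asyncio.sleep(0.01)
    return "ok"


async def broken_agent() -> str:
    raise RuntimeError("upstream 503")


async def slow_agent() -> str:
    await asyncio.sleep(10)
    return "too late"


results = asyncio.run(gather_partial(
    {"research": fast_agent(), "data": broken_agent(), "analytics": slow_agent()},
    timeout=0.1,
))
print(results)
```

Here `research` succeeds while `data` and `analytics` come back as structured errors, and the whole call returns in roughly the timeout rather than waiting on the slow agent.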

🔀 Routing Patterns

Routing determines which agent handles which request. Choose the right routing strategy for your use case.

Routing Strategies

1. Content-Based Routing

Route by message content

Analyze the request content and route to the most appropriate agent based on keywords, intent, or classification.

```python
def content_based_routing(request: str) -> str:
    """Route based on request content analysis."""
    # Normalize for case-insensitive keyword matching
    request_lower = request.lower()

    # Define routing rules (substring matches, checked in order)
    if any(word in request_lower for word in ['bug', 'error', 'broken', 'not working']):
        return "technical_support_agent"
    elif any(word in request_lower for word in ['bill', 'payment', 'charge', 'invoice']):
        return "billing_agent"
    elif any(word in request_lower for word in ['account', 'password', 'login', 'access']):
        return "account_agent"
    else:
        # Default to general support
        return "general_support_agent"


# Advanced: Use ML classification
def ml_based_routing(request: str) -> str:
    """Use ML model to classify and route requests."""
    # Use a classifier agent
    classification = classifier_agent.run(request)
    intent = classification["intent"]
    confidence = classification["confidence"]

    if confidence < 0.7:
        # Low confidence: route to human
        return "human_agent"

    # Unmapped intents also go to a human instead of raising KeyError
    return intent_to_agent_mapping.get(intent, "human_agent")
```

2. Priority-Based Routing

Route by urgency

Route high-priority requests to faster agents or escalate immediately to humans.

```python
from enum import Enum


class Priority(Enum):
    LOW = 1
    MEDIUM = 2
    HIGH = 3
    URGENT = 4


def priority_based_routing(request: str) -> dict:
    """Route based on request priority."""
    # Determine priority
    priority = determine_priority(request)

    if priority == Priority.URGENT:
        # Urgent: immediate human escalation
        return {"agent": "human_agent", "queue": "urgent", "sla": "15_minutes"}
    elif priority == Priority.HIGH:
        # High: premium agent with fast response
        return {"agent": "premium_agent", "queue": "high_priority", "sla": "1_hour"}
    elif priority == Priority.MEDIUM:
        # Medium: standard agent
        return {"agent": "standard_agent", "queue": "normal", "sla": "4_hours"}
    else:
        # Low: batch processing
        return {"agent": "batch_agent", "queue": "low_priority", "sla": "24_hours"}


def determine_priority(request: str) -> Priority:
    """Analyze request to determine priority level."""
    urgent_keywords = ['urgent', 'emergency', 'critical', 'down']
    high_keywords = ['asap', 'important', 'escalate']
    request_lower = request.lower()

    # Substring matching: 'down' also matches 'download', so consider
    # whole-word matching if this causes false escalations
    if any(word in request_lower for word in urgent_keywords):
        return Priority.URGENT
    elif any(word in request_lower for word in high_keywords):
        return Priority.HIGH
    else:
        # This heuristic never returns Priority.LOW; add rules for it as needed
        return Priority.MEDIUM
```

3. Round-Robin Routing

Distribute load evenly

Distribute requests evenly across multiple agents to balance load.

```python
from typing import List


class RoundRobinRouter:
    """Distributes requests evenly across agents."""

    def __init__(self, agents: List[str]):
        self.agents = agents
        self.current_index = 0

    def route(self, request: str) -> str:
        """Get next agent in rotation (not thread-safe without a lock)."""
        agent = self.agents[self.current_index]
        # Move to next agent
        self.current_index = (self.current_index + 1) % len(self.agents)
        return agent


# Usage (agent_pool is a hypothetical dict mapping agent names to agent objects)
router = RoundRobinRouter(["agent_1", "agent_2", "agent_3"])

for request in requests:
    agent_name = router.route(request)
    agent_pool[agent_name].process(request)
```

4. Capability-Based Routing

Route by agent capabilities

Match requests to agents based on their specific capabilities and current load.

```python
from typing import Optional


class CapabilityRouter:
    """Routes based on agent capabilities and availability."""

    def __init__(self):
        self.agent_registry = {
            "sql_agent": {
                "capabilities": ["database", "sql", "queries"],
                "max_concurrent": 5,
                "current_load": 2
            },
            "api_agent": {
                "capabilities": ["api", "rest", "external"],
                "max_concurrent": 10,
                "current_load": 7
            },
            "analysis_agent": {
                "capabilities": ["analytics", "statistics", "reporting"],
                "max_concurrent": 3,
                "current_load": 1
            }
        }

    def route(self, required_capability: str) -> Optional[str]:
        """Find best agent for required capability."""
        candidates = []

        # Find agents with required capability
        for agent_name, info in self.agent_registry.items():
            if required_capability in info["capabilities"]:
                # Check if agent has capacity
                if info["current_load"] < info["max_concurrent"]:
                    load_ratio = info["current_load"] / info["max_concurrent"]
                    candidates.append((agent_name, load_ratio))

        if not candidates:
            return None  # No available agents

        # Select agent with lowest load
        best_agent = min(candidates, key=lambda x: x[1])[0]

        # Increment load (guard with a lock if routing from multiple threads)
        self.agent_registry[best_agent]["current_load"] += 1
        return best_agent

    def release_agent(self, agent_name: str):
        """Release agent after task completion."""
        if agent_name in self.agent_registry:
            info = self.agent_registry[agent_name]
            # Clamp at zero so a double release can't produce negative load
            info["current_load"] = max(0, info["current_load"] - 1)
```

Routing Best Practices

✓ Monitor Agent Load: Track concurrent requests to prevent overload.
✓ Have Fallback Options: Define what happens when no agent is available.
✓ Log Routing Decisions: Record why each routing decision was made.
⚠ Watch For: Routing loops where agents keep passing requests back and forth.
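One way to guard against routing loops is to carry the list of agents a request has already visited and refuse to route it past a hop limit or back to an agent it has already seen. A minimal sketch; `RoutedRequest`, `MAX_HOPS`, and the `"human_agent"` fallback are illustrative assumptions, not a fixed convention.

```python
from dataclasses import dataclass, field
from typing import List

MAX_HOPS = 5  # assumption: tune to the depth of your agent graph


@dataclass
class RoutedRequest:
    content: str
    visited: List[str] = field(default_factory=list)


def route_with_loop_guard(request: RoutedRequest, next_agent: str) -> str:
    """Return the agent to hand off to, escalating instead of looping."""
    if next_agent in request.visited or len(request.visited) >= MAX_HOPS:
        # Loop detected or hop budget spent: stop bouncing and escalate
        return "human_agent"
    request.visited.append(next_agent)
    return next_agent


req = RoutedRequest("refund for duplicate charge")
print(route_with_loop_guard(req, "billing_agent"))  # billing_agent
print(route_with_loop_guard(req, "account_agent"))  # account_agent
print(route_with_loop_guard(req, "billing_agent"))  # human_agent (already visited)
```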

💾 State Management Patterns

State management determines what information persists across agent interactions and how conflicts are handled.

Types of State

1. Ephemeral (Short-term) State

In-memory, temporary

State that exists only during a single workflow execution. Lost when workflow completes.

```python
from datetime import datetime
from typing import Any


class EphemeralState:
    """Temporary state for single workflow execution."""

    def __init__(self):
        self.data = {}
        self.created_at = datetime.now()

    def set(self, key: str, value: Any):
        """Store temporary data."""
        self.data[key] = value

    def get(self, key: str) -> Any:
        """Retrieve temporary data."""
        return self.data.get(key)

    def clear(self):
        """Clear all temporary data."""
        self.data = {}


# Usage in workflow
def process_request(request: str):
    state = EphemeralState()

    # Store intermediate results
    state.set("original_request", request)
    state.set("validation_passed", True)

    # Use in subsequent steps
    result = None
    if state.get("validation_passed"):
        result = process_agent.run(request)

    # state goes out of scope when the function returns and is garbage-collected
    return result
```

Best for: Workflow context, intermediate results, temporary calculations

2. Persistent (Long-term) State

Database-backed, permanent

State that survives across multiple workflows and system restarts.

```python
import json
import sqlite3
from typing import Any, Optional


class PersistentState:
    """Long-term state stored in database."""

    def __init__(self, db_path: str = "state.db"):
        self.conn = sqlite3.connect(db_path)
        self._initialize_db()

    def _initialize_db(self):
        """Create state table if not exists."""
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS state (
                key TEXT PRIMARY KEY,
                value TEXT,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """)
        self.conn.commit()

    def set(self, key: str, value: Any):
        """Store persistent data (value must be JSON-serializable)."""
        value_json = json.dumps(value)
        self.conn.execute("""
            INSERT OR REPLACE INTO state (key, value, updated_at)
            VALUES (?, ?, CURRENT_TIMESTAMP)
        """, (key, value_json))
        self.conn.commit()

    def get(self, key: str) -> Optional[Any]:
        """Retrieve persistent data."""
        cursor = self.conn.execute(
            "SELECT value FROM state WHERE key = ?", (key,)
        )
        row = cursor.fetchone()
        if row:
            return json.loads(row[0])
        return None


# Usage for user sessions (user_id, user_preferences, workflow_id come from your application)
state = PersistentState()
state.set(f"user:{user_id}:preferences", user_preferences)
state.set(f"workflow:{workflow_id}:status", "completed")
```

Best for: User data, workflow history, configuration, audit logs

State Coordination Techniques

🔒 Pessimistic Locking (Database Locks)

Lock resources before access to prevent concurrent modifications.

```python
import sqlite3


def update_with_lock(user_id: int, amount: float):
    """Update account balance with database lock."""
    # isolation_level=None turns off implicit transactions, so the explicit
    # BEGIN EXCLUSIVE below controls when the lock is taken
    conn = sqlite3.connect("accounts.db", isolation_level=None)
    try:
        # Start transaction; SQLite locks the whole database, not a single row
        conn.execute("BEGIN EXCLUSIVE")

        # Read current balance (no other connection can write until we commit)
        cursor = conn.execute(
            "SELECT balance FROM accounts WHERE user_id = ?",
            (user_id,)
        )
        row = cursor.fetchone()
        if row is None:
            conn.rollback()
            return {"success": False, "error": "Account not found"}

        # Calculate new balance
        new_balance = row[0] + amount

        # Update balance
        conn.execute(
            "UPDATE accounts SET balance = ? WHERE user_id = ?",
            (new_balance, user_id)
        )

        # Commit (releases lock)
        conn.commit()
        return {"success": True, "new_balance": new_balance}

    except Exception as e:
        conn.rollback()
        return {"success": False, "error": str(e)}
    finally:
        conn.close()
```
⚠ Trade-off: Pessimistic locking is safe but can create bottlenecks. Use for critical operations like financial transactions.
✨ Optimistic Concurrency Control

Allow concurrent access, detect conflicts on write, retry if needed.

```python
import json
import logging
import sqlite3


def update_with_version_check(
    user_id: int,
    new_data: dict,
    max_retries: int = 3
):
    """Update with optimistic concurrency control."""
    for attempt in range(max_retries):
        conn = sqlite3.connect("data.db")
        try:
            # Read current version
            cursor = conn.execute(
                "SELECT data, version FROM records WHERE user_id = ?",
                (user_id,)
            )
            row = cursor.fetchone()
            if row is None:
                return {"success": False, "error": "Record not found"}
            current_version = row[1]
            # In practice, recompute new_data from row[0] on each attempt;
            # otherwise a retry simply overwrites the concurrent change

            # Attempt update; WHERE only matches if the version is unchanged
            result = conn.execute("""
                UPDATE records
                SET data = ?, version = version + 1
                WHERE user_id = ? AND version = ?
            """, (json.dumps(new_data), user_id, current_version))
            conn.commit()

            # Check if update succeeded
            if result.rowcount > 0:
                return {"success": True, "version": current_version + 1}

            # Version mismatch: someone else updated
            logging.warning(f"Version conflict, retry {attempt + 1}")

        except Exception as e:
            conn.rollback()
            return {"success": False, "error": str(e)}
        finally:
            conn.close()

    return {"success": False, "error": "Max retries exceeded"}
```
✓ Best for: High-concurrency scenarios where conflicts are rare. In that case it is usually more efficient than pessimistic locking, because no lock is held between the read and the write; when conflicts are frequent, repeated retries can erase that advantage.
🎯 Event Sourcing

Store all state changes as events, reconstruct current state by replaying events.

```python
from datetime import datetime


class EventStore:
    """Store and replay state changes as events."""

    def __init__(self):
        self.events = []

    def append_event(self, event: dict):
        """Add new event to the log."""
        event["timestamp"] = datetime.now()
        event["id"] = len(self.events) + 1
        self.events.append(event)

    def get_current_state(self, entity_id: str) -> dict:
        """Reconstruct current state by replaying events."""
        state = {}
        for event in self.events:
            if event["entity_id"] == entity_id:
                # Apply event to state
                if event["type"] == "created":
                    # Copy so later updates don't mutate the stored event
                    state = dict(event["data"])
                elif event["type"] == "updated":
                    state.update(event["data"])
                elif event["type"] == "deleted":
                    state = {}
        return state


# Usage
store = EventStore()

# Record events
store.append_event({
    "entity_id": "order_123",
    "type": "created",
    "data": {"status": "pending", "amount": 100}
})
store.append_event({
    "entity_id": "order_123",
    "type": "updated",
    "data": {"status": "completed"}
})

# Reconstruct current state
current_state = store.get_current_state("order_123")
# Result: {"status": "completed", "amount": 100}
```
✓ Benefits: Complete audit trail, time travel (replay to any point), easy debugging.
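The "time travel" benefit comes from replaying only a prefix of the log. A minimal standalone sketch, assuming events carry the same `entity_id`, `type`, `data`, and sequential `id` fields that the `EventStore` above assigns; `state_as_of` is a hypothetical function.

```python
from typing import Any, Dict, List


def state_as_of(events: List[Dict[str, Any]], entity_id: str, up_to_id: int) -> Dict[str, Any]:
    """Rebuild an entity's state using only events with id <= up_to_id."""
    state: Dict[str, Any] = {}
    for event in events:
        if event["id"] > up_to_id:
            break  # events are appended in id order, so nothing later applies
        if event["entity_id"] != entity_id:
            continue
        if event["type"] == "created":
            state = dict(event["data"])  # copy so replay never mutates the log
        elif event["type"] == "updated":
            state.update(event["data"])
        elif event["type"] == "deleted":
            state = {}
    return state


events_log = [
    {"id": 1, "entity_id": "order_123", "type": "created", "data": {"status": "pending", "amount": 100}},
    {"id": 2, "entity_id": "order_123", "type": "updated", "data": {"status": "completed"}},
]
print(state_as_of(events_log, "order_123", up_to_id=1))  # {'status': 'pending', 'amount': 100}
print(state_as_of(events_log, "order_123", up_to_id=2))  # {'status': 'completed', 'amount': 100}
```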

State Management Best Practices

✓ Choose Appropriate Scope: Use ephemeral state for temporary data, persistent for long-term.
✓ Version Your State: Include version numbers to detect conflicts.
✓ Validate Before Persisting: Always validate state data before storing.
✓ Clean Up Old State: Implement retention policies for ephemeral and temporary state.
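A retention policy for short-lived state can be as simple as stamping each entry with an expiry time and purging expired entries. A sketch assuming a single process owns the store; `TTLState` is a hypothetical class, and the injectable clock exists only so the example can simulate time passing.

```python
import time
from typing import Any, Callable, Dict, Optional, Tuple


class TTLState:
    """Key-value state whose entries expire after ttl_seconds."""

    def __init__(self, ttl_seconds: float, clock: Callable[[], float] = time.monotonic):
        self.ttl_seconds = ttl_seconds
        self.clock = clock
        self._data: Dict[str, Tuple[float, Any]] = {}

    def set(self, key: str, value: Any) -> None:
        # Store the value together with its absolute expiry time
        self._data[key] = (self.clock() + self.ttl_seconds, value)

    def get(self, key: str) -> Optional[Any]:
        entry = self._data.get(key)
        if entry is None:
            return None
        expires_at, value = entry
        if self.clock() >= expires_at:
            del self._data[key]  # lazily drop expired entries on read
            return None
        return value

    def purge_expired(self) -> int:
        """Remove all expired entries; returns how many were removed."""
        now = self.clock()
        expired = [k for k, (exp, _) in self._data.items() if now >= exp]
        for k in expired:
            del self._data[k]
        return len(expired)


# Simulated clock for illustration
now = [0.0]
state = TTLState(ttl_seconds=60, clock=lambda: now[0])
state.set("draft", {"step": 2})
now[0] = 30.0
print(state.get("draft"))     # {'step': 2}
now[0] = 61.0
print(state.purge_expired())  # 1
```

Calling `purge_expired()` on a schedule bounds memory even for keys that are never read again.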

⚠️ Error Handling Patterns

Robust error handling prevents cascading failures and maintains system reliability.

Core Error Handling Strategies

1. Retry with Exponential Backoff

For transient failures

Retry failed operations with increasing delays to handle temporary issues.

```python
import logging
import time
from typing import Callable


def retry_with_backoff(
    func: Callable,
    max_retries: int = 3,
    initial_delay: float = 1.0,
    backoff_factor: float = 2.0
) -> dict:
    """
    Retry function with exponential backoff.

    Args:
        func: Zero-argument function to retry
        max_retries: Maximum number of attempts (including the first call)
        initial_delay: Initial delay in seconds
        backoff_factor: Multiplier applied to the delay after each failure

    Returns:
        dict with 'success' and either 'data' or 'error'
    """
    delay = initial_delay
    for attempt in range(max_retries):
        try:
            result = func()
            return {"success": True, "data": result}
        except Exception as e:
            if attempt == max_retries - 1:
                # Last attempt failed
                logging.error(f"All retries exhausted: {e}")
                return {
                    "success": False,
                    "error": f"Failed after {max_retries} attempts: {e}"
                }

            # Wait before retrying
            logging.warning(f"Attempt {attempt + 1} failed, retrying in {delay}s")
            time.sleep(delay)
            delay *= backoff_factor


# Usage
result = retry_with_backoff(
    lambda: api_call_that_might_fail(),
    max_retries=5,
    initial_delay=1.0,
    backoff_factor=2.0
)
```
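The sleep before retry n (counting from 0) is `initial_delay * backoff_factor ** n`, and no sleep follows the final attempt. The hypothetical helper below lists the delays that `retry_with_backoff` above would sleep through, plus an optional cap (a common addition, assumed here, not part of the original function).

```python
from typing import List, Optional


def backoff_schedule(
    max_retries: int,
    initial_delay: float = 1.0,
    backoff_factor: float = 2.0,
    max_delay: Optional[float] = None,
) -> List[float]:
    """Delays slept between attempts: one fewer than the number of attempts."""
    delays = []
    for n in range(max(0, max_retries - 1)):
        delay = initial_delay * backoff_factor ** n
        # Optional ceiling so late retries don't wait unboundedly long
        delays.append(delay if max_delay is None else min(delay, max_delay))
    return delays


print(backoff_schedule(5))                 # [1.0, 2.0, 4.0, 8.0]
print(backoff_schedule(5, max_delay=3.0))  # [1.0, 2.0, 3.0, 3.0]
```

With the usage example's settings (5 attempts, 1 s initial delay, factor 2), the worst case sleeps 1 + 2 + 4 + 8 = 15 seconds before giving up.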

2. Fallback Pattern

For graceful degradation

Provide alternative paths when primary operation fails.

```python
import logging


def get_data_with_fallback(query: str) -> dict:
    """Try multiple data sources with fallback."""
    # Try primary source
    try:
        result = primary_database.query(query)
        return {"success": True, "data": result, "source": "primary"}
    except Exception as e:
        logging.warning(f"Primary source failed: {e}")

    # Fallback to cache (a miss returns nothing and falls through)
    try:
        cached_result = cache.get(query)
        if cached_result:
            logging.info("Using cached data")
            return {
                "success": True,
                "data": cached_result,
                "source": "cache",
                "warning": "Data may be stale"
            }
    except Exception as e:
        logging.warning(f"Cache failed: {e}")

    # Fallback to backup database
    try:
        result = backup_database.query(query)
        logging.info("Using backup database")
        return {"success": True, "data": result, "source": "backup"}
    except Exception as e:
        logging.error(f"All sources failed: {e}")

    # All fallbacks failed
    return {"success": False, "error": "All data sources unavailable"}
```
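The same idea generalizes to an ordered list of sources, so adding a tier doesn't mean adding another try block. A sketch; `first_available` is a hypothetical helper, and like the cache step above it treats a `None` result as a miss.

```python
import logging
from typing import Any, Callable, Dict, List, Tuple


def first_available(sources: List[Tuple[str, Callable[[], Any]]]) -> Dict[str, Any]:
    """Try each (name, fetch) pair in order; return the first non-None result."""
    for name, fetch in sources:
        try:
            result = fetch()
            if result is not None:
                return {"success": True, "data": result, "source": name}
            logging.info("Source %s returned nothing, falling back", name)
        except Exception as exc:
            logging.warning("Source %s failed: %s", name, exc)
    return {"success": False, "error": "All data sources unavailable"}


def primary() -> Any:
    raise ConnectionError("primary down")


outcome = first_available([
    ("primary", primary),
    ("cache", lambda: None),  # cache miss
    ("backup", lambda: [{"id": 1}]),
])
print(outcome)  # {'success': True, 'data': [{'id': 1}], 'source': 'backup'}
```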

3. Compensation Pattern

For reversing partial failures

Undo completed operations when later steps fail.

```python
import logging


def process_order_with_compensation(order_data: dict) -> dict:
    """Process order with compensation on failure."""
    completed_steps = []

    try:
        # Step 1: Reserve inventory
        inventory_result = reserve_inventory(order_data["items"])
        completed_steps.append(("inventory", inventory_result))

        # Step 2: Process payment
        payment_result = process_payment(order_data["payment"])
        completed_steps.append(("payment", payment_result))

        # Step 3: Create shipment
        shipment_result = create_shipment(order_data["address"])
        completed_steps.append(("shipment", shipment_result))

        return {"success": True, "order_id": shipment_result["id"]}

    except Exception as e:
        logging.error(f"Order processing failed: {e}")

        # Compensate completed steps in reverse order
        all_compensated = True
        for step_name, step_result in reversed(completed_steps):
            try:
                if step_name == "inventory":
                    release_inventory(step_result["reservation_id"])
                    logging.info("Inventory released")
                elif step_name == "payment":
                    refund_payment(step_result["transaction_id"])
                    logging.info("Payment refunded")
                elif step_name == "shipment":
                    cancel_shipment(step_result["shipment_id"])
                    logging.info("Shipment canceled")
            except Exception as comp_error:
                all_compensated = False
                logging.error(f"Compensation failed for {step_name}: {comp_error}")

        return {
            "success": False,
            "error": str(e),
            # False means at least one step needs manual cleanup
            "compensated": all_compensated
        }
```
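A variant that avoids the `if/elif` chain registers each step together with its undo action, in the style of the saga pattern. A minimal sketch; `run_saga` and the step callables are hypothetical, and each compensation receives the result of the step it undoes.

```python
import logging
from typing import Any, Callable, List, Tuple

Step = Tuple[str, Callable[[], Any], Callable[[Any], None]]


def run_saga(steps: List[Step]) -> dict:
    """Run steps in order; on failure, undo completed steps in reverse."""
    completed: List[Tuple[str, Any, Callable[[Any], None]]] = []
    for name, action, compensate in steps:
        try:
            completed.append((name, action(), compensate))
        except Exception as exc:
            logging.error("Step %s failed: %s", name, exc)
            all_undone = True
            for done_name, result, undo in reversed(completed):
                try:
                    undo(result)
                except Exception as comp_exc:
                    all_undone = False
                    logging.error("Compensation for %s failed: %s", done_name, comp_exc)
            return {"success": False, "failed_step": name, "compensated": all_undone}
    return {"success": True, "results": {n: r for n, r, _ in completed}}


undo_log: List[str] = []


def fail_shipment() -> Any:
    raise RuntimeError("carrier unavailable")


outcome = run_saga([
    ("inventory", lambda: "res-1", lambda r: undo_log.append(f"release {r}")),
    ("payment", lambda: "txn-9", lambda r: undo_log.append(f"refund {r}")),
    ("shipment", fail_shipment, lambda r: undo_log.append(f"cancel {r}")),
])
print(outcome)   # {'success': False, 'failed_step': 'shipment', 'compensated': True}
print(undo_log)  # ['refund txn-9', 'release res-1']
```

Keeping each undo next to its action makes it harder to add a step and forget its compensation.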

4. Circuit Breaker Pattern

For preventing cascading failures

Stop calling failing services to allow recovery time.

```python
import logging
from datetime import datetime, timedelta
from enum import Enum
from typing import Callable


class CircuitState(Enum):
    CLOSED = "closed"        # Normal operation
    OPEN = "open"            # Failures detected, reject requests
    HALF_OPEN = "half_open"  # Testing if service recovered


class CircuitBreaker:
    """Prevent cascading failures by stopping calls to failing services."""

    def __init__(
        self,
        failure_threshold: int = 5,
        timeout_seconds: int = 60
    ):
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.failure_threshold = failure_threshold
        self.timeout_seconds = timeout_seconds
        self.last_failure_time = None

    def call(self, func: Callable) -> dict:
        """Execute function with circuit breaker protection."""
        # Check if circuit is open
        if self.state == CircuitState.OPEN:
            # Check if timeout expired
            if datetime.now() - self.last_failure_time > timedelta(seconds=self.timeout_seconds):
                self.state = CircuitState.HALF_OPEN
                logging.info("Circuit half-open, testing service")
            else:
                return {"success": False, "error": "Circuit breaker OPEN"}

        # Attempt the call
        try:
            result = func()
            self._on_success()
            return {"success": True, "data": result}
        except Exception as e:
            self._on_failure()
            return {"success": False, "error": str(e)}

    def _on_success(self):
        """Reset on successful call."""
        if self.state != CircuitState.CLOSED:
            logging.info("Circuit closed")
        self.failure_count = 0
        self.state = CircuitState.CLOSED

    def _on_failure(self):
        """Track failures and open circuit if threshold reached."""
        self.failure_count += 1
        self.last_failure_time = datetime.now()
        # failure_count is not reset on entering HALF_OPEN, so one failed
        # trial call reopens the circuit immediately
        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN
            logging.warning("Circuit breaker OPEN")


# Usage
breaker = CircuitBreaker(failure_threshold=3, timeout_seconds=30)
result = breaker.call(lambda: external_api_call())
```

5. Human-in-the-Loop Escalation

For complex failures

Escalate to humans when automated recovery isn't possible.

```python
import logging
from datetime import datetime
from typing import Optional

CONFIDENCE_THRESHOLD = 0.7  # Below this score, a human reviews the result

# automated_agent, generate_ticket_id, escalation_queue, and notify_operators
# are placeholders for your own agent, ID generator, queue, and alerting hooks.


def process_with_escalation(request: dict) -> dict:
    """Process a request, escalating to a human on failure or low confidence."""
    try:
        # Attempt automated processing
        result = automated_agent.process(request)
    except Exception as e:
        # Automated processing failed: escalate
        return escalate_to_human(
            request,
            reason=f"Processing failed: {e}",
            automated_result=None,
        )

    # Low confidence: escalate to a human instead of acting on the result
    if result.get("confidence", 0) < CONFIDENCE_THRESHOLD:
        return escalate_to_human(
            request,
            reason="Low confidence",
            automated_result=result,
        )
    return result


def escalate_to_human(
    request: dict,
    reason: str,
    automated_result: Optional[dict] = None,
) -> dict:
    """Create an escalation ticket for human review."""
    ticket = {
        "id": generate_ticket_id(),
        "request": request,
        "escalation_reason": reason,
        "automated_result": automated_result,
        "created_at": datetime.now(),
        "status": "pending_human_review",
    }

    # Store in the escalation queue and notify human operators
    escalation_queue.add(ticket)
    notify_operators(ticket)

    logging.info(f"Escalated to human: {ticket['id']}")
    return {
        "success": False,
        "escalated": True,
        "ticket_id": ticket["id"],
        "message": "Request escalated for human review",
    }
```
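The code above leaves generate_ticket_id, escalation_queue, and notify_operators undefined. One way to fill them in is sketched below, assuming an in-memory FIFO queue, uuid4 ticket IDs, and a pluggable notification callback. EscalationQueue and its methods are hypothetical names.

```python
import logging
import uuid
from collections import deque
from datetime import datetime, timezone
from typing import Callable, Optional


class EscalationQueue:
    """In-memory stand-in for a human-review queue."""

    def __init__(self, notify: Callable[[dict], None] = lambda ticket: None):
        self._pending: deque = deque()
        self._notify = notify  # Hypothetical hook: chat message, email, pager, ...

    def escalate(self, request: dict, reason: str, automated_result: Optional[dict] = None) -> dict:
        ticket = {
            "id": str(uuid.uuid4()),
            "request": request,
            "escalation_reason": reason,
            "automated_result": automated_result,
            "created_at": datetime.now(timezone.utc).isoformat(),
            "status": "pending_human_review",
        }
        self._pending.append(ticket)
        self._notify(ticket)
        logging.info("Escalated to human: %s", ticket["id"])
        return {"success": False, "escalated": True, "ticket_id": ticket["id"]}

    def next_for_review(self) -> Optional[dict]:
        """Hand the oldest pending ticket to a human reviewer (FIFO)."""
        return self._pending.popleft() if self._pending else None

    def resolve(self, ticket: dict, decision: dict) -> dict:
        """Record the human decision so it can be returned to the caller or audited."""
        return {**ticket, "status": "resolved", "human_decision": decision}


notified = []
queue = EscalationQueue(notify=notified.append)
response = queue.escalate({"text": "Refund $5,000?"}, reason="Low confidence",
                          automated_result={"confidence": 0.42})
ticket = queue.next_for_review()
closed = queue.resolve(ticket, {"approved": False})
print(closed["status"])  # resolved
```

An in-memory deque loses pending tickets on restart. A production queue would persist them, for example in a database or message broker, so that pending reviews survive a crash.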

Error Handling Best Practices

✓ Fail Fast: Detect errors early and handle them immediately.
✓ Provide Context: Include enough information to debug (request ID, timestamp, state).
✓ Log Comprehensively: Log all errors with full context for post-mortem analysis.
✓ Test Failure Paths: Regularly test error handling and recovery mechanisms.
⚠ Never Silence Errors: Always log errors even if you handle them gracefully.
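The standard logging module covers both the "Provide Context" and "Never Silence Errors" practices. In the sketch below, which assumes a dedicated "agents" logger, a LoggerAdapter stamps the request ID on every record, and exception() logs the traceback even when the error is handled. price_order is an illustrative function.

```python
import logging

logger = logging.getLogger("agents")
_handler = logging.StreamHandler()
_handler.setFormatter(
    logging.Formatter("%(asctime)s %(levelname)s [request_id=%(request_id)s] %(message)s")
)
logger.addHandler(_handler)
logger.setLevel(logging.INFO)
logger.propagate = False  # Root handlers may not know the request_id field


def price_order(request_id: str, payload: dict) -> dict:
    # Every record logged through the adapter carries the request ID
    log = logging.LoggerAdapter(logger, {"request_id": request_id})
    try:
        total = payload["quantity"] * payload["price"]
    except KeyError as e:
        # Handled gracefully but not silenced: exception() also logs the traceback
        log.exception("Missing field %s in payload", e)
        return {"success": False, "error": f"missing field {e}", "request_id": request_id}
    log.info("Order priced")
    return {"success": True, "total": total}


price_order("req-42", {"quantity": 2})
```

The call above logs an ERROR line tagged request_id=req-42, including the KeyError traceback. It then returns a structured failure instead of raising.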

🛠️ Framework Comparisons

Choose the right framework based on your use case, complexity, and team expertise.

Framework Overview

| Framework | Best For | Learning Curve | Flexibility | Community |
|---|---|---|---|---|
| LangGraph | Complex workflows, state graphs | Steep | High | Large |
| Swarm | Simple multi-agent, quick prototypes | Gentle | Medium | Medium |
| Pydantic AI | Type-safe agents, data validation | Medium | Medium | Growing |

Framework Deep Dives

🕸️ LangGraph - State Graph Framework

LangGraph models agent workflows as state graphs with nodes and edges.

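```python
from typing import TypedDict

from langgraph.graph import END, StateGraph


# Define the state structure shared by all nodes
class AgentState(TypedDict):
    messages: list
    next_agent: str


# research_agent, analysis_agent, and writer_agent are placeholders: each is a
# function that takes the current state and returns a dict of state updates.

# Create graph
workflow = StateGraph(AgentState)

# Add agent nodes
workflow.add_node("researcher", research_agent)
workflow.add_node("analyzer", analysis_agent)
workflow.add_node("writer", writer_agent)

# Define edges (transitions)
workflow.add_edge("researcher", "analyzer")
workflow.add_edge("analyzer", "writer")
workflow.add_edge("writer", END)  # Mark where the workflow finishes

# Set entry point
workflow.set_entry_point("researcher")

# Compile and run (user_input is the incoming user message)
app = workflow.compile()
result = app.invoke({"messages": [user_input]})
```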

Key Features:

  • Visual graph representation of workflows
  • Built-in state management and persistence
  • Conditional routing between agents
  • Checkpointing for long-running workflows
  • Time travel debugging

Use When: Building complex, multi-stage workflows with conditional logic and state persistence needs.
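The framework-free sketch below makes the "state graph" model concrete without installing anything. Nodes are functions that return partial state updates. Each edge is a function that chooses the next node, which is how conditional routing works. run_graph and the node names are illustrative. This is not LangGraph's API or implementation. In LangGraph itself, a routing function like the reviewer edge below is registered with add_conditional_edges.

```python
from typing import Callable, Optional, TypedDict


class State(TypedDict, total=False):
    messages: list
    draft: str
    revisions: int


Node = Callable[[State], dict]             # A node returns a partial state update
Router = Callable[[State], Optional[str]]  # An edge picks the next node (None = end)


def run_graph(nodes: dict[str, Node], edges: dict[str, Router],
              entry: str, state: State, max_steps: int = 20) -> State:
    current: Optional[str] = entry
    steps = 0
    while current is not None:
        if steps == max_steps:
            raise RuntimeError("max_steps exceeded; the graph may be cycling")
        state = {**state, **nodes[current](state)}  # Merge the node's update into the state
        current = edges[current](state)
        steps += 1
    return state


def researcher(state: State) -> dict:
    return {"messages": state["messages"] + ["research notes"]}


def writer(state: State) -> dict:
    n = state.get("revisions", 0) + 1
    return {"draft": f"draft v{n}", "revisions": n}


def reviewer(state: State) -> dict:
    return {"messages": state["messages"] + [f"reviewed {state['draft']}"]}


nodes = {"researcher": researcher, "writer": writer, "reviewer": reviewer}
edges = {
    "researcher": lambda s: "writer",
    "writer": lambda s: "reviewer",
    # Conditional edge: loop back for another revision until two drafts exist
    "reviewer": lambda s: "writer" if s["revisions"] < 2 else None,
}

final = run_graph(nodes, edges, "researcher", {"messages": ["user request"]})
print(final["draft"], final["revisions"])  # draft v2 2
```

The max_steps guard matters once conditional edges can form cycles. Without a guard, a router that never returns None would loop forever.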

🐝 Swarm - Simple Multi-Agent Framework

Swarm focuses on simplicity and ease of use for basic multi-agent coordination.

```python
from swarm import Agent, Swarm

# search_web, read_documents, draft_content, and edit_content are placeholder tool functions

# Define agents
research_agent = Agent(
    name="Researcher",
    instructions="You research topics thoroughly.",
    functions=[search_web, read_documents],
)

writer_agent = Agent(
    name="Writer",
    instructions="You write clear, engaging content.",
    functions=[draft_content, edit_content],
)

# Create the client and run the researcher
client = Swarm()
messages = [{"role": "user", "content": "Research AI trends"}]
response = client.run(
    agent=research_agent,
    messages=messages,
    context_variables={"topic": "AI trends 2024"},
)

# Hand off to the writer. response.messages holds only the messages produced
# in this run, so append them to the earlier history before continuing.
response = client.run(
    agent=writer_agent,
    messages=messages + response.messages,
    context_variables=response.context_variables,  # Carry context forward
)
```

Key Features:

  • Minimal boilerplate code
  • Easy agent handoffs
  • Shared context variables
  • Function calling support
  • Quick prototyping

Use When: Building simple multi-agent systems, prototyping ideas, or learning multi-agent concepts.
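Swarm also supports handoffs without a second manual run: when a function the agent calls returns another Agent, the run continues with that agent and keeps the conversation and context variables. The toy model below mimics only that control flow, with no LLM involved. respond is a hypothetical stand-in for the model's turn, ToyAgent and run are illustrative names rather than Swarm classes, and the "handoff" marker message is this sketch's own convention.

```python
from dataclasses import dataclass
from typing import Callable, Union


@dataclass
class ToyAgent:
    name: str
    # Hypothetical stand-in for the model's turn: given the history and the shared
    # context, return a reply string or another ToyAgent (a handoff).
    respond: Callable[[list, dict], Union[str, "ToyAgent"]]


def run(agent: ToyAgent, messages: list, context_variables: dict, max_turns: int = 10):
    """Run until an agent replies; follow handoffs, keeping history and context."""
    history = list(messages)
    for _ in range(max_turns):
        outcome = agent.respond(history, context_variables)
        if isinstance(outcome, ToyAgent):
            # Handoff: same history and context, new active agent
            history.append({"role": "system", "content": f"handoff {agent.name} -> {outcome.name}"})
            agent = outcome
            continue
        history.append({"role": "assistant", "name": agent.name, "content": outcome})
        return agent, history
    raise RuntimeError("Too many turns without a final reply")


writer = ToyAgent("Writer", lambda history, ctx: f"Article about {ctx['topic']}")


def research(history: list, ctx: dict):
    ctx["research_data"] = ["finding 1", "finding 2"]  # Context is shared, mutable state
    return writer  # Returning an agent hands control to it


researcher = ToyAgent("Researcher", research)
ctx = {"topic": "AI trends 2024"}
final_agent, history = run(researcher, [{"role": "user", "content": "Research AI trends"}], ctx)
print(final_agent.name)        # Writer
print(history[-1]["content"])  # Article about AI trends 2024
```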

🔒 Pydantic AI - Type-Safe Agent Framework

Pydantic AI emphasizes type safety and data validation using Pydantic models.

```python
from pydantic import BaseModel
from pydantic_ai import Agent


# Define typed data models
class UserData(BaseModel):
    name: str
    age: int
    email: str


class AnalysisResult(BaseModel):
    summary: str
    confidence: float
    recommendations: list[str]


# Create a typed agent whose final output is validated against AnalysisResult
# (output_type in recent pydantic-ai releases; older releases used result_type)
agent = Agent(
    "openai:gpt-4",
    name="analyzer",
    output_type=AnalysisResult,
)


# Register a tool; the model's tool-call arguments are validated into UserData
@agent.tool_plain
def analyze_user(data: UserData) -> AnalysisResult:
    return AnalysisResult(
        summary=f"Analysis for {data.name}",
        confidence=0.85,
        recommendations=["rec1", "rec2"],
    )


# Run synchronously (agent.run is the async variant)
result = agent.run_sync("Analyze user: John, age 30, john@example.com")
print(result.output)
```

Key Features:

  • Full type safety with Pydantic
  • Automatic input/output validation
  • IDE autocomplete support
  • Clear data contracts between agents
  • Reduces runtime errors

Use When: Type safety is critical, you are working in a large team, or you are building production systems where data validation is essential.
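The "clear data contracts between agents" point is easiest to see at a handoff boundary. In the minimal sketch below, which assumes Pydantic v2 (model_validate_json and Field constraints), the receiving side validates the upstream agent's raw JSON before using it. A malformed result is therefore rejected with field-level errors instead of failing somewhere downstream. accept_handoff is an illustrative name.

```python
from pydantic import BaseModel, Field, ValidationError


class AnalysisResult(BaseModel):
    summary: str
    confidence: float = Field(ge=0.0, le=1.0)  # Reject out-of-range scores
    recommendations: list[str]


def accept_handoff(raw_output: str) -> dict:
    """Validate one agent's JSON output before the next agent consumes it."""
    try:
        result = AnalysisResult.model_validate_json(raw_output)
    except ValidationError as e:
        # Report which fields broke the contract (each "loc" is a field path tuple)
        return {"success": False, "errors": [err["loc"] for err in e.errors()]}
    return {"success": True, "data": result}


good = accept_handoff('{"summary": "ok", "confidence": 0.85, "recommendations": ["rec1"]}')
bad = accept_handoff('{"summary": "ok", "confidence": 1.7, "recommendations": "rec1"}')
print(good["data"].confidence)  # 0.85
print(bad["errors"])            # Both confidence and recommendations are flagged
```

The caller can then retry the upstream agent or escalate, rather than passing a bad result along.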

Framework Selection Guide

Choose LangGraph if you need:

  • Complex conditional workflows
  • State persistence across restarts
  • Visual workflow representation
  • Long-running processes with checkpoints
  • Human-in-the-loop capabilities

Choose Swarm if you need:

  • Quick prototype development
  • Simple agent handoffs
  • Minimal learning curve
  • Lightweight deployment
  • Educational projects

Choose Pydantic AI if you need:

  • Strong type safety
  • Data validation guarantees
  • Large team development
  • IDE support and autocomplete
  • Clear data contracts