When to Use Multiple Agents
Use multiple agents when:
- Different specialized skills are needed (research + analysis + writing)
- Parallel workload distribution (process 10 tasks simultaneously)
- Clear role separation (supervisor + workers)
Avoid multiple agents when:
- A single workflow can handle it
- Communication overhead exceeds the benefit
- Testing/debugging becomes too complex
Orchestration Approaches
Three main categories:
1. Graph-Based Orchestration
- Define workflows as state machines
- Examples: LangGraph
- Define workflows as DAGs
- Examples: OpenAI Agent Builder, Digibee
- Mix agent execution with code logic
The Four Workflow Patterns
1. Sequential Workflows
When to use: Each step depends on the previous step’s output.
LangGraph implementation:
import json
import time
from typing import Literal
from typing import TypedDict

from langgraph.graph import StateGraph, END
class DocumentState(TypedDict):
    """State passed between nodes."""

    raw_text: str  # input document text read by every step
    entities: dict  # parsed JSON written by the entity-extraction step
    summary: str  # text written by the summarization step
    keywords: list[str]  # keyword strings written by the keyword step
def extract_entities(state: DocumentState) -> DocumentState:
    """Step 1: Extract named entities.

    Sends ``state["raw_text"]`` to the model, parses the reply as JSON and
    stores the parsed object in ``state["entities"]``.

    Args:
        state: Workflow state; ``raw_text`` is read, ``entities`` is written.

    Returns:
        The same state object, updated in place.

    Raises:
        json.JSONDecodeError: If the model reply is not valid JSON.
    """
    # NOTE(review): `client` is not defined in this snippet; the call shape
    # (messages.create / response.content[0].text) matches the Anthropic SDK.
    response = client.messages.create(
        model="claude-sonnet-4-20250514",
        max_tokens=1024,  # the Anthropic Messages API rejects calls without it
        messages=[{
            "role": "user",
            "content": f"Extract people, organizations, locations as JSON: {state['raw_text']}",
        }],
    )
    state["entities"] = json.loads(response.content[0].text)
    return state
def generate_summary(state: DocumentState) -> DocumentState:
    """Step 2: Summarize with entity context.

    Prefixes the prompt with the entities already stored in the state, asks
    the model to summarize ``state["raw_text"]`` and stores the reply text in
    ``state["summary"]``.

    Args:
        state: Workflow state; ``entities`` and ``raw_text`` are read,
            ``summary`` is written.

    Returns:
        The same state object, updated in place.
    """
    context = f"Known entities: {state['entities']}"
    # NOTE(review): `client` is not defined in this snippet; the call shape
    # matches the Anthropic SDK.
    response = client.messages.create(
        model="claude-sonnet-4-20250514",
        max_tokens=1024,  # the Anthropic Messages API rejects calls without it
        messages=[{
            "role": "user",
            "content": f"{context}\n\nSummarize: {state['raw_text']}",
        }],
    )
    state["summary"] = response.content[0].text
    return state
def extract_keywords(state: DocumentState) -> DocumentState:
    """Step 3: Extract keywords from summary.

    Asks the model for five keywords based on ``state["summary"]`` and stores
    them as a list of strings in ``state["keywords"]``.

    Args:
        state: Workflow state; ``summary`` is read, ``keywords`` is written.

    Returns:
        The same state object, updated in place.
    """
    # NOTE(review): `client` is not defined in this snippet; the call shape
    # matches the Anthropic SDK, so the model must be an Anthropic model
    # (an OpenAI model name such as "gpt-4o-mini" cannot be served by it).
    response = client.messages.create(
        model="claude-3-5-haiku-20241022",  # Cheaper for simple task
        max_tokens=256,  # the Anthropic Messages API rejects calls without it
        messages=[{
            "role": "user",
            "content": f"Extract 5 keywords: {state['summary']}",
        }],
    )
    # Split on bare commas and trim, so "a,b" and "a, b\n" both parse cleanly.
    raw_keywords = response.content[0].text.split(",")
    state["keywords"] = [kw.strip() for kw in raw_keywords if kw.strip()]
    return state
# Build workflow: register one node per processing step
workflow = StateGraph(DocumentState)
workflow.add_node("extract_entities", extract_entities)
workflow.add_node("summarize", generate_summary)
workflow.add_node("extract_keywords", extract_keywords)
# Define sequence: extract_entities -> summarize -> extract_keywords -> END
workflow.set_entry_point("extract_entities")
workflow.add_edge("extract_entities", "summarize")
workflow.add_edge("summarize", "extract_keywords")
workflow.add_edge("extract_keywords", END)
app = workflow.compile()
# Execute, starting from a state with every key present but empty
result = app.invoke({
    "raw_text": "Document text...",
    "entities": {},
    "summary": "",
    "keywords": []
})
OpenAI Agents SDK implementation:
from agents import Agent, Runner
import asyncio
import json
# Agent 1: extract entities as JSON.
# The instructions demand a bare JSON object (no prose, no backticks) so the
# reply can be fed straight into json.loads by the caller.
entities_agent = Agent(
    name="Entity Extractor",
    model="gpt-4o-mini",
    instructions=(
        "You extract named entities from user-provided text. "
        "Return ONLY a valid JSON object with keys: "
        '{"people": string[], "organizations": string[], "locations": string[]}. '
        "No prose, no backticks."
    ),
)
# Agent 2: summarize using entities and return summary + keywords as JSON.
# A single agent produces both fields, so this pipeline has two steps.
summarizer_agent = Agent(
    name="Summarizer",
    model="gpt-4o-mini",
    instructions=(
        "You write a concise summary (≤100 words) using the provided entities as context, "
        "and extract 5 keywords. Return ONLY a valid JSON object with keys: "
        '{"summary": string, "keywords": string[]}. No prose, no backticks.'
    ),
)
async def run_sequential(raw_text: str) -> dict:
    """Run the entity extractor, then the summarizer, and merge their output.

    Args:
        raw_text: Document text to analyze.

    Returns:
        A dict with keys ``entities`` (parsed JSON from the first agent),
        ``summary`` and ``keywords`` (from the second agent; default to ``""``
        and ``[]`` when the agent omits them).

    Raises:
        json.JSONDecodeError: If either agent's reply is not valid JSON.
    """
    # 1) Run Entity Extractor.
    # `final_output` is the agent's final reply. The last item of
    # `to_input_list()` is a structured message whose "content" is not a plain
    # string, so it cannot be passed to json.loads directly.
    entities_run = await Runner.run(entities_agent, raw_text)
    entities = json.loads(entities_run.final_output)

    # 2) Run Summarizer using entities
    summarizer_input = (
        f"Raw text:\n{raw_text}\n\n"
        f"Entities (JSON):\n{json.dumps(entities, ensure_ascii=False)}"
    )
    summary_run = await Runner.run(summarizer_agent, summarizer_input)
    result = json.loads(summary_run.final_output)

    # Combine into a single structured result (mirrors the sequential pattern)
    return {
        "entities": entities,
        "summary": result.get("summary", ""),
        "keywords": result.get("keywords", []),
    }
# Example usage: run the two-agent pipeline once and pretty-print the result
if __name__ == "__main__":
    example_text = "Sundar Pichai from Google met with leaders in Paris and New York."
    final_result = asyncio.run(run_sequential(example_text))
    print(json.dumps(final_result, indent=2, ensure_ascii=False))
2. Parallel Execution
When to use: Independent tasks that can run simultaneously.
Pattern:
async def parallel_analysis(documents: list[str]):
    """Analyze multiple documents in parallel.

    Starts one model call per document and awaits them together.

    Args:
        documents: Texts to analyze; an empty list yields an empty result.

    Returns:
        A list of ``{"document": ..., "analysis": ...}`` dicts in the same
        order as ``documents``.
    """

    async def analyze_doc(doc: str) -> dict:
        """Single document analysis."""
        # NOTE(review): `client` is not defined in this snippet; awaiting
        # messages.create presumably requires an async Anthropic client.
        response = await client.messages.create(
            model="claude-3-5-haiku-20241022",
            max_tokens=1024,  # the Anthropic Messages API rejects calls without it
            messages=[{
                "role": "user",
                "content": f"Analyze sentiment and extract topics: {doc}",
            }],
        )
        return {"document": doc, "analysis": response.content[0].text}

    # Execute all in parallel; gather preserves input order
    tasks = [analyze_doc(doc) for doc in documents]
    results = await asyncio.gather(*tasks)
    return results


# Illustrative: at ~3s per call, 10 documents take ~30s sequentially vs ~3s in parallel
With batching (limits how many calls run at once):
async def parallel_with_batching(items: list, batch_size: int = 10):
    """Process in parallel batches.

    Items inside one batch are awaited together; batches themselves run one
    after another, so at most ``batch_size`` calls are in flight at a time.

    Args:
        items: Inputs handed one by one to ``process``.
        batch_size: Maximum number of items awaited together; must be >= 1.

    Returns:
        A list of results in the same order as ``items``.

    Raises:
        ValueError: If ``batch_size`` is less than 1.
    """
    # A negative step would make range() empty and silently drop every item.
    if batch_size < 1:
        raise ValueError(f"batch_size must be >= 1, got {batch_size}")

    results = []
    for start in range(0, len(items), batch_size):
        batch = items[start:start + batch_size]
        # NOTE(review): `process` is not defined in this snippet.
        batch_results = await asyncio.gather(*[process(item) for item in batch])
        results.extend(batch_results)
    return results


# 100 items, batches of 10 = 10 sequential batches, each running 10 calls in parallel
3. Conditional Branching
When to use: Different paths based on data or business logic.
Customer Support Routing Example:
class SupportTicketState(TypedDict):
    """Ticket workflow state."""

    message: str  # customer message read by the classifier and every handler
    category: Literal["technical", "billing", "sales"]  # written by the classifier
    priority: Literal["low", "medium", "high"]  # written by the classifier
    resolution: str  # reply text written by the handler that ran
def classify_ticket(state: SupportTicketState) -> SupportTicketState:
    """Classify incoming ticket.

    Asks the model for a JSON object with ``category`` and ``priority`` and
    copies both values into the state.

    Args:
        state: Workflow state; ``message`` is read, ``category`` and
            ``priority`` are written.

    Returns:
        The same state object, updated in place.

    Raises:
        json.JSONDecodeError: If the model reply is not valid JSON.
        KeyError: If the reply lacks ``category`` or ``priority``.
    """
    prompt = (
        "Classify this support ticket:\n"
        f"Message: {state['message']}\n"
        'Return JSON: {"category": "...", "priority": "..."}'
    )
    # NOTE(review): `client` is not defined in this snippet; the call shape
    # matches the Anthropic SDK, so the model must be an Anthropic model
    # (an OpenAI model name such as "gpt-4o-mini" cannot be served by it).
    response = client.messages.create(
        model="claude-3-5-haiku-20241022",
        max_tokens=256,  # the Anthropic Messages API rejects calls without it
        messages=[{"role": "user", "content": prompt}],
    )
    result = json.loads(response.content[0].text)
    state["category"] = result["category"]
    state["priority"] = result["priority"]
    return state
def handle_technical(state: SupportTicketState) -> SupportTicketState:
    """Technical support handler.

    Looks up knowledge-base articles for the message, asks the model for a
    step-by-step solution and stores the reply in ``state["resolution"]``.

    Args:
        state: Workflow state; ``message`` is read, ``resolution`` is written.

    Returns:
        The same state object, updated in place.
    """
    # Search knowledge base
    articles = search_kb(state["message"])
    prompt = (
        "Provide technical support.\n"
        f"Issue: {state['message']}\n"
        f"KB Articles: {articles}\n"
        "Give step-by-step solution."
    )
    # NOTE(review): `client` is not defined in this snippet; the call shape
    # matches the Anthropic SDK.
    response = client.messages.create(
        model="claude-sonnet-4-20250514",
        max_tokens=1024,  # the Anthropic Messages API rejects calls without it
        messages=[{"role": "user", "content": prompt}],
    )
    state["resolution"] = response.content[0].text
    return state
def handle_billing(state: SupportTicketState) -> SupportTicketState:
    """Billing support handler.

    Fetches billing data for the customer referenced in the message, asks the
    model to explain the charges and stores the reply in
    ``state["resolution"]``.

    Args:
        state: Workflow state; ``message`` is read, ``resolution`` is written.

    Returns:
        The same state object, updated in place.
    """
    # Different tools: access billing system
    customer_id = extract_customer_id(state["message"])
    billing_info = billing_api.get(customer_id)
    prompt = (
        "Handle billing inquiry.\n"
        f"Issue: {state['message']}\n"
        f"Billing: {billing_info}\n"
        "Explain charges clearly."
    )
    # NOTE(review): `client` is not defined in this snippet; the call shape
    # matches the Anthropic SDK.
    response = client.messages.create(
        model="claude-sonnet-4-20250514",
        max_tokens=1024,  # the Anthropic Messages API rejects calls without it
        messages=[{"role": "user", "content": prompt}],
    )
    state["resolution"] = response.content[0].text
    return state
def handle_sales(state: SupportTicketState) -> SupportTicketState:
    """Sales inquiry handler.

    Searches the product catalog with the message, asks the model for a
    recommendation and stores the reply in ``state["resolution"]``.

    Args:
        state: Workflow state; ``message`` is read, ``resolution`` is written.

    Returns:
        The same state object, updated in place.
    """
    products = product_catalog.search(state["message"])
    prompt = (
        "Handle sales inquiry.\n"
        f"Question: {state['message']}\n"
        f"Products: {products}\n"
        "Recommend solutions."
    )
    # NOTE(review): `client` is not defined in this snippet; the call shape
    # matches the Anthropic SDK.
    response = client.messages.create(
        model="claude-sonnet-4-20250514",
        max_tokens=1024,  # the Anthropic Messages API rejects calls without it
        messages=[{"role": "user", "content": prompt}],
    )
    state["resolution"] = response.content[0].text
    return state
def route_ticket(state: SupportTicketState) -> str:
    """Routing logic (deterministic).

    Args:
        state: Workflow state; ``priority`` and ``category`` are read.

    Returns:
        ``"escalate"`` for high-priority tickets or an unrecognized category,
        otherwise the category name (``"technical"``, ``"billing"`` or
        ``"sales"``), which is also the name of the handler node.
    """
    # High priority always escalates
    if state["priority"] == "high":
        return "escalate"

    # Route by category. The category comes from parsed model output, so an
    # unexpected value goes to a human instead of raising KeyError mid-graph.
    category = state["category"]
    if category in ("technical", "billing", "sales"):
        return category
    return "escalate"
# Build workflow with conditional routing
workflow = StateGraph(SupportTicketState)
workflow.add_node("classify", classify_ticket)
workflow.add_node("technical", handle_technical)
workflow.add_node("billing", handle_billing)
workflow.add_node("sales", handle_sales)
workflow.add_node("escalate", escalate_to_human)
workflow.set_entry_point("classify")

# Conditional routing: the mapping translates route_ticket's return value
# into the name of the node to run next.
workflow.add_conditional_edges(
    "classify",
    route_ticket,  # Function decides next step
    {
        "technical": "technical",
        "billing": "billing",
        "sales": "sales",
        "escalate": "escalate",
    },
)

# All paths end
for node in ["technical", "billing", "sales", "escalate"]:
    workflow.add_edge(node, END)

app = workflow.compile()
The route_ticket function is pure Python logic and fully deterministic: given the same category and priority it always picks the same path. No LLM makes the routing decision itself.
4. Loops and Iteration
When to use: Quality improvement through iteration, retry logic.
Code Generation with Validation Loop:
class CodeGenState(TypedDict):
    """Code generation workflow state."""

    requirements: str  # task description used in every prompt
    code: str  # latest generated code
    test_results: dict  # validation outcome for the latest code
    iteration: int  # number of generation attempts so far
    max_iterations: int  # attempt limit checked by the retry decision
    success: bool  # True once the latest code passed validation
def generate_code(state: CodeGenState) -> CodeGenState:
    """Generate or refine code.

    On the first attempt the prompt contains only the requirements; on later
    attempts it also carries the previous code and its test results so the
    model can fix them.

    Args:
        state: Workflow state; ``requirements``, ``iteration``, ``code`` and
            ``test_results`` are read, ``code`` and ``iteration`` are written.

    Returns:
        The same state object, updated in place.
    """
    if state["iteration"] == 0:
        # First attempt
        prompt = f"Write Python code for: {state['requirements']}"
    else:
        # Refinement
        prompt = (
            f"Previous code had issues: {state['test_results']}\n"
            f"Requirements: {state['requirements']}\n"
            f"Previous code: {state['code']}\n"
            "Fix the issues."
        )

    # NOTE(review): `client` is not defined in this snippet; the call shape
    # matches the Anthropic SDK.
    response = client.messages.create(
        model="claude-sonnet-4-20250514",
        max_tokens=4096,  # the Anthropic Messages API rejects calls without it
        messages=[{"role": "user", "content": prompt}],
        temperature=0.2,
    )

    # Extract code
    state["code"] = extract_code_block(response.content[0].text)
    state["iteration"] += 1
    return state
def validate_code(state: CodeGenState) -> CodeGenState:
    """Run tests on generated code.

    Compiles ``state["code"]`` and, if it compiles, executes it in an empty
    namespace. The outcome is stored in ``state["test_results"]`` and
    ``state["success"]``.

    Args:
        state: Workflow state; ``code`` is read, ``test_results`` and
            ``success`` are written.

    Returns:
        The same state object, updated in place.
    """
    results = {"syntax_valid": False, "tests_pass": False, "errors": []}

    # Syntax check
    try:
        compile(state["code"], "<string>", "exec")
    except SyntaxError as e:
        results["errors"].append(f"Syntax: {e}")
        state["test_results"] = results
        # Set explicitly so a stale True can never survive a failed check.
        state["success"] = False
        return state
    results["syntax_valid"] = True

    # Functional tests
    # SECURITY: exec() runs model-generated code inside this process with full
    # privileges. Run it in a sandbox (separate process/container with
    # resource limits) before using this outside a demo.
    try:
        exec(state["code"], {})
    except Exception as e:
        results["errors"].append(f"Runtime: {e}")
    else:
        results["tests_pass"] = True

    state["test_results"] = results
    state["success"] = results["syntax_valid"] and results["tests_pass"]
    return state
def should_retry(state: CodeGenState) -> str:
    """Decide: retry or finish (deterministic).

    Args:
        state: Workflow state; ``success``, ``iteration`` and
            ``max_iterations`` are read.

    Returns:
        ``"finish"`` when validation succeeded or the iteration limit is
        reached, otherwise ``"retry"``.
    """
    if state["success"]:
        return "finish"
    if state["iteration"] >= state["max_iterations"]:
        return "finish"  # Give up
    return "retry"
# Build iterative workflow: generate -> validate, then loop or stop
workflow = StateGraph(CodeGenState)
workflow.add_node("generate", generate_code)
workflow.add_node("validate", validate_code)
workflow.set_entry_point("generate")
workflow.add_edge("generate", "validate")
# Loop back or finish: the mapping translates should_retry's return value
# into the next node (or END)
workflow.add_conditional_edges(
    "validate",
    should_retry,
    {
        "retry": "generate", # Loop back
        "finish": END
    }
)
app = workflow.compile()
# Execute with iteration limit
result = app.invoke({
    "requirements": "Calculate fibonacci efficiently",
    "code": "",
    "test_results": {},
    "iteration": 0,
    "max_iterations": 3, # Safety limit
    "success": False
})
Always set max_iterations to prevent infinite loops. In production, also add cost/time limits.
Combining Patterns
Real workflows combine multiple patterns.
Example: Document Processing with Quality Control
# NOTE(review): DocState and the node functions are not defined in this snippet.
workflow = StateGraph(DocState)
# Sequential: Parse → Extract → Summarize
workflow.add_node("parse", parse_document)
workflow.add_node("extract", extract_entities)
workflow.add_node("summarize", create_summary)
# Quality check with loop
workflow.add_node("quality_check", validate_quality)
def quality_router(state):
    """Deterministic routing.

    Args:
        state: Workflow state; ``quality_score`` and ``iteration`` are read.

    Returns:
        ``"approve"`` when the score is above 0.8, ``"retry"`` while fewer
        than 3 iterations have run, otherwise ``"human_review"``.
    """
    if state["quality_score"] > 0.8:
        return "approve"
    elif state["iteration"] < 3:
        return "retry"
    else:
        return "human_review"
# Terminal nodes for the two non-retry outcomes
workflow.add_node("approve", approve_doc)
workflow.add_node("human_review", flag_for_human)
# Build graph
workflow.set_entry_point("parse")
workflow.add_edge("parse", "extract")
workflow.add_edge("extract", "summarize")
workflow.add_edge("summarize", "quality_check")
# The mapping translates quality_router's return value into the next node
workflow.add_conditional_edges(
    "quality_check",
    quality_router,
    {
        "approve": "approve",
        "retry": "parse", # Loop back
        "human_review": "human_review"
    }
)
workflow.add_edge("approve", END)
workflow.add_edge("human_review", END)
This workflow combines three patterns:
- Sequential: parse → extract → summarize
- Conditional: approve vs retry vs human review
- Loop: retry up to 3 times
Production Deployment Considerations
1. Observability
Add logging at every node:
def extract_entities(state: DocumentState) -> DocumentState:
    """Step with logging.

    Runs the extraction step and logs its duration on success and on failure.

    Args:
        state: Workflow state; ``raw_text`` is read, ``entities`` is written.

    Returns:
        The same state object, updated in place.

    Raises:
        Exception: Any error from the model call or parsing is logged and
            re-raised unchanged.
    """
    # perf_counter is monotonic, so the duration cannot be skewed by
    # wall-clock adjustments the way time.time() can.
    start = time.perf_counter()
    try:
        # Execute step
        response = client.messages.create(...)
        state["entities"] = parse(response)
    except Exception as e:
        # Log failure; logger.exception logs at ERROR level with the traceback
        logger.exception("Entity extraction failed", extra={
            "duration_ms": (time.perf_counter() - start) * 1000,
            "error": str(e),
        })
        raise

    # Log success outside the try block, so a logging problem is never
    # reported as an extraction failure
    logger.info("Entity extraction succeeded", extra={
        "duration_ms": (time.perf_counter() - start) * 1000,
        "entities_found": len(state["entities"]),
        "document_length": len(state["raw_text"]),
    })
    return state
2. Cost Tracking
Monitor token usage:
class CostTracker:
    """Track workflow costs.

    Accumulates a running total and a per-step breakdown from token counts
    and a flat per-token price for each model.
    """

    # Model pricing, per token.
    # NOTE(review): one flat rate per model; input and output tokens are not
    # priced separately here. Confirm the rates against current provider pricing.
    PRICES = {
        "claude-sonnet-4": 0.003 / 1000,
        "gpt-4o-mini": 0.00015 / 1000,
    }

    def __init__(self):
        self.total_cost = 0
        self.step_costs = {}

    def track_step(self, step_name: str, tokens: int, model: str):
        """Track cost per step.

        Args:
            step_name: Key under which the cost is accumulated.
            tokens: Number of tokens consumed by the step.
            model: Model name; must be a key of ``PRICES``.

        Raises:
            KeyError: If ``model`` has no configured price.
        """
        try:
            price_per_token = self.PRICES[model]
        except KeyError:
            raise KeyError(f"No pricing configured for model {model!r}") from None

        cost = tokens * price_per_token
        self.total_cost += cost
        self.step_costs[step_name] = self.step_costs.get(step_name, 0) + cost

    def get_report(self) -> dict:
        """Cost breakdown.

        Returns:
            A dict with ``total_cost``, ``by_step`` (the per-step cost dict)
            and ``most_expensive`` (name of the costliest step, or ``None``
            when nothing has been tracked yet).
        """
        return {
            "total_cost": self.total_cost,
            "by_step": self.step_costs,
            # default=None avoids ValueError from max() on an empty dict
            "most_expensive": max(
                self.step_costs, key=self.step_costs.get, default=None
            ),
        }
# Usage
tracker = CostTracker()


def expensive_step(state):
    """Run a model call and record its token cost in the shared tracker.

    Args:
        state: Workflow state; returned unchanged.

    Returns:
        The state object that was passed in.
    """
    # NOTE(review): `client` is not defined in this snippet; the call shape
    # matches the Anthropic SDK.
    response = client.messages.create(...)
    # Anthropic usage objects report input_tokens and output_tokens
    # separately; there is no total_tokens attribute.
    usage = response.usage
    tracker.track_step(
        "entity_extraction",
        usage.input_tokens + usage.output_tokens,
        "claude-sonnet-4",
    )
    return state
3. Timeout Protection
Prevent runaway workflows:
import asyncio
async def execute_with_timeout(workflow, state, timeout_seconds=300):
    """Execute workflow with timeout.

    Args:
        workflow: Object exposing an awaitable ``ainvoke(state)`` method.
        state: Initial state passed to ``workflow.ainvoke``.
        timeout_seconds: Maximum time to wait for the workflow to finish.

    Returns:
        The workflow result, or a dict with ``success=False`` and
        ``error="timeout"`` when the time limit is exceeded.
    """
    try:
        return await asyncio.wait_for(
            workflow.ainvoke(state),
            timeout=timeout_seconds,
        )
    except asyncio.TimeoutError:
        # NOTE(review): `logger` is not defined in this snippet.
        logger.error("Workflow timeout after %ss", timeout_seconds)
        return {
            "success": False,
            "error": "timeout",
            "message": "Workflow exceeded time limit",
        }
Check Your Understanding
- Design: You need to process 50 documents. Each requires: extract → validate → store. What pattern?
- Debugging: Your workflow sometimes hangs. What’s the most likely cause?