
Agent Orchestration vs Workflow Orchestration

The Key Difference: In section 3.6, you learned workflow orchestration where code controls when agents run:
# Code decides the flow
workflow.add_edge("agent_a", "agent_b")  # Code says "A then B"
In agent orchestration, agents decide when to hand off:
# Agent's LLM decides the flow
# Agent A analyzes query → decides to transfer to Agent B
# No explicit code routing
Why Agent Orchestration? Agent orchestration excels when:
  • Routing logic is complex or context-dependent
  • Requirements change frequently (update instructions, not code)
  • You want more autonomous, intelligent delegation
  • Business users need to adjust workflows without coding
Trade-offs:
Aspect      | Workflow Orchestration | Agent Orchestration
Control     | 100% deterministic     | Depends on LLM decisions
Debugging   | Trace exact code path  | Must inspect LLM reasoning
Cost        | Predictable            | More LLM calls for routing
Flexibility | Change requires code   | Change instructions/descriptions
Best for    | Fixed workflows        | Dynamic routing
Production Reality Check: While multi-agent orchestration frameworks show significant promise, they remain at an early stage of maturity. Gartner research (2025) has not identified any confirmed production deployments of agent orchestration systems in enterprise environments. Multiple industry reports echo this finding: workflow-orchestrated agents (where code controls flow) account for 85% of successful enterprise deployments.

The Two Agent Orchestration Patterns

Pattern 1: Agent Handoff (Transfer Control)
  • Agent completes its work and passes full control to another agent
  • Like a relay race: current agent finishes, next agent starts fresh
  • Used in: CrewAI delegation, Google ADK transfer_to_agent()
Pattern 2: Agent-as-Tool (Call and Return)
  • Agent invokes another agent like a function
  • Original agent stays in control, integrates response
  • Like calling an API: make request, get result, continue
  • Used in: Google ADK AgentTool, OpenAI Agents SDK
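
The two patterns differ mainly in who holds control after the second agent runs. The following is a minimal plain-Python sketch with no LLM or framework involved. The agent functions, the `AGENTS` registry, and the keyword check that stands in for an LLM decision are all hypothetical and are not part of ADK or CrewAI.

```python
from typing import Callable, Dict, Optional, Tuple

# Each "agent" is a plain function standing in for an LLM-backed agent.
# It returns (reply, next_agent); next_agent is None when no handoff is requested.
AgentFn = Callable[[str], Tuple[str, Optional[str]]]


def billing_agent(query: str) -> Tuple[str, Optional[str]]:
    return (f"Billing handled: {query}", None)


def triage_agent(query: str) -> Tuple[str, Optional[str]]:
    # Hypothetical stand-in for the LLM's routing decision.
    if "refund" in query.lower():
        return ("Transferring to billing", "billing")
    return (f"Triage answered: {query}", None)


AGENTS: Dict[str, AgentFn] = {"triage": triage_agent, "billing": billing_agent}


def run_with_handoff(query: str, start: str = "triage", max_hops: int = 5) -> Tuple[str, str]:
    """Pattern 1: control moves from agent to agent; the last agent's reply is final."""
    current = start
    for _ in range(max_hops):
        reply, next_agent = AGENTS[current](query)
        if next_agent is None:
            return current, reply
        current = next_agent  # the previous agent is out of the picture from here on
    raise RuntimeError("too many handoffs")


def run_with_agent_as_tool(query: str) -> Tuple[str, str]:
    """Pattern 2: the caller invokes billing like a function and stays in control."""
    tool_reply, _ = billing_agent(query)
    return "triage", f"Triage summary -> {tool_reply}"


print(run_with_handoff("I want a refund"))
print(run_with_agent_as_tool("I want a refund"))
```

In the handoff case the final answer comes from the billing agent. In the agent-as-tool case it still comes from the triage agent, which wraps the billing result.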

Framework Selection Rationale: This section explores two complementary approaches to multi-agent orchestration.
Google’s Agent Development Kit (ADK) represents the broader category of general-purpose orchestration frameworks: tools that add agent coordination capabilities to existing AI development stacks. ADK’s integration with Google Cloud and Gemini models demonstrates how established cloud providers are extending their platforms with orchestration features.
CrewAI, by contrast, represents frameworks purpose-built for multi-agent orchestration. Its opinionated architecture, with roles, tasks, and crews as first-class concepts, shows how frameworks designed entirely around agent collaboration make certain patterns more intuitive, even if they’re less flexible for other use cases.
Understanding both approaches helps you evaluate orchestration tools strategically: general-purpose frameworks offer broader ecosystem integration but may feel bolted-on, while specialized frameworks provide cleaner abstractions for agent coordination but can be limiting for non-orchestration needs. Both remain experimental in production, but studying their design philosophies prepares you to assess the next generation of orchestration tools as they mature.

Framework 1: Google ADK

Multi-Agent System Patterns with Google ADK

Google ADK supports multiple architectural patterns for coordinating agents. These patterns use ADK’s core primitives (agent hierarchy, workflow agents, and interaction mechanisms) to solve common orchestration challenges.
💡 Key Insight: ADK doesn’t prescribe specific patterns; it provides building blocks. The patterns below show how to compose these primitives into common multi-agent architectures.

Pattern 1: Coordinator/Dispatcher Pattern

Use Case: Route incoming requests to specialized agents based on request characteristics.
How It Works: A central coordinator agent analyzes each request and delegates to the appropriate specialist using transfer_to_agent().
from google.adk.agents import LlmAgent

# Define specialists with clear boundaries
fraud_detector = LlmAgent(
    name="FraudDetector",
    model="gemini-2.0-flash-exp",
    description="""Analyzes transactions for fraud patterns. Handles:
    - Suspicious transaction patterns
    - Account security concerns
    - Unusual spending behavior""",
    instruction="""You are a fraud detection specialist.
    
When analyzing transactions:
1. Check transaction patterns against known fraud indicators
2. Assess risk level (low/medium/high)
3. Recommend action (approve/review/block)
4. Provide clear reasoning for decisions"""
)

credit_analyst = LlmAgent(
    name="CreditAnalyst",
    model="gemini-2.0-flash-exp",
    description="""Evaluates credit applications and limit changes. Handles:
    - Credit limit increase requests
    - New credit applications
    - Credit worthiness assessment""",
    instruction="""You are a credit analyst.
    
When evaluating credit requests:
1. Review customer history and credit score
2. Assess risk factors
3. Calculate appropriate credit limits
4. Explain decisions clearly"""
)

dispute_handler = LlmAgent(
    name="DisputeHandler",
    model="gemini-2.0-flash-exp",
    description="""Manages transaction disputes and chargebacks. Handles:
    - Unauthorized transaction claims
    - Merchant disputes
    - Billing error reports""",
    instruction="""You are a dispute resolution specialist.
    
When handling disputes:
1. Gather all relevant transaction details
2. Determine dispute validity
3. Initiate appropriate resolution process
4. Set customer expectations on timeline"""
)

# Coordinator makes routing decisions
transaction_coordinator = LlmAgent(
    name="TransactionCoordinator",
    model="gemini-1.5-flash",  # Cheaper model for routing
    description="Routes transaction-related requests to appropriate specialists",
    instruction="""You are a transaction coordinator for a financial services system.

Routing rules:
- Fraud concerns or suspicious activity → transfer to FraudDetector
- Credit limit or application questions → transfer to CreditAnalyst
- Transaction disputes or chargebacks → transfer to DisputeHandler

IMPORTANT: Always transfer to a specialist. Do not attempt to handle these specialized tasks yourself.

Analyze the request and use transfer_to_agent() to route to the correct specialist.""",
    
    sub_agents=[fraud_detector, credit_analyst, dispute_handler]
)

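# Test routing
# NOTE: Session() and agent.run(...) are simplified shorthand, used here and in the
# later examples to keep the focus on agent composition. In ADK, agents are normally
# executed through a Runner backed by a session service; check the ADK docs for the
# exact calls in your version.
session = Session()
result = transaction_coordinator.run(
    session=session,
    input_text="I see a $500 charge I didn't make on my statement"
)
# Expected routing: DisputeHandler (unauthorized transaction claim). Because the LLM
# makes the routing decision, verify this in testing rather than assuming it.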
Production Considerations:
# Cost optimization: Use model cascade
# - Cheap model for routing decisions
# - Expensive model only for complex specialist work

coordinator = LlmAgent(
    name="Router",
    model="gemini-1.5-flash",  # $0.075/$0.30 per 1M tokens
    instruction="Route to specialists"
)

specialist = LlmAgent(
    name="Expert",
    model="gemini-1.5-pro",  # $1.25/$5.00 per 1M tokens
    instruction="Solve complex problems"
)

# Cost impact: the coordinator adds ~200 tokens per request.
# At 10K requests/day that is ~2M routing tokens/day: about $0.15/day at the cheap
# model's input rate, versus about $2.50/day if the expensive model did the routing.
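
The routing overhead can be checked with simple arithmetic. This sketch assumes that all ~200 routing tokens are billed at the input rate and uses the per-million-token input prices quoted in the comments above. The helper name `daily_routing_cost` is illustrative.

```python
def daily_routing_cost(
    requests_per_day: int,
    tokens_per_request: int,
    usd_per_million_tokens: float,
) -> float:
    """Daily cost in USD of the routing step alone."""
    total_tokens = requests_per_day * tokens_per_request
    return total_tokens / 1_000_000 * usd_per_million_tokens


# 10K requests/day, ~200 routing tokens each, billed at input rates
cheap_router = daily_routing_cost(10_000, 200, 0.075)   # cheap routing model
expensive_router = daily_routing_cost(10_000, 200, 1.25)  # expensive model doing the routing

print(f"cheap router:     ${cheap_router:.2f}/day")
print(f"expensive router: ${expensive_router:.2f}/day")
```

Output tokens are billed at a higher rate, so a coordinator that produces long routing explanations would cost more than this estimate.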

Pattern 2: Sequential Pipeline Pattern

Use Case: Multi-stage processing where each agent performs a specific transformation and passes its result to the next stage.
How It Works: Use SequentialAgent to chain specialists in a deterministic order.
from google.adk.agents import LlmAgent, SequentialAgent

# Stage 1: Extract structured data
extractor = LlmAgent(
    name="DataExtractor",
    model="gemini-2.0-flash-exp",
    instruction="""Extract structured information from the document.

Output JSON:
{
    "entities": ["person names", "companies", "dates"],
    "key_facts": ["important facts"],
    "document_type": "contract|invoice|report|other"
}

Store in state['extracted_data']""",
    output_key="extracted_data"
)

# Stage 2: Validate extracted data
validator = LlmAgent(
    name="DataValidator",
    model="gemini-2.0-flash-exp",
    instruction="""Validate the extracted data from state['extracted_data'].

Check for:
- Missing required fields
- Inconsistent dates or facts
- Suspicious patterns

Output JSON:
{
    "is_valid": true/false,
    "issues": ["list of problems found"],
    "confidence": 0.0-1.0
}

Store in state['validation_result']""",
    output_key="validation_result"
)

# Stage 3: Generate summary with validated data
summarizer = LlmAgent(
    name="DocumentSummarizer",
    model="gemini-2.0-flash-exp",
    instruction="""Create a summary using validated data.

Use:
- state['extracted_data'] for content
- state['validation_result'] to note any issues

Generate:
- Executive summary (3-5 sentences)
- Key points (bullet list)
- Data quality notes (if validation found issues)

Store in state['final_summary']""",
    output_key="final_summary"
)

# Compose sequential pipeline
document_pipeline = SequentialAgent(
    name="DocumentProcessingPipeline",
    sub_agents=[extractor, validator, summarizer]
)

# Execute pipeline
session = Session()
session.state["document_text"] = """
[Large document content...]
"""

result = document_pipeline.run(session=session)

# Flow:
# 1. Extractor reads document_text → outputs extracted_data
# 2. Validator reads extracted_data → outputs validation_result
# 3. Summarizer reads both → outputs final_summary
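
The flow above relies on each stage writing its result to a named key in shared state, which later stages read. The following framework-free sketch shows that mechanic. The stage functions are hypothetical stand-ins for LLM calls, and `run_pipeline` is an illustration, not ADK's implementation of SequentialAgent.

```python
from typing import Any, Callable, Dict, List, Tuple

State = Dict[str, Any]
# A stage is (output_key, function); the function reads state and returns a value.
Stage = Tuple[str, Callable[[State], Any]]


def extract(state: State) -> Dict[str, int]:
    text = state["document_text"]
    return {"word_count": len(text.split())}


def validate(state: State) -> Dict[str, bool]:
    return {"is_valid": state["extracted_data"]["word_count"] > 0}


def summarize(state: State) -> str:
    words = state["extracted_data"]["word_count"]
    valid = state["validation_result"]["is_valid"]
    return f"{words} words, valid={valid}"


PIPELINE: List[Stage] = [
    ("extracted_data", extract),
    ("validation_result", validate),
    ("final_summary", summarize),
]


def run_pipeline(stages: List[Stage], state: State) -> State:
    """Run stages in order, saving each result under its output key."""
    for output_key, stage_fn in stages:
        state[output_key] = stage_fn(state)
    return state


final_state = run_pipeline(PIPELINE, {"document_text": "alpha beta gamma"})
print(final_state["final_summary"])
```

Because the order is fixed in the list, a stage can safely assume that every earlier key already exists in state.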
When to Use Sequential vs. Workflow Orchestration:
# Sequential Agent: For LINEAR pipelines
# ✅ Extract → Validate → Summarize
# ✅ Research → Analyze → Report
# ✅ Parse → Enrich → Store

sequential = SequentialAgent(
    sub_agents=[step1, step2, step3]  # Always runs in order
)

# Workflow Orchestration: For CONDITIONAL logic
# ✅ Extract → if valid: Summarize, else: Flag for review
# ✅ Analyze → if confident: Approve, else: Escalate
# ✅ Process → Loop until quality threshold met

from langgraph.graph import StateGraph

workflow = StateGraph(State)
workflow.add_node("extract", extract)
workflow.add_node("validate", validate)
workflow.add_conditional_edges(
    "validate",
    lambda state: "summarize" if state["is_valid"] else "review"
)

Pattern 3: Parallel Fan-Out/Gather Pattern

Use Case: Independent analyses that can run simultaneously, with the results synthesized afterwards.
How It Works: Use ParallelAgent to run agents concurrently and gather their results in shared state.
from google.adk.agents import LlmAgent, ParallelAgent, SequentialAgent

# Parallel analysis agents
technical_reviewer = LlmAgent(
    name="TechnicalReviewer",
    model="gemini-2.0-flash-exp",
    instruction="""Review code for technical quality.

Evaluate:
- Code structure and organization
- Error handling
- Performance considerations
- Security issues

Output JSON to state['technical_review']:
{
    "score": 0-100,
    "strengths": ["list"],
    "issues": ["list"],
    "recommendations": ["list"]
}""",
    output_key="technical_review"
)

security_reviewer = LlmAgent(
    name="SecurityReviewer",
    model="gemini-2.0-flash-exp",
    instruction="""Review code for security vulnerabilities.

Check for:
- SQL injection risks
- XSS vulnerabilities
- Authentication/authorization issues
- Sensitive data exposure

Output JSON to state['security_review']:
{
    "risk_level": "low|medium|high|critical",
    "vulnerabilities": ["list with severity"],
    "recommendations": ["list"]
}""",
    output_key="security_review"
)

style_reviewer = LlmAgent(
    name="StyleReviewer",
    model="gemini-2.0-flash-exp",
    instruction="""Review code for style and maintainability.

Evaluate:
- Code readability
- Naming conventions
- Documentation quality
- Consistency with team standards

Output JSON to state['style_review']:
{
    "score": 0-100,
    "violations": ["list"],
    "suggestions": ["list"]
}""",
    output_key="style_review"
)

# Run reviews in parallel
parallel_reviews = ParallelAgent(
    name="ParallelCodeReviews",
    sub_agents=[technical_reviewer, security_reviewer, style_reviewer]
)

# Synthesizer runs after parallel stage
synthesis_agent = LlmAgent(
    name="ReviewSynthesizer",
    model="gemini-2.0-flash-exp",
    instruction="""Synthesize all review results into final assessment.

Read from state:
- technical_review
- security_review  
- style_review

Generate comprehensive review with:
- Overall assessment (approve/revise/reject)
- Priority issues (sorted by severity)
- Action items for developer
- Estimated effort to address issues

Store in state['final_review']""",
    output_key="final_review"
)

# Compose: Parallel reviews → Synthesis
code_review_pipeline = SequentialAgent(
    name="CodeReviewPipeline",
    sub_agents=[parallel_reviews, synthesis_agent]
)

# Execute
session = Session()
session.state["code_to_review"] = """
def process_payment(user_id, amount):
    # Code to review...
"""

result = code_review_pipeline.run(session=session)

# Execution:
# 1. All three reviewers run simultaneously (ParallelAgent)
# 2. Each writes results to different state keys
# 3. Synthesizer waits for all to complete, then creates unified review
Performance Impact:
# Sequential execution: 3 agents × 2 seconds each = 6 seconds total
sequential = SequentialAgent(
    sub_agents=[reviewer1, reviewer2, reviewer3]
)

# Parallel execution: max(2 seconds) = 2 seconds total  
parallel = ParallelAgent(
    sub_agents=[reviewer1, reviewer2, reviewer3]
)

# Real-world results (from 1000 code reviews):
# - Sequential: P50=6.2s, P95=8.1s
# - Parallel: P50=2.3s, P95=3.1s
# - 63% latency reduction
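
The latency argument (sum of durations versus the longest single duration) can be reproduced with `asyncio` alone. In this sketch each reviewer is a hypothetical coroutine that sleeps instead of calling an LLM, so the timings show the shape of the effect, not real model latency.

```python
import asyncio
import time
from typing import Callable, Coroutine, Dict, List, Tuple

REVIEWERS = ["technical", "security", "style"]


async def review(name: str, delay_s: float) -> Dict[str, str]:
    await asyncio.sleep(delay_s)  # stands in for an LLM call
    return {"reviewer": name, "status": "done"}


async def run_sequential(delay_s: float) -> List[Dict[str, str]]:
    # Each review starts only after the previous one finishes.
    return [await review(name, delay_s) for name in REVIEWERS]


async def run_parallel(delay_s: float) -> List[Dict[str, str]]:
    # All reviews start together; gather preserves input order.
    return list(await asyncio.gather(*(review(name, delay_s) for name in REVIEWERS)))


def timed(
    runner: Callable[[float], Coroutine], delay_s: float
) -> Tuple[List[Dict[str, str]], float]:
    start = time.perf_counter()
    results = asyncio.run(runner(delay_s))
    return results, time.perf_counter() - start


seq_results, seq_elapsed = timed(run_sequential, 0.05)
par_results, par_elapsed = timed(run_parallel, 0.05)
print(f"sequential: {seq_elapsed:.2f}s, parallel: {par_elapsed:.2f}s")
```

The gain holds only when the reviews are independent. If one reviewer needs another's output, that pair has to run in sequence.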

Pattern 4: Hierarchical Task Decomposition

Use Case: Complex tasks that need to be broken down dynamically by a planning agent.
How It Works: A manager agent decomposes the task, delegates subtasks to workers, and synthesizes the results.
from google.adk.agents import LlmAgent
from google.adk.tools.agent_tool import AgentTool

# Worker agents (exposed as tools)
market_researcher = LlmAgent(
    name="MarketResearcher",
    model="gemini-2.0-flash-exp",
    description="Researches market data, competitors, and industry trends",
    instruction="""Research the specified market topic.

Provide:
- Current market size and growth rate
- Key players and market share
- Recent trends and developments
- Reliable sources for all claims

Return structured findings."""
)

financial_analyst = LlmAgent(
    name="FinancialAnalyst",
    model="gemini-2.0-flash-exp",
    description="Analyzes financial data, calculates ROI, evaluates business cases",
    instruction="""Analyze the financial aspects of the topic.

Provide:
- Cost analysis
- Revenue projections
- ROI calculations
- Risk assessment

Show your calculations and assumptions."""
)

competitive_analyst = LlmAgent(
    name="CompetitiveAnalyst",
    model="gemini-2.0-flash-exp",
    description="Analyzes competitive landscape and positioning strategies",
    instruction="""Analyze competitive dynamics.

Provide:
- Competitor strengths/weaknesses
- Market positioning opportunities
- Differentiation strategies
- Competitive threats

Support with specific examples."""
)

# Manager agent with workers as tools
strategy_manager = LlmAgent(
    name="StrategyManager",
    model="gemini-1.5-pro",  # More capable model for planning
    description="Decomposes strategic questions into research tasks and synthesizes findings",
    instruction="""You are a strategic planning manager.

When given a strategic question:

1. DECOMPOSE: Break it into specific research questions
   - What market data do we need? → MarketResearcher
   - What financial analysis is required? → FinancialAnalyst
   - What competitive insights matter? → CompetitiveAnalyst

2. DELEGATE: Use available tools to gather information
   - Call each specialist tool with specific, focused questions
   - Gather comprehensive data before synthesizing

3. SYNTHESIZE: Integrate findings into coherent strategic recommendation
   - Connect insights across domains
   - Identify key implications
   - Provide clear recommendations with rationale

4. QUALITY CHECK: Ensure your recommendation:
   - Is supported by data from all relevant analysts
   - Addresses the original question completely
   - Provides actionable next steps

Be thorough. Better to over-research than miss critical information.""",
    
    # Workers available as tools
    tools=[
        AgentTool(agent=market_researcher),
        AgentTool(agent=financial_analyst),
        AgentTool(agent=competitive_analyst)
    ]
)

# Execute hierarchical decomposition
session = Session()
result = strategy_manager.run(
    session=session,
    input_text="""Should we enter the AI agent platform market? 
    
Context: We're a mid-sized SaaS company with $50M ARR. 
    We have strong engineering team but limited AI expertise."""
)

# Execution flow:
# 1. Manager analyzes question, identifies needed research
# 2. Manager calls MarketResearcher: "What's the AI agent platform market size and growth?"
# 3. Manager calls FinancialAnalyst: "What's typical cost to build and ROI timeline?"
# 4. Manager calls CompetitiveAnalyst: "Who are key players and what's their positioning?"
# 5. Manager synthesizes all findings into strategic recommendation
Why This Pattern Matters:
# ❌ Without decomposition: Monolithic prompt
agent = LlmAgent(
    instruction="""Research market, analyze financials, evaluate competition, 
    and provide strategic recommendation."""
)
# Problem: Generic research, shallow analysis, weak recommendations

# ✅ With decomposition: Specialized experts + synthesis
manager = LlmAgent(
    instruction="Break down question, delegate to specialists, synthesize findings",
    tools=[market_tool, finance_tool, competitive_tool]
)
# Result: Deep expertise per domain, comprehensive strategic recommendation

# Real results (enterprise strategy project):
# - Monolithic: 65% stakeholder satisfaction, 40% recommendations implemented
# - Hierarchical: 87% satisfaction, 68% implementation rate
# - Reason: Specialists provide depth, manager ensures coherence

Pattern 5: Review/Critique Pattern (Generator-Critic)

Use Case: Iterative quality improvement where one agent generates content and another critiques it.
How It Works: Use LoopAgent to run generator → critic cycles until the quality threshold is met.
from google.adk.agents import BaseAgent, LlmAgent, LoopAgent
from google.adk.agents.invocation_context import InvocationContext
from google.adk.events import Event, EventActions

# Generator agent
proposal_writer = LlmAgent(
    name="ProposalWriter",
    model="gemini-2.0-flash-exp",
    instruction="""Write a business proposal based on requirements in state['requirements'].

If state['critique'] exists, revise the proposal to address the feedback.

Structure:
- Executive Summary
- Problem Statement
- Proposed Solution
- Implementation Plan
- Budget and Timeline

Store proposal in state['current_proposal']""",
    output_key="current_proposal"
)

# Critic agent
proposal_critic = LlmAgent(
    name="ProposalCritic",
    model="gemini-2.0-flash-exp",
    instruction="""Evaluate the proposal in state['current_proposal'].

Criteria:
- Clarity: Is the message clear and compelling?
- Completeness: Are all sections well-developed?
- Persuasiveness: Will this convince stakeholders?
- Professionalism: Is tone and format appropriate?

Output JSON to state['critique']:
{
    "quality_score": 0-100,
    "pass": true/false,  # Pass if score >= 85
    "strengths": ["list"],
    "improvements": ["specific actionable feedback"],
    "priority_fixes": ["must address before approval"]
}

Be constructive but rigorous.""",
    output_key="critique"
)

# Termination checker (custom agent)
class QualityChecker(BaseAgent):
    """Checks whether the quality threshold is met."""

    # ADK custom agents override the async _run_async_impl hook rather than run().
    async def _run_async_impl(self, ctx: InvocationContext):
        # Assumes the critique is stored as a dict (for example via output_schema);
        # a plain output_key stores raw response text that needs json.loads() first.
        critique = ctx.session.state.get("critique", {})
        quality_score = critique.get("quality_score", 0)
        iteration = ctx.session.state.get("iteration", 0)

        # Stop if quality met or max iterations reached
        should_stop = quality_score >= 85 or iteration >= 5

        # Event.content expects a Content object rather than a str, so the
        # status text is left out and only the actions are emitted.
        if should_stop:
            # Escalate to exit the loop
            yield Event(
                author=self.name,
                actions=EventActions(escalate=True)
            )
        else:
            # Continue the loop; record the new iteration count via state_delta
            yield Event(
                author=self.name,
                actions=EventActions(state_delta={"iteration": iteration + 1})
            )

quality_checker = QualityChecker(name="QualityChecker")

# Compose refinement loop
refinement_loop = LoopAgent(
    name="ProposalRefinementLoop",
    max_iterations=5,  # Safety limit
    sub_agents=[proposal_writer, proposal_critic, quality_checker]
)

# Execute
session = Session()
session.state["requirements"] = """
Create a proposal for an AI agent platform for customer support.
Budget: $500K, Timeline: 6 months, Target: 50% cost reduction.
"""
session.state["iteration"] = 0

result = refinement_loop.run(session=session)

# Execution flow:
# Iteration 1: Writer creates draft → Critic scores 68 → Continue
# Iteration 2: Writer revises based on feedback → Critic scores 79 → Continue
# Iteration 3: Writer refines → Critic scores 86 → PASS, exit loop
Production Tuning:
# Cost control strategies

# 1. Early termination on "good enough"
should_stop = quality_score >= 80 or (quality_score >= 70 and iteration >= 3)
# Result: Avg 2.3 iterations vs 3.8 iterations (39% cost reduction)

# 2. Use cheaper model for critic
writer = LlmAgent(name="Writer", model="gemini-1.5-pro")     # Expensive, creative
critic = LlmAgent(name="Critic", model="gemini-1.5-flash")   # Cheap, evaluative
# Result: 60% cost reduction, minimal quality impact

# 3. Limit critique detail
instruction="""Score 0-100. If score < 85, provide top 3 improvements only."""
# Result: 40% fewer tokens per critique, faster iterations

# Real-world impact (1000 proposals):
# - Without optimization: Avg cost $0.47/proposal, 3.8 iterations
# - With optimization: Avg cost $0.18/proposal, 2.3 iterations
# - 62% cost reduction, quality score 84 → 82 (acceptable trade-off)
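
The early-termination rule from strategy 1 can be exercised without any LLM. In this sketch the writer and critic are hypothetical stubs, the critic replays a scripted list of scores, and `iteration` is assumed to be the 1-based number of completed rounds.

```python
from typing import Callable, Iterable, Tuple

Writer = Callable[[str], str]              # feedback -> draft
Critic = Callable[[str], Tuple[int, str]]  # draft -> (score, feedback)


def should_stop(quality_score: int, iteration: int) -> bool:
    """Accept 80+ at any time, or 70+ once three rounds have run."""
    return quality_score >= 80 or (quality_score >= 70 and iteration >= 3)


def refine(write: Writer, critique: Critic, max_iterations: int = 5) -> Tuple[str, int, int]:
    """Run write -> critique rounds until should_stop or the safety limit."""
    draft, feedback, score, iteration = "", "", 0, 0
    for iteration in range(1, max_iterations + 1):
        draft = write(feedback)
        score, feedback = critique(draft)
        if should_stop(score, iteration):
            break
    return draft, score, iteration


def scripted_critic(scores: Iterable[int]) -> Critic:
    """Hypothetical critic that returns pre-set scores in order."""
    remaining = iter(scores)

    def critique(draft: str) -> Tuple[int, str]:
        score = next(remaining)
        return score, f"scored {score}; tighten the weakest section"

    return critique


def write(feedback: str) -> str:
    return "first draft" if not feedback else f"revision addressing: {feedback}"


_, strong_score, strong_rounds = refine(write, scripted_critic([68, 79, 86]))
_, ok_score, ok_rounds = refine(write, scripted_critic([68, 72, 75, 90]))
print(strong_score, strong_rounds)
print(ok_score, ok_rounds)
```

The second run stops at a score of 75 after three rounds. A strict 85 threshold would have paid for a fourth round.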

Pattern 6: Iterative Refinement Pattern

Use Case: Processing that improves incrementally, refining output based on feedback or new information.
How It Works: Similar to the Review/Critique pattern, but refinement is driven by external feedback, test results, or validation checks.
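from google.adk.agents import LlmAgent, LoopAgent, BaseAgent
from google.adk.agents.invocation_context import InvocationContext
from google.adk.events import Event, EventActions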

# Code generator
code_generator = LlmAgent(
    name="CodeGenerator",
    model="gemini-2.0-flash-exp",
    instruction="""Generate Python code based on state['specification'].

If state['test_results'] exists, fix the code to address failing tests.
If state['performance_issues'] exists, optimize the code.

Store code in state['current_code']""",
    output_key="current_code"
)

# Automated tester (custom agent)
class CodeTester(BaseAgent):
    """Runs automated tests on generated code."""

    # ADK custom agents override the async _run_async_impl hook rather than run().
    async def _run_async_impl(self, ctx: InvocationContext):
        code = ctx.session.state.get("current_code", "")

        # Execute tests (simplified)
        test_results = self._run_tests(code)

        # Publish results through state_delta so the change is recorded with the event
        yield Event(
            author=self.name,
            actions=EventActions(state_delta={
                "test_results": test_results,
                "all_tests_passed": test_results["pass_rate"] == 100,
            })
        )

    def _run_tests(self, code: str) -> dict:
        # Run actual unit tests here and return a dict shaped like:
        # {"passed": 8, "total": 10, "pass_rate": 80, "failures": [...]}
        raise NotImplementedError("plug in a real test runner")

code_tester = CodeTester(name="CodeTester")

# Performance analyzer (custom agent)
class PerformanceAnalyzer(BaseAgent):
    """Analyzes code performance characteristics."""

    async def _run_async_impl(self, ctx: InvocationContext):
        code = ctx.session.state.get("current_code", "")

        # Analyze complexity, memory usage, etc.
        analysis = self._analyze_performance(code)

        # Publish findings through state_delta so the change is recorded with the event
        yield Event(
            author=self.name,
            actions=EventActions(state_delta={
                "performance_issues": analysis["issues"],
                "performance_acceptable": len(analysis["issues"]) == 0,
            })
        )

    def _analyze_performance(self, code: str) -> dict:
        # Static analysis or profiling goes here; return a dict shaped like:
        # {"score": 85, "issues": ["O(n²) loop at line 15"]}
        raise NotImplementedError("plug in a real analyzer")

perf_analyzer = PerformanceAnalyzer(name="PerformanceAnalyzer")

# Termination checker (custom agent)
class CompletionChecker(BaseAgent):
    """Checks whether the code meets all criteria."""

    async def _run_async_impl(self, ctx: InvocationContext):
        all_tests_passed = ctx.session.state.get("all_tests_passed", False)
        perf_acceptable = ctx.session.state.get("performance_acceptable", False)
        iteration = ctx.session.state.get("iteration", 0)

        # Stop if all criteria met or max iterations reached
        should_stop = (all_tests_passed and perf_acceptable) or iteration >= 10

        # Event.content expects a Content object rather than a str, so the
        # status text is left out and only the actions are emitted.
        if should_stop:
            # Escalate to exit the loop
            yield Event(
                author=self.name,
                actions=EventActions(escalate=True)
            )
        else:
            # Continue the loop; record the new iteration count via state_delta
            yield Event(
                author=self.name,
                actions=EventActions(state_delta={"iteration": iteration + 1})
            )

completion_checker = CompletionChecker(name="CompletionChecker")

# Compose refinement loop
code_refinement = LoopAgent(
    name="CodeRefinementLoop",
    max_iterations=10,
    sub_agents=[code_generator, code_tester, perf_analyzer, completion_checker]
)

# Execute
session = Session()
session.state["specification"] = """
Implement a function to find the top K most frequent elements in an array.
Requirements:
- Time complexity: O(n log k)
- Space complexity: O(k)
- Handle edge cases (empty array, k > array length)
"""
session.state["iteration"] = 0

result = code_refinement.run(session=session)

# Execution flow:
# Iteration 1: Generate → Test (7/10 pass) → Analyze (perf OK) → Continue
# Iteration 2: Fix bugs → Test (10/10 pass) → Analyze (perf issues) → Continue
# Iteration 3: Optimize → Test (10/10 pass) → Analyze (perf OK) → SUCCESS
When to Use This Pattern:
# ✅ Good for: Automated validation
# - Code generation with test suites
# - Data transformation with schema validation
# - Report generation with formatting checks
# - API integration with contract testing

# ❌ Not good for: Subjective quality
# - Creative writing (use Review/Critique pattern)
# - Strategic decisions (use Hierarchical decomposition)
# - Exploratory research (use Sequential pipeline)

# Real-world success case:
# Company: SaaS platform generating SQL from natural language
# Before: Manual SQL review by engineers
# After: Iterative refinement (generate → validate → optimize)
# Result: 
# - 85% of queries work on first try
# - 12% fixed in 1-2 iterations
# - 3% escalated to human
# - Total: 94% reduction in engineering time
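
This pattern depends on a validator that returns machine-readable results. The sketch below shows one way the `_run_tests` stub from the CodeTester could produce its result dict, assuming the generated code has already been loaded as a callable. The `run_tests` helper, the test cases, and both candidate functions are hypothetical.

```python
import heapq
from collections import Counter
from typing import Any, Callable, Dict, List, Tuple

TestCase = Tuple[Tuple[Any, ...], Any]  # (args, expected result)


def run_tests(func: Callable[..., Any], cases: List[TestCase]) -> Dict[str, Any]:
    """Run func against each case and report results in the CodeTester dict shape."""
    failures: List[str] = []
    for args, expected in cases:
        try:
            actual = func(*args)
        except Exception as exc:  # a crash counts as a failed test
            failures.append(f"{args!r} raised {type(exc).__name__}")
            continue
        if actual != expected:
            failures.append(f"{args!r}: expected {expected!r}, got {actual!r}")
    total = len(cases)
    passed = total - len(failures)
    pass_rate = 100 * passed / total if total else 0
    return {"passed": passed, "total": total, "pass_rate": pass_rate, "failures": failures}


def first_attempt(nums: List[int], k: int) -> List[int]:
    # Buggy candidate: sorts by value and ignores frequency.
    return sorted(set(nums))[:k]


def second_attempt(nums: List[int], k: int) -> List[int]:
    # Fixed candidate: keep the k values with the highest counts.
    counts = Counter(nums)
    top = heapq.nlargest(k, counts.items(), key=lambda item: item[1])
    return [value for value, _ in top]


CASES: List[TestCase] = [
    (([3, 3, 3, 1, 2, 2], 2), [3, 2]),
    (([], 3), []),
    (([5], 4), [5]),
]

first_report = run_tests(first_attempt, CASES)
second_report = run_tests(second_attempt, CASES)
print(first_report["passed"], "/", first_report["total"])
print(second_report["passed"], "/", second_report["total"])
```

The `failures` list is what makes the loop converge: it gives the generator concrete inputs and expected outputs to fix.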

Pattern 7: Human-in-the-Loop Pattern

Use Case: Workflows requiring human judgment, approval, or input at specific checkpoints.
How It Works: A custom tool pauses execution, requests human input through an external system, and resumes with the human response.
from google.adk.agents import LlmAgent, SequentialAgent
from google.adk.tools import FunctionTool
import time

# Simulated human approval system
class HumanApprovalSystem:
    """Interface to external approval workflow (ticketing system, Slack, etc)."""
    
    def request_approval(self, request_id: str, details: dict) -> str:
        """
        Sends approval request to human reviewer.
        
        In production, this would:
        1. Create ticket in approval system (Jira, ServiceNow, etc)
        2. Send notification (Slack, email)
        3. Poll for response or use webhook
        4. Return decision (approved/rejected/more_info_needed)
        """
        print(f"\n{'='*60}")
        print(f"APPROVAL REQUIRED: {request_id}")
        print(f"Amount: ${details['amount']:,.2f}")
        print(f"Reason: {details['reason']}")
        print(f"Risk Level: {details['risk_level']}")
        print(f"{'='*60}\n")
        
        # In real system, this would wait for human response
        # For demo, simulate approval after delay
        time.sleep(2)  # Simulate review time
        
        # Simulated decision logic
        if details['amount'] > 100000:
            return "rejected - exceeds authority limit"
        elif details['risk_level'] == "high":
            return "more_info_needed"
        else:
            return "approved"

approval_system = HumanApprovalSystem()

# Define approval tool
def request_human_approval(amount: float, reason: str, risk_level: str) -> str:
    """
    Requests human approval for high-value or high-risk actions.
    
    Args:
        amount: Dollar amount requiring approval
        reason: Justification for the request
        risk_level: Assessment of risk (low/medium/high)
    
    Returns:
        Decision: "approved", "rejected", or "more_info_needed"
    """
    import uuid
    request_id = f"APPROVAL-{uuid.uuid4().hex[:8]}"
    
    details = {
        "amount": amount,
        "reason": reason,
        "risk_level": risk_level
    }
    
    decision = approval_system.request_approval(request_id, details)
    
    return f"Human decision: {decision}"

approval_tool = FunctionTool(func=request_human_approval)

# Agent that prepares approval request
risk_assessor = LlmAgent(
    name="RiskAssessor",
    model="gemini-2.0-flash-exp",
    instruction="""Assess the user's request and determine if human approval is needed.

Analyze:
- Financial impact (amount involved)
- Risk factors (compliance, security, precedent)
- Complexity (requires expert judgment)

If approval needed, store in state:
- approval_amount: dollar amount
- approval_reason: clear justification
- approval_risk_level: low/medium/high

If no approval needed, set state['approval_required'] = False""",
    output_key="risk_assessment"
)

# Agent that requests approval
approval_requester = LlmAgent(
    name="ApprovalRequester",
    model="gemini-2.0-flash-exp",
    instruction="""If state['approval_required'] is True, use request_human_approval tool.

Call with:
- amount from state['approval_amount']
- reason from state['approval_reason']  
- risk_level from state['approval_risk_level']

Store the decision in state['human_decision']

If approval not required, set state['human_decision'] = 'auto_approved'""",
    tools=[approval_tool],
    output_key="human_decision"
)

# Agent that processes based on decision
decision_processor = LlmAgent(
    name="DecisionProcessor",
    model="gemini-2.0-flash-exp",
    instruction="""Process based on state['human_decision'].

If "approved" or "auto_approved":
- Proceed with the requested action
- Provide confirmation to user
- Log approval details

If "rejected":
- Inform user politely
- Explain reason (from decision)
- Suggest alternatives if possible

If "more_info_needed":
- Ask user for additional details
- Explain what information is required
- Offer to resubmit when ready""",
    output_key="final_response"
)

# Compose workflow
approval_workflow = SequentialAgent(
    name="HumanApprovalWorkflow",
    sub_agents=[risk_assessor, approval_requester, decision_processor]
)

# Test scenarios
session = Session()

# Scenario 1: Requires approval (moderate amount)
result1 = approval_workflow.run(
    session=session,
    input_text="I need to refund $15,000 to a customer due to service outage"
)

# Scenario 2: Auto-approved (low amount)
session2 = Session()
result2 = approval_workflow.run(
    session=session2,
    input_text="Please apply a $50 courtesy credit to customer account"
)

# Scenario 3: Rejected (exceeds limits)
session3 = Session()
result3 = approval_workflow.run(
    session=session3,
    input_text="Approve a $250,000 contract amendment"
)
Production Implementation:
# Real-world integration with approval systems

import os
import time
from datetime import datetime
from typing import Optional

import requests

class ProductionApprovalTool:
    """Production-grade human approval integration."""
    
    def __init__(self, api_url: str, api_key: str):
        self.api_url = api_url
        self.api_key = api_key
    
    def request_approval(
        self, 
        request_type: str,
        details: dict,
        timeout_minutes: int = 60,
        auto_approve_threshold: Optional[float] = None
    ) -> dict:
        """
        Request human approval with timeout and escalation.
        
        Returns:
            {
                "status": "approved|rejected|timeout",
                "decision": "human decision text",
                "reviewer": "reviewer name/id",
                "timestamp": "ISO timestamp",
                "notes": "optional reviewer notes"
            }
        """
        
        # 1. Check auto-approve threshold
        if auto_approve_threshold is not None and details.get("amount", 0) < auto_approve_threshold:
            return {
                "status": "approved",
                "decision": "auto-approved (below threshold)",
                "reviewer": "system",
                "timestamp": datetime.now().isoformat()
            }
        
        # 2. Create approval request
        response = requests.post(
            f"{self.api_url}/approvals",
            headers={"Authorization": f"Bearer {self.api_key}"},
            json={
                "type": request_type,
                "details": details,
                "timeout_minutes": timeout_minutes,
                "notify_channels": ["slack", "email"],
                "escalation_policy": "manager-on-call"
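            },
            timeout=10  # Seconds; fail fast instead of hanging on a stalled connection
        )
        response.raise_for_status()  # Surface HTTP errors before parsing the body
        
        approval_id = response.json()["id"]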
        
        # 3. Poll for decision (or use webhook in real system)
        deadline = time.time() + (timeout_minutes * 60)
        
        while time.time() < deadline:
            status_response = requests.get(
                f"{self.api_url}/approvals/{approval_id}",
                headers={"Authorization": f"Bearer {self.api_key}"}
            )
            
            status = status_response.json()
            
            if status["state"] in ["approved", "rejected"]:
                return {
                    "status": status["state"],
                    "decision": status["decision_text"],
                    "reviewer": status["reviewer_id"],
                    "timestamp": status["decided_at"],
                    "notes": status.get("notes", "")
                }
            
            time.sleep(30)  # Poll every 30 seconds
        
        # 4. Timeout - escalate or default action
        self._handle_timeout(approval_id)
        
        return {
            "status": "timeout",
            "decision": "no decision within timeout window",
            "reviewer": None,
            "timestamp": datetime.now().isoformat()
        }
    
    def _handle_timeout(self, approval_id: str):
        """Escalate or apply default policy on timeout."""
        # Send escalation notification
        # Apply default policy (e.g., auto-reject on timeout)
        pass

# Usage in agent
approval_tool = ProductionApprovalTool(
    api_url="https://approvals.company.com/api",
    api_key=os.getenv("APPROVAL_API_KEY")
)

def request_approval_tool(amount: float, reason: str) -> str:
    result = approval_tool.request_approval(
        request_type="financial",
        details={"amount": amount, "reason": reason},
        timeout_minutes=60,
        auto_approve_threshold=1000.00  # Auto-approve < $1K
    )
    
    return f"Decision: {result['status']} by {result['reviewer']} - {result['decision']}"
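The fixed 30-second poll above can be made gentler on the approvals API with exponential backoff. The following is a minimal sketch, not part of the original integration: `poll_with_backoff` and `fetch_status` are hypothetical names, `fetch_status` is assumed to return the same `{"state": ...}` dict as the status endpoint, and `clock` and `sleep` are injectable only so the loop can be exercised without real waiting.

```python
import time
from typing import Callable, Optional


def poll_with_backoff(
    fetch_status: Callable[[], dict],
    timeout_s: float,
    initial_delay_s: float = 5.0,
    max_delay_s: float = 60.0,
    clock: Callable[[], float] = time.monotonic,
    sleep: Callable[[float], None] = time.sleep,
) -> Optional[dict]:
    """Poll until a terminal state is seen or the deadline passes.

    Returns the final status dict, or None on timeout.
    """
    deadline = clock() + timeout_s
    delay = initial_delay_s
    while True:
        status = fetch_status()
        if status.get("state") in ("approved", "rejected"):
            return status
        remaining = deadline - clock()
        if remaining <= 0:
            return None
        # Never sleep past the deadline; double the delay up to the cap.
        sleep(min(delay, remaining))
        delay = min(delay * 2, max_delay_s)


class FakeClock:
    """Test double that advances time only when sleep() is called."""

    def __init__(self):
        self.now = 0.0
        self.sleeps = []

    def time(self) -> float:
        return self.now

    def sleep(self, seconds: float) -> None:
        self.sleeps.append(seconds)
        self.now += seconds


# Usage: two pending responses, then an approval.
responses = iter([{"state": "pending"}, {"state": "pending"}, {"state": "approved"}])
fake = FakeClock()
result = poll_with_backoff(lambda: next(responses), timeout_s=600,
                           clock=fake.time, sleep=fake.sleep)
```

In a real deployment a webhook callback avoids polling altogether; backoff is the simpler fallback when the approval service only exposes a status endpoint.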
Real-World Use Cases:
# 1. Financial approvals
# - Refunds over $X
# - Contract amendments
# - Budget overruns

# 2. Compliance reviews
# - Policy exceptions
# - Regulatory filings
# - Audit responses

# 3. Content moderation
# - Borderline content decisions
# - Ban appeals
# - Content policy changes

# 4. High-risk operations
# - Data deletions
# - Production deployments
# - Security exceptions

# Production stats (financial services company):
# - 15K approval requests/month
# - 87% auto-approved (below threshold)
# - 11% human approved (avg 18 min response)
# - 2% rejected
# - <1% timeout (escalated)
# Result: 94% faster than manual routing, 100% compliance maintained

Pattern 8: Agent-as-Tool (Call and Return)

Use Case: One agent needs specialized capabilities from another agent but keeps control of the overall workflow.
How It Works: Wrap specialist agents as tools using AgentTool. The main agent invokes them like functions, receives their results, and continues processing.
💡 Key Difference: Unlike the coordinator pattern (Pattern 1), where control transfers to another agent, here the calling agent stays in control and uses other agents as utilities.
from google.adk.agents import LlmAgent
from google.adk.tools.agent_tool import AgentTool

# Specialist agents (used as tools)
sentiment_analyzer = LlmAgent(
    name="SentimentAnalyzer",
    model="gemini-2.0-flash-exp",
    description="Analyzes sentiment of customer feedback with emotion detection",
    instruction="""Analyze the sentiment of the given text.

Return JSON:
{
  "sentiment": "positive" | "negative" | "neutral",
  "confidence": 0.0-1.0,
  "key_emotions": ["happy", "frustrated", "confused", etc],
  "sentiment_score": -1.0 to 1.0,
  "summary": "brief explanation of sentiment drivers"
}

Be precise and objective in your analysis."""
)

keyword_extractor = LlmAgent(
    name="KeywordExtractor",
    model="gemini-2.0-flash-exp",
    description="Extracts key topics and themes from text",
    instruction="""Extract key topics and themes from the text.

Return JSON:
{
  "primary_topics": ["main themes"],
  "mentioned_products": ["product names"],
  "action_items": ["things customer wants"],
  "priority_level": "low|medium|high|urgent"
}

Focus on actionable information."""
)

category_classifier = LlmAgent(
    name="CategoryClassifier",
    model="gemini-2.0-flash-exp",
    description="Classifies feedback into support categories",
    instruction="""Classify the feedback into support categories.

Return JSON:
{
  "category": "technical|billing|feature_request|complaint|praise",
  "subcategory": "specific subcategory",
  "confidence": 0.0-1.0,
  "reasoning": "brief explanation"
}

Use established support taxonomy."""
)

# Wrap specialists as tools
sentiment_tool = AgentTool(agent=sentiment_analyzer)
keyword_tool = AgentTool(agent=keyword_extractor)
category_tool = AgentTool(agent=category_classifier)

# Main orchestrator uses specialists as tools
feedback_orchestrator = LlmAgent(
    name="FeedbackOrchestrator",
    model="gemini-2.0-flash-exp",
    description="Processes customer feedback with comprehensive analysis",
    instruction="""Process customer feedback comprehensively.

Workflow:
1. Use SentimentAnalyzer to understand customer emotion
2. Use KeywordExtractor to identify key topics
3. Use CategoryClassifier to categorize the feedback
4. Synthesize all analyses into actionable response

Based on combined analysis:
- If negative sentiment + high priority → urgent escalation path
- If positive sentiment → identify success factors for team learning
- If feature request → route to product team with context
- If complaint → apologize and provide resolution timeline

Generate response that:
- Acknowledges the customer's sentiment
- Addresses their key concerns (from keywords)
- Provides category-appropriate action plan
- Sets clear expectations

Store comprehensive analysis in state for tracking.""",
    
    tools=[sentiment_tool, keyword_tool, category_tool]
)

# Execute
session = Session()
result = feedback_orchestrator.run(
    session=session,
    input_text="""Customer feedback: "I've been waiting 3 days for a response 
    about my billing issue. The new pricing is confusing and I was charged 
    twice this month. Very frustrated with the support experience."
    """
)

# Execution flow:
# 1. Orchestrator calls sentiment_tool
#    → Returns: {"sentiment": "negative", "confidence": 0.92, "key_emotions": ["frustrated", "confused"]}
# 2. Orchestrator calls keyword_tool
#    → Returns: {"primary_topics": ["billing", "support"], "action_items": ["resolve double charge"]}
# 3. Orchestrator calls category_tool
#    → Returns: {"category": "billing", "subcategory": "duplicate_charge", "confidence": 0.95}
# 4. Orchestrator synthesizes all results:
#    - Acknowledges frustration (from sentiment)
#    - Addresses billing issue specifically (from keywords)
#    - Routes to billing team (from category)
#    - Prioritizes as urgent (from combined analysis)
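The specialists above are asked to return JSON, but model replies often arrive wrapped in a Markdown code fence or with fields missing. The sketch below shows one way to parse such a reply defensively and then apply the orchestrator's routing rules in plain code. `parse_specialist_json` and `choose_path` are hypothetical helpers, not ADK functions, and treating `"urgent"` as equivalent to high priority is an assumption.

```python
import json

FENCE = "`" * 3  # Markdown code-fence marker


def parse_specialist_json(raw: str, required: set) -> dict:
    """Parse a specialist's reply, tolerating a Markdown fence around the JSON."""
    text = raw.strip()
    if text.startswith(FENCE):
        # Drop the opening fence line (which may carry a language tag)
        # and everything from the closing fence onward.
        text = text.split("\n", 1)[1] if "\n" in text else ""
        text = text.rsplit(FENCE, 1)[0]
    data = json.loads(text)
    missing = required - set(data)
    if missing:
        raise ValueError(f"missing fields: {sorted(missing)}")
    return data


def choose_path(sentiment: dict, keywords: dict, category: dict) -> str:
    """Apply the routing rules in the order the instruction lists them."""
    negative = sentiment["sentiment"] == "negative"
    # Assumption: "urgent" counts as high priority.
    if negative and keywords["priority_level"] in ("high", "urgent"):
        return "urgent_escalation"
    if sentiment["sentiment"] == "positive":
        return "share_success_factors"
    if category["category"] == "feature_request":
        return "route_to_product"
    if category["category"] == "complaint":
        return "apologize_with_timeline"
    return "standard_response"
```

Keeping these rules in code rather than in the prompt makes them testable, at the cost of some of the flexibility that motivates agent orchestration in the first place.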
Why Use Agent-as-Tool Pattern:
# ✅ Use Agent-as-Tool when:
# - Main agent needs multiple specialized analyses
# - Main agent must synthesize results from specialists
# - Workflow logic stays with main agent
# - Specialists are reusable utilities

# ❌ Don't use Agent-as-Tool when:
# - Each specialist needs full autonomy
# - Specialists must decide next steps
# - Complex inter-agent negotiation needed
# → Use coordinator pattern (Pattern 1) instead

# Example comparison:

# Coordinator pattern: Agent decides "I should transfer to billing expert"
coordinator.run(input_text="billing issue")
# → Coordinator analyzes, then TRANSFERS CONTROL to billing agent

# Agent-as-Tool pattern: Agent decides "I need sentiment analysis for my work"
orchestrator.run(input_text="customer feedback")
# → Orchestrator CALLS sentiment tool, RECEIVES result, CONTINUES processing
Real-World Use Case: Content Moderation
from google.adk.agents import LlmAgent
from google.adk.tools.agent_tool import AgentTool

# Specialist analyzers
toxicity_detector = LlmAgent(
    name="ToxicityDetector",
    model="gemini-2.0-flash-exp",
    description="Detects toxic language, hate speech, harassment",
    instruction="""Analyze content for toxicity.

Return JSON with:
- toxicity_score: 0.0-1.0
- categories: ["hate_speech", "harassment", "threats", "profanity"]
- severity: "none|low|medium|high|critical"
- flagged_phrases: ["specific problematic content"]"""
)

spam_detector = LlmAgent(
    name="SpamDetector",
    model="gemini-2.0-flash-exp",
    description="Detects spam, promotional content, manipulation",
    instruction="""Analyze content for spam patterns.

Return JSON with:
- spam_score: 0.0-1.0
- spam_type: "none|promotion|scam|bot|manipulation"
- confidence: 0.0-1.0
- indicators: ["repetition", "links", etc]"""
)

misinformation_checker = LlmAgent(
    name="MisinformationChecker",
    model="gemini-2.0-flash-exp",
    description="Checks for misinformation and false claims",
    instruction="""Analyze content for potential misinformation.

Return JSON with:
- risk_level: "none|low|medium|high"
- claim_types: ["health", "political", "financial", etc]
- verification_needed: true/false
- flagged_claims: ["specific claims to verify"]"""
)

# Main moderator uses all specialists
content_moderator = LlmAgent(
    name="ContentModerator",
    model="gemini-1.5-pro",  # More capable model for decision-making
    description="Makes moderation decisions using specialist analyzers",
    instruction="""Moderate user-generated content.

Process:
1. Call ToxicityDetector to check for harmful language
2. Call SpamDetector to check for spam patterns
3. Call MisinformationChecker for false information
4. Make final moderation decision based on combined analysis

Decision matrix:
- Toxicity critical OR spam scam → REMOVE immediately
- Misinformation high + toxicity high → REMOVE + FLAG for review
- Toxicity medium + spam medium → WARN user + REQUEST edit
- All scores low → APPROVE

Always explain decision with specific reasons from each analyzer.
Store full analysis in state['moderation_log'] for audit trail.""",
    
    tools=[
        AgentTool(agent=toxicity_detector),
        AgentTool(agent=spam_detector),
        AgentTool(agent=misinformation_checker)
    ]
)

# Execute moderation
session = Session()
result = content_moderator.run(
    session=session,
    input_text="""User post: "AMAZING WEIGHT LOSS SECRET! Doctors hate this! 
    Click here for FREE trial. This miracle cure fixed my diabetes in 3 days!!"
    """
)

# Flow:
# 1. Calls toxicity_detector → {"toxicity_score": 0.2, "severity": "low"}
# 2. Calls spam_detector → {"spam_score": 0.95, "spam_type": "scam"}
# 3. Calls misinformation_checker → {"risk_level": "high", "flagged_claims": ["cure diabetes"]}
# 4. Decision: REMOVE (spam scam + health misinformation)
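The decision matrix in the moderator's instruction can also be enforced in code, so that the final verdict does not depend on the model applying the rules correctly. This is a sketch under stated assumptions: `moderation_decision` is a hypothetical helper, the numeric bands used for "medium" and "low" spam scores are invented for illustration, and the `HUMAN_REVIEW` fallback is an addition for combinations the matrix does not cover.

```python
def moderation_decision(toxicity: dict, spam: dict, misinfo: dict) -> str:
    """Apply the decision matrix top to bottom; the first matching rule wins."""
    tox = toxicity["severity"]        # none|low|medium|high|critical
    spam_type = spam["spam_type"]     # none|promotion|scam|bot|manipulation
    spam_score = spam["spam_score"]   # 0.0-1.0
    risk = misinfo["risk_level"]      # none|low|medium|high

    if tox == "critical" or spam_type == "scam":
        return "REMOVE"
    if risk == "high" and tox == "high":
        return "REMOVE_AND_FLAG"
    # Assumed band: a spam score in [0.4, 0.7) counts as "medium".
    if tox == "medium" and 0.4 <= spam_score < 0.7:
        return "WARN_AND_REQUEST_EDIT"
    # Assumed band: a spam score below 0.4 counts as "low".
    if tox in ("none", "low") and spam_score < 0.4 and risk in ("none", "low"):
        return "APPROVE"
    # The matrix leaves gaps; route everything else to a person.
    return "HUMAN_REVIEW"


# The weight-loss post from the flow above.
decision = moderation_decision(
    {"toxicity_score": 0.2, "severity": "low"},
    {"spam_score": 0.95, "spam_type": "scam"},
    {"risk_level": "high", "flagged_claims": ["cure diabetes"]},
)
```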
Performance & Cost Optimization:
# Strategy 1: Parallel tool calls for independent analyses
# Most LLM APIs support parallel function calling

orchestrator = LlmAgent(
    name="Orchestrator",
    instruction="""Call all analysis tools in parallel (single LLM call):
    - sentiment_analyzer
    - keyword_extractor  
    - category_classifier
    
Then synthesize results.""",
    tools=[sentiment_tool, keyword_tool, category_tool]
)

# Result: about 2.75x faster than sequential calls (1,650ms / 600ms)
# Before: 600ms + 500ms + 550ms = 1,650ms
# After: max(600ms, 500ms, 550ms) = 600ms (parallel execution)

# Strategy 2: Use cheaper models for specialist tools
sentiment_analyzer = LlmAgent(
    model="gemini-1.5-flash",  # Fast, cheap for narrow task
    # ...
)

orchestrator = LlmAgent(
    model="gemini-1.5-pro",  # Expensive, but only for synthesis
    # ...
)

# Cost comparison (1000 feedback items):
# - All gemini-1.5-pro: $12.50
# - Flash for specialists, Pro for orchestrator: $4.20
# - 66% cost reduction

# Strategy 3: Conditional tool calling
instruction="""
1. Always call sentiment_analyzer (needed for all feedback)
2. Only call category_classifier if sentiment is negative
3. Only call keyword_extractor for high-priority items

This reduces unnecessary tool calls."""

# Real-world results (customer feedback system):
# - 10K feedback items/day
# - Before optimization: Avg cost $0.15/item, 2.1s latency
# - After optimization: Avg cost $0.05/item, 0.7s latency
# - 67% cost reduction, 67% latency reduction
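The latency arithmetic behind Strategy 1 (sequential time is the sum of the calls, parallel time is the slowest call) can be illustrated with `asyncio.gather`. This sketch uses a hypothetical `fake_specialist` coroutine that sleeps instead of calling a model, with delays scaled down from the 600/500/550 ms figures above; it shows the concurrency pattern only, not how any particular framework schedules tool calls.

```python
import asyncio


async def fake_specialist(name: str, latency_s: float) -> str:
    """Stand-in for a specialist call; sleeps instead of calling a model."""
    await asyncio.sleep(latency_s)
    return f"{name}: done"


async def run_all() -> list:
    # gather() runs the coroutines concurrently and returns results
    # in argument order, regardless of which finishes first.
    return await asyncio.gather(
        fake_specialist("sentiment", 0.006),
        fake_specialist("keywords", 0.005),
        fake_specialist("category", 0.0055),
    )


def ideal_speedup(latencies_ms: list) -> float:
    """Best-case speedup of parallel over sequential execution: sum / max."""
    return sum(latencies_ms) / max(latencies_ms)


results = asyncio.run(run_all())
speedup = ideal_speedup([600, 500, 550])  # 1650 / 600 = 2.75
```

The ideal figure ignores the orchestrator's own LLM call and any rate limiting, so measured gains are usually smaller.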
Agent-as-Tool vs MCP Tools:
# This pattern can be implemented two ways:

# Option 1: Agent-as-Tool (Google ADK native)
specialist = LlmAgent(name="Specialist", instruction="...")
specialist_tool = AgentTool(agent=specialist)

main_agent = LlmAgent(
    name="Main",
    tools=[specialist_tool]  # Agent wrapped as tool
)

# Option 2: Agent exposed via MCP
# - Specialist agent runs as separate MCP server
# - Main agent connects via MCP protocol
# - More modular, can be in different languages/environments

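# Illustrative pseudocode: `MCPClient` is a placeholder, not a class shipped in
# the `mcp` package. Use the MCP tool integration your framework provides.
from mcp import MCPClient  # hypothetical client wrapper

specialist_mcp = MCPClient(server_url="http://specialist-agent:8080")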

main_agent = LlmAgent(
    name="Main",
    tools=[specialist_mcp.get_tool("analyze")]  # MCP tool
)

# Trade-offs:
# Agent-as-Tool: Simpler, same process, faster
# MCP: More modular, language-agnostic, scalable
# 
# Use Agent-as-Tool for: Tight integration, low latency needs
# Use MCP for: Microservices, polyglot systems, independent scaling

Framework 2: CrewAI

CrewAI uses a role-based team metaphor with automatic delegation capabilities.

Core Concepts

Agents = Team Members:
from crewai import Agent

researcher = Agent(
    role="Senior Researcher",
    goal="Conduct thorough research on given topics",
    backstory="""You are a meticulous researcher with 10 years of experience.
    You excel at finding reliable sources and synthesizing information.""",
    
    allow_delegation=True,  # Can delegate to other agents
    verbose=True
)
Tasks = Work Items:
from crewai import Task

research_task = Task(
    description="""Research the current state of AI agents in production.
    
Focus on:
- Industry adoption statistics
- Common failure modes
- Best practices
    
Deliverable: Comprehensive research report""",
    
    agent=researcher,  # Assigned to researcher
    expected_output="A 2-3 page research report with sources"
)
Crew = Team:
from crewai import Crew, Process

crew = Crew(
    agents=[researcher, analyst, writer],
    tasks=[research_task, analysis_task, writing_task],
    process=Process.sequential,  # Or Process.hierarchical; parallelism comes from async_execution on tasks
    verbose=True
)

result = crew.kickoff()

Pattern 1: Automatic Delegation

When allow_delegation=True, CrewAI automatically adds delegation tools:
from crewai import Agent, Task, Crew, Process

# Agent 1: Research Specialist
research_agent = Agent(
    role="Research Specialist",
    goal="Gather accurate information from multiple sources",
    backstory="""You are an expert researcher who excels at finding
    reliable sources and fact-checking information.""",
    
    allow_delegation=True,  # Enables delegation
    verbose=True
)

# Agent 2: Analysis Expert  
analysis_agent = Agent(
    role="Data Analyst",
    goal="Analyze data and identify patterns",
    backstory="""You specialize in data analysis and can identify
    trends that others miss.""",
    
    allow_delegation=True,
    verbose=True
)

# Agent 3: Writing Specialist
writing_agent = Agent(
    role="Technical Writer",
    goal="Create clear, engaging technical content",
    backstory="""You are a skilled technical writer who makes
    complex topics accessible.""",
    
    allow_delegation=False,  # Final step, no delegation needed
    verbose=True
)

# Define tasks
research_task = Task(
    description="Research the ROI of AI agents in enterprise",
    agent=research_agent,
    expected_output="Research findings with sources"
)

analysis_task = Task(
    description="""Analyze the research findings.
    
If research seems incomplete, delegate back to Research Specialist
for additional information.""",
    
    agent=analysis_agent,
    expected_output="Analysis report with key insights"
)

writing_task = Task(
    description="Write an executive summary",
    agent=writing_agent,
    expected_output="2-page executive summary"
)

# Create crew
crew = Crew(
    agents=[research_agent, analysis_agent, writing_agent],
    tasks=[research_task, analysis_task, writing_task],
    process=Process.sequential,
    verbose=True
)

result = crew.kickoff()
What Happens During Execution:
1. Research Agent starts research_task
   - If needs analysis help: Can delegate to "Data Analyst"
   - Uses built-in "Delegate work to co-worker" tool

2. Analysis Agent starts analysis_task  
   - If needs more research: Can delegate to "Research Specialist"
   - Uses "Ask question to co-worker" tool

3. Writing Agent completes writing_task
   - No delegation (allow_delegation=False)

Pattern 2: Hierarchical Process

Manager coordinates workers:
from crewai import Agent, Task, Crew, Process

# Manager agent
manager = Agent(
    role="Project Manager",
    goal="Coordinate team and ensure quality delivery",
    backstory="""You are an experienced project manager who knows
    how to delegate effectively and keep projects on track.""",
    
    allow_delegation=True,
    verbose=True
)

# Worker agents
researcher = Agent(
    role="Researcher",
    goal="Find and verify information",
    backstory="Expert at research and fact-checking",
    allow_delegation=False  # Workers don't delegate
)

analyst = Agent(
    role="Analyst", 
    goal="Analyze data and identify insights",
    backstory="Skilled at pattern recognition and analysis",
    allow_delegation=False
)

writer = Agent(
    role="Writer",
    goal="Create compelling content",
    backstory="Talented writer who makes complex topics clear",
    allow_delegation=False
)

# Single high-level task for manager
project_task = Task(
    description="""Create a comprehensive report on AI agent adoption.
    
Requirements:
- Current market size and growth
- Key players and their approaches  
- ROI data from case studies
- Best practices and recommendations
    
Delegate subtasks to specialists as needed.""",
    
    # No agent assigned: in a hierarchical crew the manager allocates the work
    expected_output="Complete 10-page report"
)

# Hierarchical process: Manager delegates to workers
crew = Crew(
    agents=[researcher, analyst, writer],  # Workers only; the manager is passed separately
    tasks=[project_task],
    process=Process.hierarchical,  # Manager orchestrates
    manager_agent=manager,  # Custom manager; alternatively pass manager_llm="gpt-4" and omit this
    verbose=True
)

result = crew.kickoff()
Hierarchical Process Flow:
  1. Manager receives high-level task
  2. Manager’s LLM breaks down task:
    • “Researcher: Find market data”
    • “Analyst: Analyze competitive landscape”
    • “Writer: Draft executive summary”
  3. Workers execute their delegated subtasks
  4. Manager synthesizes results into final output

Pattern 3: Parallel Execution

Independent tasks run simultaneously:
from crewai import Agent, Task, Crew, Process

# Define agents (abbreviated)
competitor_analyst = Agent(
    role="Competitor Analyst",
    goal="Analyze competitor strategies",
    backstory="Expert in competitive intelligence"
)

market_researcher = Agent(
    role="Market Researcher",
    goal="Research market trends",
    backstory="Skilled at market analysis"
)

customer_analyst = Agent(
    role="Customer Analyst",
    goal="Analyze customer feedback",
    backstory="Expert in customer insights"
)

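# Independent tasks that can run in parallel
competitor_task = Task(
    description="Analyze top 5 competitors' AI agent offerings",
    agent=competitor_analyst,
    expected_output="Competitor comparison summary",
    async_execution=True  # Run in parallel
)

market_task = Task(
    description="Research AI agent market trends and forecasts",
    agent=market_researcher,
    expected_output="Market trends summary with forecasts",
    async_execution=True  # Run in parallel
)

customer_task = Task(
    description="Analyze customer sentiment towards AI agents",
    agent=customer_analyst,
    expected_output="Customer sentiment summary",
    async_execution=True  # Run in parallel
)

# Writer for the final report (not defined in the abbreviated list above)
writer_agent = Agent(
    role="Report Writer",
    goal="Synthesize research into a clear report",
    backstory="Skilled at integrating findings from multiple analysts"
)

# Synthesis task (runs after parallel tasks complete)
synthesis_task = Task(
    description="""Synthesize findings from:
    - Competitor analysis
    - Market research
    - Customer insights
    
Create integrated market report.""",
    
    agent=writer_agent,
    expected_output="Integrated market report",
    context=[competitor_task, market_task, customer_task]  # Wait for these
)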

crew = Crew(
    agents=[competitor_analyst, market_researcher, customer_analyst, writer_agent],
    tasks=[competitor_task, market_task, customer_task, synthesis_task],
    process=Process.sequential,  # Sequential with parallel tasks
    verbose=True
)

result = crew.kickoff()
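The ordering implied by `context` can be reasoned about as a dependency graph: tasks with no unfinished context tasks may run together, and a task that lists others in `context` waits for them. The sketch below groups tasks into such "waves". It is a plain-Python model for thinking about the ordering, with `execution_waves` as a hypothetical helper; it does not describe how CrewAI schedules tasks internally.

```python
def execution_waves(dependencies: dict) -> list:
    """Group tasks into waves; each task's context tasks finish in earlier waves."""
    remaining = {task: set(deps) for task, deps in dependencies.items()}
    done = set()
    waves = []
    while remaining:
        # A task is ready once everything it depends on has completed.
        ready = sorted(t for t, deps in remaining.items() if deps <= done)
        if not ready:
            raise ValueError("circular context dependency")
        waves.append(ready)
        done.update(ready)
        for task in ready:
            del remaining[task]
    return waves


# Task names mirror the example above; values are each task's context list.
deps = {
    "competitor_task": [],
    "market_task": [],
    "customer_task": [],
    "synthesis_task": ["competitor_task", "market_task", "customer_task"],
}
waves = execution_waves(deps)
```

Here the three research tasks form the first wave and the synthesis task forms the second, which matches the "runs after parallel tasks complete" comment in the example.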

Pattern 4: Context Sharing

Agents pass information through task context:
from crewai import Agent, Task, Crew, Process

# Stage 1: Data Collection
collector = Agent(
    role="Data Collector",
    goal="Collect relevant data from multiple sources",
    backstory="Expert at finding and organizing data"
)

collection_task = Task(
    description="Collect customer feedback from last quarter",
    agent=collector,
    expected_output="Organized dataset of customer feedback"
)

# Stage 2: Analysis (uses collection results)
analyst = Agent(
    role="Data Analyst",
    goal="Analyze patterns in data",
    backstory="Skilled at statistical analysis"
)

analysis_task = Task(
    description="""Analyze the collected customer feedback.
    
Identify:
- Common themes
- Sentiment trends
- Priority issues""",
    
    agent=analyst,
    context=[collection_task],  # Receives collection_task output
    expected_output="Analysis report with key findings"
)

# Stage 3: Recommendations (uses analysis results)
strategist = Agent(
    role="Strategy Advisor",
    goal="Develop actionable recommendations",
    backstory="Expert in turning insights into strategy"
)

strategy_task = Task(
    description="""Based on the analysis, develop recommendations.
    
Include:
- Top 3 priority improvements
- Implementation approach
- Expected impact""",
    
    agent=strategist,
    context=[analysis_task],  # Receives analysis_task output
    expected_output="Strategic recommendations document"
)

crew = Crew(
    agents=[collector, analyst, strategist],
    tasks=[collection_task, analysis_task, strategy_task],
    process=Process.sequential
)

result = crew.kickoff()

Production Considerations

1. Cost Management

Problem: Agent orchestration can be expensive due to:
  • Multiple LLM calls for routing decisions
  • Delegation attempts (even failed ones)
  • “Ask question to co-worker” interactions
Solutions:
# A. Use cheaper models for routing
coordinator = LlmAgent(
    name="Router",
    model="gemini-1.5-flash",  # Fast, cheap for routing
    instruction="Route queries to specialists"
)

specialist = LlmAgent(
    name="Specialist",
    model="gemini-1.5-pro",  # Expensive, capable for work
    instruction="Solve complex problems"
)

# B. Limit delegation iterations
researcher = Agent(
    role="Researcher",
    goal="Find and verify information",
    backstory="Expert at research and fact-checking",
    max_iter=10,  # Limit reasoning loops
    allow_delegation=True
)

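# C. Monitor and alert on costs
class CostThresholdExceeded(Exception):
    """Raised when cumulative spend passes the configured limit."""

class CostMonitor:
    def __init__(self, threshold: float):
        self.total_cost = 0.0
        self.threshold = threshold
    
    def track(self, tokens: int, model: str):
        # Illustrative per-token prices in USD; check current provider pricing
        prices = {
            "gemini-1.5-flash": 0.000075 / 1000,
            "gemini-1.5-pro": 0.00125 / 1000,
            "gpt-4": 0.03 / 1000
        }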
        
        cost = tokens * prices.get(model, 0)
        self.total_cost += cost
        
        if self.total_cost > self.threshold:
            raise CostThresholdExceeded(
                f"Cost ${self.total_cost:.2f} exceeds ${self.threshold}"
            )

monitor = CostMonitor(threshold=5.0)  # $5 limit

2. Preventing Delegation Loops

Problem: Agents can get stuck delegating back and forth.
Solution:
# Pattern 1: Explicit delegation rules
coordinator = Agent(
    role="Coordinator",
    goal="Route queries efficiently",
    backstory="""You coordinate specialists.
    
CRITICAL RULES:
1. You can only delegate ONCE per query
2. Never delegate back to an agent that already worked on this query
3. If unsure, handle it yourself rather than delegate""",
    
    allow_delegation=True,
    max_iter=5  # Hard limit on reasoning loops
)

# Pattern 2: Track delegation chain
class DelegationTracker:
    def __init__(self):
        self.chain = []
    
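    def can_delegate(self, from_agent: str, to_agent: str) -> bool:
        # Record the originating agent so a hand-back to it counts as a loop
        if not self.chain:
            self.chain.append(from_agent)
        
        # Prevent loops
        if to_agent in self.chain:
            return False
        
        # Limit chain length
        if len(self.chain) >= 3:
            return False
        
        self.chain.append(to_agent)
        return True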

# Pattern 3: Use workflow orchestration for deterministic steps
# If you have a fixed sequence, use section 3.6 patterns instead!

3. Observability

Track agent decisions:
import logging
from datetime import datetime

class AgentLogger:
    def __init__(self):
        self.events = []
    
    def log_agent_start(self, agent_name: str, task: str):
        self.events.append({
            "timestamp": datetime.now().isoformat(),
            "event": "agent_start",
            "agent": agent_name,
            "task": task
        })
    
    def log_delegation(self, from_agent: str, to_agent: str, reason: str):
        self.events.append({
            "timestamp": datetime.now().isoformat(),
            "event": "delegation",
            "from": from_agent,
            "to": to_agent,
            "reason": reason
        })
    
    def log_agent_complete(self, agent_name: str, output: str, tokens: int):
        self.events.append({
            "timestamp": datetime.now().isoformat(),
            "event": "agent_complete",
            "agent": agent_name,
            "output_length": len(output),
            "tokens": tokens
        })
    
    def get_trace(self) -> list:
        """Get execution trace for debugging."""
        return self.events

# Usage
logger = AgentLogger()

# In agent callbacks
def step_callback(step_output):
    logger.log_agent_start(
        agent_name=step_output.agent,
        task=step_output.task
    )

crew = Crew(
    agents=[agent1, agent2],
    tasks=[task1, task2],
    step_callback=step_callback
)
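Once delegation events are being recorded, the trace can be mined for the back-and-forth pattern described in the previous section. The helpers below are a sketch that assumes events shaped like the dicts `AgentLogger.log_delegation` appends (`"event"`, `"from"`, `"to"`); `delegation_counts` and `find_ping_pong` are hypothetical names, not part of CrewAI.

```python
from collections import Counter


def delegation_counts(events: list) -> Counter:
    """Count delegations per (from, to) pair in an AgentLogger-style trace."""
    return Counter(
        (e["from"], e["to"]) for e in events if e.get("event") == "delegation"
    )


def find_ping_pong(events: list) -> list:
    """Return agent pairs that delegated to each other in both directions."""
    counts = delegation_counts(events)
    # The a < b filter reports each pair once instead of in both orders.
    return sorted((a, b) for (a, b) in counts if a < b and (b, a) in counts)


trace = [
    {"event": "agent_start", "agent": "Analyst", "task": "analyze"},
    {"event": "delegation", "from": "Analyst", "to": "Researcher", "reason": "need data"},
    {"event": "delegation", "from": "Researcher", "to": "Analyst", "reason": "need analysis"},
    {"event": "delegation", "from": "Analyst", "to": "Researcher", "reason": "still need data"},
]
loops = find_ping_pong(trace)
```

A non-empty result is a reasonable trigger for an alert or for tightening the delegation rules in the agents' backstories.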

4. Deterministic Fallbacks

Hybrid approach: Agent orchestration with code guardrails
# Use agent orchestration for intelligent routing
# But add code-based validation

def execute_with_validation(crew: Crew, inputs: dict) -> dict:
    """Execute crew with validation fallbacks."""
    
    max_retries = 3
    
    for attempt in range(max_retries):
        result = crew.kickoff(inputs=inputs)
        
        # Code-based validation
        if validate_result(result):
            return result
        
        # Add feedback for retry
        inputs["previous_attempt"] = result
        inputs["validation_errors"] = get_validation_errors(result)
    
    # Fallback: Switch to deterministic workflow
    return deterministic_fallback(inputs)

def validate_result(result: dict) -> bool:
    """Code-based validation of agent output."""
    required_fields = ["summary", "recommendations", "confidence"]
    
    # Check structure
    if not all(field in result for field in required_fields):
        return False
    
    # Check confidence threshold
    if result.get("confidence", 0) < 0.7:
        return False
    
    return True
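The retry loop above calls `get_validation_errors`, which the example leaves undefined. One possible shape for it, mirroring the checks in `validate_result`, is sketched below; the constant names and the message wording are assumptions made for illustration.

```python
REQUIRED_FIELDS = ("summary", "recommendations", "confidence")
MIN_CONFIDENCE = 0.7


def get_validation_errors(result: dict) -> list:
    """Return human-readable problems; an empty list means the result is valid."""
    errors = [
        f"missing field: {name}" for name in REQUIRED_FIELDS if name not in result
    ]
    confidence = result.get("confidence")
    # Only compare when a numeric confidence is present; a missing
    # value is already reported by the field check above.
    if isinstance(confidence, (int, float)) and confidence < MIN_CONFIDENCE:
        errors.append(f"confidence {confidence} is below {MIN_CONFIDENCE}")
    return errors


# Usage: a result missing a field and under the confidence threshold.
problems = get_validation_errors({"summary": "s", "confidence": 0.5})
```

Feeding these messages back into the next attempt gives the agents concrete guidance instead of a bare "try again".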

5. Testing Agent Orchestration

Challenge: Non-deterministic behavior makes testing hard.
Strategies:
# Strategy 1: Test agent descriptions and instructions
def test_agent_routing():
    """Test that coordinator routes correctly."""
    
    test_cases = [
        {
            "input": "My payment was declined",
            "expected_agent": "BillingSpecialist"
        },
        {
            "input": "I can't log in to my account",
            "expected_agent": "TechSupport"
        },
        {
            "input": "What pricing plans do you offer?",
            "expected_agent": "SalesTeam"
        }
    ]
    
    for case in test_cases:
        # Run coordinator
        result = coordinator.run(input_text=case["input"])
        
        # Check which agent handled it
        assert result.active_agent == case["expected_agent"], \
            f"Expected {case['expected_agent']}, got {result.active_agent}"

# Strategy 2: Test delegation logic with mocks
def test_delegation_limits():
    """Test that agents don't delegate infinitely."""
    
    mock_tracker = DelegationTracker()
    
    # Simulate delegation chain
    assert mock_tracker.can_delegate("Agent1", "Agent2") == True
    assert mock_tracker.can_delegate("Agent2", "Agent3") == True
    assert mock_tracker.can_delegate("Agent3", "Agent1") == False  # Loop
    assert mock_tracker.can_delegate("Agent3", "Agent4") == False  # Too long

# Strategy 3: Test with recorded responses (deterministic replay)
from unittest import mock  # provides mock.patch used below
def test_with_fixed_responses():
    """Test using recorded LLM responses."""
    
    # Record responses once
    if RECORDING_MODE:
        response = agent.run(input_text="test query")
        save_response("test_query.json", response)
    
    # Replay in tests
    else:
        mock_response = load_response("test_query.json")
        with mock.patch('llm.generate', return_value=mock_response):
            result = agent.run(input_text="test query")
            assert validate_result(result)
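When a routing test has to hit a live model, a single pass/fail assertion tends to be flaky. An alternative is to run each case several times and assert on the hit rate. The sketch below assumes a hypothetical `route` callable that takes the query text and returns the name of the agent that handled it; `flaky_router` is a stand-in for an LLM-backed coordinator, not a real one.

```python
import random
from typing import Callable


def routing_accuracy(
    route: Callable[[str], str], text: str, expected: str, runs: int = 20
) -> float:
    """Fraction of runs in which the router picked the expected agent."""
    hits = sum(1 for _ in range(runs) if route(text) == expected)
    return hits / runs


# Hypothetical flaky router: usually right on payment queries.
rng = random.Random(0)


def flaky_router(text: str) -> str:
    if "payment" in text and rng.random() < 0.9:
        return "BillingSpecialist"
    return "TechSupport"


accuracy = routing_accuracy(
    flaky_router, "My payment was declined", "BillingSpecialist", runs=50
)
```

A test would then assert that `accuracy` stays above an agreed threshold, trading extra LLM calls for a signal that tolerates occasional misroutes.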

When to Use Each Approach

Use Agent Orchestration When:

Routing logic is complex:
  • “If customer mentions payment AND is upset → billing + escalation”
  • “If technical issue AND customer is enterprise → priority support”
Requirements change frequently:
  • Marketing wants to try different routing strategies
  • Business rules evolve weekly
Delegation needs context understanding:
  • Agent must understand nuance to route correctly
  • Simple rules would miss edge cases

Use Workflow Orchestration When:

Flow is fixed and well-defined:
  • Always: extract → validate → process → store
  • Regulatory requirements mandate specific sequences
Determinism is critical:
  • Financial calculations
  • Compliance workflows
  • Medical diagnoses
Cost must be predictable:
  • Fixed budget per execution
  • High-volume, low-margin use cases

Hybrid Approach:

Use both:
# Low-level: Workflow orchestration for deterministic steps
# (defined first so the coordinator below can reference it)
workflow_a = SequentialAgent(
    name="BillingWorkflow",
    sub_agents=[
        validate_account,  # Deterministic
        check_balance,     # Deterministic  
        process_payment,   # Deterministic
        send_receipt       # Deterministic
    ]
)

# High-level: Agent orchestration for intelligent routing
# (workflow_b and workflow_c are defined the same way as workflow_a)
coordinator = LlmAgent(
    name="Coordinator",
    instruction="Route to appropriate workflow",
    sub_agents=[workflow_a, workflow_b, workflow_c]
)

# Result: Intelligent routing + reliable execution

Quick Check

1. Scenario: You’re building a customer support system. Customers ask about billing, technical issues, or sales. Requirements change weekly based on customer feedback. Should you use agent orchestration or workflow orchestration?
2. Debugging: Your CrewAI agents keep delegating back and forth. What’s the most likely cause and fix?
3. Production: Your agent orchestration system costs $500/day in LLM calls. 80% of queries follow simple patterns. How do you reduce costs?

Key Takeaways

Agent orchestration gives you:
  • ✅ Flexible, intelligent routing
  • ✅ Easy to update (change instructions, not code)
  • ✅ Handles complex, context-dependent decisions
  • ❌ Higher costs (more LLM calls)
  • ❌ Non-deterministic (harder to debug)
  • ❌ Risk of delegation loops
Production pattern:
  1. Use agent orchestration for high-level routing (intelligent decisions)
  2. Use workflow orchestration for low-level steps (deterministic execution)
  3. Add cost monitoring and circuit breakers
  4. Test with fixed responses for determinism
  5. Always include iteration limits and fallbacks
Next: Module 4 covers evaluation and observability - critical for debugging non-deterministic agent orchestration systems.