Skip to Content
DocsAI AgentsOrchestration

Orchestration - Coordinating Multiple Agents

What is Orchestration?

Orchestration is coordinating multiple agents to work together on complex tasks. Think of it like a conductor leading an orchestra - each agent has a specific role, and the orchestrator coordinates their performance.

SimpleOrchestrator

The main class for coordinating agents:

from django_cfg import SimpleOrchestrator # Create orchestrator orchestrator = SimpleOrchestrator[DjangoDeps]() # Register agents orchestrator.register_agent(content_analyzer) orchestrator.register_agent(content_enhancer) orchestrator.register_agent(content_publisher)

Execution Patterns

Sequential Execution

Agents run one after another, output from Agent N becomes input for Agent N+1:

# Sequential workflow result = await orchestrator.execute( pattern="sequential", agents=["content_analyzer", "content_enhancer", "content_publisher"], prompt="Process this article for publication", deps=deps ) # Flow: # 1. content_analyzer: analyzes content → analysis result # 2. content_enhancer: takes analysis → enhanced content # 3. content_publisher: takes enhanced content → published article

Real Example: Content Pipeline

from django_cfg import DjangoAgent, SimpleOrchestrator, DjangoDeps from pydantic import BaseModel from typing import List # Define output models class AnalysisResult(BaseModel): sentiment: str keywords: List[str] readability_score: float suggestions: List[str] class EnhancementResult(BaseModel): enhanced_text: str improvements_made: List[str] seo_optimized: bool class PublishResult(BaseModel): published: bool url: str social_media_posts: List[str] # Create agents analyzer = DjangoAgent[DjangoDeps, AnalysisResult]( name="content_analyzer", deps_type=DjangoDeps, output_type=AnalysisResult, instructions="Analyze content quality, sentiment, and SEO potential" ) enhancer = DjangoAgent[DjangoDeps, EnhancementResult]( name="content_enhancer", deps_type=DjangoDeps, output_type=EnhancementResult, instructions="Enhance content based on analysis suggestions" ) publisher = DjangoAgent[DjangoDeps, PublishResult]( name="content_publisher", deps_type=DjangoDeps, output_type=PublishResult, instructions="Publish content and create social media posts" ) # Add tools to agents @analyzer.tool async def get_content_text(ctx: RunContext[DjangoDeps], content_id: int) -> str: content = await Content.objects.aget(id=content_id) return content.text @enhancer.tool async def save_enhanced_content(ctx: RunContext[DjangoDeps], content_id: int, enhanced_text: str) -> str: content = await Content.objects.aget(id=content_id) content.enhanced_text = enhanced_text await content.asave() return f"Enhanced content saved for {content_id}" @publisher.tool async def publish_to_cms(ctx: RunContext[DjangoDeps], content_id: int) -> str: content = await Content.objects.aget(id=content_id) content.status = "published" content.published_at = timezone.now() await content.asave() return f"Published content {content_id} to CMS" # Create orchestrator and register agents orchestrator = SimpleOrchestrator[DjangoDeps]() orchestrator.register_agent(analyzer) orchestrator.register_agent(enhancer) orchestrator.register_agent(publisher)

Using the Orchestrator

In Django Views

async def process_content_pipeline(request, content_id): """Process content through full pipeline.""" # Create dependencies deps = DjangoDeps(user_id=request.user.id) try: # Execute full pipeline result = await orchestrator.execute( pattern="sequential", agents=["content_analyzer", "content_enhancer", "content_publisher"], prompt=f"Process content {content_id} through full publication pipeline", deps=deps ) # Check if workflow completed successfully if result.status == "completed": return JsonResponse({ 'success': True, 'workflow_status': result.status, 'final_result': result.final_output.dict(), 'steps_completed': len(result.step_results), 'total_execution_time': result.execution_time }) else: return JsonResponse({ 'success': False, 'error': result.error_message, 'failed_at_step': result.failed_step }, status=500) except Exception as e: logger.error(f"Pipeline execution failed: {e}") return JsonResponse({ 'success': False, 'error': 'Pipeline execution failed' }, status=500)

Background Task Processing

from django_cfg.modules.django_tasks import task @task async def process_content_background(content_id: int, user_id: int): """Process content in background.""" deps = DjangoDeps(user_id=user_id) try: result = await orchestrator.execute( pattern="sequential", agents=["content_analyzer", "content_enhancer"], prompt=f"Analyze and enhance content {content_id}", deps=deps ) # Send notification to user if result.status == "completed": await send_notification( user_id=user_id, message=f"Content {content_id} processing completed", data=result.final_output.dict() ) except Exception as e: logger.error(f"Background processing failed: {e}") await send_notification( user_id=user_id, message=f"Content {content_id} processing failed: {e}", error=True ) # Usage in view async def start_background_processing(request, content_id): # Queue background task await process_content_background.send( content_id=content_id, user_id=request.user.id ) return JsonResponse({ 'message': 'Processing started in background', 'content_id': content_id })

Workflow Configuration

Dynamic Workflows

from django_cfg.modules.django_orchestrator import WorkflowConfig # Define workflow configurations CONTENT_WORKFLOWS = { "quick_analysis": WorkflowConfig( name="quick_analysis", steps=["content_analyzer"], strategy="sequential", description="Quick content analysis only" ), "full_pipeline": WorkflowConfig( name="full_pipeline", steps=["content_analyzer", "content_enhancer", "content_publisher"], strategy="sequential", description="Complete content processing pipeline" ), "analysis_and_enhancement": WorkflowConfig( name="analysis_and_enhancement", steps=["content_analyzer", "content_enhancer"], strategy="sequential", description="Analyze and enhance content without publishing" ) } async def execute_workflow(request, workflow_name: str, content_id: int): """Execute predefined workflow.""" if workflow_name not in CONTENT_WORKFLOWS: return JsonResponse({'error': 'Unknown workflow'}, status=400) workflow = CONTENT_WORKFLOWS[workflow_name] deps = DjangoDeps(user_id=request.user.id) result = await orchestrator.execute( pattern=workflow.strategy, agents=workflow.steps, prompt=f"Execute {workflow.description} for content {content_id}", deps=deps ) return JsonResponse({ 'workflow': workflow.name, 'status': result.status, 'result': result.final_output.dict() })

Error Handling and Recovery

Handling Agent Failures

async def robust_pipeline_execution(content_id: int, user_id: int): """Execute pipeline with error handling.""" deps = DjangoDeps(user_id=user_id) try: result = await orchestrator.execute( pattern="sequential", agents=["content_analyzer", "content_enhancer", "content_publisher"], prompt=f"Process content {content_id}", deps=deps ) return result except ExecutionError as e: logger.error(f"Pipeline failed: {e}") # Try fallback workflow (analysis only) try: fallback_result = await orchestrator.execute( pattern="sequential", agents=["content_analyzer"], prompt=f"Analyze content {content_id} (fallback)", deps=deps ) logger.info(f"Fallback analysis completed for content {content_id}") return fallback_result except ExecutionError as fallback_error: logger.error(f"Fallback also failed: {fallback_error}") raise

Monitoring and Metrics

Orchestrator Metrics

# Get orchestrator performance metrics metrics = orchestrator.get_metrics() print(f"Total workflows executed: {metrics['total_executions']}") print(f"Success rate: {metrics['success_rate']:.1%}") print(f"Average execution time: {metrics['avg_execution_time']:.2f}s") print(f"Most used workflow: {metrics['most_used_pattern']}") # Agent-specific metrics within orchestrator for agent_name, agent_metrics in metrics['agent_metrics'].items(): print(f"{agent_name}:") print(f" - Executions: {agent_metrics['executions']}") print(f" - Avg time: {agent_metrics['avg_time']:.2f}s") print(f" - Error rate: {agent_metrics['error_rate']:.1%}")

Workflow Tracking in Admin

The Django admin interface shows:

  1. WorkflowExecution records:

    • Workflow name and configuration
    • Start/end times
    • Overall status
    • Final output
  2. AgentExecution records (linked to workflow):

    • Individual agent performance
    • Step-by-step execution details
    • Token usage and costs

Best Practices

✅ Good Practices

# 1. Create orchestrator once, reuse orchestrator = SimpleOrchestrator[DjangoDeps]() # Module level # 2. Register all agents at startup orchestrator.register_agent(analyzer) orchestrator.register_agent(enhancer) # 3. Use meaningful agent names agents=["content_analyzer", "seo_optimizer", "social_publisher"] # 4. Handle errors gracefully try: result = await orchestrator.execute(...) except ExecutionError as e: # Handle specific error pass # 5. Monitor performance metrics = orchestrator.get_metrics()

❌ Bad Practices

# 1. Creating orchestrator in views def my_view(request): orchestrator = SimpleOrchestrator() # Slow! # 2. Not handling errors result = await orchestrator.execute(...) # Can crash # 3. Using unclear agent names agents=["agent1", "agent2", "agent3"] # Confusing # 4. Not monitoring performance # No metrics collection or monitoring

What’s Next?