youtube-summarizer/backend/services/enhanced_orchestrator.py

667 lines
26 KiB
Python

"""Enhanced Multi-Agent Orchestrator with template and registry integration."""
import asyncio
import logging
from typing import Dict, List, Optional, Any, Union
from datetime import datetime
from sqlalchemy.orm import Session
from ..core.base_agent import AgentState, AgentContext
from ..models.analysis_templates import TemplateSet, TemplateRegistry, TemplateType
from ..models.agent_models import AgentSummary
from ..services.deepseek_service import DeepSeekService
from .template_agent_factory import TemplateAgentFactory, get_template_agent_factory
from .unified_analysis_agent import UnifiedAnalysisAgent
from .template_driven_agent import TemplateAnalysisResult
logger = logging.getLogger(__name__)
class OrchestrationConfig:
    """Settings bundle consumed by the multi-agent orchestrator.

    Attributes:
        parallel_execution: Orchestrator-side switch for parallel runs; a
            template set is only run in parallel when its own flag is also
            truthy.
        synthesis_enabled: Gates the synthesis step of template-set
            orchestration.
        max_concurrent_agents: Size of the semaphore that bounds how many
            agents execute at once in parallel mode.
        timeout_seconds: Timeout applied to the whole parallel batch.
        quality_threshold: Stored as given; not read anywhere in the
            visible part of this module.
        enable_database_persistence: When false, saving orchestration
            results to the database is skipped.
    """

    def __init__(
        self,
        parallel_execution: bool = True,
        synthesis_enabled: bool = True,
        max_concurrent_agents: int = 4,
        timeout_seconds: int = 300,
        quality_threshold: float = 0.7,
        enable_database_persistence: bool = True
    ):
        # Feature switches.
        self.parallel_execution, self.synthesis_enabled = (
            parallel_execution,
            synthesis_enabled,
        )
        # Execution limits.
        self.max_concurrent_agents, self.timeout_seconds = (
            max_concurrent_agents,
            timeout_seconds,
        )
        # Quality and persistence knobs.
        self.quality_threshold, self.enable_database_persistence = (
            quality_threshold,
            enable_database_persistence,
        )
class OrchestrationResult:
    """Outcome of a single orchestration job.

    Holds the per-template analysis results, the optional synthesis
    result, timing, success/error status and free-form metadata. A naive
    UTC ``timestamp`` is recorded when the instance is created.
    """

    def __init__(
        self,
        job_id: str,
        template_set_id: str,
        results: Dict[str, TemplateAnalysisResult],
        synthesis_result: Optional[TemplateAnalysisResult] = None,
        processing_time_seconds: float = 0.0,
        success: bool = True,
        error: Optional[str] = None,
        metadata: Optional[Dict[str, Any]] = None
    ):
        # Identity of the run.
        self.job_id = job_id
        self.template_set_id = template_set_id

        # Analysis payloads, keyed by template id, plus optional synthesis.
        self.results = results
        self.synthesis_result = synthesis_result

        # Timing and status.
        self.processing_time_seconds = processing_time_seconds
        self.success = success
        self.error = error

        # A missing (or empty) metadata argument becomes a fresh dict.
        self.metadata = metadata if metadata else {}
        self.timestamp = datetime.utcnow()

    def to_dict(self) -> Dict[str, Any]:
        """Return a plain-dict view of this result.

        Each analysis result (and the synthesis result, when present) is
        converted through its own ``dict()`` method; the timestamp is
        rendered as an ISO-8601 string.
        """
        serialized_results = {}
        for template_id, analysis in self.results.items():
            serialized_results[template_id] = analysis.dict()

        if self.synthesis_result:
            serialized_synthesis = self.synthesis_result.dict()
        else:
            serialized_synthesis = None

        payload = {
            "job_id": self.job_id,
            "template_set_id": self.template_set_id,
            "results": serialized_results,
            "synthesis_result": serialized_synthesis,
            "processing_time_seconds": self.processing_time_seconds,
            "success": self.success,
            "error": self.error,
            "metadata": self.metadata,
        }
        payload["timestamp"] = self.timestamp.isoformat()
        return payload
class EnhancedMultiAgentOrchestrator:
"""
Enhanced orchestrator for template-driven multi-agent analysis.
Features:
- Template-set-based orchestration
- Agent factory integration for dynamic agent creation
- Parallel and sequential execution modes
- Synthesis capability for unified results
- Database persistence and performance tracking
- Mixed perspective workflows (Educational + Domain)
"""
def __init__(
    self,
    template_registry: Optional[TemplateRegistry] = None,
    agent_factory: Optional[TemplateAgentFactory] = None,
    ai_service: Optional[DeepSeekService] = None,
    config: Optional[OrchestrationConfig] = None
):
    """Set up collaborators, job tracking and performance counters.

    Args:
        template_registry: Registry used for template and template-set
            lookups. Stored as given (may be None).
        agent_factory: Factory used to create agents. When omitted, one is
            obtained from ``get_template_agent_factory(template_registry)``.
        ai_service: AI service used for mixed-perspective synthesis. When
            omitted, a new ``DeepSeekService`` is constructed.
        config: Orchestration settings. When omitted, a default
            ``OrchestrationConfig`` is used.
    """
    self.template_registry = template_registry

    # Fall back to default collaborators for anything not supplied.
    if not agent_factory:
        agent_factory = get_template_agent_factory(template_registry)
    self.agent_factory = agent_factory

    if not ai_service:
        ai_service = DeepSeekService()
    self.ai_service = ai_service

    self.config = config if config else OrchestrationConfig()

    # Jobs currently in flight, keyed by job id.
    self._active_jobs: Dict[str, Dict[str, Any]] = {}

    # Running performance counters.
    self._total_orchestrations = 0
    self._successful_orchestrations = 0
    self._total_processing_time = 0.0

    logger.info("Enhanced Multi-Agent Orchestrator initialized")
async def orchestrate_template_set(
    self,
    job_id: str,
    template_set_id: str,
    content: str,
    context: Optional[Dict[str, Any]] = None,
    video_id: Optional[str] = None
) -> OrchestrationResult:
    """Orchestrate analysis using all templates in a template set.

    Ordinary failures (unknown or inactive template set, agent creation
    errors, a parallel-execution timeout, ...) are not raised; they are
    reported through a result with ``success=False`` and ``error`` set.
    Both successful and failed runs are counted in the performance
    metrics, and the job is always removed from the active-job table,
    including when the surrounding task is cancelled.

    Args:
        job_id: Unique job identifier
        template_set_id: Template set to use for analysis
        content: Content to analyze
        context: Additional context for templates
        video_id: Video ID if analyzing video content

    Returns:
        Orchestration result with all analysis outputs
    """
    start_time = datetime.utcnow()
    try:
        # Get template set
        template_set = self.template_registry.get_template_set(template_set_id)
        if not template_set:
            raise ValueError(f"Template set not found: {template_set_id}")
        if not template_set.is_active:
            raise ValueError(f"Template set is inactive: {template_set_id}")

        # Initialize job tracking
        self._active_jobs[job_id] = {
            "start_time": start_time,
            "template_set_id": template_set_id,
            "status": "running",
            "agents_count": len(template_set.templates)
        }

        # Create agents for the template set
        agents = await self.agent_factory.create_agent_set(template_set_id)

        # Prepare analysis state and context
        state = AgentState({
            "content": content,
            "transcript": content,  # Alias for compatibility
            "video_id": video_id,
            "context": context or {},
            "job_id": job_id,
            "template_set_id": template_set_id
        })
        agent_context = AgentContext(
            request_id=job_id,
            metadata={"template_set_id": template_set_id, "video_id": video_id}
        )

        # Parallel mode requires both the template set and the config to allow it
        run_parallel = template_set.parallel_execution and self.config.parallel_execution
        if run_parallel:
            results = await self._execute_agents_parallel(agents, state, agent_context)
        else:
            results = await self._execute_agents_sequential(agents, template_set, state, agent_context)

        # Perform synthesis if enabled and configured
        synthesis_result = None
        if self.config.synthesis_enabled and template_set.synthesis_template:
            synthesis_result = await self._synthesize_results(
                results, template_set, state, agent_context
            )

        # Calculate total processing time
        processing_time = (datetime.utcnow() - start_time).total_seconds()

        orchestration_result = OrchestrationResult(
            job_id=job_id,
            template_set_id=template_set_id,
            results=results,
            synthesis_result=synthesis_result,
            processing_time_seconds=processing_time,
            success=True,
            metadata={
                "agents_used": list(agents.keys()),
                "parallel_execution": run_parallel,
                "synthesis_performed": synthesis_result is not None
            }
        )
        self._update_performance_metrics(orchestration_result)
        logger.info(f"Orchestration {job_id} completed in {processing_time:.2f}s")
    except Exception as e:
        logger.error(f"Orchestration {job_id} failed: {e}")
        processing_time = (datetime.utcnow() - start_time).total_seconds()
        orchestration_result = OrchestrationResult(
            job_id=job_id,
            template_set_id=template_set_id,
            results={},
            processing_time_seconds=processing_time,
            success=False,
            error=str(e)
        )
        # Failed runs must be counted too; otherwise the total only ever
        # includes successes and the reported success rate is meaningless.
        self._update_performance_metrics(orchestration_result)
    finally:
        # Runs on success, failure and cancellation alike, so a cancelled
        # task cannot leave a stale entry in the active-job table.
        self._active_jobs.pop(job_id, None)
    return orchestration_result
async def orchestrate_mixed_perspectives(
    self,
    job_id: str,
    template_ids: List[str],
    content: str,
    context: Optional[Dict[str, Any]] = None,
    video_id: Optional[str] = None,
    enable_synthesis: bool = True
) -> OrchestrationResult:
    """Orchestrate analysis using a custom mix of templates.

    This allows combining Educational and Domain perspectives in a single
    workflow. Agents are always executed in parallel. Synthesis runs only
    when ``enable_synthesis`` is true and more than one agent produced a
    result.

    Ordinary failures (unknown or inactive template, agent creation
    errors, a timeout, ...) are not raised; they are reported through a
    result with ``success=False`` and ``error`` set. Both successful and
    failed runs are counted in the performance metrics, and the job is
    always removed from the active-job table, including when the
    surrounding task is cancelled.

    Args:
        job_id: Unique job identifier
        template_ids: List of template IDs to use
        content: Content to analyze
        context: Additional context for templates
        video_id: Video ID if analyzing video content
        enable_synthesis: Whether to synthesize results

    Returns:
        Orchestration result with all analysis outputs
    """
    start_time = datetime.utcnow()
    try:
        # Validate every template before any agent is created
        templates = {}
        for template_id in template_ids:
            template = self.template_registry.get_template(template_id)
            if not template:
                raise ValueError(f"Template not found: {template_id}")
            if not template.is_active:
                raise ValueError(f"Template is inactive: {template_id}")
            templates[template_id] = template

        # Initialize job tracking
        self._active_jobs[job_id] = {
            "start_time": start_time,
            "template_ids": template_ids,
            "status": "running",
            "agents_count": len(templates)
        }

        # Create agents for the templates
        agents = {}
        for template_id in template_ids:
            agent = await self.agent_factory.create_agent(template_id)
            agents[template_id] = agent

        # Prepare analysis state and context
        state = AgentState({
            "content": content,
            "transcript": content,
            "video_id": video_id,
            "context": context or {},
            "job_id": job_id,
            "mixed_perspectives": True
        })
        agent_context = AgentContext(
            request_id=job_id,
            metadata={"template_ids": template_ids, "video_id": video_id}
        )

        # Execute agents in parallel (mixed perspectives work well in parallel)
        results = await self._execute_agents_parallel(agents, state, agent_context)

        # Perform synthesis if enabled
        synthesis_result = None
        if enable_synthesis and len(results) > 1:
            synthesis_result = await self._synthesize_mixed_results(
                results, templates, state, agent_context
            )

        # Calculate total processing time
        processing_time = (datetime.utcnow() - start_time).total_seconds()

        orchestration_result = OrchestrationResult(
            job_id=job_id,
            template_set_id="mixed_perspectives",
            results=results,
            synthesis_result=synthesis_result,
            processing_time_seconds=processing_time,
            success=True,
            metadata={
                "template_ids": template_ids,
                "template_types": [t.template_type.value for t in templates.values()],
                "mixed_perspectives": True,
                "synthesis_performed": synthesis_result is not None
            }
        )
        self._update_performance_metrics(orchestration_result)
        logger.info(f"Mixed perspective orchestration {job_id} completed in {processing_time:.2f}s")
    except Exception as e:
        logger.error(f"Mixed perspective orchestration {job_id} failed: {e}")
        processing_time = (datetime.utcnow() - start_time).total_seconds()
        orchestration_result = OrchestrationResult(
            job_id=job_id,
            template_set_id="mixed_perspectives",
            results={},
            processing_time_seconds=processing_time,
            success=False,
            error=str(e)
        )
        # Failed runs must be counted too; otherwise the total only ever
        # includes successes and the reported success rate is meaningless.
        self._update_performance_metrics(orchestration_result)
    finally:
        # Runs on success, failure and cancellation alike, so a cancelled
        # task cannot leave a stale entry in the active-job table.
        self._active_jobs.pop(job_id, None)
    return orchestration_result
async def save_orchestration_to_database(
    self,
    orchestration_result: OrchestrationResult,
    summary_id: str,
    db: Session
) -> List[AgentSummary]:
    """Persist an orchestration's outputs as AgentSummary rows.

    One row is added per agent result, plus one row for the synthesis
    result when there is one, followed by a single commit. If anything
    fails, the error is logged, the session is rolled back and an empty
    list is returned.

    Args:
        orchestration_result: Results to save
        summary_id: Parent summary ID
        db: Database session

    Returns:
        The created AgentSummary records, or an empty list when
        persistence is disabled in the config or the save failed.
    """
    if not self.config.enable_database_persistence:
        return []

    try:
        saved = []

        # One record per individual agent result.
        for template_id, result in orchestration_result.results.items():
            agent_fields = dict(
                summary_id=summary_id,
                agent_name=result.template_name,
                agent_type=template_id,
                analysis_result=result.analysis,
                key_insights=result.key_insights,
                confidence_score=result.confidence_score,
                processing_time=result.processing_time_seconds,
                template_used=template_id,
                analysis_metadata={
                    "context_used": result.context_used,
                    "template_variables": result.template_variables,
                    "timestamp": result.timestamp.isoformat()
                },
            )
            record = AgentSummary(**agent_fields)
            db.add(record)
            saved.append(record)

        # Optional extra record carrying the synthesis output.
        synthesis = orchestration_result.synthesis_result
        if synthesis:
            synthesis_fields = dict(
                summary_id=summary_id,
                agent_name="Synthesis Agent",
                agent_type="synthesis",
                analysis_result=synthesis.analysis,
                key_insights=synthesis.key_insights,
                confidence_score=synthesis.confidence_score,
                processing_time=synthesis.processing_time_seconds,
                template_used=synthesis.template_id,
                analysis_metadata={
                    "orchestration_metadata": orchestration_result.metadata,
                    "total_processing_time": orchestration_result.processing_time_seconds,
                    "timestamp": orchestration_result.timestamp.isoformat()
                },
            )
            record = AgentSummary(**synthesis_fields)
            db.add(record)
            saved.append(record)

        db.commit()
        logger.info(f"Saved {len(saved)} agent summaries to database")
        return saved
    except Exception as e:
        logger.error(f"Failed to save orchestration results to database: {e}")
        db.rollback()
        return []
def get_active_orchestrations(self) -> Dict[str, Dict[str, Any]]:
    """Return a snapshot of the jobs currently being tracked.

    Each entry is a copy of the tracked job info with an added
    ``runtime_seconds`` field: the time elapsed since the job's
    ``start_time``, measured against the current naive UTC time. The
    tracked entries themselves are not modified.
    """
    snapshot = {}
    for job_id, job_info in self._active_jobs.items():
        entry = dict(job_info)
        elapsed = datetime.utcnow() - job_info["start_time"]
        entry["runtime_seconds"] = elapsed.total_seconds()
        snapshot[job_id] = entry
    return snapshot
def get_orchestration_statistics(self) -> Dict[str, Any]:
    """Summarise the orchestrator's performance counters.

    The success rate is successful runs over total runs, and the average
    processing time is accumulated time over successful runs; both
    denominators are clamped to at least 1 so an idle orchestrator
    reports zeros rather than dividing by zero. The agent factory's own
    statistics are included under ``factory_statistics``.
    """
    total_runs = self._total_orchestrations
    successful_runs = self._successful_orchestrations
    accumulated_time = self._total_processing_time

    stats = {
        "total_orchestrations": total_runs,
        "successful_orchestrations": successful_runs,
        "success_rate": successful_runs / max(total_runs, 1),
        "active_orchestrations": len(self._active_jobs),
        "average_processing_time_seconds": accumulated_time / max(successful_runs, 1),
        "total_processing_time_seconds": accumulated_time,
    }
    stats["factory_statistics"] = self.agent_factory.get_factory_statistics()
    return stats
async def _execute_agents_parallel(
self,
agents: Dict[str, UnifiedAnalysisAgent],
state: AgentState,
context: AgentContext
) -> Dict[str, TemplateAnalysisResult]:
"""Execute agents in parallel."""
semaphore = asyncio.Semaphore(self.config.max_concurrent_agents)
async def execute_agent_with_semaphore(template_id: str, agent: UnifiedAnalysisAgent):
async with semaphore:
try:
updated_state = await agent.execute(state.copy(), context)
agent_key = f"agent_{template_id}"
return template_id, updated_state[agent_key]["result"]
except Exception as e:
logger.error(f"Agent {template_id} failed: {e}")
return template_id, None
# Execute all agents concurrently
tasks = [
execute_agent_with_semaphore(template_id, agent)
for template_id, agent in agents.items()
]
# Wait for all agents with timeout
try:
results = await asyncio.wait_for(
asyncio.gather(*tasks, return_exceptions=True),
timeout=self.config.timeout_seconds
)
except asyncio.TimeoutError:
logger.error("Agent execution timed out")
raise
# Process results
agent_results = {}
for template_id, result in results:
if result is not None and not isinstance(result, Exception):
# Convert dict back to TemplateAnalysisResult
if isinstance(result, dict):
agent_results[template_id] = TemplateAnalysisResult(**result)
else:
agent_results[template_id] = result
return agent_results
async def _execute_agents_sequential(
self,
agents: Dict[str, UnifiedAnalysisAgent],
template_set: TemplateSet,
state: AgentState,
context: AgentContext
) -> Dict[str, TemplateAnalysisResult]:
"""Execute agents in sequential order."""
agent_results = {}
execution_order = template_set.execution_order or list(agents.keys())
for template_id in execution_order:
if template_id not in agents:
continue
agent = agents[template_id]
try:
updated_state = await agent.execute(state.copy(), context)
agent_key = f"agent_{template_id}"
result = updated_state[agent_key]["result"]
# Convert dict to TemplateAnalysisResult if needed
if isinstance(result, dict):
agent_results[template_id] = TemplateAnalysisResult(**result)
else:
agent_results[template_id] = result
except Exception as e:
logger.error(f"Sequential agent {template_id} failed: {e}")
return agent_results
async def _synthesize_results(
self,
results: Dict[str, TemplateAnalysisResult],
template_set: TemplateSet,
state: AgentState,
context: AgentContext
) -> Optional[TemplateAnalysisResult]:
"""Synthesize results using the template set's synthesis template."""
if not template_set.synthesis_template:
return None
try:
# Create synthesis agent
synthesis_agent = await self.agent_factory.create_agent(
template_set.synthesis_template.id
)
# Prepare synthesis context
synthesis_state = state.copy()
for template_id, result in results.items():
synthesis_state[f"{template_id}_analysis"] = result.analysis
synthesis_state[f"{template_id}_insights"] = result.key_insights
# Execute synthesis
updated_state = await synthesis_agent.execute(synthesis_state, context)
agent_key = f"agent_{template_set.synthesis_template.id}"
result = updated_state[agent_key]["result"]
# Convert dict to TemplateAnalysisResult if needed
if isinstance(result, dict):
return TemplateAnalysisResult(**result)
else:
return result
except Exception as e:
logger.error(f"Synthesis failed: {e}")
return None
async def _synthesize_mixed_results(
    self,
    results: Dict[str, TemplateAnalysisResult],
    templates: Dict[str, Any],
    state: AgentState,
    context: AgentContext
) -> Optional[TemplateAnalysisResult]:
    """Synthesize results from mixed perspectives.

    Builds a synthesis prompt from the individual analyses and sends it
    straight to the AI service (no synthesis agent is involved). The
    reported processing time is the measured duration of this step.

    Args:
        results: Agent results keyed by template id
        templates: Templates keyed by template id (used for display names)
        state: Analysis state (not used by this step)
        context: Agent context (not used by this step)

    Returns:
        The synthesis result, or None if the synthesis step fails (the
        failure is logged).
    """
    started_at = datetime.utcnow()
    try:
        # Create a dynamic synthesis prompt
        synthesis_prompt = self._create_mixed_synthesis_prompt(results, templates)

        # Use AI service directly for synthesis
        synthesis_response = await self.ai_service.generate_summary({
            "prompt": synthesis_prompt,
            "system_prompt": "You are a synthesis expert combining multiple analytical perspectives.",
            "max_tokens": 2000,
            "temperature": 0.7
        })
        key_insights = self._extract_synthesis_insights(synthesis_response)

        # Report the real duration instead of a fixed placeholder value.
        elapsed_seconds = (datetime.utcnow() - started_at).total_seconds()

        return TemplateAnalysisResult(
            template_id="mixed_synthesis",
            template_name="Mixed Perspective Synthesis",
            analysis=synthesis_response,
            key_insights=key_insights,
            confidence_score=0.8,  # Default for synthesis
            processing_time_seconds=elapsed_seconds,
            context_used={},
            template_variables={}
        )
    except Exception as e:
        logger.error(f"Mixed synthesis failed: {e}")
        return None
def _create_mixed_synthesis_prompt(
self,
results: Dict[str, TemplateAnalysisResult],
templates: Dict[str, Any]
) -> str:
"""Create synthesis prompt for mixed perspectives."""
analyses = []
for template_id, result in results.items():
template = templates[template_id]
analyses.append(f"""
**{template.name} Analysis:**
{result.analysis}
**Key Insights:**
{chr(10).join(f"- {insight}" for insight in result.key_insights)}
""")
return f"""
Please synthesize the following multi-perspective analyses into a unified understanding:
{chr(10).join(analyses)}
Create a comprehensive synthesis that:
1. Identifies common themes across perspectives
2. Highlights unique insights from each viewpoint
3. Provides an integrated understanding
4. Offers actionable recommendations
Format as a structured analysis with clear sections.
"""
def _extract_synthesis_insights(self, synthesis_response: str) -> List[str]:
"""Extract key insights from synthesis response."""
insights = []
lines = synthesis_response.split('\n')
for line in lines:
line = line.strip()
if line.startswith('-') or line.startswith(''):
insight = line[1:].strip()
if len(insight) > 10:
insights.append(insight)
# Ensure we have at least some insights
if len(insights) < 3:
sentences = synthesis_response.split('.')
for sentence in sentences:
sentence = sentence.strip()
if len(sentence) > 20 and any(word in sentence.lower() for word in
['important', 'key', 'significant', 'synthesis']):
if len(insights) < 5:
insights.append(sentence)
return insights[:5] # Limit to 5 insights
def _update_performance_metrics(self, result: OrchestrationResult) -> None:
"""Update orchestration performance metrics."""
self._total_orchestrations += 1
if result.success:
self._successful_orchestrations += 1
self._total_processing_time += result.processing_time_seconds