"""Template Agent Factory - Dynamic agent creation and registry management.""" import logging import asyncio from typing import Dict, List, Optional, Any, Set, Type from datetime import datetime from ..core.base_agent import BaseAgent, AgentMetadata from ..models.analysis_templates import ( TemplateRegistry, AnalysisTemplate, TemplateSet, TemplateType ) from ..services.deepseek_service import DeepSeekService from .unified_analysis_agent import UnifiedAnalysisAgent, UnifiedAgentConfig logger = logging.getLogger(__name__) class AgentInstance: """Lightweight agent instance for registry tracking.""" def __init__( self, agent: UnifiedAnalysisAgent, template_id: str, created_at: Optional[datetime] = None ): self.agent = agent self.template_id = template_id self.created_at = created_at or datetime.utcnow() self.last_used = created_at or datetime.utcnow() self.usage_count = 0 self.is_active = True def update_usage(self) -> None: """Update usage statistics.""" self.usage_count += 1 self.last_used = datetime.utcnow() def to_dict(self) -> Dict[str, Any]: """Convert to dictionary representation.""" return { "agent_id": self.agent.agent_id, "template_id": self.template_id, "template_name": self.agent.template.name, "capabilities": self.agent.get_capabilities(), "created_at": self.created_at.isoformat(), "last_used": self.last_used.isoformat(), "usage_count": self.usage_count, "is_active": self.is_active, "performance_metrics": self.agent.get_performance_metrics() } class TemplateAgentFactory: """ Factory for creating and managing template-driven analysis agents. Features: - Dynamic agent creation from templates - Agent lifecycle management - Capability-based agent discovery - Performance monitoring and optimization - Template-set orchestration support """ def __init__( self, template_registry: TemplateRegistry, ai_service: Optional[DeepSeekService] = None, max_agents_per_template: int = 3, agent_ttl_minutes: int = 60 ): """Initialize the template agent factory. Args: template_registry: Registry containing analysis templates ai_service: AI service for agent creation max_agents_per_template: Maximum agent instances per template agent_ttl_minutes: Agent time-to-live for cleanup """ self.template_registry = template_registry self.ai_service = ai_service or DeepSeekService() self.max_agents_per_template = max_agents_per_template self.agent_ttl_minutes = agent_ttl_minutes # Agent instance registry self._agent_instances: Dict[str, AgentInstance] = {} # agent_id -> AgentInstance self._template_agents: Dict[str, Set[str]] = {} # template_id -> set of agent_ids self._capability_index: Dict[str, Set[str]] = {} # capability -> set of agent_ids # Factory statistics self._created_count = 0 self._destroyed_count = 0 self._factory_start_time = datetime.utcnow() logger.info("TemplateAgentFactory initialized") async def create_agent( self, template_id: str, config: Optional[UnifiedAgentConfig] = None, force_new: bool = False ) -> UnifiedAnalysisAgent: """Create or retrieve an agent for the specified template. Args: template_id: Template identifier config: Optional agent configuration force_new: Force creation of new agent instance Returns: Configured UnifiedAnalysisAgent Raises: ValueError: If template not found or inactive """ template = self.template_registry.get_template(template_id) if not template: raise ValueError(f"Template not found: {template_id}") if not template.is_active: raise ValueError(f"Template is inactive: {template_id}") # Check if we can reuse an existing agent if not force_new: existing_agent = self._get_available_agent(template_id) if existing_agent: logger.debug(f"Reusing existing agent for template {template_id}") return existing_agent # Check agent limits if not force_new and self._template_agents.get(template_id, set()): if len(self._template_agents[template_id]) >= self.max_agents_per_template: # Try to reuse least recently used agent lru_agent = self._get_lru_agent(template_id) if lru_agent: logger.debug(f"Reusing LRU agent for template {template_id}") return lru_agent # Create new agent instance agent = UnifiedAnalysisAgent( template=template, ai_service=self.ai_service, template_registry=self.template_registry, config=config ) # Initialize the agent await agent.initialize() # Register the agent instance await self._register_agent_instance(agent, template_id) self._created_count += 1 logger.info(f"Created new agent for template {template_id}: {agent.agent_id}") return agent async def create_agent_set( self, template_set_id: str, config: Optional[Dict[str, UnifiedAgentConfig]] = None ) -> Dict[str, UnifiedAnalysisAgent]: """Create agents for all templates in a template set. Args: template_set_id: Template set identifier config: Optional per-template agent configurations Returns: Dictionary mapping template_id to agent instances """ template_set = self.template_registry.get_template_set(template_set_id) if not template_set: raise ValueError(f"Template set not found: {template_set_id}") if not template_set.is_active: raise ValueError(f"Template set is inactive: {template_set_id}") agents = {} config = config or {} # Create agents for all templates in the set creation_tasks = [] for template_id in template_set.templates: template_config = config.get(template_id) task = self.create_agent(template_id, template_config) creation_tasks.append((template_id, task)) # Create agents concurrently for template_id, task in creation_tasks: try: agent = await task agents[template_id] = agent except Exception as e: logger.error(f"Failed to create agent for template {template_id}: {e}") # Continue with other agents logger.info(f"Created {len(agents)} agents for template set {template_set_id}") return agents def get_agent_by_id(self, agent_id: str) -> Optional[UnifiedAnalysisAgent]: """Get agent by ID.""" instance = self._agent_instances.get(agent_id) return instance.agent if instance else None def get_agents_by_template(self, template_id: str) -> List[UnifiedAnalysisAgent]: """Get all agents for a specific template.""" agent_ids = self._template_agents.get(template_id, set()) return [ self._agent_instances[aid].agent for aid in agent_ids if aid in self._agent_instances ] def get_agents_by_capability( self, capability: str, limit: int = 5, prefer_idle: bool = True ) -> List[UnifiedAnalysisAgent]: """Get agents that have the specified capability. Args: capability: Required capability limit: Maximum number of agents to return prefer_idle: Prefer agents that haven't been used recently Returns: List of capable agents, sorted by preference """ agent_ids = self._capability_index.get(capability, set()) candidates = [ self._agent_instances[aid] for aid in agent_ids if aid in self._agent_instances and self._agent_instances[aid].is_active ] if not candidates: return [] # Sort by preference criteria def sort_key(instance: AgentInstance): # Prefer less recently used agents time_since_use = (datetime.utcnow() - instance.last_used).total_seconds() return (-time_since_use, instance.usage_count) if prefer_idle: candidates.sort(key=sort_key) return [instance.agent for instance in candidates[:limit]] def find_best_agent_for_task( self, required_capabilities: List[str], template_type: Optional[TemplateType] = None ) -> Optional[UnifiedAnalysisAgent]: """Find the best agent for a task with specific requirements. Args: required_capabilities: List of required capabilities template_type: Preferred template type Returns: Best matching agent or None """ candidates = [] # Get agents with required capabilities capability_agents = set() for capability in required_capabilities: if capability in self._capability_index: if not capability_agents: capability_agents = self._capability_index[capability].copy() else: capability_agents &= self._capability_index[capability] # Filter by template type if specified for agent_id in capability_agents: instance = self._agent_instances.get(agent_id) if instance and instance.is_active: if template_type is None or instance.agent.template.template_type == template_type: candidates.append(instance) if not candidates: return None # Score candidates def score_agent(instance: AgentInstance) -> float: agent = instance.agent score = 0.0 # Capability match completeness (40%) agent_caps = set(agent.get_capabilities()) required_caps = set(required_capabilities) match_ratio = len(agent_caps & required_caps) / len(required_caps) score += match_ratio * 0.4 # Usage-based load balancing (30%) # Prefer less used agents max_usage = max(i.usage_count for i in candidates) if max_usage > 0: usage_score = 1.0 - (instance.usage_count / max_usage) else: usage_score = 1.0 score += usage_score * 0.3 # Performance metrics (30%) metrics = agent.get_performance_metrics() avg_confidence = metrics.get('average_confidence', 0.5) score += avg_confidence * 0.3 return score # Return highest scoring agent best_instance = max(candidates, key=score_agent) return best_instance.agent def get_factory_statistics(self) -> Dict[str, Any]: """Get comprehensive factory statistics.""" active_agents = len([i for i in self._agent_instances.values() if i.is_active]) # Template distribution template_distribution = {} for template_id, agent_ids in self._template_agents.items(): active_count = len([aid for aid in agent_ids if aid in self._agent_instances and self._agent_instances[aid].is_active]) template_distribution[template_id] = active_count # Capability coverage capability_coverage = { cap: len(agents) for cap, agents in self._capability_index.items() } # Usage statistics total_usage = sum(i.usage_count for i in self._agent_instances.values()) avg_usage = total_usage / max(len(self._agent_instances), 1) uptime = (datetime.utcnow() - self._factory_start_time).total_seconds() return { "factory_uptime_seconds": uptime, "total_agents_created": self._created_count, "total_agents_destroyed": self._destroyed_count, "active_agents": active_agents, "total_registered_agents": len(self._agent_instances), "template_distribution": template_distribution, "capability_coverage": capability_coverage, "total_usage_count": total_usage, "average_usage_per_agent": avg_usage, "max_agents_per_template": self.max_agents_per_template, "agent_ttl_minutes": self.agent_ttl_minutes } async def cleanup_stale_agents(self) -> int: """Clean up agents that haven't been used recently. Returns: Number of agents cleaned up """ cutoff_time = datetime.utcnow().timestamp() - (self.agent_ttl_minutes * 60) stale_agents = [] for agent_id, instance in self._agent_instances.items(): if instance.last_used.timestamp() < cutoff_time: stale_agents.append(agent_id) cleanup_count = 0 for agent_id in stale_agents: if await self._deregister_agent_instance(agent_id): cleanup_count += 1 if cleanup_count > 0: logger.info(f"Cleaned up {cleanup_count} stale agents") return cleanup_count async def shutdown_all_agents(self) -> None: """Shutdown all agent instances.""" logger.info("Shutting down all agent instances") shutdown_tasks = [] for instance in self._agent_instances.values(): if instance.is_active: task = instance.agent.shutdown() shutdown_tasks.append(task) # Shutdown all agents concurrently if shutdown_tasks: await asyncio.gather(*shutdown_tasks, return_exceptions=True) # Clear registries self._agent_instances.clear() self._template_agents.clear() self._capability_index.clear() logger.info("All agent instances shut down") # Private helper methods def _get_available_agent(self, template_id: str) -> Optional[UnifiedAnalysisAgent]: """Get an available agent for the template.""" agent_ids = self._template_agents.get(template_id, set()) if not agent_ids: return None # Find least recently used active agent available_instances = [ self._agent_instances[aid] for aid in agent_ids if aid in self._agent_instances and self._agent_instances[aid].is_active ] if not available_instances: return None # Return least recently used agent lru_instance = min(available_instances, key=lambda i: i.last_used) return lru_instance.agent def _get_lru_agent(self, template_id: str) -> Optional[UnifiedAnalysisAgent]: """Get the least recently used agent for the template.""" return self._get_available_agent(template_id) async def _register_agent_instance(self, agent: UnifiedAnalysisAgent, template_id: str) -> None: """Register an agent instance in the factory.""" instance = AgentInstance(agent, template_id) # Add to main registry self._agent_instances[agent.agent_id] = instance # Update template index if template_id not in self._template_agents: self._template_agents[template_id] = set() self._template_agents[template_id].add(agent.agent_id) # Update capability index for capability in agent.get_capabilities(): if capability not in self._capability_index: self._capability_index[capability] = set() self._capability_index[capability].add(agent.agent_id) logger.debug(f"Registered agent instance: {agent.agent_id}") async def _deregister_agent_instance(self, agent_id: str) -> bool: """Deregister an agent instance from the factory.""" instance = self._agent_instances.get(agent_id) if not instance: return False try: # Shutdown the agent if instance.is_active: await instance.agent.shutdown() # Remove from capability index for capability in instance.agent.get_capabilities(): if capability in self._capability_index: self._capability_index[capability].discard(agent_id) if not self._capability_index[capability]: del self._capability_index[capability] # Remove from template index template_id = instance.template_id if template_id in self._template_agents: self._template_agents[template_id].discard(agent_id) if not self._template_agents[template_id]: del self._template_agents[template_id] # Remove from main registry del self._agent_instances[agent_id] self._destroyed_count += 1 logger.debug(f"Deregistered agent instance: {agent_id}") return True except Exception as e: logger.error(f"Error deregistering agent {agent_id}: {e}") return False # Global factory instance for easy access _factory_instance: Optional[TemplateAgentFactory] = None def get_template_agent_factory( template_registry: Optional[TemplateRegistry] = None, ai_service: Optional[DeepSeekService] = None ) -> TemplateAgentFactory: """Get or create the global template agent factory.""" global _factory_instance if _factory_instance is None: from .template_defaults import DEFAULT_REGISTRY registry = template_registry or DEFAULT_REGISTRY _factory_instance = TemplateAgentFactory(registry, ai_service) return _factory_instance async def shutdown_template_agent_factory() -> None: """Shutdown the global template agent factory.""" global _factory_instance if _factory_instance: await _factory_instance.shutdown_all_agents() _factory_instance = None