"""Local base agent implementation for YouTube Summarizer unified analysis system.""" from typing import Dict, List, Any, Optional from datetime import datetime from pydantic import BaseModel from enum import Enum import uuid import logging logger = logging.getLogger(__name__) class AgentStatus(str, Enum): """Agent status states.""" INITIALIZING = "initializing" READY = "ready" BUSY = "busy" ERROR = "error" SHUTDOWN = "shutdown" class AgentMetadata(BaseModel): """Agent metadata information.""" agent_id: str name: str description: str version: str = "1.0.0" capabilities: List[str] = [] created_at: datetime = None def __init__(self, **data): if 'created_at' not in data: data['created_at'] = datetime.utcnow() super().__init__(**data) class AgentConfig(BaseModel): """Agent configuration settings.""" max_concurrent_tasks: int = 1 timeout_seconds: int = 300 retry_attempts: int = 3 enable_logging: bool = True custom_settings: Dict[str, Any] = {} class AgentState(BaseModel): """Agent runtime state.""" status: AgentStatus = AgentStatus.INITIALIZING current_task: Optional[str] = None active_tasks: List[str] = [] completed_tasks: int = 0 error_count: int = 0 last_activity: datetime = None performance_metrics: Dict[str, Any] = {} def __init__(self, **data): if 'last_activity' not in data: data['last_activity'] = datetime.utcnow() super().__init__(**data) class AgentContext(BaseModel): """Context for agent task execution.""" task_id: str request_data: Dict[str, Any] = {} user_context: Dict[str, Any] = {} execution_context: Dict[str, Any] = {} started_at: datetime = None def __init__(self, **data): if 'started_at' not in data: data['started_at'] = datetime.utcnow() super().__init__(**data) class TaskResult(BaseModel): """Result of agent task execution.""" task_id: str success: bool = True result: Dict[str, Any] = {} error: Optional[str] = None processing_time_seconds: float = 0.0 metadata: Dict[str, Any] = {} completed_at: datetime = None def __init__(self, **data): if 'completed_at' not in data: data['completed_at'] = datetime.utcnow() super().__init__(**data) class BaseAgent: """ Base agent class providing core functionality for agent implementations. This is a simplified local implementation that provides the same interface as the AI Assistant Library BaseAgent but without external dependencies. """ def __init__(self, metadata: AgentMetadata, config: Optional[AgentConfig] = None): """ Initialize the base agent. Args: metadata: Agent metadata information config: Optional agent configuration """ self.metadata = metadata self.config = config or AgentConfig() self.state = AgentState() # Initialize logger self.logger = logging.getLogger(f"agent.{self.metadata.agent_id}") if self.config.enable_logging: self.logger.setLevel(logging.INFO) # Set status to ready after initialization self.state.status = AgentStatus.READY self.logger.info(f"Agent {self.metadata.name} initialized successfully") async def execute_task(self, context: AgentContext) -> TaskResult: """ Execute a task with the given context. Args: context: Task execution context Returns: TaskResult: Result of task execution """ start_time = datetime.utcnow() self.state.current_task = context.task_id self.state.status = AgentStatus.BUSY self.state.last_activity = start_time try: # Add to active tasks if context.task_id not in self.state.active_tasks: self.state.active_tasks.append(context.task_id) # Call the implementation-specific execution logic result = await self._execute_task_impl(context) # Update state on success self.state.completed_tasks += 1 self.state.status = AgentStatus.READY self.state.current_task = None # Remove from active tasks if context.task_id in self.state.active_tasks: self.state.active_tasks.remove(context.task_id) # Calculate processing time end_time = datetime.utcnow() processing_time = (end_time - start_time).total_seconds() return TaskResult( task_id=context.task_id, success=True, result=result, processing_time_seconds=processing_time, completed_at=end_time ) except Exception as e: # Update state on error self.state.error_count += 1 self.state.status = AgentStatus.ERROR self.state.current_task = None # Remove from active tasks if context.task_id in self.state.active_tasks: self.state.active_tasks.remove(context.task_id) end_time = datetime.utcnow() processing_time = (end_time - start_time).total_seconds() self.logger.error(f"Task {context.task_id} failed: {e}") return TaskResult( task_id=context.task_id, success=False, error=str(e), processing_time_seconds=processing_time, completed_at=end_time ) async def _execute_task_impl(self, context: AgentContext) -> Dict[str, Any]: """ Implementation-specific task execution logic. Must be overridden by subclasses. Args: context: Task execution context Returns: Dict containing task results """ raise NotImplementedError("Subclasses must implement _execute_task_impl") def get_status(self) -> AgentState: """Get current agent status.""" self.state.last_activity = datetime.utcnow() return self.state def get_metadata(self) -> AgentMetadata: """Get agent metadata.""" return self.metadata def get_config(self) -> AgentConfig: """Get agent configuration.""" return self.config def get_capabilities(self) -> List[str]: """Get agent capabilities.""" return self.metadata.capabilities def is_available(self) -> bool: """Check if agent is available for new tasks.""" return ( self.state.status == AgentStatus.READY and len(self.state.active_tasks) < self.config.max_concurrent_tasks ) def get_performance_metrics(self) -> Dict[str, Any]: """Get agent performance metrics.""" return { **self.state.performance_metrics, "completed_tasks": self.state.completed_tasks, "error_count": self.state.error_count, "error_rate": self.state.error_count / max(1, self.state.completed_tasks + self.state.error_count), "active_tasks": len(self.state.active_tasks), "status": self.state.status, "uptime_seconds": (datetime.utcnow() - self.metadata.created_at).total_seconds() } async def shutdown(self): """Gracefully shutdown the agent.""" self.state.status = AgentStatus.SHUTDOWN self.logger.info(f"Agent {self.metadata.name} shutdown") def generate_task_id() -> str: """Generate a unique task ID.""" return str(uuid.uuid4())