""" Agent Framework Integration for YouTube Summarizer Provides compatibility with multiple agent frameworks and orchestration systems """ import asyncio import json import logging from abc import ABC, abstractmethod from typing import Any, Dict, List, Optional, Union, Callable from datetime import datetime from dataclasses import dataclass from enum import Enum # Framework-specific imports with graceful fallbacks try: # LangChain imports from langchain.agents import AgentExecutor, create_react_agent from langchain.schema import Document from langchain.memory import ConversationBufferMemory LANGCHAIN_AVAILABLE = True except ImportError: LANGCHAIN_AVAILABLE = False try: # CrewAI imports from crewai import Agent, Task, Crew CREWAI_AVAILABLE = True except ImportError: CREWAI_AVAILABLE = False try: # AutoGen imports from autogen import AssistantAgent, UserProxyAgent, GroupChat, GroupChatManager AUTOGEN_AVAILABLE = True except ImportError: AUTOGEN_AVAILABLE = False # Backend service imports try: from ..services.dual_transcript_service import DualTranscriptService from ..services.summary_pipeline import SummaryPipeline from ..services.batch_processing_service import BatchProcessingService from .langchain_tools import get_youtube_langchain_tools BACKEND_SERVICES_AVAILABLE = True except ImportError: BACKEND_SERVICES_AVAILABLE = False logger = logging.getLogger(__name__) class FrameworkType(Enum): """Supported agent frameworks""" LANGCHAIN = "langchain" CREWAI = "crewai" AUTOGEN = "autogen" CUSTOM = "custom" @dataclass class AgentCapabilities: """Define agent capabilities and requirements""" can_extract_transcripts: bool = True can_summarize_videos: bool = True can_batch_process: bool = True can_search_content: bool = True requires_async: bool = True max_concurrent_videos: int = 5 supported_video_length_minutes: int = 180 @dataclass class AgentContext: """Context information for agent operations""" user_id: Optional[str] = None session_id: Optional[str] = None preferences: Dict[str, Any] = None rate_limits: Dict[str, int] = None cost_budget: Optional[float] = None class BaseYouTubeAgent(ABC): """Abstract base class for YouTube summarizer agents""" def __init__(self, framework_type: FrameworkType, capabilities: AgentCapabilities = None): self.framework_type = framework_type self.capabilities = capabilities or AgentCapabilities() self.context = AgentContext() self._initialize_services() def _initialize_services(self): """Initialize backend services""" if BACKEND_SERVICES_AVAILABLE: try: self.transcript_service = DualTranscriptService() self.batch_service = BatchProcessingService() # Summary pipeline requires dependency injection in real implementation self.pipeline_service = None except Exception as e: logger.warning(f"Could not initialize services: {e}") self.transcript_service = None self.batch_service = None self.pipeline_service = None else: self.transcript_service = None self.batch_service = None self.pipeline_service = None @abstractmethod async def process_video(self, video_url: str, task_type: str = "summarize", **kwargs) -> Dict[str, Any]: """Process a single video""" pass @abstractmethod async def process_batch(self, video_urls: List[str], task_type: str = "summarize", **kwargs) -> Dict[str, Any]: """Process multiple videos in batch""" pass @abstractmethod def set_context(self, context: AgentContext): """Set agent context and preferences""" pass class LangChainYouTubeAgent(BaseYouTubeAgent): """LangChain-compatible YouTube agent""" def __init__(self, llm=None, tools=None, memory=None): super().__init__(FrameworkType.LANGCHAIN) self.llm = llm self.tools = tools or (get_youtube_langchain_tools() if LANGCHAIN_AVAILABLE else []) self.memory = memory or (ConversationBufferMemory(memory_key="chat_history") if LANGCHAIN_AVAILABLE else None) self.agent_executor = None if LANGCHAIN_AVAILABLE and self.llm: self._create_agent_executor() def _create_agent_executor(self): """Create LangChain agent executor""" try: if LANGCHAIN_AVAILABLE: agent = create_react_agent( llm=self.llm, tools=self.tools, prompt=self._get_agent_prompt() ) self.agent_executor = AgentExecutor( agent=agent, tools=self.tools, memory=self.memory, verbose=True, max_iterations=5 ) except Exception as e: logger.error(f"Failed to create LangChain agent: {e}") def _get_agent_prompt(self): """Get agent prompt template""" return """You are a YouTube video processing assistant with advanced capabilities. You have access to the following tools: - youtube_transcript: Extract transcripts from YouTube videos - youtube_summarize: Generate AI summaries of videos - youtube_batch: Process multiple videos in batch - youtube_search: Search processed videos and summaries Always use the appropriate tool for the user's request and provide comprehensive, well-structured responses. {tools} Use the following format: Question: the input question you must answer Thought: you should always think about what to do Action: the action to take, should be one of [{tool_names}] Action Input: the input to the action Observation: the result of the action ... (this Thought/Action/Action Input/Observation can repeat N times) Thought: I now know the final answer Final Answer: the final answer to the original input question Begin! Question: {input} Thought:{agent_scratchpad}""" async def process_video(self, video_url: str, task_type: str = "summarize", **kwargs) -> Dict[str, Any]: """Process a single video using LangChain agent""" try: if self.agent_executor: query = self._build_query(video_url, task_type, **kwargs) result = await self.agent_executor.ainvoke({"input": query}) return { "success": True, "result": result.get("output", ""), "agent_type": "langchain", "task_type": task_type } else: # Fallback to direct tool usage return await self._direct_tool_process(video_url, task_type, **kwargs) except Exception as e: logger.error(f"LangChain agent processing error: {e}") return {"success": False, "error": str(e)} async def process_batch(self, video_urls: List[str], task_type: str = "summarize", **kwargs) -> Dict[str, Any]: """Process multiple videos using batch tool""" try: if self.tools and len(self.tools) > 2: # Assuming batch tool is third batch_tool = self.tools[2] # YouTubeBatchTool result = await batch_tool._arun( video_urls=video_urls, processing_type=task_type, **kwargs ) return { "success": True, "result": result, "agent_type": "langchain", "task_type": "batch" } else: return {"success": False, "error": "Batch tool not available"} except Exception as e: logger.error(f"LangChain batch processing error: {e}") return {"success": False, "error": str(e)} def _build_query(self, video_url: str, task_type: str, **kwargs) -> str: """Build query for LangChain agent""" if task_type == "transcribe": source = kwargs.get("source", "youtube") return f"Extract transcript from {video_url} using {source} method" elif task_type == "summarize": summary_type = kwargs.get("summary_type", "comprehensive") return f"Create a {summary_type} summary of the YouTube video at {video_url}" else: return f"Process YouTube video {video_url} for task: {task_type}" async def _direct_tool_process(self, video_url: str, task_type: str, **kwargs) -> Dict[str, Any]: """Direct tool processing fallback""" try: if task_type == "transcribe" and self.tools: tool = self.tools[0] # YouTubeTranscriptTool result = await tool._arun(video_url=video_url, **kwargs) elif task_type == "summarize" and len(self.tools) > 1: tool = self.tools[1] # YouTubeSummarizationTool result = await tool._arun(video_url=video_url, **kwargs) else: result = json.dumps({"error": "Tool not available"}) return { "success": True, "result": result, "method": "direct_tool" } except Exception as e: return {"success": False, "error": str(e)} def set_context(self, context: AgentContext): """Set agent context""" self.context = context class CrewAIYouTubeAgent(BaseYouTubeAgent): """CrewAI-compatible YouTube agent""" def __init__(self, role="YouTube Specialist", goal="Process YouTube videos efficiently", backstory="Expert in video content analysis"): super().__init__(FrameworkType.CREWAI) self.role = role self.goal = goal self.backstory = backstory self.crew_agent = None if CREWAI_AVAILABLE: self._create_crew_agent() def _create_crew_agent(self): """Create CrewAI agent""" try: if CREWAI_AVAILABLE: self.crew_agent = Agent( role=self.role, goal=self.goal, backstory=self.backstory, verbose=True, allow_delegation=False, tools=self._get_crew_tools() ) except Exception as e: logger.error(f"Failed to create CrewAI agent: {e}") def _get_crew_tools(self): """Get tools adapted for CrewAI""" # CrewAI tools would need to be adapted from LangChain tools # This is a simplified representation return [] async def process_video(self, video_url: str, task_type: str = "summarize", **kwargs) -> Dict[str, Any]: """Process video using CrewAI""" try: if CREWAI_AVAILABLE and self.crew_agent: # Create a task for the agent task_description = self._build_task_description(video_url, task_type, **kwargs) task = Task( description=task_description, agent=self.crew_agent, expected_output="Comprehensive video processing results" ) crew = Crew( agents=[self.crew_agent], tasks=[task], verbose=True ) # Execute the crew result = crew.kickoff() return { "success": True, "result": str(result), "agent_type": "crewai", "task_type": task_type } else: return await self._mock_crew_process(video_url, task_type, **kwargs) except Exception as e: logger.error(f"CrewAI processing error: {e}") return {"success": False, "error": str(e)} async def process_batch(self, video_urls: List[str], task_type: str = "summarize", **kwargs) -> Dict[str, Any]: """Process batch using CrewAI crew""" try: # Create individual tasks for each video tasks = [] for video_url in video_urls: task_description = self._build_task_description(video_url, task_type, **kwargs) task = Task( description=task_description, agent=self.crew_agent, expected_output=f"Processing results for {video_url}" ) tasks.append(task) if CREWAI_AVAILABLE and self.crew_agent: crew = Crew( agents=[self.crew_agent], tasks=tasks, verbose=True ) result = crew.kickoff() return { "success": True, "result": str(result), "agent_type": "crewai", "task_type": "batch", "video_count": len(video_urls) } else: return await self._mock_crew_batch_process(video_urls, task_type, **kwargs) except Exception as e: logger.error(f"CrewAI batch processing error: {e}") return {"success": False, "error": str(e)} def _build_task_description(self, video_url: str, task_type: str, **kwargs) -> str: """Build task description for CrewAI""" if task_type == "transcribe": return f"Extract and provide a comprehensive transcript from the YouTube video: {video_url}. Focus on accuracy and readability." elif task_type == "summarize": summary_type = kwargs.get("summary_type", "comprehensive") return f"Analyze and create a {summary_type} summary of the YouTube video: {video_url}. Include key points, insights, and actionable information." else: return f"Process the YouTube video {video_url} according to the task requirements: {task_type}" async def _mock_crew_process(self, video_url: str, task_type: str, **kwargs) -> Dict[str, Any]: """Mock CrewAI processing""" return { "success": True, "result": f"Mock CrewAI processing for {video_url} - {task_type}", "agent_type": "crewai", "mock": True } async def _mock_crew_batch_process(self, video_urls: List[str], task_type: str, **kwargs) -> Dict[str, Any]: """Mock CrewAI batch processing""" return { "success": True, "result": f"Mock CrewAI batch processing for {len(video_urls)} videos - {task_type}", "agent_type": "crewai", "mock": True } def set_context(self, context: AgentContext): """Set agent context""" self.context = context class AutoGenYouTubeAgent(BaseYouTubeAgent): """AutoGen-compatible YouTube agent""" def __init__(self, name="YouTubeAgent", system_message="You are an expert YouTube video processor."): super().__init__(FrameworkType.AUTOGEN) self.name = name self.system_message = system_message self.autogen_agent = None if AUTOGEN_AVAILABLE: self._create_autogen_agent() def _create_autogen_agent(self): """Create AutoGen assistant""" try: if AUTOGEN_AVAILABLE: self.autogen_agent = AssistantAgent( name=self.name, system_message=self.system_message, llm_config={ "timeout": 60, "cache_seed": 42, "temperature": 0, } ) except Exception as e: logger.error(f"Failed to create AutoGen agent: {e}") async def process_video(self, video_url: str, task_type: str = "summarize", **kwargs) -> Dict[str, Any]: """Process video using AutoGen""" try: if AUTOGEN_AVAILABLE and self.autogen_agent: # Create user proxy for interaction user_proxy = UserProxyAgent( name="user_proxy", human_input_mode="NEVER", max_consecutive_auto_reply=1, code_execution_config=False, ) # Create message for processing message = self._build_autogen_message(video_url, task_type, **kwargs) # Simulate conversation chat_result = user_proxy.initiate_chat( self.autogen_agent, message=message, silent=True ) return { "success": True, "result": chat_result.summary if hasattr(chat_result, 'summary') else str(chat_result), "agent_type": "autogen", "task_type": task_type } else: return await self._mock_autogen_process(video_url, task_type, **kwargs) except Exception as e: logger.error(f"AutoGen processing error: {e}") return {"success": False, "error": str(e)} async def process_batch(self, video_urls: List[str], task_type: str = "summarize", **kwargs) -> Dict[str, Any]: """Process batch using AutoGen group chat""" try: if AUTOGEN_AVAILABLE and self.autogen_agent: # Create multiple agents for batch processing agents = [self.autogen_agent] # Create group chat groupchat = GroupChat(agents=agents, messages=[], max_round=len(video_urls)) manager = GroupChatManager(groupchat=groupchat) # Process each video results = [] for video_url in video_urls: message = self._build_autogen_message(video_url, task_type, **kwargs) result = manager.generate_reply([{"content": message, "role": "user"}]) results.append(result) return { "success": True, "results": results, "agent_type": "autogen", "task_type": "batch", "video_count": len(video_urls) } else: return await self._mock_autogen_batch_process(video_urls, task_type, **kwargs) except Exception as e: logger.error(f"AutoGen batch processing error: {e}") return {"success": False, "error": str(e)} def _build_autogen_message(self, video_url: str, task_type: str, **kwargs) -> str: """Build message for AutoGen agent""" if task_type == "transcribe": return f"Please extract the transcript from this YouTube video: {video_url}. Use the most appropriate method for high quality results." elif task_type == "summarize": summary_type = kwargs.get("summary_type", "comprehensive") return f"Please analyze and create a {summary_type} summary of this YouTube video: {video_url}. Include key insights and actionable points." else: return f"Please process this YouTube video according to the task '{task_type}': {video_url}" async def _mock_autogen_process(self, video_url: str, task_type: str, **kwargs) -> Dict[str, Any]: """Mock AutoGen processing""" return { "success": True, "result": f"Mock AutoGen processing for {video_url} - {task_type}", "agent_type": "autogen", "mock": True } async def _mock_autogen_batch_process(self, video_urls: List[str], task_type: str, **kwargs) -> Dict[str, Any]: """Mock AutoGen batch processing""" return { "success": True, "result": f"Mock AutoGen batch processing for {len(video_urls)} videos - {task_type}", "agent_type": "autogen", "mock": True } def set_context(self, context: AgentContext): """Set agent context""" self.context = context class AgentFactory: """Factory for creating framework-specific agents""" @staticmethod def create_agent(framework: FrameworkType, **kwargs) -> BaseYouTubeAgent: """Create agent for specified framework""" if framework == FrameworkType.LANGCHAIN: return LangChainYouTubeAgent(**kwargs) elif framework == FrameworkType.CREWAI: return CrewAIYouTubeAgent(**kwargs) elif framework == FrameworkType.AUTOGEN: return AutoGenYouTubeAgent(**kwargs) else: raise ValueError(f"Unsupported framework: {framework}") @staticmethod def get_available_frameworks() -> List[FrameworkType]: """Get list of available frameworks""" available = [] if LANGCHAIN_AVAILABLE: available.append(FrameworkType.LANGCHAIN) if CREWAI_AVAILABLE: available.append(FrameworkType.CREWAI) if AUTOGEN_AVAILABLE: available.append(FrameworkType.AUTOGEN) return available class AgentOrchestrator: """Orchestrate multiple agents across different frameworks""" def __init__(self): self.agents: Dict[FrameworkType, BaseYouTubeAgent] = {} self.default_framework = FrameworkType.LANGCHAIN def register_agent(self, framework: FrameworkType, agent: BaseYouTubeAgent): """Register an agent for a framework""" self.agents[framework] = agent def set_default_framework(self, framework: FrameworkType): """Set default framework for operations""" if framework in self.agents: self.default_framework = framework else: raise ValueError(f"Framework {framework} not registered") async def process_video(self, video_url: str, framework: FrameworkType = None, **kwargs) -> Dict[str, Any]: """Process video using specified or default framework""" framework = framework or self.default_framework if framework not in self.agents: return {"success": False, "error": f"Framework {framework} not available"} agent = self.agents[framework] return await agent.process_video(video_url, **kwargs) async def process_batch(self, video_urls: List[str], framework: FrameworkType = None, **kwargs) -> Dict[str, Any]: """Process batch using specified or default framework""" framework = framework or self.default_framework if framework not in self.agents: return {"success": False, "error": f"Framework {framework} not available"} agent = self.agents[framework] return await agent.process_batch(video_urls, **kwargs) async def compare_frameworks(self, video_url: str, task_type: str = "summarize") -> Dict[str, Any]: """Compare results across all available frameworks""" results = {} for framework, agent in self.agents.items(): try: result = await agent.process_video(video_url, task_type) results[framework.value] = result except Exception as e: results[framework.value] = {"success": False, "error": str(e)} return { "video_url": video_url, "task_type": task_type, "framework_results": results, "comparison_timestamp": datetime.now().isoformat() } def get_capabilities_summary(self) -> Dict[str, Any]: """Get summary of all registered agents and their capabilities""" summary = { "registered_frameworks": list(self.agents.keys()), "default_framework": self.default_framework, "total_agents": len(self.agents), "available_frameworks": AgentFactory.get_available_frameworks(), "agent_details": {} } for framework, agent in self.agents.items(): summary["agent_details"][framework.value] = { "capabilities": agent.capabilities.__dict__, "context": agent.context.__dict__ if agent.context else None } return summary # Convenience functions for easy integration def create_youtube_agent_orchestrator() -> AgentOrchestrator: """Create fully configured agent orchestrator""" orchestrator = AgentOrchestrator() # Register available agents available_frameworks = AgentFactory.get_available_frameworks() for framework in available_frameworks: try: agent = AgentFactory.create_agent(framework) orchestrator.register_agent(framework, agent) except Exception as e: logger.warning(f"Failed to create {framework} agent: {e}") # Set default to most capable available framework if FrameworkType.LANGCHAIN in available_frameworks: orchestrator.set_default_framework(FrameworkType.LANGCHAIN) elif available_frameworks: orchestrator.set_default_framework(available_frameworks[0]) return orchestrator async def quick_process_video(video_url: str, task_type: str = "summarize", framework: str = "langchain") -> Dict[str, Any]: """Quick video processing with automatic framework selection""" try: framework_enum = FrameworkType(framework.lower()) agent = AgentFactory.create_agent(framework_enum) return await agent.process_video(video_url, task_type) except Exception as e: return {"success": False, "error": str(e)} # Example usage if __name__ == "__main__": async def example_usage(): # Create orchestrator orchestrator = create_youtube_agent_orchestrator() # Process a video result = await orchestrator.process_video( "https://youtube.com/watch?v=dQw4w9WgXcQ", task_type="summarize" ) print(f"Processing result: {result}") # Compare frameworks comparison = await orchestrator.compare_frameworks( "https://youtube.com/watch?v=dQw4w9WgXcQ" ) print(f"Framework comparison: {comparison}") # Run example # asyncio.run(example_usage())