# youtube-summarizer/backend/services/template_driven_agent.py
"""Template-driven analysis agent that can adapt its behavior based on configurable templates."""
import asyncio
import json
import logging
from typing import Dict, List, Optional, Any, Union
from datetime import datetime
from pydantic import BaseModel, Field
from ..core.exceptions import ServiceError
from ..models.analysis_templates import AnalysisTemplate, TemplateSet, TemplateRegistry
from .deepseek_service import DeepSeekService
from .template_defaults import DEFAULT_REGISTRY
# Module-level logger following the logging.getLogger(__name__) convention.
logger = logging.getLogger(__name__)
class TemplateAnalysisRequest(BaseModel):
    """Request for template-driven analysis.

    Bundles the raw content to analyze with the id of the template that
    drives the analysis, plus optional context variables and a video id.
    """
    content: str = Field(..., description="Content to analyze")
    template_id: str = Field(..., description="Template to use for analysis")
    # Extra key/value pairs merged over the template's own variables when
    # the prompt context is built (request values win on collisions).
    context: Dict[str, Any] = Field(default_factory=dict, description="Additional context for template variables")
    video_id: Optional[str] = Field(None, description="Video ID if analyzing video content")
class TemplateAnalysisResult(BaseModel):
    """Result from template-driven analysis.

    Carries the raw AI response plus derived metadata (insights, confidence,
    timing) and the exact context/variables used, for reproducibility.
    """
    template_id: str
    template_name: str
    # Full, unparsed AI response text.
    analysis: str
    # Bulleted / keyword-extracted highlights parsed from `analysis`.
    key_insights: List[str]
    # Heuristic quality score in [0, 1].
    confidence_score: float
    processing_time_seconds: float
    context_used: Dict[str, Any]
    template_variables: Dict[str, Any]
    # NOTE(review): datetime.utcnow is deprecated (naive UTC); the whole
    # module uses naive UTC consistently — migrate to
    # datetime.now(timezone.utc) everywhere at once, not just here.
    timestamp: datetime = Field(default_factory=datetime.utcnow)
class TemplateDrivenAgent:
    """Agent that can adapt its analysis behavior based on configurable templates.

    Templates (and template sets) come from a :class:`TemplateRegistry`; each
    analysis renders the template's prompt, calls a DeepSeek AI service, and
    post-processes the response into a :class:`TemplateAnalysisResult`.
    """

    def __init__(
        self,
        ai_service: Optional[DeepSeekService] = None,
        template_registry: Optional[TemplateRegistry] = None
    ):
        """Initialize the template-driven agent.

        Args:
            ai_service: Default AI backend; a fresh DeepSeekService is
                created when omitted.
            template_registry: Source of templates and template sets;
                falls back to the module-wide DEFAULT_REGISTRY.
        """
        self.ai_service = ai_service or DeepSeekService()
        self.template_registry = template_registry or DEFAULT_REGISTRY
        # Dedicated AI services (one per template id) so parallel template
        # runs can fan out across distinct API keys.
        self._ai_services = self._initialize_multi_key_services()
        # template_id -> number of completed analyses.
        self._usage_stats: Dict[str, int] = {}

    def _initialize_multi_key_services(self) -> Dict[str, DeepSeekService]:
        """Initialize multiple DeepSeek services with different API keys for parallel processing.

        Returns a mapping of template id -> dedicated service; templates with
        no configured key simply fall back to the default service later.
        """
        import os
        from dotenv import load_dotenv

        # Ensure environment variables are loaded before reading the keys.
        load_dotenv()
        # Map specific templates to specific API keys for load balancing.
        template_key_mapping = {
            "educational_beginner": os.getenv("DEEPSEEK_API_KEY_1"),
            "educational_expert": os.getenv("DEEPSEEK_API_KEY_2"),
            "educational_scholarly": os.getenv("DEEPSEEK_API_KEY_3"),
        }
        logger.info("Initializing multi-key services...")
        logger.info(f"Available API keys: {[k for k, v in template_key_mapping.items() if v]}")
        services: Dict[str, DeepSeekService] = {}
        for template_id, api_key in template_key_mapping.items():
            if not api_key:
                logger.warning(f"No API key found for {template_id}")
                continue
            try:
                services[template_id] = DeepSeekService(api_key=api_key)
                # Do not log key material (the old code logged the first 10
                # chars of the key): key fragments in logs leak credentials.
                logger.info(f"Initialized dedicated AI service for {template_id}")
            except Exception as e:
                logger.warning(f"Failed to initialize AI service for {template_id}: {e}")
        logger.info(f"Successfully initialized {len(services)} dedicated AI services")
        return services

    async def analyze_with_template(
        self,
        request: TemplateAnalysisRequest
    ) -> TemplateAnalysisResult:
        """Perform analysis using the specified template.

        Raises:
            ServiceError: if the template is missing or inactive, or the AI
                call / post-processing fails.
        """
        start_time = datetime.utcnow()
        template = self.template_registry.get_template(request.template_id)
        if not template:
            raise ServiceError(f"Template not found: {request.template_id}")
        if not template.is_active:
            raise ServiceError(f"Template is inactive: {request.template_id}")
        try:
            # Build the prompt context: request context wins over template
            # defaults on key collisions; content/video_id always present.
            analysis_context = {
                **template.variables,
                **request.context,
                "content": request.content,
                "video_id": request.video_id or "unknown",
            }
            system_prompt = template.render_prompt(analysis_context)
            analysis_prompt = self._create_analysis_prompt(template, request.content, analysis_context)
            # Prefer the template's dedicated (per-API-key) service when one
            # was configured; otherwise use the shared default.
            ai_service = self._ai_services.get(request.template_id, self.ai_service)
            ai_response = await ai_service.generate_response(
                prompt=analysis_prompt,
                system_prompt=system_prompt,
                max_tokens=2000,
                temperature=0.7,
            )
            key_insights = self._extract_insights(ai_response, template)
            processing_time = (datetime.utcnow() - start_time).total_seconds()
            self._update_usage_stats(request.template_id)
            confidence_score = self._calculate_confidence_score(ai_response, template)
            return TemplateAnalysisResult(
                template_id=template.id,
                template_name=template.name,
                analysis=ai_response,
                key_insights=key_insights,
                confidence_score=confidence_score,
                processing_time_seconds=processing_time,
                context_used=analysis_context,
                template_variables=template.variables,
            )
        except Exception as e:
            # logger.exception records the full traceback automatically.
            logger.exception(f"Error in template analysis {request.template_id}: {e}")
            raise ServiceError(f"Template analysis failed: {str(e)}") from e

    async def analyze_with_template_set(
        self,
        content: str,
        template_set_id: str,
        context: Optional[Dict[str, Any]] = None,
        video_id: Optional[str] = None
    ) -> Dict[str, TemplateAnalysisResult]:
        """Analyze content using all active templates in a template set.

        Failed templates are logged and omitted from the returned mapping
        rather than aborting the whole set.
        """
        template_set = self.template_registry.get_template_set(template_set_id)
        if not template_set:
            raise ServiceError(f"Template set not found: {template_set_id}")
        context = context or {}
        results: Dict[str, TemplateAnalysisResult] = {}
        if template_set.parallel_execution:
            # Pair each coroutine with its template id up front, so results
            # can be matched back safely even when some of them raise.
            pending = []
            for template in template_set.templates.values():
                if template.is_active:
                    request = TemplateAnalysisRequest(
                        content=content,
                        template_id=template.id,
                        context=context,
                        video_id=video_id,
                    )
                    pending.append((template.id, self.analyze_with_template(request)))
            parallel_results = await asyncio.gather(
                *(task for _, task in pending), return_exceptions=True
            )
            for (template_id, _), result in zip(pending, parallel_results):
                if isinstance(result, Exception):
                    logger.error(
                        f"Template {template_id} failed with exception: "
                        f"{type(result).__name__}: {str(result)}"
                    )
                else:
                    results[template_id] = result
        else:
            # Sequential mode honors execution_order when provided.
            execution_order = template_set.execution_order or list(template_set.templates.keys())
            for template_id in execution_order:
                template = template_set.templates.get(template_id)
                if not (template and template.is_active):
                    continue
                try:
                    request = TemplateAnalysisRequest(
                        content=content,
                        template_id=template.id,
                        context=context,
                        video_id=video_id,
                    )
                    results[template_id] = await self.analyze_with_template(request)
                except Exception as e:
                    logger.error(f"Template {template_id} failed: {e}")
        return results

    async def synthesize_results(
        self,
        results: Dict[str, TemplateAnalysisResult],
        template_set_id: str,
        context: Dict[str, Any] = None
    ) -> Optional[TemplateAnalysisResult]:
        """Synthesize results from multiple template analyses.

        Returns None when the template set has no synthesis template.

        Raises:
            ServiceError: if the synthesis AI call fails.
        """
        template_set = self.template_registry.get_template_set(template_set_id)
        if not template_set or not template_set.synthesis_template:
            logger.warning(f"No synthesis template found for template set: {template_set_id}")
            return None
        logger.info(f"Starting synthesis for {len(results)} results using template: {template_set.synthesis_template.id}")
        # Copy the caller's dict: the old code mutated `context` in place.
        synthesis_context: Dict[str, Any] = dict(context) if context else {}
        for result_id, result in results.items():
            synthesis_context[f"{result_id}_analysis"] = result.analysis
            synthesis_context[f"{result_id}_insights"] = result.key_insights
        start_time = datetime.utcnow()
        try:
            template = template_set.synthesis_template
            analysis_context = {
                **template.variables,
                **synthesis_context,
                "content": "",  # synthesis works with previous results, not raw content
                "video_id": synthesis_context.get("video_id", "unknown"),
            }
            system_prompt = template.render_prompt(analysis_context)
            synthesis_prompt = self._create_synthesis_prompt(template, results, analysis_context)
            logger.info("Synthesis using original AI service with full 180s timeout")
            # Use the default service (not the per-key pool) so synthesis gets
            # its full timeout budget.
            ai_response = await self.ai_service.generate_response(
                prompt=synthesis_prompt,
                system_prompt=system_prompt,
                max_tokens=2500,  # longer budget for synthesis output
                temperature=0.7,
            )
            key_insights = self._extract_insights(ai_response, template)
            processing_time = (datetime.utcnow() - start_time).total_seconds()
            self._update_usage_stats(template.id)
            confidence_score = self._calculate_confidence_score(ai_response, template)
            logger.info(f"Synthesis completed in {processing_time:.2f}s")
            return TemplateAnalysisResult(
                template_id=template.id,
                template_name=template.name,
                analysis=ai_response,
                key_insights=key_insights,
                confidence_score=confidence_score,
                processing_time_seconds=processing_time,
                context_used=analysis_context,
                template_variables=template.variables,
            )
        except Exception as e:
            logger.exception(
                f"Synthesis failed after {(datetime.utcnow() - start_time).total_seconds():.2f}s: {e}"
            )
            raise ServiceError(f"Synthesis failed: {str(e)}") from e

    def _create_analysis_prompt(
        self,
        template: AnalysisTemplate,
        content: str,
        context: Dict[str, Any]
    ) -> str:
        """Create the analysis prompt for the AI service."""
        return f"""
Please analyze the following content using the specified approach:
{content}
Analysis Instructions:
- Follow the output format specified in the template
- Generate between {template.min_insights} and {template.max_insights} key insights
- Target audience: {template.target_audience}
- Tone: {template.tone}
- Depth: {template.depth}
- Focus areas: {', '.join(template.analysis_focus)}
{'Include relevant examples and analogies.' if template.include_examples else ''}
{'Provide actionable recommendations.' if template.include_recommendations else ''}
Expected Output Format:
{template.output_format}
"""

    def _create_synthesis_prompt(
        self,
        template: AnalysisTemplate,
        results: Dict[str, TemplateAnalysisResult],
        context: Dict[str, Any]
    ) -> str:
        """Create the synthesis prompt for combining multiple analyses."""
        # Build synthesis input from all results.
        synthesis_input = []
        for result_id, result in results.items():
            synthesis_input.append(f"## {result.template_name} Analysis")
            synthesis_input.append(result.analysis)
            synthesis_input.append("\n### Key Insights:")
            for insight in result.key_insights:
                synthesis_input.append(f"- {insight}")
            synthesis_input.append("")  # empty line between analyses
        # Joined outside the f-string (the old code used chr(10) to dodge the
        # pre-3.12 no-backslash-in-f-string restriction).
        analyses_block = "\n".join(synthesis_input)
        return f"""
You are tasked with synthesizing multiple educational perspective analyses into a unified comprehensive understanding.
## Input Analyses
{analyses_block}
## Synthesis Instructions:
- Combine insights from all perspectives into a unified educational journey
- Identify common themes and complementary viewpoints
- Resolve any apparent contradictions by providing nuanced explanations
- Create a progressive learning path from beginner to advanced understanding
- Generate between {template.min_insights} and {template.max_insights} unified insights
- Target audience: {template.target_audience}
- Tone: {template.tone}
- Depth: {template.depth}
- Focus areas: {', '.join(template.analysis_focus)}
{'Include practical examples that bridge different learning levels.' if template.include_examples else ''}
{'Provide actionable learning recommendations.' if template.include_recommendations else ''}
## Expected Output Format:
{template.output_format}
Create a synthesis that honors the unique value of each perspective while creating a cohesive educational experience.
"""

    def _extract_insights(self, response: str, template: AnalysisTemplate) -> List[str]:
        """Extract key insights from the AI response.

        Prefers bulleted lines; falls back to keyword-flagged sentences, then
        pads/truncates to the template's min/max insight bounds.
        """
        insights: List[str] = []
        # BUG FIX: the original checked line.startswith('') — True for EVERY
        # string — so every non-trivial line was treated as a bullet and had
        # its first character chopped off. The empty marker was almost
        # certainly a '•' bullet lost in an encoding pass.
        bullet_markers = ('-', '•', '*')
        for line in response.split('\n'):
            line = line.strip()
            if line.startswith(bullet_markers):
                insight = line[1:].strip()
                if len(insight) > 10:  # filter out very short items
                    insights.append(insight)
        if not insights:
            # Fallback: simple extraction of sentences that seem insightful.
            keywords = ('important', 'key', 'significant', 'notable', 'crucial', 'essential')
            for sentence in response.split('.'):
                sentence = sentence.strip()
                if len(sentence) > 20 and any(keyword in sentence.lower() for keyword in keywords):
                    insights.append(sentence)
        # Clamp to the template's [min_insights, max_insights] range, padding
        # with generic placeholders if extraction came up short.
        while len(insights) < template.min_insights:
            insights.append(f"Additional insight {len(insights) + 1} from analysis")
        return insights[:template.max_insights]

    def _calculate_confidence_score(self, response: str, template: AnalysisTemplate) -> float:
        """Calculate a heuristic confidence score in [0, 1] from response quality."""
        score = 0.0
        # Length score (20%).
        if len(response) > 200:
            score += 0.2
        elif len(response) > 100:
            score += 0.1
        # Structure score (30%): headings/bold formatting plus list markers.
        if "##" in response or "**" in response:
            score += 0.15
        # BUG FIX: the original marker list contained '' (empty string),
        # which is a substring of every response, so this 0.15 was always
        # granted. Restored the intended '•' bullet.
        if any(marker in response for marker in ['-', '•', '*', '1.']):
            score += 0.15
        # Content quality score (30%): focus-area keyword coverage.
        focus_matches = sum(
            1 for focus in template.analysis_focus
            if any(word.lower() in response.lower() for word in focus.split())
        )
        score += min(0.3, focus_matches * 0.1)
        # Completeness score (20%): actual vs expected '##' section count.
        expected_sections = template.output_format.count('##')
        if expected_sections > 0:
            completeness = min(1.0, response.count('##') / expected_sections)
            score += completeness * 0.2
        else:
            score += 0.2  # default when no specific structure is expected
        return min(1.0, score)

    def _update_usage_stats(self, template_id: str) -> None:
        """Increment the per-template usage counter."""
        self._usage_stats[template_id] = self._usage_stats.get(template_id, 0) + 1

    def get_usage_stats(self) -> Dict[str, int]:
        """Get a copy of the template usage statistics."""
        return self._usage_stats.copy()

    def get_available_templates(self) -> List[AnalysisTemplate]:
        """Get the list of active templates."""
        return [t for t in self.template_registry.templates.values() if t.is_active]

    def get_available_template_sets(self) -> List[TemplateSet]:
        """Get the list of active template sets."""
        return [ts for ts in self.template_registry.template_sets.values() if ts.is_active]