"""DeepSeek AI service for YouTube video summarization."""

import asyncio
import logging
from typing import Optional, Dict, Any, List
from datetime import datetime
import json

import aiohttp

from ..core.exceptions import ServiceError

logger = logging.getLogger(__name__)


class DeepSeekService:
    """Service for interacting with DeepSeek AI API.

    The service owns a lazily created ``aiohttp.ClientSession`` and can be
    used as an async context manager so the session is closed on exit.
    Every API failure is reported to callers as ``ServiceError``.
    """

    # Path appended to ``base_url`` for chat completion requests.
    _CHAT_COMPLETIONS_PATH = "/v1/chat/completions"
    # Total per-request timeout handed to aiohttp.
    _REQUEST_TIMEOUT_SECONDS = 60
    # Transcripts shorter than this (after stripping) are rejected.
    _MIN_TRANSCRIPT_CHARS = 50
    # Character caps applied to user content to avoid token limits.
    _MAX_TRANSCRIPT_CHARS = 8000
    _MAX_ANALYSIS_CHARS = 6000

    def __init__(self, api_key: Optional[str] = None, base_url: str = "https://api.deepseek.com"):
        """Initialize DeepSeek service.

        Args:
            api_key: DeepSeek API key (will try to get from settings if not provided)
            base_url: DeepSeek API base URL

        Raises:
            ServiceError: If no API key is given and settings provide none.
        """
        # Imported here rather than at module level, as in the original code.
        from ..core.config import settings

        self.api_key = api_key or settings.DEEPSEEK_API_KEY
        if not self.api_key:
            raise ServiceError("DEEPSEEK_API_KEY not found in environment variables")

        self.base_url = base_url.rstrip("/")
        self.session: Optional[aiohttp.ClientSession] = None

        # Default model configuration
        self.default_model = "deepseek-chat"
        self.default_temperature = 0.7
        self.default_max_tokens = 1500

        logger.info("DeepSeek service initialized")

    async def __aenter__(self):
        """Async context manager entry: open the session and return self."""
        await self._ensure_session()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit: close the session."""
        await self.close()

    async def _ensure_session(self):
        """Ensure aiohttp session is created (re-created if it was closed)."""
        if self.session is None or self.session.closed:
            timeout = aiohttp.ClientTimeout(total=self._REQUEST_TIMEOUT_SECONDS)
            self.session = aiohttp.ClientSession(
                timeout=timeout,
                headers={
                    "Authorization": f"Bearer {self.api_key}",
                    "Content-Type": "application/json"
                }
            )

    async def close(self):
        """Close the HTTP session."""
        if self.session and not self.session.closed:
            await self.session.close()
        self.session = None

    @staticmethod
    def _extract_content(data: Any) -> str:
        """Return the first choice's message text from a completion payload.

        Raises:
            ServiceError: If the payload lacks ``choices[0].message.content``
                or that value is not a string.
        """
        try:
            content = data["choices"][0]["message"]["content"]
        except (KeyError, IndexError, TypeError) as e:
            raise ServiceError("Invalid response format from DeepSeek API") from e
        if not isinstance(content, str):
            raise ServiceError("Invalid response format from DeepSeek API")
        return content

    @staticmethod
    def _parse_summary_json(text: str) -> Optional[Dict[str, Any]]:
        """Parse a model reply as a JSON object, or return None if it is not one.

        A reply wrapped in a Markdown code fence (three backticks, optionally
        followed by a language tag) is unwrapped before parsing.
        """
        candidate = text.strip()
        if candidate.startswith("```"):
            # Drop the opening fence line, then the closing fence if present.
            _, _, candidate = candidate.partition("\n")
            candidate = candidate.rstrip()
            if candidate.endswith("```"):
                candidate = candidate[:-3]
        try:
            parsed = json.loads(candidate)
        except json.JSONDecodeError:
            return None
        # Valid JSON that is not an object (list, number, ...) is unusable here.
        return parsed if isinstance(parsed, dict) else None

    async def generate_response(
        self,
        prompt: str,
        model: Optional[str] = None,
        temperature: Optional[float] = None,
        max_tokens: Optional[int] = None,
        system_prompt: Optional[str] = None
    ) -> str:
        """Generate a response using DeepSeek API.

        Args:
            prompt: User prompt
            model: Model to use (defaults to deepseek-chat)
            temperature: Sampling temperature (0.0 to 1.0)
            max_tokens: Maximum tokens to generate
            system_prompt: Optional system prompt

        Returns:
            Generated response text

        Raises:
            ServiceError: On timeout, network failure, a non-200 status, or a
                response body that is not in the expected format.
        """
        await self._ensure_session()

        # Build messages array
        messages = []
        if system_prompt:
            messages.append({"role": "system", "content": system_prompt})
        messages.append({"role": "user", "content": prompt})

        # Request payload
        payload = {
            "model": model or self.default_model,
            "messages": messages,
            "temperature": temperature if temperature is not None else self.default_temperature,
            # A falsy max_tokens (None or 0) falls back to the default.
            "max_tokens": max_tokens or self.default_max_tokens,
            "stream": False
        }
        url = f"{self.base_url}{self._CHAT_COMPLETIONS_PATH}"

        try:
            logger.debug("Making DeepSeek API request with model %s", payload["model"])

            async with self.session.post(url, json=payload) as response:
                if response.status == 200:
                    data = await response.json()
                    content = self._extract_content(data)
                    logger.debug("DeepSeek API response received (%d characters)", len(content))
                    return content

                if response.status == 429:
                    # Rate limit exceeded
                    error_data = await response.text()
                    logger.warning("DeepSeek rate limit exceeded: %s", error_data)
                    raise ServiceError("Rate limit exceeded. Please try again later.")

                if response.status == 401:
                    raise ServiceError("Invalid API key for DeepSeek API")

                error_data = await response.text()
                logger.error("DeepSeek API error %s: %s", response.status, error_data)
                raise ServiceError(f"DeepSeek API error: {response.status}")

        except asyncio.TimeoutError as e:
            # aiohttp signals an expired total timeout with asyncio.TimeoutError,
            # which is not an aiohttp.ClientError and needs its own handler.
            logger.error("DeepSeek API request timed out: %s", e)
            raise ServiceError("DeepSeek API request timed out") from e
        except aiohttp.ClientError as e:
            logger.error("Network error calling DeepSeek API: %s", e)
            raise ServiceError(f"Network error: {str(e)}") from e
        except json.JSONDecodeError as e:
            logger.error("Invalid JSON response from DeepSeek API: %s", e)
            raise ServiceError("Invalid response format from DeepSeek API") from e

    async def summarize_transcript(
        self,
        transcript: str,
        video_title: str = "",
        summary_length: str = "standard",
        focus_areas: Optional[List[str]] = None
    ) -> Dict[str, Any]:
        """Summarize a video transcript using DeepSeek.

        Args:
            transcript: Video transcript text
            video_title: Video title for context
            summary_length: "brief", "standard", or "detailed"
                (unknown values fall back to "standard")
            focus_areas: List of specific areas to focus on

        Returns:
            Structured summary data. When the model reply is a JSON object it
            is returned with "summary", "key_points" and "main_topics"
            guaranteed present; otherwise the raw reply is returned as the
            "summary" of a dict with default values for the other fields.

        Raises:
            ServiceError: If the transcript is too short or generation fails.
        """
        if not transcript or len(transcript.strip()) < self._MIN_TRANSCRIPT_CHARS:
            raise ServiceError("Transcript is too short for summarization")

        # Determine summary configuration
        length_configs = {
            "brief": {"max_tokens": 800, "target_length": "2-3 paragraphs"},
            "standard": {"max_tokens": 1200, "target_length": "4-5 paragraphs"},
            "detailed": {"max_tokens": 1800, "target_length": "6-8 paragraphs"}
        }
        config = length_configs.get(summary_length, length_configs["standard"])
        target_length = config["target_length"]

        focus_text = f"Focus particularly on: {', '.join(focus_areas)}" if focus_areas else ""
        focus_block = f"{focus_text}\n\n" if focus_text else ""

        system_prompt = (
            "You are an expert at creating comprehensive video summaries.\n"
            f"Create a {target_length} summary that captures the key concepts, "
            "main points, and actionable insights from the video content.\n\n"
            f"{focus_block}"
            "Structure your response as valid JSON with this format:\n"
            "{\n"
            f'    "summary": "Main summary text in {target_length}",\n'
            '    "key_points": ["List of 5-8 key takeaways and important points"],\n'
            '    "main_topics": ["List of primary topics covered"],\n'
            '    "actionable_insights": ["List of 3-5 actionable insights or recommendations"],\n'
            '    "sentiment": "positive/neutral/negative overall tone",\n'
            '    "complexity_level": "beginner/intermediate/advanced",\n'
            '    "estimated_reading_time": "X minutes"\n'
            "}"
        )

        # Build the prompt with context. The transcript is truncated to avoid
        # token limits; this note must stay outside the prompt string so it is
        # not sent to the model.
        context = f"Video Title: {video_title}\n\n" if video_title else ""
        prompt = (
            f"{context}Please summarize the following video transcript:\n\n"
            f"{transcript[:self._MAX_TRANSCRIPT_CHARS]}\n\n"
            "Provide a comprehensive yet concise summary that would be valuable "
            "for someone who hasn't watched the video."
        )

        try:
            response = await self.generate_response(
                prompt=prompt,
                system_prompt=system_prompt,
                max_tokens=config["max_tokens"],
                temperature=0.3  # Lower temperature for more consistent summaries
            )
        except Exception as e:
            logger.error("Error summarizing transcript: %s", e)
            raise ServiceError(f"Failed to generate summary: {str(e)}") from e

        summary_data = self._parse_summary_json(response)
        if summary_data is None:
            # Fallback: return response as plain summary
            logger.warning("DeepSeek response was not valid JSON, using as plain text")
            return {
                "summary": response,
                "key_points": [],
                "main_topics": [],
                "actionable_insights": [],
                "sentiment": "neutral",
                "complexity_level": "unknown",
                "estimated_reading_time": "5 minutes"
            }

        # Validate required fields, filling gaps with a type-appropriate default.
        summary_data.setdefault("summary", "")
        summary_data.setdefault("key_points", [])
        summary_data.setdefault("main_topics", [])
        return summary_data

    async def analyze_content(
        self,
        content: str,
        analysis_type: str = "general",
        custom_prompt: Optional[str] = None
    ) -> Dict[str, Any]:
        """Analyze content with specific focus.

        Args:
            content: Content to analyze
            analysis_type: Type of analysis ("sentiment", "topics", "structure", etc.)
            custom_prompt: Custom analysis prompt; when given it replaces the
                built-in prompt for ``analysis_type``

        Returns:
            Analysis results with "analysis_type", "result" and "timestamp" keys

        Raises:
            ServiceError: If the underlying generation call fails.
        """
        if custom_prompt:
            system_prompt = custom_prompt
        else:
            analysis_prompts = {
                "sentiment": "Analyze the sentiment and emotional tone of this content.",
                "topics": "Identify and extract the main topics and themes from this content.",
                "structure": "Analyze the structure and organization of this content.",
                "technical": "Analyze the technical concepts and complexity of this content.",
                "business": "Analyze the business implications and value propositions in this content."
            }
            system_prompt = analysis_prompts.get(
                analysis_type,
                "Provide a comprehensive analysis of this content."
            )

        try:
            response = await self.generate_response(
                prompt=content[:self._MAX_ANALYSIS_CHARS],  # Limit content length
                system_prompt=system_prompt,
                temperature=0.4,
                max_tokens=1000
            )
        except Exception as e:
            logger.error("Error analyzing content: %s", e)
            raise ServiceError(f"Content analysis failed: {str(e)}") from e

        return {
            "analysis_type": analysis_type,
            "result": response,
            "timestamp": datetime.now().isoformat()
        }

    async def test_connection(self) -> Dict[str, Any]:
        """Test the connection to DeepSeek API.

        This method does not raise; failures are reported in the result.

        Returns:
            Connection test results
        """
        try:
            start_time = datetime.now()

            response = await self.generate_response(
                prompt="Hello, this is a connection test.",
                max_tokens=50,
                temperature=0.1
            )

            response_time = (datetime.now() - start_time).total_seconds()
            # Only the first 100 characters of the reply are echoed back.
            preview = response[:100] + "..." if len(response) > 100 else response

            return {
                "status": "connected",
                "response_time_seconds": response_time,
                "api_key_valid": True,
                "model": self.default_model,
                "test_response": preview
            }
        except ServiceError as e:
            return {
                "status": "error",
                "error": str(e),
                # The key is reported invalid only for the 401 error message.
                "api_key_valid": "Invalid API key" not in str(e)
            }
        except Exception as e:
            return {
                "status": "error",
                "error": f"Unexpected error: {str(e)}",
                "api_key_valid": False
            }

    async def get_model_info(self) -> Dict[str, Any]:
        """Get information about available models.

        The model list is hard-coded; no API request is made.

        Returns:
            Model information
        """
        return {
            "current_model": self.default_model,
            "available_models": [
                "deepseek-chat",
                "deepseek-coder"
            ],
            "default_config": {
                "temperature": self.default_temperature,
                "max_tokens": self.default_max_tokens
            },
            "api_endpoint": f"{self.base_url}{self._CHAT_COMPLETIONS_PATH}"
        }

    def get_cost_estimate(self, input_tokens: int, output_tokens: int) -> Dict[str, Any]:
        """Estimate costs for DeepSeek API usage.

        Args:
            input_tokens: Number of input tokens
            output_tokens: Number of output tokens

        Returns:
            Cost estimation, with USD amounts rounded to 6 decimal places
        """
        # DeepSeek pricing (as of 2024 - verify current rates)
        input_price_per_1k = 0.0014   # $0.0014 per 1K input tokens
        output_price_per_1k = 0.0028  # $0.0028 per 1K output tokens

        input_cost = (input_tokens / 1000) * input_price_per_1k
        output_cost = (output_tokens / 1000) * output_price_per_1k
        total_cost = input_cost + output_cost

        return {
            "input_tokens": input_tokens,
            "output_tokens": output_tokens,
            "input_cost_usd": round(input_cost, 6),
            "output_cost_usd": round(output_cost, 6),
            "total_cost_usd": round(total_cost, 6),
            "model": self.default_model
        }