youtube-summarizer/backend/services/deepseek_service.py

"""DeepSeek AI service for YouTube video summarization."""
import asyncio
import logging
from typing import Optional, Dict, Any, List
from datetime import datetime
import json
import aiohttp
from ..core.exceptions import ServiceError
logger = logging.getLogger(__name__)


class DeepSeekService:
    """Service for interacting with DeepSeek AI API."""

    def __init__(self, api_key: Optional[str] = None, base_url: str = "https://api.deepseek.com"):
        """Initialize DeepSeek service.

        Args:
            api_key: DeepSeek API key (will try to get from settings if not provided)
            base_url: DeepSeek API base URL
        """
        from ..core.config import settings

        self.api_key = api_key or settings.DEEPSEEK_API_KEY
        if not self.api_key:
            raise ServiceError("DEEPSEEK_API_KEY not found in environment variables")

        self.base_url = base_url.rstrip("/")
        self.session: Optional[aiohttp.ClientSession] = None

        # Default model configuration
        self.default_model = "deepseek-chat"
        self.default_temperature = 0.7
        self.default_max_tokens = 1500

        logger.info("DeepSeek service initialized")

    async def __aenter__(self):
        """Async context manager entry."""
        await self._ensure_session()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit."""
        await self.close()

    async def _ensure_session(self):
        """Ensure aiohttp session is created."""
        if self.session is None or self.session.closed:
            timeout = aiohttp.ClientTimeout(total=60)  # 60 second timeout
            self.session = aiohttp.ClientSession(
                timeout=timeout,
                headers={
                    "Authorization": f"Bearer {self.api_key}",
                    "Content-Type": "application/json"
                }
            )

    async def close(self):
        """Close the HTTP session."""
        if self.session and not self.session.closed:
            await self.session.close()
        self.session = None

    async def generate_response(
        self,
        prompt: str,
        model: Optional[str] = None,
        temperature: Optional[float] = None,
        max_tokens: Optional[int] = None,
        system_prompt: Optional[str] = None
    ) -> str:
        """Generate a response using DeepSeek API.

        Args:
            prompt: User prompt
            model: Model to use (defaults to deepseek-chat)
            temperature: Sampling temperature (0.0 to 1.0)
            max_tokens: Maximum tokens to generate
            system_prompt: Optional system prompt

        Returns:
            Generated response text
        """
        await self._ensure_session()

        # Build messages array
        messages = []
        if system_prompt:
            messages.append({"role": "system", "content": system_prompt})
        messages.append({"role": "user", "content": prompt})

        # Request payload
        payload = {
            "model": model or self.default_model,
            "messages": messages,
            "temperature": temperature if temperature is not None else self.default_temperature,
            "max_tokens": max_tokens or self.default_max_tokens,
            "stream": False
        }
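
        # Sketch of the success payload this method relies on (OpenAI-compatible
        # chat-completions shape; only the fields read below are shown):
        # {
        #     "choices": [
        #         {"message": {"role": "assistant", "content": "..."}}
        #     ]
        # }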
        try:
            logger.debug(f"Making DeepSeek API request with model {payload['model']}")
            async with self.session.post(f"{self.base_url}/v1/chat/completions", json=payload) as response:
                if response.status == 200:
                    data = await response.json()
                    # Extract response content
                    if "choices" in data and len(data["choices"]) > 0:
                        content = data["choices"][0]["message"]["content"]
                        logger.debug(f"DeepSeek API response received ({len(content)} characters)")
                        return content
                    else:
                        raise ServiceError("Invalid response format from DeepSeek API")
                elif response.status == 429:
                    # Rate limit exceeded
                    error_data = await response.text()
                    logger.warning(f"DeepSeek rate limit exceeded: {error_data}")
                    raise ServiceError("Rate limit exceeded. Please try again later.")
                elif response.status == 401:
                    raise ServiceError("Invalid API key for DeepSeek API")
                else:
                    error_data = await response.text()
                    logger.error(f"DeepSeek API error {response.status}: {error_data}")
                    raise ServiceError(f"DeepSeek API error: {response.status}")
        except aiohttp.ClientError as e:
            logger.error(f"Network error calling DeepSeek API: {e}")
            raise ServiceError(f"Network error: {str(e)}")
        except json.JSONDecodeError as e:
            logger.error(f"Invalid JSON response from DeepSeek API: {e}")
            raise ServiceError("Invalid response format from DeepSeek API")

    async def summarize_transcript(
        self,
        transcript: str,
        video_title: str = "",
        summary_length: str = "standard",
        focus_areas: Optional[List[str]] = None
    ) -> Dict[str, Any]:
        """Summarize a video transcript using DeepSeek.

        Args:
            transcript: Video transcript text
            video_title: Video title for context
            summary_length: "brief", "standard", or "detailed"
            focus_areas: List of specific areas to focus on

        Returns:
            Structured summary data
        """
        if not transcript or len(transcript.strip()) < 50:
            raise ServiceError("Transcript is too short for summarization")

        # Determine summary configuration
        length_configs = {
            "brief": {"max_tokens": 800, "target_length": "2-3 paragraphs"},
            "standard": {"max_tokens": 1200, "target_length": "4-5 paragraphs"},
            "detailed": {"max_tokens": 1800, "target_length": "6-8 paragraphs"}
        }
        config = length_configs.get(summary_length, length_configs["standard"])
        focus_text = f"Focus particularly on: {', '.join(focus_areas)}" if focus_areas else ""
system_prompt = f"""You are an expert at creating comprehensive video summaries.
Create a {config['target_length']} summary that captures the key concepts, main points,
and actionable insights from the video content. {focus_text}
Structure your response as valid JSON with this format:
{{
"summary": "Main summary text in {config['target_length']}",
"key_points": ["List of 5-8 key takeaways and important points"],
"main_topics": ["List of primary topics covered"],
"actionable_insights": ["List of 3-5 actionable insights or recommendations"],
"sentiment": "positive/neutral/negative overall tone",
"complexity_level": "beginner/intermediate/advanced",
"estimated_reading_time": "X minutes"
}}"""
# Build the prompt with context
context = f"Video Title: {video_title}\n\n" if video_title else ""
prompt = f"""{context}Please summarize the following video transcript:
{transcript[:8000]} # Limit to avoid token limits
Provide a comprehensive yet concise summary that would be valuable for someone who hasn't watched the video."""

        try:
            response = await self.generate_response(
                prompt=prompt,
                system_prompt=system_prompt,
                max_tokens=config["max_tokens"],
                temperature=0.3  # Lower temperature for more consistent summaries
            )

            # Try to parse as JSON
            try:
                summary_data = json.loads(response)
                # Fill in any missing required fields with type-consistent defaults
                summary_data.setdefault("summary", "")
                for field in ("key_points", "main_topics"):
                    summary_data.setdefault(field, [])
                return summary_data
            except json.JSONDecodeError:
                # Fallback: return response as plain summary
                logger.warning("DeepSeek response was not valid JSON, using as plain text")
                return {
                    "summary": response,
                    "key_points": [],
                    "main_topics": [],
                    "actionable_insights": [],
                    "sentiment": "neutral",
                    "complexity_level": "unknown",
                    "estimated_reading_time": "5 minutes"
                }
        except Exception as e:
            logger.error(f"Error summarizing transcript: {e}")
            raise ServiceError(f"Failed to generate summary: {str(e)}")

    async def analyze_content(
        self,
        content: str,
        analysis_type: str = "general",
        custom_prompt: Optional[str] = None
    ) -> Dict[str, Any]:
        """Analyze content with specific focus.

        Args:
            content: Content to analyze
            analysis_type: Type of analysis ("sentiment", "topics", "structure", etc.)
            custom_prompt: Custom analysis prompt

        Returns:
            Analysis results
        """
        if custom_prompt:
            system_prompt = custom_prompt
        else:
            analysis_prompts = {
                "sentiment": "Analyze the sentiment and emotional tone of this content.",
                "topics": "Identify and extract the main topics and themes from this content.",
                "structure": "Analyze the structure and organization of this content.",
                "technical": "Analyze the technical concepts and complexity of this content.",
                "business": "Analyze the business implications and value propositions in this content."
            }
            system_prompt = analysis_prompts.get(analysis_type, "Provide a comprehensive analysis of this content.")

        try:
            response = await self.generate_response(
                prompt=content[:6000],  # Limit content length
                system_prompt=system_prompt,
                temperature=0.4,
                max_tokens=1000
            )
            return {
                "analysis_type": analysis_type,
                "result": response,
                "timestamp": datetime.now().isoformat()
            }
        except Exception as e:
            logger.error(f"Error analyzing content: {e}")
            raise ServiceError(f"Content analysis failed: {str(e)}")

    async def test_connection(self) -> Dict[str, Any]:
        """Test the connection to DeepSeek API.

        Returns:
            Connection test results
        """
        try:
            start_time = datetime.now()
            response = await self.generate_response(
                prompt="Hello, this is a connection test.",
                max_tokens=50,
                temperature=0.1
            )
            response_time = (datetime.now() - start_time).total_seconds()
            return {
                "status": "connected",
                "response_time_seconds": response_time,
                "api_key_valid": True,
                "model": self.default_model,
                "test_response": response[:100] + "..." if len(response) > 100 else response
            }
        except ServiceError as e:
            return {
                "status": "error",
                "error": str(e),
                "api_key_valid": "Invalid API key" not in str(e)
            }
        except Exception as e:
            return {
                "status": "error",
                "error": f"Unexpected error: {str(e)}",
                "api_key_valid": False
            }

    async def get_model_info(self) -> Dict[str, Any]:
        """Get information about available models.

        Returns:
            Model information
        """
        return {
            "current_model": self.default_model,
            "available_models": [
                "deepseek-chat",
                "deepseek-coder"
            ],
            "default_config": {
                "temperature": self.default_temperature,
                "max_tokens": self.default_max_tokens
            },
            "api_endpoint": f"{self.base_url}/v1/chat/completions"
        }

    def get_cost_estimate(self, input_tokens: int, output_tokens: int) -> Dict[str, Any]:
        """Estimate costs for DeepSeek API usage.

        Args:
            input_tokens: Number of input tokens
            output_tokens: Number of output tokens

        Returns:
            Cost estimation
        """
        # DeepSeek pricing (as of 2024 - verify current rates)
        input_price_per_1k = 0.0014   # $0.0014 per 1K input tokens
        output_price_per_1k = 0.0028  # $0.0028 per 1K output tokens

        input_cost = (input_tokens / 1000) * input_price_per_1k
        output_cost = (output_tokens / 1000) * output_price_per_1k
        total_cost = input_cost + output_cost

        return {
            "input_tokens": input_tokens,
            "output_tokens": output_tokens,
            "input_cost_usd": round(input_cost, 6),
            "output_cost_usd": round(output_cost, 6),
            "total_cost_usd": round(total_cost, 6),
            "model": self.default_model
        }
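

# Illustrative usage sketch (not part of the original module, never invoked here).
# Assumes DEEPSEEK_API_KEY is configured in settings (or passed as api_key=...) and that
# this module is imported as part of the backend package so the relative imports resolve.
async def _example_usage() -> None:
    async with DeepSeekService() as service:
        # Quick connectivity / credentials check
        status = await service.test_connection()
        logger.info("DeepSeek connection test: %s", status["status"])

        # Summarize a transcript (placeholder text; real transcripts must be >= 50 chars)
        result = await service.summarize_transcript(
            transcript="... full transcript text goes here ...",
            video_title="Example video",
            summary_length="brief",
        )
        logger.info("Summary: %s", result.get("summary", ""))

        # Rough cost estimate for a hypothetical request size
        logger.info("Estimated cost: %s", service.get_cost_estimate(input_tokens=2000, output_tokens=800))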