# youtube-summarizer/backend/services/anthropic_summarizer.py

"""Anthropic Claude summarization service."""
import asyncio
import json
import time
from typing import Dict, List, Optional
import re
from anthropic import AsyncAnthropic
from .ai_service import AIService, SummaryRequest, SummaryResult, SummaryLength
from ..core.exceptions import AIServiceError, ErrorCode


class AnthropicSummarizer(AIService):
    """Anthropic Claude-based summarization service."""

    def __init__(self, api_key: str, model: str = "claude-3-5-haiku-20241022"):
        """Initialize Anthropic summarizer.

        Args:
            api_key: Anthropic API key
            model: Model to use (default: claude-3-5-haiku for cost efficiency)
        """
        self.client = AsyncAnthropic(api_key=api_key)
        self.model = model
        # Cost per 1K tokens (as of 2025) - Claude 3.5 Haiku
        self.input_cost_per_1k = 0.00025   # $0.25 per 1M input tokens
        self.output_cost_per_1k = 0.00125  # $1.25 per 1M output tokens
        # Token limits for Claude models
        self.max_tokens_input = 200000  # 200k context window
        self.max_tokens_output = 8192   # Max output tokens

    async def generate_summary(self, request: SummaryRequest) -> SummaryResult:
        """Generate structured summary using Anthropic Claude."""
        # Handle very long transcripts with chunking
        estimated_tokens = self.get_token_count(request.transcript)
        if estimated_tokens > 150000:  # Leave room for prompt and response
            return await self._generate_chunked_summary(request)

        prompt = self._build_summary_prompt(request)
        try:
            start_time = time.time()
            response = await self.client.messages.create(
                model=self.model,
                max_tokens=self._get_max_tokens(request.length),
                temperature=0.3,  # Lower temperature for consistent summaries
                messages=[
                    {"role": "user", "content": prompt}
                ]
            )
            processing_time = time.time() - start_time

            # Extract JSON from response
            response_text = response.content[0].text
            result_data = self._extract_json_from_response(response_text)

            # Calculate costs
            input_tokens = response.usage.input_tokens
            output_tokens = response.usage.output_tokens
            input_cost = (input_tokens / 1000) * self.input_cost_per_1k
            output_cost = (output_tokens / 1000) * self.output_cost_per_1k
            total_cost = input_cost + output_cost

            return SummaryResult(
                summary=result_data.get("summary", ""),
                key_points=result_data.get("key_points", []),
                main_themes=result_data.get("main_themes", []),
                actionable_insights=result_data.get("actionable_insights", []),
                confidence_score=result_data.get("confidence_score", 0.85),
                processing_metadata={
                    "model": self.model,
                    "processing_time_seconds": processing_time,
                    "input_tokens": input_tokens,
                    "output_tokens": output_tokens,
                    "total_tokens": input_tokens + output_tokens,
                    "chunks_processed": 1
                },
                cost_data={
                    "input_cost_usd": input_cost,
                    "output_cost_usd": output_cost,
                    "total_cost_usd": total_cost,
                    "cost_per_summary": total_cost
                }
            )
        except Exception as e:
            raise AIServiceError(
                message=f"Anthropic summarization failed: {str(e)}",
                error_code=ErrorCode.AI_SERVICE_ERROR,
                details={
                    "model": self.model,
                    "transcript_length": len(request.transcript),
                    "error_type": type(e).__name__
                }
            )

    def _extract_json_from_response(self, response_text: str) -> dict:
        """Extract JSON from Claude's response which may include additional text."""
        try:
            # First try direct JSON parsing
            return json.loads(response_text)
        except json.JSONDecodeError:
            # Look for JSON block in the response
            json_match = re.search(r'\{.*\}', response_text, re.DOTALL)
            if json_match:
                try:
                    return json.loads(json_match.group())
                except json.JSONDecodeError:
                    pass
        # Fallback: create structure from response text
        return self._parse_structured_response(response_text)
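
    # Illustrative: this fallback chain handles a reply like
    #   'Here is the summary:\n{"summary": "...", "key_points": [...]}'
    # by extracting the embedded JSON object, and only falls through to the
    # line-based parser below when no parseable JSON is present at all.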

    def _parse_structured_response(self, response_text: str) -> dict:
        """Parse structured response when JSON parsing fails."""
        # This is a fallback parser for when Claude doesn't return pure JSON
        lines = response_text.split('\n')
        summary = ""
        key_points = []
        main_themes = []
        actionable_insights = []
        confidence_score = 0.85
        current_section = None

        for line in lines:
            line = line.strip()
            if not line:
                continue

            # Detect sections
            if "summary" in line.lower() and ":" in line:
                current_section = "summary"
                summary = line.split(":", 1)[1].strip()
                continue
            elif "key points" in line.lower() or "key_points" in line.lower():
                current_section = "key_points"
                continue
            elif "main themes" in line.lower() or "main_themes" in line.lower():
                current_section = "main_themes"
                continue
            elif "actionable insights" in line.lower() or "actionable_insights" in line.lower():
                current_section = "actionable_insights"
                continue
            elif "confidence" in line.lower():
                # Extract confidence score, clamped to the valid 0.0-1.0 range
                numbers = re.findall(r'0?\.\d+|\d+', line)
                if numbers:
                    confidence_score = min(max(float(numbers[0]), 0.0), 1.0)
                continue

            # Add content to appropriate section
            if current_section == "summary" and summary == "":
                summary = line
            elif current_section == "key_points" and line.startswith(('-', '•', '*')):
                key_points.append(line[1:].strip())
            elif current_section == "main_themes" and line.startswith(('-', '•', '*')):
                main_themes.append(line[1:].strip())
            elif current_section == "actionable_insights" and line.startswith(('-', '•', '*')):
                actionable_insights.append(line[1:].strip())

        return {
            "summary": summary,
            "key_points": key_points,
            "main_themes": main_themes,
            "actionable_insights": actionable_insights,
            "confidence_score": confidence_score
        }
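
    # Illustrative input the line-based fallback can handle (actual model
    # output varies):
    #
    #   Summary: The video explains how to budget a side project.
    #   Key points:
    #   - Track recurring costs
    #   - Automate invoicing
    #   Confidence: 0.8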

    def _build_summary_prompt(self, request: SummaryRequest) -> str:
        """Build optimized prompt for Claude summary generation."""
        length_instructions = {
            SummaryLength.BRIEF: "Generate a concise summary in 100-200 words",
            SummaryLength.STANDARD: "Generate a comprehensive summary in 300-500 words",
            SummaryLength.DETAILED: "Generate a detailed summary in 500-800 words"
        }
        focus_instruction = ""
        if request.focus_areas:
            focus_instruction = f"\nPay special attention to these areas: {', '.join(request.focus_areas)}"

        return f"""
Analyze this YouTube video transcript and provide a structured summary in JSON format.
{length_instructions[request.length]}.

Please respond with a valid JSON object in this exact format:
{{
    "summary": "Main summary text here",
    "key_points": ["Point 1", "Point 2", "Point 3"],
    "main_themes": ["Theme 1", "Theme 2", "Theme 3"],
    "actionable_insights": ["Insight 1", "Insight 2"],
    "confidence_score": 0.95
}}

Guidelines:
- Extract 3-7 key points that capture the most important information
- Identify 2-4 main themes or topics discussed
- Provide 2-5 actionable insights that viewers can apply
- Assign a confidence score (0.0-1.0) based on transcript quality and coherence
- Use clear, engaging language that's accessible to a general audience
- Focus on value and practical takeaways{focus_instruction}

Transcript:
{request.transcript}
"""

    async def _generate_chunked_summary(self, request: SummaryRequest) -> SummaryResult:
        """Handle very long transcripts using map-reduce approach."""
        # Split transcript into manageable chunks
        chunks = self._split_transcript_intelligently(request.transcript)

        # Generate summary for each chunk
        chunk_summaries = []
        total_cost = 0.0
        total_tokens = 0
        for chunk in chunks:
            chunk_request = SummaryRequest(
                transcript=chunk,
                length=SummaryLength.BRIEF,  # Brief summaries for chunks
                focus_areas=request.focus_areas,
                language=request.language
            )
            chunk_result = await self.generate_summary(chunk_request)
            chunk_summaries.append(chunk_result.summary)
            total_cost += chunk_result.cost_data["total_cost_usd"]
            total_tokens += chunk_result.processing_metadata["total_tokens"]
            # Add delay to respect rate limits
            await asyncio.sleep(0.1)

        # Combine chunk summaries into final summary
        combined_transcript = "\n\n".join([
            f"Section {i+1} Summary: {summary}"
            for i, summary in enumerate(chunk_summaries)
        ])
        final_request = SummaryRequest(
            transcript=combined_transcript,
            length=request.length,
            focus_areas=request.focus_areas,
            language=request.language
        )
        final_result = await self.generate_summary(final_request)

        # Update metadata and costs to reflect chunked processing
        final_result.processing_metadata.update({
            "chunks_processed": len(chunks),
            "total_tokens": total_tokens + final_result.processing_metadata["total_tokens"],
            "chunking_strategy": "intelligent_content_boundaries"
        })
        total_cost += final_result.cost_data["total_cost_usd"]
        final_result.cost_data["total_cost_usd"] = total_cost
        final_result.cost_data["cost_per_summary"] = total_cost
        return final_result
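
    # Note on the map-reduce flow above: each chunk is first condensed to a
    # BRIEF summary, and the concatenated "Section N Summary" texts are then
    # summarized once more at the originally requested length, so the reported
    # cost and token totals include both passes.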

    def _split_transcript_intelligently(self, transcript: str, max_tokens: int = 120000) -> List[str]:
        """Split transcript at natural boundaries while respecting token limits."""
        # Split by paragraphs first, then sentences if needed
        paragraphs = transcript.split('\n\n')
        chunks = []
        current_chunk = []
        current_tokens = 0

        for paragraph in paragraphs:
            paragraph_tokens = self.get_token_count(paragraph)

            # If single paragraph exceeds limit, split by sentences
            if paragraph_tokens > max_tokens:
                sentences = paragraph.split('. ')
                for sentence in sentences:
                    sentence_tokens = self.get_token_count(sentence)
                    if current_tokens + sentence_tokens > max_tokens and current_chunk:
                        chunks.append(' '.join(current_chunk))
                        current_chunk = [sentence]
                        current_tokens = sentence_tokens
                    else:
                        current_chunk.append(sentence)
                        current_tokens += sentence_tokens
            else:
                if current_tokens + paragraph_tokens > max_tokens and current_chunk:
                    chunks.append('\n\n'.join(current_chunk))
                    current_chunk = [paragraph]
                    current_tokens = paragraph_tokens
                else:
                    current_chunk.append(paragraph)
                    current_tokens += paragraph_tokens

        # Add final chunk
        if current_chunk:
            chunks.append('\n\n'.join(current_chunk))
        return chunks
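
    # Illustrative example: with the default max_tokens=120000 and the
    # ~4-characters-per-token heuristic in get_token_count below, a
    # 1,000,000-character transcript (~250,000 tokens) would be split into
    # roughly three chunks along paragraph boundaries.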

    def _get_max_tokens(self, length: SummaryLength) -> int:
        """Get max output tokens based on summary length."""
        return {
            SummaryLength.BRIEF: 400,
            SummaryLength.STANDARD: 800,
            SummaryLength.DETAILED: 1500
        }[length]

    def estimate_cost(self, transcript: str, length: SummaryLength) -> float:
        """Estimate cost for summarizing transcript."""
        input_tokens = self.get_token_count(transcript)
        output_tokens = self._get_max_tokens(length)
        input_cost = (input_tokens / 1000) * self.input_cost_per_1k
        output_cost = (output_tokens / 1000) * self.output_cost_per_1k
        return input_cost + output_cost
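
    # Worked example (illustrative): a 60,000-character transcript is roughly
    # 15,000 input tokens under the heuristic in get_token_count below, so at
    # STANDARD length:
    #   input:  (15000 / 1000) * 0.00025 = $0.00375
    #   output: (  800 / 1000) * 0.00125 = $0.00100
    #   estimated total                  ≈ $0.00475 per summary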

    def get_token_count(self, text: str) -> int:
        """Estimate token count for Anthropic model (roughly 4 chars per token)."""
        # Anthropic uses a similar tokenization to OpenAI, roughly 4 characters per token
        return len(text) // 4
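

# Minimal usage sketch (illustrative; assumes a valid API key and that
# SummaryRequest accepts the fields used above):
#
#   import asyncio
#
#   async def demo() -> None:
#       summarizer = AnthropicSummarizer(api_key="sk-ant-...")
#       request = SummaryRequest(
#           transcript=open("transcript.txt").read(),
#           length=SummaryLength.STANDARD,
#           focus_areas=["key decisions", "tooling"],
#           language="en",
#       )
#       result = await summarizer.generate_summary(request)
#       print(result.summary)
#       print(f"Cost: ${result.cost_data['total_cost_usd']:.5f}")
#
#   asyncio.run(demo())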