# Story 2.1: Single AI Model Integration

## Status

Draft

## Story

As a user,
I want the system to generate intelligent summaries from extracted transcripts using AI,
so that I can quickly understand video content without watching the entire video.

## Acceptance Criteria
1. System integrates with OpenAI GPT-4o-mini for cost-effective summarization
2. AI generates structured summaries with key points, main themes, and actionable insights
3. Summary length is configurable (brief, standard, detailed) based on user preference
4. System handles long transcripts via intelligent chunking without losing context
5. AI processing includes error handling with graceful fallbacks and retry logic
6. Generated summaries include confidence scores and processing metadata
## Tasks / Subtasks

- [ ] Task 1: AI Service Foundation (AC: 1, 5)
  - [ ] Create `AIService` base class in `backend/services/ai_service.py`
  - [ ] Implement OpenAI client configuration with API key management
  - [ ] Add retry logic with exponential backoff for API failures
  - [ ] Create comprehensive error handling for API responses
- [ ] Task 2: OpenAI Integration (AC: 1, 6)
  - [ ] Create `OpenAISummarizer` class implementing the AI service interface
  - [ ] Configure GPT-4o-mini with optimal parameters for summarization
  - [ ] Implement token counting and cost tracking for API usage
  - [ ] Add response validation and quality checks
- [ ] Task 3: Summary Generation Logic (AC: 2, 3)
  - [ ] Create structured prompt templates for different summary types
  - [ ] Implement summary length configuration (brief/standard/detailed)
  - [ ] Add key point extraction and theme identification
  - [ ] Create actionable insights generation from content
- [ ] Task 4: Transcript Chunking Strategy (AC: 4)
  - [ ] Implement intelligent transcript splitting based on content boundaries
  - [ ] Add context preservation between chunks for coherent summaries
  - [ ] Create chunk overlap strategy to maintain narrative flow
  - [ ] Implement map-reduce pattern for long transcript processing
- [ ] Task 5: API Endpoints for Summarization (AC: 2, 3, 6)
  - [ ] Create `/api/summarize` POST endpoint for transcript processing
  - [ ] Implement `/api/summaries/{id}` GET endpoint for result retrieval
  - [ ] Add summary configuration options in request body
  - [ ] Include processing metadata and confidence scores in response
- [ ] Task 6: Background Processing (AC: 5, 6)
  - [ ] Implement async summarization with job status tracking
  - [ ] Create job queue system for managing AI processing requests
  - [ ] Add progress updates via WebSocket for long-running summaries
  - [ ] Implement cancellation support for running summarization jobs
- [ ] Task 7: Integration Testing (AC: 1, 2, 3, 4, 5, 6)
  - [ ] Test summarization with various transcript lengths and content types
  - [ ] Validate summary quality and structure across different configurations
  - [ ] Test error handling and fallback scenarios
  - [ ] Verify cost tracking and token usage monitoring
## Dev Notes

### Architecture Context

This story establishes the core AI intelligence of the YouTube Summarizer, transforming raw transcripts into valuable, structured insights. The implementation must balance quality, cost, and performance while providing a foundation for multi-model support in future stories.

### AI Service Architecture Requirements

[Source: docs/architecture.md#ai-services]
```python
# Base AI Service Interface
from abc import ABC, abstractmethod
from typing import Dict, List, Optional, Union
from dataclasses import dataclass
from enum import Enum


class SummaryLength(Enum):
    BRIEF = "brief"        # ~100-200 words
    STANDARD = "standard"  # ~300-500 words
    DETAILED = "detailed"  # ~500-800 words


@dataclass
class SummaryRequest:
    transcript: str
    length: SummaryLength = SummaryLength.STANDARD
    focus_areas: Optional[List[str]] = None  # e.g., ["technical", "business", "educational"]
    language: str = "en"
    include_timestamps: bool = False


@dataclass
class SummaryResult:
    summary: str
    key_points: List[str]
    main_themes: List[str]
    actionable_insights: List[str]
    confidence_score: float
    processing_metadata: Dict[str, Union[str, int, float]]
    cost_data: Dict[str, Union[float, int]]


class AIService(ABC):
    """Base class for AI summarization services"""

    @abstractmethod
    async def generate_summary(self, request: SummaryRequest) -> SummaryResult:
        """Generate summary from transcript"""
        pass

    @abstractmethod
    def estimate_cost(self, transcript: str, length: SummaryLength) -> float:
        """Estimate processing cost in USD"""
        pass

    @abstractmethod
    def get_token_count(self, text: str) -> int:
        """Get token count for text"""
        pass
```
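Because callers depend only on the `AIService` interface, a concrete model can be swapped in without touching call sites, which is what keeps the door open for multi-model support in later stories. A minimal usage sketch (the helper name is illustrative; the $1.00 ceiling mirrors the endpoint's cost-limit check further below):

```python
# Illustrative only: any AIService implementation can be injected here.
async def summarize_with(service: AIService, transcript: str) -> SummaryResult:
    request = SummaryRequest(transcript=transcript, length=SummaryLength.BRIEF)
    # Check the cost ceiling before spending tokens, as the API endpoint does
    if service.estimate_cost(transcript, request.length) > 1.00:
        raise ValueError("Estimated cost exceeds the per-request limit")
    return await service.generate_summary(request)
```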
### OpenAI Integration Implementation

[Source: docs/architecture.md#openai-integration]
```python
# backend/services/openai_summarizer.py
import asyncio
import json
import time

import tiktoken
from openai import AsyncOpenAI
from typing import Dict, List, Optional

from .ai_service import AIService, SummaryRequest, SummaryResult, SummaryLength
from ..core.exceptions import AIServiceError, ErrorCode


class OpenAISummarizer(AIService):
    def __init__(self, api_key: str, model: str = "gpt-4o-mini"):
        self.client = AsyncOpenAI(api_key=api_key)
        self.model = model
        self.encoding = tiktoken.encoding_for_model(model)
        # Cost per 1K tokens (as of 2025)
        self.input_cost_per_1k = 0.00015  # $0.15 per 1M input tokens
        self.output_cost_per_1k = 0.0006  # $0.60 per 1M output tokens

    async def generate_summary(self, request: SummaryRequest) -> SummaryResult:
        """Generate structured summary using OpenAI GPT-4o-mini"""
        # Handle long transcripts with chunking
        if self.get_token_count(request.transcript) > 15000:  # Leave room for prompt
            return await self._generate_chunked_summary(request)

        prompt = self._build_summary_prompt(request)

        try:
            start_time = time.time()
            response = await self.client.chat.completions.create(
                model=self.model,
                messages=[
                    {"role": "system", "content": "You are an expert content summarizer specializing in YouTube video analysis."},
                    {"role": "user", "content": prompt}
                ],
                temperature=0.3,  # Lower temperature for consistent summaries
                max_tokens=self._get_max_tokens(request.length),
                response_format={"type": "json_object"}  # Ensure structured JSON response
            )
            processing_time = time.time() - start_time
            usage = response.usage

            # Parse structured response
            result_data = json.loads(response.choices[0].message.content)

            # Calculate costs
            input_cost = (usage.prompt_tokens / 1000) * self.input_cost_per_1k
            output_cost = (usage.completion_tokens / 1000) * self.output_cost_per_1k
            total_cost = input_cost + output_cost

            return SummaryResult(
                summary=result_data.get("summary", ""),
                key_points=result_data.get("key_points", []),
                main_themes=result_data.get("main_themes", []),
                actionable_insights=result_data.get("actionable_insights", []),
                confidence_score=result_data.get("confidence_score", 0.85),
                processing_metadata={
                    "model": self.model,
                    "processing_time_seconds": processing_time,
                    "prompt_tokens": usage.prompt_tokens,
                    "completion_tokens": usage.completion_tokens,
                    "total_tokens": usage.total_tokens,
                    "chunks_processed": 1
                },
                cost_data={
                    "input_cost_usd": input_cost,
                    "output_cost_usd": output_cost,
                    "total_cost_usd": total_cost,
                    "cost_per_summary": total_cost
                }
            )
        except Exception as e:
            raise AIServiceError(
                message=f"OpenAI summarization failed: {str(e)}",
                error_code=ErrorCode.AI_SERVICE_ERROR,
                details={
                    "model": self.model,
                    "transcript_length": len(request.transcript),
                    "error_type": type(e).__name__
                }
            )

    def _build_summary_prompt(self, request: SummaryRequest) -> str:
        """Build optimized prompt for summary generation"""
        length_instructions = {
            SummaryLength.BRIEF: "Generate a concise summary in 100-200 words",
            SummaryLength.STANDARD: "Generate a comprehensive summary in 300-500 words",
            SummaryLength.DETAILED: "Generate a detailed summary in 500-800 words"
        }
        focus_instruction = ""
        if request.focus_areas:
            focus_instruction = f"\nPay special attention to these areas: {', '.join(request.focus_areas)}"

        return f"""
Analyze this YouTube video transcript and provide a structured summary in JSON format.
{length_instructions[request.length]}.

Required JSON structure:
{{
    "summary": "Main summary text here",
    "key_points": ["Point 1", "Point 2", "Point 3", ...],
    "main_themes": ["Theme 1", "Theme 2", "Theme 3"],
    "actionable_insights": ["Insight 1", "Insight 2", ...],
    "confidence_score": 0.95
}}

Guidelines:
- Extract 3-7 key points that capture the most important information
- Identify 2-4 main themes or topics discussed
- Provide 2-5 actionable insights that viewers can apply
- Assign a confidence score (0.0-1.0) based on transcript quality and coherence
- Use clear, engaging language that's accessible to a general audience
- Focus on value and practical takeaways{focus_instruction}

Transcript:
{request.transcript}
"""

    async def _generate_chunked_summary(self, request: SummaryRequest) -> SummaryResult:
        """Handle long transcripts using map-reduce approach"""
        # Split transcript into manageable chunks
        chunks = self._split_transcript_intelligently(request.transcript)

        # Generate summary for each chunk (the "map" step)
        chunk_summaries = []
        total_cost = 0.0
        total_tokens = 0

        for i, chunk in enumerate(chunks):
            chunk_request = SummaryRequest(
                transcript=chunk,
                length=SummaryLength.BRIEF,  # Brief summaries for chunks
                focus_areas=request.focus_areas,
                language=request.language
            )
            chunk_result = await self.generate_summary(chunk_request)
            chunk_summaries.append(chunk_result.summary)
            total_cost += chunk_result.cost_data["total_cost_usd"]
            total_tokens += chunk_result.processing_metadata["total_tokens"]
            # Add delay to respect rate limits
            await asyncio.sleep(0.1)

        # Combine chunk summaries into the final summary (the "reduce" step)
        combined_transcript = "\n\n".join([
            f"Section {i+1} Summary: {summary}"
            for i, summary in enumerate(chunk_summaries)
        ])
        final_request = SummaryRequest(
            transcript=combined_transcript,
            length=request.length,
            focus_areas=request.focus_areas,
            language=request.language
        )
        final_result = await self.generate_summary(final_request)

        # Update metadata to reflect chunked processing
        final_result.processing_metadata.update({
            "chunks_processed": len(chunks),
            "total_tokens": total_tokens + final_result.processing_metadata["total_tokens"],
            "chunking_strategy": "intelligent_content_boundaries"
        })
        final_result.cost_data["total_cost_usd"] = total_cost + final_result.cost_data["total_cost_usd"]

        return final_result

    def _split_transcript_intelligently(self, transcript: str, max_tokens: int = 12000) -> List[str]:
        """Split transcript at natural boundaries while respecting token limits"""
        # Split by paragraphs first, then sentences if needed
        paragraphs = transcript.split('\n\n')
        chunks = []
        current_chunk = []
        current_tokens = 0

        for paragraph in paragraphs:
            paragraph_tokens = self.get_token_count(paragraph)
            # If a single paragraph exceeds the limit, split by sentences
            if paragraph_tokens > max_tokens:
                sentences = paragraph.split('. ')
                for sentence in sentences:
                    sentence_tokens = self.get_token_count(sentence)
                    if current_tokens + sentence_tokens > max_tokens and current_chunk:
                        chunks.append(' '.join(current_chunk))
                        current_chunk = [sentence]
                        current_tokens = sentence_tokens
                    else:
                        current_chunk.append(sentence)
                        current_tokens += sentence_tokens
            else:
                if current_tokens + paragraph_tokens > max_tokens and current_chunk:
                    chunks.append('\n\n'.join(current_chunk))
                    current_chunk = [paragraph]
                    current_tokens = paragraph_tokens
                else:
                    current_chunk.append(paragraph)
                    current_tokens += paragraph_tokens

        # Add final chunk
        if current_chunk:
            chunks.append('\n\n'.join(current_chunk))
        return chunks

    def _get_max_tokens(self, length: SummaryLength) -> int:
        """Get max output tokens based on summary length"""
        return {
            SummaryLength.BRIEF: 300,
            SummaryLength.STANDARD: 700,
            SummaryLength.DETAILED: 1200
        }[length]

    def estimate_cost(self, transcript: str, length: SummaryLength) -> float:
        """Estimate cost for summarizing transcript"""
        input_tokens = self.get_token_count(transcript)
        output_tokens = self._get_max_tokens(length)
        input_cost = (input_tokens / 1000) * self.input_cost_per_1k
        output_cost = (output_tokens / 1000) * self.output_cost_per_1k
        return input_cost + output_cost

    def get_token_count(self, text: str) -> int:
        """Get accurate token count for OpenAI model"""
        return len(self.encoding.encode(text))
```
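Plugging the listed rates into `estimate_cost`: a 10,000-token transcript with a standard summary budget of 700 output tokens costs roughly (10,000 / 1,000) × $0.00015 + (700 / 1,000) × $0.0006 ≈ $0.0015 + $0.00042 ≈ $0.0019 per summary.

The sketch above does not yet include the retry logic called for in Task 1 and AC 5. A minimal exponential-backoff wrapper might look like the following; the `max_retries` and `base_delay` values are illustrative assumptions, and note that the OpenAI SDK also accepts a `max_retries` argument on the client itself:

```python
import asyncio
import random

async def with_retries(operation, max_retries: int = 3, base_delay: float = 1.0):
    """Retry an async operation with exponential backoff and jitter (sketch)."""
    for attempt in range(max_retries + 1):
        try:
            return await operation()
        except Exception:
            if attempt == max_retries:
                raise  # Out of retries; let the caller's error handling take over
            # Delays of ~1s, 2s, 4s, ... plus jitter to avoid synchronized retries
            await asyncio.sleep(base_delay * (2 ** attempt) + random.uniform(0, 0.5))
```

Inside `generate_summary`, the completion call could then be wrapped as `await with_retries(lambda: self.client.chat.completions.create(...))`.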
### API Endpoint Implementation

[Source: docs/architecture.md#api-specification]
```python
# backend/api/summarization.py
import uuid

from fastapi import APIRouter, HTTPException, BackgroundTasks, Depends
from pydantic import BaseModel, Field
from typing import Optional, List

from ..services.ai_service import SummaryRequest, SummaryLength
from ..services.openai_summarizer import OpenAISummarizer
from ..core.exceptions import AIServiceError

router = APIRouter(prefix="/api", tags=["summarization"])


class SummarizeRequest(BaseModel):
    transcript: str = Field(..., description="Video transcript to summarize")
    length: SummaryLength = Field(SummaryLength.STANDARD, description="Summary length preference")
    focus_areas: Optional[List[str]] = Field(None, description="Areas to focus on")
    language: str = Field("en", description="Content language")
    async_processing: bool = Field(False, description="Process asynchronously")


class SummarizeResponse(BaseModel):
    summary_id: Optional[str] = None  # For async processing
    summary: Optional[str] = None     # For sync processing
    key_points: Optional[List[str]] = None
    main_themes: Optional[List[str]] = None
    actionable_insights: Optional[List[str]] = None
    confidence_score: Optional[float] = None
    processing_metadata: Optional[dict] = None
    cost_data: Optional[dict] = None
    status: str = "completed"  # "processing", "completed", "failed"


@router.post("/summarize", response_model=SummarizeResponse)
async def summarize_transcript(
    request: SummarizeRequest,
    background_tasks: BackgroundTasks,
    ai_service: OpenAISummarizer = Depends()
):
    """Generate AI summary from transcript"""
    # Validate transcript length
    if len(request.transcript.strip()) < 50:
        raise HTTPException(
            status_code=400,
            detail="Transcript too short for meaningful summarization"
        )
    if len(request.transcript) > 100000:  # ~100k characters
        request.async_processing = True  # Force async for very long transcripts

    try:
        # Estimate cost before processing
        estimated_cost = ai_service.estimate_cost(request.transcript, request.length)
        if estimated_cost > 1.00:  # Cost limit check
            raise HTTPException(
                status_code=400,
                detail=f"Estimated cost ${estimated_cost:.3f} exceeds limit. Consider a shorter transcript or a brief summary."
            )

        summary_request = SummaryRequest(
            transcript=request.transcript,
            length=request.length,
            focus_areas=request.focus_areas,
            language=request.language
        )

        if request.async_processing:
            # Process asynchronously
            summary_id = str(uuid.uuid4())
            background_tasks.add_task(
                process_summary_async,
                summary_id=summary_id,
                request=summary_request,
                ai_service=ai_service
            )
            return SummarizeResponse(
                summary_id=summary_id,
                status="processing"
            )
        else:
            # Process synchronously
            result = await ai_service.generate_summary(summary_request)
            return SummarizeResponse(
                summary=result.summary,
                key_points=result.key_points,
                main_themes=result.main_themes,
                actionable_insights=result.actionable_insights,
                confidence_score=result.confidence_score,
                processing_metadata=result.processing_metadata,
                cost_data=result.cost_data,
                status="completed"
            )
    except AIServiceError as e:
        raise HTTPException(
            status_code=500,
            detail={
                "error": "AI service error",
                "message": e.message,
                "code": e.error_code,
                "details": e.details
            }
        )


async def process_summary_async(
    summary_id: str,
    request: SummaryRequest,
    ai_service: OpenAISummarizer
):
    """Background task for async summary processing"""
    try:
        result = await ai_service.generate_summary(request)
        # Store result in database/cache
        await store_summary_result(summary_id, result)
        # Send WebSocket notification
        await notify_summary_complete(summary_id, result)
    except Exception as e:
        await store_summary_error(summary_id, str(e))
        await notify_summary_failed(summary_id, str(e))


@router.get("/summaries/{summary_id}", response_model=SummarizeResponse)
async def get_summary(summary_id: str):
    """Get async summary result by ID"""
    # Retrieve from database/cache
    result = await get_stored_summary(summary_id)
    if not result:
        raise HTTPException(status_code=404, detail="Summary not found")
    return SummarizeResponse(**result)
```
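The endpoint code references `store_summary_result`, `store_summary_error`, `get_stored_summary`, and the WebSocket notifiers without defining them; their real implementations belong with the database models and background processing from Task 6. As a placeholder, a minimal in-memory version, assuming no persistence requirements, could look like:

```python
# Illustrative in-memory store; production code would persist to the
# database/cache referenced in the comments above.
from dataclasses import asdict
from typing import Optional

_summary_store: dict = {}

async def store_summary_result(summary_id: str, result) -> None:
    # SummaryResult is a dataclass, so asdict() yields the response fields
    _summary_store[summary_id] = {**asdict(result), "status": "completed"}

async def store_summary_error(summary_id: str, error: str) -> None:
    _summary_store[summary_id] = {"status": "failed"}

async def get_stored_summary(summary_id: str) -> Optional[dict]:
    return _summary_store.get(summary_id)
```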
### Error Handling Requirements

[Source: docs/architecture.md#error-handling]
```python
# backend/core/exceptions.py (additions)

class AIServiceError(BaseAPIException):
    """Base exception for AI service errors"""
    pass


class TokenLimitExceededError(AIServiceError):
    """Raised when content exceeds model token limit"""
    def __init__(self, token_count: int, max_tokens: int):
        super().__init__(
            message=f"Content ({token_count} tokens) exceeds model limit ({max_tokens} tokens)",
            error_code=ErrorCode.TOKEN_LIMIT_EXCEEDED,
            status_code=status.HTTP_400_BAD_REQUEST,
            details={
                "token_count": token_count,
                "max_tokens": max_tokens,
                "suggestions": [
                    "Use chunked processing for long content",
                    "Choose a briefer summary length",
                    "Split content into smaller sections"
                ]
            }
        )


class CostLimitExceededError(AIServiceError):
    """Raised when processing cost exceeds limits"""
    def __init__(self, estimated_cost: float, cost_limit: float):
        super().__init__(
            message=f"Estimated cost ${estimated_cost:.3f} exceeds limit ${cost_limit:.2f}",
            error_code=ErrorCode.COST_LIMIT_EXCEEDED,
            status_code=status.HTTP_400_BAD_REQUEST,
            details={
                "estimated_cost": estimated_cost,
                "cost_limit": cost_limit,
                "cost_reduction_tips": [
                    "Choose 'brief' summary length",
                    "Remove less important content from transcript",
                    "Process content in smaller segments"
                ]
            }
        )


class AIServiceUnavailableError(AIServiceError):
    """Raised when AI service is temporarily unavailable"""
    pass
```
### File Locations and Structure

[Source: docs/architecture.md#project-structure]

Backend Files:

- `backend/services/ai_service.py` - Base AI service interface and data models
- `backend/services/openai_summarizer.py` - OpenAI GPT-4o-mini integration
- `backend/api/summarization.py` - Summary generation endpoints
- `backend/core/exceptions.py` - Updated with AI-specific exceptions
- `backend/models/summary.py` - Database models for summary storage
- `backend/tests/unit/test_openai_summarizer.py` - Unit tests
- `backend/tests/integration/test_summarization_api.py` - Integration tests
### Testing Standards

#### Backend Unit Tests

[Source: docs/architecture.md#testing-strategy]
```python
# backend/tests/unit/test_openai_summarizer.py
import json

import pytest
from unittest.mock import AsyncMock, patch, MagicMock

from backend.services.openai_summarizer import OpenAISummarizer
from backend.services.ai_service import SummaryRequest, SummaryLength


class TestOpenAISummarizer:
    @pytest.fixture
    def summarizer(self):
        return OpenAISummarizer(api_key="test-key")

    @pytest.mark.asyncio
    async def test_generate_summary_success(self, summarizer):
        """Test successful summary generation"""
        # Mock OpenAI response
        mock_response = MagicMock()
        mock_response.choices[0].message.content = json.dumps({
            "summary": "This is a test summary",
            "key_points": ["Point 1", "Point 2"],
            "main_themes": ["Theme 1"],
            "actionable_insights": ["Insight 1"],
            "confidence_score": 0.92
        })
        mock_response.usage.prompt_tokens = 100
        mock_response.usage.completion_tokens = 50
        mock_response.usage.total_tokens = 150

        # The client method is async, so patch it with an AsyncMock
        with patch.object(
            summarizer.client.chat.completions, 'create',
            new_callable=AsyncMock, return_value=mock_response
        ):
            request = SummaryRequest(
                transcript="This is a test transcript with some content to summarize.",
                length=SummaryLength.STANDARD
            )
            result = await summarizer.generate_summary(request)

        assert result.summary == "This is a test summary"
        assert len(result.key_points) == 2
        assert result.confidence_score == 0.92
        assert result.cost_data["total_cost_usd"] > 0

    @pytest.mark.asyncio
    async def test_chunked_processing(self, summarizer):
        """Test that long transcripts trigger chunked processing"""
        # Roughly 20k tokens, above the 15k-token chunking threshold
        long_transcript = "This is a sentence. " * 4000

        with patch.object(
            summarizer, '_generate_chunked_summary', new_callable=AsyncMock
        ) as mock_chunked:
            request = SummaryRequest(
                transcript=long_transcript,
                length=SummaryLength.STANDARD
            )
            await summarizer.generate_summary(request)

        # Should have delegated to chunked processing
        mock_chunked.assert_awaited_once()

    def test_cost_estimation(self, summarizer):
        """Test cost estimation accuracy"""
        transcript = "Test transcript for cost estimation."
        cost = summarizer.estimate_cost(transcript, SummaryLength.STANDARD)
        assert isinstance(cost, float)
        assert cost > 0
        assert cost < 0.01  # Should be very cheap for a short transcript

    def test_token_counting(self, summarizer):
        """Test token counting accuracy"""
        text = "Hello world, this is a test."
        token_count = summarizer.get_token_count(text)
        assert isinstance(token_count, int)
        assert token_count > 0
        assert token_count < 20  # Should be reasonable for short text
```
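Task 7's integration suite lives in `backend/tests/integration/test_summarization_api.py`. A minimal sketch of one such test, assuming the router is mounted on an app and exposed through a `client` fixture with the AI service dependency overridden:

```python
# Sketch: exercises the input-validation path without calling OpenAI
from fastapi.testclient import TestClient

def test_summarize_rejects_short_transcript(client: TestClient):
    response = client.post("/api/summarize", json={"transcript": "too short"})
    assert response.status_code == 400
```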
### Performance Optimization

- **Token Management**: Intelligent chunking prevents token-limit errors while preserving context
- **Cost Optimization**: GPT-4o-mini provides roughly 80% savings vs GPT-4 while maintaining quality
- **Async Processing**: Background processing for long transcripts prevents UI blocking
- **Caching Strategy**: Summary results are cached to avoid repeated API calls (see the key-derivation sketch after this list)
- **Rate Limiting**: Built-in delays and retry logic respect OpenAI rate limits
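One way the caching strategy could key stored results, assuming a hash of the transcript plus configuration is an acceptable identity (names are placeholders):

```python
import hashlib
import json

def summary_cache_key(transcript: str, length: str, focus_areas=None) -> str:
    """Deterministic cache key so identical requests reuse a stored summary."""
    payload = json.dumps(
        {"transcript": transcript, "length": length, "focus_areas": focus_areas},
        sort_keys=True,
    )
    return "summary:" + hashlib.sha256(payload.encode()).hexdigest()
```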
### Security Considerations

- **API Key Security**: Keys stored in environment variables, never in code (see the sketch after this list)
- **Input Validation**: Transcript length and content validation before processing
- **Cost Controls**: Per-request cost limits prevent unexpected charges
- **Error Sanitization**: Sensitive error details are not exposed to clients
- **Request Logging**: Comprehensive logging for debugging without exposing content
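For the API-key requirement, a fail-fast sketch (`OPENAI_API_KEY` is the conventional variable name, assumed rather than specified by this story):

```python
import os

from backend.services.openai_summarizer import OpenAISummarizer

# Raises KeyError at startup if the key is missing, rather than failing mid-request
OPENAI_API_KEY = os.environ["OPENAI_API_KEY"]
summarizer = OpenAISummarizer(api_key=OPENAI_API_KEY)
```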
## Change Log
| Date | Version | Description | Author |
|---|---|---|---|
| 2025-01-25 | 1.0 | Initial story creation | Bob (Scrum Master) |
## Dev Agent Record

*This section will be populated by the development agent during implementation*

### Agent Model Used

To be filled by dev agent

### Debug Log References

To be filled by dev agent

### Completion Notes List

To be filled by dev agent

### File List

To be filled by dev agent

## QA Results

*Results from QA Agent review of the completed story implementation will be added here*