youtube-summarizer/docs/stories/2.1.single-ai-model-integra...

26 KiB

Story 2.1: Single AI Model Integration

Status

Draft

Story

As a user
I want the system to generate intelligent summaries from extracted transcripts using AI
so that I can quickly understand video content without watching the entire video

Acceptance Criteria

  1. System integrates with OpenAI GPT-4o-mini for cost-effective summarization
  2. AI generates structured summaries with key points, main themes, and actionable insights
  3. Summary length is configurable (brief, standard, detailed) based on user preference
  4. System handles long transcripts by intelligent chunking without losing context
  5. AI processing includes error handling with graceful fallbacks and retry logic
  6. Generated summaries include confidence scores and processing metadata

Tasks / Subtasks

  • Task 1: AI Service Foundation (AC: 1, 5)

    • Create AIService base class in backend/services/ai_service.py
    • Implement OpenAI client configuration with API key management
    • Add retry logic with exponential backoff for API failures
    • Create comprehensive error handling for API responses
  • Task 2: OpenAI Integration (AC: 1, 6)

    • Create OpenAISummarizer class implementing AI service interface
    • Configure GPT-4o-mini with optimal parameters for summarization
    • Implement token counting and cost tracking for API usage
    • Add response validation and quality checks
  • Task 3: Summary Generation Logic (AC: 2, 3)

    • Create structured prompt templates for different summary types
    • Implement summary length configuration (brief/standard/detailed)
    • Add key point extraction and theme identification
    • Create actionable insights generation from content
  • Task 4: Transcript Chunking Strategy (AC: 4)

    • Implement intelligent transcript splitting based on content boundaries
    • Add context preservation between chunks for coherent summaries
    • Create chunk overlap strategy to maintain narrative flow
    • Implement map-reduce pattern for long transcript processing
  • Task 5: API Endpoints for Summarization (AC: 2, 3, 6)

    • Create /api/summarize POST endpoint for transcript processing
    • Implement /api/summaries/{id} GET endpoint for result retrieval
    • Add summary configuration options in request body
    • Include processing metadata and confidence scores in response
  • Task 6: Background Processing (AC: 5, 6)

    • Implement async summarization with job status tracking
    • Create job queue system for managing AI processing requests
    • Add progress updates via WebSocket for long-running summaries
    • Implement cancellation support for running summarization jobs
  • Task 7: Integration Testing (AC: 1, 2, 3, 4, 5, 6)

    • Test summarization with various transcript lengths and content types
    • Validate summary quality and structure across different configurations
    • Test error handling and fallback scenarios
    • Verify cost tracking and token usage monitoring

Dev Notes

Architecture Context

This story establishes the core AI intelligence of the YouTube Summarizer, transforming raw transcripts into valuable, structured insights. The implementation must balance quality, cost, and performance while providing a foundation for multi-model support in future stories.

AI Service Architecture Requirements

[Source: docs/architecture.md#ai-services]

# Base AI Service Interface
from abc import ABC, abstractmethod
from typing import Dict, List, Optional, Union
from dataclasses import dataclass
from enum import Enum

class SummaryLength(Enum):
    BRIEF = "brief"           # ~100-200 words
    STANDARD = "standard"     # ~300-500 words  
    DETAILED = "detailed"     # ~500-800 words

@dataclass
class SummaryRequest:
    transcript: str
    length: SummaryLength = SummaryLength.STANDARD
    focus_areas: Optional[List[str]] = None  # e.g., ["technical", "business", "educational"]
    language: str = "en"
    include_timestamps: bool = False

@dataclass
class SummaryResult:
    summary: str
    key_points: List[str]
    main_themes: List[str]
    actionable_insights: List[str]
    confidence_score: float
    processing_metadata: Dict[str, Union[str, int, float]]
    cost_data: Dict[str, Union[float, int]]

class AIService(ABC):
    """Base class for AI summarization services"""
    
    @abstractmethod
    async def generate_summary(self, request: SummaryRequest) -> SummaryResult:
        """Generate summary from transcript"""
        pass
    
    @abstractmethod
    def estimate_cost(self, transcript: str, length: SummaryLength) -> float:
        """Estimate processing cost in USD"""
        pass
    
    @abstractmethod
    def get_token_count(self, text: str) -> int:
        """Get token count for text"""
        pass

OpenAI Integration Implementation

[Source: docs/architecture.md#openai-integration]

# backend/services/openai_summarizer.py
import asyncio
import tiktoken
from openai import AsyncOpenAI
from typing import Dict, List, Optional
from .ai_service import AIService, SummaryRequest, SummaryResult, SummaryLength

class OpenAISummarizer(AIService):
    def __init__(self, api_key: str, model: str = "gpt-4o-mini"):
        self.client = AsyncOpenAI(api_key=api_key)
        self.model = model
        self.encoding = tiktoken.encoding_for_model(model)
        
        # Cost per 1K tokens (as of 2025)
        self.input_cost_per_1k = 0.00015   # $0.15 per 1M input tokens
        self.output_cost_per_1k = 0.0006   # $0.60 per 1M output tokens
        
    async def generate_summary(self, request: SummaryRequest) -> SummaryResult:
        """Generate structured summary using OpenAI GPT-4o-mini"""
        
        # Handle long transcripts with chunking
        if self.get_token_count(request.transcript) > 15000:  # Leave room for prompt
            return await self._generate_chunked_summary(request)
        
        prompt = self._build_summary_prompt(request)
        
        try:
            start_time = time.time()
            
            response = await self.client.chat.completions.create(
                model=self.model,
                messages=[
                    {"role": "system", "content": "You are an expert content summarizer specializing in YouTube video analysis."},
                    {"role": "user", "content": prompt}
                ],
                temperature=0.3,  # Lower temperature for consistent summaries
                max_tokens=self._get_max_tokens(request.length),
                response_format={"type": "json_object"}  # Ensure structured JSON response
            )
            
            processing_time = time.time() - start_time
            usage = response.usage
            
            # Parse structured response
            result_data = json.loads(response.choices[0].message.content)
            
            # Calculate costs
            input_cost = (usage.prompt_tokens / 1000) * self.input_cost_per_1k
            output_cost = (usage.completion_tokens / 1000) * self.output_cost_per_1k
            total_cost = input_cost + output_cost
            
            return SummaryResult(
                summary=result_data.get("summary", ""),
                key_points=result_data.get("key_points", []),
                main_themes=result_data.get("main_themes", []),
                actionable_insights=result_data.get("actionable_insights", []),
                confidence_score=result_data.get("confidence_score", 0.85),
                processing_metadata={
                    "model": self.model,
                    "processing_time_seconds": processing_time,
                    "prompt_tokens": usage.prompt_tokens,
                    "completion_tokens": usage.completion_tokens,
                    "total_tokens": usage.total_tokens,
                    "chunks_processed": 1
                },
                cost_data={
                    "input_cost_usd": input_cost,
                    "output_cost_usd": output_cost,
                    "total_cost_usd": total_cost,
                    "cost_per_summary": total_cost
                }
            )
            
        except Exception as e:
            raise AIServiceError(
                message=f"OpenAI summarization failed: {str(e)}",
                error_code=ErrorCode.AI_SERVICE_ERROR,
                details={
                    "model": self.model,
                    "transcript_length": len(request.transcript),
                    "error_type": type(e).__name__
                }
            )
    
    def _build_summary_prompt(self, request: SummaryRequest) -> str:
        """Build optimized prompt for summary generation"""
        length_instructions = {
            SummaryLength.BRIEF: "Generate a concise summary in 100-200 words",
            SummaryLength.STANDARD: "Generate a comprehensive summary in 300-500 words", 
            SummaryLength.DETAILED: "Generate a detailed summary in 500-800 words"
        }
        
        focus_instruction = ""
        if request.focus_areas:
            focus_instruction = f"\nPay special attention to these areas: {', '.join(request.focus_areas)}"
        
        return f"""
Analyze this YouTube video transcript and provide a structured summary in JSON format.

{length_instructions[request.length]}.

Required JSON structure:
{{
    "summary": "Main summary text here",
    "key_points": ["Point 1", "Point 2", "Point 3", ...],
    "main_themes": ["Theme 1", "Theme 2", "Theme 3"],
    "actionable_insights": ["Insight 1", "Insight 2", ...],
    "confidence_score": 0.95
}}

Guidelines:
- Extract 3-7 key points that capture the most important information
- Identify 2-4 main themes or topics discussed
- Provide 2-5 actionable insights that viewers can apply
- Assign a confidence score (0.0-1.0) based on transcript quality and coherence
- Use clear, engaging language that's accessible to a general audience
- Focus on value and practical takeaways{focus_instruction}

Transcript:
{request.transcript}
"""
    
    async def _generate_chunked_summary(self, request: SummaryRequest) -> SummaryResult:
        """Handle long transcripts using map-reduce approach"""
        
        # Split transcript into manageable chunks
        chunks = self._split_transcript_intelligently(request.transcript)
        
        # Generate summary for each chunk
        chunk_summaries = []
        total_cost = 0.0
        total_tokens = 0
        
        for i, chunk in enumerate(chunks):
            chunk_request = SummaryRequest(
                transcript=chunk,
                length=SummaryLength.BRIEF,  # Brief summaries for chunks
                focus_areas=request.focus_areas,
                language=request.language
            )
            
            chunk_result = await self.generate_summary(chunk_request)
            chunk_summaries.append(chunk_result.summary)
            total_cost += chunk_result.cost_data["total_cost_usd"]
            total_tokens += chunk_result.processing_metadata["total_tokens"]
            
            # Add delay to respect rate limits
            await asyncio.sleep(0.1)
        
        # Combine chunk summaries into final summary
        combined_transcript = "\n\n".join([
            f"Section {i+1} Summary: {summary}" 
            for i, summary in enumerate(chunk_summaries)
        ])
        
        final_request = SummaryRequest(
            transcript=combined_transcript,
            length=request.length,
            focus_areas=request.focus_areas,
            language=request.language
        )
        
        final_result = await self.generate_summary(final_request)
        
        # Update metadata to reflect chunked processing
        final_result.processing_metadata.update({
            "chunks_processed": len(chunks),
            "total_tokens": total_tokens + final_result.processing_metadata["total_tokens"],
            "chunking_strategy": "intelligent_content_boundaries"
        })
        
        final_result.cost_data["total_cost_usd"] = total_cost + final_result.cost_data["total_cost_usd"]
        
        return final_result
    
    def _split_transcript_intelligently(self, transcript: str, max_tokens: int = 12000) -> List[str]:
        """Split transcript at natural boundaries while respecting token limits"""
        
        # Split by paragraphs first, then sentences if needed
        paragraphs = transcript.split('\n\n')
        chunks = []
        current_chunk = []
        current_tokens = 0
        
        for paragraph in paragraphs:
            paragraph_tokens = self.get_token_count(paragraph)
            
            # If single paragraph exceeds limit, split by sentences
            if paragraph_tokens > max_tokens:
                sentences = paragraph.split('. ')
                for sentence in sentences:
                    sentence_tokens = self.get_token_count(sentence)
                    
                    if current_tokens + sentence_tokens > max_tokens and current_chunk:
                        chunks.append(' '.join(current_chunk))
                        current_chunk = [sentence]
                        current_tokens = sentence_tokens
                    else:
                        current_chunk.append(sentence)
                        current_tokens += sentence_tokens
            else:
                if current_tokens + paragraph_tokens > max_tokens and current_chunk:
                    chunks.append('\n\n'.join(current_chunk))
                    current_chunk = [paragraph]
                    current_tokens = paragraph_tokens
                else:
                    current_chunk.append(paragraph)
                    current_tokens += paragraph_tokens
        
        # Add final chunk
        if current_chunk:
            chunks.append('\n\n'.join(current_chunk))
        
        return chunks
    
    def _get_max_tokens(self, length: SummaryLength) -> int:
        """Get max output tokens based on summary length"""
        return {
            SummaryLength.BRIEF: 300,
            SummaryLength.STANDARD: 700,
            SummaryLength.DETAILED: 1200
        }[length]
    
    def estimate_cost(self, transcript: str, length: SummaryLength) -> float:
        """Estimate cost for summarizing transcript"""
        input_tokens = self.get_token_count(transcript)
        output_tokens = self._get_max_tokens(length)
        
        input_cost = (input_tokens / 1000) * self.input_cost_per_1k
        output_cost = (output_tokens / 1000) * self.output_cost_per_1k
        
        return input_cost + output_cost
    
    def get_token_count(self, text: str) -> int:
        """Get accurate token count for OpenAI model"""
        return len(self.encoding.encode(text))

API Endpoint Implementation

[Source: docs/architecture.md#api-specification]

# backend/api/summarization.py
from fastapi import APIRouter, HTTPException, BackgroundTasks, Depends
from pydantic import BaseModel, Field
from typing import Optional, List
from ..services.ai_service import SummaryRequest, SummaryLength
from ..services.openai_summarizer import OpenAISummarizer
from ..core.exceptions import AIServiceError

router = APIRouter(prefix="/api", tags=["summarization"])

class SummarizeRequest(BaseModel):
    transcript: str = Field(..., description="Video transcript to summarize")
    length: SummaryLength = Field(SummaryLength.STANDARD, description="Summary length preference")
    focus_areas: Optional[List[str]] = Field(None, description="Areas to focus on")
    language: str = Field("en", description="Content language")
    async_processing: bool = Field(False, description="Process asynchronously")

class SummarizeResponse(BaseModel):
    summary_id: Optional[str] = None  # For async processing
    summary: Optional[str] = None     # For sync processing
    key_points: Optional[List[str]] = None
    main_themes: Optional[List[str]] = None
    actionable_insights: Optional[List[str]] = None
    confidence_score: Optional[float] = None
    processing_metadata: Optional[dict] = None
    cost_data: Optional[dict] = None
    status: str = "completed"  # "processing", "completed", "failed"

@router.post("/summarize", response_model=SummarizeResponse)
async def summarize_transcript(
    request: SummarizeRequest,
    background_tasks: BackgroundTasks,
    ai_service: OpenAISummarizer = Depends()
):
    """Generate AI summary from transcript"""
    
    # Validate transcript length
    if len(request.transcript.strip()) < 50:
        raise HTTPException(
            status_code=400,
            detail="Transcript too short for meaningful summarization"
        )
    
    if len(request.transcript) > 100000:  # ~100k characters
        request.async_processing = True  # Force async for very long transcripts
    
    try:
        # Estimate cost before processing
        estimated_cost = ai_service.estimate_cost(request.transcript, request.length)
        
        if estimated_cost > 1.00:  # Cost limit check
            raise HTTPException(
                status_code=400,
                detail=f"Estimated cost ${estimated_cost:.3f} exceeds limit. Consider shorter transcript or brief summary."
            )
        
        summary_request = SummaryRequest(
            transcript=request.transcript,
            length=request.length,
            focus_areas=request.focus_areas,
            language=request.language
        )
        
        if request.async_processing:
            # Process asynchronously
            summary_id = str(uuid.uuid4())
            
            background_tasks.add_task(
                process_summary_async,
                summary_id=summary_id,
                request=summary_request,
                ai_service=ai_service
            )
            
            return SummarizeResponse(
                summary_id=summary_id,
                status="processing"
            )
        else:
            # Process synchronously
            result = await ai_service.generate_summary(summary_request)
            
            return SummarizeResponse(
                summary=result.summary,
                key_points=result.key_points,
                main_themes=result.main_themes,
                actionable_insights=result.actionable_insights,
                confidence_score=result.confidence_score,
                processing_metadata=result.processing_metadata,
                cost_data=result.cost_data,
                status="completed"
            )
            
    except AIServiceError as e:
        raise HTTPException(
            status_code=500,
            detail={
                "error": "AI service error",
                "message": e.message,
                "code": e.error_code,
                "details": e.details
            }
        )

async def process_summary_async(
    summary_id: str,
    request: SummaryRequest,
    ai_service: OpenAISummarizer
):
    """Background task for async summary processing"""
    try:
        result = await ai_service.generate_summary(request)
        
        # Store result in database/cache
        await store_summary_result(summary_id, result)
        
        # Send WebSocket notification
        await notify_summary_complete(summary_id, result)
        
    except Exception as e:
        await store_summary_error(summary_id, str(e))
        await notify_summary_failed(summary_id, str(e))

@router.get("/summaries/{summary_id}", response_model=SummarizeResponse)
async def get_summary(summary_id: str):
    """Get async summary result by ID"""
    
    # Retrieve from database/cache
    result = await get_stored_summary(summary_id)
    
    if not result:
        raise HTTPException(status_code=404, detail="Summary not found")
    
    return SummarizeResponse(**result)

Error Handling Requirements

[Source: docs/architecture.md#error-handling]

# backend/core/exceptions.py (additions)
class AIServiceError(BaseAPIException):
    """Base exception for AI service errors"""
    pass

class TokenLimitExceededError(AIServiceError):
    """Raised when content exceeds model token limit"""
    def __init__(self, token_count: int, max_tokens: int):
        super().__init__(
            message=f"Content ({token_count} tokens) exceeds model limit ({max_tokens} tokens)",
            error_code=ErrorCode.TOKEN_LIMIT_EXCEEDED,
            status_code=status.HTTP_400_BAD_REQUEST,
            details={
                "token_count": token_count,
                "max_tokens": max_tokens,
                "suggestions": [
                    "Use chunked processing for long content",
                    "Choose a briefer summary length",
                    "Split content into smaller sections"
                ]
            }
        )

class CostLimitExceededError(AIServiceError):
    """Raised when processing cost exceeds limits"""
    def __init__(self, estimated_cost: float, cost_limit: float):
        super().__init__(
            message=f"Estimated cost ${estimated_cost:.3f} exceeds limit ${cost_limit:.2f}",
            error_code=ErrorCode.COST_LIMIT_EXCEEDED,
            status_code=status.HTTP_400_BAD_REQUEST,
            details={
                "estimated_cost": estimated_cost,
                "cost_limit": cost_limit,
                "cost_reduction_tips": [
                    "Choose 'brief' summary length",
                    "Remove less important content from transcript",
                    "Process content in smaller segments"
                ]
            }
        )

class AIServiceUnavailableError(AIServiceError):
    """Raised when AI service is temporarily unavailable"""
    pass

File Locations and Structure

[Source: docs/architecture.md#project-structure]

Backend Files:

  • backend/services/ai_service.py - Base AI service interface and data models
  • backend/services/openai_summarizer.py - OpenAI GPT-4o-mini integration
  • backend/api/summarization.py - Summary generation endpoints
  • backend/core/exceptions.py - Updated with AI-specific exceptions
  • backend/models/summary.py - Database models for summary storage
  • backend/tests/unit/test_openai_summarizer.py - Unit tests
  • backend/tests/integration/test_summarization_api.py - Integration tests

Testing Standards

Backend Unit Tests

[Source: docs/architecture.md#testing-strategy]

# backend/tests/unit/test_openai_summarizer.py
import pytest
from unittest.mock import AsyncMock, patch, MagicMock
from backend.services.openai_summarizer import OpenAISummarizer
from backend.services.ai_service import SummaryRequest, SummaryLength

class TestOpenAISummarizer:
    @pytest.fixture
    def summarizer(self):
        return OpenAISummarizer(api_key="test-key")
    
    @pytest.mark.asyncio
    async def test_generate_summary_success(self, summarizer):
        """Test successful summary generation"""
        
        # Mock OpenAI response
        mock_response = MagicMock()
        mock_response.choices[0].message.content = json.dumps({
            "summary": "This is a test summary",
            "key_points": ["Point 1", "Point 2"],
            "main_themes": ["Theme 1"],
            "actionable_insights": ["Insight 1"],
            "confidence_score": 0.92
        })
        mock_response.usage.prompt_tokens = 100
        mock_response.usage.completion_tokens = 50
        mock_response.usage.total_tokens = 150
        
        with patch.object(summarizer.client.chat.completions, 'create', return_value=mock_response):
            request = SummaryRequest(
                transcript="This is a test transcript with some content to summarize.",
                length=SummaryLength.STANDARD
            )
            
            result = await summarizer.generate_summary(request)
            
            assert result.summary == "This is a test summary"
            assert len(result.key_points) == 2
            assert result.confidence_score == 0.92
            assert result.cost_data["total_cost_usd"] > 0

    @pytest.mark.asyncio
    async def test_chunked_processing(self, summarizer):
        """Test long transcript chunking"""
        
        # Create a very long transcript
        long_transcript = "This is a sentence. " * 2000  # ~4000 tokens
        
        with patch.object(summarizer, 'generate_summary') as mock_generate:
            mock_generate.return_value = AsyncMock()
            
            request = SummaryRequest(
                transcript=long_transcript,
                length=SummaryLength.STANDARD
            )
            
            await summarizer.generate_summary(request)
            
            # Should have triggered chunked processing
            assert mock_generate.call_count > 1

    def test_cost_estimation(self, summarizer):
        """Test cost estimation accuracy"""
        transcript = "Test transcript for cost estimation."
        
        cost = summarizer.estimate_cost(transcript, SummaryLength.STANDARD)
        
        assert isinstance(cost, float)
        assert cost > 0
        assert cost < 0.01  # Should be very cheap for short transcript

    def test_token_counting(self, summarizer):
        """Test token counting accuracy"""
        text = "Hello world, this is a test."
        
        token_count = summarizer.get_token_count(text)
        
        assert isinstance(token_count, int)
        assert token_count > 0
        assert token_count < 20  # Should be reasonable for short text

Performance Optimization

  • Token Management: Intelligent chunking prevents token limit errors while preserving context
  • Cost Optimization: GPT-4o-mini provides 80% savings vs GPT-4 while maintaining quality
  • Async Processing: Background processing for long transcripts prevents UI blocking
  • Caching Strategy: Summary results cached to avoid repeated API calls
  • Rate Limiting: Built-in delays and retry logic respect OpenAI rate limits

Security Considerations

  • API Key Security: Keys stored in environment variables, never in code
  • Input Validation: Transcript length and content validation before processing
  • Cost Controls: Per-request cost limits prevent unexpected charges
  • Error Sanitization: Sensitive error details not exposed to clients
  • Request Logging: Comprehensive logging for debugging without exposing content

Change Log

Date Version Description Author
2025-01-25 1.0 Initial story creation Bob (Scrum Master)

Dev Agent Record

This section will be populated by the development agent during implementation

Agent Model Used

To be filled by dev agent

Debug Log References

To be filled by dev agent

Completion Notes List

To be filled by dev agent

File List

To be filled by dev agent

QA Results

Results from QA Agent review of the completed story implementation will be added here