Story 2.2: Summary Generation Pipeline

Status

Draft

Story

As a user
I want an end-to-end pipeline that seamlessly processes YouTube URLs into high-quality summaries
so that I can get from video link to summary in a single, streamlined workflow

Acceptance Criteria

  1. Integrated pipeline connects URL validation → transcript extraction → AI summarization
  2. Pipeline handles the complete workflow asynchronously with progress tracking
  3. System provides intelligent summary optimization based on transcript characteristics
  4. Generated summaries include enhanced metadata (video info, processing stats, quality scores)
  5. Pipeline includes quality validation and automatic retry for failed summaries
  6. Users can monitor pipeline progress and receive completion notifications

Tasks / Subtasks

  • Task 1: Pipeline Orchestration Service (AC: 1, 2)

    • Create SummaryPipeline orchestrator in backend/services/summary_pipeline.py
    • Implement workflow coordination between video, transcript, and AI services
    • Add pipeline state management with persistent job tracking
    • Create rollback and cleanup mechanisms for failed pipelines
  • Task 2: Enhanced Video Metadata Integration (AC: 4)

    • Integrate YouTube Data API for rich video metadata extraction
    • Add video categorization and content type detection
    • Implement thumbnail and channel information capture
    • Create metadata-driven summary customization logic
  • Task 3: Intelligent Summary Optimization (AC: 3, 5)

    • Implement transcript analysis for content type detection (educational, entertainment, technical)
    • Add automatic summary length optimization based on content complexity
    • Create quality scoring algorithm for generated summaries
    • Implement summary enhancement for poor-quality results
  • Task 4: Progress Tracking and Notifications (AC: 2, 6)

    • Create comprehensive pipeline progress tracking system
    • Implement WebSocket notifications for real-time updates
    • Add email notifications for completed summaries (optional)
    • Create detailed logging and audit trail for each pipeline run
  • Task 5: Quality Assurance and Validation (AC: 5)

    • Implement summary quality validation checks
    • Add automatic retry logic for failed or low-quality summaries
    • Create fallback strategies for different types of failures
    • Implement summary improvement suggestions and regeneration
  • Task 6: API Integration and Frontend (AC: 1, 2, 6)

    • Create /api/process endpoint for end-to-end pipeline processing
    • Update frontend to use integrated pipeline instead of separate services
    • Add pipeline status dashboard for monitoring active and completed jobs
    • Implement pipeline cancellation and cleanup functionality
  • Task 7: Performance and Reliability (AC: 2, 5)

    • Add comprehensive error handling and recovery mechanisms
    • Implement pipeline timeout and resource management
    • Create performance monitoring and optimization tracking
    • Add pipeline analytics and usage statistics
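
Task 7 asks for pipeline timeouts and resource management but does not say how. One possible shape is sketched below using only the standard library: a semaphore caps concurrent jobs and asyncio.wait_for enforces a deadline. The names run_with_limits, fake_job, MAX_CONCURRENT_JOBS and JOB_TIMEOUT_SECONDS are hypothetical and not part of the story; the limits are illustrative values, not requirements.

```python
import asyncio

# Hypothetical limits; real values would come from configuration.
MAX_CONCURRENT_JOBS = 2
JOB_TIMEOUT_SECONDS = 0.1  # kept tiny so the demo finishes quickly


async def run_with_limits(job, semaphore: asyncio.Semaphore, timeout: float):
    """Run one pipeline coroutine under a concurrency cap and a deadline."""
    async with semaphore:  # at most MAX_CONCURRENT_JOBS run at once
        try:
            return await asyncio.wait_for(job, timeout=timeout)
        except asyncio.TimeoutError:
            # wait_for has already cancelled the job at this point
            return "timed_out"


async def fake_job(duration: float, label: str) -> str:
    """Stand-in for SummaryPipeline._execute_pipeline (assumption)."""
    await asyncio.sleep(duration)
    return label


async def main():
    semaphore = asyncio.Semaphore(MAX_CONCURRENT_JOBS)
    return await asyncio.gather(
        run_with_limits(fake_job(0.0, "fast"), semaphore, JOB_TIMEOUT_SECONDS),
        run_with_limits(fake_job(5.0, "slow"), semaphore, JOB_TIMEOUT_SECONDS),
    )


results = asyncio.run(main())
print(results)
```

In a real pipeline the timeout branch would more likely mark the job as FAILED and record the stage that was running, rather than return a sentinel string.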

Dev Notes

Architecture Context

This story creates the core user-facing workflow that demonstrates the full value of the YouTube Summarizer. The pipeline must be reliable, fast, and provide clear feedback while handling edge cases gracefully.

Pipeline Orchestration Architecture

[Source: docs/architecture.md#pipeline-architecture]

# backend/services/summary_pipeline.py
import asyncio
import uuid
from datetime import datetime
from enum import Enum
from typing import Dict, Optional, List, Any
from dataclasses import dataclass, asdict
from ..services.video_service import VideoService
from ..services.transcript_service import TranscriptService
from ..services.openai_summarizer import OpenAISummarizer
from ..services.cache_manager import CacheManager
from ..core.exceptions import PipelineError

class PipelineStage(Enum):
    INITIALIZED = "initialized"
    VALIDATING_URL = "validating_url"
    EXTRACTING_METADATA = "extracting_metadata"
    EXTRACTING_TRANSCRIPT = "extracting_transcript"
    ANALYZING_CONTENT = "analyzing_content"
    GENERATING_SUMMARY = "generating_summary"
    VALIDATING_QUALITY = "validating_quality"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"

@dataclass
class PipelineConfig:
    summary_length: str = "standard"
    include_timestamps: bool = False
    focus_areas: Optional[List[str]] = None
    quality_threshold: float = 0.7
    max_retries: int = 2
    enable_notifications: bool = True

@dataclass
class PipelineProgress:
    stage: PipelineStage
    percentage: float
    message: str
    estimated_time_remaining: Optional[float] = None
    current_step_details: Optional[Dict[str, Any]] = None

@dataclass
class PipelineResult:
    job_id: str
    video_url: str
    video_id: str
    status: PipelineStage
    
    # Video metadata
    video_metadata: Optional[Dict[str, Any]] = None
    
    # Processing results
    transcript: Optional[str] = None
    summary: Optional[str] = None
    key_points: Optional[List[str]] = None
    main_themes: Optional[List[str]] = None
    actionable_insights: Optional[List[str]] = None
    
    # Quality and metadata
    confidence_score: Optional[float] = None
    quality_score: Optional[float] = None
    processing_metadata: Optional[Dict[str, Any]] = None
    cost_data: Optional[Dict[str, Any]] = None
    
    # Timeline
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None
    processing_time_seconds: Optional[float] = None
    
    # Error information
    error: Optional[Dict[str, Any]] = None
    retry_count: int = 0

class SummaryPipeline:
    """Orchestrates the complete YouTube summarization workflow"""
    
    def __init__(
        self,
        video_service: VideoService,
        transcript_service: TranscriptService,
        ai_service: OpenAISummarizer,
        cache_manager: CacheManager
    ):
        self.video_service = video_service
        self.transcript_service = transcript_service
        self.ai_service = ai_service
        self.cache_manager = cache_manager
        
        # Active jobs tracking
        self.active_jobs: Dict[str, PipelineResult] = {}
        self.progress_callbacks: Dict[str, List[callable]] = {}
        # Strong references to background tasks; the event loop itself only
        # keeps weak references, so an unreferenced task can be garbage collected
        self._tasks: Dict[str, asyncio.Task] = {}
    
    async def process_video(
        self,
        video_url: str,
        config: Optional[PipelineConfig] = None,
        progress_callback: callable = None
    ) -> str:
        """Start video processing pipeline and return job ID"""
        
        if config is None:
            config = PipelineConfig()
        
        job_id = str(uuid.uuid4())
        
        # Initialize pipeline result
        result = PipelineResult(
            job_id=job_id,
            video_url=video_url,
            video_id="",  # Will be populated during validation
            status=PipelineStage.INITIALIZED,
            started_at=datetime.utcnow(),
            retry_count=0
        )
        
        self.active_jobs[job_id] = result
        
        if progress_callback:
            self.progress_callbacks[job_id] = [progress_callback]
        
        # Start processing in background and hold the task until it finishes
        task = asyncio.create_task(self._execute_pipeline(job_id, config))
        self._tasks[job_id] = task
        task.add_done_callback(lambda _finished: self._tasks.pop(job_id, None))
        
        return job_id
    
    async def _execute_pipeline(self, job_id: str, config: PipelineConfig):
        """Execute the complete processing pipeline"""
        
        result = self.active_jobs[job_id]
        
        try:
            # Stage 1: URL Validation
            await self._update_progress(job_id, PipelineStage.VALIDATING_URL, 5, "Validating YouTube URL...")
            video_id = await self.video_service.extract_video_id(result.video_url)
            result.video_id = video_id
            
            # Stage 2: Extract Video Metadata
            await self._update_progress(job_id, PipelineStage.EXTRACTING_METADATA, 15, "Extracting video information...")
            metadata = await self._extract_enhanced_metadata(video_id)
            result.video_metadata = metadata
            
            # Stage 3: Extract Transcript
            await self._update_progress(job_id, PipelineStage.EXTRACTING_TRANSCRIPT, 35, "Extracting transcript...")
            transcript_result = await self.transcript_service.extract_transcript(video_id)
            result.transcript = transcript_result.transcript
            
            # Stage 4: Analyze Content for Optimization
            await self._update_progress(job_id, PipelineStage.ANALYZING_CONTENT, 50, "Analyzing content characteristics...")
            content_analysis = await self._analyze_content_characteristics(result.transcript, metadata)
            optimized_config = self._optimize_config_for_content(config, content_analysis)
            
            # Stage 5: Generate Summary
            await self._update_progress(job_id, PipelineStage.GENERATING_SUMMARY, 75, "Generating AI summary...")
            summary_result = await self._generate_optimized_summary(result.transcript, optimized_config, content_analysis)
            
            # Populate summary results
            result.summary = summary_result.summary
            result.key_points = summary_result.key_points
            result.main_themes = summary_result.main_themes
            result.actionable_insights = summary_result.actionable_insights
            result.confidence_score = summary_result.confidence_score
            result.processing_metadata = summary_result.processing_metadata
            result.cost_data = summary_result.cost_data
            
            # Stage 6: Quality Validation
            await self._update_progress(job_id, PipelineStage.VALIDATING_QUALITY, 90, "Validating summary quality...")
            quality_score = await self._validate_summary_quality(result, content_analysis)
            result.quality_score = quality_score
            
            # Check quality against the content-adjusted threshold from Stage 4
            if quality_score < optimized_config.quality_threshold and result.retry_count < config.max_retries:
                await self._retry_with_improvements(job_id, config, "Low quality score")
                return
            
            # Stage 7: Complete
            result.completed_at = datetime.utcnow()
            result.processing_time_seconds = (result.completed_at - result.started_at).total_seconds()
            result.status = PipelineStage.COMPLETED
            
            await self._update_progress(job_id, PipelineStage.COMPLETED, 100, "Summary completed successfully!")
            
            # Cache the result
            await self.cache_manager.cache_pipeline_result(job_id, result)
            
            # Send completion notification
            if config.enable_notifications:
                await self._send_completion_notification(result)
                
        except Exception as e:
            await self._handle_pipeline_error(job_id, e, config)
    
    async def _extract_enhanced_metadata(self, video_id: str) -> Dict[str, Any]:
        """Extract rich video metadata using YouTube Data API"""
        
        # This would integrate with YouTube Data API v3
        # For now, implementing basic structure
        
        try:
            # Simulate YouTube Data API call
            metadata = {
                "title": f"Video {video_id} Title",  # Would come from API
                "description": "Video description...",
                "channel_name": "Channel Name",
                "published_at": datetime.utcnow().isoformat(),
                "duration": "PT10M30S",  # ISO 8601 duration
                "view_count": 1000,
                "like_count": 50,
                "category": "Education",
                "tags": ["python", "tutorial", "coding"],
                "thumbnail_url": f"https://img.youtube.com/vi/{video_id}/maxresdefault.jpg",
                "language": "en",
                "default_language": "en"
            }
            
            return metadata
            
        except Exception as e:
            # Return basic metadata if enhanced extraction fails
            return {
                "video_id": video_id,
                "title": f"Video {video_id}",
                "error": f"Enhanced metadata extraction failed: {str(e)}"
            }
    
    async def _analyze_content_characteristics(self, transcript: str, metadata: Dict[str, Any]) -> Dict[str, Any]:
        """Analyze transcript and metadata to determine optimal processing strategy"""
        
        analysis = {
            "transcript_length": len(transcript),
            "word_count": len(transcript.split()),
            "estimated_reading_time": len(transcript.split()) / 250,  # Minutes, assuming 250 words per minute
            "complexity_score": 0.5,  # Would implement actual complexity analysis
            "content_type": "general",
            "language": metadata.get("language", "en"),
            "technical_indicators": [],
            "educational_indicators": [],
            "entertainment_indicators": []
        }
        
        # Basic content type detection
        transcript_lower = transcript.lower()
        
        # Technical content indicators
        technical_terms = ["algorithm", "function", "variable", "database", "api", "code", "programming"]
        technical_count = sum(1 for term in technical_terms if term in transcript_lower)
        if technical_count >= 3:
            analysis["content_type"] = "technical"
            analysis["technical_indicators"] = [term for term in technical_terms if term in transcript_lower]
        
        # Educational content indicators
        educational_terms = ["learn", "tutorial", "explain", "understand", "concept", "example", "lesson"]
        educational_count = sum(1 for term in educational_terms if term in transcript_lower)
        if educational_count >= 3:
            analysis["content_type"] = "educational"
            analysis["educational_indicators"] = [term for term in educational_terms if term in transcript_lower]
        
        # Entertainment content indicators
        entertainment_terms = ["funny", "story", "experience", "adventure", "review", "reaction"]
        entertainment_count = sum(1 for term in entertainment_terms if term in transcript_lower)
        if entertainment_count >= 2:
            analysis["content_type"] = "entertainment"
            analysis["entertainment_indicators"] = [term for term in entertainment_terms if term in transcript_lower]
        
        # Complexity scoring based on average sentence length
        sentences = [s for s in transcript.split('.') if s.strip()]  # Drop empty fragments
        avg_sentence_length = sum(len(s.split()) for s in sentences) / len(sentences) if sentences else 0
        
        if avg_sentence_length > 20:
            analysis["complexity_score"] = min(1.0, analysis["complexity_score"] + 0.3)
        elif avg_sentence_length < 10:
            analysis["complexity_score"] = max(0.1, analysis["complexity_score"] - 0.2)
        
        return analysis
    
    def _optimize_config_for_content(self, base_config: PipelineConfig, analysis: Dict[str, Any]) -> PipelineConfig:
        """Optimize processing configuration based on content analysis"""
        
        optimized_config = PipelineConfig(**asdict(base_config))
        
        # Adjust summary length based on content
        if analysis["word_count"] > 5000 and optimized_config.summary_length == "standard":
            optimized_config.summary_length = "detailed"
        elif analysis["word_count"] < 500 and optimized_config.summary_length == "standard":
            optimized_config.summary_length = "brief"
        
        # Add focus areas based on content type
        if not optimized_config.focus_areas:
            optimized_config.focus_areas = []
        
        content_type = analysis.get("content_type", "general")
        if content_type == "technical":
            optimized_config.focus_areas.extend(["technical concepts", "implementation details"])
        elif content_type == "educational":
            optimized_config.focus_areas.extend(["learning objectives", "key concepts", "practical applications"])
        elif content_type == "entertainment":
            optimized_config.focus_areas.extend(["main highlights", "key moments", "overall message"])
        
        # Adjust quality threshold based on complexity
        if analysis["complexity_score"] > 0.7:
            optimized_config.quality_threshold = max(0.6, optimized_config.quality_threshold - 0.1)
        
        return optimized_config
    
    async def _generate_optimized_summary(
        self, 
        transcript: str, 
        config: PipelineConfig, 
        analysis: Dict[str, Any]
    ) -> Any:  # Returns SummaryResult
        """Generate summary with content-aware optimizations"""
        
        from ..services.ai_service import SummaryRequest, SummaryLength
        
        # Map config to AI service parameters
        length_mapping = {
            "brief": SummaryLength.BRIEF,
            "standard": SummaryLength.STANDARD,
            "detailed": SummaryLength.DETAILED
        }
        
        summary_request = SummaryRequest(
            transcript=transcript,
            length=length_mapping[config.summary_length],
            # Copy the list so the appends below do not mutate the config
            focus_areas=list(config.focus_areas or []),
            language=analysis.get("language", "en"),
            include_timestamps=config.include_timestamps
        )
        
        # Add content-specific prompt enhancements
        if analysis["content_type"] == "technical":
            summary_request.focus_areas.append("explain technical concepts clearly")
        elif analysis["content_type"] == "educational":
            summary_request.focus_areas.append("highlight learning outcomes")
        
        return await self.ai_service.generate_summary(summary_request)
    
    async def _validate_summary_quality(self, result: PipelineResult, analysis: Dict[str, Any]) -> float:
        """Validate and score summary quality"""
        
        quality_score = 0.0
        
        # Check summary length appropriateness
        summary_word_count = len(result.summary.split()) if result.summary else 0
        transcript_word_count = analysis["word_count"]
        
        # Good summary should be 5-15% of original length
        compression_ratio = summary_word_count / transcript_word_count if transcript_word_count > 0 else 0
        if 0.05 <= compression_ratio <= 0.15:
            quality_score += 0.3
        elif 0.03 <= compression_ratio <= 0.20:
            quality_score += 0.2
        
        # Check key points availability and quality
        if result.key_points and len(result.key_points) >= 3:
            quality_score += 0.2
        
        # Check main themes availability
        if result.main_themes and len(result.main_themes) >= 2:
            quality_score += 0.15
        
        # Check actionable insights
        if result.actionable_insights and len(result.actionable_insights) >= 1:
            quality_score += 0.15
        
        # Use AI confidence score
        if result.confidence_score and result.confidence_score > 0.8:
            quality_score += 0.2
        elif result.confidence_score and result.confidence_score > 0.6:
            quality_score += 0.1
        
        return min(1.0, quality_score)
    
    async def _retry_with_improvements(self, job_id: str, config: PipelineConfig, reason: str):
        """Retry pipeline with improved configuration"""
        
        result = self.active_jobs[job_id]
        result.retry_count += 1
        
        await self._update_progress(
            job_id, 
            PipelineStage.ANALYZING_CONTENT, 
            40, 
            f"Retrying with improvements (attempt {result.retry_count + 1}/{config.max_retries + 1})"
        )
        
        # Improve configuration for retry
        improved_config = PipelineConfig(**asdict(config))
        improved_config.summary_length = "detailed"  # Try more detailed summary
        improved_config.quality_threshold = max(0.5, config.quality_threshold - 0.1)  # Lower threshold slightly
        
        # Continue pipeline with improved config
        await self._execute_pipeline(job_id, improved_config)
    
    async def _handle_pipeline_error(self, job_id: str, error: Exception, config: PipelineConfig):
        """Handle pipeline errors with retry logic"""
        
        result = self.active_jobs[job_id]
        failed_stage = result.status  # Capture the stage that raised before overwriting it
        result.status = PipelineStage.FAILED
        result.error = {
            "message": str(error),
            "type": type(error).__name__,
            "stage": failed_stage.value,
            "retry_count": result.retry_count
        }
        
        # Attempt retry if within limits
        if result.retry_count < config.max_retries:
            await asyncio.sleep(2 ** result.retry_count)  # Exponential backoff
            await self._retry_with_improvements(job_id, config, f"Error: {str(error)}")
        else:
            result.completed_at = datetime.utcnow()
            await self._update_progress(job_id, PipelineStage.FAILED, 0, f"Failed after {result.retry_count + 1} attempts")
    
    async def _update_progress(
        self, 
        job_id: str, 
        stage: PipelineStage, 
        percentage: float, 
        message: str,
        details: Dict[str, Any] = None
    ):
        """Update pipeline progress and notify callbacks"""
        
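        result = self.active_jobs.get(job_id)
        if result:
            # Stop a cancelled job at the next stage boundary instead of
            # letting a later stage overwrite the CANCELLED status
            if result.status == PipelineStage.CANCELLED and stage != PipelineStage.CANCELLED:
                raise asyncio.CancelledError(f"Pipeline {job_id} was cancelled")
            result.status = stage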
        
        progress = PipelineProgress(
            stage=stage,
            percentage=percentage,
            message=message,
            current_step_details=details
        )
        
        # Notify all registered callbacks
        callbacks = self.progress_callbacks.get(job_id, [])
        for callback in callbacks:
            try:
                await callback(job_id, progress)
            except Exception as e:
                print(f"Progress callback error: {e}")
    
    async def get_pipeline_result(self, job_id: str) -> Optional[PipelineResult]:
        """Get pipeline result by job ID"""
        
        # Check active jobs first
        if job_id in self.active_jobs:
            return self.active_jobs[job_id]
        
        # Check cache for completed jobs
        cached_result = await self.cache_manager.get_cached_pipeline_result(job_id)
        return cached_result
    
    async def cancel_pipeline(self, job_id: str) -> bool:
        """Cancel running pipeline"""
        
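        result = self.active_jobs.get(job_id)
        terminal_stages = {PipelineStage.COMPLETED, PipelineStage.FAILED, PipelineStage.CANCELLED}
        
        # Unknown jobs and jobs that already finished cannot be cancelled
        if result is None or result.status in terminal_stages:
            return False
        
        result.status = PipelineStage.CANCELLED
        result.completed_at = datetime.utcnow()
        
        await self._update_progress(job_id, PipelineStage.CANCELLED, 0, "Pipeline cancelled by user")
        
        return True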
    
    async def _send_completion_notification(self, result: PipelineResult):
        """Send completion notification (email, webhook, etc.)"""
        
        # This would integrate with notification service
        notification_data = {
            "job_id": result.job_id,
            "video_title": result.video_metadata.get("title", "Unknown") if result.video_metadata else "Unknown",
            "status": result.status.value,
            "processing_time": result.processing_time_seconds,
            "summary_preview": result.summary[:100] + "..." if result.summary else None
        }
        
        # Log completion for now (would send actual notifications)
        print(f"Pipeline completed: {notification_data}")
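
The content-type detection in _analyze_content_characteristics matches indicator terms as substrings, so "api" also matches inside "rapid", and a later category overwrites an earlier one. The sketch below is one possible variation, not the story's method: it matches whole words and picks the category with the most hits among those that reach their threshold. The term lists and thresholds are copied from the code above; INDICATORS, THRESHOLDS and detect_content_type are hypothetical names.

```python
import re
from typing import Dict, List

# Term lists and thresholds taken from _analyze_content_characteristics.
INDICATORS: Dict[str, List[str]] = {
    "technical": ["algorithm", "function", "variable", "database", "api", "code", "programming"],
    "educational": ["learn", "tutorial", "explain", "understand", "concept", "example", "lesson"],
    "entertainment": ["funny", "story", "experience", "adventure", "review", "reaction"],
}
THRESHOLDS: Dict[str, int] = {"technical": 3, "educational": 3, "entertainment": 2}


def detect_content_type(transcript: str) -> str:
    """Return the category with the most whole-word hits, or 'general'."""
    words = set(re.findall(r"[a-z]+", transcript.lower()))
    best_type, best_hits = "general", 0
    for content_type, terms in INDICATORS.items():
        hits = sum(1 for term in terms if term in words)
        # A category must reach its own threshold and beat the current best
        if hits >= THRESHOLDS[content_type] and hits > best_hits:
            best_type, best_hits = content_type, hits
    return best_type


print(detect_content_type("This rapid recap is a funny story"))
print(detect_content_type("Learn this algorithm: the function reads a variable from the database"))
print(detect_content_type("hello world"))
```

Whole-word matching trades false positives for false negatives: plural or inflected forms such as "functions" no longer count unless the term lists are extended or the words are stemmed first.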

API Integration

[Source: docs/architecture.md#api-specification]

# backend/api/pipeline.py
from fastapi import APIRouter, HTTPException, BackgroundTasks, Depends
from pydantic import BaseModel, Field, HttpUrl
from typing import Optional, List, Dict, Any, Literal
from ..services.summary_pipeline import SummaryPipeline, PipelineConfig, PipelineStage
from ..core.websocket_manager import WebSocketManager

router = APIRouter(prefix="/api", tags=["pipeline"])

class ProcessVideoRequest(BaseModel):
    video_url: HttpUrl = Field(..., description="YouTube video URL to process")
    # Restrict to the values the pipeline's length mapping understands
    summary_length: Literal["brief", "standard", "detailed"] = Field("standard", description="Summary length preference")
    focus_areas: Optional[List[str]] = Field(None, description="Areas to focus on in summary")
    include_timestamps: bool = Field(False, description="Include timestamps in summary")
    enable_notifications: bool = Field(True, description="Enable completion notifications")
    quality_threshold: float = Field(0.7, ge=0.0, le=1.0, description="Minimum quality score threshold (0.0-1.0)")

class ProcessVideoResponse(BaseModel):
    job_id: str
    status: str
    message: str
    estimated_completion_time: Optional[float] = None

class PipelineStatusResponse(BaseModel):
    job_id: str
    status: str
    progress_percentage: float
    current_message: str
    video_metadata: Optional[Dict[str, Any]] = None
    result: Optional[Dict[str, Any]] = None
    error: Optional[Dict[str, Any]] = None
    processing_time_seconds: Optional[float] = None

@router.post("/process", response_model=ProcessVideoResponse)
async def process_video(
    request: ProcessVideoRequest,
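    # NOTE: a bare Depends() makes FastAPI use the annotated class itself as the
    # dependency, so no shared instance is injected. Job state lives in
    # SummaryPipeline.active_jobs, so a provider returning one shared pipeline is needed.
    pipeline: SummaryPipeline = Depends(),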
    websocket_manager: WebSocketManager = Depends()
):
    """Process YouTube video through complete pipeline"""
    
    try:
        config = PipelineConfig(
            summary_length=request.summary_length,
            focus_areas=request.focus_areas or [],
            include_timestamps=request.include_timestamps,
            quality_threshold=request.quality_threshold,
            enable_notifications=request.enable_notifications
        )
        
        # Create progress callback for WebSocket notifications
        async def progress_callback(job_id: str, progress):
            await websocket_manager.send_progress_update(job_id, {
                "stage": progress.stage.value,
                "percentage": progress.percentage,
                "message": progress.message,
                "details": progress.current_step_details
            })
        
        # Start pipeline processing
        job_id = await pipeline.process_video(
            video_url=str(request.video_url),
            config=config,
            progress_callback=progress_callback
        )
        
        return ProcessVideoResponse(
            job_id=job_id,
            status="processing",
            message="Video processing started",
            estimated_completion_time=120.0  # 2 minutes estimate
        )
        
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"Failed to start processing: {str(e)}"
        )

@router.get("/process/{job_id}", response_model=PipelineStatusResponse)
async def get_pipeline_status(
    job_id: str,
    pipeline: SummaryPipeline = Depends()
):
    """Get pipeline processing status and results"""
    
    result = await pipeline.get_pipeline_result(job_id)
    
    if not result:
        raise HTTPException(status_code=404, detail="Pipeline job not found")
    
    # Calculate progress percentage based on stage
    stage_percentages = {
        PipelineStage.INITIALIZED: 0,
        PipelineStage.VALIDATING_URL: 5,
        PipelineStage.EXTRACTING_METADATA: 15,
        PipelineStage.EXTRACTING_TRANSCRIPT: 35,
        PipelineStage.ANALYZING_CONTENT: 50,
        PipelineStage.GENERATING_SUMMARY: 75,
        PipelineStage.VALIDATING_QUALITY: 90,
        PipelineStage.COMPLETED: 100,
        PipelineStage.FAILED: 0,
        PipelineStage.CANCELLED: 0
    }
    
    response_data = {
        "job_id": job_id,
        "status": result.status.value,
        "progress_percentage": stage_percentages.get(result.status, 0),
        "current_message": f"Status: {result.status.value}",
        "video_metadata": result.video_metadata,
        "processing_time_seconds": result.processing_time_seconds
    }
    
    # Include results if completed
    if result.status == PipelineStage.COMPLETED:
        response_data["result"] = {
            "summary": result.summary,
            "key_points": result.key_points,
            "main_themes": result.main_themes,
            "actionable_insights": result.actionable_insights,
            "confidence_score": result.confidence_score,
            "quality_score": result.quality_score,
            "cost_data": result.cost_data
        }
    
    # Include error if failed
    if result.status == PipelineStage.FAILED and result.error:
        response_data["error"] = result.error
    
    return PipelineStatusResponse(**response_data)

@router.delete("/process/{job_id}")
async def cancel_pipeline(
    job_id: str,
    pipeline: SummaryPipeline = Depends()
):
    """Cancel running pipeline"""
    
    success = await pipeline.cancel_pipeline(job_id)
    
    if not success:
        raise HTTPException(status_code=404, detail="Pipeline job not found or already completed")
    
    return {"message": "Pipeline cancelled successfully"}
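
The endpoints above look jobs up in SummaryPipeline.active_jobs, which only works if every request sees the same pipeline object. A minimal sketch of one way to guarantee that is a cached provider function that a router could pass to Depends. The provider name get_summary_pipeline is hypothetical, and the SummaryPipeline class here is a stand-in so the example runs on its own; the real constructor takes the four services shown earlier.

```python
from functools import lru_cache
from typing import Dict


class SummaryPipeline:
    """Stand-in for the real orchestrator; only the shared job registry matters here."""

    def __init__(self) -> None:
        self.active_jobs: Dict[str, str] = {}


@lru_cache(maxsize=1)
def get_summary_pipeline() -> SummaryPipeline:
    """Build the pipeline once and return the same instance on every call."""
    return SummaryPipeline()


# In the router this provider would be used as (assumption):
#     pipeline: SummaryPipeline = Depends(get_summary_pipeline)

first = get_summary_pipeline()
first.active_jobs["job-1"] = "validating_url"

second = get_summary_pipeline()
print(first is second, second.active_jobs)
```

A job started through one call is visible through the next because both resolve to the same object. An in-process singleton still loses jobs on restart and does not span multiple workers, which is where the persistent job tracking from Task 1 would come in.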

File Locations and Structure

[Source: docs/architecture.md#project-structure]

Backend Files:

  • backend/services/summary_pipeline.py - Main pipeline orchestration service
  • backend/api/pipeline.py - Pipeline management endpoints
  • backend/core/websocket_manager.py - WebSocket progress notifications
  • backend/models/pipeline.py - Pipeline result storage models
  • backend/services/notification_service.py - Completion notifications
  • backend/tests/unit/test_summary_pipeline.py - Unit tests
  • backend/tests/integration/test_pipeline_api.py - Integration tests
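
The story lists backend/core/websocket_manager.py but only shows its call site, websocket_manager.send_progress_update(job_id, payload). A rough sketch of what such a manager might look like follows. Everything except the send_progress_update name is an assumption: sockets are modelled as any object with an async send_json method (the method name Starlette's WebSocket uses), and subscribe, unsubscribe and subscriber_count are hypothetical helpers.

```python
import asyncio
from collections import defaultdict
from typing import Any, Dict, List, Protocol, Set


class JsonSocket(Protocol):
    async def send_json(self, data: Any) -> None: ...


class WebSocketManager:
    """Tracks which sockets follow which job and fans out progress updates."""

    def __init__(self) -> None:
        self._subscribers: Dict[str, Set[JsonSocket]] = defaultdict(set)

    def subscribe(self, job_id: str, socket: JsonSocket) -> None:
        self._subscribers[job_id].add(socket)

    def unsubscribe(self, job_id: str, socket: JsonSocket) -> None:
        self._subscribers[job_id].discard(socket)

    def subscriber_count(self, job_id: str) -> int:
        return len(self._subscribers.get(job_id, ()))

    async def send_progress_update(self, job_id: str, payload: Dict[str, Any]) -> int:
        """Send payload to every subscriber of job_id; return how many received it."""
        delivered = 0
        # Iterate over a copy because failed sockets are removed during the loop
        for socket in list(self._subscribers.get(job_id, ())):
            try:
                await socket.send_json({"job_id": job_id, **payload})
                delivered += 1
            except Exception:
                self._subscribers[job_id].discard(socket)  # drop dead connections
        return delivered


class FakeSocket:
    """Test double that records what it was sent."""

    def __init__(self) -> None:
        self.sent: List[Any] = []

    async def send_json(self, data: Any) -> None:
        self.sent.append(data)


class BrokenSocket:
    """Test double that behaves like a closed connection."""

    async def send_json(self, data: Any) -> None:
        raise ConnectionError("socket closed")


async def demo():
    manager = WebSocketManager()
    good, bad = FakeSocket(), BrokenSocket()
    manager.subscribe("job-1", good)
    manager.subscribe("job-1", bad)
    count = await manager.send_progress_update("job-1", {"stage": "validating_url", "percentage": 5})
    return count, good.sent, manager.subscriber_count("job-1")


delivered, messages, remaining = asyncio.run(demo())
print(delivered, messages, remaining)
```

Dropping a socket on the first failed send keeps one broken client from blocking updates to the others, at the cost of requiring that client to reconnect and resubscribe.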

Frontend Integration

[Source: docs/architecture.md#frontend-architecture]

// frontend/src/hooks/usePipelineProcessor.ts
import { useState, useCallback } from 'react';
import { useMutation, useQuery } from '@tanstack/react-query';
import { apiClient } from '@/services/apiClient';
import { useWebSocket } from './useWebSocket';

interface PipelineConfig {
  summary_length: 'brief' | 'standard' | 'detailed';
  focus_areas?: string[];
  include_timestamps: boolean;
  enable_notifications: boolean;
  quality_threshold: number;
}

interface PipelineProgress {
  stage: string;
  percentage: number;
  message: string;
  details?: any;
}

export function usePipelineProcessor() {
  const [activeJobId, setActiveJobId] = useState<string | null>(null);
  const [progress, setProgress] = useState<PipelineProgress | null>(null);
  
  const { connect, disconnect } = useWebSocket({
    onProgressUpdate: (update: PipelineProgress) => {
      setProgress(update);
    }
  });

  const startProcessing = useMutation({
    mutationFn: async ({ url, config }: { url: string; config: PipelineConfig }) => {
      const response = await apiClient.processVideo(url, config);
      return response;
    },
    onSuccess: (data) => {
      setActiveJobId(data.job_id);
      connect(data.job_id);
    }
  });

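  const { data: pipelineStatus } = useQuery({
    queryKey: ['pipeline-status', activeJobId],
    queryFn: () => activeJobId ? apiClient.getPipelineStatus(activeJobId) : null,
    enabled: !!activeJobId,
    // TanStack Query v5 passes the Query object (not the data) to refetchInterval
    refetchInterval: (query) => {
      const status = query.state.data?.status;
      return status === 'completed' || status === 'failed' || status === 'cancelled' ? false : 2000;
    }
  });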

  const cancelProcessing = useCallback(async () => {
    if (activeJobId) {
      await apiClient.cancelPipeline(activeJobId);
      setActiveJobId(null);
      setProgress(null);
      disconnect();
    }
  }, [activeJobId, disconnect]);

  return {
    startProcessing: startProcessing.mutateAsync,
    cancelProcessing,
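    // The status endpoint reports stage names (e.g. 'validating_url'), never 'processing',
    // so treat any non-terminal status as still processing
    isProcessing: startProcessing.isPending ||
      (!!pipelineStatus && !['completed', 'failed', 'cancelled'].includes(pipelineStatus.status)),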
    progress: progress || (pipelineStatus ? {
      stage: pipelineStatus.status,
      percentage: pipelineStatus.progress_percentage,
      message: pipelineStatus.current_message
    } : null),
    result: pipelineStatus?.result,
    error: startProcessing.error || pipelineStatus?.error,
    pipelineStatus
  };
}

Quality Assurance Features

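  • Automatic Retry Logic: Failed or low-quality summaries are retried automatically (up to max_retries, default 2) with a detailed summary length and a slightly lower quality threshold
  • Content-Aware Processing: Different strategies for technical, educational, and entertainment content
  • Quality Scoring: Multi-factor quality assessment (compression ratio, key points, main themes, actionable insights, AI confidence) checked against a configurable threshold (default 0.7)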
  • Progress Transparency: Detailed progress tracking keeps users informed throughout the process
  • Error Recovery: Comprehensive error handling with graceful degradation
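
To make the quality scoring concrete, the sketch below restates the weights from _validate_summary_quality as a standalone function and scores two made-up summaries against the default 0.7 threshold. The function name score_summary and the sample numbers are illustrative assumptions; the weights and ranges are the ones in the pipeline code.

```python
from typing import Optional

QUALITY_THRESHOLD = 0.7  # default from PipelineConfig


def score_summary(
    summary_words: int,
    transcript_words: int,
    key_points: int,
    main_themes: int,
    actionable_insights: int,
    confidence: Optional[float],
) -> float:
    """Score a summary from 0.0 to 1.0 using the pipeline's weights."""
    score = 0.0

    # Compression ratio: 5-15% of the transcript is ideal, 3-20% acceptable
    ratio = summary_words / transcript_words if transcript_words > 0 else 0.0
    if 0.05 <= ratio <= 0.15:
        score += 0.3
    elif 0.03 <= ratio <= 0.20:
        score += 0.2

    if key_points >= 3:
        score += 0.2
    if main_themes >= 2:
        score += 0.15
    if actionable_insights >= 1:
        score += 0.15

    # AI confidence contributes up to 0.2
    if confidence is not None and confidence > 0.8:
        score += 0.2
    elif confidence is not None and confidence > 0.6:
        score += 0.1

    return round(min(1.0, score), 2)


# 200-word summary of a 2000-word transcript: 0.3 + 0.2 + 0.15 + 0.1
strong = score_summary(200, 2000, key_points=4, main_themes=2, actionable_insights=0, confidence=0.75)

# 500-word summary of the same transcript (ratio 0.25 earns nothing): 0.2 + 0.15 + 0.2
weak = score_summary(500, 2000, key_points=3, main_themes=1, actionable_insights=1, confidence=0.9)

print(strong, strong >= QUALITY_THRESHOLD)
print(weak, weak >= QUALITY_THRESHOLD)
```

The first summary scores 0.75 and passes; the second scores 0.55 despite high AI confidence, because it is too long relative to the transcript and has only one theme, so it would trigger a retry.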

Change Log

Date       | Version | Description            | Author
2025-01-25 | 1.0     | Initial story creation | Bob (Scrum Master)

Dev Agent Record

This section will be populated by the development agent during implementation

QA Results

Results from QA Agent review of the completed story implementation will be added here