Story 2.2: Summary Generation Pipeline
Status
Draft
Story
As a user
I want an end-to-end pipeline that seamlessly processes YouTube URLs into high-quality summaries
so that I can get from video link to summary in a single, streamlined workflow
Acceptance Criteria
- Integrated pipeline connects URL validation → transcript extraction → AI summarization
- Pipeline handles the complete workflow asynchronously with progress tracking
- System provides intelligent summary optimization based on transcript characteristics
- Generated summaries include enhanced metadata (video info, processing stats, quality scores)
- Pipeline includes quality validation and automatic retry for failed summaries
- Users can monitor pipeline progress and receive completion notifications
Tasks / Subtasks
- Task 1: Pipeline Orchestration Service (AC: 1, 2)
  - Create `SummaryPipeline` orchestrator in `backend/services/summary_pipeline.py`
  - Implement workflow coordination between video, transcript, and AI services
  - Add pipeline state management with persistent job tracking
  - Create rollback and cleanup mechanisms for failed pipelines
- Task 2: Enhanced Video Metadata Integration (AC: 4)
  - Integrate YouTube Data API for rich video metadata extraction
  - Add video categorization and content type detection
  - Implement thumbnail and channel information capture
  - Create metadata-driven summary customization logic
- Task 3: Intelligent Summary Optimization (AC: 3, 5)
  - Implement transcript analysis for content type detection (educational, entertainment, technical)
  - Add automatic summary length optimization based on content complexity
  - Create quality scoring algorithm for generated summaries
  - Implement summary enhancement for poor-quality results
- Task 4: Progress Tracking and Notifications (AC: 2, 6)
  - Create comprehensive pipeline progress tracking system
  - Implement WebSocket notifications for real-time updates
  - Add email notifications for completed summaries (optional)
  - Create detailed logging and audit trail for each pipeline run
- Task 5: Quality Assurance and Validation (AC: 5)
  - Implement summary quality validation checks
  - Add automatic retry logic for failed or low-quality summaries
  - Create fallback strategies for different types of failures
  - Implement summary improvement suggestions and regeneration
- Task 6: API Integration and Frontend (AC: 1, 2, 6)
  - Create `/api/process` endpoint for end-to-end pipeline processing
  - Update frontend to use integrated pipeline instead of separate services
  - Add pipeline status dashboard for monitoring active and completed jobs
  - Implement pipeline cancellation and cleanup functionality
- Task 7: Performance and Reliability (AC: 2, 5)
  - Add comprehensive error handling and recovery mechanisms
  - Implement pipeline timeout and resource management
  - Create performance monitoring and optimization tracking
  - Add pipeline analytics and usage statistics
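The pipeline timeout named in Task 7 could be enforced by wrapping the pipeline coroutine in `asyncio.wait_for`. The following is a minimal sketch, not part of the story's design: `run_with_timeout`, `fake_pipeline`, and the returned dict shape are hypothetical names chosen for illustration.

```python
import asyncio
from typing import Any, Awaitable, Dict


async def run_with_timeout(work: Awaitable[Any], timeout_seconds: float) -> Dict[str, Any]:
    """Await `work`, converting a timeout into a failed-status dict.

    Hypothetical helper: the name and the returned dict shape are assumptions.
    """
    try:
        value = await asyncio.wait_for(work, timeout=timeout_seconds)
        return {"status": "completed", "value": value}
    except asyncio.TimeoutError:
        # wait_for cancels the inner coroutine before raising
        return {"status": "failed", "error": f"timed out after {timeout_seconds}s"}


async def fake_pipeline(delay: float) -> str:
    """Stand-in for the real pipeline coroutine."""
    await asyncio.sleep(delay)
    return "summary"


fast = asyncio.run(run_with_timeout(fake_pipeline(0.01), timeout_seconds=1.0))
slow = asyncio.run(run_with_timeout(fake_pipeline(1.0), timeout_seconds=0.05))
```

Here `fast` finishes inside its budget and `slow` is cut off. A real pipeline would likely record the timeout on the job's error field instead of returning a dict.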
Dev Notes
Architecture Context
This story creates the core user-facing workflow that demonstrates the full value of the YouTube Summarizer. The pipeline must be reliable, fast, and provide clear feedback while handling edge cases gracefully.
Pipeline Orchestration Architecture
[Source: docs/architecture.md#pipeline-architecture]
# backend/services/summary_pipeline.py
import asyncio
import uuid
from datetime import datetime
from enum import Enum
from typing import Dict, Optional, List, Any
from dataclasses import dataclass, asdict
from ..services.video_service import VideoService
from ..services.transcript_service import TranscriptService
from ..services.openai_summarizer import OpenAISummarizer
from ..services.cache_manager import CacheManager
from ..core.exceptions import PipelineError

class PipelineStage(Enum):
    INITIALIZED = "initialized"
    VALIDATING_URL = "validating_url"
    EXTRACTING_METADATA = "extracting_metadata"
    EXTRACTING_TRANSCRIPT = "extracting_transcript"
    ANALYZING_CONTENT = "analyzing_content"
    GENERATING_SUMMARY = "generating_summary"
    VALIDATING_QUALITY = "validating_quality"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"


@dataclass
class PipelineConfig:
    summary_length: str = "standard"
    include_timestamps: bool = False
    focus_areas: Optional[List[str]] = None
    quality_threshold: float = 0.7
    max_retries: int = 2
    enable_notifications: bool = True


@dataclass
class PipelineProgress:
    stage: PipelineStage
    percentage: float
    message: str
    estimated_time_remaining: Optional[float] = None
    current_step_details: Optional[Dict[str, Any]] = None


@dataclass
class PipelineResult:
    job_id: str
    video_url: str
    video_id: str
    status: PipelineStage
    # Video metadata
    video_metadata: Optional[Dict[str, Any]] = None
    # Processing results
    transcript: Optional[str] = None
    summary: Optional[str] = None
    key_points: Optional[List[str]] = None
    main_themes: Optional[List[str]] = None
    actionable_insights: Optional[List[str]] = None
    # Quality and metadata
    confidence_score: Optional[float] = None
    quality_score: Optional[float] = None
    processing_metadata: Optional[Dict[str, Any]] = None
    cost_data: Optional[Dict[str, Any]] = None
    # Timeline
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None
    processing_time_seconds: Optional[float] = None
    # Error information
    error: Optional[Dict[str, Any]] = None
    retry_count: int = 0


class SummaryPipeline:
    """Orchestrates the complete YouTube summarization workflow"""

    def __init__(
        self,
        video_service: VideoService,
        transcript_service: TranscriptService,
        ai_service: OpenAISummarizer,
        cache_manager: CacheManager
    ):
        self.video_service = video_service
        self.transcript_service = transcript_service
        self.ai_service = ai_service
        self.cache_manager = cache_manager
        # Active jobs tracking
        self.active_jobs: Dict[str, PipelineResult] = {}
        self.progress_callbacks: Dict[str, List[callable]] = {}

    async def process_video(
        self,
        video_url: str,
        config: Optional[PipelineConfig] = None,
        progress_callback: callable = None
    ) -> str:
        """Start video processing pipeline and return job ID"""
        if config is None:
            config = PipelineConfig()
        job_id = str(uuid.uuid4())
        # Initialize pipeline result
        result = PipelineResult(
            job_id=job_id,
            video_url=video_url,
            video_id="",  # Will be populated during validation
            status=PipelineStage.INITIALIZED,
            started_at=datetime.utcnow(),
            retry_count=0
        )
        self.active_jobs[job_id] = result
        if progress_callback:
            self.progress_callbacks[job_id] = [progress_callback]
        # Start processing in background.
        # NOTE: asyncio only keeps weak references to tasks, so the returned
        # task should be stored somewhere until it finishes.
        asyncio.create_task(self._execute_pipeline(job_id, config))
        return job_id

    async def _execute_pipeline(self, job_id: str, config: PipelineConfig):
        """Execute the complete processing pipeline"""
        result = self.active_jobs[job_id]
        try:
            # Stage 1: URL Validation
            await self._update_progress(job_id, PipelineStage.VALIDATING_URL, 5, "Validating YouTube URL...")
            video_id = await self.video_service.extract_video_id(result.video_url)
            result.video_id = video_id

            # Stage 2: Extract Video Metadata
            await self._update_progress(job_id, PipelineStage.EXTRACTING_METADATA, 15, "Extracting video information...")
            metadata = await self._extract_enhanced_metadata(video_id)
            result.video_metadata = metadata

            # Stage 3: Extract Transcript
            await self._update_progress(job_id, PipelineStage.EXTRACTING_TRANSCRIPT, 35, "Extracting transcript...")
            transcript_result = await self.transcript_service.extract_transcript(video_id)
            result.transcript = transcript_result.transcript

            # Stage 4: Analyze Content for Optimization
            await self._update_progress(job_id, PipelineStage.ANALYZING_CONTENT, 50, "Analyzing content characteristics...")
            content_analysis = await self._analyze_content_characteristics(result.transcript, metadata)
            optimized_config = self._optimize_config_for_content(config, content_analysis)

            # Stage 5: Generate Summary
            await self._update_progress(job_id, PipelineStage.GENERATING_SUMMARY, 75, "Generating AI summary...")
            summary_result = await self._generate_optimized_summary(result.transcript, optimized_config, content_analysis)

            # Populate summary results
            result.summary = summary_result.summary
            result.key_points = summary_result.key_points
            result.main_themes = summary_result.main_themes
            result.actionable_insights = summary_result.actionable_insights
            result.confidence_score = summary_result.confidence_score
            result.processing_metadata = summary_result.processing_metadata
            result.cost_data = summary_result.cost_data

            # Stage 6: Quality Validation
            await self._update_progress(job_id, PipelineStage.VALIDATING_QUALITY, 90, "Validating summary quality...")
            quality_score = await self._validate_summary_quality(result, content_analysis)
            result.quality_score = quality_score

            # Check quality against the content-adjusted threshold
            # (the adjustment made in _optimize_config_for_content is otherwise never applied)
            if quality_score < optimized_config.quality_threshold and result.retry_count < config.max_retries:
                await self._retry_with_improvements(job_id, config, "Low quality score")
                return

            # Stage 7: Complete
            result.completed_at = datetime.utcnow()
            result.processing_time_seconds = (result.completed_at - result.started_at).total_seconds()
            result.status = PipelineStage.COMPLETED
            await self._update_progress(job_id, PipelineStage.COMPLETED, 100, "Summary completed successfully!")

            # Cache the result
            await self.cache_manager.cache_pipeline_result(job_id, result)

            # Send completion notification
            if config.enable_notifications:
                await self._send_completion_notification(result)
        except Exception as e:
            await self._handle_pipeline_error(job_id, e, config)

    async def _extract_enhanced_metadata(self, video_id: str) -> Dict[str, Any]:
        """Extract rich video metadata using YouTube Data API"""
        # This would integrate with YouTube Data API v3
        # For now, implementing basic structure
        try:
            # Simulate YouTube Data API call
            metadata = {
                "title": f"Video {video_id} Title",  # Would come from API
                "description": "Video description...",
                "channel_name": "Channel Name",
                "published_at": datetime.utcnow().isoformat(),
                "duration": "PT10M30S",  # ISO 8601 duration
                "view_count": 1000,
                "like_count": 50,
                "category": "Education",
                "tags": ["python", "tutorial", "coding"],
                "thumbnail_url": f"https://img.youtube.com/vi/{video_id}/maxresdefault.jpg",
                "language": "en",
                "default_language": "en"
            }
            return metadata
        except Exception as e:
            # Return basic metadata if enhanced extraction fails
            return {
                "video_id": video_id,
                "title": f"Video {video_id}",
                "error": f"Enhanced metadata extraction failed: {str(e)}"
            }

    async def _analyze_content_characteristics(self, transcript: str, metadata: Dict[str, Any]) -> Dict[str, Any]:
        """Analyze transcript and metadata to determine optimal processing strategy"""
        analysis = {
            "transcript_length": len(transcript),
            "word_count": len(transcript.split()),
            "estimated_reading_time": len(transcript.split()) / 250,  # Minutes at 250 words per minute
            "complexity_score": 0.5,  # Would implement actual complexity analysis
            "content_type": "general",
            "language": metadata.get("language", "en"),
            "technical_indicators": [],
            "educational_indicators": [],
            "entertainment_indicators": []
        }

        # Basic content type detection by substring matching.
        # The checks run in order, so a later match overrides an earlier one.
        transcript_lower = transcript.lower()

        # Technical content indicators
        technical_terms = ["algorithm", "function", "variable", "database", "api", "code", "programming"]
        technical_count = sum(1 for term in technical_terms if term in transcript_lower)
        if technical_count >= 3:
            analysis["content_type"] = "technical"
            analysis["technical_indicators"] = [term for term in technical_terms if term in transcript_lower]

        # Educational content indicators
        educational_terms = ["learn", "tutorial", "explain", "understand", "concept", "example", "lesson"]
        educational_count = sum(1 for term in educational_terms if term in transcript_lower)
        if educational_count >= 3:
            analysis["content_type"] = "educational"
            analysis["educational_indicators"] = [term for term in educational_terms if term in transcript_lower]

        # Entertainment content indicators
        entertainment_terms = ["funny", "story", "experience", "adventure", "review", "reaction"]
        entertainment_count = sum(1 for term in entertainment_terms if term in transcript_lower)
        if entertainment_count >= 2:
            analysis["content_type"] = "entertainment"
            analysis["entertainment_indicators"] = [term for term in entertainment_terms if term in transcript_lower]

        # Complexity scoring based on average sentence length
        sentences = [s for s in transcript.split('.') if s.strip()]  # Skip empty fragments
        avg_sentence_length = sum(len(s.split()) for s in sentences) / len(sentences) if sentences else 0
        if avg_sentence_length > 20:
            analysis["complexity_score"] = min(1.0, analysis["complexity_score"] + 0.3)
        elif avg_sentence_length < 10:
            analysis["complexity_score"] = max(0.1, analysis["complexity_score"] - 0.2)
        return analysis

    def _optimize_config_for_content(self, base_config: PipelineConfig, analysis: Dict[str, Any]) -> PipelineConfig:
        """Optimize processing configuration based on content analysis"""
        optimized_config = PipelineConfig(**asdict(base_config))

        # Adjust summary length based on content
        if analysis["word_count"] > 5000 and optimized_config.summary_length == "standard":
            optimized_config.summary_length = "detailed"
        elif analysis["word_count"] < 500 and optimized_config.summary_length == "standard":
            optimized_config.summary_length = "brief"

        # Add focus areas based on content type
        if not optimized_config.focus_areas:
            optimized_config.focus_areas = []
        content_type = analysis.get("content_type", "general")
        if content_type == "technical":
            optimized_config.focus_areas.extend(["technical concepts", "implementation details"])
        elif content_type == "educational":
            optimized_config.focus_areas.extend(["learning objectives", "key concepts", "practical applications"])
        elif content_type == "entertainment":
            optimized_config.focus_areas.extend(["main highlights", "key moments", "overall message"])

        # Adjust quality threshold based on complexity
        if analysis["complexity_score"] > 0.7:
            optimized_config.quality_threshold = max(0.6, optimized_config.quality_threshold - 0.1)
        return optimized_config

    async def _generate_optimized_summary(
        self,
        transcript: str,
        config: PipelineConfig,
        analysis: Dict[str, Any]
    ) -> Any:  # Returns SummaryResult
        """Generate summary with content-aware optimizations"""
        from ..services.ai_service import SummaryRequest, SummaryLength

        # Map config to AI service parameters
        length_mapping = {
            "brief": SummaryLength.BRIEF,
            "standard": SummaryLength.STANDARD,
            "detailed": SummaryLength.DETAILED
        }
        summary_request = SummaryRequest(
            transcript=transcript,
            length=length_mapping[config.summary_length],
            # Copy the list so the appends below neither mutate the config nor fail on None
            focus_areas=list(config.focus_areas or []),
            language=analysis.get("language", "en"),
            include_timestamps=config.include_timestamps
        )

        # Add content-specific prompt enhancements
        if analysis["content_type"] == "technical":
            summary_request.focus_areas.append("explain technical concepts clearly")
        elif analysis["content_type"] == "educational":
            summary_request.focus_areas.append("highlight learning outcomes")
        return await self.ai_service.generate_summary(summary_request)

    async def _validate_summary_quality(self, result: PipelineResult, analysis: Dict[str, Any]) -> float:
        """Validate and score summary quality"""
        quality_score = 0.0

        # Check summary length appropriateness
        summary_word_count = len(result.summary.split()) if result.summary else 0
        transcript_word_count = analysis["word_count"]

        # Good summary should be 5-15% of original length
        compression_ratio = summary_word_count / transcript_word_count if transcript_word_count > 0 else 0
        if 0.05 <= compression_ratio <= 0.15:
            quality_score += 0.3
        elif 0.03 <= compression_ratio <= 0.20:
            quality_score += 0.2

        # Check key points availability and quality
        if result.key_points and len(result.key_points) >= 3:
            quality_score += 0.2

        # Check main themes availability
        if result.main_themes and len(result.main_themes) >= 2:
            quality_score += 0.15

        # Check actionable insights
        if result.actionable_insights and len(result.actionable_insights) >= 1:
            quality_score += 0.15

        # Use AI confidence score
        if result.confidence_score and result.confidence_score > 0.8:
            quality_score += 0.2
        elif result.confidence_score and result.confidence_score > 0.6:
            quality_score += 0.1
        return min(1.0, quality_score)

    async def _retry_with_improvements(self, job_id: str, config: PipelineConfig, reason: str):
        """Retry pipeline with improved configuration"""
        result = self.active_jobs[job_id]
        result.retry_count += 1
        await self._update_progress(
            job_id,
            PipelineStage.ANALYZING_CONTENT,
            40,
            f"Retrying with improvements (attempt {result.retry_count + 1}/{config.max_retries + 1})"
        )

        # Improve configuration for retry
        improved_config = PipelineConfig(**asdict(config))
        improved_config.summary_length = "detailed"  # Try more detailed summary
        improved_config.quality_threshold = max(0.5, config.quality_threshold - 0.1)  # Lower threshold slightly

        # Continue pipeline with improved config
        await self._execute_pipeline(job_id, improved_config)

    async def _handle_pipeline_error(self, job_id: str, error: Exception, config: PipelineConfig):
        """Handle pipeline errors with retry logic"""
        result = self.active_jobs[job_id]
        # Capture the stage that raised before overwriting it with FAILED;
        # otherwise the recorded stage would always be "failed"
        failed_stage = result.status
        result.status = PipelineStage.FAILED
        result.error = {
            "message": str(error),
            "type": type(error).__name__,
            "stage": failed_stage.value,
            "retry_count": result.retry_count
        }

        # Attempt retry if within limits
        if result.retry_count < config.max_retries:
            await asyncio.sleep(2 ** result.retry_count)  # Exponential backoff
            await self._retry_with_improvements(job_id, config, f"Error: {str(error)}")
        else:
            result.completed_at = datetime.utcnow()
            await self._update_progress(job_id, PipelineStage.FAILED, 0, f"Failed after {result.retry_count + 1} attempts")

    async def _update_progress(
        self,
        job_id: str,
        stage: PipelineStage,
        percentage: float,
        message: str,
        details: Dict[str, Any] = None
    ):
        """Update pipeline progress and notify callbacks"""
        result = self.active_jobs.get(job_id)
        if result:
            result.status = stage
        progress = PipelineProgress(
            stage=stage,
            percentage=percentage,
            message=message,
            current_step_details=details
        )

        # Notify all registered callbacks
        callbacks = self.progress_callbacks.get(job_id, [])
        for callback in callbacks:
            try:
                await callback(job_id, progress)
            except Exception as e:
                print(f"Progress callback error: {e}")

    async def get_pipeline_result(self, job_id: str) -> Optional[PipelineResult]:
        """Get pipeline result by job ID"""
        # Check active jobs first
        if job_id in self.active_jobs:
            return self.active_jobs[job_id]

        # Check cache for completed jobs
        cached_result = await self.cache_manager.get_cached_pipeline_result(job_id)
        return cached_result
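
    async def cancel_pipeline(self, job_id: str) -> bool:
        """Cancel a running pipeline; returns False if the job is unknown or already finished"""
        result = self.active_jobs.get(job_id)
        # completed_at is set on completion, final failure, and cancellation,
        # so a non-None value means there is nothing left to cancel
        if result is None or result.completed_at is not None:
            return False
        result.status = PipelineStage.CANCELLED
        result.completed_at = datetime.utcnow()
        # NOTE: this only marks the job; the background task started by
        # process_video keeps running unless it is cancelled separately
        await self._update_progress(job_id, PipelineStage.CANCELLED, 0, "Pipeline cancelled by user")
        return True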

    async def _send_completion_notification(self, result: PipelineResult):
        """Send completion notification (email, webhook, etc.)"""
        # This would integrate with notification service
        notification_data = {
            "job_id": result.job_id,
            "video_title": result.video_metadata.get("title", "Unknown") if result.video_metadata else "Unknown",
            "status": result.status.value,
            "processing_time": result.processing_time_seconds,
            "summary_preview": result.summary[:100] + "..." if result.summary else None
        }
        # Log completion for now (would send actual notifications)
        print(f"Pipeline completed: {notification_data}")
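The listing above starts each pipeline with `asyncio.create_task` without retaining the task, and `cancel_pipeline` changes only the job's status. One way to make cancellation actually stop the work is to keep a handle to each task. This is a minimal sketch under that assumption: `JobRunner` and its methods are hypothetical and not part of the story's design.

```python
import asyncio
import uuid
from typing import Awaitable, Callable, Dict


class JobRunner:
    """Hypothetical helper that owns the asyncio tasks for running jobs."""

    def __init__(self) -> None:
        self._tasks: Dict[str, asyncio.Task] = {}
        self.status: Dict[str, str] = {}

    def start(self, work: Callable[[], Awaitable[None]]) -> str:
        job_id = str(uuid.uuid4())
        self.status[job_id] = "running"
        task = asyncio.create_task(self._run(job_id, work))
        # Hold a strong reference; the event loop only keeps weak references to tasks
        self._tasks[job_id] = task
        task.add_done_callback(lambda _t: self._tasks.pop(job_id, None))
        return job_id

    async def _run(self, job_id: str, work: Callable[[], Awaitable[None]]) -> None:
        try:
            await work()
            self.status[job_id] = "completed"
        except asyncio.CancelledError:
            self.status[job_id] = "cancelled"
            raise  # Let asyncio mark the task itself as cancelled

    def cancel(self, job_id: str) -> bool:
        task = self._tasks.get(job_id)
        if task is None or task.done():
            return False  # Unknown job or already finished
        task.cancel()
        return True


async def demo():
    runner = JobRunner()
    slow = runner.start(lambda: asyncio.sleep(10))
    quick = runner.start(lambda: asyncio.sleep(0))
    await asyncio.sleep(0.05)  # Let the quick job finish
    cancelled_slow = runner.cancel(slow)
    cancelled_quick = runner.cancel(quick)
    await asyncio.sleep(0.01)  # Give the cancelled task time to unwind
    return runner.status[slow], runner.status[quick], cancelled_slow, cancelled_quick


demo_result = asyncio.run(demo())
```

In the demo the long-running job ends up cancelled, while the cancel request for the job that had already finished is rejected. The same idea could be folded into `SummaryPipeline` by storing the task next to each entry in `active_jobs`.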
API Integration
[Source: docs/architecture.md#api-specification]
# backend/api/pipeline.py
from fastapi import APIRouter, HTTPException, BackgroundTasks, Depends
from pydantic import BaseModel, Field, HttpUrl
from typing import Optional, List, Dict, Any, Literal
from ..services.summary_pipeline import SummaryPipeline, PipelineConfig, PipelineStage
from ..core.websocket_manager import WebSocketManager

router = APIRouter(prefix="/api", tags=["pipeline"])


class ProcessVideoRequest(BaseModel):
    video_url: HttpUrl = Field(..., description="YouTube video URL to process")
    # Restricted to the values the pipeline maps to SummaryLength; any other string would raise KeyError there
    summary_length: Literal["brief", "standard", "detailed"] = Field("standard", description="Summary length preference")
    focus_areas: Optional[List[str]] = Field(None, description="Areas to focus on in summary")
    include_timestamps: bool = Field(False, description="Include timestamps in summary")
    enable_notifications: bool = Field(True, description="Enable completion notifications")
    quality_threshold: float = Field(0.7, ge=0.0, le=1.0, description="Minimum quality score threshold (0.0-1.0)")


class ProcessVideoResponse(BaseModel):
    job_id: str
    status: str
    message: str
    estimated_completion_time: Optional[float] = None


class PipelineStatusResponse(BaseModel):
    job_id: str
    status: str
    progress_percentage: float
    current_message: str
    video_metadata: Optional[Dict[str, Any]] = None
    result: Optional[Dict[str, Any]] = None
    error: Optional[Dict[str, Any]] = None
    processing_time_seconds: Optional[float] = None


# NOTE: a bare Depends() resolves to the annotated class, so FastAPI would try to
# construct a new SummaryPipeline per request. The endpoints need a provider that
# returns one shared instance, or job state is lost between requests.
@router.post("/process", response_model=ProcessVideoResponse)
async def process_video(
    request: ProcessVideoRequest,
    pipeline: SummaryPipeline = Depends(),
    websocket_manager: WebSocketManager = Depends()
):
    """Process YouTube video through complete pipeline"""
    try:
        config = PipelineConfig(
            summary_length=request.summary_length,
            focus_areas=request.focus_areas or [],
            include_timestamps=request.include_timestamps,
            quality_threshold=request.quality_threshold,
            enable_notifications=request.enable_notifications
        )

        # Create progress callback for WebSocket notifications
        async def progress_callback(job_id: str, progress):
            await websocket_manager.send_progress_update(job_id, {
                "stage": progress.stage.value,
                "percentage": progress.percentage,
                "message": progress.message,
                "details": progress.current_step_details
            })

        # Start pipeline processing
        job_id = await pipeline.process_video(
            video_url=str(request.video_url),
            config=config,
            progress_callback=progress_callback
        )
        return ProcessVideoResponse(
            job_id=job_id,
            status="processing",
            message="Video processing started",
            estimated_completion_time=120.0  # 2 minutes estimate
        )
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"Failed to start processing: {str(e)}"
        )

@router.get("/process/{job_id}", response_model=PipelineStatusResponse)
async def get_pipeline_status(
    job_id: str,
    pipeline: SummaryPipeline = Depends()
):
    """Get pipeline processing status and results"""
    result = await pipeline.get_pipeline_result(job_id)
    if not result:
        raise HTTPException(status_code=404, detail="Pipeline job not found")

    # Calculate progress percentage based on stage
    stage_percentages = {
        PipelineStage.INITIALIZED: 0,
        PipelineStage.VALIDATING_URL: 5,
        PipelineStage.EXTRACTING_METADATA: 15,
        PipelineStage.EXTRACTING_TRANSCRIPT: 35,
        PipelineStage.ANALYZING_CONTENT: 50,
        PipelineStage.GENERATING_SUMMARY: 75,
        PipelineStage.VALIDATING_QUALITY: 90,
        PipelineStage.COMPLETED: 100,
        PipelineStage.FAILED: 0,
        PipelineStage.CANCELLED: 0
    }
    response_data = {
        "job_id": job_id,
        "status": result.status.value,
        "progress_percentage": stage_percentages.get(result.status, 0),
        "current_message": f"Status: {result.status.value}",
        "video_metadata": result.video_metadata,
        "processing_time_seconds": result.processing_time_seconds
    }

    # Include results if completed
    if result.status == PipelineStage.COMPLETED:
        response_data["result"] = {
            "summary": result.summary,
            "key_points": result.key_points,
            "main_themes": result.main_themes,
            "actionable_insights": result.actionable_insights,
            "confidence_score": result.confidence_score,
            "quality_score": result.quality_score,
            "cost_data": result.cost_data
        }

    # Include error if failed
    if result.status == PipelineStage.FAILED and result.error:
        response_data["error"] = result.error
    return PipelineStatusResponse(**response_data)

@router.delete("/process/{job_id}")
async def cancel_pipeline(
    job_id: str,
    pipeline: SummaryPipeline = Depends()
):
    """Cancel running pipeline"""
    success = await pipeline.cancel_pipeline(job_id)
    if not success:
        raise HTTPException(status_code=404, detail="Pipeline job not found or already completed")
    return {"message": "Pipeline cancelled successfully"}
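The endpoints above declare `pipeline: SummaryPipeline = Depends()`. As far as FastAPI's documented shortcut goes, a bare `Depends()` resolves to the annotated class, so the `active_jobs` written by the POST handler would not be visible to a later GET. One common option is a cached provider function. The sketch below is stdlib-only: `get_summary_pipeline` is a hypothetical name, and the `SummaryPipeline` class here is a stub standing in for the real service.

```python
from functools import lru_cache
from typing import Dict


class SummaryPipeline:
    """Stub standing in for backend.services.summary_pipeline.SummaryPipeline."""

    def __init__(self) -> None:
        self.active_jobs: Dict[str, str] = {}


@lru_cache(maxsize=1)
def get_summary_pipeline() -> SummaryPipeline:
    """Hypothetical provider: builds the pipeline once and reuses it."""
    # The real provider would pass the video, transcript, AI, and cache services here
    return SummaryPipeline()


# In the router this would be wired as (an assumption, not from the source):
#     pipeline: SummaryPipeline = Depends(get_summary_pipeline)

first = get_summary_pipeline()
first.active_jobs["job-1"] = "validating_url"
second = get_summary_pipeline()
```

Both calls return the same object, so state recorded through `first` is visible through `second`. A per-process singleton like this works only for a single worker; with several workers, job state would need to live in the persistent store mentioned in Task 1.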
File Locations and Structure
[Source: docs/architecture.md#project-structure]
Backend Files:
- `backend/services/summary_pipeline.py` - Main pipeline orchestration service
- `backend/api/pipeline.py` - Pipeline management endpoints
- `backend/core/websocket_manager.py` - WebSocket progress notifications
- `backend/models/pipeline.py` - Pipeline result storage models
- `backend/services/notification_service.py` - Completion notifications
- `backend/tests/unit/test_summary_pipeline.py` - Unit tests
- `backend/tests/integration/test_pipeline_api.py` - Integration tests
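Of the files listed, `backend/core/websocket_manager.py` is referenced by the API code only through `send_progress_update(job_id, payload)`. The following in-memory sketch shows one shape it might take. Everything except that method name is an assumption, including `connect`, `disconnect`, the return value, and the requirement that sockets expose an async `send_json`.

```python
import asyncio
from collections import defaultdict
from typing import Any, Dict, List


class WebSocketManager:
    """In-memory sketch; only send_progress_update appears in the story."""

    def __init__(self) -> None:
        self._connections: Dict[str, List[Any]] = defaultdict(list)

    def connect(self, job_id: str, websocket: Any) -> None:  # hypothetical
        self._connections[job_id].append(websocket)

    def disconnect(self, job_id: str, websocket: Any) -> None:  # hypothetical
        if websocket in self._connections[job_id]:
            self._connections[job_id].remove(websocket)

    async def send_progress_update(self, job_id: str, payload: Dict[str, Any]) -> int:
        """Send payload to every socket subscribed to job_id; return delivery count."""
        delivered = 0
        for websocket in list(self._connections[job_id]):
            try:
                await websocket.send_json({"job_id": job_id, **payload})
                delivered += 1
            except Exception:
                # Drop sockets that fail so one dead client cannot block the rest
                self.disconnect(job_id, websocket)
        return delivered


class FakeSocket:
    """Test double exposing the async send_json method the sketch relies on."""

    def __init__(self) -> None:
        self.sent: List[Dict[str, Any]] = []

    async def send_json(self, data: Dict[str, Any]) -> None:
        self.sent.append(data)


manager = WebSocketManager()
socket = FakeSocket()
manager.connect("job-1", socket)
count = asyncio.run(
    manager.send_progress_update("job-1", {"stage": "validating_url", "percentage": 5})
)
```

Keying connections by job ID lets several clients watch the same job and keeps updates for one job away from unrelated clients.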
Frontend Integration
[Source: docs/architecture.md#frontend-architecture]
// frontend/src/hooks/usePipelineProcessor.ts
import { useState, useCallback } from 'react';
import { useMutation, useQuery } from '@tanstack/react-query';
import { apiClient } from '@/services/apiClient';
import { useWebSocket } from './useWebSocket';

interface PipelineConfig {
  summary_length: 'brief' | 'standard' | 'detailed';
  focus_areas?: string[];
  include_timestamps: boolean;
  enable_notifications: boolean;
  quality_threshold: number;
}

interface PipelineProgress {
  stage: string;
  percentage: number;
  message: string;
  details?: any;
}

// The status endpoint reports the pipeline stage, so "still processing"
// means "not yet in one of these terminal states"
const TERMINAL_STATUSES = ['completed', 'failed', 'cancelled'];

export function usePipelineProcessor() {
  const [activeJobId, setActiveJobId] = useState<string | null>(null);
  const [progress, setProgress] = useState<PipelineProgress | null>(null);

  const { connect, disconnect } = useWebSocket({
    onProgressUpdate: (update: PipelineProgress) => {
      setProgress(update);
    }
  });

  const startProcessing = useMutation({
    mutationFn: async ({ url, config }: { url: string; config: PipelineConfig }) => {
      const response = await apiClient.processVideo(url, config);
      return response;
    },
    onSuccess: (data) => {
      setActiveJobId(data.job_id);
      connect(data.job_id);
    }
  });

  const { data: pipelineStatus } = useQuery({
    queryKey: ['pipeline-status', activeJobId],
    queryFn: () => activeJobId ? apiClient.getPipelineStatus(activeJobId) : null,
    enabled: !!activeJobId,
    // TanStack Query v5 passes the query object, not the data, to refetchInterval
    refetchInterval: (query) =>
      TERMINAL_STATUSES.includes(query.state.data?.status ?? '') ? false : 2000
  });

  const cancelProcessing = useCallback(async () => {
    if (activeJobId) {
      await apiClient.cancelPipeline(activeJobId);
      setActiveJobId(null);
      setProgress(null);
      disconnect();
    }
  }, [activeJobId, disconnect]);

  return {
    startProcessing: startProcessing.mutateAsync,
    cancelProcessing,
    isProcessing:
      startProcessing.isPending ||
      (!!pipelineStatus && !TERMINAL_STATUSES.includes(pipelineStatus.status)),
    progress: progress || (pipelineStatus ? {
      stage: pipelineStatus.status,
      percentage: pipelineStatus.progress_percentage,
      message: pipelineStatus.current_message
    } : null),
    result: pipelineStatus?.result,
    error: startProcessing.error || pipelineStatus?.error,
    pipelineStatus
  };
}
Quality Assurance Features
- Automatic Retry Logic: Failed or low-quality summaries automatically retried with improved parameters
- Content-Aware Processing: Different strategies for technical, educational, and entertainment content
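- Quality Scoring: Each summary is scored on compression ratio, key points, main themes, actionable insights, and AI confidence, then compared against a configurable threshold (default 0.7)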
- Progress Transparency: Detailed progress tracking keeps users informed throughout the process
- Error Recovery: Errors are retried with exponential backoff up to `max_retries` before the job is marked failed, and metadata extraction falls back to basic fields
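To make the quality scoring concrete, the weights from `_validate_summary_quality` can be exercised on their own. The function below is a standalone copy written for illustration: `score_summary` and its flat argument list are hypothetical, and the result is rounded only so that the example values are easy to compare.

```python
from typing import List, Optional


def score_summary(
    summary_words: int,
    transcript_words: int,
    key_points: List[str],
    main_themes: List[str],
    actionable_insights: List[str],
    confidence: Optional[float],
) -> float:
    """Standalone copy of the weights used in _validate_summary_quality."""
    score = 0.0
    ratio = summary_words / transcript_words if transcript_words > 0 else 0.0
    if 0.05 <= ratio <= 0.15:      # Ideal compression: 5-15% of the transcript
        score += 0.3
    elif 0.03 <= ratio <= 0.20:    # Acceptable compression
        score += 0.2
    if len(key_points) >= 3:
        score += 0.2
    if len(main_themes) >= 2:
        score += 0.15
    if len(actionable_insights) >= 1:
        score += 0.15
    if confidence is not None and confidence > 0.8:
        score += 0.2
    elif confidence is not None and confidence > 0.6:
        score += 0.1
    return round(min(1.0, score), 2)


# 200-word summary of a 2000-word transcript (ratio 0.10), all fields present
full = score_summary(200, 2000, ["a", "b", "c"], ["x", "y"], ["do z"], 0.85)

# Same summary without actionable insights and with moderate confidence
partial = score_summary(200, 2000, ["a", "b", "c"], ["x", "y"], [], 0.7)

# Same again, but with confidence too low to earn any points
weak = score_summary(200, 2000, ["a", "b", "c"], ["x", "y"], [], 0.5)
```

With the default threshold of 0.7, `full` (1.0) and `partial` (0.75) pass, while `weak` (0.65) would trigger a retry. Content that has no actionable insights therefore depends on the confidence score to clear the threshold.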
Change Log
| Date | Version | Description | Author |
|---|---|---|---|
| 2025-01-25 | 1.0 | Initial story creation | Bob (Scrum Master) |
Dev Agent Record
This section will be populated by the development agent during implementation
QA Results
Results from QA Agent review of the completed story implementation will be added here