# youtube-summarizer/backend/api/pipeline.py

"""Pipeline API endpoints for complete YouTube summarization workflow."""
import os
from datetime import datetime, timezone
from typing import Optional, List, Dict, Any

from fastapi import APIRouter, HTTPException, BackgroundTasks, Depends
from pydantic import BaseModel, Field, HttpUrl

from ..services.summary_pipeline import SummaryPipeline
from ..services.video_service import VideoService
from ..services.transcript_service import TranscriptService
from ..services.deepseek_summarizer import DeepSeekSummarizer
from ..services.cache_manager import CacheManager
from ..services.notification_service import NotificationService
from ..models.pipeline import (
    PipelineStage, PipelineConfig, ProcessVideoRequest,
    ProcessVideoResponse, PipelineStatusResponse
)
from ..core.websocket_manager import websocket_manager
router = APIRouter(prefix="/api", tags=["pipeline"])
# Dependency providers
def get_video_service() -> VideoService:
    """Dependency provider: construct a fresh VideoService per request."""
    service = VideoService()
    return service
def get_transcript_service() -> TranscriptService:
    """Dependency provider: TranscriptService wired to the shared WebSocket manager.

    Uses the module-level ``websocket_manager`` import. The previous
    function-local absolute import (``backend.core.websocket_manager``)
    duplicated the module-level relative import and could bind a second,
    distinct module object when the package is importable under both
    absolute and relative paths.
    """
    return TranscriptService(websocket_manager=websocket_manager)
async def get_ai_service() -> DeepSeekSummarizer:
    """Dependency provider: a ready-to-use DeepSeekSummarizer.

    Raises:
        HTTPException: 500 when the DEEPSEEK_API_KEY environment variable
            is missing or empty.
    """
    api_key = os.getenv("DEEPSEEK_API_KEY")
    if not api_key:
        raise HTTPException(
            status_code=500,
            detail="DeepSeek API key not configured"
        )
    summarizer = DeepSeekSummarizer(api_key=api_key)
    # Run the async setup hook if the service reports it is not ready yet.
    if not summarizer.is_initialized:
        await summarizer.initialize()
    return summarizer
def get_cache_manager() -> CacheManager:
    """Dependency provider: construct a fresh CacheManager per request."""
    manager = CacheManager()
    return manager
def get_notification_service() -> NotificationService:
    """Dependency provider: construct a fresh NotificationService per request."""
    notifier = NotificationService()
    return notifier
async def get_summary_pipeline(
    video_service: VideoService = Depends(get_video_service),
    transcript_service: TranscriptService = Depends(get_transcript_service),
    ai_service: DeepSeekSummarizer = Depends(get_ai_service),
    cache_manager: CacheManager = Depends(get_cache_manager),
    notification_service: NotificationService = Depends(get_notification_service)
) -> SummaryPipeline:
    """Dependency provider: assemble a SummaryPipeline from injected services.

    All collaborating services are themselves resolved through FastAPI's
    dependency system, so each one can be overridden in tests.
    """
    pipeline = SummaryPipeline(
        video_service=video_service,
        transcript_service=transcript_service,
        ai_service=ai_service,
        cache_manager=cache_manager,
        notification_service=notification_service
    )
    return pipeline
@router.post("/process", response_model=ProcessVideoResponse)
async def process_video(
    request: ProcessVideoRequest,
    pipeline: SummaryPipeline = Depends(get_summary_pipeline)
):
    """Process a YouTube video through the complete summarization pipeline.

    Args:
        request: Video processing request with URL and configuration.
        pipeline: SummaryPipeline service instance (injected).

    Returns:
        ProcessVideoResponse with the new job ID and initial status.

    Raises:
        HTTPException: 400 for invalid request values, 500 when the
            pipeline fails to start.
    """
    try:
        config = PipelineConfig(
            summary_length=request.summary_length,
            focus_areas=request.focus_areas or [],
            include_timestamps=request.include_timestamps,
            quality_threshold=request.quality_threshold,
            enable_notifications=request.enable_notifications,
            max_retries=2  # Default retry limit
        )

        async def progress_callback(job_id: str, progress):
            """Relay pipeline progress to WebSocket subscribers."""
            # Look up the current result so updates carry video context
            # (title/display name) once metadata extraction has run.
            result = await pipeline.get_pipeline_result(job_id)
            video_context = {}
            if result:
                video_context = {
                    "video_id": result.video_id,
                    "title": result.video_metadata.get('title') if result.video_metadata else None,
                    "display_name": result.display_name
                }
            await websocket_manager.send_progress_update(job_id, {
                "stage": progress.stage.value,
                "percentage": progress.percentage,
                "message": progress.message,
                "details": progress.current_step_details,
                "video_context": video_context
            })

        # Start pipeline processing (runs asynchronously; returns a job ID).
        job_id = await pipeline.process_video(
            video_url=str(request.video_url),
            config=config,
            progress_callback=progress_callback
        )
        return ProcessVideoResponse(
            job_id=job_id,
            status="processing",
            message="Video processing started",
            estimated_completion_time=120.0  # 2 minutes estimate
        )
    except HTTPException:
        # Preserve deliberate HTTP errors instead of masking them as 500.
        raise
    except ValueError as e:
        # Invalid configuration values (including pydantic validation
        # errors, which subclass ValueError) are a client error, not a
        # server failure.
        raise HTTPException(
            status_code=400,
            detail=f"Failed to start processing: {str(e)}"
        ) from e
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"Failed to start processing: {str(e)}"
        ) from e
@router.get("/process/{job_id}", response_model=PipelineStatusResponse)
async def get_pipeline_status(
    job_id: str,
    pipeline: SummaryPipeline = Depends(get_summary_pipeline)
):
    """Report current status (and results, when finished) for a pipeline job.

    Args:
        job_id: Pipeline job identifier.
        pipeline: SummaryPipeline service instance (injected).

    Returns:
        PipelineStatusResponse describing the job's progress.

    Raises:
        HTTPException: 404 when no job with ``job_id`` exists.
    """
    result = await pipeline.get_pipeline_result(job_id)
    if result is None:
        raise HTTPException(
            status_code=404,
            detail="Pipeline job not found"
        )

    # Approximate completion percentage for each pipeline stage.
    progress_by_stage = {
        PipelineStage.INITIALIZED: 0,
        PipelineStage.VALIDATING_URL: 5,
        PipelineStage.EXTRACTING_METADATA: 15,
        PipelineStage.EXTRACTING_TRANSCRIPT: 35,
        PipelineStage.ANALYZING_CONTENT: 50,
        PipelineStage.GENERATING_SUMMARY: 75,
        PipelineStage.VALIDATING_QUALITY: 90,
        PipelineStage.COMPLETED: 100,
        PipelineStage.FAILED: 0,
        PipelineStage.CANCELLED: 0,
    }

    metadata = result.video_metadata
    payload = {
        "job_id": job_id,
        "status": result.status.value,
        "progress_percentage": progress_by_stage.get(result.status, 0),
        "current_message": f"Status: {result.status.value.replace('_', ' ').title()}",
        "video_metadata": metadata,
        "processing_time_seconds": result.processing_time_seconds,
        # User-friendly video identification fields.
        "display_name": result.display_name,
        "video_title": metadata.get('title') if metadata else None,
        "video_id": result.video_id,
        "video_url": result.video_url,
    }

    if result.status == PipelineStage.COMPLETED:
        # Attach the full summary payload once processing has finished.
        payload["result"] = {
            "summary": result.summary,
            "key_points": result.key_points,
            "main_themes": result.main_themes,
            "actionable_insights": result.actionable_insights,
            "confidence_score": result.confidence_score,
            "quality_score": result.quality_score,
            "cost_data": result.cost_data,
        }
    elif result.status == PipelineStage.FAILED and result.error:
        payload["error"] = result.error

    return PipelineStatusResponse(**payload)
@router.delete("/process/{job_id}")
async def cancel_pipeline(
    job_id: str,
    pipeline: SummaryPipeline = Depends(get_summary_pipeline)
):
    """Cancel a running pipeline job.

    Args:
        job_id: Pipeline job identifier.
        pipeline: SummaryPipeline service instance (injected).

    Returns:
        Success message when the job was cancelled.

    Raises:
        HTTPException: 404 when the job is unknown or already finished.
    """
    cancelled = await pipeline.cancel_job(job_id)
    if cancelled:
        return {"message": "Pipeline cancelled successfully"}
    raise HTTPException(
        status_code=404,
        detail="Pipeline job not found or already completed"
    )
@router.get("/process/{job_id}/history")
async def get_pipeline_history(
    job_id: str,
    pipeline: SummaryPipeline = Depends(get_summary_pipeline)
):
    """Return processing history and timing details for a pipeline job.

    Args:
        job_id: Pipeline job identifier.
        pipeline: SummaryPipeline service instance (injected).

    Returns:
        Dict of timing, retry, status, and video-identification fields.

    Raises:
        HTTPException: 404 when no job with ``job_id`` exists.
    """
    result = await pipeline.get_pipeline_result(job_id)
    if result is None:
        raise HTTPException(
            status_code=404,
            detail="Pipeline job not found"
        )

    started = result.started_at
    finished = result.completed_at
    metadata = result.video_metadata
    return {
        "job_id": job_id,
        "created_at": started.isoformat() if started else None,
        "completed_at": finished.isoformat() if finished else None,
        "processing_time_seconds": result.processing_time_seconds,
        "retry_count": result.retry_count,
        "final_status": result.status.value,
        "video_url": result.video_url,
        "video_id": result.video_id,
        # User-friendly video identification fields.
        "display_name": result.display_name,
        "video_title": metadata.get('title') if metadata else None,
        # Only a single error slot is tracked on the result object.
        "error_history": [result.error] if result.error else [],
    }
@router.get("/stats")
async def get_pipeline_stats(
    pipeline: SummaryPipeline = Depends(get_summary_pipeline),
    cache_manager: CacheManager = Depends(get_cache_manager),
    notification_service: NotificationService = Depends(get_notification_service)
):
    """Get pipeline processing statistics.

    Args:
        pipeline: SummaryPipeline service instance (injected).
        cache_manager: CacheManager service instance (injected).
        notification_service: NotificationService instance (injected).

    Returns:
        Dict with active-job, cache, notification, and WebSocket stats
        plus a UTC timestamp.

    Raises:
        HTTPException: 500 when any statistics source fails.
    """
    try:
        # Get active jobs
        active_jobs = pipeline.get_active_jobs()
        # Get cache statistics
        cache_stats = await cache_manager.get_cache_stats()
        # Get notification statistics
        notification_stats = notification_service.get_notification_stats()
        # Get WebSocket connection stats
        websocket_stats = websocket_manager.get_stats()
        return {
            "active_jobs": {
                "count": len(active_jobs),
                "job_ids": active_jobs
            },
            "cache": cache_stats,
            "notifications": notification_stats,
            "websockets": websocket_stats,
            # Timezone-aware UTC; datetime.utcnow() is deprecated (3.12+)
            # and produced naive timestamps.
            "timestamp": datetime.now(timezone.utc).isoformat()
        }
    except HTTPException:
        # Preserve deliberate HTTP errors instead of masking them as 500.
        raise
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"Failed to retrieve statistics: {str(e)}"
        ) from e
@router.post("/cleanup")
async def cleanup_old_jobs(
    max_age_hours: int = 24,
    pipeline: SummaryPipeline = Depends(get_summary_pipeline),
    cache_manager: CacheManager = Depends(get_cache_manager),
    notification_service: NotificationService = Depends(get_notification_service)
):
    """Clean up old completed jobs and cache entries.

    Args:
        max_age_hours: Maximum age in hours for cleanup; must be positive.
        pipeline: SummaryPipeline service instance (injected).
        cache_manager: CacheManager service instance (injected).
        notification_service: NotificationService instance (injected).

    Returns:
        Cleanup results with a UTC timestamp.

    Raises:
        HTTPException: 400 for a non-positive ``max_age_hours``,
            500 when cleanup itself fails.
    """
    # Reject nonsensical ages up front rather than passing them through.
    if max_age_hours <= 0:
        raise HTTPException(
            status_code=400,
            detail="max_age_hours must be a positive integer"
        )
    try:
        # Cleanup pipeline jobs
        await pipeline.cleanup_completed_jobs(max_age_hours)
        # Cleanup notification history
        notification_service.clear_history()
        # Note: Cache cleanup happens automatically during normal operations
        return {
            "message": "Cleanup completed successfully",
            "max_age_hours": max_age_hours,
            # Timezone-aware UTC; datetime.utcnow() is deprecated (3.12+).
            "timestamp": datetime.now(timezone.utc).isoformat()
        }
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"Cleanup failed: {str(e)}"
        ) from e
# Health check endpoint
@router.get("/health")
async def pipeline_health_check(
    pipeline: SummaryPipeline = Depends(get_summary_pipeline)
):
    """Check pipeline service health.

    Args:
        pipeline: SummaryPipeline service instance (injected).

    Returns:
        Health status information; status is "degraded" when the
        DeepSeek API key is not configured.

    Raises:
        HTTPException: 503 when the health check itself fails.
    """
    try:
        # Basic health checks
        active_jobs_count = len(pipeline.get_active_jobs())
        # Check API key availability
        deepseek_key_available = bool(os.getenv("DEEPSEEK_API_KEY"))
        health_status = {
            "status": "healthy",
            "active_jobs": active_jobs_count,
            "deepseek_api_available": deepseek_key_available,
            # Timezone-aware UTC; datetime.utcnow() is deprecated (3.12+)
            # and produced naive timestamps.
            "timestamp": datetime.now(timezone.utc).isoformat()
        }
        if not deepseek_key_available:
            health_status["status"] = "degraded"
            health_status["warning"] = "DeepSeek API key not configured"
        return health_status
    except Exception as e:
        raise HTTPException(
            status_code=503,
            detail=f"Health check failed: {str(e)}"
        ) from e