# File-viewer banner (not part of the module): Python, 175 lines, 5.5 KiB
"""Pipeline data models for storage and API responses."""
|
|
from datetime import datetime
|
|
from enum import Enum
|
|
from typing import Dict, List, Optional, Any
|
|
from dataclasses import dataclass, field
|
|
from pydantic import BaseModel, Field
|
|
|
|
|
|
class PipelineStage(Enum):
    """Pipeline processing stages.

    Every member's value is the lowercase form of its name, so a stage can
    be rebuilt from its stored string with ``PipelineStage(value)``.
    """

    # Working stages. NOTE(review): declaration order presumably mirrors the
    # order in which the pipeline runs them -- confirm against the executor.
    INITIALIZED = "initialized"
    VALIDATING_URL = "validating_url"
    EXTRACTING_METADATA = "extracting_metadata"
    EXTRACTING_TRANSCRIPT = "extracting_transcript"
    ANALYZING_CONTENT = "analyzing_content"
    GENERATING_SUMMARY = "generating_summary"
    VALIDATING_QUALITY = "validating_quality"

    # Outcome stages. NOTE(review): presumably terminal -- confirm.
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"
|
|
|
|
|
|
@dataclass
class PipelineConfig:
    """Configuration for pipeline processing.

    Every field has a default, so ``PipelineConfig()`` is a valid
    configuration on its own.
    """

    # Summary length preference; defaults to "standard".
    # NOTE(review): the set of accepted values is not visible here.
    summary_length: str = "standard"
    # Whether timestamps are included in the summary.
    include_timestamps: bool = False
    # Optional areas to focus on; None when the caller gave none.
    focus_areas: Optional[List[str]] = None
    # Quality score threshold.
    # NOTE(review): presumably on a 0.0-1.0 scale -- confirm with the scorer.
    quality_threshold: float = 0.7
    # Retry limit. NOTE(review): whether this counts total attempts or
    # re-attempts after the first is not visible here.
    max_retries: int = 2
    # Whether notifications are enabled for the job.
    enable_notifications: bool = True
|
|
|
|
|
|
@dataclass
class PipelineProgress:
    """Pipeline progress information.

    ``stage``, ``percentage`` and ``message`` are required; the remaining
    fields default to ``None``.
    """

    # Stage the pipeline is currently in.
    stage: PipelineStage
    # Completion percentage.
    # NOTE(review): presumably 0-100 rather than 0-1 -- confirm with callers.
    percentage: float
    # Status message accompanying this progress update.
    message: str
    # Estimated time remaining, if known.
    # NOTE(review): unit presumably seconds -- confirm.
    estimated_time_remaining: Optional[float] = None
    # Free-form details about the current step, if any.
    current_step_details: Optional[Dict[str, Any]] = None
|
|
|
|
|
|
@dataclass
class PipelineResult:
    """Complete pipeline processing result.

    Only ``job_id``, ``video_url``, ``video_id`` and ``status`` are
    required. Every other field defaults to ``None``, except
    ``retry_count`` which defaults to ``0``.
    """

    job_id: str
    video_url: str
    video_id: str
    status: PipelineStage

    # Video metadata. ``display_name`` reads its 'title' key and
    # ``metadata`` copies every key it contains.
    video_metadata: Optional[Dict[str, Any]] = None

    # Processing results
    transcript: Optional[str] = None
    summary: Optional[str] = None
    key_points: Optional[List[str]] = None
    main_themes: Optional[List[str]] = None
    actionable_insights: Optional[List[str]] = None

    # Quality and metadata
    confidence_score: Optional[float] = None
    quality_score: Optional[float] = None
    processing_metadata: Optional[Dict[str, Any]] = None
    cost_data: Optional[Dict[str, Any]] = None

    # Timeline
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None
    processing_time_seconds: Optional[float] = None

    # Error information
    error: Optional[Dict[str, Any]] = None
    retry_count: int = 0

    @property
    def display_name(self) -> str:
        """Get user-friendly display name for this pipeline job.

        Returns:
            The first available of:

            1. the truthy ``'title'`` entry of ``video_metadata``, cut to
               77 characters plus ``"..."`` when longer than 80 characters;
            2. ``"Video <video_id>"`` when ``video_id`` is truthy;
            3. ``"Job "`` followed by the first 8 characters of ``job_id``.
        """
        # Priority 1: Video title from metadata (single lookup; a missing or
        # empty title falls through to the next priority).
        title = self.video_metadata.get('title') if self.video_metadata else None
        if title:
            # Truncate very long titles for display
            if len(title) > 80:
                return title[:77] + "..."
            return title

        # Priority 2: Video ID (more user-friendly than job ID)
        if self.video_id:
            return f"Video {self.video_id}"

        # Priority 3: Fallback to job ID (last resort)
        return f"Job {self.job_id[:8]}"

    @property
    def metadata(self) -> Dict[str, Any]:
        """Get comprehensive metadata including display information.

        Returns:
            A new dict holding every key of ``video_metadata`` (none when it
            is ``None`` or empty) plus ``'display_name'``, ``'job_id'``,
            ``'video_id'``, ``'video_url'`` and ``'processing_status'``.
            The explicit keys come after the spread, so they override
            same-named keys from ``video_metadata``.
        """
        base_metadata = self.video_metadata or {}
        return {
            **base_metadata,
            'display_name': self.display_name,
            'job_id': self.job_id,
            'video_id': self.video_id,
            'video_url': self.video_url,
            # A falsy status (e.g. None) is reported as 'unknown'.
            'processing_status': self.status.value if self.status else 'unknown',
        }
|
|
|
|
|
|
# Pydantic models for API requests/responses
|
|
|
|
class ProcessVideoRequest(BaseModel):
    """Request model for video processing.

    ``video_url`` is the only required field; every other field carries the
    default given in its ``Field`` declaration.
    """

    # Required (``...`` marks the field as having no default).
    video_url: str = Field(..., description="YouTube video URL to process")
    summary_length: str = Field("standard", description="Summary length preference")
    focus_areas: Optional[List[str]] = Field(None, description="Areas to focus on in summary")
    include_timestamps: bool = Field(False, description="Include timestamps in summary")
    enable_notifications: bool = Field(True, description="Enable completion notifications")
    # NOTE(review): no range constraint is declared here; presumably the
    # score is on a 0.0-1.0 scale -- confirm before adding ge/le bounds.
    quality_threshold: float = Field(0.7, description="Minimum quality score threshold")
|
|
|
|
|
|
class ProcessVideoResponse(BaseModel):
    """Response model for video processing start.

    ``job_id``, ``status`` and ``message`` are required;
    ``estimated_completion_time`` defaults to ``None``.
    """

    job_id: str
    # Status as a plain string.
    # NOTE(review): presumably a PipelineStage value -- confirm with the API layer.
    status: str
    message: str
    # NOTE(review): unit and reference point (duration vs. timestamp) are
    # not visible here -- confirm.
    estimated_completion_time: Optional[float] = None
|
|
|
|
|
|
class PipelineStatusResponse(BaseModel):
    """Response model for pipeline status.

    ``job_id``, ``status``, ``progress_percentage`` and ``current_message``
    are required; the remaining fields default to ``None``.
    """

    job_id: str
    # Status as a plain string.
    # NOTE(review): presumably a PipelineStage value -- confirm with the API layer.
    status: str
    # NOTE(review): presumably 0-100 -- confirm.
    progress_percentage: float
    current_message: str
    video_metadata: Optional[Dict[str, Any]] = None
    # Free-form result payload; its schema is not defined in this model.
    result: Optional[Dict[str, Any]] = None
    # Free-form error payload; its schema is not defined in this model.
    error: Optional[Dict[str, Any]] = None
    processing_time_seconds: Optional[float] = None
|
|
|
|
|
|
class ContentAnalysis(BaseModel):
    """Content analysis result.

    The six scalar fields are required. Each ``*_indicators`` list defaults
    to a fresh empty list per instance (``default_factory=list``).
    """

    # NOTE(review): presumably measured in characters -- confirm.
    transcript_length: int
    word_count: int
    # NOTE(review): unit (minutes vs. seconds) is not visible here -- confirm.
    estimated_reading_time: float
    complexity_score: float
    content_type: str
    language: str
    technical_indicators: List[str] = Field(default_factory=list)
    educational_indicators: List[str] = Field(default_factory=list)
    entertainment_indicators: List[str] = Field(default_factory=list)
|
|
|
|
|
|
class QualityMetrics(BaseModel):
    """Quality assessment metrics.

    All fields are required except ``quality_factors``, which defaults to a
    fresh empty dict per instance (``default_factory=dict``).
    """

    # NOTE(review): direction of the ratio (summary/transcript or the
    # inverse) is not visible here -- confirm with the producer.
    compression_ratio: float
    key_points_count: int
    main_themes_count: int
    actionable_insights_count: int
    confidence_score: float
    overall_quality_score: float
    # Named per-factor float scores; the key set is not defined in this model.
    quality_factors: Dict[str, float] = Field(default_factory=dict)
|
|
|
|
|
|
class PipelineStats(BaseModel):
    """Pipeline processing statistics.

    All fields are required except ``jobs_by_stage``, which defaults to a
    fresh empty dict per instance (``default_factory=dict``).
    """

    total_jobs: int
    completed_jobs: int
    failed_jobs: int
    cancelled_jobs: int
    # NOTE(review): unit presumably seconds -- confirm.
    average_processing_time: float
    # NOTE(review): fraction (0-1) vs. percentage (0-100) is not visible
    # here -- confirm with the producer.
    success_rate: float
    average_quality_score: float
    # NOTE(review): currency/unit is not visible here -- confirm.
    total_cost: float
    # Job counts keyed by a string.
    # NOTE(review): keys presumably PipelineStage values -- confirm.
    jobs_by_stage: Dict[str, int] = Field(default_factory=dict)