""" Intelligent video downloader with progress tracking that orchestrates multiple download methods """ import asyncio import time import uuid from datetime import datetime, timedelta from pathlib import Path from typing import Optional, Dict, Any, List, TYPE_CHECKING import logging if TYPE_CHECKING: from backend.core.websocket_manager import WebSocketManager from backend.models.video_download import ( VideoDownloadResult, DownloadPreferences, DownloadMethod, DownloadStatus, DownloadJobStatus, DownloadMetrics, HealthCheckResult, AllMethodsFailedError, DownloaderException, VideoNotAvailableError, NetworkError ) from backend.config.video_download_config import VideoDownloadConfig from backend.services.video_downloaders.base_downloader import DownloaderFactory, DownloadTimeout, DownloadProgress logger = logging.getLogger(__name__) class IntelligentVideoDownloader: """Intelligent orchestrator for video downloading with progress tracking and multiple fallback methods""" def __init__( self, config: Optional[VideoDownloadConfig] = None, websocket_manager: Optional['WebSocketManager'] = None ): self.config = config or VideoDownloadConfig() self.config.ensure_directories() self.websocket_manager = websocket_manager # Initialize downloaders self.downloaders = {} self._initialize_downloaders() # Metrics and caching self.metrics = DownloadMetrics() self.success_cache = {} # Track which methods work for which video types self.active_jobs = {} # Track active download jobs # Performance optimization self.download_semaphore = asyncio.Semaphore(self.config.max_concurrent_downloads) logger.info(f"Initialized IntelligentVideoDownloader with methods: {list(self.downloaders.keys())}") def _initialize_downloaders(self): """Initialize all enabled download methods""" available_methods = DownloaderFactory.get_available_methods() for method in self.config.get_method_priority(): if method in available_methods: try: downloader_config = self._get_downloader_config(method) self.downloaders[method] = DownloaderFactory.create(method, downloader_config) logger.info(f"Initialized {method.value} downloader") except Exception as e: logger.error(f"Failed to initialize {method.value} downloader: {e}") if not self.downloaders: raise RuntimeError("No download methods available") def _get_downloader_config(self, method: DownloadMethod) -> Dict[str, Any]: """Get configuration for specific downloader""" base_config = { 'output_dir': str(self.config.get_storage_dirs()['base']), 'timeout': self.config.method_timeout_seconds } if method == DownloadMethod.YT_DLP: base_config.update({ 'use_cookies': self.config.ytdlp_use_cookies, 'cookies_file': str(self.config.ytdlp_cookies_file) if self.config.ytdlp_cookies_file else None, 'user_agents': self.config.ytdlp_user_agents, 'proxies': [] # Add proxy support if needed }) elif method == DownloadMethod.PLAYWRIGHT: base_config.update({ 'headless': self.config.playwright_headless, 'timeout': self.config.playwright_timeout, 'session_file': str(self.config.playwright_browser_session) if self.config.playwright_browser_session else None }) elif method == DownloadMethod.TRANSCRIPT_ONLY: base_config.update({ 'youtube_api_key': self.config.youtube_api_key }) return base_config async def download_video(self, url: str, preferences: Optional[DownloadPreferences] = None) -> VideoDownloadResult: """Download video using intelligent method selection and fallbacks""" if preferences is None: preferences = DownloadPreferences() job_id = str(uuid.uuid4()) start_time = time.time() try: # Extract video ID for caching and analysis video_id = await self._extract_video_id(url) # Create job status job_status = DownloadJobStatus( job_id=job_id, video_url=url, status=DownloadStatus.IN_PROGRESS ) self.active_jobs[job_id] = job_status # Get prioritized download methods prioritized_methods = await self._get_prioritized_methods(video_id, preferences) logger.info(f"Attempting download for {video_id} with methods: {[m.value for m in prioritized_methods]}") last_error = None # Try each method with timeout and retry logic for method_idx, method in enumerate(prioritized_methods): if method not in self.downloaders: continue downloader = self.downloaders[method] job_status.current_method = method # Calculate overall progress based on method index and download progress method_weight = 30 # 30% for method selection download_weight = 70 # 70% for actual download base_progress = (method_idx / len(prioritized_methods)) * method_weight job_status.progress_percent = base_progress # Create progress callback for this download async def progress_callback(progress: DownloadProgress): """Callback to handle progress updates from downloaders""" # Calculate overall progress overall_progress = base_progress + (progress.download_percent * download_weight / 100) job_status.progress_percent = overall_progress # Send WebSocket update if manager available if self.websocket_manager and job_id: await self._send_progress_update( job_id=job_id, stage='downloading', percentage=overall_progress, message=progress.status_message, sub_progress=progress ) # Retry logic for each method max_retries = self.config.max_retries_per_method for retry in range(max_retries + 1): try: logger.info(f"Trying {method.value} (attempt {retry + 1}/{max_retries + 1}) for {video_id}") # Use semaphore to limit concurrent downloads async with self.download_semaphore: # Apply timeout to the download operation async with DownloadTimeout(self.config.method_timeout_seconds) as timeout: result = await timeout.run( downloader.download_video(url, preferences, progress_callback) ) if result and result.status in [DownloadStatus.COMPLETED, DownloadStatus.PARTIAL]: # Success - update metrics and cache self._update_success_metrics(method, video_id, True, retry) job_status.status = result.status job_status.progress_percent = 100 # Clean up job del self.active_jobs[job_id] logger.info(f"Successfully downloaded {video_id} using {method.value}") return result except (VideoNotAvailableError, NetworkError) as e: # These errors are likely permanent, don't retry this method logger.warning(f"{method.value} failed for {video_id} with permanent error: {e}") self._update_success_metrics(method, video_id, False, retry) last_error = e break except DownloaderException as e: # Temporary error, may retry logger.warning(f"{method.value} failed for {video_id} (attempt {retry + 1}): {e}") self._update_success_metrics(method, video_id, False, retry) last_error = e if retry < max_retries: # Exponential backoff wait_time = (self.config.backoff_factor ** retry) * 2 await asyncio.sleep(min(wait_time, 30)) # Cap at 30 seconds except Exception as e: # Unexpected error logger.error(f"{method.value} failed for {video_id} with unexpected error: {e}") self._update_success_metrics(method, video_id, False, retry) last_error = e break # All methods failed job_status.status = DownloadStatus.FAILED job_status.error_message = str(last_error) processing_time = time.time() - start_time # Try to create a partial result with just metadata/transcript if possible if DownloadMethod.TRANSCRIPT_ONLY in self.downloaders: try: transcript_downloader = self.downloaders[DownloadMethod.TRANSCRIPT_ONLY] result = await transcript_downloader.download_video(url, preferences) if result and result.status == DownloadStatus.PARTIAL: result.processing_time_seconds = processing_time logger.info(f"Fallback to transcript-only successful for {video_id}") return result except: pass # Complete failure self.metrics.failed_downloads += 1 del self.active_jobs[job_id] raise AllMethodsFailedError(f"All download methods failed for {video_id}. Last error: {last_error}") except Exception as e: if job_id in self.active_jobs: self.active_jobs[job_id].status = DownloadStatus.FAILED self.active_jobs[job_id].error_message = str(e) logger.error(f"Download failed for {url}: {e}") raise async def _get_prioritized_methods(self, video_id: str, preferences: DownloadPreferences) -> List[DownloadMethod]: """Get download methods prioritized by success rate and preferences""" base_priority = self.config.get_method_priority() available_methods = [method for method in base_priority if method in self.downloaders] # Adjust priority based on preferences if preferences.prefer_audio_only: # Prefer methods that support audio-only downloads audio_capable = [m for m in available_methods if self.downloaders[m].supports_audio_only()] other_methods = [m for m in available_methods if not self.downloaders[m].supports_audio_only()] available_methods = audio_capable + other_methods # Adjust based on success rates method_scores = {} for method in available_methods: base_score = len(available_methods) - available_methods.index(method) # Higher for earlier methods success_rate = self.metrics.method_success_rates.get(method.value, 0.5) # Default 50% method_scores[method] = base_score * (1 + success_rate) # Sort by score (highest first) prioritized = sorted(available_methods, key=lambda m: method_scores[m], reverse=True) # Always ensure transcript-only is last as ultimate fallback if DownloadMethod.TRANSCRIPT_ONLY in prioritized: prioritized.remove(DownloadMethod.TRANSCRIPT_ONLY) prioritized.append(DownloadMethod.TRANSCRIPT_ONLY) return prioritized def _update_success_metrics(self, method: DownloadMethod, video_id: str, success: bool, retry_count: int): """Update success metrics for a method""" self.metrics.total_attempts += 1 self.metrics.update_success_rate(method, success) if success: self.metrics.successful_downloads += 1 # Cache successful method for this video type self.success_cache[video_id] = { 'method': method, 'timestamp': datetime.now(), 'retry_count': retry_count } else: self.metrics.failed_downloads += 1 async def _extract_video_id(self, url: str) -> str: """Extract video ID from URL""" import re patterns = [ r'(?:youtube\.com/watch\?v=|youtu\.be/)([a-zA-Z0-9_-]{11})', r'youtube\.com/embed/([a-zA-Z0-9_-]{11})', r'youtube\.com/v/([a-zA-Z0-9_-]{11})' ] for pattern in patterns: match = re.search(pattern, url) if match: return match.group(1) raise DownloaderException(f"Could not extract video ID from URL: {url}") async def get_job_status(self, job_id: str) -> Optional[DownloadJobStatus]: """Get status of a download job""" return self.active_jobs.get(job_id) async def cancel_job(self, job_id: str) -> bool: """Cancel an active download job""" if job_id in self.active_jobs: self.active_jobs[job_id].status = DownloadStatus.CANCELLED return True return False async def health_check(self) -> HealthCheckResult: """Perform health check on all download methods""" method_details = {} healthy_count = 0 tasks = [] for method, downloader in self.downloaders.items(): tasks.append(self._test_method_health(method, downloader)) results = await asyncio.gather(*tasks, return_exceptions=True) for (method, downloader), result in zip(self.downloaders.items(), results): if isinstance(result, Exception): method_details[method.value] = { 'status': 'unhealthy', 'error': str(result), 'last_check': datetime.now().isoformat() } elif result: method_details[method.value] = { 'status': 'healthy', 'last_check': datetime.now().isoformat(), 'success_rate': self.metrics.method_success_rates.get(method.value, 0.0) } healthy_count += 1 else: method_details[method.value] = { 'status': 'unhealthy', 'error': 'Connection test failed', 'last_check': datetime.now().isoformat() } # Determine overall health total_methods = len(self.downloaders) if healthy_count >= (total_methods * 0.7): # 70% healthy overall_status = 'healthy' elif healthy_count >= 1: # At least one working overall_status = 'degraded' else: overall_status = 'unhealthy' # Generate recommendations recommendations = [] if healthy_count < total_methods: unhealthy_methods = [method for method, details in method_details.items() if details['status'] == 'unhealthy'] recommendations.append(f"Check configuration for: {', '.join(unhealthy_methods)}") if overall_status == 'unhealthy': recommendations.append("All download methods are failing - check network connectivity") return HealthCheckResult( overall_status=overall_status, healthy_methods=healthy_count, total_methods=total_methods, method_details=method_details, recommendations=recommendations ) async def _test_method_health(self, method: DownloadMethod, downloader) -> bool: """Test health of a specific download method""" try: async with DownloadTimeout(30): # 30 second timeout for health check return await downloader.test_connection() except Exception as e: logger.warning(f"Health check failed for {method.value}: {e}") return False def get_metrics(self) -> DownloadMetrics: """Get download metrics""" return self.metrics def get_active_jobs(self) -> Dict[str, DownloadJobStatus]: """Get all active download jobs""" return self.active_jobs.copy() async def _send_progress_update( self, job_id: str, stage: str, percentage: float, message: str, sub_progress: Optional[DownloadProgress] = None ): """Send progress update via WebSocket if available""" if not self.websocket_manager: return progress_data = { 'stage': stage, 'percentage': percentage, 'message': message, 'time_elapsed': time.time() if job_id in self.active_jobs else 0, 'sub_progress': None } # Add sub-progress details if available if sub_progress: progress_data['sub_progress'] = { 'download_percent': sub_progress.download_percent, 'bytes_downloaded': sub_progress.bytes_downloaded, 'total_bytes': sub_progress.total_bytes, 'speed_bps': sub_progress.speed_bps, 'eta_seconds': sub_progress.eta_seconds, 'current_method': sub_progress.current_method, 'retry_attempt': sub_progress.retry_attempt } try: await self.websocket_manager.send_progress_update(job_id, progress_data) except Exception as e: logger.warning(f"Failed to send WebSocket progress update: {e}") async def cleanup_old_files(self, max_age_days: int = None) -> Dict[str, Any]: """Clean up old downloaded files""" if max_age_days is None: max_age_days = self.config.cleanup_older_than_days cutoff_time = datetime.now() - timedelta(days=max_age_days) stats = { 'files_deleted': 0, 'bytes_freed': 0, 'errors': [] } for storage_dir in ['videos', 'audio', 'temp']: dir_path = self.config.get_storage_dirs()[storage_dir] if not dir_path.exists(): continue for file_path in dir_path.glob('*'): try: if file_path.is_file(): file_time = datetime.fromtimestamp(file_path.stat().st_mtime) if file_time < cutoff_time: file_size = file_path.stat().st_size file_path.unlink() stats['files_deleted'] += 1 stats['bytes_freed'] += file_size except Exception as e: stats['errors'].append(f"Failed to delete {file_path}: {e}") logger.info(f"Cleanup completed: {stats['files_deleted']} files deleted, " f"{stats['bytes_freed'] / 1024 / 1024:.2f} MB freed") return stats