"""
Intelligent video downloader with progress tracking that orchestrates multiple download methods
"""

import asyncio
import time
import uuid
from datetime import datetime, timedelta
from pathlib import Path
from typing import Optional, Dict, Any, List, TYPE_CHECKING
import logging

if TYPE_CHECKING:
    from backend.core.websocket_manager import WebSocketManager

from backend.models.video_download import (
    VideoDownloadResult,
    DownloadPreferences,
    DownloadMethod,
    DownloadStatus,
    DownloadJobStatus,
    DownloadMetrics,
    HealthCheckResult,
    AllMethodsFailedError,
    DownloaderException,
    VideoNotAvailableError,
    NetworkError
)
from backend.config.video_download_config import VideoDownloadConfig
from backend.services.video_downloaders.base_downloader import DownloaderFactory, DownloadTimeout, DownloadProgress

logger = logging.getLogger(__name__)
|
|
|
|
|
|
class IntelligentVideoDownloader:
|
|
"""Intelligent orchestrator for video downloading with progress tracking and multiple fallback methods"""
|
|
|
|
    def __init__(
        self,
        config: Optional[VideoDownloadConfig] = None,
        websocket_manager: Optional['WebSocketManager'] = None
    ):
        """Set up configuration, downloaders, metrics and the concurrency limit.

        Args:
            config: Optional pre-built configuration; a default
                ``VideoDownloadConfig`` is created when omitted. Its storage
                directories are created up front via ``ensure_directories()``.
            websocket_manager: Optional manager used to push progress updates
                to clients; progress updates are skipped when None.

        Raises:
            RuntimeError: if no download method can be initialized
                (propagated from ``_initialize_downloaders``).
        """
        self.config = config or VideoDownloadConfig()
        self.config.ensure_directories()
        self.websocket_manager = websocket_manager

        # Initialize downloaders: one instance per enabled method,
        # keyed by DownloadMethod.
        self.downloaders = {}
        self._initialize_downloaders()

        # Metrics and caching
        self.metrics = DownloadMetrics()
        self.success_cache = {}  # Track which methods work for which video types
        self.active_jobs = {}  # Track active download jobs

        # Performance optimization: bound the number of simultaneous downloads.
        self.download_semaphore = asyncio.Semaphore(self.config.max_concurrent_downloads)

        logger.info(f"Initialized IntelligentVideoDownloader with methods: {list(self.downloaders.keys())}")
def _initialize_downloaders(self):
|
|
"""Initialize all enabled download methods"""
|
|
available_methods = DownloaderFactory.get_available_methods()
|
|
|
|
for method in self.config.get_method_priority():
|
|
if method in available_methods:
|
|
try:
|
|
downloader_config = self._get_downloader_config(method)
|
|
self.downloaders[method] = DownloaderFactory.create(method, downloader_config)
|
|
logger.info(f"Initialized {method.value} downloader")
|
|
except Exception as e:
|
|
logger.error(f"Failed to initialize {method.value} downloader: {e}")
|
|
|
|
if not self.downloaders:
|
|
raise RuntimeError("No download methods available")
|
|
|
|
def _get_downloader_config(self, method: DownloadMethod) -> Dict[str, Any]:
|
|
"""Get configuration for specific downloader"""
|
|
base_config = {
|
|
'output_dir': str(self.config.get_storage_dirs()['base']),
|
|
'timeout': self.config.method_timeout_seconds
|
|
}
|
|
|
|
if method == DownloadMethod.YT_DLP:
|
|
base_config.update({
|
|
'use_cookies': self.config.ytdlp_use_cookies,
|
|
'cookies_file': str(self.config.ytdlp_cookies_file) if self.config.ytdlp_cookies_file else None,
|
|
'user_agents': self.config.ytdlp_user_agents,
|
|
'proxies': [] # Add proxy support if needed
|
|
})
|
|
elif method == DownloadMethod.PLAYWRIGHT:
|
|
base_config.update({
|
|
'headless': self.config.playwright_headless,
|
|
'timeout': self.config.playwright_timeout,
|
|
'session_file': str(self.config.playwright_browser_session) if self.config.playwright_browser_session else None
|
|
})
|
|
elif method == DownloadMethod.TRANSCRIPT_ONLY:
|
|
base_config.update({
|
|
'youtube_api_key': self.config.youtube_api_key
|
|
})
|
|
|
|
return base_config
|
|
|
|
async def download_video(self, url: str, preferences: Optional[DownloadPreferences] = None) -> VideoDownloadResult:
|
|
"""Download video using intelligent method selection and fallbacks"""
|
|
if preferences is None:
|
|
preferences = DownloadPreferences()
|
|
|
|
job_id = str(uuid.uuid4())
|
|
start_time = time.time()
|
|
|
|
try:
|
|
# Extract video ID for caching and analysis
|
|
video_id = await self._extract_video_id(url)
|
|
|
|
# Create job status
|
|
job_status = DownloadJobStatus(
|
|
job_id=job_id,
|
|
video_url=url,
|
|
status=DownloadStatus.IN_PROGRESS
|
|
)
|
|
self.active_jobs[job_id] = job_status
|
|
|
|
# Get prioritized download methods
|
|
prioritized_methods = await self._get_prioritized_methods(video_id, preferences)
|
|
|
|
logger.info(f"Attempting download for {video_id} with methods: {[m.value for m in prioritized_methods]}")
|
|
|
|
last_error = None
|
|
|
|
# Try each method with timeout and retry logic
|
|
for method_idx, method in enumerate(prioritized_methods):
|
|
if method not in self.downloaders:
|
|
continue
|
|
|
|
downloader = self.downloaders[method]
|
|
job_status.current_method = method
|
|
|
|
# Calculate overall progress based on method index and download progress
|
|
method_weight = 30 # 30% for method selection
|
|
download_weight = 70 # 70% for actual download
|
|
base_progress = (method_idx / len(prioritized_methods)) * method_weight
|
|
|
|
job_status.progress_percent = base_progress
|
|
|
|
# Create progress callback for this download
|
|
async def progress_callback(progress: DownloadProgress):
|
|
"""Callback to handle progress updates from downloaders"""
|
|
# Calculate overall progress
|
|
overall_progress = base_progress + (progress.download_percent * download_weight / 100)
|
|
job_status.progress_percent = overall_progress
|
|
|
|
# Send WebSocket update if manager available
|
|
if self.websocket_manager and job_id:
|
|
await self._send_progress_update(
|
|
job_id=job_id,
|
|
stage='downloading',
|
|
percentage=overall_progress,
|
|
message=progress.status_message,
|
|
sub_progress=progress
|
|
)
|
|
|
|
# Retry logic for each method
|
|
max_retries = self.config.max_retries_per_method
|
|
|
|
for retry in range(max_retries + 1):
|
|
try:
|
|
logger.info(f"Trying {method.value} (attempt {retry + 1}/{max_retries + 1}) for {video_id}")
|
|
|
|
# Use semaphore to limit concurrent downloads
|
|
async with self.download_semaphore:
|
|
# Apply timeout to the download operation
|
|
async with DownloadTimeout(self.config.method_timeout_seconds) as timeout:
|
|
result = await timeout.run(
|
|
downloader.download_video(url, preferences, progress_callback)
|
|
)
|
|
|
|
if result and result.status in [DownloadStatus.COMPLETED, DownloadStatus.PARTIAL]:
|
|
# Success - update metrics and cache
|
|
self._update_success_metrics(method, video_id, True, retry)
|
|
job_status.status = result.status
|
|
job_status.progress_percent = 100
|
|
|
|
# Clean up job
|
|
del self.active_jobs[job_id]
|
|
|
|
logger.info(f"Successfully downloaded {video_id} using {method.value}")
|
|
return result
|
|
|
|
except (VideoNotAvailableError, NetworkError) as e:
|
|
# These errors are likely permanent, don't retry this method
|
|
logger.warning(f"{method.value} failed for {video_id} with permanent error: {e}")
|
|
self._update_success_metrics(method, video_id, False, retry)
|
|
last_error = e
|
|
break
|
|
|
|
except DownloaderException as e:
|
|
# Temporary error, may retry
|
|
logger.warning(f"{method.value} failed for {video_id} (attempt {retry + 1}): {e}")
|
|
self._update_success_metrics(method, video_id, False, retry)
|
|
last_error = e
|
|
|
|
if retry < max_retries:
|
|
# Exponential backoff
|
|
wait_time = (self.config.backoff_factor ** retry) * 2
|
|
await asyncio.sleep(min(wait_time, 30)) # Cap at 30 seconds
|
|
|
|
except Exception as e:
|
|
# Unexpected error
|
|
logger.error(f"{method.value} failed for {video_id} with unexpected error: {e}")
|
|
self._update_success_metrics(method, video_id, False, retry)
|
|
last_error = e
|
|
break
|
|
|
|
# All methods failed
|
|
job_status.status = DownloadStatus.FAILED
|
|
job_status.error_message = str(last_error)
|
|
|
|
processing_time = time.time() - start_time
|
|
|
|
# Try to create a partial result with just metadata/transcript if possible
|
|
if DownloadMethod.TRANSCRIPT_ONLY in self.downloaders:
|
|
try:
|
|
transcript_downloader = self.downloaders[DownloadMethod.TRANSCRIPT_ONLY]
|
|
result = await transcript_downloader.download_video(url, preferences)
|
|
|
|
if result and result.status == DownloadStatus.PARTIAL:
|
|
result.processing_time_seconds = processing_time
|
|
logger.info(f"Fallback to transcript-only successful for {video_id}")
|
|
return result
|
|
except:
|
|
pass
|
|
|
|
# Complete failure
|
|
self.metrics.failed_downloads += 1
|
|
del self.active_jobs[job_id]
|
|
|
|
raise AllMethodsFailedError(f"All download methods failed for {video_id}. Last error: {last_error}")
|
|
|
|
except Exception as e:
|
|
if job_id in self.active_jobs:
|
|
self.active_jobs[job_id].status = DownloadStatus.FAILED
|
|
self.active_jobs[job_id].error_message = str(e)
|
|
|
|
logger.error(f"Download failed for {url}: {e}")
|
|
raise
|
|
|
|
async def _get_prioritized_methods(self, video_id: str, preferences: DownloadPreferences) -> List[DownloadMethod]:
|
|
"""Get download methods prioritized by success rate and preferences"""
|
|
base_priority = self.config.get_method_priority()
|
|
available_methods = [method for method in base_priority if method in self.downloaders]
|
|
|
|
# Adjust priority based on preferences
|
|
if preferences.prefer_audio_only:
|
|
# Prefer methods that support audio-only downloads
|
|
audio_capable = [m for m in available_methods if self.downloaders[m].supports_audio_only()]
|
|
other_methods = [m for m in available_methods if not self.downloaders[m].supports_audio_only()]
|
|
available_methods = audio_capable + other_methods
|
|
|
|
# Adjust based on success rates
|
|
method_scores = {}
|
|
for method in available_methods:
|
|
base_score = len(available_methods) - available_methods.index(method) # Higher for earlier methods
|
|
success_rate = self.metrics.method_success_rates.get(method.value, 0.5) # Default 50%
|
|
method_scores[method] = base_score * (1 + success_rate)
|
|
|
|
# Sort by score (highest first)
|
|
prioritized = sorted(available_methods, key=lambda m: method_scores[m], reverse=True)
|
|
|
|
# Always ensure transcript-only is last as ultimate fallback
|
|
if DownloadMethod.TRANSCRIPT_ONLY in prioritized:
|
|
prioritized.remove(DownloadMethod.TRANSCRIPT_ONLY)
|
|
prioritized.append(DownloadMethod.TRANSCRIPT_ONLY)
|
|
|
|
return prioritized
|
|
|
|
def _update_success_metrics(self, method: DownloadMethod, video_id: str, success: bool, retry_count: int):
|
|
"""Update success metrics for a method"""
|
|
self.metrics.total_attempts += 1
|
|
self.metrics.update_success_rate(method, success)
|
|
|
|
if success:
|
|
self.metrics.successful_downloads += 1
|
|
# Cache successful method for this video type
|
|
self.success_cache[video_id] = {
|
|
'method': method,
|
|
'timestamp': datetime.now(),
|
|
'retry_count': retry_count
|
|
}
|
|
else:
|
|
self.metrics.failed_downloads += 1
|
|
|
|
async def _extract_video_id(self, url: str) -> str:
|
|
"""Extract video ID from URL"""
|
|
import re
|
|
|
|
patterns = [
|
|
r'(?:youtube\.com/watch\?v=|youtu\.be/)([a-zA-Z0-9_-]{11})',
|
|
r'youtube\.com/embed/([a-zA-Z0-9_-]{11})',
|
|
r'youtube\.com/v/([a-zA-Z0-9_-]{11})'
|
|
]
|
|
|
|
for pattern in patterns:
|
|
match = re.search(pattern, url)
|
|
if match:
|
|
return match.group(1)
|
|
|
|
raise DownloaderException(f"Could not extract video ID from URL: {url}")
|
|
|
|
async def get_job_status(self, job_id: str) -> Optional[DownloadJobStatus]:
|
|
"""Get status of a download job"""
|
|
return self.active_jobs.get(job_id)
|
|
|
|
async def cancel_job(self, job_id: str) -> bool:
|
|
"""Cancel an active download job"""
|
|
if job_id in self.active_jobs:
|
|
self.active_jobs[job_id].status = DownloadStatus.CANCELLED
|
|
return True
|
|
return False
|
|
|
|
async def health_check(self) -> HealthCheckResult:
|
|
"""Perform health check on all download methods"""
|
|
method_details = {}
|
|
healthy_count = 0
|
|
|
|
tasks = []
|
|
for method, downloader in self.downloaders.items():
|
|
tasks.append(self._test_method_health(method, downloader))
|
|
|
|
results = await asyncio.gather(*tasks, return_exceptions=True)
|
|
|
|
for (method, downloader), result in zip(self.downloaders.items(), results):
|
|
if isinstance(result, Exception):
|
|
method_details[method.value] = {
|
|
'status': 'unhealthy',
|
|
'error': str(result),
|
|
'last_check': datetime.now().isoformat()
|
|
}
|
|
elif result:
|
|
method_details[method.value] = {
|
|
'status': 'healthy',
|
|
'last_check': datetime.now().isoformat(),
|
|
'success_rate': self.metrics.method_success_rates.get(method.value, 0.0)
|
|
}
|
|
healthy_count += 1
|
|
else:
|
|
method_details[method.value] = {
|
|
'status': 'unhealthy',
|
|
'error': 'Connection test failed',
|
|
'last_check': datetime.now().isoformat()
|
|
}
|
|
|
|
# Determine overall health
|
|
total_methods = len(self.downloaders)
|
|
if healthy_count >= (total_methods * 0.7): # 70% healthy
|
|
overall_status = 'healthy'
|
|
elif healthy_count >= 1: # At least one working
|
|
overall_status = 'degraded'
|
|
else:
|
|
overall_status = 'unhealthy'
|
|
|
|
# Generate recommendations
|
|
recommendations = []
|
|
if healthy_count < total_methods:
|
|
unhealthy_methods = [method for method, details in method_details.items()
|
|
if details['status'] == 'unhealthy']
|
|
recommendations.append(f"Check configuration for: {', '.join(unhealthy_methods)}")
|
|
|
|
if overall_status == 'unhealthy':
|
|
recommendations.append("All download methods are failing - check network connectivity")
|
|
|
|
return HealthCheckResult(
|
|
overall_status=overall_status,
|
|
healthy_methods=healthy_count,
|
|
total_methods=total_methods,
|
|
method_details=method_details,
|
|
recommendations=recommendations
|
|
)
|
|
|
|
async def _test_method_health(self, method: DownloadMethod, downloader) -> bool:
|
|
"""Test health of a specific download method"""
|
|
try:
|
|
async with DownloadTimeout(30): # 30 second timeout for health check
|
|
return await downloader.test_connection()
|
|
except Exception as e:
|
|
logger.warning(f"Health check failed for {method.value}: {e}")
|
|
return False
|
|
|
|
    def get_metrics(self) -> DownloadMetrics:
        """Return the live DownloadMetrics instance (not a copy); callers see
        subsequent updates as downloads complete."""
        return self.metrics
def get_active_jobs(self) -> Dict[str, DownloadJobStatus]:
|
|
"""Get all active download jobs"""
|
|
return self.active_jobs.copy()
|
|
|
|
async def _send_progress_update(
|
|
self,
|
|
job_id: str,
|
|
stage: str,
|
|
percentage: float,
|
|
message: str,
|
|
sub_progress: Optional[DownloadProgress] = None
|
|
):
|
|
"""Send progress update via WebSocket if available"""
|
|
if not self.websocket_manager:
|
|
return
|
|
|
|
progress_data = {
|
|
'stage': stage,
|
|
'percentage': percentage,
|
|
'message': message,
|
|
'time_elapsed': time.time() if job_id in self.active_jobs else 0,
|
|
'sub_progress': None
|
|
}
|
|
|
|
# Add sub-progress details if available
|
|
if sub_progress:
|
|
progress_data['sub_progress'] = {
|
|
'download_percent': sub_progress.download_percent,
|
|
'bytes_downloaded': sub_progress.bytes_downloaded,
|
|
'total_bytes': sub_progress.total_bytes,
|
|
'speed_bps': sub_progress.speed_bps,
|
|
'eta_seconds': sub_progress.eta_seconds,
|
|
'current_method': sub_progress.current_method,
|
|
'retry_attempt': sub_progress.retry_attempt
|
|
}
|
|
|
|
try:
|
|
await self.websocket_manager.send_progress_update(job_id, progress_data)
|
|
except Exception as e:
|
|
logger.warning(f"Failed to send WebSocket progress update: {e}")
|
|
|
|
async def cleanup_old_files(self, max_age_days: int = None) -> Dict[str, Any]:
|
|
"""Clean up old downloaded files"""
|
|
if max_age_days is None:
|
|
max_age_days = self.config.cleanup_older_than_days
|
|
|
|
cutoff_time = datetime.now() - timedelta(days=max_age_days)
|
|
|
|
stats = {
|
|
'files_deleted': 0,
|
|
'bytes_freed': 0,
|
|
'errors': []
|
|
}
|
|
|
|
for storage_dir in ['videos', 'audio', 'temp']:
|
|
dir_path = self.config.get_storage_dirs()[storage_dir]
|
|
|
|
if not dir_path.exists():
|
|
continue
|
|
|
|
for file_path in dir_path.glob('*'):
|
|
try:
|
|
if file_path.is_file():
|
|
file_time = datetime.fromtimestamp(file_path.stat().st_mtime)
|
|
|
|
if file_time < cutoff_time:
|
|
file_size = file_path.stat().st_size
|
|
file_path.unlink()
|
|
|
|
stats['files_deleted'] += 1
|
|
stats['bytes_freed'] += file_size
|
|
|
|
except Exception as e:
|
|
stats['errors'].append(f"Failed to delete {file_path}: {e}")
|
|
|
|
logger.info(f"Cleanup completed: {stats['files_deleted']} files deleted, "
|
|
f"{stats['bytes_freed'] / 1024 / 1024:.2f} MB freed")
|
|
|
|
return stats |