""" Video Download Service using yt-dlp for YouTube video management. Handles downloading, storage, caching, and cleanup of video files. """ import yt_dlp import os import json import hashlib import shutil import asyncio from pathlib import Path from datetime import datetime from typing import Dict, List, Optional, Tuple import logging logger = logging.getLogger(__name__) class VideoDownloadError(Exception): """Custom exception for video download errors""" pass class VideoDownloadService: """Service for downloading and managing YouTube videos locally.""" def __init__( self, storage_dir: str = "data/youtube-videos", max_storage_size_gb: float = 10.0, video_quality: str = "720p", keep_videos: bool = True, cache_file: str = "download_cache.json" ): """ Initialize the video download service. Args: storage_dir: Base directory for storing videos max_storage_size_gb: Maximum storage size in GB video_quality: Default video quality (720p, 1080p, best) keep_videos: Whether to keep videos after processing cache_file: File to track downloaded videos """ self.base_dir = Path(storage_dir) self.max_storage_bytes = max_storage_size_gb * 1024 * 1024 * 1024 self.video_quality = video_quality self.keep_videos = keep_videos self.cache_file = self.base_dir / cache_file # Create directory structure self.videos_dir = self.base_dir / "videos" self.audio_dir = self.base_dir / "audio" self.temp_dir = self.base_dir / "temp" self.metadata_dir = self.base_dir / "metadata" self._ensure_directories() self._load_cache() # Progress tracking self.download_progress = {} def _ensure_directories(self): """Create necessary directories if they don't exist.""" for directory in [ self.base_dir, self.videos_dir, self.audio_dir, self.temp_dir, self.metadata_dir ]: directory.mkdir(exist_ok=True, parents=True) def _load_cache(self): """Load the download cache from file.""" if self.cache_file.exists(): try: with open(self.cache_file, 'r') as f: self.cache = json.load(f) except json.JSONDecodeError: logger.warning("Cache file corrupted, starting with empty cache") self.cache = {} else: self.cache = {} self._save_cache() def _save_cache(self): """Save the download cache to file.""" with open(self.cache_file, 'w') as f: json.dump(self.cache, f, indent=2) def _get_video_hash(self, video_id: str) -> str: """Generate a unique hash for a video ID.""" return hashlib.md5(video_id.encode()).hexdigest() def _get_ydl_opts(self, video_id: str, download: bool = True) -> Dict: """Get yt-dlp options for download or info extraction.""" opts = { 'quiet': True, 'no_warnings': True, 'extract_flat': False, } if download: # Set quality format string based on preference if self.video_quality == "best": format_str = 'bestvideo[ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]/best' elif self.video_quality == "1080p": format_str = 'bestvideo[height<=1080][ext=mp4]+bestaudio[ext=m4a]/best[height<=1080][ext=mp4]/best' elif self.video_quality == "720p": format_str = 'bestvideo[height<=720][ext=mp4]+bestaudio[ext=m4a]/best[height<=720][ext=mp4]/best' else: format_str = 'best[ext=mp4]/best' video_path = self.videos_dir / f"{video_id}.mp4" opts.update({ 'format': format_str, 'outtmpl': str(video_path), 'progress_hooks': [lambda d: self._progress_hook(video_id, d)], 'postprocessors': [], 'writeinfojson': True, 'writethumbnail': True, }) else: opts['skip_download'] = True return opts def _progress_hook(self, video_id: str, d: Dict): """Progress hook for yt-dlp downloads.""" if d['status'] == 'downloading': self.download_progress[video_id] = { 'status': 'downloading', 
                'percent': d.get('_percent_str', 'N/A'),
                'speed': d.get('_speed_str', 'N/A'),
                'eta': d.get('_eta_str', 'N/A'),
                'total_bytes': d.get('total_bytes', 0),
                'downloaded_bytes': d.get('downloaded_bytes', 0),
                'timestamp': datetime.now().isoformat()
            }
        elif d['status'] == 'finished':
            self.download_progress[video_id] = {
                'status': 'finished',
                'percent': '100%',
                'timestamp': datetime.now().isoformat()
            }

    async def get_video_info(self, url: str) -> Dict:
        """
        Extract video information using yt-dlp.

        Args:
            url: YouTube URL

        Returns:
            Video information dictionary
        """
        ydl_opts = self._get_ydl_opts('', download=False)

        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            try:
                # Run the blocking extraction in a worker thread
                info = await asyncio.get_running_loop().run_in_executor(
                    None, ydl.extract_info, url, False
                )
                return info
            except Exception as e:
                logger.error(f"Error extracting video info: {e}")
                raise VideoDownloadError(f"Failed to get video info: {str(e)}") from e

    def is_video_downloaded(self, video_id: str) -> bool:
        """Check if a video is already downloaded."""
        video_hash = self._get_video_hash(video_id)
        if video_hash not in self.cache:
            return False

        video_path = Path(self.cache[video_hash].get('video_path', ''))
        return video_path.is_file()

    def get_current_storage_usage(self) -> int:
        """Get current storage usage in bytes."""
        total_size = 0
        for directory in [self.videos_dir, self.audio_dir]:
            if directory.exists():
                for path in directory.glob('**/*'):
                    if path.is_file():
                        total_size += path.stat().st_size
        return total_size

    def cleanup_old_videos(self, bytes_to_free: int) -> int:
        """
        Remove oldest videos to free up space.

        Args:
            bytes_to_free: Number of bytes to free

        Returns:
            Number of bytes actually freed
        """
        if not self.cache:
            return 0

        # Sort videos by download date (oldest first)
        sorted_videos = sorted(
            self.cache.items(),
            key=lambda x: x[1].get('download_date', '1970-01-01')
        )

        bytes_freed = 0
        videos_removed = []

        for video_hash, info in sorted_videos:
            if bytes_freed >= bytes_to_free:
                break

            # Skip videos marked as "keep"
            if info.get('keep', False):
                continue

            video_path = Path(info.get('video_path', ''))
            # audio_path may be stored as None when no audio was extracted
            audio_path = Path(info['audio_path']) if info.get('audio_path') else None

            freed_this_video = 0

            if video_path.is_file():
                freed_this_video += video_path.stat().st_size
                video_path.unlink()
                logger.info(f"Removed video: {video_path}")

            if audio_path and audio_path.is_file():
                freed_this_video += audio_path.stat().st_size
                audio_path.unlink()
                logger.info(f"Removed audio: {audio_path}")

            # Remove metadata files
            video_id = info.get('video_id')
            if video_id:
                info_file = self.videos_dir / f"{video_id}.info.json"
                thumb_file = self.videos_dir / f"{video_id}.jpg"
                for file in [info_file, thumb_file]:
                    if file.exists():
                        file.unlink()

            bytes_freed += freed_this_video
            videos_removed.append(video_hash)

        # Update cache
        for video_hash in videos_removed:
            del self.cache[video_hash]
        self._save_cache()

        logger.info(f"Cleanup freed {bytes_freed / (1024*1024):.2f} MB")
        return bytes_freed

    async def download_video(
        self,
        url: str,
        extract_audio: bool = True,
        force: bool = False
    ) -> Tuple[Optional[Path], Optional[Path]]:
        """
        Download a video and optionally extract audio.

        Args:
            url: YouTube URL
            extract_audio: Whether to extract audio
            force: Force re-download even if cached

        Returns:
            Tuple of (video_path, audio_path)
        """
        try:
            # Get video info first
            info = await self.get_video_info(url)
            video_id = info['id']
            video_hash = self._get_video_hash(video_id)

            # Check if already downloaded
            if not force and self.is_video_downloaded(video_id):
                logger.info(f"Video {video_id} already downloaded, using cached version")
                cached_info = self.cache[video_hash]
                video_path = Path(cached_info['video_path'])
                audio_path = Path(cached_info['audio_path']) if cached_info.get('audio_path') else None
                return video_path, audio_path

            # Check storage space; filesize_approx may be missing or None
            current_usage = self.get_current_storage_usage()
            estimated_size = info.get('filesize_approx') or 500 * 1024 * 1024  # Default to 500 MB

            if current_usage + estimated_size > self.max_storage_bytes:
                bytes_to_free = (current_usage + estimated_size) - self.max_storage_bytes
                logger.info(f"Storage limit reached, freeing {bytes_to_free / (1024*1024):.2f} MB")
                freed = self.cleanup_old_videos(bytes_to_free)
                if freed < bytes_to_free:
                    raise VideoDownloadError(
                        f"Insufficient storage space. Need {bytes_to_free / (1024*1024):.2f} MB, "
                        f"but only freed {freed / (1024*1024):.2f} MB"
                    )

            # Download video
            logger.info(f"Downloading video {video_id} at {self.video_quality} quality")
            video_path = self.videos_dir / f"{video_id}.mp4"

            ydl_opts = self._get_ydl_opts(video_id, download=True)
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                await asyncio.get_running_loop().run_in_executor(
                    None, ydl.download, [url]
                )

            audio_path = None

            # Extract audio if requested
            if extract_audio and video_path.exists():
                audio_path = self.audio_dir / f"{video_id}.mp3"
                logger.info(f"Extracting audio to {audio_path}")

                audio_opts = {
                    'format': 'bestaudio/best',
                    'outtmpl': str(audio_path.with_suffix('')),  # extension added by the postprocessor
                    'postprocessors': [{
                        'key': 'FFmpegExtractAudio',
                        'preferredcodec': 'mp3',
                        'preferredquality': '192',
                    }],
                    'quiet': True,
                    'no_warnings': True,
                }

                with yt_dlp.YoutubeDL(audio_opts) as ydl:
                    await asyncio.get_running_loop().run_in_executor(
                        None, ydl.download, [url]
                    )

            # Update cache
            self.cache[video_hash] = {
                'video_id': video_id,
                'title': info.get('title', 'Unknown'),
                'channel': info.get('channel', 'Unknown'),
                'duration': info.get('duration', 0),
                'video_path': str(video_path),
                'audio_path': str(audio_path) if audio_path else None,
                'download_date': datetime.now().isoformat(),
                'size_bytes': video_path.stat().st_size if video_path.exists() else 0,
                'url': url,
                'quality': self.video_quality
            }
            self._save_cache()

            logger.info(f"Successfully downloaded video {video_id}")
            return video_path, audio_path

        except Exception as e:
            logger.error(f"Error downloading video from {url}: {e}")
            # Clean up any partial downloads
            if 'video_id' in locals():
                self._cleanup_failed_download(video_id)
            raise VideoDownloadError(f"Failed to download video: {str(e)}") from e

    def _cleanup_failed_download(self, video_id: str):
        """Clean up any files from a failed download."""
        logger.info(f"Cleaning up failed download for {video_id}")

        # Remove video and audio files
        video_path = self.videos_dir / f"{video_id}.mp4"
        audio_path = self.audio_dir / f"{video_id}.mp3"
        info_path = self.videos_dir / f"{video_id}.info.json"
        thumb_path = self.videos_dir / f"{video_id}.jpg"

        for path in [video_path, audio_path, info_path, thumb_path]:
            if path.exists():
                path.unlink()
                logger.debug(f"Removed {path}")

        # Remove from cache if exists
        video_hash = self._get_video_hash(video_id)
        if video_hash in self.cache:
            del self.cache[video_hash]
            self._save_cache()

    def get_storage_stats(self) -> Dict:
        """Get storage statistics."""
        total_videos = len(self.cache)
        total_size = self.get_current_storage_usage()
        available_size = self.max_storage_bytes - total_size

        return {
            'total_videos': total_videos,
            'total_size_bytes': total_size,
            'total_size_mb': total_size / (1024 * 1024),
            'total_size_gb': total_size / (1024 * 1024 * 1024),
            'max_size_bytes': self.max_storage_bytes,
            'max_size_gb': self.max_storage_bytes / (1024 * 1024 * 1024),
            'available_bytes': available_size,
            'available_mb': available_size / (1024 * 1024),
            'available_gb': available_size / (1024 * 1024 * 1024),
            'usage_percent': (total_size / self.max_storage_bytes * 100) if self.max_storage_bytes > 0 else 0,
            'video_quality': self.video_quality,
            'keep_videos': self.keep_videos
        }

    def get_download_progress(self, video_id: str) -> Optional[Dict]:
        """Get download progress for a specific video."""
        return self.download_progress.get(video_id)

    def get_cached_videos(self) -> List[Dict]:
        """Get list of all cached videos with their info."""
        videos = []
        for video_hash, info in self.cache.items():
            video_info = info.copy()
            video_info['hash'] = video_hash
            video_info['exists'] = Path(info['video_path']).exists()
            videos.append(video_info)

        # Sort by download date, newest first
        videos.sort(key=lambda x: x.get('download_date', ''), reverse=True)
        return videos
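

# Minimal usage sketch, not part of the service itself: it assumes yt-dlp and
# ffmpeg are installed on the host and that the process has network access.
# The URL below is a placeholder; the storage path and settings are examples.
if __name__ == "__main__":
    async def _demo():
        service = VideoDownloadService(
            storage_dir="data/youtube-videos",
            max_storage_size_gb=5.0,
            video_quality="720p",
        )
        # Download a video, extract its audio, then print storage stats
        video_path, audio_path = await service.download_video(
            "https://www.youtube.com/watch?v=dQw4w9WgXcQ",  # placeholder URL
            extract_audio=True,
        )
        print(f"Video: {video_path}")
        print(f"Audio: {audio_path}")
        print(json.dumps(service.get_storage_stats(), indent=2))

    asyncio.run(_demo())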