# youtube-summarizer/backend/services/video_downloaders/ytdlp_downloader.py
#
# 529 lines
# 22 KiB
# Python

"""
Enhanced yt-dlp downloader with progress tracking and 403 error workarounds
"""
import asyncio
import time
import random
import json
from pathlib import Path
from typing import Optional, Dict, Any, List, Callable
import logging
import subprocess
from backend.models.video_download import (
VideoDownloadResult,
DownloadPreferences,
DownloadMethod,
DownloadStatus,
VideoMetadata,
TranscriptData,
VideoQuality,
DownloaderException,
VideoNotAvailableError,
NetworkError
)
from backend.services.video_downloaders.base_downloader import BaseVideoDownloader, DownloadProgress
logger = logging.getLogger(__name__)
class YtDlpDownloader(BaseVideoDownloader):
    """Enhanced yt-dlp downloader with progress tracking and 403 error workarounds.

    ``download_video`` tries a fixed sequence of strategies (cookies,
    user-agent rotation, optional proxy rotation, constrained format
    selection, audio-only) and returns the first successful result.

    NOTE(review): ``progress_callback`` and ``retry_attempt`` are stored on the
    instance, so one instance should not run overlapping downloads.
    """

    # Suffixes used to classify files found in the output directory.
    # '.webm' can hold either video or audio, so it appears in both sets and is
    # disambiguated by the ``audio_only`` flag in ``_execute_ytdlp``.
    _VIDEO_SUFFIXES = ('.mp4', '.mkv', '.webm')
    _AUDIO_SUFFIXES = ('.mp3', '.m4a', '.webm')

    def __init__(self, method: DownloadMethod = DownloadMethod.YT_DLP, config: Optional[Dict[str, Any]] = None):
        """Create the downloader and make sure the output directory exists.

        Args:
            method: Download method identifier forwarded to the base class.
            config: Optional settings. Recognised keys: ``output_dir``,
                ``use_cookies``, ``cookies_file``, ``user_agents``, ``proxies``.
        """
        super().__init__(method, config)
        cfg = config or {}

        self.output_dir = Path(cfg.get('output_dir', './video_storage'))
        self.output_dir.mkdir(parents=True, exist_ok=True)

        # Configuration
        self.use_cookies = cfg.get('use_cookies', True)
        self.cookies_file = cfg.get('cookies_file')
        # An empty/None list would make random.choice() fail, so fall back to defaults.
        self.user_agents = cfg.get('user_agents') or self._get_default_user_agents()
        self.proxies = cfg.get('proxies') or []

        # Progress tracking
        self.progress_callback: Optional[Callable[[DownloadProgress], None]] = None
        self.retry_attempt = 0
        # Event loop captured in _execute_ytdlp so the (threaded) progress hook
        # can schedule callbacks back onto it.
        self._progress_loop: Optional[asyncio.AbstractEventLoop] = None
        # Cached result of the ffmpeg availability probe (None = not probed yet).
        self._ffmpeg_available: Optional[bool] = None

    def _get_default_user_agents(self) -> List[str]:
        """Get default user agents for rotation"""
        return [
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
            "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:120.0) Gecko/20100101 Firefox/120.0",
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:120.0) Gecko/20100101 Firefox/120.0",
        ]

    def _build_http_headers(self, referer: Optional[str] = None,
                            extra: Optional[Dict[str, str]] = None) -> Dict[str, str]:
        """Build the ``http_headers`` option with a randomly chosen User-Agent.

        yt-dlp's ``YoutubeDL`` reads custom request headers from the
        ``http_headers`` option; top-level ``user_agent`` / ``referer`` /
        ``headers`` keys are not YoutubeDL options.
        """
        headers: Dict[str, str] = {'User-Agent': random.choice(self.user_agents)}
        if referer:
            headers['Referer'] = referer
        if extra:
            headers.update(extra)
        return headers

    async def download_video(
        self,
        url: str,
        preferences: DownloadPreferences,
        progress_callback: Optional[Callable[[DownloadProgress], None]] = None
    ) -> VideoDownloadResult:
        """Download video using yt-dlp with progress tracking and multiple fallback strategies.

        Args:
            url: Video URL to download.
            preferences: Quality / duration preferences for the download.
            progress_callback: Optional callback that receives ``DownloadProgress`` updates.

        Returns:
            The result of the first strategy that succeeds.

        Raises:
            NetworkError: All strategies failed and the last error mentions 403 / Forbidden.
            DownloaderException: All strategies failed for any other reason.
        """
        start_time = time.time()
        video_id = await self.extract_video_id(url)

        # Store progress callback for use in strategies
        self.progress_callback = progress_callback
        self.retry_attempt = 0

        # Try multiple strategies, in order
        strategies = [
            self._download_with_cookies,
            self._download_with_user_agent_rotation,
            self._download_with_format_selection,
            self._download_audio_only_fallback,
        ]
        if self.proxies:
            strategies.insert(2, self._download_with_proxy_rotation)

        last_error: Optional[Exception] = None
        for strategy_idx, strategy in enumerate(strategies):
            try:
                self.retry_attempt = strategy_idx

                # Report progress for strategy attempt
                await self.report_progress(
                    self.progress_callback,
                    DownloadProgress(
                        download_percent=0.0,
                        current_method="yt-dlp",
                        retry_attempt=self.retry_attempt,
                        status_message=f"Trying yt-dlp strategy: {strategy.__name__.replace('_', ' ').title()}"
                    )
                )
                self.logger.info(f"Trying yt-dlp strategy: {strategy.__name__}")

                result = await strategy(url, video_id, preferences)
                if result:
                    result.processing_time_seconds = time.time() - start_time
                    # Report completion
                    await self.report_progress(
                        self.progress_callback,
                        DownloadProgress(
                            download_percent=100.0,
                            current_method="yt-dlp",
                            retry_attempt=self.retry_attempt,
                            status_message="Download completed successfully"
                        )
                    )
                    return result
            except Exception as e:
                self.logger.warning(f"yt-dlp strategy {strategy.__name__} failed: {e}")
                last_error = e
                # Report failure
                await self.report_progress(
                    self.progress_callback,
                    DownloadProgress(
                        download_percent=0.0,
                        current_method="yt-dlp",
                        retry_attempt=self.retry_attempt,
                        status_message=f"Strategy failed: {str(e)[:100]}"
                    )
                )
                continue

        # All strategies failed
        error_msg = f"All yt-dlp strategies failed. Last error: {last_error}"
        if "403" in str(last_error) or "Forbidden" in str(last_error):
            raise NetworkError(f"YouTube blocked yt-dlp requests: {last_error}") from last_error
        raise DownloaderException(error_msg) from last_error

    async def _download_with_cookies(self, url: str, video_id: str, preferences: DownloadPreferences) -> Optional[VideoDownloadResult]:
        """Try download with browser cookies.

        Raises:
            DownloaderException: Cookies are disabled in the configuration
                (this makes ``download_video`` move on to the next strategy).
        """
        if not self.use_cookies:
            raise DownloaderException("Cookies disabled")

        options = {
            'outtmpl': str(self.output_dir / f'{video_id}_%(title)s.%(ext)s'),
            'format': self._get_format_selector(preferences),
            'http_headers': self._build_http_headers(referer='https://www.youtube.com/'),
            'extractor_args': {
                'youtube': {
                    'skip': ['dash', 'hls']  # Skip problematic formats
                }
            }
        }

        if self.cookies_file and Path(self.cookies_file).exists():
            options['cookiefile'] = str(self.cookies_file)
        else:
            # Try to use browser cookies
            options['cookiesfrombrowser'] = ('chrome', None, None, None)

        return await self._execute_ytdlp(url, video_id, options, preferences)

    async def _download_with_user_agent_rotation(self, url: str, video_id: str, preferences: DownloadPreferences) -> Optional[VideoDownloadResult]:
        """Try download with a randomly chosen user agent and browser-like headers"""
        options = {
            'outtmpl': str(self.output_dir / f'{video_id}_%(title)s.%(ext)s'),
            'format': self._get_format_selector(preferences),
            'http_headers': self._build_http_headers(
                referer='https://www.youtube.com/',
                extra={
                    'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
                    'Accept-Language': 'en-us,en;q=0.5',
                    'Accept-Encoding': 'gzip, deflate',
                    'DNT': '1',
                    'Connection': 'keep-alive',
                    'Upgrade-Insecure-Requests': '1'
                }
            )
        }
        return await self._execute_ytdlp(url, video_id, options, preferences)

    async def _download_with_proxy_rotation(self, url: str, video_id: str, preferences: DownloadPreferences) -> Optional[VideoDownloadResult]:
        """Try download through a randomly chosen configured proxy.

        Raises:
            DownloaderException: No proxies are configured.
        """
        if not self.proxies:
            raise DownloaderException("No proxies configured")

        options = {
            'outtmpl': str(self.output_dir / f'{video_id}_%(title)s.%(ext)s'),
            'format': self._get_format_selector(preferences),
            'proxy': random.choice(self.proxies),
            'http_headers': self._build_http_headers(),
            'socket_timeout': 30
        }
        return await self._execute_ytdlp(url, video_id, options, preferences)

    async def _download_with_format_selection(self, url: str, video_id: str, preferences: DownloadPreferences) -> Optional[VideoDownloadResult]:
        """Try download with specific format selection to avoid problematic streams"""
        options = {
            'outtmpl': str(self.output_dir / f'{video_id}_%(title)s.%(ext)s'),
            'format': 'best[height<=720]/best',  # Lower quality to avoid blocks
            'http_headers': self._build_http_headers(),
            'extractor_args': {
                'youtube': {
                    'player_client': ['android', 'web']  # Use different clients
                }
            }
        }
        return await self._execute_ytdlp(url, video_id, options, preferences)

    async def _download_audio_only_fallback(self, url: str, video_id: str, preferences: DownloadPreferences) -> Optional[VideoDownloadResult]:
        """Try audio-only download as fallback (converted to mp3 when ffmpeg is available)"""
        postprocessors = []
        if self._has_ffmpeg():
            postprocessors.append({
                'key': 'FFmpegExtractAudio',
                'preferredcodec': 'mp3',
                'preferredquality': '192',
            })

        options = {
            'outtmpl': str(self.output_dir / f'{video_id}_audio.%(ext)s'),
            'format': 'bestaudio/best',
            'http_headers': self._build_http_headers(),
            'postprocessors': postprocessors
        }
        return await self._execute_ytdlp(url, video_id, options, preferences, audio_only=True)

    async def _execute_ytdlp(self, url: str, video_id: str, options: Dict[str, Any],
                             preferences: DownloadPreferences, audio_only: bool = False) -> Optional[VideoDownloadResult]:
        """Execute yt-dlp with given options.

        The blocking yt-dlp calls run in the default executor; downloaded files
        are then located in ``self.output_dir`` by the ``{video_id}_`` prefix.

        Args:
            url: Video URL.
            video_id: Identifier used as the output filename prefix.
            options: yt-dlp options (copied; the caller's dict is not modified).
            preferences: Used for the maximum-duration check.
            audio_only: Treat the download as audio when classifying files.

        Returns:
            The download result, or ``None`` if yt-dlp returned no info.

        Raises:
            NetworkError: The error text mentions 403 / forbidden.
            VideoNotAvailableError: The error text mentions private / unavailable.
            DownloaderException: Any other failure (including over-long videos).
        """
        try:
            import yt_dlp

            ydl_options = dict(options)
            # Add progress hook
            ydl_options['progress_hooks'] = [self._progress_hook]
            # Add metadata extraction
            ydl_options['writeinfojson'] = True
            ydl_options['writethumbnail'] = False  # Skip thumbnail to avoid extra requests

            loop = asyncio.get_running_loop()
            # The progress hook runs in the executor thread; remember the loop
            # so it can hand updates back to it.
            self._progress_loop = loop

            def _download():
                with yt_dlp.YoutubeDL(ydl_options) as ydl:
                    # First, extract info without downloading
                    info = ydl.extract_info(url, download=False)
                    if not info:
                        return None

                    # Check duration
                    duration = info.get('duration') or 0
                    max_minutes = preferences.max_duration_minutes
                    if duration and max_minutes and max_minutes > 0:
                        if duration > (max_minutes * 60):
                            raise DownloaderException(f"Video too long: {int(duration // 60)} minutes")

                    # Now download
                    return ydl.extract_info(url, download=True)

            info = await loop.run_in_executor(None, _download)
            if not info:
                return None

            # Extract metadata
            metadata = self._extract_metadata_from_info(info, video_id)

            # Look for downloaded files
            video_path = None
            audio_path = None
            for file_path in self.output_dir.glob(f"{video_id}_*"):
                suffix = file_path.suffix.lower()
                if audio_only and suffix in self._AUDIO_SUFFIXES:
                    # In audio-only mode a .webm container is an audio file.
                    audio_path = file_path
                elif suffix in self._VIDEO_SUFFIXES:
                    video_path = file_path
                elif suffix in self._AUDIO_SUFFIXES:
                    audio_path = file_path

            # If audio-only but we got video, extract audio
            if audio_only and video_path and not audio_path:
                audio_path = await self._extract_audio_from_video(video_path, video_id)

            # Get transcript
            transcript = await self._extract_transcript_ytdlp(video_id)

            # Calculate file size
            file_size = 0
            if video_path and video_path.exists():
                file_size += video_path.stat().st_size
            if audio_path and audio_path.exists():
                file_size += audio_path.stat().st_size

            return VideoDownloadResult(
                video_id=video_id,
                video_url=url,
                status=DownloadStatus.COMPLETED,
                method=self.method,
                video_path=video_path,
                audio_path=audio_path,
                transcript=transcript,
                metadata=metadata,
                file_size_bytes=file_size
            )
        except Exception as e:
            self.logger.error(f"yt-dlp execution failed: {e}")
            # Analyze error type
            error_str = str(e).lower()
            if "403" in error_str or "forbidden" in error_str:
                raise NetworkError(f"YouTube blocked request: {e}") from e
            elif "private" in error_str or "unavailable" in error_str:
                raise VideoNotAvailableError(f"Video not available: {e}") from e
            else:
                raise DownloaderException(f"yt-dlp error: {e}") from e

    def _get_format_selector(self, preferences: DownloadPreferences) -> str:
        """Get format selector based on preferences"""
        if preferences.prefer_audio_only:
            return 'bestaudio/best'

        quality_map = {
            VideoQuality.AUDIO_ONLY: 'bestaudio',
            VideoQuality.LOW_480P: 'best[height<=480]',
            VideoQuality.MEDIUM_720P: 'best[height<=720]',
            VideoQuality.HIGH_1080P: 'best[height<=1080]',
            VideoQuality.BEST: 'best'
        }
        return quality_map.get(preferences.quality, 'best[height<=720]/best')

    def _dispatch_progress(self, progress: DownloadProgress) -> None:
        """Schedule a progress update on the event loop captured by ``_execute_ytdlp``.

        Safe to call from the executor thread that runs yt-dlp. Updates are
        dropped (with a debug log) when no callback or usable loop is available.
        """
        callback = self.progress_callback
        if not callback:
            return

        loop = getattr(self, '_progress_loop', None)
        if loop is None or loop.is_closed():
            self.logger.debug("Could not send progress update: no event loop available")
            return

        coro = self.report_progress(callback, progress)
        try:
            asyncio.run_coroutine_threadsafe(coro, loop)
        except RuntimeError as e:
            # Loop was closed between the check and the call; avoid a
            # "coroutine was never awaited" warning.
            coro.close()
            self.logger.debug(f"Could not send progress update: {e}")

    def _progress_hook(self, d):
        """Enhanced progress hook for yt-dlp with detailed progress reporting.

        yt-dlp calls this synchronously with a status dict; here that happens
        in the executor thread used by ``_execute_ytdlp``.
        """
        status = d.get('status')

        if status == 'downloading':
            # Extract progress information; yt-dlp may report any of these as None
            downloaded_bytes = d.get('downloaded_bytes') or 0
            total_bytes = d.get('total_bytes') or d.get('total_bytes_estimate') or 0
            # Estimates can undershoot, so cap at 100%
            percent = min(downloaded_bytes / total_bytes * 100, 100.0) if total_bytes > 0 else 0
            speed = d.get('speed') or 0  # bytes per second
            eta = d.get('eta') or 0  # seconds

            # Create progress update
            progress = DownloadProgress(
                download_percent=percent,
                bytes_downloaded=downloaded_bytes,
                total_bytes=total_bytes,
                speed_bps=speed,
                eta_seconds=eta,
                current_method="yt-dlp",
                retry_attempt=self.retry_attempt,
                status_message=f"Downloading: {percent:.1f}% ({self._format_bytes(downloaded_bytes)}/{self._format_bytes(total_bytes)}) at {self._format_speed(speed)}"
            )
            self._dispatch_progress(progress)
            self.logger.debug(f"Downloading: {percent:.1f}%, Speed: {self._format_speed(speed)}, ETA: {eta}s")

        elif status == 'finished':
            self.logger.info(f"Download finished: {d.get('filename')}")
            # Send completion progress
            if self.progress_callback:
                self._dispatch_progress(
                    DownloadProgress(
                        download_percent=100.0,
                        current_method="yt-dlp",
                        retry_attempt=self.retry_attempt,
                        status_message="Processing downloaded file..."
                    )
                )

    def _format_bytes(self, bytes: int) -> str:
        """Format bytes to human readable string (1024-based units, one decimal)"""
        size = float(bytes)
        for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
            if size < 1024.0:
                return f"{size:.1f}{unit}"
            size /= 1024.0
        return f"{size:.1f}PB"

    def _format_speed(self, speed: float) -> str:
        """Format speed to human readable string; "N/A" for non-positive speeds"""
        if speed <= 0:
            return "N/A"
        return f"{self._format_bytes(speed)}/s"

    def _extract_metadata_from_info(self, info: Dict[str, Any], video_id: str) -> VideoMetadata:
        """Extract metadata from yt-dlp info"""
        return VideoMetadata(
            video_id=video_id,
            title=info.get('title'),
            description=info.get('description'),
            duration_seconds=info.get('duration'),
            view_count=info.get('view_count'),
            upload_date=info.get('upload_date'),
            uploader=info.get('uploader'),
            thumbnail_url=info.get('thumbnail'),
            # Keys may be present with a None value, so coalesce explicitly
            tags=info.get('tags') or [],
            language=info.get('language', 'en'),
            age_restricted=(info.get('age_limit') or 0) > 0
        )

    async def _extract_audio_from_video(self, video_path: Path, video_id: str) -> Optional[Path]:
        """Extract audio from video file.

        Returns:
            Path to ``{video_id}_audio.mp3`` in the output directory, or ``None``
            when ffmpeg is unavailable or the extraction fails.
        """
        if not self._has_ffmpeg():
            return None

        audio_path = self.output_dir / f"{video_id}_audio.mp3"
        try:
            import ffmpeg

            loop = asyncio.get_running_loop()

            def _extract():
                (
                    ffmpeg
                    .input(str(video_path))
                    .output(str(audio_path), acodec='mp3', audio_bitrate='192k')
                    .overwrite_output()
                    .run(quiet=True)
                )

            await loop.run_in_executor(None, _extract)
            return audio_path
        except Exception as e:
            self.logger.error(f"Audio extraction failed: {e}")
            return None

    async def _extract_transcript_ytdlp(self, video_id: str) -> Optional[TranscriptData]:
        """Extract transcript using youtube-transcript-api.

        Best effort: any failure is logged at debug level and ``None`` is returned.
        """
        try:
            from youtube_transcript_api import YouTubeTranscriptApi

            loop = asyncio.get_running_loop()

            def _get_transcript():
                api = YouTubeTranscriptApi()
                transcript = api.fetch(video_id, languages=['en'])
                full_text = ' '.join(snippet.text for snippet in transcript.snippets)
                segments = [
                    {
                        'text': snippet.text,
                        'start': snippet.start,
                        'duration': snippet.duration
                    }
                    for snippet in transcript.snippets
                ]
                return full_text, segments, transcript.is_generated, transcript.language_code

            text, segments, is_generated, language = await loop.run_in_executor(None, _get_transcript)
            return TranscriptData(
                text=text,
                language=language,
                is_auto_generated=is_generated,
                segments=segments,
                source="youtube-transcript-api"
            )
        except Exception as e:
            self.logger.debug(f"Transcript extraction failed: {e}")
            return None

    def _has_ffmpeg(self) -> bool:
        """Check if ffmpeg is available.

        The probe (``ffmpeg -version``) runs once per instance; the result is cached.
        """
        cached = getattr(self, '_ffmpeg_available', None)
        if cached is not None:
            return cached

        try:
            subprocess.run(['ffmpeg', '-version'], capture_output=True, check=True, timeout=10)
            available = True
        except (subprocess.CalledProcessError, subprocess.TimeoutExpired, OSError):
            # OSError covers FileNotFoundError (not installed) and PermissionError
            available = False

        self._ffmpeg_available = available
        return available

    async def test_connection(self) -> bool:
        """Test if yt-dlp is working by extracting info for a fixed test video"""
        try:
            import yt_dlp

            test_url = "https://www.youtube.com/watch?v=dQw4w9WgXcQ"
            loop = asyncio.get_running_loop()

            def _test():
                with yt_dlp.YoutubeDL({'quiet': True}) as ydl:
                    info = ydl.extract_info(test_url, download=False)
                    return info is not None and 'title' in info

            return await loop.run_in_executor(None, _test)
        except Exception as e:
            self.logger.error(f"yt-dlp connection test failed: {e}")
            return False

    def supports_audio_only(self) -> bool:
        """This downloader can produce audio-only output"""
        return True

    def supports_quality_selection(self) -> bool:
        """This downloader honours the requested video quality"""
        return True

    def get_supported_formats(self) -> list[str]:
        """File formats this downloader reports as supported"""
        return ["mp4", "webm", "mp3", "m4a"]
# Register the downloader: associate YtDlpDownloader with DownloadMethod.YT_DLP
# via DownloaderFactory.register. This runs as a side effect of importing this module.
# NOTE(review): the import sits at the bottom of the module instead of with the
# other imports — presumably deliberate; confirm before moving it.
from backend.services.video_downloaders.base_downloader import DownloaderFactory
DownloaderFactory.register(DownloadMethod.YT_DLP, YtDlpDownloader)