575 lines
24 KiB
Python
575 lines
24 KiB
Python
"""
Playwright-based video downloader using browser automation
"""
|
|
import asyncio
|
|
import time
|
|
import json
|
|
import re
|
|
from pathlib import Path
|
|
from typing import Optional, Dict, Any, List
|
|
import logging
|
|
|
|
from backend.models.video_download import (
|
|
VideoDownloadResult,
|
|
DownloadPreferences,
|
|
DownloadMethod,
|
|
DownloadStatus,
|
|
VideoMetadata,
|
|
TranscriptData,
|
|
DownloaderException,
|
|
VideoNotAvailableError,
|
|
NetworkError
|
|
)
|
|
from backend.services.video_downloaders.base_downloader import BaseVideoDownloader
|
|
|
|
# Module-level logger named after this module's import path.
# NOTE(review): the methods below log through ``self.logger``, which is not
# assigned anywhere in this file (presumably provided by BaseVideoDownloader);
# this module-level logger is not referenced by the visible code.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class PlaywrightDownloader(BaseVideoDownloader):
|
|
"""Playwright-based video downloader using MCP server with persistent authentication"""
|
|
|
|
def __init__(self, method: DownloadMethod = DownloadMethod.PLAYWRIGHT, config: Optional[Dict[str, Any]] = None):
    """Initialise storage, browser and authentication settings.

    Args:
        method: Download method forwarded to the base class.
        config: Optional settings dict. A missing or empty dict selects the
            default for every option read here.
    """
    super().__init__(method, config)

    # A missing or empty config is treated as "no keys set", so every lookup
    # below falls through to its default value.
    settings: Dict[str, Any] = config if config else {}

    self.output_dir = Path(settings.get('output_dir', './video_storage'))
    self.output_dir.mkdir(parents=True, exist_ok=True)

    # Configuration
    self.headless = settings.get('headless', True)
    self.timeout = settings.get('timeout', 30000)
    self.session_file = settings.get('session_file')

    # Authentication settings
    self.use_authentication = settings.get('use_authentication', True)
    self.fallback_to_guest = settings.get('fallback_to_guest', True)

    # Filled in by _check_authentication_status().
    self._auth_checked = False
    self._is_authenticated = False
|
|
|
|
async def download_video(self, url: str, preferences: DownloadPreferences) -> VideoDownloadResult:
    """Download video using Playwright browser automation with persistent authentication.

    Steps: optional authentication check, navigation to ``url``, metadata
    extraction, duration check, stream URL extraction, stream download and
    optional transcript retrieval.

    Args:
        url: Address of the video page.
        preferences: Controls audio-only mode, target quality, the maximum
            duration and whether a transcript is requested.

    Returns:
        A COMPLETED ``VideoDownloadResult`` with the downloaded paths, or a
        FAILED result (built with ``create_result``) when the video exceeds
        ``preferences.max_duration_minutes``.

    Raises:
        NetworkError: The failure message mentions "blocked" or "forbidden".
        VideoNotAvailableError: The failure message mentions "private" or
            "unavailable".
        DownloaderException: Any other failure, including the case where no
            stream could be downloaded at all.
    """
    start_time = time.time()
    video_id = await self.extract_video_id(url)

    try:
        # Use the MCP Playwright server for browser automation
        from backend.core.mcp_client import get_mcp_client

        mcp_client = get_mcp_client("playwright")

        # Check authentication status once per downloader instance.
        if self.use_authentication and not self._auth_checked:
            await self._check_authentication_status(mcp_client)

        # Navigate to video page
        await self._navigate_to_video(mcp_client, url)

        # Extract video metadata from page
        metadata = await self._extract_metadata_from_page(mcp_client, video_id)

        # Check duration limits (a limit of 0 or less disables the check).
        if metadata.duration_seconds and preferences.max_duration_minutes > 0:
            if metadata.duration_seconds > (preferences.max_duration_minutes * 60):
                return self.create_result(
                    video_id, url, DownloadStatus.FAILED,
                    f"Video too long: {metadata.duration_seconds//60} minutes"
                )

        # Extract video URLs from page
        video_urls = await self._extract_video_urls(mcp_client)

        if not video_urls:
            raise DownloaderException("Could not extract video URLs from page")

        # Download the video/audio streams
        video_path = None
        audio_path = None

        if preferences.prefer_audio_only:
            audio_path = await self._download_audio_stream(video_urls, video_id)
        else:
            video_path, audio_path = await self._download_video_and_audio_streams(
                video_urls, video_id, preferences
            )

        # The stream helpers return None instead of raising, so without this
        # check a run that fetched nothing would still be reported COMPLETED.
        if video_path is None and audio_path is None:
            raise DownloaderException("No media stream could be downloaded")

        # Get transcript
        transcript = None
        if preferences.enable_subtitles:
            transcript = await self._extract_transcript_from_browser(mcp_client, video_id)

        processing_time = time.time() - start_time

        # Calculate file sizes
        file_size = sum(
            path.stat().st_size
            for path in (audio_path, video_path)
            if path and path.exists()
        )

        return VideoDownloadResult(
            video_id=video_id,
            video_url=url,
            status=DownloadStatus.COMPLETED,
            method=self.method,
            video_path=video_path,
            audio_path=audio_path,
            transcript=transcript,
            metadata=metadata,
            processing_time_seconds=processing_time,
            file_size_bytes=file_size
        )

    except Exception as e:
        self.logger.error(f"Playwright download failed for {video_id}: {e}")

        # Classify by message text; ``from e`` keeps the original traceback
        # attached to the exception the caller receives.
        error_str = str(e).lower()
        if "blocked" in error_str or "forbidden" in error_str:
            raise NetworkError(f"Browser request blocked: {e}") from e
        elif "private" in error_str or "unavailable" in error_str:
            raise VideoNotAvailableError(f"Video not available: {e}") from e
        else:
            raise DownloaderException(f"Playwright error: {e}") from e
|
|
|
|
async def _check_authentication_status(self, mcp_client):
    """Check if browser session is authenticated to YouTube.

    Navigates to the YouTube home page, evaluates a DOM probe in the page and
    stores the outcome in ``self._is_authenticated``; ``self._auth_checked``
    is set to True whether or not the probe succeeds.

    Args:
        mcp_client: Object exposing an awaitable ``call_tool(name, params)``.

    Raises:
        DownloaderException: The session is not authenticated (or the probe
            failed) and ``self.fallback_to_guest`` is False.
    """
    try:
        self.logger.info("Checking YouTube authentication status...")

        # Navigate to YouTube first
        await mcp_client.call_tool("browser_navigate", {
            "url": "https://www.youtube.com"
        })

        await asyncio.sleep(2)

        # Check for authentication indicators
        auth_check = await mcp_client.call_tool("browser_evaluate", {
            "function": """() => {
                const loginButton = document.querySelector('a[href*="signin"], button[aria-label*="Sign in"]');
                const accountButton = document.querySelector('[data-ved] [aria-label*="Google Account"], .gb_d .gb_e, #avatar-btn');
                const channelButton = document.querySelector('#channel-handle, #channel-title');

                const isAuthenticated = !loginButton && (!!accountButton || !!channelButton);

                let userInfo = null;
                if (isAuthenticated) {
                    try {
                        const avatar = document.querySelector('#avatar img, .gb_h img');
                        const name = document.querySelector('.gb_e .gb_f, #channel-handle');
                        userInfo = {
                            avatar: avatar ? avatar.src : null,
                            name: name ? name.textContent.trim() : 'Authenticated User'
                        };
                    } catch (e) {
                        userInfo = { name: 'Authenticated User', avatar: null };
                    }
                }

                return {
                    isAuthenticated: isAuthenticated,
                    hasLoginButton: !!loginButton,
                    hasAccountButton: !!accountButton,
                    hasChannelButton: !!channelButton,
                    userInfo: userInfo,
                    cookies: document.cookie.length > 0
                };
            }"""
        })

        if isinstance(auth_check, str):
            auth_check = json.loads(auth_check)

        is_authenticated = auth_check.get("isAuthenticated", False)
        # ``userInfo`` is null in the probe result when not authenticated.
        user_info = auth_check.get("userInfo") or {}

    except Exception as e:
        # The probe itself failed (tool error, malformed result, ...).
        self.logger.warning(f"Authentication status check failed: {e}")
        self._is_authenticated = False
        self._auth_checked = True

        if not self.fallback_to_guest:
            raise DownloaderException(
                f"Authentication check failed and guest fallback disabled: {e}"
            ) from e
        return

    self._is_authenticated = is_authenticated
    self._auth_checked = True

    if self._is_authenticated:
        user_name = user_info.get('name', 'Unknown') if isinstance(user_info, dict) else 'Unknown'
        self.logger.info(f"Browser session is authenticated to YouTube as: {user_name}")
        return

    self.logger.info("Browser session is not authenticated to YouTube")
    # Raised outside the try block so a successful probe that simply found no
    # login is not logged and re-wrapped as a "check failed" error.
    if not self.fallback_to_guest:
        raise DownloaderException("YouTube authentication required but not available")
|
|
|
|
async def _navigate_to_video(self, mcp_client, url: str):
    """Navigate to YouTube video page.

    Args:
        mcp_client: Object exposing an awaitable ``call_tool(name, params)``.
        url: Address of the video page to open.

    Raises:
        DownloaderException: The navigation tool call raised, or the text of
            its result contains the word "error".
    """
    try:
        # Use MCP Playwright server to navigate
        result = await mcp_client.call_tool("browser_navigate", {
            "url": url,
            "wait_until": "networkidle"
        })
    except Exception as e:
        raise DownloaderException(f"Navigation failed: {e}") from e

    # Check the outcome before waiting so a failed navigation is reported
    # immediately instead of after the settle delay.
    # NOTE(review): this is a plain substring test on the stringified result;
    # a result that merely mentions "error" is treated as a failure too.
    if "error" in str(result).lower():
        raise DownloaderException(
            f"Navigation failed: Failed to navigate to video: {result}"
        )

    # Wait for video to load
    await asyncio.sleep(3)
|
|
|
|
async def _extract_metadata_from_page(self, mcp_client, video_id: str) -> VideoMetadata:
    """Extract video metadata from the YouTube page.

    Evaluates a script in the currently open page that reads the embedded
    ``videoDetails`` object and falls back to DOM elements for anything that
    is missing there.

    Args:
        mcp_client: Object exposing an awaitable ``call_tool(name, params)``.
        video_id: Identifier stored on the returned metadata.

    Returns:
        A populated ``VideoMetadata``; on any failure a ``VideoMetadata``
        carrying only ``video_id`` (the failure is logged as a warning).
    """
    try:
        # JavaScript to extract metadata from YouTube page
        js_code = """
        () => {
            // videoDetails is looked up on the player response first; the
            // original ytInitialData lookups are kept as fallbacks.
            const playerResponse = window.ytInitialPlayerResponse || {};
            const initialData = window.ytInitialData || {};
            const videoDetails = playerResponse.videoDetails
                || initialData.videoDetails
                || initialData.contents?.videoDetails
                || {};

            // Extract from DOM as fallback
            const titleElement = document.querySelector('h1.title yt-formatted-string, h1[data-id] yt-formatted-string, #above-the-fold #title h1');
            const channelElement = document.querySelector('#owner-name a, #channel-name a, .ytd-channel-name a');
            const viewsElement = document.querySelector('#info-strings yt-formatted-string, .view-count');
            const descriptionElement = document.querySelector('#description-text, #meta-contents #description');

            // Get duration from video element (NaN until media metadata has loaded)
            const videoElement = document.querySelector('video');
            const rawDuration = videoElement ? Math.floor(videoElement.duration) : null;
            const duration = Number.isFinite(rawDuration) ? rawDuration : null;

            return {
                title: videoDetails.title || (titleElement ? titleElement.textContent.trim() : null),
                description: videoDetails.shortDescription || (descriptionElement ? descriptionElement.textContent.trim().substring(0, 500) : null),
                duration: videoDetails.lengthSeconds ? parseInt(videoDetails.lengthSeconds, 10) : duration,
                // The raw text is returned here; parsing happens on the Python
                // side, since no view-count parser exists inside the page.
                viewCount: videoDetails.viewCount || (viewsElement ? viewsElement.textContent.trim() : null),
                author: videoDetails.author || (channelElement ? channelElement.textContent.trim() : null),
                thumbnail: videoDetails.thumbnail?.thumbnails?.[0]?.url,
                keywords: videoDetails.keywords || []
            };
        }
        """

        # Same parameter name as the evaluate call in
        # _check_authentication_status ("function").
        result = await mcp_client.call_tool("browser_evaluate", {
            "function": js_code
        })

        if isinstance(result, str):
            metadata_dict = json.loads(result)
        else:
            metadata_dict = result

        return VideoMetadata(
            video_id=video_id,
            title=metadata_dict.get('title'),
            description=metadata_dict.get('description'),
            duration_seconds=metadata_dict.get('duration'),
            view_count=self._parse_view_count(metadata_dict.get('viewCount')),
            uploader=metadata_dict.get('author'),
            thumbnail_url=metadata_dict.get('thumbnail'),
            tags=metadata_dict.get('keywords') or []
        )

    except Exception as e:
        # Metadata is best-effort: the download continues without it.
        self.logger.warning(f"Metadata extraction failed: {e}")
        return VideoMetadata(video_id=video_id)
|
|
|
|
async def _extract_video_urls(self, mcp_client) -> List[Dict[str, Any]]:
|
|
"""Extract video stream URLs from YouTube page (enhanced for authenticated access)"""
|
|
try:
|
|
# Enhanced JavaScript that takes advantage of authentication
|
|
js_code = f"""
|
|
() => {{
|
|
const isAuthenticated = {str(self._is_authenticated).lower()};
|
|
|
|
// Extract streaming data from YouTube's player
|
|
const getStreamingData = () => {{
|
|
const playerResponse = window.ytInitialPlayerResponse || {{}};
|
|
const streamingData = playerResponse.streamingData || {{}};
|
|
|
|
const formats = [
|
|
...(streamingData.formats || []),
|
|
...(streamingData.adaptiveFormats || [])
|
|
];
|
|
|
|
return formats.map(format => ({{
|
|
url: format.url,
|
|
itag: format.itag,
|
|
quality: format.qualityLabel || format.quality,
|
|
mimeType: format.mimeType,
|
|
hasVideo: format.mimeType?.includes('video') || false,
|
|
hasAudio: format.mimeType?.includes('audio') || false,
|
|
filesize: format.contentLength,
|
|
fps: format.fps,
|
|
bitrate: format.bitrate,
|
|
authenticated: isAuthenticated,
|
|
qualityScore: format.bitrate || 0
|
|
}})).filter(f => f.url);
|
|
}};
|
|
|
|
let formats = getStreamingData();
|
|
|
|
// If authenticated, we may have access to higher quality streams
|
|
if (isAuthenticated) {{
|
|
// Sort by quality score (bitrate) to prioritize higher quality
|
|
formats = formats.sort((a, b) => (b.qualityScore || 0) - (a.qualityScore || 0));
|
|
}}
|
|
|
|
// If no formats found, try alternative method
|
|
if (!formats.length) {{
|
|
// Look for video element source
|
|
const videoElement = document.querySelector('video');
|
|
if (videoElement && videoElement.src) {{
|
|
return [{{
|
|
url: videoElement.src,
|
|
quality: 'unknown',
|
|
mimeType: 'video/mp4',
|
|
hasVideo: true,
|
|
hasAudio: true,
|
|
authenticated: isAuthenticated
|
|
}}];
|
|
}}
|
|
}}
|
|
|
|
return formats;
|
|
}}
|
|
"""
|
|
|
|
result = await mcp_client.call_tool("browser_evaluate", {
|
|
"script": js_code
|
|
})
|
|
|
|
if isinstance(result, str):
|
|
video_urls = json.loads(result)
|
|
else:
|
|
video_urls = result
|
|
|
|
return video_urls or []
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"Video URL extraction failed: {e}")
|
|
return []
|
|
|
|
async def _download_audio_stream(self, video_urls: List[Dict[str, Any]], video_id: str) -> Optional[Path]:
|
|
"""Download best audio stream"""
|
|
# Find best audio-only stream
|
|
audio_streams = [
|
|
stream for stream in video_urls
|
|
if stream.get('hasAudio') and not stream.get('hasVideo')
|
|
]
|
|
|
|
if not audio_streams:
|
|
# Fallback to streams with both audio and video
|
|
audio_streams = [
|
|
stream for stream in video_urls
|
|
if stream.get('hasAudio')
|
|
]
|
|
|
|
if not audio_streams:
|
|
return None
|
|
|
|
# Sort by quality/bitrate
|
|
best_audio = max(audio_streams, key=lambda x: x.get('bitrate', 0))
|
|
|
|
return await self._download_stream(best_audio['url'], video_id, 'audio', 'mp3')
|
|
|
|
async def _download_video_and_audio_streams(self, video_urls: List[Dict[str, Any]],
|
|
video_id: str, preferences: DownloadPreferences) -> tuple[Optional[Path], Optional[Path]]:
|
|
"""Download video and audio streams separately"""
|
|
# Find best video stream
|
|
video_streams = [
|
|
stream for stream in video_urls
|
|
if stream.get('hasVideo')
|
|
]
|
|
|
|
# Find best audio stream
|
|
audio_streams = [
|
|
stream for stream in video_urls
|
|
if stream.get('hasAudio') and not stream.get('hasVideo')
|
|
]
|
|
|
|
if not audio_streams:
|
|
audio_streams = [
|
|
stream for stream in video_urls
|
|
if stream.get('hasAudio')
|
|
]
|
|
|
|
video_path = None
|
|
audio_path = None
|
|
|
|
# Download video stream
|
|
if video_streams:
|
|
# Filter by quality preference
|
|
quality_map = {'720p': 720, '1080p': 1080, '480p': 480}
|
|
target_quality = quality_map.get(preferences.quality.value, 720)
|
|
|
|
# Find stream closest to target quality
|
|
best_video = min(video_streams,
|
|
key=lambda x: abs(self._extract_quality_number(x.get('quality', '720p')) - target_quality))
|
|
|
|
video_path = await self._download_stream(best_video['url'], video_id, 'video', 'mp4')
|
|
|
|
# Download audio stream
|
|
if audio_streams:
|
|
best_audio = max(audio_streams, key=lambda x: x.get('bitrate', 0))
|
|
audio_path = await self._download_stream(best_audio['url'], video_id, 'audio', 'mp3')
|
|
|
|
return video_path, audio_path
|
|
|
|
async def _download_stream(self, url: str, video_id: str, stream_type: str, extension: str) -> Optional[Path]:
    """Download a single stream.

    The data is written to ``<video_id>_<stream_type>.<extension>.part`` in
    ``self.output_dir`` and renamed to its final name only after the whole
    body has been received, so a failed transfer never leaves a truncated
    file under the final name.

    Args:
        url: Direct address of the stream.
        video_id: Used as the first part of the file name.
        stream_type: Used as the second part of the file name.
        extension: File extension without the leading dot.

    Returns:
        Path of the downloaded file, or None on any failure (the failure is
        logged; no exception is propagated).
    """
    output_path = self.output_dir / f"{video_id}_{stream_type}.{extension}"
    partial_path = output_path.with_name(f"{output_path.name}.part")

    try:
        # Imported lazily so a missing package is reported like any other
        # download failure.
        import aiohttp
        import aiofiles

        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                if response.status != 200:
                    raise DownloaderException(f"HTTP {response.status} for stream download")

                async with aiofiles.open(partial_path, 'wb') as f:
                    async for chunk in response.content.iter_chunked(64 * 1024):
                        await f.write(chunk)

        # Publish the file under its final name only once it is complete.
        partial_path.replace(output_path)

        self.logger.info(f"Downloaded {stream_type} stream: {output_path}")
        return output_path

    except Exception as e:
        self.logger.error(f"Stream download failed: {e}")
        # Best-effort cleanup of whatever was written before the failure.
        try:
            partial_path.unlink(missing_ok=True)
        except OSError as cleanup_error:
            self.logger.debug(f"Could not remove partial file {partial_path}: {cleanup_error}")
        return None
|
|
|
|
async def _extract_transcript_from_browser(self, mcp_client, video_id: str) -> Optional[TranscriptData]:
    """Try to extract transcript from browser.

    The English transcript is fetched with ``youtube_transcript_api`` in the
    default executor; ``mcp_client`` is accepted but not used by this body.

    Returns:
        A ``TranscriptData`` on success, otherwise None (the failure is logged
        at debug level).
    """
    try:
        # First try the standard transcript API
        from youtube_transcript_api import YouTubeTranscriptApi

        def fetch_english_transcript():
            # Blocking network call; runs in a worker thread.
            fetched = YouTubeTranscriptApi().fetch(video_id, languages=['en'])

            joined_text = ' '.join(piece.text for piece in fetched.snippets)
            pieces = []
            for piece in fetched.snippets:
                pieces.append({
                    'text': piece.text,
                    'start': piece.start,
                    'duration': piece.duration
                })

            return joined_text, pieces, fetched.is_generated, fetched.language_code

        event_loop = asyncio.get_event_loop()
        fetched_parts = await event_loop.run_in_executor(None, fetch_english_transcript)
        transcript_text, transcript_segments, auto_generated, language_code = fetched_parts

        return TranscriptData(
            text=transcript_text,
            language=language_code,
            is_auto_generated=auto_generated,
            segments=transcript_segments,
            source="youtube-transcript-api"
        )

    except Exception as failure:
        self.logger.debug(f"Transcript extraction failed: {failure}")
        return None
|
|
|
|
def _parse_view_count(self, views_text) -> Optional[int]:
|
|
"""Parse view count from text"""
|
|
if not views_text:
|
|
return None
|
|
|
|
try:
|
|
# Remove non-numeric characters except for multipliers
|
|
import re
|
|
views_clean = re.sub(r'[^\d.KMB]', '', str(views_text).upper())
|
|
|
|
if 'K' in views_clean:
|
|
return int(float(views_clean.replace('K', '')) * 1000)
|
|
elif 'M' in views_clean:
|
|
return int(float(views_clean.replace('M', '')) * 1000000)
|
|
elif 'B' in views_clean:
|
|
return int(float(views_clean.replace('B', '')) * 1000000000)
|
|
else:
|
|
return int(re.sub(r'[^\d]', '', views_clean))
|
|
except:
|
|
return None
|
|
|
|
def _extract_quality_number(self, quality_str: str) -> int:
|
|
"""Extract numeric quality from string like '720p'"""
|
|
try:
|
|
import re
|
|
match = re.search(r'(\d+)', quality_str)
|
|
return int(match.group(1)) if match else 720
|
|
except:
|
|
return 720
|
|
|
|
async def test_connection(self) -> bool:
    """Test if Playwright MCP server is working and check authentication status.

    Returns:
        True when navigating to the YouTube home page produced a result whose
        text does not contain "error" (and, if authentication is enabled, the
        authentication check did not raise); False otherwise.
    """
    try:
        from backend.core.mcp_client import get_mcp_client

        client = get_mcp_client("playwright")

        # Try to navigate to YouTube
        navigation = await client.call_tool("browser_navigate", {
            "url": "https://www.youtube.com",
            "wait_until": "domcontentloaded"
        })
        reachable = "error" not in str(navigation).lower()

        # Check authentication status if enabled
        if reachable and self.use_authentication:
            await self._check_authentication_status(client)
            mode = 'Authenticated' if self._is_authenticated else 'Guest mode'
            self.logger.info(f"Authentication status: {mode}")

        return reachable

    except Exception as failure:
        self.logger.error(f"Playwright connection test failed: {failure}")
        return False
|
|
|
|
def get_authentication_status(self) -> Dict[str, Any]:
    """Get current authentication status information.

    Returns:
        A dict with the authentication settings and state of this instance.
        ``features`` lists the feature flags when the session is
        authenticated and is empty otherwise.
    """
    authenticated = self._is_authenticated

    feature_names = (
        "privateVideos",
        "unlistedVideos",
        "memberContent",
        "highQualityStreams",
        "personalPlaylists",
    )
    # Every flag mirrors the authentication state; none are reported for a
    # guest session.
    if authenticated:
        features = dict.fromkeys(feature_names, authenticated)
    else:
        features = {}

    status: Dict[str, Any] = {}
    status["useAuthentication"] = self.use_authentication
    status["isAuthenticated"] = authenticated
    status["authChecked"] = self._auth_checked
    status["fallbackToGuest"] = self.fallback_to_guest
    status["features"] = features
    return status
|
|
|
|
def supports_audio_only(self) -> bool:
    """Return True: ``download_video`` has an audio-only path (``preferences.prefer_audio_only``)."""
    return True
|
|
|
|
def supports_quality_selection(self) -> bool:
    """Return True: the video stream is chosen according to ``preferences.quality``."""
    return True
|
|
|
|
def get_supported_formats(self) -> list[str]:
    """Return the format names this downloader advertises."""
    # NOTE(review): the download helpers in this class only ever write files
    # with the "mp4" and "mp3" extensions; "webm" is listed here but no code
    # path in this file produces it. Confirm whether it should be advertised.
    return ["mp4", "webm", "mp3"]
|
|
|
|
|
|
# Create a mock MCP client if not available
# NOTE(review): nothing in this file instantiates this class; the methods above
# obtain their client from backend.core.mcp_client.get_mcp_client. Confirm it
# is still needed.
class MockMCPClient:
    """Stand-in MCP client whose every tool call fails with ``DownloaderException``."""

    async def call_tool(self, tool_name: str, params: dict):
        """Always raise ``DownloaderException``; both arguments are ignored."""
        raise DownloaderException("MCP Playwright server not available")
|
|
|
|
|
|
# Register the downloader
# The factory is imported here, after the class definition, rather than with
# the imports at the top of the file; registration runs as a side effect of
# importing this module.
from backend.services.video_downloaders.base_downloader import DownloaderFactory
DownloaderFactory.register(DownloadMethod.PLAYWRIGHT, PlaywrightDownloader)