#!/usr/bin/env python3
"""
One-time scan to populate ALL existing YouTube media items with thumbnails
"""

import json
import logging
import sys
from datetime import datetime
from typing import Dict, List

# Add src directory to path (resolved relative to the current working directory)
sys.path.append('src')

from config import BATCH_SIZE, LOG_LEVEL
from directus_client import DirectusClient
from youtube_processor import YouTubeProcessor


class OneTimeThumbnailScanner:
    """One-time scanner to populate all YouTube thumbnails.

    Fetches every YouTube media item from Directus, downloads a thumbnail
    for each item that does not have one yet, uploads it and links it to
    the item. Counters for the run are kept in ``self.stats``.
    """

    def __init__(self):
        self.directus_client = DirectusClient()
        self.youtube_processor = YouTubeProcessor()

        # Statistics
        self.stats = {
            'items_found': 0,
            'items_processed': 0,
            'items_succeeded': 0,
            'items_failed': 0,
            'items_skipped': 0,
            'start_time': datetime.now(),
        }

        self.setup_logging()

    def setup_logging(self):
        """Configure logging to stdout and to a log file under /tmp."""
        log_format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'

        # Fall back to INFO instead of crashing on an unrecognised LOG_LEVEL.
        level = getattr(logging, str(LOG_LEVEL).upper(), logging.INFO)
        if not isinstance(level, int):
            level = logging.INFO

        logging.basicConfig(
            level=level,
            format=log_format,
            handlers=[
                logging.StreamHandler(sys.stdout),
                # Messages contain emoji; do not depend on the platform's
                # default file encoding.
                logging.FileHandler(
                    '/tmp/youtube_one_time_scan.log', encoding='utf-8'
                ),
            ],
        )

        self.logger = logging.getLogger(__name__)
        self.logger.info("🎬 Starting one-time YouTube thumbnail scan...")

    def get_all_youtube_items(self) -> List[Dict]:
        """Get ALL YouTube items (with and without thumbnails) for complete scan.

        Pages through the ``media_items`` collection until an empty page is
        returned.

        Returns:
            The list of item dicts (fields: id, url, type, title,
            youtube_thumb). If a page request returns a non-200 status, the
            items collected so far are returned. If an exception is raised,
            an empty list is returned.
        """
        try:
            # Kept local so that an import problem is reported through the
            # same error path as a failed request.
            import requests
            from config import DIRECTUS_ITEMS_URL

            # Query for ALL YouTube items regardless of thumbnail status
            filter_json = json.dumps({
                "_and": [
                    {
                        "_or": [
                            {"type": {"_eq": "youtube_video"}},
                            {"type": {"_eq": "youtube"}},
                        ]
                    },
                    {"url": {"_nnull": True}},
                ]
            })

            all_items: List[Dict] = []
            offset = 0
            limit = 100  # Larger batch for scanning

            while True:
                filter_params = {
                    "filter": filter_json,
                    "limit": limit,
                    "offset": offset,
                    "fields": "id,url,type,title,youtube_thumb",
                }

                response = requests.get(
                    f"{DIRECTUS_ITEMS_URL}/media_items",
                    headers=self.directus_client.headers,
                    params=filter_params,
                    timeout=30,
                )

                if response.status_code != 200:
                    self.logger.error(
                        f"Failed to get media items: {response.status_code} - {response.text}"
                    )
                    break

                items = response.json().get('data', [])
                if not items:
                    break

                all_items.extend(items)
                # Advance by what was actually received: if the server hands
                # back fewer rows than requested (e.g. a server-side page-size
                # cap), stepping by `limit` would silently skip records.
                offset += len(items)

                self.logger.info(f"Fetched {len(items)} items (total: {len(all_items)})")

            self.stats['items_found'] = len(all_items)
            self.logger.info(f"Found {len(all_items)} total YouTube items")
            return all_items

        except Exception as e:
            self.logger.error(f"Error getting all YouTube items: {e}")
            return []

    def process_media_item(self, item: Dict) -> bool:
        """Process a single media item.

        Args:
            item: Media item dict; the keys ``id``, ``url``, ``title`` and
                ``youtube_thumb`` are read.

        Returns:
            True if the item already had a thumbnail (counted as skipped) or
            a thumbnail was downloaded, uploaded and linked; False if any
            step failed or raised.
        """
        item_id = item.get('id')
        item_url = item.get('url')
        # `title` may be present but null, so fall back on any falsy value.
        item_title = item.get('title') or f"Media Item {item_id}"
        existing_thumb = item.get('youtube_thumb')

        # Skip if already has thumbnail
        if existing_thumb:
            self.logger.info(f"⏭️ Item {item_id} already has thumbnail: {existing_thumb}")
            self.stats['items_skipped'] += 1
            return True

        self.logger.info(f"🔄 Processing item {item_id}: {item_title}")

        try:
            # Extract video ID
            video_id = self.youtube_processor.extract_video_id(item_url)
            if not video_id:
                self.logger.error(f"Could not extract video ID from URL: {item_url}")
                return False

            # Download thumbnail
            thumbnail_data, filename = self.youtube_processor.download_best_thumbnail(video_id)
            if not thumbnail_data or not filename:
                self.logger.error(f"Could not download thumbnail for video: {video_id}")
                return False

            # Upload to Directus
            file_id = self.directus_client.upload_file(
                thumbnail_data,
                filename,
                title=f"YouTube Thumbnail - {video_id}",
            )
            if not file_id:
                self.logger.error(f"Could not upload thumbnail for video: {video_id}")
                return False

            # Update media item
            success = self.directus_client.update_media_item_thumbnail(item_id, file_id)
            if success:
                self.logger.info(f"✅ Successfully processed item {item_id} -> thumbnail {file_id}")
                return True

            self.logger.error(f"❌ Failed to update media item {item_id}")
            return False

        except Exception as e:
            self.logger.error(f"❌ Error processing item {item_id}: {e}")
            return False

    def print_final_statistics(self):
        """Print final scan statistics"""
        stats = self.stats
        uptime = datetime.now() - stats['start_time']

        print("\n📊 One-Time Scan Complete!")
        print("=" * 40)
        print(f" Duration: {uptime}")
        print(f" Items Found: {stats['items_found']}")
        print(f" Items Processed: {stats['items_processed']}")
        print(f" Already Had Thumbnails: {stats['items_skipped']}")
        print(f" Successfully Added: {stats['items_succeeded']}")
        print(f" Failed: {stats['items_failed']}")

        if stats['items_processed'] > 0:
            success_rate = (stats['items_succeeded'] / stats['items_processed']) * 100
            print(f" Success Rate: {success_rate:.1f}%")

        # Coverage counts items that had a thumbnail before the run plus
        # those added during it.
        total_with_thumbs = stats['items_skipped'] + stats['items_succeeded']
        if stats['items_found'] > 0:
            coverage = (total_with_thumbs / stats['items_found']) * 100
        else:
            coverage = 0
        print(f" Total Coverage: {coverage:.1f}% ({total_with_thumbs}/{stats['items_found']})")
        print("")

    def run(self):
        """Main scanning process.

        Fetches all YouTube items, processes those without a thumbnail and
        prints the final statistics. Statistics are also printed when the
        scan is interrupted or fails; unexpected exceptions are re-raised
        after being logged.
        """
        print("🎬 YouTube Thumbnail One-Time Scan")
        print("==================================")
        print("This will scan ALL YouTube media items and populate missing thumbnails")
        print("")

        try:
            # Get all YouTube items
            self.logger.info("🔍 Scanning for all YouTube media items...")
            items = self.get_all_youtube_items()

            if not items:
                self.logger.info("No YouTube items found")
                return

            # Process each item
            total = len(items)
            self.logger.info(f"📋 Processing {total} YouTube items...")

            for i, item in enumerate(items, 1):
                # `title` may be present but null, so fall back on any falsy value.
                print(f"\n[{i}/{total}] Processing: {item.get('title') or 'Untitled'}")

                if item.get('youtube_thumb'):
                    # Skip if already has thumbnail
                    self.stats['items_skipped'] += 1
                else:
                    success = self.process_media_item(item)

                    # Update statistics
                    self.stats['items_processed'] += 1
                    if success:
                        self.stats['items_succeeded'] += 1
                    else:
                        self.stats['items_failed'] += 1

                # Progress update every 5 items, including skipped ones
                if i % 5 == 0:
                    print(f"Progress: {i}/{total} items checked")

            # Final statistics
            self.print_final_statistics()

        except KeyboardInterrupt:
            self.logger.info("Scan interrupted by user")
            self.print_final_statistics()
        except Exception as e:
            self.logger.error(f"Scan error: {e}")
            self.print_final_statistics()
            raise


def main():
    """Entry point: build the scanner, run it, exit with status 1 on failure."""
    try:
        scanner = OneTimeThumbnailScanner()
        scanner.run()
    except Exception as e:
        print(f"❌ Failed to start scan: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()