#!/usr/bin/env python3
"""Test script for MediaService integration with real video links."""

import asyncio
import csv
import logging
import tempfile
from pathlib import Path
from typing import List

from src.services.media_service import create_media_service, MediaStatus
from src.repositories.media_repository import create_media_repository

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def read_video_urls_from_csv(csv_path: str) -> List[str]:
    """Read video URLs from a CSV file.

    Every cell of every row is inspected; a cell is kept when, after
    stripping surrounding whitespace, it starts with ``http://`` or
    ``https://``.

    Args:
        csv_path: Path of the CSV file to read.

    Returns:
        The stripped URLs in file order (row by row, cell by cell).
        An empty list is returned when the file cannot be read or parsed;
        the error is logged rather than raised.
    """
    urls: List[str] = []
    try:
        # newline='' is what the csv module requires so that newlines embedded
        # in quoted fields are handled correctly. utf-8-sig transparently
        # drops a leading BOM, which would otherwise make the first cell fail
        # the prefix check below.
        with open(csv_path, 'r', newline='', encoding='utf-8-sig') as file:
            for row in csv.reader(file):
                for cell in row:
                    candidate = cell.strip()
                    if candidate.startswith(('http://', 'https://')):
                        urls.append(candidate)
    except (OSError, UnicodeError, csv.Error) as e:
        logger.error(f"Error reading CSV file: {e}")
        # Deliberately discard any partially collected URLs on failure.
        return []

    logger.info(f"Found {len(urls)} video URLs in {csv_path}")
    return urls


async def test_media_service_integration():
    """Test the MediaService with real video links.

    Reads URLs from ``videos.csv`` in the current working directory and runs
    the first one through the service: download, database record creation,
    status updates, audio preprocessing and a few read-back queries.
    Downloads go to a temporary directory that is removed when the function
    returns.

    Failures inside the download/processing sequence are logged with a
    traceback and do not propagate. Errors raised while creating or
    initializing the service are not caught here.
    """
    # Read video URLs from CSV
    video_urls = read_video_urls_from_csv('videos.csv')
    if not video_urls:
        logger.error("No video URLs found in videos.csv")
        return

    # Create temporary directory for downloads
    with tempfile.TemporaryDirectory() as temp_dir:
        temp_path = Path(temp_dir)
        logger.info(f"Using temporary directory: {temp_path}")

        # Create media service and repository
        media_repository = create_media_repository()
        media_service = create_media_service(media_repository=media_repository)

        # Initialize the service
        await media_service.initialize()
        logger.info("MediaService initialized successfully")

        # Test with the first video URL
        test_url = video_urls[0]
        logger.info(f"Testing with URL: {test_url}")

        try:
            # Download media
            logger.info("Starting media download...")
            media_info = await media_service.download_media(test_url, temp_path)
            logger.info(f"Download completed: {media_info.filename}")

            # Create database record
            logger.info("Creating database record...")
            media_file = await media_service.create_media_file_record(media_info)
            logger.info(f"Database record created: {media_file.id}")

            # Update status to downloading
            # NOTE(review): the download has already finished at this point;
            # presumably this call only exercises the status-update API.
            await media_service.update_media_file_status(
                media_file.id, MediaStatus.DOWNLOADING.value
            )
            logger.info("Status updated to downloading")

            # Test audio preprocessing
            input_file = Path(media_info.local_path)
            output_file = temp_path / f"processed_{media_info.filename}.wav"

            logger.info("Starting audio preprocessing...")
            success = await media_service.preprocess_audio(input_file, output_file)

            if success:
                logger.info("Audio preprocessing completed successfully")
                # Update status to ready
                await media_service.update_media_file_status(
                    media_file.id, MediaStatus.READY.value
                )
                logger.info("Status updated to ready")
            else:
                logger.error("Audio preprocessing failed")
                # Update status to failed
                await media_service.update_media_file_status(
                    media_file.id, MediaStatus.FAILED.value
                )
                logger.info("Status updated to failed")

            # Test database queries
            logger.info("Testing database queries...")
            pending_files = await media_service.get_pending_media_files()
            ready_files = await media_service.get_ready_media_files()

            logger.info(f"Pending files: {len(pending_files)}")
            logger.info(f"Ready files: {len(ready_files)}")

            # Get the media file by ID
            retrieved_file = await media_service.get_media_file_by_id(media_file.id)
            if retrieved_file:
                logger.info(
                    f"Retrieved file: {retrieved_file.filename} "
                    f"(status: {retrieved_file.status})"
                )
            else:
                logger.error("Failed to retrieve media file by ID")

        except Exception as e:
            # Top-level boundary of the script: log the message together with
            # the traceback through the configured logger instead of printing
            # the traceback separately.
            logger.exception(f"Error during testing: {e}")


if __name__ == "__main__":
    asyncio.run(test_media_service_integration())