trax/test_media_service_integrat...

117 lines
4.4 KiB
Python

#!/usr/bin/env python3
"""Test script for MediaService integration with real video links."""
import asyncio
import csv
import logging
import tempfile
from pathlib import Path
from typing import List
from src.services.media_service import create_media_service, MediaStatus
from src.repositories.media_repository import create_media_repository
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def read_video_urls_from_csv(csv_path: str) -> List[str]:
"""Read video URLs from CSV file."""
urls = []
try:
with open(csv_path, 'r') as file:
reader = csv.reader(file)
for row in reader:
for url in row:
if url.strip() and url.strip().startswith('http'):
urls.append(url.strip())
logger.info(f"Found {len(urls)} video URLs in {csv_path}")
return urls
except Exception as e:
logger.error(f"Error reading CSV file: {e}")
return []
async def test_media_service_integration():
"""Test the MediaService with real video links."""
# Read video URLs from CSV
video_urls = read_video_urls_from_csv('videos.csv')
if not video_urls:
logger.error("No video URLs found in videos.csv")
return
# Create temporary directory for downloads
with tempfile.TemporaryDirectory() as temp_dir:
temp_path = Path(temp_dir)
logger.info(f"Using temporary directory: {temp_path}")
# Create media service and repository
media_repository = create_media_repository()
media_service = create_media_service(media_repository=media_repository)
# Initialize the service
await media_service.initialize()
logger.info("MediaService initialized successfully")
# Test with the first video URL
test_url = video_urls[0]
logger.info(f"Testing with URL: {test_url}")
try:
# Download media
logger.info("Starting media download...")
media_info = await media_service.download_media(test_url, temp_path)
logger.info(f"Download completed: {media_info.filename}")
# Create database record
logger.info("Creating database record...")
media_file = await media_service.create_media_file_record(media_info)
logger.info(f"Database record created: {media_file.id}")
# Update status to downloading
await media_service.update_media_file_status(media_file.id, MediaStatus.DOWNLOADING.value)
logger.info("Status updated to downloading")
# Test audio preprocessing
input_file = Path(media_info.local_path)
output_file = temp_path / f"processed_{media_info.filename}.wav"
logger.info("Starting audio preprocessing...")
success = await media_service.preprocess_audio(input_file, output_file)
if success:
logger.info("Audio preprocessing completed successfully")
# Update status to ready
await media_service.update_media_file_status(media_file.id, MediaStatus.READY.value)
logger.info("Status updated to ready")
else:
logger.error("Audio preprocessing failed")
# Update status to failed
await media_service.update_media_file_status(media_file.id, MediaStatus.FAILED.value)
logger.info("Status updated to failed")
# Test database queries
logger.info("Testing database queries...")
pending_files = await media_service.get_pending_media_files()
ready_files = await media_service.get_ready_media_files()
logger.info(f"Pending files: {len(pending_files)}")
logger.info(f"Ready files: {len(ready_files)}")
# Get the media file by ID
retrieved_file = await media_service.get_media_file_by_id(media_file.id)
if retrieved_file:
logger.info(f"Retrieved file: {retrieved_file.filename} (status: {retrieved_file.status})")
else:
logger.error("Failed to retrieve media file by ID")
except Exception as e:
logger.error(f"Error during testing: {e}")
import traceback
traceback.print_exc()
if __name__ == "__main__":
asyncio.run(test_media_service_integration())