116 lines
4.7 KiB
Python
116 lines
4.7 KiB
Python
#!/usr/bin/env python3
|
|
"""Test script for enhanced MediaService with progress tracking and error handling."""
|
|
|
|
import asyncio
|
|
import csv
|
|
import logging
|
|
import tempfile
|
|
from pathlib import Path
|
|
from typing import List
|
|
|
|
from src.services.media_service import create_media_service
|
|
from src.services.media_types import (
|
|
MediaStatus, DownloadProgress, ProcessingProgress, ProgressCallback
|
|
)
|
|
from src.repositories.media_repository import create_media_repository
|
|
|
|
# Route INFO-and-above records through the root handler so the progress and
# telemetry messages emitted below are visible when the script runs.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class TestProgressCallback:
    """Callable that records and logs every progress update it receives.

    Download updates are collected in ``download_progress`` and processing
    updates in ``processing_progress``, each in arrival order, so the caller
    can inspect them once the run has finished.
    """

    def __init__(self):
        # One history list per kind of progress object.
        self.download_progress = []
        self.processing_progress = []

    def __call__(self, progress: DownloadProgress | ProcessingProgress) -> None:
        """Store ``progress`` in the matching history list and log a summary.

        Objects that are neither ``DownloadProgress`` nor
        ``ProcessingProgress`` are ignored.
        """
        if isinstance(progress, DownloadProgress):
            self.download_progress.append(progress)
            message = f"Download Progress: {progress.percentage:.1f}% - {progress.status}"
            logger.info(message)
            return

        if isinstance(progress, ProcessingProgress):
            self.processing_progress.append(progress)
            message = f"Processing Progress: {progress.stage} - {progress.current_step}/{progress.total_steps} - {progress.status}"
            logger.info(message)
|
|
|
|
|
|
def read_video_urls_from_csv(csv_file: str) -> List[str]:
    """Read video URLs from a CSV file.

    Every non-empty cell of every row contributes URLs. A cell may itself
    hold a comma-separated list (for example when the list was quoted in the
    file); each piece is stripped of surrounding whitespace and empty pieces
    are discarded.

    Args:
        csv_file: Path of the CSV file to read (decoded as UTF-8).

    Returns:
        The URLs in file order. An empty list is returned when the file is
        missing or cannot be read; the failure is logged instead of raised.
    """
    urls: List[str] = []
    try:
        # newline='' is what the csv module asks for so that it can handle
        # line endings (including ones embedded in quoted fields) itself.
        with open(csv_file, 'r', encoding='utf-8', newline='') as file:
            for row in csv.reader(file):
                # csv.reader has already split the line on commas, so looking
                # only at row[0] would drop every URL after the first one.
                for cell in row:
                    urls.extend(
                        piece.strip() for piece in cell.split(',') if piece.strip()
                    )
    except FileNotFoundError:
        logger.error(f"CSV file not found: {csv_file}")
        return []
    except Exception as e:
        # Deliberate best-effort boundary: callers rely on getting a list
        # back rather than an exception.
        logger.error(f"Error reading CSV file: {e}")
        return []
    else:
        logger.info(f"Loaded {len(urls)} video URLs from {csv_file}")
        return urls
|
|
|
|
|
|
async def _check_invalid_url_handling(media_service, temp_path: Path) -> None:
    """Attempt a download from an unreachable host and log the outcome.

    An exception is the expected result; it is logged and not propagated.
    """
    try:
        await media_service.download_media("https://invalid-url-that-does-not-exist.com", temp_path)
    except Exception as e:
        logger.info(f"Expected error caught: {type(e).__name__} - {str(e)}")
    else:
        # Without this branch a service that accepts the bad URL would leave
        # no trace in the log and the check would appear to have passed.
        logger.warning("Expected an error for the invalid URL, but none was raised")


async def _check_file_size_validation(media_service, temp_path: Path) -> None:
    """Create a 600 MiB file and log what ``validate_file_size`` reports for a 500 MB cap.

    Any exception raised while creating or validating the file is logged and
    not propagated.
    """
    try:
        large_file = temp_path / "large_test.txt"
        # Size the file with truncate() rather than building a 600 MiB string
        # in memory and writing it out.
        # NOTE(review): assumes validate_file_size looks only at the file's
        # size, not its contents - confirm against the service.
        with large_file.open("wb") as handle:
            handle.truncate(600 * 1024 * 1024)
        is_valid = await media_service.validate_file_size(large_file, max_size_mb=500)
        logger.info(f"Large file validation result: {is_valid}")
    except Exception as e:
        logger.info(f"File size validation error: {e}")


async def test_enhanced_media_service() -> None:
    """Run the media pipeline on the first URL in ``videos.csv`` and log the results.

    Steps, all inside a temporary working directory:
      1. Build and initialize the media service.
      2. Run ``process_media_pipeline`` on the first URL with a recording
         progress callback, then log the resulting file, the number of
         progress updates, and the telemetry records.
      3. Exercise error handling with an invalid URL.
      4. Exercise file-size validation with an oversized file.

    A failure in the pipeline step is logged with its traceback and skips
    steps 3 and 4. The telemetry summary is logged in every case.
    """
    video_urls = read_video_urls_from_csv('videos.csv')
    if not video_urls:
        logger.error("No video URLs found. Please check videos.csv file.")
        return

    with tempfile.TemporaryDirectory() as temp_dir:
        temp_path = Path(temp_dir)
        media_repository = create_media_repository()
        media_service = create_media_service(media_repository=media_repository)
        await media_service.initialize()
        progress_callback = TestProgressCallback()
        test_url = video_urls[0]

        try:
            media_file = await media_service.process_media_pipeline(
                test_url, temp_path, progress_callback=progress_callback
            )
            logger.info(f"Pipeline completed successfully: {media_file.filename}")
            logger.info(f"Final status: {media_file.status}")

            logger.info(f"Download progress updates: {len(progress_callback.download_progress)}")
            logger.info(f"Processing progress updates: {len(progress_callback.processing_progress)}")

            telemetry_data = media_service.get_telemetry_data()
            logger.info(f"Telemetry records: {len(telemetry_data)}")
            for telemetry in telemetry_data:
                logger.info(f"Operation: {telemetry.operation}, Duration: {telemetry.duration:.2f}s, Success: {telemetry.success}")
                if telemetry.error_type:
                    logger.info(f"  Error: {telemetry.error_type} - {telemetry.error_message}")

            # Test error handling with invalid URL
            await _check_invalid_url_handling(media_service, temp_path)

            # Test file size validation
            await _check_file_size_validation(media_service, temp_path)

        except Exception as e:
            # logger.exception appends the active traceback to the record, so
            # the separate traceback.print_exc() call is no longer needed.
            logger.exception(f"Error during testing: {e}")
        finally:
            final_telemetry = media_service.get_telemetry_data()
            logger.info("Final Telemetry Summary:")
            for telemetry in final_telemetry:
                logger.info(f"  {telemetry.operation}: {telemetry.duration:.2f}s, Success: {telemetry.success}")
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: drive the async test to completion on a fresh event loop.
    main_coroutine = test_enhanced_media_service()
    asyncio.run(main_coroutine)
|