"""Practical examples of using the new service architecture. This file demonstrates common usage patterns for the Trax platform services. """ import asyncio from pathlib import Path from typing import List from src.services import ( create_youtube_service, create_media_service, create_transcription_service, create_enhancement_service, create_export_service, create_batch_processor, create_service_container, ) from src.services.protocols import TranscriptionConfig, ExportFormat async def example_youtube_workflow(): """Example: Extract metadata from YouTube videos.""" print("=== YouTube Workflow Example ===") # Create YouTube service youtube_service = create_youtube_service() # Extract metadata from a single video url = "https://youtube.com/watch?v=dQw4w9WgXcQ" metadata = await youtube_service.extract_metadata(url) print(f"Video Title: {metadata['title']}") print(f"Duration: {metadata['duration']} seconds") print(f"Channel: {metadata['channel']}") print(f"Views: {metadata['view_count']:,}") # Batch extract from multiple videos urls = [ "https://youtube.com/watch?v=video1", "https://youtube.com/watch?v=video2", "https://youtube.com/watch?v=video3" ] results = await youtube_service.batch_extract(urls) print(f"\nBatch processed {len(results)} videos") for result in results: if result["success"]: print(f"✓ {result['url']}: {result['data']['title']}") else: print(f"✗ {result['url']}: {result['error']}") async def example_media_processing(): """Example: Process media files.""" print("\n=== Media Processing Example ===") # Create media service media_service = create_media_service() # Process a media file through the complete pipeline url = "https://example.com/audio.mp3" output_dir = Path("/tmp/media_output") print(f"Processing media from: {url}") # This will download, preprocess, and create database records media_file = await media_service.process_media_pipeline( url=url, output_dir=output_dir ) print(f"Media file processed: {media_file.file_path}") print(f"File size: {media_file.file_size / 1024 / 1024:.2f} MB") print(f"Duration: {media_file.duration:.2f} seconds") # Validate file size is_valid = await media_service.validate_file_size( file_path=Path(media_file.file_path), max_size_mb=500 ) print(f"File size valid: {is_valid}") async def example_transcription_workflow(): """Example: Transcribe audio files.""" print("\n=== Transcription Workflow Example ===") # Create transcription service with custom configuration config = TranscriptionConfig( model="whisper-large-v3", language="en", task="transcribe", temperature=0.0 ) transcription_service = create_transcription_service(config=config) # Transcribe an audio file audio_path = Path("/tmp/audio.wav") print(f"Transcribing audio: {audio_path}") result = await transcription_service.transcribe_audio(audio_path, config) print(f"Transcription completed!") print(f"Text: {result.raw_content[:100]}...") print(f"Word count: {result.word_count}") print(f"Accuracy: {result.accuracy_estimate:.2%}") print(f"Processing time: {result.processing_time_ms}ms") print(f"Model used: {result.model_used}") # Show segments with timestamps print("\nSegments:") for segment in result.segments[:3]: # Show first 3 segments print(f" {segment['start']:.1f}s - {segment['end']:.1f}s: {segment['text']}") async def example_enhancement_workflow(): """Example: Enhance transcript quality.""" print("\n=== Enhancement Workflow Example ===") # Create enhancement service enhancement_service = create_enhancement_service() # Initialize the service await enhancement_service.initialize() # Raw transcript with issues raw_transcript = """ this is a raw transcript with some issues like missing punctuation and capitalization problems also some grammar issues that need fixing """ print("Original transcript:") print(raw_transcript.strip()) # Enhance the transcript enhanced = await enhancement_service.enhance_transcript(raw_transcript) print("\nEnhanced transcript:") print(enhanced.enhanced_text) print(f"\nImprovements made:") for improvement in enhanced.improvements: print(f" • {improvement}") print(f"Confidence: {enhanced.confidence_score:.2%}") print(f"Processing time: {enhanced.processing_time:.2f}s") async def example_export_workflow(): """Example: Export transcripts in various formats.""" print("\n=== Export Workflow Example ===") # Create export service export_service = create_export_service() # Create a sample transcription result from src.services.protocols import TranscriptionResult sample_result = TranscriptionResult( raw_content="This is a sample transcript for export testing.", segments=[ {"start": 0.0, "end": 5.0, "text": "This is a sample transcript", "confidence": 0.95}, {"start": 5.0, "end": 10.0, "text": "for export testing.", "confidence": 0.92} ], confidence_scores=[0.95, 0.92], accuracy_estimate=0.93, word_count=8, processing_time_ms=1500, model_used="whisper-1" ) # Export in different formats output_dir = Path("/tmp/exports") output_dir.mkdir(exist_ok=True) formats = [ExportFormat.JSON, ExportFormat.TXT, ExportFormat.SRT, ExportFormat.MARKDOWN] for format_type in formats: output_path = output_dir / f"transcript.{format_type.value}" result = await export_service.export_transcript( sample_result, output_path, format_type ) if result.success: print(f"✓ Exported to {format_type.value.upper()}: {result.file_path}") print(f" File size: {result.file_size} bytes") else: print(f"✗ Failed to export {format_type.value.upper()}: {result.error_message}") async def example_batch_processing(): """Example: Process multiple tasks in batch.""" print("\n=== Batch Processing Example ===") # Create batch processor batch_processor = create_batch_processor() # Add multiple transcription tasks tasks = [ {"url": "https://youtube.com/watch?v=video1", "priority": "high"}, {"url": "https://youtube.com/watch?v=video2", "priority": "medium"}, {"url": "https://youtube.com/watch?v=video3", "priority": "low"}, {"url": "https://youtube.com/watch?v=video4", "priority": "high"}, ] print(f"Adding {len(tasks)} tasks to batch processor...") task_ids = [] for i, task_data in enumerate(tasks): task_id = await batch_processor.add_task("transcription", task_data) task_ids.append(task_id) print(f" Added task {i+1}: {task_id}") # Process tasks with limited workers print("\nProcessing tasks with 2 workers...") await batch_processor.process_tasks(max_workers=2) # Check progress progress = await batch_processor.get_progress() print(f"\nBatch processing completed!") print(f"Total tasks: {progress.total_tasks}") print(f"Completed: {progress.completed_tasks}") print(f"Failed: {progress.failed_tasks}") print(f"Overall progress: {progress.overall_progress:.1%}") # Get completed tasks completed_tasks = await batch_processor.get_completed_tasks() print(f"\nCompleted task details:") for task in completed_tasks: print(f" Task {task.task_id}: {task.task_type} - {task.status}") async def example_service_container(): """Example: Use all services together in a container.""" print("\n=== Service Container Example ===") # Create complete service container services = create_service_container() print("Available services:") for service_name in services.keys(): print(f" • {service_name}") # Use services from container youtube_service = services["youtube_service"] media_service = services["media_service"] transcription_service = services["transcription_service"] # Complete workflow: YouTube → Media → Transcription url = "https://youtube.com/watch?v=example" print(f"\nProcessing complete workflow for: {url}") # Step 1: Extract YouTube metadata metadata = await youtube_service.extract_metadata(url) print(f"1. YouTube metadata extracted: {metadata['title']}") # Step 2: Process media output_dir = Path("/tmp/workflow_output") media_file = await media_service.process_media_pipeline(url, output_dir) print(f"2. Media processed: {media_file.file_path}") # Step 3: Transcribe result = await transcription_service.transcribe_file(media_file) print(f"3. Transcription completed: {result.word_count} words") print("\nComplete workflow finished successfully!") async def main(): """Run all examples.""" print("🚀 Trax Platform Service Examples") print("=" * 50) try: await example_youtube_workflow() await example_media_processing() await example_transcription_workflow() await example_enhancement_workflow() await example_export_workflow() await example_batch_processing() await example_service_container() print("\n✅ All examples completed successfully!") except Exception as e: print(f"\n❌ Example failed: {e}") import traceback traceback.print_exc() if __name__ == "__main__": # Run examples asyncio.run(main())