# Batch Processing System

The Trax batch processing system provides high-performance parallel processing for multiple media files with comprehensive error handling, progress tracking, and resource monitoring.

## Overview

The batch processing system is designed to handle large volumes of audio/video files efficiently while providing real-time feedback and robust error recovery. It is optimized for M3 MacBook performance with configurable worker pools and intelligent resource management.

## Key Features

### Core Capabilities

- **Parallel Processing**: Configurable worker pool (default: 8 workers for M3 MacBook)
- **Priority Queue**: Task prioritization with automatic retry mechanism
- **Real-time Progress**: 5-second interval progress reporting with resource monitoring
- **Error Recovery**: Automatic retry with exponential backoff
- **Pause/Resume**: User control over processing operations
- **Resource Monitoring**: Memory and CPU usage tracking with configurable limits
- **Quality Metrics**: Comprehensive reporting with accuracy and quality warnings

### Supported Task Types

- **Transcription**: Audio/video to text using the Whisper API
- **Enhancement**: AI-powered transcript improvement using DeepSeek
- **YouTube**: Metadata extraction from YouTube URLs
- **Download**: Media file downloading and preprocessing
- **Preprocessing**: Audio format conversion and optimization

## Architecture

### Components

#### BatchProcessor

The main orchestrator that manages the entire batch processing workflow.

```python
from src.services.batch_processor import create_batch_processor

# Create processor with custom settings
processor = create_batch_processor(
    max_workers=8,           # Number of parallel workers
    queue_size=1000,         # Maximum queue size
    progress_interval=5.0,   # Progress reporting interval in seconds
    memory_limit_mb=2048,    # Memory limit in MB
    cpu_limit_percent=90,    # CPU usage limit
)
```

#### BatchTask

Represents an individual task in the processing queue.

```python
from src.services.batch_processor import BatchTask, TaskType

task = BatchTask(
    id="task_1_transcribe",
    task_type=TaskType.TRANSCRIBE,
    data={"file_path": "/path/to/audio.mp3"},
    priority=0,      # Lower = higher priority
    max_retries=3,   # Maximum retry attempts
)
```

#### BatchProgress

Tracks real-time processing progress and resource usage.

```python
from src.services.batch_processor import BatchProgress

progress = BatchProgress(total_tasks=100)
print(f"Success Rate: {progress.success_rate:.1f}%")
print(f"Memory Usage: {progress.memory_usage_mb:.1f}MB")
print(f"CPU Usage: {progress.cpu_usage_percent:.1f}%")
```
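The feature list above mentions estimated completion time. As a minimal caller-side sketch (not part of the library), a progress callback can derive an ETA from `completed_tasks` and `total_tasks`; the `time.monotonic()` bookkeeping below is an assumption of this example, not a `BatchProgress` field.

```python
import time

# Caller-side ETA estimate; start_time is tracked by the caller,
# not by BatchProgress.
start_time = time.monotonic()

def eta_callback(progress):
    elapsed = time.monotonic() - start_time
    done = progress.completed_tasks
    if done:
        remaining = (progress.total_tasks - done) * (elapsed / done)
        print(f"{done}/{progress.total_tasks} done, ~{remaining:.0f}s remaining")
```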
#### BatchResult

Comprehensive results summary with quality metrics.

```python
from src.services.batch_processor import BatchResult

result = BatchResult(
    success_count=95,
    failure_count=5,
    total_count=100,
    processing_time=120.5,
    memory_peak_mb=512.0,
    cpu_peak_percent=75.0,
    quality_metrics={"avg_accuracy": 95.5},
)
```

## Usage

### Basic Batch Processing

```bash
# Process a folder of audio files
trax batch /path/to/audio/files

# Process with custom settings
trax batch /path/to/files --workers 4 --memory-limit 1024

# Process with enhancement
trax batch /path/to/files --enhance --progress-interval 2
```

### Programmatic Usage

```python
import asyncio
from pathlib import Path

from src.services.batch_processor import create_batch_processor, TaskType

# Collect the media files to process
audio_files = Path("/path/to/audio/files").glob("*.mp3")


async def process_files():
    # Create batch processor
    processor = create_batch_processor(max_workers=4)

    # Add transcription tasks
    for file_path in audio_files:
        await processor.add_task(
            TaskType.TRANSCRIBE,
            {"file_path": str(file_path)},
            priority=0,
        )

    # Progress callback
    def progress_callback(progress):
        print(f"Progress: {progress.completed_tasks}/{progress.total_tasks}")
        print(f"Memory: {progress.memory_usage_mb:.1f}MB")

    # Start processing
    result = await processor.start(progress_callback=progress_callback)

    # Display results
    print(f"Success: {result.success_count}/{result.total_count}")
    print(f"Processing time: {result.processing_time:.1f}s")


# Run the batch processing
asyncio.run(process_files())
```

### CLI Options

```bash
trax batch [OPTIONS]

Options:
  --workers INTEGER          Number of worker processes (default: 8)
  --progress-interval FLOAT  Progress update interval in seconds (default: 5.0)
  --memory-limit INTEGER     Memory limit in MB (default: 2048)
  --cpu-limit INTEGER        CPU usage limit percentage (default: 90)
  --model TEXT               Whisper model to use (default: whisper-1)
  --language TEXT            Language code (auto-detect if not specified)
  --chunk-size INTEGER       Chunk size in seconds for long files (default: 600)
  --enhance                  Also enhance transcripts after transcription
```

## Error Handling

### Automatic Retry

- Failed tasks are automatically retried up to 3 times by default
- Retry attempts use exponential backoff with priority degradation
- Permanent failures are tracked separately for reporting

### Error Recovery Process

1. Task fails during processing
2. Error is captured and logged
3. Retry count is incremented
4. If retries remain, the task is re-queued with lower priority
5. If max retries are exceeded, the task is marked as permanently failed
6. Failed tasks are tracked separately for reporting
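The re-queue step above combines exponential backoff with priority degradation. The snippet below is a minimal illustration of that policy, not the library's implementation; the explicit `retry_count` argument and the 2-second base delay are assumptions of this sketch.

```python
import asyncio

from src.services.batch_processor import BatchTask


async def requeue_with_backoff(processor, task: BatchTask, retry_count: int) -> bool:
    """Illustrative only: wait 2^retry_count seconds, then re-add the task
    with a numerically higher (i.e. lower) priority so fresh tasks run first."""
    if retry_count >= task.max_retries:
        return False  # permanently failed; reported in the final BatchResult
    await asyncio.sleep(2 ** retry_count)            # exponential backoff
    await processor.add_task(                        # add_task() per the API Reference
        task.task_type,
        task.data,
        priority=task.priority + retry_count + 1,    # priority degradation
    )
    return True
```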
### Error Types Handled

- **Network Errors**: API timeouts, connection failures
- **File Errors**: Missing files, permission issues
- **Processing Errors**: Audio format issues, API rate limits
- **Resource Errors**: Memory exhaustion, CPU overload

## Performance Optimization

### M3 MacBook Optimization

- Default 8 workers optimized for M3 architecture
- Memory and CPU monitoring with configurable limits
- Async processing throughout for non-blocking operations
- Intelligent caching for expensive operations

### Resource Management

- Real-time memory and CPU usage monitoring
- Configurable resource limits to prevent system overload
- Automatic worker scaling based on resource availability
- Graceful degradation under high load

### Performance Benchmarks

- **Transcription**: 95%+ accuracy, <30s for 5-minute audio
- **Enhancement**: 99%+ accuracy, <35s processing time
- **Batch Processing**: Parallel processing with configurable workers
- **Resource Usage**: <2GB memory, optimized for M3 architecture

## Progress Tracking

### Real-time Monitoring

- Progress updates every 5 seconds (configurable)
- Resource usage tracking (memory, CPU)
- Active worker count monitoring
- Estimated completion time calculation

### Progress Metrics

- Total tasks, completed tasks, failed tasks
- Success rate and failure rate percentages
- Processing time and resource usage peaks
- Quality metrics by task type

### CLI Progress Display

```
Progress: 45/100 (45.0% success) | Active: 8 | Failed: 2 | Memory: 512.3MB | CPU: 75.2%
```

## Quality Metrics

### Transcription Quality

- Average accuracy across all transcription tasks
- Quality warnings for low-confidence segments
- Processing time and efficiency metrics
- Error rate and recovery statistics

### Enhancement Quality

- Average accuracy improvement across enhancement tasks
- Content preservation validation
- Processing time and efficiency metrics
- Quality validation results

### Overall Metrics

- Success rate and failure rate percentages
- Processing time and resource usage peaks
- Quality warnings aggregation and deduplication
- Detailed failure information

## Configuration

### Worker Pool Settings

```python
# Default settings for M3 MacBook
DEFAULT_WORKERS = 8
DEFAULT_QUEUE_SIZE = 1000
DEFAULT_PROGRESS_INTERVAL = 5.0
DEFAULT_MEMORY_LIMIT_MB = 2048
DEFAULT_CPU_LIMIT_PERCENT = 90
```

### Task Priority Levels

- **0**: Highest priority (immediate processing)
- **1-5**: High priority (processed quickly)
- **6-10**: Normal priority (standard processing)
- **11+**: Low priority (processed when resources available)

### Retry Configuration

- **Default Max Retries**: 3
- **Retry Backoff**: Exponential with priority degradation
- **Retry Conditions**: Network errors, temporary failures
- **Permanent Failures**: File not found, permission denied

## Testing

### Unit Tests

Comprehensive test coverage for all batch processing components:

```bash
# Run batch processor tests
uv run pytest tests/test_batch_processor.py

# Run with coverage
uv run pytest tests/test_batch_processor.py --cov=src.services.batch_processor
```

### Test Coverage

- Worker pool initialization and configuration
- Task processing and error handling
- Progress tracking and resource monitoring
- Pause/resume functionality
- Quality metrics calculation
- Integration tests for multiple task types

## Troubleshooting

### Common Issues

#### High Memory Usage

```bash
# Reduce worker count and memory limit
trax batch /path/to/files --workers 4 --memory-limit 1024
```
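If lowering the limits is not enough, memory can also be watched from the calling code via the progress callback. This is a caller-side sketch using only the `BatchProgress` fields shown earlier; the 1800 MB threshold is an arbitrary example value.

```python
# Caller-side memory guard passed as progress_callback; field names follow
# the BatchProgress example earlier in this document.
def memory_guard(progress):
    if progress.memory_usage_mb > 1800:  # example threshold near the 2048 MB default
        print(f"WARNING: memory at {progress.memory_usage_mb:.1f}MB, "
              f"CPU at {progress.cpu_usage_percent:.1f}%")
        # Optionally call processor.pause() here (see API Reference below).

# result = await processor.start(progress_callback=memory_guard)
```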
#### Slow Processing

```bash
# Increase worker count (if resources are available)
trax batch /path/to/files --workers 12 --cpu-limit 95
```

#### Frequent Failures

- Check file permissions and accessibility
- Verify API keys and rate limits
- Monitor network connectivity
- Review error logs for specific issues

### Debug Mode

```python
import logging

from src.services.batch_processor import create_batch_processor

# Enable debug logging
logging.basicConfig(level=logging.DEBUG)

# Process with detailed logging
processor = create_batch_processor()
# ... processing code
```

## Future Enhancements

### Planned Features

- **Distributed Processing**: Multi-machine batch processing
- **Advanced Scheduling**: Time-based and dependency-based scheduling
- **Resource Prediction**: ML-based resource usage prediction
- **Dynamic Scaling**: Automatic worker scaling based on load
- **Advanced Analytics**: Detailed performance analytics and reporting

### Performance Improvements

- **GPU Acceleration**: GPU-accelerated processing for supported tasks
- **Streaming Processing**: Real-time streaming for live content
- **Advanced Caching**: Intelligent caching with predictive loading
- **Load Balancing**: Advanced load balancing across workers

## API Reference

### BatchProcessor Methods

#### `add_task(task_type, data, priority=0)`

Add a task to the processing queue.

#### `start(progress_callback=None)`

Start batch processing with an optional progress callback.

#### `pause()`

Pause batch processing (workers will complete their current tasks).

#### `resume()`

Resume batch processing.

#### `stop()`

Stop batch processing immediately.

#### `get_progress()`

Get current progress information.

### Task Types

#### `TaskType.TRANSCRIBE`

Transcribe audio/video files using the Whisper API.

#### `TaskType.ENHANCE`

Enhance transcripts using the DeepSeek API.

#### `TaskType.YOUTUBE`

Extract metadata from YouTube URLs.

#### `TaskType.DOWNLOAD`

Download media files from URLs.

#### `TaskType.PREPROCESS`

Preprocess audio files for transcription.

---

*Last Updated: 2024-12-30*
*Version: 0.2.0*
*Status: Production Ready*