Batch Processing System
The Trax batch processing system provides high-performance parallel processing of multiple media files, with comprehensive error handling, progress tracking, and resource monitoring.
Overview
The batch processing system is designed to handle large volumes of audio/video files efficiently while providing real-time feedback and robust error recovery. It's optimized for M3 MacBook performance with configurable worker pools and intelligent resource management.
Key Features
Core Capabilities
- Parallel Processing: Configurable worker pool (default: 8 workers for M3 MacBook)
- Priority Queue: Task prioritization with automatic retry mechanism
- Real-time Progress: 5-second interval progress reporting with resource monitoring
- Error Recovery: Automatic retry with exponential backoff
- Pause/Resume: User control over processing operations
- Resource Monitoring: Memory and CPU usage tracking with configurable limits
- Quality Metrics: Comprehensive reporting with accuracy and quality warnings
Supported Task Types
- Transcription: Audio/video to text using Whisper API
- Enhancement: AI-powered transcript improvement using DeepSeek
- YouTube: Metadata extraction from YouTube URLs
- Download: Media file downloading and preprocessing
- Preprocessing: Audio format conversion and optimization
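The task types above are exposed as a `TaskType` enum (used throughout the examples below). A minimal sketch; the actual member values are assumptions:

```python
from enum import Enum

class TaskType(Enum):
    """Sketch of the task-type enum; real member values may differ."""
    TRANSCRIBE = "transcribe"   # audio/video -> text via Whisper
    ENHANCE = "enhance"         # transcript improvement via DeepSeek
    YOUTUBE = "youtube"         # metadata extraction from URLs
    DOWNLOAD = "download"       # media download and preprocessing
    PREPROCESS = "preprocess"   # format conversion and optimization

# Workers dispatch each queued task to a handler keyed by its type.
handler_key = TaskType.TRANSCRIBE.value  # "transcribe"
```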
Architecture
Components
BatchProcessor
The main orchestrator that manages the entire batch processing workflow.
```python
from src.services.batch_processor import create_batch_processor

# Create a processor with custom settings
processor = create_batch_processor(
    max_workers=8,           # number of parallel workers
    queue_size=1000,         # maximum queue size
    progress_interval=5.0,   # progress reporting interval (seconds)
    memory_limit_mb=2048,    # memory limit in MB
    cpu_limit_percent=90,    # CPU usage limit (percent)
)
```
BatchTask
Represents individual tasks in the processing queue.
```python
from src.services.batch_processor import BatchTask, TaskType

task = BatchTask(
    id="task_1_transcribe",
    task_type=TaskType.TRANSCRIBE,
    data={"file_path": "/path/to/audio.mp3"},
    priority=0,     # lower = higher priority
    max_retries=3,  # maximum retry attempts
)
```
BatchProgress
Tracks real-time processing progress and resource usage.
```python
from src.services.batch_processor import BatchProgress

progress = BatchProgress(total_tasks=100)
print(f"Success Rate: {progress.success_rate:.1f}%")
print(f"Memory Usage: {progress.memory_usage_mb:.1f}MB")
print(f"CPU Usage: {progress.cpu_usage_percent:.1f}%")
```
BatchResult
Comprehensive results summary with quality metrics.
```python
from src.services.batch_processor import BatchResult

result = BatchResult(
    success_count=95,
    failure_count=5,
    total_count=100,
    processing_time=120.5,
    memory_peak_mb=512.0,
    cpu_peak_percent=75.0,
    quality_metrics={"avg_accuracy": 95.5},
)
```
Usage
Basic Batch Processing
```bash
# Process a folder of audio files
trax batch /path/to/audio/files

# Process with custom settings
trax batch /path/to/files --workers 4 --memory-limit 1024

# Process with enhancement
trax batch /path/to/files --enhance --progress-interval 2
```
Programmatic Usage
```python
import asyncio
from pathlib import Path

from src.services.batch_processor import create_batch_processor, TaskType

async def process_files(audio_files: list[Path]) -> None:
    # Create the batch processor
    processor = create_batch_processor(max_workers=4)

    # Queue one transcription task per file
    for file_path in audio_files:
        await processor.add_task(
            TaskType.TRANSCRIBE,
            {"file_path": str(file_path)},
            priority=0,
        )

    # Progress callback, invoked at each reporting interval
    def progress_callback(progress):
        print(f"Progress: {progress.completed_tasks}/{progress.total_tasks}")
        print(f"Memory: {progress.memory_usage_mb:.1f}MB")

    # Start processing and wait for completion
    result = await processor.start(progress_callback=progress_callback)

    # Display results
    print(f"Success: {result.success_count}/{result.total_count}")
    print(f"Processing time: {result.processing_time:.1f}s")

# Run the batch over every MP3 in a folder
audio_files = sorted(Path("/path/to/audio/files").glob("*.mp3"))
asyncio.run(process_files(audio_files))
```
CLI Options
```text
trax batch <folder> [OPTIONS]

Options:
  --workers INTEGER           Number of worker processes (default: 8)
  --progress-interval FLOAT   Progress update interval in seconds (default: 5.0)
  --memory-limit INTEGER      Memory limit in MB (default: 2048)
  --cpu-limit INTEGER         CPU usage limit percentage (default: 90)
  --model TEXT                Whisper model to use (default: whisper-1)
  --language TEXT             Language code (auto-detect if not specified)
  --chunk-size INTEGER        Chunk size in seconds for long files (default: 600)
  --enhance                   Also enhance transcripts after transcription
```
Error Handling
Automatic Retry
- Failed tasks are automatically retried up to 3 times by default
- Retry attempts use exponential backoff with priority degradation
- Permanent failures are tracked separately for reporting
Error Recovery Process
1. A task fails during processing.
2. The error is captured and logged.
3. The retry count is incremented.
4. If retries remain, the task is re-queued with lower priority.
5. If the maximum retry count is exceeded, the task is marked as permanently failed.
6. Failed tasks are tracked separately for reporting.
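The re-queue policy above can be sketched as exponential backoff with jitter plus priority degradation. Function and parameter names here are illustrative, not the actual API:

```python
import random

def next_retry(retry_count: int, priority: int,
               base_delay: float = 1.0, max_retries: int = 3):
    """Return (delay_seconds, new_priority) for the next attempt,
    or None once the retry budget is exhausted."""
    if retry_count >= max_retries:
        return None  # mark as permanently failed
    # Double the delay per attempt; jitter avoids thundering retries.
    delay = base_delay * (2 ** retry_count) + random.uniform(0, 0.5)
    # Degrade priority so retries yield to fresh work.
    return delay, priority + 1
```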
Error Types Handled
- Network Errors: API timeouts, connection failures
- File Errors: Missing files, permission issues
- Processing Errors: Audio format issues, API rate limits
- Resource Errors: Memory exhaustion, CPU overload
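A plausible mapping of these error families onto retry decisions (the concrete exception types are assumptions for illustration):

```python
def is_retryable(exc: Exception) -> bool:
    """Network/API hiccups are retried; missing files and permission
    problems are treated as permanent failures."""
    permanent = (FileNotFoundError, PermissionError)
    retryable = (TimeoutError, ConnectionError)
    if isinstance(exc, permanent):
        return False
    return isinstance(exc, retryable)
```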
Performance Optimization
M3 MacBook Optimization
- Default 8 workers optimized for M3 architecture
- Memory and CPU monitoring with configurable limits
- Async processing throughout for non-blocking operations
- Intelligent caching for expensive operations
Resource Management
- Real-time memory and CPU usage monitoring
- Configurable resource limits to prevent system overload
- Automatic worker scaling based on resource availability
- Graceful degradation under high load
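The admission check behind these limits might look like this stdlib-only sketch (the real monitor may use a dedicated library such as psutil):

```python
import resource
import sys

def current_memory_mb() -> float:
    """Peak resident set size of this process in MB. Note that
    ru_maxrss is kilobytes on Linux but bytes on macOS."""
    rss = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
    return rss / (1024 * 1024) if sys.platform == "darwin" else rss / 1024

def within_limits(memory_limit_mb: float = 2048) -> bool:
    """Gate checked before a worker pulls its next task: stop
    admitting new work once the memory ceiling is reached."""
    return current_memory_mb() < memory_limit_mb
```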
Performance Benchmarks
- Transcription: 95%+ accuracy, <30s for 5-minute audio
- Enhancement: 99%+ accuracy, <35s processing time
- Batch Processing: Parallel processing with configurable workers
- Resource Usage: <2GB memory, optimized for M3 architecture
Progress Tracking
Real-time Monitoring
- Progress updates every 5 seconds (configurable)
- Resource usage tracking (memory, CPU)
- Active worker count monitoring
- Estimated completion time calculation
Progress Metrics
- Total tasks, completed tasks, failed tasks
- Success rate and failure rate percentages
- Processing time and resource usage peaks
- Quality metrics by task type
CLI Progress Display
```text
Progress: 45/100 (45.0% success) | Active: 8 | Failed: 2 | Memory: 512.3MB | CPU: 75.2%
```
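That status line can be rendered from the progress counters; a sketch (field names mirror BatchProgress but are assumptions):

```python
def format_progress(completed: int, total: int, failed: int,
                    active: int, mem_mb: float, cpu: float) -> str:
    """Render one CLI status line in the format shown above."""
    pct = completed / total * 100 if total else 0.0
    return (f"Progress: {completed}/{total} ({pct:.1f}% success) | "
            f"Active: {active} | Failed: {failed} | "
            f"Memory: {mem_mb:.1f}MB | CPU: {cpu:.1f}%")

line = format_progress(45, 100, 2, 8, 512.3, 75.2)
```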
Quality Metrics
Transcription Quality
- Average accuracy across all transcription tasks
- Quality warnings for low-confidence segments
- Processing time and efficiency metrics
- Error rate and recovery statistics
Enhancement Quality
- Average accuracy improvement across enhancement tasks
- Content preservation validation
- Processing time and efficiency metrics
- Quality validation results
Overall Metrics
- Success rate and failure rate percentages
- Processing time and resource usage peaks
- Quality warnings aggregation and deduplication
- Detailed failure information
Configuration
Worker Pool Settings
```python
# Default settings for M3 MacBook
DEFAULT_WORKERS = 8
DEFAULT_QUEUE_SIZE = 1000
DEFAULT_PROGRESS_INTERVAL = 5.0
DEFAULT_MEMORY_LIMIT_MB = 2048
DEFAULT_CPU_LIMIT_PERCENT = 90
```
Task Priority Levels
- 0: Highest priority (immediate processing)
- 1-5: High priority (processed quickly)
- 6-10: Normal priority (standard processing)
- 11+: Low priority (processed when resources available)
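These levels imply a min-heap ordering: lower numbers pop first, with a sequence counter breaking ties so tasks at the same level stay FIFO. A quick sketch:

```python
import heapq

queue, seq = [], 0
for priority, name in [(6, "normal"), (0, "urgent"),
                       (12, "background"), (3, "high")]:
    heapq.heappush(queue, (priority, seq, name))  # seq breaks ties FIFO
    seq += 1

# Pop everything: lowest priority number comes out first.
order = [heapq.heappop(queue)[2] for _ in range(len(queue))]
# order == ["urgent", "high", "normal", "background"]
```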
Retry Configuration
- Default Max Retries: 3
- Retry Backoff: Exponential with priority degradation
- Retry Conditions: Network errors, temporary failures
- Permanent Failures: File not found, permission denied
Testing
Unit Tests
Comprehensive test coverage for all batch processing components:
```bash
# Run batch processor tests
uv run pytest tests/test_batch_processor.py

# Run with coverage
uv run pytest tests/test_batch_processor.py --cov=src.services.batch_processor
```
Test Coverage
- Worker pool initialization and configuration
- Task processing and error handling
- Progress tracking and resource monitoring
- Pause/resume functionality
- Quality metrics calculation
- Integration tests for multiple task types
Troubleshooting
Common Issues
High Memory Usage
```bash
# Reduce worker count and memory limit
trax batch /path/to/files --workers 4 --memory-limit 1024
```
Slow Processing
```bash
# Increase worker count (if resources allow)
trax batch /path/to/files --workers 12 --cpu-limit 95
```
Frequent Failures
- Check file permissions and accessibility
- Verify API keys and rate limits
- Monitor network connectivity
- Review error logs for specific issues
Debug Mode
```python
import logging

from src.services.batch_processor import create_batch_processor

# Enable debug logging before creating the processor
logging.basicConfig(level=logging.DEBUG)

# Processing now emits detailed log output
processor = create_batch_processor()
# ... add tasks and start as usual
```
Future Enhancements
Planned Features
- Distributed Processing: Multi-machine batch processing
- Advanced Scheduling: Time-based and dependency-based scheduling
- Resource Prediction: ML-based resource usage prediction
- Dynamic Scaling: Automatic worker scaling based on load
- Advanced Analytics: Detailed performance analytics and reporting
Performance Improvements
- GPU Acceleration: GPU-accelerated processing for supported tasks
- Streaming Processing: Real-time streaming for live content
- Advanced Caching: Intelligent caching with predictive loading
- Load Balancing: Advanced load balancing across workers
API Reference
BatchProcessor Methods
`add_task(task_type, data, priority=0)`
Add a task to the processing queue.
`start(progress_callback=None)`
Start batch processing with an optional progress callback.
`pause()`
Pause batch processing (workers finish their current tasks).
`resume()`
Resume batch processing.
`stop()`
Stop batch processing immediately.
`get_progress()`
Get current progress information.
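The pause/resume semantics above (workers finish their current task, then wait) can be modeled with an `asyncio.Event`. A self-contained toy sketch, not the real implementation:

```python
import asyncio

class PausableWorker:
    """Toy worker: processes tasks while a 'running' event is set."""

    def __init__(self):
        self._running = asyncio.Event()
        self._running.set()
        self.processed = []

    def pause(self):
        self._running.clear()   # workers block before the *next* task

    def resume(self):
        self._running.set()

    async def run(self, tasks):
        for task in tasks:
            await self._running.wait()   # parks here while paused
            self.processed.append(task)  # stand-in for real work

async def demo():
    worker = PausableWorker()
    worker.pause()                                # start paused
    runner = asyncio.create_task(worker.run(["a", "b", "c"]))
    await asyncio.sleep(0)                        # let the worker park
    assert worker.processed == []                 # nothing ran yet
    worker.resume()
    await runner
    return worker.processed

processed = asyncio.run(demo())  # ['a', 'b', 'c']
```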
Task Types
`TaskType.TRANSCRIBE`
Transcribe audio/video files using the Whisper API.
`TaskType.ENHANCE`
Enhance transcripts using the DeepSeek API.
`TaskType.YOUTUBE`
Extract metadata from YouTube URLs.
`TaskType.DOWNLOAD`
Download media files from URLs.
`TaskType.PREPROCESS`
Preprocess audio files for transcription.
Last Updated: 2024-12-30
Version: 0.2.0
Status: Production Ready