"""
Integration tests for complete audio processing pipeline.

The ``temp_dir`` and ``client`` fixtures used below are not defined in this
module; they are presumably provided by a shared ``conftest.py``.
"""

import io
import json
import os
import shutil
import tempfile
import time
from pathlib import Path
from unittest.mock import MagicMock, Mock, patch

import pytest

from src.api import create_app
from src.core.audio_censor import AudioCensor
from src.core.audio_processor import AudioProcessor
from src.core.word_detector import WordDetector


class TestEndToEndProcessing:
    """Test complete end-to-end audio processing."""

    @patch('src.core.audio_processor.whisper')
    def test_complete_processing_workflow(self, mock_whisper, temp_dir):
        """Test complete workflow from upload to processed file.

        Whisper is replaced by a mock whose transcript contains exactly one
        word from the word list, so exactly one detection is expected.
        """
        # Setup mock Whisper
        mock_model = Mock()
        mock_model.transcribe.return_value = {
            'text': 'This is a test with badword1 and normal words.',
            'segments': [{
                'id': 0,
                'start': 0.0,
                'end': 5.0,
                'text': 'This is a test with badword1 and normal words.',
                'words': [
                    {'word': 'This', 'start': 0.0, 'end': 0.5},
                    {'word': 'is', 'start': 0.5, 'end': 0.8},
                    {'word': 'a', 'start': 0.8, 'end': 1.0},
                    {'word': 'test', 'start': 1.0, 'end': 1.5},
                    {'word': 'with', 'start': 1.5, 'end': 2.0},
                    {'word': 'badword1', 'start': 2.0, 'end': 2.5},
                    {'word': 'and', 'start': 2.5, 'end': 3.0},
                    {'word': 'normal', 'start': 3.0, 'end': 3.5},
                    {'word': 'words', 'start': 3.5, 'end': 4.0},
                ],
            }],
        }
        mock_whisper.load_model.return_value = mock_model

        # Create test audio file (ID3 tag prefix followed by zero padding)
        audio_file = temp_dir / 'test_audio.mp3'
        audio_file.write_bytes(b'ID3' + b'\x00' * 1000)

        # Initialize processor
        processor = AudioProcessor(model_size='base')

        # Process file
        result = processor.process_file(
            input_file=str(audio_file),
            output_file=str(temp_dir / 'output.mp3'),
            word_list=['badword1', 'badword2'],
            censor_method='beep',
        )

        # Verify results
        assert result['words_detected'] == 1
        assert result['words_censored'] == 1
        assert 'audio_duration' in result
        assert 'detected_words' in result
        assert len(result['detected_words']) == 1
        assert result['detected_words'][0]['word'] == 'badword1'

    @patch('src.core.audio_processor.whisper')
    def test_batch_processing_pipeline(self, mock_whisper, temp_dir):
        """Test processing multiple files in batch.

        One processor instance handles three files in sequence; the mocked
        transcript is the same for each, so each result must report one hit.
        """
        # Setup mock
        mock_model = Mock()
        mock_model.transcribe.return_value = {
            'text': 'Sample text with badword1.',
            'segments': [{
                'text': 'Sample text with badword1.',
                'words': [
                    {'word': 'Sample', 'start': 0.0, 'end': 0.5},
                    {'word': 'text', 'start': 0.5, 'end': 1.0},
                    {'word': 'with', 'start': 1.0, 'end': 1.5},
                    {'word': 'badword1', 'start': 1.5, 'end': 2.0},
                ],
            }],
        }
        mock_whisper.load_model.return_value = mock_model

        # Create multiple test files
        files = []
        for i in range(3):
            file_path = temp_dir / f'audio_{i}.mp3'
            file_path.write_bytes(b'ID3' + b'\x00' * 500)
            files.append(file_path)

        # Process batch
        processor = AudioProcessor(model_size='base')
        results = []
        for file_path in files:
            output_path = temp_dir / f'output_{file_path.stem}.mp3'
            result = processor.process_file(
                input_file=str(file_path),
                output_file=str(output_path),
                word_list=['badword1'],
                censor_method='silence',
            )
            results.append(result)

        # Verify all processed
        assert len(results) == 3
        for result in results:
            assert result['words_detected'] == 1
            assert result['words_censored'] == 1

    def test_processing_with_different_censor_methods(self, temp_dir):
        """Test different censorship methods.

        Currently this only checks that ``AudioCensor`` can be constructed
        once per method name; no audio is censored.
        """
        censor_methods = ['beep', 'silence', 'white_noise', 'fade']

        for method in censor_methods:
            # Constructing the censor is the only real interaction so far.
            AudioCensor()

            # TODO: feed real audio and segments through the censor for each
            # method. Until then this assertion is a placeholder that cannot
            # fail, because `method` is drawn from the same set of names.
            assert method in ['beep', 'silence', 'white_noise', 'fade']

    def test_error_recovery_in_pipeline(self, temp_dir):
        """Test error recovery during processing.

        A file whose content is not audio must make ``process_file`` raise.
        """
        # Test with corrupted file
        corrupted_file = temp_dir / 'corrupted.mp3'
        corrupted_file.write_bytes(b'INVALID')

        # NOTE(review): whisper is not patched in this test; presumably the
        # processor loads (or lazily loads) a real model -- confirm.
        processor = AudioProcessor(model_size='base')

        # The exact exception type is not pinned down here, only that the
        # failure surfaces instead of producing a result.
        with pytest.raises(Exception):
            processor.process_file(
                input_file=str(corrupted_file),
                output_file=str(temp_dir / 'output.mp3'),
                word_list=['test'],
                censor_method='beep',
            )


class TestProcessingWithRealAPI:
    """Test processing through the actual API."""

    def test_upload_and_process_via_api(self, client, temp_dir):
        """Test uploading and processing through API endpoints."""
        # Create test file
        audio_content = b'ID3' + b'\x00' * 1000

        # Upload file
        response = client.post(
            '/api/upload',
            data={
                'file': (io.BytesIO(audio_content), 'test.mp3'),
            },
            content_type='multipart/form-data',
        )

        assert response.status_code == 200
        upload_data = response.get_json()
        job_id = upload_data['job_id']

        # Start processing
        response = client.post(
            f'/api/jobs/{job_id}/process',
            json={
                'word_list_id': 'default',
                'censor_method': 'beep',
                'whisper_model': 'base',
            },
        )

        assert response.status_code in [200, 202]

        # Check status
        response = client.get(f'/api/jobs/{job_id}/status')
        assert response.status_code in [200, 404]

    def test_concurrent_processing_via_api(self, client):
        """Test processing multiple files concurrently.

        Uploads that do not return 200 are skipped rather than failed, so
        only the jobs that were actually created are processed and polled.
        """
        job_ids = []

        # Upload multiple files
        for i in range(3):
            audio_content = b'ID3' + b'\x00' * 500
            response = client.post(
                '/api/upload',
                data={
                    'file': (io.BytesIO(audio_content), f'test_{i}.mp3'),
                },
                content_type='multipart/form-data',
            )

            if response.status_code == 200:
                job_ids.append(response.get_json()['job_id'])

        # Start processing all files
        for job_id in job_ids:
            response = client.post(
                f'/api/jobs/{job_id}/process',
                json={'word_list_id': 'default'},
            )
            assert response.status_code in [200, 202, 404]

        # Check all statuses
        for job_id in job_ids:
            response = client.get(f'/api/jobs/{job_id}/status')
            assert response.status_code in [200, 404]


class TestProcessingOptimizations:
    """Test processing optimizations and performance."""

    @patch('src.core.audio_processor.whisper')
    def test_model_caching(self, mock_whisper):
        """Test that models are cached properly.

        Only verifies that the model loader was invoked at least once; it
        does not assert that the second processor reused the first model.
        """
        mock_model = Mock()
        mock_whisper.load_model.return_value = mock_model

        # Create multiple processors (both kept alive for the assertion)
        processor1 = AudioProcessor(model_size='base')
        processor2 = AudioProcessor(model_size='base')

        # Model should be loaded only once (cached)
        # This depends on implementation
        assert mock_whisper.load_model.call_count >= 1

    def test_parallel_batch_processing(self, temp_dir):
        """Test parallel processing of batch jobs.

        Uses a stand-in worker that sleeps instead of calling the real
        pipeline, so this exercises the thread-pool fan-out only.
        """
        from concurrent.futures import ThreadPoolExecutor

        # Create test files
        files = []
        for i in range(5):
            file_path = temp_dir / f'audio_{i}.mp3'
            file_path.write_bytes(b'ID3' + b'\x00' * 100)
            files.append(file_path)

        def process_file(file_path):
            # Simulate processing
            time.sleep(0.1)
            return {'file': str(file_path), 'processed': True}

        # Process in parallel
        with ThreadPoolExecutor(max_workers=3) as executor:
            results = list(executor.map(process_file, files))

        assert len(results) == 5
        for result in results:
            assert result['processed'] is True

    def test_memory_efficient_processing(self):
        """Test memory-efficient processing of large files."""
        # This would test streaming/chunking for large files.
        # Implementation depends on actual audio processing, so report the
        # gap explicitly instead of letting an empty test pass silently.
        pytest.skip('memory-efficient processing test is not implemented yet')


class TestProcessingValidation:
    """Test validation in processing pipeline."""

    def test_input_validation(self, temp_dir):
        """Test input file validation.

        A missing input must raise ``FileNotFoundError``; a non-audio file
        type must raise ``ValueError``.
        """
        # NOTE(review): whisper is not patched in this class; presumably the
        # validation paths run before any model use -- confirm.
        processor = AudioProcessor(model_size='base')

        # Test with non-existent file
        with pytest.raises(FileNotFoundError):
            processor.process_file(
                input_file='nonexistent.mp3',
                output_file=str(temp_dir / 'output.mp3'),
                word_list=['test'],
                censor_method='beep',
            )

        # Test with invalid file type
        text_file = temp_dir / 'test.txt'
        text_file.write_text('Not audio')

        with pytest.raises(ValueError):
            processor.process_file(
                input_file=str(text_file),
                output_file=str(temp_dir / 'output.mp3'),
                word_list=['test'],
                censor_method='beep',
            )

    def test_word_list_validation(self, temp_dir):
        """Test word list validation.

        An empty word list is accepted and yields zero detections.
        """
        audio_file = temp_dir / 'test.mp3'
        audio_file.write_bytes(b'ID3' + b'\x00' * 100)

        processor = AudioProcessor(model_size='base')

        # Test with empty word list
        result = processor.process_file(
            input_file=str(audio_file),
            output_file=str(temp_dir / 'output.mp3'),
            word_list=[],
            censor_method='beep',
        )

        # Should process but find no words
        assert result['words_detected'] == 0

    def test_output_validation(self, temp_dir):
        """Test output file validation.

        Writing to a path whose parent directory does not exist must raise.
        """
        audio_file = temp_dir / 'test.mp3'
        audio_file.write_bytes(b'ID3' + b'\x00' * 100)

        processor = AudioProcessor(model_size='base')

        # Test with invalid output path
        with pytest.raises(Exception):
            processor.process_file(
                input_file=str(audio_file),
                output_file='/invalid/path/output.mp3',
                word_list=['test'],
                censor_method='beep',
            )


class TestProcessingMonitoring:
    """Test monitoring and metrics during processing."""

    def test_processing_metrics_collection(self):
        """Test that processing metrics are collected."""
        from src.api.websocket_enhanced import JobMetrics

        metrics = JobMetrics(
            job_id='test-job',
            start_time=time.time(),
        )

        # Update metrics during processing
        metrics.current_stage = 'transcription'
        metrics.overall_progress = 25.0
        metrics.words_detected = 5

        # Get metrics dict
        metrics_data = metrics.to_dict()

        assert 'elapsed_time' in metrics_data
        assert metrics_data['words_detected'] == 5
        assert metrics_data['current_stage'] == 'transcription'

    def test_performance_tracking(self, temp_dir):
        """Test performance tracking during processing.

        Each stage is simulated with a sleep, and the measured time per
        stage must be at least the requested duration.
        """
        start_time = time.time()

        # Simulate processing stages (stage name -> simulated seconds)
        stages = {
            'initialization': 0.1,
            'loading': 0.2,
            'transcription': 0.5,
            'detection': 0.3,
            'censoring': 0.2,
            'finalization': 0.1,
        }

        stage_times = {}
        for stage, duration in stages.items():
            stage_start = time.time()
            time.sleep(duration)
            stage_times[stage] = time.time() - stage_start

        total_time = time.time() - start_time

        # Verify timing
        assert total_time >= sum(stages.values())
        for stage, expected in stages.items():
            assert stage_times[stage] >= expected


if __name__ == '__main__':
    pytest.main([__file__, '-v'])