"""
Integration tests for complete file upload and processing workflow.

The tests drive the HTTP API through the ``client`` fixture and the
WebSocket layer through the ``socketio_client`` fixture; both fixtures are
expected to be provided outside this module (e.g. by a conftest).
"""

import io
import json
import time
from pathlib import Path
from unittest.mock import MagicMock, Mock, patch

import pytest

from src.api import create_app


def _upload_file(client, content, filename):
    """POST ``content`` to ``/api/upload`` as a multipart file named ``filename``.

    Args:
        client: HTTP test client exposing ``post``.
        content: Raw bytes of the file to upload.
        filename: Name to attach to the uploaded file.

    Returns:
        The response object returned by ``client.post``.
    """
    return client.post(
        '/api/upload',
        data={
            'file': (io.BytesIO(content), filename)
        },
        content_type='multipart/form-data'
    )


class TestCompleteFileWorkflow:
    """Test complete file workflow from upload to download."""

    def test_single_file_complete_workflow(self, client, socketio_client):
        """Test complete workflow for single file."""
        # Step 1: Upload file
        audio_content = b'ID3' + b'\x00' * 1000  # Minimal MP3
        upload_response = _upload_file(client, audio_content, 'test_complete.mp3')

        assert upload_response.status_code == 200
        upload_data = upload_response.get_json()
        job_id = upload_data['job_id']

        # Step 2: Join WebSocket room for real-time updates
        socketio_client.emit('join_job', {'job_id': job_id})
        socketio_client.get_received()  # Clear initial messages

        # Step 3: Start processing
        with patch('src.api.routes.processing.AudioProcessor') as mock_processor:
            # Mock successful processing
            mock_instance = Mock()
            mock_instance.process_file.return_value = {
                'words_detected': 3,
                'words_censored': 3,
                'audio_duration': 30.0,
                'output_file': 'processed_test_complete.mp3',
                'detected_words': [
                    {'word': 'badword1', 'start': 5.0, 'end': 5.5, 'confidence': 0.9},
                    {'word': 'badword2', 'start': 10.0, 'end': 10.5, 'confidence': 0.85},
                    {'word': 'badword3', 'start': 15.0, 'end': 15.5, 'confidence': 0.88}
                ]
            }
            mock_processor.return_value = mock_instance

            process_response = client.post(
                f'/api/jobs/{job_id}/process',
                json={
                    'word_list_id': 'default',
                    'censor_method': 'beep',
                    'min_severity': 'medium',
                    'whisper_model': 'base'
                }
            )

            assert process_response.status_code in [200, 202]

        # Step 4: Monitor progress via WebSocket
        # Simulate progress updates
        progress_updates = [
            {'stage': 'initializing', 'progress': 5},
            {'stage': 'loading', 'progress': 10},
            {'stage': 'transcription', 'progress': 30},
            {'stage': 'detection', 'progress': 60},
            {'stage': 'censoring', 'progress': 80},
            {'stage': 'finalizing', 'progress': 95}
        ]

        for update in progress_updates:
            socketio_client.emit('job_progress', {
                'job_id': job_id,
                'stage': update['stage'],
                'overall_progress': update['progress'],
                'message': f"Processing: {update['stage']}"
            })

        # Step 5: Complete processing
        socketio_client.emit('job_completed', {
            'job_id': job_id,
            'output_file': 'processed_test_complete.mp3',
            'summary': {
                'words_detected': 3,
                'words_censored': 3,
                'duration': 30.0,
                'original_size': len(audio_content),
                'processed_size': len(audio_content) - 100
            }
        })

        # Step 6: Check final status (only validated when the endpoint answers 200)
        status_response = client.get(f'/api/jobs/{job_id}/status')
        if status_response.status_code == 200:
            status_data = status_response.get_json()
            assert 'job_id' in status_data

        # Step 7: Download processed file
        with patch('src.api.routes.processing.send_file') as mock_send_file:
            mock_send_file.return_value = Mock(status_code=200)

            # The response is intentionally not inspected: it depends on the
            # send_file implementation. The request only has to not raise.
            client.get(f'/api/jobs/{job_id}/download')

        # Verify WebSocket messages were received
        received = socketio_client.get_received()
        progress_messages = [
            msg for msg in received
            if msg['name'] == 'job_progress'
        ]
        completion_messages = [
            msg for msg in received
            if msg['name'] == 'job_completed'
        ]

        assert len(progress_messages) >= len(progress_updates)
        assert len(completion_messages) == 1

    def test_multi_file_batch_workflow(self, client, socketio_client):
        """Test workflow for multiple files in batch."""
        job_ids = []

        # Step 1: Upload multiple files (uploads that are not accepted are ignored)
        for i in range(3):
            audio_content = b'ID3' + b'\x00' * (500 + i * 100)
            upload_response = _upload_file(
                client, audio_content, f'batch_file_{i}.mp3'
            )

            if upload_response.status_code == 200:
                job_ids.append(upload_response.get_json()['job_id'])

        assert len(job_ids) >= 1

        # Step 2: Create batch job
        batch_response = client.post(
            '/api/batch',
            json={
                'job_ids': job_ids,
                'processing_options': {
                    'word_list_id': 'default',
                    'censor_method': 'silence',
                    'min_severity': 'low'
                }
            }
        )

        # NOTE: when the batch endpoint does not answer 200/201 the remaining
        # steps are skipped and the test passes without further checks.
        if batch_response.status_code not in [200, 201]:
            return

        batch_data = batch_response.get_json()
        batch_id = batch_data['batch_id']

        # Step 3: Join batch room for updates
        socketio_client.emit('join_batch', {'batch_id': batch_id})
        socketio_client.get_received()

        # Step 4: Monitor batch progress
        for i, job_id in enumerate(job_ids):
            # File start
            socketio_client.emit('batch_file_start', {
                'batch_id': batch_id,
                'job_id': job_id,
                'file_index': i,
                'total_files': len(job_ids),
                'filename': f'batch_file_{i}.mp3'
            })

            # File progress and completion
            socketio_client.emit('batch_file_complete', {
                'batch_id': batch_id,
                'job_id': job_id,
                'file_index': i,
                'results': {
                    'words_detected': i + 1,
                    'words_censored': i + 1
                }
            })

        # Step 5: Complete batch
        socketio_client.emit('batch_complete', {
            'batch_id': batch_id,
            'total_files': len(job_ids),
            'successful': len(job_ids),
            'failed': 0
        })

        # Verify batch completion
        received = socketio_client.get_received()
        batch_complete_msgs = [
            msg for msg in received
            if msg['name'] == 'batch_complete'
        ]
        assert len(batch_complete_msgs) >= 1

    def test_error_workflow_recovery(self, client, socketio_client):
        """Test workflow with error handling and recovery."""
        # Step 1: Upload file
        audio_content = b'ID3' + b'\x00' * 500
        upload_response = _upload_file(client, audio_content, 'error_test.mp3')

        assert upload_response.status_code == 200
        job_id = upload_response.get_json()['job_id']

        # Step 2: Join room
        socketio_client.emit('join_job', {'job_id': job_id})
        socketio_client.get_received()

        # Step 3: Simulate processing error
        with patch('src.api.routes.processing.AudioProcessor') as mock_processor:
            mock_instance = Mock()
            mock_instance.process_file.side_effect = Exception("Processing failed")
            mock_processor.return_value = mock_instance

            process_response = client.post(
                f'/api/jobs/{job_id}/process',
                json={'word_list_id': 'default'}
            )

            # Should handle error gracefully
            assert process_response.status_code in [400, 500]

        # Step 4: Send error via WebSocket
        socketio_client.emit('job_error', {
            'job_id': job_id,
            'error_type': 'processing_failed',
            'error_message': 'Failed to process audio file',
            'recoverable': True,
            'retry_suggestion': 'Try with a different model size'
        })

        # Step 5: Retry processing
        with patch('src.api.routes.processing.AudioProcessor') as mock_processor:
            mock_instance = Mock()
            mock_instance.process_file.return_value = {
                'words_detected': 1,
                'words_censored': 1,
                'audio_duration': 15.0,
                'output_file': 'retry_success.mp3'
            }
            mock_processor.return_value = mock_instance

            retry_response = client.post(
                f'/api/jobs/{job_id}/process',
                json={
                    'word_list_id': 'default',
                    'whisper_model': 'tiny'  # Smaller model for retry
                }
            )

            # Should succeed on retry
            assert retry_response.status_code in [200, 202]

        # Verify error messages were received
        received = socketio_client.get_received()
        error_messages = [
            msg for msg in received
            if msg['name'] == 'job_error'
        ]
        assert len(error_messages) >= 1


class TestWorkflowValidation:
    """Test validation throughout the workflow."""

    def test_file_type_validation_workflow(self, client):
        """Test file type validation in complete workflow."""
        # Payloads above this size are not uploaded by this test.
        max_payload_bytes = 10 * 1024 * 1024

        # Each case: (filename, leading bytes, zero-padding length,
        #             MIME type, expected HTTP status).
        # The payload is described rather than built up front so that the
        # oversized case never allocates its ~501 MiB body just to be skipped.
        test_files = [
            ('valid.mp3', b'ID3', 100, 'audio/mpeg', 200),
            ('valid.wav', b'RIFF', 100, 'audio/wav', 200),
            ('invalid.txt', b'Just text', 0, 'text/plain', 400),
            ('invalid.jpg', b'\xFF\xD8\xFF\xE0', 0, 'image/jpeg', 400),
            ('toolarge.mp3', b'ID3', 501 * 1024 * 1024, 'audio/mpeg', 413)
        ]

        # The MIME type column is informational only; it is not sent.
        for filename, header, padding, _mime_type, expected_status in test_files:
            if len(header) + padding > max_payload_bytes:
                # Skip very large files in tests
                continue

            content = header + b'\x00' * padding
            response = _upload_file(client, content, filename)

            assert response.status_code == expected_status, filename

    def test_processing_options_validation(self, client):
        """Test validation of processing options."""
        # Upload valid file first
        audio_content = b'ID3' + b'\x00' * 500
        upload_response = _upload_file(client, audio_content, 'test.mp3')

        # NOTE: if the upload is not accepted nothing below runs and the
        # test passes without checking any option.
        if upload_response.status_code != 200:
            return

        job_id = upload_response.get_json()['job_id']

        # Test invalid processing options
        invalid_options = [
            {'word_list_id': 'nonexistent'},
            {'censor_method': 'invalid_method'},
            {'min_severity': 'invalid_severity'},
            {'whisper_model': 'nonexistent_model'},
            {}  # Missing required options
        ]

        for options in invalid_options:
            response = client.post(
                f'/api/jobs/{job_id}/process',
                json=options
            )

            # Should reject invalid options
            assert response.status_code in [400, 404, 422]

    def test_concurrent_job_limit(self, client):
        """Test handling of concurrent job limits."""
        job_ids = []

        # Try to upload many files
        for i in range(20):
            audio_content = b'ID3' + b'\x00' * 100
            response = _upload_file(client, audio_content, f'concurrent_{i}.mp3')

            if response.status_code == 200:
                job_ids.append(response.get_json()['job_id'])

        # Try to process all concurrently
        processing_responses = []
        for job_id in job_ids:
            response = client.post(
                f'/api/jobs/{job_id}/process',
                json={'word_list_id': 'default'}
            )
            processing_responses.append(response.status_code)

        # Some might be rejected due to limits (e.g. 429); only the number of
        # accepted requests is checked.
        successful = sum(1 for status in processing_responses if status in [200, 202])

        # At least some should succeed
        assert successful > 0


class TestWorkflowPerformance:
    """Test performance characteristics of the workflow."""

    def test_upload_performance(self, client):
        """Test upload performance with various file sizes."""
        file_sizes = [1024, 10*1024, 100*1024, 1024*1024]  # 1KB to 1MB

        for size in file_sizes:
            content = b'ID3' + b'\x00' * size

            # perf_counter is monotonic, so the measured interval cannot be
            # distorted by system clock adjustments.
            start_time = time.perf_counter()
            response = _upload_file(client, content, f'perf_test_{size}.mp3')
            upload_time = time.perf_counter() - start_time

            if response.status_code == 200:
                # Upload should complete reasonably quickly
                assert upload_time < 10.0  # 10 seconds max

                # Larger files should take proportionally longer
                # But this depends on network/disk speed

    def test_processing_timeout_handling(self, client, socketio_client):
        """Test handling of processing timeouts."""
        # Upload file
        audio_content = b'ID3' + b'\x00' * 1000
        upload_response = _upload_file(client, audio_content, 'timeout_test.mp3')

        # NOTE: if the upload is not accepted nothing below runs and the
        # test passes without checking the timeout path.
        if upload_response.status_code != 200:
            return

        job_id = upload_response.get_json()['job_id']

        # Join room for updates
        socketio_client.emit('join_job', {'job_id': job_id})

        # Simulate timeout
        socketio_client.emit('job_error', {
            'job_id': job_id,
            'error_type': 'timeout',
            'error_message': 'Processing timed out',
            'recoverable': True
        })

        # Should receive timeout error
        received = socketio_client.get_received()
        timeout_errors = [
            msg for msg in received
            if msg['name'] == 'job_error' and
            'timeout' in msg['args'][0].get('error_type', '')
        ]
        assert len(timeout_errors) >= 1


if __name__ == '__main__':
    pytest.main([__file__, '-v'])