"""
Integration tests for API endpoints.
"""

import io
import json
import time
from pathlib import Path
from unittest.mock import Mock, patch, MagicMock

import pytest
from werkzeug.datastructures import FileStorage

from src.api import create_app


class TestHealthEndpoints:
    """Test health check endpoints.

    Every test takes the ``client`` fixture, which is used as an HTTP test
    client exposing ``get`` and returning responses with ``status_code`` and
    ``get_json()``.
    """

    def test_health_check(self, client):
        """GET /api/health answers 200 with status, timestamp and version."""
        response = client.get('/api/health')

        assert response.status_code == 200
        data = response.get_json()
        assert data['status'] == 'healthy'
        assert 'timestamp' in data
        assert 'version' in data

    def test_readiness_check(self, client):
        """GET /api/ready answers 200 and reports database/storage checks."""
        response = client.get('/api/ready')

        assert response.status_code == 200
        data = response.get_json()
        assert data['ready'] is True
        assert 'checks' in data
        assert 'database' in data['checks']
        assert 'storage' in data['checks']


class TestUploadEndpoints:
    """Test file upload endpoints.

    All uploads are sent to ``/api/upload`` as ``multipart/form-data`` with
    the payload under the ``file`` form field.
    """

    @staticmethod
    def _post_upload(client, content, filename, content_type):
        """Wrap ``content`` in a FileStorage and POST it to /api/upload.

        Not collected by pytest (leading underscore); shared by the tests
        below so the multipart request is built in exactly one place.
        """
        upload = FileStorage(
            stream=io.BytesIO(content),
            filename=filename,
            content_type=content_type,
        )
        return client.post(
            '/api/upload',
            data={'file': upload},
            content_type='multipart/form-data',
        )

    def test_upload_audio_file(self, client, temp_dir):
        """Uploading an MP3-looking payload yields a job in 'uploaded' state."""
        # 'ID3' tag followed by zero padding: a minimal stand-in for an MP3.
        audio_content = b'ID3' + b'\x00' * 1000

        response = self._post_upload(
            client, audio_content, 'test.mp3', 'audio/mpeg'
        )

        assert response.status_code == 200
        data = response.get_json()
        assert 'job_id' in data
        assert 'filename' in data
        assert data['filename'] == 'test.mp3'
        assert 'status' in data
        assert data['status'] == 'uploaded'

    def test_upload_invalid_file(self, client):
        """Uploading a plain-text file is rejected with 400."""
        response = self._post_upload(
            client, b'This is not audio', 'test.txt', 'text/plain'
        )

        assert response.status_code == 400
        data = response.get_json()
        assert 'error' in data
        assert 'Invalid file type' in data['error']

    def test_upload_oversized_file(self, client):
        """Uploading a 501 MB payload is rejected with 413."""
        # NOTE(review): this builds ~501 MB in memory, which makes the test
        # slow and memory-hungry. If the limit is configurable (e.g. via the
        # app's config), lowering it for the test and sending a small payload
        # would be cheaper -- confirm how the limit is enforced first.
        large_content = b'ID3' + b'\x00' * (501 * 1024 * 1024)  # 501MB

        response = self._post_upload(
            client, large_content, 'large.mp3', 'audio/mpeg'
        )

        assert response.status_code == 413

    def test_upload_without_file(self, client):
        """POSTing to the upload endpoint without a file is rejected with 400."""
        response = client.post('/api/upload')

        assert response.status_code == 400
        data = response.get_json()
        assert 'error' in data
        assert 'No file provided' in data['error']


class TestProcessingEndpoints:
    """Test audio processing endpoints.

    Collaborators in ``src.api.routes.processing`` are replaced with mocks
    via ``unittest.mock.patch`` where a test needs to control their output.
    """

    @patch('src.api.routes.processing.AudioProcessor')
    def test_process_job(self, mock_processor, client):
        """Starting processing for a job answers 200/202 with job_id and status."""
        # Any AudioProcessor() built by the route returns this stub instance.
        processor = Mock()
        processor.process_file.return_value = {
            'words_detected': 5,
            'words_censored': 5,
            'audio_duration': 30.0,
            'output_file': 'output.mp3',
            'detected_words': [],
        }
        mock_processor.return_value = processor

        response = client.post(
            '/api/jobs/test-job-123/process',
            json={
                'word_list_id': 'default',
                'censor_method': 'beep',
                'min_severity': 'medium',
                'whisper_model': 'base',
            },
        )

        assert response.status_code in [200, 202]
        data = response.get_json()
        assert 'job_id' in data
        assert 'status' in data

    def test_process_invalid_job(self, client):
        """Processing an unknown job ID answers 404 with a 'Job not found' error."""
        response = client.post(
            '/api/jobs/invalid-job/process',
            json={'word_list_id': 'default'},
        )

        assert response.status_code == 404
        data = response.get_json()
        assert 'error' in data
        assert 'Job not found' in data['error']

    @patch('src.api.routes.processing.JobManager')
    def test_get_job_status(self, mock_manager, client):
        """The status endpoint returns the job's id, status and progress."""
        job = Mock()
        job.to_dict.return_value = {
            'job_id': 'test-job-123',
            'status': 'processing',
            'progress': 45.0,
            'current_stage': 'transcription',
        }
        # NOTE(review): get_job is stubbed on the patched class object itself.
        # That only takes effect if the route calls JobManager.get_job(...)
        # on the class; if it instantiates JobManager first, this would need
        # mock_manager.return_value.get_job -- confirm against the route.
        mock_manager.get_job.return_value = job

        response = client.get('/api/jobs/test-job-123/status')

        assert response.status_code == 200
        data = response.get_json()
        assert data['job_id'] == 'test-job-123'
        assert data['status'] == 'processing'
        assert data['progress'] == 45.0

    def test_cancel_job(self, client):
        """Cancelling a job answers 200 or 204."""
        response = client.post('/api/jobs/test-job-123/cancel')

        assert response.status_code in [200, 204]

    @patch('src.api.routes.processing.send_file')
    def test_download_processed_file(self, mock_send_file, client):
        """The download endpoint delegates to send_file."""
        mock_send_file.return_value = Mock(status_code=200)

        # The response itself is not inspected: its content depends on the
        # real send_file, which is replaced here. Only the delegation is
        # asserted.
        client.get('/api/jobs/test-job-123/download')

        assert mock_send_file.called


class TestWordListEndpoints:
    """Test word list management endpoints."""

    def test_get_word_lists(self, client):
        """Listing word lists returns a non-empty list of summaries."""
        response = client.get('/api/word-lists')

        assert response.status_code == 200
        data = response.get_json()
        assert isinstance(data, list)
        # Should have at least the default list.
        assert len(data) >= 1

        # The assertion above guarantees data[0] exists.
        first_list = data[0]
        assert 'id' in first_list
        assert 'name' in first_list
        assert 'word_count' in first_list

    def test_get_specific_word_list(self, client):
        """Fetching the 'default' list returns its id, name and words."""
        response = client.get('/api/word-lists/default')

        assert response.status_code == 200
        data = response.get_json()
        assert 'id' in data
        assert 'name' in data
        assert 'words' in data
        assert isinstance(data['words'], list)

    def test_create_word_list(self, client):
        """Creating a list with two words answers 200/201 and word_count 2."""
        response = client.post(
            '/api/word-lists',
            json={
                'name': 'Test List',
                'description': 'A test word list',
                'words': [
                    {'word': 'testword1', 'severity': 'high', 'category': 'test'},
                    {'word': 'testword2', 'severity': 'medium', 'category': 'test'},
                ],
            },
        )

        assert response.status_code in [200, 201]
        data = response.get_json()
        assert 'id' in data
        assert data['name'] == 'Test List'
        assert data['word_count'] == 2

    def test_update_word_list(self, client):
        """Updating 'test-list' answers 200, or 404 when it does not exist."""
        response = client.put(
            '/api/word-lists/test-list',
            json={
                'name': 'Updated Test List',
                'words': [
                    {'word': 'newword', 'severity': 'low', 'category': 'test'},
                ],
            },
        )

        assert response.status_code in [200, 404]

    def test_delete_word_list(self, client):
        """Deleting 'test-list' answers 204, or 404 when it does not exist."""
        response = client.delete('/api/word-lists/test-list')

        assert response.status_code in [204, 404]

    def test_add_word_to_list(self, client):
        """Adding a word to the 'default' list answers 200 or 201."""
        response = client.post(
            '/api/word-lists/default/words',
            json={
                'word': 'newbadword',
                'severity': 'high',
                'category': 'profanity',
            },
        )

        assert response.status_code in [200, 201]

    def test_remove_word_from_list(self, client):
        """Removing a word from the 'default' list answers 204 or 404."""
        response = client.delete('/api/word-lists/default/words/testword')

        assert response.status_code in [204, 404]


class TestHistoryEndpoints:
    """Test processing history endpoints."""

    def test_get_processing_history(self, client):
        """The history endpoint returns a list of job records."""
        response = client.get('/api/history')

        assert response.status_code == 200
        data = response.get_json()
        assert isinstance(data, list)

        # The history may be empty; only check the record shape when
        # there is at least one item.
        if data:
            item = data[0]
            assert 'job_id' in item
            assert 'filename' in item
            assert 'timestamp' in item
            assert 'status' in item

    def test_get_history_with_filters(self, client):
        """A limit=10 query returns a list of at most ten items."""
        response = client.get('/api/history?status=completed&limit=10')

        assert response.status_code == 200
        data = response.get_json()
        assert isinstance(data, list)
        assert len(data) <= 10

    def test_get_history_item(self, client):
        """Fetching one history item answers 200, or 404 if the job is absent."""
        response = client.get('/api/history/test-job-123')

        # Could be 200 or 404 depending on whether the job exists.
        assert response.status_code in [200, 404]

    def test_delete_history_item(self, client):
        """Deleting one history item answers 204 or 404."""
        response = client.delete('/api/history/test-job-123')

        assert response.status_code in [204, 404]


class TestSettingsEndpoints:
    """Test user settings endpoints."""

    def test_get_user_settings(self, client):
        """Settings contain the 'processing', 'privacy' and 'ui' sections."""
        response = client.get('/api/settings')

        assert response.status_code == 200
        data = response.get_json()
        assert 'processing' in data
        assert 'privacy' in data
        assert 'ui' in data

    def test_update_user_settings(self, client):
        """A PUT with new values is echoed back in the response body."""
        response = client.put(
            '/api/settings',
            json={
                'processing': {
                    'whisper_model_size': 'large',
                    'default_censor_method': 'silence',
                },
                'ui': {
                    'theme': 'dark',
                    'notifications_enabled': True,
                },
            },
        )

        assert response.status_code == 200
        data = response.get_json()
        assert data['processing']['whisper_model_size'] == 'large'
        assert data['ui']['theme'] == 'dark'

    def test_reset_settings(self, client):
        """Resetting settings answers 200 with a message mentioning 'reset'."""
        response = client.post('/api/settings/reset')

        assert response.status_code == 200
        data = response.get_json()
        assert 'message' in data
        assert 'reset' in data['message'].lower()


class TestBatchEndpoints:
    """Test batch processing endpoints."""

    def test_create_batch_job(self, client):
        """Creating a batch of three jobs answers 200/201 with total_jobs 3."""
        response = client.post(
            '/api/batch',
            json={
                'job_ids': ['job1', 'job2', 'job3'],
                'processing_options': {
                    'word_list_id': 'default',
                    'censor_method': 'beep',
                },
            },
        )

        assert response.status_code in [200, 201]
        data = response.get_json()
        assert 'batch_id' in data
        assert 'total_jobs' in data
        assert data['total_jobs'] == 3

    def test_get_batch_status(self, client):
        """Batch status answers 200 with progress fields, or 404."""
        response = client.get('/api/batch/batch-123/status')

        assert response.status_code in [200, 404]

        # The body is only inspected when the batch was found.
        if response.status_code == 200:
            data = response.get_json()
            assert 'batch_id' in data
            assert 'progress' in data
            assert 'completed_jobs' in data
            assert 'total_jobs' in data


class TestWebSocketIntegration:
    """Test WebSocket integration with API.

    Every test takes the ``socketio_client`` fixture and uses its ``emit``
    and ``get_received`` methods.
    """

    def test_websocket_connection(self, socketio_client):
        """A 'connected' event is among the messages received so far."""
        # No explicit connect call is made here; the fixture is presumably
        # already connected when handed to the test -- TODO confirm.
        received = socketio_client.get_received()

        # Should receive connection confirmation.
        assert any(msg['name'] == 'connected' for msg in received)

    def test_join_job_room(self, socketio_client):
        """Emitting 'join_job' is acknowledged with 'joined_job' for that job."""
        socketio_client.emit('join_job', {'job_id': 'test-job-123'})

        received = socketio_client.get_received()
        # Should receive room join confirmation carrying the same job id.
        assert any(
            msg['name'] == 'joined_job'
            and msg['args'][0]['job_id'] == 'test-job-123'
            for msg in received
        )

    def test_receive_progress_updates(self, socketio_client):
        """A 'processing_progress' event is received after joining a job room."""
        socketio_client.emit('join_job', {'job_id': 'test-job-123'})

        # NOTE(review): the progress event is emitted by the client itself,
        # so this passes only if the server relays 'processing_progress'
        # back to the room -- confirm that is the intended server behavior.
        socketio_client.emit('processing_progress', {
            'job_id': 'test-job-123',
            'progress': 50.0,
            'stage': 'transcription',
        })

        received = socketio_client.get_received()
        progress_msgs = [
            msg for msg in received
            if msg['name'] == 'processing_progress'
        ]
        assert len(progress_msgs) > 0


class TestErrorHandling:
    """Test API error handling."""

    def test_404_error(self, client):
        """An unknown route answers 404 with a JSON 'error' field."""
        response = client.get('/api/nonexistent')

        assert response.status_code == 404
        data = response.get_json()
        assert 'error' in data

    def test_method_not_allowed(self, client):
        """POSTing to the GET-only health route answers 405 with an 'error'."""
        response = client.post('/api/health')  # Health is GET only

        assert response.status_code == 405
        data = response.get_json()
        assert 'error' in data

    def test_invalid_json(self, client):
        """A malformed JSON body answers 400 with an 'error' field."""
        response = client.post(
            '/api/word-lists',
            data='invalid json {',
            content_type='application/json',
        )

        assert response.status_code == 400
        data = response.get_json()
        assert 'error' in data

    def test_rate_limiting(self, client):
        """100 rapid health checks all answer 200.

        As written, this asserts that no request is rate limited: any 429
        response makes the test fail. It must be revisited if rate limiting
        is enabled for this endpoint.
        """
        status_codes = [
            client.get('/api/health').status_code for _ in range(100)
        ]

        assert all(code == 200 for code in status_codes)


class TestCORSHeaders:
    """Test CORS header configuration."""

    def test_cors_headers_present(self, client):
        """A plain GET carries an Access-Control-Allow-Origin header."""
        response = client.get('/api/health')

        assert 'Access-Control-Allow-Origin' in response.headers

    def test_cors_preflight(self, client):
        """An OPTIONS preflight for /api/upload answers 200 with allowed methods."""
        response = client.options('/api/upload', headers={
            'Origin': 'http://localhost:3000',
            'Access-Control-Request-Method': 'POST',
            'Access-Control-Request-Headers': 'Content-Type',
        })

        assert response.status_code == 200
        assert 'Access-Control-Allow-Methods' in response.headers


if __name__ == '__main__':
    # Run this file's tests verbosely when executed directly, and hand
    # pytest's exit code to the shell so failures are visible to callers.
    raise SystemExit(pytest.main([__file__, '-v']))