#!/usr/bin/env python3
"""
Test script for audio processing functionality.
Creates a simple test audio file and verifies censorship methods.
"""

import sys
import os
# Put the parent of this script's directory at the front of the module search
# path; this must run before the `src.core` import below.
# NOTE(review): presumably that directory is the project root containing
# `src/` - confirm against the repository layout.
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from src.core import AudioUtils, AudioProcessor, ProcessingOptions, CensorshipMethod
from pydub import AudioSegment
from pydub.generators import Sine
import tempfile
import logging

# Configure root logging at INFO level so the logger.info() calls in this
# script are emitted.
logging.basicConfig(level=logging.INFO)
# Module-level logger named after this module.
logger = logging.getLogger(__name__)


def create_test_audio(duration_seconds: float = 5.0, frequency: int = 440) -> str:
    """Create a temporary WAV file containing a sine wave.

    Args:
        duration_seconds: Length of the generated tone, in seconds.
        frequency: Value handed to the pydub ``Sine`` generator.

    Returns:
        Path of the generated ``.wav`` file. The file is created with
        ``delete=False``, so the caller is responsible for removing it.
    """
    # to_audio_segment() takes its duration in milliseconds.
    duration_ms = int(duration_seconds * 1000)
    audio = Sine(frequency).to_audio_segment(duration=duration_ms)

    # Reserve a unique path and close the handle immediately. Keeping it open
    # leaks a descriptor and prevents the path from being re-opened for
    # writing on Windows.
    with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as temp_file:
        temp_path = temp_file.name

    # export() hands back the output file object still open (see the pydub API
    # docs); close it so nothing holds the file when callers unlink it.
    exported = audio.export(temp_path, format="wav")
    if exported is not None:
        exported.close()

    logger.info("Created test audio file: %s", temp_path)
    return temp_path


def test_audio_utils():
    """Exercise the AudioUtils helpers against a generated 3-second tone.

    Runs validation, info extraction, loading and silence detection on the
    file, then applies each censorship method to the loaded audio and checks
    that the result has the same length as the input. The temporary audio
    file is removed afterwards, whether or not a check fails.
    """
    print("\n=== Testing AudioUtils ===")

    utils = AudioUtils()
    sample_path = create_test_audio(duration_seconds=3.0)

    try:
        # 1. Validation report for the generated file.
        print("\n1. Testing file validation...")
        validation = utils.validate_audio_file(sample_path)
        assert validation["valid"], f"Validation failed: {validation['errors']}"
        print(f" ✓ File valid: duration={validation['duration']:.2f}s, "
              f"sample_rate={validation['sample_rate']}Hz")

        # 2. Metadata lookup.
        print("\n2. Testing audio info extraction...")
        info = utils.get_audio_info(sample_path)
        assert info["duration"] > 0, "Duration should be positive"
        print(f" ✓ Audio info: {info['duration']:.2f}s, "
              f"{info['channels']} channels, {info['sample_rate']}Hz")

        # 3. Load the file into memory; reused by the censorship checks below.
        print("\n3. Testing audio loading...")
        audio = utils.load_audio(sample_path)
        assert len(audio) > 0, "Audio should have content"
        print(f" ✓ Loaded audio: {len(audio)}ms duration")

        # 4. Silence detection (only the count is reported, nothing asserted).
        print("\n4. Testing silence detection...")
        silent_segments = utils.detect_silence(sample_path)
        print(f" ✓ Found {len(silent_segments)} silent segments")

        # 5-8. Every censorship method gets the same two segments and must
        # leave the overall length untouched.
        segments_to_censor = [(0.5, 1.0), (1.5, 2.0)]
        censorship_cases = [
            (
                "\n5. Testing silence censorship...",
                "silence",
                {},
                f" ✓ Applied silence to {len(segments_to_censor)} segments",
            ),
            (
                "\n6. Testing beep censorship...",
                "beep",
                {"frequency": 1000},
                f" ✓ Applied beep to {len(segments_to_censor)} segments",
            ),
            (
                "\n7. Testing white noise censorship...",
                "white_noise",
                {},
                f" ✓ Applied white noise to {len(segments_to_censor)} segments",
            ),
            (
                "\n8. Testing fade censorship...",
                "fade",
                {},
                f" ✓ Applied fade to {len(segments_to_censor)} segments",
            ),
        ]
        for banner, method_name, extra_kwargs, success_line in censorship_cases:
            print(banner)
            censored = utils.apply_censorship(
                audio, segments_to_censor, method_name, **extra_kwargs
            )
            assert len(censored) == len(audio), "Audio length should be preserved"
            print(success_line)

        print("\n✅ AudioUtils tests passed!")

    finally:
        # Always remove the generated input file.
        os.unlink(sample_path)


def test_audio_processor():
    """Exercise AudioProcessor: single-file processing, validation, batch.

    Steps:
      1. Process a generated 5-second tone once per censorship method and
         check the reported result and that the output file was written.
      2. Run ``validate_segments`` on a mix of segments and print the outcome
         (nothing is asserted here; the result is only reported).
      3. Run ``process_batch`` on two jobs and check that every result
         reports success.

    All output files live in a temporary directory that is removed even when
    an assertion fails; the generated input file is removed in ``finally``.
    """
    print("\n=== Testing AudioProcessor ===")

    processor = AudioProcessor()

    # Defined once rather than on every loop iteration; it captures nothing
    # from the loop.
    def progress_callback(message, percent):
        print(f" {percent:3d}% - {message}")

    test_file = create_test_audio(duration_seconds=5.0)

    try:
        # A throwaway directory gives us output paths that do NOT exist yet,
        # so the existence assertion below actually proves the processor wrote
        # the file. It also guarantees cleanup on every exit path.
        with tempfile.TemporaryDirectory() as work_dir:
            output_file = os.path.join(work_dir, "censored.wav")

            # Segments to censor (1.0-1.5s and 3.0-3.5s)
            segments = [(1.0, 1.5), (3.0, 3.5)]

            methods = [
                CensorshipMethod.SILENCE,
                CensorshipMethod.BEEP,
                CensorshipMethod.WHITE_NOISE,
                CensorshipMethod.FADE,
            ]

            for method in methods:
                print(f"\n1. Testing {method.value} censorship...")

                options = ProcessingOptions(
                    censorship_method=method,
                    beep_frequency=800,
                    normalize_output=True,
                )

                result = processor.process_audio(
                    test_file,
                    output_file,
                    segments,
                    options,
                    progress_callback,
                )

                assert result.success, f"Processing failed: {result.error}"
                assert result.segments_censored == len(segments), (
                    f"Expected {len(segments)} censored segments, "
                    f"got {result.segments_censored}"
                )
                assert os.path.exists(output_file), (
                    f"Output file was not created: {output_file}"
                )

                print(f" ✓ Processed with {method.value}: "
                      f"{result.segments_censored} segments censored in "
                      f"{result.processing_time:.2f}s")

                # Remove the output so the next method starts from a clean
                # slate and its existence check stays meaningful.
                os.unlink(output_file)

            # Segment validation: only reported, not asserted.
            print("\n2. Testing segment validation...")
            test_segments = [
                (0.5, 1.0),    # Valid
                (2.0, 1.5),    # Invalid: start > end
                (10.0, 11.0),  # Beyond duration
                (1.8, 2.2),    # Valid
            ]

            cleaned, warnings = processor.validate_segments(test_segments, 5.0)
            print(f" ✓ Validated segments: {len(cleaned)} valid, {len(warnings)} warnings")
            for warning in warnings:
                print(f" ⚠ {warning}")

            # Batch processing: outputs go into the temporary directory instead
            # of the current working directory, so a failed assertion can no
            # longer leave stray output*.wav files behind.
            print("\n3. Testing batch processing...")
            batch_files = [
                (test_file, os.path.join(work_dir, "output1.wav"), [(0.5, 1.0)]),
                (
                    test_file,
                    os.path.join(work_dir, "output2.wav"),
                    [(1.0, 1.5), (2.0, 2.5)],
                ),
            ]

            results = processor.process_batch(batch_files, ProcessingOptions())
            assert all(r.success for r in results), "Batch processing should succeed"
            print(f" ✓ Batch processed {len(results)} files")

        print("\n✅ AudioProcessor tests passed!")

    finally:
        # Guarded so a missing input file cannot mask the original failure.
        if os.path.exists(test_file):
            os.unlink(test_file)


def test_dependency_check():
    """Print the availability of each dependency reported by AudioProcessor.

    Lists every entry returned by ``check_dependencies()`` with a check or
    cross mark, and prints install hints when the ``"ffmpeg"`` entry is falsy.
    """
    print("\n=== Testing Dependencies ===")

    deps = AudioProcessor().check_dependencies()

    for dep, available in deps.items():
        if available:
            status = "✓"
        else:
            status = "✗"
        print(f" {status} {dep}: {'Available' if available else 'Not found'}")

    # Nothing more to report when ffmpeg is present.
    if deps["ffmpeg"]:
        return

    install_hints = (
        "\n ⚠ Warning: ffmpeg not found. Install with:",
        " macOS: brew install ffmpeg",
        " Linux: apt-get install ffmpeg",
    )
    for hint in install_hints:
        print(hint)


if __name__ == "__main__":
    # Script entry point: run the three test functions in order and exit with
    # status 1 on the first failure.
    import traceback

    print("Clean-Tracks Audio Processing Test Suite")
    print("=" * 50)

    try:
        test_dependency_check()
        test_audio_utils()
        test_audio_processor()
    except AssertionError as e:
        print(f"\n❌ Test failed: {e}")
        # An assert without a message produces an empty str(e); the traceback
        # is the only way to see which check failed.
        traceback.print_exc()
        sys.exit(1)
    except Exception as e:
        print(f"\n❌ Unexpected error: {e}")
        traceback.print_exc()
        sys.exit(1)
    else:
        # Reached only when every test function returned normally.
        print("\n" + "=" * 50)
        print("✅ All tests passed successfully!")