# Audio Processing Architecture
## Overview
The audio processing pipeline handles the critical first step: converting various media formats into optimized audio suitable for transcription. This architecture ensures consistent, high-quality input for the Whisper model.
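The five stages below compose into a linear flow. As a minimal orchestration sketch, assuming the stage classes defined in the sections that follow (the `process_media` driver itself is hypothetical):
```python
from pathlib import Path
from typing import List


# Hypothetical end-to-end driver; each stage class is defined below.
async def process_media(source: str) -> List["AudioChunk"]:
    media_path = await MediaAcquisition().acquire(source)          # Stage 1
    FormatValidator().validate_format(media_path)                  # Stage 2 (raises if no audio)
    audio_path = await AudioExtractor().extract_audio(media_path)  # Stage 3
    audio_path = await AudioPreprocessor().preprocess(audio_path)  # Stage 4
    return await AudioChunker().chunk_audio(audio_path)            # Stage 5
```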
## Pipeline Stages
### Stage 1: Media Download/Acquisition
```python
import tempfile
from pathlib import Path

import aiohttp


class MediaAcquisition:
    """Handle media from various sources"""

    async def acquire(self, source: str) -> Path:
        if source.startswith(('http://', 'https://')):
            return await self.download_media(source)
        elif Path(source).exists():
            return Path(source)
        else:
            raise ValueError(f"Invalid source: {source}")

    async def download_media(self, url: str) -> Path:
        """Download with progress tracking"""
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                response.raise_for_status()
                total_size = int(response.headers.get('content-length', 0))
                # Stream to a temporary file; NamedTemporaryFile avoids the
                # race condition of the deprecated tempfile.mktemp
                with tempfile.NamedTemporaryFile(suffix='.tmp', delete=False) as f:
                    async for chunk in response.content.iter_chunked(8192):
                        f.write(chunk)
                        # update_progress: progress callback (not shown here)
                        await self.update_progress(f.tell(), total_size)
                return Path(f.name)
```
### Stage 2: Format Detection & Validation
```python
import ffmpeg  # ffmpeg-python bindings


class FormatValidator:
    """Validate and identify media formats"""

    SUPPORTED_FORMATS = {
        'video': ['.mp4', '.avi', '.mov', '.mkv', '.webm'],
        'audio': ['.mp3', '.wav', '.flac', '.aac', '.ogg', '.m4a'],
    }

    def validate_format(self, file_path: Path) -> MediaInfo:
        """Extract media information"""
        probe = ffmpeg.probe(str(file_path))
        # Check for an audio stream
        audio_streams = [
            s for s in probe['streams']
            if s['codec_type'] == 'audio'
        ]
        if not audio_streams:
            raise ValueError("No audio stream found")
        stream = audio_streams[0]
        return MediaInfo(
            format=probe['format']['format_name'],
            duration=float(probe['format']['duration']),
            sample_rate=int(stream['sample_rate']),
            channels=int(stream['channels']),
            codec=stream['codec_name'],
            bitrate=int(stream.get('bit_rate', 0)),
        )
```
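`MediaInfo` is referenced above but not defined in this document; a minimal sketch, assuming a plain dataclass suffices:
```python
from dataclasses import dataclass


@dataclass
class MediaInfo:
    """Probe results for a media file (assumed shape)."""
    format: str        # Container format name from ffprobe
    duration: float    # Seconds
    sample_rate: int   # Hz
    channels: int      # Channel count
    codec: str         # Audio codec name
    bitrate: int       # Bits per second; 0 if unknown
```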
### Stage 3: Audio Extraction
```python
import asyncio


class AudioExtractor:
    """Extract audio from video files"""

    async def extract_audio(self, video_path: Path) -> Path:
        """Extract audio track from video"""
        output_path = video_path.with_suffix('.extracted.wav')
        # FFmpeg extraction command
        command = (
            ffmpeg
            .input(str(video_path))
            .output(
                str(output_path),
                acodec='pcm_s16le',  # 16-bit PCM
                ar=16000,            # 16 kHz sample rate
                ac=1,                # Mono
                loglevel='error',
            )
            .overwrite_output()
        )
        # Run asynchronously; compile() already includes the 'ffmpeg'
        # executable as the first argument, so it must not be repeated
        process = await asyncio.create_subprocess_exec(
            *command.compile(),
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        stdout, stderr = await process.communicate()
        if process.returncode != 0:
            raise ProcessingError(f"FFmpeg failed: {stderr.decode()}")
        return output_path
```
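Several classes later in this document call `self.run_ffmpeg(command)`, which is never shown. A minimal sketch of the assumed helper, reusing the subprocess pattern above (presented as a mixin so those classes could inherit it):
```python
import asyncio


class FFmpegRunnerMixin:
    """Assumed shared helper; AudioChunker and AudioErrorHandler call
    self.run_ffmpeg() but its definition does not appear in this document."""

    async def run_ffmpeg(self, command) -> None:
        # command is an ffmpeg-python stream; compile() includes the
        # 'ffmpeg' executable as the first argument
        process = await asyncio.create_subprocess_exec(
            *command.compile(),
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        _, stderr = await process.communicate()
        if process.returncode != 0:
            raise ProcessingError(f"FFmpeg failed: {stderr.decode()}")
```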
### Stage 4: Audio Preprocessing
```python
import librosa
import numpy as np
import soundfile as sf


class AudioPreprocessor:
    """Optimize audio for transcription"""

    def __init__(self):
        self.target_sample_rate = 16000
        self.target_channels = 1  # Mono
        self.target_format = 'wav'

    async def preprocess(self, audio_path: Path) -> Path:
        """Full preprocessing pipeline"""
        # Load audio, resampled to the target rate and downmixed to mono
        audio, sr = librosa.load(
            str(audio_path),
            sr=self.target_sample_rate,
            mono=True,
        )
        # Apply the preprocessing chain
        audio = self.remove_silence(audio, sr)
        audio = self.normalize_volume(audio)
        audio = self.apply_noise_reduction(audio, sr)
        audio = self.compress_dynamic_range(audio)
        # Save processed audio
        output_path = audio_path.with_suffix('.preprocessed.wav')
        sf.write(output_path, audio, sr, subtype='PCM_16')
        return output_path

    def remove_silence(self, audio: np.ndarray, sr: int) -> np.ndarray:
        """Remove leading/trailing silence"""
        # Use librosa's trim function
        trimmed, _ = librosa.effects.trim(
            audio,
            top_db=20,  # Threshold in dB below peak
            frame_length=2048,
            hop_length=512,
        )
        return trimmed

    def normalize_volume(self, audio: np.ndarray) -> np.ndarray:
        """Normalize to a consistent volume"""
        # Peak normalization to -3 dBFS
        peak = np.abs(audio).max()
        if peak > 0:
            target_peak = 10 ** (-3 / 20)  # -3 dB in linear scale
            audio = audio * (target_peak / peak)
        return audio

    def apply_noise_reduction(self, audio: np.ndarray, sr: int) -> np.ndarray:
        """Reduce background noise via simple spectral gating"""
        D = librosa.stft(audio)
        magnitude = np.abs(D)
        # Estimate the noise floor (10th percentile of magnitudes)
        noise_floor = np.percentile(magnitude, 10)
        # Zero out bins close to the noise floor
        mask = magnitude > noise_floor * 1.5
        D_gated = D * mask
        # Reconstruct the time-domain signal
        return librosa.istft(D_gated)

    def compress_dynamic_range(self, audio: np.ndarray) -> np.ndarray:
        """Apply gentle compression above a fixed threshold"""
        threshold = 0.7
        ratio = 4.0
        # Attenuate only the samples above the threshold
        mask = np.abs(audio) > threshold
        compressed = audio.copy()
        compressed[mask] = np.sign(audio[mask]) * (
            threshold + (np.abs(audio[mask]) - threshold) / ratio
        )
        return compressed
```
### Stage 5: Chunking for Long Audio
```python
from typing import List


class AudioChunker:
    """Split long audio files for processing"""

    def __init__(self, chunk_duration: int = 600):  # 10 minutes
        self.chunk_duration = chunk_duration
        self.overlap = 2  # 2-second overlap between consecutive chunks

    async def chunk_audio(self, audio_path: Path) -> List[AudioChunk]:
        """Split audio into overlapping chunks"""
        # Get duration
        info = await self.get_audio_info(audio_path)
        duration = info.duration
        if duration <= self.chunk_duration:
            # No chunking needed
            return [AudioChunk(
                path=audio_path,
                start=0,
                end=duration,
                index=0,
            )]
        # Calculate chunks
        chunks = []
        chunk_size = self.chunk_duration
        step = chunk_size - self.overlap
        for i, start in enumerate(range(0, int(duration), step)):
            end = min(start + chunk_size, duration)
            # Extract chunk
            chunk_path = await self.extract_chunk(
                audio_path, start, end - start, i
            )
            chunks.append(AudioChunk(
                path=chunk_path,
                start=start,
                end=end,
                index=i,
            ))
            if end >= duration:
                break
        return chunks

    async def extract_chunk(
        self,
        audio_path: Path,
        start: float,
        duration: float,
        index: int,
    ) -> Path:
        """Extract a specific chunk"""
        output_path = audio_path.parent / f"{audio_path.stem}_chunk_{index:03d}.wav"
        command = (
            ffmpeg
            .input(str(audio_path), ss=start, t=duration)
            .output(str(output_path), acodec='copy')  # Stream copy; input is already PCM WAV
            .overwrite_output()
        )
        await self.run_ffmpeg(command)
        return output_path
```
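`AudioChunk` is referenced above but not defined; a minimal sketch as a dataclass:
```python
from dataclasses import dataclass
from pathlib import Path


@dataclass
class AudioChunk:
    """One slice of a longer recording (assumed shape)."""
    path: Path    # File containing this chunk's audio
    start: float  # Offset into the original file, in seconds
    end: float    # End offset, in seconds
    index: int    # Position in the chunk sequence
```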
## Quality Assurance
### Audio Quality Metrics
```python
class AudioQualityAnalyzer:
    """Analyze audio quality metrics"""

    def analyze(self, audio_path: Path) -> QualityReport:
        audio, sr = librosa.load(str(audio_path))
        return QualityReport(
            snr=self.calculate_snr(audio),
            silence_ratio=self.calculate_silence_ratio(audio),
            clipping_ratio=self.calculate_clipping(audio),
            # analyze_frequency_range / recommend_action: helpers not shown here
            frequency_range=self.analyze_frequency_range(audio, sr),
            recommended_action=self.recommend_action(audio, sr),
        )

    def calculate_snr(self, audio: np.ndarray) -> float:
        """Signal-to-noise ratio in dB (median-based robust estimate)"""
        signal_power = np.median(audio ** 2)
        noise_power = np.median((audio - np.median(audio)) ** 2)
        if noise_power > 0:
            return 10 * np.log10(signal_power / noise_power)
        return float('inf')

    def calculate_silence_ratio(self, audio: np.ndarray) -> float:
        """Fraction of samples below the silence threshold (0.0-1.0)"""
        threshold = 0.01  # Silence threshold
        silence_samples = np.sum(np.abs(audio) < threshold)
        return silence_samples / len(audio)

    def calculate_clipping(self, audio: np.ndarray) -> float:
        """Fraction of samples at or near full scale (0.0-1.0)"""
        clipping_threshold = 0.99
        clipped = np.sum(np.abs(audio) > clipping_threshold)
        return clipped / len(audio)
```
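`QualityReport` is likewise assumed rather than defined here; one plausible shape:
```python
from dataclasses import dataclass
from typing import Tuple


@dataclass
class QualityReport:
    """Quality metrics for a single audio file (assumed shape)."""
    snr: float                            # Signal-to-noise ratio in dB
    silence_ratio: float                  # Fraction of silent samples, 0.0-1.0
    clipping_ratio: float                 # Fraction of clipped samples, 0.0-1.0
    frequency_range: Tuple[float, float]  # Estimated (low, high) in Hz
    recommended_action: str               # e.g. 'ok', 'denoise', 're-record'
```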
## Performance Optimization
### Parallel Processing
```python
import logging

logger = logging.getLogger(__name__)


class ParallelAudioProcessor:
    """Process multiple audio files in parallel"""

    def __init__(self, max_workers: int = 4):
        self.max_workers = max_workers
        self.semaphore = asyncio.Semaphore(max_workers)

    async def process_batch(self, audio_files: List[Path]) -> List[Path]:
        """Process multiple files concurrently"""
        tasks = [
            self.process_with_limit(audio_file)
            for audio_file in audio_files
        ]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        # Collect successes; log failures without aborting the batch
        processed = []
        for result, audio_file in zip(results, audio_files):
            if isinstance(result, Exception):
                logger.error(f"Failed to process {audio_file}: {result}")
            else:
                processed.append(result)
        return processed

    async def process_with_limit(self, audio_file: Path) -> Path:
        """Process a single file under the concurrency limit"""
        async with self.semaphore:
            return await self.process_single(audio_file)
```
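`process_single` is left abstract above. A hedged sketch of how it might be wired to the Stage 4 preprocessor (the `TranscriptionAudioProcessor` subclass is hypothetical):
```python
# Hypothetical wiring: process_single delegates to the Stage 4 preprocessor.
class TranscriptionAudioProcessor(ParallelAudioProcessor):
    def __init__(self, max_workers: int = 4):
        super().__init__(max_workers)
        self.preprocessor = AudioPreprocessor()

    async def process_single(self, audio_file: Path) -> Path:
        return await self.preprocessor.preprocess(audio_file)


# Usage: batch = await TranscriptionAudioProcessor().process_batch(paths)
```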
### Caching Preprocessed Audio
```python
import shutil


class PreprocessedAudioCache:
    """Cache preprocessed audio files"""

    def __init__(self, cache_dir: Path):
        self.cache_dir = cache_dir
        # parents=True: the configured cache path may be nested (e.g. /tmp/trax/...)
        self.cache_dir.mkdir(parents=True, exist_ok=True)

    def get_cache_path(self, original_path: Path) -> Path:
        """Generate the cache file path for a source file"""
        file_hash = self.calculate_hash(original_path)
        return self.cache_dir / f"{file_hash}.preprocessed.wav"

    async def get_or_process(
        self,
        audio_path: Path,
        processor: AudioPreprocessor,
    ) -> Path:
        """Return a cached result, or process and cache"""
        cache_path = self.get_cache_path(audio_path)
        if cache_path.exists():
            # Use the cache only if it is newer than the source
            if cache_path.stat().st_mtime > audio_path.stat().st_mtime:
                logger.info(f"Using cached preprocessed audio: {cache_path}")
                return cache_path
        # Process and cache
        processed = await processor.preprocess(audio_path)
        shutil.copy2(processed, cache_path)
        return cache_path
```
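`calculate_hash` is not shown above; a minimal sketch, assuming a SHA-256 digest of the file contents is an acceptable cache key (shown as a free function; in `PreprocessedAudioCache` it would be a method):
```python
import hashlib
from pathlib import Path


def calculate_hash(file_path: Path, block_size: int = 65536) -> str:
    """Content hash used as the cache key (assumed implementation)."""
    digest = hashlib.sha256()
    with open(file_path, 'rb') as f:
        # Read in fixed-size blocks so large files don't load into memory
        for block in iter(lambda: f.read(block_size), b''):
            digest.update(block)
    return digest.hexdigest()
```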
## Error Handling
### Common Audio Issues
```python
from typing import Optional


class AudioErrorHandler:
    """Handle common audio processing errors"""

    async def handle_processing_error(
        self,
        error: Exception,
        audio_path: Path,
    ) -> Optional[Path]:
        """Attempt recovery from known error types"""
        if isinstance(error, CorruptedFileError):
            # Try to repair with FFmpeg
            return await self.repair_corrupted_file(audio_path)
        elif isinstance(error, UnsupportedFormatError):
            # Try an alternative extraction method
            return await self.extract_with_alternative_method(audio_path)
        elif isinstance(error, SilentAudioError):
            # Audio is completely silent; nothing to transcribe
            logger.warning(f"Audio file is silent: {audio_path}")
            return None
        else:
            # Unknown error: re-raise for upstream handling (a bare `raise`
            # would fail here since we are not inside an except block)
            logger.error(f"Unhandled error: {error}")
            raise error

    async def repair_corrupted_file(self, audio_path: Path) -> Path:
        """Attempt to repair corrupted audio"""
        repaired_path = audio_path.with_suffix('.repaired.wav')
        # Use FFmpeg's error detection while re-encoding to clean PCM
        command = (
            ffmpeg
            .input(str(audio_path), err_detect='aggressive')
            .output(
                str(repaired_path),
                acodec='pcm_s16le',
                ar=16000,
                ac=1,
            )
            .global_args('-xerror')  # Abort on the first unrecoverable error
            .overwrite_output()
        )
        try:
            await self.run_ffmpeg(command)
            return repaired_path
        except Exception:
            raise RepairFailedError(f"Could not repair {audio_path}")
```
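The custom exception types used throughout this document (`ProcessingError`, `CorruptedFileError`, and the rest) are never defined; a minimal sketch of an assumed hierarchy:
```python
class ProcessingError(Exception):
    """Base error for the audio pipeline (assumed hierarchy)."""


class CorruptedFileError(ProcessingError):
    """Source file is damaged or truncated."""


class UnsupportedFormatError(ProcessingError):
    """No extraction path exists for this container/codec."""


class SilentAudioError(ProcessingError):
    """Audio contains no usable signal."""


class RepairFailedError(ProcessingError):
    """FFmpeg repair attempt did not produce usable audio."""
```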
## Testing Strategy
### Audio Processing Tests
```python
# tests/test_audio_processing.py
import pytest  # async tests assume the pytest-asyncio plugin


class TestAudioProcessing:

    @pytest.fixture
    def test_audio_files(self):
        """Provide test audio files"""
        return {
            'clean': Path('tests/fixtures/audio/clean_speech.wav'),
            'noisy': Path('tests/fixtures/audio/noisy_speech.wav'),
            'music': Path('tests/fixtures/audio/music_and_speech.mp3'),
            'silent': Path('tests/fixtures/audio/silent.wav'),
            'corrupted': Path('tests/fixtures/audio/corrupted.mp4'),
        }

    @pytest.mark.asyncio
    async def test_preprocessing_improves_quality(self, test_audio_files):
        """Test that preprocessing improves audio quality"""
        processor = AudioPreprocessor()
        original = test_audio_files['noisy']
        processed = await processor.preprocess(original)
        # Analyze both versions
        original_quality = AudioQualityAnalyzer().analyze(original)
        processed_quality = AudioQualityAnalyzer().analyze(processed)
        # Should improve SNR
        assert processed_quality.snr > original_quality.snr
        # Should reduce silence
        assert processed_quality.silence_ratio < original_quality.silence_ratio

    @pytest.mark.asyncio
    async def test_chunking_preserves_content(self, test_audio_files):
        """Test that chunking doesn't lose content"""
        chunker = AudioChunker(chunk_duration=30)  # 30-second chunks
        original = test_audio_files['clean']
        chunks = await chunker.chunk_audio(original)
        # Verify coverage (get_duration: test helper, not shown)
        original_duration = get_duration(original)
        chunk_coverage = sum(c.end - c.start for c in chunks)
        # Should cover the entire file (overlaps make the sum larger)
        assert chunk_coverage >= original_duration
        # Verify consecutive chunks overlap
        for i in range(len(chunks) - 1):
            assert chunks[i].end > chunks[i + 1].start
```
## Configuration
### Audio Processing Settings
```python
# config/audio.py
AUDIO_CONFIG = {
    # Target format for Whisper
    'target_sample_rate': 16000,
    'target_channels': 1,
    'target_format': 'wav',
    'target_bit_depth': 16,

    # Preprocessing
    'remove_silence': True,
    'silence_threshold_db': 20,
    'normalize_volume': True,
    'target_peak_db': -3,
    'apply_noise_reduction': True,
    'noise_gate_ratio': 1.5,

    # Chunking
    'max_chunk_duration': 600,  # 10 minutes
    'chunk_overlap': 2,         # seconds

    # Quality thresholds
    'min_snr_db': 10,
    'max_silence_ratio': 0.8,
    'max_clipping_ratio': 0.01,

    # Performance
    'max_parallel_processing': 4,
    'cache_preprocessed': True,
    'cache_directory': '/tmp/trax/audio_cache',
}
```
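The classes above currently hard-code these values. A hedged sketch of wiring the config in instead (the `build_pipeline` factory is hypothetical; attribute names follow the classes in this document):
```python
from pathlib import Path


# Hypothetical factory applying AUDIO_CONFIG to the pipeline components.
def build_pipeline(config: dict = AUDIO_CONFIG):
    preprocessor = AudioPreprocessor()
    preprocessor.target_sample_rate = config['target_sample_rate']
    preprocessor.target_channels = config['target_channels']
    chunker = AudioChunker(chunk_duration=config['max_chunk_duration'])
    chunker.overlap = config['chunk_overlap']
    cache = PreprocessedAudioCache(Path(config['cache_directory']))
    return preprocessor, chunker, cache
```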
## Summary
The audio processing architecture ensures:
1. **Format flexibility** - Handle the supported video and audio formats listed above
2. **Quality optimization** - Improve audio for transcription
3. **Reliability** - Handle errors gracefully
4. **Performance** - Parallel processing and caching
5. **Testability** - Comprehensive test coverage
This foundation enables accurate, efficient transcription across diverse media sources.
---
*Last Updated: 2024*
*Architecture Version: 1.0*