#!/usr/bin/env python3
"""Adaptive Chunk Sizing for Transcription Optimization.

Dynamically adjusts chunk size based on audio characteristics for a 1.5-2x improvement.
Keeps under 300 LOC as per project guidelines.
"""

import numpy as np
from typing import List, Optional, Tuple, Dict, Any
from dataclasses import dataclass
from enum import Enum
import logging

logger = logging.getLogger(__name__)


class ChunkingStrategy(Enum):
    """Strategy for chunking audio."""

    TIME_BASED = "time_based"
    SILENCE_BASED = "silence_based"
    ENERGY_BASED = "energy_based"
    HYBRID = "hybrid"


@dataclass
class AudioCharacteristics:
    """Characteristics of audio for adaptive chunking."""

    duration: float
    has_silence_patterns: bool
    silence_segments: List[Tuple[float, float]]
    speech_density: float
    average_segment_length: float
    energy_profile: Optional[np.ndarray] = None


@dataclass
class ChunkInfo:
    """Information about an audio chunk."""

    start_sample: int
    end_sample: int
    start_time: float
    end_time: float
    duration: float
    overlap_duration: float
    confidence: float
    split_at_silence: bool
    strategy_used: ChunkingStrategy


class AdaptiveChunker:
    """Adaptive chunk sizing based on audio characteristics."""

    def __init__(
        self,
        min_chunk_seconds: float = 10,
        max_chunk_seconds: float = 60,
        prefer_silence_splits: bool = True,
        adaptive: bool = True,
        fixed_chunk_size: Optional[int] = None,
        progressive_sizing: bool = False,
    ):
        """Initialize adaptive chunker with constraints."""
        self.min_chunk_seconds = min_chunk_seconds
        self.max_chunk_seconds = max_chunk_seconds
        self.prefer_silence_splits = prefer_silence_splits
        self.adaptive = adaptive
        self.fixed_chunk_size = fixed_chunk_size
        self.progressive_sizing = progressive_sizing
        self.silence_threshold = 0.01

    def analyze_audio(
        self,
        audio: np.ndarray,
        sample_rate: int,
    ) -> AudioCharacteristics:
        """Analyze audio to determine characteristics."""
        duration = len(audio) / sample_rate

        # Detect silence segments
        silence_segments = self._detect_silence(audio, sample_rate)
        has_silence = len(silence_segments) > 0

        # Calculate speech density
        silence_duration = sum(end - start for start, end in silence_segments)
        speech_density = (1.0 - silence_duration / duration) if duration > 0 else 1.0

        # Average segment length between silences
        if len(silence_segments) > 1:
            segment_lengths = []
            for i in range(len(silence_segments) - 1):
                length = silence_segments[i + 1][0] - silence_segments[i][1]
                segment_lengths.append(length)
            avg_segment = np.mean(segment_lengths) if segment_lengths else duration
        else:
            avg_segment = duration

        return AudioCharacteristics(
            duration=duration,
            has_silence_patterns=has_silence,
            silence_segments=silence_segments,
            speech_density=speech_density,
            average_segment_length=avg_segment,
        )

    def determine_chunk_size(
        self,
        duration_seconds: float,
        speech_density: float = 0.8,
    ) -> int:
        """Determine optimal chunk size based on duration and density."""
        if not self.adaptive and self.fixed_chunk_size:
            return self.fixed_chunk_size

        # Base size on duration
        if duration_seconds <= 30:
            base_size = 10
        elif duration_seconds <= 120:
            base_size = 20
        elif duration_seconds <= 300:
            base_size = 30
        elif duration_seconds <= 1200:
            base_size = 45
        else:
            base_size = 60

        # Adjust for speech density
        if speech_density > 0.9:
            # Dense speech - smaller chunks for better accuracy
            base_size = int(base_size * 0.8)
        elif speech_density < 0.5:
            # Sparse speech - larger chunks acceptable
            base_size = int(base_size * 1.2)

        # Apply constraints
        return max(self.min_chunk_seconds, min(base_size, self.max_chunk_seconds))
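    # Illustrative expectations for determine_chunk_size with the default
    # constructor settings (not part of the original module; the values follow
    # directly from the thresholds above):
    #   determine_chunk_size(90,  speech_density=0.95) -> int(20 * 0.8) = 16
    #   determine_chunk_size(600, speech_density=0.40) -> int(45 * 1.2) = 54
    #   determine_chunk_size(20)                        -> clamped to min_chunk_seconds = 10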
    def create_adaptive_chunks(
        self,
        audio: np.ndarray,
        sample_rate: int,
        target_chunk_size: Optional[int] = None,
    ) -> List[ChunkInfo]:
        """Create adaptive chunks based on audio characteristics."""
        characteristics = self.analyze_audio(audio, sample_rate)

        if not self.adaptive:
            return self._create_fixed_chunks(audio, sample_rate, self.fixed_chunk_size or 30)

        # Select strategy
        strategy = self.select_strategy(
            characteristics.duration,
            characteristics.has_silence_patterns,
            characteristics.speech_density,
        )

        # Create chunks based on strategy
        if strategy == ChunkingStrategy.SILENCE_BASED and characteristics.has_silence_patterns:
            chunks = self._create_silence_based_chunks(
                audio, sample_rate, characteristics.silence_segments
            )
        elif strategy == ChunkingStrategy.ENERGY_BASED:
            chunks = self._create_energy_based_chunks(audio, sample_rate)
        else:
            chunk_size = target_chunk_size or self.determine_chunk_size(
                characteristics.duration, characteristics.speech_density
            )
            chunks = self._create_time_based_chunks(audio, sample_rate, chunk_size)

        return chunks

    def _detect_silence(
        self,
        audio: np.ndarray,
        sample_rate: int,
    ) -> List[Tuple[float, float]]:
        """Detect silence segments in audio."""
        window_size = int(0.1 * sample_rate)  # 100ms windows
        silence_segments = []

        # Calculate energy in windows
        for i in range(0, len(audio) - window_size, window_size):
            window = audio[i:i + window_size]
            energy = np.mean(np.abs(window))

            if energy < self.silence_threshold:
                start_time = i / sample_rate
                end_time = (i + window_size) / sample_rate

                # Merge with previous segment if close
                if silence_segments and start_time - silence_segments[-1][1] < 0.5:
                    silence_segments[-1] = (silence_segments[-1][0], end_time)
                else:
                    silence_segments.append((start_time, end_time))

        return silence_segments

    def _create_silence_based_chunks(
        self,
        audio: np.ndarray,
        sample_rate: int,
        silence_segments: List[Tuple[float, float]],
    ) -> List[ChunkInfo]:
        """Create chunks split at silence boundaries."""
        chunks = []
        current_start = 0

        for silence_start, silence_end in silence_segments:
            silence_start_sample = int(silence_start * sample_rate)

            # Create chunk up to silence
            if silence_start_sample > current_start:
                chunk_duration = (silence_start_sample - current_start) / sample_rate

                # Only create chunk if it's meaningful
                if chunk_duration > self.min_chunk_seconds:
                    overlap = self.determine_overlap(chunk_duration)
                    chunks.append(ChunkInfo(
                        start_sample=current_start,
                        end_sample=silence_start_sample,
                        start_time=current_start / sample_rate,
                        end_time=silence_start_sample / sample_rate,
                        duration=chunk_duration,
                        overlap_duration=overlap,
                        confidence=0.95,
                        split_at_silence=True,
                        strategy_used=ChunkingStrategy.SILENCE_BASED,
                    ))
                    # Advance, backing up by the overlap so neighboring chunks
                    # share context. Kept inside this branch so `overlap` is
                    # always bound when it is used.
                    current_start = max(
                        current_start, silence_start_sample - int(overlap * sample_rate)
                    )

        # Handle remaining audio
        if current_start < len(audio):
            remaining_duration = (len(audio) - current_start) / sample_rate
            if remaining_duration > 1:  # At least 1 second
                chunks.append(ChunkInfo(
                    start_sample=current_start,
                    end_sample=len(audio),
                    start_time=current_start / sample_rate,
                    end_time=len(audio) / sample_rate,
                    duration=remaining_duration,
                    overlap_duration=0,
                    confidence=0.9,
                    split_at_silence=False,
                    strategy_used=ChunkingStrategy.SILENCE_BASED,
                ))

        return chunks if chunks else self._create_time_based_chunks(audio, sample_rate, 30)

    def _create_time_based_chunks(
        self,
        audio: np.ndarray,
        sample_rate: int,
        chunk_size: int,
    ) -> List[ChunkInfo]:
        """Create fixed-time chunks."""
        chunks = []
        chunk_samples = int(chunk_size * sample_rate)
        overlap = self.determine_overlap(chunk_size)
        overlap_samples = int(overlap * sample_rate)

        position = 0
        while position < len(audio):
            end_pos = min(position + chunk_samples, len(audio))
            chunks.append(ChunkInfo(
                start_sample=position,
                end_sample=end_pos,
                start_time=position / sample_rate,
                end_time=end_pos / sample_rate,
                duration=(end_pos - position) / sample_rate,
                overlap_duration=overlap if end_pos < len(audio) else 0,
                confidence=0.85,
                split_at_silence=False,
                strategy_used=ChunkingStrategy.TIME_BASED,
            ))
            position = end_pos - overlap_samples if end_pos < len(audio) else end_pos

        return chunks
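    # Worked example (not in the original source): for 100 s of audio and
    # chunk_size=30, determine_overlap(30) is 1.5 s, so _create_time_based_chunks
    # produces chunks covering roughly
    #   [0.0, 30.0], [28.5, 58.5], [57.0, 87.0], [85.5, 100.0]
    # with the final chunk carrying overlap_duration=0.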
    def _create_fixed_chunks(
        self,
        audio: np.ndarray,
        sample_rate: int,
        chunk_size: int,
    ) -> List[ChunkInfo]:
        """Create fixed-size chunks (non-adaptive)."""
        return self._create_time_based_chunks(audio, sample_rate, chunk_size)

    def _create_energy_based_chunks(
        self,
        audio: np.ndarray,
        sample_rate: int,
    ) -> List[ChunkInfo]:
        """Create chunks based on energy valleys."""
        valleys = self.find_energy_valleys(audio, sample_rate)

        if not valleys:
            return self._create_time_based_chunks(audio, sample_rate, 30)

        chunks = []
        current_start = 0

        for valley in valleys:
            if valley > current_start + self.min_chunk_seconds * sample_rate:
                chunks.append(ChunkInfo(
                    start_sample=current_start,
                    end_sample=valley,
                    start_time=current_start / sample_rate,
                    end_time=valley / sample_rate,
                    duration=(valley - current_start) / sample_rate,
                    overlap_duration=self.determine_overlap((valley - current_start) / sample_rate),
                    confidence=0.9,
                    split_at_silence=False,
                    strategy_used=ChunkingStrategy.ENERGY_BASED,
                ))
                current_start = valley

        # Include whatever trails the last valley so no audio is dropped.
        if current_start < len(audio):
            chunks.append(ChunkInfo(
                start_sample=current_start,
                end_sample=len(audio),
                start_time=current_start / sample_rate,
                end_time=len(audio) / sample_rate,
                duration=(len(audio) - current_start) / sample_rate,
                overlap_duration=0,
                confidence=0.9,
                split_at_silence=False,
                strategy_used=ChunkingStrategy.ENERGY_BASED,
            ))

        return chunks

    def determine_overlap(self, chunk_size: float) -> float:
        """Determine overlap duration based on chunk size."""
        if chunk_size <= 15:
            return 1.0
        elif chunk_size <= 30:
            return 1.5
        elif chunk_size <= 45:
            return 2.0
        else:
            return 3.0

    def select_strategy(
        self,
        duration_seconds: float,
        has_silence: bool,
        speech_density: float = 0.8,
    ) -> ChunkingStrategy:
        """Select optimal chunking strategy."""
        if duration_seconds < 60:
            return ChunkingStrategy.TIME_BASED
        elif has_silence and duration_seconds > 300:
            return ChunkingStrategy.SILENCE_BASED
        elif has_silence and speech_density > 0.85:
            return ChunkingStrategy.HYBRID
        else:
            return ChunkingStrategy.TIME_BASED

    def find_energy_valleys(
        self,
        audio: np.ndarray,
        sample_rate: int,
    ) -> List[int]:
        """Find low-energy points suitable for splitting."""
        window_size = int(0.5 * sample_rate)  # 500ms windows
        valleys = []

        for i in range(window_size, len(audio) - window_size, window_size):
            before = np.mean(np.abs(audio[i - window_size:i]))
            current = np.mean(np.abs(audio[i - 100:i + 100]))
            after = np.mean(np.abs(audio[i:i + window_size]))

            # Valley if current is lower than surroundings
            if current < before * 0.3 and current < after * 0.3:
                valleys.append(i)

        return valleys

    def plan_progressive_chunks(self, duration_seconds: float) -> List[Dict[str, Any]]:
        """Plan progressive chunk sizing for long audio."""
        if not self.progressive_sizing:
            size = self.determine_chunk_size(duration_seconds)
            return [{'size': size, 'start': i * size} for i in range(int(duration_seconds // size))]

        chunks = []
        sizes = [20, 25, 30, 40, 50, 60]  # Progressive sizes
        position = 0

        for size in sizes * (int(duration_seconds // sum(sizes)) + 1):
            if position >= duration_seconds:
                break
            chunks.append({'size': size, 'start': position})
            position += size

        return chunks
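    # Illustrative expectation (not in the original source): with
    # progressive_sizing=True, plan_progressive_chunks(120) walks the size
    # ladder and yields (start, size) pairs (0, 20), (20, 25), (45, 30),
    # (75, 40), (115, 50). The final planned chunk can extend past the actual
    # duration, so a caller would presumably clamp it to the audio length.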
    def calculate_fixed_chunks(
        self,
        duration: float,
        chunk_size: float,
        overlap: float,
    ) -> List[Dict]:
        """Calculate fixed chunks for comparison."""
        chunks = []
        position = 0
        while position < duration:
            chunks.append({'start': position, 'size': chunk_size, 'overlap': overlap})
            position += chunk_size - overlap
        return chunks

    def calculate_adaptive_chunks(self, duration: float) -> List[Dict]:
        """Calculate adaptive chunks with variable parameters."""
        chunks = []
        position = 0
        while position < duration:
            remaining = duration - position
            size = self.determine_chunk_size(remaining)
            overlap = self.determine_overlap(size) if position + size < duration else 0
            chunks.append({'start': position, 'size': size, 'overlap': overlap})
            position += size - overlap
        return chunks

    def estimate_memory_usage(
        self,
        audio_size_mb: float,
        strategy: str,
        chunk_size: int = 30,
    ) -> float:
        """Estimate peak memory usage for processing strategy."""
        if strategy == 'fixed':
            # Fixed strategy loads multiple chunks in memory
            return chunk_size / 60 * audio_size_mb * 2  # 2x for processing overhead
        else:
            # Adaptive strategy optimizes memory usage
            return audio_size_mb * 0.3  # Only current chunk + overhead
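
# Minimal usage sketch (not part of the original module): exercises the public
# entry point on synthetic audio. The 16 kHz sample rate, the one-second silent
# gap, and the noise amplitude are arbitrary choices for illustration only.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    sample_rate = 16000
    rng = np.random.default_rng(0)
    # Two 30 s bursts of noise separated by 1 s of silence, values within [-1, 1].
    speech = rng.uniform(-0.5, 0.5, 30 * sample_rate)
    silence = np.zeros(sample_rate)
    audio = np.concatenate([speech, silence, speech])

    chunker = AdaptiveChunker(min_chunk_seconds=10, max_chunk_seconds=60)
    for chunk in chunker.create_adaptive_chunks(audio, sample_rate):
        logger.info(
            "chunk %.1f-%.1fs (%s, overlap %.1fs)",
            chunk.start_time, chunk.end_time,
            chunk.strategy_used.value, chunk.overlap_duration,
        )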