import re
from typing import List, Dict, Any, Optional

from backend.models.transcript import TranscriptChunk, TranscriptSegment


class TranscriptProcessor:
    """Process and clean transcript data for AI consumption"""

    def __init__(self):
        # Characters to overlap between chunks (currently unused; overlap
        # is implemented as one sentence in chunk_transcript below)
        self.chunk_overlap = 100

    def clean_transcript(self, raw_transcript: str) -> str:
        """
        Clean and format raw transcript data.

        Args:
            raw_transcript: Raw transcript text

        Returns:
            Cleaned transcript text
        """
        if not raw_transcript:
            return ""

        # Collapse runs of whitespace into single spaces
        cleaned = re.sub(r'\s+', ' ', raw_transcript)

        # Fix common speech recognition errors. Ambiguous forms such as
        # "were"/"we're" and "its"/"it's" are deliberately excluded, since
        # blind replacement would corrupt legitimate words.
        replacements = {
            ' i ': ' I ',
            ' im ': " I'm ",
            ' dont ': " don't ",
            ' wont ': " won't ",
            ' cant ': " can't ",
            ' youre ': " you're ",
            ' theyre ': " they're ",
        }
        for old, new in replacements.items():
            cleaned = cleaned.replace(old, new)

        # Heuristic: insert a period where a lowercase word runs straight
        # into a capitalized one (may misfire on mid-sentence proper nouns)
        cleaned = re.sub(r'([a-z])(\s+)([A-Z])', r'\1.\2\3', cleaned)

        # Remove extra whitespace
        cleaned = cleaned.strip()

        # Collapse repeated punctuation ("!!" -> "!")
        cleaned = re.sub(r'([.!?])\1+', r'\1', cleaned)

        return cleaned

    def chunk_transcript(self, transcript: str, max_tokens: int = 3000) -> List[TranscriptChunk]:
        """
        Split transcript into manageable chunks for AI processing.

        Args:
            transcript: Full transcript text
            max_tokens: Maximum tokens per chunk (approximate)

        Returns:
            List of transcript chunks
        """
        if not transcript:
            return []

        # Approximate tokens as words * 1.3
        max_words = int(max_tokens / 1.3)

        sentences = self._split_into_sentences(transcript)
        chunks = []
        current_chunk = []
        current_word_count = 0
        chunk_index = 0

        for sentence in sentences:
            sentence_words = len(sentence.split())

            # If adding this sentence would exceed the limit, close the
            # current chunk and start a new one
            if current_word_count + sentence_words > max_words and current_chunk:
                chunk_text = ' '.join(current_chunk)
                chunks.append(TranscriptChunk(
                    chunk_index=chunk_index,
                    text=chunk_text,
                    token_count=int(len(chunk_text.split()) * 1.3)
                ))

                # Seed the new chunk with the last sentence of the previous
                # chunk as overlap
                current_chunk = [current_chunk[-1]]
                current_word_count = len(current_chunk[0].split())
                chunk_index += 1

            current_chunk.append(sentence)
            current_word_count += sentence_words

        # Add final chunk
        if current_chunk:
            chunk_text = ' '.join(current_chunk)
            chunks.append(TranscriptChunk(
                chunk_index=chunk_index,
                text=chunk_text,
                token_count=int(len(chunk_text.split()) * 1.3)
            ))

        return chunks

    def _split_into_sentences(self, text: str) -> List[str]:
        """Split text into sentences"""
        # Simple sentence splitting (could be improved with NLTK)
        sentences = re.split(r'(?<=[.!?])\s+', text)
        return [s.strip() for s in sentences if s.strip()]
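    # Illustrative arithmetic for the words * 1.3 token approximation
    # above (an assumed heuristic ratio, not a measured tokenizer value):
    # with the default max_tokens=3000, max_words = int(3000 / 1.3) = 2307,
    # so a 5000-word transcript yields roughly two to three chunks
    # depending on sentence lengths, each chunk after the first opening
    # with the final sentence of the previous chunk as overlap.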
    def extract_key_moments(self, segments: List[TranscriptSegment],
                            min_duration: float = 5.0) -> List[Dict[str, Any]]:
        """
        Extract key moments from timestamped segments.

        Args:
            segments: List of transcript segments with timestamps
            min_duration: Minimum duration for a key moment

        Returns:
            List of key moments with timestamps
        """
        if not segments:
            return []

        key_moments = []
        current_moment = {"text": "", "start": 0.0, "end": 0.0}

        for segment in segments:
            # If this segment starts a new topic (simple heuristic)
            if self._is_topic_transition(segment.text):
                if current_moment["text"] and \
                   (current_moment["end"] - current_moment["start"]) >= min_duration:
                    key_moments.append(current_moment)

                current_moment = {
                    "text": segment.text,
                    "start": segment.start,
                    "end": segment.end
                }
            else:
                # Continue current moment
                if not current_moment["text"]:
                    current_moment["start"] = segment.start
                current_moment["text"] = (current_moment["text"] + " " + segment.text).strip()
                current_moment["end"] = segment.end

        # Add final moment
        if current_moment["text"] and \
           (current_moment["end"] - current_moment["start"]) >= min_duration:
            key_moments.append(current_moment)

        return key_moments

    def _is_topic_transition(self, text: str) -> bool:
        """Detect if text indicates a topic transition"""
        transition_phrases = [
            "first", "second", "third", "next", "now",
            "let's", "moving on", "another", "finally",
            "in conclusion", "to summarize", "let me",
            "i want to", "the key"
        ]
        text_lower = text.lower()
        # Match on word boundaries so that, e.g., "now" does not fire
        # inside "know"
        return any(re.search(rf'\b{re.escape(phrase)}\b', text_lower)
                   for phrase in transition_phrases)

    def format_with_timestamps(self, segments: List[TranscriptSegment]) -> str:
        """
        Format transcript with timestamps.

        Args:
            segments: List of transcript segments

        Returns:
            Formatted transcript with timestamps
        """
        if not segments:
            return ""

        formatted_lines = []
        for segment in segments:
            timestamp = self._format_timestamp(segment.start)
            formatted_lines.append(f"[{timestamp}] {segment.text}")

        return "\n".join(formatted_lines)

    def _format_timestamp(self, seconds: float) -> str:
        """Format seconds to MM:SS or HH:MM:SS format"""
        hours = int(seconds // 3600)
        minutes = int((seconds % 3600) // 60)
        secs = int(seconds % 60)

        if hours > 0:
            return f"{hours:02d}:{minutes:02d}:{secs:02d}"
        return f"{minutes:02d}:{secs:02d}"
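    # Worked example for _format_timestamp (illustrative values):
    # 3725.0 seconds -> hours = 3725 // 3600 = 1,
    # minutes = (3725 % 3600) // 60 = 2, secs = 3725 % 60 = 5,
    # formatted as "01:02:05"; 125.0 seconds has no hour component
    # and formats as "02:05".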
    def estimate_summary_length(self, transcript: str, compression_ratio: float = 0.2) -> int:
        """
        Estimate appropriate summary length based on transcript.

        Args:
            transcript: Full transcript text
            compression_ratio: Target compression ratio (0.2 = 20% of original)

        Returns:
            Estimated summary word count
        """
        word_count = len(transcript.split())
        target_words = int(word_count * compression_ratio)

        # Clamp to reasonable bounds
        min_words = 100
        max_words = 1000
        return max(min_words, min(target_words, max_words))

    def detect_language(self, text: str) -> str:
        """
        Simple language detection (mock implementation).

        Args:
            text: Text to analyze

        Returns:
            Language code (e.g., 'en', 'es', 'fr')
        """
        # Mock implementation - always returns English
        # In production, use langdetect or a similar library
        return "en"

    def extract_topics(self, transcript: str, max_topics: int = 5) -> List[str]:
        """
        Extract main topics from transcript (mock implementation).

        Args:
            transcript: Full transcript text
            max_topics: Maximum number of topics to extract

        Returns:
            List of topic strings
        """
        # Mock implementation - extract capitalized phrases as topics
        topics = []

        # Find capitalized phrases (simple heuristic)
        pattern = r'[A-Z][a-z]+(?:\s+[A-Z][a-z]+)*'
        matches = re.findall(pattern, transcript)

        # Keep phrases of two or more words and deduplicate
        seen = set()
        for match in matches:
            if len(match.split()) >= 2 and match not in seen:
                topics.append(match)
                seen.add(match)
                if len(topics) >= max_topics:
                    break

        # If not enough topics were found, pad with generic placeholders
        if len(topics) < 3:
            topics.extend(["Technology", "Innovation", "Development"][:3 - len(topics)])

        return topics
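
# Minimal usage sketch (illustrative, not part of the original module).
# It assumes TranscriptSegment accepts text/start/end keyword arguments,
# matching the attribute access above; adjust to the real model if needed.
if __name__ == "__main__":
    processor = TranscriptProcessor()

    raw = "welcome everyone   first we cover setup Next we deploy the app!!"
    cleaned = processor.clean_transcript(raw)
    print(cleaned)

    # Use a small token budget to force multiple chunks with overlap
    for chunk in processor.chunk_transcript(cleaned, max_tokens=10):
        print(chunk.chunk_index, chunk.token_count, chunk.text)

    # Assumed constructor signature for TranscriptSegment (see note above)
    segments = [
        TranscriptSegment(text="Welcome everyone.", start=0.0, end=3.5),
        TranscriptSegment(text="First, we cover setup.", start=3.5, end=12.0),
    ]
    print(processor.format_with_timestamps(segments))
    print(processor.extract_key_moments(segments, min_duration=5.0))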