"""
Faster Whisper transcription service for YouTube videos.
Uses faster-whisper (CTranslate2) for 20-32x speed improvement over OpenAI Whisper.
Implements large-v3-turbo model for maximum accuracy and speed.
"""
import asyncio
import json
import logging
import os
import tempfile
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Union

import aiofiles
import aiohttp
import torch
import yt_dlp
from faster_whisper import WhisperModel
from pydub import AudioSegment

from ..models.transcript import DualTranscriptSegment, DualTranscriptMetadata
from ..core.config import settings
from ..config.video_download_config import VideoDownloadConfig

logger = logging.getLogger(__name__)

class FasterWhisperTranscriptService:
    """
    Service for transcribing YouTube videos using faster-whisper.

    Provides 20-32x speed improvement over OpenAI Whisper while maintaining
    or improving accuracy using the large-v3-turbo model.
    """

    def __init__(
        self,
        model_size: str = "large-v3-turbo",
        device: str = "auto",
        compute_type: str = "auto",
        beam_size: int = 5,
        vad_filter: bool = True,
        word_timestamps: bool = True,
        temperature: float = 0.0,
        best_of: int = 5
    ):
"""
Initialize the faster-whisper transcription service.
Args:
model_size: Model size ("large-v3-turbo", "large-v3", "large-v2", "medium", "small", "base", "tiny")
Recommended: "large-v3-turbo" for best speed/accuracy balance
device: Device to run on ("cpu", "cuda", "auto")
compute_type: Computation type ("int8", "float16", "float32", "auto")
"int8" provides best speed with minimal accuracy loss
"""
        self.model_size = model_size
        self.device = self._get_device(device)
        self.compute_type = self._get_compute_type(compute_type)
        self.model = None

        # Configuration optimized for faster-whisper
        self.chunk_duration = 30 * 60  # 30 minutes per chunk
        self.overlap_duration = 30  # 30 seconds overlap between chunks
        self.max_segment_length = 1000  # Maximum characters per segment

        # Faster-whisper specific optimizations from parameters
        self.vad_filter = vad_filter  # Voice Activity Detection for efficiency
        self.vad_parameters = dict(
            min_silence_duration_ms=500,  # silence gap that ends a speech run
            speech_pad_ms=400,  # padding kept around detected speech
        )

        # Decoding configuration from parameters
        self.beam_size = beam_size  # Beam search size (1-10, higher = better quality, slower)
        self.best_of = best_of  # Number of candidates when sampling (None = deterministic)
        self.temperature = temperature  # Sampling temperature (0 = deterministic)
        self.word_timestamps = word_timestamps  # Enable word-level timestamps

        # Use video storage configuration
        self.config = VideoDownloadConfig()
        self.config.ensure_directories()
        self.storage_dirs = self.config.get_storage_dirs()
        self.temp_dir = self.storage_dirs["temp"]

    def _get_device(self, device: str) -> str:
        """Determine the appropriate device for processing."""
        if device == "auto":
            if torch.cuda.is_available():
                logger.info("CUDA available, using GPU acceleration")
                return "cuda"
            else:
                logger.info("CUDA not available, using CPU")
                return "cpu"
        return device

    def _get_compute_type(self, compute_type: str) -> str:
        """Determine the appropriate compute type for the device."""
        if compute_type == "auto":
            if self.device == "cuda":
                # Use float16 for GPU for best speed/memory balance
                return "float16"
            else:
                # Use int8 for CPU for best speed
                return "int8"
        return compute_type

    async def _load_model(self) -> WhisperModel:
        """Load the faster-whisper model on-demand."""
        if self.model is None:
            logger.info(
                f"Loading faster-whisper model '{self.model_size}' on device "
                f"'{self.device}' with compute_type '{self.compute_type}'"
            )
            try:
                # Run model loading in executor to avoid blocking async loop
                loop = asyncio.get_event_loop()

                # Handle special model names
                model_name = self.model_size
                if model_name == "large-v3-turbo":
                    # Use the optimized CTranslate2 model
                    model_name = "deepdml/faster-whisper-large-v3-turbo-ct2"

                self.model = await loop.run_in_executor(
                    None,
                    lambda: WhisperModel(
                        model_name,
                        device=self.device,
                        compute_type=self.compute_type,
                        cpu_threads=0,  # 0 lets CTranslate2 pick its default thread count
                        num_workers=1,  # Number of parallel workers
                    )
                )
                logger.info(f"Successfully loaded faster-whisper model '{self.model_size}' ({model_name})")
                logger.info(f"Model device: {self.device}, compute_type: {self.compute_type}")
            except Exception as e:
                logger.error(f"Failed to load faster-whisper model: {e}")
                # Fall back to standard large-v3 if the turbo model fails
                if self.model_size == "large-v3-turbo":
                    logger.info("Falling back to large-v3 model")
                    try:
                        self.model = await loop.run_in_executor(
                            None,
                            lambda: WhisperModel(
                                "large-v3",
                                device=self.device,
                                compute_type=self.compute_type,
                            )
                        )
                        logger.info("Successfully loaded fallback large-v3 model")
                    except Exception as fallback_error:
                        logger.error(f"Fallback model also failed: {fallback_error}")
                        raise fallback_error
                else:
                    raise e
        return self.model
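
    # Note: on first use, WhisperModel downloads the CTranslate2 weights from
    # the Hugging Face Hub and caches them locally, so the initial
    # _load_model() call can take much longer than subsequent ones.
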
    async def transcribe_video(
        self,
        video_id: str,
        video_url: str,
        progress_callback=None
    ) -> Tuple[List[DualTranscriptSegment], DualTranscriptMetadata]:
        """
        Transcribe a YouTube video and return segments with metadata.

        Args:
            video_id: YouTube video ID
            video_url: Full YouTube video URL
            progress_callback: Optional callback for progress updates

        Returns:
            Tuple of (segments, metadata)
        """
        start_time = datetime.now()
        audio_path = None
        try:
            if progress_callback:
                await progress_callback("Downloading audio from YouTube video...")

            # Download audio from YouTube video
            audio_path = await self._download_audio(video_id, video_url)

            if progress_callback:
                await progress_callback("Audio downloaded, starting faster-whisper transcription...")
            logger.info(f"Starting faster-whisper transcription for video {video_id} using model {self.model_size}")

            # Transcribe the audio file
            segments = await self._transcribe_audio_file(
                audio_path,
                progress_callback=progress_callback
            )

            # Calculate processing time
            processing_time = (datetime.now() - start_time).total_seconds()

            # Create metadata
            metadata = DualTranscriptMetadata(
                video_id=video_id,
                language="en",  # faster-whisper auto-detects, but assume English for now
                word_count=sum(len(segment.text.split()) for segment in segments),
                total_segments=len(segments),
                has_timestamps=True,
                extraction_method="faster_whisper",
                processing_time_seconds=processing_time,
                quality_score=self._calculate_quality_score(segments),
                confidence_score=self._calculate_confidence_score(segments)
            )

            duration_minutes = processing_time / 60
            logger.info(
                f"Completed faster-whisper transcription for video {video_id}. "
                f"Generated {len(segments)} segments in {processing_time:.2f}s ({duration_minutes:.2f} minutes). "
                f"Model: {self.model_size}, Device: {self.device}"
            )

            # Save transcript to file
            await self._save_transcript(video_id, segments, metadata)
            return segments, metadata
        except Exception as e:
            logger.error(f"Faster-whisper transcription failed for video {video_id}: {e}")
            raise
        finally:
            # Clean up temporary files, but keep MP3 for future re-transcription
            if audio_path:
                await self._cleanup_temp_files(audio_path)
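
    # Note: progress_callback is awaited at each stage, so callers must pass
    # an async callable (or a callable returning an awaitable), e.g. an
    # async def that forwards status messages to the client.
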
    async def _download_audio(self, video_id: str, video_url: str) -> str:
        """Download audio from a YouTube video using yt-dlp."""
        try:
            # Check if audio already exists (MP3 for storage)
            mp3_path = self.storage_dirs["audio"] / f"{video_id}.mp3"

            # If MP3 exists, use it directly (faster-whisper handles MP3 natively)
            if mp3_path.exists():
                logger.info(f"Using existing audio file: {mp3_path}")
                return str(mp3_path)

            # Download as MP3 for efficient storage
            ydl_opts = {
                'format': 'bestaudio/best',
                'postprocessors': [{
                    'key': 'FFmpegExtractAudio',
                    'preferredcodec': 'mp3',
                    'preferredquality': '192',
                }],
                'outtmpl': str(self.storage_dirs["audio"] / f"{video_id}.%(ext)s"),
                'quiet': True,
                'no_warnings': True,
            }

            # Run yt-dlp in executor to avoid blocking
            loop = asyncio.get_event_loop()
            await loop.run_in_executor(
                None,
                lambda: self._run_yt_dlp(video_url, ydl_opts)
            )

            # Return MP3 path (faster-whisper can handle MP3 directly)
            if mp3_path.exists():
                return str(mp3_path)
            raise RuntimeError(f"Failed to download audio for {video_id}")
        except Exception as e:
            logger.error(f"Failed to download audio for video {video_id}: {e}")
            raise RuntimeError(f"Audio download failed: {e}")

    def _run_yt_dlp(self, url: str, opts: dict):
        """Run yt-dlp synchronously."""
        with yt_dlp.YoutubeDL(opts) as ydl:
            ydl.download([url])
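
    # With the options above, yt-dlp extracts audio via its FFmpegExtractAudio
    # postprocessor and writes <audio storage dir>/<video_id>.mp3 at 192 kbps,
    # which is the path handed to the transcription step.
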
    async def _transcribe_audio_file(
        self,
        audio_path: str,
        progress_callback=None
    ) -> List[DualTranscriptSegment]:
        """
        Transcribe an audio file with optimized faster-whisper settings.

        Args:
            audio_path: Path to the audio file
            progress_callback: Optional callback for progress updates

        Returns:
            List of transcription segments
        """
        model = await self._load_model()

        # Get audio duration for progress tracking
        duration = await self._get_audio_duration(audio_path)
        logger.info(f"Audio duration: {duration:.2f} seconds ({duration/60:.1f} minutes)")

        try:
            if progress_callback:
                await progress_callback(f"Transcribing {duration/60:.1f} minute audio with {self.model_size}...")

            # Use faster-whisper with optimized settings
            logger.info(f"Transcribing with faster-whisper - VAD: {self.vad_filter}, Beam: {self.beam_size}")
            loop = asyncio.get_event_loop()
            result = await loop.run_in_executor(
                None,
                lambda: self._transcribe_with_faster_whisper(model, audio_path)
            )
            segments, info = result

            # Log transcription info
            logger.info(f"Detected language: {info.language} (probability: {info.language_probability:.2f})")
            logger.info(f"Duration: {info.duration:.2f}s, VAD: {info.vad_options if hasattr(info, 'vad_options') else 'N/A'}")

            # Convert to DualTranscriptSegment objects
            transcript_segments = []
            for segment in segments:
                text = segment.text.strip()

                # Split long segments if needed
                if len(text) > self.max_segment_length:
                    split_segments = self._split_long_segment(
                        text, segment.start, segment.end
                    )
                    transcript_segments.extend(split_segments)
                else:
                    transcript_segments.append(DualTranscriptSegment(
                        start_time=segment.start,
                        end_time=segment.end,
                        text=text,
                        confidence=segment.avg_logprob if hasattr(segment, 'avg_logprob') else None
                    ))

            if progress_callback:
                await progress_callback(f"Transcription complete - {len(transcript_segments)} segments generated")
            return transcript_segments
        except Exception as e:
            logger.error(f"Failed to transcribe audio file {audio_path}: {e}")
            raise

    def _transcribe_with_faster_whisper(self, model: WhisperModel, audio_path: str):
        """
        Perform the actual transcription with faster-whisper.

        Run in executor to avoid blocking the event loop.
        """
        segments, info = model.transcribe(
            audio_path,
            beam_size=self.beam_size,
            best_of=self.best_of,
            temperature=self.temperature,
            vad_filter=self.vad_filter,
            vad_parameters=self.vad_parameters,
            word_timestamps=self.word_timestamps,
            language="en",  # Can be made configurable
            task="transcribe"
        )
        # model.transcribe() returns a lazy generator; materialize it here so
        # the actual decoding runs in the executor thread instead of blocking
        # the event loop when the caller iterates the segments.
        return list(segments), info
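
    # Note: with temperature=0.0 decoding is deterministic, so best_of (which
    # only applies when sampling at a non-zero temperature) has no effect
    # under the default settings.
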
    async def _get_audio_duration(self, audio_path: str) -> float:
        """Get audio duration using pydub."""
        loop = asyncio.get_event_loop()
        audio = await loop.run_in_executor(None, AudioSegment.from_file, audio_path)
        return len(audio) / 1000.0  # Convert milliseconds to seconds
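
    # Note: pydub relies on an ffmpeg (or libav) binary on PATH to decode MP3,
    # so this duration probe assumes ffmpeg is installed; that is the same
    # dependency yt-dlp's FFmpegExtractAudio postprocessor already requires.
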
    def _split_long_segment(
        self,
        text: str,
        start_time: float,
        end_time: float
    ) -> List[DualTranscriptSegment]:
        """
        Split a long text segment into smaller segments.

        Timestamps for the sub-segments are linearly interpolated, assuming
        each word takes an equal share of the original segment's duration.

        Args:
            text: Text to split
            start_time: Start time of the original segment
            end_time: End time of the original segment

        Returns:
            List of smaller segments
        """
        segments = []
        duration = end_time - start_time

        # Split text at word boundaries
        words = text.split()
        current_text = ""
        current_words = 0
        time_per_word = duration / len(words) if len(words) > 0 else 0

        for word in words:
            if len(current_text + " " + word) > self.max_segment_length and current_text:
                # Flush the accumulated text as one sub-segment
                segment_start = start_time + (current_words - len(current_text.split())) * time_per_word
                segment_end = start_time + current_words * time_per_word
                segments.append(DualTranscriptSegment(
                    start_time=segment_start,
                    end_time=segment_end,
                    text=current_text.strip()
                ))
                current_text = word
            else:
                current_text += " " + word if current_text else word
            current_words += 1

        # Add final segment
        if current_text:
            segment_start = start_time + (current_words - len(current_text.split())) * time_per_word
            segments.append(DualTranscriptSegment(
                start_time=segment_start,
                end_time=end_time,
                text=current_text.strip()
            ))
        return segments
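
    # Worked example of the interpolation: a 60 s segment with 120 words gives
    # time_per_word = 0.5 s; if the first flush happens after 40 words, that
    # sub-segment spans roughly [start_time, start_time + 20 s].
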
    def _calculate_quality_score(self, segments: List[DualTranscriptSegment]) -> float:
        """Calculate overall quality score based on segment characteristics."""
        if not segments:
            return 0.0

        confidences = [s.confidence for s in segments if s.confidence is not None]
        if not confidences:
            return 0.8  # Default high quality for faster-whisper

        avg_confidence = sum(confidences) / len(confidences)
        # Map the average log probability (roughly -5..0) onto a 0-1 scale
        normalized_confidence = max(0.0, min(1.0, (avg_confidence + 5.0) / 5.0))
        # Boost quality score for faster-whisper due to improved model
        return min(1.0, normalized_confidence * 1.1)

    def _calculate_confidence_score(self, segments: List[DualTranscriptSegment]) -> float:
        """Calculate average confidence score."""
        if not segments:
            return 0.0

        confidences = [s.confidence for s in segments if s.confidence is not None]
        if not confidences:
            return 0.85  # Higher default for faster-whisper

        avg_confidence = sum(confidences) / len(confidences)
        # Map the average log probability (roughly -5..0) onto a 0-1 scale
        return max(0.0, min(1.0, (avg_confidence + 5.0) / 5.0))
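
    # Example: an average avg_logprob of -0.5 maps to (-0.5 + 5.0) / 5.0 = 0.9,
    # and anything at or below -5.0 clamps to 0.0.
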
    async def _save_transcript(
        self,
        video_id: str,
        segments: List[DualTranscriptSegment],
        metadata: DualTranscriptMetadata
    ):
        """Save transcript and metadata to files for future use."""
        try:
            # Save audio metadata with faster-whisper info
            await self._save_audio_metadata(video_id, metadata)

            transcript_path = self.storage_dirs["transcripts"] / f"{video_id}_faster_whisper.txt"

            # Create human-readable transcript file
            transcript_lines = [
                f"# Faster-Whisper Transcript - Model: {self.model_size}",
                f"# Processing time: {metadata.processing_time_seconds:.2f}s",
                f"# Quality score: {metadata.quality_score:.3f}",
                f"# Confidence score: {metadata.confidence_score:.3f}",
                f"# Total segments: {len(segments)}",
                ""
            ]
            for segment in segments:
                if segment.start_time is not None and segment.end_time is not None:
                    timestamp = f"[{segment.start_time:.1f}s - {segment.end_time:.1f}s]"
                    transcript_lines.append(f"{timestamp} {segment.text}")
                else:
                    transcript_lines.append(segment.text)

            # Write transcript to file
            async with aiofiles.open(transcript_path, 'w', encoding='utf-8') as f:
                await f.write('\n'.join(transcript_lines))
            logger.info(f"Saved faster-whisper transcript to {transcript_path}")

            # Also save as JSON for programmatic access
            json_path = self.storage_dirs["transcripts"] / f"{video_id}_faster_whisper.json"
            segments_data = {
                "metadata": {
                    "model": self.model_size,
                    "device": self.device,
                    "compute_type": self.compute_type,
                    "processing_time_seconds": metadata.processing_time_seconds,
                    "quality_score": metadata.quality_score,
                    "confidence_score": metadata.confidence_score,
                    "total_segments": len(segments),
                    "word_count": metadata.word_count,
                    "extraction_method": "faster_whisper"
                },
                "segments": [
                    {
                        "start_time": seg.start_time,
                        "end_time": seg.end_time,
                        "text": seg.text,
                        "confidence": seg.confidence
                    }
                    for seg in segments
                ]
            }
            async with aiofiles.open(json_path, 'w', encoding='utf-8') as f:
                await f.write(json.dumps(segments_data, indent=2))
            logger.info(f"Saved faster-whisper transcript JSON to {json_path}")
        except Exception as e:
            logger.warning(f"Failed to save transcript for {video_id}: {e}")

    async def _save_audio_metadata(self, video_id: str, metadata: DualTranscriptMetadata):
        """Save audio metadata with faster-whisper specific information."""
        try:
            mp3_path = self.storage_dirs["audio"] / f"{video_id}.mp3"
            if not mp3_path.exists():
                return

            # Get audio file info
            audio_info = {
                "video_id": video_id,
                "file_path": str(mp3_path),
                "file_size_mb": round(mp3_path.stat().st_size / (1024 * 1024), 2),
                "download_date": datetime.now().isoformat(),
                "format": "mp3",
                "quality": "192kbps",
                # Faster-whisper specific metadata
                "transcription_engine": "faster_whisper",
                "model_used": self.model_size,
                "device": self.device,
                "compute_type": self.compute_type,
                "processing_time_seconds": metadata.processing_time_seconds,
                "quality_score": metadata.quality_score,
                "confidence_score": metadata.confidence_score,
                "vad_enabled": self.vad_filter,
                "beam_size": self.beam_size
            }

            # Try to get audio duration
            try:
                loop = asyncio.get_event_loop()
                audio = await loop.run_in_executor(None, AudioSegment.from_file, str(mp3_path))
                duration_seconds = len(audio) / 1000.0
                audio_info["duration_seconds"] = duration_seconds
                audio_info["duration_formatted"] = f"{int(duration_seconds // 60)}:{int(duration_seconds % 60):02d}"

                # Calculate speed improvement ratio
                if metadata.processing_time_seconds > 0:
                    speed_ratio = duration_seconds / metadata.processing_time_seconds
                    audio_info["speed_ratio"] = round(speed_ratio, 2)
                    audio_info["realtime_factor"] = f"{speed_ratio:.1f}x faster than realtime"
            except Exception as duration_error:
                logger.debug(f"Could not read audio duration for {video_id}: {duration_error}")

            # Save metadata
            metadata_path = self.storage_dirs["audio"] / f"{video_id}_faster_whisper_metadata.json"
            async with aiofiles.open(metadata_path, 'w', encoding='utf-8') as f:
                await f.write(json.dumps(audio_info, indent=2))
            logger.info(f"Saved faster-whisper audio metadata to {metadata_path}")
        except Exception as e:
            logger.warning(f"Failed to save audio metadata for {video_id}: {e}")

    async def _cleanup_temp_files(self, audio_path: str):
        """Clean up temporary files while preserving the MP3 for re-use."""
        try:
            # Only clean up if this was a temporary WAV file
            if audio_path.endswith('.wav'):
                wav_path = Path(audio_path)
                mp3_path = wav_path.with_suffix('.mp3')
                if mp3_path.exists() and wav_path.exists():
                    try:
                        os.unlink(audio_path)
                        logger.info(f"Cleaned up temporary WAV, keeping MP3: {mp3_path}")
                    except Exception as e:
                        logger.warning(f"Failed to clean up WAV file {audio_path}: {e}")
            else:
                logger.info(f"Keeping audio file: {audio_path}")
        except Exception as e:
            logger.warning(f"Error during temp file cleanup: {e}")

    async def cleanup(self):
        """Clean up resources and free memory."""
        try:
            # Unload model to free memory
            if self.model is not None:
                del self.model
                self.model = None

            # Clear GPU cache if using CUDA
            if torch.cuda.is_available() and self.device == "cuda":
                torch.cuda.empty_cache()
                logger.info("Cleared GPU cache")

            logger.info("Faster-whisper service cleanup completed")
        except Exception as e:
            logger.warning(f"Error during cleanup: {e}")

    def get_performance_info(self) -> Dict:
        """Get information about the current configuration and expected performance."""
        return {
            "model": self.model_size,
            "device": self.device,
            "compute_type": self.compute_type,
            "vad_enabled": self.vad_filter,
            "beam_size": self.beam_size,
            "expected_speed_improvement": "20-32x faster than OpenAI Whisper",
            "optimizations": [
                "CTranslate2 optimization engine",
                "Voice Activity Detection (VAD)",
                "GPU acceleration" if self.device == "cuda" else "CPU optimization",
                f"Quantization ({self.compute_type})",
                "Native MP3 support (no conversion needed)"
            ]
        }
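

# ---------------------------------------------------------------------------
# Minimal usage sketch (illustrative only; the relative imports above mean
# this module is used from within the backend package, not run directly):
#
#     service = FasterWhisperTranscriptService(model_size="large-v3-turbo")
#     segments, metadata = await service.transcribe_video(
#         video_id="dQw4w9WgXcQ",
#         video_url="https://www.youtube.com/watch?v=dQw4w9WgXcQ",
#     )
#     logger.info("Transcribed %d segments", len(segments))
#     await service.cleanup()
# ---------------------------------------------------------------------------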