API Documentation
Complete reference for Trax service protocols and API interfaces.
Architecture Overview
Trax uses a protocol-based architecture with clean separation of concerns:
┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│    CLI Layer    │    │  Service Layer  │    │ Repository Layer│
│                 │    │                 │    │                 │
│ Click Commands  │───▶│ Protocol-based  │───▶│ Database Access │
│ Rich UI         │    │ Services        │    │ Data Models     │
└─────────────────┘    └─────────────────┘    └─────────────────┘
Core Service Protocols
YouTubeServiceProtocol
Extract metadata from YouTube URLs without requiring a YouTube API key.
from src.services.protocols import YouTubeServiceProtocol

class YouTubeServiceProtocol(Protocol):
    async def extract_metadata(self, url: str) -> Dict[str, Any]:
        """Extract metadata from a YouTube URL."""
        ...

    async def batch_extract(self, urls: List[str]) -> List[Dict[str, Any]]:
        """Extract metadata from multiple YouTube URLs."""
        ...
Usage Example:
from src.services.youtube_service import YouTubeMetadataService

service = YouTubeMetadataService()
metadata = await service.extract_metadata("https://youtube.com/watch?v=example")

# Returns:
# {
#     "youtube_id": "example",
#     "title": "Video Title",
#     "channel": "Channel Name",
#     "description": "Video description",
#     "duration_seconds": 300,
#     "url": "https://youtube.com/watch?v=example",
#     "created_at": "2024-01-01T00:00:00Z"
# }
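batch_extract follows the same pattern for a list of URLs. A minimal sketch reusing the service instance above (the URLs are placeholders):
urls = [
    "https://youtube.com/watch?v=example1",
    "https://youtube.com/watch?v=example2",
]

# Each entry has the same shape as the single-URL result above
results = await service.batch_extract(urls)
for item in results:
    print(item["youtube_id"], item["title"])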
MediaServiceProtocol
Complete media processing pipeline from download to preprocessing.
from src.services.protocols import MediaServiceProtocol

class MediaServiceProtocol(Protocol):
    async def download_media(
        self,
        url: str,
        output_dir: Path,
        progress_callback: Optional[ProgressCallback] = None
    ) -> MediaFileInfo:
        """Download media from URL to local directory."""
        ...

    async def preprocess_audio(
        self,
        input_path: Path,
        output_path: Path,
        progress_callback: Optional[ProgressCallback] = None
    ) -> bool:
        """Convert audio to 16kHz mono WAV format for Whisper processing."""
        ...

    async def process_media_pipeline(
        self,
        url: str,
        output_dir: Path,
        youtube_video_id: Optional[UUID] = None,
        progress_callback: Optional[ProgressCallback] = None
    ) -> MediaFile:
        """Complete media processing pipeline from download to ready."""
        ...
Key Features:
- Download-First Architecture - Always download before processing
- Format Standardization - Convert to 16kHz mono WAV for optimal Whisper performance
- Progress Tracking - Real-time progress callbacks
- Error Recovery - Automatic retry with exponential backoff
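A usage sketch of the full pipeline, assuming a concrete media service obtained from the ServiceFactory described under Service Factory Pattern below (the inline callback mirrors the Progress Callbacks section):
from pathlib import Path

from src.services.factories import ServiceFactory

factory = ServiceFactory()
media_service = factory.create_media_service()

# Download and preprocess in one call; returns a MediaFile ready for transcription
media_file = await media_service.process_media_pipeline(
    url="https://youtube.com/watch?v=example",
    output_dir=Path("downloads"),
    progress_callback=lambda p: print(f"{p.percentage}% - {p.message}"),
)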
TranscriptionServiceProtocol
High-accuracy transcription with multiple pipeline versions.
from src.services.protocols import TranscriptionServiceProtocol, TranscriptionConfig

class TranscriptionServiceProtocol(Protocol):
    async def transcribe_file(
        self,
        media_file: MediaFile,
        config: Optional[TranscriptionConfig] = None
    ) -> TranscriptionResult:
        """Transcribe a media file."""
        ...

    async def transcribe_audio(
        self,
        audio_path: Path,
        config: Optional[TranscriptionConfig] = None
    ) -> TranscriptionResult:
        """Transcribe audio from file path."""
        ...
Pipeline Versions:
- v1: Whisper distil-large-v3 only (95%+ accuracy, <30s for 5min audio)
- v2: Whisper + DeepSeek enhancement (99%+ accuracy, <35s processing)
- v3: Multi-pass accuracy optimization (99.5%+ accuracy, <25s processing)
- v4: Speaker diarization support (90%+ speaker accuracy)
Configuration:
config = TranscriptionConfig(
    model="distil-large-v3",
    language="en",                  # Auto-detect if None
    temperature=0.0,                # Deterministic output
    response_format="verbose_json"
)
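A usage sketch passing this config to a transcription service created via the ServiceFactory (see Service Factory Pattern below); media_file is assumed to be the MediaFile returned by the media pipeline:
from src.services.factories import ServiceFactory

factory = ServiceFactory()
transcription_service = factory.create_transcription_service()

result = await transcription_service.transcribe_file(media_file, config=config)
print(f"{result.word_count} words, ~{result.accuracy_estimate:.0%} estimated accuracy")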
BatchProcessorProtocol
Efficient parallel processing of multiple files.
from src.services.protocols import BatchProcessorProtocol

class BatchProcessorProtocol(Protocol):
    async def add_task(self, task_type: str, input_data: Dict[str, Any]) -> UUID:
        """Add a new task to the batch processor."""
        ...

    async def process_tasks(self, max_workers: int = 8) -> None:
        """Process all pending tasks with specified workers."""
        ...

    async def get_progress(self) -> BatchProgress:
        """Get current batch processing progress."""
        ...
Performance Characteristics:
- Max Workers: 8 (optimized for M3 MacBook)
- Memory Limit: <2GB per worker
- Queue Management: Independent failure handling
- Progress Tracking: Real-time updates with atomic operations
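A minimal sketch of the queue workflow; the task type string and input_data keys are illustrative assumptions, not a fixed schema:
from src.services.factories import ServiceFactory

factory = ServiceFactory()
batch_processor = factory.create_batch_processor()

# Queue work (task type and payload keys are assumptions for illustration)
task_id = await batch_processor.add_task("transcribe", {"url": "https://youtube.com/watch?v=example"})

# Process everything with the default worker count, then report progress
await batch_processor.process_tasks(max_workers=8)
progress = await batch_processor.get_progress()
print(f"{progress.completed_tasks}/{progress.total_tasks} done ({progress.overall_progress:.1f}%)")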
ExportServiceProtocol
Export transcripts in multiple formats.
from src.services.protocols import ExportServiceProtocol, ExportFormat

class ExportServiceProtocol(Protocol):
    async def export_transcript(
        self,
        transcription_result: TranscriptionResult,
        output_path: Path,
        format: ExportFormat
    ) -> ExportResult:
        """Export a transcript to the specified format."""
        ...
Supported Formats:
- JSON: Complete structured data with metadata
- TXT: Human-readable plain text
- SRT: Subtitle format with timestamps
- MARKDOWN: Formatted text with headers and sections
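A usage sketch, assuming result is the TranscriptionResult from the transcription step; the concrete export service and the ExportFormat member name are assumptions, since neither is shown in this document:
from pathlib import Path

# export_service implements ExportServiceProtocol (concrete class not shown here)
export_result = await export_service.export_transcript(
    transcription_result=result,
    output_path=Path("exports/transcript.srt"),
    format=ExportFormat.SRT,  # assumed enum member, matching the SRT entry above
)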
Data Models
MediaFileInfo
@dataclass
class MediaFileInfo:
    filename: str
    file_size: int
    duration: Optional[float]
    mime_type: str
    source_path: str
    file_hash: str
TranscriptionResult
@dataclass
class TranscriptionResult:
    raw_content: str
    segments: List[Dict[str, Any]]
    confidence_scores: List[float]
    accuracy_estimate: float
    word_count: int
    processing_time_ms: float
    model_used: str
BatchProgress
@dataclass
class BatchProgress:
    total_tasks: int
    completed_tasks: int
    failed_tasks: int
    in_progress_tasks: int
    pending_tasks: int
    overall_progress: float  # 0.0 to 100.0
Service Factory Pattern
Create services using the factory pattern for dependency injection:
from src.services.factories import ServiceFactory

# Create service factory
factory = ServiceFactory()

# Get configured services
youtube_service = factory.create_youtube_service()
media_service = factory.create_media_service()
transcription_service = factory.create_transcription_service()
batch_processor = factory.create_batch_processor()
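A hedged end-to-end sketch wiring the factory-created services together (error handling and progress callbacks are omitted; see the sections below):
from pathlib import Path

async def transcribe_url(url: str):
    metadata = await youtube_service.extract_metadata(url)
    print(f"Transcribing: {metadata['title']}")

    media_file = await media_service.process_media_pipeline(
        url=url,
        output_dir=Path("downloads"),
    )
    return await transcription_service.transcribe_file(media_file)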
Error Handling
All services implement consistent error handling:
from src.errors import TraxError, TranscriptionError, MediaError

try:
    result = await transcription_service.transcribe_file(media_file)
except TranscriptionError as e:
    logger.error(f"Transcription failed: {e}")
    # Handle transcription-specific error
except MediaError as e:
    logger.error(f"Media processing failed: {e}")
    # Handle media-specific error
except TraxError as e:
    logger.error(f"General error: {e}")
    # Handle general application error
Progress Callbacks
Services support real-time progress tracking:
def progress_callback(progress: ProcessingProgress):
    print(f"Progress: {progress.percentage}% - {progress.message}")

result = await media_service.download_media(
    url="https://youtube.com/watch?v=example",
    output_dir=Path("downloads"),
    progress_callback=progress_callback
)
Performance Monitoring
Built-in telemetry for monitoring service performance:
# Get telemetry data
telemetry = media_service.get_telemetry_data()

for metric in telemetry:
    print(f"Operation: {metric.operation}")
    print(f"Duration: {metric.duration_ms}ms")
    print(f"Memory: {metric.memory_usage_mb}MB")
Testing with Protocols
Services implement protocols for easy testing:
from src.services.protocols import TranscriptionServiceProtocol

class MockTranscriptionService:
    async def transcribe_file(self, media_file, config=None):
        return TranscriptionResult(
            raw_content="Mock transcript",
            segments=[],
            confidence_scores=[0.95],
            accuracy_estimate=0.95,
            word_count=2,
            processing_time_ms=1000,
            model_used="mock"
        )

# Use in tests
service: TranscriptionServiceProtocol = MockTranscriptionService()
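A sketch of how the mock might be exercised in a test, assuming pytest with the pytest-asyncio plugin:
import pytest

@pytest.mark.asyncio
async def test_mock_transcription_service():
    service: TranscriptionServiceProtocol = MockTranscriptionService()
    result = await service.transcribe_file(media_file=None)  # the mock ignores its inputs
    assert result.raw_content == "Mock transcript"
    assert result.model_used == "mock"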
For the complete API reference and additional protocols, see the source code in src/services/protocols.py.