320 lines
8.9 KiB
Markdown
320 lines
8.9 KiB
Markdown
# API Documentation
|
|
|
|
Complete reference for Trax service protocols and API interfaces.
|
|
|
|
## Architecture Overview
|
|
|
|
Trax uses a protocol-based architecture with clean separation of concerns:
|
|
|
|
```
|
|
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
|
|
│ CLI Layer │ │ Service Layer │ │ Repository Layer│
|
|
│ │ │ │ │ │
|
|
│ Click Commands │───▶│ Protocol-based │───▶│ Database Access │
|
|
│ Rich UI │ │ Services │ │ Data Models │
|
|
└─────────────────┘ └─────────────────┘ └─────────────────┘
|
|
```
|
|
|
|
## Core Service Protocols
|
|
|
|
### YouTubeServiceProtocol
|
|
|
|
Extract metadata from YouTube URLs without API requirements.
|
|
|
|
```python
|
|
from src.services.protocols import YouTubeServiceProtocol
|
|
|
|
class YouTubeServiceProtocol(Protocol):
|
|
async def extract_metadata(self, url: str) -> Dict[str, Any]:
|
|
"""Extract metadata from a YouTube URL."""
|
|
...
|
|
|
|
async def batch_extract(self, urls: List[str]) -> List[Dict[str, Any]]:
|
|
"""Extract metadata from multiple YouTube URLs."""
|
|
...
|
|
```
|
|
|
|
**Usage Example:**
|
|
```python
|
|
from src.services.youtube_service import YouTubeMetadataService
|
|
|
|
service = YouTubeMetadataService()
|
|
metadata = await service.extract_metadata("https://youtube.com/watch?v=example")
|
|
|
|
# Returns:
|
|
{
|
|
"youtube_id": "example",
|
|
"title": "Video Title",
|
|
"channel": "Channel Name",
|
|
"description": "Video description",
|
|
"duration_seconds": 300,
|
|
"url": "https://youtube.com/watch?v=example",
|
|
"created_at": "2024-01-01T00:00:00Z"
|
|
}
|
|
```
|
|
|
|
### MediaServiceProtocol
|
|
|
|
Complete media processing pipeline from download to preprocessing.
|
|
|
|
```python
|
|
from src.services.protocols import MediaServiceProtocol
|
|
|
|
class MediaServiceProtocol(Protocol):
|
|
async def download_media(
|
|
self,
|
|
url: str,
|
|
output_dir: Path,
|
|
progress_callback: Optional[ProgressCallback] = None
|
|
) -> MediaFileInfo:
|
|
"""Download media from URL to local directory."""
|
|
...
|
|
|
|
async def preprocess_audio(
|
|
self,
|
|
input_path: Path,
|
|
output_path: Path,
|
|
progress_callback: Optional[ProgressCallback] = None
|
|
) -> bool:
|
|
"""Convert audio to 16kHz mono WAV format for Whisper processing."""
|
|
...
|
|
|
|
async def process_media_pipeline(
|
|
self,
|
|
url: str,
|
|
output_dir: Path,
|
|
youtube_video_id: Optional[UUID] = None,
|
|
progress_callback: Optional[ProgressCallback] = None
|
|
) -> MediaFile:
|
|
"""Complete media processing pipeline from download to ready."""
|
|
...
|
|
```
|
|
|
|
**Key Features:**
|
|
- **Download-First Architecture** - Always download before processing
|
|
- **Format Standardization** - Convert to 16kHz mono WAV for optimal Whisper performance
|
|
- **Progress Tracking** - Real-time progress callbacks
|
|
- **Error Recovery** - Automatic retry with exponential backoff
|
|
|
|
### TranscriptionServiceProtocol
|
|
|
|
High-accuracy transcription with multiple pipeline versions.
|
|
|
|
```python
|
|
from src.services.protocols import TranscriptionServiceProtocol, TranscriptionConfig
|
|
|
|
class TranscriptionServiceProtocol(Protocol):
|
|
async def transcribe_file(
|
|
self,
|
|
media_file: MediaFile,
|
|
config: Optional[TranscriptionConfig] = None
|
|
) -> TranscriptionResult:
|
|
"""Transcribe a media file."""
|
|
...
|
|
|
|
async def transcribe_audio(
|
|
self,
|
|
audio_path: Path,
|
|
config: Optional[TranscriptionConfig] = None
|
|
) -> TranscriptionResult:
|
|
"""Transcribe audio from file path."""
|
|
...
|
|
```
|
|
|
|
**Pipeline Versions:**
|
|
- **v1:** Whisper distil-large-v3 only (95%+ accuracy, <30s for 5min audio)
|
|
- **v2:** Whisper + DeepSeek enhancement (99%+ accuracy, <35s processing)
|
|
- **v3:** Multi-pass accuracy optimization (99.5%+ accuracy, <25s processing)
|
|
- **v4:** Speaker diarization support (90%+ speaker accuracy)
|
|
|
|
**Configuration:**
|
|
```python
|
|
config = TranscriptionConfig(
|
|
model="distil-large-v3",
|
|
language="en", # Auto-detect if None
|
|
temperature=0.0, # Deterministic output
|
|
response_format="verbose_json"
|
|
)
|
|
```
|
|
|
|
### BatchProcessorProtocol
|
|
|
|
Efficient parallel processing of multiple files.
|
|
|
|
```python
|
|
from src.services.protocols import BatchProcessorProtocol
|
|
|
|
class BatchProcessorProtocol(Protocol):
|
|
async def add_task(self, task_type: str, input_data: Dict[str, Any]) -> UUID:
|
|
"""Add a new task to the batch processor."""
|
|
...
|
|
|
|
async def process_tasks(self, max_workers: int = 8) -> None:
|
|
"""Process all pending tasks with specified workers."""
|
|
...
|
|
|
|
async def get_progress(self) -> BatchProgress:
|
|
"""Get current batch processing progress."""
|
|
...
|
|
```
|
|
|
|
**Performance Characteristics:**
|
|
- **Max Workers:** 8 (optimized for M3 MacBook)
|
|
- **Memory Limit:** <2GB per worker
|
|
- **Queue Management:** Independent failure handling
|
|
- **Progress Tracking:** Real-time updates with atomic operations
|
|
|
|
### ExportServiceProtocol
|
|
|
|
Export transcripts in multiple formats.
|
|
|
|
```python
|
|
from src.services.protocols import ExportServiceProtocol, ExportFormat
|
|
|
|
class ExportServiceProtocol(Protocol):
|
|
async def export_transcript(
|
|
self,
|
|
transcription_result: TranscriptionResult,
|
|
output_path: Path,
|
|
format: ExportFormat
|
|
) -> ExportResult:
|
|
"""Export a transcript to the specified format."""
|
|
...
|
|
```
|
|
|
|
**Supported Formats:**
|
|
- **JSON:** Complete structured data with metadata
|
|
- **TXT:** Human-readable plain text
|
|
- **SRT:** Subtitle format with timestamps
|
|
- **MARKDOWN:** Formatted text with headers and sections
|
|
|
|
## Data Models
|
|
|
|
### MediaFileInfo
|
|
```python
|
|
@dataclass
|
|
class MediaFileInfo:
|
|
filename: str
|
|
file_size: int
|
|
duration: Optional[float]
|
|
mime_type: str
|
|
source_path: str
|
|
file_hash: str
|
|
```
|
|
|
|
### TranscriptionResult
|
|
```python
|
|
@dataclass
|
|
class TranscriptionResult:
|
|
raw_content: str
|
|
segments: List[Dict[str, Any]]
|
|
confidence_scores: List[float]
|
|
accuracy_estimate: float
|
|
word_count: int
|
|
processing_time_ms: float
|
|
model_used: str
|
|
```
|
|
|
|
### BatchProgress
|
|
```python
|
|
@dataclass
|
|
class BatchProgress:
|
|
total_tasks: int
|
|
completed_tasks: int
|
|
failed_tasks: int
|
|
in_progress_tasks: int
|
|
pending_tasks: int
|
|
overall_progress: float # 0.0 to 100.0
|
|
```
|
|
|
|
## Service Factory Pattern
|
|
|
|
Create services using the factory pattern for dependency injection:
|
|
|
|
```python
|
|
from src.services.factories import ServiceFactory
|
|
|
|
# Create service factory
|
|
factory = ServiceFactory()
|
|
|
|
# Get configured services
|
|
youtube_service = factory.create_youtube_service()
|
|
media_service = factory.create_media_service()
|
|
transcription_service = factory.create_transcription_service()
|
|
batch_processor = factory.create_batch_processor()
|
|
```
|
|
|
|
## Error Handling
|
|
|
|
All services implement consistent error handling:
|
|
|
|
```python
|
|
from src.errors import TraxError, TranscriptionError, MediaError
|
|
|
|
try:
|
|
result = await transcription_service.transcribe_file(media_file)
|
|
except TranscriptionError as e:
|
|
logger.error(f"Transcription failed: {e}")
|
|
# Handle transcription-specific error
|
|
except MediaError as e:
|
|
logger.error(f"Media processing failed: {e}")
|
|
# Handle media-specific error
|
|
except TraxError as e:
|
|
logger.error(f"General error: {e}")
|
|
# Handle general application error
|
|
```
|
|
|
|
## Progress Callbacks
|
|
|
|
Services support real-time progress tracking:
|
|
|
|
```python
|
|
def progress_callback(progress: ProcessingProgress):
|
|
print(f"Progress: {progress.percentage}% - {progress.message}")
|
|
|
|
result = await media_service.download_media(
|
|
url="https://youtube.com/watch?v=example",
|
|
output_dir=Path("downloads"),
|
|
progress_callback=progress_callback
|
|
)
|
|
```
|
|
|
|
## Performance Monitoring
|
|
|
|
Built-in telemetry for monitoring service performance:
|
|
|
|
```python
|
|
# Get telemetry data
|
|
telemetry = media_service.get_telemetry_data()
|
|
|
|
for metric in telemetry:
|
|
print(f"Operation: {metric.operation}")
|
|
print(f"Duration: {metric.duration_ms}ms")
|
|
print(f"Memory: {metric.memory_usage_mb}MB")
|
|
```
|
|
|
|
## Testing with Protocols
|
|
|
|
Services implement protocols for easy testing:
|
|
|
|
```python
|
|
from src.services.protocols import TranscriptionServiceProtocol
|
|
|
|
class MockTranscriptionService:
|
|
async def transcribe_file(self, media_file, config=None):
|
|
return TranscriptionResult(
|
|
raw_content="Mock transcript",
|
|
segments=[],
|
|
confidence_scores=[0.95],
|
|
accuracy_estimate=0.95,
|
|
word_count=2,
|
|
processing_time_ms=1000,
|
|
model_used="mock"
|
|
)
|
|
|
|
# Use in tests
|
|
service: TranscriptionServiceProtocol = MockTranscriptionService()
|
|
```
|
|
|
|
For complete API reference and additional protocols, see the source code in `src/services/protocols.py`.
|