# Checkpoint 3: Architecture Design Report

## Modular Backend Architecture for Media Processing

### 1. Database Architecture (PostgreSQL)

#### Why PostgreSQL

- Multiple services planned (summarizer, frontend server)
- Concurrent access requirements
- Better testing tools (pg_tap, factory patterns)
- Mature schema migrations with Alembic
- JSON/JSONB support for transcript data
- Scales better than SQLite for production use

#### Database Schema Design

```sql
-- Core Tables
CREATE TABLE media_files (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    source_url TEXT,                      -- YouTube URL or null for uploads
    local_path TEXT NOT NULL,             -- Where file is stored
    media_type VARCHAR(10),               -- mp3, mp4, wav, etc.
    duration_seconds INTEGER,
    file_size_bytes BIGINT,
    download_status VARCHAR(20),          -- pending, downloading, completed, failed
    created_at TIMESTAMP DEFAULT NOW(),
    updated_at TIMESTAMP DEFAULT NOW()
);

CREATE TABLE transcripts (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    media_file_id UUID REFERENCES media_files(id),
    pipeline_version VARCHAR(10),         -- 'v1', 'v2', 'v3', 'v4'
    raw_content JSONB NOT NULL,           -- Original Whisper output
    enhanced_content JSONB,               -- AI-enhanced version (v2+)
    multipass_content JSONB,              -- Multi-pass merged (v3+)
    diarized_content JSONB,               -- Speaker separated (v4+)
    text_content TEXT,                    -- Plain text for search
    model_used VARCHAR(50),               -- Whisper model version
    processing_time_ms INTEGER,
    word_count INTEGER,
    processing_metadata JSONB,            -- Version-specific metadata
    created_at TIMESTAMP DEFAULT NOW(),
    enhanced_at TIMESTAMP,
    updated_at TIMESTAMP DEFAULT NOW()
);

CREATE TABLE exports (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    transcript_id UUID REFERENCES transcripts(id),
    format VARCHAR(10),                   -- json, txt
    file_path TEXT,
    created_at TIMESTAMP DEFAULT NOW()
);

CREATE TABLE cache_entries (
    cache_key VARCHAR(255) PRIMARY KEY,
    cache_type VARCHAR(50),               -- embedding, query, etc.
    value JSONB,
    compressed BOOLEAN DEFAULT FALSE,
    ttl_seconds INTEGER,
    expires_at TIMESTAMP,
    created_at TIMESTAMP DEFAULT NOW()
);

-- Batch Processing Tables
CREATE TABLE batch_jobs (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    status VARCHAR(20),                   -- pending, processing, completed, failed
    total_items INTEGER,
    completed_items INTEGER DEFAULT 0,
    failed_items INTEGER DEFAULT 0,
    created_at TIMESTAMP DEFAULT NOW(),
    completed_at TIMESTAMP
);

CREATE TABLE batch_items (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    batch_job_id UUID REFERENCES batch_jobs(id),
    media_file_id UUID REFERENCES media_files(id),
    status VARCHAR(20),
    error_message TEXT,
    processing_order INTEGER,
    created_at TIMESTAMP DEFAULT NOW()
);

-- Audio Processing Metadata
CREATE TABLE audio_processing_metadata (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    media_file_id UUID REFERENCES media_files(id),
    original_format VARCHAR(10),
    original_sample_rate INTEGER,
    original_channels INTEGER,
    processed_sample_rate INTEGER,        -- Should be 16000
    processed_channels INTEGER,           -- Should be 1 (mono)
    noise_level_db FLOAT,
    preprocessing_steps JSONB,            -- Array of applied steps
    quality_score FLOAT,                  -- 0-1 quality assessment
    created_at TIMESTAMP DEFAULT NOW()
);

-- Version-specific tables (added incrementally)
-- Phase 3 adds:
CREATE TABLE multipass_runs (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    transcript_id UUID REFERENCES transcripts(id),
    pass_number INTEGER,
    model_used VARCHAR(50),
    confidence_scores JSONB,
    segment_variations JSONB,
    created_at TIMESTAMP DEFAULT NOW()
);

-- Phase 4 adds:
CREATE TABLE speaker_profiles (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    transcript_id UUID REFERENCES transcripts(id),
    speaker_label VARCHAR(50),
    voice_embedding BYTEA,
    total_speaking_time FLOAT,
    created_at TIMESTAMP DEFAULT NOW()
);
```
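
The `text_content` column keeps a plain-text projection of the JSONB `raw_content` so full-text search never has to unpack JSON. A minimal sketch of how that projection (and `word_count`) might be derived from Whisper-style output — the `segments`/`text` key layout is an assumption of this sketch:

```python
def flatten_transcript(raw_content: dict) -> tuple[str, int]:
    """Join segment texts into the plain-text search column and count words.

    Assumes Whisper-style output: {"segments": [{"text": "..."}, ...]}.
    """
    parts = [seg.get("text", "").strip() for seg in raw_content.get("segments", [])]
    text = " ".join(p for p in parts if p)
    return text, len(text.split())
```

Computing this once at insert time keeps reads cheap; the JSONB column remains the source of truth.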

### 2. Service Layer Architecture (Iterative Design)

#### Modular, Refactorable Structure

```
trax/
├── src/
│   ├── core/
│   │   ├── config.py             # Existing configuration
│   │   ├── database.py           # PostgreSQL + Alembic setup
│   │   ├── exceptions.py         # Custom exceptions
│   │   └── protocols.py          # Abstract protocols for services
│   │
│   ├── models/
│   │   ├── __init__.py
│   │   ├── base.py               # SQLAlchemy base with registry pattern
│   │   ├── media.py              # MediaFile model
│   │   ├── transcript.py         # Transcript model
│   │   ├── batch.py              # Batch processing models
│   │   └── cache.py              # CacheEntry model
│   │
│   ├── services/
│   │   ├── batch/                # PRIORITY: Batch processing first
│   │   │   ├── __init__.py
│   │   │   ├── queue.py          # Batch queue management
│   │   │   ├── processor.py      # Parallel batch processor
│   │   │   └── monitor.py        # Progress tracking
│   │   │
│   │   ├── media/
│   │   │   ├── __init__.py
│   │   │   ├── downloader.py     # Generic downloader protocol
│   │   │   ├── youtube.py        # YouTube implementation (yt-dlp)
│   │   │   ├── local.py          # Local file handler
│   │   │   └── converter.py      # FFmpeg service
│   │   │
│   │   ├── audio/                # Pre/post-processing
│   │   │   ├── __init__.py
│   │   │   ├── preprocessor.py   # Audio optimization
│   │   │   ├── postprocessor.py  # Transcript enhancement
│   │   │   ├── analyzer.py       # Quality assessment
│   │   │   └── enhancer.py       # Noise reduction
│   │   │
│   │   ├── transcription/        # Iterative versions
│   │   │   ├── v1_basic/         # Phase 1: Single pass
│   │   │   │   ├── whisper.py    # Basic transcription
│   │   │   │   └── optimizer.py  # M3 optimizations
│   │   │   │
│   │   │   ├── v2_enhanced/      # Phase 2: + AI enhancement
│   │   │   │   ├── enhancer.py   # DeepSeek enhancement
│   │   │   │   └── validator.py  # Quality checks
│   │   │   │
│   │   │   ├── v3_multipass/     # Phase 3: + Multiple passes
│   │   │   │   ├── multipass.py  # Compare multiple runs
│   │   │   │   ├── merger.py     # Merge best segments
│   │   │   │   └── confidence.py # Confidence scoring
│   │   │   │
│   │   │   ├── v4_diarization/   # Phase 4: + Speaker identification
│   │   │   │   ├── diarizer.py   # Speaker separation
│   │   │   │   ├── voice_db.py   # Voice embeddings
│   │   │   │   └── labeler.py    # Speaker labels
│   │   │   │
│   │   │   └── pipeline.py       # Orchestrates current version
│   │   │
│   │   ├── enhancement/          # AI enhancement layer
│   │   │   ├── __init__.py
│   │   │   ├── protocol.py       # Enhancement protocol
│   │   │   ├── deepseek.py       # DeepSeek enhancer
│   │   │   ├── enhancer_rules.py # Enhancement rules
│   │   │   └── templates/
│   │   │       ├── enhancement_prompt.txt
│   │   │       └── structured_output.json
│   │   │
│   │   ├── cache/                # Later priority
│   │   │   ├── __init__.py
│   │   │   ├── base.py           # Cache protocol
│   │   │   ├── embedding.py      # Embedding cache
│   │   │   └── manager.py        # Cache orchestrator
│   │   │
│   │   └── export/
│   │       ├── __init__.py
│   │       ├── json_export.py    # JSON exporter
│   │       ├── text_backup.py    # TXT backup
│   │       └── batch_export.py   # Bulk export handling
│   │
│   ├── agents/
│   │   ├── rules/                # Consistency rules
│   │   │   ├── TRANSCRIPTION_RULES.md
│   │   │   ├── BATCH_PROCESSING_RULES.md
│   │   │   ├── CACHING_RULES.md
│   │   │   ├── EXPORT_RULES.md
│   │   │   └── DATABASE_RULES.md
│   │   │
│   │   └── templates/            # Structured outputs
│   │       ├── transcript_output.json
│   │       ├── error_response.json
│   │       └── batch_status.json
│   │
│   └── cli/
│       ├── __init__.py
│       ├── main.py               # CLI entry point
│       └── commands/
│           ├── transcribe.py     # Transcribe command
│           ├── batch.py          # Batch processing
│           ├── export.py         # Export command
│           └── cache.py          # Cache management
```

### 3. Protocol-Based Design for Maximum Refactorability

#### Core Protocols (`typing.Protocol`)

```python
# src/core/protocols.py
from abc import abstractmethod
from pathlib import Path
from typing import Any, List, Protocol


class MediaDownloader(Protocol):
    """Protocol for media downloaders"""

    @abstractmethod
    async def download(self, source: str, destination: Path) -> Path:
        """Download media from source to destination"""
        ...

    @abstractmethod
    def can_handle(self, source: str) -> bool:
        """Check if this downloader can handle the source"""
        ...


class Transcriber(Protocol):
    """Protocol for transcription services"""

    @abstractmethod
    async def transcribe(self, audio_path: Path) -> dict:
        """Transcribe audio file to structured format"""
        ...

    @abstractmethod
    def get_optimal_settings(self, file_size: int) -> dict:
        """Get optimal settings based on file characteristics"""
        ...


class BatchProcessor(Protocol):
    """Protocol for batch processing"""

    @abstractmethod
    async def process_batch(self, items: List[Path]) -> List[dict]:
        """Process multiple items in parallel"""
        ...

    @abstractmethod
    async def get_progress(self, batch_id: str) -> dict:
        """Get progress of batch processing"""
        ...


class Enhancer(Protocol):
    """Protocol for AI enhancement"""

    @abstractmethod
    async def enhance(self, transcript: dict) -> dict:
        """Enhance transcript with AI"""
        ...


class AudioProcessor(Protocol):
    """Protocol for audio processing"""

    @abstractmethod
    async def preprocess(self, audio_path: Path) -> Path:
        """Preprocess audio for optimal transcription"""
        ...

    @abstractmethod
    async def analyze_quality(self, audio_path: Path) -> float:
        """Analyze audio quality (0-1 score)"""
        ...


class CacheService(Protocol):
    """Protocol for cache services"""

    @abstractmethod
    async def get(self, key: str) -> Any:
        """Get value from cache"""
        ...

    @abstractmethod
    async def set(self, key: str, value: Any, ttl: int) -> None:
        """Set value in cache with TTL"""
        ...


class Exporter(Protocol):
    """Protocol for export services"""

    @abstractmethod
    async def export(self, transcript: dict, path: Path) -> Path:
        """Export transcript to specified format"""
        ...

    @abstractmethod
    def get_supported_formats(self) -> List[str]:
        """Get list of supported export formats"""
        ...
```
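
Because these are structural protocols, implementations never need to inherit from them. As an illustration, a sketch of what `services/media/local.py` could look like for `MediaDownloader` (the class name and copy-into-storage behavior are assumptions of this sketch, not the committed implementation):

```python
import shutil
from pathlib import Path


class LocalFileDownloader:
    """Satisfies MediaDownloader structurally; no inheritance required."""

    def can_handle(self, source: str) -> bool:
        # Anything that is not an http(s) URL is treated as a local path.
        return not source.startswith(("http://", "https://"))

    async def download(self, source: str, destination: Path) -> Path:
        # "Downloading" a local file is just copying it into managed storage.
        destination.parent.mkdir(parents=True, exist_ok=True)
        shutil.copy2(source, destination)
        return destination
```

Swapping this for the yt-dlp-backed `youtube.py` implementation is then a matter of asking each registered downloader `can_handle(source)` and dispatching to the first match.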
|
|
|
|
### 4. Clean Iteration Strategy
|
|
|
|
#### Phase-Based Pipeline Evolution
|
|
|
|
```python
# Phase 1: MVP (Week 1-2)
async def transcribe_v1(audio_path: Path) -> dict:
    """Basic transcription with optimizations"""
    audio = await preprocess(audio_path)  # 16kHz mono
    transcript = await whisper.transcribe(audio)
    return format_json(transcript)


# Phase 2: Enhanced (Week 3)
async def transcribe_v2(audio_path: Path) -> dict:
    """v1 + AI enhancement"""
    transcript = await transcribe_v1(audio_path)
    enhanced = await deepseek.enhance(transcript)
    return enhanced


# Phase 3: Multi-pass (Week 4-5)
async def transcribe_v3(audio_path: Path) -> dict:
    """v2 + multiple passes for accuracy"""
    passes = []
    for i in range(3):
        transcript = await transcribe_v1_with_params(
            audio_path,
            temperature=0.1 * i,  # Vary parameters between passes
        )
        passes.append(transcript)

    merged = merge_best_segments(passes)
    enhanced = await deepseek.enhance(merged)
    return enhanced


# Phase 4: Diarization (Week 6+)
async def transcribe_v4(audio_path: Path) -> dict:
    """v3 + speaker diarization"""
    transcript = await transcribe_v3(audio_path)

    # Add speaker identification
    speakers = await diarize_audio(audio_path)
    labeled = assign_speakers(transcript, speakers)

    return labeled
```
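
The `merge_best_segments` step is the heart of v3. One plausible strategy (a sketch under simplifying assumptions, not the committed `merger.py` design) is per-position confidence voting: for each segment index, keep the candidate from whichever pass scored highest. This assumes passes are already aligned by segment index and carry a `confidence` field:

```python
def merge_best_segments(passes: list[dict]) -> dict:
    """Pick, per segment position, the variant with the highest confidence.

    Assumes each pass looks like {"segments": [{"text": ..., "confidence": ...}]}
    and that all passes produced the same number of aligned segments.
    """
    merged = []
    for variants in zip(*(p["segments"] for p in passes)):
        merged.append(max(variants, key=lambda s: s.get("confidence", 0.0)))
    return {"segments": merged}
```

Real passes can disagree on segment boundaries, so the production merger would need timestamp-based alignment first; the voting step itself stays the same.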
|
|
|
|
### 5. Configuration Management
|
|
|
|
#### Extended Configuration for Services
|
|
|
|
```python
# src/core/config.py (extended)
import os
from pathlib import Path


class Config:
    # ... existing configuration ...

    # Database
    DATABASE_URL = os.getenv("DATABASE_URL", "postgresql://localhost/trax")
    DATABASE_POOL_SIZE = int(os.getenv("DATABASE_POOL_SIZE", "10"))

    # Media Processing
    MEDIA_STORAGE_PATH = Path(os.getenv("MEDIA_STORAGE_PATH", "./data/media"))
    MAX_FILE_SIZE_MB = int(os.getenv("MAX_FILE_SIZE_MB", "500"))
    SUPPORTED_FORMATS = ["mp3", "mp4", "wav", "m4a", "webm"]

    # Transcription
    WHISPER_MODEL = os.getenv("WHISPER_MODEL", "distil-large-v3")
    WHISPER_DEVICE = os.getenv("WHISPER_DEVICE", "cpu")
    WHISPER_COMPUTE_TYPE = os.getenv("WHISPER_COMPUTE_TYPE", "int8_float32")
    CHUNK_LENGTH_SECONDS = int(os.getenv("CHUNK_LENGTH_SECONDS", "600"))  # 10 minutes

    # Pipeline Versioning
    PIPELINE_VERSION = os.getenv("PIPELINE_VERSION", "v1")
    ENABLE_ENHANCEMENT = os.getenv("ENABLE_ENHANCEMENT", "false") == "true"
    ENABLE_MULTIPASS = os.getenv("ENABLE_MULTIPASS", "false") == "true"
    ENABLE_DIARIZATION = os.getenv("ENABLE_DIARIZATION", "false") == "true"

    # Batch Processing
    BATCH_SIZE = int(os.getenv("BATCH_SIZE", "10"))
    BATCH_TIMEOUT_SECONDS = int(os.getenv("BATCH_TIMEOUT_SECONDS", "3600"))
    MAX_PARALLEL_JOBS = int(os.getenv("MAX_PARALLEL_JOBS", "4"))

    # AI Enhancement
    ENHANCEMENT_MODEL = os.getenv("ENHANCEMENT_MODEL", "deepseek-chat")
    ENHANCEMENT_MAX_RETRIES = int(os.getenv("ENHANCEMENT_MAX_RETRIES", "3"))

    # Caching
    CACHE_TTL_EMBEDDING = int(os.getenv("CACHE_TTL_EMBEDDING", "86400"))  # 24h
    CACHE_TTL_TRANSCRIPT = int(os.getenv("CACHE_TTL_TRANSCRIPT", "604800"))  # 7d
    CACHE_BACKEND = os.getenv("CACHE_BACKEND", "sqlite")  # sqlite or redis

    # Export
    EXPORT_PATH = Path(os.getenv("EXPORT_PATH", "./data/exports"))
    DEFAULT_EXPORT_FORMAT = os.getenv("DEFAULT_EXPORT_FORMAT", "json")

    # Audio Processing
    AUDIO_SAMPLE_RATE = int(os.getenv("AUDIO_SAMPLE_RATE", "16000"))
    AUDIO_CHANNELS = int(os.getenv("AUDIO_CHANNELS", "1"))  # Mono
    AUDIO_NORMALIZE_DB = float(os.getenv("AUDIO_NORMALIZE_DB", "-3.0"))

    # Multi-pass Settings
    MULTIPASS_RUNS = int(os.getenv("MULTIPASS_RUNS", "3"))
    MULTIPASS_MERGE_STRATEGY = os.getenv("MULTIPASS_MERGE_STRATEGY", "confidence")

    # Diarization Settings
    MAX_SPEAKERS = int(os.getenv("MAX_SPEAKERS", "10"))
    MIN_SPEAKER_DURATION = float(os.getenv("MIN_SPEAKER_DURATION", "1.0"))
```
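
String-to-bool parsing for the feature flags is easy to get subtly wrong: a bare `== "true"` comparison silently treats `"True"` or `"1"` as disabled. A small hypothetical helper (not in the source; `env_bool` is this sketch's name) keeps flag parsing lenient and consistent:

```python
import os


def env_bool(name: str, default: bool = False) -> bool:
    """Parse a boolean env var leniently; "1", "true", "yes", "on" all enable."""
    raw = os.getenv(name)
    if raw is None:
        return default
    return raw.strip().lower() in {"1", "true", "yes", "on"}
```

The `ENABLE_*` flags above could then read `ENABLE_MULTIPASS = env_bool("ENABLE_MULTIPASS")`.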
|
|
|
|
### 6. Testing Architecture
|
|
|
|
#### Test Structure for Easy Refactoring
|
|
|
|
```
tests/
├── conftest.py                  # Shared fixtures
├── factories/                   # Test data factories
│   ├── media_factory.py
│   ├── transcript_factory.py
│   └── batch_factory.py
├── fixtures/
│   ├── audio/
│   │   ├── sample_5s.wav        # 5-second test file
│   │   ├── sample_30s.mp3       # 30-second test file
│   │   ├── sample_2m.mp4        # 2-minute test file
│   │   └── sample_noisy.wav     # Noisy audio for testing
│   └── transcripts/
│       ├── expected_v1.json     # Expected output for v1
│       ├── expected_v2.json     # Expected output for v2
│       └── expected_v3.json     # Expected output for v3
├── unit/
│   ├── services/
│   │   ├── test_batch_processor.py
│   │   ├── test_downloader.py
│   │   ├── test_transcriber.py
│   │   ├── test_enhancer.py
│   │   └── test_cache.py
│   ├── models/
│   │   └── test_models.py
│   └── test_protocols.py        # Protocol compliance tests
├── integration/
│   ├── test_pipeline_v1.py      # Basic pipeline
│   ├── test_pipeline_v2.py      # With enhancement
│   ├── test_pipeline_v3.py      # With multi-pass
│   ├── test_batch_processing.py # Batch operations
│   └── test_database.py         # Database operations
└── performance/
    ├── test_speed.py            # Speed benchmarks
    └── test_accuracy.py         # Accuracy measurements
```
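
The factories directory replaces ad-hoc fixture dicts scattered across tests. A sketch of what `transcript_factory.py` could provide (field names follow the `transcripts` table above; the `overrides` keyword pattern is an assumption of this sketch):

```python
import uuid
from datetime import datetime, timezone


def make_transcript(**overrides) -> dict:
    """Build a valid transcript row as a dict, overridable per test."""
    row = {
        "id": str(uuid.uuid4()),
        "pipeline_version": "v1",
        "raw_content": {"segments": [{"text": "hello world"}]},
        "text_content": "hello world",
        "model_used": "distil-large-v3",
        "word_count": 2,
        "created_at": datetime.now(timezone.utc),
    }
    row.update(overrides)
    return row
```

Each test then states only the fields it cares about, e.g. `make_transcript(pipeline_version="v3")`, which keeps tests resilient when the schema grows new columns.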
|
|
|
|
### 7. Pipeline Orchestration
|
|
|
|
#### Smart Pipeline Selection
|
|
|
|
```python
import asyncio
from pathlib import Path
from typing import List


class PipelineOrchestrator:
    """Orchestrates the pipeline based on configuration"""

    async def process(self, audio_path: Path) -> dict:
        """Process audio through the appropriate pipeline version"""

        # Start with v1 (always available)
        result = await self.transcribe_v1(audio_path)

        # Progressively add features based on config
        if config.ENABLE_MULTIPASS:
            result = await self.add_multipass(audio_path, result)

        if config.ENABLE_ENHANCEMENT:
            result = await self.add_enhancement(result)

        if config.ENABLE_DIARIZATION:
            result = await self.add_diarization(audio_path, result)

        return result

    async def process_batch(self, paths: List[Path]) -> List[dict]:
        """Process multiple files efficiently"""
        tasks = [self.process(path) for path in paths]
        return await asyncio.gather(*tasks)
```
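
A bare `asyncio.gather` launches every file at once, which can exhaust memory or saturate the transcription backend on large batches. One way to cap in-flight work at `MAX_PARALLEL_JOBS` is a semaphore wrapper (a sketch; `bounded_gather` and its signature are this sketch's invention, not the committed `processor.py` API):

```python
import asyncio
from typing import Awaitable, Callable, List


async def bounded_gather(
    items: List, worker: Callable[..., Awaitable], limit: int = 4
) -> List:
    """Run worker over items with at most `limit` coroutines in flight."""
    sem = asyncio.Semaphore(limit)

    async def guarded(item):
        async with sem:
            return await worker(item)

    # gather preserves input order even though completion order varies
    return await asyncio.gather(*(guarded(i) for i in items))
```

`process_batch` could then delegate: `return await bounded_gather(paths, self.process, limit=config.MAX_PARALLEL_JOBS)`.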

### Summary

This architecture provides:

1. **Clean iterations** through versioned pipelines (v1→v4)
2. **Protocol-based design** for easy component swapping
3. **Batch-first approach** as requested
4. **Real test files** instead of mocks
5. **PostgreSQL** for multi-service support
6. **Fail-fast** error handling
7. **CLI-first** interface

The design prioritizes **iterability** and **batch processing** over caching, with clear upgrade paths between versions.

---

*Generated: 2024*
*Status: COMPLETE*
*Next: Team Structure Report*