
# Story 1.3: Transcript Extraction Service
## Status
Draft
## Story
**As a** user
**I want** the system to automatically extract transcripts from YouTube videos
**so that** I have the text content needed for AI summarization
## Acceptance Criteria
1. System extracts video transcripts using multiple fallback methods (YouTube Transcript API → auto-captions → audio transcription)
2. Transcripts are cached to avoid repeated API calls for the same video
3. Multiple language support with preference for English when available
4. Failed transcript extraction returns informative error messages with suggested solutions
5. System handles videos with no available transcripts gracefully
6. Transcript extraction is non-blocking and provides progress feedback
## Tasks / Subtasks
- [ ] **Task 1: Primary Transcript Extraction** (AC: 1, 2)
  - [ ] Create `TranscriptService` class in `backend/services/transcript_service.py`
  - [ ] Implement YouTube Transcript API integration with retry logic
  - [ ] Add transcript caching with video ID-based keys
  - [ ] Implement multi-language transcript detection and prioritization
- [ ] **Task 2: Fallback Transcript Methods** (AC: 1, 5)
  - [ ] Integrate auto-generated captions extraction as secondary method
  - [ ] Implement audio transcription fallback using OpenAI Whisper API
  - [ ] Create fallback chain orchestration with error handling
  - [ ] Add logging for fallback method usage and success rates
- [ ] **Task 3: Transcript Processing Pipeline** (AC: 3, 4, 6)
  - [ ] Create transcript cleaning and formatting utilities
  - [ ] Implement timestamp preservation for chapter creation
  - [ ] Add text chunking for large transcripts (token limit management)
  - [ ] Create progress tracking for multi-step extraction process
- [ ] **Task 4: API Integration** (AC: 4, 6)
  - [ ] Create `/api/transcripts/{video_id}` GET endpoint
  - [ ] Implement background transcript extraction with job status tracking
  - [ ] Add WebSocket support for real-time progress updates
  - [ ] Create comprehensive error response system with recovery suggestions
- [ ] **Task 5: Cache Management** (AC: 2)
  - [ ] Implement Redis-based transcript caching with 24-hour TTL
  - [ ] Add cache warming for popular videos
  - [ ] Create cache invalidation strategy for updated transcripts
  - [ ] Add cache analytics and hit rate monitoring
- [ ] **Task 6: Integration Testing** (AC: 1, 2, 3, 4, 5, 6)
  - [ ] Test transcript extraction across different video types and lengths
  - [ ] Verify fallback chain handles edge cases (private videos, no captions, etc.)
  - [ ] Test caching behavior and cache invalidation
  - [ ] Validate error handling and user-facing error messages
## Dev Notes
### Architecture Context
This story implements the core content extraction layer that bridges YouTube's video platform with our AI summarization engine. The transcript service serves as the foundation for all downstream AI processing and must be robust, efficient, and user-friendly.
### Transcript Service Implementation Requirements
[Source: docs/architecture.md#backend-services]
```python
class TranscriptService:
    def __init__(self, cache_client: CacheClient, whisper_client: WhisperClient):
        self.youtube_api = YouTubeTranscriptApi()
        self.cache_client = cache_client
        self.whisper_client = whisper_client

    async def extract_transcript(self, video_id: str, language_preference: str = "en") -> TranscriptResult:
        """Extract transcript using fallback chain with caching"""
        # Check cache first
        cache_key = f"transcript:{video_id}:{language_preference}"
        cached_result = await self.cache_client.get(cache_key)
        if cached_result:
            return TranscriptResult.from_cache(cached_result)

        # Try primary method: YouTube Transcript API
        try:
            transcript = await self._extract_youtube_transcript(video_id, language_preference)
            await self.cache_client.set(cache_key, transcript, ttl=86400)  # 24 hours
            return TranscriptResult(transcript=transcript, method="youtube_api", success=True)
        except TranscriptNotAvailableError:
            pass

        # Fallback 1: Auto-generated captions
        try:
            transcript = await self._extract_auto_captions(video_id, language_preference)
            await self.cache_client.set(cache_key, transcript, ttl=86400)
            return TranscriptResult(transcript=transcript, method="auto_captions", success=True)
        except CaptionsNotAvailableError:
            pass

        # Fallback 2: Audio transcription with Whisper
        try:
            transcript = await self._transcribe_audio(video_id, language_preference)
            await self.cache_client.set(cache_key, transcript, ttl=86400)
            return TranscriptResult(transcript=transcript, method="whisper_audio", success=True)
        except AudioTranscriptionError as e:
            return TranscriptResult(
                transcript=None,
                method="failed",
                success=False,
                error=TranscriptExtractionError(
                    message="Unable to extract transcript from video",
                    error_code=ErrorCode.TRANSCRIPT_UNAVAILABLE,
                    details={
                        "video_id": video_id,
                        "attempted_methods": ["youtube_api", "auto_captions", "whisper_audio"],
                        "last_error": str(e),
                        "suggestions": [
                            "Try a different video with captions available",
                            "Check if video is public and accessible",
                            "Contact support if this video should have transcripts",
                        ],
                    },
                ),
            )
```
### Transcript Processing Requirements
[Source: docs/architecture.md#data-processing]
**Transcript Cleaning and Formatting**:
```python
class TranscriptProcessor:
    def clean_transcript(self, raw_transcript: List[Dict]) -> str:
        """Clean and format raw transcript data"""
        # Remove duplicate segments
        # Fix common OCR/speech recognition errors
        # Standardize punctuation and formatting
        # Preserve meaningful timestamps
        ...

    def chunk_transcript(self, transcript: str, max_tokens: int = 3000) -> List[TranscriptChunk]:
        """Split transcript into manageable chunks for AI processing"""
        # Split on sentence boundaries
        # Preserve context across chunks
        # Include timestamp ranges for each chunk
        # Ensure chunks don't exceed token limits
        ...

    def extract_metadata(self, transcript: str) -> TranscriptMetadata:
        """Extract useful metadata from transcript"""
        return TranscriptMetadata(
            word_count=len(transcript.split()),
            estimated_reading_time=self.calculate_reading_time(transcript),
            language_detected=self.detect_language(transcript),
            topics=self.extract_topics(transcript),
            speakers_detected=self.detect_speakers(transcript),
        )
```
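The `chunk_transcript` method above is specified only by its comments. A minimal self-contained sketch of the sentence-boundary packing it describes follows; the 4-characters-per-token estimate and the simplified `TranscriptChunk` shape are illustrative assumptions, and a production version would use the target model's tokenizer and carry timestamp ranges per chunk:

```python
import re
from dataclasses import dataclass
from typing import List

@dataclass
class TranscriptChunk:
    text: str
    approx_tokens: int

def chunk_transcript(transcript: str, max_tokens: int = 3000) -> List[TranscriptChunk]:
    """Pack whole sentences into chunks that stay under a token budget."""
    def estimate_tokens(text: str) -> int:
        # Rough heuristic: ~4 characters per token for English text
        return max(1, len(text) // 4)

    sentences = re.split(r"(?<=[.!?])\s+", transcript.strip())
    chunks: List[TranscriptChunk] = []
    current: List[str] = []
    current_tokens = 0
    for sentence in sentences:
        tokens = estimate_tokens(sentence)
        # Flush the current chunk if adding this sentence would exceed the budget
        if current and current_tokens + tokens > max_tokens:
            chunks.append(TranscriptChunk(" ".join(current), current_tokens))
            current, current_tokens = [], 0
        current.append(sentence)
        current_tokens += tokens
    if current:
        chunks.append(TranscriptChunk(" ".join(current), current_tokens))
    return chunks
```

Splitting only at sentence boundaries keeps each chunk coherent for the summarizer; overlap between chunks (for cross-chunk context) is omitted here for brevity.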
### Error Handling Requirements
[Source: docs/architecture.md#error-handling]
**Transcript-Specific Exceptions**:
```python
class TranscriptExtractionError(BaseAPIException):
    """Base exception for transcript extraction failures"""
    pass


class TranscriptNotAvailableError(TranscriptExtractionError):
    """No transcript available through any method"""

    def __init__(self, video_id: str, attempted_methods: List[str]):
        super().__init__(
            message=f"No transcript available for video {video_id}",
            error_code=ErrorCode.TRANSCRIPT_UNAVAILABLE,
            status_code=status.HTTP_404_NOT_FOUND,
            details={
                "video_id": video_id,
                "attempted_methods": attempted_methods,
                "recovery_suggestions": [
                    "Check if video has captions enabled",
                    "Try a different video",
                    "Contact video owner to enable captions",
                ],
            },
        )


class RateLimitExceededError(TranscriptExtractionError):
    """API rate limit exceeded for transcript service"""
    pass


class AudioTranscriptionError(TranscriptExtractionError):
    """Audio transcription failed"""
    pass
```
### API Endpoint Specification
[Source: docs/architecture.md#api-specification]
**Request/Response Models**:
```python
class TranscriptRequest(BaseModel):
    video_id: str = Field(..., description="YouTube video ID")
    language_preference: str = Field("en", description="Preferred transcript language")
    include_metadata: bool = Field(True, description="Include transcript metadata")


class TranscriptResponse(BaseModel):
    video_id: str
    transcript: Optional[str] = None
    metadata: Optional[TranscriptMetadata] = None
    extraction_method: str  # "youtube_api", "auto_captions", "whisper_audio"
    language: str
    word_count: int
    cached: bool
    processing_time_seconds: float
    error: Optional[Dict[str, Any]] = None
```
**Endpoint Implementation**:
```python
@router.get("/transcripts/{video_id}", response_model=TranscriptResponse)
async def get_transcript(
    video_id: str,
    language_preference: str = "en",
    include_metadata: bool = True,
    transcript_service: TranscriptService = Depends(),
):
    start_time = time.time()
    try:
        result = await transcript_service.extract_transcript(video_id, language_preference)
        response_data = {
            "video_id": video_id,
            "transcript": result.transcript,
            "extraction_method": result.method,
            "language": result.language,
            "word_count": len(result.transcript.split()) if result.transcript else 0,
            "cached": result.from_cache,
            "processing_time_seconds": time.time() - start_time,
        }
        if include_metadata and result.transcript:
            response_data["metadata"] = transcript_service.extract_metadata(result.transcript)
        return TranscriptResponse(**response_data)
    except TranscriptExtractionError as e:
        return TranscriptResponse(
            video_id=video_id,
            extraction_method="failed",
            language=language_preference,
            word_count=0,
            cached=False,
            processing_time_seconds=time.time() - start_time,
            error={
                "code": e.error_code,
                "message": e.message,
                "details": e.details,
            },
        )
```
### Background Job Implementation
[Source: docs/architecture.md#background-processing]
**Async Transcript Extraction**:
```python
@router.post("/transcripts/extract", response_model=JobResponse)
async def extract_transcript_async(
    request: TranscriptRequest,
    background_tasks: BackgroundTasks,
    transcript_service: TranscriptService = Depends(),
):
    job_id = str(uuid.uuid4())
    # Start background extraction
    background_tasks.add_task(
        extract_transcript_job,
        job_id=job_id,
        video_id=request.video_id,
        language_preference=request.language_preference,
        transcript_service=transcript_service,
    )
    return JobResponse(
        job_id=job_id,
        status="processing",
        message="Transcript extraction started",
    )


@router.get("/transcripts/jobs/{job_id}", response_model=JobStatusResponse)
async def get_extraction_status(job_id: str):
    # Check job status in cache/database
    # Return progress updates via WebSocket if available
    pass
```
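The endpoint above hands off to an `extract_transcript_job` task that is not shown. One minimal sketch of what that job could look like; the in-memory `JOB_STORE` and `_set_job_status` helper are illustrative assumptions, and the real implementation would persist status in Redis so that `/transcripts/jobs/{job_id}` can read it across workers:

```python
import asyncio
from typing import Any, Dict

# Illustrative in-memory job store; production code would use Redis so the
# status endpoint and WebSocket updates work across processes.
JOB_STORE: Dict[str, Dict[str, Any]] = {}

def _set_job_status(job_id: str, status: str, **extra: Any) -> None:
    JOB_STORE[job_id] = {"status": status, **extra}

async def extract_transcript_job(job_id, video_id, language_preference, transcript_service):
    """Run the extraction fallback chain and record the outcome for polling."""
    _set_job_status(job_id, "processing")
    try:
        result = await transcript_service.extract_transcript(video_id, language_preference)
        if result.success:
            _set_job_status(job_id, "completed", method=result.method)
        else:
            _set_job_status(job_id, "failed", error=str(result.error))
    except Exception as exc:
        # Surface unexpected failures to the status endpoint instead of
        # leaving the job stuck in "processing"
        _set_job_status(job_id, "failed", error=str(exc))
```

Recording a terminal status in the exception path matters: `BackgroundTasks` swallows nothing, so an unhandled error would otherwise leave clients polling forever.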
### Cache Strategy Implementation
[Source: docs/architecture.md#caching-strategy]
**Multi-Level Caching**:
```python
class TranscriptCacheManager:
    def __init__(self, redis_client: RedisClient, db_session: Session):
        self.redis = redis_client
        self.db = db_session

    async def get_cached_transcript(self, video_id: str, language: str) -> Optional[str]:
        # Level 1: Redis cache (fast, temporary)
        cache_key = f"transcript:{video_id}:{language}"
        cached = await self.redis.get(cache_key)
        if cached:
            return cached

        # Level 2: Database cache (persistent)
        db_transcript = self.db.query(CachedTranscript).filter(
            CachedTranscript.video_id == video_id,
            CachedTranscript.language == language,
            CachedTranscript.expires_at > datetime.utcnow(),
        ).first()
        if db_transcript:
            # Warm Redis cache
            await self.redis.setex(cache_key, 86400, db_transcript.content)
            return db_transcript.content
        return None

    async def cache_transcript(self, video_id: str, language: str, transcript: str):
        cache_key = f"transcript:{video_id}:{language}"
        # Cache in Redis (24 hours)
        await self.redis.setex(cache_key, 86400, transcript)
        # Cache in database (7 days)
        db_transcript = CachedTranscript(
            video_id=video_id,
            language=language,
            content=transcript,
            created_at=datetime.utcnow(),
            expires_at=datetime.utcnow() + timedelta(days=7),
        )
        self.db.add(db_transcript)
        self.db.commit()
```
### File Locations and Structure
[Source: docs/architecture.md#project-structure]
**Backend Files**:
- `backend/services/transcript_service.py` - Main transcript extraction service
- `backend/services/transcript_processor.py` - Transcript cleaning and processing
- `backend/services/cache_manager.py` - Multi-level caching implementation
- `backend/api/transcripts.py` - Transcript API endpoints
- `backend/core/exceptions.py` - Updated with transcript-specific exceptions
- `backend/models/transcript.py` - Transcript data models
- `backend/tests/unit/test_transcript_service.py` - Unit tests
- `backend/tests/integration/test_transcript_api.py` - Integration tests
### Testing Standards
#### Backend Unit Tests
[Source: docs/architecture.md#testing-strategy]
**Test File**: `backend/tests/unit/test_transcript_service.py`
```python
import pytest
from unittest.mock import patch


class TestTranscriptService:
    @pytest.mark.asyncio
    async def test_extract_transcript_success(self):
        """Test successful transcript extraction"""
        service = TranscriptService(mock_cache, mock_whisper)
        # Mock successful YouTube API response
        with patch.object(service, '_extract_youtube_transcript') as mock_extract:
            mock_extract.return_value = "Sample transcript content"
            result = await service.extract_transcript("dQw4w9WgXcQ")
            assert result.success is True
            assert result.transcript == "Sample transcript content"
            assert result.method == "youtube_api"

    @pytest.mark.asyncio
    async def test_fallback_chain(self):
        """Test fallback chain when primary method fails"""
        service = TranscriptService(mock_cache, mock_whisper)
        # Mock YouTube API failure, auto-captions success
        with patch.object(service, '_extract_youtube_transcript') as mock_yt:
            mock_yt.side_effect = TranscriptNotAvailableError("dQw4w9WgXcQ", ["youtube_api"])
            with patch.object(service, '_extract_auto_captions') as mock_auto:
                mock_auto.return_value = "Auto-generated transcript"
                result = await service.extract_transcript("dQw4w9WgXcQ")
                assert result.success is True
                assert result.method == "auto_captions"

    @pytest.mark.asyncio
    async def test_cache_hit(self):
        """Test transcript retrieval from cache"""
        cache = MockCache()
        cache.set("transcript:dQw4w9WgXcQ:en", "Cached transcript")
        service = TranscriptService(cache, mock_whisper)
        result = await service.extract_transcript("dQw4w9WgXcQ")
        assert result.from_cache is True
        assert result.transcript == "Cached transcript"
```
#### Integration Tests
[Source: docs/architecture.md#testing-strategy]
**Test File**: `backend/tests/integration/test_transcript_api.py`
```python
class TestTranscriptAPI:
    def test_get_transcript_endpoint(self):
        """Test transcript retrieval endpoint"""
        response = client.get("/api/transcripts/dQw4w9WgXcQ")
        assert response.status_code == 200
        data = response.json()
        assert "transcript" in data
        assert "extraction_method" in data
        assert "processing_time_seconds" in data

    def test_async_extraction(self):
        """Test background transcript extraction"""
        # Start async extraction
        response = client.post("/api/transcripts/extract", json={
            "video_id": "dQw4w9WgXcQ",
            "language_preference": "en",
        })
        assert response.status_code == 200
        job_data = response.json()
        job_id = job_data["job_id"]
        # Check job status
        status_response = client.get(f"/api/transcripts/jobs/{job_id}")
        assert status_response.status_code == 200
        assert status_response.json()["status"] in ["processing", "completed"]
```
### Performance Optimization
- **Caching Strategy**: Multi-level caching (Redis + database) eliminates repeated API calls for previously processed videos
- **Async Processing**: Non-blocking extraction prevents UI freezing
- **Smart Fallbacks**: Fastest methods tried first, expensive audio transcription last
- **Token Management**: Transcript chunking prevents AI model token limit issues
- **Rate Limiting**: Exponential backoff for API rate limit handling
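The exponential-backoff bullet can be sketched as a small retry wrapper. The generic `with_backoff` helper below is an illustrative assumption (a real version would catch only the client's rate-limit exception, e.g. `RateLimitExceededError`, rather than bare `Exception`):

```python
import asyncio
import random

async def with_backoff(operation, max_attempts: int = 4, base_delay: float = 0.5):
    """Retry a zero-argument async operation with exponential backoff and jitter."""
    for attempt in range(max_attempts):
        try:
            return await operation()
        except Exception:
            if attempt == max_attempts - 1:
                raise  # budget exhausted: propagate the last error
            # Delays of 0.5s, 1s, 2s, ... plus up to 100 ms of jitter so
            # concurrent workers don't retry in lockstep
            delay = base_delay * (2 ** attempt) + random.uniform(0, 0.1)
            await asyncio.sleep(delay)
```

A call site would wrap the flaky operation in a closure, e.g. `await with_backoff(lambda: self._extract_youtube_transcript(video_id, lang))`.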
### Security Considerations
- **API Key Management**: All external API keys stored securely in environment variables
- **Input Validation**: Video ID format validation before processing
- **Rate Limiting**: Per-IP limits to prevent abuse of transcript extraction
- **Content Filtering**: Optional content filtering for inappropriate transcripts
- **Cache Security**: Encrypted cache keys and secure Redis configuration
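The input-validation bullet above can be a one-line regex check; this sketch assumes the standard 11-character, URL-safe-base64 format of YouTube video IDs:

```python
import re

# YouTube video IDs are 11 characters from the URL-safe base64 alphabet
VIDEO_ID_PATTERN = re.compile(r"^[A-Za-z0-9_-]{11}$")

def validate_video_id(video_id: str) -> str:
    """Reject anything that is not a plausible video ID before it reaches
    external APIs or cache keys (also blocks path/key injection attempts)."""
    if not VIDEO_ID_PATTERN.fullmatch(video_id):
        raise ValueError(f"Invalid YouTube video ID: {video_id!r}")
    return video_id
```

Validating before cache-key construction matters because the ID is interpolated into Redis keys like `transcript:{video_id}:{language}`.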
## Change Log
| Date | Version | Description | Author |
|------|---------|-------------|--------|
| 2025-01-25 | 1.0 | Initial story creation | Bob (Scrum Master) |
## Dev Agent Record
*This section will be populated by the development agent during implementation*
### Agent Model Used
*To be filled by dev agent*
### Debug Log References
*To be filled by dev agent*
### Completion Notes List
*To be filled by dev agent*
### File List
*To be filled by dev agent*
## QA Results
*Results from QA Agent review of the completed story implementation will be added here*