AGENTS.md - YouTube Summarizer Backend
This file provides guidance for AI agents working with the YouTube Summarizer backend implementation.
Agent Development Context
The backend was built story by story, following Story-Driven Development, with comprehensive tests and production-ready patterns. Agents should understand the existing architecture and extend it using the established conventions.
Current Implementation Status
✅ Completed Stories
- Story 1.1: Project Setup and Infrastructure - DONE
- Story 2.1: Single AI Model Integration (Anthropic) - DONE
- Story 2.2: Summary Generation Pipeline - DONE ⬅️ Just completed with full QA
🔄 Ready for Implementation
- Story 1.2: YouTube URL Validation and Parsing
- Story 1.3: Transcript Extraction Service
- Story 1.4: Basic Web Interface
- Story 2.3: Caching System Implementation
- Story 2.4: Multi-Model Support
- Story 2.5: Export Functionality
Architecture Principles for Agents
1. Service Layer Pattern
All business logic lives in the services/ directory with clear interfaces:
# Follow this pattern for new services
from typing import Any, Dict

class VideoService:
    async def extract_video_id(self, url: str) -> str: ...
    async def get_video_metadata(self, video_id: str) -> Dict[str, Any]: ...
    async def validate_url(self, url: str) -> bool: ...
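As an illustration of what sits behind an interface like this, here is a minimal sketch of the parsing step that `extract_video_id` could delegate to. Story 1.2 is not implemented yet, so everything here is an assumption: the helper name `parse_video_id`, the set of accepted hosts, the permissive ID character rule, and the use of `ValueError` in place of the project's own `ValidationError`.

```python
import re
from urllib.parse import parse_qs, urlparse

# Assumption: IDs are non-empty and use only letters, digits, "_" and "-".
_VIDEO_ID_RE = re.compile(r"^[A-Za-z0-9_-]+$")
# Assumption: these are the hosts the service chooses to accept.
_WATCH_HOSTS = {"youtube.com", "www.youtube.com", "m.youtube.com"}


def parse_video_id(url: str) -> str:
    """Return the video ID from a watch or youtu.be URL, or raise ValueError."""
    parsed = urlparse(url)
    host = (parsed.hostname or "").lower()

    if host == "youtu.be":
        # Short links carry the ID as the first path segment.
        candidate = parsed.path.lstrip("/").split("/")[0]
    elif host in _WATCH_HOSTS and parsed.path == "/watch":
        # Standard watch links carry the ID in the "v" query parameter.
        candidate = parse_qs(parsed.query).get("v", [""])[0]
    else:
        candidate = ""

    if not _VIDEO_ID_RE.match(candidate):
        raise ValueError(f"Not a recognized YouTube video URL: {url!r}")
    return candidate
```

Keeping the parsing in a plain synchronous function makes it easy to unit test; the async service method can call it directly because it performs no I/O.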
2. Dependency Injection Pattern
Use FastAPI's dependency injection for loose coupling:
def get_video_service() -> VideoService:
    return VideoService()

@router.post("/api/endpoint")
async def endpoint(service: VideoService = Depends(get_video_service)):
    return await service.process()
3. Async-First Development
All I/O operations must be async to prevent blocking:
# Correct async pattern
async def process_video(self, url: str) -> Result:
    metadata = await self.video_service.get_metadata(url)
    transcript = await self.transcript_service.extract(url)
    summary = await self.ai_service.summarize(transcript)
    return Result(metadata=metadata, summary=summary)
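The pattern above awaits each step in turn. Where two steps do not depend on each other, they can overlap, and a synchronous library call can be kept off the event loop. The sketch below shows one way to do both with the standard library; `fetch_metadata`, `blocking_transcript_fetch`, and `load_video_inputs` are hypothetical stand-ins, not functions from this codebase.

```python
import asyncio
import time
from typing import Tuple


async def fetch_metadata(url: str) -> dict:
    """Hypothetical stand-in for an async metadata lookup."""
    await asyncio.sleep(0.05)
    return {"url": url, "title": "example"}


def blocking_transcript_fetch(url: str) -> str:
    """Hypothetical stand-in for a synchronous, blocking library call."""
    time.sleep(0.05)
    return f"transcript for {url}"


async def load_video_inputs(url: str) -> Tuple[dict, str]:
    # The two lookups are independent, so run them concurrently.
    # asyncio.to_thread (Python 3.9+) runs the blocking call in a worker
    # thread so the event loop stays responsive.
    metadata, transcript = await asyncio.gather(
        fetch_metadata(url),
        asyncio.to_thread(blocking_transcript_fetch, url),
    )
    return metadata, transcript
```

The summarization step still has to wait for the transcript, so only the independent lookups are grouped under `asyncio.gather`.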
4. Error Handling Standards
Use custom exceptions with proper HTTP status codes:
from fastapi import HTTPException
from backend.core.exceptions import ValidationError, AIServiceError

try:
    result = await service.process(data)
except ValidationError as e:
    raise HTTPException(status_code=400, detail=e.message)
except AIServiceError as e:
    raise HTTPException(status_code=500, detail=e.message)
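One way to avoid repeating the same `try`/`except` ladder in every endpoint is to let each exception class carry its own status code and translate it in one place. The sketch below is illustrative only: `APIError` and `to_http_error` are hypothetical names, and the classes are local stand-ins, not the real ones in `backend.core.exceptions`.

```python
from typing import Tuple


class APIError(Exception):
    """Hypothetical base class carrying a message and an HTTP status code."""

    status_code = 500

    def __init__(self, message: str):
        super().__init__(message)
        self.message = message


class ValidationError(APIError):
    status_code = 400


class AIServiceError(APIError):
    status_code = 500


def to_http_error(exc: Exception) -> Tuple[int, str]:
    """Map an exception to a (status_code, detail) pair for the response."""
    if isinstance(exc, APIError):
        return exc.status_code, exc.message
    # Unknown errors: return a generic message rather than leaking internals.
    return 500, "Internal server error"
```

An endpoint or a shared exception handler could then build its `HTTPException` from the returned pair.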
Implementation Patterns for Agents
Adding New API Endpoints
- Create the endpoint in the appropriate API module:
# backend/api/videos.py
from fastapi import APIRouter, HTTPException, Depends
from ..core.exceptions import ValidationError
from ..services.video_service import VideoService

router = APIRouter(prefix="/api/videos", tags=["videos"])

@router.post("/validate")
async def validate_video_url(
    request: ValidateVideoRequest,
    service: VideoService = Depends(get_video_service)
):
    try:
        is_valid = await service.validate_url(request.url)
        return {"valid": is_valid}
    except ValidationError as e:
        raise HTTPException(status_code=400, detail=e.message)
- Register the router in main.py:
from backend.api.videos import router as videos_router
app.include_router(videos_router)
- Add comprehensive tests:
# tests/unit/test_video_service.py
@pytest.mark.asyncio
async def test_validate_url_success():
    service = VideoService()
    result = await service.validate_url("https://youtube.com/watch?v=abc123")
    assert result is True

# tests/integration/test_videos_api.py
def test_validate_video_endpoint(client):
    response = client.post("/api/videos/validate", json={"url": "https://youtube.com/watch?v=test"})
    assert response.status_code == 200
    assert response.json()["valid"] is True
Extending the Pipeline
When adding new pipeline stages, follow the established pattern:
# Add new stage to PipelineStage enum
class PipelineStage(Enum):
    # ... existing stages ...
    NEW_STAGE = "new_stage"

# Add stage processing to SummaryPipeline
async def _execute_pipeline(self, job_id: str, config: PipelineConfig):
    # ... existing stages ...
    # New stage
    await self._update_progress(job_id, PipelineStage.NEW_STAGE, 85, "Processing new stage...")
    new_result = await self._process_new_stage(result, config)
    result.new_field = new_result

# Add progress percentage mapping
stage_percentages = {
    # ... existing mappings ...
    PipelineStage.NEW_STAGE: 85,
}
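Because a new stage touches both the enum and the percentage mapping, the two can drift apart. A small consistency check, sketched below, can catch a stage that has no percentage or a percentage that would make progress go backwards. The stage names and percentages other than `NEW_STAGE` and 85 are placeholders, and `check_stage_percentages` is a hypothetical helper, not part of the existing pipeline.

```python
from enum import Enum


class PipelineStage(Enum):
    # Placeholder stage names; the real enum lives in the pipeline module.
    INITIALIZED = "initialized"
    EXTRACTING_TRANSCRIPT = "extracting_transcript"
    GENERATING_SUMMARY = "generating_summary"
    NEW_STAGE = "new_stage"
    COMPLETED = "completed"


# Placeholder percentages, listed in the order the stages run.
STAGE_PERCENTAGES = {
    PipelineStage.INITIALIZED: 0,
    PipelineStage.EXTRACTING_TRANSCRIPT: 30,
    PipelineStage.GENERATING_SUMMARY: 70,
    PipelineStage.NEW_STAGE: 85,
    PipelineStage.COMPLETED: 100,
}


def check_stage_percentages(mapping: dict) -> None:
    """Raise ValueError if a stage is unmapped or progress would decrease."""
    missing = [stage for stage in PipelineStage if stage not in mapping]
    if missing:
        raise ValueError(f"Stages without a percentage: {missing}")
    # Iterating an Enum yields members in definition order.
    values = [mapping[stage] for stage in PipelineStage]
    if values != sorted(values):
        raise ValueError("Percentages must not decrease across stages")
```

Calling the check from a unit test makes a forgotten mapping entry fail fast instead of surfacing as a stalled progress bar. It assumes the enum members are declared in execution order.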
Database Integration Pattern
When adding database models, follow the repository pattern:
# backend/models/video.py
from datetime import datetime
from sqlalchemy import Column, String, DateTime, Text
from .base import Base

class Video(Base):
    __tablename__ = "videos"
    id = Column(String, primary_key=True)
    url = Column(String, nullable=False)
    title = Column(String)
    # "metadata" is a reserved attribute name on declarative models, so the
    # attribute is renamed while the column keeps the name "metadata".
    video_metadata = Column("metadata", Text)  # JSON field
    created_at = Column(DateTime, default=datetime.utcnow)

# backend/repositories/video_repository.py
from typing import Optional
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from ..models.video import Video

class VideoRepository:
    def __init__(self, session: AsyncSession):
        self.session = session

    async def create_video(self, video: Video) -> Video:
        self.session.add(video)
        await self.session.commit()
        return video

    async def get_by_id(self, video_id: str) -> Optional[Video]:
        result = await self.session.execute(
            select(Video).where(Video.id == video_id)
        )
        return result.scalar_one_or_none()
Testing Guidelines for Agents
Unit Test Structure
# tests/unit/test_new_service.py
import pytest
from unittest.mock import Mock, AsyncMock
from backend.services.new_service import NewService

class TestNewService:
    @pytest.fixture
    def service(self):
        return NewService()

    @pytest.mark.asyncio
    async def test_process_success(self, service):
        # Arrange
        input_data = "test_input"
        expected_output = "expected_result"
        # Act
        result = await service.process(input_data)
        # Assert
        assert result == expected_output

    @pytest.mark.asyncio
    async def test_process_error_handling(self, service):
        with pytest.raises(ServiceError):
            await service.process("invalid_input")
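When the service under test depends on another async component, `unittest.mock.AsyncMock` can stand in for it and record how it was awaited. The sketch below uses a hypothetical `TranscriptSummarizer` class and an `ai_client.generate` method invented for illustration. It is written as a plain coroutine so it runs without pytest, and the same body would work inside a `@pytest.mark.asyncio` test.

```python
import asyncio
from unittest.mock import AsyncMock


class TranscriptSummarizer:
    """Hypothetical service that depends on an async AI client."""

    def __init__(self, ai_client):
        self.ai_client = ai_client

    async def summarize(self, transcript: str) -> str:
        if not transcript.strip():
            raise ValueError("transcript must not be empty")
        return await self.ai_client.generate(transcript)


async def check_summarize_calls_client() -> str:
    # Attributes of an AsyncMock are themselves AsyncMocks, so
    # ai_client.generate(...) can be awaited like a real async method.
    ai_client = AsyncMock()
    ai_client.generate.return_value = "short summary"

    service = TranscriptSummarizer(ai_client)
    result = await service.summarize("some transcript")

    # Verify the dependency was awaited exactly once with the transcript.
    ai_client.generate.assert_awaited_once_with("some transcript")
    return result
```

Injecting the dependency through the constructor is what makes this substitution possible without patching module attributes.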
Integration Test Structure
# tests/integration/test_new_api.py
from fastapi.testclient import TestClient
from unittest.mock import patch, AsyncMock, Mock

class TestNewAPI:
    def test_endpoint_success(self, client):
        # Note: if the route binds the provider via Depends(get_new_service),
        # patching the module attribute does not replace it; in that case
        # use app.dependency_overrides instead.
        with patch('backend.api.new.get_new_service') as mock_get_service:
            mock_service = Mock()
            mock_service.process = AsyncMock(return_value="result")
            mock_get_service.return_value = mock_service
            response = client.post("/api/new/process", json={"input": "test"})
            assert response.status_code == 200
            assert response.json() == {"result": "result"}
Code Quality Standards
Documentation Requirements
class NewService:
    """Service for handling new functionality.

    This service integrates with external APIs and provides
    processed results for the application.
    """

    async def process(self, input_data: str) -> Dict[str, Any]:
        """Process input data and return structured results.

        Args:
            input_data: Raw input string to process

        Returns:
            Processed results dictionary

        Raises:
            ValidationError: If input_data is invalid
            ProcessingError: If processing fails
        """
Type Hints and Validation
from typing import Any, Dict, List, Optional, Union
from pydantic import BaseModel, Field

class ProcessRequest(BaseModel):
    """Request model for processing endpoint."""
    input_data: str = Field(..., description="Data to process")
    options: Optional[Dict[str, Any]] = Field(None, description="Processing options")

    class Config:
        schema_extra = {
            "example": {
                "input_data": "sample input",
                "options": {"format": "json"}
            }
        }
Error Handling Patterns
from typing import Dict, Optional
from backend.core.exceptions import BaseAPIException, ErrorCode

class ProcessingError(BaseAPIException):
    """Raised when processing fails."""

    def __init__(self, message: str, details: Optional[Dict] = None):
        super().__init__(
            message=message,
            error_code=ErrorCode.PROCESSING_ERROR,
            status_code=500,
            details=details,
            recoverable=True
        )
Integration with Existing Services
Using the Pipeline Service
# Get pipeline instance
pipeline = get_summary_pipeline()
# Start processing
job_id = await pipeline.process_video(
    video_url="https://youtube.com/watch?v=abc123",
    config=PipelineConfig(summary_length="detailed")
)
# Monitor progress
result = await pipeline.get_pipeline_result(job_id)
print(f"Status: {result.status}")
Using the AI Service
from backend.services.anthropic_summarizer import AnthropicSummarizer
from backend.services.ai_service import SummaryRequest, SummaryLength
ai_service = AnthropicSummarizer(api_key=api_key)
summary_result = await ai_service.generate_summary(
    SummaryRequest(
        transcript="Video transcript text...",
        length=SummaryLength.STANDARD,
        focus_areas=["key insights", "actionable items"]
    )
)
print(f"Summary: {summary_result.summary}")
print(f"Key Points: {summary_result.key_points}")
Using WebSocket Updates
from backend.core.websocket_manager import websocket_manager
# Send progress update
await websocket_manager.send_progress_update(job_id, {
    "stage": "processing",
    "percentage": 50,
    "message": "Halfway complete"
})
# Send completion notification
await websocket_manager.send_completion_notification(job_id, {
    "status": "completed",
    "result": result_data
})
Performance Patterns
Caching Integration
from backend.services.cache_manager import CacheManager

cache = CacheManager()
# Cache expensive operations
cache_key = f"expensive_operation:{input_hash}"
cached_result = await cache.get_cached_result(cache_key)
# Compare against None so falsy cached values (0, "", {}) still count as hits.
# This assumes get_cached_result returns None on a miss.
if cached_result is None:
    result = await expensive_operation(input_data)
    await cache.cache_result(cache_key, result, ttl=3600)
else:
    result = cached_result
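The snippet above leaves `input_hash` undefined. One plausible way to derive it, sketched here, is to hash a canonical JSON form of the inputs so that equal inputs always produce the same key. `make_cache_key`, `get_or_compute`, and `InMemoryCache` are hypothetical; `InMemoryCache` only mimics the two `CacheManager` methods used above, under the assumption that a miss returns `None`.

```python
import asyncio
import hashlib
import json
from typing import Any, Awaitable, Callable, Dict, Optional


def make_cache_key(prefix: str, payload: Dict[str, Any]) -> str:
    """Build a deterministic key from a prefix and a JSON-serializable payload."""
    # sort_keys makes the key independent of dict insertion order.
    canonical = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    digest = hashlib.sha256(canonical.encode("utf-8")).hexdigest()
    return f"{prefix}:{digest}"


class InMemoryCache:
    """Hypothetical stand-in for CacheManager; the TTL is accepted but ignored."""

    def __init__(self) -> None:
        self._store: Dict[str, Any] = {}

    async def get_cached_result(self, key: str) -> Optional[Any]:
        return self._store.get(key)

    async def cache_result(self, key: str, value: Any, ttl: int = 3600) -> None:
        self._store[key] = value


async def get_or_compute(
    cache: Any,
    key: str,
    compute: Callable[[], Awaitable[Any]],
    ttl: int = 3600,
) -> Any:
    """Return the cached value for key, computing and storing it on a miss."""
    cached = await cache.get_cached_result(key)
    if cached is not None:
        return cached
    result = await compute()
    await cache.cache_result(key, result, ttl=ttl)
    return result
```

Wrapping the lookup in a helper like `get_or_compute` keeps the hit/miss branching in one place instead of repeating it at every call site.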
Background Processing
import uuid
from typing import Dict
from fastapi import BackgroundTasks

async def long_running_task(task_id: str, data: Dict):
    """Background task for processing."""
    try:
        result = await process_data(data)
        await store_result(task_id, result)
        await notify_completion(task_id)
    except Exception as e:
        await store_error(task_id, str(e))

@router.post("/api/process-async")
async def start_processing(
    request: ProcessRequest,
    background_tasks: BackgroundTasks
):
    task_id = str(uuid.uuid4())
    background_tasks.add_task(long_running_task, task_id, request.dict())
    return {"task_id": task_id, "status": "processing"}
Security Guidelines
Input Validation
from pydantic import BaseModel, validator
import re

class VideoUrlRequest(BaseModel):
    url: str

    @validator('url')
    def validate_youtube_url(cls, v):
        youtube_pattern = r'^https?://(www\.)?(youtube\.com|youtu\.be)/.+'
        if not re.match(youtube_pattern, v):
            raise ValueError('Must be a valid YouTube URL')
        return v
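To make the behavior of that pattern concrete, the sketch below applies the same regular expression with plain `re`, outside of Pydantic. The wrapper name `looks_like_youtube_url` is hypothetical. The checks in the usage note show that the pattern validates only the scheme and host prefix, so it is a first filter, not a full URL parser.

```python
import re

# Same pattern as in the validator above.
YOUTUBE_PATTERN = re.compile(r"^https?://(www\.)?(youtube\.com|youtu\.be)/.+")


def looks_like_youtube_url(url: str) -> bool:
    """Return True if the URL starts with an accepted YouTube host and a path."""
    return YOUTUBE_PATTERN.match(url) is not None
```

With this pattern, `https://youtube.com/watch?v=abc123` and `https://youtu.be/abc123` are accepted and `https://example.com/watch?v=abc123` is rejected. Note two edge cases: `https://youtube.com/about` is accepted even though it is not a video, and `https://m.youtube.com/watch?v=abc123` is rejected because only the optional `www.` prefix is allowed. Whether those outcomes are acceptable is a product decision.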
API Key Management
import os
from fastapi import HTTPException

def get_api_key() -> str:
    api_key = os.getenv("ANTHROPIC_API_KEY")
    if not api_key:
        raise HTTPException(
            status_code=500,
            detail="API key not configured"
        )
    return api_key
Deployment Considerations
Environment Configuration
from typing import List, Optional
# Pydantic v1 import; in Pydantic v2, BaseSettings lives in the
# separate pydantic-settings package.
from pydantic import BaseSettings

class Settings(BaseSettings):
    anthropic_api_key: str
    database_url: str = "sqlite:///./data/app.db"
    redis_url: Optional[str] = None
    log_level: str = "INFO"
    cors_origins: List[str] = ["http://localhost:3000"]

    class Config:
        env_file = ".env"

settings = Settings()
Health Checks
from fastapi.responses import JSONResponse

@router.get("/health")
async def health_check():
    """Health check endpoint for load balancers."""
    checks = {
        "database": await check_database_connection(),
        "cache": await check_cache_connection(),
        "ai_service": await check_ai_service(),
    }
    all_healthy = all(checks.values())
    status_code = 200 if all_healthy else 503
    # Return the computed status code so load balancers see 503 when unhealthy.
    return JSONResponse(
        status_code=status_code,
        content={"status": "healthy" if all_healthy else "unhealthy", "checks": checks},
    )
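The checks above are awaited one after another, so a single slow dependency delays the whole response. A possible refinement, sketched below, runs them concurrently and treats a timeout or an exception as unhealthy. `run_health_checks` and its `timeout_seconds` parameter are hypothetical and not part of the existing backend.

```python
import asyncio
from typing import Awaitable, Callable, Dict


async def run_health_checks(
    checks: Dict[str, Callable[[], Awaitable[bool]]],
    timeout_seconds: float = 2.0,
) -> Dict[str, bool]:
    """Run all checks concurrently; a timeout or exception counts as unhealthy."""

    async def run_one(check: Callable[[], Awaitable[bool]]) -> bool:
        try:
            return bool(await asyncio.wait_for(check(), timeout=timeout_seconds))
        except Exception:
            # Covers both check failures and the timeout raised by wait_for.
            return False

    # gather preserves input order, so results line up with checks.keys().
    results = await asyncio.gather(*(run_one(check) for check in checks.values()))
    return dict(zip(checks.keys(), results))
```

The endpoint could pass the existing check functions uncalled, for example `{"database": check_database_connection, ...}`, and derive the status code from the returned dictionary as before.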
Migration Patterns
When extending existing functionality, maintain backward compatibility:
# Version 1 API
@router.post("/api/summarize")
async def summarize_v1(request: SummarizeRequest):
    # Legacy implementation
    pass

# Version 2 API (new functionality)
@router.post("/api/v2/summarize")
async def summarize_v2(request: SummarizeRequestV2):
    # Enhanced implementation
    pass
This backend follows production-ready patterns and is designed for extensibility. Agents should maintain these standards when adding new functionality.