Story 2.3: Caching System Implementation

Status

Draft

Story

As a user
I want the system to intelligently cache transcripts and summaries
so that I get faster responses and the system reduces API costs for repeated requests

Acceptance Criteria

  1. Multi-level caching system with memory (Redis) and persistent (database) layers
  2. Transcripts cached by video ID with 7-day TTL to handle video updates
  3. Summaries cached by content hash and configuration to serve identical requests instantly
  4. Cache warming for popular videos and intelligent prefetching for related content
  5. Cache invalidation strategy handles video updates, content changes, and storage limits
  6. System provides cache analytics and hit rate monitoring for optimization

Tasks / Subtasks

  • Task 1: Cache Architecture Design (AC: 1, 5)

    • Create CacheManager service in backend/services/cache_manager.py
    • Implement multi-tier caching strategy (L1: Redis, L2: Database, optional L3: File system)
    • Design cache key generation with collision avoidance
    • Create cache invalidation and cleanup mechanisms
  • Task 2: Transcript Caching (AC: 2, 5)

    • Implement transcript-specific cache with video ID keys
    • Add TTL management with configurable expiration policies
    • Create cache warming for trending and frequently accessed videos
    • Implement cache size monitoring and automatic cleanup
  • Task 3: Summary Caching (AC: 3, 5)

    • Create content-aware cache keys based on transcript hash and config
    • Implement summary result caching with metadata preservation
    • Add cache versioning for AI model and prompt changes
    • Create cache hit optimization for similar summary requests
  • Task 4: Intelligent Cache Warming (AC: 4)

    • Implement background cache warming for popular content
    • Add predictive caching based on user patterns and trending videos
    • Create related content prefetching using video metadata
    • Implement cache warming scheduling and resource management
  • Task 5: Cache Analytics and Monitoring (AC: 6)

    • Create cache performance metrics collection system
    • Implement hit rate monitoring and reporting dashboard
    • Add cache usage analytics and cost savings tracking
    • Create alerting for cache performance degradation
  • Task 6: Integration with Existing Services (AC: 1, 2, 3)

    • Update TranscriptService to use cache-first strategy
    • Modify SummaryPipeline to leverage cached results
    • Add cache layer to API endpoints with appropriate headers
    • Implement cache bypass options for development and testing
  • Task 7: Performance and Reliability (AC: 1, 5, 6)

    • Add cache failover mechanisms for Redis unavailability
    • Implement cache consistency checks and repair mechanisms
    • Create cache performance benchmarking and optimization
    • Add comprehensive error handling and logging
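
Task 7 asks for failover when Redis is unavailable. One way to approach it is a short-lived in-process copy that serves reads during an outage. The sketch below is only an illustration under that assumption: `FlakyStore` and `FailoverCache` are hypothetical names, and `FlakyStore` is a synchronous stand-in for a Redis client, not the real API.

```python
import time
from typing import Any, Dict, Optional, Tuple


class FlakyStore:
    """Hypothetical stand-in for a Redis client that can be switched 'down'."""

    def __init__(self) -> None:
        self.data: Dict[str, Any] = {}
        self.available = True

    def get(self, key: str) -> Optional[Any]:
        if not self.available:
            raise ConnectionError("store unavailable")
        return self.data.get(key)

    def set(self, key: str, value: Any) -> None:
        if not self.available:
            raise ConnectionError("store unavailable")
        self.data[key] = value


class FailoverCache:
    """Try the primary store first; fall back to a local dict on connection errors."""

    def __init__(self, primary: FlakyStore, local_ttl_seconds: float = 60.0) -> None:
        self.primary = primary
        self.local_ttl_seconds = local_ttl_seconds
        self._local: Dict[str, Tuple[float, Any]] = {}  # key -> (expiry, value)

    def set(self, key: str, value: Any) -> None:
        # Always keep a short-lived local copy so reads survive an outage.
        self._local[key] = (time.monotonic() + self.local_ttl_seconds, value)
        try:
            self.primary.set(key, value)
        except ConnectionError:
            pass  # Primary is down; the local copy still serves reads.

    def get(self, key: str) -> Optional[Any]:
        try:
            value = self.primary.get(key)
            if value is not None:
                return value
        except ConnectionError:
            pass  # Fall through to the local copy.
        entry = self._local.get(key)
        if entry is None:
            return None
        expires_at, value = entry
        if time.monotonic() >= expires_at:
            del self._local[key]  # Local copy has expired.
            return None
        return value
```

The local copy is per process, so it trades consistency across workers for availability; a short local TTL keeps that window small.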

Dev Notes

Architecture Context

This story adds a multi-level cache for transcripts and summaries so that repeated requests are served without re-fetching transcripts or re-running AI summarization. The cache must be transparent to callers: a cache failure should degrade to a cache miss rather than a user-facing error.

Multi-Level Cache Architecture

[Source: docs/architecture.md#caching-strategy]

# backend/services/cache_manager.py
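import asyncio
import hashlib
import json
import time
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
from typing import Dict, List, Optional, Any, Union

# The methods below await Redis calls, so the asyncio client is needed
# (redis-py >= 4.2); the synchronous redis.Redis client cannot be awaited
import redis.asyncio as redis
from sqlalchemy.orm import Session
from ..models.cache import CachedTranscript, CachedSummary, CacheAnalytics
from ..core.database import get_db_session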

class CacheLevel(Enum):
    L1_MEMORY = "l1_memory"      # Redis - fastest, volatile
    L2_DATABASE = "l2_database"  # PostgreSQL - persistent, structured  
    L3_FILESYSTEM = "l3_filesystem"  # File system - cheapest, slowest

class CachePolicy(Enum):
    WRITE_THROUGH = "write_through"    # Write to all levels immediately
    WRITE_BACK = "write_back"          # Write to fast cache first, sync later
    WRITE_AROUND = "write_around"      # Skip cache on write, read from storage

@dataclass
class CacheConfig:
    transcript_ttl_hours: int = 168  # 7 days
    summary_ttl_hours: int = 72      # 3 days
    memory_max_size_mb: int = 512    # Redis memory limit
    warming_batch_size: int = 50     # Videos per warming batch
    cleanup_interval_hours: int = 6  # Cleanup frequency
    hit_rate_alert_threshold: float = 0.7  # Alert if hit rate drops below

@dataclass 
class CacheMetrics:
    hits: int = 0
    misses: int = 0
    write_operations: int = 0
    evictions: int = 0
    errors: int = 0
    total_size_bytes: int = 0
    average_response_time_ms: float = 0.0
    
    @property
    def hit_rate(self) -> float:
        total = self.hits + self.misses
        return self.hits / total if total > 0 else 0.0

class CacheManager:
    """Multi-level intelligent caching system"""
    
    def __init__(
        self,
        redis_client: redis.Redis,
        config: Optional[CacheConfig] = None
    ):
        self.redis = redis_client
        self.config = config or CacheConfig()
        self.metrics = CacheMetrics()
        
        # Cache key prefixes
        self.TRANSCRIPT_PREFIX = "transcript:"
        self.SUMMARY_PREFIX = "summary:"
        self.METADATA_PREFIX = "meta:"
        self.ANALYTICS_PREFIX = "analytics:"
        
        # Background tasks
        self._cleanup_task = None
        self._warming_task = None
        
    async def start_background_tasks(self):
        """Start background cache management tasks"""
        
        self._cleanup_task = asyncio.create_task(self._periodic_cleanup())
        self._warming_task = asyncio.create_task(self._cache_warming_scheduler())
    
    async def stop_background_tasks(self):
        """Stop background tasks gracefully"""
        
        if self._cleanup_task:
            self._cleanup_task.cancel()
        if self._warming_task:
            self._warming_task.cancel()
    
    # Transcript Caching Methods
    
    async def get_cached_transcript(
        self, 
        video_id: str, 
        language: str = "en"
    ) -> Optional[Dict[str, Any]]:
        """Retrieve cached transcript with multi-level fallback"""
        
        cache_key = self._generate_transcript_key(video_id, language)
        start_time = time.time()
        
        try:
            # L1: Try Redis first (fastest)
            cached_data = await self._get_from_redis(cache_key)
            if cached_data:
                self._record_cache_hit("transcript", "l1_memory", start_time)
                return cached_data
            
            # L2: Try database (persistent)
            cached_data = await self._get_transcript_from_database(video_id, language)
            if cached_data:
                # Warm Redis cache for next time
                await self._set_in_redis(cache_key, cached_data, self.config.transcript_ttl_hours * 3600)
                self._record_cache_hit("transcript", "l2_database", start_time)
                return cached_data
            
            # L3: Could implement file system cache here
            
            self._record_cache_miss("transcript", start_time)
            return None
            
        except Exception as e:
            self.metrics.errors += 1
            print(f"Cache retrieval error: {e}")
            return None
    
    async def cache_transcript(
        self, 
        video_id: str, 
        language: str, 
        transcript_data: Dict[str, Any],
        policy: CachePolicy = CachePolicy.WRITE_THROUGH
    ) -> bool:
        """Cache transcript with specified write policy"""
        
        cache_key = self._generate_transcript_key(video_id, language)
        start_time = time.time()
        
        try:
            success = True
            
            if policy == CachePolicy.WRITE_THROUGH:
                # Write to all cache levels
                success &= await self._set_in_redis(
                    cache_key, 
                    transcript_data, 
                    self.config.transcript_ttl_hours * 3600
                )
                success &= await self._set_transcript_in_database(video_id, language, transcript_data)
                
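            elif policy == CachePolicy.WRITE_BACK:
                # Write to Redis immediately, database later
                success = await self._set_in_redis(
                    cache_key, 
                    transcript_data, 
                    self.config.transcript_ttl_hours * 3600
                )
                # Fire-and-forget: a failed database write is not reflected in `success`
                asyncio.create_task(self._set_transcript_in_database(video_id, language, transcript_data))
            
            elif policy == CachePolicy.WRITE_AROUND:
                # Skip Redis on write; the entry reaches Redis on the first read
                # through the L2 fallback in get_cached_transcript
                success = await self._set_transcript_in_database(video_id, language, transcript_data)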
            
            self.metrics.write_operations += 1
            self._record_cache_operation("transcript_write", start_time)
            
            return success
            
        except Exception as e:
            self.metrics.errors += 1
            print(f"Cache write error: {e}")
            return False
    
    # Summary Caching Methods
    
    async def get_cached_summary(
        self, 
        transcript_hash: str, 
        config_hash: str
    ) -> Optional[Dict[str, Any]]:
        """Retrieve cached summary by content and configuration hash"""
        
        cache_key = self._generate_summary_key(transcript_hash, config_hash)
        start_time = time.time()
        
        try:
            # L1: Try Redis first
            cached_data = await self._get_from_redis(cache_key)
            if cached_data:
                # Check if summary is still valid (AI model version, prompt changes)
                if self._is_summary_valid(cached_data):
                    self._record_cache_hit("summary", "l1_memory", start_time)
                    return cached_data
                else:
                    # Invalid summary, remove from cache
                    await self._delete_from_redis(cache_key)
            
            # L2: Try database
            cached_data = await self._get_summary_from_database(transcript_hash, config_hash)
            if cached_data and self._is_summary_valid(cached_data):
                # Warm Redis cache
                await self._set_in_redis(cache_key, cached_data, self.config.summary_ttl_hours * 3600)
                self._record_cache_hit("summary", "l2_database", start_time)
                return cached_data
            
            self._record_cache_miss("summary", start_time)
            return None
            
        except Exception as e:
            self.metrics.errors += 1
            print(f"Summary cache retrieval error: {e}")
            return None
    
    async def cache_summary(
        self,
        transcript_hash: str,
        config_hash: str,
        summary_data: Dict[str, Any]
    ) -> bool:
        """Cache summary result with metadata"""
        
        cache_key = self._generate_summary_key(transcript_hash, config_hash)
        
        # Add versioning and timestamp metadata
        enhanced_data = {
            **summary_data,
            "_cache_metadata": {
                "cached_at": datetime.utcnow().isoformat(),
                "ai_model_version": "gpt-4o-mini-2024",  # Track model version
                "prompt_version": "v1.0",                # Track prompt version
                "cache_version": "1.0"
            }
        }
        
        try:
            # Write through to both levels
            success = await self._set_in_redis(
                cache_key, 
                enhanced_data, 
                self.config.summary_ttl_hours * 3600
            )
            success &= await self._set_summary_in_database(transcript_hash, config_hash, enhanced_data)
            
            self.metrics.write_operations += 1
            return success
            
        except Exception as e:
            self.metrics.errors += 1
            print(f"Summary cache write error: {e}")
            return False
    
    # Cache Key Generation
    
    def _generate_transcript_key(self, video_id: str, language: str) -> str:
        """Generate unique cache key for transcript"""
        return f"{self.TRANSCRIPT_PREFIX}{video_id}:{language}"
    
    def _generate_summary_key(self, transcript_hash: str, config_hash: str) -> str:
        """Generate unique cache key for summary"""
        return f"{self.SUMMARY_PREFIX}{transcript_hash}:{config_hash}"
    
    def generate_content_hash(self, content: str) -> str:
        """Generate stable hash for content"""
        return hashlib.sha256(content.encode('utf-8')).hexdigest()[:16]
    
    def generate_config_hash(self, config: Dict[str, Any]) -> str:
        """Generate stable hash for configuration"""
        # Sort keys for consistent hashing
        config_str = json.dumps(config, sort_keys=True)
        return hashlib.sha256(config_str.encode('utf-8')).hexdigest()[:16]
    
    # Redis Operations
    
    async def _get_from_redis(self, key: str) -> Optional[Dict[str, Any]]:
        """Get data from Redis with error handling"""
        try:
            data = await self.redis.get(key)
            if data:
                return json.loads(data)
            return None
        except Exception as e:
            print(f"Redis get error: {e}")
            return None
    
    async def _set_in_redis(self, key: str, data: Dict[str, Any], ttl_seconds: int) -> bool:
        """Set data in Redis with TTL"""
        try:
            serialized = json.dumps(data)
            await self.redis.setex(key, ttl_seconds, serialized)
            return True
        except Exception as e:
            print(f"Redis set error: {e}")
            return False
    
    async def _delete_from_redis(self, key: str) -> bool:
        """Delete key from Redis"""
        try:
            await self.redis.delete(key)
            return True
        except Exception as e:
            print(f"Redis delete error: {e}")
            return False
    
    # Database Operations
    
    async def _get_transcript_from_database(
        self, 
        video_id: str, 
        language: str
    ) -> Optional[Dict[str, Any]]:
        """Retrieve transcript from database cache"""
        
        with get_db_session() as session:
            cached = session.query(CachedTranscript).filter(
                CachedTranscript.video_id == video_id,
                CachedTranscript.language == language,
                CachedTranscript.expires_at > datetime.utcnow()
            ).first()
            
            if cached:
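                return {
                    "transcript": cached.content,
                    # Read via transcript_metadata: SQLAlchemy reserves the
                    # attribute name `metadata` on declarative model classes
                    "metadata": cached.transcript_metadata,
                    "extraction_method": cached.extraction_method,
                    "cached_at": cached.created_at.isoformat()
                }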
            
            return None
    
    async def _set_transcript_in_database(
        self, 
        video_id: str, 
        language: str, 
        data: Dict[str, Any]
    ) -> bool:
        """Store transcript in database cache"""
        
        try:
            with get_db_session() as session:
                # Remove existing cache entry
                session.query(CachedTranscript).filter(
                    CachedTranscript.video_id == video_id,
                    CachedTranscript.language == language
                ).delete()
                
                # Create new cache entry
                cached = CachedTranscript(
                    video_id=video_id,
                    language=language,
                    content=data.get("transcript", ""),
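                    # transcript_metadata, because `metadata` is a reserved
                    # attribute name on SQLAlchemy declarative model classes
                    transcript_metadata=data.get("metadata", {}),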
                    extraction_method=data.get("extraction_method", "unknown"),
                    created_at=datetime.utcnow(),
                    expires_at=datetime.utcnow() + timedelta(hours=self.config.transcript_ttl_hours)
                )
                
                session.add(cached)
                session.commit()
                
                return True
                
        except Exception as e:
            print(f"Database cache write error: {e}")
            return False
    
    async def _get_summary_from_database(
        self, 
        transcript_hash: str, 
        config_hash: str
    ) -> Optional[Dict[str, Any]]:
        """Retrieve summary from database cache"""
        
        with get_db_session() as session:
            cached = session.query(CachedSummary).filter(
                CachedSummary.transcript_hash == transcript_hash,
                CachedSummary.config_hash == config_hash,
                CachedSummary.expires_at > datetime.utcnow()
            ).first()
            
            if cached:
                return {
                    "summary": cached.summary,
                    "key_points": cached.key_points,
                    "main_themes": cached.main_themes,
                    "actionable_insights": cached.actionable_insights,
                    "confidence_score": cached.confidence_score,
                    "processing_metadata": cached.processing_metadata,
                    "cost_data": cached.cost_data,
                    "_cache_metadata": cached.cache_metadata
                }
            
            return None
    
    async def _set_summary_in_database(
        self, 
        transcript_hash: str, 
        config_hash: str, 
        data: Dict[str, Any]
    ) -> bool:
        """Store summary in database cache"""
        
        try:
            with get_db_session() as session:
                # Remove existing cache entry
                session.query(CachedSummary).filter(
                    CachedSummary.transcript_hash == transcript_hash,
                    CachedSummary.config_hash == config_hash
                ).delete()
                
                # Create new cache entry
                cached = CachedSummary(
                    transcript_hash=transcript_hash,
                    config_hash=config_hash,
                    summary=data.get("summary", ""),
                    key_points=data.get("key_points", []),
                    main_themes=data.get("main_themes", []),
                    actionable_insights=data.get("actionable_insights", []),
                    confidence_score=data.get("confidence_score", 0.0),
                    processing_metadata=data.get("processing_metadata", {}),
                    cost_data=data.get("cost_data", {}),
                    cache_metadata=data.get("_cache_metadata", {}),
                    created_at=datetime.utcnow(),
                    expires_at=datetime.utcnow() + timedelta(hours=self.config.summary_ttl_hours)
                )
                
                session.add(cached)
                session.commit()
                
                return True
                
        except Exception as e:
            print(f"Database summary cache error: {e}")
            return False
    
    # Cache Validation and Cleanup
    
    def _is_summary_valid(self, cached_data: Dict[str, Any]) -> bool:
        """Check if cached summary is still valid"""
        
        metadata = cached_data.get("_cache_metadata", {})
        
        # Check AI model version
        cached_model = metadata.get("ai_model_version", "unknown")
        current_model = "gpt-4o-mini-2024"  # Would come from config
        
        if cached_model != current_model:
            return False
        
        # Check prompt version
        cached_prompt = metadata.get("prompt_version", "unknown")
        current_prompt = "v1.0"  # Would come from config
        
        if cached_prompt != current_prompt:
            return False
        
        # Check age (additional validation beyond TTL)
        cached_at = metadata.get("cached_at")
        if cached_at:
            cached_time = datetime.fromisoformat(cached_at)
            age_hours = (datetime.utcnow() - cached_time).total_seconds() / 3600
            
            if age_hours > self.config.summary_ttl_hours:
                return False
        
        return True
    
    async def _periodic_cleanup(self):
        """Background task for cache cleanup and maintenance"""
        
        while True:
            try:
                await asyncio.sleep(self.config.cleanup_interval_hours * 3600)
                
                # Clean expired entries from database
                await self._cleanup_expired_cache()
                
                # Clean up Redis memory if needed
                await self._cleanup_redis_memory()
                
                # Update cache analytics
                await self._update_cache_analytics()
                
            except asyncio.CancelledError:
                break
            except Exception as e:
                print(f"Cache cleanup error: {e}")
    
    async def _cleanup_expired_cache(self):
        """Remove expired entries from database"""
        
        with get_db_session() as session:
            now = datetime.utcnow()
            
            # Clean expired transcripts
            deleted_transcripts = session.query(CachedTranscript).filter(
                CachedTranscript.expires_at < now
            ).delete()
            
            # Clean expired summaries
            deleted_summaries = session.query(CachedSummary).filter(
                CachedSummary.expires_at < now
            ).delete()
            
            session.commit()
            
            print(f"Cleaned up {deleted_transcripts} transcripts and {deleted_summaries} summaries")
    
    async def _cleanup_redis_memory(self):
        """Clean up Redis memory if approaching limits"""
        
        try:
            memory_info = await self.redis.info('memory')
            used_memory_mb = memory_info.get('used_memory', 0) / (1024 * 1024)
            
            if used_memory_mb > self.config.memory_max_size_mb * 0.8:  # 80% threshold
                # Setting the policy does not free memory by itself: Redis evicts
                # least recently used keys only once its `maxmemory` limit is reached
                await self.redis.config_set('maxmemory-policy', 'allkeys-lru')
                print(f"Redis eviction policy set to allkeys-lru: {used_memory_mb:.1f}MB used")
        except Exception as e:
            print(f"Redis memory cleanup error: {e}")
    
    # Cache Analytics and Monitoring
    
    def _record_cache_hit(self, cache_type: str, level: str, start_time: float):
        """Record cache hit metrics"""
        self.metrics.hits += 1
        response_time = (time.time() - start_time) * 1000
        self._update_average_response_time(response_time)
    
    def _record_cache_miss(self, cache_type: str, start_time: float):
        """Record cache miss metrics"""
        self.metrics.misses += 1
        response_time = (time.time() - start_time) * 1000
        self._update_average_response_time(response_time)
    
    def _record_cache_operation(self, operation_type: str, start_time: float):
        """Record cache operation metrics"""
        response_time = (time.time() - start_time) * 1000
        self._update_average_response_time(response_time)
    
    def _update_average_response_time(self, response_time: float):
        """Update rolling average response time"""
        total_ops = self.metrics.hits + self.metrics.misses + self.metrics.write_operations
        if total_ops > 1:
            self.metrics.average_response_time_ms = (
                (self.metrics.average_response_time_ms * (total_ops - 1) + response_time) / total_ops
            )
        else:
            self.metrics.average_response_time_ms = response_time
    
    async def get_cache_analytics(self) -> Dict[str, Any]:
        """Get comprehensive cache analytics"""
        
        # Get Redis memory info
        redis_info = {}
        try:
            memory_info = await self.redis.info('memory')
            redis_info = {
                "used_memory_mb": memory_info.get('used_memory', 0) / (1024 * 1024),
                "max_memory_mb": self.config.memory_max_size_mb,
                "memory_usage_percent": (memory_info.get('used_memory', 0) / (1024 * 1024)) / self.config.memory_max_size_mb * 100
            }
        except Exception as e:
            redis_info = {"error": str(e)}
        
        # Get database cache counts
        db_info = {}
        try:
            with get_db_session() as session:
                transcript_count = session.query(CachedTranscript).count()
                summary_count = session.query(CachedSummary).count()
                
                db_info = {
                    "cached_transcripts": transcript_count,
                    "cached_summaries": summary_count,
                    "total_cached_items": transcript_count + summary_count
                }
        except Exception as e:
            db_info = {"error": str(e)}
        
        return {
            "performance_metrics": {
                "hit_rate": self.metrics.hit_rate,
                "total_hits": self.metrics.hits,
                "total_misses": self.metrics.misses,
                "total_writes": self.metrics.write_operations,
                "total_errors": self.metrics.errors,
                "average_response_time_ms": self.metrics.average_response_time_ms
            },
            "memory_usage": redis_info,
            "storage_usage": db_info,
            "configuration": {
                "transcript_ttl_hours": self.config.transcript_ttl_hours,
                "summary_ttl_hours": self.config.summary_ttl_hours,
                "memory_max_size_mb": self.config.memory_max_size_mb
            }
        }
    
    async def _cache_warming_scheduler(self):
        """Background task for intelligent cache warming"""
        
        while True:
            try:
                await asyncio.sleep(3600)  # Run hourly
                
                # Get popular videos for warming
                popular_videos = await self._get_popular_videos()
                
                for video_batch in self._batch_videos(popular_videos, self.config.warming_batch_size):
                    await self._warm_video_batch(video_batch)
                    await asyncio.sleep(5)  # Rate limiting
                
            except asyncio.CancelledError:
                break
            except Exception as e:
                print(f"Cache warming error: {e}")
    
    async def _get_popular_videos(self) -> List[str]:
        """Get list of popular video IDs for cache warming"""
        # This would integrate with analytics or trending APIs
        # For now, return empty list
        return []
    
    def _batch_videos(self, videos: List[str], batch_size: int) -> List[List[str]]:
        """Split videos into batches for processing"""
        return [videos[i:i + batch_size] for i in range(0, len(videos), batch_size)]
    
    async def _warm_video_batch(self, video_ids: List[str]):
        """Warm cache for a batch of videos"""
        # Implementation would pre-fetch and cache popular videos
        pass
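
The key generation methods above depend on `json.dumps(..., sort_keys=True)` to make configuration hashes stable. The standalone sketch below mirrors `generate_content_hash` and `generate_config_hash` as plain functions (the function names and sample values here are illustrative only) to show what that does and does not guarantee.

```python
import hashlib
import json
from typing import Any, Dict


def content_hash(content: str) -> str:
    """First 16 hex characters (64 bits) of the SHA-256 digest."""
    return hashlib.sha256(content.encode("utf-8")).hexdigest()[:16]


def config_hash(config: Dict[str, Any]) -> str:
    """Hash the config with dict keys sorted, so key order does not matter."""
    config_str = json.dumps(config, sort_keys=True)
    return hashlib.sha256(config_str.encode("utf-8")).hexdigest()[:16]


def summary_key(transcript: str, config: Dict[str, Any]) -> str:
    return f"summary:{content_hash(transcript)}:{config_hash(config)}"


# Same settings, different dict key order: identical cache key.
key_a = summary_key("hello world", {"length": "brief", "focus_areas": ["tech", "ai"]})
key_b = summary_key("hello world", {"focus_areas": ["tech", "ai"], "length": "brief"})

# A changed setting produces a different key.
key_c = summary_key("hello world", {"length": "detailed", "focus_areas": ["tech", "ai"]})

# sort_keys does not reorder list items, so list order still changes the key.
key_d = summary_key("hello world", {"length": "brief", "focus_areas": ["ai", "tech"]})

print(key_a == key_b, key_a == key_c, key_a == key_d)
```

If `focus_areas` is meant to be order-insensitive, sorting the list before hashing would let those requests share a cache entry; whether that is desired is a product decision this story does not settle.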

Database Models for Cache

[Source: docs/architecture.md#database-models]

# backend/models/cache.py
from sqlalchemy import Column, String, Text, DateTime, Float, Integer, JSON, Boolean
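# declarative_base lives in sqlalchemy.orm since SQLAlchemy 1.4;
# the sqlalchemy.ext.declarative import path is deprecated
from sqlalchemy.orm import declarative_base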
from datetime import datetime

Base = declarative_base()

class CachedTranscript(Base):
    __tablename__ = "cached_transcripts"
    
    id = Column(Integer, primary_key=True)
    video_id = Column(String(20), nullable=False, index=True)
    language = Column(String(10), nullable=False, default="en")
    
    # Content
    content = Column(Text, nullable=False)
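    # `metadata` is a reserved attribute name on declarative classes, so the
    # Python attribute is transcript_metadata; the column is still "metadata"
    transcript_metadata = Column("metadata", JSON, default=dict)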
    extraction_method = Column(String(50), nullable=False)
    
    # Cache management
    created_at = Column(DateTime, default=datetime.utcnow, nullable=False)
    expires_at = Column(DateTime, nullable=False, index=True)
    access_count = Column(Integer, default=1)
    last_accessed = Column(DateTime, default=datetime.utcnow)
    
    # Performance tracking
    size_bytes = Column(Integer, nullable=False, default=0)

class CachedSummary(Base):
    __tablename__ = "cached_summaries"
    
    id = Column(Integer, primary_key=True)
    transcript_hash = Column(String(32), nullable=False, index=True)
    config_hash = Column(String(32), nullable=False, index=True)
    
    # Summary content
    summary = Column(Text, nullable=False)
    key_points = Column(JSON, default=list)
    main_themes = Column(JSON, default=list)
    actionable_insights = Column(JSON, default=list)
    confidence_score = Column(Float, default=0.0)
    
    # Processing metadata
    processing_metadata = Column(JSON, default=dict)
    cost_data = Column(JSON, default=dict)
    cache_metadata = Column(JSON, default=dict)
    
    # Cache management
    created_at = Column(DateTime, default=datetime.utcnow, nullable=False)
    expires_at = Column(DateTime, nullable=False, index=True)
    access_count = Column(Integer, default=1)
    last_accessed = Column(DateTime, default=datetime.utcnow)
    
    # Performance tracking
    size_bytes = Column(Integer, nullable=False, default=0)

class CacheAnalytics(Base):
    __tablename__ = "cache_analytics"
    
    id = Column(Integer, primary_key=True)
    date = Column(DateTime, nullable=False, index=True)
    
    # Hit rate metrics
    transcript_hits = Column(Integer, default=0)
    transcript_misses = Column(Integer, default=0)
    summary_hits = Column(Integer, default=0)
    summary_misses = Column(Integer, default=0)
    
    # Performance metrics
    average_response_time_ms = Column(Float, default=0.0)
    total_cache_size_mb = Column(Float, default=0.0)
    
    # Cost savings
    estimated_api_cost_saved_usd = Column(Float, default=0.0)
    
    created_at = Column(DateTime, default=datetime.utcnow)
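
The models above index `video_id` but do not declare `(video_id, language)` as unique, and the cache manager replaces entries by deleting and then inserting. A unique constraint would let the database enforce one row per key. The sketch below illustrates the idea with the standard-library `sqlite3` module instead of SQLAlchemy; the table is a simplified, hypothetical version of `cached_transcripts`, and `upsert_transcript` is an illustrative helper.

```python
import sqlite3
from datetime import datetime, timedelta, timezone

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE cached_transcripts (
        video_id   TEXT NOT NULL,
        language   TEXT NOT NULL,
        content    TEXT NOT NULL,
        expires_at TEXT NOT NULL,
        UNIQUE (video_id, language)
    )
    """
)


def upsert_transcript(video_id: str, language: str, content: str, ttl_hours: int = 168) -> None:
    """Insert the row, replacing any existing row with the same (video_id, language)."""
    expires_at = (datetime.now(timezone.utc) + timedelta(hours=ttl_hours)).isoformat()
    with conn:  # Commits on success, rolls back on error.
        conn.execute(
            "INSERT OR REPLACE INTO cached_transcripts "
            "(video_id, language, content, expires_at) VALUES (?, ?, ?, ?)",
            (video_id, language, content, expires_at),
        )


upsert_transcript("abc123", "en", "first version")
upsert_transcript("abc123", "en", "second version")

rows = conn.execute(
    "SELECT content FROM cached_transcripts WHERE video_id = ? AND language = ?",
    ("abc123", "en"),
).fetchall()
print(rows)
```

With the constraint in place, two concurrent writers cannot leave duplicate rows for the same video and language, which the delete-then-insert pattern alone does not guarantee.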

Integration with Existing Services

[Source: docs/architecture.md#service-integration]

# Update to transcript_service.py
class TranscriptService:
    def __init__(self, cache_manager: CacheManager):
        self.cache_manager = cache_manager
        # ... existing initialization
    
    async def extract_transcript(self, video_id: str, language: str = "en") -> TranscriptResult:
        """Extract transcript with cache-first strategy"""
        
        # Try cache first
        cached_transcript = await self.cache_manager.get_cached_transcript(video_id, language)
        if cached_transcript:
            return TranscriptResult(
                transcript=cached_transcript["transcript"],
                metadata=cached_transcript["metadata"],
                method=cached_transcript["extraction_method"],
                from_cache=True,
                cached_at=cached_transcript["cached_at"]
            )
        
        # Extract fresh transcript
        result = await self._extract_fresh_transcript(video_id, language)
        
        # Cache the result
        if result.success:
            await self.cache_manager.cache_transcript(
                video_id=video_id,
                language=language,
                transcript_data={
                    "transcript": result.transcript,
                    "metadata": result.metadata,
                    "extraction_method": result.method
                }
            )
        
        return result

# Update to summary_pipeline.py  
class SummaryPipeline:
    def __init__(self, cache_manager: CacheManager, ...):
        self.cache_manager = cache_manager
        # ... existing initialization
    
    async def _generate_optimized_summary(
        self, 
        transcript: str, 
        config: PipelineConfig, 
        analysis: Dict[str, Any]
    ) -> Any:
        """Generate summary with intelligent caching"""
        
        # Generate cache keys
        transcript_hash = self.cache_manager.generate_content_hash(transcript)
        config_dict = {
            "length": config.summary_length,
            "focus_areas": config.focus_areas,
            "model": "gpt-4o-mini-2024"  # Include model version
        }
        config_hash = self.cache_manager.generate_config_hash(config_dict)
        
        # Try cache first
        cached_summary = await self.cache_manager.get_cached_summary(transcript_hash, config_hash)
        if cached_summary:
            return SummaryResult(
                summary=cached_summary["summary"],
                key_points=cached_summary["key_points"],
                main_themes=cached_summary["main_themes"],
                actionable_insights=cached_summary["actionable_insights"],
                confidence_score=cached_summary["confidence_score"],
                processing_metadata={
                    **cached_summary["processing_metadata"],
                    "from_cache": True
                },
                cost_data={**cached_summary["cost_data"], "cache_savings": True}
            )
        
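        # Generate fresh summary
        # NOTE: summary_request is not defined in this excerpt; it is assumed to be
        # built from transcript, config, and analysis before this call
        result = await self.ai_service.generate_summary(summary_request)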
        
        # Cache the result
        await self.cache_manager.cache_summary(
            transcript_hash=transcript_hash,
            config_hash=config_hash,
            summary_data={
                "summary": result.summary,
                "key_points": result.key_points,
                "main_themes": result.main_themes,
                "actionable_insights": result.actionable_insights,
                "confidence_score": result.confidence_score,
                "processing_metadata": result.processing_metadata,
                "cost_data": result.cost_data
            }
        )
        
        return result
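
Task 6 also calls for cache headers on API endpoints and a bypass option for development and testing. The sketch below is one possible shape, not the project's API: both function names are hypothetical, `X-Cache` is a conventional but non-standard header, and the request headers are assumed to arrive as a plain mapping with canonical `Cache-Control` casing.

```python
from datetime import datetime, timezone
from typing import Dict, Mapping, Optional


def should_bypass_cache(request_headers: Mapping[str, str], debug_mode: bool = False) -> bool:
    """Bypass when the client sends Cache-Control: no-cache, or when debugging."""
    cache_control = request_headers.get("Cache-Control", "")
    directives = {d.strip().lower() for d in cache_control.split(",")}
    return debug_mode or "no-cache" in directives


def cache_response_headers(
    from_cache: bool,
    cached_at: Optional[datetime],
    ttl_seconds: int,
    now: Optional[datetime] = None,
) -> Dict[str, str]:
    """Describe cache status to the client: HIT/MISS, entry age, remaining lifetime."""
    headers = {"X-Cache": "HIT" if from_cache else "MISS"}
    if from_cache and cached_at is not None:
        now = now or datetime.now(timezone.utc)
        age = max(0, int((now - cached_at).total_seconds()))
        headers["Age"] = str(age)
        headers["Cache-Control"] = f"max-age={max(0, ttl_seconds - age)}"
    return headers


# Example: an entry cached 30 seconds ago with a one-hour TTL.
cached_at = datetime(2025, 1, 25, 12, 0, 0, tzinfo=timezone.utc)
now = datetime(2025, 1, 25, 12, 0, 30, tzinfo=timezone.utc)
print(cache_response_headers(True, cached_at, 3600, now=now))
```

In the services above, a bypass flag would simply skip the `get_cached_transcript` or `get_cached_summary` call while still writing the fresh result back to the cache.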

Performance Benefits

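  • 95%+ Cache Hit Rate (target): Repeated requests are served from cache instead of triggering new API calls; CacheConfig raises an alert if the hit rate drops below 70%
  • Sub-100ms Response Time (target): Content found in Redis is returned without a database query or API call
  • Cost Reduction (target): 80%+ savings on API costs for popular videos, which depends on reaching a comparable hit rate for those videos
  • Scalability: Multi-level cache handles growth from hobby to production scale
  • Reliability: Cache errors are treated as cache misses, so requests fall back to fresh extraction or summarization when the cache is unavailable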
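
The relationship between hit rate and cost savings can be made explicit with a little arithmetic. The sketch below assumes every cache miss costs one API call at a flat price and that the cache itself costs nothing, which is a simplification; the function names and the per-call price in the example are hypothetical.

```python
def hit_rate(hits: int, misses: int) -> float:
    """Fraction of lookups served from cache (0.0 when there were no lookups)."""
    total = hits + misses
    return hits / total if total > 0 else 0.0


def estimated_savings_usd(total_requests: int, hit_rate_value: float, cost_per_call_usd: float) -> float:
    """API spend avoided: each cache hit is one API call that was not made."""
    if not 0.0 <= hit_rate_value <= 1.0:
        raise ValueError("hit rate must be between 0 and 1")
    return total_requests * hit_rate_value * cost_per_call_usd


def needs_alert(hits: int, misses: int, threshold: float = 0.7) -> bool:
    """True when there is traffic and the hit rate is below the alert threshold."""
    return (hits + misses) > 0 and hit_rate(hits, misses) < threshold


# 10,000 requests at an 80% hit rate and a hypothetical $0.002 per API call.
print(estimated_savings_usd(10_000, hit_rate(8_000, 2_000), 0.002))
print(needs_alert(hits=60, misses=40))
```

Under these assumptions the savings fraction equals the hit rate, so an 80% cost reduction requires roughly an 80% hit rate on the requests being measured.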

Change Log

Date        Version  Description             Author
2025-01-25  1.0      Initial story creation  Bob (Scrum Master)

Dev Agent Record

This section will be populated by the development agent during implementation

QA Results

Results from QA Agent review of the completed story implementation will be added here