# youtube-summarizer/backend/services/cache_manager.py
"""Cache management service for pipeline results and intermediate data."""
import hashlib
import json
import logging
import sys
import os
import time
from dataclasses import asdict
from datetime import datetime, timedelta
from typing import Any, Dict, Optional
# Make the shared AI Assistant Library importable from this service module.
lib_path = os.path.abspath(
    os.path.join(os.path.dirname(__file__), '..', '..', '..', '..', 'lib')
)
if lib_path not in sys.path:
    sys.path.insert(0, lib_path)
try:
    from ai_assistant_lib.utils.helpers.cache import MemoryCache
except ImportError:
    import time  # local: only needed by the fallback implementation

    # Fallback: minimal in-memory cache matching the library's async interface.
    class MemoryCache:
        """Dict-backed async cache honoring per-entry TTL and a max size.

        The previous fallback stored values in a bare dict and ignored both
        ``default_ttl`` and the per-call ``ttl``, so entries never expired
        and the cache grew without bound. Entries are now stored as
        ``(value, expiry)`` pairs keyed on ``time.monotonic()``.
        """

        def __init__(self, default_ttl=3600, max_size=10000):
            # key -> (value, monotonic expiry timestamp)
            self._cache = {}
            self.default_ttl = default_ttl
            self.max_size = max_size

        async def get(self, key):
            """Return the cached value, or None if missing or expired."""
            entry = self._cache.get(key)
            if entry is None:
                return None
            value, expiry = entry
            if time.monotonic() >= expiry:
                # Lazily purge expired entries on access.
                del self._cache[key]
                return None
            return value

        async def set(self, key, value, ttl=None):
            """Store ``value`` under ``key`` with ``ttl`` (default TTL if None)."""
            # Crude capacity guard: evict the oldest-inserted entry when full
            # (dicts preserve insertion order).
            if key not in self._cache and len(self._cache) >= self.max_size:
                del self._cache[next(iter(self._cache))]
            effective_ttl = self.default_ttl if ttl is None else ttl
            self._cache[key] = (value, time.monotonic() + effective_ttl)

        async def delete(self, key):
            """Remove ``key``; return True if an entry was removed."""
            return self._cache.pop(key, None) is not None

        async def clear(self):
            """Drop every cached entry."""
            self._cache.clear()

        async def stats(self):
            """Return basic statistics (size may include not-yet-purged expired entries)."""
            return {"size": len(self._cache), "hit_rate": 0.0, "miss_rate": 0.0}
class CacheManager:
    """Manages caching of pipeline results and intermediate data using AI Assistant Library."""

    # Shared module logger; structured logging replaces the previous print() calls.
    _logger = logging.getLogger(__name__)

    def __init__(self, default_ttl: int = 3600):
        """Initialize cache manager.

        Args:
            default_ttl: Default time-to-live for cache entries in seconds.
        """
        self.default_ttl = default_ttl
        self._cache = MemoryCache(default_ttl=default_ttl, max_size=10000)

    def _generate_key(self, prefix: str, identifier: str) -> str:
        """Build a namespaced cache key of the form ``prefix:identifier``."""
        return f"{prefix}:{identifier}"

    def _effective_ttl(self, ttl: Optional[int]) -> int:
        """Resolve a per-call TTL, falling back to the default only when None.

        The previous ``ttl or self.default_ttl`` form silently replaced an
        explicit ``ttl=0`` with the default; ``is None`` keeps 0 meaningful.
        """
        return self.default_ttl if ttl is None else ttl

    async def cache_pipeline_result(self, job_id: str, result: Any, ttl: Optional[int] = None) -> bool:
        """Cache pipeline result.

        Args:
            job_id: Pipeline job ID.
            result: Pipeline result object (dataclass instances are stored as dicts).
            ttl: Time-to-live in seconds (uses default if None).

        Returns:
            True if cached successfully.
        """
        try:
            key = self._generate_key("pipeline_result", job_id)
            # Dataclasses are converted so cached data is a plain, serializable dict.
            if hasattr(result, '__dataclass_fields__'):
                result_data = asdict(result)
            else:
                result_data = result
            # MemoryCache handles TTL and expiration automatically.
            await self._cache.set(key, result_data, ttl=self._effective_ttl(ttl))
            return True
        except Exception:
            self._logger.exception("Failed to cache pipeline result for job %s", job_id)
            return False

    async def get_cached_pipeline_result(self, job_id: str) -> Optional[Dict[str, Any]]:
        """Get cached pipeline result.

        Args:
            job_id: Pipeline job ID.

        Returns:
            Cached result data or None if not found/expired.
        """
        key = self._generate_key("pipeline_result", job_id)
        return await self._cache.get(key)

    async def cache_transcript(self, video_id: str, transcript: str,
                               metadata: Optional[Dict[str, Any]] = None,
                               ttl: Optional[int] = None) -> bool:
        """Cache transcript data.

        Args:
            video_id: YouTube video ID.
            transcript: Transcript text.
            metadata: Optional metadata.
            ttl: Time-to-live in seconds (uses default if None).

        Returns:
            True if cached successfully.
        """
        try:
            key = self._generate_key("transcript", video_id)
            data = {
                "transcript": transcript,
                "metadata": metadata or {},
                "video_id": video_id
            }
            await self._cache.set(key, data, ttl=self._effective_ttl(ttl))
            return True
        except Exception:
            self._logger.exception("Failed to cache transcript for video %s", video_id)
            return False

    async def get_cached_transcript(self, video_id: str) -> Optional[Dict[str, Any]]:
        """Get cached transcript.

        Args:
            video_id: YouTube video ID.

        Returns:
            Cached transcript data or None if not found/expired.
        """
        key = self._generate_key("transcript", video_id)
        return await self._cache.get(key)

    async def cache_video_metadata(self, video_id: str, metadata: Dict[str, Any],
                                   ttl: Optional[int] = None) -> bool:
        """Cache video metadata.

        Args:
            video_id: YouTube video ID.
            metadata: Video metadata.
            ttl: Time-to-live in seconds (uses default if None).

        Returns:
            True if cached successfully.
        """
        try:
            key = self._generate_key("video_metadata", video_id)
            await self._cache.set(key, metadata, ttl=self._effective_ttl(ttl))
            return True
        except Exception:
            self._logger.exception("Failed to cache video metadata for video %s", video_id)
            return False

    async def get_cached_video_metadata(self, video_id: str) -> Optional[Dict[str, Any]]:
        """Get cached video metadata.

        Args:
            video_id: YouTube video ID.

        Returns:
            Cached metadata or None if not found/expired.
        """
        key = self._generate_key("video_metadata", video_id)
        return await self._cache.get(key)

    async def cache_summary(self, cache_key: str, summary_data: Dict[str, Any],
                            ttl: Optional[int] = None) -> bool:
        """Cache summary data with custom key.

        Args:
            cache_key: Custom cache key (e.g. hash of transcript + config;
                see :meth:`generate_summary_cache_key`).
            summary_data: Summary result data.
            ttl: Time-to-live in seconds (uses default if None).

        Returns:
            True if cached successfully.
        """
        try:
            key = self._generate_key("summary", cache_key)
            await self._cache.set(key, summary_data, ttl=self._effective_ttl(ttl))
            return True
        except Exception:
            self._logger.exception("Failed to cache summary for key %s", cache_key)
            return False

    async def get_cached_summary(self, cache_key: str) -> Optional[Dict[str, Any]]:
        """Get cached summary data.

        Args:
            cache_key: Custom cache key.

        Returns:
            Cached summary data or None if not found/expired.
        """
        key = self._generate_key("summary", cache_key)
        return await self._cache.get(key)

    def generate_summary_cache_key(self, video_id: str, config: Dict[str, Any]) -> str:
        """Generate cache key for summary based on video ID and configuration.

        Args:
            video_id: YouTube video ID.
            config: Summary configuration (must be JSON-serializable).

        Returns:
            16-hex-character cache key string.
        """
        # sort_keys makes the key deterministic regardless of dict ordering.
        config_str = json.dumps(config, sort_keys=True)
        key_input = f"{video_id}:{config_str}"
        return hashlib.sha256(key_input.encode()).hexdigest()[:16]

    async def invalidate_video_cache(self, video_id: str) -> int:
        """Invalidate all cache entries for a video.

        Args:
            video_id: YouTube video ID.

        Returns:
            Number of entries invalidated.
        """
        keys = (
            self._generate_key("transcript", video_id),
            self._generate_key("video_metadata", video_id),
            self._generate_key("pipeline_result", video_id),
        )
        removed_count = 0
        for key in keys:
            if await self._cache.delete(key):
                removed_count += 1
        # NOTE: summary entries are keyed by a content hash, not the video ID,
        # so they cannot be invalidated without key-scanning support in
        # MemoryCache; production should use pattern-based invalidation.
        return removed_count

    async def get_cache_stats(self) -> Dict[str, Any]:
        """Get cache statistics.

        Returns:
            Cache statistics dictionary.
        """
        stats = await self._cache.stats()
        return {
            "total_entries": stats.get("size", 0),
            "entries_by_type": {},  # not available in current MemoryCache interface
            "default_ttl_seconds": self.default_ttl,
            "hit_rate": stats.get("hit_rate", 0.0),
            "miss_rate": stats.get("miss_rate", 0.0)
        }

    async def clear_cache(self) -> int:
        """Clear all cache entries.

        Returns:
            Number of entries cleared (size reported just before clearing).
        """
        stats = await self._cache.stats()
        count = stats.get("size", 0)
        await self._cache.clear()
        return count