# youtube-summarizer/backend/services/chroma_service.py

"""ChromaDB service for vector storage and similarity search."""
import asyncio
import logging
from typing import List, Dict, Any, Optional, Tuple
import uuid
import hashlib
import json
from datetime import datetime
import chromadb
from chromadb.config import Settings
from chromadb.utils import embedding_functions
from sentence_transformers import SentenceTransformer
import numpy as np
from backend.core.exceptions import ServiceError
logger = logging.getLogger(__name__)
class ChromaDBError(ServiceError):
"""ChromaDB specific errors."""
pass
class ChromaService:
"""Service for ChromaDB vector database operations."""
    def __init__(
        self,
        persist_directory: str = "./data/chromadb",
        embedding_model: str = "all-MiniLM-L6-v2",
        collection_name: str = "youtube_transcripts"
    ):
        """Initialize ChromaDB service.

        Args:
            persist_directory: Directory for persistent storage
            embedding_model: SentenceTransformers model name
            collection_name: ChromaDB collection name
        """
        self.persist_directory = persist_directory
        self.embedding_model_name = f"sentence-transformers/{embedding_model}"
        self.collection_name = collection_name

        # Lazily initialized components
        self._client = None
        self._collection = None
        self._embedding_model = None
        self._embedding_function = None

        # Performance metrics
        self.stats = {
            'documents_added': 0,
            'queries_executed': 0,
            'total_embedding_time': 0.0,
            'total_search_time': 0.0
        }
    async def initialize(self) -> None:
        """Initialize ChromaDB client and collection."""
        try:
            logger.info(f"Initializing ChromaDB with persist_directory: {self.persist_directory}")

            # Initialize ChromaDB client with persistent storage
            self._client = chromadb.PersistentClient(
                path=self.persist_directory,
                settings=Settings(
                    anonymized_telemetry=False,
                    allow_reset=True
                )
            )

            # Initialize embedding function
            self._embedding_function = embedding_functions.SentenceTransformerEmbeddingFunction(
                model_name=self.embedding_model_name
            )

            # Load embedding model for manual operations; run the (slow)
            # model load in a worker thread so it doesn't block the event loop
            self._embedding_model = await asyncio.to_thread(
                SentenceTransformer, self.embedding_model_name
            )

            # Get or create collection
            try:
                self._collection = self._client.get_collection(
                    name=self.collection_name,
                    embedding_function=self._embedding_function
                )
                logger.info(
                    f"Loaded existing collection '{self.collection_name}' "
                    f"with {self._collection.count()} documents"
                )
            except Exception:
                self._collection = self._client.create_collection(
                    name=self.collection_name,
                    embedding_function=self._embedding_function,
                    metadata={"description": "YouTube video transcript chunks for RAG"}
                )
                logger.info(f"Created new collection '{self.collection_name}'")
        except Exception as e:
            logger.error(f"Failed to initialize ChromaDB: {e}")
            raise ChromaDBError(f"ChromaDB initialization failed: {e}") from e
    async def add_document_chunks(
        self,
        video_id: str,
        chunks: List[Dict[str, Any]]
    ) -> List[str]:
        """Add document chunks to ChromaDB.

        Args:
            video_id: YouTube video ID
            chunks: List of chunk dictionaries with content and metadata

        Returns:
            List of ChromaDB document IDs
        """
        if not self._collection:
            await self.initialize()
        try:
            start_time = datetime.now()

            # Prepare documents for ChromaDB
            documents = []
            metadatas = []
            ids = []
            for chunk in chunks:
                # Generate unique ID for ChromaDB
                chunk_id = str(uuid.uuid4())
                ids.append(chunk_id)

                # Document content
                content = chunk.get('content', '')
                documents.append(content)

                # Metadata for filtering and context
                metadata = {
                    'video_id': video_id,
                    'chunk_type': chunk.get('chunk_type', 'transcript'),
                    'chunk_index': chunk.get('chunk_index', 0),
                    'start_timestamp': chunk.get('start_timestamp'),
                    'end_timestamp': chunk.get('end_timestamp'),
                    'content_length': len(content),
                    'content_hash': hashlib.sha256(content.encode()).hexdigest(),
                    'created_at': datetime.now().isoformat(),
                    'embedding_model': self.embedding_model_name
                }

                # Add optional metadata, JSON-encoded because ChromaDB
                # metadata values must be scalars
                if 'keywords' in chunk:
                    metadata['keywords'] = json.dumps(chunk['keywords'])
                if 'entities' in chunk:
                    metadata['entities'] = json.dumps(chunk['entities'])

                # ChromaDB rejects None metadata values, so drop absent fields
                metadatas.append({k: v for k, v in metadata.items() if v is not None})

            # Add to ChromaDB collection; embedding happens inside this
            # synchronous client call, so run it in a worker thread
            await asyncio.to_thread(
                self._collection.add,
                documents=documents,
                metadatas=metadatas,
                ids=ids
            )

            # Update statistics (add() embeds internally, so the elapsed
            # time approximates embedding time)
            processing_time = (datetime.now() - start_time).total_seconds()
            self.stats['documents_added'] += len(documents)
            self.stats['total_embedding_time'] += processing_time
            logger.info(f"Added {len(documents)} chunks to ChromaDB in {processing_time:.3f}s")
            return ids
        except Exception as e:
            logger.error(f"Failed to add documents to ChromaDB: {e}")
            raise ChromaDBError(f"Failed to add documents: {e}") from e
    async def search_similar(
        self,
        query: str,
        video_id: Optional[str] = None,
        chunk_types: Optional[List[str]] = None,
        n_results: int = 5,
        similarity_threshold: float = 0.0
    ) -> List[Dict[str, Any]]:
        """Search for similar content using vector similarity.

        Args:
            query: Search query text
            video_id: Optional filter by video ID
            chunk_types: Optional filter by chunk types
            n_results: Number of results to return
            similarity_threshold: Minimum similarity score

        Returns:
            List of search results with content, metadata, and scores
        """
        if not self._collection:
            await self.initialize()
        try:
            start_time = datetime.now()

            # Build where clause for filtering (ChromaDB requires an explicit
            # $and when combining more than one condition)
            conditions = []
            if video_id:
                conditions.append({'video_id': video_id})
            if chunk_types:
                conditions.append({'chunk_type': {"$in": chunk_types}})
            if not conditions:
                where = None
            elif len(conditions) == 1:
                where = conditions[0]
            else:
                where = {"$and": conditions}

            # Perform similarity search; the client call is synchronous, so
            # run it in a worker thread to keep the event loop responsive
            results = await asyncio.to_thread(
                self._collection.query,
                query_texts=[query],
                n_results=n_results,
                where=where,
                include=['metadatas', 'documents', 'distances']
            )

            # Process and format results
            formatted_results = []
            if results['documents'] and results['documents'][0]:
                for i, (doc, metadata, distance) in enumerate(zip(
                    results['documents'][0],
                    results['metadatas'][0],
                    results['distances'][0]
                )):
                    # Convert distance to similarity score; see the scoring
                    # note after this method
                    similarity_score = max(0.0, 1.0 - (distance / 2.0))
                    if similarity_score >= similarity_threshold:
                        result = {
                            'content': doc,
                            'metadata': metadata,
                            'similarity_score': similarity_score,
                            'distance': distance,
                            'rank': i + 1,
                            'video_id': metadata.get('video_id'),
                            'chunk_type': metadata.get('chunk_type'),
                            'start_timestamp': metadata.get('start_timestamp'),
                            'end_timestamp': metadata.get('end_timestamp'),
                            'chunk_index': metadata.get('chunk_index')
                        }

                        # Format timestamp for display and build a deep link
                        # into the video at that offset
                        if result['start_timestamp'] is not None:
                            timestamp = result['start_timestamp']
                            hours = int(timestamp // 3600)
                            minutes = int((timestamp % 3600) // 60)
                            seconds = int(timestamp % 60)
                            result['timestamp_formatted'] = f"[{hours:02d}:{minutes:02d}:{seconds:02d}]"
                            result['youtube_link'] = (
                                f"https://youtube.com/watch?v={result['video_id']}"
                                f"&t={int(timestamp)}s"
                            )
                        formatted_results.append(result)

            # Update statistics
            search_time = (datetime.now() - start_time).total_seconds()
            self.stats['queries_executed'] += 1
            self.stats['total_search_time'] += search_time
            logger.info(f"Search completed in {search_time:.3f}s, found {len(formatted_results)} results")
            return formatted_results
        except Exception as e:
            logger.error(f"Search failed: {e}")
            raise ChromaDBError(f"Search failed: {e}") from e
    async def get_collection_stats(self) -> Dict[str, Any]:
        """Get collection statistics and health metrics."""
        if not self._collection:
            await self.initialize()
        try:
            count = self._collection.count()
            return {
                'collection_name': self.collection_name,
                'total_documents': count,
                'embedding_model': self.embedding_model_name,
                'persist_directory': self.persist_directory,
                **self.stats
            }
        except Exception as e:
            logger.error(f"Failed to get collection stats: {e}")
            return {'error': str(e)}
    async def delete_video_chunks(self, video_id: str) -> int:
        """Delete all chunks for a specific video.

        Args:
            video_id: YouTube video ID

        Returns:
            Number of deleted documents
        """
        if not self._collection:
            await self.initialize()
        try:
            # Look up matching IDs only (IDs are always returned, so nothing
            # else needs to be included)
            results = self._collection.get(
                where={'video_id': video_id},
                include=[]
            )
            if results['ids']:
                # Delete documents
                self._collection.delete(ids=results['ids'])
                deleted_count = len(results['ids'])
                logger.info(f"Deleted {deleted_count} chunks for video {video_id}")
                return deleted_count
            return 0
        except Exception as e:
            logger.error(f"Failed to delete video chunks: {e}")
            raise ChromaDBError(f"Failed to delete video chunks: {e}") from e
    async def reset_collection(self) -> None:
        """Reset the collection (delete all documents)."""
        if not self._client:
            await self.initialize()
        try:
            # Delete and recreate collection
            self._client.delete_collection(self.collection_name)
            self._collection = self._client.create_collection(
                name=self.collection_name,
                embedding_function=self._embedding_function,
                metadata={"description": "YouTube video transcript chunks for RAG"}
            )

            # Reset stats
            self.stats = {
                'documents_added': 0,
                'queries_executed': 0,
                'total_embedding_time': 0.0,
                'total_search_time': 0.0
            }
            logger.info("ChromaDB collection reset successfully")
        except Exception as e:
            logger.error(f"Failed to reset collection: {e}")
            raise ChromaDBError(f"Failed to reset collection: {e}") from e
    async def health_check(self) -> Dict[str, Any]:
        """Perform health check on ChromaDB service."""
        try:
            if not self._collection:
                await self.initialize()

            # Test basic operations
            count = self._collection.count()

            # Test embedding generation (off the event loop)
            test_embedding = await asyncio.to_thread(
                self._embedding_model.encode, ["test query"]
            )
            return {
                'status': 'healthy',
                'collection_count': count,
                'embedding_model': self.embedding_model_name,
                'embedding_dimension': len(test_embedding[0]),
                'persist_directory': self.persist_directory
            }
        except Exception as e:
            logger.error(f"ChromaDB health check failed: {e}")
            return {
                'status': 'unhealthy',
                'error': str(e)
            }
    def __del__(self):
        """Cleanup resources."""
        # ChromaDB's PersistentClient flushes to disk automatically, so no
        # explicit cleanup is required here.
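

# --- Usage sketch ------------------------------------------------------------
# A minimal, illustrative driver for local experimentation: the chunk payload,
# video ID, and demo persist directory below are hypothetical placeholders,
# and in the real backend this service would be constructed by the
# application's wiring rather than run standalone.
if __name__ == "__main__":
    async def _demo() -> None:
        service = ChromaService(persist_directory="./data/chromadb_demo")
        await service.initialize()

        # Index one fake transcript chunk (placeholder content and video ID)
        await service.add_document_chunks(
            video_id="abc123XYZ_0",
            chunks=[{
                'content': "In this section we explain how vector search works.",
                'chunk_type': 'transcript',
                'chunk_index': 0,
                'start_timestamp': 0.0,
                'end_timestamp': 12.5,
            }]
        )

        # Query it back and print the formatted hits
        hits = await service.search_similar("how does vector search work?", n_results=3)
        for hit in hits:
            print(hit.get('timestamp_formatted', ''),
                  f"{hit['similarity_score']:.2f}",
                  hit['content'])

    asyncio.run(_demo())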