youtube-summarizer/backend/services/browser_notification_servic...

441 lines
16 KiB
Python

"""
Browser notification service for real-time user notifications (Task 14.4).
Provides browser push notifications via WebSocket for processing events.
"""
import asyncio
import logging
from typing import Dict, List, Optional, Any
from datetime import datetime, timedelta
from dataclasses import dataclass
from enum import Enum
from ..core.websocket_manager import websocket_manager
logger = logging.getLogger(__name__)
class NotificationPriority(Enum):
    """Urgency level attached to a browser notification.

    The string value is what gets serialized into the WebSocket payload;
    members are declared from least to most urgent.
    """

    LOW = "low"
    NORMAL = "normal"
    HIGH = "high"
    # Listed first when active notifications are sorted by priority.
    URGENT = "urgent"
class NotificationType(Enum):
    """Category of a browser notification.

    Each member's value is the lower-case form of its name and is the string
    placed in the ``type`` field of the serialized notification.
    """

    # Video-processing lifecycle events.
    PROCESSING_COMPLETE = "processing_complete"
    PROCESSING_FAILED = "processing_failed"
    TRANSCRIPT_READY = "transcript_ready"
    SUMMARY_GENERATED = "summary_generated"

    # Account / system level events.
    SYSTEM_MAINTENANCE = "system_maintenance"
    QUOTA_WARNING = "quota_warning"
    EXPORT_READY = "export_ready"
    USER_ACTION_REQUIRED = "user_action_required"
@dataclass
class BrowserNotification:
    """Represents a browser notification.

    ``created_at`` and ``expires_at`` are filled in by ``__post_init__`` when
    the caller does not supply them, so a fully constructed instance always
    has a ``created_at`` value.
    """

    # Identifier used as the key for tracking and dismissing the notification.
    notification_id: str
    # Owning user, or None for a notification not tied to a user.
    user_id: Optional[str]
    # Related processing job, if any.
    job_id: Optional[str]
    type: NotificationType
    priority: NotificationPriority
    title: str
    message: str
    # Optional call-to-action link and its label.
    action_url: Optional[str] = None
    action_text: Optional[str] = None
    # Seconds after which the notification should disappear (None/0: never).
    auto_dismiss_seconds: Optional[int] = None
    # None means "stamp with the current time on construction".
    created_at: Optional[datetime] = None
    # None means "derive from created_at + auto_dismiss_seconds, if set".
    expires_at: Optional[datetime] = None
    metadata: Optional[Dict[str, Any]] = None

    def __post_init__(self):
        """Fill in the creation time and, when possible, the expiry time."""
        if self.created_at is None:
            # Naive UTC timestamp; the service's expiry and rate-limit checks
            # compare against datetime.utcnow() as well, so keep them aligned.
            self.created_at = datetime.utcnow()

        # An explicitly supplied expires_at always wins over the derived one.
        if self.auto_dismiss_seconds and self.expires_at is None:
            self.expires_at = self.created_at + timedelta(
                seconds=self.auto_dismiss_seconds
            )
class BrowserNotificationService:
    """
    Service for managing browser notifications via WebSocket.

    Handles notification delivery, preferences, and user interaction.
    All state (active notifications, history, preferences and rate-limit
    windows) is held in memory on the instance.
    """

    def __init__(self):
        # Notifications that have been sent and not yet dismissed,
        # keyed by notification_id.
        self.active_notifications: Dict[str, BrowserNotification] = {}
        # Raw preference dict most recently supplied for each user_id.
        self.user_preferences: Dict[str, Dict[str, Any]] = {}
        # Bounded in-memory log of accepted notifications.
        self.notification_history: List[BrowserNotification] = []
        # Send timestamps per "<identifier>_<type value>" key.
        self.rate_limits: Dict[str, List[datetime]] = {}
        # Notification types each user has switched off.
        self.blocked_notifications: Dict[str, List[NotificationType]] = {}
        # Strong references to pending auto-dismiss tasks. The event loop
        # only keeps weak references, so an unreferenced task could be
        # garbage-collected before it runs.
        self._background_tasks: set = set()

    async def send_notification(
        self,
        notification: BrowserNotification,
        target_users: Optional[List[str]] = None
    ) -> bool:
        """
        Send a browser notification to specified users or all connected users.

        Args:
            notification: Notification to send
            target_users: List of user IDs to send to (None for broadcast)

        Returns:
            True if the notification was handed to the WebSocket manager;
            False if it was rate limited, blocked by the user's preferences,
            or an error occurred while sending.
        """
        try:
            # Check rate limiting (this also records the attempt)
            if not self._check_rate_limit(notification.user_id or "system", notification.type):
                logger.warning(f"Rate limit exceeded for notification type {notification.type}")
                return False

            # Check user preferences and blocking
            if not self._should_send_notification(notification):
                logger.debug(f"Notification blocked by preferences: {notification.notification_id}")
                return False

            # Store notification
            self.active_notifications[notification.notification_id] = notification
            self._add_to_history(notification)

            # Both paths use the same broadcast call; for targeted sends the
            # recipient list is attached to the message (per-user delivery
            # would need user-specific connection tracking).
            envelope: Dict[str, Any] = {"type": "browser_notification"}
            if target_users:
                envelope["target_users"] = target_users
            envelope["notification"] = self._build_payload(notification)
            await websocket_manager.broadcast_system_message(envelope)

            logger.info(f"Sent browser notification: {notification.title} (ID: {notification.notification_id})")

            # Schedule auto-dismiss if configured
            if notification.auto_dismiss_seconds:
                dismiss_task = asyncio.create_task(
                    self._auto_dismiss_notification(
                        notification.notification_id,
                        notification.auto_dismiss_seconds,
                        notification,
                    )
                )
                # Keep the task alive until it finishes, then drop it.
                self._background_tasks.add(dismiss_task)
                dismiss_task.add_done_callback(self._background_tasks.discard)

            return True
        except Exception as e:
            # Best-effort delivery: report failure to the caller instead of
            # raising, but keep the traceback in the log.
            logger.exception(f"Failed to send browser notification: {e}")
            return False

    @staticmethod
    def _build_payload(notification: BrowserNotification) -> Dict[str, Any]:
        """Build the dict sent to clients (enums as values, datetimes as ISO strings)."""
        expires_at = notification.expires_at
        return {
            "notification_id": notification.notification_id,
            "type": notification.type.value,
            "priority": notification.priority.value,
            "title": notification.title,
            "message": notification.message,
            "action_url": notification.action_url,
            "action_text": notification.action_text,
            "auto_dismiss_seconds": notification.auto_dismiss_seconds,
            "created_at": notification.created_at.isoformat(),
            "expires_at": expires_at.isoformat() if expires_at else None,
            "metadata": notification.metadata or {},
            "job_id": notification.job_id,
            "user_id": notification.user_id
        }

    async def send_processing_complete_notification(
        self,
        job_id: str,
        video_title: str,
        user_id: Optional[str] = None,
        summary_url: Optional[str] = None
    ) -> bool:
        """Send notification for completed video processing.

        Returns:
            Result of send_notification for the constructed notification.
        """
        notification = BrowserNotification(
            notification_id=f"complete_{job_id}",
            user_id=user_id,
            job_id=job_id,
            type=NotificationType.PROCESSING_COMPLETE,
            priority=NotificationPriority.NORMAL,
            title="Video Processing Complete! 🎉",
            message=f'Successfully processed: "{video_title}"',
            action_url=summary_url,
            action_text="View Summary",
            auto_dismiss_seconds=30,
            metadata={
                "video_title": video_title,
                "completion_type": "success"
            }
        )
        return await self.send_notification(notification)

    async def send_processing_failed_notification(
        self,
        job_id: str,
        video_title: str,
        error_message: str,
        user_id: Optional[str] = None,
        retry_url: Optional[str] = None
    ) -> bool:
        """Send notification for failed video processing.

        Returns:
            Result of send_notification for the constructed notification.
        """
        notification = BrowserNotification(
            notification_id=f"failed_{job_id}",
            user_id=user_id,
            job_id=job_id,
            type=NotificationType.PROCESSING_FAILED,
            priority=NotificationPriority.HIGH,
            title="Video Processing Failed ❌",
            message=f'Failed to process: "{video_title}". {error_message}',
            action_url=retry_url,
            action_text="Try Again",
            auto_dismiss_seconds=60,
            metadata={
                "video_title": video_title,
                "error_message": error_message,
                "failure_type": "processing_error"
            }
        )
        return await self.send_notification(notification)

    async def send_transcript_ready_notification(
        self,
        job_id: str,
        video_title: str,
        user_id: Optional[str] = None,
        transcript_url: Optional[str] = None
    ) -> bool:
        """Send notification when transcript becomes available.

        Returns:
            Result of send_notification for the constructed notification.
        """
        notification = BrowserNotification(
            notification_id=f"transcript_{job_id}",
            user_id=user_id,
            job_id=job_id,
            type=NotificationType.TRANSCRIPT_READY,
            priority=NotificationPriority.LOW,
            title="Transcript Ready 📝",
            message=f'Transcript extracted for: "{video_title}"',
            action_url=transcript_url,
            action_text="View Transcript",
            auto_dismiss_seconds=20,
            metadata={
                "video_title": video_title,
                "stage": "transcript_complete"
            }
        )
        return await self.send_notification(notification)

    async def send_export_ready_notification(
        self,
        job_id: str,
        export_format: str,
        video_title: str,
        download_url: str,
        user_id: Optional[str] = None
    ) -> bool:
        """Send notification when export is ready for download.

        Returns:
            Result of send_notification for the constructed notification.
        """
        notification = BrowserNotification(
            notification_id=f"export_{job_id}_{export_format}",
            user_id=user_id,
            job_id=job_id,
            type=NotificationType.EXPORT_READY,
            priority=NotificationPriority.NORMAL,
            title=f"{export_format.upper()} Export Ready 📄",
            message=f'Your {export_format} export is ready for "{video_title}"',
            action_url=download_url,
            action_text="Download",
            auto_dismiss_seconds=120,  # 2 minutes
            metadata={
                "video_title": video_title,
                "export_format": export_format,
                "download_url": download_url
            }
        )
        return await self.send_notification(notification)

    async def send_quota_warning_notification(
        self,
        remaining_quota: int,
        quota_type: str,
        user_id: Optional[str] = None
    ) -> bool:
        """Send notification about approaching quota limits.

        The notification id embeds the quota type and the current UTC date,
        so repeated warnings on the same day share one id.

        Returns:
            Result of send_notification for the constructed notification.
        """
        notification = BrowserNotification(
            notification_id=f"quota_{quota_type}_{datetime.utcnow().strftime('%Y%m%d')}",
            user_id=user_id,
            job_id=None,
            type=NotificationType.QUOTA_WARNING,
            priority=NotificationPriority.HIGH,
            title="Quota Warning ⚠️",
            message=f"You have {remaining_quota} {quota_type} remaining today",
            action_url="/settings/billing",
            action_text="Upgrade Plan",
            auto_dismiss_seconds=45,
            metadata={
                "remaining_quota": remaining_quota,
                "quota_type": quota_type,
                "warning_threshold": True
            }
        )
        return await self.send_notification(notification)

    async def dismiss_notification(self, notification_id: str) -> bool:
        """Dismiss a specific notification.

        Returns:
            True if the notification was active (it is removed and a
            dismissal message is broadcast); False if the id is unknown.
        """
        if self.active_notifications.pop(notification_id, None) is None:
            return False

        # Send dismissal message via WebSocket
        await websocket_manager.broadcast_system_message({
            "type": "notification_dismissed",
            "notification_id": notification_id
        })
        logger.debug(f"Dismissed notification: {notification_id}")
        return True

    async def update_user_preferences(
        self,
        user_id: str,
        preferences: Dict[str, Any]
    ) -> None:
        """Update notification preferences for a user.

        Keys of the form ``enable_<type>`` with a falsy value block that
        notification type for the user; other keys are stored but otherwise
        ignored here.

        Args:
            user_id: User whose preferences are replaced
            preferences: Complete preference mapping for the user
        """
        self.user_preferences[user_id] = preferences

        # Extract blocked notification types
        prefix = "enable_"
        blocked_types = []
        for pref_key, enabled in preferences.items():
            if not pref_key.startswith(prefix) or enabled:
                continue
            # enable_processing_complete -> "processing_complete".
            # Enum call syntax looks members up by *value*, and the values
            # are lower-case, so normalize to lower case (not upper).
            type_value = pref_key[len(prefix):].lower()
            try:
                blocked_types.append(NotificationType(type_value))
            except ValueError:
                logger.warning(f"Unknown notification type in preferences: {type_value}")

        self.blocked_notifications[user_id] = blocked_types
        logger.info(f"Updated notification preferences for user {user_id}")

    def get_active_notifications(self, user_id: Optional[str] = None) -> List[Dict[str, Any]]:
        """Get list of active notifications for a user.

        Notifications without a user_id are included for every user, and
        expired notifications are skipped (but not removed).

        Args:
            user_id: Only include notifications for this user (None for all)

        Returns:
            Serialized notifications ordered by priority (most urgent first),
            then by creation time.
        """
        priority_order = {
            NotificationPriority.URGENT: 0,
            NotificationPriority.HIGH: 1,
            NotificationPriority.NORMAL: 2,
            NotificationPriority.LOW: 3
        }
        now = datetime.utcnow()

        visible = []
        for notif in self.active_notifications.values():
            # Filter by user if specified
            if user_id and notif.user_id and notif.user_id != user_id:
                continue
            # Check if notification has expired
            if notif.expires_at and now > notif.expires_at:
                continue
            visible.append(notif)

        # Sort by priority and creation time (ISO string, as serialized)
        visible.sort(
            key=lambda notif: (
                priority_order.get(notif.priority, 999),
                notif.created_at.isoformat()
            )
        )

        return [
            {
                "notification_id": notif.notification_id,
                "type": notif.type.value,
                "priority": notif.priority.value,
                "title": notif.title,
                "message": notif.message,
                "action_url": notif.action_url,
                "action_text": notif.action_text,
                "created_at": notif.created_at.isoformat(),
                "expires_at": notif.expires_at.isoformat() if notif.expires_at else None,
                "metadata": notif.metadata
            }
            for notif in visible
        ]

    async def _auto_dismiss_notification(
        self,
        notification_id: str,
        delay: int,
        expected: Optional[BrowserNotification] = None
    ) -> None:
        """Auto-dismiss notification after delay.

        Args:
            notification_id: Id of the notification to dismiss
            delay: Seconds to wait before dismissing
            expected: When given, dismiss only if this exact object is still
                the active entry for the id. Ids are deterministic (for
                example ``complete_<job_id>``), so without this check a stale
                timer would dismiss a newer notification re-sent under the
                same id.
        """
        await asyncio.sleep(delay)
        if expected is not None and self.active_notifications.get(notification_id) is not expected:
            return
        await self.dismiss_notification(notification_id)

    def _check_rate_limit(
        self,
        identifier: str,
        notification_type: NotificationType,
        limit: int = 10,
        window_minutes: int = 5
    ) -> bool:
        """Check if rate limit allows sending notification.

        Uses a sliding window per (identifier, notification type). An
        allowed call is recorded in the window; a rejected one is not.

        Args:
            identifier: User id or other sender key
            notification_type: Type of the notification being sent
            limit: Maximum sends allowed inside the window
            window_minutes: Window length in minutes

        Returns:
            True if the notification may be sent, False if the limit is hit.
        """
        now = datetime.utcnow()
        window_start = now - timedelta(minutes=window_minutes)
        rate_key = f"{identifier}_{notification_type.value}"

        # Drop timestamps that have fallen out of the window
        recent = [
            timestamp for timestamp in self.rate_limits.get(rate_key, [])
            if timestamp > window_start
        ]
        self.rate_limits[rate_key] = recent

        if len(recent) >= limit:
            return False

        recent.append(now)
        return True

    def _should_send_notification(self, notification: BrowserNotification) -> bool:
        """Check if notification should be sent based on user preferences."""
        if not notification.user_id:
            return True  # System notifications always sent

        # Check if notification type is blocked
        blocked_types = self.blocked_notifications.get(notification.user_id, [])
        return notification.type not in blocked_types

    def _add_to_history(self, notification: BrowserNotification) -> None:
        """Add notification to history."""
        self.notification_history.append(notification)
        # Cap memory use: once the history exceeds 1000 entries, keep only
        # the 500 most recent.
        if len(self.notification_history) > 1000:
            self.notification_history = self.notification_history[-500:]

    def get_notification_stats(self) -> Dict[str, Any]:
        """Get notification service statistics.

        Returns:
            Counts of active notifications, history entries, users with
            stored preferences, users with at least one blocked type, and
            tracked rate-limit keys.
        """
        # Only users who actually block something count as "blocked";
        # update_user_preferences stores an empty list for everyone else.
        blocked_users = sum(1 for blocked in self.blocked_notifications.values() if blocked)
        return {
            "active_notifications": len(self.active_notifications),
            # Length of the trimmed in-memory history, not a lifetime total.
            "total_sent": len(self.notification_history),
            "users_with_preferences": len(self.user_preferences),
            "blocked_users": blocked_users,
            "rate_limited_identifiers": len(self.rate_limits)
        }
# Global browser notification service instance.
# Created at import time; all notification state (active notifications,
# preferences, history, rate limits) lives in memory on this one object.
browser_notification_service = BrowserNotificationService()