# Source: youtube-summarizer/backend/api/websocket_chat.py
# (file-viewer metadata: 159 lines, 6.0 KiB, Python)

"""
WebSocket endpoints for real-time chat functionality (Story 4.6).
"""
import asyncio
import logging
from datetime import datetime, timezone
from typing import Optional

from fastapi import APIRouter, WebSocket, WebSocketDisconnect, Depends, Query

from backend.api.dependencies import get_current_user_ws
from backend.core.websocket_manager import websocket_manager
from backend.models.user import User

# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# Router on which the WebSocket endpoints in this module are registered.
router = APIRouter()
async def _send_processing_error(session_id: str, detail: str) -> None:
    """Report a message-processing failure through the chat status channel."""
    await websocket_manager.send_chat_status(session_id, {
        "status": "error",
        "message": f"Error processing message: {detail}",
        "error_type": "processing_error",
    })


async def _dispatch_chat_message(
    websocket: WebSocket,
    session_id: str,
    user_id: Optional[str],
    data: dict,
) -> None:
    """Act on one decoded client message according to its ``type`` field."""
    message_type = data.get("type")
    if message_type == "ping":
        # Application-level keepalive: answered on this socket only.
        await websocket.send_json({"type": "pong"})
    elif message_type in ("typing_start", "typing_end"):
        # Users without an id are reported under the name "anonymous".
        await websocket_manager.send_typing_indicator(
            session_id, user_id or "anonymous", message_type == "typing_start"
        )
    elif message_type == "message":
        # Only acknowledge receipt here; the chat content itself is not
        # processed by this endpoint.
        logger.info(
            "Received message from user %s in session %s", user_id, session_id
        )
        await websocket.send_json({
            "type": "message_received",
            "message_id": data.get("message_id"),
            "timestamp": data.get("timestamp"),
        })
    else:
        logger.warning("Unknown message type: %s", message_type)


@router.websocket("/ws/chat/{session_id}")
async def websocket_chat_endpoint(
    websocket: WebSocket,
    session_id: str,
    user_id: Optional[str] = Query(None),
    # TODO: add a `current_user` dependency (Depends(get_current_user_ws)) once
    # WebSocket authentication is enabled; until then `user_id` is an
    # unauthenticated, client-supplied query parameter.
):
    """
    WebSocket endpoint for real-time chat functionality.

    Registers the socket with ``websocket_manager``, announces the connection
    through ``send_chat_status`` and then serves client messages until the
    client disconnects. The socket is always handed to
    ``websocket_manager.disconnect`` on exit.

    Args:
        websocket: WebSocket connection
        session_id: Chat session ID taken from the URL path
        user_id: Optional user ID taken from the query string

    Client message types handled (``type`` field of the JSON object):
        - ping: answered with ``{"type": "pong"}``
        - typing_start / typing_end: forwarded to
          ``websocket_manager.send_typing_indicator``
        - message: acknowledged with a ``message_received`` reply echoing
          ``message_id`` and ``timestamp``
        - anything else: logged as unknown and ignored

    Malformed frames and handler failures are reported via ``send_chat_status``
    with ``status="error"`` and the connection stays open.
    """
    try:
        # Connect the WebSocket for chat
        await websocket_manager.connect_chat(websocket, session_id, user_id)

        # Send initial connection confirmation
        await websocket_manager.send_chat_status(session_id, {
            "status": "connected",
            "message": "WebSocket connection established for chat",
            "session_id": session_id,
            "user_id": user_id,
        })
        logger.info(
            "Chat WebSocket connected: session=%s, user=%s", session_id, user_id
        )

        # Keep connection alive and handle incoming messages
        while True:
            # Phase 1: receive. Failures here are about the transport or the
            # frame encoding, not about message handling.
            try:
                data = await websocket.receive_json()
            except WebSocketDisconnect:
                logger.info(
                    "Chat WebSocket disconnected: session=%s, user=%s",
                    session_id, user_id,
                )
                break
            except RuntimeError as e:
                # Starlette raises RuntimeError when receive is called on a
                # socket that is not (or no longer) in the connected state.
                # Retrying cannot succeed, so stop instead of spinning.
                logger.error(
                    "Chat WebSocket no longer readable: session=%s, user=%s: %s",
                    session_id, user_id, e,
                )
                break
            except Exception as e:
                # e.g. a frame that is not valid JSON: report it and keep
                # serving the connection.
                logger.error("Error handling WebSocket message: %s", e)
                await _send_processing_error(session_id, str(e))
                continue

            # Valid JSON that is not an object (list, string, number, ...)
            # has no "type" field to dispatch on.
            if not isinstance(data, dict):
                logger.warning(
                    "Ignoring non-object WebSocket payload of type %s",
                    type(data).__name__,
                )
                await _send_processing_error(
                    session_id, "expected a JSON object"
                )
                continue

            # Phase 2: dispatch.
            try:
                await _dispatch_chat_message(websocket, session_id, user_id, data)
            except WebSocketDisconnect:
                logger.info(
                    "Chat WebSocket disconnected: session=%s, user=%s",
                    session_id, user_id,
                )
                break
            except Exception as e:
                # logger.exception keeps the traceback for diagnosis.
                logger.exception("Error handling WebSocket message: %s", e)
                await _send_processing_error(session_id, str(e))
    except WebSocketDisconnect:
        logger.info(
            "Chat WebSocket disconnected during setup: session=%s, user=%s",
            session_id, user_id,
        )
    except Exception as e:
        logger.exception("Error in chat WebSocket endpoint: %s", e)
    finally:
        # Clean up the connection
        websocket_manager.disconnect(websocket)
        logger.info(
            "Chat WebSocket cleanup completed: session=%s, user=%s",
            session_id, user_id,
        )
# Seconds to wait between two consecutive status pushes.
_STATUS_UPDATE_INTERVAL_SECONDS = 10


@router.websocket("/ws/chat/{session_id}/status")
async def websocket_chat_status_endpoint(
    websocket: WebSocket,
    session_id: str
):
    """
    WebSocket endpoint for monitoring chat session status.

    Accepts the connection and then pushes a ``status_update`` message every
    ``_STATUS_UPDATE_INTERVAL_SECONDS`` seconds until sending fails or the
    client disconnects. The socket is closed on exit.

    Args:
        websocket: WebSocket connection
        session_id: Chat session ID taken from the URL path

    Each update's ``data`` contains:
        - session_id: the monitored session
        - connections: ``stats["chat_connections"][session_id]`` (0 if absent)
        - typing_users: ``stats["typing_sessions"][session_id]`` ([] if absent)
        - timestamp: current UTC time as an ISO 8601 string
    """
    try:
        await websocket.accept()
        while True:
            try:
                # Send periodic status updates
                stats = websocket_manager.get_stats()
                session_stats = {
                    "session_id": session_id,
                    "connections": stats.get("chat_connections", {}).get(session_id, 0),
                    "typing_users": stats.get("typing_sessions", {}).get(session_id, []),
                    # Taken from the clock directly; deriving it from the
                    # logger's first handler yielded None whenever this
                    # logger had no handler of its own and failed when the
                    # handler had no formatter.
                    "timestamp": datetime.now(timezone.utc).isoformat(),
                }
                await websocket.send_json({
                    "type": "status_update",
                    "data": session_stats,
                })
                # Wait before next update
                await asyncio.sleep(_STATUS_UPDATE_INTERVAL_SECONDS)
            except WebSocketDisconnect:
                break
            except Exception as e:
                logger.exception("Error in status WebSocket: %s", e)
                break
    except Exception as e:
        logger.exception("Error in chat status WebSocket endpoint: %s", e)
    finally:
        # Best-effort close: the socket may already be closed. Catch only
        # Exception so task cancellation (a BaseException) is not swallowed.
        try:
            await websocket.close()
        except Exception as e:
            logger.debug("Ignoring error while closing status WebSocket: %s", e)