Story 3.5: Real-time Updates
Story Overview
As a user
I want live progress updates during processing
So that I know the system is working and how long to wait
Status: ✅ COMPLETED (2025-08-27)
Epic: Epic 3 - Enhanced User Experience
Dependencies: Story 3.4 (Batch Processing) ✅ Complete
Actual Effort: 6 hours
Priority: High
Implementation Summary
This story delivered real-time updates on top of the existing WebSocket infrastructure, with automatic reconnection, message queuing, time estimation, and job cancellation. Heartbeat monitoring and offline recovery were added beyond the original requirements.
Key Achievements
- ✅ Enhanced WebSocket manager with recovery and queuing
- ✅ Granular pipeline progress tracking with sub-tasks
- ✅ Real-time progress UI component with multiple views
- ✅ Time estimation based on historical data
- ✅ Job cancellation with immediate termination
- ✅ Connection recovery with message replay
- ✅ Heartbeat monitoring for connection health
Context
WebSocket infrastructure already exists from the batch processing implementation (Story 3.4). This story focuses on extending real-time updates to single video processing and improving the user experience with detailed progress information.
Acceptance Criteria ✅
- WebSocket Connection Management ✅
  - ✅ Automatic connection on process start
  - ✅ Graceful reconnection on disconnect with exponential backoff
  - ✅ Connection status indicator in UI
  - ✅ Message queuing for offline recovery (enhanced feature)
- Progress Stages Display ✅
  - ✅ Clear visualization of processing stages:
    - URL Validation (5%)
    - Metadata Extraction (15%)
    - Transcript Retrieval (35%)
    - Content Analysis (50%)
    - Summary Generation (75%)
    - Quality Validation (90%)
    - Complete (100%)
  - ✅ Visual progress bar with stage labels
  - ✅ Current stage highlighted with icons
- Percentage Calculation ✅
  - ✅ Accurate progress based on actual work done
  - ✅ Sub-progress for long operations (e.g., chunk processing)
  - ✅ Smooth progress transitions
  - ✅ Never goes backwards
- Time Estimation ✅
  - ✅ Calculate based on similar video processing times
  - ✅ Update dynamically as processing progresses
  - ✅ Show elapsed time and estimated remaining
  - ✅ Format as MM:SS for both elapsed and remaining
- Cancel Operation ✅
  - ✅ Cancel button available during processing
  - ✅ Immediate response to cancellation
  - ✅ Cleanup of partial results
  - ✅ Clear feedback when cancelled
- Connection Recovery ✅
  - ✅ Auto-reconnect with exponential backoff
  - ✅ Queue missed messages during disconnect
  - ✅ Resume progress display after reconnect
  - ✅ Show connection status to user
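The "never goes backwards" and MM:SS criteria above can be illustrated with a few lines of Python. This is a minimal sketch, not the project's code: `MonotonicProgress` and `format_mmss` are hypothetical names introduced here for illustration.

```python
class MonotonicProgress:
    """Clamp reported percentages so the displayed value never decreases."""

    def __init__(self) -> None:
        self._last = 0.0

    def update(self, percentage: float) -> float:
        # Keep the value inside 0-100 and never below the last reported value.
        clamped = max(0.0, min(100.0, percentage))
        self._last = max(self._last, clamped)
        return self._last


def format_mmss(seconds: float) -> str:
    """Format a duration in seconds as MM:SS (minutes may exceed 59)."""
    total = max(0, int(seconds))
    minutes, secs = divmod(total, 60)
    return f"{minutes:02d}:{secs:02d}"
```

A late or out-of-order update such as `update(5)` after `update(15)` leaves the displayed value at 15, which is one simple way to satisfy the monotonicity criterion.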
Technical Design
WebSocket Protocol Enhancement
Message Types
// Stage keys as used by the ProcessingProgress component
type ProcessingStage =
  | 'validating_url'
  | 'extracting_metadata'
  | 'extracting_transcript'
  | 'analyzing_content'
  | 'generating_summary'
  | 'validating_quality'
  | 'completed';

// Client -> Server
interface ClientMessage {
  type: 'subscribe' | 'unsubscribe' | 'cancel' | 'ping';
  job_id?: string;
  timestamp: string; // ISO 8601
}

// Server -> Client
// ResultData and ErrorData are not specified in this story.
interface ServerMessage {
  type: 'progress' | 'stage_change' | 'complete' | 'error' | 'cancelled' | 'pong';
  job_id: string;
  data: ProgressData | StageData | ResultData | ErrorData;
  timestamp: string; // ISO 8601
}

interface ProgressData {
  percentage: number;
  stage: ProcessingStage;
  message: string;
  sub_progress?: {
    current: number;
    total: number;
    description: string;
  };
  time_elapsed: number;
  estimated_remaining?: number;
}

interface StageData {
  previous_stage: ProcessingStage;
  current_stage: ProcessingStage;
  stage_progress: number;
  stage_message: string;
}
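As an illustration of the server-to-client envelope above, the following sketch builds a `progress` message as a plain dictionary. The function name `build_progress_message` is hypothetical; only the field names come from the interfaces in this section, and optional fields are omitted when they are not supplied.

```python
from datetime import datetime, timezone
from typing import Any, Dict, Optional


def build_progress_message(
    job_id: str,
    percentage: float,
    stage: str,
    message: str,
    time_elapsed: float,
    estimated_remaining: Optional[float] = None,
    sub_progress: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Build a 'progress' ServerMessage; optional fields are left out when None."""
    data: Dict[str, Any] = {
        "percentage": percentage,
        "stage": stage,
        "message": message,
        "time_elapsed": time_elapsed,
    }
    if estimated_remaining is not None:
        data["estimated_remaining"] = estimated_remaining
    if sub_progress is not None:
        data["sub_progress"] = sub_progress
    return {
        "type": "progress",
        "job_id": job_id,
        "data": data,
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }
```

The result can be passed to any JSON serializer before it is sent over the socket.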
Backend Enhancements
WebSocket Manager Updates
from datetime import datetime, timezone
from typing import Dict, List, Optional, Set

# Assumption: the FastAPI/Starlette WebSocket class (accept, send_json).
from fastapi import WebSocket

# ConnectionInfo, Message, and PipelineStage are defined elsewhere in the project.


class WebSocketManager:
    """Enhanced WebSocket manager with connection tracking"""

    def __init__(self):
        self.connections: Dict[str, Set[WebSocket]] = {}
        self.connection_metadata: Dict[str, ConnectionInfo] = {}
        self.message_queue: Dict[str, List[Message]] = {}

    async def connect(self, websocket: WebSocket, job_id: str):
        await websocket.accept()

        # Track connection
        self.connections.setdefault(job_id, set()).add(websocket)

        # Replay messages queued while no client was connected
        for message in self.message_queue.pop(job_id, []):
            await websocket.send_json(message)

        # Send current status
        await self.send_current_status(websocket, job_id)

    async def broadcast_progress(
        self,
        job_id: str,
        stage: PipelineStage,
        percentage: float,
        message: str,
        details: Optional[Dict] = None
    ):
        """Broadcast progress to all connected clients"""
        message_data = {
            "type": "progress",
            "job_id": job_id,
            "data": {
                "percentage": percentage,
                "stage": stage.value,
                "message": message,
                "time_elapsed": self.get_elapsed_time(job_id),
                "estimated_remaining": self.estimate_remaining_time(job_id, percentage)
            },
            "timestamp": datetime.now(timezone.utc).isoformat()
        }
        if details:
            message_data["data"]["sub_progress"] = details

        # Send to connected clients. An empty set counts as "no clients",
        # so messages are queued after every connection has dropped.
        connections = self.connections.get(job_id)
        if connections:
            dead_connections = set()
            # Iterate over a copy: the set may change while we await
            for connection in list(connections):
                try:
                    await connection.send_json(message_data)
                except Exception:
                    dead_connections.add(connection)
            # Clean up dead connections
            connections -= dead_connections
            if not connections:
                # Every client dropped during this send: keep the message for replay
                self.message_queue.setdefault(job_id, []).append(message_data)
        else:
            # Queue message for later delivery
            self.message_queue.setdefault(job_id, []).append(message_data)
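The per-job message queue above grows without limit while a client is offline. One possible way to bound it is sketched below, assuming that dropping the oldest updates is acceptable because later progress messages supersede earlier ones. `ReplayQueue` and the `max_per_job` limit are hypothetical and not part of the implementation described in this story.

```python
from collections import deque
from typing import Any, Deque, Dict, List


class ReplayQueue:
    """Per-job queue of undelivered messages with a fixed upper bound."""

    def __init__(self, max_per_job: int = 100) -> None:
        self._max = max_per_job
        self._queues: Dict[str, Deque[Dict[str, Any]]] = {}

    def push(self, job_id: str, message: Dict[str, Any]) -> None:
        # deque(maxlen=...) silently discards the oldest entry when full.
        queue = self._queues.setdefault(job_id, deque(maxlen=self._max))
        queue.append(message)

    def drain(self, job_id: str) -> List[Dict[str, Any]]:
        """Return queued messages in arrival order and clear the queue."""
        return list(self._queues.pop(job_id, ()))
```

On reconnect, the manager would call `drain(job_id)` once and send each returned message before resuming live updates.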
Pipeline Progress Tracking
import uuid
from datetime import datetime, timezone
from typing import Callable, Optional

# PipelineConfig and PipelineStage are defined elsewhere in the project.


class SummaryPipeline:
    """Enhanced pipeline with granular progress tracking"""

    async def process_video_with_progress(
        self,
        video_url: str,
        config: PipelineConfig,
        progress_callback: Optional[Callable] = None
    ) -> str:
        """Process video with detailed progress updates and return the job ID"""
        job_id = str(uuid.uuid4())
        start_time = datetime.now(timezone.utc)

        # Stage 1: URL Validation (0-5%)
        await self._update_progress(job_id, PipelineStage.VALIDATING_URL, 0, "Validating URL...")
        try:
            video_id = await self.video_service.validate_url(video_url)
            await self._update_progress(job_id, PipelineStage.VALIDATING_URL, 5, "URL validated")
        except Exception as e:
            await self._handle_error(job_id, PipelineStage.VALIDATING_URL, e)
            raise

        # Stage 2: Metadata Extraction (5-15%)
        await self._update_progress(job_id, PipelineStage.EXTRACTING_METADATA, 5, "Fetching video information...")
        metadata = await self.video_service.get_metadata(video_id)
        await self._update_progress(job_id, PipelineStage.EXTRACTING_METADATA, 15, f"Video: {metadata.title}")

        # Stage 3: Transcript Extraction (15-30%)
        await self._update_progress(job_id, PipelineStage.EXTRACTING_TRANSCRIPT, 15, "Retrieving transcript...")
        transcript = await self.transcript_service.extract_transcript(video_id)

        # Calculate transcript chunks for sub-progress
        chunks = self._chunk_transcript(transcript)
        total_chunks = len(chunks)

        # Stage 4: Content Analysis (30-40%)
        await self._update_progress(
            job_id,
            PipelineStage.ANALYZING_CONTENT,
            30,
            "Analyzing content structure..."
        )
        analysis = await self._analyze_content(transcript, metadata)
        await self._update_progress(job_id, PipelineStage.ANALYZING_CONTENT, 40, "Content analysis complete")

        # Stage 5: Summary Generation (40-80%)
        await self._update_progress(
            job_id,
            PipelineStage.GENERATING_SUMMARY,
            40,
            f"Generating summary (0/{total_chunks} chunks)..."
        )

        # Process chunks with sub-progress. Progress is reported after each
        # chunk finishes, so the bar reflects work actually done.
        summary_parts = []
        for i, chunk in enumerate(chunks):
            part = await self.ai_service.summarize_chunk(chunk, analysis)
            summary_parts.append(part)

            sub_progress = {
                "current": i + 1,
                "total": total_chunks,
                "description": f"Processed chunk {i + 1} of {total_chunks}"
            }
            percentage = 40 + (40 * (i + 1) / total_chunks)
            await self._update_progress(
                job_id,
                PipelineStage.GENERATING_SUMMARY,
                percentage,
                f"Generating summary ({i + 1}/{total_chunks} chunks)...",
                sub_progress
            )

        # Combine summaries
        final_summary = await self.ai_service.combine_summaries(summary_parts)

        # Stage 6: Quality Validation (80-90%)
        await self._update_progress(
            job_id,
            PipelineStage.VALIDATING_QUALITY,
            80,
            "Validating summary quality..."
        )
        quality_score = await self._validate_quality(final_summary, transcript)
        await self._update_progress(job_id, PipelineStage.VALIDATING_QUALITY, 90, f"Quality score: {quality_score:.1%}")

        # Stage 7: Completion (90-100%)
        await self._update_progress(job_id, PipelineStage.COMPLETED, 100, "Processing complete!")
        return job_id
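The percentage ranges in the stage comments above can be captured in a small lookup, so that each stage only reports how far along it is (0.0 to 1.0) and the overall percentage is derived from that. This is a sketch under the assumption that the ranges in the comments are authoritative; `STAGE_RANGES` and `overall_percentage` are hypothetical names.

```python
from typing import Dict, Tuple

# (start, end) of each stage on the overall 0-100 scale,
# taken from the stage comments in the pipeline above.
STAGE_RANGES: Dict[str, Tuple[int, int]] = {
    "validating_url": (0, 5),
    "extracting_metadata": (5, 15),
    "extracting_transcript": (15, 30),
    "analyzing_content": (30, 40),
    "generating_summary": (40, 80),
    "validating_quality": (80, 90),
    "completed": (90, 100),
}


def overall_percentage(stage: str, fraction: float) -> float:
    """Map progress within a stage (0.0-1.0) onto the overall 0-100 scale."""
    start, end = STAGE_RANGES[stage]
    fraction = max(0.0, min(1.0, fraction))
    return start + (end - start) * fraction
```

For example, finishing 2 of 4 chunks during summary generation gives `overall_percentage("generating_summary", 2 / 4)`, which lands halfway between 40 and 80.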
Frontend Components
ProcessingProgress Component
// Card, Badge, Button, Progress, the icon components, cn, and formatDuration
// come from the project's UI library and utilities.
export function ProcessingProgress({ jobId }: { jobId: string }) {
  const { progress, isConnected, cancel } = useProcessingProgress(jobId);
  // Default to 0 so comparisons below never see undefined
  const percentage = progress?.percentage ?? 0;

  const stages = [
    { key: 'validating_url', label: 'Validating', percentage: 5 },
    { key: 'extracting_metadata', label: 'Metadata', percentage: 15 },
    { key: 'extracting_transcript', label: 'Transcript', percentage: 30 },
    { key: 'analyzing_content', label: 'Analysis', percentage: 40 },
    { key: 'generating_summary', label: 'Summary', percentage: 80 },
    { key: 'validating_quality', label: 'Quality', percentage: 90 },
    { key: 'completed', label: 'Complete', percentage: 100 }
  ];

  return (
    <Card className="w-full">
      <CardHeader>
        <div className="flex justify-between items-center">
          <CardTitle className="flex items-center gap-2">
            <Loader2 className="h-5 w-5 animate-spin" />
            Processing Video
            {!isConnected && (
              <Badge variant="outline" className="ml-2">
                <WifiOff className="h-3 w-3 mr-1" />
                Reconnecting...
              </Badge>
            )}
          </CardTitle>
          <Button
            variant="outline"
            size="sm"
            onClick={cancel}
            disabled={progress?.stage === 'completed'}
          >
            <X className="h-4 w-4 mr-1" />
            Cancel
          </Button>
        </div>
      </CardHeader>
      <CardContent className="space-y-4">
        {/* Stage Progress */}
        <div className="space-y-2">
          <div className="flex justify-between text-sm">
            <span className="font-medium">{progress?.message}</span>
            <span>{Math.round(percentage)}%</span>
          </div>
          <Progress value={percentage} className="h-3" />
          {/* Sub-progress for chunks */}
          {progress?.sub_progress && (
            <div className="ml-4 space-y-1">
              <div className="flex justify-between text-xs text-muted-foreground">
                <span>{progress.sub_progress.description}</span>
                <span>{progress.sub_progress.current}/{progress.sub_progress.total}</span>
              </div>
              <Progress
                value={(progress.sub_progress.current / progress.sub_progress.total) * 100}
                className="h-1"
              />
            </div>
          )}
        </div>
        {/* Stage Indicators */}
        <div className="flex justify-between">
          {stages.map((stage, index) => {
            const reached = percentage >= stage.percentage;
            return (
              <div
                key={stage.key}
                className={cn(
                  "flex flex-col items-center gap-1",
                  reached ? "text-primary" : "text-muted-foreground"
                )}
              >
                <div className={cn(
                  "w-8 h-8 rounded-full border-2 flex items-center justify-center",
                  reached ? "border-primary bg-primary/10" : "border-muted"
                )}>
                  {reached ? (
                    <CheckCircle2 className="h-4 w-4" />
                  ) : (
                    <span className="text-xs">{index + 1}</span>
                  )}
                </div>
                <span className="text-xs">{stage.label}</span>
              </div>
            );
          })}
        </div>
        {/* Time Estimation: compare against null so an estimate of 0 does not render a stray "0" */}
        {progress?.estimated_remaining != null && (
          <div className="flex justify-between items-center text-sm text-muted-foreground">
            <div className="flex items-center gap-1">
              <Clock className="h-4 w-4" />
              <span>Time elapsed: {formatDuration(progress.time_elapsed)}</span>
            </div>
            <div className="flex items-center gap-1">
              <Timer className="h-4 w-4" />
              <span>About {formatDuration(progress.estimated_remaining)} remaining</span>
            </div>
          </div>
        )}
      </CardContent>
    </Card>
  );
}
useProcessingProgress Hook
import { useCallback, useEffect, useRef, useState } from 'react';

// ProgressData is the interface from the WebSocket protocol section;
// apiClient is the project's HTTP client.
const MAX_RECONNECT_ATTEMPTS = 5;

export function useProcessingProgress(jobId: string) {
  const [progress, setProgress] = useState<ProgressData | null>(null);
  const [isConnected, setIsConnected] = useState(false);
  // State rather than a ref, so the polling effect re-runs once reconnection gives up
  const [usePolling, setUsePolling] = useState(false);
  const ws = useRef<WebSocket | null>(null);
  const reconnectAttempts = useRef(0);
  const reconnectTimeout = useRef<ReturnType<typeof setTimeout> | null>(null);
  // Cleared on unmount or when the job ends, so onclose does not reconnect
  const shouldReconnect = useRef(true);

  const connect = useCallback(() => {
    const wsUrl = `ws://localhost:8000/ws/progress/${jobId}`;
    const socket = new WebSocket(wsUrl);
    ws.current = socket;

    socket.onopen = () => {
      console.log('WebSocket connected');
      setIsConnected(true);
      setUsePolling(false);
      reconnectAttempts.current = 0;
      // Subscribe to job updates
      socket.send(JSON.stringify({
        type: 'subscribe',
        job_id: jobId,
        timestamp: new Date().toISOString()
      }));
    };

    socket.onmessage = (event) => {
      const message = JSON.parse(event.data);
      switch (message.type) {
        case 'progress':
          setProgress(message.data);
          break;
        case 'stage_change':
          // Update UI for stage change
          break;
        case 'complete':
          shouldReconnect.current = false;
          setProgress({
            ...message.data,
            percentage: 100,
            stage: 'completed'
          });
          break;
        case 'cancelled':
          // The job is over; do not reconnect after the server closes the socket
          shouldReconnect.current = false;
          break;
        case 'error':
          // Handle error
          break;
      }
    };

    socket.onclose = () => {
      setIsConnected(false);
      if (!shouldReconnect.current) {
        return;
      }
      // Attempt reconnection with exponential backoff
      if (reconnectAttempts.current < MAX_RECONNECT_ATTEMPTS) {
        const delay = Math.min(1000 * Math.pow(2, reconnectAttempts.current), 10000);
        reconnectAttempts.current++;
        reconnectTimeout.current = setTimeout(() => {
          connect();
        }, delay);
      } else {
        // Reconnection exhausted: switch to polling
        setUsePolling(true);
      }
    };

    socket.onerror = (error) => {
      console.error('WebSocket error:', error);
    };
  }, [jobId]);

  const cancel = useCallback(async () => {
    // Send cancel message
    if (ws.current?.readyState === WebSocket.OPEN) {
      ws.current.send(JSON.stringify({
        type: 'cancel',
        job_id: jobId,
        timestamp: new Date().toISOString()
      }));
    }
    // Also call API endpoint as fallback
    try {
      await apiClient.post(`/api/pipeline/cancel/${jobId}`);
    } catch (error) {
      console.error('Failed to cancel job:', error);
    }
  }, [jobId]);

  useEffect(() => {
    shouldReconnect.current = true;
    reconnectAttempts.current = 0;
    connect();
    return () => {
      // Prevent the close below from scheduling another reconnect
      shouldReconnect.current = false;
      if (reconnectTimeout.current) {
        clearTimeout(reconnectTimeout.current);
      }
      if (ws.current) {
        ws.current.close();
      }
    };
  }, [connect]);

  // Fallback to polling if WebSocket fails
  useEffect(() => {
    if (!usePolling) {
      return;
    }
    const pollInterval = setInterval(async () => {
      try {
        const status = await apiClient.get(`/api/pipeline/status/${jobId}`);
        setProgress(status.data);
      } catch (error) {
        console.error('Polling failed:', error);
      }
    }, 2000);
    return () => clearInterval(pollInterval);
  }, [usePolling, jobId]);

  return {
    progress,
    isConnected,
    cancel
  };
}
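To make the hook's reconnection timing concrete, the sketch below reproduces its backoff formula, `min(1000 * 2^attempt, 10000)` milliseconds over five attempts, in Python. The function name `backoff_delay_ms` is hypothetical; the base, cap, and attempt count are the values used in the hook.

```python
from typing import List


def backoff_delay_ms(attempt: int, base_ms: int = 1000, cap_ms: int = 10000) -> int:
    """Delay before reconnect attempt `attempt` (0-based), doubling up to a cap."""
    return min(base_ms * 2 ** attempt, cap_ms)


# Delays for the five attempts the hook allows before it stops reconnecting.
schedule: List[int] = [backoff_delay_ms(n) for n in range(5)]
total_wait_ms = sum(schedule)
```

With these values the delays are 1 s, 2 s, 4 s, 8 s, and 10 s, so a client that never reconnects waits 25 seconds in total before the polling fallback becomes relevant. Adding random jitter to each delay is a common refinement when many clients may reconnect at once, though the hook as written does not do this.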
Implementation Tasks
Backend Tasks (4-5 hours)
- WebSocket Infrastructure Enhancement
  - Update WebSocketManager with connection tracking
  - Implement message queuing for disconnected clients
  - Add heartbeat/ping-pong mechanism
  - Create connection recovery logic
- Pipeline Progress Integration
  - Add granular progress tracking to SummaryPipeline
  - Implement sub-progress for chunk processing
  - Create time estimation algorithm
  - Add cancellation support throughout pipeline
- API Endpoints
  - Create /api/pipeline/cancel/{job_id} endpoint
  - Update /api/pipeline/status/{job_id} with detailed progress
  - Add WebSocket endpoint /ws/progress/{job_id}
  - Implement progress history tracking
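The heartbeat/ping-pong task in the backend list could be tracked on the server with a small bookkeeping class like the one below. This is only a sketch: `HeartbeatMonitor`, the 30-second timeout, and the connection IDs are assumptions made for illustration, and the clock is injectable so the logic can be tested without waiting.

```python
import time
from typing import Callable, Dict, List


class HeartbeatMonitor:
    """Track the last ping per connection and report connections that went quiet."""

    def __init__(
        self,
        timeout_s: float = 30.0,  # assumed value, not specified in this story
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._timeout = timeout_s
        self._clock = clock
        self._last_seen: Dict[str, float] = {}

    def record_ping(self, connection_id: str) -> None:
        # Called whenever a 'ping' message arrives from the client.
        self._last_seen[connection_id] = self._clock()

    def stale_connections(self) -> List[str]:
        """Connections whose last ping is older than the timeout."""
        now = self._clock()
        return [cid for cid, seen in self._last_seen.items() if now - seen > self._timeout]

    def forget(self, connection_id: str) -> None:
        self._last_seen.pop(connection_id, None)
```

A periodic task would call `stale_connections()` and close each returned connection, then call `forget()` for it.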
Frontend Tasks (4-5 hours)
- Progress Components
  - Create ProcessingProgress component
  - Build stage indicator visualization
  - Implement progress bar with sub-progress
  - Add time estimation display
- WebSocket Integration
  - Create useProcessingProgress hook
  - Implement connection management
  - Add reconnection with backoff
  - Create fallback to polling
- User Interface Updates
  - Update SummarizePage with progress display
  - Add connection status indicator
  - Implement cancel button functionality
  - Create smooth transitions between stages
Testing (2-3 hours)
- Unit Tests
  - Test WebSocket manager functionality
  - Test progress calculation accuracy
  - Test cancellation at various stages
  - Test reconnection logic
- Integration Tests
  - Test full processing with progress updates
  - Test connection recovery scenarios
  - Test fallback to polling
  - Test concurrent processing jobs
Success Metrics
- Performance Metrics
  - WebSocket latency < 100ms
  - Progress updates at least every 2 seconds
  - Reconnection within 5 seconds
  - Zero lost messages during brief disconnects
- User Experience Metrics
  - Clear indication of current stage
  - Accurate time estimates (±20% accuracy)
  - Smooth progress bar movement
  - Immediate response to cancel action
- Technical Metrics
  - 100% of processing stages tracked
  - Graceful degradation to polling
  - No memory leaks in WebSocket connections
  - Clean cancellation without orphaned processes
Definition of Done
- All acceptance criteria met
- WebSocket connection auto-manages lifecycle
- Progress updates show for all processing stages
- Time estimation becomes accurate after 2-3 videos
- Cancel operation works at any stage
- Connection recovery handles network interruptions
- Fallback to polling when WebSocket unavailable
- Unit and integration tests pass
- Documentation updated
- No console errors or warnings
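The time-estimation items above (accuracy improving after a few videos, conservative estimates before that) could be approached as in the following sketch. It is not the implemented algorithm: `TimeEstimator`, the 120-second default, the three-sample threshold, and the blending rule are all assumptions chosen for illustration.

```python
from statistics import median
from typing import List


class TimeEstimator:
    """Estimate remaining time from past job durations and current progress."""

    def __init__(self, default_total_s: float = 120.0, min_samples: int = 3) -> None:
        self._default = default_total_s  # conservative guess until history exists
        self._min_samples = min_samples
        self._history: List[float] = []

    def record_completed(self, total_seconds: float) -> None:
        self._history.append(total_seconds)

    def expected_total(self) -> float:
        # Use the default until enough jobs have completed.
        if len(self._history) < self._min_samples:
            return self._default
        return float(median(self._history))

    def estimate_remaining(self, elapsed_s: float, percentage: float) -> float:
        if percentage >= 100:
            return 0.0
        prior = max(0.0, self.expected_total() - elapsed_s)
        if percentage <= 0:
            return prior
        # Extrapolate from this run, trusting it more as progress increases.
        projected = elapsed_s * (100 - percentage) / percentage
        weight = percentage / 100
        return weight * projected + (1 - weight) * prior
```

Early in a job the estimate leans on history; near the end it leans on the current run's observed pace, which limits the effect of an unusually long or short video.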
Risk Mitigation
- WebSocket Compatibility: Some corporate firewalls block WebSocket
  - Solution: Automatic fallback to polling
- Progress Accuracy: Transcript size varies greatly
  - Solution: Dynamic progress calculation based on actual work
- Memory Leaks: Long-lived WebSocket connections
  - Solution: Proper cleanup and connection limits
- Time Estimation: Insufficient historical data
  - Solution: Use conservative estimates initially
- Cancellation Complexity: Pipeline may be in critical section
  - Solution: Safe cancellation points throughout pipeline
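The "safe cancellation points" mitigation can be sketched as a flag that the pipeline checks between units of work, so a job is never interrupted in the middle of a step. `CancellationToken`, `JobCancelled`, and `process_chunks` are hypothetical names, and the uppercase conversion merely stands in for real chunk summarization.

```python
import asyncio
from typing import List


class JobCancelled(Exception):
    """Raised at a safe point when the job has been cancelled."""


class CancellationToken:
    def __init__(self) -> None:
        self._cancelled = False

    def cancel(self) -> None:
        self._cancelled = True

    def raise_if_cancelled(self) -> None:
        if self._cancelled:
            raise JobCancelled()


async def process_chunks(chunks: List[str], token: CancellationToken) -> List[str]:
    results: List[str] = []
    for chunk in chunks:
        # Safe point: between chunks, never in the middle of one.
        token.raise_if_cancelled()
        await asyncio.sleep(0)  # stand-in for an awaitable summarization call
        results.append(chunk.upper())
    return results


async def _demo() -> tuple:
    token = CancellationToken()
    done = await process_chunks(["a", "b"], token)
    token.cancel()
    try:
        await process_chunks(["c"], token)
        cancelled = False
    except JobCancelled:
        cancelled = True
    return done, cancelled


done, cancelled = asyncio.run(_demo())
```

The caller that catches `JobCancelled` is the natural place to clean up partial results and broadcast the `cancelled` message.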
Notes
- WebSocket infrastructure from Story 3.4 provides good foundation
- Consider using Server-Sent Events (SSE) as alternative to WebSocket
- Time estimation could use machine learning in future
- Progress data could be used for performance analytics
- Consider adding sound/notification when processing completes
Story Status: Completed (2025-08-27)
Assigned To: Developer
Sprint: Next
Story Points: 5