# Story 3.4: Batch Processing

## Story Overview

**As a** power user
**I want** to summarize multiple videos at once
**So that** I can process entire playlists or video series efficiently

**Status**: ✅ COMPLETE (2025-08-27)
**Epic**: Epic 3 - Enhanced User Experience
**Dependencies**: Story 3.3 (Summary History Management) ✅ Complete
**Actual Effort**: 18 hours
**Priority**: High

## Acceptance Criteria

1. ✅ **Multiple URL Input**
   - Accepts multiple URLs via a textarea (one per line)
   - Validates each URL before processing
   - Shows a count of valid/invalid URLs
2. ✅ **Queue Processing**
   - Processes videos sequentially to manage API costs
   - Shows the queue position for each video
   - Allows reordering before processing starts
3. ✅ **Progress Tracking**
   - Real-time progress for the current video
   - Overall batch progress indicator
   - Estimated time remaining based on average processing time (see the sketch after the status endpoint below)
4. ✅ **Partial Results**
   - Results available as each video completes
   - Failed videos don't block subsequent processing
   - Clear indication of success/failure per video
5. ✅ **Batch Export**
   - Download all summaries as a ZIP archive
   - Include an individual file for each summary
   - Metadata file with batch processing stats
6. ✅ **Error Handling**
   - Option to retry failed videos
   - Skip and continue on errors
   - Detailed error messages per video

## Technical Design

### Database Schema

```sql
-- Batch Jobs Table
CREATE TABLE batch_jobs (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
    name VARCHAR(255),
    status VARCHAR(50) DEFAULT 'pending',

    -- Configuration
    urls JSON NOT NULL,               -- Array of URLs
    model VARCHAR(50),
    summary_length VARCHAR(20),
    options JSON,

    -- Progress Tracking
    total_videos INTEGER NOT NULL,
    completed_videos INTEGER DEFAULT 0,
    failed_videos INTEGER DEFAULT 0,
    skipped_videos INTEGER DEFAULT 0,

    -- Timing
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    started_at TIMESTAMP,
    completed_at TIMESTAMP,
    estimated_completion TIMESTAMP,

    -- Results
    results JSON,                     -- Array of {url, summary_id, status, error}
    export_url VARCHAR(500)
);

-- Batch Job Items Table (for detailed tracking)
CREATE TABLE batch_job_items (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    batch_job_id UUID NOT NULL REFERENCES batch_jobs(id) ON DELETE CASCADE,
    url VARCHAR(500) NOT NULL,
    position INTEGER NOT NULL,
    status VARCHAR(50) DEFAULT 'pending',

    -- Processing Details
    video_id VARCHAR(20),
    video_title VARCHAR(500),
    summary_id UUID REFERENCES summaries(id),

    -- Timing
    started_at TIMESTAMP,
    completed_at TIMESTAMP,
    processing_time_seconds INTEGER,

    -- Error Tracking
    error_message TEXT,
    retry_count INTEGER DEFAULT 0,

    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE INDEX idx_batch_jobs_user_status ON batch_jobs(user_id, status);
CREATE INDEX idx_batch_job_items_batch_status ON batch_job_items(batch_job_id, status);
```

### API Endpoints

#### POST /api/batch/create

```python
@router.post("/batch/create")
async def create_batch_job(
    request: BatchJobRequest,
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db)
) -> BatchJobResponse:
    """
    Create a new batch processing job

    Request:
    {
        "name": "My YouTube Playlist",
        "urls": ["url1", "url2", "url3"],
        "model": "openai",
        "summary_length": "standard",
        "options": {
            "include_timestamps": false,
            "focus_areas": []
        }
    }
    """
```
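`BatchJobRequest` and `BatchJobResponse` are referenced above but never defined in this story. A minimal Pydantic sketch consistent with the documented payload; the response fields beyond `id` and `status` are assumptions:

```python
from typing import List, Optional
from pydantic import BaseModel, Field

class BatchOptions(BaseModel):
    include_timestamps: bool = False
    focus_areas: List[str] = Field(default_factory=list)

class BatchJobRequest(BaseModel):
    name: Optional[str] = None
    urls: List[str]                      # one YouTube URL per entry
    model: str = "openai"
    summary_length: str = "standard"
    options: BatchOptions = Field(default_factory=BatchOptions)

class BatchJobResponse(BaseModel):
    id: str
    status: str
    total_videos: int                    # assumed; convenient for the frontend queue view
```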
#### GET /api/batch/{job_id}

```python
@router.get("/batch/{job_id}")
async def get_batch_status(
    job_id: str,
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db)
) -> BatchJobStatus:
    """
    Get current status of batch job

    Response:
    {
        "id": "uuid",
        "status": "processing",
        "progress": {
            "total": 10,
            "completed": 3,
            "failed": 1,
            "current_video": "Processing video 4 of 10"
        },
        "items": [...],
        "estimated_completion": "2025-08-27T14:30:00Z"
    }
    """
```
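The `estimated_completion` field (acceptance criterion 3) is not derived anywhere in this story. A minimal sketch of one plausible calculation over the `batch_job_items` columns defined above (`status`, `processing_time_seconds`); the function name and signature are my own:

```python
from datetime import datetime, timedelta
from typing import List, Optional

def estimate_completion(items: List["BatchJobItem"],
                        now: Optional[datetime] = None) -> Optional[datetime]:
    """Project a finish time from the average duration of completed items."""
    now = now or datetime.utcnow()
    durations = [i.processing_time_seconds for i in items
                 if i.status == "completed" and i.processing_time_seconds]
    if not durations:
        return None  # no signal yet; the API can return null until the first video finishes
    avg_seconds = sum(durations) / len(durations)
    remaining = sum(1 for i in items if i.status in ("pending", "processing"))
    return now + timedelta(seconds=avg_seconds * remaining)
```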
= "completed" batch_job.completed_at = datetime.utcnow() self.db.commit() # Generate export file export_url = await self._generate_export(batch_job_id) batch_job.export_url = export_url self.db.commit() # Send notification await self.notifications.send_batch_complete(batch_job) ``` ### Frontend Components #### BatchProcessingPage.tsx ```tsx export function BatchProcessingPage() { const [urls, setUrls] = useState([]); const [batchJob, setBatchJob] = useState(null); const [isProcessing, setIsProcessing] = useState(false); const handleSubmit = async () => { const validUrls = urls.filter(url => isValidYouTubeUrl(url)); const response = await batchAPI.createBatchJob({ name: `Batch ${new Date().toLocaleString()}`, urls: validUrls, model: selectedModel, summary_length: summaryLength }); setBatchJob(response); setIsProcessing(true); // Start polling for updates pollBatchStatus(response.id); }; return (

### Batch Processing Service

```python
class BatchProcessingService:
    """Handles batch video processing with queue management"""

    def __init__(
        self,
        summary_pipeline: SummaryPipeline,
        notification_service: NotificationService,
        db_session: Session
    ):
        self.pipeline = summary_pipeline
        self.notifications = notification_service
        self.db = db_session
        self.active_jobs: Dict[str, BatchJob] = {}

    async def create_batch_job(
        self,
        user_id: str,
        urls: List[str],
        config: BatchConfig
    ) -> BatchJob:
        """Create and queue a new batch job"""
        # Validate URLs
        valid_urls = await self._validate_urls(urls)

        # Create batch job record
        batch_job = BatchJob(
            user_id=user_id,
            urls=valid_urls,
            total_videos=len(valid_urls),
            model=config.model,
            options=config.dict()
        )
        self.db.add(batch_job)
        self.db.flush()  # populate batch_job.id before creating child items

        # Create individual job items
        for idx, url in enumerate(valid_urls):
            item = BatchJobItem(
                batch_job_id=batch_job.id,
                url=url,
                position=idx
            )
            self.db.add(item)

        self.db.commit()

        # Start processing in background
        asyncio.create_task(self._process_batch(batch_job.id))

        return batch_job

    async def _process_batch(self, batch_job_id: str):
        """Process all videos in the batch sequentially"""
        batch_job = self.db.query(BatchJob).filter_by(id=batch_job_id).first()
        batch_job.status = "processing"
        batch_job.started_at = datetime.utcnow()
        self.db.commit()

        items = self.db.query(BatchJobItem).filter_by(
            batch_job_id=batch_job_id,
            status="pending"
        ).order_by(BatchJobItem.position).all()

        for item in items:
            try:
                # Update current item status
                item.status = "processing"
                item.started_at = datetime.utcnow()
                self.db.commit()

                # Process video
                pipeline_job_id = await self.pipeline.process_video(
                    video_url=item.url,
                    config=PipelineConfig(
                        model=batch_job.model,
                        summary_length=batch_job.summary_length
                    )
                )

                # Wait for completion
                result = await self._wait_for_pipeline_completion(pipeline_job_id)

                if result.status == "completed":
                    # Create summary record
                    summary = Summary(
                        user_id=batch_job.user_id,
                        video_url=item.url,
                        video_title=result.video_metadata.title,
                        summary_text=result.summary,
                        model_used=batch_job.model
                    )
                    self.db.add(summary)
                    self.db.flush()  # ensure summary.id is assigned before linking

                    item.status = "completed"
                    item.summary_id = summary.id
                    batch_job.completed_videos += 1
                else:
                    item.status = "failed"
                    item.error_message = result.error
                    batch_job.failed_videos += 1

            except Exception as e:
                item.status = "failed"
                item.error_message = str(e)
                batch_job.failed_videos += 1
            finally:
                item.completed_at = datetime.utcnow()
                self.db.commit()

                # Update progress
                await self._update_batch_progress(batch_job_id)

        # Mark batch as complete
        batch_job.status = "completed"
        batch_job.completed_at = datetime.utcnow()
        self.db.commit()

        # Generate export file
        export_url = await self._generate_export(batch_job_id)
        batch_job.export_url = export_url
        self.db.commit()

        # Send notification
        await self.notifications.send_batch_complete(batch_job)
```
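The service relies on `_validate_urls` and `_wait_for_pipeline_completion`, neither of which is shown in this story. A sketch of both under stated assumptions: a regex-only YouTube URL check, and a `get_job_status` polling accessor on `SummaryPipeline` (that method name is an assumption):

```python
import asyncio
import re
from typing import List

# Accepts watch, short-link, and shorts URL shapes with an 11-char video id (assumed policy)
YOUTUBE_URL_RE = re.compile(
    r"^https?://(www\.)?(youtube\.com/(watch\?v=|shorts/)|youtu\.be/)[\w-]{11}"
)

class BatchProcessingService:  # continuation of the class above

    async def _validate_urls(self, urls: List[str]) -> List[str]:
        """Keep syntactically valid, de-duplicated YouTube URLs, preserving order."""
        seen, valid = set(), []
        for url in (u.strip() for u in urls):
            if url and url not in seen and YOUTUBE_URL_RE.match(url):
                seen.add(url)
                valid.append(url)
        return valid

    async def _wait_for_pipeline_completion(self, job_id: str, poll_seconds: float = 2.0):
        """Poll the pipeline until the job reaches a terminal state."""
        while True:
            result = await self.pipeline.get_job_status(job_id)  # assumed accessor
            if result.status not in ("pending", "processing"):
                return result
            await asyncio.sleep(poll_seconds)
```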
### Frontend Components

#### BatchProcessingPage.tsx

```tsx
export function BatchProcessingPage() {
  const [urls, setUrls] = useState<string[]>([]);
  const [batchJob, setBatchJob] = useState<BatchJob | null>(null);
  const [isProcessing, setIsProcessing] = useState(false);

  const handleSubmit = async () => {
    const validUrls = urls.filter(url => isValidYouTubeUrl(url));

    // selectedModel and summaryLength come from model-selection state (not shown)
    const response = await batchAPI.createBatchJob({
      name: `Batch ${new Date().toLocaleString()}`,
      urls: validUrls,
      model: selectedModel,
      summary_length: summaryLength
    });

    setBatchJob(response);
    setIsProcessing(true);

    // Start polling for updates
    pollBatchStatus(response.id);
  };

  return (
    <div>
      <h1>Batch Video Processing</h1>
      {!isProcessing ? (
        <BatchInputForm onSubmit={handleSubmit} onUrlsChange={setUrls} />
      ) : (
        // progress view; the original markup was lost, so this component name is assumed
        <BatchProgressTracker job={batchJob} />
      )}
    </div>
  );
}
```

#### BatchInputForm.tsx

```tsx
export function BatchInputForm({ onSubmit, onUrlsChange }) {
  const [urlText, setUrlText] = useState('');
  const [validationResults, setValidationResults] = useState([]);

  const handleValidate = () => {
    const urls = urlText.split('\n').filter(line => line.trim());
    const results = urls.map(url => ({
      url,
      isValid: isValidYouTubeUrl(url),
      videoId: extractVideoId(url)
    }));

    setValidationResults(results);
    onUrlsChange(results.filter(r => r.isValid).map(r => r.url));
  };

  return (
    <div>
      <h2>Add Videos for Batch Processing</h2>
      <p>Enter YouTube URLs, one per line. You can paste an entire playlist.</p>
      {/* ... textarea, validation results, and submit controls ... */}
    </div>
  );
}
```