youtube-summarizer/docs/stories/3.4.batch-processing.md

Story 3.4: Batch Processing

Story Overview

As a power user
I want to summarize multiple videos at once
So that I can process entire playlists or video series efficiently

Status: COMPLETE (2025-08-27)
Epic: Epic 3 - Enhanced User Experience
Dependencies: Story 3.3 (Summary History Management) Complete
Actual Effort: 18 hours
Priority: High

Acceptance Criteria

  1. Multiple URL Input

    • Accepts multiple URLs via textarea (one per line)
    • Validates each URL before processing
    • Shows count of valid/invalid URLs
  2. Queue Processing

    • Processes videos sequentially to manage API costs
    • Shows queue position for each video
    • Allows reordering before processing starts
  3. Progress Tracking

    • Real-time progress for current video
    • Overall batch progress indicator
    • Estimated time remaining based on average processing time
  4. Partial Results

    • Results available as each video completes
    • Failed videos don't block subsequent processing
    • Clear indication of success/failure per video
  5. Batch Export

    • Download all summaries as ZIP
    • Include individual files for each summary
    • Metadata file with batch processing stats
  6. Error Handling

    • Retry failed videos option
    • Skip and continue on errors
    • Detailed error messages per video

Technical Design

Database Schema

-- Batch Jobs Table
CREATE TABLE batch_jobs (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
    name VARCHAR(255),
    status VARCHAR(50) DEFAULT 'pending',
    
    -- Configuration
    urls JSON NOT NULL,  -- Array of URLs
    model VARCHAR(50),
    summary_length VARCHAR(20),
    options JSON,
    
    -- Progress Tracking
    total_videos INTEGER NOT NULL,
    completed_videos INTEGER DEFAULT 0,
    failed_videos INTEGER DEFAULT 0,
    skipped_videos INTEGER DEFAULT 0,
    
    -- Timing
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    started_at TIMESTAMP,
    completed_at TIMESTAMP,
    estimated_completion TIMESTAMP,
    
    -- Results
    results JSON,  -- Array of {url, summary_id, status, error}
    export_url VARCHAR(500)
);

-- Batch Job Items Table (for detailed tracking)
CREATE TABLE batch_job_items (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    batch_job_id UUID NOT NULL REFERENCES batch_jobs(id) ON DELETE CASCADE,
    
    url VARCHAR(500) NOT NULL,
    position INTEGER NOT NULL,
    status VARCHAR(50) DEFAULT 'pending',
    
    -- Processing Details
    video_id VARCHAR(20),
    video_title VARCHAR(500),
    summary_id UUID REFERENCES summaries(id),
    
    -- Timing
    started_at TIMESTAMP,
    completed_at TIMESTAMP,
    processing_time_seconds INTEGER,
    
    -- Error Tracking
    error_message TEXT,
    retry_count INTEGER DEFAULT 0,
    
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE INDEX idx_batch_jobs_user_status ON batch_jobs(user_id, status);
CREATE INDEX idx_batch_job_items_batch_status ON batch_job_items(batch_job_id, status);
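
The Day 1 tasks call for matching SQLAlchemy models. A minimal sketch, assuming the project's declarative Base and a Postgres backend (column names mirror the schema above):

import uuid
from datetime import datetime
from sqlalchemy import Column, DateTime, ForeignKey, Integer, String, Text, JSON
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import declarative_base, relationship

Base = declarative_base()

class BatchJob(Base):
    __tablename__ = "batch_jobs"

    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    user_id = Column(UUID(as_uuid=True), ForeignKey("users.id", ondelete="CASCADE"), nullable=False)
    name = Column(String(255))
    status = Column(String(50), default="pending")
    urls = Column(JSON, nullable=False)
    model = Column(String(50))
    summary_length = Column(String(20))
    options = Column(JSON)
    total_videos = Column(Integer, nullable=False)
    completed_videos = Column(Integer, default=0)
    failed_videos = Column(Integer, default=0)
    skipped_videos = Column(Integer, default=0)
    created_at = Column(DateTime, default=datetime.utcnow)
    started_at = Column(DateTime)
    completed_at = Column(DateTime)
    estimated_completion = Column(DateTime)
    results = Column(JSON)
    export_url = Column(String(500))

    items = relationship("BatchJobItem", backref="batch_job", cascade="all, delete-orphan")

class BatchJobItem(Base):
    __tablename__ = "batch_job_items"

    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    batch_job_id = Column(UUID(as_uuid=True), ForeignKey("batch_jobs.id", ondelete="CASCADE"), nullable=False)
    url = Column(String(500), nullable=False)
    position = Column(Integer, nullable=False)
    status = Column(String(50), default="pending")
    video_id = Column(String(20))
    video_title = Column(String(500))
    summary_id = Column(UUID(as_uuid=True), ForeignKey("summaries.id"))
    started_at = Column(DateTime)
    completed_at = Column(DateTime)
    processing_time_seconds = Column(Integer)
    error_message = Column(Text)
    retry_count = Column(Integer, default=0)
    created_at = Column(DateTime, default=datetime.utcnow)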

API Endpoints

POST /api/batch/create

@router.post("/batch/create")
async def create_batch_job(
    request: BatchJobRequest,
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db)
) -> BatchJobResponse:
    """
    Create a new batch processing job
    
    Request:
    {
        "name": "My YouTube Playlist",
        "urls": ["url1", "url2", "url3"],
        "model": "openai",
        "summary_length": "standard",
        "options": {
            "include_timestamps": false,
            "focus_areas": []
        }
    }
    """

GET /api/batch/{job_id}

@router.get("/batch/{job_id}")
async def get_batch_status(
    job_id: str,
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db)
) -> BatchJobStatus:
    """
    Get current status of batch job
    
    Response:
    {
        "id": "uuid",
        "status": "processing",
        "progress": {
            "total": 10,
            "completed": 3,
            "failed": 1,
            "current_video": "Processing video 4 of 10"
        },
        "items": [...],
        "estimated_completion": "2025-08-27T14:30:00Z"
    }
    """

POST /api/batch/{job_id}/retry

@router.post("/batch/{job_id}/retry")
async def retry_failed_items(
    job_id: str,
    current_user: User = Depends(get_current_user)
) -> Dict:
    """Retry all failed items in the batch"""

POST /api/batch/{job_id}/cancel

@router.post("/batch/{job_id}/cancel")
async def cancel_batch_job(
    job_id: str,
    current_user: User = Depends(get_current_user)
) -> Dict:
    """Cancel a running batch job"""

GET /api/batch/{job_id}/export

@router.get("/batch/{job_id}/export")
async def export_batch_results(
    job_id: str,
    format: str = "zip",
    current_user: User = Depends(get_current_user)
) -> FileResponse:
    """Export all batch results as ZIP"""

Batch Processing Service

import asyncio
from datetime import datetime
from typing import Dict, List

class BatchProcessingService:
    """Handles batch video processing with queue management"""
    
    def __init__(
        self,
        summary_pipeline: SummaryPipeline,
        notification_service: NotificationService,
        db_session: Session
    ):
        self.pipeline = summary_pipeline
        self.notifications = notification_service
        self.db = db_session
        self.active_jobs: Dict[str, BatchJob] = {}
        
    async def create_batch_job(
        self,
        user_id: str,
        urls: List[str],
        config: BatchConfig
    ) -> BatchJob:
        """Create and queue a new batch job"""
        
        # Validate URLs
        valid_urls = await self._validate_urls(urls)
        
        # Create batch job record
        batch_job = BatchJob(
            user_id=user_id,
            urls=valid_urls,
            total_videos=len(valid_urls),
            model=config.model,
            summary_length=config.summary_length,
            options=config.dict()
        )
        self.db.add(batch_job)
        self.db.flush()  # Assigns batch_job.id before items reference it
        
        # Create individual job items
        for idx, url in enumerate(valid_urls):
            item = BatchJobItem(
                batch_job_id=batch_job.id,
                url=url,
                position=idx
            )
            self.db.add(item)
            
        self.db.commit()
        
        # Start processing in background
        asyncio.create_task(self._process_batch(batch_job.id))
        
        return batch_job
        
    async def _process_batch(self, batch_job_id: str):
        """Process all videos in the batch sequentially"""
        
        batch_job = self.db.query(BatchJob).filter_by(id=batch_job_id).first()
        batch_job.status = "processing"
        batch_job.started_at = datetime.utcnow()
        self.db.commit()
        
        items = self.db.query(BatchJobItem).filter_by(
            batch_job_id=batch_job_id,
            status="pending"
        ).order_by(BatchJobItem.position).all()
        
        for item in items:
            try:
                # Update current item status
                item.status = "processing"
                item.started_at = datetime.utcnow()
                self.db.commit()
                
                # Process video
                pipeline_job_id = await self.pipeline.process_video(
                    video_url=item.url,
                    config=PipelineConfig(
                        model=batch_job.model,
                        summary_length=batch_job.summary_length
                    )
                )
                
                # Wait for completion
                result = await self._wait_for_pipeline_completion(pipeline_job_id)
                
                if result.status == "completed":
                    # Create summary record
                    summary = Summary(
                        user_id=batch_job.user_id,
                        video_url=item.url,
                        video_title=result.video_metadata.title,
                        summary_text=result.summary,
                        model_used=batch_job.model
                    )
                    self.db.add(summary)
                    self.db.flush()  # Assigns summary.id before linking the item
                    
                    item.status = "completed"
                    item.summary_id = summary.id
                    batch_job.completed_videos += 1
                else:
                    item.status = "failed"
                    item.error_message = result.error
                    batch_job.failed_videos += 1
                    
            except Exception as e:
                item.status = "failed"
                item.error_message = str(e)
                batch_job.failed_videos += 1
                
            finally:
                item.completed_at = datetime.utcnow()
                if item.started_at:
                    item.processing_time_seconds = int(
                        (item.completed_at - item.started_at).total_seconds()
                    )
                self.db.commit()
                
                # Update progress
                await self._update_batch_progress(batch_job_id)
                
        # Mark batch as complete
        batch_job.status = "completed"
        batch_job.completed_at = datetime.utcnow()
        self.db.commit()
        
        # Generate export file
        export_url = await self._generate_export(batch_job_id)
        batch_job.export_url = export_url
        self.db.commit()
        
        # Send notification
        await self.notifications.send_batch_complete(batch_job)

Frontend Components

BatchProcessingPage.tsx

export function BatchProcessingPage() {
  const [urls, setUrls] = useState<string[]>([]);
  const [batchJob, setBatchJob] = useState<BatchJob | null>(null);
  const [isProcessing, setIsProcessing] = useState(false);
  const [selectedModel, setSelectedModel] = useState('openai');
  const [summaryLength, setSummaryLength] = useState('standard');
  
  const handleSubmit = async () => {
    const validUrls = urls.filter(url => isValidYouTubeUrl(url));
    
    const response = await batchAPI.createBatchJob({
      name: `Batch ${new Date().toLocaleString()}`,
      urls: validUrls,
      model: selectedModel,
      summary_length: summaryLength
    });
    
    setBatchJob(response);
    setIsProcessing(true);
    
    // Poll the status endpoint until the job reaches a terminal state
    pollBatchStatus(response.id);
  };
  
  // batchAPI helpers are assumed to mirror the /api/batch endpoints above
  const pollBatchStatus = (jobId: string) => {
    const interval = setInterval(async () => {
      const status = await batchAPI.getBatchStatus(jobId);
      setBatchJob(status);
      if (status.status === 'completed' || status.status === 'cancelled') {
        clearInterval(interval);
      }
    }, 2000);
  };
  
  const handleCancel = async () => {
    if (batchJob) await batchAPI.cancelBatchJob(batchJob.id);
  };
  
  const handleExport = async () => {
    if (batchJob) await batchAPI.exportBatchResults(batchJob.id);
  };
  
  return (
    <div className="container mx-auto p-6">
      <h1 className="text-3xl font-bold mb-6">Batch Video Processing</h1>
      
      {!isProcessing ? (
        <BatchInputForm 
          onSubmit={handleSubmit}
          onUrlsChange={setUrls}
        />
      ) : (
        <BatchProgressDisplay
          batchJob={batchJob}
          onCancel={handleCancel}
          onExport={handleExport}
        />
      )}
    </div>
  );
}

BatchInputForm.tsx

interface BatchInputFormProps {
  onSubmit: () => void;
  onUrlsChange: (urls: string[]) => void;
}

export function BatchInputForm({ onSubmit, onUrlsChange }: BatchInputFormProps) {
  const [urlText, setUrlText] = useState('');
  const [validationResults, setValidationResults] = useState<ValidationResult[]>([]);
  
  const handleValidate = () => {
    const urls = urlText.split('\n').filter(line => line.trim());
    const results = urls.map(url => ({
      url,
      isValid: isValidYouTubeUrl(url),
      videoId: extractVideoId(url)
    }));
    
    setValidationResults(results);
    onUrlsChange(results.filter(r => r.isValid).map(r => r.url));
  };
  
  return (
    <Card>
      <CardHeader>
        <CardTitle>Add Videos for Batch Processing</CardTitle>
        <CardDescription>
          Enter YouTube video URLs, one per line. Paste as many as you like; playlist URLs are not yet supported.
        </CardDescription>
      </CardHeader>
      <CardContent>
        <Textarea
          placeholder="https://youtube.com/watch?v=..."
          value={urlText}
          onChange={(e) => setUrlText(e.target.value)}
          rows={10}
          className="mb-4"
        />
        
        {validationResults.length > 0 && (
          <ValidationResultsList results={validationResults} />
        )}
        
        <div className="flex justify-between items-center">
          <div className="text-sm text-muted-foreground">
            {validationResults.filter(r => r.isValid).length} valid URLs
          </div>
          <div className="space-x-2">
            <Button variant="outline" onClick={handleValidate}>
              Validate URLs
            </Button>
            <Button 
              onClick={onSubmit}
              disabled={validationResults.filter(r => r.isValid).length === 0}
            >
              Start Processing
            </Button>
          </div>
        </div>
      </CardContent>
    </Card>
  );
}

Implementation Tasks

Day 1: Backend Foundation (4-5 hours)

  • Create batch job database models
  • Write and run Alembic migration
  • Create BatchJob and BatchJobItem SQLAlchemy models
  • Set up basic CRUD operations

Day 2: Batch Processing Service (5-6 hours)

  • Implement BatchProcessingService class
  • Create URL validation logic
  • Build sequential processing queue
  • Add progress tracking
  • Implement error handling and retry logic

Day 3: API Endpoints (3-4 hours)

  • Create batch router with all endpoints
  • Implement create batch job endpoint
  • Add status and progress endpoints
  • Build export functionality
  • Add to main.py router

Day 4: Frontend Components (4-5 hours)

  • Create BatchProcessingPage component
  • Build BatchInputForm with validation
  • Create BatchProgressDisplay component
  • Implement BatchResultsList component
  • Add routing to App.tsx

Day 5: Integration & Testing (3-4 hours)

  • Connect frontend to backend API
  • Implement polling for progress updates
  • Test with multiple videos
  • Test error scenarios
  • Verify export functionality

Testing Checklist

Unit Tests

  • Test URL validation logic (see the example after this list)
  • Test batch job creation
  • Test queue processing
  • Test progress calculations
  • Test export generation
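
A sketch of the URL-validation test, assuming the regex-based _validate_urls helper sketched earlier, a hypothetical batch_service fixture, and pytest-asyncio for the async call:

import pytest

@pytest.mark.asyncio
async def test_validate_urls_filters_invalid(batch_service):
    urls = [
        "https://www.youtube.com/watch?v=dQw4w9WgXcQ",  # valid watch URL
        "https://youtu.be/dQw4w9WgXcQ",                 # valid short link
        "https://example.com/not-youtube",              # wrong domain
        "not a url at all",                             # garbage line
    ]
    valid = await batch_service._validate_urls(urls)
    assert valid == urls[:2]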

Integration Tests

  • Test full batch processing flow
  • Test partial failure handling
  • Test cancellation
  • Test retry functionality
  • Test concurrent batch jobs

Manual Testing

  • Process 5+ videos successfully
  • Handle mixed success/failure
  • Cancel mid-processing
  • Export and verify ZIP contents
  • Test with different models

Definition of Done

  • All acceptance criteria met
  • Backend service processes videos sequentially
  • Progress tracking works in real-time
  • Failed videos don't block processing
  • Export generates valid ZIP file
  • Frontend shows clear progress
  • Error handling is robust
  • Tests pass with >80% coverage
  • Documentation updated

Risk Mitigation

  1. Memory Issues: Process videos one at a time
  2. Long Processing: Add timeout per video (10 minutes max; see the sketch after this list)
  3. API Rate Limits: Add delay between videos if needed (handled in the same sketch)
  4. Database Growth: Cleanup old batch jobs after 30 days
  5. User Experience: Show clear progress and allow cancellation
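
Risks 2 and 3 can both be handled where the queue invokes the pipeline. A hedged sketch; the 10-minute cap comes from risk 2, while the 5-second delay and the _process_item wrapper are assumptions:

import asyncio

PER_VIDEO_TIMEOUT = 600  # Risk 2: cap each video at 10 minutes
INTER_VIDEO_DELAY = 5    # Risk 3: cool-down between videos; value is an assumption

async def _process_with_limits(self, item):
    """Wrap a single pipeline run with a timeout and a rate-limit delay."""
    try:
        return await asyncio.wait_for(
            self._process_item(item),  # Hypothetical per-item wrapper around the pipeline call
            timeout=PER_VIDEO_TIMEOUT,
        )
    except asyncio.TimeoutError:
        item.status = "failed"
        item.error_message = f"Timed out after {PER_VIDEO_TIMEOUT} seconds"
    finally:
        await asyncio.sleep(INTER_VIDEO_DELAY)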

Dependencies

  • Story 3.3: Summary History Management (Complete)
  • Existing SummaryPipeline service
  • WebSocket infrastructure (partial, from Story 3.5)

Notes

  • Consider adding batch templates for common use cases
  • Future: Support YouTube playlist URLs directly
  • Future: Parallel processing with rate limiting
  • Consider email notifications for long batches

Story Status: Complete (2025-08-27)
Assigned To: Developer
Sprint: Current
Story Points: 8