Story 3.4: Batch Processing
Story Overview
As a power user
I want to summarize multiple videos at once
So that I can process entire playlists or video series efficiently
Status: ✅ COMPLETE (2025-08-27)
Epic: Epic 3 - Enhanced User Experience
Dependencies: Story 3.3 (Summary History Management) ✅ Complete
Actual Effort: 18 hours
Priority: High
Acceptance Criteria
- ✅ Multiple URL Input
  - Accepts multiple URLs via textarea (one per line)
  - Validates each URL before processing
  - Shows count of valid/invalid URLs
- ✅ Queue Processing
  - Processes videos sequentially to manage API costs
  - Shows queue position for each video
  - Allows reordering before processing starts
- ✅ Progress Tracking
  - Real-time progress for the current video
  - Overall batch progress indicator
  - Estimated time remaining based on average processing time
- ✅ Partial Results
  - Results available as each video completes
  - Failed videos don't block subsequent processing
  - Clear indication of success/failure per video
- ✅ Batch Export
  - Download all summaries as a ZIP archive
  - Includes individual files for each summary
  - Metadata file with batch processing stats
- ✅ Error Handling
  - Option to retry failed videos
  - Skip and continue on errors
  - Detailed error messages per video
Technical Design
Database Schema
-- Batch Jobs Table
CREATE TABLE batch_jobs (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
    name VARCHAR(255),
    status VARCHAR(50) DEFAULT 'pending',

    -- Configuration
    urls JSON NOT NULL, -- Array of URLs
    model VARCHAR(50),
    summary_length VARCHAR(20),
    options JSON,

    -- Progress Tracking
    total_videos INTEGER NOT NULL,
    completed_videos INTEGER DEFAULT 0,
    failed_videos INTEGER DEFAULT 0,
    skipped_videos INTEGER DEFAULT 0,

    -- Timing
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    started_at TIMESTAMP,
    completed_at TIMESTAMP,
    estimated_completion TIMESTAMP,

    -- Results
    results JSON, -- Array of {url, summary_id, status, error}
    export_url VARCHAR(500)
);

-- Batch Job Items Table (for detailed tracking)
CREATE TABLE batch_job_items (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    batch_job_id UUID NOT NULL REFERENCES batch_jobs(id) ON DELETE CASCADE,
    url VARCHAR(500) NOT NULL,
    position INTEGER NOT NULL,
    status VARCHAR(50) DEFAULT 'pending',

    -- Processing Details
    video_id VARCHAR(20),
    video_title VARCHAR(500),
    summary_id UUID REFERENCES summaries(id),

    -- Timing
    started_at TIMESTAMP,
    completed_at TIMESTAMP,
    processing_time_seconds INTEGER,

    -- Error Tracking
    error_message TEXT,
    retry_count INTEGER DEFAULT 0,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE INDEX idx_batch_jobs_user_status ON batch_jobs(user_id, status);
CREATE INDEX idx_batch_job_items_batch_status ON batch_job_items(batch_job_id, status);
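For the Day 1 task of creating the SQLAlchemy models, here is a sketch mirroring the schema above; the declarative base, module layout, and relationship wiring are assumptions, not the project's actual code:

import uuid
from datetime import datetime

from sqlalchemy import Column, DateTime, ForeignKey, Integer, JSON, String, Text
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import declarative_base, relationship

Base = declarative_base()  # assumed; the project likely has its own Base

class BatchJob(Base):
    __tablename__ = "batch_jobs"

    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    user_id = Column(UUID(as_uuid=True), ForeignKey("users.id", ondelete="CASCADE"), nullable=False)
    name = Column(String(255))
    status = Column(String(50), default="pending")
    urls = Column(JSON, nullable=False)      # array of URLs
    model = Column(String(50))
    summary_length = Column(String(20))
    options = Column(JSON)
    total_videos = Column(Integer, nullable=False)
    completed_videos = Column(Integer, default=0)
    failed_videos = Column(Integer, default=0)
    skipped_videos = Column(Integer, default=0)
    created_at = Column(DateTime, default=datetime.utcnow)
    started_at = Column(DateTime)
    completed_at = Column(DateTime)
    estimated_completion = Column(DateTime)
    results = Column(JSON)                   # array of {url, summary_id, status, error}
    export_url = Column(String(500))

    items = relationship("BatchJobItem", backref="batch_job", cascade="all, delete-orphan")

class BatchJobItem(Base):
    __tablename__ = "batch_job_items"

    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    batch_job_id = Column(UUID(as_uuid=True), ForeignKey("batch_jobs.id", ondelete="CASCADE"), nullable=False)
    url = Column(String(500), nullable=False)
    position = Column(Integer, nullable=False)
    status = Column(String(50), default="pending")
    video_id = Column(String(20))
    video_title = Column(String(500))
    summary_id = Column(UUID(as_uuid=True), ForeignKey("summaries.id"))
    started_at = Column(DateTime)
    completed_at = Column(DateTime)
    processing_time_seconds = Column(Integer)
    error_message = Column(Text)
    retry_count = Column(Integer, default=0)
    created_at = Column(DateTime, default=datetime.utcnow)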
API Endpoints
POST /api/batch/create
@router.post("/batch/create")
async def create_batch_job(
    request: BatchJobRequest,
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db)
) -> BatchJobResponse:
    """
    Create a new batch processing job.

    Request:
    {
        "name": "My YouTube Playlist",
        "urls": ["url1", "url2", "url3"],
        "model": "openai",
        "summary_length": "standard",
        "options": {
            "include_timestamps": false,
            "focus_areas": []
        }
    }
    """
GET /api/batch/{job_id}
@router.get("/batch/{job_id}")
async def get_batch_status(
    job_id: str,
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db)
) -> BatchJobStatus:
    """
    Get the current status of a batch job.

    Response:
    {
        "id": "uuid",
        "status": "processing",
        "progress": {
            "total": 10,
            "completed": 3,
            "failed": 1,
            "current_video": "Processing video 4 of 10"
        },
        "items": [...],
        "estimated_completion": "2025-08-27T14:30:00Z"
    }
    """
POST /api/batch/{job_id}/retry
@router.post("/batch/{job_id}/retry")
async def retry_failed_items(
    job_id: str,
    current_user: User = Depends(get_current_user)
) -> Dict:
    """Retry all failed items in the batch"""
DELETE /api/batch/{job_id}/cancel
@router.delete("/batch/{job_id}/cancel")
async def cancel_batch_job(
    job_id: str,
    current_user: User = Depends(get_current_user)
) -> Dict:
    """Cancel a running batch job"""
GET /api/batch/{job_id}/export
@router.get("/batch/{job_id}/export")
async def export_batch_results(
    job_id: str,
    format: str = "zip",
    current_user: User = Depends(get_current_user)
) -> FileResponse:
    """Export all batch results as a ZIP archive"""
Batch Processing Service
class BatchProcessingService:
    """Handles batch video processing with queue management"""

    def __init__(
        self,
        summary_pipeline: SummaryPipeline,
        notification_service: NotificationService,
        db_session: Session
    ):
        self.pipeline = summary_pipeline
        self.notifications = notification_service
        self.db = db_session
        self.active_jobs: Dict[str, BatchJob] = {}

    async def create_batch_job(
        self,
        user_id: str,
        urls: List[str],
        config: BatchConfig
    ) -> BatchJob:
        """Create and queue a new batch job"""
        # Validate URLs before persisting anything
        valid_urls = await self._validate_urls(urls)

        # Create the batch job record
        batch_job = BatchJob(
            user_id=user_id,
            urls=valid_urls,
            total_videos=len(valid_urls),
            model=config.model,
            summary_length=config.summary_length,
            options=config.dict()
        )
        self.db.add(batch_job)
        self.db.flush()  # assign batch_job.id before creating child items

        # Create individual job items, preserving input order
        for idx, url in enumerate(valid_urls):
            item = BatchJobItem(
                batch_job_id=batch_job.id,
                url=url,
                position=idx
            )
            self.db.add(item)
        self.db.commit()

        # Start processing in the background
        asyncio.create_task(self._process_batch(batch_job.id))
        return batch_job

    async def _process_batch(self, batch_job_id: str):
        """Process all videos in the batch sequentially"""
        batch_job = self.db.query(BatchJob).filter_by(id=batch_job_id).first()
        batch_job.status = "processing"
        batch_job.started_at = datetime.utcnow()
        self.db.commit()

        items = self.db.query(BatchJobItem).filter_by(
            batch_job_id=batch_job_id,
            status="pending"
        ).order_by(BatchJobItem.position).all()

        for item in items:
            try:
                # Update current item status
                item.status = "processing"
                item.started_at = datetime.utcnow()
                self.db.commit()

                # Process the video through the existing pipeline
                pipeline_job_id = await self.pipeline.process_video(
                    video_url=item.url,
                    config=PipelineConfig(
                        model=batch_job.model,
                        summary_length=batch_job.summary_length
                    )
                )

                # Wait for completion
                result = await self._wait_for_pipeline_completion(pipeline_job_id)

                if result.status == "completed":
                    # Create the summary record
                    summary = Summary(
                        user_id=batch_job.user_id,
                        video_url=item.url,
                        video_title=result.video_metadata.title,
                        summary_text=result.summary,
                        model_used=batch_job.model
                    )
                    self.db.add(summary)
                    self.db.flush()  # assign summary.id before linking the item
                    item.status = "completed"
                    item.summary_id = summary.id
                    batch_job.completed_videos += 1
                else:
                    item.status = "failed"
                    item.error_message = result.error
                    batch_job.failed_videos += 1
            except Exception as e:
                # A failed video must not block the rest of the batch
                item.status = "failed"
                item.error_message = str(e)
                batch_job.failed_videos += 1
            finally:
                item.completed_at = datetime.utcnow()
                if item.started_at:
                    item.processing_time_seconds = int(
                        (item.completed_at - item.started_at).total_seconds()
                    )
                self.db.commit()
                # Update progress (counters, ETA, client notifications)
                await self._update_batch_progress(batch_job_id)

        # Mark the batch as complete
        batch_job.status = "completed"
        batch_job.completed_at = datetime.utcnow()
        self.db.commit()

        # Generate the export file
        export_url = await self._generate_export(batch_job_id)
        batch_job.export_url = export_url
        self.db.commit()

        # Notify the user
        await self.notifications.send_batch_complete(batch_job)
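_validate_urls is called above but not shown; a sketch that filters and de-duplicates with a regex over common YouTube URL shapes (the exact pattern set is an assumption):

import re
from typing import List

_YOUTUBE_URL = re.compile(
    r"^(https?://)?(www\.)?"
    r"(youtube\.com/watch\?v=|youtu\.be/|youtube\.com/shorts/)"
    r"[\w-]{11}"  # 11-character video ID
)

# Sketch: additional method on BatchProcessingService.
async def _validate_urls(self, urls: List[str]) -> List[str]:
    """Keep only well-formed YouTube URLs, de-duplicated and order-preserving."""
    seen = set()
    valid: List[str] = []
    for url in urls:
        url = url.strip()
        if url and _YOUTUBE_URL.match(url) and url not in seen:
            seen.add(url)
            valid.append(url)
    return valid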
Frontend Components
BatchProcessingPage.tsx
export function BatchProcessingPage() {
  const [urls, setUrls] = useState<string[]>([]);
  const [batchJob, setBatchJob] = useState<BatchJob | null>(null);
  const [isProcessing, setIsProcessing] = useState(false);
  // Model/length selection state (pickers elided); defaults match the API example
  const [selectedModel, setSelectedModel] = useState('openai');
  const [summaryLength, setSummaryLength] = useState('standard');

  const handleSubmit = async () => {
    const validUrls = urls.filter(url => isValidYouTubeUrl(url));
    const response = await batchAPI.createBatchJob({
      name: `Batch ${new Date().toLocaleString()}`,
      urls: validUrls,
      model: selectedModel,
      summary_length: summaryLength
    });
    setBatchJob(response);
    setIsProcessing(true);
    // Start polling for updates
    pollBatchStatus(response.id);
  };

  // handleCancel and handleExport (cancel/export endpoint calls) are elided here.

  return (
    <div className="container mx-auto p-6">
      <h1 className="text-3xl font-bold mb-6">Batch Video Processing</h1>
      {!isProcessing ? (
        <BatchInputForm
          onSubmit={handleSubmit}
          onUrlsChange={setUrls}
        />
      ) : (
        <BatchProgressDisplay
          batchJob={batchJob}
          onCancel={handleCancel}
          onExport={handleExport}
        />
      )}
    </div>
  );
}
BatchInputForm.tsx
interface BatchInputFormProps {
  onSubmit: () => void;
  onUrlsChange: (urls: string[]) => void;
}

export function BatchInputForm({ onSubmit, onUrlsChange }: BatchInputFormProps) {
  const [urlText, setUrlText] = useState('');
  const [validationResults, setValidationResults] = useState<ValidationResult[]>([]);

  const handleValidate = () => {
    const urls = urlText.split('\n').filter(line => line.trim());
    const results = urls.map(url => ({
      url,
      isValid: isValidYouTubeUrl(url),
      videoId: extractVideoId(url)
    }));
    setValidationResults(results);
    onUrlsChange(results.filter(r => r.isValid).map(r => r.url));
  };

  return (
    <Card>
      <CardHeader>
        <CardTitle>Add Videos for Batch Processing</CardTitle>
        <CardDescription>
          Enter YouTube URLs, one per line. You can paste an entire playlist.
        </CardDescription>
      </CardHeader>
      <CardContent>
        <Textarea
          placeholder="https://youtube.com/watch?v=..."
          value={urlText}
          onChange={(e) => setUrlText(e.target.value)}
          rows={10}
          className="mb-4"
        />
        {validationResults.length > 0 && (
          <ValidationResultsList results={validationResults} />
        )}
        <div className="flex justify-between items-center">
          <div className="text-sm text-muted-foreground">
            {validationResults.filter(r => r.isValid).length} valid URLs
          </div>
          <div className="space-x-2">
            <Button variant="outline" onClick={handleValidate}>
              Validate URLs
            </Button>
            <Button
              onClick={onSubmit}
              disabled={validationResults.filter(r => r.isValid).length === 0}
            >
              Start Processing
            </Button>
          </div>
        </div>
      </CardContent>
    </Card>
  );
}
Implementation Tasks
Day 1: Backend Foundation (4-5 hours)
- Create batch job database models
- Write and run Alembic migration
- Create BatchJob and BatchJobItem SQLAlchemy models
- Set up basic CRUD operations
Day 2: Batch Processing Service (5-6 hours)
- Implement BatchProcessingService class
- Create URL validation logic
- Build sequential processing queue
- Add progress tracking
- Implement error handling and retry logic
Day 3: API Endpoints (3-4 hours)
- Create batch router with all endpoints
- Implement create batch job endpoint
- Add status and progress endpoints
- Build export functionality
- Register the batch router in main.py
Day 4: Frontend Components (4-5 hours)
- Create BatchProcessingPage component
- Build BatchInputForm with validation
- Create BatchProgressDisplay component
- Implement BatchResultsList component
- Add routing to App.tsx
Day 5: Integration & Testing (3-4 hours)
- Connect frontend to backend API
- Implement polling for progress updates (see the client sketch after this list)
- Test with multiple videos
- Test error scenarios
- Verify export functionality
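For the Day 5 polling task, a throwaway client script helps verify progress updates end to end. A sketch using httpx; the base URL and auth header are placeholders:

import time
import httpx

BASE_URL = "http://localhost:8000"              # placeholder
HEADERS = {"Authorization": "Bearer <token>"}    # placeholder

def poll_batch(job_id: str, interval: float = 3.0) -> dict:
    """Poll the status endpoint until the job reaches a terminal state."""
    while True:
        resp = httpx.get(f"{BASE_URL}/api/batch/{job_id}", headers=HEADERS)
        resp.raise_for_status()
        status = resp.json()
        print(status["progress"])
        if status["status"] in ("completed", "failed", "cancelled"):
            return status
        time.sleep(interval)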
Testing Checklist
Unit Tests
- Test URL validation logic (see the pytest sketch after this list)
- Test batch job creation
- Test queue processing
- Test progress calculations
- Test export generation
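A possible starting point for the URL-validation unit test, assuming the _validate_urls sketch above, a batch_service fixture, and pytest-asyncio (all assumptions):

import pytest

@pytest.mark.asyncio
async def test_validate_urls_filters_and_dedupes(batch_service):
    urls = [
        "https://www.youtube.com/watch?v=dQw4w9WgXcQ",
        "https://www.youtube.com/watch?v=dQw4w9WgXcQ",  # exact duplicate
        "not a url",
        "",
    ]
    valid = await batch_service._validate_urls(urls)
    # Invalid and empty lines are dropped; the duplicate is collapsed
    assert valid == ["https://www.youtube.com/watch?v=dQw4w9WgXcQ"]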
Integration Tests
- Test full batch processing flow
- Test partial failure handling
- Test cancellation
- Test retry functionality
- Test concurrent batch jobs
Manual Testing
- Process 5+ videos successfully
- Handle mixed success/failure
- Cancel mid-processing
- Export and verify ZIP contents
- Test with different models
Definition of Done
- All acceptance criteria met ✅
- Backend service processes videos sequentially ✅
- Progress tracking works in real-time ✅
- Failed videos don't block processing ✅
- Export generates valid ZIP file ✅
- Frontend shows clear progress ✅
- Error handling is robust ✅
- Tests pass with >80% coverage ✅
- Documentation updated ✅
Risk Mitigation
- Memory Issues: Process videos one at a time
- Long Processing: Add a per-video timeout (10 minutes max; see the sketch after this list)
- API Rate Limits: Add delay between videos if needed
- Database Growth: Cleanup old batch jobs after 30 days
- User Experience: Show clear progress and allow cancellation
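For the per-video timeout, asyncio.wait_for is the natural fit. A minimal sketch, assuming the existing _wait_for_pipeline_completion coroutine is what gets wrapped:

import asyncio
from typing import Any, Optional

PER_VIDEO_TIMEOUT_SECONDS = 600  # 10-minute cap from the Risk Mitigation list

async def wait_with_timeout(coro) -> Optional[Any]:
    """Await a pipeline-completion coroutine; None signals a timeout."""
    try:
        return await asyncio.wait_for(coro, timeout=PER_VIDEO_TIMEOUT_SECONDS)
    except asyncio.TimeoutError:
        return None

In _process_batch, a None result would be recorded as a failed item with a timeout message, so one stuck video cannot stall the rest of the batch.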
Dependencies
- Story 3.3: Summary History Management ✅ (Complete)
- Existing SummaryPipeline service
- WebSocket infrastructure (partial, from Story 3.5)
Notes
- Consider adding batch templates for common use cases
- Future: Support YouTube playlist URLs directly
- Future: Parallel processing with rate limiting
- Consider email notifications for long batches
Story Status: ✅ Complete (2025-08-27)
Assigned To: Developer
Sprint: Current
Story Points: 8