trax/.cursor/rules/batch-processing.mdc

---
description: Batch processing patterns for efficient and reliable media processing
alwaysApply: false
---
# Batch Processing Rule
## Core Principles
- **Independent Failures**: One job failure shouldn't affect others
- **Real-Time Progress**: Track progress with atomic updates
- **Resource Limits**: Respect system constraints (8 workers, 2GB/worker)
- **Size Optimization**: Group files by size for balanced processing
## Implementation Patterns
### Queue-Based Processing
```python
# ✅ DO: Use queue-based batch processing
import asyncio
from pathlib import Path
from typing import Dict, List

# Result, Error, and process_file are the project's own types/helpers.

async def process_batch(file_paths: List[Path]) -> Dict[Path, Result]:
    # Create a queue of jobs
    queue: asyncio.Queue = asyncio.Queue()
    for path in file_paths:
        await queue.put(path)
    # Process with limited concurrency (at most 8 workers)
    results: Dict[Path, Result] = {}
    workers = []
    for _ in range(min(8, len(file_paths))):
        workers.append(asyncio.create_task(worker(queue, results)))
    # Wait for all work to complete
    await queue.join()
    # Cancel the now-idle workers
    for w in workers:
        w.cancel()
    return results

async def worker(queue: asyncio.Queue, results: Dict[Path, Result]):
    while True:
        path = await queue.get()
        try:
            results[path] = await process_file(path)
        except Exception as e:
            # Record the failure without stopping the other jobs
            results[path] = Error(str(e))
        finally:
            queue.task_done()
```
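A minimal invocation sketch, assuming `Error` is a class the project defines; the `media/incoming` directory and `*.wav` glob are illustrative, not part of this rule:
```python
async def main():
    # Hypothetical entry point; adjust the source of file paths to the app
    files = list(Path("media/incoming").glob("*.wav"))
    results = await process_batch(files)
    failures = {p: r for p, r in results.items() if isinstance(r, Error)}
    print(f"{len(results) - len(failures)} succeeded, {len(failures)} failed")

asyncio.run(main())
```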
### Progress Tracking
```python
# ✅ DO: Track progress with atomic updates
class BatchProgress:
    def __init__(self, total: int):
        self.total = total
        self._completed = 0
        self._lock = asyncio.Lock()

    async def increment(self) -> float:
        """Increment completed count and return progress percentage."""
        async with self._lock:
            self._completed += 1
            return self._completed / self.total * 100
```
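One way to wire `BatchProgress` into the worker loop above; a sketch, with `print` standing in for whatever progress sink the app actually uses:
```python
async def worker(queue: asyncio.Queue, results: Dict[Path, Result],
                 progress: BatchProgress):
    while True:
        path = await queue.get()
        try:
            results[path] = await process_file(path)
        except Exception as e:
            results[path] = Error(str(e))
        finally:
            pct = await progress.increment()  # atomic, safe across workers
            print(f"progress: {pct:.1f}%")    # placeholder progress sink
            queue.task_done()
```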
### Resource Management
```python
# ✅ DO: Group files by size for balanced processing
def group_by_size(file_paths: List[Path], target_groups: int = 8) -> List[List[Path]]:
    """Group files by size to balance processing load."""
    # Get file sizes
    files_with_size = [(path, path.stat().st_size) for path in file_paths]
    # Sort by size (largest first)
    files_with_size.sort(key=lambda x: x[1], reverse=True)
    # Distribute into groups (greedy: each file goes to the lightest group)
    groups: List[List[Path]] = [[] for _ in range(target_groups)]
    group_sizes = [0] * target_groups
    for path, size in files_with_size:
        # Find the group with the smallest current total
        smallest_group = group_sizes.index(min(group_sizes))
        groups[smallest_group].append(path)
        group_sizes[smallest_group] += size
    return groups
```
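`group_by_size` balances load but does not enforce the 2GB/worker memory budget. One possible guard, a sketch assuming the third-party `psutil` package is available:
```python
import psutil  # assumed dependency; any memory probe works here

WORKER_MEMORY_BUDGET = 2 * 1024**3  # 2GB per worker, per the limits above

def safe_worker_count(max_workers: int = 8) -> int:
    """Cap the worker count so each worker can claim its 2GB budget."""
    available = psutil.virtual_memory().available
    return max(1, min(max_workers, available // WORKER_MEMORY_BUDGET))
```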
### Anti-Patterns
```python
# ❌ DON'T: Process all files in parallel without limits
async def process_all_parallel(files: List[Path]):
    # This ignores system limits and can crash the process
    tasks = [process_file(f) for f in files]
    return await asyncio.gather(*tasks)  # Too many concurrent tasks!

# ❌ DON'T: Let one failure stop the entire batch
async def process_batch_unsafe(files: List[Path]):
    results = []
    for file in files:
        # If this raises an exception, the whole batch fails
        result = await process_file(file)
        results.append(result)
    return results
```
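If `gather` is preferred over an explicit queue, a semaphore gives the same bounded concurrency and independent failures; a sketch under the 8-worker limit:
```python
async def process_bounded(files: List[Path], limit: int = 8) -> Dict[Path, Result]:
    sem = asyncio.Semaphore(limit)  # at most `limit` jobs in flight

    async def run_one(path: Path):
        async with sem:
            try:
                return path, await process_file(path)
            except Exception as e:
                return path, Error(str(e))  # jobs still fail independently

    pairs = await asyncio.gather(*(run_one(p) for p in files))
    return dict(pairs)
```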
When processing multiple files, ALWAYS use queue-based batch processing. Jobs must fail independently: one failure shouldn't stop the batch. Track progress in real time with atomic updates. Respect system limits: max 8 parallel workers, 2GB memory per worker. Group files by size for balanced processing.