---
description: Batch processing patterns for efficient and reliable media processing
alwaysApply: false
---

# Batch Processing Rule

## Core Principles

- **Independent Failures**: One job failure shouldn't affect others
- **Real-Time Progress**: Track progress with atomic updates
- **Resource Limits**: Respect system constraints (8 workers, 2GB/worker)
- **Size Optimization**: Group files by size for balanced processing

## Implementation Patterns

### Queue-Based Processing

```python
# ✅ DO: Use queue-based batch processing
import asyncio
from pathlib import Path
from typing import Dict, List

async def process_batch(file_paths: List[Path]) -> Dict[Path, Result]:
    # Create a queue of jobs
    queue = asyncio.Queue()
    for path in file_paths:
        await queue.put(path)

    # Process with limited concurrency (at most 8 workers)
    results = {}
    workers = []
    for _ in range(min(8, len(file_paths))):
        workers.append(asyncio.create_task(worker(queue, results)))

    # Wait for every queued job to be marked done
    await queue.join()

    # Cancel the now-idle workers and wait for them to unwind
    for w in workers:
        w.cancel()
    await asyncio.gather(*workers, return_exceptions=True)

    return results

async def worker(queue: asyncio.Queue, results: Dict[Path, Result]):
    while True:
        path = await queue.get()
        try:
            results[path] = await process_file(path)
        except Exception as e:
            # Record the failure; the other jobs keep running
            results[path] = Error(str(e))
        finally:
            queue.task_done()
```

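The pattern above leaves `Result`, `Error`, and `process_file` undefined; they are stand-ins for your own job types and processing entry point. A minimal sketch of what those placeholders might look like (the names and fields here are illustrative, not part of the rule):

```python
# Illustrative placeholders only -- substitute your real job types.
from dataclasses import dataclass
from pathlib import Path
from typing import Union

@dataclass
class Success:
    output_path: Path

@dataclass
class Error:
    message: str

Result = Union[Success, Error]  # each finished job stores one of these

async def process_file(path: Path) -> Result:
    # Stand-in for the real media-processing coroutine
    return Success(output_path=path.with_suffix(".processed"))
```
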
### Progress Tracking

```python
# ✅ DO: Track progress with atomic updates
class BatchProgress:
    def __init__(self, total: int):
        self.total = total
        self._completed = 0
        self._lock = asyncio.Lock()

    async def increment(self) -> float:
        """Increment completed count and return progress percentage."""
        async with self._lock:
            self._completed += 1
            return self._completed / self.total * 100
```

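A hypothetical sketch of wiring `BatchProgress` into the worker from the queue-based pattern (the `print` call is a stand-in for whatever UI or logger consumes the updates):

```python
# Illustrative: report progress atomically as each job finishes.
progress = BatchProgress(total=len(file_paths))

async def tracking_worker(queue: asyncio.Queue, results: Dict[Path, Result]):
    while True:
        path = await queue.get()
        try:
            results[path] = await process_file(path)
        except Exception as e:
            results[path] = Error(str(e))
        finally:
            pct = await progress.increment()
            print(f"batch progress: {pct:.1f}%")  # stand-in for real reporting
            queue.task_done()
```
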
### Resource Management

```python
# ✅ DO: Group files by size for balanced processing
def group_by_size(file_paths: List[Path], target_groups: int = 8) -> List[List[Path]]:
    """Group files by size to balance processing load."""
    # Get file sizes
    files_with_size = [(path, path.stat().st_size) for path in file_paths]

    # Sort by size (largest first)
    files_with_size.sort(key=lambda x: x[1], reverse=True)

    # Distribute into groups (using greedy algorithm)
    groups = [[] for _ in range(target_groups)]
    group_sizes = [0] * target_groups

    for path, size in files_with_size:
        # Find group with smallest current size
        smallest_group = group_sizes.index(min(group_sizes))
        groups[smallest_group].append(path)
        group_sizes[smallest_group] += size

    return groups
```

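Note that nothing above actually enforces the 2GB-per-worker memory limit. One possible guard is sketched below; `MAX_WORKER_MEMORY` and `estimate_memory` are assumptions for illustration, not part of this rule, so adapt the estimate to your media pipeline:

```python
# Hypothetical guard for the 2GB/worker budget (the 3x factor is an assumption).
MAX_WORKER_MEMORY = 2 * 1024**3  # 2 GiB per worker

def estimate_memory(path: Path) -> int:
    # Crude assumption: decoding peaks at roughly 3x the on-disk size
    return 3 * path.stat().st_size

async def guarded_process(path: Path) -> Result:
    if estimate_memory(path) > MAX_WORKER_MEMORY:
        return Error(f"{path.name} would exceed the per-worker memory budget")
    return await process_file(path)
```
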
### Anti-Patterns

```python
# ❌ DON'T: Process all files in parallel without limits
async def process_all_parallel(files: List[Path]):
    # This ignores system limits and can crash
    tasks = [process_file(f) for f in files]
    return await asyncio.gather(*tasks)  # Too many tasks!

# ❌ DON'T: Let one failure stop the entire batch
async def process_batch_unsafe(files: List[Path]):
    results = []
    for file in files:
        # If this raises an exception, the whole batch fails
        result = await process_file(file)
        results.append(result)
    return results
```

When processing multiple files, ALWAYS use queue-based batch processing. Jobs must fail independently: one failure shouldn't stop the batch. Track progress in real time with atomic updates. Respect system limits: max 8 parallel workers, 2GB memory per worker. Group files by size for balanced processing.
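
For reference, a short end-to-end sketch of the rule in use, assuming the helpers defined above (`process_batch` plus the illustrative `Error` placeholder):

```python
# End-to-end sketch: run a batch and summarize independent failures.
async def main(paths: List[Path]) -> None:
    results = await process_batch(paths)
    failures = {p: r for p, r in results.items() if isinstance(r, Error)}
    print(f"{len(results) - len(failures)} succeeded, {len(failures)} failed")

# asyncio.run(main(list(Path("media").glob("*.mp4"))))  # hypothetical input dir
```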