Error Handling and Logging System

Overview

The Trax platform implements a comprehensive error handling and logging system designed for production reliability, observability, and maintainability. This system provides structured logging, error classification, retry mechanisms, recovery strategies, and performance monitoring.

Architecture

The error handling and logging system is organized into several key modules:

src/
├── logging/
│   ├── __init__.py          # Main logging interface
│   ├── config.py            # Logging configuration
│   ├── utils.py             # Logging utilities
│   └── metrics.py           # Performance metrics
├── errors/
│   ├── __init__.py          # Error system interface
│   ├── base.py              # Base error classes
│   ├── codes.py             # Error codes and categories
│   └── classification.py    # Error classification utilities
├── retry/
│   ├── __init__.py          # Retry system interface
│   ├── base.py              # Retry configuration and strategies
│   └── decorators.py        # Retry decorators
└── recovery/
    ├── __init__.py          # Recovery system interface
    ├── strategies/          # Recovery strategies
    ├── fallbacks/           # Fallback mechanisms
    └── state/               # State recovery

Core Components

1. Structured Logging System

The logging system provides structured, contextual logging with file rotation and multiple output formats.

Key Features:

  • Structured JSON Logging: All logs include contextual information (timestamp, module, correlation ID)
  • File Rotation: Automatic log rotation based on size and time
  • Multiple Output Formats: JSON for machine processing, human-readable for console
  • Performance Integration: Built-in performance metrics collection
  • Debug Mode: Verbose logging for development and troubleshooting

Usage:

from src.logging import get_logger, initialize_logging

# Initialize logging system
initialize_logging()

# Get logger
logger = get_logger(__name__)

# Structured logging
logger.info("Processing started", extra={
    "operation": "transcription",
    "file_size": "15.2MB",
    "correlation_id": "req-123"
})

Configuration:

from src.logging import LoggingConfig

config = LoggingConfig(
    level="INFO",
    log_to_console=True,
    log_to_file=True,
    log_to_json=True,
    log_dir="logs",
    max_file_size=10 * 1024 * 1024,  # 10MB
    backup_count=5
)
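
One possible end-to-end wiring of this configuration, assuming initialize_logging accepts a LoggingConfig (the exact signature is not shown in this document):

from src.logging import LoggingConfig, get_logger, initialize_logging

# Assumption: initialize_logging takes the config object directly; adjust
# if the real entry point only reads environment variables or defaults.
config = LoggingConfig(level="DEBUG", log_to_console=True, log_to_file=False)
initialize_logging(config)

logger = get_logger(__name__)
logger.debug("Debug logging enabled", extra={"correlation_id": "req-124"})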

2. Error Classification System

A hierarchical error classification system that provides standardized error handling across the application.

Error Hierarchy:

TraxError (base)
├── NetworkError
│   ├── ConnectionError
│   ├── TimeoutError
│   └── DNSResolutionError
├── APIError
│   ├── AuthenticationError
│   ├── RateLimitError
│   ├── QuotaExceededError
│   └── ServiceUnavailableError
├── FileSystemError
│   ├── FileNotFoundError
│   ├── PermissionError
│   ├── DiskSpaceError
│   └── CorruptedFileError
├── ValidationError
│   ├── InvalidInputError
│   ├── MissingRequiredFieldError
│   └── FormatError
├── ProcessingError
│   ├── TranscriptionError
│   ├── EnhancementError
│   └── MediaProcessingError
└── ConfigurationError
    ├── MissingConfigError
    ├── InvalidConfigError
    └── EnvironmentError

Error Codes:

Each error includes a standardized error code for easy identification and handling (one possible wiring of codes to classes is sketched after this list):

  • TRAX-001: Network connection failed
  • TRAX-002: API authentication failed
  • TRAX-003: File not found
  • TRAX-004: Invalid input format
  • TRAX-005: Processing timeout
  • TRAX-006: Configuration error
  • TRAX-007: Recovery failed
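
A hedged sketch of how these codes might attach to the hierarchy (only TraxError, the subclass names, error_code, and original_error are confirmed by this document; the rest is illustrative):

class TraxError(Exception):
    """Base error carrying a standardized TRAX-xxx code."""
    error_code = None  # overridden by subclasses

    def __init__(self, message, original_error=None):
        super().__init__(message)
        self.original_error = original_error

class NetworkError(TraxError):
    error_code = "TRAX-001"  # Network connection failed

class APIError(TraxError):
    pass

class AuthenticationError(APIError):
    error_code = "TRAX-002"  # API authentication failed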

Usage:

from src.errors import (
    NetworkError, APIError, ValidationError,
    create_network_error, create_api_error
)

# Create specific errors
try:
    response = await api_client.call()
except ConnectionError as e:
    raise create_network_error("API connection failed", original_error=e)

# Error classification
from src.errors import classify_error, is_retryable_error

error = classify_error(exception)
if is_retryable_error(error):
    # Implement retry logic
    pass
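
is_retryable_error is not defined in this document; one plausible implementation keys retryability off the hierarchy (the exact set of retryable types below is an assumption):

from src.errors import (
    NetworkError, RateLimitError, ServiceUnavailableError, TraxError
)

# Assumed policy: transient network/service failures retry; validation,
# file-system, and configuration errors do not.
RETRYABLE_TYPES = (NetworkError, RateLimitError, ServiceUnavailableError)

def is_retryable_error(error: TraxError) -> bool:
    return isinstance(error, RETRYABLE_TYPES)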

3. Retry System

A robust retry system with exponential backoff, jitter, and circuit breaker patterns.

Features:

  • Multiple Strategies: Exponential, linear, constant, and Fibonacci backoff
  • Jitter Support: Prevents thundering herd problems
  • Circuit Breaker: Prevents repeated calls to failing services
  • Async Support: Full async/await compatibility
  • Error Classification: Automatic retry based on error type

Usage:

from src.retry import retry, async_retry, RetryConfig

# Basic retry decorator
@retry(max_retries=3, initial_delay=1.0)
def api_call():
    return external_api.request()

# Async retry with custom config
@async_retry(RetryConfig(
    max_retries=5,
    initial_delay=0.5,
    max_delay=30.0,
    jitter=0.1
))
async def async_api_call():
    return await external_api.async_request()

# Context manager
from src.retry import RetryContext

async with RetryContext(max_retries=3) as retry_ctx:
    result = await retry_ctx.execute(api_function)

Circuit Breaker:

from src.retry import CircuitBreaker

circuit_breaker = CircuitBreaker(
    failure_threshold=5,
    timeout=60.0,
    expected_exception=NetworkError
)

# Circuit breaker will open after 5 failures
# and allow one test request after 60 seconds
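
How a call is routed through the breaker is not shown above; a hedged usage sketch, assuming the breaker exposes an async call wrapper (a decorator API is equally plausible):

async def guarded_request():
    # Assumption: CircuitBreaker.call invokes the callable and counts
    # NetworkError failures toward the threshold; once open, it fails
    # fast until the 60-second timeout elapses.
    return await circuit_breaker.call(external_api.async_request)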

4. Recovery Strategies

A recovery system that provides fallback mechanisms and state recovery for different error scenarios.

Recovery Strategies:

  • Fallback Mechanisms: Alternative service providers, cached responses
  • Graceful Degradation: Reduce functionality when services are unavailable
  • State Recovery: Resume interrupted operations from saved state
  • Transaction Rollback: Automatic rollback of database operations
  • Resource Cleanup: Automatic cleanup of temporary resources
  • Health Checks: Proactive monitoring and recovery

Usage:

from src.recovery import (
    RecoveryManager, FallbackStrategy, StateRecoveryStrategy,
    create_fallback_strategy
)

# Create recovery manager
recovery_manager = RecoveryManager()

# Add fallback strategy
fallback_strategy = await create_fallback_strategy(
    primary_operation=whisper_transcribe,
    fallback_operations=[basic_transcribe, cached_transcribe]
)
recovery_manager.add_strategy(fallback_strategy)

# Attempt recovery
result = await recovery_manager.attempt_recovery(context)
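
Custom strategies can be registered the same way; a sketch of one, assuming strategies implement an async attempt_recovery(context) hook (the strategy interface is not shown in this document):

class CachedResponseStrategy:
    """Hypothetical strategy: serve a previously cached result on failure."""

    def __init__(self, cache):
        self.cache = cache  # cache: an application-provided async cache client

    async def attempt_recovery(self, context):
        # Assumption: the recovery context exposes an operation identifier
        # usable as a cache key.
        return await self.cache.get(context.operation_id)

recovery_manager.add_strategy(CachedResponseStrategy(cache=response_cache))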

Fallback Managers:

from src.recovery import TranscriptionFallbackManager

# Specialized fallback manager for transcription
transcription_fallback = TranscriptionFallbackManager()
await transcription_fallback.add_whisper_fallback(whisper_service)
await transcription_fallback.add_cached_transcription_fallback(cache_store, cache_retrieve)

# Execute with fallbacks
result = await transcription_fallback.execute_with_fallbacks(transcribe_function, audio_file)

State Recovery:

from src.recovery import StateRecoveryManager, operation_state_context

# Create state recovery manager (storage is an application-provided state store)
state_manager = StateRecoveryManager(storage)

# Track operation state
async with operation_state_context(
    state_manager, "transcription_123", "corr_456", "transcription"
) as state:
    # Operation is automatically tracked
    result = await transcribe_audio(audio_file)
    # State is automatically saved on completion

# Recover interrupted operations
interrupted_ops = await state_manager.list_interrupted_operations()
for op in interrupted_ops:
    recovered_state = await state_manager.recover_operation(op.operation_id)

5. Performance Metrics

A performance monitoring system that tracks operation timing, resource usage, and system health.

Features:

  • Operation Timing: Measure execution time of operations
  • Resource Monitoring: Track memory and CPU usage
  • System Health: Periodic monitoring of system metrics
  • Threshold Alerts: Configurable alerts for performance issues
  • Metrics Export: JSON export for monitoring systems

Usage:

from src.logging import (
    timing_context, async_timing_context,
    timing_decorator, async_timing_decorator,
    start_health_monitoring, export_all_metrics
)

# Context manager for timing
with timing_context("transcription_operation") as timer:
    result = transcribe_audio(audio_file)

# Async timing
async with async_timing_context("api_call"):
    response = await api_client.call()

# Decorator for automatic timing
@timing_decorator("file_processing")
def process_file(file_path):
    return process_large_file(file_path)

# Health monitoring
await start_health_monitoring(interval_seconds=60)

# Export metrics
metrics_json = export_all_metrics()
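
The async decorator from the import list above is not demonstrated; a short sketch:

# Times each invocation of the coroutine under the "remote_transcription" label
@async_timing_decorator("remote_transcription")
async def transcribe_remote(audio_file):
    return await api_client.call()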

Performance Metrics Collected:

  • Operation duration (milliseconds)
  • Memory usage (MB)
  • CPU usage (percentage)
  • Success/failure rates
  • Operation counters
  • System health metrics (CPU, memory, disk usage)

Integration Patterns

1. Service Layer Integration

import os

from src.logging import get_logger, timing_context
from src.errors import create_api_error, classify_error
from src.retry import async_retry
from src.recovery import fallback_context

logger = get_logger(__name__)

class TranscriptionService:
    @async_retry(max_retries=3)
    async def transcribe_audio(self, audio_file: str) -> str:
        with timing_context("transcription_operation") as timer:
            try:
                async with fallback_context(self.fallback_manager):
                    result = await self.whisper_client.transcribe(audio_file)
                    logger.info("Transcription completed", extra={
                        "duration_ms": timer.duration_ms,
                        "file_size": os.path.getsize(audio_file)
                    })
                    return result
            except Exception as e:
                error = classify_error(e)
                logger.error("Transcription failed", extra={
                    "error_code": error.error_code,
                    "error_type": type(e).__name__
                })
                raise create_api_error("Transcription service failed", original_error=e)

2. CLI Integration

from src.logging import get_logger, initialize_logging
from src.errors import error_handler

logger = get_logger(__name__)

@error_handler
def main():
    initialize_logging()
    
    try:
        # CLI logic here
        logger.info("CLI operation started")
        # ... processing ...
        logger.info("CLI operation completed")
    except Exception as e:
        logger.error("CLI operation failed", exc_info=True)
        raise

3. Batch Processing Integration

from typing import List

from src.logging import get_logger, timing_context
from src.recovery import StateRecoveryManager, operation_state_context
from src.errors import create_processing_error

logger = get_logger(__name__)

class BatchProcessor:
    def __init__(self, storage):
        # storage: an application-provided state store for recovery data
        self.state_manager = StateRecoveryManager(storage)
    
    async def process_batch(self, files: List[str]):
        for file in files:
            async with operation_state_context(
                self.state_manager, f"batch_{file}", "batch_corr", "batch_processing"
            ) as state:
                try:
                    with timing_context("batch_file_processing"):
                        result = await self.process_file(file)
                        logger.info("File processed successfully", extra={
                            "file": file,
                            "result_size": len(result)
                        })
                except Exception as e:
                    logger.error("File processing failed", extra={
                        "file": file,
                        "error": str(e)
                    })
                    raise create_processing_error(f"Failed to process {file}", original_error=e)

Configuration

Environment Variables

# Logging configuration
TRAX_LOG_LEVEL=INFO
TRAX_LOG_TO_CONSOLE=true
TRAX_LOG_TO_FILE=true
TRAX_LOG_DIR=logs
TRAX_MAX_LOG_SIZE=10485760  # 10MB
TRAX_LOG_BACKUP_COUNT=5

# Error handling
TRAX_MAX_RETRIES=3
TRAX_INITIAL_RETRY_DELAY=1.0
TRAX_MAX_RETRY_DELAY=30.0
TRAX_RETRY_JITTER=0.1

# Performance monitoring
TRAX_HEALTH_MONITOR_INTERVAL=60
TRAX_METRICS_EXPORT_ENABLED=true
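
How these variables are read is not shown here; a minimal loader sketch using os.environ, mapping the TRAX_* names above onto the LoggingConfig fields from earlier (the helper itself is hypothetical):

import os

from src.logging import LoggingConfig

def load_logging_config_from_env() -> LoggingConfig:
    # Parse the documented TRAX_* variables, falling back to the defaults above.
    return LoggingConfig(
        level=os.environ.get("TRAX_LOG_LEVEL", "INFO"),
        log_to_console=os.environ.get("TRAX_LOG_TO_CONSOLE", "true") == "true",
        log_to_file=os.environ.get("TRAX_LOG_TO_FILE", "true") == "true",
        log_dir=os.environ.get("TRAX_LOG_DIR", "logs"),
        max_file_size=int(os.environ.get("TRAX_MAX_LOG_SIZE", 10 * 1024 * 1024)),
        backup_count=int(os.environ.get("TRAX_LOG_BACKUP_COUNT", 5)),
    )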

Configuration Files

{
  "logging": {
    "level": "INFO",
    "log_to_console": true,
    "log_to_file": true,
    "log_to_json": true,
    "log_dir": "logs",
    "max_file_size": 10485760,
    "backup_count": 5
  },
  "retry": {
    "max_retries": 3,
    "initial_delay": 1.0,
    "max_delay": 30.0,
    "jitter": 0.1
  },
  "recovery": {
    "enabled": true,
    "max_fallbacks": 3,
    "timeout": 30.0
  },
  "metrics": {
    "enabled": true,
    "health_monitor_interval": 60,
    "export_enabled": true
  }
}

Best Practices

1. Error Handling

  • Always use specific error types from the error hierarchy
  • Include contextual information in error messages
  • Use error codes for consistent error identification
  • Implement proper error recovery strategies

2. Logging

  • Use structured logging with contextual information
  • Include correlation IDs for request tracing
  • Log at appropriate levels (DEBUG, INFO, WARNING, ERROR)
  • Use performance metrics for slow operations

3. Retry Logic

  • Only retry transient errors (network, temporary API failures)
  • Use exponential backoff with jitter (see the sketch after this list)
  • Implement circuit breakers for failing services
  • Set appropriate retry limits
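
For reference, exponential backoff with jitter typically derives each delay from the initial_delay, max_delay, and jitter settings shown earlier; the exact formula used by src.retry is not documented, so this is a sketch:

import random

def backoff_delay(attempt: int, initial_delay: float = 1.0,
                  max_delay: float = 30.0, jitter: float = 0.1) -> float:
    # Exponential growth capped at max_delay: 1s, 2s, 4s, 8s, ...
    delay = min(initial_delay * (2 ** attempt), max_delay)
    # Randomize by +/- jitter so concurrent clients don't retry in lockstep.
    return delay * (1 + random.uniform(-jitter, jitter))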

4. Recovery Strategies

  • Implement fallback mechanisms for critical operations
  • Use graceful degradation when possible
  • Save operation state for recovery
  • Clean up resources on failure

5. Performance Monitoring

  • Monitor all critical operations
  • Set appropriate thresholds for alerts
  • Export metrics for external monitoring systems
  • Use health checks for proactive monitoring

Testing

Unit Tests

import pytest
from src.logging import get_logger, timing_context
from src.errors import NetworkError, create_network_error
from src.retry import retry

def test_error_classification():
    error = create_network_error("Connection failed")
    assert isinstance(error, NetworkError)
    assert error.error_code == "TRAX-001"

def test_retry_logic():
    @retry(max_retries=2)
    def failing_function():
        raise NetworkError("Test error")
    
    with pytest.raises(NetworkError):
        failing_function()

def test_logging():
    logger = get_logger("test")
    with timing_context("test_operation"):
        # Test operation
        pass

Integration Tests

from src.logging import start_health_monitoring, stop_health_monitoring
from src.recovery import RecoveryManager

async def test_recovery_strategies():
    recovery_manager = RecoveryManager()
    # Add test strategies
    # Test recovery scenarios

async def test_performance_monitoring():
    await start_health_monitoring(interval_seconds=1)
    # Perform operations
    # Verify metrics collection
    await stop_health_monitoring()

Monitoring and Alerting

Metrics Dashboard

  • Operation success rates
  • Response times
  • Error rates by type
  • Resource usage (CPU, memory, disk)
  • System health status

Alerts

  • High error rates (>5%)
  • Slow response times (>30s)
  • High resource usage (>90%)
  • Service unavailability
  • Circuit breaker activations
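
A hedged sketch of how these thresholds might be checked against exported metrics (the field names inside the export are assumptions; the actual schema of export_all_metrics is not documented here):

import json

from src.logging import export_all_metrics

ERROR_RATE_THRESHOLD = 0.05          # >5% error rate
RESPONSE_TIME_THRESHOLD_MS = 30_000  # >30 seconds

def check_alerts():
    metrics = json.loads(export_all_metrics())
    alerts = []
    for op, stats in metrics.get("operations", {}).items():
        # Hypothetical fields: failure_rate and avg_duration_ms.
        if stats.get("failure_rate", 0.0) > ERROR_RATE_THRESHOLD:
            alerts.append(f"High error rate for {op}")
        if stats.get("avg_duration_ms", 0.0) > RESPONSE_TIME_THRESHOLD_MS:
            alerts.append(f"Slow responses for {op}")
    return alerts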

Log Analysis

  • Error pattern analysis
  • Performance bottleneck identification
  • Security incident detection
  • Usage pattern analysis

Troubleshooting

Common Issues

  1. High Memory Usage
    • Check for memory leaks in long-running operations
    • Monitor memory usage in performance metrics
    • Implement proper resource cleanup

  2. Slow Response Times
    • Use timing contexts to identify slow operations
    • Check for blocking operations
    • Implement caching where appropriate

  3. High Error Rates
    • Check error logs for patterns
    • Verify external service availability
    • Review retry and recovery configurations

  4. Log File Issues
    • Check disk space
    • Verify log rotation configuration
    • Review log level settings

Debug Mode

from src.logging import enable_debug

# Enable debug mode for detailed logging
enable_debug()

# Debug mode provides:
# - Detailed error stack traces
# - Performance timing for all operations
# - Verbose retry and recovery logs
# - Memory usage tracking

Future Enhancements

  1. Distributed Tracing: Integration with OpenTelemetry for distributed request tracing
  2. Advanced Metrics: Custom business metrics and KPIs
  3. Machine Learning: Anomaly detection for performance issues
  4. Security Logging: Enhanced security event logging and monitoring
  5. Compliance: GDPR and other compliance-related logging features