youtube-summarizer/backend/autonomous/autonomous_controller.py

769 lines
29 KiB
Python

"""
Autonomous Operation Controller for YouTube Summarizer
Provides intelligent automation, scheduling, and autonomous processing capabilities
"""
import asyncio
import json
import logging
from typing import Any, Dict, List, Optional, Callable, Union
from datetime import datetime, timedelta
from enum import Enum
from dataclasses import dataclass, field
import uuid
from .webhook_system import WebhookEvent, trigger_event
# Import backend services
try:
from ..services.dual_transcript_service import DualTranscriptService
from ..services.summary_pipeline import SummaryPipeline
from ..services.batch_processing_service import BatchProcessingService
from ..models.transcript import TranscriptSource
BACKEND_SERVICES_AVAILABLE = True
except ImportError:
BACKEND_SERVICES_AVAILABLE = False
logger = logging.getLogger(__name__)
class AutomationTrigger(str, Enum):
"""Types of automation triggers"""
SCHEDULED = "scheduled" # Time-based scheduling
EVENT_DRIVEN = "event_driven" # Triggered by events
QUEUE_BASED = "queue_based" # Triggered by queue depth
THRESHOLD_BASED = "threshold_based" # Triggered by metrics
WEBHOOK_TRIGGERED = "webhook_triggered" # External webhook trigger
USER_ACTIVITY = "user_activity" # Based on user patterns
class AutomationAction(str, Enum):
"""Types of automation actions"""
PROCESS_VIDEO = "process_video"
BATCH_PROCESS = "batch_process"
CLEANUP_CACHE = "cleanup_cache"
GENERATE_REPORT = "generate_report"
SCALE_RESOURCES = "scale_resources"
SEND_NOTIFICATION = "send_notification"
OPTIMIZE_PERFORMANCE = "optimize_performance"
BACKUP_DATA = "backup_data"
class AutomationStatus(str, Enum):
"""Status of automation rules"""
ACTIVE = "active"
INACTIVE = "inactive"
PAUSED = "paused"
ERROR = "error"
COMPLETED = "completed"
@dataclass
class AutomationRule:
"""Defines an automation rule"""
id: str
name: str
description: str
trigger: AutomationTrigger
action: AutomationAction
parameters: Dict[str, Any] = field(default_factory=dict)
conditions: Dict[str, Any] = field(default_factory=dict)
status: AutomationStatus = AutomationStatus.ACTIVE
last_executed: Optional[datetime] = None
execution_count: int = 0
success_count: int = 0
error_count: int = 0
created_at: datetime = field(default_factory=datetime.now)
updated_at: datetime = field(default_factory=datetime.now)
@dataclass
class AutomationExecution:
"""Records an automation execution"""
id: str
rule_id: str
started_at: datetime
completed_at: Optional[datetime] = None
status: str = "running"
result: Optional[Dict[str, Any]] = None
error_message: Optional[str] = None
context: Dict[str, Any] = field(default_factory=dict)
class AutonomousController:
"""Main controller for autonomous operations"""
def __init__(self):
self.rules: Dict[str, AutomationRule] = {}
self.executions: Dict[str, AutomationExecution] = {}
self.is_running = False
self.scheduler_task = None
self.metrics = {
"total_executions": 0,
"successful_executions": 0,
"failed_executions": 0,
"average_execution_time": 0.0,
"rules_processed_today": 0
}
# Initialize services
self._initialize_services()
# Setup default automation rules
self._setup_default_rules()
def _initialize_services(self):
"""Initialize backend services"""
if BACKEND_SERVICES_AVAILABLE:
try:
self.transcript_service = DualTranscriptService()
self.batch_service = BatchProcessingService()
# Pipeline service requires dependency injection
self.pipeline_service = None
except Exception as e:
logger.warning(f"Could not initialize services: {e}")
self.transcript_service = None
self.batch_service = None
self.pipeline_service = None
else:
self.transcript_service = None
self.batch_service = None
self.pipeline_service = None
def _setup_default_rules(self):
"""Setup default automation rules"""
# Daily cleanup rule
self.add_rule(
name="Daily Cache Cleanup",
description="Clean up old cache entries daily at 2 AM",
trigger=AutomationTrigger.SCHEDULED,
action=AutomationAction.CLEANUP_CACHE,
parameters={
"schedule": "0 2 * * *", # Daily at 2 AM
"max_age_hours": 24,
"cleanup_types": ["transcripts", "summaries", "metadata"]
}
)
# Queue depth monitoring
self.add_rule(
name="Queue Depth Monitor",
description="Trigger batch processing when queue exceeds threshold",
trigger=AutomationTrigger.QUEUE_BASED,
action=AutomationAction.BATCH_PROCESS,
parameters={
"queue_threshold": 10,
"check_interval_minutes": 5,
"batch_size": 5
},
conditions={
"min_queue_age_minutes": 10, # Wait 10 mins before processing
"max_concurrent_batches": 3
}
)
# Performance optimization
self.add_rule(
name="Performance Optimizer",
description="Optimize performance based on system metrics",
trigger=AutomationTrigger.THRESHOLD_BASED,
action=AutomationAction.OPTIMIZE_PERFORMANCE,
parameters={
"cpu_threshold": 80,
"memory_threshold": 85,
"response_time_threshold": 5.0,
"check_interval_minutes": 15
}
)
# Daily report generation
self.add_rule(
name="Daily Report",
description="Generate daily usage and performance report",
trigger=AutomationTrigger.SCHEDULED,
action=AutomationAction.GENERATE_REPORT,
parameters={
"schedule": "0 6 * * *", # Daily at 6 AM
"report_types": ["usage", "performance", "errors"],
"recipients": ["admin"]
}
)
# User activity monitoring
self.add_rule(
name="User Activity Monitor",
description="Monitor user activity patterns and optimize accordingly",
trigger=AutomationTrigger.USER_ACTIVITY,
action=AutomationAction.SCALE_RESOURCES,
parameters={
"activity_window_hours": 1,
"scale_threshold": 5, # 5+ users in window
"check_interval_minutes": 10
}
)
def add_rule(
self,
name: str,
description: str,
trigger: AutomationTrigger,
action: AutomationAction,
parameters: Optional[Dict[str, Any]] = None,
conditions: Optional[Dict[str, Any]] = None
) -> str:
"""Add a new automation rule"""
rule_id = str(uuid.uuid4())
rule = AutomationRule(
id=rule_id,
name=name,
description=description,
trigger=trigger,
action=action,
parameters=parameters or {},
conditions=conditions or {}
)
self.rules[rule_id] = rule
logger.info(f"Added automation rule: {name} ({rule_id})")
return rule_id
def update_rule(self, rule_id: str, **updates) -> bool:
"""Update an automation rule"""
if rule_id not in self.rules:
return False
rule = self.rules[rule_id]
for key, value in updates.items():
if hasattr(rule, key):
setattr(rule, key, value)
rule.updated_at = datetime.now()
logger.info(f"Updated automation rule: {rule_id}")
return True
def remove_rule(self, rule_id: str) -> bool:
"""Remove an automation rule"""
if rule_id not in self.rules:
return False
rule = self.rules[rule_id]
del self.rules[rule_id]
logger.info(f"Removed automation rule: {rule.name} ({rule_id})")
return True
def activate_rule(self, rule_id: str) -> bool:
"""Activate an automation rule"""
return self.update_rule(rule_id, status=AutomationStatus.ACTIVE)
def deactivate_rule(self, rule_id: str) -> bool:
"""Deactivate an automation rule"""
return self.update_rule(rule_id, status=AutomationStatus.INACTIVE)
async def start(self):
"""Start the autonomous controller"""
if self.is_running:
logger.warning("Autonomous controller is already running")
return
self.is_running = True
self.scheduler_task = asyncio.create_task(self._scheduler_loop())
logger.info("Started autonomous controller")
# Trigger startup event
await trigger_event(WebhookEvent.SYSTEM_STATUS, {
"status": "autonomous_controller_started",
"active_rules": len([r for r in self.rules.values() if r.status == AutomationStatus.ACTIVE]),
"timestamp": datetime.now().isoformat()
})
async def stop(self):
"""Stop the autonomous controller"""
if not self.is_running:
return
self.is_running = False
if self.scheduler_task:
self.scheduler_task.cancel()
try:
await self.scheduler_task
except asyncio.CancelledError:
pass
logger.info("Stopped autonomous controller")
# Trigger shutdown event
await trigger_event(WebhookEvent.SYSTEM_STATUS, {
"status": "autonomous_controller_stopped",
"total_executions": self.metrics["total_executions"],
"timestamp": datetime.now().isoformat()
})
async def _scheduler_loop(self):
"""Main scheduler loop"""
logger.info("Starting autonomous scheduler loop")
while self.is_running:
try:
# Check all active rules
for rule in self.rules.values():
if rule.status != AutomationStatus.ACTIVE:
continue
# Check if rule should be executed
if await self._should_execute_rule(rule):
await self._execute_rule(rule)
# Clean up old executions
await self._cleanup_old_executions()
# Wait before next iteration
await asyncio.sleep(30) # Check every 30 seconds
except Exception as e:
logger.error(f"Error in scheduler loop: {e}")
await asyncio.sleep(60) # Longer pause on errors
async def _should_execute_rule(self, rule: AutomationRule) -> bool:
"""Check if a rule should be executed"""
try:
if rule.trigger == AutomationTrigger.SCHEDULED:
return self._check_schedule(rule)
elif rule.trigger == AutomationTrigger.QUEUE_BASED:
return await self._check_queue_conditions(rule)
elif rule.trigger == AutomationTrigger.THRESHOLD_BASED:
return await self._check_threshold_conditions(rule)
elif rule.trigger == AutomationTrigger.USER_ACTIVITY:
return await self._check_user_activity(rule)
else:
return False
except Exception as e:
logger.error(f"Error checking rule {rule.id}: {e}")
return False
def _check_schedule(self, rule: AutomationRule) -> bool:
"""Check if scheduled rule should execute"""
# Simple time-based check (would use croniter in production)
schedule = rule.parameters.get("schedule")
if not schedule:
return False
# For demo, check if we haven't run in the last hour
if rule.last_executed:
time_since_last = datetime.now() - rule.last_executed
return time_since_last > timedelta(hours=1)
return True
async def _check_queue_conditions(self, rule: AutomationRule) -> bool:
"""Check queue-based conditions"""
threshold = rule.parameters.get("queue_threshold", 10)
# Mock queue check (would connect to real queue in production)
mock_queue_size = 15 # Simulated queue size
if mock_queue_size >= threshold:
# Check additional conditions
min_age = rule.conditions.get("min_queue_age_minutes", 0)
max_concurrent = rule.conditions.get("max_concurrent_batches", 5)
# Mock checks
queue_age_ok = True # Would check actual queue age
concurrent_ok = True # Would check running batches
return queue_age_ok and concurrent_ok
return False
async def _check_threshold_conditions(self, rule: AutomationRule) -> bool:
"""Check threshold-based conditions"""
cpu_threshold = rule.parameters.get("cpu_threshold", 80)
memory_threshold = rule.parameters.get("memory_threshold", 85)
response_time_threshold = rule.parameters.get("response_time_threshold", 5.0)
# Mock system metrics (would use real monitoring in production)
mock_cpu = 75
mock_memory = 82
mock_response_time = 4.2
return (mock_cpu > cpu_threshold or
mock_memory > memory_threshold or
mock_response_time > response_time_threshold)
async def _check_user_activity(self, rule: AutomationRule) -> bool:
"""Check user activity patterns"""
window_hours = rule.parameters.get("activity_window_hours", 1)
scale_threshold = rule.parameters.get("scale_threshold", 5)
# Mock user activity check
mock_active_users = 7 # Would query real user activity
return mock_active_users >= scale_threshold
async def _execute_rule(self, rule: AutomationRule):
"""Execute an automation rule"""
execution_id = str(uuid.uuid4())
execution = AutomationExecution(
id=execution_id,
rule_id=rule.id,
started_at=datetime.now()
)
self.executions[execution_id] = execution
logger.info(f"Executing rule: {rule.name} ({rule.id})")
try:
# Execute the action
result = await self._perform_action(rule.action, rule.parameters)
# Update execution record
execution.completed_at = datetime.now()
execution.status = "completed"
execution.result = result
# Update rule stats
rule.last_executed = datetime.now()
rule.execution_count += 1
rule.success_count += 1
# Update system metrics
self.metrics["total_executions"] += 1
self.metrics["successful_executions"] += 1
# Calculate execution time
if execution.completed_at and execution.started_at:
execution_time = (execution.completed_at - execution.started_at).total_seconds()
self._update_average_execution_time(execution_time)
logger.info(f"Successfully executed rule: {rule.name}")
# Trigger success webhook
await trigger_event(WebhookEvent.SYSTEM_STATUS, {
"event_type": "automation_rule_executed",
"rule_id": rule.id,
"rule_name": rule.name,
"execution_id": execution_id,
"result": result,
"timestamp": datetime.now().isoformat()
})
except Exception as e:
# Update execution record
execution.completed_at = datetime.now()
execution.status = "failed"
execution.error_message = str(e)
# Update rule stats
rule.error_count += 1
# Update system metrics
self.metrics["total_executions"] += 1
self.metrics["failed_executions"] += 1
logger.error(f"Failed to execute rule {rule.name}: {e}")
# Trigger error webhook
await trigger_event(WebhookEvent.ERROR_OCCURRED, {
"error_type": "automation_rule_failed",
"rule_id": rule.id,
"rule_name": rule.name,
"execution_id": execution_id,
"error": str(e),
"timestamp": datetime.now().isoformat()
})
async def _perform_action(self, action: AutomationAction, parameters: Dict[str, Any]) -> Dict[str, Any]:
"""Perform the specified automation action"""
if action == AutomationAction.CLEANUP_CACHE:
return await self._cleanup_cache_action(parameters)
elif action == AutomationAction.BATCH_PROCESS:
return await self._batch_process_action(parameters)
elif action == AutomationAction.GENERATE_REPORT:
return await self._generate_report_action(parameters)
elif action == AutomationAction.SCALE_RESOURCES:
return await self._scale_resources_action(parameters)
elif action == AutomationAction.OPTIMIZE_PERFORMANCE:
return await self._optimize_performance_action(parameters)
elif action == AutomationAction.SEND_NOTIFICATION:
return await self._send_notification_action(parameters)
elif action == AutomationAction.BACKUP_DATA:
return await self._backup_data_action(parameters)
else:
raise ValueError(f"Unknown action: {action}")
async def _cleanup_cache_action(self, parameters: Dict[str, Any]) -> Dict[str, Any]:
"""Perform cache cleanup"""
max_age_hours = parameters.get("max_age_hours", 24)
cleanup_types = parameters.get("cleanup_types", ["transcripts", "summaries"])
# Mock cleanup (would connect to real cache in production)
cleaned_items = 0
for cleanup_type in cleanup_types:
# Simulate cleanup
items_cleaned = 15 # Mock number
cleaned_items += items_cleaned
logger.info(f"Cleaned {items_cleaned} {cleanup_type} cache entries")
return {
"action": "cleanup_cache",
"items_cleaned": cleaned_items,
"cleanup_types": cleanup_types,
"max_age_hours": max_age_hours
}
async def _batch_process_action(self, parameters: Dict[str, Any]) -> Dict[str, Any]:
"""Perform batch processing"""
batch_size = parameters.get("batch_size", 5)
# Mock batch processing
mock_video_urls = [
f"https://youtube.com/watch?v=mock_{i}"
for i in range(batch_size)
]
if self.batch_service and BACKEND_SERVICES_AVAILABLE:
# Would use real batch service
batch_id = f"auto_batch_{int(datetime.now().timestamp())}"
logger.info(f"Started automated batch processing: {batch_id}")
else:
batch_id = f"mock_batch_{int(datetime.now().timestamp())}"
return {
"action": "batch_process",
"batch_id": batch_id,
"video_count": batch_size,
"videos": mock_video_urls
}
async def _generate_report_action(self, parameters: Dict[str, Any]) -> Dict[str, Any]:
"""Generate system reports"""
report_types = parameters.get("report_types", ["usage"])
reports_generated = []
for report_type in report_types:
report_id = f"{report_type}_{datetime.now().strftime('%Y%m%d')}"
# Mock report generation
if report_type == "usage":
report_data = {
"total_videos_processed": 145,
"total_transcripts": 132,
"total_summaries": 98,
"active_users": 23
}
elif report_type == "performance":
report_data = {
"average_processing_time": 45.2,
"success_rate": 0.97,
"error_rate": 0.03,
"system_uptime": "99.8%"
}
elif report_type == "errors":
report_data = {
"total_errors": 12,
"critical_errors": 2,
"warning_errors": 10,
"top_error_types": ["timeout", "api_limit"]
}
else:
report_data = {"message": f"Unknown report type: {report_type}"}
reports_generated.append({
"report_id": report_id,
"type": report_type,
"data": report_data
})
return {
"action": "generate_report",
"reports": reports_generated,
"generated_at": datetime.now().isoformat()
}
async def _scale_resources_action(self, parameters: Dict[str, Any]) -> Dict[str, Any]:
"""Scale system resources"""
activity_window = parameters.get("activity_window_hours", 1)
scale_threshold = parameters.get("scale_threshold", 5)
# Mock resource scaling
current_capacity = 100 # Mock current capacity
recommended_capacity = 150 # Mock recommended
return {
"action": "scale_resources",
"current_capacity": current_capacity,
"recommended_capacity": recommended_capacity,
"scaling_factor": 1.5,
"activity_window_hours": activity_window
}
async def _optimize_performance_action(self, parameters: Dict[str, Any]) -> Dict[str, Any]:
"""Optimize system performance"""
cpu_threshold = parameters.get("cpu_threshold", 80)
memory_threshold = parameters.get("memory_threshold", 85)
optimizations = []
# Mock performance optimizations
optimizations.append("Enabled connection pooling")
optimizations.append("Increased cache TTL")
optimizations.append("Reduced background task frequency")
return {
"action": "optimize_performance",
"optimizations_applied": optimizations,
"performance_improvement": "15%",
"resource_usage_reduction": "12%"
}
async def _send_notification_action(self, parameters: Dict[str, Any]) -> Dict[str, Any]:
"""Send notifications"""
recipients = parameters.get("recipients", ["admin"])
message = parameters.get("message", "Automated notification")
# Mock notification sending
notifications_sent = len(recipients)
return {
"action": "send_notification",
"recipients": recipients,
"message": message,
"notifications_sent": notifications_sent
}
async def _backup_data_action(self, parameters: Dict[str, Any]) -> Dict[str, Any]:
"""Backup system data"""
backup_types = parameters.get("backup_types", ["database", "cache"])
backups_created = []
for backup_type in backup_types:
backup_id = f"{backup_type}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
backups_created.append({
"backup_id": backup_id,
"type": backup_type,
"size_mb": 250 # Mock size
})
return {
"action": "backup_data",
"backups_created": backups_created,
"total_size_mb": sum(b["size_mb"] for b in backups_created)
}
def _update_average_execution_time(self, execution_time: float):
"""Update average execution time"""
current_avg = self.metrics["average_execution_time"]
total_executions = self.metrics["total_executions"]
if total_executions == 1:
self.metrics["average_execution_time"] = execution_time
else:
self.metrics["average_execution_time"] = (
(current_avg * (total_executions - 1) + execution_time) / total_executions
)
async def _cleanup_old_executions(self):
"""Clean up old execution records"""
cutoff_date = datetime.now() - timedelta(days=7)
old_executions = [
exec_id for exec_id, execution in self.executions.items()
if execution.started_at < cutoff_date and execution.status in ["completed", "failed"]
]
for exec_id in old_executions:
del self.executions[exec_id]
if old_executions:
logger.info(f"Cleaned up {len(old_executions)} old execution records")
def get_rule_status(self, rule_id: str) -> Optional[Dict[str, Any]]:
"""Get status of a specific rule"""
if rule_id not in self.rules:
return None
rule = self.rules[rule_id]
return {
"rule_id": rule.id,
"name": rule.name,
"description": rule.description,
"trigger": rule.trigger,
"action": rule.action,
"status": rule.status,
"last_executed": rule.last_executed.isoformat() if rule.last_executed else None,
"execution_count": rule.execution_count,
"success_count": rule.success_count,
"error_count": rule.error_count,
"success_rate": rule.success_count / rule.execution_count if rule.execution_count > 0 else 0.0,
"created_at": rule.created_at.isoformat(),
"updated_at": rule.updated_at.isoformat()
}
def get_system_status(self) -> Dict[str, Any]:
"""Get overall system status"""
active_rules = len([r for r in self.rules.values() if r.status == AutomationStatus.ACTIVE])
running_executions = len([e for e in self.executions.values() if e.status == "running"])
return {
"controller_status": "running" if self.is_running else "stopped",
"total_rules": len(self.rules),
"active_rules": active_rules,
"running_executions": running_executions,
"total_executions": self.metrics["total_executions"],
"successful_executions": self.metrics["successful_executions"],
"failed_executions": self.metrics["failed_executions"],
"success_rate": (
self.metrics["successful_executions"] / self.metrics["total_executions"]
if self.metrics["total_executions"] > 0 else 0.0
),
"average_execution_time": round(self.metrics["average_execution_time"], 3),
"rules_processed_today": self.metrics["rules_processed_today"],
"services_available": BACKEND_SERVICES_AVAILABLE
}
def get_execution_history(self, rule_id: Optional[str] = None, limit: int = 50) -> List[Dict[str, Any]]:
"""Get execution history"""
executions = list(self.executions.values())
if rule_id:
executions = [e for e in executions if e.rule_id == rule_id]
executions.sort(key=lambda x: x.started_at, reverse=True)
executions = executions[:limit]
return [
{
"execution_id": e.id,
"rule_id": e.rule_id,
"started_at": e.started_at.isoformat(),
"completed_at": e.completed_at.isoformat() if e.completed_at else None,
"status": e.status,
"result": e.result,
"error_message": e.error_message
}
for e in executions
]
# Global autonomous controller instance
autonomous_controller = AutonomousController()
# Convenience functions
async def start_autonomous_operations():
"""Start autonomous operations"""
await autonomous_controller.start()
async def stop_autonomous_operations():
"""Stop autonomous operations"""
await autonomous_controller.stop()
def get_automation_status() -> Dict[str, Any]:
"""Get automation system status"""
return autonomous_controller.get_system_status()
async def trigger_manual_execution(rule_id: str) -> bool:
"""Manually trigger rule execution"""
if rule_id not in autonomous_controller.rules:
return False
rule = autonomous_controller.rules[rule_id]
await autonomous_controller._execute_rule(rule)
return True