769 lines
29 KiB
Python
769 lines
29 KiB
Python
"""
|
|
Autonomous Operation Controller for YouTube Summarizer
|
|
Provides intelligent automation, scheduling, and autonomous processing capabilities
|
|
"""
|
|
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
from typing import Any, Dict, List, Optional, Callable, Union
|
|
from datetime import datetime, timedelta
|
|
from enum import Enum
|
|
from dataclasses import dataclass, field
|
|
import uuid
|
|
|
|
from .webhook_system import WebhookEvent, trigger_event
|
|
|
|
# Optional backend services. They are imported together; if any one of them is
# missing the module still loads and BACKEND_SERVICES_AVAILABLE records that
# the controller has to run without them.
try:
    from ..services.dual_transcript_service import DualTranscriptService
    from ..services.summary_pipeline import SummaryPipeline
    from ..services.batch_processing_service import BatchProcessingService
    from ..models.transcript import TranscriptSource
except ImportError:
    BACKEND_SERVICES_AVAILABLE = False
else:
    BACKEND_SERVICES_AVAILABLE = True

# Module-level logger shared by everything in this file.
logger = logging.getLogger(__name__)
|
|
|
|
class AutomationTrigger(str, Enum):
    """Kinds of condition that can cause an automation rule to fire.

    Members are also ``str`` instances, so they compare equal to their
    string values.
    """

    # Time-based scheduling
    SCHEDULED = "scheduled"
    # Triggered by events
    EVENT_DRIVEN = "event_driven"
    # Triggered by queue depth
    QUEUE_BASED = "queue_based"
    # Triggered by metrics
    THRESHOLD_BASED = "threshold_based"
    # External webhook trigger
    WEBHOOK_TRIGGERED = "webhook_triggered"
    # Based on user patterns
    USER_ACTIVITY = "user_activity"
|
|
|
|
class AutomationAction(str, Enum):
    """Operations an automation rule can request when it fires.

    Members are also ``str`` instances, so they compare equal to their
    string values.
    """

    # Video processing
    PROCESS_VIDEO = "process_video"
    BATCH_PROCESS = "batch_process"

    # Housekeeping
    CLEANUP_CACHE = "cleanup_cache"
    BACKUP_DATA = "backup_data"

    # Reporting and notification
    GENERATE_REPORT = "generate_report"
    SEND_NOTIFICATION = "send_notification"

    # Capacity and tuning
    SCALE_RESOURCES = "scale_resources"
    OPTIMIZE_PERFORMANCE = "optimize_performance"
|
|
|
|
class AutomationStatus(str, Enum):
    """Lifecycle states an automation rule can be in.

    Members are also ``str`` instances, so they compare equal to their
    string values.
    """

    ACTIVE = "active"
    INACTIVE = "inactive"
    PAUSED = "paused"
    ERROR = "error"
    COMPLETED = "completed"
|
|
|
|
@dataclass
class AutomationRule:
    """A single automation rule: what triggers it, what it does, and how it has fared.

    ``parameters`` and ``conditions`` are free-form dictionaries; each rule
    gets its own fresh dict when none is supplied.
    """

    # --- Required: identity and behaviour ---
    id: str
    name: str
    description: str
    trigger: AutomationTrigger
    action: AutomationAction

    # --- Optional configuration ---
    parameters: Dict[str, Any] = field(default_factory=dict)
    conditions: Dict[str, Any] = field(default_factory=dict)
    status: AutomationStatus = AutomationStatus.ACTIVE

    # --- Run bookkeeping ---
    last_executed: Optional[datetime] = None
    execution_count: int = 0
    success_count: int = 0
    error_count: int = 0

    # --- Timestamps (local, naive datetimes taken at construction) ---
    created_at: datetime = field(default_factory=datetime.now)
    updated_at: datetime = field(default_factory=datetime.now)
|
|
|
|
@dataclass
class AutomationExecution:
    """Record of one run of an automation rule.

    A record starts out with status ``"running"`` and no completion time,
    result, or error message.
    """

    # --- Required: which run of which rule, and when it began ---
    id: str
    rule_id: str
    started_at: datetime

    # --- Outcome, filled in once the run finishes ---
    completed_at: Optional[datetime] = None
    status: str = "running"
    result: Optional[Dict[str, Any]] = None
    error_message: Optional[str] = None

    # --- Free-form extra data; a fresh dict per record ---
    context: Dict[str, Any] = field(default_factory=dict)
|
|
|
|
class AutonomousController:
    """Evaluate automation rules on a polling loop and run their actions.

    Rules (``rules``) and execution records (``executions``) are kept in
    memory, together with aggregate counters in ``metrics``. ``start()``
    spawns an asyncio task that re-checks every active rule once per
    ``POLL_INTERVAL_SECONDS``; ``stop()`` cancels that task.

    The trigger checks and the actions in this class are placeholders: they
    use hard-coded "mock" readings and return canned results.
    """

    # Seconds between scheduler passes, and the longer pause after a failed pass.
    POLL_INTERVAL_SECONDS = 30
    ERROR_BACKOFF_SECONDS = 60

    # Finished execution records older than this are dropped.
    EXECUTION_RETENTION = timedelta(days=7)

    # Rule fields update_rule() refuses to change: ``id`` is also the key in
    # ``self.rules``, so rewriting it would leave the two out of sync.
    _PROTECTED_RULE_FIELDS = frozenset({"id"})

    def __init__(self):
        """Create the controller, initialise services and register default rules."""
        self.rules: Dict[str, AutomationRule] = {}
        self.executions: Dict[str, AutomationExecution] = {}
        self.is_running = False
        self.scheduler_task = None
        self.metrics = {
            "total_executions": 0,
            "successful_executions": 0,
            "failed_executions": 0,
            "average_execution_time": 0.0,
            "rules_processed_today": 0,
        }
        # Calendar day that "rules_processed_today" currently refers to.
        self._metrics_day = datetime.now().date()

        # Initialize services
        self._initialize_services()

        # Setup default automation rules
        self._setup_default_rules()

    def _initialize_services(self):
        """Instantiate the backend services, or leave them as ``None``.

        All three service attributes end up ``None`` when the backend imports
        were unavailable or when constructing either service raises.
        """
        self.transcript_service = None
        self.batch_service = None
        # Pipeline service requires dependency injection, so it is never built here.
        self.pipeline_service = None

        if not BACKEND_SERVICES_AVAILABLE:
            return

        try:
            transcript_service = DualTranscriptService()
            batch_service = BatchProcessingService()
        except Exception as e:
            logger.warning(f"Could not initialize services: {e}")
            return

        # Assign only after both constructors succeeded (all-or-nothing).
        self.transcript_service = transcript_service
        self.batch_service = batch_service

    def _setup_default_rules(self):
        """Register the five built-in rules."""

        # Daily cleanup rule
        self.add_rule(
            name="Daily Cache Cleanup",
            description="Clean up old cache entries daily at 2 AM",
            trigger=AutomationTrigger.SCHEDULED,
            action=AutomationAction.CLEANUP_CACHE,
            parameters={
                "schedule": "0 2 * * *",  # Daily at 2 AM
                "max_age_hours": 24,
                "cleanup_types": ["transcripts", "summaries", "metadata"],
            },
        )

        # Queue depth monitoring
        self.add_rule(
            name="Queue Depth Monitor",
            description="Trigger batch processing when queue exceeds threshold",
            trigger=AutomationTrigger.QUEUE_BASED,
            action=AutomationAction.BATCH_PROCESS,
            parameters={
                "queue_threshold": 10,
                "check_interval_minutes": 5,
                "batch_size": 5,
            },
            conditions={
                "min_queue_age_minutes": 10,  # Wait 10 mins before processing
                "max_concurrent_batches": 3,
            },
        )

        # Performance optimization
        self.add_rule(
            name="Performance Optimizer",
            description="Optimize performance based on system metrics",
            trigger=AutomationTrigger.THRESHOLD_BASED,
            action=AutomationAction.OPTIMIZE_PERFORMANCE,
            parameters={
                "cpu_threshold": 80,
                "memory_threshold": 85,
                "response_time_threshold": 5.0,
                "check_interval_minutes": 15,
            },
        )

        # Daily report generation
        self.add_rule(
            name="Daily Report",
            description="Generate daily usage and performance report",
            trigger=AutomationTrigger.SCHEDULED,
            action=AutomationAction.GENERATE_REPORT,
            parameters={
                "schedule": "0 6 * * *",  # Daily at 6 AM
                "report_types": ["usage", "performance", "errors"],
                "recipients": ["admin"],
            },
        )

        # User activity monitoring
        self.add_rule(
            name="User Activity Monitor",
            description="Monitor user activity patterns and optimize accordingly",
            trigger=AutomationTrigger.USER_ACTIVITY,
            action=AutomationAction.SCALE_RESOURCES,
            parameters={
                "activity_window_hours": 1,
                "scale_threshold": 5,  # 5+ users in window
                "check_interval_minutes": 10,
            },
        )

    def add_rule(
        self,
        name: str,
        description: str,
        trigger: AutomationTrigger,
        action: AutomationAction,
        parameters: Optional[Dict[str, Any]] = None,
        conditions: Optional[Dict[str, Any]] = None
    ) -> str:
        """Register a new rule and return its generated id (a UUID4 string)."""
        rule_id = str(uuid.uuid4())
        self.rules[rule_id] = AutomationRule(
            id=rule_id,
            name=name,
            description=description,
            trigger=trigger,
            action=action,
            parameters=parameters or {},
            conditions=conditions or {},
        )
        logger.info(f"Added automation rule: {name} ({rule_id})")
        return rule_id

    def update_rule(self, rule_id: str, **updates) -> bool:
        """Set the given fields on a rule.

        Unknown field names and the protected ``id`` field are skipped with a
        warning. ``updated_at`` is always refreshed.

        Returns:
            ``False`` if no rule has this id, ``True`` otherwise.
        """
        rule = self.rules.get(rule_id)
        if rule is None:
            return False

        for key, value in updates.items():
            if key in self._PROTECTED_RULE_FIELDS:
                logger.warning(f"Ignoring update to protected rule field: {key}")
                continue
            if not hasattr(rule, key):
                logger.warning(f"Ignoring unknown rule field: {key}")
                continue
            setattr(rule, key, value)

        rule.updated_at = datetime.now()
        logger.info(f"Updated automation rule: {rule_id}")
        return True

    def remove_rule(self, rule_id: str) -> bool:
        """Delete a rule; return ``False`` if no rule has this id."""
        rule = self.rules.pop(rule_id, None)
        if rule is None:
            return False

        logger.info(f"Removed automation rule: {rule.name} ({rule_id})")
        return True

    def activate_rule(self, rule_id: str) -> bool:
        """Set a rule's status to ACTIVE; return ``False`` if it does not exist."""
        return self.update_rule(rule_id, status=AutomationStatus.ACTIVE)

    def deactivate_rule(self, rule_id: str) -> bool:
        """Set a rule's status to INACTIVE; return ``False`` if it does not exist."""
        return self.update_rule(rule_id, status=AutomationStatus.INACTIVE)

    def _count_active_rules(self) -> int:
        """Return how many registered rules currently have ACTIVE status."""
        return sum(1 for rule in self.rules.values() if rule.status == AutomationStatus.ACTIVE)

    async def start(self):
        """Start the scheduler task and emit a startup webhook event.

        Calling this while already running only logs a warning.
        """
        if self.is_running:
            logger.warning("Autonomous controller is already running")
            return

        self.is_running = True
        self.scheduler_task = asyncio.create_task(self._scheduler_loop())
        logger.info("Started autonomous controller")

        # Trigger startup event
        await trigger_event(WebhookEvent.SYSTEM_STATUS, {
            "status": "autonomous_controller_started",
            "active_rules": self._count_active_rules(),
            "timestamp": datetime.now().isoformat()
        })

    async def stop(self):
        """Cancel the scheduler task and emit a shutdown webhook event.

        Does nothing when the controller is not running.
        """
        if not self.is_running:
            return

        self.is_running = False

        if self.scheduler_task:
            self.scheduler_task.cancel()
            try:
                await self.scheduler_task
            except asyncio.CancelledError:
                # Expected: this is the cancellation requested just above.
                pass
            # Drop the reference so a finished task is not kept around.
            self.scheduler_task = None

        logger.info("Stopped autonomous controller")

        # Trigger shutdown event
        await trigger_event(WebhookEvent.SYSTEM_STATUS, {
            "status": "autonomous_controller_stopped",
            "total_executions": self.metrics["total_executions"],
            "timestamp": datetime.now().isoformat()
        })

    async def _scheduler_loop(self):
        """Poll all active rules until ``is_running`` is cleared or the task is cancelled."""
        logger.info("Starting autonomous scheduler loop")

        while self.is_running:
            try:
                # Iterate over a snapshot: this loop awaits, so other coroutines
                # may add or remove rules while a pass is in progress, and
                # mutating a dict during iteration raises RuntimeError.
                for rule in list(self.rules.values()):
                    # Skip rules removed or deactivated since the snapshot.
                    if rule.id not in self.rules or rule.status != AutomationStatus.ACTIVE:
                        continue

                    # Check if rule should be executed
                    if await self._should_execute_rule(rule):
                        await self._execute_rule(rule)

                # Clean up old executions
                await self._cleanup_old_executions()

                # Wait before next iteration
                await asyncio.sleep(self.POLL_INTERVAL_SECONDS)

            except Exception as e:
                # logger.exception keeps the traceback; the loop itself survives.
                logger.exception(f"Error in scheduler loop: {e}")
                await asyncio.sleep(self.ERROR_BACKOFF_SECONDS)  # Longer pause on errors

    async def _should_execute_rule(self, rule: AutomationRule) -> bool:
        """Return whether the rule's trigger condition currently holds.

        Trigger types without a check here (event-driven, webhook-triggered)
        always yield ``False``, as does any exception raised by a check.
        """
        try:
            # Compared with == rather than looked up in a dict so that a trigger
            # stored as a plain string still matches the str-based enum members.
            trigger = rule.trigger
            if trigger == AutomationTrigger.SCHEDULED:
                return self._check_schedule(rule)
            if trigger == AutomationTrigger.QUEUE_BASED:
                return await self._check_queue_conditions(rule)
            if trigger == AutomationTrigger.THRESHOLD_BASED:
                return await self._check_threshold_conditions(rule)
            if trigger == AutomationTrigger.USER_ACTIVITY:
                return await self._check_user_activity(rule)
            return False
        except Exception as e:
            logger.error(f"Error checking rule {rule.id}: {e}")
            return False

    def _check_schedule(self, rule: AutomationRule) -> bool:
        """Return whether a scheduled rule is due.

        NOTE: the cron expression in ``parameters["schedule"]`` is only checked
        for presence, not parsed. A rule is due if it has never run or last ran
        more than an hour ago (would use croniter in production).
        """
        if not rule.parameters.get("schedule"):
            return False

        if rule.last_executed is None:
            return True

        return datetime.now() - rule.last_executed > timedelta(hours=1)

    async def _check_queue_conditions(self, rule: AutomationRule) -> bool:
        """Return whether the (mock) queue depth has reached the rule's threshold.

        NOTE: ``conditions["min_queue_age_minutes"]`` and
        ``conditions["max_concurrent_batches"]`` are not enforced yet; both
        checks are hard-wired to pass.
        """
        threshold = rule.parameters.get("queue_threshold", 10)

        # Mock queue check (would connect to real queue in production)
        mock_queue_size = 15  # Simulated queue size
        if mock_queue_size < threshold:
            return False

        # Mock checks
        queue_age_ok = True  # Would check actual queue age
        concurrent_ok = True  # Would check running batches
        return queue_age_ok and concurrent_ok

    async def _check_threshold_conditions(self, rule: AutomationRule) -> bool:
        """Return whether any (mock) system metric exceeds its configured threshold."""
        params = rule.parameters

        # Mock system metrics (would use real monitoring in production)
        readings = (
            (75, params.get("cpu_threshold", 80)),
            (82, params.get("memory_threshold", 85)),
            (4.2, params.get("response_time_threshold", 5.0)),
        )
        return any(value > limit for value, limit in readings)

    async def _check_user_activity(self, rule: AutomationRule) -> bool:
        """Return whether the (mock) active-user count has reached ``scale_threshold``."""
        scale_threshold = rule.parameters.get("scale_threshold", 5)

        # Mock user activity check
        mock_active_users = 7  # Would query real user activity

        return mock_active_users >= scale_threshold

    async def _execute_rule(self, rule: AutomationRule):
        """Run a rule's action once, record the outcome, and emit a webhook event.

        Action failures are recorded on the execution, the rule and the
        metrics; they are not re-raised. Both successful and failed runs
        count towards ``rule.execution_count`` and refresh
        ``rule.last_executed``, so ``success_count / execution_count`` is a
        real success rate and a failing scheduled rule is not retried on
        every scheduler pass.
        """
        execution_id = str(uuid.uuid4())
        execution = AutomationExecution(
            id=execution_id,
            rule_id=rule.id,
            started_at=datetime.now()
        )
        self.executions[execution_id] = execution
        logger.info(f"Executing rule: {rule.name} ({rule.id})")

        # Only the action itself is guarded. Bookkeeping and the webhook happen
        # afterwards, so a webhook problem cannot turn a completed run into a
        # "failed" one or count it twice.
        try:
            result = await self._perform_action(rule.action, rule.parameters)
        except Exception as e:
            finished_at = datetime.now()

            # Update execution record
            execution.completed_at = finished_at
            execution.status = "failed"
            execution.error_message = str(e)

            # Update rule stats
            rule.last_executed = finished_at
            rule.execution_count += 1
            rule.error_count += 1

            # Update system metrics
            self.metrics["total_executions"] += 1
            self.metrics["failed_executions"] += 1
            self._record_rule_processed()

            logger.error(f"Failed to execute rule {rule.name}: {e}")

            # Trigger error webhook
            await self._notify(WebhookEvent.ERROR_OCCURRED, {
                "error_type": "automation_rule_failed",
                "rule_id": rule.id,
                "rule_name": rule.name,
                "execution_id": execution_id,
                "error": str(e),
                "timestamp": datetime.now().isoformat()
            })
            return

        finished_at = datetime.now()

        # Update execution record
        execution.completed_at = finished_at
        execution.status = "completed"
        execution.result = result

        # Update rule stats
        rule.last_executed = finished_at
        rule.execution_count += 1
        rule.success_count += 1

        # Update system metrics (the success counter must be bumped before the
        # average is updated, because it is the sample count for that average).
        self.metrics["total_executions"] += 1
        self.metrics["successful_executions"] += 1
        self._record_rule_processed()
        self._update_average_execution_time(
            (finished_at - execution.started_at).total_seconds()
        )

        logger.info(f"Successfully executed rule: {rule.name}")

        # Trigger success webhook
        await self._notify(WebhookEvent.SYSTEM_STATUS, {
            "event_type": "automation_rule_executed",
            "rule_id": rule.id,
            "rule_name": rule.name,
            "execution_id": execution_id,
            "result": result,
            "timestamp": datetime.now().isoformat()
        })

    async def _notify(self, event, payload: Dict[str, Any]) -> None:
        """Send a webhook event, logging (not raising) any delivery failure."""
        try:
            await trigger_event(event, payload)
        except Exception:
            logger.exception(f"Failed to deliver webhook event: {event}")

    async def _perform_action(self, action: AutomationAction, parameters: Dict[str, Any]) -> Dict[str, Any]:
        """Dispatch to the handler for ``action`` and return its result dict.

        Raises:
            ValueError: if no handler exists for ``action``. This includes
                ``AutomationAction.PROCESS_VIDEO``, which has no handler here.
        """
        # Matched with == (not dict lookup) so a plain-string action still
        # matches the str-based enum members.
        handlers = (
            (AutomationAction.CLEANUP_CACHE, self._cleanup_cache_action),
            (AutomationAction.BATCH_PROCESS, self._batch_process_action),
            (AutomationAction.GENERATE_REPORT, self._generate_report_action),
            (AutomationAction.SCALE_RESOURCES, self._scale_resources_action),
            (AutomationAction.OPTIMIZE_PERFORMANCE, self._optimize_performance_action),
            (AutomationAction.SEND_NOTIFICATION, self._send_notification_action),
            (AutomationAction.BACKUP_DATA, self._backup_data_action),
        )
        for known_action, handler in handlers:
            if action == known_action:
                return await handler(parameters)

        raise ValueError(f"Unknown action: {action}")

    async def _cleanup_cache_action(self, parameters: Dict[str, Any]) -> Dict[str, Any]:
        """Mock cache cleanup: reports 15 items cleaned per requested cache type."""
        max_age_hours = parameters.get("max_age_hours", 24)
        cleanup_types = parameters.get("cleanup_types", ["transcripts", "summaries"])

        # Mock cleanup (would connect to real cache in production)
        items_per_type = 15  # Mock number
        cleaned_items = 0
        for cleanup_type in cleanup_types:
            cleaned_items += items_per_type
            logger.info(f"Cleaned {items_per_type} {cleanup_type} cache entries")

        return {
            "action": "cleanup_cache",
            "items_cleaned": cleaned_items,
            "cleanup_types": cleanup_types,
            "max_age_hours": max_age_hours
        }

    async def _batch_process_action(self, parameters: Dict[str, Any]) -> Dict[str, Any]:
        """Mock batch processing: fabricates ``batch_size`` video URLs and a batch id.

        No batch is actually submitted; the batch service is only consulted to
        choose the id prefix.
        """
        batch_size = parameters.get("batch_size", 5)

        # Mock batch processing
        mock_video_urls = [
            f"https://youtube.com/watch?v=mock_{i}"
            for i in range(batch_size)
        ]

        stamp = int(datetime.now().timestamp())
        if self.batch_service and BACKEND_SERVICES_AVAILABLE:
            # Would use real batch service
            batch_id = f"auto_batch_{stamp}"
            logger.info(f"Started automated batch processing: {batch_id}")
        else:
            batch_id = f"mock_batch_{stamp}"

        return {
            "action": "batch_process",
            "batch_id": batch_id,
            "video_count": batch_size,
            "videos": mock_video_urls
        }

    async def _generate_report_action(self, parameters: Dict[str, Any]) -> Dict[str, Any]:
        """Mock report generation: returns canned data for each requested report type.

        Unrecognised report types produce a report whose data is an
        "Unknown report type" message.
        """
        report_types = parameters.get("report_types", ["usage"])

        # Mock report generation
        canned_reports = {
            "usage": {
                "total_videos_processed": 145,
                "total_transcripts": 132,
                "total_summaries": 98,
                "active_users": 23
            },
            "performance": {
                "average_processing_time": 45.2,
                "success_rate": 0.97,
                "error_rate": 0.03,
                "system_uptime": "99.8%"
            },
            "errors": {
                "total_errors": 12,
                "critical_errors": 2,
                "warning_errors": 10,
                "top_error_types": ["timeout", "api_limit"]
            },
        }

        reports_generated = []
        for report_type in report_types:
            report_data = canned_reports.get(report_type)
            if report_data is None:
                report_data = {"message": f"Unknown report type: {report_type}"}

            reports_generated.append({
                "report_id": f"{report_type}_{datetime.now().strftime('%Y%m%d')}",
                "type": report_type,
                "data": report_data
            })

        return {
            "action": "generate_report",
            "reports": reports_generated,
            "generated_at": datetime.now().isoformat()
        }

    async def _scale_resources_action(self, parameters: Dict[str, Any]) -> Dict[str, Any]:
        """Mock resource scaling: returns fixed current and recommended capacities."""
        activity_window = parameters.get("activity_window_hours", 1)

        # Mock resource scaling
        current_capacity = 100  # Mock current capacity
        recommended_capacity = 150  # Mock recommended

        return {
            "action": "scale_resources",
            "current_capacity": current_capacity,
            "recommended_capacity": recommended_capacity,
            "scaling_factor": 1.5,
            "activity_window_hours": activity_window
        }

    async def _optimize_performance_action(self, parameters: Dict[str, Any]) -> Dict[str, Any]:
        """Mock performance optimisation: returns a fixed list of "applied" tweaks.

        ``parameters`` is accepted for signature parity with the other actions
        but is not consulted.
        """
        # Mock performance optimizations
        optimizations = [
            "Enabled connection pooling",
            "Increased cache TTL",
            "Reduced background task frequency",
        ]

        return {
            "action": "optimize_performance",
            "optimizations_applied": optimizations,
            "performance_improvement": "15%",
            "resource_usage_reduction": "12%"
        }

    async def _send_notification_action(self, parameters: Dict[str, Any]) -> Dict[str, Any]:
        """Mock notification: nothing is sent; the recipient count is reported as sent."""
        recipients = parameters.get("recipients", ["admin"])
        message = parameters.get("message", "Automated notification")

        # Mock notification sending
        return {
            "action": "send_notification",
            "recipients": recipients,
            "message": message,
            "notifications_sent": len(recipients)
        }

    async def _backup_data_action(self, parameters: Dict[str, Any]) -> Dict[str, Any]:
        """Mock backup: fabricates one 250 MB backup entry per requested type."""
        backup_types = parameters.get("backup_types", ["database", "cache"])

        backups_created = [
            {
                "backup_id": f"{backup_type}_{datetime.now().strftime('%Y%m%d_%H%M%S')}",
                "type": backup_type,
                "size_mb": 250  # Mock size
            }
            for backup_type in backup_types
        ]

        return {
            "action": "backup_data",
            "backups_created": backups_created,
            "total_size_mb": sum(b["size_mb"] for b in backups_created)
        }

    def _roll_daily_counter(self) -> None:
        """Reset ``rules_processed_today`` when the local calendar day has changed."""
        today = datetime.now().date()
        if today != self._metrics_day:
            self._metrics_day = today
            self.metrics["rules_processed_today"] = 0

    def _record_rule_processed(self) -> None:
        """Count one processed rule against today's total."""
        self._roll_daily_counter()
        self.metrics["rules_processed_today"] += 1

    def _update_average_execution_time(self, execution_time: float):
        """Fold one successful run's duration (seconds) into the running mean.

        Must be called after ``metrics["successful_executions"]`` has been
        incremented for this run. Only successful runs report a duration, so
        that counter, not ``total_executions``, is the sample count.
        """
        samples = self.metrics["successful_executions"]
        if samples <= 1:
            self.metrics["average_execution_time"] = execution_time
            return

        previous = self.metrics["average_execution_time"]
        # Incremental mean: new = old + (x - old) / n
        self.metrics["average_execution_time"] = previous + (execution_time - previous) / samples

    async def _cleanup_old_executions(self):
        """Drop finished execution records that started before the retention cutoff.

        Records still marked "running" are kept regardless of age.
        """
        cutoff_date = datetime.now() - self.EXECUTION_RETENTION

        old_executions = [
            exec_id
            for exec_id, execution in self.executions.items()
            if execution.started_at < cutoff_date and execution.status in ("completed", "failed")
        ]

        for exec_id in old_executions:
            del self.executions[exec_id]

        if old_executions:
            logger.info(f"Cleaned up {len(old_executions)} old execution records")

    def get_rule_status(self, rule_id: str) -> Optional[Dict[str, Any]]:
        """Return a summary dict for one rule, or ``None`` if the id is unknown.

        ``success_rate`` is ``success_count / execution_count`` (0.0 when the
        rule has never run); datetimes are ISO-8601 strings.
        """
        rule = self.rules.get(rule_id)
        if rule is None:
            return None

        if rule.execution_count > 0:
            success_rate = rule.success_count / rule.execution_count
        else:
            success_rate = 0.0

        return {
            "rule_id": rule.id,
            "name": rule.name,
            "description": rule.description,
            "trigger": rule.trigger,
            "action": rule.action,
            "status": rule.status,
            "last_executed": rule.last_executed.isoformat() if rule.last_executed else None,
            "execution_count": rule.execution_count,
            "success_count": rule.success_count,
            "error_count": rule.error_count,
            "success_rate": success_rate,
            "created_at": rule.created_at.isoformat(),
            "updated_at": rule.updated_at.isoformat()
        }

    def get_system_status(self) -> Dict[str, Any]:
        """Return controller-wide counters and state as a plain dict."""
        # Make sure "rules_processed_today" is not yesterday's figure.
        self._roll_daily_counter()

        total = self.metrics["total_executions"]
        succeeded = self.metrics["successful_executions"]
        running_executions = sum(1 for e in self.executions.values() if e.status == "running")

        return {
            "controller_status": "running" if self.is_running else "stopped",
            "total_rules": len(self.rules),
            "active_rules": self._count_active_rules(),
            "running_executions": running_executions,
            "total_executions": total,
            "successful_executions": succeeded,
            "failed_executions": self.metrics["failed_executions"],
            "success_rate": succeeded / total if total > 0 else 0.0,
            "average_execution_time": round(self.metrics["average_execution_time"], 3),
            "rules_processed_today": self.metrics["rules_processed_today"],
            "services_available": BACKEND_SERVICES_AVAILABLE
        }

    def get_execution_history(self, rule_id: Optional[str] = None, limit: int = 50) -> List[Dict[str, Any]]:
        """Return execution records, newest first.

        Args:
            rule_id: when given (and non-empty), only that rule's executions.
            limit: maximum number of records; zero or negative yields ``[]``.
        """
        # A negative limit would otherwise slice from the wrong end of the list.
        if limit <= 0:
            return []

        executions = [
            e for e in self.executions.values()
            if not rule_id or e.rule_id == rule_id
        ]
        executions.sort(key=lambda x: x.started_at, reverse=True)

        return [
            {
                "execution_id": e.id,
                "rule_id": e.rule_id,
                "started_at": e.started_at.isoformat(),
                "completed_at": e.completed_at.isoformat() if e.completed_at else None,
                "status": e.status,
                "result": e.result,
                "error_message": e.error_message
            }
            for e in executions[:limit]
        ]
|
|
|
|
# Global autonomous controller instance.
# Created when this module is imported; the module-level convenience functions
# below all operate on this single shared object.
autonomous_controller = AutonomousController()
|
|
|
|
# Convenience functions
|
|
|
|
async def start_autonomous_operations():
    """Start the module-level ``autonomous_controller``."""
    controller = autonomous_controller
    await controller.start()
|
|
|
|
async def stop_autonomous_operations():
    """Stop the module-level ``autonomous_controller``."""
    controller = autonomous_controller
    await controller.stop()
|
|
|
|
def get_automation_status() -> Dict[str, Any]:
    """Return the status dict reported by the module-level ``autonomous_controller``."""
    status = autonomous_controller.get_system_status()
    return status
|
|
|
|
async def trigger_manual_execution(rule_id: str) -> bool:
    """Run one rule immediately on the module-level ``autonomous_controller``.

    Returns:
        ``False`` when no rule has this id; ``True`` once the rule has been
        executed. The return value says the run was attempted, not that the
        rule's action succeeded.
    """
    controller = autonomous_controller
    rule = controller.rules.get(rule_id)
    if rule is None:
        return False

    await controller._execute_rule(rule)
    return True