trax/scripts/taskmaster_tracker.py

291 lines
10 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Taskmaster Task Tracker
Automatically stamps Taskmaster tasks with timestamps whenever a task is created
or its status changes. This script monitors the tasks.json file and adds timestamp
fields to the tasks to track these lifecycle events.
Usage:
python scripts/taskmaster_tracker.py [--watch] [--interval SECONDS]
"""
import json
import os
import sys
import time
import argparse
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, Any, Optional, Set
import hashlib
import logging
# Make the project root (the parent of this script's directory) importable so
# the project-local ``src`` package below can be resolved.
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
# NOTE(review): ``config`` is not referenced in the part of the file visible
# here; presumably it is imported for its side effects -- TODO confirm.
from src.config import config  # noqa: E402  (must follow the sys.path tweak)

# The FileHandler below cannot open its file if the directory is missing.
(project_root / 'logs').mkdir(exist_ok=True)

# Log to both a file under ``logs/`` and the console.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        # Explicit UTF-8: task titles are written to the log, and the tasks
        # file itself is saved with ensure_ascii=False, so non-ASCII text is
        # expected. Without an encoding the platform locale would be used.
        logging.FileHandler(
            project_root / 'logs' / 'taskmaster_tracker.log',
            encoding='utf-8',
        ),
        logging.StreamHandler(),
    ],
)
logger = logging.getLogger(__name__)
class TaskmasterTracker:
    """Track Taskmaster tasks and stamp them with lifecycle timestamps.

    The tracker compares the tasks in ``.taskmaster/tasks/tasks.json`` with
    the tasks it saw on its previous run (persisted in
    ``.taskmaster/tracker_state.json``). Tasks it has not seen before receive
    ``created_at`` / ``updated_at`` fields; tasks whose ``status`` differs
    from the remembered one get a refreshed ``updated_at`` and a
    ``status_changed_to_<status>`` field. Before the tasks file is rewritten,
    a copy of it is stored in ``.taskmaster/backups``.
    """

    def __init__(self, project_root: Path):
        """Set up paths below *project_root* and load any persisted state."""
        self.project_root = project_root
        self.tasks_file = project_root / '.taskmaster' / 'tasks' / 'tasks.json'
        self.backup_dir = project_root / '.taskmaster' / 'backups'
        self.state_file = project_root / '.taskmaster' / 'tracker_state.json'
        # Hash of the tasks file as of the last completed run.
        self.last_hash: Optional[str] = None
        # Snapshot of every task seen so far, keyed by get_task_key().
        self.known_tasks: Dict[str, Dict[str, Any]] = {}
        self.backup_dir.mkdir(parents=True, exist_ok=True)
        self.load_state()

    def get_file_hash(self, file_path: Path) -> str:
        """Return the MD5 hex digest of *file_path*, or ``""`` if it is missing.

        The digest is used for change detection only.
        """
        try:
            return hashlib.md5(file_path.read_bytes()).hexdigest()
        except FileNotFoundError:
            return ""

    def load_tasks(self) -> Dict[str, Any]:
        """Load and return the parsed content of the tasks file.

        Never raises: if the file is missing, unreadable or not valid JSON,
        the problem is logged and an empty ``{"tasks": [], "tags": {}}``
        structure is returned.
        """
        if not self.tasks_file.exists():
            logger.warning("Tasks file not found: %s", self.tasks_file)
            return {"tasks": [], "tags": {}}
        try:
            with open(self.tasks_file, 'r', encoding='utf-8') as f:
                return json.load(f)
        except json.JSONDecodeError as e:
            logger.error("Error parsing tasks.json: %s", e)
            return {"tasks": [], "tags": {}}
        except Exception as e:
            # Deliberate best-effort boundary: callers expect a dict, not an error.
            logger.error("Error loading tasks: %s", e)
            return {"tasks": [], "tags": {}}

    @staticmethod
    def _write_json_atomic(target: Path, payload: Any) -> None:
        """Serialize *payload* as JSON to *target* via a temp file and rename.

        Writing to a sibling temp file and then ``os.replace``-ing it means a
        reader never observes a half-written *target*, and a failure while
        writing leaves the previous content intact.
        """
        tmp_path = target.with_name(target.name + '.tmp')
        try:
            with open(tmp_path, 'w', encoding='utf-8') as f:
                json.dump(payload, f, indent=2, ensure_ascii=False)
            os.replace(tmp_path, target)
        except Exception:
            # Do not leave a partial temp file behind.
            try:
                tmp_path.unlink()
            except OSError:
                pass
            raise

    def save_tasks(self, tasks_data: Dict[str, Any]) -> bool:
        """Back up the current tasks file, then write *tasks_data* to it.

        Returns:
            True on success, False (after logging the error) on any failure.
        """
        try:
            timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
            backup_file = self.backup_dir / f"tasks_backup_{timestamp}.json"
            if self.tasks_file.exists():
                # Byte-for-byte copy so the backup is exactly what was on disk.
                backup_file.write_bytes(self.tasks_file.read_bytes())
                logger.info("Created backup: %s", backup_file)
            self._write_json_atomic(self.tasks_file, tasks_data)
            logger.info("Updated tasks file: %s", self.tasks_file)
            return True
        except Exception as e:
            logger.error("Error saving tasks: %s", e)
            return False

    def get_task_key(self, task: Dict[str, Any], tag: Optional[str] = None) -> str:
        """Return the key under which *task* is remembered in ``known_tasks``.

        Args:
            task: The task dict; its ``id`` is used (``'unknown'`` if absent).
            tag: Name of the tag the task belongs to in the tagged file
                format, or None for the flat format.

        Returns:
            ``"<id>"`` when *tag* is None, otherwise ``"<tag>:<id>"``.
            Qualifying by tag keeps tasks that share an id in different tags
            from being mistaken for one another. NOTE(review): this assumes
            Task Master numbers tasks per tag -- TODO confirm.
        """
        task_id = str(task.get('id', 'unknown'))
        if tag is None:
            return task_id
        return f"{tag}:{task_id}"

    def add_timestamp_field(self, task: Dict[str, Any], field: str, value: str) -> bool:
        """Set ``task[field] = value`` only if *field* is not already present.

        Returns:
            True if the field was added, False if it already existed.
        """
        if field in task:
            return False
        task[field] = value
        logger.info("Added %s to task %s: %s", field, task.get('id'), value)
        return True

    def _collect_tasks(self, tasks_data: Any) -> list:
        """Return ``(tag, task)`` pairs for every task dict in *tasks_data*.

        Two layouts are understood: the flat one with a top-level ``tasks``
        list (tag is None), and the tagged one where each top-level value is
        a dict with its own ``tasks`` list. Anything that is not shaped like
        that is skipped instead of raising.
        """
        if not isinstance(tasks_data, dict):
            logger.warning(
                "Unexpected tasks data of type %s; nothing to process",
                type(tasks_data).__name__,
            )
            return []
        if 'tasks' in tasks_data:
            groups = [(None, tasks_data.get('tasks'))]
        else:
            groups = [
                (tag_name, tag_data['tasks'])
                for tag_name, tag_data in tasks_data.items()
                if isinstance(tag_data, dict) and 'tasks' in tag_data
            ]
        pairs = []
        for tag_name, tasks in groups:
            if not isinstance(tasks, list):
                continue
            pairs.extend((tag_name, task) for task in tasks if isinstance(task, dict))
        return pairs

    def process_tasks(self, tasks_data: Dict[str, Any]) -> bool:
        """Stamp new tasks and tasks whose status changed, in place.

        Tasks are compared against ``self.known_tasks``, which is updated
        with a copy of every task processed.

        Returns:
            True if at least one task dict in *tasks_data* was modified.
        """
        current_time = datetime.now(timezone.utc).isoformat()
        changed = False
        for tag_name, task in self._collect_tasks(tasks_data):
            task_key = self.get_task_key(task, tag_name)
            previous_task = self.known_tasks.get(task_key)
            if previous_task is None:
                # First sighting. Existing timestamps are kept, so a task that
                # already carries them does not count as a modification.
                added_created = self.add_timestamp_field(task, 'created_at', current_time)
                added_updated = self.add_timestamp_field(task, 'updated_at', current_time)
                changed = changed or added_created or added_updated
                logger.info("New task detected: %s - %s", task.get('id'), task.get('title'))
            elif previous_task.get('status') != task.get('status'):
                # updated_at must move forward on every change, so it is
                # overwritten rather than added-if-missing.
                task['updated_at'] = current_time
                # Only the first transition into a given status is recorded.
                self.add_timestamp_field(
                    task, f"status_changed_to_{task.get('status')}", current_time
                )
                changed = True
                logger.info(
                    "Status change detected for task %s: %s -> %s",
                    task.get('id'), previous_task.get('status'), task.get('status'),
                )
            self.known_tasks[task_key] = task.copy()
        return changed

    def load_state(self) -> None:
        """Restore ``known_tasks`` and ``last_hash`` from the state file.

        A missing state file leaves the current (empty) state untouched; an
        unreadable or malformed one is logged and the state is reset.
        """
        try:
            if self.state_file.exists():
                with open(self.state_file, 'r', encoding='utf-8') as f:
                    state = json.load(f)
                known = state.get('known_tasks', {})
                if not isinstance(known, dict):
                    raise ValueError("'known_tasks' is not a JSON object")
                # Drop entries that are not task dicts; they cannot be compared.
                self.known_tasks = {
                    key: value for key, value in known.items() if isinstance(value, dict)
                }
                self.last_hash = state.get('last_hash')
                logger.info("Loaded state with %d known tasks", len(self.known_tasks))
        except Exception as e:
            logger.warning("Could not load state: %s", e)
            self.known_tasks = {}
            self.last_hash = None

    def save_state(self) -> None:
        """Persist ``known_tasks`` and ``last_hash``; failures are only logged."""
        try:
            state = {
                'known_tasks': self.known_tasks,
                'last_hash': self.last_hash,
                'last_updated': datetime.now(timezone.utc).isoformat(),
            }
            self._write_json_atomic(self.state_file, state)
            logger.debug("State saved successfully")
        except Exception as e:
            logger.error("Could not save state: %s", e)

    def cleanup_old_backups(self, max_backups: int = 10) -> None:
        """Delete all but the *max_backups* most recently modified backups.

        Errors are logged, not raised.
        """
        try:
            newest_first = sorted(
                self.backup_dir.glob("tasks_backup_*.json"),
                key=lambda path: path.stat().st_mtime,
                reverse=True,
            )
            # Clamp so a negative value cannot turn into a from-the-end slice.
            for old_backup in newest_first[max(max_backups, 0):]:
                old_backup.unlink()
                logger.info("Removed old backup: %s", old_backup)
        except Exception as e:
            logger.error("Error cleaning up backups: %s", e)

    def run_once(self) -> bool:
        """Process the tasks file if it changed since the last run.

        Returns:
            True if timestamps were added and the tasks file was rewritten,
            False otherwise (file unchanged, nothing to stamp, or save failed).
        """
        current_hash = self.get_file_hash(self.tasks_file)
        if current_hash == self.last_hash:
            return False
        logger.info("Detected changes in tasks file, processing...")
        # process_tasks replaces entries instead of mutating them, so a
        # shallow copy is enough to roll back.
        previous_known = dict(self.known_tasks)
        tasks_data = self.load_tasks()
        modified = self.process_tasks(tasks_data)
        if modified:
            saved = self.save_tasks(tasks_data)
            self.cleanup_old_backups()
            if not saved:
                # Forget what was learned this run and keep the old hash so
                # the same stamps are attempted again on the next run.
                self.known_tasks = previous_known
                return False
            # Remember the hash of the file as rewritten; the pre-rewrite
            # hash would make the next run see our own save as a change.
            current_hash = self.get_file_hash(self.tasks_file)
        self.last_hash = current_hash
        self.save_state()
        return modified

    def watch(self, interval: float = 5.0) -> None:
        """Call run_once() every *interval* seconds until interrupted.

        An exception from a single pass is logged and the loop continues.
        Ctrl-C stops the loop; any other error escaping the loop is logged
        and the process exits with status 1.
        """
        logger.info("Starting Taskmaster tracker (interval: %ss)", interval)
        logger.info("Monitoring: %s", self.tasks_file)
        try:
            while True:
                try:
                    if self.run_once():
                        logger.info("Tasks updated successfully")
                except Exception as e:
                    logger.error("Error in watch loop: %s", e)
                time.sleep(interval)
        except KeyboardInterrupt:
            # Caught around the whole loop so an interrupt during any sleep
            # or pass ends the tracker cleanly.
            logger.info("Tracker stopped by user")
        except Exception as e:
            logger.error("Fatal error in tracker: %s", e)
            sys.exit(1)
def main(argv: Optional[list] = None) -> None:
    """Parse command-line arguments and run the tracker once or continuously.

    Args:
        argv: Argument list to parse instead of ``sys.argv[1:]``; None (the
            default) keeps the normal command-line behaviour.

    Exits with an argparse usage error if ``--interval`` is negative, NaN or
    infinite, since ``time.sleep`` cannot accept such a value.
    """
    parser = argparse.ArgumentParser(
        description="Track Taskmaster tasks and add timestamps for lifecycle events"
    )
    parser.add_argument(
        '--watch',
        action='store_true',
        help='Watch for changes continuously',
    )
    parser.add_argument(
        '--interval',
        type=float,
        default=5.0,
        help='Watch interval in seconds (default: 5.0)',
    )
    parser.add_argument(
        '--project-root',
        type=Path,
        default=Path(__file__).parent.parent,
        help='Project root directory',
    )
    args = parser.parse_args(argv)

    # NaN fails both comparisons, so it is rejected together with negative
    # and infinite values.
    if not 0 <= args.interval < float('inf'):
        parser.error("--interval must be a finite, non-negative number of seconds")

    tracker = TaskmasterTracker(args.project_root)
    if args.watch:
        tracker.watch(args.interval)
    elif tracker.run_once():
        logger.info("Tasks processed and updated")
    else:
        logger.info("No changes detected")


if __name__ == "__main__":
    main()