#!/usr/bin/env python3 """ Taskmaster Task Tracker Automatically updates Taskmaster tasks with datetime stamps whenever they're created or status changed. This script monitors the tasks.json file and adds timestamps to track task lifecycle events. Usage: python scripts/taskmaster_tracker.py [--watch] [--interval SECONDS] """ import json import os import sys import time import argparse from datetime import datetime, timezone from pathlib import Path from typing import Dict, Any, Optional, Set import hashlib import logging # Add project root to path for imports project_root = Path(__file__).parent.parent sys.path.insert(0, str(project_root)) from src.config import config # Ensure logs directory exists (project_root / 'logs').mkdir(exist_ok=True) # Configure logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler(project_root / 'logs' / 'taskmaster_tracker.log'), logging.StreamHandler() ] ) logger = logging.getLogger(__name__) class TaskmasterTracker: """Tracks Taskmaster tasks and adds timestamps for lifecycle events.""" def __init__(self, project_root: Path): self.project_root = project_root self.tasks_file = project_root / '.taskmaster' / 'tasks' / 'tasks.json' self.backup_dir = project_root / '.taskmaster' / 'backups' self.state_file = project_root / '.taskmaster' / 'tracker_state.json' self.last_hash: Optional[str] = None self.known_tasks: Dict[str, Dict[str, Any]] = {} # Ensure backup directory exists self.backup_dir.mkdir(parents=True, exist_ok=True) # Load previous state self.load_state() def get_file_hash(self, file_path: Path) -> str: """Get MD5 hash of file content for change detection.""" if not file_path.exists(): return "" with open(file_path, 'rb') as f: content = f.read() return hashlib.md5(content).hexdigest() def load_tasks(self) -> Dict[str, Any]: """Load tasks from the tasks.json file.""" if not self.tasks_file.exists(): logger.warning(f"Tasks file not found: {self.tasks_file}") return {"tasks": [], "tags": {}} try: with open(self.tasks_file, 'r', encoding='utf-8') as f: return json.load(f) except json.JSONDecodeError as e: logger.error(f"Error parsing tasks.json: {e}") return {"tasks": [], "tags": {}} except Exception as e: logger.error(f"Error loading tasks: {e}") return {"tasks": [], "tags": {}} def save_tasks(self, tasks_data: Dict[str, Any]) -> bool: """Save tasks to the tasks.json file with backup.""" try: # Create backup timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S") backup_file = self.backup_dir / f"tasks_backup_{timestamp}.json" if self.tasks_file.exists(): with open(self.tasks_file, 'r', encoding='utf-8') as f: backup_content = f.read() with open(backup_file, 'w', encoding='utf-8') as f: f.write(backup_content) logger.info(f"Created backup: {backup_file}") # Save updated tasks with open(self.tasks_file, 'w', encoding='utf-8') as f: json.dump(tasks_data, f, indent=2, ensure_ascii=False) logger.info(f"Updated tasks file: {self.tasks_file}") return True except Exception as e: logger.error(f"Error saving tasks: {e}") return False def get_task_key(self, task: Dict[str, Any]) -> str: """Generate a unique key for a task.""" # Use just the ID for the key since it should be unique return str(task.get('id', 'unknown')) def add_timestamp_field(self, task: Dict[str, Any], field: str, value: str) -> None: """Add a timestamp field to a task if it doesn't exist.""" if field not in task: task[field] = value logger.info(f"Added {field} to task {task.get('id')}: {value}") def process_tasks(self, tasks_data: Dict[str, Any]) -> bool: """Process tasks and add timestamps for new tasks and status changes.""" current_time = datetime.now(timezone.utc).isoformat() changed = False # Handle both old format (direct tasks array) and new format (tagged structure) if 'tasks' in tasks_data: # Old format - direct tasks array tasks = tasks_data.get('tasks', []) else: # New format - tagged structure tasks = [] for tag_name, tag_data in tasks_data.items(): if isinstance(tag_data, dict) and 'tasks' in tag_data: tasks.extend(tag_data['tasks']) for task in tasks: task_key = self.get_task_key(task) previous_task = self.known_tasks.get(task_key) # Check if this is a new task if task_key not in self.known_tasks: self.add_timestamp_field(task, 'created_at', current_time) self.add_timestamp_field(task, 'updated_at', current_time) changed = True logger.info(f"New task detected: {task.get('id')} - {task.get('title')}") # Check for status changes elif previous_task and previous_task.get('status') != task.get('status'): self.add_timestamp_field(task, 'updated_at', current_time) self.add_timestamp_field(task, f"status_changed_to_{task.get('status')}", current_time) changed = True logger.info(f"Status change detected for task {task.get('id')}: " f"{previous_task.get('status')} -> {task.get('status')}") # Update known tasks self.known_tasks[task_key] = task.copy() return changed def load_state(self) -> None: """Load tracker state from file.""" try: if self.state_file.exists(): with open(self.state_file, 'r', encoding='utf-8') as f: state = json.load(f) self.known_tasks = state.get('known_tasks', {}) self.last_hash = state.get('last_hash') logger.info(f"Loaded state with {len(self.known_tasks)} known tasks") except Exception as e: logger.warning(f"Could not load state: {e}") self.known_tasks = {} self.last_hash = None def save_state(self) -> None: """Save tracker state to file.""" try: state = { 'known_tasks': self.known_tasks, 'last_hash': self.last_hash, 'last_updated': datetime.now(timezone.utc).isoformat() } with open(self.state_file, 'w', encoding='utf-8') as f: json.dump(state, f, indent=2, ensure_ascii=False) logger.debug("State saved successfully") except Exception as e: logger.error(f"Could not save state: {e}") def cleanup_old_backups(self, max_backups: int = 10) -> None: """Clean up old backup files, keeping only the most recent ones.""" try: backup_files = sorted( self.backup_dir.glob("tasks_backup_*.json"), key=lambda x: x.stat().st_mtime, reverse=True ) if len(backup_files) > max_backups: for old_backup in backup_files[max_backups:]: old_backup.unlink() logger.info(f"Removed old backup: {old_backup}") except Exception as e: logger.error(f"Error cleaning up backups: {e}") def run_once(self) -> bool: """Run the tracker once and return True if changes were made.""" current_hash = self.get_file_hash(self.tasks_file) if current_hash == self.last_hash: return False logger.info("Detected changes in tasks file, processing...") tasks_data = self.load_tasks() if self.process_tasks(tasks_data): if self.save_tasks(tasks_data): self.last_hash = current_hash self.save_state() self.cleanup_old_backups() return True self.last_hash = current_hash self.save_state() return False def watch(self, interval: float = 5.0) -> None: """Watch for changes in the tasks file continuously.""" logger.info(f"Starting Taskmaster tracker (interval: {interval}s)") logger.info(f"Monitoring: {self.tasks_file}") try: while True: try: if self.run_once(): logger.info("Tasks updated successfully") time.sleep(interval) except KeyboardInterrupt: logger.info("Tracker stopped by user") break except Exception as e: logger.error(f"Error in watch loop: {e}") time.sleep(interval) except Exception as e: logger.error(f"Fatal error in tracker: {e}") sys.exit(1) def main(): """Main entry point for the script.""" parser = argparse.ArgumentParser( description="Track Taskmaster tasks and add timestamps for lifecycle events" ) parser.add_argument( '--watch', action='store_true', help='Watch for changes continuously' ) parser.add_argument( '--interval', type=float, default=5.0, help='Watch interval in seconds (default: 5.0)' ) parser.add_argument( '--project-root', type=Path, default=Path(__file__).parent.parent, help='Project root directory' ) args = parser.parse_args() # Initialize tracker tracker = TaskmasterTracker(args.project_root) if args.watch: tracker.watch(args.interval) else: # Run once if tracker.run_once(): logger.info("Tasks processed and updated") else: logger.info("No changes detected") if __name__ == "__main__": main()