#!/usr/bin/env python3
"""
Secure API Key Vault System for Trax and My-AI-Projects
Provides encrypted storage, inheritance, and validation of API keys
"""

import os
import json
import sys

# Add user site-packages to path for cryptography module
import site

_user_site = site.getusersitepackages()
sys.path.extend(_user_site if isinstance(_user_site, list) else [_user_site])

from pathlib import Path
from typing import Dict, List, Optional, Set, Tuple
from dataclasses import dataclass, field
from datetime import datetime
import hashlib
import base64
from getpass import getpass
import argparse
import subprocess

# Try to import cryptography, provide fallback
try:
    from cryptography.fernet import Fernet
    from cryptography.hazmat.primitives import hashes
    # The KDF class is named PBKDF2HMAC; there is no ``PBKDF2`` in this module.
    from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
    from cryptography.hazmat.backends import default_backend
    CRYPTO_AVAILABLE = True
except ImportError:
    CRYPTO_AVAILABLE = False
    # Only print warning when running as main script
    if __name__ == "__main__":
        print("⚠️ cryptography not installed. Install with: pip install cryptography")


def _write_private_file(path: Path, data: bytes) -> None:
    """Write ``data`` to ``path`` so the file is owner-only (0o600) from creation.

    Creating the file with the restrictive mode up front avoids the window in
    which a secret sits on disk with default permissions before a later chmod.
    """
    flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC | getattr(os, "O_BINARY", 0)
    fd = os.open(path, flags, 0o600)
    with os.fdopen(fd, "wb") as handle:
        # The mode passed to os.open only applies to newly created files;
        # tighten a pre-existing file before any secret bytes are written.
        os.chmod(path, 0o600)
        handle.write(data)


@dataclass
class KeyMetadata:
    """Metadata for an API key"""
    name: str
    category: str
    description: str
    required_for: List[str] = field(default_factory=list)
    last_rotated: Optional[str] = None
    expires: Optional[str] = None
    validation_pattern: Optional[str] = None


class SecureKeyVault:
    """
    Secure key vault with encryption and project inheritance

    Keys are kept as a JSON object encrypted with Fernet in ``vault.enc``;
    non-secret bookkeeping (category, timestamps) lives in ``metadata.json``.
    The Fernet key is derived from a password with PBKDF2-HMAC-SHA256 and is
    cached in ``.master``; the salt is kept in ``.salt`` so the key can be
    re-derived from the password when the cached key cannot decrypt the vault.

    NOTE: because the derived key is cached on disk, the password is not
    needed for normal access; file permissions are the effective protection.
    """

    # PBKDF2 parameters. Changing these invalidates keys derived earlier.
    KDF_ITERATIONS = 100000
    KDF_KEY_LENGTH = 32

    # Salt used by the previous hard-coded unlock path; only consulted when
    # no salt file exists next to the vault.
    _LEGACY_SALT = b'my-ai-projects-salt'

    # Standard key definitions for my-ai-projects ecosystem
    STANDARD_KEYS = {
        # AI Models
        "ANTHROPIC_API_KEY": KeyMetadata(
            "ANTHROPIC_API_KEY", "ai", "Claude API",
            ["task-master", "main-assistant", "trax-v2"]
        ),
        "DEEPSEEK_API_KEY": KeyMetadata(
            "DEEPSEEK_API_KEY", "ai", "DeepSeek API for transcription",
            ["youtube-summarizer", "trax-v2"]
        ),
        "OPENAI_API_KEY": KeyMetadata(
            "OPENAI_API_KEY", "ai", "OpenAI GPT models",
            ["main-assistant"]
        ),
        "PERPLEXITY_API_KEY": KeyMetadata(
            "PERPLEXITY_API_KEY", "ai", "Perplexity research API",
            ["task-master", "research-agent"]
        ),
        "OPENROUTER_API_KEY": KeyMetadata(
            "OPENROUTER_API_KEY", "ai", "OpenRouter multi-model access",
            ["main-assistant"]
        ),
        "GOOGLE_API_KEY": KeyMetadata(
            "GOOGLE_API_KEY", "ai", "Google Gemini models",
            ["youtube-summarizer"]
        ),
        "XAI_API_KEY": KeyMetadata(
            "XAI_API_KEY", "ai", "Grok models",
            ["task-master"]
        ),
        "MISTRAL_API_KEY": KeyMetadata(
            "MISTRAL_API_KEY", "ai", "Mistral models",
            ["task-master"]
        ),
        # Services
        "YOUTUBE_API_KEY": KeyMetadata(
            "YOUTUBE_API_KEY", "services", "YouTube Data API",
            ["youtube-summarizer", "trax"]
        ),
        "SLACK_BOT_TOKEN": KeyMetadata(
            "SLACK_BOT_TOKEN", "services", "Slack bot integration",
            ["main-assistant"]
        ),
        "GITHUB_TOKEN": KeyMetadata(
            "GITHUB_TOKEN", "services", "GitHub API access",
            ["main-assistant"]
        ),
        "GITEA_TOKEN": KeyMetadata(
            "GITEA_TOKEN", "services", "Gitea CI/CD",
            ["ci-cd"]
        ),
        # Google OAuth
        "GOOGLE_CLIENT_ID": KeyMetadata(
            "GOOGLE_CLIENT_ID", "oauth", "Google OAuth client",
            ["main-assistant", "youtube-summarizer"]
        ),
        "GOOGLE_CLIENT_SECRET": KeyMetadata(
            "GOOGLE_CLIENT_SECRET", "oauth", "Google OAuth secret",
            ["main-assistant", "youtube-summarizer"]
        ),
        # Database
        "DATABASE_URL": KeyMetadata(
            "DATABASE_URL", "database", "PostgreSQL connection string",
            ["trax", "main-assistant"]
        ),
        "REDIS_URL": KeyMetadata(
            "REDIS_URL", "database", "Redis connection string",
            ["cache-layer"]
        ),
    }

    def __init__(self, root_dir: Optional[Path] = None):
        """Initialize the secure key vault

        Args:
            root_dir: Directory holding the vault files. Defaults to
                ``~/.my-ai-keys``. It is created if missing.

        When cryptography is available this loads the cached key, or prompts
        for a new vault password if no key file exists yet.
        """
        self.root_dir = root_dir or Path.home() / ".my-ai-keys"
        self.vault_file = self.root_dir / "vault.enc"
        self.metadata_file = self.root_dir / "metadata.json"
        self.master_key_file = self.root_dir / ".master"
        self.salt_file = self.root_dir / ".salt"

        # Create directories
        self.root_dir.mkdir(parents=True, exist_ok=True)

        # Initialize encryption
        self.cipher = None
        if CRYPTO_AVAILABLE:
            self._init_encryption()

    @classmethod
    def _derive_key(cls, password: str, salt: bytes) -> bytes:
        """Derive a urlsafe-base64 Fernet key from ``password`` and ``salt``."""
        kdf = PBKDF2HMAC(
            algorithm=hashes.SHA256(),
            length=cls.KDF_KEY_LENGTH,
            salt=salt,
            iterations=cls.KDF_ITERATIONS,
            backend=default_backend()
        )
        return base64.urlsafe_b64encode(kdf.derive(password.encode()))

    def _init_encryption(self):
        """Initialize or load encryption key

        Loads the cached key when ``.master`` exists; otherwise prompts twice
        for a new password, derives a key with a fresh random salt, and stores
        both the salt and the key in owner-only files. Exits the process with
        status 1 if the two password entries differ.
        """
        if self.master_key_file.exists():
            # Load existing key
            with open(self.master_key_file, 'rb') as f:
                key = f.read()
            self.cipher = Fernet(key)
            return

        # Generate new key with password derivation
        password = getpass("Create vault password: ")
        confirm = getpass("Confirm password: ")

        if password != confirm:
            print("❌ Passwords don't match")
            sys.exit(1)

        # Derive key from password
        salt = os.urandom(16)
        key = self._derive_key(password, salt)

        # Save salt and key; the salt is required to re-derive the key later.
        _write_private_file(self.salt_file, salt)
        _write_private_file(self.master_key_file, key)

        self.cipher = Fernet(key)
        print("✅ Vault created successfully")

    def _load_vault(self) -> Dict[str, str]:
        """Load and decrypt the vault

        Returns:
            The decrypted key/value mapping, or ``{}`` when there is no vault
            file or no cipher. If decryption with the cached key fails, falls
            back to prompting for the password.
        """
        if not self.vault_file.exists():
            return {}

        if not self.cipher:
            print("❌ Encryption not available")
            return {}

        try:
            with open(self.vault_file, 'rb') as f:
                encrypted = f.read()
            decrypted = self.cipher.decrypt(encrypted)
            return json.loads(decrypted.decode())
        except Exception as e:
            print(f"❌ Failed to decrypt vault: {e}")
            # Try password unlock
            return self._unlock_vault()

    def _unlock_vault(self) -> Dict[str, str]:
        """Unlock vault with password

        Re-derives the key from the prompted password and the stored salt and
        decrypts the vault with it. On success the cipher is replaced for later
        operations. Exits the process with status 1 if decryption fails.
        """
        password = getpass("Enter vault password: ")

        # Use the salt saved at vault creation; vaults without a salt file
        # fall back to the historical constant.
        if self.salt_file.exists():
            salt = self.salt_file.read_bytes()
        else:
            salt = self._LEGACY_SALT

        key = self._derive_key(password, salt)

        try:
            cipher = Fernet(key)
            with open(self.vault_file, 'rb') as f:
                encrypted = f.read()
            decrypted = cipher.decrypt(encrypted)
            vault = json.loads(decrypted.decode())
        except Exception:
            # Not a bare except: Ctrl-C and SystemExit must still propagate.
            print("❌ Invalid password")
            sys.exit(1)

        # Update cipher for future operations
        self.cipher = cipher
        return vault

    def _save_vault(self, vault: Dict[str, str]):
        """Encrypt and save the vault

        Does nothing (beyond printing an error) when no cipher is available.
        """
        if not self.cipher:
            print("❌ Encryption not available")
            return

        # Encrypt vault
        data = json.dumps(vault, indent=2)
        encrypted = self.cipher.encrypt(data.encode())

        # Save encrypted vault, owner-only from the moment it is created
        _write_private_file(self.vault_file, encrypted)

    def _read_metadata(self) -> Dict[str, dict]:
        """Return the metadata mapping, or ``{}`` if the file does not exist."""
        if not self.metadata_file.exists():
            return {}
        with open(self.metadata_file, 'r') as f:
            return json.load(f)

    def _write_metadata(self, metadata: Dict[str, dict]) -> None:
        """Persist the metadata mapping as indented JSON."""
        with open(self.metadata_file, 'w') as f:
            json.dump(metadata, f, indent=2)

    def add_key(self, name: str, value: str, category: Optional[str] = None):
        """Add or update a key in the vault

        Args:
            name: Key name. Names in ``STANDARD_KEYS`` always use the
                category defined there.
            value: Secret value to store.
            category: Category for non-standard keys; defaults to "custom".
        """
        vault = self._load_vault()

        # Validate key name
        if name in self.STANDARD_KEYS:
            metadata = self.STANDARD_KEYS[name]
            category = metadata.category
            print(f"📝 Adding standard key: {name} ({metadata.description})")
        else:
            if not category:
                category = "custom"
            print(f"📝 Adding custom key: {name}")

        # Store key
        vault[name] = value
        self._save_vault(vault)

        # Update metadata
        self._update_metadata(name, category)

        print(f"✅ Key '{name}' added to vault")

    def _update_metadata(self, name: str, category: str):
        """Update key metadata

        Replaces the entry for ``name`` with the category, the current time as
        ``added``, and a cleared ``last_accessed``.
        """
        metadata = self._read_metadata()
        metadata[name] = {
            "category": category,
            "added": datetime.now().isoformat(),
            "last_accessed": None
        }
        self._write_metadata(metadata)

    def get_key(self, name: str) -> Optional[str]:
        """Retrieve a key from the vault

        Returns:
            The stored value, or ``None`` if ``name`` is not in the vault.
            A successful lookup also records the access time.
        """
        vault = self._load_vault()

        if name in vault:
            # Update last accessed
            self._update_access_time(name)
            return vault[name]

        return None

    def _update_access_time(self, name: str):
        """Update last access time for a key (no-op without a metadata entry)"""
        metadata = self._read_metadata()
        if name in metadata:
            metadata[name]["last_accessed"] = datetime.now().isoformat()
            self._write_metadata(metadata)

    def list_keys(self, category: Optional[str] = None) -> List[str]:
        """List all keys in the vault

        Args:
            category: When given, only keys of that category are returned.
                Standard keys are matched on their ``STANDARD_KEYS`` category,
                all other keys on the category recorded in the metadata file.
        """
        vault = self._load_vault()

        if not category:
            return list(vault.keys())

        # Filter by category
        metadata = self._read_metadata()

        def in_category(key: str) -> bool:
            if key in self.STANDARD_KEYS:
                return self.STANDARD_KEYS[key].category == category
            return key in metadata and metadata[key].get("category") == category

        return [key for key in vault if in_category(key)]

    def export_to_env(self, output_file: Path, project: Optional[str] = None):
        """Export keys to .env file format

        Args:
            output_file: Destination path; overwritten, created owner-only.
            project: When given, only standard keys whose ``required_for``
                lists this project (and that are present in the vault) are
                exported. Otherwise the whole vault is exported.
        """
        vault = self._load_vault()

        # Filter keys for specific project
        if project:
            # Export only keys required for this project
            keys_to_export = {
                key_name: vault[key_name]
                for key_name, metadata in self.STANDARD_KEYS.items()
                if project in metadata.required_for and key_name in vault
            }
        else:
            keys_to_export = vault

        lines = [
            "# API Keys exported from secure vault\n",
            f"# Generated: {datetime.now().isoformat()}\n",
        ]
        if project:
            lines.append(f"# Project: {project}\n")
        lines.append("\n")

        # Group by category
        categories: Dict[str, List[Tuple[str, str]]] = {}
        for key_name, value in keys_to_export.items():
            if key_name in self.STANDARD_KEYS:
                cat = self.STANDARD_KEYS[key_name].category
            else:
                cat = "custom"
            categories.setdefault(cat, []).append((key_name, value))

        # Write grouped keys
        for cat in sorted(categories):
            lines.append(f"# {cat.upper()} KEYS\n")
            for key_name, value in sorted(categories[cat]):
                lines.append(f"{key_name}={value}\n")
            lines.append("\n")

        # The .env file holds plaintext secrets: create it owner-only rather
        # than tightening permissions after the secrets are already on disk.
        _write_private_file(output_file, "".join(lines).encode("utf-8"))

        print(f"✅ Exported {len(keys_to_export)} keys to {output_file}")

    @staticmethod
    def _parse_env_line(line: str) -> Optional[Tuple[str, str]]:
        """Parse one .env line into ``(key, value)``; ``None`` if not a pair.

        Blank lines, ``#`` comments, lines without ``=`` and lines with an
        empty key are rejected. A leading ``export `` is ignored, whitespace
        around key and value is trimmed, and one pair of matching surrounding
        quotes is removed from the value.
        """
        stripped = line.strip()
        if not stripped or stripped.startswith('#') or '=' not in stripped:
            return None

        if stripped.startswith("export "):
            stripped = stripped[len("export "):]

        key, value = stripped.split('=', 1)
        key = key.strip()
        value = value.strip()

        if len(value) >= 2 and value[0] == value[-1] and value[0] in "\"'":
            value = value[1:-1]

        if not key:
            return None
        return key, value

    def import_from_env(self, env_file: Path):
        """Import keys from .env file

        Each ``KEY=value`` line is stored through ``add_key``. Prints an error
        and returns if ``env_file`` does not exist.
        """
        if not env_file.exists():
            print(f"❌ File not found: {env_file}")
            return

        imported = 0
        with open(env_file, 'r') as f:
            for line in f:
                parsed = self._parse_env_line(line)
                if parsed is None:
                    continue
                key, value = parsed
                self.add_key(key, value)
                imported += 1

        print(f"✅ Imported {imported} keys from {env_file}")

    def validate_project_keys(self, project: str) -> Tuple[List[str], List[str]]:
        """Validate that all required keys for a project are present

        Returns:
            ``(required, missing)``: the standard keys whose ``required_for``
            lists ``project``, and the subset of those absent from the vault.
        """
        vault = self._load_vault()

        required = [
            key_name
            for key_name, metadata in self.STANDARD_KEYS.items()
            if project in metadata.required_for
        ]
        missing = [key_name for key_name in required if key_name not in vault]

        return required, missing

    def rotate_key(self, name: str):
        """Rotate (regenerate) a key

        Shows a short prefix of the current value, then prompts for the
        replacement. An empty entry cancels the rotation.
        """
        current = self.get_key(name)
        if not current:
            print(f"❌ Key '{name}' not found")
            return

        print(f"Current value: {current[:8]}...")
        # getpass keeps the new secret out of the terminal scrollback,
        # matching how the ``add`` command prompts for values.
        new_value = getpass("Enter new value (or press Enter to cancel): ").strip()

        if new_value:
            self.add_key(name, new_value)
            print(f"✅ Key '{name}' rotated successfully")

    def sync_to_projects(self, projects: List[str]):
        """Sync keys to multiple project .env files

        Unknown project names, and projects whose target directory does not
        exist, are skipped with a warning instead of aborting the whole sync.
        """
        workspace_root = Path(__file__).parent.parent.parent.parent

        env_locations = {
            "root": workspace_root / ".env",
            "trax": workspace_root / "apps" / "trax" / ".env",
            "youtube-summarizer": workspace_root / "apps" / "youtube-summarizer" / ".env",
            "pdf-translator": workspace_root / "pdf-translator" / ".env",
        }

        for project in projects:
            env_file = env_locations.get(project)
            if env_file is None:
                print(f"⚠️ Unknown project: {project}")
                continue

            if not env_file.parent.is_dir():
                print(f"⚠️ Skipping {project}: directory not found: {env_file.parent}")
                continue

            # NOTE(review): no STANDARD_KEYS entry lists "root" or
            # "pdf-translator" in required_for, so these two targets are
            # written with zero keys — confirm whether "root" was meant to
            # receive the full vault.
            self.export_to_env(env_file, project)
            print(f"✅ Synced to {project}")


def main():
    """CLI interface for the key vault"""
    parser = argparse.ArgumentParser(description="Secure API Key Vault")
    subparsers = parser.add_subparsers(dest='command', help='Commands')

    # Add key
    add_parser = subparsers.add_parser('add', help='Add a key to vault')
    add_parser.add_argument('name', help='Key name')
    add_parser.add_argument('--value', help='Key value (will prompt if not provided)')
    add_parser.add_argument('--category', help='Key category')

    # Get key
    get_parser = subparsers.add_parser('get', help='Get a key from vault')
    get_parser.add_argument('name', help='Key name')

    # List keys
    list_parser = subparsers.add_parser('list', help='List keys')
    list_parser.add_argument('--category', help='Filter by category')

    # Import
    import_parser = subparsers.add_parser('import', help='Import from .env file')
    import_parser.add_argument('file', help='Path to .env file')

    # Export
    export_parser = subparsers.add_parser('export', help='Export to .env file')
    export_parser.add_argument('file', help='Output .env file')
    export_parser.add_argument('--project', help='Filter by project')

    # Validate
    validate_parser = subparsers.add_parser('validate', help='Validate project keys')
    validate_parser.add_argument('project', help='Project name')

    # Sync
    sync_parser = subparsers.add_parser('sync', help='Sync to project .env files')
    sync_parser.add_argument('projects', nargs='+', help='Project names')

    # Rotate
    rotate_parser = subparsers.add_parser('rotate', help='Rotate a key')
    rotate_parser.add_argument('name', help='Key name')

    args = parser.parse_args()

    # Showing help needs neither cryptography nor a vault, so handle it before
    # anything that could exit or prompt for a password.
    if args.command is None:
        parser.print_help()
        return

    if not CRYPTO_AVAILABLE:
        print("❌ cryptography package required")
        print("Install with: pip install cryptography")
        sys.exit(1)

    vault = SecureKeyVault()

    if args.command == 'add':
        value = args.value
        if not value:
            value = getpass(f"Enter value for {args.name}: ")
        vault.add_key(args.name, value, args.category)

    elif args.command == 'get':
        value = vault.get_key(args.name)
        if value:
            print(f"{args.name}={value}")
        else:
            print(f"❌ Key '{args.name}' not found")

    elif args.command == 'list':
        keys = vault.list_keys(args.category)
        if keys:
            print("\n📋 Keys in vault:")
            for key in sorted(keys):
                if key in vault.STANDARD_KEYS:
                    meta = vault.STANDARD_KEYS[key]
                    print(f" • {key} ({meta.category}) - {meta.description}")
                else:
                    print(f" • {key} (custom)")
        else:
            print("No keys found")

    elif args.command == 'import':
        vault.import_from_env(Path(args.file))

    elif args.command == 'export':
        vault.export_to_env(Path(args.file), args.project)

    elif args.command == 'validate':
        required, missing = vault.validate_project_keys(args.project)
        print(f"\n📋 Project '{args.project}' key validation:")
        print(f" Required: {len(required)} keys")
        if missing:
            print(f" ❌ Missing: {len(missing)} keys")
            for key in missing:
                print(f" • {key}")
        else:
            print(" ✅ All required keys present")

    elif args.command == 'sync':
        vault.sync_to_projects(args.projects)

    elif args.command == 'rotate':
        vault.rotate_key(args.name)

    else:
        parser.print_help()


if __name__ == "__main__":
    main()