#!/usr/bin/env python3
"""
API Key Migration Tool - Consolidate scattered keys into secure vault
Scans all projects and creates a unified key vault
"""
import os
import sys
# Add user site-packages to path for cryptography module
# NOTE(review): site.getusersitepackages() returns a single str (it is
# site.getsitepackages() that returns a list), so the isinstance branch
# below is effectively dead but harmless — confirm before simplifying.
import site
sys.path.extend(site.getusersitepackages() if isinstance(site.getusersitepackages(), list) else [site.getusersitepackages()])
from pathlib import Path
from typing import Dict, List, Set, Tuple
import json
import re
from collections import defaultdict
from datetime import datetime
# Add parent directory to path for imports
# (key_vault.py is expected to live next to this script)
sys.path.append(str(Path(__file__).parent))
try:
    from key_vault import SecureKeyVault, CRYPTO_AVAILABLE
except ImportError:
    # Fail fast with a clear message rather than a raw traceback.
    print("❌ key_vault.py not found in scripts directory")
    sys.exit(1)
class KeyMigrator:
    """Migrate and consolidate API keys from multiple sources.

    Scans known project directories for ``.env``-style files, records every
    key/value pair found (with provenance), reports duplicates/conflicts,
    and finally writes the chosen values into a ``SecureKeyVault``.
    """

    def __init__(self, workspace_root: Path):
        """Create a migrator rooted at *workspace_root*.

        Args:
            workspace_root: Directory under which all known projects live.
        """
        self.workspace_root = workspace_root
        self.vault = SecureKeyVault()
        # key_name -> set of (file_path, value) tuples, so the same key
        # defined with different values in different files can be detected.
        self.found_keys = defaultdict(set)
        # Human-readable log lines accumulated while scanning.
        self.scan_report = []

    def scan_directory(self, directory: Path, recursive: bool = True) -> Dict[str, str]:
        """Scan a directory for .env files and extract keys.

        Args:
            directory: Directory to scan.
            recursive: When True, descend into subdirectories (``rglob``).

        Returns:
            Mapping of key name to the last value seen for that key.
        """
        keys: Dict[str, str] = {}
        glob = directory.rglob if recursive else directory.glob
        for env_file in glob(".env*"):
            # Skip vendored/build directories and template files that are
            # not expected to hold real secrets.
            if any(skip in str(env_file) for skip in [
                'node_modules', 'venv', '.venv', '__pycache__',
                '.git', 'dist', 'build', '.env.example', '.env.template'
            ]):
                continue
            self.scan_report.append(f"📂 Scanning: {env_file.relative_to(self.workspace_root)}")
            try:
                # BUG FIX: read as UTF-8 explicitly instead of the locale
                # default; decode errors are caught below and reported
                # rather than crashing the whole scan.
                with open(env_file, 'r', encoding='utf-8') as f:
                    for line in f:
                        line = line.strip()
                        # Ignore blanks, comments, and lines without '='.
                        if not line or line.startswith('#') or '=' not in line:
                            continue
                        key, value = line.split('=', 1)
                        key = key.strip()
                        # Drop surrounding double/single quotes from the value.
                        value = value.strip().strip('"').strip("'")
                        # Track where each key was found for conflict analysis.
                        self.found_keys[key].add((str(env_file), value))
                        keys[key] = value
            except Exception as e:
                self.scan_report.append(f" ⚠️ Error reading {env_file}: {e}")
        return keys

    def scan_all_projects(self) -> Dict[str, Dict[str, str]]:
        """Scan all known projects for API keys.

        Returns:
            Mapping of project name to the keys found in that project
            (projects with no keys or missing directories are omitted).
        """
        # Hard-coded map of project name -> expected location in the workspace.
        projects = {
            "root": self.workspace_root,
            "trax": self.workspace_root / "apps" / "trax",
            "youtube-summarizer": self.workspace_root / "apps" / "youtube-summarizer",
            "pdf-translator": self.workspace_root / "pdf-translator",
            "mixcloud-rss": self.workspace_root / "mixcloud-rss-generator",
            "clean-tracks": self.workspace_root / "projects" / "clean-tracks-main",
            "task-master": self.workspace_root / "tools" / "claude-task-master",
            "directus-mcp": self.workspace_root / "tools" / "directus-mcp-server",
        }
        all_keys: Dict[str, Dict[str, str]] = {}
        print("\n🔍 Scanning projects for API keys...\n")
        for project_name, project_path in projects.items():
            if project_path.exists():
                print(f"📦 Project: {project_name}")
                # Non-recursive: only the project root's own .env files.
                keys = self.scan_directory(project_path, recursive=False)
                if keys:
                    all_keys[project_name] = keys
                    print(f" ✅ Found {len(keys)} keys")
                else:
                    print(" ⚠️ No keys found")
            else:
                print(f"📦 Project: {project_name} - ⚠️ Directory not found")
        return all_keys

    def analyze_duplicates(self) -> List[Tuple[str, List[Tuple[str, str]]]]:
        """Analyze duplicate keys with different values.

        Returns:
            List of ``(key_name, conflict_info)`` where ``conflict_info``
            pairs a masked value preview with the files defining it.
        """
        conflicts = []
        for key_name, locations in self.found_keys.items():
            # Group defining files by the value they assign to this key.
            unique_values: Dict[str, List[str]] = {}
            for file_path, value in locations:
                unique_values.setdefault(value, []).append(file_path)
            if len(unique_values) > 1:
                # More than one distinct value => conflicting definitions.
                conflict_info = []
                for value, files in unique_values.items():
                    # Mask all but a short prefix so secrets don't leak
                    # into reports/logs.
                    masked_value = value[:8] + "..." if len(value) > 8 else value
                    conflict_info.append((masked_value, files))
                conflicts.append((key_name, conflict_info))
        return conflicts

    def generate_migration_report(self) -> str:
        """Generate a detailed migration report.

        Returns:
            The full report as a single newline-joined string.
        """
        report: List[str] = []
        report.append("=" * 60)
        report.append("API KEY MIGRATION REPORT")
        report.append("=" * 60)
        report.append(f"Generated: {datetime.now().isoformat()}")
        report.append(f"Workspace: {self.workspace_root}")
        report.append("")
        # Summary: unique key names, and unique files they came from.
        total_keys = len(self.found_keys)
        total_files = len(set(f for locs in self.found_keys.values() for f, _ in locs))
        report.append("SUMMARY")
        report.append("-" * 40)
        report.append(f"Total unique keys found: {total_keys}")
        report.append(f"Total .env files scanned: {total_files}")
        report.append("")
        # Bucket keys by their standard category; unknown keys -> "custom".
        categories = defaultdict(list)
        for key_name in self.found_keys.keys():
            if key_name in SecureKeyVault.STANDARD_KEYS:
                meta = SecureKeyVault.STANDARD_KEYS[key_name]
                categories[meta.category].append(key_name)
            else:
                categories["custom"].append(key_name)
        report.append("KEYS BY CATEGORY")
        report.append("-" * 40)
        for category, keys in sorted(categories.items()):
            report.append(f"\n{category.upper()} ({len(keys)} keys):")
            for key in sorted(keys):
                locations_count = len(self.found_keys[key])
                report.append(f"{key} (found in {locations_count} locations)")
        report.append("")
        # Conflicts: same key, different values in different files.
        conflicts = self.analyze_duplicates()
        if conflicts:
            report.append("⚠️ CONFLICTS DETECTED")
            report.append("-" * 40)
            report.append("The following keys have different values in different locations:")
            report.append("")
            for key_name, conflict_info in conflicts:
                report.append(f" {key_name}:")
                for masked_value, files in conflict_info:
                    report.append(f" Value: {masked_value}")
                    for file_path in files[:3]:  # Limit to 3 files
                        rel_path = Path(file_path).relative_to(self.workspace_root)
                        report.append(f" - {rel_path}")
            report.append("")
        # Standard keys that were expected but never found.
        standard_keys = set(SecureKeyVault.STANDARD_KEYS.keys())
        found_standard = set(k for k in self.found_keys.keys() if k in standard_keys)
        missing_standard = standard_keys - found_standard
        if missing_standard:
            report.append("MISSING STANDARD KEYS")
            report.append("-" * 40)
            report.append("The following standard keys were not found:")
            for key in sorted(missing_standard):
                meta = SecureKeyVault.STANDARD_KEYS[key]
                report.append(f"{key} - {meta.description}")
                report.append(f" Required for: {', '.join(meta.required_for)}")
            report.append("")
        # Tail of the scan log, to keep the report short.
        report.append("SCAN DETAILS")
        report.append("-" * 40)
        for entry in self.scan_report[-10:]:  # Last 10 entries
            report.append(entry)
        report.append("")
        report.append("=" * 60)
        return "\n".join(report)

    def migrate_to_vault(self, interactive: bool = True, resolve_conflicts: str = "ask") -> bool:
        """
        Migrate all found keys to the secure vault.

        Args:
            interactive: Whether to ask for confirmation
            resolve_conflicts: How to handle conflicts ('ask', 'newest', 'skip')

        Returns:
            True when at least one key was migrated, False otherwise.
        """
        if not CRYPTO_AVAILABLE:
            print("❌ cryptography package required for vault")
            print("Install with: pip install cryptography")
            return False
        print("\n🔐 Migrating keys to secure vault...\n")
        migrated = 0
        skipped = 0
        errors = 0
        for key_name, locations in self.found_keys.items():
            # Group defining files by value to detect conflicts.
            unique_values: Dict[str, List[str]] = {}
            for file_path, value in locations:
                unique_values.setdefault(value, []).append(file_path)
            if len(unique_values) > 1:
                # Conflicting values: pick one according to the policy.
                print(f"\n⚠️ Conflict found for {key_name}:")
                values_list = list(unique_values.items())
                for i, (value, files) in enumerate(values_list):
                    masked = value[:8] + "..." if len(value) > 8 else value
                    print(f" {i+1}. {masked}")
                    for f in files[:2]:
                        rel_path = Path(f).relative_to(self.workspace_root)
                        print(f" - {rel_path}")
                if resolve_conflicts == "ask" and interactive:
                    choice = input(f"Which value to use? (1-{len(values_list)}, s=skip): ").strip()
                    if choice.lower() == 's':
                        skipped += 1
                        continue
                    # BUG FIX: the original used a bare ``except`` (swallowing
                    # even KeyboardInterrupt) and accepted any integer, so
                    # e.g. "0" silently selected the LAST value via negative
                    # indexing.  Validate the 1-based choice explicitly.
                    try:
                        idx = int(choice) - 1
                        if not 0 <= idx < len(values_list):
                            raise ValueError(choice)
                        final_value = values_list[idx][0]
                    except ValueError:
                        print(" Skipping...")
                        skipped += 1
                        continue
                elif resolve_conflicts == "newest":
                    # Use the value defined in the most recently modified file.
                    newest_file = max(unique_values.items(),
                                      key=lambda x: max(Path(f).stat().st_mtime for f in x[1]))
                    final_value = newest_file[0]
                else:
                    # 'skip' policy, or 'ask' in non-interactive mode.
                    skipped += 1
                    continue
            else:
                # No conflict, use the single value
                final_value = next(iter(unique_values))
            # Add to vault, tagging with its standard category when known.
            try:
                category = None
                if key_name in SecureKeyVault.STANDARD_KEYS:
                    category = SecureKeyVault.STANDARD_KEYS[key_name].category
                self.vault.add_key(key_name, final_value, category)
                migrated += 1
            except Exception as e:
                print(f"❌ Error migrating {key_name}: {e}")
                errors += 1
        print("\n✅ Migration complete!")
        print(f" • Migrated: {migrated} keys")
        print(f" • Skipped: {skipped} keys")
        print(f" • Errors: {errors} keys")
        return migrated > 0
def main():
    """Main migration workflow: parse CLI args, scan, report, then migrate."""
    import argparse
    parser = argparse.ArgumentParser(description="Migrate API keys to secure vault")
    parser.add_argument('--scan-only', action='store_true',
                        help='Only scan and report, don\'t migrate')
    parser.add_argument('--auto', action='store_true',
                        help='Automatic mode (no prompts)')
    parser.add_argument('--conflict-resolution', choices=['ask', 'newest', 'skip'],
                        default='ask', help='How to resolve conflicts')
    parser.add_argument('--export-report', type=str,
                        help='Export report to file')
    parser.add_argument('--workspace', type=Path,
                        # scripts -> trax -> apps -> workspace root
                        default=Path(__file__).parent.parent.parent.parent,
                        help='Workspace root directory')
    args = parser.parse_args()
    # Initialize migrator
    migrator = KeyMigrator(args.workspace)
    # Scan all projects (also populates migrator.found_keys for the report)
    all_keys = migrator.scan_all_projects()
    # Generate report
    report = migrator.generate_migration_report()
    # Display report
    print("\n" + report)
    # Export report if requested
    if args.export_report:
        report_path = Path(args.export_report)
        # BUG FIX: write as UTF-8 explicitly — the report contains emoji,
        # which raise UnicodeEncodeError under narrow locale encodings
        # (e.g. cp1252 on Windows).
        with open(report_path, 'w', encoding='utf-8') as f:
            f.write(report)
        print(f"\n📄 Report exported to: {report_path}")
    # Perform migration unless scan-only
    if not args.scan_only:
        if args.auto or input("\n🔐 Migrate keys to secure vault? (y/n): ").lower() == 'y':
            success = migrator.migrate_to_vault(
                interactive=not args.auto,
                resolve_conflicts=args.conflict_resolution
            )
            if success:
                print("\n📋 Next steps:")
                print("1. Test vault access: python3 scripts/key_vault.py list")
                print("2. Export to project: python3 scripts/key_vault.py export .env --project=trax")
                print("3. Sync to all projects: python3 scripts/key_vault.py sync root trax youtube-summarizer")
                print("4. Validate project keys: python3 scripts/key_vault.py validate trax")
# Script entry point: run the migration workflow only when executed directly,
# not when imported as a module.
if __name__ == "__main__":
    main()