#!/usr/bin/env python3
"""
API Key Migration Tool - Consolidate scattered keys into a secure vault.

Scans all projects for .env files and creates a unified key vault.
"""
import os
import sys

# Make the user site-packages directory visible so the `cryptography`
# dependency of key_vault can be imported even under interpreters that
# skip user site dirs. getusersitepackages() may return a str or a list
# depending on platform/Python version, so normalize before extending.
import site

_user_site = site.getusersitepackages()
sys.path.extend(_user_site if isinstance(_user_site, list) else [_user_site])

from pathlib import Path
from typing import Dict, List, Set, Tuple
import json
import re
from collections import defaultdict
from datetime import datetime

# Add this script's own directory to the path so `key_vault` resolves
sys.path.append(str(Path(__file__).parent))
# Import the vault implementation that lives alongside this script; the
# sys.path.append above makes the scripts directory importable. Fail fast
# with a clear message rather than crashing later mid-migration.
try:
    from key_vault import SecureKeyVault, CRYPTO_AVAILABLE
except ImportError:
    print("❌ key_vault.py not found in scripts directory")
    sys.exit(1)
class KeyMigrator:
    """Migrate and consolidate API keys from multiple sources.

    Workflow: ``scan_all_projects()`` collects every KEY=VALUE pair from
    .env files into ``self.found_keys``, ``generate_migration_report()``
    summarizes the findings, and ``migrate_to_vault()`` writes the chosen
    values into the SecureKeyVault.
    """

    # Path fragments identifying files that should never be treated as real
    # secrets: vendored dependencies, build output, and template/example files.
    _SKIP_PATTERNS = (
        'node_modules', 'venv', '.venv', '__pycache__',
        '.git', 'dist', 'build', '.env.example', '.env.template',
    )

    def __init__(self, workspace_root: Path):
        """Create a migrator rooted at *workspace_root*.

        Args:
            workspace_root: Directory containing all projects to scan.
        """
        self.workspace_root = workspace_root
        self.vault = SecureKeyVault()
        # key_name -> set of (file_path, value) tuples: every place a key was
        # seen, so duplicates and conflicts can be analyzed later.
        self.found_keys: Dict[str, Set[Tuple[str, str]]] = defaultdict(set)
        # Human-readable log lines accumulated while scanning.
        self.scan_report: List[str] = []

    @staticmethod
    def _mask(value: str) -> str:
        """Return a redacted preview of *value* (at most 8 leading chars)."""
        return value[:8] + "..." if len(value) > 8 else value

    @staticmethod
    def _group_by_value(locations: Set[Tuple[str, str]]) -> Dict[str, List[str]]:
        """Group (file, value) sightings into a value -> [files] mapping."""
        unique_values: Dict[str, List[str]] = {}
        for file_path, value in locations:
            unique_values.setdefault(value, []).append(file_path)
        return unique_values

    def scan_directory(self, directory: Path, recursive: bool = True) -> Dict[str, str]:
        """Scan a directory for .env files and extract keys.

        Args:
            directory: Directory to search for ``.env*`` files.
            recursive: Also search subdirectories when True.

        Returns:
            Mapping of key name -> value (last sighting wins on repeats).
        """
        keys: Dict[str, str] = {}
        env_files = list(directory.rglob(".env*")) if recursive else list(directory.glob(".env*"))

        for env_file in env_files:
            # Skip vendored/build directories and example/template files
            if any(skip in str(env_file) for skip in self._SKIP_PATTERNS):
                continue

            self.scan_report.append(f"📂 Scanning: {env_file.relative_to(self.workspace_root)}")

            try:
                # Explicit encoding: .env files are conventionally UTF-8
                with open(env_file, 'r', encoding='utf-8') as f:
                    for line in f:
                        line = line.strip()
                        # Ignore blanks, comments, and lines without KEY=VALUE
                        if not line or line.startswith('#') or '=' not in line:
                            continue
                        key, value = line.split('=', 1)
                        key = key.strip()
                        value = value.strip().strip('"').strip("'")

                        # Track where each key was found
                        self.found_keys[key].add((str(env_file), value))
                        keys[key] = value
            except Exception as e:
                # Best-effort scan: record the failure and keep going
                self.scan_report.append(f"  ⚠️ Error reading {env_file}: {e}")

        return keys

    def scan_all_projects(self) -> Dict[str, Dict[str, str]]:
        """Scan all known projects for API keys.

        Returns:
            Mapping of project name -> {key: value} for projects where
            at least one key was found.
        """
        projects = {
            "root": self.workspace_root,
            "trax": self.workspace_root / "apps" / "trax",
            "youtube-summarizer": self.workspace_root / "apps" / "youtube-summarizer",
            "pdf-translator": self.workspace_root / "pdf-translator",
            "mixcloud-rss": self.workspace_root / "mixcloud-rss-generator",
            "clean-tracks": self.workspace_root / "projects" / "clean-tracks-main",
            "task-master": self.workspace_root / "tools" / "claude-task-master",
            "directus-mcp": self.workspace_root / "tools" / "directus-mcp-server",
        }

        all_keys: Dict[str, Dict[str, str]] = {}

        print("\n🔍 Scanning projects for API keys...\n")

        for project_name, project_path in projects.items():
            if not project_path.exists():
                print(f"📦 Project: {project_name} - ⚠️ Directory not found")
                continue
            print(f"📦 Project: {project_name}")
            # Non-recursive: only the project's own top-level .env files
            keys = self.scan_directory(project_path, recursive=False)
            if keys:
                all_keys[project_name] = keys
                print(f"   ✅ Found {len(keys)} keys")
            else:
                print("   ⚠️ No keys found")

        return all_keys

    def analyze_duplicates(self) -> List[Tuple[str, List[Tuple[str, str]]]]:
        """Analyze duplicate keys with different values.

        Returns:
            List of ``(key_name, [(masked_value, [files]), ...])`` for every
            key that was found with more than one distinct value.
        """
        conflicts = []
        for key_name, locations in self.found_keys.items():
            unique_values = self._group_by_value(locations)
            if len(unique_values) > 1:
                # Conflicting values: expose only a masked preview of each
                conflict_info = [(self._mask(value), files)
                                 for value, files in unique_values.items()]
                conflicts.append((key_name, conflict_info))
        return conflicts

    def generate_migration_report(self) -> str:
        """Generate a detailed, human-readable migration report."""
        report = []
        report.append("=" * 60)
        report.append("API KEY MIGRATION REPORT")
        report.append("=" * 60)
        report.append(f"Generated: {datetime.now().isoformat()}")
        report.append(f"Workspace: {self.workspace_root}")
        report.append("")

        # Summary: unique key names, and unique files they were found in
        total_keys = len(self.found_keys)
        total_files = len(set(f for locs in self.found_keys.values() for f, _ in locs))

        report.append("SUMMARY")
        report.append("-" * 40)
        report.append(f"Total unique keys found: {total_keys}")
        report.append(f"Total .env files scanned: {total_files}")
        report.append("")

        # Bucket keys: standard keys get their vault category, the rest "custom"
        categories = defaultdict(list)
        for key_name in self.found_keys.keys():
            if key_name in SecureKeyVault.STANDARD_KEYS:
                meta = SecureKeyVault.STANDARD_KEYS[key_name]
                categories[meta.category].append(key_name)
            else:
                categories["custom"].append(key_name)

        report.append("KEYS BY CATEGORY")
        report.append("-" * 40)
        for category, keys in sorted(categories.items()):
            report.append(f"\n{category.upper()} ({len(keys)} keys):")
            for key in sorted(keys):
                locations_count = len(self.found_keys[key])
                report.append(f"  • {key} (found in {locations_count} locations)")
        report.append("")

        # Conflicts: keys seen with more than one distinct value
        conflicts = self.analyze_duplicates()
        if conflicts:
            report.append("⚠️ CONFLICTS DETECTED")
            report.append("-" * 40)
            report.append("The following keys have different values in different locations:")
            report.append("")

            for key_name, conflict_info in conflicts:
                report.append(f"  {key_name}:")
                for masked_value, files in conflict_info:
                    report.append(f"    Value: {masked_value}")
                    for file_path in files[:3]:  # Limit to 3 files
                        rel_path = Path(file_path).relative_to(self.workspace_root)
                        report.append(f"      - {rel_path}")
                report.append("")

        # Standard keys the vault knows about but that were never found
        standard_keys = set(SecureKeyVault.STANDARD_KEYS.keys())
        missing_standard = standard_keys - set(self.found_keys.keys())

        if missing_standard:
            report.append("MISSING STANDARD KEYS")
            report.append("-" * 40)
            report.append("The following standard keys were not found:")
            for key in sorted(missing_standard):
                meta = SecureKeyVault.STANDARD_KEYS[key]
                report.append(f"  • {key} - {meta.description}")
                report.append(f"    Required for: {', '.join(meta.required_for)}")
            report.append("")

        # Scan details
        report.append("SCAN DETAILS")
        report.append("-" * 40)
        for entry in self.scan_report[-10:]:  # Last 10 entries
            report.append(entry)

        report.append("")
        report.append("=" * 60)

        return "\n".join(report)

    def migrate_to_vault(self, interactive: bool = True, resolve_conflicts: str = "ask") -> bool:
        """Migrate all found keys to the secure vault.

        Args:
            interactive: Whether to ask for confirmation
            resolve_conflicts: How to handle conflicts ('ask', 'newest', 'skip')

        Returns:
            True if at least one key was migrated.
        """
        if not CRYPTO_AVAILABLE:
            print("❌ cryptography package required for vault")
            print("Install with: pip install cryptography")
            return False

        print("\n🔐 Migrating keys to secure vault...\n")

        migrated = 0
        skipped = 0
        errors = 0

        for key_name, locations in self.found_keys.items():
            # Check for conflicts: more than one distinct value for this key
            unique_values = self._group_by_value(locations)

            if len(unique_values) > 1:
                # Handle conflict
                print(f"\n⚠️ Conflict found for {key_name}:")

                values_list = list(unique_values.items())
                for i, (value, files) in enumerate(values_list):
                    print(f"  {i+1}. {self._mask(value)}")
                    for f in files[:2]:
                        rel_path = Path(f).relative_to(self.workspace_root)
                        print(f"     - {rel_path}")

                if resolve_conflicts == "ask" and interactive:
                    choice = input(f"Which value to use? (1-{len(values_list)}, s=skip): ").strip()
                    if choice.lower() == 's':
                        skipped += 1
                        continue
                    try:
                        idx = int(choice) - 1
                    except ValueError:
                        # Was a bare `except:`, which also swallowed
                        # KeyboardInterrupt/SystemExit; narrowed to ValueError.
                        idx = -1
                    if not 0 <= idx < len(values_list):
                        # Out-of-range answers (e.g. "0") previously selected the
                        # LAST value silently via negative indexing; now they skip.
                        print("   Skipping...")
                        skipped += 1
                        continue
                    final_value = values_list[idx][0]
                elif resolve_conflicts == "newest":
                    # Use the value coming from the most recently modified file
                    newest_file = max(unique_values.items(),
                                      key=lambda x: max(Path(f).stat().st_mtime for f in x[1]))
                    final_value = newest_file[0]
                else:
                    # 'skip' policy, or 'ask' in non-interactive mode
                    skipped += 1
                    continue
            else:
                # No conflict, use the single value
                final_value = next(iter(unique_values))

            # Add to vault, tagging standard keys with their known category
            try:
                category = None
                if key_name in SecureKeyVault.STANDARD_KEYS:
                    category = SecureKeyVault.STANDARD_KEYS[key_name].category
                self.vault.add_key(key_name, final_value, category)
                migrated += 1
            except Exception as e:
                errors += 1
                print(f"❌ Error migrating {key_name}: {e}")

        print(f"\n✅ Migration complete!")
        print(f"   • Migrated: {migrated} keys")
        print(f"   • Skipped: {skipped} keys")
        print(f"   • Errors: {errors} keys")

        return migrated > 0
def main():
    """Command-line entry point: scan, report, and optionally migrate keys.

    Flags:
        --scan-only             scan and report without touching the vault
        --auto                  run without interactive prompts
        --conflict-resolution   how to pick among conflicting values
        --export-report FILE    also write the report to FILE
        --workspace DIR         workspace root directory to scan
    """
    import argparse

    parser = argparse.ArgumentParser(description="Migrate API keys to secure vault")
    parser.add_argument('--scan-only', action='store_true',
                        help='Only scan and report, don\'t migrate')
    parser.add_argument('--auto', action='store_true',
                        help='Automatic mode (no prompts)')
    parser.add_argument('--conflict-resolution', choices=['ask', 'newest', 'skip'],
                        default='ask', help='How to resolve conflicts')
    parser.add_argument('--export-report', type=str,
                        help='Export report to file')
    parser.add_argument('--workspace', type=Path,
                        default=Path(__file__).parent.parent.parent.parent,
                        help='Workspace root directory')

    args = parser.parse_args()

    # Initialize migrator
    migrator = KeyMigrator(args.workspace)

    # Scanning populates migrator.found_keys as a side effect; the returned
    # per-project mapping is not needed by the report or migration below.
    migrator.scan_all_projects()

    # Generate and display report
    report = migrator.generate_migration_report()
    print("\n" + report)

    # Export report if requested
    if args.export_report:
        report_path = Path(args.export_report)
        report_path.write_text(report, encoding='utf-8')
        print(f"\n📄 Report exported to: {report_path}")

    # Perform migration unless scan-only
    if not args.scan_only:
        if args.auto or input("\n🔐 Migrate keys to secure vault? (y/n): ").lower() == 'y':
            success = migrator.migrate_to_vault(
                interactive=not args.auto,
                resolve_conflicts=args.conflict_resolution
            )

            if success:
                print("\n📋 Next steps:")
                print("1. Test vault access: python3 scripts/key_vault.py list")
                print("2. Export to project: python3 scripts/key_vault.py export .env --project=trax")
                print("3. Sync to all projects: python3 scripts/key_vault.py sync root trax youtube-summarizer")
                print("4. Validate project keys: python3 scripts/key_vault.py validate trax")


if __name__ == "__main__":
    main()