youtube-automation/test_service.py

129 lines
4.6 KiB
Python
Executable File
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
Test script for YouTube-Directus Automation Service
"""
import asyncio
import json
import httpx
from pathlib import Path
async def _check_health(client: "httpx.AsyncClient", base_url: str) -> bool:
    """Test 1: GET ``/health`` and report the service configuration.

    Returns True only when the endpoint answers with HTTP 200 and a JSON
    body; any exception (connection refused, invalid JSON, ...) is reported
    and yields False.
    """
    print("1⃣ Testing health check...")
    try:
        response = await client.get(f"{base_url}/health")
        if response.status_code != 200:
            print(f" ❌ Health check failed: {response.status_code}")
            return False
        health_data = response.json()
        print(" ✅ Service is healthy")
        print(f" 📊 Config: {health_data.get('config', {})}")
        return True
    except Exception as e:
        print(f" ❌ Could not connect to service: {e}")
        print(" 💡 Make sure the service is running: ./start.sh")
        return False


async def _check_root(client: "httpx.AsyncClient", base_url: str) -> bool:
    """Test 2: GET ``/`` and list the endpoints the service advertises.

    Returns True on HTTP 200 with a JSON body, False otherwise.
    """
    print("\n2⃣ Testing root endpoint...")
    try:
        response = await client.get(f"{base_url}/")
        if response.status_code != 200:
            print(f" ❌ Root endpoint failed: {response.status_code}")
            return False
        root_data = response.json()
        print(" ✅ Root endpoint working")
        print(f" 🔗 Available endpoints: {root_data.get('endpoints', {})}")
        return True
    except Exception as e:
        print(f" ❌ Root endpoint error: {e}")
        return False


async def _check_webhook(client: "httpx.AsyncClient", base_url: str) -> bool:
    """Test 3: POST a mock Directus ``items.create`` event to the webhook.

    Returns True when the webhook answers with HTTP 200 and a JSON body,
    False otherwise. On success the coroutine pauses for three seconds
    before returning (see the "Waiting for background thumbnail
    processing" message).
    """
    print("\n3⃣ Testing webhook with mock YouTube video...")
    # Mock Directus webhook payload for a YouTube video creation
    mock_payload = {
        "event": "items.create",
        "accountability": {
            "user": "test-user",
            "role": "admin",
        },
        "payload": {
            "id": "123",
            "youtube_url": "https://www.youtube.com/watch?v=dQw4w9WgXcQ",  # Rick Roll for testing
            "title": "Test YouTube Video",
        },
        "keys": ["123"],
    }
    headers = {
        "Content-Type": "application/json",
        # Note: In production, you'd include x-directus-signature header
    }
    try:
        response = await client.post(
            f"{base_url}/webhook/directus",
            json=mock_payload,
            headers=headers,
        )
        if response.status_code != 200:
            print(f" ❌ Webhook failed: {response.status_code}")
            print(f" 📄 Response: {response.text}")
            return False
        webhook_data = response.json()
        print(" ✅ Webhook processed successfully")
        print(f" 📝 Response: {webhook_data}")
        # Wait a moment for background processing
        print(" ⏳ Waiting for background thumbnail processing...")
        await asyncio.sleep(3)
        return True
    except Exception as e:
        print(f" ❌ Webhook test error: {e}")
        return False


def _check_url_parsing() -> bool:
    """Test 4: check ``YouTubeProcessor.extract_video_id`` on known URL forms.

    Returns True only when every sample URL yields the expected video ID.
    A failed import of ``main`` is reported and yields False instead of
    aborting the whole run with a traceback.
    """
    print("\n4⃣ Testing YouTube URL parsing...")
    expected_id = "dQw4w9WgXcQ"
    test_urls = [
        "https://www.youtube.com/watch?v=dQw4w9WgXcQ",
        "https://youtu.be/dQw4w9WgXcQ",
        "https://www.youtube.com/embed/dQw4w9WgXcQ",
        "https://www.youtube.com/watch?v=dQw4w9WgXcQ&t=30s",
    ]

    # Import the processor to test URL parsing
    import sys

    # Keep the original CWD-relative "src" entry first, then fall back to the
    # "src" directory next to this script so the test also works when it is
    # launched from another working directory.
    # NOTE(review): assumes main.py lives in a "src" directory beside this
    # script - confirm against the repository layout.
    script_src = str(Path(__file__).resolve().parent / "src")
    for candidate in ("src", script_src):
        if candidate not in sys.path:
            sys.path.append(candidate)

    try:
        from main import YouTubeProcessor
    except ImportError as e:
        print(f" ❌ Could not import YouTubeProcessor: {e}")
        return False

    processor = YouTubeProcessor()
    all_matched = True
    for url in test_urls:
        video_id = processor.extract_video_id(url)
        if video_id == expected_id:
            print(f" ✅ {url} → {video_id}")
        else:
            print(f" ❌ {url} → {video_id} (expected: {expected_id})")
            all_matched = False
    return all_matched


async def test_service() -> bool:
    """Test the YouTube automation service.

    Runs four checks against the service at ``http://localhost:8000``:
    health endpoint, root endpoint, a mock Directus webhook call, and
    YouTube URL parsing. Progress is printed to stdout.

    Returns:
        True when every check passed. False when the health check fails
        (the remaining checks are then skipped) or when any later check
        reported a failure.
    """
    base_url = "http://localhost:8000"
    print("🧪 Testing YouTube-Directus Automation Service")
    print("=" * 50)

    async with httpx.AsyncClient() as client:
        # Without a reachable, healthy service the other checks are pointless.
        if not await _check_health(client, base_url):
            return False

        # Later checks are independent: run all of them and collect outcomes
        # so one failure does not hide the others.
        results = {
            "root endpoint": await _check_root(client, base_url),
            "webhook": await _check_webhook(client, base_url),
            "YouTube URL parsing": _check_url_parsing(),
        }

    failed = [name for name, passed in results.items() if not passed]

    print("\n" + "=" * 50)
    if failed:
        print(f"❌ Service testing completed with failures: {', '.join(failed)}")
    else:
        print("✅ Service testing completed!")
    print("\n💡 Next steps:")
    print(
        " 1. Configure Directus webhook at https://enias.zeabur.app/admin "
        "to point to: http://localhost:8000/webhook/directus"
    )
    print(" 2. Add YouTube API key to .env for better thumbnail quality")
    print(" 3. Test with real Directus media_items creation")
    return not failed
if __name__ == "__main__":
    # Propagate the overall result as the process exit status so shells and
    # CI jobs can detect a failed run: 0 when test_service() returns a truthy
    # value, 1 otherwise.
    raise SystemExit(0 if asyncio.run(test_service()) else 1)