# youtube-summarizer/sdks/python/youtube_summarizer_sdk/client.py
# (file-viewer metadata: 474 lines, 16 KiB, Python)
"""
YouTube Summarizer Python SDK Client
Async HTTP client for the YouTube Summarizer Developer Platform
"""
import asyncio
import aiohttp
import logging
from typing import Optional, Dict, Any, List, AsyncGenerator, Union
from datetime import datetime
from urllib.parse import urljoin
import json
from .models import (
TranscriptRequest, BatchProcessingRequest, JobResponse, BatchJobResponse,
TranscriptResult, DualTranscriptResult, APIUsageStats, ProcessingTimeEstimate,
SDKConfig, WebSocketConfig, JobStatus, WebhookPayload
)
from .exceptions import (
YouTubeSummarizerError, AuthenticationError, RateLimitError,
ValidationError, APIError
)
# Module-level logger named after this module; no handlers are configured here.
logger: logging.Logger = logging.getLogger(name=__name__)
class YouTubeSummarizerClient:
    """
    Async client for the YouTube Summarizer Developer Platform

    Provides comprehensive access to transcript extraction, batch processing,
    analytics, and real-time updates through WebSocket connections.

    Use it as an async context manager (``async with ... as client:``) or call
    :meth:`close` explicitly so the HTTP and WebSocket sessions are released.
    """

    def __init__(self, config: SDKConfig):
        """
        Initialize the YouTube Summarizer client

        Args:
            config: SDK configuration with API key and settings

        Raises:
            ValueError: If ``config.api_key`` is empty.
        """
        self.config = config
        self._session: Optional[aiohttp.ClientSession] = None
        self._ws_session: Optional[aiohttp.ClientSession] = None
        self._websocket: Optional[aiohttp.ClientWebSocketResponse] = None

        # Validate configuration
        if not self.config.api_key:
            raise ValueError("API key is required")

        # Default headers attached to every REST request via the shared session.
        self._headers = {
            "Authorization": f"Bearer {self.config.api_key}",
            "User-Agent": self.config.user_agent,
            "Content-Type": "application/json"
        }

    async def __aenter__(self):
        """Async context manager entry: open the HTTP session eagerly."""
        await self._ensure_session()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit: release all connections."""
        await self.close()

    def _make_connector(self) -> aiohttp.TCPConnector:
        """
        Build a TCP connector that honours ``config.verify_ssl``.

        ``TCPConnector(verify_ssl=...)`` is deprecated in aiohttp 3.x. As in
        aiohttp's own handling of that flag, only an explicit falsy value
        (not ``None``) disables certificate verification, now via ``ssl=False``.
        """
        verify = self.config.verify_ssl
        if verify is not None and not verify:
            return aiohttp.TCPConnector(ssl=False)
        return aiohttp.TCPConnector()

    async def _ensure_session(self):
        """Create the shared HTTP session if it is missing or already closed."""
        if self._session is None or self._session.closed:
            self._session = aiohttp.ClientSession(
                timeout=aiohttp.ClientTimeout(total=self.config.timeout),
                connector=self._make_connector(),
                headers=self._headers
            )

    async def close(self):
        """
        Close all connections (WebSocket, HTTP session, WebSocket session).

        References are cleared before awaiting, and the sessions are closed in a
        ``finally`` so a failing WebSocket close cannot leak them.
        """
        websocket, self._websocket = self._websocket, None
        session, self._session = self._session, None
        ws_session, self._ws_session = self._ws_session, None
        try:
            if websocket is not None:
                await websocket.close()
        finally:
            for sess in (session, ws_session):
                if sess is not None and not sess.closed:
                    await sess.close()

    @staticmethod
    def _parse_json_body(raw: bytes) -> Any:
        """
        Decode a raw JSON response body.

        Args:
            raw: Response body bytes.

        Returns:
            The decoded JSON value, or ``None`` for an empty/whitespace body.

        Raises:
            ValueError: ``json.JSONDecodeError`` or ``UnicodeDecodeError`` when
                the body is not valid JSON.
        """
        if not raw.strip():
            return None
        return json.loads(raw)

    @staticmethod
    def _error_for_status(status: int, headers: Any, payload: Any) -> YouTubeSummarizerError:
        """
        Map a non-2xx response to the SDK exception that should be raised.

        Args:
            status: HTTP status code.
            headers: Response headers (used for rate-limit information).
            payload: Decoded JSON body, or ``None`` if the body was empty or
                not JSON (e.g. an HTML error page from a proxy).

        Returns:
            The exception instance; the caller raises it.
        """
        def detail(default: Any) -> Any:
            # Only dict bodies can carry a 'detail' field.
            if isinstance(payload, dict):
                return payload.get('detail', default)
            return default

        if status == 401:
            return AuthenticationError("Invalid API key")
        if status == 429:
            rate_limit_info = {
                'remaining': headers.get('X-RateLimit-Remaining', 0),
                'reset': headers.get('X-RateLimit-Reset'),
                'limit': headers.get('X-RateLimit-Limit')
            }
            return RateLimitError(
                f"Rate limit exceeded. Try again in {rate_limit_info['reset']} seconds",
                rate_limit_info
            )
        if status == 422:
            return ValidationError(f"Validation failed: {detail('Validation error')}")
        return APIError(detail(f'API error: {status}'), status)

    async def _make_request(
        self,
        method: str,
        endpoint: str,
        data: Optional[Dict[str, Any]] = None,
        params: Optional[Dict[str, Any]] = None,
        retry_count: int = 0
    ) -> Dict[str, Any]:
        """
        Make HTTP request with error handling and retries

        The status code is checked before the body is required to be JSON, so
        non-JSON error bodies still map to the right exception. They are not
        mistaken for network errors.

        Args:
            method: HTTP method (GET, POST, etc.)
            endpoint: API endpoint path
            data: JSON request body (sent whenever not ``None``)
            params: Query parameters (sent whenever not ``None``)
            retry_count: Current retry attempt

        Returns:
            JSON response data (an empty dict for an empty 2xx body)

        Raises:
            AuthenticationError: On HTTP 401.
            RateLimitError: On HTTP 429.
            ValidationError: On HTTP 422.
            APIError: On any other non-2xx status.
            YouTubeSummarizerError: On network errors/timeouts after all retries,
                or when a 2xx body is not valid JSON.
        """
        await self._ensure_session()
        # NOTE(review): urljoin with an endpoint starting with '/' discards any
        # path component of base_url (e.g. 'https://host/prefix').
        url = urljoin(self.config.base_url, endpoint)

        kwargs: Dict[str, Any] = {}
        if data is not None:
            kwargs['json'] = data
        if params is not None:
            kwargs['params'] = params

        try:
            async with self._session.request(method, url, **kwargs) as response:
                status = response.status
                headers = response.headers
                raw = await response.read()
        except (aiohttp.ClientError, asyncio.TimeoutError) as e:
            # ClientTimeout expiry surfaces as asyncio.TimeoutError, which is not
            # a ClientError, so it is retried and wrapped here too.
            # NOTE(review): POST requests are retried as well, which may
            # duplicate server-side jobs if the request had actually arrived.
            if retry_count < self.config.max_retries:
                await asyncio.sleep(self.config.retry_delay * (retry_count + 1))
                return await self._make_request(method, endpoint, data, params, retry_count + 1)
            raise YouTubeSummarizerError(f"Network error: {str(e) or type(e).__name__}") from e

        if 200 <= status < 300:
            try:
                payload = self._parse_json_body(raw)
            except ValueError as e:
                raise YouTubeSummarizerError("Invalid JSON response from server") from e
            # An empty success body (e.g. 204 No Content) becomes an empty dict.
            return {} if payload is None else payload

        try:
            payload = self._parse_json_body(raw)
        except ValueError:
            payload = None  # non-JSON error page: fall back to default messages
        raise self._error_for_status(status, headers, payload)

    # Core API Methods

    async def extract_transcript(self, request: TranscriptRequest) -> JobResponse:
        """
        Extract transcript from YouTube video

        Args:
            request: Transcript extraction request

        Returns:
            Job response with job ID and status
        """
        data = request.dict()
        response = await self._make_request('POST', '/api/v1/enhanced/transcript/extract', data)
        return JobResponse(**response)

    async def batch_process(self, request: BatchProcessingRequest) -> BatchJobResponse:
        """
        Process multiple videos in batch

        Args:
            request: Batch processing request

        Returns:
            Batch job response
        """
        data = request.dict()
        response = await self._make_request('POST', '/api/v1/enhanced/batch/process', data)
        return BatchJobResponse(**response)

    async def get_job_status(self, job_id: str) -> JobResponse:
        """
        Get job status and progress

        Args:
            job_id: Job identifier

        Returns:
            Current job status
        """
        response = await self._make_request('GET', f'/api/v1/enhanced/jobs/{job_id}/status')
        return JobResponse(**response)

    async def get_job_result(self, job_id: str) -> Union[TranscriptResult, DualTranscriptResult]:
        """
        Get completed job results

        Args:
            job_id: Job identifier

        Returns:
            Transcript result: ``DualTranscriptResult`` when the response has a
            ``quality_comparison`` key, otherwise ``TranscriptResult``
        """
        response = await self._make_request('GET', f'/api/v1/enhanced/jobs/{job_id}/result')
        # Determine result type based on response structure
        if 'quality_comparison' in response:
            return DualTranscriptResult(**response)
        return TranscriptResult(**response)

    async def cancel_job(self, job_id: str) -> Dict[str, Any]:
        """
        Cancel a running job

        Args:
            job_id: Job identifier

        Returns:
            Cancellation confirmation
        """
        return await self._make_request('POST', f'/api/v1/enhanced/jobs/{job_id}/cancel')

    async def get_processing_estimate(self, video_url: str) -> ProcessingTimeEstimate:
        """
        Get processing time estimate for video

        Args:
            video_url: YouTube video URL

        Returns:
            Processing time estimate
        """
        params = {'video_url': video_url}
        response = await self._make_request('GET', '/api/v1/enhanced/estimate', params=params)
        return ProcessingTimeEstimate(**response)

    async def get_usage_stats(self) -> APIUsageStats:
        """
        Get API usage statistics

        Returns:
            Usage statistics for the API key
        """
        response = await self._make_request('GET', '/api/v1/enhanced/usage')
        return APIUsageStats(**response)

    async def search_summaries(
        self,
        query: str,
        limit: int = 10,
        offset: int = 0
    ) -> Dict[str, Any]:
        """
        Search through processed summaries

        Args:
            query: Search query
            limit: Maximum results to return
            offset: Results offset for pagination

        Returns:
            Search results
        """
        params = {
            'query': query,
            'limit': limit,
            'offset': offset
        }
        return await self._make_request('GET', '/api/v1/enhanced/search', params=params)

    async def export_data(
        self,
        format: str = 'json',
        date_from: Optional[str] = None,
        date_to: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        Export user data in specified format

        Args:
            format: Export format (json, csv, etc.)
            date_from: Start date (ISO format); omitted when falsy
            date_to: End date (ISO format); omitted when falsy

        Returns:
            Export data or download link
        """
        params = {'format': format}
        if date_from:
            params['date_from'] = date_from
        if date_to:
            params['date_to'] = date_to
        return await self._make_request('GET', '/api/v1/enhanced/export', params=params)

    # Real-time Updates

    async def connect_websocket(self, ws_config: Optional[WebSocketConfig] = None) -> bool:
        """
        Connect to WebSocket for real-time updates

        Any previously open websocket is closed first, so reconnecting does not
        leak it. Failures are logged and reported as ``False``, not raised.

        Args:
            ws_config: WebSocket configuration (optional; a default
                ``WebSocketConfig()`` is used when omitted)

        Returns:
            True if connected successfully
        """
        if ws_config is None:
            ws_config = WebSocketConfig()

        try:
            previous, self._websocket = self._websocket, None
            if previous is not None and not previous.closed:
                await previous.close()

            if self._ws_session is None or self._ws_session.closed:
                # Same SSL-verification policy as the REST session.
                self._ws_session = aiohttp.ClientSession(connector=self._make_connector())

            # Add API key to WebSocket connection
            headers = {"Authorization": f"Bearer {self.config.api_key}"}
            self._websocket = await self._ws_session.ws_connect(
                ws_config.url,
                headers=headers,
                heartbeat=ws_config.heartbeat_interval
            )
            logger.info("WebSocket connected successfully")
            return True
        except Exception as e:
            logger.error(f"WebSocket connection failed: {e}")
            return False

    async def listen_for_updates(self) -> AsyncGenerator[WebhookPayload, None]:
        """
        Listen for real-time job updates via WebSocket

        Messages that are not valid JSON, or that cannot be turned into a
        ``WebhookPayload``, are logged and skipped.

        Yields:
            WebhookPayload objects with job updates

        Raises:
            YouTubeSummarizerError: If not connected, or on a WebSocket error.
        """
        websocket = self._websocket
        if not websocket:
            raise YouTubeSummarizerError("WebSocket not connected. Call connect_websocket() first")

        try:
            async for msg in websocket:
                if msg.type == aiohttp.WSMsgType.TEXT:
                    try:
                        data = json.loads(msg.data)
                    except json.JSONDecodeError:
                        logger.warning(f"Invalid JSON in WebSocket message: {msg.data}")
                        continue
                    try:
                        payload = WebhookPayload(**data)
                    except (TypeError, ValueError) as e:
                        # Non-object JSON or a schema mismatch: skip this update
                        # rather than tearing down the whole stream.
                        logger.warning(f"Invalid WebSocket payload {msg.data!r}: {e}")
                        continue
                    yield payload
                elif msg.type == aiohttp.WSMsgType.ERROR:
                    raise YouTubeSummarizerError(f"WebSocket error: {websocket.exception()}")
                elif msg.type == aiohttp.WSMsgType.CLOSE:
                    logger.info("WebSocket connection closed")
                    break
        except YouTubeSummarizerError as e:
            # Already an SDK error: log it but do not wrap it a second time.
            logger.error(f"WebSocket listening error: {e}")
            raise
        except Exception as e:
            logger.error(f"WebSocket listening error: {e}")
            raise YouTubeSummarizerError(f"WebSocket error: {str(e)}") from e

    async def disconnect_websocket(self):
        """Disconnect WebSocket connection (no-op if not connected)."""
        websocket, self._websocket = self._websocket, None
        if websocket is not None:
            await websocket.close()
            logger.info("WebSocket disconnected")

    # Convenience Methods

    async def extract_and_wait(
        self,
        request: TranscriptRequest,
        poll_interval: float = 2.0,
        timeout: float = 300.0
    ) -> Union[TranscriptResult, DualTranscriptResult]:
        """
        Extract transcript and wait for completion

        Args:
            request: Transcript extraction request
            poll_interval: Status polling interval in seconds
            timeout: Maximum wait time in seconds, measured from job submission

        Returns:
            Completed transcript result

        Raises:
            APIError: If the job fails.
            YouTubeSummarizerError: If the job is cancelled or times out.
        """
        job = await self.extract_transcript(request)
        return await self.wait_for_job(job.job_id, poll_interval=poll_interval, timeout=timeout)

    async def wait_for_job(
        self,
        job_id: str,
        poll_interval: float = 2.0,
        timeout: float = 300.0
    ) -> Union[TranscriptResult, DualTranscriptResult]:
        """
        Wait for job completion and return result

        Args:
            job_id: Job identifier
            poll_interval: Status polling interval in seconds
            timeout: Maximum wait time in seconds; the final sleep is capped so
                the wait does not overshoot this by a full ``poll_interval``

        Returns:
            Completed job result

        Raises:
            APIError: If the job fails.
            YouTubeSummarizerError: If the job is cancelled or times out.
        """
        loop = asyncio.get_running_loop()
        deadline = loop.time() + timeout
        timeout_message = f"Job {job_id} timed out after {timeout} seconds"

        while True:
            if loop.time() > deadline:
                raise YouTubeSummarizerError(timeout_message)

            status = await self.get_job_status(job_id)

            if status.status == JobStatus.COMPLETED:
                return await self.get_job_result(job_id)
            elif status.status == JobStatus.FAILED:
                # metadata may be absent/None on failed jobs.
                error = (status.metadata or {}).get('error', 'Unknown error')
                raise APIError(f"Job {job_id} failed: {error}")
            elif status.status == JobStatus.CANCELLED:
                raise YouTubeSummarizerError(f"Job {job_id} was cancelled")

            remaining = deadline - loop.time()
            if remaining <= 0:
                raise YouTubeSummarizerError(timeout_message)
            await asyncio.sleep(min(poll_interval, remaining))
# Convenience factory function
def create_client(
    api_key: str,
    base_url: str = "https://api.youtube-summarizer.com",
    **kwargs
) -> YouTubeSummarizerClient:
    """
    Build a ready-to-use client from an API key and optional settings.

    Args:
        api_key: Your API key
        base_url: API base URL (defaults to the production endpoint)
        **kwargs: Any further ``SDKConfig`` fields, forwarded unchanged

    Returns:
        A ``YouTubeSummarizerClient`` wrapping the assembled ``SDKConfig``
    """
    return YouTubeSummarizerClient(
        SDKConfig(api_key=api_key, base_url=base_url, **kwargs)
    )