667 lines
26 KiB
Python
667 lines
26 KiB
Python
"""Enhanced Multi-Agent Orchestrator with template and registry integration."""
|
|
|
|
import asyncio
|
|
import logging
|
|
from typing import Dict, List, Optional, Any, Union
|
|
from datetime import datetime
|
|
from sqlalchemy.orm import Session
|
|
|
|
from ..core.base_agent import AgentState, AgentContext
|
|
|
|
from ..models.analysis_templates import TemplateSet, TemplateRegistry, TemplateType
|
|
from ..models.agent_models import AgentSummary
|
|
from ..services.deepseek_service import DeepSeekService
|
|
from .template_agent_factory import TemplateAgentFactory, get_template_agent_factory
|
|
from .unified_analysis_agent import UnifiedAnalysisAgent
|
|
from .template_driven_agent import TemplateAnalysisResult
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class OrchestrationConfig:
|
|
"""Configuration for multi-agent orchestration."""
|
|
|
|
def __init__(
|
|
self,
|
|
parallel_execution: bool = True,
|
|
synthesis_enabled: bool = True,
|
|
max_concurrent_agents: int = 4,
|
|
timeout_seconds: int = 300,
|
|
quality_threshold: float = 0.7,
|
|
enable_database_persistence: bool = True
|
|
):
|
|
self.parallel_execution = parallel_execution
|
|
self.synthesis_enabled = synthesis_enabled
|
|
self.max_concurrent_agents = max_concurrent_agents
|
|
self.timeout_seconds = timeout_seconds
|
|
self.quality_threshold = quality_threshold
|
|
self.enable_database_persistence = enable_database_persistence
|
|
|
|
|
|
class OrchestrationResult:
|
|
"""Result from multi-agent orchestration."""
|
|
|
|
def __init__(
|
|
self,
|
|
job_id: str,
|
|
template_set_id: str,
|
|
results: Dict[str, TemplateAnalysisResult],
|
|
synthesis_result: Optional[TemplateAnalysisResult] = None,
|
|
processing_time_seconds: float = 0.0,
|
|
success: bool = True,
|
|
error: Optional[str] = None,
|
|
metadata: Optional[Dict[str, Any]] = None
|
|
):
|
|
self.job_id = job_id
|
|
self.template_set_id = template_set_id
|
|
self.results = results
|
|
self.synthesis_result = synthesis_result
|
|
self.processing_time_seconds = processing_time_seconds
|
|
self.success = success
|
|
self.error = error
|
|
self.metadata = metadata or {}
|
|
self.timestamp = datetime.utcnow()
|
|
|
|
def to_dict(self) -> Dict[str, Any]:
|
|
"""Convert to dictionary representation."""
|
|
return {
|
|
"job_id": self.job_id,
|
|
"template_set_id": self.template_set_id,
|
|
"results": {k: v.dict() for k, v in self.results.items()},
|
|
"synthesis_result": self.synthesis_result.dict() if self.synthesis_result else None,
|
|
"processing_time_seconds": self.processing_time_seconds,
|
|
"success": self.success,
|
|
"error": self.error,
|
|
"metadata": self.metadata,
|
|
"timestamp": self.timestamp.isoformat()
|
|
}
|
|
|
|
|
|
class EnhancedMultiAgentOrchestrator:
|
|
"""
|
|
Enhanced orchestrator for template-driven multi-agent analysis.
|
|
|
|
Features:
|
|
- Template-set-based orchestration
|
|
- Agent factory integration for dynamic agent creation
|
|
- Parallel and sequential execution modes
|
|
- Synthesis capability for unified results
|
|
- Database persistence and performance tracking
|
|
- Mixed perspective workflows (Educational + Domain)
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
template_registry: Optional[TemplateRegistry] = None,
|
|
agent_factory: Optional[TemplateAgentFactory] = None,
|
|
ai_service: Optional[DeepSeekService] = None,
|
|
config: Optional[OrchestrationConfig] = None
|
|
):
|
|
"""Initialize the enhanced orchestrator.
|
|
|
|
Args:
|
|
template_registry: Registry for template lookups
|
|
agent_factory: Factory for agent creation
|
|
ai_service: AI service for synthesis operations
|
|
config: Orchestration configuration
|
|
"""
|
|
self.template_registry = template_registry
|
|
self.agent_factory = agent_factory or get_template_agent_factory(template_registry)
|
|
self.ai_service = ai_service or DeepSeekService()
|
|
self.config = config or OrchestrationConfig()
|
|
|
|
# Active orchestration jobs
|
|
self._active_jobs: Dict[str, Dict[str, Any]] = {}
|
|
|
|
# Performance tracking
|
|
self._total_orchestrations = 0
|
|
self._successful_orchestrations = 0
|
|
self._total_processing_time = 0.0
|
|
|
|
logger.info("Enhanced Multi-Agent Orchestrator initialized")
|
|
|
|
async def orchestrate_template_set(
|
|
self,
|
|
job_id: str,
|
|
template_set_id: str,
|
|
content: str,
|
|
context: Optional[Dict[str, Any]] = None,
|
|
video_id: Optional[str] = None
|
|
) -> OrchestrationResult:
|
|
"""Orchestrate analysis using all templates in a template set.
|
|
|
|
Args:
|
|
job_id: Unique job identifier
|
|
template_set_id: Template set to use for analysis
|
|
content: Content to analyze
|
|
context: Additional context for templates
|
|
video_id: Video ID if analyzing video content
|
|
|
|
Returns:
|
|
Orchestration result with all analysis outputs
|
|
"""
|
|
start_time = datetime.utcnow()
|
|
|
|
try:
|
|
# Get template set
|
|
template_set = self.template_registry.get_template_set(template_set_id)
|
|
if not template_set:
|
|
raise ValueError(f"Template set not found: {template_set_id}")
|
|
|
|
if not template_set.is_active:
|
|
raise ValueError(f"Template set is inactive: {template_set_id}")
|
|
|
|
# Initialize job tracking
|
|
self._active_jobs[job_id] = {
|
|
"start_time": start_time,
|
|
"template_set_id": template_set_id,
|
|
"status": "running",
|
|
"agents_count": len(template_set.templates)
|
|
}
|
|
|
|
# Create agents for the template set
|
|
agents = await self.agent_factory.create_agent_set(template_set_id)
|
|
|
|
# Prepare analysis state and context
|
|
state = AgentState({
|
|
"content": content,
|
|
"transcript": content, # Alias for compatibility
|
|
"video_id": video_id,
|
|
"context": context or {},
|
|
"job_id": job_id,
|
|
"template_set_id": template_set_id
|
|
})
|
|
|
|
agent_context = AgentContext(
|
|
request_id=job_id,
|
|
metadata={"template_set_id": template_set_id, "video_id": video_id}
|
|
)
|
|
|
|
# Execute agents
|
|
if template_set.parallel_execution and self.config.parallel_execution:
|
|
results = await self._execute_agents_parallel(agents, state, agent_context)
|
|
else:
|
|
results = await self._execute_agents_sequential(agents, template_set, state, agent_context)
|
|
|
|
# Perform synthesis if enabled and configured
|
|
synthesis_result = None
|
|
if self.config.synthesis_enabled and template_set.synthesis_template:
|
|
synthesis_result = await self._synthesize_results(
|
|
results, template_set, state, agent_context
|
|
)
|
|
|
|
# Calculate total processing time
|
|
processing_time = (datetime.utcnow() - start_time).total_seconds()
|
|
|
|
# Create orchestration result
|
|
orchestration_result = OrchestrationResult(
|
|
job_id=job_id,
|
|
template_set_id=template_set_id,
|
|
results=results,
|
|
synthesis_result=synthesis_result,
|
|
processing_time_seconds=processing_time,
|
|
success=True,
|
|
metadata={
|
|
"agents_used": list(agents.keys()),
|
|
"parallel_execution": template_set.parallel_execution and self.config.parallel_execution,
|
|
"synthesis_performed": synthesis_result is not None
|
|
}
|
|
)
|
|
|
|
# Update performance metrics
|
|
self._update_performance_metrics(orchestration_result)
|
|
|
|
# Remove from active jobs
|
|
self._active_jobs.pop(job_id, None)
|
|
|
|
logger.info(f"Orchestration {job_id} completed in {processing_time:.2f}s")
|
|
return orchestration_result
|
|
|
|
except Exception as e:
|
|
logger.error(f"Orchestration {job_id} failed: {e}")
|
|
|
|
processing_time = (datetime.utcnow() - start_time).total_seconds()
|
|
self._active_jobs.pop(job_id, None)
|
|
|
|
return OrchestrationResult(
|
|
job_id=job_id,
|
|
template_set_id=template_set_id,
|
|
results={},
|
|
processing_time_seconds=processing_time,
|
|
success=False,
|
|
error=str(e)
|
|
)
|
|
|
|
async def orchestrate_mixed_perspectives(
|
|
self,
|
|
job_id: str,
|
|
template_ids: List[str],
|
|
content: str,
|
|
context: Optional[Dict[str, Any]] = None,
|
|
video_id: Optional[str] = None,
|
|
enable_synthesis: bool = True
|
|
) -> OrchestrationResult:
|
|
"""Orchestrate analysis using a custom mix of templates.
|
|
|
|
This allows combining Educational and Domain perspectives in a single workflow.
|
|
|
|
Args:
|
|
job_id: Unique job identifier
|
|
template_ids: List of template IDs to use
|
|
content: Content to analyze
|
|
context: Additional context for templates
|
|
video_id: Video ID if analyzing video content
|
|
enable_synthesis: Whether to synthesize results
|
|
|
|
Returns:
|
|
Orchestration result with all analysis outputs
|
|
"""
|
|
start_time = datetime.utcnow()
|
|
|
|
try:
|
|
# Validate templates
|
|
templates = {}
|
|
for template_id in template_ids:
|
|
template = self.template_registry.get_template(template_id)
|
|
if not template:
|
|
raise ValueError(f"Template not found: {template_id}")
|
|
if not template.is_active:
|
|
raise ValueError(f"Template is inactive: {template_id}")
|
|
templates[template_id] = template
|
|
|
|
# Initialize job tracking
|
|
self._active_jobs[job_id] = {
|
|
"start_time": start_time,
|
|
"template_ids": template_ids,
|
|
"status": "running",
|
|
"agents_count": len(templates)
|
|
}
|
|
|
|
# Create agents for the templates
|
|
agents = {}
|
|
for template_id in template_ids:
|
|
agent = await self.agent_factory.create_agent(template_id)
|
|
agents[template_id] = agent
|
|
|
|
# Prepare analysis state and context
|
|
state = AgentState({
|
|
"content": content,
|
|
"transcript": content,
|
|
"video_id": video_id,
|
|
"context": context or {},
|
|
"job_id": job_id,
|
|
"mixed_perspectives": True
|
|
})
|
|
|
|
agent_context = AgentContext(
|
|
request_id=job_id,
|
|
metadata={"template_ids": template_ids, "video_id": video_id}
|
|
)
|
|
|
|
# Execute agents in parallel (mixed perspectives work well in parallel)
|
|
results = await self._execute_agents_parallel(agents, state, agent_context)
|
|
|
|
# Perform synthesis if enabled
|
|
synthesis_result = None
|
|
if enable_synthesis and len(results) > 1:
|
|
synthesis_result = await self._synthesize_mixed_results(
|
|
results, templates, state, agent_context
|
|
)
|
|
|
|
# Calculate total processing time
|
|
processing_time = (datetime.utcnow() - start_time).total_seconds()
|
|
|
|
# Create orchestration result
|
|
orchestration_result = OrchestrationResult(
|
|
job_id=job_id,
|
|
template_set_id="mixed_perspectives",
|
|
results=results,
|
|
synthesis_result=synthesis_result,
|
|
processing_time_seconds=processing_time,
|
|
success=True,
|
|
metadata={
|
|
"template_ids": template_ids,
|
|
"template_types": [t.template_type.value for t in templates.values()],
|
|
"mixed_perspectives": True,
|
|
"synthesis_performed": synthesis_result is not None
|
|
}
|
|
)
|
|
|
|
# Update performance metrics
|
|
self._update_performance_metrics(orchestration_result)
|
|
|
|
# Remove from active jobs
|
|
self._active_jobs.pop(job_id, None)
|
|
|
|
logger.info(f"Mixed perspective orchestration {job_id} completed in {processing_time:.2f}s")
|
|
return orchestration_result
|
|
|
|
except Exception as e:
|
|
logger.error(f"Mixed perspective orchestration {job_id} failed: {e}")
|
|
|
|
processing_time = (datetime.utcnow() - start_time).total_seconds()
|
|
self._active_jobs.pop(job_id, None)
|
|
|
|
return OrchestrationResult(
|
|
job_id=job_id,
|
|
template_set_id="mixed_perspectives",
|
|
results={},
|
|
processing_time_seconds=processing_time,
|
|
success=False,
|
|
error=str(e)
|
|
)
|
|
|
|
async def save_orchestration_to_database(
|
|
self,
|
|
orchestration_result: OrchestrationResult,
|
|
summary_id: str,
|
|
db: Session
|
|
) -> List[AgentSummary]:
|
|
"""Save orchestration results to database.
|
|
|
|
Args:
|
|
orchestration_result: Results to save
|
|
summary_id: Parent summary ID
|
|
db: Database session
|
|
|
|
Returns:
|
|
List of created AgentSummary records
|
|
"""
|
|
if not self.config.enable_database_persistence:
|
|
return []
|
|
|
|
try:
|
|
agent_summaries = []
|
|
|
|
# Save individual agent results
|
|
for template_id, result in orchestration_result.results.items():
|
|
agent_summary = AgentSummary(
|
|
summary_id=summary_id,
|
|
agent_name=result.template_name,
|
|
agent_type=template_id,
|
|
analysis_result=result.analysis,
|
|
key_insights=result.key_insights,
|
|
confidence_score=result.confidence_score,
|
|
processing_time=result.processing_time_seconds,
|
|
template_used=template_id,
|
|
analysis_metadata={
|
|
"context_used": result.context_used,
|
|
"template_variables": result.template_variables,
|
|
"timestamp": result.timestamp.isoformat()
|
|
}
|
|
)
|
|
db.add(agent_summary)
|
|
agent_summaries.append(agent_summary)
|
|
|
|
# Save synthesis result if available
|
|
if orchestration_result.synthesis_result:
|
|
synthesis_summary = AgentSummary(
|
|
summary_id=summary_id,
|
|
agent_name="Synthesis Agent",
|
|
agent_type="synthesis",
|
|
analysis_result=orchestration_result.synthesis_result.analysis,
|
|
key_insights=orchestration_result.synthesis_result.key_insights,
|
|
confidence_score=orchestration_result.synthesis_result.confidence_score,
|
|
processing_time=orchestration_result.synthesis_result.processing_time_seconds,
|
|
template_used=orchestration_result.synthesis_result.template_id,
|
|
analysis_metadata={
|
|
"orchestration_metadata": orchestration_result.metadata,
|
|
"total_processing_time": orchestration_result.processing_time_seconds,
|
|
"timestamp": orchestration_result.timestamp.isoformat()
|
|
}
|
|
)
|
|
db.add(synthesis_summary)
|
|
agent_summaries.append(synthesis_summary)
|
|
|
|
db.commit()
|
|
logger.info(f"Saved {len(agent_summaries)} agent summaries to database")
|
|
return agent_summaries
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to save orchestration results to database: {e}")
|
|
db.rollback()
|
|
return []
|
|
|
|
def get_active_orchestrations(self) -> Dict[str, Dict[str, Any]]:
|
|
"""Get information about active orchestration jobs."""
|
|
return {
|
|
job_id: {
|
|
**job_info,
|
|
"runtime_seconds": (datetime.utcnow() - job_info["start_time"]).total_seconds()
|
|
}
|
|
for job_id, job_info in self._active_jobs.items()
|
|
}
|
|
|
|
def get_orchestration_statistics(self) -> Dict[str, Any]:
|
|
"""Get comprehensive orchestration statistics."""
|
|
success_rate = (
|
|
self._successful_orchestrations / max(self._total_orchestrations, 1)
|
|
)
|
|
avg_processing_time = (
|
|
self._total_processing_time / max(self._successful_orchestrations, 1)
|
|
)
|
|
|
|
return {
|
|
"total_orchestrations": self._total_orchestrations,
|
|
"successful_orchestrations": self._successful_orchestrations,
|
|
"success_rate": success_rate,
|
|
"active_orchestrations": len(self._active_jobs),
|
|
"average_processing_time_seconds": avg_processing_time,
|
|
"total_processing_time_seconds": self._total_processing_time,
|
|
"factory_statistics": self.agent_factory.get_factory_statistics()
|
|
}
|
|
|
|
async def _execute_agents_parallel(
|
|
self,
|
|
agents: Dict[str, UnifiedAnalysisAgent],
|
|
state: AgentState,
|
|
context: AgentContext
|
|
) -> Dict[str, TemplateAnalysisResult]:
|
|
"""Execute agents in parallel."""
|
|
semaphore = asyncio.Semaphore(self.config.max_concurrent_agents)
|
|
|
|
async def execute_agent_with_semaphore(template_id: str, agent: UnifiedAnalysisAgent):
|
|
async with semaphore:
|
|
try:
|
|
updated_state = await agent.execute(state.copy(), context)
|
|
agent_key = f"agent_{template_id}"
|
|
return template_id, updated_state[agent_key]["result"]
|
|
except Exception as e:
|
|
logger.error(f"Agent {template_id} failed: {e}")
|
|
return template_id, None
|
|
|
|
# Execute all agents concurrently
|
|
tasks = [
|
|
execute_agent_with_semaphore(template_id, agent)
|
|
for template_id, agent in agents.items()
|
|
]
|
|
|
|
# Wait for all agents with timeout
|
|
try:
|
|
results = await asyncio.wait_for(
|
|
asyncio.gather(*tasks, return_exceptions=True),
|
|
timeout=self.config.timeout_seconds
|
|
)
|
|
except asyncio.TimeoutError:
|
|
logger.error("Agent execution timed out")
|
|
raise
|
|
|
|
# Process results
|
|
agent_results = {}
|
|
for template_id, result in results:
|
|
if result is not None and not isinstance(result, Exception):
|
|
# Convert dict back to TemplateAnalysisResult
|
|
if isinstance(result, dict):
|
|
agent_results[template_id] = TemplateAnalysisResult(**result)
|
|
else:
|
|
agent_results[template_id] = result
|
|
|
|
return agent_results
|
|
|
|
async def _execute_agents_sequential(
|
|
self,
|
|
agents: Dict[str, UnifiedAnalysisAgent],
|
|
template_set: TemplateSet,
|
|
state: AgentState,
|
|
context: AgentContext
|
|
) -> Dict[str, TemplateAnalysisResult]:
|
|
"""Execute agents in sequential order."""
|
|
agent_results = {}
|
|
execution_order = template_set.execution_order or list(agents.keys())
|
|
|
|
for template_id in execution_order:
|
|
if template_id not in agents:
|
|
continue
|
|
|
|
agent = agents[template_id]
|
|
try:
|
|
updated_state = await agent.execute(state.copy(), context)
|
|
agent_key = f"agent_{template_id}"
|
|
result = updated_state[agent_key]["result"]
|
|
|
|
# Convert dict to TemplateAnalysisResult if needed
|
|
if isinstance(result, dict):
|
|
agent_results[template_id] = TemplateAnalysisResult(**result)
|
|
else:
|
|
agent_results[template_id] = result
|
|
|
|
except Exception as e:
|
|
logger.error(f"Sequential agent {template_id} failed: {e}")
|
|
|
|
return agent_results
|
|
|
|
async def _synthesize_results(
|
|
self,
|
|
results: Dict[str, TemplateAnalysisResult],
|
|
template_set: TemplateSet,
|
|
state: AgentState,
|
|
context: AgentContext
|
|
) -> Optional[TemplateAnalysisResult]:
|
|
"""Synthesize results using the template set's synthesis template."""
|
|
if not template_set.synthesis_template:
|
|
return None
|
|
|
|
try:
|
|
# Create synthesis agent
|
|
synthesis_agent = await self.agent_factory.create_agent(
|
|
template_set.synthesis_template.id
|
|
)
|
|
|
|
# Prepare synthesis context
|
|
synthesis_state = state.copy()
|
|
for template_id, result in results.items():
|
|
synthesis_state[f"{template_id}_analysis"] = result.analysis
|
|
synthesis_state[f"{template_id}_insights"] = result.key_insights
|
|
|
|
# Execute synthesis
|
|
updated_state = await synthesis_agent.execute(synthesis_state, context)
|
|
agent_key = f"agent_{template_set.synthesis_template.id}"
|
|
result = updated_state[agent_key]["result"]
|
|
|
|
# Convert dict to TemplateAnalysisResult if needed
|
|
if isinstance(result, dict):
|
|
return TemplateAnalysisResult(**result)
|
|
else:
|
|
return result
|
|
|
|
except Exception as e:
|
|
logger.error(f"Synthesis failed: {e}")
|
|
return None
|
|
|
|
async def _synthesize_mixed_results(
|
|
self,
|
|
results: Dict[str, TemplateAnalysisResult],
|
|
templates: Dict[str, Any],
|
|
state: AgentState,
|
|
context: AgentContext
|
|
) -> Optional[TemplateAnalysisResult]:
|
|
"""Synthesize results from mixed perspectives."""
|
|
try:
|
|
# Create a dynamic synthesis prompt
|
|
synthesis_prompt = self._create_mixed_synthesis_prompt(results, templates)
|
|
|
|
# Use AI service directly for synthesis
|
|
synthesis_response = await self.ai_service.generate_summary({
|
|
"prompt": synthesis_prompt,
|
|
"system_prompt": "You are a synthesis expert combining multiple analytical perspectives.",
|
|
"max_tokens": 2000,
|
|
"temperature": 0.7
|
|
})
|
|
|
|
# Create synthesis result
|
|
return TemplateAnalysisResult(
|
|
template_id="mixed_synthesis",
|
|
template_name="Mixed Perspective Synthesis",
|
|
analysis=synthesis_response,
|
|
key_insights=self._extract_synthesis_insights(synthesis_response),
|
|
confidence_score=0.8, # Default for synthesis
|
|
processing_time_seconds=1.0,
|
|
context_used={},
|
|
template_variables={}
|
|
)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Mixed synthesis failed: {e}")
|
|
return None
|
|
|
|
def _create_mixed_synthesis_prompt(
|
|
self,
|
|
results: Dict[str, TemplateAnalysisResult],
|
|
templates: Dict[str, Any]
|
|
) -> str:
|
|
"""Create synthesis prompt for mixed perspectives."""
|
|
analyses = []
|
|
for template_id, result in results.items():
|
|
template = templates[template_id]
|
|
analyses.append(f"""
|
|
**{template.name} Analysis:**
|
|
{result.analysis}
|
|
|
|
**Key Insights:**
|
|
{chr(10).join(f"- {insight}" for insight in result.key_insights)}
|
|
""")
|
|
|
|
return f"""
|
|
Please synthesize the following multi-perspective analyses into a unified understanding:
|
|
|
|
{chr(10).join(analyses)}
|
|
|
|
Create a comprehensive synthesis that:
|
|
1. Identifies common themes across perspectives
|
|
2. Highlights unique insights from each viewpoint
|
|
3. Provides an integrated understanding
|
|
4. Offers actionable recommendations
|
|
|
|
Format as a structured analysis with clear sections.
|
|
"""
|
|
|
|
def _extract_synthesis_insights(self, synthesis_response: str) -> List[str]:
|
|
"""Extract key insights from synthesis response."""
|
|
insights = []
|
|
lines = synthesis_response.split('\n')
|
|
|
|
for line in lines:
|
|
line = line.strip()
|
|
if line.startswith('-') or line.startswith('•'):
|
|
insight = line[1:].strip()
|
|
if len(insight) > 10:
|
|
insights.append(insight)
|
|
|
|
# Ensure we have at least some insights
|
|
if len(insights) < 3:
|
|
sentences = synthesis_response.split('.')
|
|
for sentence in sentences:
|
|
sentence = sentence.strip()
|
|
if len(sentence) > 20 and any(word in sentence.lower() for word in
|
|
['important', 'key', 'significant', 'synthesis']):
|
|
if len(insights) < 5:
|
|
insights.append(sentence)
|
|
|
|
return insights[:5] # Limit to 5 insights
|
|
|
|
def _update_performance_metrics(self, result: OrchestrationResult) -> None:
|
|
"""Update orchestration performance metrics."""
|
|
self._total_orchestrations += 1
|
|
if result.success:
|
|
self._successful_orchestrations += 1
|
|
self._total_processing_time += result.processing_time_seconds |