"""
|
|
Main TestRunner class - orchestrates the entire test execution process.
|
|
|
|
This module provides the primary interface for running tests with various configurations,
|
|
handling parallel execution, environment management, and result aggregation.
|
|
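
Example (illustrative; assumes the default TestConfig discovery works for your
project layout):

    import asyncio

    runner = TestRunner()
    result = asyncio.run(runner.run_tests(mode=TestRunnerMode.ALL, parallel=False))
    print(f"{result.passed_tests}/{result.total_tests} tests passed")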
"""

import asyncio
import logging
import time
from dataclasses import dataclass, field
from enum import Enum
from pathlib import Path
from typing import Dict, List, Optional, Set, Union

from .models import (
    TestRunnerMode, TestCategory, TestStatus, ReportFormat,
    TestInfo, TestSuite, ExecutionResult, TestRunResult
)
from .test_discovery import TestDiscovery
from .test_execution import TestExecutor
from .reporting import TestReporter
from ..config.test_config import TestConfig


# TestRunnerMode and TestRunResult are now imported from the models module.


class TestRunner:
    """
    Main test runner orchestrator.

    Coordinates test discovery, execution, and reporting across different test categories
    with support for parallel execution, environment management, and comprehensive reporting.
    """

    def __init__(self, config: Optional[TestConfig] = None, project_root: Optional[Path] = None):
        """
        Initialize the test runner.

        Args:
            config: Test configuration object. If None, loads default config.
            project_root: Project root directory. If None, auto-detects from current location.
        """
        self.project_root = project_root or self._detect_project_root()
        self.config = config or TestConfig.load_default(self.project_root)

        # Initialize components
        self.discovery = TestDiscovery(self.project_root, self.config)
        self.executor = TestExecutor(self.config)
        self.reporter = TestReporter(self.config)

        # Setup logging
        self.logger = self._setup_logging()

        # State tracking
        self._test_suites: Dict[TestCategory, TestSuite] = {}
        self._execution_start_time: Optional[float] = None

    def _detect_project_root(self) -> Path:
        """Auto-detect project root by looking for key files."""
        current = Path.cwd()

        # Look for project markers
        markers = ["pyproject.toml", "setup.py", "backend/main.py", "package.json"]

        while current != current.parent:
            if any((current / marker).exists() for marker in markers):
                return current
            current = current.parent

        # Fallback to current directory
        return Path.cwd()

    def _setup_logging(self) -> logging.Logger:
        """Set up logging for the test runner."""
        logger = logging.getLogger("TestRunner")

        if not logger.handlers:
            handler = logging.StreamHandler()
            formatter = logging.Formatter(
                '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
            )
            handler.setFormatter(formatter)
            logger.addHandler(handler)

        logger.setLevel(self.config.log_level)
        return logger

    async def run_tests(
        self,
        mode: TestRunnerMode = TestRunnerMode.ALL,
        test_patterns: Optional[List[str]] = None,
        parallel: bool = True,
        coverage: bool = True,
        reports: Optional[List[ReportFormat]] = None,
        fail_fast: bool = False,
        verbose: bool = False
    ) -> TestRunResult:
        """
        Run tests based on specified mode and options.

        Args:
            mode: Test execution mode (all, unit, integration, etc.)
            test_patterns: Specific test patterns to run (for SPECIFIC mode)
            parallel: Enable parallel execution where possible
            coverage: Enable coverage collection
            reports: Report formats to generate
            fail_fast: Stop on first failure
            verbose: Enable verbose output

        Returns:
            TestRunResult with comprehensive execution results
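
        Example (illustrative; assumes `runner` is an already-constructed
        TestRunner and the caller is inside an async context; uses only modes
        and report formats referenced elsewhere in this module):
            result = await runner.run_tests(
                mode=TestRunnerMode.UNIT,
                reports=[ReportFormat.CONSOLE],
                fail_fast=True,
            )
            if not result.success:
                print(f"{result.failed_tests} test(s) failed")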
        """
        self._execution_start_time = time.time()
        reports = reports or [ReportFormat.CONSOLE]

        try:
            self.logger.info(f"Starting test run in {mode.value} mode")

            # Discover tests based on mode
            await self._discover_tests(mode, test_patterns)

            if not self._test_suites:
                self.logger.warning("No tests found matching criteria")
                return TestRunResult(
                    success=True,
                    total_tests=0,
                    passed_tests=0,
                    failed_tests=0,
                    skipped_tests=0,
                    error_tests=0,
                    execution_time=0.0
                )

            # Execute tests
            execution_results = await self._execute_tests(
                parallel=parallel,
                coverage=coverage,
                fail_fast=fail_fast,
                verbose=verbose
            )

            # Aggregate results
            final_result = self._aggregate_results(execution_results)

            # Generate reports
            await self._generate_reports(final_result, reports)

            # Log summary
            self._log_summary(final_result)

            return final_result

        except Exception as e:
            self.logger.error(f"Test run failed: {e}")
            raise

    async def _discover_tests(
        self,
        mode: TestRunnerMode,
        test_patterns: Optional[List[str]] = None
    ) -> None:
        """Discover tests based on execution mode."""
        self.logger.info("Discovering tests...")

        if mode == TestRunnerMode.ALL:
            # Discover all test categories
            categories = [
                TestCategory.UNIT,
                TestCategory.INTEGRATION,
                TestCategory.API,
                TestCategory.FRONTEND
            ]
        elif mode == TestRunnerMode.UNIT:
            categories = [TestCategory.UNIT]
        elif mode == TestRunnerMode.INTEGRATION:
            categories = [TestCategory.INTEGRATION, TestCategory.API]
        elif mode == TestRunnerMode.FRONTEND:
            categories = [TestCategory.FRONTEND]
        elif mode == TestRunnerMode.E2E:
            categories = [TestCategory.E2E]
        elif mode == TestRunnerMode.PERFORMANCE:
            categories = [TestCategory.PERFORMANCE]
        elif mode == TestRunnerMode.CHANGED:
            # Discover tests affected by recent changes
            categories = await self._discover_changed_tests()
        elif mode == TestRunnerMode.SPECIFIC:
            # Use test patterns to determine categories
            self._test_suites = await self.discovery.discover_by_patterns(
                test_patterns or []
            )
            return
        else:
            categories = [TestCategory.UNIT]  # Default fallback

        # Discover tests for each category
        for category in categories:
            suite = await self.discovery.discover_by_category(category)
            if suite and suite.tests:
                self._test_suites[category] = suite

        total_tests = sum(len(suite.tests) for suite in self._test_suites.values())
        self.logger.info(f"Discovered {total_tests} tests across {len(self._test_suites)} categories")

    async def _discover_changed_tests(self) -> List[TestCategory]:
        """Discover test categories affected by recent changes."""
        # This is a simplified implementation - in practice, you'd analyze git changes
        # and map them to test categories based on file paths and dependencies.
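        #
        # A fuller version might look roughly like this commented sketch
        # (illustrative only, not wired up; it assumes git is available on PATH,
        # that HEAD~1 is a sensible diff base, and that frontend code lives
        # under a "frontend" directory):
        #
        #     proc = await asyncio.create_subprocess_exec(
        #         "git", "diff", "--name-only", "HEAD~1",
        #         cwd=self.project_root,
        #         stdout=asyncio.subprocess.PIPE,
        #     )
        #     stdout, _ = await proc.communicate()
        #     changed = [Path(line) for line in stdout.decode().splitlines() if line]
        #     categories: Set[TestCategory] = set()
        #     for path in changed:
        #         if "frontend" in path.parts:
        #             categories.add(TestCategory.FRONTEND)
        #         else:
        #             categories.update({TestCategory.UNIT, TestCategory.INTEGRATION, TestCategory.API})
        #     return list(categories)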

        # For now, return a broad default set of categories as a safe fallback.
        return [TestCategory.UNIT, TestCategory.INTEGRATION, TestCategory.API]

    async def _execute_tests(
        self,
        parallel: bool,
        coverage: bool,
        fail_fast: bool,
        verbose: bool
    ) -> Dict[TestCategory, ExecutionResult]:
        """Execute discovered test suites."""
        self.logger.info("Executing tests...")

        results = {}

        if parallel and len(self._test_suites) > 1:
            # Execute categories in parallel
            tasks = [
                self.executor.execute_suite(
                    suite,
                    coverage=coverage,
                    fail_fast=fail_fast,
                    verbose=verbose
                )
                for suite in self._test_suites.values()
            ]

            execution_results = await asyncio.gather(*tasks, return_exceptions=True)

            for category, result in zip(self._test_suites.keys(), execution_results):
                if isinstance(result, Exception):
                    self.logger.error(f"Execution failed for {category.value}: {result}")
                    # Create error result
                    results[category] = ExecutionResult(
                        success=False,
                        total_tests=len(self._test_suites[category].tests),
                        passed=0,
                        failed=len(self._test_suites[category].tests),
                        skipped=0,
                        errors=1,
                        execution_time=0.0,
                        error_details=[{"error": str(result), "category": category.value}]
                    )
                else:
                    results[category] = result
        else:
            # Execute sequentially
            for category, suite in self._test_suites.items():
                self.logger.info(f"Executing {category.value} tests...")

                result = await self.executor.execute_suite(
                    suite,
                    coverage=coverage,
                    fail_fast=fail_fast,
                    verbose=verbose
                )

                results[category] = result

                # Check fail_fast
                if fail_fast and not result.success:
                    self.logger.warning("Stopping execution due to fail_fast mode")
                    break

        return results

    def _aggregate_results(
        self,
        execution_results: Dict[TestCategory, ExecutionResult]
    ) -> TestRunResult:
        """Aggregate results from all test categories."""

        total_execution_time = time.time() - (self._execution_start_time or time.time())

        # Aggregate counts
        total_tests = sum(r.total_tests for r in execution_results.values())
        passed_tests = sum(r.passed for r in execution_results.values())
        failed_tests = sum(r.failed for r in execution_results.values())
        skipped_tests = sum(r.skipped for r in execution_results.values())
        error_tests = sum(r.errors for r in execution_results.values())

        # Overall success
        success = all(r.success for r in execution_results.values())

        # Aggregate failure details
        failure_details = []
        for category, result in execution_results.items():
            if result.error_details:
                for error in result.error_details:
                    failure_details.append({
                        **error,
                        "category": category.value
                    })

        # Performance metrics
        performance_metrics = {
            "total_execution_time": total_execution_time,
            "average_test_time": total_execution_time / max(total_tests, 1),
        }

        # Add per-category execution times
        for category, result in execution_results.items():
            performance_metrics[f"{category.value}_execution_time"] = result.execution_time

        # Coverage (simplified - would need actual coverage integration)
        coverage_percentage = None
        if any(hasattr(r, 'coverage') and r.coverage for r in execution_results.values()):
            # In a real implementation, you'd aggregate coverage data
            coverage_percentage = 85.0  # Placeholder
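            #
            # A real aggregation might weight each suite's coverage by its test
            # count, roughly as in this commented sketch (illustrative; assumes
            # ExecutionResult.coverage holds a percentage, which this module does
            # not guarantee):
            #
            #     covered = [(r.coverage, r.total_tests) for r in execution_results.values()
            #                if getattr(r, "coverage", None) is not None]
            #     if covered:
            #         total = max(sum(n for _, n in covered), 1)
            #         coverage_percentage = sum(c * n for c, n in covered) / total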

        return TestRunResult(
            success=success,
            total_tests=total_tests,
            passed_tests=passed_tests,
            failed_tests=failed_tests,
            skipped_tests=skipped_tests,
            error_tests=error_tests,
            execution_time=total_execution_time,
            coverage_percentage=coverage_percentage,
            results_by_category=execution_results,
            failure_details=failure_details,
            performance_metrics=performance_metrics
        )

    async def _generate_reports(
        self,
        result: TestRunResult,
        formats: List[ReportFormat]
    ) -> None:
        """Generate test reports in specified formats."""
        self.logger.info("Generating reports...")

        for format_type in formats:
            await self.reporter.generate_report(result, format_type)

    def _log_summary(self, result: TestRunResult) -> None:
        """Log test run summary."""
        status = "PASSED" if result.success else "FAILED"

        self.logger.info(f"\n{'='*60}")
        self.logger.info(f"TEST RUN SUMMARY - {status}")
        self.logger.info(f"{'='*60}")
        self.logger.info(f"Total Tests: {result.total_tests}")
        self.logger.info(f"Passed: {result.passed_tests}")
        self.logger.info(f"Failed: {result.failed_tests}")
        self.logger.info(f"Skipped: {result.skipped_tests}")
        self.logger.info(f"Errors: {result.error_tests}")
        self.logger.info(f"Execution Time: {result.execution_time:.2f}s")

        if result.coverage_percentage:
            self.logger.info(f"Coverage: {result.coverage_percentage:.1f}%")

        if result.failure_details:
            self.logger.info("\nFailure Details:")
            for failure in result.failure_details[:5]:  # Show first 5 failures
                self.logger.info(f"  - {failure.get('test_name', 'Unknown')}: {failure.get('error', 'No details')}")

            if len(result.failure_details) > 5:
                self.logger.info(f"  ... and {len(result.failure_details) - 5} more failures")

        self.logger.info(f"{'='*60}")

    def get_test_categories(self) -> List[TestCategory]:
        """Get available test categories in the project."""
        return [
            TestCategory.UNIT,
            TestCategory.INTEGRATION,
            TestCategory.API,
            TestCategory.FRONTEND,
            TestCategory.E2E,
            TestCategory.PERFORMANCE
        ]

    async def list_tests(self, category: Optional[TestCategory] = None) -> Dict[TestCategory, List[str]]:
        """List available tests by category."""
        if category:
            suite = await self.discovery.discover_by_category(category)
            return {category: [test.name for test in suite.tests]} if suite else {}

        # List all tests
        all_tests = {}
        for cat in self.get_test_categories():
            suite = await self.discovery.discover_by_category(cat)
            if suite and suite.tests:
                all_tests[cat] = [test.name for test in suite.tests]

        return all_tests
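

# Minimal command-line entry point - an illustrative sketch that exits non-zero on
# failure. It assumes default TestConfig discovery succeeds from the current working
# directory and that the module is executed as part of its package (python -m ...).
if __name__ == "__main__":
    _runner = TestRunner()
    _result = asyncio.run(_runner.run_tests(mode=TestRunnerMode.ALL))
    raise SystemExit(0 if _result.success else 1)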