# youtube-summarizer/backend/test_runner/core/test_execution.py

# 636 lines
# 21 KiB
# Python

"""
Test Execution Engine
Handles the actual execution of test suites with support for parallel execution,
environment setup, result collection, and real-time progress reporting.
"""
import asyncio
import json
import os
import subprocess
import tempfile
import time
from dataclasses import dataclass, field
from pathlib import Path
from typing import Dict, List, Optional, Any, Callable
import logging
from .models import TestSuite, TestInfo, TestCategory, ExecutionResult, TestStatus
# ExecutionResult is now imported from models module
class TestExecutor:
"""
Test execution engine with support for parallel execution and real-time reporting.
Handles environment setup, test execution, result collection, and cleanup
for different test categories and execution modes.
"""
def __init__(self, config):
    """
    Create an executor bound to a test configuration.

    Args:
        config: Test configuration object; stored unchanged on ``self.config``.
    """
    # No progress listener is registered until set_progress_callback() is called.
    self._progress_callback: Optional[Callable] = None
    # Resources created during environment setup, remembered for cleanup.
    self._test_databases: List[str] = []
    self._temp_dirs: List[Path] = []
    self.logger = logging.getLogger("TestExecutor")
    self.config = config
def set_progress_callback(self, callback: Callable[[str, float, str], None]) -> None:
    """
    Register the function that receives progress updates.

    Args:
        callback: Callable invoked as ``callback(stage, progress_percent, message)``.
            It replaces whatever callback was stored before.
    """
    self._progress_callback = callback
async def execute_suite(
    self,
    suite: TestSuite,
    coverage: bool = True,
    fail_fast: bool = False,
    verbose: bool = False,
    parallel: bool = True
) -> ExecutionResult:
    """
    Execute a test suite.

    Sets up the environment, dispatches to the frontend or pytest runner
    depending on ``suite.category``, and always runs environment cleanup
    afterwards. Any exception is converted into a failed ExecutionResult
    instead of being raised.

    Args:
        suite: Test suite to execute
        coverage: Enable coverage collection (pytest suites only)
        fail_fast: Stop on first failure (pytest suites only)
        verbose: Enable verbose output
        parallel: Enable parallel execution within suite (pytest suites only)

    Returns:
        ExecutionResult with execution details. ``execution_time`` is the
        wall-clock time measured here and overrides the runner-reported value.
    """
    start_time = time.time()
    # Tracks the phase in progress so a failure is attributed to the phase
    # that actually raised instead of always being reported as "setup".
    stage = "setup"
    try:
        self.logger.info(f"Executing {suite.category.value} test suite "
                         f"({len(suite.tests)} tests)")

        # Setup environment
        await self._setup_environment(suite)

        # Execute tests based on category
        stage = "execution"
        if suite.category == TestCategory.FRONTEND:
            result = await self._execute_frontend_suite(suite, verbose)
        else:
            result = await self._execute_python_suite(
                suite, coverage, fail_fast, verbose, parallel
            )

        # Calculate execution time
        result.execution_time = time.time() - start_time
        self.logger.info(f"Completed {suite.category.value} tests: "
                         f"{result.passed}/{result.total_tests} passed "
                         f"({result.execution_time:.2f}s)")
        return result
    except Exception as e:
        self.logger.error(f"Test execution failed: {e}")
        return ExecutionResult(
            success=False,
            total_tests=len(suite.tests),
            passed=0,
            failed=len(suite.tests),
            skipped=0,
            errors=1,
            execution_time=time.time() - start_time,
            error_details=[{
                "error": str(e),
                "type": "ExecutionError",
                "stage": stage
            }]
        )
    finally:
        # Cleanup environment (runs on success, failure and early return)
        await self._cleanup_environment()
async def _setup_environment(self, suite: TestSuite) -> None:
    """Run the setup steps requested by the suite, then any category-specific step."""
    requested = suite.setup_requirements
    # (requirement name, coroutine that satisfies it), applied in this order.
    steps = (
        ("database", self._setup_test_database),
        ("network", self._setup_network_mocks),
        ("auth", self._setup_auth_environment),
    )
    for requirement, prepare in steps:
        if requirement in requested:
            await prepare()

    # Frontend suites get an extra, category-specific setup step.
    if suite.category == TestCategory.FRONTEND:
        await self._setup_frontend_environment()
async def _setup_test_database(self) -> None:
    """Create an empty temporary ``.db`` file and point the process environment at it.

    The file path is recorded in ``self._test_databases``, and
    ``TEST_DATABASE_URL``, ``DATABASE_URL`` and ``TESTING`` are set in
    ``os.environ``.
    """
    self.logger.debug("Setting up test database...")

    # delete=False keeps the file on disk once the handle is closed on
    # leaving the with-block; only its path is needed here.
    with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as handle:
        db_path = handle.name
    self._test_databases.append(db_path)

    db_url = f"sqlite:///{db_path}"
    os.environ.update({
        "TEST_DATABASE_URL": db_url,
        "DATABASE_URL": db_url,
        "TESTING": "true",
    })
    self.logger.debug(f"Test database created: {db_path}")
async def _setup_network_mocks(self) -> None:
    """Mark external APIs as mocked for the test run.

    No mock servers are started here; the method only sets
    ``MOCK_EXTERNAL_APIS=true`` in ``os.environ``.
    """
    os.environ.update({"MOCK_EXTERNAL_APIS": "true"})
async def _setup_auth_environment(self) -> None:
    """Export the environment variables used for authentication tests.

    Sets a fixed, test-only ``JWT_SECRET_KEY`` and ``TEST_AUTH_ENABLED=true``
    in ``os.environ``.
    """
    os.environ.update({
        "JWT_SECRET_KEY": "test_secret_key_for_testing_only",
        "TEST_AUTH_ENABLED": "true",
    })
async def _setup_frontend_environment(self) -> None:
    """Prepare the frontend test environment (currently a no-op).

    Placeholder hook: nothing is checked or installed yet. A real
    implementation might verify that Node.js dependencies are installed.
    """
    return None
async def _execute_python_suite(
    self,
    suite: TestSuite,
    coverage: bool,
    fail_fast: bool,
    verbose: bool,
    parallel: bool
) -> ExecutionResult:
    """
    Execute a Python test suite by running pytest in a subprocess.

    Args:
        suite: Suite whose tests' ``file_path`` values are passed to pytest
        coverage: Add ``--cov`` options (needs pytest-cov)
        fail_fast: Add ``-x``
        verbose: Use ``-v`` instead of ``-q``
        parallel: Add ``-n auto`` when the suite has more than 4 tests
            (needs pytest-xdist)

    Returns:
        ExecutionResult parsed from the pytest run, or a failed result of
        type ``PytestExecutionError`` if the subprocess could not be run
        or its output could not be processed.
    """
    import sys

    # Run pytest with the interpreter executing this code so it sees the
    # same virtualenv and plugins, rather than whatever "python" is on PATH.
    cmd = [sys.executable or "python", "-m", "pytest"]

    # De-duplicate test files while keeping first-seen order so the command
    # line is reproducible from run to run (a set has arbitrary order).
    cmd.extend(dict.fromkeys(str(test.file_path) for test in suite.tests))

    # Add pytest options
    cmd.append("-v" if verbose else "-q")
    if fail_fast:
        cmd.append("-x")

    # Coverage options
    if coverage:
        cmd.extend([
            "--cov=backend",
            "--cov-report=term-missing",
            "--cov-report=json:coverage.json"
        ])

    # Parallel execution for pytest
    if parallel and len(suite.tests) > 4:
        cmd.extend(["-n", "auto"])  # Requires pytest-xdist

    # JSON output for result parsing (pytest-json-report plugin options)
    cmd.extend(["--json-report", "--json-report-file=test_results.json"])

    # Deselect tests marked "slow"; they are skipped by default
    cmd.extend(["-m", "not slow"])

    # Environment variables for the child process only
    env = os.environ.copy()
    env.update({
        "PYTHONPATH": str(Path.cwd()),
        "TESTING": "true"
    })

    self.logger.debug(f"Executing command: {' '.join(cmd)}")

    monitor_task = None
    try:
        # Execute pytest
        process = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            env=env,
            cwd=Path.cwd()
        )

        # Real-time progress monitoring. Keep a reference to the task: the
        # event loop holds only weak references, so an unreferenced task
        # can be garbage-collected before it finishes.
        if self._progress_callback:
            monitor_task = asyncio.create_task(
                self._monitor_pytest_progress(process, len(suite.tests))
            )

        stdout, stderr = await process.communicate()

        # Parse results. Undecodable bytes are replaced rather than letting
        # one stray byte in test output discard the whole run.
        return await self._parse_pytest_results(
            stdout.decode(errors="replace"),
            stderr.decode(errors="replace"),
            process.returncode,
            len(suite.tests)
        )
    except Exception as e:
        self.logger.error(f"Pytest execution failed: {e}")
        return ExecutionResult(
            success=False,
            total_tests=len(suite.tests),
            passed=0,
            failed=len(suite.tests),
            skipped=0,
            errors=1,
            execution_time=0.0,
            error_details=[{
                "error": str(e),
                "type": "PytestExecutionError"
            }]
        )
    finally:
        if monitor_task is not None:
            if not monitor_task.done():
                # Stop the monitor immediately instead of leaving it
                # pending until its next poll.
                monitor_task.cancel()
            elif not monitor_task.cancelled() and monitor_task.exception() is not None:
                # Retrieve the exception so it is reported here rather than
                # as an "exception was never retrieved" message later.
                self.logger.warning(
                    f"Progress monitor failed: {monitor_task.exception()}"
                )
async def _execute_frontend_suite(
    self,
    suite: TestSuite,
    verbose: bool
) -> ExecutionResult:
    """
    Execute a frontend test suite via ``npm run test`` (Vitest or Jest).

    The runner is detected from ``frontend/package.json``; Vitest is
    assumed when neither runner is declared.

    Args:
        suite: Suite being executed (only its test count is used here)
        verbose: Ask the runner for verbose output

    Returns:
        ExecutionResult parsed from the run; a failed result of type
        ``ConfigurationError`` when package.json is missing, or
        ``FrontendTestError`` when the subprocess fails to run.
    """
    frontend_path = Path("frontend")
    package_json_path = frontend_path / "package.json"

    if not package_json_path.exists():
        return ExecutionResult(
            success=False,
            total_tests=len(suite.tests),
            passed=0,
            failed=len(suite.tests),
            skipped=0,
            errors=1,
            execution_time=0.0,
            error_details=[{
                "error": "Frontend package.json not found",
                "type": "ConfigurationError"
            }]
        )

    # Read package.json to determine test runner
    with open(package_json_path, encoding="utf-8") as f:
        package_data = json.load(f)

    # Look in both dependency tables; Vitest wins when both runners are present.
    declared = set(package_data.get("devDependencies") or {})
    declared.update(package_data.get("dependencies") or {})
    test_command = "test"
    if "vitest" not in declared and "jest" in declared:
        runner = "jest"
    else:
        runner = "vitest"  # Default assumption

    # Arguments forwarded to the runner itself
    runner_args = []
    if verbose:
        # Jest spells its verbose switch differently from Vitest.
        runner_args.append("--verbose" if runner == "jest" else "--reporter=verbose")

    # Add JSON output for result parsing
    if runner == "vitest":
        runner_args.extend(["--reporter=json", "--outputFile=test-results.json"])
    elif runner == "jest":
        runner_args.extend(["--json", "--outputFile=test-results.json"])

    # Build command. npm forwards everything after a single "--"; emitting
    # the separator twice would hand the runner a literal "--" and turn the
    # flags that follow into positional arguments.
    cmd = ["npm", "run", test_command]
    if runner_args:
        cmd.append("--")
        cmd.extend(runner_args)

    self.logger.debug(f"Executing frontend command: {' '.join(cmd)}")

    try:
        # Execute test runner
        process = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            cwd=frontend_path
        )
        stdout, stderr = await process.communicate()

        # Parse results; undecodable bytes are replaced instead of raising.
        return await self._parse_frontend_results(
            stdout.decode(errors="replace"),
            stderr.decode(errors="replace"),
            process.returncode,
            len(suite.tests),
            runner
        )
    except Exception as e:
        self.logger.error(f"Frontend test execution failed: {e}")
        return ExecutionResult(
            success=False,
            total_tests=len(suite.tests),
            passed=0,
            failed=len(suite.tests),
            skipped=0,
            errors=1,
            execution_time=0.0,
            error_details=[{
                "error": str(e),
                "type": "FrontendTestError"
            }]
        )
async def _monitor_pytest_progress(
    self,
    process: asyncio.subprocess.Process,
    total_tests: int,
    poll_interval: float = 1.0,
    seconds_per_test: float = 2.0
) -> None:
    """
    Report estimated progress to the progress callback while pytest runs.

    This is a time-based estimate, not a parse of pytest output: progress is
    elapsed time divided by ``total_tests * seconds_per_test``, capped at 95%.
    The loop ends once ``process.returncode`` is set.

    Args:
        process: Running pytest subprocess; only ``returncode`` is read
        total_tests: Number of tests expected to run
        poll_interval: Seconds to sleep between callback invocations
        seconds_per_test: Assumed average duration of one test
    """
    if not self._progress_callback:
        return

    start_time = time.time()
    # Treat an empty suite as one test so the estimate is never zero, which
    # would raise ZeroDivisionError on the first iteration.
    estimated_total_time = max(total_tests, 1) * seconds_per_test

    while process.returncode is None:
        elapsed = time.time() - start_time
        if estimated_total_time > 0:
            # Cap at 95% until completion
            progress = min(elapsed / estimated_total_time * 100, 95)
        else:
            # A non-positive per-test estimate gives no usable ratio.
            progress = 95
        self._progress_callback(
            "executing_tests",
            progress,
            f"Running tests... ({elapsed:.1f}s elapsed)"
        )
        await asyncio.sleep(poll_interval)
async def _parse_pytest_results(
    self,
    stdout: str,
    stderr: str,
    return_code: int,
    expected_tests: int
) -> ExecutionResult:
    """
    Build an ExecutionResult for a pytest run.

    Reads ``test_results.json`` from the current directory when it exists
    (the file is deleted afterwards whether or not it could be parsed);
    otherwise, or when reading it fails, falls back to parsing ``stdout``.
    """
    report_file = Path("test_results.json")
    if not report_file.exists():
        return self._parse_pytest_stdout(stdout, stderr, return_code, expected_tests)

    try:
        with open(report_file) as handle:
            report = json.load(handle)
        summary = report.get("summary", {})
        return ExecutionResult(
            success=(return_code == 0),
            total_tests=summary.get("total", expected_tests),
            passed=summary.get("passed", 0),
            failed=summary.get("failed", 0),
            skipped=summary.get("skipped", 0),
            errors=summary.get("error", 0),
            execution_time=report.get("duration", 0.0),
            output=stdout,
            error_details=self._extract_error_details(report)
        )
    except Exception as exc:
        self.logger.warning(f"Failed to parse JSON report: {exc}")
    finally:
        # The report is single-use; remove it on success and on failure.
        report_file.unlink(missing_ok=True)

    # Reached only when the report existed but could not be used.
    return self._parse_pytest_stdout(stdout, stderr, return_code, expected_tests)
def _parse_pytest_stdout(
    self,
    stdout: str,
    stderr: str,
    return_code: int,
    expected_tests: int
) -> ExecutionResult:
    """
    Parse pytest results from stdout.

    Looks for the final summary line, e.g.
    ``1 failed, 5 passed, 1 skipped in 10.23s``. Each outcome count is read
    independently, because pytest does not list "passed" first and omits
    outcomes that did not occur. When no summary line is found, falls back
    to counting ``PASSED`` / ``FAILED`` / ``SKIPPED`` markers in the output.

    Args:
        stdout: Captured pytest standard output
        stderr: Captured pytest standard error (currently unused)
        return_code: pytest process exit code
        expected_tests: Test count to report when nothing could be counted

    Returns:
        ExecutionResult built from the parsed counts
    """
    import re

    count_pattern = re.compile(r"(\d+) (passed|failed|skipped|error)s?\b")
    time_pattern = re.compile(r"\bin (\d+(?:\.\d+)?)s\b")

    counts = {"passed": 0, "failed": 0, "skipped": 0, "error": 0}
    execution_time = 0.0

    # Search from the end: the summary is the last thing pytest prints, so
    # earlier look-alike lines in test output are not mistaken for it.
    summary_line = None
    for line in reversed(stdout.splitlines()):
        if time_pattern.search(line) and count_pattern.search(line):
            summary_line = line
            break

    if summary_line is not None:
        for number, outcome in count_pattern.findall(summary_line):
            counts[outcome] += int(number)
        execution_time = float(time_pattern.search(summary_line).group(1))
    else:
        # Fallback parsing: count per-test status markers
        counts["failed"] = stdout.count("FAILED")
        counts["passed"] = stdout.count("PASSED")
        counts["skipped"] = stdout.count("SKIPPED")

    passed = counts["passed"]
    failed = counts["failed"]
    skipped = counts["skipped"]
    errors = counts["error"]

    total_tests = passed + failed + skipped + errors
    if total_tests == 0:
        total_tests = expected_tests

    return ExecutionResult(
        success=(return_code == 0 and failed == 0),
        total_tests=total_tests,
        passed=passed,
        failed=failed,
        skipped=skipped,
        errors=errors,
        execution_time=execution_time,
        output=stdout,
        error_details=self._extract_stdout_errors(stdout)
    )
async def _parse_frontend_results(
    self,
    stdout: str,
    stderr: str,
    return_code: int,
    expected_tests: int,
    runner: str
) -> ExecutionResult:
    """
    Build an ExecutionResult for a frontend run.

    Uses ``frontend/test-results.json`` when it exists and ``runner`` is
    "vitest" or "jest" (the file is deleted afterwards either way);
    otherwise, or when reading it fails, falls back to parsing ``stdout``.
    """
    report_file = Path("frontend/test-results.json")
    if report_file.exists():
        # Runner name -> parser for that runner's JSON layout.
        parsers = {
            "vitest": self._parse_vitest_json,
            "jest": self._parse_jest_json,
        }
        try:
            with open(report_file) as handle:
                report = json.load(handle)
            parse = parsers.get(runner)
            if parse is not None:
                return parse(report, return_code)
        except Exception as exc:
            self.logger.warning(f"Failed to parse frontend JSON results: {exc}")
        finally:
            report_file.unlink(missing_ok=True)

    # No usable JSON report: estimate from the console output instead.
    return self._parse_frontend_stdout(stdout, stderr, return_code, expected_tests)
def _parse_vitest_json(self, data: Dict, return_code: int) -> ExecutionResult:
    """
    Parse Vitest JSON reporter output.

    Per-test counters (``numTotalTests`` etc.) are preferred so the numbers
    are tests, consistent with the Jest parser; the per-suite counters are
    used only when the per-test ones are absent.

    Args:
        data: Decoded JSON report
        return_code: Exit code of the test process; decides ``success``

    Returns:
        ExecutionResult; ``execution_time`` is the sum of per-file
        durations in seconds (0.0 when the report has no file entries).
    """
    if "numTotalTests" in data:
        total = data.get("numTotalTests", 0)
        passed = data.get("numPassedTests", 0)
        failed = data.get("numFailedTests", 0)
        skipped = data.get("numPendingTests", 0)
    else:
        total = data.get("numTotalTestSuites", 0)
        passed = data.get("numPassedTestSuites", 0)
        failed = data.get("numFailedTestSuites", 0)
        skipped = 0

    # Sum every file's duration instead of indexing [0], which raised
    # IndexError on an empty list and ignored all files after the first.
    runtime_ms = 0.0
    for entry in data.get("testResults") or []:
        perf_stats = entry.get("perfStats") or {}
        if "runtime" in perf_stats:
            runtime_ms += perf_stats["runtime"]
        elif "startTime" in entry and "endTime" in entry:
            # Entries without perfStats carry start/end timestamps (ms).
            runtime_ms += entry["endTime"] - entry["startTime"]

    return ExecutionResult(
        success=(return_code == 0),
        total_tests=total,
        passed=passed,
        failed=failed,
        skipped=skipped,
        errors=0,
        execution_time=runtime_ms / 1000.0
    )
def _parse_jest_json(self, data: Dict, return_code: int) -> ExecutionResult:
    """
    Parse Jest ``--json`` output.

    Args:
        data: Decoded JSON report
        return_code: Exit code of the test process (unused; ``success``
            comes from the report's own "success" field)

    Returns:
        ExecutionResult; ``execution_time`` is the sum of per-file
        durations in seconds (0.0 when the report has no file entries).
    """
    # Sum every file's duration instead of indexing [0], which raised
    # IndexError on an empty list and ignored all files after the first.
    runtime_ms = 0.0
    for entry in data.get("testResults") or []:
        perf_stats = entry.get("perfStats") or {}
        if "runtime" in perf_stats:
            runtime_ms += perf_stats["runtime"]
        elif "startTime" in entry and "endTime" in entry:
            # Entries without perfStats carry start/end timestamps (ms).
            runtime_ms += entry["endTime"] - entry["startTime"]

    return ExecutionResult(
        success=data.get("success", False),
        total_tests=data.get("numTotalTests", 0),
        passed=data.get("numPassedTests", 0),
        failed=data.get("numFailedTests", 0),
        skipped=data.get("numPendingTests", 0),
        errors=0,
        execution_time=runtime_ms / 1000.0
    )
def _parse_frontend_stdout(
    self,
    stdout: str,
    stderr: str,
    return_code: int,
    expected_tests: int
) -> ExecutionResult:
    """
    Estimate frontend test results from console output.

    This is a heuristic: it counts pass/fail glyphs and the words
    "PASS"/"FAIL" anywhere in ``stdout``, so the numbers are approximate.

    Args:
        stdout: Captured runner standard output
        stderr: Captured runner standard error (currently unused)
        return_code: Exit code of the test process; decides ``success``
        expected_tests: Lower bound for the reported total

    Returns:
        ExecutionResult with the estimated counts
    """
    # The glyphs are written as escapes so they survive tools that strip
    # non-ASCII characters. An empty marker must never be counted:
    # str.count("") returns len(stdout) + 1.
    pass_markers = (
        "\u2713",  # check mark
        "PASS",
    )
    fail_markers = (
        "\u2717",  # ballot x
        "\u2715",  # multiplication x
        "\u00d7",  # multiplication sign
        "FAIL",
    )
    passed = sum(stdout.count(marker) for marker in pass_markers)
    failed = sum(stdout.count(marker) for marker in fail_markers)
    total = max(passed + failed, expected_tests)

    return ExecutionResult(
        success=(return_code == 0),
        total_tests=total,
        passed=passed,
        failed=failed,
        skipped=0,
        errors=0,
        execution_time=0.0,
        output=stdout
    )
def _extract_error_details(self, json_data: Dict) -> List[Dict[str, Any]]:
    """
    Extract detailed error information from a pytest JSON report.

    Args:
        json_data: Decoded report; its "tests" list is scanned for entries
            whose "outcome" is "failed" or "error"

    Returns:
        At most 10 dicts with keys "test_name", "error", "type", "file"
        and "line", in report order.
    """
    max_errors = 10
    errors: List[Dict[str, Any]] = []

    for test in json_data.get("tests") or []:
        outcome = test.get("outcome")
        if outcome not in ("failed", "error"):
            continue

        # A failure in the test body is recorded under "call", but a broken
        # fixture is recorded under "setup" or "teardown"; check all three
        # so errors do not lose their message.
        longrepr = None
        for stage in ("call", "setup", "teardown"):
            longrepr = (test.get(stage) or {}).get("longrepr")
            if longrepr:
                break

        nodeid = test.get("nodeid", "Unknown")
        # Fall back to the path part of the node id and to "lineno" when
        # the explicit "file"/"line" keys are absent.
        file_name = test.get("file")
        if not file_name:
            has_nodeid = isinstance(test.get("nodeid"), str)
            file_name = nodeid.split("::", 1)[0] if has_nodeid else ""

        errors.append({
            "test_name": nodeid,
            "error": longrepr or "No error details",
            "type": outcome,
            "file": file_name,
            "line": test.get("line", test.get("lineno", 0))
        })
        if len(errors) == max_errors:
            break  # Limit to first 10 errors

    return errors
def _extract_stdout_errors(self, stdout: str) -> List[Dict[str, Any]]:
    """
    Collect failure entries from pytest console output.

    Every line containing "FAILED" becomes one entry, whose "test_name" is
    the stripped line. Only the first 10 such lines are reported.
    """
    max_entries = 10
    details: List[Dict[str, Any]] = []
    for raw_line in stdout.split('\n'):
        if 'FAILED' not in raw_line:
            continue
        if len(details) == max_entries:
            break
        details.append({
            "test_name": raw_line.strip(),
            "error": "See full output for details",
            "type": "failed"
        })
    return details
async def _cleanup_environment(self) -> None:
    """
    Undo what environment setup created.

    Deletes the tracked temporary database files and directories, empties
    both tracking lists, and removes the test-related variables from
    ``os.environ``. A failure to delete one item is logged as a warning
    and does not stop the rest of the cleanup.
    """
    import shutil

    # Temporary database files
    for db_file in self._test_databases:
        try:
            if os.path.exists(db_file):
                os.unlink(db_file)
                self.logger.debug(f"Cleaned up test database: {db_file}")
        except Exception as exc:
            self.logger.warning(f"Failed to cleanup database {db_file}: {exc}")
    self._test_databases.clear()

    # Temporary directories
    for directory in self._temp_dirs:
        try:
            if directory.exists():
                shutil.rmtree(directory)
                self.logger.debug(f"Cleaned up temp directory: {directory}")
        except Exception as exc:
            self.logger.warning(f"Failed to cleanup directory {directory}: {exc}")
    self._temp_dirs.clear()

    # Environment variables set by the setup steps; absent ones are skipped.
    for name in (
        "TEST_DATABASE_URL",
        "DATABASE_URL",
        "TESTING",
        "MOCK_EXTERNAL_APIS",
        "JWT_SECRET_KEY",
        "TEST_AUTH_ENABLED",
    ):
        os.environ.pop(name, None)