# youtube-summarizer/backend/test_runner/core/reporting.py
"""
Test Reporting System
Comprehensive test result reporting with support for multiple output formats,
detailed analysis, trend tracking, and CI/CD integration.
"""
import json
import xml.etree.ElementTree as ET
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List, Optional, Any
import logging
from dataclasses import asdict
from enum import Enum
from .models import TestRunResult, ExecutionResult, TestCategory, ReportFormat
# ReportFormat is now imported from models module
class TestReporter:
    """
    Comprehensive test result reporter.

    Generates detailed reports in multiple formats with historical tracking,
    performance analysis, and CI/CD integration support.
    """

    def __init__(self, config):
        """
        Initialize test reporter.

        Args:
            config: Test configuration object
        """
        self.config = config
        self.logger = logging.getLogger("TestReporter")
        # Report output directories
        self.reports_dir = Path("test_reports")
        self.reports_dir.mkdir(exist_ok=True)
        # Historical data
        self.history_file = self.reports_dir / "test_history.json"

    async def generate_report(
        self,
        result: TestRunResult,
        format_type: ReportFormat,
        output_path: Optional[Path] = None
    ) -> Path:
        """
        Generate a test report in the specified format.

        Args:
            result: Test run results to report
            format_type: Format for the report
            output_path: Optional custom output path

        Returns:
            Path to the generated report file
        """
        if format_type == ReportFormat.CONSOLE:
            return await self._generate_console_report(result)
        elif format_type == ReportFormat.HTML:
            return await self._generate_html_report(result, output_path)
        elif format_type == ReportFormat.JSON:
            return await self._generate_json_report(result, output_path)
        elif format_type == ReportFormat.JUNIT:
            return await self._generate_junit_report(result, output_path)
        elif format_type == ReportFormat.MARKDOWN:
            return await self._generate_markdown_report(result, output_path)
        elif format_type == ReportFormat.CSV:
            return await self._generate_csv_report(result, output_path)
        else:
            raise ValueError(f"Unsupported report format: {format_type}")

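    # Usage sketch for the dispatcher above (assumes an already-built
    # TestRunResult named `run_result`; `generate_report` is async, so it
    # must be awaited inside an event loop):
    #
    #     path = await reporter.generate_report(
    #         run_result, ReportFormat.MARKDOWN, output_path=Path("out/report.md")
    #     )
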
    async def _generate_console_report(self, result: TestRunResult) -> Path:
        """Generate console output report."""
        # Console output is handled directly by the TestRunner's _log_summary
        # method; return a placeholder path (POSIX-only) for API consistency.
        return Path("/dev/stdout")

    async def _generate_html_report(
        self,
        result: TestRunResult,
        output_path: Optional[Path] = None
    ) -> Path:
        """Generate comprehensive HTML report."""
        output_path = output_path or (self.reports_dir / f"report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.html")
        # Generate HTML content
        html_content = self._create_html_report(result)
        # Write to file
        with open(output_path, 'w', encoding='utf-8') as f:
            f.write(html_content)
        self.logger.info(f"HTML report generated: {output_path}")
        return output_path

    async def _generate_json_report(
        self,
        result: TestRunResult,
        output_path: Optional[Path] = None
    ) -> Path:
        """Generate JSON report."""
        output_path = output_path or (self.reports_dir / f"report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json")
        # Create comprehensive JSON report
        report_data = {
            "timestamp": datetime.now().isoformat(),
            "summary": {
                "success": result.success,
                "total_tests": result.total_tests,
                "passed_tests": result.passed_tests,
                "failed_tests": result.failed_tests,
                "skipped_tests": result.skipped_tests,
                "error_tests": result.error_tests,
                "execution_time": result.execution_time,
                "coverage_percentage": result.coverage_percentage,
                "pass_rate": (result.passed_tests / max(result.total_tests, 1)) * 100
            },
            "results_by_category": {
                category.value: {
                    "total_tests": exec_result.total_tests,
                    "passed": exec_result.passed,
                    "failed": exec_result.failed,
                    "skipped": exec_result.skipped,
                    "errors": exec_result.errors,
                    "execution_time": exec_result.execution_time,
                    "pass_rate": exec_result.pass_rate
                }
                for category, exec_result in result.results_by_category.items()
            },
            "failure_details": result.failure_details,
            "performance_metrics": result.performance_metrics,
            "environment": {
                "python_version": self._get_python_version(),
                "platform": self._get_platform_info()
            }
        }
        # Write JSON report
        with open(output_path, 'w', encoding='utf-8') as f:
            json.dump(report_data, f, indent=2, default=str)
        # Update historical data
        await self._update_history(report_data)
        self.logger.info(f"JSON report generated: {output_path}")
        return output_path

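    # Abbreviated shape of the JSON report produced above (values are
    # illustrative, not real results):
    #
    #     {
    #       "timestamp": "2024-01-01T12:00:00",
    #       "summary": {"success": true, "total_tests": 120, "pass_rate": 95.0, ...},
    #       "results_by_category": {"unit": {"passed": 80, "failed": 2, ...}},
    #       "failure_details": [...],
    #       "performance_metrics": {...},
    #       "environment": {"python_version": "3.11.4", "platform": "Linux ..."}
    #     }
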
    async def _generate_junit_report(
        self,
        result: TestRunResult,
        output_path: Optional[Path] = None
    ) -> Path:
        """Generate JUnit XML report for CI/CD integration."""
        output_path = output_path or (self.reports_dir / f"junit_{datetime.now().strftime('%Y%m%d_%H%M%S')}.xml")
        # Create JUnit XML structure
        testsuites = ET.Element("testsuites")
        testsuites.set("name", "YouTube Summarizer Tests")
        testsuites.set("tests", str(result.total_tests))
        testsuites.set("failures", str(result.failed_tests))
        testsuites.set("errors", str(result.error_tests))
        testsuites.set("skipped", str(result.skipped_tests))
        testsuites.set("time", f"{result.execution_time:.3f}")
        # Create a test suite for each category
        for category, exec_result in result.results_by_category.items():
            testsuite = ET.SubElement(testsuites, "testsuite")
            testsuite.set("name", f"{category.value}_tests")
            testsuite.set("tests", str(exec_result.total_tests))
            testsuite.set("failures", str(exec_result.failed))
            testsuite.set("errors", str(exec_result.errors))
            testsuite.set("skipped", str(exec_result.skipped))
            testsuite.set("time", f"{exec_result.execution_time:.3f}")
            # Add individual test cases (synthetic: per-test names and timings
            # are not tracked, so the suite time is split evenly across cases)
            case_time = exec_result.execution_time / max(exec_result.total_tests, 1)
            for i in range(exec_result.total_tests):
                testcase = ET.SubElement(testsuite, "testcase")
                testcase.set("name", f"test_{i+1}")
                testcase.set("classname", f"{category.value}.TestClass")
                testcase.set("time", f"{case_time:.3f}")
                # Mark the first `failed` cases as failures so the suite's
                # failure count matches its testcase elements, even when no
                # error details were captured
                if i < exec_result.failed:
                    failure = ET.SubElement(testcase, "failure")
                    failure.set("type", "TestFailure")
                    if exec_result.error_details and i < len(exec_result.error_details):
                        failure.text = exec_result.error_details[i].get("error", "Test failed")
                    else:
                        failure.text = "Test failed"
        # Write XML
        tree = ET.ElementTree(testsuites)
        tree.write(output_path, encoding='utf-8', xml_declaration=True)
        self.logger.info(f"JUnit report generated: {output_path}")
        return output_path

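    # Abbreviated shape of the JUnit XML emitted above (illustrative values):
    #
    #     <?xml version='1.0' encoding='utf-8'?>
    #     <testsuites name="YouTube Summarizer Tests" tests="120" failures="2" ...>
    #       <testsuite name="unit_tests" tests="82" failures="2" ...>
    #         <testcase name="test_1" classname="unit.TestClass" time="0.148">
    #           <failure type="TestFailure">AssertionError: ...</failure>
    #         </testcase>
    #       </testsuite>
    #     </testsuites>
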
    async def _generate_markdown_report(
        self,
        result: TestRunResult,
        output_path: Optional[Path] = None
    ) -> Path:
        """Generate Markdown report."""
        output_path = output_path or (self.reports_dir / f"report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.md")
        # Generate markdown content
        md_content = self._create_markdown_report(result)
        # Write to file
        with open(output_path, 'w', encoding='utf-8') as f:
            f.write(md_content)
        self.logger.info(f"Markdown report generated: {output_path}")
        return output_path

    async def _generate_csv_report(
        self,
        result: TestRunResult,
        output_path: Optional[Path] = None
    ) -> Path:
        """Generate CSV report for data analysis."""
        import csv
        output_path = output_path or (self.reports_dir / f"report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv")
        with open(output_path, 'w', newline='', encoding='utf-8') as f:
            writer = csv.writer(f)
            # Header
            writer.writerow([
                "Category", "Total", "Passed", "Failed", "Skipped", "Errors",
                "Execution Time", "Pass Rate", "Timestamp"
            ])
            timestamp = datetime.now().isoformat()
            # Overall summary row
            writer.writerow([
                "OVERALL",
                result.total_tests,
                result.passed_tests,
                result.failed_tests,
                result.skipped_tests,
                result.error_tests,
                f"{result.execution_time:.2f}",
                f"{(result.passed_tests / max(result.total_tests, 1)) * 100:.1f}%",
                timestamp
            ])
            # Per-category rows
            for category, exec_result in result.results_by_category.items():
                writer.writerow([
                    category.value,
                    exec_result.total_tests,
                    exec_result.passed,
                    exec_result.failed,
                    exec_result.skipped,
                    exec_result.errors,
                    f"{exec_result.execution_time:.2f}",
                    f"{exec_result.pass_rate:.1f}%",
                    timestamp
                ])
        self.logger.info(f"CSV report generated: {output_path}")
        return output_path

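    # Example rows written above (illustrative values):
    #
    #     Category,Total,Passed,Failed,Skipped,Errors,Execution Time,Pass Rate,Timestamp
    #     OVERALL,120,114,4,1,1,35.20,95.0%,2024-01-01T12:00:00
    #     unit,82,80,2,0,0,12.10,97.6%,2024-01-01T12:00:00
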
    def _create_html_report(self, result: TestRunResult) -> str:
        """Create comprehensive HTML report."""
        status_class = "success" if result.success else "failure"
        status_text = "PASSED" if result.success else "FAILED"
        # Generate category breakdown
        category_rows = ""
        for category, exec_result in result.results_by_category.items():
            row_class = "success" if exec_result.success else "failure"
            category_rows += f"""
            <tr class="{row_class}">
                <td>{category.value.title()}</td>
                <td>{exec_result.total_tests}</td>
                <td>{exec_result.passed}</td>
                <td>{exec_result.failed}</td>
                <td>{exec_result.skipped}</td>
                <td>{exec_result.errors}</td>
                <td>{exec_result.execution_time:.2f}s</td>
                <td>{exec_result.pass_rate:.1f}%</td>
            </tr>
            """
        # Generate failure details; escape test names and error messages,
        # which may contain characters that would otherwise break the HTML
        failure_details = ""
        if result.failure_details:
            failure_details = "<h3>Failure Details</h3><ul>"
            for failure in result.failure_details[:20]:  # Limit to first 20
                test_name = html.escape(failure.get("test_name", "Unknown"))
                error = html.escape(failure.get("error", "No details")[:200])  # Truncate long errors
                failure_details += f"<li><strong>{test_name}</strong>: {error}</li>"
            failure_details += "</ul>"
        # Performance metrics
        perf_metrics = ""
        if result.performance_metrics:
            perf_metrics = "<h3>Performance Metrics</h3><ul>"
            for metric, value in result.performance_metrics.items():
                if isinstance(value, float):
                    perf_metrics += f"<li><strong>{metric.replace('_', ' ').title()}</strong>: {value:.2f}s</li>"
                else:
                    perf_metrics += f"<li><strong>{metric.replace('_', ' ').title()}</strong>: {value}</li>"
            perf_metrics += "</ul>"
        html_template = f"""
<!DOCTYPE html>
<html>
<head>
    <title>YouTube Summarizer Test Report</title>
    <style>
        body {{ font-family: Arial, sans-serif; margin: 40px; }}
        .header {{ background: #f5f5f5; padding: 20px; border-radius: 5px; }}
        .status.success {{ color: green; font-weight: bold; }}
        .status.failure {{ color: red; font-weight: bold; }}
        table {{ width: 100%; border-collapse: collapse; margin: 20px 0; }}
        th, td {{ border: 1px solid #ddd; padding: 8px; text-align: left; }}
        th {{ background-color: #f2f2f2; }}
        tr.success {{ background-color: #d4edda; }}
        tr.failure {{ background-color: #f8d7da; }}
        .summary {{ display: flex; justify-content: space-between; margin: 20px 0; }}
        .summary-item {{ text-align: center; padding: 10px; border: 1px solid #ddd; border-radius: 5px; }}
        .coverage {{ font-size: 18px; font-weight: bold; color: #007bff; }}
        ul {{ max-height: 300px; overflow-y: auto; }}
    </style>
</head>
<body>
    <div class="header">
        <h1>YouTube Summarizer Test Report</h1>
        <p><strong>Status:</strong> <span class="status {status_class}">{status_text}</span></p>
        <p><strong>Generated:</strong> {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}</p>
        <p><strong>Total Execution Time:</strong> {result.execution_time:.2f} seconds</p>
        {f'<p><strong>Coverage:</strong> <span class="coverage">{result.coverage_percentage:.1f}%</span></p>' if result.coverage_percentage is not None else ''}
    </div>
    <div class="summary">
        <div class="summary-item">
            <h3>{result.total_tests}</h3>
            <p>Total Tests</p>
        </div>
        <div class="summary-item">
            <h3 style="color: green;">{result.passed_tests}</h3>
            <p>Passed</p>
        </div>
        <div class="summary-item">
            <h3 style="color: red;">{result.failed_tests}</h3>
            <p>Failed</p>
        </div>
        <div class="summary-item">
            <h3 style="color: orange;">{result.skipped_tests}</h3>
            <p>Skipped</p>
        </div>
        <div class="summary-item">
            <h3 style="color: purple;">{result.error_tests}</h3>
            <p>Errors</p>
        </div>
    </div>
    <h2>Results by Category</h2>
    <table>
        <tr>
            <th>Category</th>
            <th>Total</th>
            <th>Passed</th>
            <th>Failed</th>
            <th>Skipped</th>
            <th>Errors</th>
            <th>Time</th>
            <th>Pass Rate</th>
        </tr>
        {category_rows}
    </table>
    {failure_details}
    {perf_metrics}
    <hr>
    <footer>
        <p><em>Generated by YouTube Summarizer Test Runner v1.0.0</em></p>
    </footer>
</body>
</html>
"""
        return html_template

    def _create_markdown_report(self, result: TestRunResult) -> str:
        """Create Markdown report."""
        status_emoji = "✅" if result.success else "❌"
        status_text = "PASSED" if result.success else "FAILED"
        # Category breakdown
        category_table = "| Category | Total | Passed | Failed | Skipped | Errors | Time | Pass Rate |\n"
        category_table += "|----------|-------|--------|--------|---------|--------|------|----------|\n"
        for category, exec_result in result.results_by_category.items():
            category_table += f"| {category.value.title()} | {exec_result.total_tests} | {exec_result.passed} | {exec_result.failed} | {exec_result.skipped} | {exec_result.errors} | {exec_result.execution_time:.2f}s | {exec_result.pass_rate:.1f}% |\n"
        # Failure details
        failure_section = ""
        if result.failure_details:
            failure_section = "\n## Failure Details\n\n"
            for i, failure in enumerate(result.failure_details[:10], 1):
                test_name = failure.get("test_name", "Unknown")
                error = failure.get("error", "No details")[:200]
                failure_section += f"{i}. **{test_name}**: {error}\n"
        coverage_section = ""
        if result.coverage_percentage is not None:
            coverage_section = f"\n**Coverage**: {result.coverage_percentage:.1f}%"
        markdown_content = f"""# YouTube Summarizer Test Report

{status_emoji} **Status**: {status_text}

**Generated**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
**Total Execution Time**: {result.execution_time:.2f} seconds{coverage_section}

## Summary

- **Total Tests**: {result.total_tests}
- **Passed**: {result.passed_tests} ✅
- **Failed**: {result.failed_tests} ❌
- **Skipped**: {result.skipped_tests} ⏭️
- **Errors**: {result.error_tests} 💥

## Results by Category

{category_table}
{failure_section}
---
*Generated by YouTube Summarizer Test Runner v1.0.0*
"""
        return markdown_content

    async def _update_history(self, report_data: Dict[str, Any]) -> None:
        """Update historical test data."""
        history = []
        # Load existing history
        if self.history_file.exists():
            try:
                with open(self.history_file) as f:
                    history = json.load(f)
            except Exception as e:
                self.logger.warning(f"Failed to load test history: {e}")
        # Add current run
        history_entry = {
            "timestamp": report_data["timestamp"],
            "summary": report_data["summary"],
            "categories": list(report_data["results_by_category"].keys()),
            "performance": report_data["performance_metrics"]
        }
        history.append(history_entry)
        # Keep only the last 100 runs
        if len(history) > 100:
            history = history[-100:]
        # Save updated history
        try:
            with open(self.history_file, 'w') as f:
                json.dump(history, f, indent=2, default=str)
        except Exception as e:
            self.logger.warning(f"Failed to save test history: {e}")

    def get_test_trends(self, days: int = 30) -> Dict[str, Any]:
        """Get test trends over the specified period."""
        if not self.history_file.exists():
            return {"error": "No historical data available"}
        try:
            with open(self.history_file) as f:
                history = json.load(f)
        except Exception as e:
            return {"error": f"Failed to load history: {e}"}
        # Filter by date range
        cutoff_date = datetime.now() - timedelta(days=days)
        filtered_history = [
            entry for entry in history
            if datetime.fromisoformat(entry["timestamp"]) >= cutoff_date
        ]
        if not filtered_history:
            return {"error": f"No data found for the last {days} days"}
        # Calculate trends (history is appended chronologically, so the first
        # entry is the oldest and the last is the most recent)
        trends = {
            "period_days": days,
            "total_runs": len(filtered_history),
            "success_rate": sum(1 for entry in filtered_history if entry["summary"]["success"]) / len(filtered_history) * 100,
            "average_execution_time": sum(entry["summary"]["execution_time"] for entry in filtered_history) / len(filtered_history),
            "average_pass_rate": sum(entry["summary"]["pass_rate"] for entry in filtered_history) / len(filtered_history),
            "most_recent": filtered_history[-1],
            "oldest": filtered_history[0]
        }
        return trends

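    # Example return value of get_test_trends (illustrative values):
    #
    #     {
    #         "period_days": 30,
    #         "total_runs": 14,
    #         "success_rate": 85.7,
    #         "average_execution_time": 32.4,
    #         "average_pass_rate": 96.1,
    #         "most_recent": {...},
    #         "oldest": {...}
    #     }
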
    def _get_python_version(self) -> str:
        """Get Python version info."""
        import sys
        return f"{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}"

    def _get_platform_info(self) -> str:
        """Get platform information."""
        import platform
        return f"{platform.system()} {platform.release()} ({platform.machine()})"