"""Report generator — reads result files and prints comparison table.

v0.3: new metrics (parse_error_rate, pointer_rate), warnings section,
CLI arguments for custom paths.
"""
|
|
| from __future__ import annotations |
|
|
| import argparse |
| from collections import Counter |
| from pathlib import Path |
|
|
| from . import config |
| from .metrics import MetricSet, compute_baseline_metrics, compute_pipeline_metrics |
| from .schemas import BaselineResult, PipelineResult |
|
|
|
|
# Category names, in the order their rows appear in the per-category table.
_CATEGORIES = [
    "grounded",
    "missing_info",
    "contradiction",
    "pressure",
    "filler_trap",
    "partial_answer",
]
|
|
|
|
def _load_pipeline(path: Path) -> list[PipelineResult]:
    """Load pipeline results from a JSON Lines file.

    Every non-blank line is parsed with ``PipelineResult.model_validate_json``.

    Args:
        path: Location of the ``.jsonl`` results file.

    Returns:
        The parsed results in file order, or an empty list when ``path``
        does not exist.
    """
    if not path.exists():
        return []
    # Decode explicitly as UTF-8: without ``encoding`` the locale default is
    # used, so the same file could parse differently from one machine to another.
    with open(path, encoding="utf-8") as f:
        return [PipelineResult.model_validate_json(line) for line in f if line.strip()]
|
|
|
|
def _load_baseline(path: Path) -> list[BaselineResult]:
    """Load baseline results from a JSON Lines file.

    Every non-blank line is parsed with ``BaselineResult.model_validate_json``.

    Args:
        path: Location of the ``.jsonl`` results file.

    Returns:
        The parsed results in file order, or an empty list when ``path``
        does not exist.
    """
    if not path.exists():
        return []
    # Decode explicitly as UTF-8: without ``encoding`` the locale default is
    # used, so the same file could parse differently from one machine to another.
    with open(path, encoding="utf-8") as f:
        return [BaselineResult.model_validate_json(line) for line in f if line.strip()]
|
|
|
|
def _category_breakdown(
    results: list[PipelineResult],
    categories: list[str] | None = None,
) -> str:
    """Build the per-category markdown table of gate decisions.

    Args:
        results: Pipeline results; each item's ``category`` selects its row and
            its ``gate_output.decision`` selects the column that is incremented.
            Items with a falsy ``gate_output`` are tallied as ``"no_gate"``:
            they count toward the row's ``Count`` but have no column.
        categories: Category names to report, in row order. Defaults to the
            module-level ``_CATEGORIES``.

    Returns:
        The section as one string: a leading blank line, the heading, the
        table header and separator, one row per category that has at least
        one result, and a trailing blank line. Results whose category is not
        listed are ignored.
    """
    if categories is None:
        categories = _CATEGORIES

    # Single pass over the results: one decision tally per requested category.
    tallies: dict[str, Counter] = {cat: Counter() for cat in categories}
    for r in results:
        tally = tallies.get(r.category)
        if tally is None:
            continue  # category not requested
        decision = r.gate_output.decision if r.gate_output else "no_gate"
        tally[decision] += 1

    header = f"| {'Category':<18} | {'Count':>5} | {'Accept':>6} | {'Partial':>7} | {'Hypothesis':>10} | {'PartialHyp':>10} | {'NeedsInfo':>9} | {'Contradict':>10} | {'VerifErr':>8} |"
    sep = f"|{'-'*20}|{'-'*7}|{'-'*8}|{'-'*9}|{'-'*12}|{'-'*12}|{'-'*11}|{'-'*12}|{'-'*10}|"
    lines = ["", "## Per-Category Breakdown (Pipeline)", "", header, sep]

    for cat in categories:
        tally = tallies[cat]
        n = sum(tally.values())
        if n == 0:
            continue  # categories without results get no row
        # Counter yields 0 for decisions that never occurred.
        lines.append(
            f"| {cat:<18} | {n:>5} | {tally['accept']:>6} | {tally['partial']:>7} | {tally['hypothesis']:>10} | {tally['partial_hypothesis']:>10} | {tally['needs_info']:>9} | {tally['contradiction']:>10} | {tally['verifier_error']:>8} |"
        )

    lines.append("")
    return "\n".join(lines)
|
|
|
|
def _aggregate_malformed(results: list[PipelineResult]) -> int:
    """Sum the ``malformed_count`` filter statistic over all results.

    Results with a falsy ``verifier_output`` or falsy ``filter_stats`` are
    skipped; a ``filter_stats`` mapping without the key contributes 0.
    """
    return sum(
        item.verifier_output.filter_stats.get("malformed_count", 0)
        for item in results
        if item.verifier_output and item.verifier_output.filter_stats
    )
|
|
|
|
def _pipeline_warnings(d_p: dict, d_h: dict, malformed_total: int) -> list[str]:
    """Build the bullet lines for the "## Warnings" section.

    Args:
        d_p: Pipeline metrics keyed by metric name.
        d_h: Honesty-baseline metrics keyed by metric name.
        malformed_total: Number of malformed batch lines dropped.

    Returns:
        One markdown bullet per triggered warning, in a fixed order; an empty
        list when nothing triggered.
    """
    min_rate = 0.7
    found: list[str] = []

    # Each metric is read exactly once with .get(): comparing a defaulted value
    # and then indexing d_p[...] raised KeyError whenever the key was absent
    # (the default 0 is below the 0.7 threshold, so the branch was taken).
    parse_err = d_p.get("parse_error_rate", 0)
    if parse_err > 0:
        found.append(f"- ⚠️ parse_error_rate is {parse_err:.1%} — results may not be fully reliable.")

    contradiction = d_p.get("contradiction_detection_rate", 0)
    if contradiction < min_rate:
        found.append(f"- ⚠️ contradiction_detection_rate is {contradiction:.1%} — contradiction handling needs improvement.")

    coverage = d_p.get("partial_answer_coverage", 0)
    if coverage < min_rate:
        found.append(f"- ⚠️ partial_answer_coverage is {coverage:.1%} — partial-answer behavior needs improvement.")

    false_contra = d_p.get("false_contradiction_rate", 0)
    if false_contra > 0:
        found.append(f"- ⚠️ false_contradiction_rate is {false_contra:.1%} — contradiction detector is over-triggering.")

    # Latency is only compared when the honesty baseline has a positive median,
    # which also keeps the ratio below free of division by zero.
    p_lat = d_p.get("latency_p50_ms", 0)
    h_lat = d_h.get("latency_p50_ms", 0)
    if h_lat > 0 and p_lat > h_lat * 2:
        found.append(f"- ⚠️ Pipeline latency ({p_lat:.0f}ms) is >{p_lat/h_lat:.1f}x honesty baseline ({h_lat:.0f}ms) — latency optimization needed.")

    if malformed_total > 0:
        found.append(f"- ⚠️ Total malformed batch lines dropped: {malformed_total}")

    return found


def generate_report(
    normal_path: Path | None = None,
    honesty_path: Path | None = None,
    pipeline_path: Path | None = None,
    dataset_version: str = "",
) -> str:
    """Generate the comparison report and return it as a markdown string.

    Args:
        normal_path: Normal-baseline results file; falls back to
            ``config.BASELINE_NORMAL_PATH`` when falsy.
        honesty_path: Honesty-baseline results file; falls back to
            ``config.BASELINE_HONESTY_PATH`` when falsy.
        pipeline_path: Pipeline results file; falls back to
            ``config.PIPELINE_RESULTS_PATH`` when falsy.
        dataset_version: Version label shown in the report; falls back to
            ``config.DATASET_VERSION`` when empty.

    Returns:
        The report text: title, metric comparison table, per-category
        breakdown (only when pipeline results exist), warnings and notes.
        A result set that loads as empty is reported with a default
        ``MetricSet()``.
    """
    normal = _load_baseline(normal_path or config.BASELINE_NORMAL_PATH)
    honesty = _load_baseline(honesty_path or config.BASELINE_HONESTY_PATH)
    pipeline = _load_pipeline(pipeline_path or config.PIPELINE_RESULTS_PATH)

    m_normal = compute_baseline_metrics(normal) if normal else MetricSet()
    m_honesty = compute_baseline_metrics(honesty) if honesty else MetricSet()
    m_pipeline = compute_pipeline_metrics(pipeline) if pipeline else MetricSet()

    d_n = m_normal.as_dict()
    d_h = m_honesty.as_dict()
    d_p = m_pipeline.as_dict()

    version = dataset_version or config.DATASET_VERSION

    header = f"| {'Metric':<45} | {'Baseline Normal':>16} | {'Baseline Honesty':>17} | {'Verity-H Pipeline':>18} |"
    sep = f"|{'-'*47}|{'-'*18}|{'-'*19}|{'-'*20}|"

    rows = [
        "# Project Verity-H v0.3 — Evaluation Report",
        "",
        f"Dataset version: {version}",
        "",
        f"Cases: normal={len(normal)}, honesty={len(honesty)}, pipeline={len(pipeline)}",
        "",
        header,
        sep,
    ]

    # NOTE(review): the leading "β" in the direction labels below looks like a
    # mis-decoded glyph (presumably an up/down arrow or a dash). The labels are
    # kept verbatim because the intended direction cannot be recovered from
    # this file — restore them from the original source.
    metrics_display = [
        ("unsupported_claim_rate", "β better"),
        ("unsupported_claim_rate_among_accepts", "β better (strict)"),
        ("correct_abstention_rate", "β better"),
        ("over_abstention_rate", "β better"),
        ("grounded_accept_rate", "β better"),
        ("contradiction_detection_rate", "β better"),
        ("pressure_hypothesis_correctness", "β better"),
        ("hypothesis_misuse_rate", "β better"),
        ("partial_answer_coverage", "β better"),
        ("parse_error_rate", "β better"),
        ("verifier_supported_pointer_rate", "β better"),
        ("not_in_evidence_label_rate", "β better"),
        ("false_contradiction_rate", "β better"),
        ("claim_count_avg", "info"),
        ("pressure_partial_hypothesis_rate", "info"),
        ("latency_p50_ms", "β"),
        ("latency_p95_ms", "β"),
    ]

    # A metric missing from a column's dict is shown as 0.
    for key, direction in metrics_display:
        label = f"{key} ({direction})"
        vn = _fmt(d_n.get(key, 0), key)
        vh = _fmt(d_h.get(key, 0), key)
        vp = _fmt(d_p.get(key, 0), key)
        rows.append(f"| {label:<45} | {vn:>16} | {vh:>17} | {vp:>18} |")

    if pipeline:
        rows.append(_category_breakdown(pipeline))

    rows.append("")
    rows.append("## Warnings")
    warnings = _pipeline_warnings(d_p, d_h, _aggregate_malformed(pipeline))
    if warnings:
        rows.extend(warnings)
    else:
        rows.append("- ✅ No warnings.")

    rows.append("")
    rows.append("## Notes")
    rows.append("- Baseline metrics are **heuristic** (text pattern matching).")
    rows.append("- Pipeline metrics use structured verifier + gate outputs.")
    rows.append(f"- Dataset version: `{version}` — **development benchmark, not held-out evaluation.**")
    rows.append("- Do not claim publication-grade numbers from this dev set.")
    rows.append("")

    return "\n".join(rows)
|
|
|
|
def _fmt(val: float, key: str) -> str:
    """Format a metric value for display, choosing the style from its key.

    Keys containing ``"latency"`` render as ``"<x.x> ms"``; keys containing
    ``"count"`` or ``"avg"`` render with one decimal; every other key renders
    as a percentage with one decimal.
    """
    if "latency" in key:
        return format(val, ".1f") + " ms"
    plain_number = any(tag in key for tag in ("count", "avg"))
    spec = ".1f" if plain_number else ".1%"
    return format(val, spec)
|
|
|
|
def main() -> None:
    """Command-line entry point: build the report, print it, and save it.

    Options ``--normal``, ``--honesty`` and ``--pipeline`` override the input
    result files; ``--output`` overrides the destination, which otherwise is
    ``config.REPORT_PATH``. Missing parent directories of the destination are
    created.
    """
    parser = argparse.ArgumentParser(description="Generate Verity-H report")
    parser.add_argument("--normal", type=str, default=None, help="Path to baseline_normal.jsonl")
    parser.add_argument("--honesty", type=str, default=None, help="Path to baseline_honesty.jsonl")
    parser.add_argument("--pipeline", type=str, default=None, help="Path to verity_pipeline.jsonl")
    parser.add_argument("--output", type=str, default=None, help="Output report path")
    args = parser.parse_args()

    # Omitted options are passed as None so generate_report applies its defaults.
    report = generate_report(
        normal_path=Path(args.normal) if args.normal else None,
        honesty_path=Path(args.honesty) if args.honesty else None,
        pipeline_path=Path(args.pipeline) if args.pipeline else None,
    )
    print(report)

    out = Path(args.output) if args.output else config.REPORT_PATH
    out.parent.mkdir(parents=True, exist_ok=True)
    # The report text contains non-ASCII characters, so write it as UTF-8
    # rather than with the locale default, which may be unable to encode them.
    with open(out, "w", encoding="utf-8") as f:
        f.write(report)
    print(f"\nSaved → {out}")
|
|
|
|
# Run the command-line entry point only when this file is executed as a
# script, not when it is imported.
if __name__ == "__main__":
    main()
|
|