# File listing metadata: 194 lines, 6.4 KiB, Python
"""
|
|
benchmark_v4/scoring.py
|
|
=======================
|
|
Combines validator, judge, and embedding into final scores.
|
|
Computes: format_score, semantic_score, combined_score.
|
|
Computes: category scores, weighted total, compliance, variance.
|
|
"""
|
|
|
|
import re
|
|
import statistics
|
|
from config import TEST_WEIGHTS, CATEGORIES, COMPLIANCE_GROUPS
|
|
from validators import normalize_text, run_validator
|
|
from judge import call_judge, embedding_score
|
|
from prompts import GROUND_TRUTHS
|
|
|
|
|
|
# ============================================
|
|
# FORMAT SCORE
|
|
# ============================================
|
|
|
|
def compute_format_score(output, prompt):
    """
    Score format obedience only — separate from semantic quality.

    Starts from 10 and subtracts penalties:
      * 2 points if the raw output contains an ANSI escape introducer
        (ESC followed by "[").
      * 1 to 3 points if the prompt states "Maximum N words" and the
        normalized output exceeds N by more than 30%.
      * 2 points if the prompt says "no markdown" or "no explanation" and
        the normalized output contains at least two triple-backtick markers.

    Args:
        output: Raw model output. Checked as-is for ANSI codes and passed
            through normalize_text for the word-count and markdown checks.
        prompt: Prompt text the output was generated for.

    Returns:
        Integer score between 0 and 10.
    """
    text = normalize_text(output)
    score = 10

    # ANSI escape codes are looked for in the raw output, not the
    # normalized text.
    if re.search(r'\x1b\[', output):
        score -= 2

    # Word limit: only penalize once the output is more than 30% over.
    limit_m = re.search(r'Maximum (\d+) words?', prompt, re.IGNORECASE)
    if limit_m:
        limit = int(limit_m.group(1))
        words = len(text.split())
        if words > limit * 1.3:
            if limit == 0:
                # "Maximum 0 words": the relative overshoot is unbounded, so
                # apply the capped penalty instead of dividing by zero.
                score -= 3
            else:
                score -= min(3, int((words - limit) / limit * 5))

    # Fenced markdown when the prompt says "No markdown" or "No explanation".
    prompt_lower = prompt.lower()
    if "no markdown" in prompt_lower or "no explanation" in prompt_lower:
        if "```" in text and len(text.split("```")) > 2:
            score -= 2

    return max(0, score)
|
|
|
|
|
|
# ============================================
|
|
# COMBINED SCORE
|
|
# ============================================
|
|
|
|
def score_test(test_name, prompt, raw_output):
    """
    Score one test output and return every score component.

    Pipeline:
      1. Normalize the raw output and compute the format score.
      2. Run the deterministic validator on the normalized text.
      3. Choose the semantic score:
         - the validator score as-is when the validator says to skip the
           judge, or when the test is in the high-confidence set and the
           validator scored >= 8;
         - otherwise, when a validator score exists, an 80/20 blend of
           validator and judge;
         - with no validator score, "rag" blends embedding similarity (30%)
           with the judge (70%), using the judge alone when the embedding
           score is 0;
         - every other test uses the judge score alone.
      4. Combined = semantic * 0.8 + format * 0.2, rounded to 2 decimals.

    Returns dict with keys: semantic_score, format_score, combined_score,
    used_judge, notes.
    """
    cleaned = normalize_text(raw_output)

    # Format is its own dimension and is always computed from the raw output.
    format_part = compute_format_score(raw_output, prompt)

    validator_score, judge_not_needed, validator_notes = run_validator(test_name, cleaned)

    # Tests whose validator is trusted outright once it scores >= 8.
    trusted_tests = {"compression", "artifact_mermaid", "tool_calling",
                     "yaml_generation", "multi_step_agent"}

    if validator_score is not None:
        validator_is_final = judge_not_needed or (
            test_name in trusted_tests and validator_score >= 8
        )
        if validator_is_final:
            semantic_part = validator_score
            judged = False
            summary = validator_notes
        else:
            # Partial validator score — blend with judge (80/20)
            judge_score, judge_reason = call_judge(test_name, prompt, cleaned)
            semantic_part = round(validator_score * 0.8 + judge_score * 0.2)
            judged = True
            summary = f"val={validator_score} j={judge_score} → {judge_reason[:55]}"
    else:
        judged = True
        if test_name == "rag":
            reference = GROUND_TRUTHS.get("rag", "")
            similarity = embedding_score(cleaned, reference)
            judge_score, judge_reason = call_judge(test_name, prompt, cleaned)
            if similarity == 0:
                # An embedding score of 0 is treated as a failed embedding.
                semantic_part = judge_score
            else:
                # Judge weighted more heavily than embedding similarity.
                semantic_part = round(similarity * 0.3 + judge_score * 0.7)
            summary = f"embed={similarity} j={judge_score} → {judge_reason[:50]}"
        else:
            # Pure judge
            judge_score, judge_reason = call_judge(test_name, prompt, cleaned)
            semantic_part = judge_score
            summary = judge_reason[:80]

    # Combined: 80% semantic, 20% format
    combined = round(semantic_part * 0.8 + format_part * 0.2, 2)

    return {
        "semantic_score": int(semantic_part),
        "format_score": format_part,
        "combined_score": combined,
        "used_judge": judged,
        "notes": summary,
    }
|
|
|
|
|
|
# ============================================
|
|
# WEIGHTED + CATEGORY SCORES
|
|
# ============================================
|
|
|
|
def compute_weighted(semantic_scores):
    """
    Compute the weighted total and weighted average of semantic scores.

    Each score contributes (score / 10) * weight * 7, where the weight is
    looked up in TEST_WEIGHTS (tests without an entry get weight 0).

    Input: {test_name: score}
    Returns: (weighted_total, weighted_avg), both rounded to 2 decimals,
    or (0, 0) when the weights sum to zero.
    """
    weighted_total = 0.0
    total_weight = 0.0

    for name in semantic_scores:
        weight = TEST_WEIGHTS.get(name, 0)
        weighted_total += (semantic_scores[name] / 10) * weight * 7
        total_weight += weight

    # No weighted tests at all: avoid dividing by zero.
    if total_weight == 0:
        return 0, 0

    return round(weighted_total, 2), round(weighted_total / total_weight, 2)
|
|
|
|
|
|
def compute_category_scores(semantic_scores):
    """
    Average the semantic scores of the tests in each category.

    Only tests present in semantic_scores count towards a category; a
    category with none of its tests present scores 0.

    Input: {test_name: score}
    Returns: {category_name: avg_score}, averages rounded to 2 decimals.
    """
    averages = {}
    for category, members in CATEGORIES.items():
        present = [semantic_scores[name] for name in members if name in semantic_scores]
        if present:
            averages[category] = round(sum(present) / len(present), 2)
        else:
            averages[category] = 0
    return averages
|
|
|
|
|
|
def compute_compliance(semantic_scores_by_run):
    """
    Compliance per group = % of runs whose semantic score is >= 8.

    Scores of all tests belonging to a group are pooled before the
    percentage is taken. A group with no scores at all maps to None.

    Input: {test_name: [score_run1, score_run2, ...]}
    Returns: {group_name: percentage rounded to 1 decimal, or None}
    """
    result = {}
    for group_name, members in COMPLIANCE_GROUPS.items():
        pooled = [
            run_score
            for name in members
            if name in semantic_scores_by_run
            for run_score in semantic_scores_by_run[name]
        ]

        if not pooled:
            result[group_name] = None
            continue

        passing = sum(1 for run_score in pooled if run_score >= 8)
        result[group_name] = round(passing / len(pooled) * 100, 1)

    return result
|
|
|
|
|
|
def compute_variance_stats(scores_by_test):
    """
    Summarize score spread across every run of every test.

    All per-test score lists are pooled into one sample before computing
    the statistics.

    Input: {test_name: [score_run1, score_run2, ...]}
    Returns: dict with keys mean, stdev, min, max, failure_rate, where
    failure_rate is the percentage of scores <= 2. All values are 0 when
    there are no scores; stdev is 0 when there is only one score.
    """
    pooled = []
    for run_scores in scores_by_test.values():
        pooled.extend(run_scores)

    count = len(pooled)
    if count == 0:
        return {"mean": 0, "stdev": 0, "min": 0, "max": 0, "failure_rate": 0}

    # Sample standard deviation needs at least two data points.
    spread = round(statistics.stdev(pooled), 2) if count > 1 else 0
    failures = sum(1 for value in pooled if value <= 2)

    return {
        "mean": round(statistics.mean(pooled), 2),
        "stdev": spread,
        "min": min(pooled),
        "max": max(pooled),
        "failure_rate": round(failures / count * 100, 1),
    }
|