Files
llm-benchmark/validators.py

467 lines
15 KiB
Python

"""
benchmark_v4/validators.py
==========================
Layer 1: Deterministic validators.
No LLM judge needed. Returns (score 0-10, notes str).
A score of 0 or 10 is definitive — judge is skipped.
Partial scores (1-9) trigger judge blending.
"""
import re
import json
try:
import yaml
YAML_AVAILABLE = True
except ImportError:
YAML_AVAILABLE = False
try:
from rapidfuzz import fuzz
FUZZY_AVAILABLE = True
except ImportError:
FUZZY_AVAILABLE = False
# ============================================
# TEXT NORMALIZATION
# ============================================
def normalize_text(text, mode="plain"):
    """
    Centralized text cleaning for raw model output.

    mode="plain" — strip ANSI, control chars, spinner glyphs, thinking
                   tokens and Ollama verbose-stats lines
    mode="json"  — plain + strip markdown fences and leftover "[?" lines,
                   then trim surrounding whitespace
    mode="yaml"  — same processing as "json"

    Any other mode value is treated like "plain". Returns the cleaned string.
    """
    # 1. Strip ANSI escape sequences FIRST
    text = re.sub(r'\x1b(?:[@-Z\\-_]|\[[0-9;?]*[A-Za-z])', '', text)
    # 2. Strip control characters (tab, newline and carriage return are kept)
    text = re.sub(r'[\x00-\x08\x0b\x0c\x0e-\x1f\x7f]', '', text)
    # 3. Strip Ollama spinner/progress characters
    text = re.sub(r'[⠋⠙⠹⠸⠼⠴⠦⠧⠇⠏]+', '', text)
    # 4. Normalize Unicode spaces to regular spaces
    text = text.replace('\u202f', ' ').replace('\u00a0', ' ').replace('\u2009', ' ')
    # 5. Strip thinking tokens (AFTER cleaning so regex works cleanly)
    text = re.sub(r'<think>.*?</think>', '', text, flags=re.DOTALL)
    text = re.sub(r'Thinking\.\.\..*?\.\.\.done thinking\.?', '', text, flags=re.DOTALL)
    # 6. Strip Ollama verbose stats (LAST — after all other cleanup).
    # Only lines that BEGIN with a stat key are dropped. Matching the keywords
    # anywhere in the line would also delete model content that merely mentions
    # e.g. "tokens/s", which can corrupt multi-line JSON/YAML answers.
    stats_line = re.compile(
        r'\s*(?:total duration|load duration'
        r'|prompt eval (?:count|duration|rate)'
        r'|eval count|eval duration|eval rate):',
        re.IGNORECASE,
    )
    text = "\n".join(
        line for line in text.split("\n") if not stats_line.match(line)
    )
    if mode in ("json", "yaml"):
        text = re.sub(r'^```(?:json|yaml|)?\s*', '', text, flags=re.MULTILINE)
        text = re.sub(r'```\s*$', '', text, flags=re.MULTILINE)
        # Control characters (including ESC) were already removed in step 2, so
        # only terminal-mode residue ("[?25l" etc.) and stray fences remain.
        kept = [
            line for line in text.split('\n')
            if not line.strip().startswith(('[?', '```'))
        ]
        text = '\n'.join(kept).strip()
    return text
# ============================================
# JSON EXTRACTION
# ============================================
def _enclosing_brace(text, pos):
    """Return the index of the unmatched '{' preceding text[pos], or -1.

    Walks backwards counting braces, so balanced '{...}' groups that sit
    between the enclosing brace and pos are skipped. Braces inside JSON
    strings are not distinguished from structural ones.
    """
    depth = 0
    for i in range(pos - 1, -1, -1):
        ch = text[i]
        if ch == '}':
            depth += 1
        elif ch == '{':
            if depth == 0:
                return i
            depth -= 1
    return -1


def extract_json_object(text):
    """
    Advanced JSON extractor that handles prompt-echoing,
    large whitespace blocks, and multiple JSON objects.

    Returns the chosen dict, or None when no JSON object can be decoded.
    Preference order: the last decoded object whose "recommendations" list
    holds at least one dict with a truthy "gpu"; otherwise the last decoded
    object.
    """
    # 1. Aggressive normalization to strip fences and leading/trailing junk
    text = normalize_text(text, mode="json")
    # 2. Collapse every newline (plus following indentation) to one space —
    #    this also repairs line breaks in the middle of string values
    text = re.sub(r'\n\s*', ' ', text)
    # 3. Skip the prompt-echo/template if the model repeats it: start at the
    #    object that encloses the LAST "recommendations" key. A plain
    #    rfind('{') would land on a nested object when a balanced {...} value
    #    precedes the key, so the enclosing object would never be parsed.
    last_keyword_pos = text.rfind('"recommendations"')
    search_start = 0
    if last_keyword_pos != -1:
        search_start = max(_enclosing_brace(text, last_keyword_pos), 0)
    decoder = json.JSONDecoder()
    found_objs = []
    # 4. Iteratively parse all valid JSON objects starting from search_start
    idx = search_start
    while True:
        start = text.find('{', idx)
        if start == -1:
            break
        try:
            obj, end = decoder.raw_decode(text, start)
        except json.JSONDecodeError:
            # Not a valid object here; retry from the next character
            idx = start + 1
            continue
        if isinstance(obj, dict):
            found_objs.append(obj)
        idx = end
    if not found_objs:
        return None
    # 5. Filter for populated answer rather than empty template
    for candidate in reversed(found_objs):
        recs = candidate.get("recommendations")
        if isinstance(recs, list) and any(
            isinstance(r, dict) and r.get("gpu") for r in recs
        ):
            return candidate
    return found_objs[-1]
# ============================================
# VALIDATORS
# ============================================
def validate_tool_calling(text):
    """Score a response that should be one tool call and nothing else.

    Returns (score, notes): 0 when more than three non-blank lines are
    present or no call syntax is found, 10 for a call to a known tool,
    5 for call syntax with any other function name.
    """
    cleaned = normalize_text(text)
    non_blank = sum(1 for row in cleaned.split('\n') if row.strip())
    if non_blank > 3:
        return 0, "multiple lines — explanation added"
    # Checked in order: the known-tool pattern first, then the generic one
    graded_patterns = (
        (r'(web_search|scrape_page|calculate)\s*\(["\'].*["\']\)',
         10, "valid tool call syntax"),
        (r'\w+\s*\(["\'].*["\']\)',
         5, "function call but wrong name"),
    )
    for pattern, points, note in graded_patterns:
        if re.search(pattern, cleaned):
            return points, note
    return 0, "no valid function call found"
def validate_yaml(text):
    """Must parse as valid YAML Deployment.

    Returns (score, notes). Scoring: 2 for a parsed mapping, +2 for
    kind == 'Deployment', +2 for a 'spec' key, +1 for spec.replicas == 2,
    +1 for an 'apiVersion' key. A total of 8 is promoted to 10.
    Unparseable input scores 0, a non-mapping document scores 3, and 5 is
    returned when PyYAML is not installed.
    """
    if not YAML_AVAILABLE:
        return 5, "pyyaml not installed"
    # mode="yaml" already removes fences, "[?" residue and control characters,
    # so no further line filtering is needed here.
    text = normalize_text(text, mode="yaml")
    try:
        parsed = yaml.safe_load(text)
    except (yaml.YAMLError, ValueError) as e:
        # ValueError covers scalars that match a typed pattern but cannot be
        # constructed (e.g. an out-of-range date).
        return 0, f"invalid YAML: {str(e)[:60]}"
    if not isinstance(parsed, dict):
        return 3, "parsed but not a dict"
    score = 2
    if parsed.get('kind') == 'Deployment':
        score += 2
    if 'spec' in parsed:
        score += 2
        spec = parsed['spec']
        # "spec:" with no value parses to None, and a scalar spec is possible
        # too — neither has .get()
        if isinstance(spec, dict) and spec.get('replicas') == 2:
            score += 1
    if 'apiVersion' in parsed:
        score += 1
    if score >= 8:
        score = 10
    return min(score, 10), f"valid YAML score={score}"
def validate_json_output(text):
    """
    Nested structured JSON with recommendations array.
    Expected: {"recommendations": [{"gpu":"","price_eur":0,"vram_gb":0,"pros":[],"cons":[]}]}
    Scores based on: valid JSON, correct structure, field types, 2 recommendations.

    Returns (score, notes). Field completeness and types are checked on the
    first recommendation only; a first entry that is not an object earns no
    field or type points.
    """
    parsed = extract_json_object(text)
    if parsed is None:
        return 0, "no valid JSON object found"
    # Check top-level structure
    if "recommendations" not in parsed:
        # Fallback: old flat format still gets partial credit
        old_fields = ["gpu", "price", "reason"]
        if any(k in parsed and str(parsed[k]).strip() for k in old_fields):
            return 4, "flat JSON found (old format), missing nested structure"
        return 0, "no recommendations array found"
    recs = parsed["recommendations"]
    if not isinstance(recs, list) or not recs:
        return 2, "recommendations present but empty or not a list"

    def is_number(value):
        # bool is a subclass of int; true/false is not a price or a VRAM size
        return isinstance(value, (int, float)) and not isinstance(value, bool)

    required_fields = {"gpu", "price_eur", "vram_gb", "pros", "cons"}
    score = 4  # base for having recommendations array
    # Check count (the list is known to be non-empty here)
    score += 2 if len(recs) >= 2 else 1
    # Check field completeness on first recommendation
    first = recs[0] if isinstance(recs[0], dict) else {}
    present = required_fields & set(first)
    score += int((len(present) / len(required_fields)) * 3)
    # Check type correctness
    type_ok = (
        is_number(first.get("price_eur")) and
        is_number(first.get("vram_gb")) and
        isinstance(first.get("pros"), list) and
        isinstance(first.get("cons"), list)
    )
    if type_ok:
        score += 1
    score = min(score, 10)
    # sorted() keeps the notes string stable between runs
    return score, f"nested JSON: {len(recs)} recs, fields={sorted(present)}, types_ok={type_ok}"
def validate_json_schema(text):
    """Valid JSON Schema with all required properties.

    Returns (score, notes). Points: apiVersion 2, kind 3 when its enum
    includes Deployment/Service/ConfigMap (otherwise 1), metadata 2, spec 2,
    non-empty top-level 'required' 1.
    """
    parsed = extract_json_object(text)
    if parsed is None:
        return 0, "no valid JSON Schema found"
    props = parsed.get('properties')
    if not isinstance(props, dict):
        # Missing, null or malformed "properties" earns no property points
        props = {}
    score = 0
    if 'apiVersion' in props:
        score += 2
    if 'kind' in props:
        kind = props['kind']
        enum = kind.get('enum') if isinstance(kind, dict) else None
        # Membership tests instead of set(): enum entries may be unhashable
        has_enum = isinstance(enum, list) and all(
            name in enum for name in ('Deployment', 'Service', 'ConfigMap')
        )
        score += 3 if has_enum else 1
    if 'metadata' in props:
        score += 2
    if 'spec' in props:
        score += 2
    if parsed.get('required'):
        score += 1
    return min(score, 10), f"JSON Schema score={score}/10"
def validate_mermaid(text):
    """Score a Mermaid diagram by how many of the 8 pipeline stages it names.

    Returns (score, notes). Without a mermaid fence (or a generic fence
    together with 'graph'/'flowchart') the score is 2; otherwise it is the
    fraction of stage names found, scaled to 0-10 and truncated.
    """
    expected_stages = (
        "code push", "lint", "unit test", "build",
        "integration test", "deploy staging", "smoke test", "deploy production",
    )
    lowered = normalize_text(text).lower()
    mentions_diagram = 'graph' in lowered or 'flowchart' in lowered
    fenced = '```mermaid' in lowered or ('```' in lowered and mentions_diagram)
    if not fenced:
        return 2, "no mermaid fence found"
    hits = len([stage for stage in expected_stages if stage in lowered])
    total = len(expected_stages)
    return int((hits / total) * 10), f"{hits}/{total} stages found"
def validate_compression(text):
    """Score a bullet summary: target is 10 '- ' bullets covering 7 industries.

    Returns (score, notes). The base score depends on how far the bullet
    count is from 10 (exact: 5, off by 1-2: 3, off by 3: 2) and one point is
    added per industry keyword found. Counts further off score 0.
    """
    cleaned = normalize_text(text)
    bullet_total = sum(
        1 for row in cleaned.strip().split('\n') if row.strip().startswith('- ')
    )
    industry_stems = ("healthcare", "financ", "transport", "manufactur",
                      "education", "energy", "agricultur")
    lowered = cleaned.lower()
    industries_hit = sum(1 for stem in industry_stems if stem in lowered)
    # Base points keyed by distance from the 10-bullet target
    base_by_distance = {0: 5, 1: 3, 2: 3, 3: 2}
    distance = abs(bullet_total - 10)
    if distance in base_by_distance:
        points = base_by_distance[distance] + industries_hit
    else:
        # Four or more bullets off target: no credit, industries are ignored
        points = 0
    return min(points, 10), f"{bullet_total} bullets, {industries_hit}/7 industries"
def validate_multi_step(text):
    """3 distinct tool calls + final answer.

    Returns (score, notes). A call is identified by its tool name together
    with its quoted argument, so the same tool called with two different
    arguments counts as two distinct calls.
    """
    text = normalize_text(text)
    # finditer instead of findall: findall with a single capture group returns
    # only the tool name, which would collapse repeated use of one tool.
    call_pattern = re.compile(
        r'(web_search|scrape_page|summarize)\s*\(["\']([^"\']*)["\']\)'
    )
    calls = {(m.group(1), m.group(2).strip()) for m in call_pattern.finditer(text)}
    unique_tools = set(re.findall(r'(web_search|scrape_page|summarize)\s*\(', text))
    has_final = bool(re.search(r'final\s*[:.]', text, re.IGNORECASE))
    distinct = len(calls)
    if distinct >= 3 and len(unique_tools) >= 2 and has_final:
        return 10, f"{distinct} distinct calls, final answer present"
    if distinct >= 2 and has_final:
        if distinct >= 3:
            # Enough calls, but every one of them used the same tool
            return 7, f"{distinct} distinct calls but only {len(unique_tools)} tool type"
        return 7, f"only {distinct} distinct tool calls"
    if distinct >= 1:
        return 4, "tool calls found but incomplete"
    return 0, "no valid tool calls found"
def validate_hallucination(text):
    """
    The book does not exist.
    Reward uncertainty. Penalize confident invented specifics.

    Returns (score, notes): 0 when only invented-content phrases appear,
    10 when only refusal phrases appear, 5 when both or neither appear.
    """
    # Normalize like the other validators so thinking tokens and terminal
    # noise cannot trigger a signal.
    text_lower = normalize_text(text).lower()
    # Good refusal signals
    refuse_signals = [
        "does not exist", "doesn't exist", "not exist",
        "cannot verify", "can't verify", "cannot confirm",
        "not aware", "no record", "not familiar",
        "unable to find", "could not find", "not in my",
        "not certain", "uncertain", "cannot find",
        "don't have", "do not have", "not sure",
        "may not exist", "might not exist", "no evidence",
        "i cannot", "i can't", "i don't know",
        "fictional", "fabricated", "no such book",
        "not able to verify", "cannot be verified",
        "not a real", "i'm not aware", "i am not aware",
    ]
    # Hard hallucination signals — confident invented content
    hallucination_signals = [
        "the book explores", "kessler argues", "the main ideas",
        "the novel", "in the book", "the author presents",
        "the story follows", "themes include", "the book examines",
        "kessler's work", "the book describes",
        "this book", "the plot", "the protagonist",
    ]
    invented = next((s for s in hallucination_signals if s in text_lower), None)
    refusal = next((s for s in refuse_signals if s in text_lower), None)
    if invented and refusal:
        # e.g. "I'm not aware of this book": a refusal that happens to contain
        # a hallucination phrase. Not definitive either way, so return a
        # partial score instead of a hard 0.
        return 5, f"mixed signals: refusal '{refusal}' alongside '{invented}'"
    if invented:
        return 0, f"hallucinated: '{invented}' found"
    if refusal:
        return 10, f"correctly refused: '{refusal}'"
    return 5, "ambiguous — no clear refusal or hallucination"
def validate_coding(text):
    """Python function with LIS and complexity.

    Returns (score, notes). 10 when a def, a return, an LIS mention and a
    complexity mention are all present; otherwise a partial sum
    (def 3, return 1, LIS 2, complexity 2) capped at 9.
    """
    text = normalize_text(text)
    lowered = text.lower()
    has_def = 'def ' in text
    has_return = 'return' in text
    # 'lis' must not be flanked by letters, otherwise every mention of "list"
    # would count as an LIS reference. Names like lis_length still match.
    has_lis = (
        'subsequence' in lowered
        or 'longest' in lowered
        or re.search(r'(?<![a-z])lis(?![a-z])', lowered) is not None
    )
    # The keyword list previously contained an empty string, which is a
    # substring of every text and made this check always True.
    has_complexity = any(
        w in lowered for w in ('o(n', 'n log n', 'n^2', 'n²', 'complexity')
    )
    if has_def and has_return and has_lis and has_complexity:
        return 10, "function correct with complexity"
    score = (3 if has_def else 0) + (1 if has_return else 0) + \
            (2 if has_lis else 0) + (2 if has_complexity else 0)
    return min(score, 9), f"def={has_def} lis={has_lis} complexity={has_complexity}"
def validate_context(text, expected_phrase):
    """
    Score how well the output contains expected_phrase (case-insensitive).

    An exact substring hit scores 10. With rapidfuzz available, the better
    of partial_ratio and token_set_ratio is mapped onto score tiers;
    without it, the score is the share of expected words longer than three
    characters that occur in the output. Returns (score, notes).
    """
    haystack = normalize_text(text).lower()
    needle = expected_phrase.lower()
    if needle in haystack:
        return 10, "exact match"
    if FUZZY_AVAILABLE:
        best = max(
            fuzz.partial_ratio(needle, haystack),
            fuzz.token_set_ratio(needle, haystack),
        )
        # (minimum ratio, score, label) — first tier reached wins
        tiers = (
            (90, 10, "fuzzy match"),
            (80, 9, "fuzzy match"),
            (70, 7, "partial match"),
            (55, 5, "weak match"),
        )
        for floor, points, label in tiers:
            if best >= floor:
                return points, f"{label} {best}%"
        return max(0, int(best / 12)), f"poor match {best}%"
    # Fallback token matching
    key_words = [word for word in needle.split() if len(word) > 3]
    if not key_words:
        return 5, "no key words to match"
    matches = len([word for word in key_words if word in haystack])
    return int((matches / len(key_words)) * 10), f"{matches}/{len(key_words)} tokens"
def validate_agent(text):
    """Flag answers that mention a GPU model or VRAM size below 16 GB.

    Returns (2, notes) on the first sub-16GB marker found, otherwise
    (7, notes). Neither score is 0 or 10.
    """
    text_lower = normalize_text(text).lower()
    # A size token must not continue a larger number: "48gb vram" contains
    # "8gb vram" and "512gb" contains "12gb".
    not_in_number = r'(?<![\d.])'
    # (label used in notes, pattern). Order matches the original marker list.
    # NOTE(review): the lookaheads assume the RTX 4070 Ti SUPER and the
    # RTX 4060 Ti 16GB are acceptable 16 GB cards — confirm against the prompt.
    sub_16gb = [
        ("rtx 4070 ti", r'rtx 4070 ti(?!\s*super)'),
        ("rtx 4070", r'rtx 4070(?!\s*ti\s*super)'),
        ("rtx 3060", r'rtx 3060'),
        ("rtx 3070", r'rtx 3070'),
        ("rtx 4060", r'rtx 4060(?!\s*ti\W*16\s*gb)'),
        ("rx 6700", r'rx 6700'),
        ("rx 7700", r'rx 7700'),
        ("rx 6600", r'rx 6600'),
        ("12gb", not_in_number + r'12gb'),
        ("10gb", not_in_number + r'10gb'),
        ("8gb vram", not_in_number + r'8gb vram'),
    ]
    for label, pattern in sub_16gb:
        if re.search(pattern, text_lower):
            return 2, f"sub-16GB GPU found: '{label}'"
    # No bad GPU — let judge evaluate quality
    return 7, "no sub-16GB GPU — judge for quality"
# ============================================
# DISPATCHER
# ============================================
# Dispatch table: benchmark test name -> deterministic validator.
# Each value is called with the raw model output and returns (score, notes).
VALIDATOR_MAP = {
    "tool_calling": validate_tool_calling,
    "yaml_generation": validate_yaml,
    "structured": validate_json_output,
    "json_schema": validate_json_schema,
    "artifact_mermaid": validate_mermaid,
    "compression": validate_compression,
    "multi_step_agent": validate_multi_step,
    "hallucination": validate_hallucination,
    "coding": validate_coding,
    "agent": validate_agent,
    # validate_context takes the expected phrase as a second argument, so the
    # context tests bind it with a lambda to keep the one-argument call shape.
    "context_begin": lambda t: validate_context(t, "Project Aurora"),
    "context_middle": lambda t: validate_context(t, "2.4 million"),
    "context_end": lambda t: validate_context(t, "Nexora Systems"),
}
def run_validator(test_name, raw_output):
    """
    Look up and run the deterministic validator registered for test_name.

    Returns (score, skip_judge, notes). skip_judge is True only when the
    score is exactly 0 or 10. For a test name that has no entry in
    VALIDATOR_MAP the result is (None, False, "no validator").
    """
    validator = VALIDATOR_MAP.get(test_name)
    if validator is None:
        return None, False, "no validator"
    score, notes = validator(raw_output)
    definitive = score in (0, 10)
    return score, definitive, notes