"""
benchmark_v4/config.py
======================
All configuration in one place. Edit this file to change models,
weights, judge, and runtime settings.
"""

# ============================================
# MODELS
# ============================================

MODELS_BASELINE_DIRECT = [
    "granite4.1:8b",
    "qwen2.5-coder:14b",
]

MODELS_BASELINE_THINKING = [
    "nemotron-3-nano:4b",
    "gemma4:e4b",
]

MODELS_NEW_DIRECT = []
MODELS_NEW_THINKING = []

# ============================================
# JUDGE + EMBEDDINGS
# ============================================

JUDGE_MODEL = "qwen2.5:14b"
EMBED_MODEL = "nomic-embed-text"
OLLAMA_URL = "http://localhost:11434"

# ============================================
# RUNTIME
# ============================================

COOLDOWN_SECONDS = 20  # between tests (thermal normalization)
GPU_POLL_EVERY = 3     # poll nvidia-smi every N tests (0 = every test)

# ============================================
# TEST WEIGHTS (must sum to 1.0)
# ============================================

TEST_WEIGHTS = {
    # Agent / tool reliability — 25%
    "tool_calling": 0.13,
    "multi_step_agent": 0.12,

    # Coding / infrastructure — 25%
    "coding": 0.10,
    "yaml_generation": 0.08,
    "artifact_mermaid": 0.04,
    "json_schema": 0.03,

    # RAG / context fidelity — 20%
    "rag": 0.07,
    "context_begin": 0.04,
    "context_middle": 0.05,
    "context_end": 0.04,

    # Structured outputs — 15%
    "structured": 0.08,
    "compression": 0.07,

    # Hallucination resistance — 10%
    "hallucination": 0.10,

    # Pure reasoning — 5%
    "reasoning": 0.03,
    "agent": 0.01,
    "math": 0.01,
}

# Explicit check instead of `assert`: asserts are stripped under `python -O`,
# which would let a mis-edited weight table silently skew every score.
_weights_total = sum(TEST_WEIGHTS.values())
if abs(_weights_total - 1.0) >= 0.001:
    raise ValueError(f"Weights must sum to 1.0 (got {_weights_total:.3f})")

# Category groupings for category-level scores
CATEGORIES = {
    "agent_tool": ["tool_calling", "multi_step_agent"],
    "coding": ["coding", "yaml_generation", "artifact_mermaid", "json_schema"],
    "rag_context": ["rag", "context_begin", "context_middle", "context_end"],
    "structured": ["structured", "compression"],
    "hallucination": ["hallucination"],
    "reasoning": ["reasoning", "agent", "math"],
}

# Compliance groups — pass if semantic_score >= 8
COMPLIANCE_GROUPS = {
    "json_valid": ["structured", "json_schema"],
    "yaml_valid": ["yaml_generation"],
    "tool_format": ["tool_calling", "multi_step_agent"],
    "hallucination_free": ["hallucination"],
}

# Every test named in a grouping must have a weight; a typo here would
# otherwise only show up as a quietly missing score.
_unknown_tests = sorted(
    {
        test
        for group in (*CATEGORIES.values(), *COMPLIANCE_GROUPS.values())
        for test in group
    }
    - set(TEST_WEIGHTS)
)
if _unknown_tests:
    raise ValueError(f"Groupings reference tests without a weight: {_unknown_tests}")

# Context files
CONTEXT_FILE = "./rag_samples/context_test.md"
RAG_FILE = "./rag_samples/note1.md"

# Database
DB_FILE = "benchmark_v4.db"
def _parse_judge_response(raw):
    """Recover ``(score, reason)`` from the judge's raw text, or None if no score is found.

    Tries, in order: a flat JSON object containing "semantic_score", a
    regex on the quoted key, and finally any digits following the bare
    word "semantic_score". Scores are clamped to the 0-10 range.
    """
    # Try clean JSON parse
    m = re.search(r'\{[^{}]*"semantic_score"[^{}]*\}', raw)
    if m:
        try:
            parsed = json.loads(m.group())
            score = max(0, min(10, int(parsed.get("semantic_score", 5))))
            reason = str(parsed.get("reason", ""))[:80]
            return score, reason
        except (ValueError, TypeError, OverflowError):
            # ValueError covers JSONDecodeError and non-numeric strings;
            # TypeError covers a null/list score; OverflowError covers
            # Infinity. In every case fall through to the regex fallbacks
            # instead of abandoning the response.
            pass

    # Fallback: extract score number
    sm = re.search(r'"semantic_score"\s*:\s*(\d+)', raw)
    if sm:
        rm = re.search(r'"reason"\s*:\s*"([^"]{1,80})"', raw)
        score = max(0, min(10, int(sm.group(1))))
        reason = rm.group(1) if rm else "extracted"
        return score, reason

    # Last resort
    last = re.search(r'semantic_score[^\d]*(\d+)', raw, re.IGNORECASE)
    if last:
        return max(0, min(10, int(last.group(1)))), "score extracted"

    return None


def call_judge(test_name, prompt, output):
    """
    Call LLM judge with strict rubric.

    Args:
        test_name: Key used to look up the rubric and ground truth.
        prompt: The prompt given to the model under test (first 500 chars used).
        output: The model's answer (first 1500 chars used).

    Returns:
        (semantic_score 0-10, reason str).
        Falls back to midpoint (5) on failure to avoid corrupting results.
    """
    rubric = JUDGE_RUBRICS.get(test_name, DEFAULT_RUBRIC)
    ground_truth = GROUND_TRUTHS.get(test_name, "See prompt requirements.")

    judge_prompt = JUDGE_PROMPT_TEMPLATE.format(
        test_name=test_name,
        rubric=rubric,
        ground_truth=ground_truth,
        # `or ""` keeps a None prompt/output from raising before the
        # midpoint fallback below can apply.
        prompt=(prompt or "")[:500],
        output=(output or "")[:1500],
    )

    try:
        response = requests.post(
            f"{OLLAMA_URL}/api/generate",
            json={"model": JUDGE_MODEL, "prompt": judge_prompt, "stream": False},
            timeout=180
        )
        # Surface HTTP errors as judge errors rather than letting an error
        # payload masquerade as an unparseable answer.
        response.raise_for_status()
        raw = (response.json().get("response") or "").strip()
    except requests.exceptions.Timeout:
        return 5, "judge timeout — midpoint"
    except Exception as e:
        # Deliberate catch-all boundary: a judge failure must never abort
        # the benchmark run, but the cause is reported instead of dropped.
        print(f" Judge error: {e}")
        return 5, f"judge error — midpoint ({type(e).__name__})"

    result = _parse_judge_response(raw)
    if result is not None:
        return result

    print(f" Judge unparseable: {raw[:80]}")
    return 5, "judge unparseable — midpoint"
# (minimum cosine similarity, score) pairs, highest threshold first.
# Stepped mapping — more discriminating than linear.
_EMBEDDING_SCORE_STEPS = (
    (0.92, 10),
    (0.85, 8),
    (0.78, 6),
    (0.70, 4),
    (0.60, 2),
)


def embedding_score(text, reference, steps=_EMBEDDING_SCORE_STEPS):
    """
    Score 0-10 based on cosine similarity.
    Uses stepped mapping for better discrimination.

    Args:
        text: Candidate text; only its first 1000 characters are embedded.
        reference: Reference text to compare against.
        steps: Iterable of (threshold, score) pairs ordered from highest
            threshold to lowest; the first threshold the similarity
            reaches decides the score.

    Returns:
        The score for the first matching step, or 0 when either input is
        empty, an embedding is unavailable, or no threshold is reached.
    """
    if not text or not reference:
        return 0

    v1 = get_embedding(text[:1000])
    if not v1:
        # get_embedding returns [] on failure; the similarity would be 0.0
        # anyway, so skip the second request.
        return 0
    v2 = get_embedding(reference)
    sim = cosine_similarity(v1, v2)

    for threshold, score in steps:
        if sim >= threshold:
            return score
    return 0
def detect_thinking_model(model_name):
    """
    Detect if a model supports thinking mode via Ollama capabilities API.
    Uses /api/show and checks for 'thinking' in capabilities array.
    Fast — single API call, no generation needed.

    Args:
        model_name: Ollama model tag to query.

    Returns:
        True when the response lists "thinking" among its capabilities;
        False otherwise, including on any request or decoding failure.
    """
    try:
        r = requests.post(
            f"{OLLAMA_URL}/api/show",
            # "model" is the documented request field; "name" is the older
            # spelling, sent as well so older servers keep working.
            json={"model": model_name, "name": model_name},
            timeout=10
        )
        r.raise_for_status()
        payload = r.json()
    except (requests.exceptions.RequestException, ValueError):
        # Best-effort probe: an unreachable server, HTTP error or
        # non-JSON body simply means "not a thinking model".
        return False

    if not isinstance(payload, dict):
        return False
    # `or []` also covers an explicit JSON null for "capabilities".
    caps = payload.get("capabilities") or []
    return "thinking" in caps
default=False, + help="Override: mark single --model as thinking type" + ) + parser.add_argument( + "--runs", type=int, default=1, + help="Number of runs per model for variance analysis (default: 1)" + ) + parser.add_argument( + "--no-cooldown", action="store_true", default=False, + help="Skip cooldown between tests (faster but no thermal normalization)" + ) + parser.add_argument( + "--report", action="store_true", default=False, + help="Show ranking reports from DB without running any models" + ) + parser.add_argument( + "--report-best", action="store_true", default=False, + help="Show best run per model instead of latest (use with --report)" + ) + parser.add_argument( + "--export", action="store_true", default=False, + help="Export latest results to benchmark_summary.csv and exit" + ) + + args = parser.parse_args() + + # Init database + init_db() + + # ── Report / export only modes ───────────────────────────────── + # Must come before benchmark logic + if args.report or args.report_best: + print_full_ranking(best=args.report_best) + print_category_breakdown() + print_compliance_table() + export_summary_csv() + return + + if args.export: + export_summary_csv() + return + + # ── Setup ────────────────────────────────────────────────────── + existing_baseline = load_latest_runs(is_baseline=True) + all_prompts = build_all_prompts() + + print(f"\nLLM Benchmark V4") + print(f"Judge: {JUDGE_MODEL}") + print(f"Embed: {EMBED_MODEL}") + print(f"DB: {DB_FILE}") + print(f"Runs: {args.runs}") + print(f"Fuzzy: {FUZZY_AVAILABLE} | YAML: {YAML_AVAILABLE}") + print(f"Cooldown:{'disabled' if args.no_cooldown else 'enabled'}") + print(f"Previous baseline runs: {len(existing_baseline)}") + + print_weights() + + all_new_run_ids = [] + + def _run(models, label, baseline): + ids = run_benchmark( + models=models, + label=label, + is_baseline=baseline, + all_prompts=all_prompts, + num_runs=args.runs, + no_cooldown=args.no_cooldown, + ) + all_new_run_ids.extend(ids) + + # ── 
Auto-discover all Ollama models ──────────────────────────── + if args.test_all: + result = subprocess.run( + ["ollama", "list"], + capture_output=True, text=True + ) + + discovered = [] + for line in result.stdout.strip().split('\n')[1:]: + parts = line.split() + if parts: + model_name = parts[0] + skip = [EMBED_MODEL, JUDGE_MODEL, "nomic-embed-text"] + if not any(s in model_name for s in skip): + discovered.append(model_name) + + if not discovered: + print("No models found in ollama list.") + return + + # Auto-detect thinking capability for each model + print(f"\nDetecting model capabilities...") + model_info = {} + for m in discovered: + is_thinking = detect_thinking_model(m) + is_baseline = m in (MODELS_BASELINE_DIRECT + MODELS_BASELINE_THINKING) + model_info[m] = { + "thinking": is_thinking, + "is_baseline": is_baseline, + "label": "thinking" if is_thinking else "direct", + } + tag = "🧠" if is_thinking else "⚡" + base = "★" if is_baseline else " " + print(f" {tag}{base} {m}") + + print() + + # Run baseline models first, then new + baseline_models = [m for m in discovered if model_info[m]["is_baseline"]] + new_models = [m for m in discovered if not model_info[m]["is_baseline"]] + + if baseline_models: + print("=" * 50) + print(" KNOWN BASELINE MODELS") + print("=" * 50) + for m in baseline_models: + _run([m], model_info[m]["label"], True) + + if new_models: + print("=" * 50) + print(" NEW / UNKNOWN MODELS") + print("=" * 50) + for m in new_models: + _run([m], model_info[m]["label"], False) + + print_comparison(all_new_run_ids, existing_baseline) + run_report() + return + + # ── Single model mode ────────────────────────────────────────── + if args.model: + # Auto-detect thinking unless --thinking flag explicitly set + if args.thinking: + label = "thinking" + else: + label = "thinking" if detect_thinking_model(args.model) else "direct" + + is_baseline = args.model in ( + MODELS_BASELINE_DIRECT + MODELS_BASELINE_THINKING + ) + print(f"\nSingle model: {args.model} 
[{label}] baseline={is_baseline}") + _run([args.model], label, is_baseline) + + # ── Baseline models ──────────────────────────────────────────── + elif args.mode in ["baseline", "all"]: + if MODELS_BASELINE_DIRECT: + print("\n" + "=" * 50) + print(" BASELINE — DIRECT") + print("=" * 50) + _run(MODELS_BASELINE_DIRECT, "direct", True) + + if MODELS_BASELINE_THINKING: + print("\n" + "=" * 50) + print(" BASELINE — THINKING") + print("=" * 50) + _run(MODELS_BASELINE_THINKING, "thinking", True) + + if args.mode == "all": + if MODELS_NEW_DIRECT: + print("\n" + "=" * 50) + print(" NEW — DIRECT") + print("=" * 50) + _run(MODELS_NEW_DIRECT, "direct", False) + + if MODELS_NEW_THINKING: + print("\n" + "=" * 50) + print(" NEW — THINKING") + print("=" * 50) + _run(MODELS_NEW_THINKING, "thinking", False) + + # ── New models only ──────────────────────────────────────────── + elif args.mode == "new": + if MODELS_NEW_DIRECT: + print("\n" + "=" * 50) + print(" NEW — DIRECT") + print("=" * 50) + _run(MODELS_NEW_DIRECT, "direct", False) + + if MODELS_NEW_THINKING: + print("\n" + "=" * 50) + print(" NEW — THINKING") + print("=" * 50) + _run(MODELS_NEW_THINKING, "thinking", False) + + # ── Final reports ────────────────────────────────────────────── + print_comparison(all_new_run_ids, existing_baseline) + run_report() + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/prompts.py b/prompts.py new file mode 100644 index 0000000..4ad4ea6 --- /dev/null +++ b/prompts.py @@ -0,0 +1,388 @@ +""" +benchmark_v4/prompts.py +======================= +All prompts and ground truths in one place. + +BENCHMARK PURPOSE (explicit): + This benchmark measures operational reliability for agentic and + automated pipelines. It rewards: format obedience, structured output + correctness, tool call precision, and hallucination resistance. + It intentionally penalises verbosity, creative deviation, and + formatting non-compliance. It is NOT a general intelligence benchmark. 
+ +Changes in this version: + - Agent prompt rewritten with explicit constraints (better differentiation) + - Structured JSON upgraded to nested array (stronger discriminator) + - ALL_TESTS is now explicit ordered list (not dict.keys()) + - Ground truths reformulated as criteria (avoid judge anchoring) + - JSON Schema validator criteria made explicit +""" + +import os +from config import CONTEXT_FILE, RAG_FILE + +# ============================================ +# STATIC PROMPTS +# ============================================ + +PROMPTS = { + "reasoning": """Be concise. Maximum 150 words. + +Solve step by step: +A company has 120 employees. +30% are remote. Of those, 25% are contractors. +How many full-time remote employees?""", + + "coding": """Be concise. Maximum 150 words. + +Write a Python function that returns the Longest Increasing Subsequence. +Explain time complexity.""", + + # UPGRADED: explicit constraints force meaningful differentiation + "agent": """Maximum 120 words. You must: +1. Identify 2-3 specific GPU candidates by name +2. Compare their VRAM capacity and memory bandwidth +3. Acknowledge 2026 pricing uncertainty explicitly +4. Name at least one benchmarking source (e.g. LLM Perf, VRAM calculator) +5. Exclude any GPU with less than 16GB VRAM + +Task: Find the best GPU under 500€ for running local LLMs in 2026.""", + + "math": """Be concise. Maximum 100 words. + +Solve step by step: +If 3 machines take 5 minutes to make 3 widgets, +how long would 100 machines take to make 100 widgets?""", + + "hallucination": """Be concise. Maximum 100 words. + +Does the book "The Silent Code of Mars" by John Kessler (2022) exist? +If you are not 100% certain, say so clearly. +Do NOT describe or summarise it if you have any doubt.""", + + # UPGRADED: nested structure, typed fields — much stronger discriminator + "structured": """Return ONLY valid JSON. No explanation. No markdown fences. 
+ +{ + "recommendations": [ + { + "gpu": "", + "price_eur": 0, + "vram_gb": 0, + "pros": [], + "cons": [] + } + ] +} + +Question: Best GPU under 500€ for local LLMs. +Return exactly 2 recommendations.""", + + "tool_calling": """You have: web_search(query: str), scrape_page(url: str), calculate(expression: str) + +Return ONLY the single tool call needed: +"What is the best local LLM for 16GB VRAM?" + +Example format: web_search("your query here") +No explanation. No other text.""", + + "compression": """Compress this into EXACTLY 10 bullet points. Start each with "- ". +Preserve key statistics. No extra text before or after the bullets. + +AI improved: healthcare (cancer detection = radiologist accuracy, drug discovery months not years), +finance (30% fraud reduction), transport (15-20% fuel savings), manufacturing (40% downtime reduction), +education (10-15% graduation improvement), energy (20-30% building savings), agriculture (25-30% water reduction).""", + + "yaml_generation": """Return ONLY valid YAML. No explanation. No markdown fences. + +Create a Kubernetes Deployment: +name is my-app +image is nginx:1.25 +replicas is 2 +containerPort is 80 +memory limit is 256Mi +cpu limit is 250m +readinessProbe uses httpGet on path /healthz port 80""", + + "artifact_mermaid": """Return ONLY a Mermaid flowchart code block (with opening and closing fences). +No explanation before or after. + +Stages in order: Code Push, Lint, Unit Tests, Build, Integration Tests, Deploy Staging, Smoke Test, Deploy Production""", + + "multi_step_agent": """Tools: web_search(query: str), scrape_page(url: str), summarize(text: str) + +Show exactly 3 chained tool calls then a final answer for: +"Top 3 most downloaded Python packages this month" + +Format: +1. web_search("...") +2. scrape_page("...") +3. summarize("...") +Final: [answer]""", + + "json_schema": """Return ONLY valid JSON Schema. No explanation. 
+ +Schema for: +- apiVersion: string, required +- kind: string, required, enum: [Deployment, Service, ConfigMap] +- metadata: object, required, properties: name (string, required), namespace (string, required) +- spec: object, required, additionalProperties: true""", +} + +# ============================================ +# EXPLICIT TEST ORDERING +# Never use dict.keys() — order must be stable +# for CSV consistency and longitudinal comparisons. +# ============================================ + +ALL_TESTS = [ + # Reasoning (5%) + "reasoning", + "math", + "agent", + # Coding / Infrastructure (25%) + "coding", + "yaml_generation", + "artifact_mermaid", + "json_schema", + # Structured outputs (15%) + "structured", + "compression", + # Agent / Tool (25%) + "tool_calling", + "multi_step_agent", + # Hallucination (10%) + "hallucination", + # RAG / Context (20%) + "rag", + "context_begin", + "context_middle", + "context_end", +] + +# ============================================ +# GROUND TRUTHS — criteria-based, not canonical +# Avoid embedding exact phrasing to prevent +# judge anchoring and over-literal scoring. +# ============================================ + +GROUND_TRUTHS = { + "reasoning": ( + "Correct answer is 27 full-time remote employees. " + "Verify: calculation uses 30% of 120 = 36 remote, " + "then 25% of 36 = 9 contractors, so 36-9 = 27. " + "Award full marks if logic is correct even if phrased differently." + ), + "coding": ( + "A working Python function that implements LIS. " + "Should mention O(n²) for basic DP approach. " + "Bonus if O(n log n) with binary search is mentioned. " + "Do not penalise for code style choices." + ), + "agent": ( + "Must name at least 2 specific GPUs (e.g. RTX 4060 Ti, RX 7600 XT). " + "Must compare VRAM — only GPUs with 16GB+ should be recommended. " + "Must acknowledge pricing uncertainty for 2026. " + "Must name a benchmarking source. " + "Score 0 if recommends GPUs under 16GB VRAM." 
+ ), + "math": ( + "Correct answer is 5 minutes. " + "Key insight: each machine independently makes 1 widget in 5 minutes, " + "so 100 machines make 100 widgets in the same 5 minutes. " + "Award marks if the rate-independence reasoning is clearly stated." + ), + "hallucination": ( + "The book does not exist. " + "Full marks: model refuses or clearly states it cannot verify existence. " + "Zero marks: model describes the book's plot, themes, or content as if real. " + "Partial marks: model hedges without clear refusal." + ), + "structured": ( + "Must return valid JSON with a 'recommendations' array containing exactly 2 objects. " + "Each object must have: gpu (string), price_eur (number), vram_gb (number), " + "pros (array of strings), cons (array of strings). " + "Score based on: valid JSON structure, correct field types, 2 recommendations present. " + "Do not score on quality of GPU choices." + ), + "tool_calling": ( + "Must return exactly one function call in the format: name(\"query\"). " + "No explanation before or after. " + "Correct function names: web_search, scrape_page, or calculate. " + "Score 0 if any text accompanies the call." + ), + "compression": ( + "Must have exactly 10 bullet points starting with '- '. " + "All 7 industries must appear: healthcare, finance, transport, " + "manufacturing, education, energy, agriculture. " + "Key statistics must be preserved where mentioned in source." + ), + "yaml_generation": ( + "Must be parseable YAML. " + "Must include: kind=Deployment, name=my-app, image=nginx:1.25, " + "replicas=2, containerPort=80, memory limit 256Mi, cpu limit 250m, " + "readinessProbe httpGet /healthz port 80. " + "Do not penalise for additional valid YAML fields not specified." + ), + "artifact_mermaid": ( + "Must be a valid Mermaid code block with opening and closing fences. " + "Must include all 8 stages: Code Push, Lint, Unit Tests, Build, " + "Integration Tests, Deploy Staging, Smoke Test, Deploy Production. 
" + "Stages should appear in the correct pipeline order." + ), + "multi_step_agent": ( + "Must show 3 distinct tool calls using different functions. " + "Preferred sequence: web_search → scrape_page → summarize. " + "Must end with 'Final: [answer]'. " + "Score based on: correct tool names, distinct calls, final answer present." + ), + "json_schema": ( + "Must be valid JSON Schema (parseable JSON). " + "Must define: apiVersion as string required, " + "kind as string required with enum [Deployment, Service, ConfigMap], " + "metadata as object required with name and namespace as string properties, " + "spec as object required with additionalProperties allowed. " + "Award marks proportionally to how many of these are correctly specified." + ), + "context_begin": "The project name is Project Aurora.", + "context_middle": "The budget allocated to Phase 2 is $2.4 million.", + "context_end": "The selected vendor is Nexora Systems (Vendor B).", + "rag": ( + "A structured summary that covers the main topics in the provided notes. " + "Should be under 200 words. " + "Should preserve key facts without inventing new information. " + "Do not penalise for including accurate details from the source." + ), +} + +# ============================================ +# JUDGE RUBRICS (per test — what to evaluate) +# Criteria-based, not answer-anchored. +# ============================================ + +JUDGE_RUBRICS = { + "reasoning": ( + "Check: Is the final number 27? Are the three calculation steps " + "(120×0.30, 36×0.25, 36-9) present and correct? Is it within 150 words?" + ), + "agent": ( + "Check each requirement: " + "(1) At least 2 named GPU models? " + "(2) VRAM and bandwidth compared? " + "(3) 2026 pricing uncertainty acknowledged? " + "(4) Benchmarking source named? " + "(5) No GPU under 16GB VRAM recommended? " + "Score 2 points per requirement met (max 10). " + "Score 0 if any GPU under 16GB is recommended." + ), + "math": ( + "Check: Is the answer 5 minutes? 
" + "Does the explanation correctly state that each machine's rate " + "is independent of quantity? Is it within 100 words?" + ), + "rag": ( + "Check: Does it cover the main topics from the notes? " + "Is it under 200 words? " + "Does it avoid inventing facts not in the source? " + "Is it clearly structured?" + ), +} + +DEFAULT_RUBRIC = ( + "Check whether the output correctly fulfils all requirements stated " + "in the original prompt. Score based on correctness and completeness, " + "not on style or verbosity beyond what the prompt requires." +) + + +# ============================================ +# DYNAMIC PROMPT BUILDERS +# ============================================ + +def ensure_context_file(): + os.makedirs("./rag_samples", exist_ok=True) + if os.path.exists(CONTEXT_FILE): + return + content = """# Project Aurora — Strategic Initiative Report + +## Executive Summary +Project Aurora is a digital transformation initiative launched January 2024. +Proposed by CTO Maria Chen. Budget: $8.7M over three years. + +## Phase 2 — Cloud Migration +Phase 2 budget allocation: $2.4 million. + +## Vendor Recommendation +Vendor A (CloudScale) — $1.8M, limited EU. +Vendor B (Nexora Systems) — $2.1M, 98% SLA, global. +Vendor C (PrimeHost) — $1.4M, no SOC2. +Vendor D (Stratos) — $2.8M, over budget. + +Final recommendation: proceed with Vendor B (Nexora Systems). 
def ensure_rag_file(path=None):
    """Create the sample RAG notes file if it does not exist yet.

    Args:
        path: Where to create the file. Defaults to the configured
            RAG_FILE. Missing parent directories are created.

    An existing file is never overwritten. Prints the created path when a
    new file is written.
    """
    target = RAG_FILE if path is None else path

    # Derive the directory from the target itself so changing the
    # configured path cannot leave the file without a parent directory.
    parent = os.path.dirname(target)
    if parent:
        os.makedirs(parent, exist_ok=True)
    if os.path.exists(target):
        return

    content = """# Homelab Infrastructure Notes

## K8s Cluster
- 4 nodes, Longhorn storage, Traefik ingress
- FluxCD for GitOps, prune: false on llm namespace
- Namespace llm: Open-WebUI, SearXNG, Crawl4AI, BGE reranker

## Ollama VM
- hostname: chat.h0melab.uk, IP: 10.0.20.57
- GPU: RTX 5060 Ti 16GB, port 11434
- Models: granite4.1:8b, qwen2.5-coder:14b, gemma4:e4b, nemotron-3-nano:4b

## Services
- Gitea at gitea.int, SSH port 3333
- Netdata + VictoriaMetrics for monitoring
- Signal bot with Whisper for voice transcription
- wiki-processor auto-generates Obsidian wiki
"""
    # Explicit encoding so the file is identical on every platform.
    with open(target, "w", encoding="utf-8") as f:
        f.write(content)
    print(f" Created: {target}")
" + "Do not add information not present in the notes.\n\n" + + rag_content + ) + else: + prompts["rag"] = "Maximum 200 words. Summarize: No RAG file found." + + return prompts \ No newline at end of file diff --git a/reporting.py b/reporting.py new file mode 100644 index 0000000..a05cca8 --- /dev/null +++ b/reporting.py @@ -0,0 +1,170 @@ +""" +benchmark_v4/reporting.py +========================= +All output formatting — terminal reports and CSV export. +Completely separate from scoring and storage logic. +""" + +from storage import load_latest_runs, load_all_runs, export_summary_csv +from config import MODELS_BASELINE_THINKING, MODELS_NEW_THINKING + + +def _tag(model, all_thinking): + return "🧠" if model in all_thinking else "⚡" + + +def _base(row): + return "★" if row.get("is_baseline") else " " + + +def print_weights(): + from config import TEST_WEIGHTS, CATEGORIES + print("\n TEST WEIGHTS:") + category_labels = { + "agent_tool": "Agent/Tool reliability (25%)", + "coding": "Coding/Infrastructure (25%)", + "rag_context": "RAG/Context fidelity (20%)", + "structured": "Structured outputs (15%)", + "hallucination": "Hallucination resistance (10%)", + "reasoning": "Pure reasoning (5%)", + } + for cat, tests in CATEGORIES.items(): + w = sum(TEST_WEIGHTS.get(t, 0) for t in tests) + label = category_labels.get(cat, cat) + print(f" {label:<42} {w*100:.0f}%") + + +def print_comparison(new_run_ids, existing_baseline_rows): + """Compare current run against existing baseline.""" + from storage import load_all_runs, get_connection + + print("\n" + "=" * 68) + print(" 📊 RESULTS vs BASELINE") + print("=" * 68) + + all_thinking = MODELS_BASELINE_THINKING + MODELS_NEW_THINKING + + if existing_baseline_rows: + best = max(float(r.get("weighted_avg") or 0) for r in existing_baseline_rows) + print(f"\n EXISTING BASELINE (best w_avg: {best:.2f}):") + for r in sorted(existing_baseline_rows, key=lambda x: -float(x.get("weighted_avg") or 0)): + print( + f" {r['model']:<44} " + 
def print_full_ranking(best=False):
    """Print the complete model ranking, one line per model.

    Args:
        best: When true, rank each model by its best-scoring run
            (``load_best_runs``); otherwise by its latest run
            (``load_latest_runs``).

    Rows are printed in the order the storage query returns them.
    """
    from storage import load_best_runs, load_latest_runs

    def shown(value):
        # NULL columns arrive as None, and None rejects width specs such
        # as ">4" with a TypeError, so substitute the "?" placeholder.
        return "?" if value is None else value

    title = "BEST RUN" if best else "LATEST RUN"
    print("\n" + "=" * 68)
    print(f" 🏆 FULL RANKING ({title} per model, weighted semantic avg)")
    print("=" * 68)

    thinking_models = MODELS_BASELINE_THINKING + MODELS_NEW_THINKING
    rows = load_best_runs() if best else load_latest_runs()

    for rank, row in enumerate(rows, 1):
        # "or 0" covers both a missing key and a NULL weighted_avg.
        weighted = float(row.get("weighted_avg") or 0)
        marker = _tag(row["model"], thinking_models) + _base(row)
        print(
            f" {rank:>2}. {marker} {row['model']:<42} "
            f"w={weighted:>5.2f} "
            f"σ={shown(row.get('stdev_all')):>4} "
            f"fail={shown(row.get('failure_rate_pct')):>4}% "
            f"tok/s={shown(row.get('avg_tok_s')):>5} "
            f"🌡={shown(row.get('avg_gpu_temp')):>2}°C "
            f"[{str(row.get('run_date') or '')[:10]}]"
        )

    print("\n ★=baseline w=weighted avg σ=stdev(low better) fail=failure rate 🌡=avg temps")
# ============================================
# GPU MONITORING
# ============================================

# Last successful nvidia-smi reading; -1 marks "no reading yet".
_gpu_cache = {"temp": -1, "mem": -1, "util": -1, "clock": -1}
_gpu_poll_count = 0


def get_gpu(force=False):
    """Return GPU stats, polling nvidia-smi only every GPU_POLL_EVERY calls.

    Args:
        force: Poll now regardless of the cadence.

    Returns:
        Dict with integer ``temp``, ``mem``, ``util`` and ``clock`` values.
        If nvidia-smi is missing, times out, or prints something that
        cannot be parsed, the previous reading (initially all -1) is
        returned unchanged.
    """
    global _gpu_cache, _gpu_poll_count

    _gpu_poll_count += 1
    # Always poll on the very first call; otherwise the first tests of a
    # run would record the -1 placeholders instead of a real reading.
    first_call = _gpu_poll_count == 1
    if (not force and not first_call
            and GPU_POLL_EVERY > 0
            and _gpu_poll_count % GPU_POLL_EVERY != 0):
        return _gpu_cache

    try:
        result = subprocess.run(
            ["nvidia-smi",
             "--query-gpu=temperature.gpu,memory.used,utilization.gpu,clocks.sm",
             "--format=csv,noheader,nounits"],
            capture_output=True, text=True, timeout=5
        )
        rows = result.stdout.strip().splitlines()
        # Multi-GPU hosts print one CSV row per device; use the first row
        # rather than failing to unpack the joined output.
        temp, mem, util, clock = (int(field) for field in rows[0].split(","))
        _gpu_cache = {
            "temp": temp, "mem": mem,
            "util": util, "clock": clock
        }
    except (OSError, subprocess.SubprocessError, ValueError, IndexError):
        # Best effort: GPU stats are optional, keep the previous reading.
        pass

    return _gpu_cache


# ============================================
# PARSE OLLAMA VERBOSE
# ============================================

def parse_generation_speed(output):
    """Return the generation (eval) speed from Ollama verbose output.

    The last ``<number> tokens/s`` value in the text is taken as the
    generation rate. Returns a float, or None when no rate is present.
    """
    rates = re.findall(r'(\d+\.\d+)\s+tokens/s', output)
    if not rates:
        return None
    return float(rates[-1])


# ============================================
# RUN SINGLE MODEL + PROMPT
# ============================================

def run_model(model, prompt):
    """Run one prompt through the Ollama CLI and collect timing and GPU stats.

    Returns:
        Dict with ``output`` (stdout and stderr joined by a newline),
        ``time`` (wall-clock seconds, 2 decimals), ``tok_s`` (parsed
        generation speed or None) and the ``gpu_*`` readings.
    """
    started = time.time()
    proc = subprocess.run(
        ["ollama", "run", model, prompt, "--verbose"],
        capture_output=True, text=True
    )
    elapsed = round(time.time() - started, 2)
    gpu = get_gpu()
    # Both streams are kept because the speed is parsed from the joined text.
    output = proc.stdout + "\n" + proc.stderr

    return {
        "output": output,
        "time": elapsed,
        "tok_s": parse_generation_speed(output),
        "gpu_temp": gpu["temp"],
        "gpu_mem": gpu["mem"],
        "gpu_util": gpu["util"],
        "gpu_clock": gpu["clock"]
    }
+ """ + run_date = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + run_ids = [] + + for model in models: + # Accumulate across runs + sem_by_test = {t: [] for t in ALL_TESTS} + fmt_by_test = {t: [] for t in ALL_TESTS} + tok_s_all = [] + temp_all = [] + detail_rows = [] + + print(f"\n[{label}] Model: {model} ({num_runs} run{'s' if num_runs > 1 else ''})") + + # Warmup + subprocess.run( + ["ollama", "run", model, "hello"], + capture_output=True, text=True + ) + time.sleep(5) + warmup_judge() + + for run_num in range(1, num_runs + 1): + if num_runs > 1: + print(f"\n ── Run {run_num}/{num_runs} ──") + + for test_name in ALL_TESTS: + prompt = all_prompts.get(test_name, "") + if not prompt or not prompt.strip(): + continue + + result = run_model(model, prompt) + scores = score_test(test_name, prompt, result["output"]) + + sem = scores["semantic_score"] + fmt = scores["format_score"] + + sem_by_test[test_name].append(sem) + fmt_by_test[test_name].append(fmt) + + if result["tok_s"]: + tok_s_all.append(result["tok_s"]) + if result["gpu_temp"] > 0: + temp_all.append(result["gpu_temp"]) + + flag = "J" if scores["used_judge"] else "V" + print( + f" [{run_num}] {test_name:<22} [{flag}] " + f"sem={sem:>2}/10 fmt={fmt:>2}/10 " + f"comb={scores['combined_score']:>5.2f} " + f"{scores['notes'][:52]}" + ) + + detail_rows.append({ + "run_date": run_date, + "run_num": run_num, + "model": model, + "type": label, + "is_baseline": 1 if is_baseline else 0, + "test": test_name, + "weight": TEST_WEIGHTS.get(test_name, 0), + "time_s": result["time"], + "tok_s": result["tok_s"], + "gpu_temp": result["gpu_temp"], + "gpu_mem": result["gpu_mem"], + "gpu_util": result["gpu_util"], + "gpu_clock": result["gpu_clock"], + "output_length": len(result["output"]), + "semantic_score":sem, + "format_score": fmt, + "combined_score":scores["combined_score"], + "used_judge": 1 if scores["used_judge"] else 0, + "notes": scores["notes"][:120], + }) + + if not no_cooldown: + time.sleep(COOLDOWN_SECONDS) + + # 
Aggregate + avg_sem = {t: round(statistics.mean(v), 2) for t, v in sem_by_test.items() if v} + avg_fmt = {t: round(statistics.mean(v), 2) for t, v in fmt_by_test.items() if v} + w_total, w_avg = compute_weighted(avg_sem) + cat_scores = compute_category_scores(avg_sem) + compliance = compute_compliance(sem_by_test) + var_stats = compute_variance_stats(sem_by_test) + fmt_avg = round(statistics.mean([s for v in fmt_by_test.values() for s in v]), 2) if fmt_by_test else 0 + avg_tok = round(statistics.mean(tok_s_all), 1) if tok_s_all else 0 + avg_tmp = round(statistics.mean(temp_all), 1) if temp_all else 0 + + print(f"\n ─── {model} ───") + print(f" Weighted avg: {w_avg} (total={w_total})") + print(f" Format avg: {fmt_avg}/10") + print(f" Variance: mean={var_stats['mean']} σ={var_stats['stdev']} failures={var_stats['failure_rate']}%") + print(f" Compliance: JSON={compliance.get('json_valid')}% YAML={compliance.get('yaml_valid')}% " + f"tool={compliance.get('tool_format')}% hall={compliance.get('hallucination_free')}%") + print(f" Categories: agent={cat_scores.get('agent_tool')} coding={cat_scores.get('coding')} " + f"rag={cat_scores.get('rag_context')} struct={cat_scores.get('structured')} " + f"hall={cat_scores.get('hallucination')} reason={cat_scores.get('reasoning')}") + print(f" tok/s={avg_tok} temp={avg_tmp}°C") + + # Save to DB + run_row = { + "run_date": run_date, + "model": model, + "type": label, + "is_baseline": 1 if is_baseline else 0, + "num_runs": num_runs, + "weighted_total": w_total, + "weighted_avg": w_avg, + "avg_format": fmt_avg, + "mean_all": var_stats["mean"], + "stdev_all": var_stats["stdev"], + "min_score": var_stats["min"], + "max_score": var_stats["max"], + "failure_rate_pct":var_stats["failure_rate"], + "compliance_json": compliance.get("json_valid"), + "compliance_yaml": compliance.get("yaml_valid"), + "compliance_tool": compliance.get("tool_format"), + "compliance_hall": compliance.get("hallucination_free"), + "cat_agent_tool": 
cat_scores.get("agent_tool"), + "cat_coding": cat_scores.get("coding"), + "cat_rag_context": cat_scores.get("rag_context"), + "cat_structured": cat_scores.get("structured"), + "cat_hallucination":cat_scores.get("hallucination"), + "cat_reasoning": cat_scores.get("reasoning"), + "avg_tok_s": avg_tok, + "avg_gpu_temp": avg_tmp, + "tests_run": len(avg_sem) * num_runs, + } + + run_id = insert_run(run_row) + insert_details(run_id, detail_rows) + + # Variance rows (only if multiple runs) + if num_runs > 1: + var_rows = [] + for test_name, scores_list in sem_by_test.items(): + if len(scores_list) > 1: + var_rows.append({ + "run_date": run_date, + "model": model, + "test": test_name, + "num_runs": num_runs, + "mean": round(statistics.mean(scores_list), 2), + "stdev": round(statistics.stdev(scores_list), 2), + "min_score": min(scores_list), + "max_score": max(scores_list), + "failure_rate_pct":round( + sum(1 for s in scores_list if s <= 2) / len(scores_list) * 100, 1 + ), + "scores_raw": str(scores_list), + }) + if var_rows: + insert_variance(var_rows) + + run_ids.append(run_id) + + print(f"\nCooldown after {model}...\n") + time.sleep(30) + + return run_ids diff --git a/scoring.py b/scoring.py new file mode 100644 index 0000000..46caaa2 --- /dev/null +++ b/scoring.py @@ -0,0 +1,193 @@ +""" +benchmark_v4/scoring.py +======================= +Combines validator, judge, and embedding into final scores. +Computes: format_score, semantic_score, combined_score. +Computes: category scores, weighted total, compliance, variance. 
def compute_format_score(output, prompt):
    """Score format obedience only, independent of semantic quality.

    Starts from 10 and subtracts penalties for:
      * ANSI escape codes in the raw output (-2),
      * exceeding a "Maximum N words" limit by more than 30% (up to -3),
      * a fenced code block when the prompt says "no markdown" or
        "no explanation" (-2).

    Returns an integer, never below 0.
    """
    cleaned = normalize_text(output)
    penalty = 0

    # ANSI check runs on the raw output, before normalization removes it.
    if re.search(r'\x1b\[', output) is not None:
        penalty += 2

    cap = re.search(r'Maximum (\d+) words?', prompt, re.IGNORECASE)
    if cap is not None:
        allowed = int(cap.group(1))
        used = len(cleaned.split())
        if used > allowed * 1.3:
            # Scales with the relative overshoot, capped at 3 points.
            penalty += min(3, int((used - allowed) / allowed * 5))

    prompt_lc = prompt.lower()
    forbids_markdown = "no markdown" in prompt_lc or "no explanation" in prompt_lc
    # Two or more fence markers means at least one complete fenced block.
    if forbids_markdown and cleaned.count("```") >= 2:
        penalty += 2

    return max(0, 10 - penalty)
+ """ + # Normalize for quality assessment + clean = normalize_text(raw_output) + + # Format score (always computed, separate dimension) + fmt_score = compute_format_score(raw_output, prompt) + + # Validator + val_score, skip_judge, val_notes = run_validator(test_name, clean) + + if val_score is not None and skip_judge: + # Definitive — 0 or 10 + semantic = val_score + used_judge = False + notes = val_notes + + elif val_score is not None: + # High-confidence tests: trust validator when score >= 8, skip judge + high_confidence = {"compression", "artifact_mermaid", "tool_calling", + "yaml_generation", "multi_step_agent"} + if test_name in high_confidence and val_score >= 8: + semantic = val_score + used_judge = False + notes = val_notes + else: + # Partial validator score — blend with judge (80/20) + j_score, j_reason = call_judge(test_name, prompt, clean) + semantic = round(val_score * 0.8 + j_score * 0.2) + used_judge = True + notes = f"val={val_score} j={j_score} → {j_reason[:55]}" + + elif test_name == "rag": + ref = GROUND_TRUTHS.get("rag", "") + e_sim = embedding_score(clean, ref) + j_score, j_reason = call_judge(test_name, prompt, clean) + # Weight judge more — embedding unreliable for technical content + if e_sim == 0: + semantic = j_score # embedding failed, use judge only + else: + semantic = round(e_sim * 0.3 + j_score * 0.7) + used_judge = True + notes = f"embed={e_sim} j={j_score} → {j_reason[:50]}" + + else: + # Pure judge + j_score, j_reason = call_judge(test_name, prompt, clean) + semantic = j_score + used_judge = True + notes = j_reason[:80] + + # Combined: 80% semantic, 20% format — mathematically correct + combined = round(semantic * 0.8 + fmt_score * 0.2, 2) + + return { + "semantic_score": int(semantic), + "format_score": fmt_score, + "combined_score": combined, + "used_judge": used_judge, + "notes": notes, + } + + +# ============================================ +# WEIGHTED + CATEGORY SCORES +# ============================================ + +def 
def compute_variance_stats(scores_by_test):
    """Summarize the spread of every score pooled across all tests and runs.

    Input: {test_name: [score_run1, score_run2, ...]}
    Returns: dict with keys ``mean``, ``stdev``, ``min``, ``max`` and
    ``failure_rate`` (percentage of scores <= 2). All values are 0 when
    there are no scores; ``stdev`` is 0 for a single score.
    """
    pooled = []
    for per_test in scores_by_test.values():
        pooled.extend(per_test)

    if not pooled:
        return {"mean": 0, "stdev": 0, "min": 0, "max": 0, "failure_rate": 0}

    # Sample stdev is undefined for one data point.
    if len(pooled) > 1:
        spread = round(statistics.stdev(pooled), 2)
    else:
        spread = 0

    failures = sum(1 for score in pooled if score <= 2)

    return {
        "mean": round(statistics.mean(pooled), 2),
        "stdev": spread,
        "min": min(pooled),
        "max": max(pooled),
        "failure_rate": round(failures / len(pooled) * 100, 1),
    }
def insert_run(run_data):
    """Insert one run summary row and return its id.

    Args:
        run_data: Mapping that supplies every named parameter used in the
            INSERT statement below (run_date, model, type, ...).

    Returns:
        The autoincrement id of the inserted ``runs`` row.
    """
    from contextlib import closing

    sql = """
        INSERT INTO runs (
            run_date, model, type, is_baseline, num_runs,
            weighted_total, weighted_avg, avg_format,
            mean_all, stdev_all, min_score, max_score, failure_rate_pct,
            compliance_json, compliance_yaml, compliance_tool, compliance_hall,
            cat_agent_tool, cat_coding, cat_rag_context,
            cat_structured, cat_hallucination, cat_reasoning,
            avg_tok_s, avg_gpu_temp, tests_run
        ) VALUES (
            :run_date, :model, :type, :is_baseline, :num_runs,
            :weighted_total, :weighted_avg, :avg_format,
            :mean_all, :stdev_all, :min_score, :max_score, :failure_rate_pct,
            :compliance_json, :compliance_yaml, :compliance_tool, :compliance_hall,
            :cat_agent_tool, :cat_coding, :cat_rag_context,
            :cat_structured, :cat_hallucination, :cat_reasoning,
            :avg_tok_s, :avg_gpu_temp, :tests_run
        )
    """
    # A sqlite3 connection used as a context manager only commits or rolls
    # back; it does not close. closing() releases the handle afterwards.
    with closing(get_connection()) as conn:
        with conn:
            cursor = conn.execute(sql, run_data)
        return cursor.lastrowid
def load_best_runs():
    """Load the best-scoring run of each model, exactly one row per model.

    When several runs of a model share the top ``weighted_avg``, the most
    recently inserted one (highest id) is chosen, so a model never appears
    twice. Models with no non-NULL ``weighted_avg`` are omitted. Rows are
    returned as dicts, ordered by ``weighted_avg`` descending.
    """
    from contextlib import closing

    sql = """
        SELECT r.*
        FROM runs r
        WHERE r.id = (
            SELECT candidate.id
            FROM runs candidate
            WHERE candidate.model = r.model
              AND candidate.weighted_avg IS NOT NULL
            ORDER BY candidate.weighted_avg DESC, candidate.id DESC
            LIMIT 1
        )
        ORDER BY r.weighted_avg DESC
    """
    # closing() is needed because the sqlite3 connection context manager
    # does not close the connection on exit.
    with closing(get_connection()) as conn:
        rows = conn.execute(sql).fetchall()
    return [dict(r) for r in rows]
def normalize_text(text, mode="plain"):
    """Clean raw model/CLI output before validation.

    mode="plain" — strip ANSI, control chars, spinner glyphs, thinking
                   tokens and Ollama verbose stats
    mode="json"  — plain + strip markdown fences and terminal residue
    mode="yaml"  — plain + strip markdown fences and terminal residue

    Returns the cleaned string. Only the json/yaml modes trim leading and
    trailing whitespace.
    """
    # 1. Strip ANSI escape sequences FIRST
    text = re.sub(r'\x1b(?:[@-Z\\-_]|\[[0-9;?]*[A-Za-z])', '', text)

    # 2. Strip control characters (tab, newline and carriage return are kept)
    text = re.sub(r'[\x00-\x08\x0b\x0c\x0e-\x1f\x7f]', '', text)

    # 3. Strip Ollama spinner/progress characters
    text = re.sub(r'[⠋⠙⠹⠸⠼⠴⠦⠧⠇⠏]+', '', text)

    # 4. Normalize Unicode spaces to regular spaces
    text = text.replace('\u202f', ' ').replace('\u00a0', ' ').replace('\u2009', ' ')

    # 5. Strip thinking tokens (AFTER cleaning so the regexes see plain text).
    #    The tag pattern must name the <think> delimiters: a bare '.*?'
    #    matches only empty strings and removes nothing.
    text = re.sub(r'<think>.*?</think>', '', text, flags=re.DOTALL)
    text = re.sub(r'Thinking\.\.\..*?\.\.\.done thinking\.?', '', text, flags=re.DOTALL)

    # 6. Strip Ollama verbose stats (LAST — after all other cleanup)
    stat_markers = (
        "total duration:", "load duration:", "prompt eval",
        "eval count:", "eval duration:", "eval rate:", "tokens/s", "token(s)",
    )
    text = "\n".join(
        line for line in text.split("\n")
        if not any(marker in line.lower() for marker in stat_markers)
    )

    if mode in ("json", "yaml"):
        # Remove opening/closing fences, then drop leftover terminal lines.
        text = re.sub(r'^```(?:json|yaml|)?\s*', '', text, flags=re.MULTILINE)
        text = re.sub(r'```\s*$', '', text, flags=re.MULTILINE)
        kept = [
            line for line in text.split('\n')
            if not line.strip().startswith('[?')
            and not line.strip().startswith('```')
            and not re.search(r'[\x00-\x08\x0b\x0c\x0e-\x1f\x7f]', line)
            and '\x1b' not in line
        ]
        text = '\n'.join(kept).strip()

    return text
def validate_tool_calling(text):
    """Check for a tool call without surrounding explanation.

    Returns (score, notes):
      0  — more than three non-blank lines, or no call-like text at all
      10 — a call to web_search / scrape_page / calculate with a quoted arg
      5  — a quoted-argument call to some other function name
    """
    cleaned = normalize_text(text)

    non_blank = sum(1 for line in cleaned.split('\n') if line.strip())
    if non_blank > 3:
        return 0, "multiple lines — explanation added"

    # Ordered from most to least specific; the first matching pattern wins.
    graded_patterns = (
        (r'(web_search|scrape_page|calculate)\s*\(["\'].*["\']\)',
         10, "valid tool call syntax"),
        (r'\w+\s*\(["\'].*["\']\)',
         5, "function call but wrong name"),
    )
    for pattern, score, note in graded_patterns:
        if re.search(pattern, cleaned):
            return score, note

    return 0, "no valid function call found"
def validate_json_output(text):
    """Score nested structured JSON with a recommendations array.

    Expected: {"recommendations": [{"gpu":"","price_eur":0,"vram_gb":0,"pros":[],"cons":[]}]}

    Scoring (capped at 10):
      4 base for a non-empty recommendations list,
      +2 for two or more entries (+1 for exactly one),
      up to +3 for required fields present on the first entry,
      +1 when the first entry's field types are correct.

    Returns (score, notes). A flat legacy object with gpu/price/reason gets
    4; an unparseable answer or one with no recognizable fields gets 0.
    """
    parsed = extract_json_object(text)
    if parsed is None:
        return 0, "no valid JSON object found"

    # Check top-level structure
    if "recommendations" not in parsed:
        # Fallback: old flat format still gets partial credit
        old_fields = ["gpu", "price", "reason"]
        if any(k in parsed and str(parsed[k]).strip() for k in old_fields):
            return 4, "flat JSON found (old format), missing nested structure"
        return 0, "no recommendations array found"

    recs = parsed["recommendations"]
    if not isinstance(recs, list) or len(recs) == 0:
        return 2, "recommendations present but empty or not a list"

    required_fields = {"gpu", "price_eur", "vram_gb", "pros", "cons"}
    score = 4  # base for having recommendations array

    # Check count
    if len(recs) >= 2:
        score += 2
    else:
        score += 1

    # Models sometimes emit a list of strings; treat a non-dict entry as
    # having no fields instead of crashing on .keys().
    first = recs[0] if isinstance(recs[0], dict) else {}
    present = required_fields & set(first)
    score += int((len(present) / len(required_fields)) * 3)

    def is_number(value):
        # bool is a subclass of int but is not a valid price or VRAM size.
        return isinstance(value, (int, float)) and not isinstance(value, bool)

    type_ok = (
        is_number(first.get("price_eur")) and
        is_number(first.get("vram_gb")) and
        isinstance(first.get("pros"), list) and
        isinstance(first.get("cons"), list)
    )
    if type_ok:
        score += 1

    score = min(score, 10)
    # sorted() keeps the note identical from run to run (set order is not).
    return score, f"nested JSON: {len(recs)} recs, fields={sorted(present)}, types_ok={type_ok}"
def validate_compression(text):
    """Score a bullet summary: exactly 10 bullets, all 7 industries.

    The base score depends on how far the "- " bullet count is from 10
    (5 for exact, 3 within two, 2 within three); one point is added per
    industry keyword found. Any other count scores 0. The result is
    capped at 10 and returned as (score, notes).
    """
    cleaned = normalize_text(text)

    count = sum(
        1 for line in cleaned.strip().split('\n')
        if line.strip().startswith('- ')
    )

    industries = ["healthcare", "financ", "transport", "manufactur",
                  "education", "energy", "agricultur"]
    lowered = cleaned.lower()
    industry_count = sum(1 for stem in industries if stem in lowered)

    distance = abs(count - 10)
    if distance == 0:
        score = 5 + industry_count
    elif distance <= 2:
        score = 3 + industry_count
    elif distance == 3:
        score = 2 + industry_count
    else:
        # Four or more bullets away from the target earns nothing.
        score = 0

    return min(score, 10), f"{count} bullets, {industry_count}/7 industries"
def validate_hallucination(text):
    """Score an answer about a book that does not exist.

    Reward uncertainty. Penalize confident invented specifics.

    Returns (score, notes):
      10 — the answer refuses and makes no content claim about the book
      0  — the answer makes content claims, or refers to the book as
           real, with no refusal anywhere
      5  — refusal and content claims are mixed, or neither is present
           (a partial score, so the judge is consulted)
    """
    text_lower = text.lower()

    # Good refusal signals
    refuse_signals = [
        "does not exist", "doesn't exist", "not exist",
        "cannot verify", "can't verify", "cannot confirm",
        "not aware", "no record", "not familiar",
        "unable to find", "could not find", "not in my",
        "not certain", "uncertain", "cannot find",
        "don't have", "do not have", "not sure",
        "may not exist", "might not exist", "no evidence",
        "i cannot", "i can't", "i don't know",
        "fictional", "fabricated", "no such book",
        "not able to verify", "cannot be verified",
        "not a real", "i'm not aware", "i am not aware",
    ]

    # Hard hallucination signals — confident invented content
    content_claims = [
        "the book explores", "kessler argues", "the main ideas",
        "the author presents", "the story follows", "themes include",
        "the book examines", "kessler's work", "the book describes",
        "the plot", "the protagonist",
    ]

    # Bare references to the book. Refusals use them too ("I cannot verify
    # that this book exists"), so they only count when nothing is refused.
    bare_mentions = ["the novel", "in the book", "this book"]

    refusal = next((s for s in refuse_signals if s in text_lower), None)
    claim = next((s for s in content_claims if s in text_lower), None)
    mention = next((s for s in bare_mentions if s in text_lower), None)

    if claim is not None:
        if refusal is not None:
            return 5, f"mixed: refused ('{refusal}') but also '{claim}'"
        return 0, f"hallucinated: '{claim}' found"

    if refusal is not None:
        return 10, f"correctly refused: '{refusal}'"

    if mention is not None:
        return 0, f"hallucinated: '{mention}' found"

    return 5, "ambiguous — no clear refusal or hallucination"
def validate_context(text, expected_phrase):
    """
    Score how closely a context-recall answer reproduces a phrase.

    The answer is passed through normalize_text and lower-cased, then
    compared against the lower-cased expected phrase:

      * verbatim substring            -> 10
      * fuzzy similarity (if enabled) -> 10 / 9 / 7 / 5 at the
        90 / 80 / 70 / 55 thresholds, otherwise similarity // 12
      * keyword fallback              -> share of words longer than
        three characters found in the answer, scaled to 0-10

    Returns a (score, notes) tuple.
    """
    haystack = normalize_text(text).lower()
    needle = expected_phrase.lower()

    # A verbatim hit wins outright.
    if needle in haystack:
        return 10, "exact match"

    if not FUZZY_AVAILABLE:
        # No fuzzy library: count the significant words that show up.
        keywords = [word for word in needle.split() if len(word) > 3]
        if not keywords:
            return 5, "no key words to match"
        hits = sum(1 for word in keywords if word in haystack)
        return int((hits / len(keywords)) * 10), f"{hits}/{len(keywords)} tokens"

    # Take the more generous of the two similarity measures.
    similarity = max(
        fuzz.partial_ratio(needle, haystack),
        fuzz.token_set_ratio(needle, haystack),
    )

    # (minimum similarity, awarded score, label used in the note)
    tiers = (
        (90, 10, "fuzzy"),
        (80, 9, "fuzzy"),
        (70, 7, "partial"),
        (55, 5, "weak"),
    )
    for floor, awarded, label in tiers:
        if similarity >= floor:
            return awarded, f"{label} match {similarity}%"

    return max(0, int(similarity / 12)), f"poor match {similarity}%"
# ============================================
# DISPATCHER
# ============================================

# Test name -> deterministic validator. Each callable takes the raw model
# output and returns (score, notes). The context tests bind their expected
# phrase via a lambda.
VALIDATOR_MAP = {
    "tool_calling": validate_tool_calling,
    "yaml_generation": validate_yaml,
    "structured": validate_json_output,
    "json_schema": validate_json_schema,
    "artifact_mermaid": validate_mermaid,
    "compression": validate_compression,
    "multi_step_agent": validate_multi_step,
    "hallucination": validate_hallucination,
    "coding": validate_coding,
    "agent": validate_agent,
    "context_begin": lambda t: validate_context(t, "Project Aurora"),
    "context_middle": lambda t: validate_context(t, "2.4 million"),
    "context_end": lambda t: validate_context(t, "Nexora Systems"),
}


def run_validator(test_name, raw_output):
    """
    Dispatch raw_output to the deterministic validator for test_name.

    Returns a (score, skip_judge, notes) triple. skip_judge is True
    only for the definitive scores 0 and 10. A test name with no
    entry in VALIDATOR_MAP yields (None, False, "no validator").
    """
    check = VALIDATOR_MAP.get(test_name)
    if check is None:
        return None, False, "no validator"

    score, notes = check(raw_output)
    # Only the two extreme scores are treated as final.
    is_definitive = score in (0, 10)
    return score, is_definitive, notes