From ab7875303e34378f6fc30079125afdf3eb7dd92e Mon Sep 17 00:00:00 2001 From: Raul Costa Date: Fri, 15 May 2026 16:50:26 +0100 Subject: [PATCH] RC: (add) python script files --- config.py | 100 +++++++++++ judge.py | 178 +++++++++++++++++++ main.py | 299 ++++++++++++++++++++++++++++++++ prompts.py | 388 +++++++++++++++++++++++++++++++++++++++++ reporting.py | 170 ++++++++++++++++++ runner.py | 276 +++++++++++++++++++++++++++++ scoring.py | 193 +++++++++++++++++++++ storage.py | 279 ++++++++++++++++++++++++++++++ validators.py | 467 ++++++++++++++++++++++++++++++++++++++++++++++++++ 9 files changed, 2350 insertions(+) create mode 100644 config.py create mode 100644 judge.py create mode 100644 main.py create mode 100644 prompts.py create mode 100644 reporting.py create mode 100644 runner.py create mode 100644 scoring.py create mode 100644 storage.py create mode 100644 validators.py diff --git a/config.py b/config.py new file mode 100644 index 0000000..787519d --- /dev/null +++ b/config.py @@ -0,0 +1,100 @@ +""" +benchmark_v4/config.py +====================== +All configuration in one place. Edit this file to change models, +weights, judge, and runtime settings. 
# ============================================
# MODELS
# ============================================

# Baseline models run with the "direct" label.
MODELS_BASELINE_DIRECT = [
    "granite4.1:8b",
    "qwen2.5-coder:14b",
]

# Baseline models run with the "thinking" label.
MODELS_BASELINE_THINKING = [
    "nemotron-3-nano:4b",
    "gemma4:e4b",
]

# Candidate models compared against the baseline (empty by default).
MODELS_NEW_DIRECT = []
MODELS_NEW_THINKING = []

# ============================================
# JUDGE + EMBEDDINGS
# ============================================

JUDGE_MODEL = "qwen2.5:14b"
EMBED_MODEL = "nomic-embed-text"
OLLAMA_URL = "http://localhost:11434"

# ============================================
# RUNTIME
# ============================================

COOLDOWN_SECONDS = 20  # between tests (thermal normalization)
GPU_POLL_EVERY = 3  # poll nvidia-smi every N tests (0 = every test)

# ============================================
# TEST WEIGHTS (must sum to 1.0)
# ============================================

TEST_WEIGHTS = {
    # Agent / tool reliability — 25%
    "tool_calling": 0.13,
    "multi_step_agent": 0.12,

    # Coding / infrastructure — 25%
    "coding": 0.10,
    "yaml_generation": 0.08,
    "artifact_mermaid": 0.04,
    "json_schema": 0.03,

    # RAG / context fidelity — 20%
    "rag": 0.07,
    "context_begin": 0.04,
    "context_middle": 0.05,
    "context_end": 0.04,

    # Structured outputs — 15%
    "structured": 0.08,
    "compression": 0.07,

    # Hallucination resistance — 10%
    "hallucination": 0.10,

    # Pure reasoning — 5%
    "reasoning": 0.03,
    "agent": 0.01,
    "math": 0.01,
}

# Category groupings for category-level scores
CATEGORIES = {
    "agent_tool": ["tool_calling", "multi_step_agent"],
    "coding": ["coding", "yaml_generation", "artifact_mermaid", "json_schema"],
    "rag_context": ["rag", "context_begin", "context_middle", "context_end"],
    "structured": ["structured", "compression"],
    "hallucination": ["hallucination"],
    "reasoning": ["reasoning", "agent", "math"],
}


def _validate_config(weights, categories):
    """Fail fast at import time when the scoring tables are inconsistent.

    Raises ValueError (rather than using ``assert``, which is removed under
    ``python -O``) when the weights do not sum to 1.0 within 0.001, or when
    the set of tests named in ``categories`` differs from the set of tests
    that carry a weight.
    """
    total = sum(weights.values())
    if abs(total - 1.0) >= 0.001:
        raise ValueError(f"Weights must sum to 1.0 (got {total:.3f})")

    categorised = {test for tests in categories.values() for test in tests}
    uncategorised = sorted(set(weights) - categorised)
    unweighted = sorted(categorised - set(weights))
    if uncategorised or unweighted:
        raise ValueError(
            "CATEGORIES and TEST_WEIGHTS disagree: "
            f"weighted but uncategorised={uncategorised}, "
            f"categorised but unweighted={unweighted}"
        )


_validate_config(TEST_WEIGHTS, CATEGORIES)

# Compliance groups — pass if semantic_score >= 8
COMPLIANCE_GROUPS = {
    "json_valid": ["structured", "json_schema"],
    "yaml_valid": ["yaml_generation"],
    "tool_format": ["tool_calling", "multi_step_agent"],
    "hallucination_free": ["hallucination"],
}

# Context files
CONTEXT_FILE = "./rag_samples/context_test.md"
RAG_FILE = "./rag_samples/note1.md"

# Database
DB_FILE = "benchmark_v4.db"
# ============================================
# JUDGE WARMUP
# ============================================

def warmup_judge():
    """Ask Ollama for a single token so the judge model is loaded.

    Best-effort: any failure is printed and otherwise ignored.
    """
    print(f" Warming up judge: {JUDGE_MODEL}")
    try:
        requests.post(
            f"{OLLAMA_URL}/api/generate",
            json={
                "model": JUDGE_MODEL,
                "prompt": "hi",
                "stream": False,
                "options": {"num_predict": 1},
            },
            timeout=120,
        )
    except Exception as e:
        print(f" Judge warmup error: {e}")


# ============================================
# JUDGE CALL
# ============================================

def _clamp_score(value):
    """Coerce a judge-supplied score (int, float or numeric string) to 0..10.

    Raises TypeError / ValueError / OverflowError for None, non-numeric text,
    NaN or infinity; the caller treats those as "not parseable".
    """
    return max(0, min(10, int(float(value))))


def _parse_judge_response(raw):
    """Extract ``(score, reason)`` from the judge's raw text, or return None.

    Three attempts, strictest first:
      1. a flat JSON object containing "semantic_score";
      2. a regex for the quoted "semantic_score" key (plus "reason" if found);
      3. any digits following the bare word ``semantic_score``.
    """
    m = re.search(r'\{[^{}]*"semantic_score"[^{}]*\}', raw)
    if m:
        try:
            parsed = json.loads(m.group())
            score = _clamp_score(parsed.get("semantic_score", 5))
            reason = str(parsed.get("reason", ""))[:80]
            return score, reason
        except (TypeError, ValueError, OverflowError):
            # Malformed JSON or a non-numeric / null score: fall through to
            # the regex fallbacks instead of aborting the whole parse.
            pass

    # Fallback: extract score number
    sm = re.search(r'"semantic_score"\s*:\s*(\d+)', raw)
    if sm:
        rm = re.search(r'"reason"\s*:\s*"([^"]{1,80})"', raw)
        return _clamp_score(sm.group(1)), (rm.group(1) if rm else "extracted")

    # Last resort
    last = re.search(r'semantic_score[^\d]*(\d+)', raw, re.IGNORECASE)
    if last:
        return _clamp_score(last.group(1)), "score extracted"

    return None


def call_judge(test_name, prompt, output):
    """
    Call LLM judge with strict rubric.

    Args:
        test_name: key used to look up the rubric and ground truth.
        prompt: the prompt given to the model under test (first 500 chars used).
        output: the model's answer (first 1500 chars used). None is treated
            as an empty answer rather than raising.

    Returns:
        (semantic_score 0-10, reason str).
        Falls back to midpoint (5) on timeout, transport/HTTP error or an
        unparseable reply, to avoid corrupting results.
    """
    rubric = JUDGE_RUBRICS.get(test_name, DEFAULT_RUBRIC)
    ground_truth = GROUND_TRUTHS.get(test_name, "See prompt requirements.")

    judge_prompt = JUDGE_PROMPT_TEMPLATE.format(
        test_name=test_name,
        rubric=rubric,
        ground_truth=ground_truth,
        prompt=str(prompt or "")[:500],
        output=str(output or "")[:1500],
    )

    try:
        response = requests.post(
            f"{OLLAMA_URL}/api/generate",
            json={"model": JUDGE_MODEL, "prompt": judge_prompt, "stream": False},
            timeout=180,
        )
        # An HTTP error (e.g. judge model not pulled) is a judge failure,
        # not an "unparseable" answer.
        response.raise_for_status()
        raw = str(response.json().get("response", "")).strip()
    except requests.exceptions.Timeout:
        return 5, "judge timeout — midpoint"
    except Exception as e:
        # Deliberate catch-all: a broken judge must not abort the benchmark,
        # but the cause is reported instead of being silently dropped.
        print(f" Judge error: {e}")
        return 5, "judge error — midpoint"

    parsed = _parse_judge_response(raw)
    if parsed is not None:
        return parsed

    print(f" Judge unparseable: {raw[:80]}")
    return 5, "judge unparseable — midpoint"


# ============================================
# EMBEDDING SIMILARITY
# ============================================

def get_embedding(text):
    """Get embedding vector from EMBED_MODEL via Ollama's /api/embed.

    Only the first 2000 characters of ``text`` are sent. Returns the first
    embedding in the response, or ``[]`` on any failure.
    """
    try:
        r = requests.post(
            f"{OLLAMA_URL}/api/embed",
            json={"model": EMBED_MODEL, "input": text[:2000]},
            timeout=30,
        )
        return r.json().get("embeddings", [[]])[0]
    except Exception:
        return []


def cosine_similarity(v1, v2):
    """Return the cosine similarity of two equal-length vectors.

    Returns 0.0 when either vector is empty, the lengths differ, or either
    vector has zero magnitude.
    """
    if not v1 or not v2 or len(v1) != len(v2):
        return 0.0
    dot = sum(a * b for a, b in zip(v1, v2))
    mag = math.sqrt(sum(a**2 for a in v1)) * math.sqrt(sum(b**2 for b in v2))
    return dot / mag if mag else 0.0


# (minimum similarity, score) pairs, checked from highest to lowest.
_SIMILARITY_STEPS = ((0.92, 10), (0.85, 8), (0.78, 6), (0.70, 4), (0.60, 2))


def embedding_score(text, reference):
    """
    Score 0-10 based on cosine similarity.
    Uses stepped mapping for better discrimination.

    Returns 0 when either input is empty. ``text`` is cut to 1000 characters
    before embedding; a failed embedding yields similarity 0.0 and score 0.
    """
    if not text or not reference:
        return 0

    v1 = get_embedding(text[:1000])
    v2 = get_embedding(reference)
    sim = cosine_similarity(v1, v2)

    # Stepped mapping — more discriminating than linear
    for floor, score in _SIMILARITY_STEPS:
        if sim >= floor:
            return score
    return 0
# Optional dependencies: imported only to report whether they are installed.
try:
    import yaml  # noqa: F401
    YAML_AVAILABLE = True
except ImportError:
    YAML_AVAILABLE = False

try:
    from rapidfuzz import fuzz  # noqa: F401
    FUZZY_AVAILABLE = True
except ImportError:
    FUZZY_AVAILABLE = False


# ============================================
# THINKING MODEL DETECTION
# ============================================

def detect_thinking_model(model_name):
    """
    Detect if a model supports thinking mode via Ollama capabilities API.
    Uses /api/show and checks for 'thinking' in capabilities array.
    Fast — single API call, no generation needed.

    Returns False on any request or decoding failure.
    """
    try:
        r = requests.post(
            f"{OLLAMA_URL}/api/show",
            # "model" is the documented field; "name" is kept as well for
            # servers that only understand the older spelling.
            json={"model": model_name, "name": model_name},
            timeout=10,
        )
        caps = r.json().get("capabilities", [])
        return "thinking" in caps
    except Exception:
        return False


# ============================================
# MAIN
# ============================================

def _positive_int(value):
    """argparse ``type=`` callable accepting only integers >= 1."""
    number = int(value)
    if number < 1:
        raise argparse.ArgumentTypeError(f"must be >= 1 (got {value})")
    return number


def _build_parser():
    """Build the command-line parser for the benchmark entry point."""
    parser = argparse.ArgumentParser(
        description="LLM Benchmark V4 — Modular, SQLite-backed",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  python3 main.py                          # full baseline run
  python3 main.py --test-all               # auto-discover all ollama models
  python3 main.py --model granite4.1:8b    # single model
  python3 main.py --mode new               # new models only
  python3 main.py --runs 3                 # variance analysis (3 runs)
  python3 main.py --no-cooldown            # fast run, no thermal wait
  python3 main.py --report                 # show latest run reports
  python3 main.py --report --report-best   # show best run per model
  python3 main.py --export                 # export CSV from DB
        """
    )

    parser.add_argument(
        "--test-all", action="store_true", default=False,
        help="Auto-discover and benchmark all models in ollama list"
    )
    parser.add_argument(
        "--mode", choices=["baseline", "new", "all"],
        default="all",
        help="Which model group to run (default: all)"
    )
    parser.add_argument(
        "--model", type=str, default=None,
        help="Run a single model by Ollama tag"
    )
    parser.add_argument(
        "--thinking", action="store_true", default=False,
        help="Override: mark single --model as thinking type"
    )
    parser.add_argument(
        # Reject 0 / negative counts at the CLI boundary.
        "--runs", type=_positive_int, default=1,
        help="Number of runs per model for variance analysis (default: 1)"
    )
    parser.add_argument(
        "--no-cooldown", action="store_true", default=False,
        help="Skip cooldown between tests (faster but no thermal normalization)"
    )
    parser.add_argument(
        "--report", action="store_true", default=False,
        help="Show ranking reports from DB without running any models"
    )
    parser.add_argument(
        "--report-best", action="store_true", default=False,
        help="Show best run per model instead of latest (use with --report)"
    )
    parser.add_argument(
        "--export", action="store_true", default=False,
        help="Export latest results to benchmark_summary.csv and exit"
    )
    return parser


def _discover_ollama_models():
    """Return model tags listed by ``ollama list``, minus judge/embed models.

    Returns an empty list (after printing the reason) when the ``ollama``
    executable is missing or the command exits with a non-zero status.
    """
    try:
        result = subprocess.run(
            ["ollama", "list"],
            capture_output=True, text=True
        )
    except FileNotFoundError:
        print("ollama executable not found on PATH.")
        return []
    if result.returncode != 0:
        print(f"ollama list failed: {result.stderr.strip()}")
        return []

    skip = [EMBED_MODEL, JUDGE_MODEL, "nomic-embed-text"]
    discovered = []
    # First line of the output is the column header.
    for line in result.stdout.strip().splitlines()[1:]:
        parts = line.split()
        if parts and not any(s in parts[0] for s in skip):
            discovered.append(parts[0])
    return discovered


def _banner(title, leading_blank=True):
    """Print a 50-column section banner."""
    print(("\n" if leading_blank else "") + "=" * 50)
    print(f" {title}")
    print("=" * 50)


def main():
    """Parse CLI arguments, run the selected benchmark mode and print reports."""
    args = _build_parser().parse_args()

    # Init database
    init_db()

    # ── Report / export only modes ─────────────────────────────────
    # Must come before benchmark logic
    if args.report or args.report_best:
        print_full_ranking(best=args.report_best)
        print_category_breakdown()
        print_compliance_table()
        export_summary_csv()
        return

    if args.export:
        export_summary_csv()
        return

    # ── Setup ──────────────────────────────────────────────────────
    existing_baseline = load_latest_runs(is_baseline=True)
    all_prompts = build_all_prompts()

    print("\nLLM Benchmark V4")
    settings = [
        ("Judge", JUDGE_MODEL),
        ("Embed", EMBED_MODEL),
        ("DB", DB_FILE),
        ("Runs", args.runs),
        ("Fuzzy", f"{FUZZY_AVAILABLE} | YAML: {YAML_AVAILABLE}"),
        ("Cooldown", "disabled" if args.no_cooldown else "enabled"),
    ]
    for label, value in settings:
        # Pad to the width of the longest label ("Cooldown:").
        print(f"{label + ':':<9}{value}")
    print(f"Previous baseline runs: {len(existing_baseline)}")

    print_weights()

    all_new_run_ids = []
    known_baseline = set(MODELS_BASELINE_DIRECT + MODELS_BASELINE_THINKING)

    def _run(models, label, baseline):
        ids = run_benchmark(
            models=models,
            label=label,
            is_baseline=baseline,
            all_prompts=all_prompts,
            num_runs=args.runs,
            no_cooldown=args.no_cooldown,
        )
        all_new_run_ids.extend(ids)

    def _run_groups(prefix, direct, thinking, baseline):
        # Run the direct list, then the thinking list; empty lists are skipped.
        for models, kind in ((direct, "direct"), (thinking, "thinking")):
            if models:
                _banner(f"{prefix} — {kind.upper()}")
                _run(models, kind, baseline)

    # ── Auto-discover all Ollama models ────────────────────────────
    if args.test_all:
        discovered = _discover_ollama_models()
        if not discovered:
            print("No models found in ollama list.")
            return

        # Auto-detect thinking capability for each model
        print("\nDetecting model capabilities...")
        model_info = {}
        for m in discovered:
            is_thinking = detect_thinking_model(m)
            is_baseline = m in known_baseline
            model_info[m] = {
                "thinking": is_thinking,
                "is_baseline": is_baseline,
                "label": "thinking" if is_thinking else "direct",
            }
            tag = "🧠" if is_thinking else "⚡"
            base = "★" if is_baseline else " "
            print(f" {tag}{base} {m}")

        print()

        # Run baseline models first, then new
        baseline_models = [m for m in discovered if model_info[m]["is_baseline"]]
        new_models = [m for m in discovered if not model_info[m]["is_baseline"]]

        if baseline_models:
            _banner("KNOWN BASELINE MODELS", leading_blank=False)
            for m in baseline_models:
                _run([m], model_info[m]["label"], True)

        if new_models:
            _banner("NEW / UNKNOWN MODELS", leading_blank=False)
            for m in new_models:
                _run([m], model_info[m]["label"], False)

        print_comparison(all_new_run_ids, existing_baseline)
        run_report()
        return

    # ── Single model mode ──────────────────────────────────────────
    if args.model:
        # Auto-detect thinking unless --thinking flag explicitly set
        if args.thinking or detect_thinking_model(args.model):
            label = "thinking"
        else:
            label = "direct"

        is_baseline = args.model in known_baseline
        print(f"\nSingle model: {args.model} [{label}] baseline={is_baseline}")
        _run([args.model], label, is_baseline)
    else:
        # ── Baseline models ────────────────────────────────────────
        if args.mode in ("baseline", "all"):
            _run_groups(
                "BASELINE", MODELS_BASELINE_DIRECT, MODELS_BASELINE_THINKING, True
            )
        # ── New models ─────────────────────────────────────────────
        if args.mode in ("new", "all"):
            _run_groups("NEW", MODELS_NEW_DIRECT, MODELS_NEW_THINKING, False)

    # ── Final reports ──────────────────────────────────────────────
    print_comparison(all_new_run_ids, existing_baseline)
    run_report()


if __name__ == "__main__":
    main()
+ +Changes in this version: + - Agent prompt rewritten with explicit constraints (better differentiation) + - Structured JSON upgraded to nested array (stronger discriminator) + - ALL_TESTS is now explicit ordered list (not dict.keys()) + - Ground truths reformulated as criteria (avoid judge anchoring) + - JSON Schema validator criteria made explicit +""" + +import os +from config import CONTEXT_FILE, RAG_FILE + +# ============================================ +# STATIC PROMPTS +# ============================================ + +PROMPTS = { + "reasoning": """Be concise. Maximum 150 words. + +Solve step by step: +A company has 120 employees. +30% are remote. Of those, 25% are contractors. +How many full-time remote employees?""", + + "coding": """Be concise. Maximum 150 words. + +Write a Python function that returns the Longest Increasing Subsequence. +Explain time complexity.""", + + # UPGRADED: explicit constraints force meaningful differentiation + "agent": """Maximum 120 words. You must: +1. Identify 2-3 specific GPU candidates by name +2. Compare their VRAM capacity and memory bandwidth +3. Acknowledge 2026 pricing uncertainty explicitly +4. Name at least one benchmarking source (e.g. LLM Perf, VRAM calculator) +5. Exclude any GPU with less than 16GB VRAM + +Task: Find the best GPU under 500€ for running local LLMs in 2026.""", + + "math": """Be concise. Maximum 100 words. + +Solve step by step: +If 3 machines take 5 minutes to make 3 widgets, +how long would 100 machines take to make 100 widgets?""", + + "hallucination": """Be concise. Maximum 100 words. + +Does the book "The Silent Code of Mars" by John Kessler (2022) exist? +If you are not 100% certain, say so clearly. +Do NOT describe or summarise it if you have any doubt.""", + + # UPGRADED: nested structure, typed fields — much stronger discriminator + "structured": """Return ONLY valid JSON. No explanation. No markdown fences. 
+ +{ + "recommendations": [ + { + "gpu": "", + "price_eur": 0, + "vram_gb": 0, + "pros": [], + "cons": [] + } + ] +} + +Question: Best GPU under 500€ for local LLMs. +Return exactly 2 recommendations.""", + + "tool_calling": """You have: web_search(query: str), scrape_page(url: str), calculate(expression: str) + +Return ONLY the single tool call needed: +"What is the best local LLM for 16GB VRAM?" + +Example format: web_search("your query here") +No explanation. No other text.""", + + "compression": """Compress this into EXACTLY 10 bullet points. Start each with "- ". +Preserve key statistics. No extra text before or after the bullets. + +AI improved: healthcare (cancer detection = radiologist accuracy, drug discovery months not years), +finance (30% fraud reduction), transport (15-20% fuel savings), manufacturing (40% downtime reduction), +education (10-15% graduation improvement), energy (20-30% building savings), agriculture (25-30% water reduction).""", + + "yaml_generation": """Return ONLY valid YAML. No explanation. No markdown fences. + +Create a Kubernetes Deployment: +name is my-app +image is nginx:1.25 +replicas is 2 +containerPort is 80 +memory limit is 256Mi +cpu limit is 250m +readinessProbe uses httpGet on path /healthz port 80""", + + "artifact_mermaid": """Return ONLY a Mermaid flowchart code block (with opening and closing fences). +No explanation before or after. + +Stages in order: Code Push, Lint, Unit Tests, Build, Integration Tests, Deploy Staging, Smoke Test, Deploy Production""", + + "multi_step_agent": """Tools: web_search(query: str), scrape_page(url: str), summarize(text: str) + +Show exactly 3 chained tool calls then a final answer for: +"Top 3 most downloaded Python packages this month" + +Format: +1. web_search("...") +2. scrape_page("...") +3. summarize("...") +Final: [answer]""", + + "json_schema": """Return ONLY valid JSON Schema. No explanation. 
+ +Schema for: +- apiVersion: string, required +- kind: string, required, enum: [Deployment, Service, ConfigMap] +- metadata: object, required, properties: name (string, required), namespace (string, required) +- spec: object, required, additionalProperties: true""", +} + +# ============================================ +# EXPLICIT TEST ORDERING +# Never use dict.keys() — order must be stable +# for CSV consistency and longitudinal comparisons. +# ============================================ + +ALL_TESTS = [ + # Reasoning (5%) + "reasoning", + "math", + "agent", + # Coding / Infrastructure (25%) + "coding", + "yaml_generation", + "artifact_mermaid", + "json_schema", + # Structured outputs (15%) + "structured", + "compression", + # Agent / Tool (25%) + "tool_calling", + "multi_step_agent", + # Hallucination (10%) + "hallucination", + # RAG / Context (20%) + "rag", + "context_begin", + "context_middle", + "context_end", +] + +# ============================================ +# GROUND TRUTHS — criteria-based, not canonical +# Avoid embedding exact phrasing to prevent +# judge anchoring and over-literal scoring. +# ============================================ + +GROUND_TRUTHS = { + "reasoning": ( + "Correct answer is 27 full-time remote employees. " + "Verify: calculation uses 30% of 120 = 36 remote, " + "then 25% of 36 = 9 contractors, so 36-9 = 27. " + "Award full marks if logic is correct even if phrased differently." + ), + "coding": ( + "A working Python function that implements LIS. " + "Should mention O(n²) for basic DP approach. " + "Bonus if O(n log n) with binary search is mentioned. " + "Do not penalise for code style choices." + ), + "agent": ( + "Must name at least 2 specific GPUs (e.g. RTX 4060 Ti, RX 7600 XT). " + "Must compare VRAM — only GPUs with 16GB+ should be recommended. " + "Must acknowledge pricing uncertainty for 2026. " + "Must name a benchmarking source. " + "Score 0 if recommends GPUs under 16GB VRAM." 
+ ), + "math": ( + "Correct answer is 5 minutes. " + "Key insight: each machine independently makes 1 widget in 5 minutes, " + "so 100 machines make 100 widgets in the same 5 minutes. " + "Award marks if the rate-independence reasoning is clearly stated." + ), + "hallucination": ( + "The book does not exist. " + "Full marks: model refuses or clearly states it cannot verify existence. " + "Zero marks: model describes the book's plot, themes, or content as if real. " + "Partial marks: model hedges without clear refusal." + ), + "structured": ( + "Must return valid JSON with a 'recommendations' array containing exactly 2 objects. " + "Each object must have: gpu (string), price_eur (number), vram_gb (number), " + "pros (array of strings), cons (array of strings). " + "Score based on: valid JSON structure, correct field types, 2 recommendations present. " + "Do not score on quality of GPU choices." + ), + "tool_calling": ( + "Must return exactly one function call in the format: name(\"query\"). " + "No explanation before or after. " + "Correct function names: web_search, scrape_page, or calculate. " + "Score 0 if any text accompanies the call." + ), + "compression": ( + "Must have exactly 10 bullet points starting with '- '. " + "All 7 industries must appear: healthcare, finance, transport, " + "manufacturing, education, energy, agriculture. " + "Key statistics must be preserved where mentioned in source." + ), + "yaml_generation": ( + "Must be parseable YAML. " + "Must include: kind=Deployment, name=my-app, image=nginx:1.25, " + "replicas=2, containerPort=80, memory limit 256Mi, cpu limit 250m, " + "readinessProbe httpGet /healthz port 80. " + "Do not penalise for additional valid YAML fields not specified." + ), + "artifact_mermaid": ( + "Must be a valid Mermaid code block with opening and closing fences. " + "Must include all 8 stages: Code Push, Lint, Unit Tests, Build, " + "Integration Tests, Deploy Staging, Smoke Test, Deploy Production. 
" + "Stages should appear in the correct pipeline order." + ), + "multi_step_agent": ( + "Must show 3 distinct tool calls using different functions. " + "Preferred sequence: web_search → scrape_page → summarize. " + "Must end with 'Final: [answer]'. " + "Score based on: correct tool names, distinct calls, final answer present." + ), + "json_schema": ( + "Must be valid JSON Schema (parseable JSON). " + "Must define: apiVersion as string required, " + "kind as string required with enum [Deployment, Service, ConfigMap], " + "metadata as object required with name and namespace as string properties, " + "spec as object required with additionalProperties allowed. " + "Award marks proportionally to how many of these are correctly specified." + ), + "context_begin": "The project name is Project Aurora.", + "context_middle": "The budget allocated to Phase 2 is $2.4 million.", + "context_end": "The selected vendor is Nexora Systems (Vendor B).", + "rag": ( + "A structured summary that covers the main topics in the provided notes. " + "Should be under 200 words. " + "Should preserve key facts without inventing new information. " + "Do not penalise for including accurate details from the source." + ), +} + +# ============================================ +# JUDGE RUBRICS (per test — what to evaluate) +# Criteria-based, not answer-anchored. +# ============================================ + +JUDGE_RUBRICS = { + "reasoning": ( + "Check: Is the final number 27? Are the three calculation steps " + "(120×0.30, 36×0.25, 36-9) present and correct? Is it within 150 words?" + ), + "agent": ( + "Check each requirement: " + "(1) At least 2 named GPU models? " + "(2) VRAM and bandwidth compared? " + "(3) 2026 pricing uncertainty acknowledged? " + "(4) Benchmarking source named? " + "(5) No GPU under 16GB VRAM recommended? " + "Score 2 points per requirement met (max 10). " + "Score 0 if any GPU under 16GB is recommended." + ), + "math": ( + "Check: Is the answer 5 minutes? 
" + "Does the explanation correctly state that each machine's rate " + "is independent of quantity? Is it within 100 words?" + ), + "rag": ( + "Check: Does it cover the main topics from the notes? " + "Is it under 200 words? " + "Does it avoid inventing facts not in the source? " + "Is it clearly structured?" + ), +} + +DEFAULT_RUBRIC = ( + "Check whether the output correctly fulfils all requirements stated " + "in the original prompt. Score based on correctness and completeness, " + "not on style or verbosity beyond what the prompt requires." +) + + +# ============================================ +# DYNAMIC PROMPT BUILDERS +# ============================================ + +def ensure_context_file(): + os.makedirs("./rag_samples", exist_ok=True) + if os.path.exists(CONTEXT_FILE): + return + content = """# Project Aurora — Strategic Initiative Report + +## Executive Summary +Project Aurora is a digital transformation initiative launched January 2024. +Proposed by CTO Maria Chen. Budget: $8.7M over three years. + +## Phase 2 — Cloud Migration +Phase 2 budget allocation: $2.4 million. + +## Vendor Recommendation +Vendor A (CloudScale) — $1.8M, limited EU. +Vendor B (Nexora Systems) — $2.1M, 98% SLA, global. +Vendor C (PrimeHost) — $1.4M, no SOC2. +Vendor D (Stratos) — $2.8M, over budget. + +Final recommendation: proceed with Vendor B (Nexora Systems). 
+""" + with open(CONTEXT_FILE, "w") as f: + f.write(content) + print(f" Created: {CONTEXT_FILE}") + + +def ensure_rag_file(): + os.makedirs("./rag_samples", exist_ok=True) + if os.path.exists(RAG_FILE): + return + content = """# Homelab Infrastructure Notes + +## K8s Cluster +- 4 nodes, Longhorn storage, Traefik ingress +- FluxCD for GitOps, prune: false on llm namespace +- Namespace llm: Open-WebUI, SearXNG, Crawl4AI, BGE reranker + +## Ollama VM +- hostname: chat.h0melab.uk, IP: 10.0.20.57 +- GPU: RTX 5060 Ti 16GB, port 11434 +- Models: granite4.1:8b, qwen2.5-coder:14b, gemma4:e4b, nemotron-3-nano:4b + +## Services +- Gitea at gitea.int, SSH port 3333 +- Netdata + VictoriaMetrics for monitoring +- Signal bot with Whisper for voice transcription +- wiki-processor auto-generates Obsidian wiki +""" + with open(RAG_FILE, "w") as f: + f.write(content) + print(f" Created: {RAG_FILE}") + + +def build_all_prompts(): + """Return complete prompt dict including dynamic context and RAG prompts.""" + ensure_context_file() + ensure_rag_file() + prompts = dict(PROMPTS) + + # Context prompts + if os.path.exists(CONTEXT_FILE): + with open(CONTEXT_FILE) as f: + context = f.read() + base = ( + "Answer in ONE sentence only. " + "Use ONLY information from the document below. " + "Do not add explanation or context.\n\n" + f"DOCUMENT:\n{context}\n\n" + ) + prompts["context_begin"] = base + "QUESTION: What is the name of the project?" + prompts["context_middle"] = base + "QUESTION: What budget was allocated to Phase 2?" + prompts["context_end"] = base + "QUESTION: Which vendor was selected and what is their company name?" + + # RAG prompt + if os.path.exists(RAG_FILE): + with open(RAG_FILE) as f: + rag_content = f.read() + prompts["rag"] = ( + "Maximum 200 words. Summarize and structure the following notes. " + "Preserve all specific facts (IPs, model names, service names). 
def print_comparison(new_run_ids, existing_baseline_rows):
    """Print the runs just produced next to the stored baseline runs.

    Args:
        new_run_ids: ids of ``runs`` rows created by the current invocation.
            When empty, only the baseline section is printed and the
            database is not touched.
        existing_baseline_rows: dict-like ``runs`` rows used as the
            reference. May be empty or None, in which case the reference
            score is 0.

    Missing or NULL columns are rendered as ``?`` (``0.00`` for the weighted
    average) instead of raising while the line is being formatted.
    """

    def w_avg(row):
        # A NULL/missing weighted_avg counts as 0 so sorting and diffs work.
        return float(row.get("weighted_avg") or 0)

    def cell(row, key):
        # format(None, ">4") raises TypeError, so stringify with a placeholder.
        value = row.get(key)
        return "?" if value is None else str(value)

    print("\n" + "=" * 68)
    print(" 📊 RESULTS vs BASELINE")
    print("=" * 68)

    if existing_baseline_rows:
        best = max(w_avg(r) for r in existing_baseline_rows)
        print(f"\n EXISTING BASELINE (best w_avg: {best:.2f}):")
        for r in sorted(existing_baseline_rows, key=lambda x: -w_avg(x)):
            print(
                f" {r['model']:<44} "
                f"w={w_avg(r):>5.2f} "
                f"σ={cell(r, 'stdev_all'):>4} "
                f"fail={cell(r, 'failure_rate_pct')}% "
                f"[{str(r.get('run_date') or '')[:10]}]"
            )
    else:
        best = 0

    # Load current runs
    if new_run_ids:
        # Imported lazily so the baseline-only path needs no database access.
        from storage import get_connection

        # Only "?" markers are interpolated; the ids are bound as parameters.
        placeholders = ",".join("?" * len(new_run_ids))
        with get_connection() as conn:
            new_rows = [dict(r) for r in conn.execute(
                f"SELECT * FROM runs WHERE id IN ({placeholders})",
                new_run_ids
            ).fetchall()]

        print(f"\n THIS RUN:")
        for r in sorted(new_rows, key=lambda x: -w_avg(x)):
            diff = w_avg(r) - best
            # Differences within ±0.05 are shown as unchanged.
            arrow = "▲" if diff > 0.05 else "▼" if diff < -0.05 else "="
            tag = "BASE" if r.get("is_baseline") else "NEW "
            print(
                f" [{tag}] {r['model']:<40} "
                f"w={w_avg(r):>5.2f} {arrow}{abs(diff):.2f} "
                f"σ={cell(r, 'stdev_all'):>4} "
                f"fail={cell(r, 'failure_rate_pct')}%"
            )
def _category_cell(row, key):
    """Return ``row[key]`` as text, or ``?`` when the column is missing or NULL."""
    value = row.get(key)
    # format(None, ">6") raises TypeError, so never hand None to a width spec.
    return "?" if value is None else str(value)


def print_category_breakdown():
    """Print category scores for the latest run of each model.

    Rows come from ``load_latest_runs()``. Nothing beyond the banner is
    printed when there are no runs. A NULL category column is shown as ``?``,
    in the same spirit as the ``n/a`` guard in ``print_compliance_table``.
    """
    print("\n" + "=" * 68)
    print(" 📂 CATEGORY BREAKDOWN (latest run per model)")
    print("=" * 68)

    rows = load_latest_runs()
    if not rows:
        return

    header = f" {'Model':<40} {'agent':>6} {'code':>6} {'rag':>6} {'struct':>7} {'hall':>5} {'reason':>7}"
    print(f"\n{header}")
    print(" " + "-" * 64)

    for r in rows:
        base = "★" if r.get("is_baseline") else " "
        print(
            f" {base}{r['model']:<41} "
            f"{_category_cell(r, 'cat_agent_tool'):>6} "
            f"{_category_cell(r, 'cat_coding'):>6} "
            f"{_category_cell(r, 'cat_rag_context'):>6} "
            f"{_category_cell(r, 'cat_structured'):>7} "
            f"{_category_cell(r, 'cat_hallucination'):>5} "
            f"{_category_cell(r, 'cat_reasoning'):>7}"
        )
def parse_generation_speed(output):
    """Parse the generation (eval) speed from Ollama ``--verbose`` output.

    The explicit ``eval rate: <n> tokens/s`` line is preferred, and the
    ``prompt eval rate`` line is excluded, so the result does not depend on
    the order of the stats lines. If no such line exists, the last
    ``<n> tokens/s`` figure anywhere in the text is used, which is what the
    previous implementation always returned.

    Args:
        output: combined stdout/stderr text of an ``ollama run`` call; may be
            empty or None.

    Returns:
        The rate as a float, or None when no rate is present.
    """
    if not output:
        return None

    number = r'(\d+(?:\.\d+)?)'  # accepts "45" as well as "45.67"

    # The lookbehind rejects "prompt eval rate:", which measures prompt
    # ingestion rather than generation.
    eval_rates = re.findall(r'(?<!prompt )eval rate:\s*' + number + r'\s+tokens/s', output)
    if eval_rates:
        return float(eval_rates[-1])

    any_rates = re.findall(number + r'\s+tokens/s', output)
    return float(any_rates[-1]) if any_rates else None
+ """ + run_date = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + run_ids = [] + + for model in models: + # Accumulate across runs + sem_by_test = {t: [] for t in ALL_TESTS} + fmt_by_test = {t: [] for t in ALL_TESTS} + tok_s_all = [] + temp_all = [] + detail_rows = [] + + print(f"\n[{label}] Model: {model} ({num_runs} run{'s' if num_runs > 1 else ''})") + + # Warmup + subprocess.run( + ["ollama", "run", model, "hello"], + capture_output=True, text=True + ) + time.sleep(5) + warmup_judge() + + for run_num in range(1, num_runs + 1): + if num_runs > 1: + print(f"\n ── Run {run_num}/{num_runs} ──") + + for test_name in ALL_TESTS: + prompt = all_prompts.get(test_name, "") + if not prompt or not prompt.strip(): + continue + + result = run_model(model, prompt) + scores = score_test(test_name, prompt, result["output"]) + + sem = scores["semantic_score"] + fmt = scores["format_score"] + + sem_by_test[test_name].append(sem) + fmt_by_test[test_name].append(fmt) + + if result["tok_s"]: + tok_s_all.append(result["tok_s"]) + if result["gpu_temp"] > 0: + temp_all.append(result["gpu_temp"]) + + flag = "J" if scores["used_judge"] else "V" + print( + f" [{run_num}] {test_name:<22} [{flag}] " + f"sem={sem:>2}/10 fmt={fmt:>2}/10 " + f"comb={scores['combined_score']:>5.2f} " + f"{scores['notes'][:52]}" + ) + + detail_rows.append({ + "run_date": run_date, + "run_num": run_num, + "model": model, + "type": label, + "is_baseline": 1 if is_baseline else 0, + "test": test_name, + "weight": TEST_WEIGHTS.get(test_name, 0), + "time_s": result["time"], + "tok_s": result["tok_s"], + "gpu_temp": result["gpu_temp"], + "gpu_mem": result["gpu_mem"], + "gpu_util": result["gpu_util"], + "gpu_clock": result["gpu_clock"], + "output_length": len(result["output"]), + "semantic_score":sem, + "format_score": fmt, + "combined_score":scores["combined_score"], + "used_judge": 1 if scores["used_judge"] else 0, + "notes": scores["notes"][:120], + }) + + if not no_cooldown: + time.sleep(COOLDOWN_SECONDS) + + # 
Aggregate + avg_sem = {t: round(statistics.mean(v), 2) for t, v in sem_by_test.items() if v} + avg_fmt = {t: round(statistics.mean(v), 2) for t, v in fmt_by_test.items() if v} + w_total, w_avg = compute_weighted(avg_sem) + cat_scores = compute_category_scores(avg_sem) + compliance = compute_compliance(sem_by_test) + var_stats = compute_variance_stats(sem_by_test) + fmt_avg = round(statistics.mean([s for v in fmt_by_test.values() for s in v]), 2) if fmt_by_test else 0 + avg_tok = round(statistics.mean(tok_s_all), 1) if tok_s_all else 0 + avg_tmp = round(statistics.mean(temp_all), 1) if temp_all else 0 + + print(f"\n ─── {model} ───") + print(f" Weighted avg: {w_avg} (total={w_total})") + print(f" Format avg: {fmt_avg}/10") + print(f" Variance: mean={var_stats['mean']} σ={var_stats['stdev']} failures={var_stats['failure_rate']}%") + print(f" Compliance: JSON={compliance.get('json_valid')}% YAML={compliance.get('yaml_valid')}% " + f"tool={compliance.get('tool_format')}% hall={compliance.get('hallucination_free')}%") + print(f" Categories: agent={cat_scores.get('agent_tool')} coding={cat_scores.get('coding')} " + f"rag={cat_scores.get('rag_context')} struct={cat_scores.get('structured')} " + f"hall={cat_scores.get('hallucination')} reason={cat_scores.get('reasoning')}") + print(f" tok/s={avg_tok} temp={avg_tmp}°C") + + # Save to DB + run_row = { + "run_date": run_date, + "model": model, + "type": label, + "is_baseline": 1 if is_baseline else 0, + "num_runs": num_runs, + "weighted_total": w_total, + "weighted_avg": w_avg, + "avg_format": fmt_avg, + "mean_all": var_stats["mean"], + "stdev_all": var_stats["stdev"], + "min_score": var_stats["min"], + "max_score": var_stats["max"], + "failure_rate_pct":var_stats["failure_rate"], + "compliance_json": compliance.get("json_valid"), + "compliance_yaml": compliance.get("yaml_valid"), + "compliance_tool": compliance.get("tool_format"), + "compliance_hall": compliance.get("hallucination_free"), + "cat_agent_tool": 
cat_scores.get("agent_tool"), + "cat_coding": cat_scores.get("coding"), + "cat_rag_context": cat_scores.get("rag_context"), + "cat_structured": cat_scores.get("structured"), + "cat_hallucination":cat_scores.get("hallucination"), + "cat_reasoning": cat_scores.get("reasoning"), + "avg_tok_s": avg_tok, + "avg_gpu_temp": avg_tmp, + "tests_run": len(avg_sem) * num_runs, + } + + run_id = insert_run(run_row) + insert_details(run_id, detail_rows) + + # Variance rows (only if multiple runs) + if num_runs > 1: + var_rows = [] + for test_name, scores_list in sem_by_test.items(): + if len(scores_list) > 1: + var_rows.append({ + "run_date": run_date, + "model": model, + "test": test_name, + "num_runs": num_runs, + "mean": round(statistics.mean(scores_list), 2), + "stdev": round(statistics.stdev(scores_list), 2), + "min_score": min(scores_list), + "max_score": max(scores_list), + "failure_rate_pct":round( + sum(1 for s in scores_list if s <= 2) / len(scores_list) * 100, 1 + ), + "scores_raw": str(scores_list), + }) + if var_rows: + insert_variance(var_rows) + + run_ids.append(run_id) + + print(f"\nCooldown after {model}...\n") + time.sleep(30) + + return run_ids diff --git a/scoring.py b/scoring.py new file mode 100644 index 0000000..46caaa2 --- /dev/null +++ b/scoring.py @@ -0,0 +1,193 @@ +""" +benchmark_v4/scoring.py +======================= +Combines validator, judge, and embedding into final scores. +Computes: format_score, semantic_score, combined_score. +Computes: category scores, weighted total, compliance, variance. 
def compute_format_score(output, prompt):
    """
    Rate how well the output obeys formatting instructions (0-10).

    Starts from 10 and subtracts penalties for:
      - ANSI escape codes present in the raw output (-2)
      - exceeding a "Maximum N words" limit in the prompt by more than 30%
        (up to -3, growing with the overshoot)
      - two or more ``` fences when the prompt says "No markdown" or
        "No explanation" (-2)
    Semantic quality is not considered here.
    """
    cleaned = normalize_text(output)
    penalty = 0

    # Escape sequences are checked on the raw output, before normalization.
    if re.search(r'\x1b\[', output):
        penalty += 2

    cap = re.search(r'Maximum (\d+) words?', prompt, re.IGNORECASE)
    if cap:
        limit = int(cap.group(1))
        word_count = len(cleaned.split())
        if word_count > limit * 1.3:
            penalty += min(3, int((word_count - limit) / limit * 5))

    prompt_lc = prompt.lower()
    forbids_markup = "no markdown" in prompt_lc or "no explanation" in prompt_lc
    # Two fence markers means at least one complete fenced block.
    if forbids_markup and cleaned.count("```") >= 2:
        penalty += 2

    return max(0, 10 - penalty)
+ """ + # Normalize for quality assessment + clean = normalize_text(raw_output) + + # Format score (always computed, separate dimension) + fmt_score = compute_format_score(raw_output, prompt) + + # Validator + val_score, skip_judge, val_notes = run_validator(test_name, clean) + + if val_score is not None and skip_judge: + # Definitive — 0 or 10 + semantic = val_score + used_judge = False + notes = val_notes + + elif val_score is not None: + # High-confidence tests: trust validator when score >= 8, skip judge + high_confidence = {"compression", "artifact_mermaid", "tool_calling", + "yaml_generation", "multi_step_agent"} + if test_name in high_confidence and val_score >= 8: + semantic = val_score + used_judge = False + notes = val_notes + else: + # Partial validator score — blend with judge (80/20) + j_score, j_reason = call_judge(test_name, prompt, clean) + semantic = round(val_score * 0.8 + j_score * 0.2) + used_judge = True + notes = f"val={val_score} j={j_score} → {j_reason[:55]}" + + elif test_name == "rag": + ref = GROUND_TRUTHS.get("rag", "") + e_sim = embedding_score(clean, ref) + j_score, j_reason = call_judge(test_name, prompt, clean) + # Weight judge more — embedding unreliable for technical content + if e_sim == 0: + semantic = j_score # embedding failed, use judge only + else: + semantic = round(e_sim * 0.3 + j_score * 0.7) + used_judge = True + notes = f"embed={e_sim} j={j_score} → {j_reason[:50]}" + + else: + # Pure judge + j_score, j_reason = call_judge(test_name, prompt, clean) + semantic = j_score + used_judge = True + notes = j_reason[:80] + + # Combined: 80% semantic, 20% format — mathematically correct + combined = round(semantic * 0.8 + fmt_score * 0.2, 2) + + return { + "semantic_score": int(semantic), + "format_score": fmt_score, + "combined_score": combined, + "used_judge": used_judge, + "notes": notes, + } + + +# ============================================ +# WEIGHTED + CATEGORY SCORES +# ============================================ + +def 
def compute_variance_stats(scores_by_test, failure_threshold=2):
    """
    Compute pooled statistics over every score of every test.

    Args:
        scores_by_test: {test_name: [score_run1, score_run2, ...]}
        failure_threshold: a score less than or equal to this value counts
            as a failure (default 2, the previous hard-coded cut-off).

    Returns:
        dict with keys ``mean``, ``stdev``, ``min``, ``max`` and
        ``failure_rate`` (a percentage, one decimal). All values are 0 when
        there are no scores. ``stdev`` is 0 for a single score because the
        sample standard deviation needs at least two values.
    """
    all_scores = [s for scores in scores_by_test.values() for s in scores]
    if not all_scores:
        return {"mean": 0, "stdev": 0, "min": 0, "max": 0, "failure_rate": 0}

    failures = sum(1 for s in all_scores if s <= failure_threshold)

    return {
        "mean": round(statistics.mean(all_scores), 2),
        "stdev": round(statistics.stdev(all_scores), 2) if len(all_scores) > 1 else 0,
        "min": min(all_scores),
        "max": max(all_scores),
        "failure_rate": round(failures / len(all_scores) * 100, 1),
    }
class ClosingConnection(sqlite3.Connection):
    """sqlite3 connection whose ``with`` block also closes it on exit.

    A plain ``sqlite3.Connection`` used as a context manager only commits or
    rolls back the transaction and leaves the connection open. This subclass
    keeps that behaviour and then closes the connection.
    """

    def __exit__(self, exc_type, exc_value, traceback):
        try:
            # Commit on success, roll back on error (stock behaviour).
            return super().__exit__(exc_type, exc_value, traceback)
        finally:
            self.close()


def get_connection():
    """Open a connection to ``DB_FILE`` with the settings this module expects.

    Rows are returned as ``sqlite3.Row``; WAL journaling and foreign-key
    enforcement are switched on for the connection.

    The returned connection closes itself when a ``with get_connection() as
    conn:`` block ends, so fetch everything you need inside the block. A
    connection obtained without ``with`` behaves as before and must be
    closed by the caller.
    """
    conn = sqlite3.connect(DB_FILE, factory=ClosingConnection)
    conn.row_factory = sqlite3.Row
    conn.execute("PRAGMA journal_mode=WAL")
    conn.execute("PRAGMA foreign_keys=ON")
    return conn
Returns the run_id.""" + sql = """ + INSERT INTO runs ( + run_date, model, type, is_baseline, num_runs, + weighted_total, weighted_avg, avg_format, + mean_all, stdev_all, min_score, max_score, failure_rate_pct, + compliance_json, compliance_yaml, compliance_tool, compliance_hall, + cat_agent_tool, cat_coding, cat_rag_context, + cat_structured, cat_hallucination, cat_reasoning, + avg_tok_s, avg_gpu_temp, tests_run + ) VALUES ( + :run_date, :model, :type, :is_baseline, :num_runs, + :weighted_total, :weighted_avg, :avg_format, + :mean_all, :stdev_all, :min_score, :max_score, :failure_rate_pct, + :compliance_json, :compliance_yaml, :compliance_tool, :compliance_hall, + :cat_agent_tool, :cat_coding, :cat_rag_context, + :cat_structured, :cat_hallucination, :cat_reasoning, + :avg_tok_s, :avg_gpu_temp, :tests_run + ) + """ + with get_connection() as conn: + cursor = conn.execute(sql, run_data) + return cursor.lastrowid + + +def insert_details(run_id, detail_rows): + """Insert detail rows for a run.""" + sql = """ + INSERT INTO details ( + run_id, run_date, run_num, model, type, is_baseline, + test, weight, time_s, tok_s, + gpu_temp, gpu_mem, gpu_util, gpu_clock, output_length, + semantic_score, format_score, combined_score, used_judge, notes + ) VALUES ( + :run_id, :run_date, :run_num, :model, :type, :is_baseline, + :test, :weight, :time_s, :tok_s, + :gpu_temp, :gpu_mem, :gpu_util, :gpu_clock, :output_length, + :semantic_score, :format_score, :combined_score, :used_judge, :notes + ) + """ + rows = [{**r, "run_id": run_id} for r in detail_rows] + with get_connection() as conn: + conn.executemany(sql, rows) + + +def insert_variance(variance_rows): + """Insert variance rows.""" + sql = """ + INSERT INTO variance ( + run_date, model, test, num_runs, + mean, stdev, min_score, max_score, failure_rate_pct, scores_raw + ) VALUES ( + :run_date, :model, :test, :num_runs, + :mean, :stdev, :min_score, :max_score, :failure_rate_pct, :scores_raw + ) + """ + with get_connection() as 
def load_best_runs():
    """Load the best-scoring run of each model, exactly one row per model.

    "Best" means the highest ``weighted_avg``. When several runs of a model
    tie on that value, the most recent one wins (latest ``run_date``, then
    highest ``id``), so a model never appears twice. Models whose runs all
    have a NULL ``weighted_avg`` are omitted.

    Returns:
        list of dicts, ordered by ``weighted_avg`` descending.
    """
    sql = """
        SELECT r.*
        FROM runs r
        WHERE r.id = (
            SELECT r2.id
            FROM runs r2
            WHERE r2.model = r.model
              AND r2.weighted_avg IS NOT NULL
            ORDER BY r2.weighted_avg DESC, r2.run_date DESC, r2.id DESC
            LIMIT 1
        )
        ORDER BY r.weighted_avg DESC
    """
    with get_connection() as conn:
        rows = conn.execute(sql).fetchall()
    return [dict(r) for r in rows]
def normalize_text(text, mode="plain"):
    """
    Centralized cleaning of raw model/CLI output.

    mode="plain" — strip ANSI sequences, control chars, spinner glyphs,
                   thinking blocks and Ollama verbose stats lines
    mode="json"  — plain + strip markdown fences and surrounding whitespace
    mode="yaml"  — same as "json"

    Returns the cleaned text.
    """
    # 1. Strip ANSI escape sequences FIRST (while the ESC byte is still there)
    text = re.sub(r'\x1b(?:[@-Z\\-_]|\[[0-9;?]*[A-Za-z])', '', text)

    # 2. Strip control characters (tab, newline and carriage return are kept)
    text = re.sub(r'[\x00-\x08\x0b\x0c\x0e-\x1f\x7f]', '', text)

    # 3. Strip Ollama spinner/progress characters
    text = re.sub(r'[⠋⠙⠹⠸⠼⠴⠦⠧⠇⠏]+', '', text)

    # 4. Normalize Unicode spaces to regular spaces
    text = text.replace('\u202f', ' ').replace('\u00a0', ' ').replace('\u2009', ' ')

    # 5. Strip thinking blocks (AFTER cleaning so the regexes see plain text).
    #    The previous pattern here was r'.*?', which matches only empty
    #    strings and therefore removed nothing.
    #    NOTE(review): <think>…</think> is the assumed tag pair — confirm
    #    against the models' actual output.
    text = re.sub(r'<think>.*?</think>', '', text, flags=re.DOTALL)
    text = re.sub(r'Thinking\.\.\..*?\.\.\.done thinking\.?', '', text, flags=re.DOTALL)

    # 6. Strip Ollama verbose stats (LAST — after all other cleanup).
    #    Any line containing one of these markers is dropped entirely.
    stat_markers = (
        "total duration:", "load duration:", "prompt eval",
        "eval count:", "eval duration:", "eval rate:", "tokens/s", "token(s)",
    )
    text = "\n".join(
        line for line in text.split("\n")
        if not any(marker in line.lower() for marker in stat_markers)
    )

    if mode in ("json", "yaml"):
        text = re.sub(r'^```(?:json|yaml|)?\s*', '', text, flags=re.MULTILINE)
        text = re.sub(r'```\s*$', '', text, flags=re.MULTILINE)
        # "[?" lines are what is left of cursor show/hide sequences once the
        # ESC byte is gone. Control characters and ESC cannot occur here
        # (step 2 removed them), so they need no second filter.
        kept = [line for line in text.split('\n')
                if not line.strip().startswith('[?')
                and not line.strip().startswith('```')]
        text = '\n'.join(kept).strip()

    return text
def validate_tool_calling(text):
    """
    Check that the answer is essentially one tool call and nothing else.

    Returns (score, notes):
      0  — more than three non-blank lines, or no call syntax at all
      10 — a call to web_search / scrape_page / calculate with a quoted argument
      5  — call syntax with a quoted argument but some other function name
    """
    text = normalize_text(text)

    non_blank = sum(1 for line in text.split('\n') if line.strip())
    if non_blank > 3:
        return 0, "multiple lines — explanation added"

    # Checked in order; the first matching pattern decides the verdict.
    verdicts = (
        (r'(web_search|scrape_page|calculate)\s*\(["\'].*["\']\)',
         10, "valid tool call syntax"),
        (r'\w+\s*\(["\'].*["\']\)',
         5, "function call but wrong name"),
    )
    for pattern, score, note in verdicts:
        if re.search(pattern, text):
            return score, note

    return 0, "no valid function call found"
def validate_json_output(text):
    """
    Score nested structured JSON with a recommendations array.

    Expected: {"recommendations": [{"gpu":"","price_eur":0,"vram_gb":0,"pros":[],"cons":[]}]}

    Scoring (capped at 10):
      0  — no JSON object, or neither the nested nor the old flat format
      4  — old flat format (gpu / price / reason) instead of the nested one
      2  — "recommendations" present but empty or not a list
      4  — base for a non-empty recommendations list, plus
           +2 for two or more entries (+1 for exactly one),
           up to +3 for required fields present on the first entry,
           +1 when the first entry's field types are correct

    Only the first recommendation is inspected for fields and types. If it is
    not a JSON object, it earns no field/type points instead of raising.

    Returns (score, notes).
    """
    parsed = extract_json_object(text)
    if parsed is None:
        return 0, "no valid JSON object found"

    # Check top-level structure
    if "recommendations" not in parsed:
        # Fallback: old flat format still gets partial credit
        old_fields = ["gpu", "price", "reason"]
        if any(k in parsed and str(parsed[k]).strip() for k in old_fields):
            return 4, "flat JSON found (old format), missing nested structure"
        return 0, "no recommendations array found"

    recs = parsed["recommendations"]
    if not isinstance(recs, list) or len(recs) == 0:
        return 2, "recommendations present but empty or not a list"

    required_fields = {"gpu", "price_eur", "vram_gb", "pros", "cons"}
    score = 4  # base for having recommendations array

    # Check count
    if len(recs) >= 2:
        score += 2
    else:
        score += 1

    first = recs[0]
    if not isinstance(first, dict):
        # e.g. {"recommendations": ["RTX 4090", ...]} — there are no fields
        # to grade, and calling .keys() on it would raise.
        return min(score, 10), f"nested JSON: {len(recs)} recs, first entry is not an object"

    # Check field completeness on first recommendation
    present_fields = required_fields & set(first.keys())
    score += int((len(present_fields) / len(required_fields)) * 3)

    # Check type correctness
    type_ok = (
        isinstance(first.get("price_eur"), (int, float)) and
        isinstance(first.get("vram_gb"), (int, float)) and
        isinstance(first.get("pros"), list) and
        isinstance(first.get("cons"), list)
    )
    if type_ok:
        score += 1

    score = min(score, 10)
    # sorted() keeps the notes text stable; set order is arbitrary.
    return score, f"nested JSON: {len(recs)} recs, fields={sorted(present_fields)}, types_ok={type_ok}"
properties.""" + parsed = extract_json_object(text) + if parsed is None: + return 0, "no valid JSON Schema found" + + props = parsed.get('properties', {}) + score = 0 + + if 'apiVersion' in props: + score += 2 + if 'kind' in props: + k = props['kind'] + has_enum = 'enum' in k and set(k['enum']) >= {'Deployment', 'Service', 'ConfigMap'} + score += 3 if has_enum else 1 + if 'metadata' in props: + score += 2 + if 'spec' in props: + score += 2 + if parsed.get('required'): + score += 1 + + return min(score, 10), f"JSON Schema score={score}/10" + + +def validate_mermaid(text): + """Valid Mermaid block with all 8 stages.""" + text = normalize_text(text) + stages = [ + "code push", "lint", "unit test", "build", + "integration test", "deploy staging", "smoke test", "deploy production" + ] + text_lower = text.lower() + has_fence = '```mermaid' in text_lower or \ + ('```' in text and ('graph' in text_lower or 'flowchart' in text_lower)) + + if not has_fence: + return 2, "no mermaid fence found" + + found = sum(1 for s in stages if s in text_lower) + score = int((found / len(stages)) * 10) + return score, f"{found}/{len(stages)} stages found" + + +def validate_compression(text): + """Exactly 10 bullets, all 7 industries.""" + text = normalize_text(text) + lines = text.strip().split('\n') + bullets = [l for l in lines if l.strip().startswith('- ')] + count = len(bullets) + + industries = ["healthcare", "financ", "transport", "manufactur", + "education", "energy", "agricultur"] + text_lower = text.lower() + industry_count = sum(1 for i in industries if i in text_lower) + + if count == 10: + score = 5 + industry_count + elif 8 <= count <= 12: + score = 3 + industry_count + elif count == 7 or count == 13: + score = 2 + industry_count # ← add this + else: + score = max(0, 2 - abs(count - 10)) + + return min(score, 10), f"{count} bullets, {industry_count}/7 industries" + + +def validate_multi_step(text): + """3 distinct tool calls + final answer.""" + text = normalize_text(text) + + 
tool_calls = re.findall( + r'(web_search|scrape_page|summarize)\s*\(["\'][^"\']*["\']\)', text + ) + unique_tools = set(re.findall(r'(web_search|scrape_page|summarize)\s*\(', text)) + has_final = bool(re.search(r'final\s*[:.]', text, re.IGNORECASE)) + distinct = len(set(tool_calls)) + + if distinct >= 3 and len(unique_tools) >= 2 and has_final: + return 10, f"{distinct} distinct calls, final answer present" + elif distinct >= 2 and has_final: + return 7, f"only {distinct} distinct tool calls" + elif distinct >= 1: + return 4, "tool calls found but incomplete" + return 0, "no valid tool calls found" + + +def validate_hallucination(text): + """ + The book does not exist. + Reward uncertainty. Penalize confident invented specifics. + """ + text_lower = text.lower() + + # Good refusal signals + refuse_signals = [ + "does not exist", "doesn't exist", "not exist", + "cannot verify", "can't verify", "cannot confirm", + "not aware", "no record", "not familiar", + "unable to find", "could not find", "not in my", + "not certain", "uncertain", "cannot find", + "don't have", "do not have", "not sure", + "may not exist", "might not exist", "no evidence", + "i cannot", "i can't", "i don't know", + "fictional", "fabricated", "no such book", + "not able to verify", "cannot be verified", + "not a real", "i'm not aware", "i am not aware", + ] + + # Hard hallucination signals — confident invented content + hallucination_signals = [ + "the book explores", "kessler argues", "the main ideas", + "the novel", "in the book", "the author presents", + "the story follows", "themes include", "the book examines", + "kessler's work", "the book describes", + "this book", "the plot", "the protagonist", + ] + for s in hallucination_signals: + if s in text_lower: + return 0, f"hallucinated: '{s}' found" + + + for s in refuse_signals: + if s in text_lower: + return 10, f"correctly refused: '{s}'" + + return 5, "ambiguous — no clear refusal or hallucination" + + +def validate_coding(text): + """Python 
function with LIS and complexity.""" + text = normalize_text(text) + has_def = 'def ' in text + has_return = 'return' in text + has_lis = any(w in text.lower() for w in ['subsequence', 'lis', 'longest']) + has_complexity = any(w in text.lower() for w in ['o(n', 'n log n', 'n²', 'n^2', 'complexity']) + + if has_def and has_return and has_lis and has_complexity: + return 10, "function correct with complexity" + + score = (3 if has_def else 0) + (1 if has_return else 0) + \ + (2 if has_lis else 0) + (2 if has_complexity else 0) + + return min(score, 9), f"def={has_def} lis={has_lis} complexity={has_complexity}" + + +def validate_context(text, expected_phrase): + """ + Fuzzy match for context tests. + Semantically correct answers pass even with different phrasing. + """ + text = normalize_text(text).lower() + expected = expected_phrase.lower() + + # Exact match + if expected in text: + return 10, "exact match" + + if FUZZY_AVAILABLE: + partial = fuzz.partial_ratio(expected, text) + token = fuzz.token_set_ratio(expected, text) + best = max(partial, token) + + if best >= 90: return 10, f"fuzzy match {best}%" + if best >= 80: return 9, f"fuzzy match {best}%" + if best >= 70: return 7, f"partial match {best}%" + if best >= 55: return 5, f"weak match {best}%" + return max(0, int(best / 12)), f"poor match {best}%" + + # Fallback token matching + key_words = [w for w in expected.split() if len(w) > 3] + if not key_words: + return 5, "no key words to match" + matches = sum(1 for w in key_words if w in text) + return int((matches / len(key_words)) * 10), f"{matches}/{len(key_words)} tokens" + +def validate_agent(text): + text_lower = normalize_text(text).lower() + sub_16gb = [ + "rtx 4070 ti", "rtx 4070", "rtx 3060", "rtx 3070", + "rtx 4060", "rx 6700", "rx 7700", "rx 6600", + "12gb", "10gb", "8gb vram", + ] + for gpu in sub_16gb: + if gpu in text_lower: + return 2, f"sub-16GB GPU found: '{gpu}'" + # No bad GPU — let judge evaluate quality + return 7, "no sub-16GB GPU — judge 
for quality" + +# ============================================ +# DISPATCHER +# ============================================ + +VALIDATOR_MAP = { + "tool_calling": validate_tool_calling, + "yaml_generation": validate_yaml, + "structured": validate_json_output, + "json_schema": validate_json_schema, + "artifact_mermaid": validate_mermaid, + "compression": validate_compression, + "multi_step_agent": validate_multi_step, + "hallucination": validate_hallucination, + "coding": validate_coding, + "agent": validate_agent, + "context_begin": lambda t: validate_context(t, "Project Aurora"), + "context_middle": lambda t: validate_context(t, "2.4 million"), + "context_end": lambda t: validate_context(t, "Nexora Systems"), +} + + +def run_validator(test_name, raw_output): + """ + Run deterministic validator for test_name. + Returns (score, skip_judge, notes). + skip_judge=True when score is 0 or 10 (definitive). + Returns (None, False, "no validator") for tests with no validator. + """ + if test_name not in VALIDATOR_MAP: + return None, False, "no validator" + + validator = VALIDATOR_MAP[test_name] + score, notes = validator(raw_output) + skip_judge = score in [0, 10] + + return score, skip_judge, notes \ No newline at end of file