"""
benchmark_v4/validators.py
==========================
Layer 1: Deterministic validators. No LLM judge needed.

Every validator returns ``(score, notes)`` where ``score`` is an int in 0-10
and ``notes`` is a short human-readable explanation.
A score of 0 or 10 is definitive — the judge is skipped.
Partial scores (1-9) trigger judge blending.
"""

import json
import re

try:
    import yaml
    YAML_AVAILABLE = True
except ImportError:
    YAML_AVAILABLE = False

try:
    from rapidfuzz import fuzz
    FUZZY_AVAILABLE = True
except ImportError:
    FUZZY_AVAILABLE = False


# ============================================
# TEXT NORMALIZATION
# ============================================

# Substrings (lower-case) that identify a line of Ollama `--verbose` statistics.
_OLLAMA_STAT_MARKERS = (
    "total duration:", "load duration:", "prompt eval", "eval count:",
    "eval duration:", "eval rate:", "tokens/s", "token(s)",
)


def normalize_text(text, mode="plain"):
    """
    Centralized text cleaning.

    mode="plain" — strip ANSI, control chars, ollama stats, thinking tokens
    mode="json"  — plain + strip markdown fences, then strip outer whitespace
    mode="yaml"  — plain + strip markdown fences, then strip outer whitespace

    ``None`` is treated as an empty string; other non-string input is passed
    through ``str()``. Returns the cleaned string.
    """
    if text is None:
        text = ""
    elif not isinstance(text, str):
        text = str(text)

    # 1. Strip ANSI escape sequences FIRST
    text = re.sub(r'\x1b(?:[@-Z\\-_]|\[[0-9;?]*[A-Za-z])', '', text)

    # 2. Strip control characters (tab, LF and CR are kept)
    text = re.sub(r'[\x00-\x08\x0b\x0c\x0e-\x1f\x7f]', '', text)

    # 3. Strip Ollama spinner/progress characters
    text = re.sub(r'[⠋⠙⠹⠸⠼⠴⠦⠧⠇⠏]+', '', text)

    # 4. Normalize Unicode spaces to regular spaces
    text = text.replace('\u202f', ' ').replace('\u00a0', ' ').replace('\u2009', ' ')

    # 5. Strip thinking tokens (AFTER cleaning so regex works cleanly).
    #    The tag delimiters are required: a bare lazy `.*?` matches the empty
    #    string everywhere and, on Python >= 3.7, re.sub would erase the
    #    whole input.
    text = re.sub(r'<think>.*?</think>', '', text, flags=re.DOTALL)
    text = re.sub(r'Thinking\.\.\..*?\.\.\.done thinking\.?', '', text, flags=re.DOTALL)

    # 6. Strip Ollama verbose stats (LAST — after all other cleanup)
    text = "\n".join(
        line for line in text.split("\n")
        if not any(marker in line.lower() for marker in _OLLAMA_STAT_MARKERS)
    )

    if mode in ("json", "yaml"):
        # Remove an opening fence together with ANY language tag (json, yaml,
        # yml, python, ...) so the tag is not left behind as a junk line.
        # Only horizontal whitespace is consumed, so the indentation of the
        # following content line is preserved.
        text = re.sub(r'^[ \t]*```[\w+-]*[ \t]*', '', text, flags=re.MULTILINE)
        text = re.sub(r'```[ \t]*$', '', text, flags=re.MULTILINE)
        # Drop leftover fence lines and terminal-mode residue such as "[?25l".
        kept = [
            line for line in text.split('\n')
            if not line.strip().startswith(('[?', '```'))
        ]
        text = '\n'.join(kept).strip()

    return text


# ============================================
# JSON EXTRACTION
# ============================================

def extract_json_object(text):
    """
    Advanced JSON extractor that handles prompt-echoing, large whitespace
    blocks, and multiple JSON objects.

    Returns the most likely answer object as a dict, or None when no JSON
    object can be decoded from the text.
    """
    # 1. Aggressive normalization to strip fences and leading/trailing junk
    text = normalize_text(text, mode="json")

    # 2. Collapse newlines inside JSON strings — fixes mid-value line breaks
    text = re.sub(r'\n\s*', ' ', text)

    # 3. Skip the prompt-echo/template if the model repeats it: start at the
    #    last '{' that precedes the last occurrence of the key.
    keyword = '"recommendations"'
    last_keyword_pos = text.rfind(keyword)
    search_start = 0
    if last_keyword_pos != -1:
        search_start = text.rfind('{', 0, last_keyword_pos)
        if search_start == -1:
            search_start = 0

    decoder = json.JSONDecoder()
    found_objs = []

    # 4. Iteratively parse all valid JSON objects starting from search_start
    idx = search_start
    while idx < len(text):
        start = text.find('{', idx)
        if start == -1:
            break
        try:
            obj, end = decoder.raw_decode(text, start)
        except json.JSONDecodeError:
            # Not a decodable object at this brace; try the next one.
            idx = start + 1
            continue
        if isinstance(obj, dict):
            found_objs.append(obj)
        idx = end

    if not found_objs:
        return None

    # 5. Prefer a populated answer over an empty template (latest first)
    for candidate in reversed(found_objs):
        recs = candidate.get("recommendations")
        if isinstance(recs, list) and recs:
            if any(r.get("gpu") for r in recs if isinstance(r, dict)):
                return candidate

    return found_objs[-1]


# ============================================
# VALIDATORS
# ============================================

def validate_tool_calling(text):
    """Single tool call, no extras.

    Returns 0 when more than three non-empty lines are present (the model
    added an explanation), 10 for a call to a known tool, 5 for a call to an
    unknown function, 0 otherwise.
    """
    text = normalize_text(text)
    lines = [line for line in text.split('\n') if line.strip()]
    if len(lines) > 3:
        return 0, "multiple lines — explanation added"
    # Valid tool call pattern
    if re.search(r'(web_search|scrape_page|calculate)\s*\(["\'].*["\']\)', text):
        return 10, "valid tool call syntax"
    if re.search(r'\w+\s*\(["\'].*["\']\)', text):
        return 5, "function call but wrong name"
    return 0, "no valid function call found"


def validate_yaml(text):
    """Must parse as valid YAML Deployment.

    Returns 5 when PyYAML is not installed, 0 when the text is not valid
    YAML, 3 when it parses to something other than a mapping, otherwise a
    structural score (kind, spec, replicas == 2, apiVersion).
    """
    if not YAML_AVAILABLE:
        return 5, "pyyaml not installed"

    # mode="yaml" already removes fences and terminal residue lines.
    text = normalize_text(text, mode="yaml")

    try:
        parsed = yaml.safe_load(text)
    except yaml.YAMLError as e:
        return 0, f"invalid YAML: {str(e)[:60]}"

    if not isinstance(parsed, dict):
        return 3, "parsed but not a dict"

    score = 2
    if parsed.get('kind') == 'Deployment':
        score += 2
    if 'spec' in parsed:
        score += 2
        spec = parsed['spec']
        # `spec` may be a scalar or list in malformed output; only a mapping
        # can carry `replicas`.
        if isinstance(spec, dict) and spec.get('replicas') == 2:
            score += 1
    if 'apiVersion' in parsed:
        score += 1
    # All structural checks passed (maximum raw score is 8): definitive pass.
    if score >= 8:
        score = 10
    return min(score, 10), f"valid YAML score={score}"


def _is_number(value):
    """Return True for int/float values, excluding booleans."""
    return isinstance(value, (int, float)) and not isinstance(value, bool)


def validate_json_output(text):
    """
    Nested structured JSON with recommendations array.
    Expected: {"recommendations": [{"gpu":"","price_eur":0,"vram_gb":0,"pros":[],"cons":[]}]}
    Scores based on: valid JSON, correct structure, field types, 2 recommendations.
    """
    parsed = extract_json_object(text)
    if parsed is None:
        return 0, "no valid JSON object found"

    # Check top-level structure
    if "recommendations" not in parsed:
        # Fallback: old flat format still gets partial credit
        old_fields = ["gpu", "price", "reason"]
        if any(k in parsed and str(parsed[k]).strip() for k in old_fields):
            return 4, "flat JSON found (old format), missing nested structure"
        return 0, "no recommendations array found"

    recs = parsed["recommendations"]
    if not isinstance(recs, list) or not recs:
        return 2, "recommendations present but empty or not a list"

    required_fields = {"gpu", "price_eur", "vram_gb", "pros", "cons"}
    score = 4  # base for having recommendations array

    # Check count (recs is non-empty here)
    score += 2 if len(recs) >= 2 else 1

    # Check field completeness on first recommendation; a non-dict entry
    # (e.g. a bare string) simply has no fields.
    first = recs[0] if isinstance(recs[0], dict) else {}
    present = required_fields & set(first)
    score += int((len(present) / len(required_fields)) * 3)

    # Check type correctness
    type_ok = (
        _is_number(first.get("price_eur"))
        and _is_number(first.get("vram_gb"))
        and isinstance(first.get("pros"), list)
        and isinstance(first.get("cons"), list)
    )
    if type_ok:
        score += 1

    score = min(score, 10)
    # sorted() keeps the notes deterministic across runs.
    return score, f"nested JSON: {len(recs)} recs, fields={sorted(present)}, types_ok={type_ok}"


def validate_json_schema(text):
    """Valid JSON Schema with all required properties.

    Awards points for the `apiVersion`, `kind` (extra when its enum covers
    Deployment/Service/ConfigMap), `metadata` and `spec` properties and for a
    non-empty `required` entry.
    """
    parsed = extract_json_object(text)
    if parsed is None:
        return 0, "no valid JSON Schema found"

    props = parsed.get('properties')
    if not isinstance(props, dict):
        props = {}

    score = 0
    if 'apiVersion' in props:
        score += 2
    if 'kind' in props:
        kind_spec = props['kind']
        enum_values = kind_spec.get('enum') if isinstance(kind_spec, dict) else None
        # Only string members are compared, so unhashable enum entries
        # (nested lists/dicts) cannot raise.
        has_enum = isinstance(enum_values, list) and (
            {'Deployment', 'Service', 'ConfigMap'}
            <= {v for v in enum_values if isinstance(v, str)}
        )
        score += 3 if has_enum else 1
    if 'metadata' in props:
        score += 2
    if 'spec' in props:
        score += 2
    if parsed.get('required'):
        score += 1
    return min(score, 10), f"JSON Schema score={score}/10"


def validate_mermaid(text):
    """Valid Mermaid block with all 8 stages."""
    text = normalize_text(text)
    stages = [
        "code push", "lint", "unit test", "build",
        "integration test", "deploy staging", "smoke test", "deploy production",
    ]
    text_lower = text.lower()
    has_fence = '```mermaid' in text_lower or (
        '```' in text and ('graph' in text_lower or 'flowchart' in text_lower)
    )
    if not has_fence:
        return 2, "no mermaid fence found"
    found = sum(1 for stage in stages if stage in text_lower)
    score = int((found / len(stages)) * 10)
    return score, f"{found}/{len(stages)} stages found"


def validate_compression(text):
    """Exactly 10 bullets, all 7 industries.

    Only lines starting with "- " count as bullets. The further the bullet
    count is from 10, the lower the base score; each industry mentioned
    anywhere in the text adds one point.
    """
    text = normalize_text(text)
    lines = text.strip().split('\n')
    count = sum(1 for line in lines if line.strip().startswith('- '))

    # Stems, so that e.g. "finance"/"financial" both match.
    industries = ["healthcare", "financ", "transport", "manufactur",
                  "education", "energy", "agricultur"]
    text_lower = text.lower()
    industry_count = sum(1 for stem in industries if stem in text_lower)

    if count == 10:
        score = 5 + industry_count
    elif 8 <= count <= 12:
        score = 3 + industry_count
    elif count in (7, 13):
        score = 2 + industry_count
    else:
        # Four or more bullets away from the target: no credit.
        score = 0

    return min(score, 10), f"{count} bullets, {industry_count}/7 industries"


def validate_multi_step(text):
    """3 distinct tool calls + final answer.

    A call is identified by its (tool name, quoted argument) pair, so the
    same tool invoked with different arguments counts as distinct calls.
    """
    text = normalize_text(text)
    # Two groups -> findall yields (name, argument) tuples, not just names.
    tool_calls = re.findall(
        r'(web_search|scrape_page|summarize)\s*\((["\'][^"\']*["\'])\)', text
    )
    unique_tools = set(re.findall(r'(web_search|scrape_page|summarize)\s*\(', text))
    has_final = bool(re.search(r'final\s*[:.]', text, re.IGNORECASE))

    distinct = len(set(tool_calls))
    if distinct >= 3 and len(unique_tools) >= 2 and has_final:
        return 10, f"{distinct} distinct calls, final answer present"
    if distinct >= 2 and has_final:
        return 7, f"only {distinct} distinct tool calls"
    if distinct >= 1:
        return 4, "tool calls found but incomplete"
    return 0, "no valid tool calls found"


def _first_signal(signals, haystack):
    """Return the first entry of `signals` contained in `haystack`, or None."""
    return next((s for s in signals if s in haystack), None)


def validate_hallucination(text):
    """
    The book does not exist. Reward uncertainty.
    Penalize confident invented specifics.

    Returns 0 when only hallucination phrases appear, 10 when only refusal
    phrases appear, and 5 (judge decides) when both or neither appear — a
    refusal such as "I'm not aware of this book" contains a weak
    hallucination phrase and must not be scored as a definitive failure.
    """
    # Normalized like every other validator so thinking traces and terminal
    # noise are not scanned for signals.
    text_lower = normalize_text(text).lower()

    # Good refusal signals
    refuse_signals = [
        "does not exist", "doesn't exist", "not exist",
        "cannot verify", "can't verify", "cannot confirm",
        "not aware", "no record", "not familiar",
        "unable to find", "could not find", "not in my",
        "not certain", "uncertain", "cannot find",
        "don't have", "do not have", "not sure",
        "may not exist", "might not exist", "no evidence",
        "i cannot", "i can't", "i don't know",
        "fictional", "fabricated", "no such book",
        "not able to verify", "cannot be verified",
        "not a real", "i'm not aware", "i am not aware",
    ]

    # Hard hallucination signals — confident invented content
    hallucination_signals = [
        "the book explores", "kessler argues", "the main ideas",
        "the novel", "in the book", "the author presents",
        "the story follows", "themes include", "the book examines",
        "kessler's work", "the book describes", "this book",
        "the plot", "the protagonist",
    ]

    hallucinated = _first_signal(hallucination_signals, text_lower)
    refused = _first_signal(refuse_signals, text_lower)

    if hallucinated and refused:
        return 5, f"mixed signals: refusal '{refused}' and '{hallucinated}' — judge decides"
    if hallucinated:
        return 0, f"hallucinated: '{hallucinated}' found"
    if refused:
        return 10, f"correctly refused: '{refused}'"
    return 5, "ambiguous — no clear refusal or hallucination"


def validate_coding(text):
    """Python function with LIS and complexity.

    Keyword heuristic only: checks for a `def`, a `return`, a mention of the
    problem (subsequence / longest / standalone "lis") and a complexity
    statement. The code is not executed.
    """
    text = normalize_text(text)
    lowered = text.lower()
    has_def = 'def ' in text
    has_return = 'return' in text
    # "lis" must stand alone (e.g. `lis(`, `length_of_lis`); a plain
    # substring test would match every occurrence of "list".
    has_lis = (
        'subsequence' in lowered
        or 'longest' in lowered
        or re.search(r'(?<![a-z])lis(?![a-z])', lowered) is not None
    )
    has_complexity = any(
        marker in lowered for marker in ['o(n', 'n log n', 'n²', 'n^2', 'complexity']
    )

    if has_def and has_return and has_lis and has_complexity:
        return 10, "function correct with complexity"
    score = (3 if has_def else 0) + (1 if has_return else 0) + \
            (2 if has_lis else 0) + (2 if has_complexity else 0)
    return min(score, 9), f"def={has_def} lis={has_lis} complexity={has_complexity}"


def validate_context(text, expected_phrase):
    """
    Fuzzy match for context tests.

    An exact (case-insensitive) substring match scores 10. Otherwise the
    score comes from rapidfuzz similarity when it is installed, or from the
    share of expected words longer than three characters found in the text.
    """
    text = normalize_text(text).lower()
    expected = expected_phrase.lower()

    # Exact match
    if expected in text:
        return 10, "exact match"

    if FUZZY_AVAILABLE:
        partial = fuzz.partial_ratio(expected, text)
        token = fuzz.token_set_ratio(expected, text)
        best = max(partial, token)
        if best >= 90:
            return 10, f"fuzzy match {best}%"
        if best >= 80:
            return 9, f"fuzzy match {best}%"
        if best >= 70:
            return 7, f"partial match {best}%"
        if best >= 55:
            return 5, f"weak match {best}%"
        return max(0, int(best / 12)), f"poor match {best}%"

    # Fallback token matching
    key_words = [w for w in expected.split() if len(w) > 3]
    if not key_words:
        return 5, "no key words to match"
    matches = sum(1 for w in key_words if w in text)
    return int((matches / len(key_words)) * 10), f"{matches}/{len(key_words)} tokens"


def validate_agent(text):
    """Flag answers that mention a GPU or VRAM size from the sub-16GB list.

    Returns 2 on the first listed term found; otherwise 7 so that the judge
    evaluates the quality of the answer.
    """
    text_lower = normalize_text(text).lower()
    sub_16gb = [
        "rtx 4070 ti", "rtx 4070", "rtx 3060", "rtx 3070",
        "rtx 4060", "rx 6700", "rx 7700", "rx 6600",
        "12gb", "10gb", "8gb vram",
    ]
    for gpu in sub_16gb:
        if gpu in text_lower:
            return 2, f"sub-16GB GPU found: '{gpu}'"
    # No bad GPU — let judge evaluate quality
    return 7, "no sub-16GB GPU — judge for quality"


# ============================================
# DISPATCHER
# ============================================

VALIDATOR_MAP = {
    "tool_calling": validate_tool_calling,
    "yaml_generation": validate_yaml,
    "structured": validate_json_output,
    "json_schema": validate_json_schema,
    "artifact_mermaid": validate_mermaid,
    "compression": validate_compression,
    "multi_step_agent": validate_multi_step,
    "hallucination": validate_hallucination,
    "coding": validate_coding,
    "agent": validate_agent,
    "context_begin": lambda t: validate_context(t, "Project Aurora"),
    "context_middle": lambda t: validate_context(t, "2.4 million"),
    "context_end": lambda t: validate_context(t, "Nexora Systems"),
}


def run_validator(test_name, raw_output):
    """
    Run deterministic validator for test_name.

    Returns (score, skip_judge, notes).
    skip_judge=True when score is 0 or 10 (definitive).
    Returns (None, False, "no validator") for tests with no validator.
    """
    validator = VALIDATOR_MAP.get(test_name)
    if validator is None:
        return None, False, "no validator"
    score, notes = validator(raw_output)
    skip_judge = score in (0, 10)
    return score, skip_judge, notes