"""
benchmark_v4/runner.py
======================
Executes models via Ollama CLI and orchestrates the benchmark loop.
Handles: warmup, GPU polling, cooldown, multi-run variance.
"""

import subprocess
import time
import re
import statistics
from datetime import datetime

from config import (
    COOLDOWN_SECONDS,
    GPU_POLL_EVERY,
    TEST_WEIGHTS,
    CATEGORIES,
)
from prompts import ALL_TESTS
from validators import normalize_text
from judge import warmup_judge
from scoring import (
    score_test,
    compute_weighted,
    compute_category_scores,
    compute_compliance,
    compute_variance_stats,
)
from storage import insert_run, insert_details, insert_variance

# Pause after the warmup prompt, before the judge warmup and the first test.
WARMUP_SETTLE_SECONDS = 5
# Pause after each model has been benchmarked and stored.
MODEL_COOLDOWN_SECONDS = 30
# A semantic score at or below this value counts as a failure in variance rows.
FAILURE_SCORE_THRESHOLD = 2


# ============================================
# GPU MONITORING
# ============================================

_gpu_cache = {"temp": -1, "mem": -1, "util": -1, "clock": -1}
_gpu_poll_count = 0


def get_gpu(force=False):
    """Return GPU stats, polling nvidia-smi only every GPU_POLL_EVERY calls.

    Args:
        force: When true, poll nvidia-smi regardless of the call counter.

    Returns:
        Dict with integer "temp", "mem", "util" and "clock" entries. The
        values are the most recent successful reading; they stay at -1
        until a poll has succeeded.
    """
    global _gpu_cache, _gpu_poll_count
    _gpu_poll_count += 1

    # Poll on the very first call as well, otherwise every call before the
    # GPU_POLL_EVERY-th one would report the -1 placeholders.
    # The `<= 0` check comes before the modulo so a zero interval never divides.
    poll_due = (
        force
        or GPU_POLL_EVERY <= 0
        or _gpu_poll_count == 1
        or _gpu_poll_count % GPU_POLL_EVERY == 0
    )
    if not poll_due:
        return _gpu_cache

    try:
        result = subprocess.run(
            [
                "nvidia-smi",
                "--query-gpu=temperature.gpu,memory.used,utilization.gpu,clocks.sm",
                "--format=csv,noheader,nounits",
            ],
            capture_output=True,
            text=True,
            timeout=5,
        )
        # nvidia-smi prints one CSV line per GPU; only the first one is used.
        lines = result.stdout.strip().splitlines()
        first_gpu = lines[0] if lines else ""
        temp, mem, util, clock = (field.strip() for field in first_gpu.split(","))
        _gpu_cache = {
            "temp": int(temp),
            "mem": int(mem),
            "util": int(util),
            "clock": int(clock),
        }
    except (OSError, subprocess.SubprocessError, ValueError):
        # Best effort: nvidia-smi missing, timed out, or printed something
        # unparsable. Keep the previous reading.
        pass
    return _gpu_cache


# ============================================
# PARSE OLLAMA VERBOSE
# ============================================

# Matches both "45.67 tokens/s" and integer rates such as "50 tokens/s".
_TOKENS_PER_SEC_RE = re.compile(r"(\d+(?:\.\d+)?)\s+tokens/s")


def parse_generation_speed(output):
    """Parse GENERATION (eval) speed from Ollama verbose output.

    The last "tokens/s" value in the text is taken as the generation rate.

    Args:
        output: Text captured from an `ollama run ... --verbose` call.

    Returns:
        The rate as a float, or None when no "tokens/s" value is present.
    """
    matches = _TOKENS_PER_SEC_RE.findall(output)
    return float(matches[-1]) if matches else None


# ============================================
# RUN SINGLE MODEL + PROMPT
# ============================================

def run_model(model, prompt):
    """Execute one prompt against a model via the Ollama CLI.

    Args:
        model: Model name passed to `ollama run`.
        prompt: Prompt text passed as a single CLI argument.

    Returns:
        Dict with "output" (stdout and stderr joined by a newline), "time"
        (wall-clock seconds, rounded to 2 decimals), "tok_s" (float or None)
        and the "gpu_temp", "gpu_mem", "gpu_util", "gpu_clock" readings from
        get_gpu().
    """
    start = time.time()
    result = subprocess.run(
        ["ollama", "run", model, prompt, "--verbose"],
        capture_output=True,
        text=True,
    )
    elapsed = round(time.time() - start, 2)
    gpu = get_gpu()
    # Both streams are kept so the speed parser sees the text wherever the
    # CLI wrote it.
    output = result.stdout + "\n" + result.stderr
    return {
        "output": output,
        "time": elapsed,
        "tok_s": parse_generation_speed(output),
        "gpu_temp": gpu["temp"],
        "gpu_mem": gpu["mem"],
        "gpu_util": gpu["util"],
        "gpu_clock": gpu["clock"],
    }


# ============================================
# BENCHMARK A GROUP OF MODELS
# ============================================

def _build_variance_rows(run_date, model, num_runs, sem_by_test):
    """Build one variance row per test that has more than one semantic score."""
    rows = []
    for test_name, scores_list in sem_by_test.items():
        # statistics.stdev needs at least two data points.
        if len(scores_list) <= 1:
            continue
        failures = sum(1 for s in scores_list if s <= FAILURE_SCORE_THRESHOLD)
        rows.append({
            "run_date": run_date,
            "model": model,
            "test": test_name,
            "num_runs": num_runs,
            "mean": round(statistics.mean(scores_list), 2),
            "stdev": round(statistics.stdev(scores_list), 2),
            "min_score": min(scores_list),
            "max_score": max(scores_list),
            "failure_rate_pct": round(failures / len(scores_list) * 100, 1),
            "scores_raw": str(scores_list),
        })
    return rows


def run_benchmark(
    models, label, is_baseline, all_prompts,
    num_runs=1, no_cooldown=False
):
    """Run the benchmark for a list of models and store the results.

    For each model: send a warmup prompt, warm up the judge, run every test
    in ALL_TESTS that has a non-blank prompt `num_runs` times, aggregate the
    scores, print a summary and write the run, detail and (for multi-run)
    variance rows through the storage helpers.

    Args:
        models: Iterable of model names passed to `ollama run`.
        label: Printed in the header and stored in the "type" field.
        is_baseline: Stored as 1/0 in the "is_baseline" field.
        all_prompts: Mapping of test name to prompt text; tests with a
            missing or blank prompt are skipped.
        num_runs: Number of passes over the test list per model.
        no_cooldown: When true, skip the COOLDOWN_SECONDS sleep after each
            test.

    Returns:
        List of run_ids as returned by insert_run (one per model).
    """
    run_date = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    run_ids = []

    for model in models:
        # Accumulate across runs
        sem_by_test = {t: [] for t in ALL_TESTS}
        fmt_by_test = {t: [] for t in ALL_TESTS}
        tok_s_all = []
        temp_all = []
        detail_rows = []

        print(f"\n[{label}] Model: {model} ({num_runs} run{'s' if num_runs > 1 else ''})")

        # Warmup
        subprocess.run(
            ["ollama", "run", model, "hello"],
            capture_output=True,
            text=True,
        )
        time.sleep(WARMUP_SETTLE_SECONDS)
        warmup_judge()

        for run_num in range(1, num_runs + 1):
            if num_runs > 1:
                print(f"\n ── Run {run_num}/{num_runs} ──")

            for test_name in ALL_TESTS:
                prompt = all_prompts.get(test_name, "")
                if not prompt or not prompt.strip():
                    continue

                result = run_model(model, prompt)
                scores = score_test(test_name, prompt, result["output"])
                sem = scores["semantic_score"]
                fmt = scores["format_score"]

                sem_by_test[test_name].append(sem)
                fmt_by_test[test_name].append(fmt)
                if result["tok_s"]:
                    tok_s_all.append(result["tok_s"])
                # -1 means no GPU reading has succeeded yet; leave it out.
                if result["gpu_temp"] > 0:
                    temp_all.append(result["gpu_temp"])

                flag = "J" if scores["used_judge"] else "V"
                print(
                    f" [{run_num}] {test_name:<22} [{flag}] "
                    f"sem={sem:>2}/10 fmt={fmt:>2}/10 "
                    f"comb={scores['combined_score']:>5.2f} "
                    f"{scores['notes'][:52]}"
                )

                detail_rows.append({
                    "run_date": run_date,
                    "run_num": run_num,
                    "model": model,
                    "type": label,
                    "is_baseline": 1 if is_baseline else 0,
                    "test": test_name,
                    "weight": TEST_WEIGHTS.get(test_name, 0),
                    "time_s": result["time"],
                    "tok_s": result["tok_s"],
                    "gpu_temp": result["gpu_temp"],
                    "gpu_mem": result["gpu_mem"],
                    "gpu_util": result["gpu_util"],
                    "gpu_clock": result["gpu_clock"],
                    "output_length": len(result["output"]),
                    "semantic_score": sem,
                    "format_score": fmt,
                    "combined_score": scores["combined_score"],
                    "used_judge": 1 if scores["used_judge"] else 0,
                    "notes": scores["notes"][:120],
                })

                if not no_cooldown:
                    time.sleep(COOLDOWN_SECONDS)

        # Aggregate
        avg_sem = {t: round(statistics.mean(v), 2) for t, v in sem_by_test.items() if v}
        avg_fmt = {t: round(statistics.mean(v), 2) for t, v in fmt_by_test.items() if v}
        w_total, w_avg = compute_weighted(avg_sem)
        cat_scores = compute_category_scores(avg_sem)
        compliance = compute_compliance(sem_by_test)
        var_stats = compute_variance_stats(sem_by_test)

        # Guard on the flattened scores, not on the dict: the dict has a key
        # for every test even when every prompt was skipped, and
        # statistics.mean raises on an empty list.
        all_fmt = [s for v in fmt_by_test.values() for s in v]
        fmt_avg = round(statistics.mean(all_fmt), 2) if all_fmt else 0
        avg_tok = round(statistics.mean(tok_s_all), 1) if tok_s_all else 0
        avg_tmp = round(statistics.mean(temp_all), 1) if temp_all else 0

        print(f"\n ─── {model} ───")
        print(f" Weighted avg: {w_avg} (total={w_total})")
        print(f" Format avg: {fmt_avg}/10")
        print(f" Variance: mean={var_stats['mean']} σ={var_stats['stdev']} failures={var_stats['failure_rate']}%")
        print(f" Compliance: JSON={compliance.get('json_valid')}% YAML={compliance.get('yaml_valid')}% "
              f"tool={compliance.get('tool_format')}% hall={compliance.get('hallucination_free')}%")
        print(f" Categories: agent={cat_scores.get('agent_tool')} coding={cat_scores.get('coding')} "
              f"rag={cat_scores.get('rag_context')} struct={cat_scores.get('structured')} "
              f"hall={cat_scores.get('hallucination')} reason={cat_scores.get('reasoning')}")
        print(f" tok/s={avg_tok} temp={avg_tmp}°C")

        # Save to DB
        run_row = {
            "run_date": run_date,
            "model": model,
            "type": label,
            "is_baseline": 1 if is_baseline else 0,
            "num_runs": num_runs,
            "weighted_total": w_total,
            "weighted_avg": w_avg,
            "avg_format": fmt_avg,
            "mean_all": var_stats["mean"],
            "stdev_all": var_stats["stdev"],
            "min_score": var_stats["min"],
            "max_score": var_stats["max"],
            "failure_rate_pct": var_stats["failure_rate"],
            "compliance_json": compliance.get("json_valid"),
            "compliance_yaml": compliance.get("yaml_valid"),
            "compliance_tool": compliance.get("tool_format"),
            "compliance_hall": compliance.get("hallucination_free"),
            "cat_agent_tool": cat_scores.get("agent_tool"),
            "cat_coding": cat_scores.get("coding"),
            "cat_rag_context": cat_scores.get("rag_context"),
            "cat_structured": cat_scores.get("structured"),
            "cat_hallucination": cat_scores.get("hallucination"),
            "cat_reasoning": cat_scores.get("reasoning"),
            "avg_tok_s": avg_tok,
            "avg_gpu_temp": avg_tmp,
            "tests_run": len(avg_sem) * num_runs,
        }
        run_id = insert_run(run_row)
        insert_details(run_id, detail_rows)

        # Variance rows (only if multiple runs)
        if num_runs > 1:
            var_rows = _build_variance_rows(run_date, model, num_runs, sem_by_test)
            if var_rows:
                insert_variance(var_rows)

        run_ids.append(run_id)

        # NOTE(review): this pause runs even when no_cooldown is set and after
        # the last model — confirm whether that is intended.
        print(f"\nCooldown after {model}...\n")
        time.sleep(MODEL_COOLDOWN_SECONDS)

    return run_ids