""" benchmark_v4/storage.py ======================= SQLite persistence for benchmark results. Three tables: - runs: one row per model per benchmark run - details: one row per test per model per run - variance: one row per test per model (multi-run stats) Query examples: SELECT model, weighted_avg, stdev_all FROM runs WHERE is_baseline = 1 ORDER BY weighted_avg DESC; SELECT model, test, semantic_score FROM details WHERE run_id = (SELECT MAX(id) FROM runs WHERE model = 'granite4.1:8b'); """ import sqlite3 import json from datetime import datetime from config import DB_FILE # ============================================ # SCHEMA # ============================================ SCHEMA = """ CREATE TABLE IF NOT EXISTS runs ( id INTEGER PRIMARY KEY AUTOINCREMENT, run_date TEXT NOT NULL, model TEXT NOT NULL, type TEXT NOT NULL, is_baseline INTEGER NOT NULL DEFAULT 0, num_runs INTEGER NOT NULL DEFAULT 1, -- Weighted scores weighted_total REAL, weighted_avg REAL, -- Format avg_format REAL, -- Variance mean_all REAL, stdev_all REAL, min_score REAL, max_score REAL, failure_rate_pct REAL, -- Compliance (%) compliance_json REAL, compliance_yaml REAL, compliance_tool REAL, compliance_hall REAL, -- Category scores cat_agent_tool REAL, cat_coding REAL, cat_rag_context REAL, cat_structured REAL, cat_hallucination REAL, cat_reasoning REAL, -- Performance avg_tok_s REAL, avg_gpu_temp REAL, tests_run INTEGER ); CREATE TABLE IF NOT EXISTS details ( id INTEGER PRIMARY KEY AUTOINCREMENT, run_id INTEGER NOT NULL REFERENCES runs(id), run_date TEXT NOT NULL, run_num INTEGER NOT NULL DEFAULT 1, model TEXT NOT NULL, type TEXT NOT NULL, is_baseline INTEGER NOT NULL DEFAULT 0, test TEXT NOT NULL, weight REAL, time_s REAL, tok_s REAL, gpu_temp INTEGER, gpu_mem INTEGER, gpu_util INTEGER, gpu_clock INTEGER, output_length INTEGER, semantic_score INTEGER, format_score INTEGER, combined_score REAL, used_judge INTEGER, notes TEXT ); CREATE TABLE IF NOT EXISTS variance ( id INTEGER PRIMARY KEY AUTOINCREMENT, run_date TEXT NOT NULL, model TEXT NOT NULL, test TEXT NOT NULL, num_runs INTEGER NOT NULL, mean REAL, stdev REAL, min_score INTEGER, max_score INTEGER, failure_rate_pct REAL, scores_raw TEXT ); CREATE INDEX IF NOT EXISTS idx_runs_model ON runs(model); CREATE INDEX IF NOT EXISTS idx_details_run ON details(run_id); CREATE INDEX IF NOT EXISTS idx_details_model ON details(model); CREATE INDEX IF NOT EXISTS idx_details_test ON details(test); """ # ============================================ # CONNECTION # ============================================ def get_connection(): conn = sqlite3.connect(DB_FILE) conn.row_factory = sqlite3.Row conn.execute("PRAGMA journal_mode=WAL") conn.execute("PRAGMA foreign_keys=ON") return conn def init_db(): """Create tables if they don't exist.""" with get_connection() as conn: conn.executescript(SCHEMA) # ============================================ # WRITE # ============================================ def insert_run(run_data): """Insert a run summary row. Returns the run_id.""" sql = """ INSERT INTO runs ( run_date, model, type, is_baseline, num_runs, weighted_total, weighted_avg, avg_format, mean_all, stdev_all, min_score, max_score, failure_rate_pct, compliance_json, compliance_yaml, compliance_tool, compliance_hall, cat_agent_tool, cat_coding, cat_rag_context, cat_structured, cat_hallucination, cat_reasoning, avg_tok_s, avg_gpu_temp, tests_run ) VALUES ( :run_date, :model, :type, :is_baseline, :num_runs, :weighted_total, :weighted_avg, :avg_format, :mean_all, :stdev_all, :min_score, :max_score, :failure_rate_pct, :compliance_json, :compliance_yaml, :compliance_tool, :compliance_hall, :cat_agent_tool, :cat_coding, :cat_rag_context, :cat_structured, :cat_hallucination, :cat_reasoning, :avg_tok_s, :avg_gpu_temp, :tests_run ) """ with get_connection() as conn: cursor = conn.execute(sql, run_data) return cursor.lastrowid def insert_details(run_id, detail_rows): """Insert detail rows for a run.""" sql = """ INSERT INTO details ( run_id, run_date, run_num, model, type, is_baseline, test, weight, time_s, tok_s, gpu_temp, gpu_mem, gpu_util, gpu_clock, output_length, semantic_score, format_score, combined_score, used_judge, notes ) VALUES ( :run_id, :run_date, :run_num, :model, :type, :is_baseline, :test, :weight, :time_s, :tok_s, :gpu_temp, :gpu_mem, :gpu_util, :gpu_clock, :output_length, :semantic_score, :format_score, :combined_score, :used_judge, :notes ) """ rows = [{**r, "run_id": run_id} for r in detail_rows] with get_connection() as conn: conn.executemany(sql, rows) def insert_variance(variance_rows): """Insert variance rows.""" sql = """ INSERT INTO variance ( run_date, model, test, num_runs, mean, stdev, min_score, max_score, failure_rate_pct, scores_raw ) VALUES ( :run_date, :model, :test, :num_runs, :mean, :stdev, :min_score, :max_score, :failure_rate_pct, :scores_raw ) """ with get_connection() as conn: conn.executemany(sql, variance_rows) # ============================================ # READ # ============================================ def load_best_runs(): """Load best scoring run per model.""" with get_connection() as conn: rows = conn.execute(""" SELECT r.* FROM runs r INNER JOIN ( SELECT model, MAX(weighted_avg) AS best_w FROM runs GROUP BY model ) best ON r.model = best.model AND r.weighted_avg = best.best_w ORDER BY r.weighted_avg DESC """).fetchall() return [dict(r) for r in rows] def load_latest_runs(is_baseline=None): """Load latest run per model.""" sql = """ SELECT r.* FROM runs r INNER JOIN ( SELECT model, MAX(run_date) AS latest FROM runs GROUP BY model ) latest ON r.model = latest.model AND r.run_date = latest.latest """ params = [] if is_baseline is not None: sql += " WHERE r.is_baseline = ?" params.append(1 if is_baseline else 0) sql += " ORDER BY r.weighted_avg DESC" with get_connection() as conn: rows = conn.execute(sql, params).fetchall() return [dict(r) for r in rows] def load_all_runs(): """Load all run summaries.""" with get_connection() as conn: rows = conn.execute( "SELECT * FROM runs ORDER BY run_date DESC" ).fetchall() return [dict(r) for r in rows] def load_details_for_run(run_id): """Load all test details for a specific run.""" with get_connection() as conn: rows = conn.execute( "SELECT * FROM details WHERE run_id = ? ORDER BY test", (run_id,) ).fetchall() return [dict(r) for r in rows] def export_summary_csv(filepath="benchmark_summary.csv"): """Export latest run per model to CSV for Excel analysis.""" import csv rows = load_latest_runs() if not rows: print("No runs to export.") return with open(filepath, "w", newline="", encoding="utf-8") as f: writer = csv.DictWriter(f, fieldnames=rows[0].keys()) writer.writeheader() writer.writerows(rows) print(f" Exported {len(rows)} rows to {filepath}")