This works by combining Python's threading library with a deque (a highly efficient fixed-size rolling buffer). A background thread wakes up every second, takes a snapshot, and pushes it into the deques. When your website hits the API, it simply returns whatever is currently in the deques.
123 lines
4.5 KiB
Python
123 lines
4.5 KiB
Python
#!/usr/bin/env python3
|
|
import json
|
|
import subprocess
|
|
import os
|
|
import time
|
|
import threading
|
|
from collections import deque
|
|
from http.server import BaseHTTPRequestHandler, HTTPServer
|
|
|
|
# --- CONFIGURATION ---
# TCP port the HTTP API listens on.
PORT = 8080
# Network interface whose rx/tx byte counters are sampled.
IFACE = "vmbr0"
# When True, the poller also counts running VMs/containers via `qm` and `pct`.
ENABLE_PVE_STATS = True
# ---------------------
|
|
|
|
# Rolling memory buffers: one entry per poll, newest last, pre-filled with
# zeros so consumers always receive a full-length series. The buffers hold
# HISTORY_LEN samples; how much wall-clock time that covers depends on how
# often the poller appends.
HISTORY_LEN = 60

hist_cpu = deque([0] * HISTORY_LEN, maxlen=HISTORY_LEN)
hist_ram = deque([0] * HISTORY_LEN, maxlen=HISTORY_LEN)
hist_rx = deque([0.0] * HISTORY_LEN, maxlen=HISTORY_LEN)
hist_tx = deque([0.0] * HISTORY_LEN, maxlen=HISTORY_LEN)

# Dictionary to hold the most recent slow-changing data
latest_state = {}
|
|
|
|
def run_cmd(cmd, timeout=10.0):
    """Run a shell command line and return its stripped stdout.

    Args:
        cmd: Command line handed to the shell. ``shell=True`` is deliberate
            because callers pass pipelines (``a | b | c``); only pass
            trusted, constant strings here, never user-supplied input.
        timeout: Seconds to wait before the command is killed and treated as
            a failure, so a hung tool cannot stall the caller indefinitely.
            ``None`` waits forever.

    Returns:
        The command's stdout decoded as UTF-8 with surrounding whitespace
        removed, or ``None`` if the command could not be run, exited with a
        non-zero status, timed out, or produced undecodable output.
    """
    try:
        raw = subprocess.check_output(
            cmd,
            shell=True,
            stderr=subprocess.DEVNULL,
            timeout=timeout,
        )
        return raw.decode('utf-8').strip()
    except Exception:
        # Best-effort helper: every failure mode is reported as None so
        # callers can fall back with `run_cmd(...) or default`.
        return None
|
|
|
|
def get_cpu_times(stat_path='/proc/stat'):
    """Read the aggregate CPU tick counters from a ``/proc/stat``-format file.

    Args:
        stat_path: File to read; defaults to the kernel's ``/proc/stat``.

    Returns:
        An ``(idle, total)`` tuple of cumulative ticks taken from the
        aggregate ``cpu`` line, where ``idle`` is idle + iowait. Returns
        ``(0, 0)`` if the file has no aggregate ``cpu`` line.

    Raises:
        OSError: If ``stat_path`` cannot be opened.
        ValueError: If a field on the ``cpu`` line is not an integer.
    """
    with open(stat_path, 'r') as f:
        for line in f:
            # "cpu " (with the space) is the all-cores line; "cpu0", "cpu1",
            # ... are per-core lines and are skipped.
            if line.startswith('cpu '):
                # Field order:
                # user nice system idle iowait irq softirq steal guest guest_nice
                ticks = [int(field) for field in line.split()[1:]]
                idle = sum(ticks[3:5])  # idle + iowait
                # guest and guest_nice are already included in user and nice,
                # so only the first eight fields are summed to avoid counting
                # VM time twice.
                total = sum(ticks[:8])
                return idle, total
    return 0, 0
|
|
|
|
def _read_iface_counter(counter):
    """Return one integer counter from ``/sys/class/net/<IFACE>/statistics``.

    Args:
        counter: File name inside the statistics directory, e.g.
            ``"rx_bytes"`` or ``"tx_bytes"``.

    Returns:
        The counter value, or 0 if the file is missing, unreadable, empty or
        not an integer, so a missing interface reads as zero traffic instead
        of raising.
    """
    try:
        with open(f"/sys/class/net/{IFACE}/statistics/{counter}") as f:
            return int(f.read().strip() or 0)
    except (OSError, ValueError):
        return 0


def background_poller():
    """Sample system metrics in an endless loop, about once per second.

    Each iteration appends one sample to ``hist_cpu``, ``hist_rx`` and
    ``hist_tx``, appends to ``hist_ram`` when ``free`` produced output, and
    refreshes the ``"ram"`` and ``"root_disk_pct"`` entries of
    ``latest_state`` (plus ``"vms_running"`` / ``"lxcs_running"`` when
    ``ENABLE_PVE_STATS`` is true).

    Network samples are MiB per second. The byte-counter delta is divided by
    the measured time between samples, because one loop iteration lasts the
    1-second sleep plus however long the sampling itself took.

    This function never returns. An exception raised while sampling is
    printed and the loop carries on with the next iteration.
    """
    # Baseline for delta calculations
    prev_idle, prev_total = get_cpu_times()
    rx_prev = _read_iface_counter("rx_bytes")
    tx_prev = _read_iface_counter("tx_bytes")
    net_prev_time = time.monotonic()

    while True:
        time.sleep(1)  # The 1-second heartbeat

        try:
            # 1. CPU Delta Calculation (a ratio of tick deltas, so it does
            #    not depend on the exact interval length)
            idle, total = get_cpu_times()
            diff_idle = idle - prev_idle
            diff_total = total - prev_total
            if diff_total > 0:
                cpu_pct = (1.0 - (diff_idle / diff_total)) * 100.0
            else:
                cpu_pct = 0.0
            hist_cpu.append(round(cpu_pct, 1))
            prev_idle, prev_total = idle, total

            # 2. Network Delta Calculation
            rx_now = _read_iface_counter("rx_bytes")
            tx_now = _read_iface_counter("tx_bytes")
            net_now = time.monotonic()
            elapsed = net_now - net_prev_time
            if elapsed <= 0:
                elapsed = 1.0  # defensive; the sleep above makes this unlikely
            # A counter that went backwards (interface recreated, counter
            # reset) is reported as 0 rather than a negative rate.
            rx_delta = max(rx_now - rx_prev, 0)
            tx_delta = max(tx_now - tx_prev, 0)
            hist_rx.append(round(rx_delta / (1024 * 1024) / elapsed, 2))
            hist_tx.append(round(tx_delta / (1024 * 1024) / elapsed, 2))
            rx_prev, tx_prev, net_prev_time = rx_now, tx_now, net_now

            # 3. RAM Snapshot (total and used bytes from the "Mem" row of free)
            ram_raw = run_cmd("free -b | grep Mem | awk '{print $2, $3}'")
            if ram_raw:
                ram_total, ram_used = map(int, ram_raw.split())
                ram_pct = (ram_used / ram_total) * 100.0
                hist_ram.append(round(ram_pct, 1))
                latest_state["ram"] = {
                    "percent": round(ram_pct, 1),
                    "used_gb": round(ram_used / (1024**3), 1),
                    "total_gb": round(ram_total / (1024**3), 1),
                }

            # 4. Storage & System Health
            st = os.statvfs('/')
            latest_state["root_disk_pct"] = round(
                (1.0 - (st.f_bavail / st.f_blocks)) * 100.0, 1
            )

            if ENABLE_PVE_STATS:
                latest_state["vms_running"] = int(
                    run_cmd("qm list | grep running | wc -l") or 0
                )
                latest_state["lxcs_running"] = int(
                    run_cmd("pct list | grep running | wc -l") or 0
                )

        except Exception as e:
            # Keep the poller alive: report the failure and try again on the
            # next heartbeat.
            print(f"Poller error: {e}")
|
|
|
|
class MetricsHandler(BaseHTTPRequestHandler):
    """HTTP handler that serves the collected metrics as JSON."""

    def do_GET(self):
        """Answer ``GET /api/homelab`` with the current metrics.

        The response is a JSON object holding a copy of ``latest_state``, the
        newest CPU / rx / tx samples (``cpu``, ``rx_mb``, ``tx_mb``) and the
        full history buffers (``cpu_hist``, ``ram_hist``, ``rx_hist``,
        ``tx_hist``). Any query string is ignored. Every other path gets an
        empty 404 response.
        """
        # Match on the path component only, so cache-busting URLs such as
        # /api/homelab?_=1700000000 are still served.
        route = self.path.split('?', 1)[0]
        if route != '/api/homelab':
            self.send_response(404)
            self.end_headers()
            return

        # Package the latest instantaneous data + the full history arrays
        data = latest_state.copy()
        data["cpu"] = hist_cpu[-1] if hist_cpu else 0
        data["rx_mb"] = hist_rx[-1] if hist_rx else 0
        data["tx_mb"] = hist_tx[-1] if hist_tx else 0

        data["cpu_hist"] = list(hist_cpu)
        data["ram_hist"] = list(hist_ram)
        data["rx_hist"] = list(hist_rx)
        data["tx_hist"] = list(hist_tx)

        # Serialise before sending any header: a serialisation failure then
        # cannot leave a half-written 200 response, and the exact
        # Content-Length is known.
        body = json.dumps(data).encode()

        self.send_response(200)
        self.send_header('Content-type', 'application/json')
        self.send_header('Access-Control-Allow-Origin', '*')
        self.send_header('Content-Length', str(len(body)))
        self.end_headers()
        self.wfile.write(body)
|
|
|
|
if __name__ == '__main__':
    # Start the data collector in the background. daemon=True means the
    # thread does not keep the process alive once the server stops.
    threading.Thread(target=background_poller, daemon=True).start()

    server = HTTPServer(('0.0.0.0', PORT), MetricsHandler)
    print(f"Starting API on port {PORT}...")
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop the script; exit without a
        # traceback.
        pass
    finally:
        # Release the listening socket.
        server.server_close()