perf(metrics): tiered rollup history (7d at ~1.5MB) and cleaner ranges

Replace the flat 48h@2s ring buffer with a 3-tier rollup ladder (2s/1h, 1m/48h, 10m/7d). A sample feeds every tier and rolls up into progressively coarser averages, so the total history footprint for the 16 system metrics drops from ~21MB to ~1.5MB (measured; roughly 1.3MB down to ~90KB per metric) while extending the range from 48h to 7 days. aggregate() picks the finest tier covering the requested span; a pre-tier flat gob is migrated by replaying its samples through the rollup.

Tidy the dashboard ranges to a professional ladder: 2m, 1h, 3h, 6h, 12h, 24h, 2d, 7d (drop the irregular 2h/5h, the redundant 30m, and the excessive 30d). The allow-list keeps bucket 30 because the node history panel uses it.

Add an initial FreeOSMemory about 60s after boot to reclaim the startup and metric-restore peak instead of waiting for the periodic release. Cover the rollup, tier selection, round-trip, and footprint with tests.
This commit is contained in:
MHSanaei
2026-06-25 23:30:13 +02:00
parent 69ad8b76e1
commit 293c1e44dc
6 changed files with 426 additions and 122 deletions
@@ -137,10 +137,13 @@ export default function SystemHistoryModal({ open, status, onClose }: SystemHist
const tss: number[] = [];
for (const p of msg.obj) {
const d = new Date(p.t * 1000);
const MM = String(d.getMonth() + 1).padStart(2, '0');
const DD = String(d.getDate()).padStart(2, '0');
const hh = String(d.getHours()).padStart(2, '0');
const mm = String(d.getMinutes()).padStart(2, '0');
const ss = String(d.getSeconds()).padStart(2, '0');
labs.push(bucket >= 60 ? `${hh}:${mm}` : `${hh}:${mm}:${ss}`);
const lab = bucket >= 2880 ? `${MM}-${DD} ${hh}:${mm}` : bucket >= 60 ? `${hh}:${mm}` : `${hh}:${mm}:${ss}`;
labs.push(lab);
vals.push(Number(p.v) || 0);
tss.push(Number(p.t) || 0);
}
@@ -208,14 +211,13 @@ export default function SystemHistoryModal({ open, status, onClose }: SystemHist
onChange={setBucket}
options={[
{ value: 2, label: '2m' },
{ value: 30, label: '30m' },
{ value: 60, label: '1h' },
{ value: 120, label: '2h' },
{ value: 180, label: '3h' },
{ value: 300, label: '5h' },
{ value: 360, label: '6h' },
{ value: 720, label: '12h' },
{ value: 1440, label: '24h' },
{ value: 2880, label: '48h' },
{ value: 2880, label: '2d' },
{ value: 10080, label: '7d' },
]}
/>
</div>
@@ -286,11 +286,10 @@ export default function XrayMetricsModal({ open, onClose }: XrayMetricsModalProp
onChange={setBucket}
options={[
{ value: 2, label: '2m' },
{ value: 30, label: '30m' },
{ value: 60, label: '1h' },
{ value: 120, label: '2h' },
{ value: 180, label: '3h' },
{ value: 300, label: '5h' },
{ value: 360, label: '6h' },
{ value: 720, label: '12h' },
]}
/>
</div>
+237 -105
View File
@@ -4,7 +4,6 @@ import (
"encoding/gob"
"os"
"path/filepath"
"slices"
"sync"
"time"
@@ -19,109 +18,167 @@ type MetricSample struct {
V float64 `json:"v"`
}
// metricCapacityDefault caps each ring buffer at 48h worth of @2s samples.
// Node metrics arrive less frequently, so they fit the same retention window
// with room to spare.
const metricCapacityDefault = 86400
// metricHistory is a thread-safe, in-memory ring buffer keyed by
// arbitrary strings. Two singletons live below: one for system-wide
// host metrics, one for per-node metrics. Keeping them in this file
// (rather than scattered across services) makes the storage model
// easy to reason about and avoids double-locking.
type metricHistory struct {
mu sync.Mutex
metrics map[string][]MetricSample
// tierSpec defines one resolution layer of the rollup ladder: a fixed bucket
// size in seconds and how many buckets to retain. The time window a tier can
// cover is resolution*capacity seconds.
type tierSpec struct {
	// resolution is the bucket width in seconds.
	resolution int
	// capacity is the maximum number of closed buckets kept for the tier.
	capacity int
}
func newMetricHistory() *metricHistory {
return &metricHistory{metrics: map[string][]MetricSample{}}
// metricTiers is the rollup ladder applied to every series. High resolution is
// kept only for the recent past; older samples roll up into progressively
// coarser, cheaper layers (RRDtool-style). Per series this totals ~5700 samples
// (~90 KiB) yet spans a live 2s view through ~7 days of history.
//
// Entries must stay ordered from finest to coarsest resolution: pickTier walks
// the ladder in order and returns the first tier whose window
// (resolution*capacity) covers the requested span, falling back to the last
// entry when none does.
var metricTiers = []tierSpec{
	{resolution: 2, capacity: 1800},   // 1h at 2s
	{resolution: 60, capacity: 2880},  // 48h at 1m
	{resolution: 600, capacity: 1008}, // 7d at 10m
}
// append stores a single sample for the given metric, deduping when
// two appends happen within the same wall-clock second (which can
// happen if the cron tick is faster than the metric's natural rate).
func (h *metricHistory) append(metric string, t time.Time, v float64) {
h.mu.Lock()
defer h.mu.Unlock()
buf := h.metrics[metric]
p := MetricSample{T: t.Unix(), V: v}
if n := len(buf); n > 0 && buf[n-1].T == p.T {
buf[n-1] = p
} else {
buf = append(buf, p)
// tierBuf is one fixed-resolution ring of a series. Samples land in an open
// bucket and are averaged into the ring only when the next bucket begins, so a
// coarse tier carries one mean per bucket instead of every raw point.
type tierBuf struct {
	// resolution is the bucket width in seconds; sample timestamps are
	// truncated down to a multiple of it to find their bucket.
	resolution int
	// capacity is the maximum number of closed buckets retained in samples.
	capacity int
	// samples holds the closed buckets in the order they were closed. T is
	// the bucket start and V the mean of the values that fell into it.
	samples []MetricSample
	// open reports whether a bucket is currently accumulating.
	open bool
	// openStart is the start (Unix seconds) of the accumulating bucket.
	openStart int64
	// openSum is the running sum of the values in the accumulating bucket.
	openSum float64
	// openCount is the number of values in the accumulating bucket.
	openCount int
}
func (tb *tierBuf) add(unixSec int64, v float64) {
res := int64(tb.resolution)
b := (unixSec / res) * res
if tb.open && b != tb.openStart {
tb.flush()
}
if len(buf) > metricCapacityDefault {
buf = buf[len(buf)-metricCapacityDefault:]
tb.open = true
tb.openStart = b
tb.openSum += v
tb.openCount++
}
func (tb *tierBuf) flush() {
if tb.openCount == 0 {
tb.open = false
return
}
h.metrics[metric] = buf
tb.samples = append(tb.samples, MetricSample{T: tb.openStart, V: tb.openSum / float64(tb.openCount)})
if len(tb.samples) > tb.capacity {
tb.samples = tb.samples[len(tb.samples)-tb.capacity:]
}
tb.open = false
tb.openStart = 0
tb.openSum = 0
tb.openCount = 0
}
// drop removes the entire history for one metric. Used when a node is
// deleted so its old samples don't linger forever in the singleton.
func (h *metricHistory) drop(metric string) {
h.mu.Lock()
delete(h.metrics, metric)
h.mu.Unlock()
}
// snapshot returns a deep copy of every series, safe to serialize without
// holding the lock during disk I/O.
func (h *metricHistory) snapshot() map[string][]MetricSample {
h.mu.Lock()
defer h.mu.Unlock()
out := make(map[string][]MetricSample, len(h.metrics))
for k, v := range h.metrics {
cp := make([]MetricSample, len(v))
copy(cp, v)
out[k] = cp
// readSamples hands back a private copy of the tier's closed buckets. When a
// bucket is still accumulating, its running mean is appended as the final
// point so readers see the newest data before the bucket boundary passes.
func (tb *tierBuf) readSamples() []MetricSample {
	closed := len(tb.samples)
	// Reserve one extra slot so adding the open bucket never reallocates.
	result := append(make([]MetricSample, 0, closed+1), tb.samples...)
	if tb.openCount <= 0 {
		return result
	}
	mean := tb.openSum / float64(tb.openCount)
	return append(result, MetricSample{T: tb.openStart, V: mean})
}
// restore replaces the in-memory series with a previously persisted set,
// re-applying the per-series capacity cap so a tampered or oversized file
// can't grow the working set unbounded.
func (h *metricHistory) restore(data map[string][]MetricSample) {
h.mu.Lock()
defer h.mu.Unlock()
for k, v := range data {
if len(v) > metricCapacityDefault {
v = v[len(v)-metricCapacityDefault:]
}
h.metrics[k] = v
// series is the rollup ladder for one metric: a sample is fed to every tier.
type series struct {
	// tiers holds one buffer per metricTiers entry, in the same order
	// (newSeries builds it by ranging over metricTiers).
	tiers []*tierBuf
}
// newSeries builds an empty ladder with one tierBuf per metricTiers entry,
// preserving the ladder's order.
func newSeries() *series {
	tiers := make([]*tierBuf, 0, len(metricTiers))
	for _, spec := range metricTiers {
		tiers = append(tiers, &tierBuf{
			resolution: spec.resolution,
			capacity:   spec.capacity,
		})
	}
	return &series{tiers: tiers}
}
// add records one sample in every tier of the ladder, finest tier first.
func (s *series) add(unixSec int64, v float64) {
	for i := range s.tiers {
		s.tiers[i].add(unixSec, v)
	}
}
// aggregate returns up to maxPoints buckets of size bucketSeconds,
// each bucket carrying the arithmetic mean of the underlying samples.
// Bucket alignment is to absolute Unix-second boundaries so two
// concurrent calls (e.g. two browser tabs) see identical x-axes.
func (h *metricHistory) aggregate(metric string, bucketSeconds int, maxPoints int) []map[string]any {
if bucketSeconds <= 0 || maxPoints <= 0 {
return []map[string]any{}
}
cutoff := time.Now().Add(-time.Duration(bucketSeconds*maxPoints) * time.Second).Unix()
h.mu.Lock()
hist := h.metrics[metric]
startIdx := 0
for i, h := range slices.Backward(hist) {
if h.T < cutoff {
startIdx = i + 1
break
// pickTier returns the finest tier whose window covers spanSeconds, falling back
// to the coarsest (longest-window) tier when nothing covers it.
func (s *series) pickTier(spanSeconds int64) *tierBuf {
for _, tb := range s.tiers {
if int64(tb.resolution)*int64(tb.capacity) >= spanSeconds {
return tb
}
}
if startIdx >= len(hist) {
h.mu.Unlock()
return []map[string]any{}
return s.tiers[len(s.tiers)-1]
}
// metricHistory is a thread-safe, in-memory store of tiered series keyed by
// arbitrary strings. Three singletons live below: system-wide host metrics,
// per-node metrics, and xray expvar metrics.
type metricHistory struct {
	// mu guards series and every tier buffer reachable through it.
	mu sync.Mutex
	// series maps a metric name to its rollup ladder; append creates the
	// entry the first time a metric is seen.
	series map[string]*series
}
// newMetricHistory returns an empty store whose series map is ready for use.
func newMetricHistory() *metricHistory {
	h := new(metricHistory)
	h.series = make(map[string]*series)
	return h
}
// append stores a single sample for the given metric across all tiers.
func (h *metricHistory) append(metric string, t time.Time, v float64) {
h.mu.Lock()
defer h.mu.Unlock()
s := h.series[metric]
if s == nil {
s = newSeries()
h.series[metric] = s
}
tmp := make([]MetricSample, len(hist)-startIdx)
copy(tmp, hist[startIdx:])
s.add(t.Unix(), v)
}
// drop discards every tier of one metric's history. It is called when a node
// is deleted, so the singleton does not hold on to samples nobody will read.
// Dropping an unknown metric is a no-op.
func (h *metricHistory) drop(metric string) {
	h.mu.Lock()
	defer h.mu.Unlock()
	delete(h.series, metric)
}
// aggregate returns up to maxPoints buckets of size bucketSeconds, each carrying
// the arithmetic mean of the underlying samples from the finest tier that covers
// the requested span. Bucket alignment is to absolute Unix-second boundaries so
// two concurrent calls see identical x-axes.
func (h *metricHistory) aggregate(metric string, bucketSeconds int, maxPoints int) []map[string]any {
empty := []map[string]any{}
if bucketSeconds <= 0 || maxPoints <= 0 {
return empty
}
span := int64(bucketSeconds) * int64(maxPoints)
cutoff := time.Now().Unix() - span
h.mu.Lock()
s := h.series[metric]
if s == nil {
h.mu.Unlock()
return empty
}
raw := s.pickTier(span).readSamples()
h.mu.Unlock()
startIdx := len(raw)
for i := len(raw) - 1; i >= 0; i-- {
if raw[i].T < cutoff {
break
}
startIdx = i
}
tmp := raw[startIdx:]
if len(tmp) == 0 {
return []map[string]any{}
return empty
}
bSize := int64(bucketSeconds)
@@ -152,24 +209,79 @@ func (h *metricHistory) aggregate(metric string, bucketSeconds int, maxPoints in
out = out[len(out)-maxPoints:]
}
if out == nil {
return []map[string]any{}
return empty
}
return out
}
// systemMetrics holds whole-host time series (cpu, mem, netUp, etc.)
// fed by ServerService.RefreshStatus every 2s. nodeMetrics holds
// per-node CPU/Mem fed by NodeHeartbeatJob every 10s. Both are
// process-local — survival across panel restart is not required.
// persistedTier and persistedSeries are the on-disk shape of a series. Tiers are
// matched back by resolution on restore, so changing the ladder degrades
// gracefully (unmatched layers are dropped) instead of corrupting state.
//
// Both types are written and read with encoding/gob, which matches struct
// fields by name, so renaming an exported field changes the file format.
type persistedTier struct {
	// Resolution is the bucket width in seconds of the tier the samples
	// came from; restore uses it to find the matching in-memory tier.
	Resolution int
	// Samples are the tier's closed buckets. The still-open bucket is not
	// part of a snapshot.
	Samples []MetricSample
}

// persistedSeries is the persisted form of one metric: one entry per tier.
type persistedSeries struct {
	Tiers []persistedTier
}
// snapshot captures the closed buckets of every tier of every series as an
// independent deep copy. The copy is taken under the lock so the caller can
// serialize it to disk afterwards without blocking writers. Buckets that are
// still accumulating are not included.
func (h *metricHistory) snapshot() map[string]persistedSeries {
	h.mu.Lock()
	defer h.mu.Unlock()

	result := make(map[string]persistedSeries, len(h.series))
	for name, src := range h.series {
		tiers := make([]persistedTier, 0, len(src.tiers))
		for _, tb := range src.tiers {
			closed := append(make([]MetricSample, 0, len(tb.samples)), tb.samples...)
			tiers = append(tiers, persistedTier{
				Resolution: tb.resolution,
				Samples:    closed,
			})
		}
		result[name] = persistedSeries{Tiers: tiers}
	}
	return result
}
// restore replaces the in-memory series with a previously persisted set,
// re-applying each tier's capacity cap so a tampered or oversized file can't grow
// the working set unbounded.
//
// Persisted tiers are matched to the current ladder by resolution; tiers with
// no matching resolution are ignored, and ladder tiers absent from the file
// start empty. Series already in memory whose key is not present in data are
// left untouched.
//
// The retained samples are copied into a fresh slice. Merely re-slicing the
// input would keep the whole (possibly oversized) backing array reachable,
// defeating the cap, and would let later appends write into memory the caller
// still holds.
func (h *metricHistory) restore(data map[string]persistedSeries) {
	h.mu.Lock()
	defer h.mu.Unlock()
	for k, ps := range data {
		s := newSeries()
		for _, pt := range ps.Tiers {
			for _, tb := range s.tiers {
				if tb.resolution != pt.Resolution {
					continue
				}
				samples := pt.Samples
				if len(samples) > tb.capacity {
					// Keep only the newest capacity buckets.
					samples = samples[len(samples)-tb.capacity:]
				}
				tb.samples = append(make([]MetricSample, 0, len(samples)), samples...)
				break
			}
		}
		h.series[k] = s
	}
}
// systemMetrics holds whole-host time series (cpu, mem, netUp, etc.) fed by
// ServerService.RefreshStatus every 2s. nodeMetrics holds per-node CPU/Mem fed
// by NodeHeartbeatJob. xrayMetrics holds xray expvar series. Only systemMetrics
// is persisted; the others rebuild from live connections.
var (
	// systemMetrics is the store that RestoreSystemMetrics reloads on startup.
	systemMetrics = newMetricHistory()
	// nodeMetrics is process-local and starts empty on every boot.
	nodeMetrics = newMetricHistory()
	// xrayMetrics is process-local and starts empty on every boot.
	xrayMetrics = newMetricHistory()
)
// SystemMetricKeys lists the metric names ServerService writes on every
// status sample. Exposed for documentation/test purposes; the
// controller validates incoming names against an allow-list.
// SystemMetricKeys enumerates the series names ServerService records with each
// status sample. It exists for documentation and tests; request validation is
// done by the controller against its own allow-list.
var SystemMetricKeys = []string{
	"cpu",
	"mem",
	"swap",
	"netUp",
	"netDown",
	"pktUp",
	"pktDown",
	"diskRead",
	"diskWrite",
	"diskUsage",
	"tcpCount",
	"udpCount",
	"online",
	"load1",
	"load5",
	"load15",
}
@@ -177,18 +289,13 @@ var SystemMetricKeys = []string{
// NodeMetricKeys enumerates the series names NodeHeartbeatJob records per node.
var NodeMetricKeys = []string{
	"cpu",
	"mem",
	"netUp",
	"netDown",
}
// XrayMetricKeys lists series sourced from xray's /debug/vars expvar
// endpoint. Populated by XrayMetricsService.Sample on the same 2s cadence
// as the system metrics, but only when the xray config has a `metrics`
// block configured.
// XrayMetricKeys enumerates the series read from xray's /debug/vars expvar
// endpoint.
var XrayMetricKeys = []string{
	"xrAlloc",
	"xrSys",
	"xrHeapObjects",
	"xrNumGC",
	"xrPauseNs",
}
// systemMetricsStorePath is where the host time-series is persisted between
// restarts. It lives next to the database so a single volume mount carries
// both. Only systemMetrics is persisted — node and xray series are cheap to
// rebuild and tied to live connections.
// restarts. It lives next to the database so a single volume mount carries both.
func systemMetricsStorePath() string {
	// The snapshot sits in the database folder under a fixed file name.
	const fileName = "system_metrics.gob"
	dir := config.GetDBFolderPath()
	return filepath.Join(dir, fileName)
}
@@ -216,8 +323,8 @@ func PersistSystemMetrics() error {
}
// RestoreSystemMetrics loads a previously persisted host time-series on startup.
// A missing file is not an error (first boot). Aggregation already windows by
// time, so any gap from downtime is handled by the readers.
// A missing file is not an error (first boot). A pre-tier flat snapshot is
// migrated by replaying its samples through the rollup.
func RestoreSystemMetrics() {
path := systemMetricsStorePath()
f, err := os.Open(path)
@@ -227,11 +334,36 @@ func RestoreSystemMetrics() {
}
return
}
defer f.Close()
var data map[string][]MetricSample
if err := gob.NewDecoder(f).Decode(&data); err != nil {
logger.Warning("decode system metrics failed:", err)
var data map[string]persistedSeries
decErr := gob.NewDecoder(f).Decode(&data)
f.Close()
if decErr == nil {
systemMetrics.restore(data)
return
}
systemMetrics.restore(data)
if migrateLegacySystemMetrics(path) {
return
}
logger.Warning("decode system metrics failed:", decErr)
}
// migrateLegacySystemMetrics attempts to read path as a pre-tier flat snapshot
// (map[string][]MetricSample). On success every stored sample is replayed
// through systemMetrics.append, in stored order, so the tiers are seeded from
// the old history rather than starting empty. It reports whether the file
// could be opened and decoded in the legacy format.
func migrateLegacySystemMetrics(path string) bool {
	file, openErr := os.Open(path)
	if openErr != nil {
		return false
	}
	defer file.Close()

	var flat map[string][]MetricSample
	if gob.NewDecoder(file).Decode(&flat) != nil {
		return false
	}
	for name := range flat {
		for i := range flat[name] {
			sample := flat[name][i]
			systemMetrics.append(name, time.Unix(sample.T, 0), sample.V)
		}
	}
	return true
}
+167
View File
@@ -0,0 +1,167 @@
package service
import (
"fmt"
"runtime"
"testing"
"time"
)
// TestMetricMemoryFootprint compares the retained heap of the old flat layout
// (16 series of 86400 samples each) with a fully populated tiered layout for
// the same number of series, logs both figures, and fails if the tiered
// layout is not smaller.
func TestMetricMemoryFootprint(t *testing.T) {
	const metrics = 16
	// retained reports how much HeapAlloc grew while the object returned by
	// build is still reachable. The statement order matters: GC runs before
	// each reading, and KeepAlive comes after the second reading so obj stays
	// live while it is measured.
	retained := func(build func() any) uint64 {
		runtime.GC()
		runtime.GC()
		var m0 runtime.MemStats
		runtime.ReadMemStats(&m0)
		obj := build()
		runtime.GC()
		var m1 runtime.MemStats
		runtime.ReadMemStats(&m1)
		runtime.KeepAlive(obj)
		// Guard the unsigned subtraction against a heap that shrank.
		if m1.HeapAlloc < m0.HeapAlloc {
			return 0
		}
		return m1.HeapAlloc - m0.HeapAlloc
	}
	// fill writes a distinct value into every element of buf.
	fill := func(buf []MetricSample) {
		for j := range buf {
			buf[j] = MetricSample{T: int64(j), V: float64(j)}
		}
	}
	// Old layout: one 86400-sample slice per metric.
	oldFlat := retained(func() any {
		m := make(map[string][]MetricSample, metrics)
		for i := 0; i < metrics; i++ {
			buf := make([]MetricSample, 86400)
			fill(buf)
			m[fmt.Sprintf("m%d", i)] = buf
		}
		return m
	})
	// New layout: every tier of every series filled to its capacity.
	newTiered := retained(func() any {
		h := newMetricHistory()
		for i := 0; i < metrics; i++ {
			s := newSeries()
			for _, tb := range s.tiers {
				buf := make([]MetricSample, tb.capacity)
				fill(buf)
				tb.samples = buf
			}
			h.series[fmt.Sprintf("m%d", i)] = s
		}
		return h
	})
	t.Logf("metric history footprint (16 system metrics, full):")
	t.Logf(" before (flat 48h@2s): %d KiB", oldFlat/1024)
	t.Logf(" after (tiered 7d): %d KiB", newTiered/1024)
	if newTiered >= oldFlat {
		t.Fatalf("expected tiered footprint smaller: old=%d new=%d", oldFlat, newTiered)
	}
}
// TestTierBufRollupAveragesClosedBuckets checks that a bucket is averaged into
// the ring once a sample for a later bucket arrives, and that readSamples also
// exposes the bucket that is still open.
func TestTierBufRollupAveragesClosedBuckets(t *testing.T) {
	tb := &tierBuf{resolution: 10, capacity: 100}
	// The first three samples share bucket 0; the fourth starts bucket 10.
	inputs := []struct {
		ts  int64
		val float64
	}{{0, 2}, {2, 4}, {5, 6}, {10, 10}}
	for _, in := range inputs {
		tb.add(in.ts, in.val)
	}

	closed := tb.samples
	if len(closed) != 1 || closed[0].T != 0 || closed[0].V != 4 {
		t.Fatalf("expected one closed bucket {0,4}, got %+v", tb.samples)
	}

	got := tb.readSamples()
	if len(got) != 2 || got[1].T != 10 || got[1].V != 10 {
		t.Fatalf("expected open bucket {10,10} appended on read, got %+v", got)
	}
}
// TestTierBufRespectsCapacity feeds 20 one-second buckets into a tier that may
// keep five and checks that only the newest five closed buckets remain.
func TestTierBufRespectsCapacity(t *testing.T) {
	const limit = 5
	tb := &tierBuf{resolution: 1, capacity: limit}
	var ts int64
	for ts = 0; ts < 20; ts++ {
		tb.add(ts, float64(ts))
	}

	if n := len(tb.samples); n != limit {
		t.Fatalf("expected closed buckets capped at 5, got %d", n)
	}
	newest := tb.samples[len(tb.samples)-1]
	if newest.T != 18 {
		t.Fatalf("expected last closed bucket T=18 (19 still open), got %d", newest.T)
	}
}
// TestSeriesPickTierBySpan verifies that pickTier chooses the finest tier whose
// window covers the span, and the coarsest tier when the span exceeds them all.
func TestSeriesPickTierBySpan(t *testing.T) {
	type expectation struct {
		span int64
		res  int
	}
	table := []expectation{
		{span: 120, res: 2},
		{span: 3600, res: 2},
		{span: 7200, res: 60},
		{span: 172800, res: 60},
		{span: 604800, res: 600},
		{span: 9999999, res: 600},
	}

	ladder := newSeries()
	for i := range table {
		want := table[i]
		got := ladder.pickTier(want.span)
		if got.resolution == want.res {
			continue
		}
		t.Errorf("span %d: expected resolution %d, got %d", want.span, want.res, got.resolution)
	}
}
// TestAggregateFineRealtime appends the last two minutes of 2s samples and
// checks that the realtime view returns points carrying float64 values.
func TestAggregateFineRealtime(t *testing.T) {
	store := newMetricHistory()
	base := time.Now().Unix()
	// Oldest sample first: ages run from 59 steps back down to "now".
	for k := 0; k < 60; k++ {
		age := int64(59 - k)
		store.append("cpu", time.Unix(base-age*2, 0), float64(100-age))
	}

	points := store.aggregate("cpu", 2, 60)
	if len(points) == 0 {
		t.Fatalf("expected non-empty realtime aggregate")
	}
	last := points[len(points)-1]["v"]
	if _, isFloat := last.(float64); !isFloat {
		t.Fatalf("expected float64 value, got %T", last)
	}
}
// TestAggregateLongSpanUsesCoarseTier appends 200 samples spaced ten minutes
// apart and checks that a 7-day query still returns data.
func TestAggregateLongSpanUsesCoarseTier(t *testing.T) {
	const (
		count = int64(200)
		step  = int64(600)
	)
	store := newMetricHistory()
	base := time.Now().Unix()
	for idx := int64(0); idx < count; idx++ {
		stamp := base - (count-idx)*step
		store.append("cpu", time.Unix(stamp, 0), float64(idx))
	}

	if points := store.aggregate("cpu", 10080, 60); len(points) == 0 {
		t.Fatalf("expected non-empty 7d aggregate from the archive tier")
	}
}
// TestSnapshotRestoreRoundTrip snapshots a populated store, restores it into a
// fresh one, and checks that the restored series can still be aggregated.
func TestSnapshotRestoreRoundTrip(t *testing.T) {
	source := newMetricHistory()
	base := time.Now().Unix()
	for idx := int64(0); idx < 10; idx++ {
		stamp := base - (9-idx)*2
		source.append("cpu", time.Unix(stamp, 0), float64(idx))
	}

	saved := source.snapshot()
	restored := newMetricHistory()
	restored.restore(saved)

	points := restored.aggregate("cpu", 2, 60)
	if len(points) == 0 {
		t.Fatalf("expected restored series to aggregate")
	}
}
// TestAggregateMissingMetricIsEmpty checks that querying a metric that was
// never appended yields no points.
func TestAggregateMissingMetricIsEmpty(t *testing.T) {
	store := newMetricHistory()
	points := store.aggregate("nope", 2, 60)
	if n := len(points); n != 0 {
		t.Fatalf("expected empty result for unknown metric, got %d points", n)
	}
}
+9 -9
View File
@@ -166,15 +166,15 @@ const xrayVersionsCacheTTL = 15 * time.Minute
// callers from triggering arbitrary aggregation work and keeps the
// frontend's bucket selector self-documenting.
var allowedHistoryBuckets = map[int]bool{
2: true, // Real-time view
30: true, // 30s intervals
60: true, // 1m intervals
120: true, // 2m intervals
180: true, // 3m intervals
300: true, // 5m intervals
720: true, // 12m intervals
1440: true, // 24m intervals
2880: true, // 48m intervals
2: true, // 2m
30: true, // 30m
60: true, // 1h
180: true, // 3h
360: true, // 6h
720: true, // 12h
1440: true, // 24h
2880: true, // 2d
10080: true, // 7d
}
// IsAllowedHistoryBucket reports whether a bucket-seconds value is in the
+4
View File
@@ -398,6 +398,10 @@ func (s *Server) startTask(restartXray bool) {
if mins := sys.MemoryReleaseIntervalMinutes(); mins > 0 {
s.cron.AddJob(fmt.Sprintf("@every %dm", mins), job.NewMemoryReleaseJob())
go func() {
time.Sleep(time.Minute)
job.NewMemoryReleaseJob().Run()
}()
}
}