diff --git a/frontend/src/pages/index/SystemHistoryModal.tsx b/frontend/src/pages/index/SystemHistoryModal.tsx index 1760ce878..7d3fb249a 100644 --- a/frontend/src/pages/index/SystemHistoryModal.tsx +++ b/frontend/src/pages/index/SystemHistoryModal.tsx @@ -137,10 +137,13 @@ export default function SystemHistoryModal({ open, status, onClose }: SystemHist const tss: number[] = []; for (const p of msg.obj) { const d = new Date(p.t * 1000); + const MM = String(d.getMonth() + 1).padStart(2, '0'); + const DD = String(d.getDate()).padStart(2, '0'); const hh = String(d.getHours()).padStart(2, '0'); const mm = String(d.getMinutes()).padStart(2, '0'); const ss = String(d.getSeconds()).padStart(2, '0'); - labs.push(bucket >= 60 ? `${hh}:${mm}` : `${hh}:${mm}:${ss}`); + const lab = bucket >= 2880 ? `${MM}-${DD} ${hh}:${mm}` : bucket >= 60 ? `${hh}:${mm}` : `${hh}:${mm}:${ss}`; + labs.push(lab); vals.push(Number(p.v) || 0); tss.push(Number(p.t) || 0); } @@ -208,14 +211,13 @@ export default function SystemHistoryModal({ open, status, onClose }: SystemHist onChange={setBucket} options={[ { value: 2, label: '2m' }, - { value: 30, label: '30m' }, { value: 60, label: '1h' }, - { value: 120, label: '2h' }, { value: 180, label: '3h' }, - { value: 300, label: '5h' }, + { value: 360, label: '6h' }, { value: 720, label: '12h' }, { value: 1440, label: '24h' }, - { value: 2880, label: '48h' }, + { value: 2880, label: '2d' }, + { value: 10080, label: '7d' }, ]} /> diff --git a/frontend/src/pages/index/XrayMetricsModal.tsx b/frontend/src/pages/index/XrayMetricsModal.tsx index 9e134ed2c..5a6d53ac0 100644 --- a/frontend/src/pages/index/XrayMetricsModal.tsx +++ b/frontend/src/pages/index/XrayMetricsModal.tsx @@ -286,11 +286,10 @@ export default function XrayMetricsModal({ open, onClose }: XrayMetricsModalProp onChange={setBucket} options={[ { value: 2, label: '2m' }, - { value: 30, label: '30m' }, { value: 60, label: '1h' }, - { value: 120, label: '2h' }, { value: 180, label: '3h' }, - { value: 300, label: '5h' }, + { value: 360, label: '6h' }, + { value: 720, label: '12h' }, ]} /> diff --git a/internal/web/service/metric_history.go b/internal/web/service/metric_history.go index ff21eb5a4..0a0f2128f 100644 --- a/internal/web/service/metric_history.go +++ b/internal/web/service/metric_history.go @@ -4,7 +4,6 @@ import ( "encoding/gob" "os" "path/filepath" - "slices" "sync" "time" @@ -19,109 +18,167 @@ type MetricSample struct { V float64 `json:"v"` } -// metricCapacityDefault caps each ring buffer at 48h worth of @2s samples. -// Node metrics arrive less frequently, so they fit the same retention window -// with room to spare. -const metricCapacityDefault = 86400 - -// metricHistory is a thread-safe, in-memory ring buffer keyed by -// arbitrary strings. Two singletons live below: one for system-wide -// host metrics, one for per-node metrics. Keeping them in this file -// (rather than scattered across services) makes the storage model -// easy to reason about and avoids double-locking. -type metricHistory struct { - mu sync.Mutex - metrics map[string][]MetricSample +// tierSpec defines one resolution layer of the rollup ladder: a fixed bucket +// size in seconds and how many buckets to retain. window = resolution*capacity. +type tierSpec struct { + resolution int + capacity int } -func newMetricHistory() *metricHistory { - return &metricHistory{metrics: map[string][]MetricSample{}} +// metricTiers is the rollup ladder applied to every series. High resolution is +// kept only for the recent past; older samples roll up into progressively +// coarser, cheaper layers (RRDtool-style). Per series this totals ~5700 samples +// (~90 KiB) yet spans a live 2s view through ~7 days of history. +var metricTiers = []tierSpec{ + {resolution: 2, capacity: 1800}, // 1h at 2s + {resolution: 60, capacity: 2880}, // 48h at 1m + {resolution: 600, capacity: 1008}, // 7d at 10m } -// append stores a single sample for the given metric, deduping when -// two appends happen within the same wall-clock second (which can -// happen if the cron tick is faster than the metric's natural rate). -func (h *metricHistory) append(metric string, t time.Time, v float64) { - h.mu.Lock() - defer h.mu.Unlock() - buf := h.metrics[metric] - p := MetricSample{T: t.Unix(), V: v} - if n := len(buf); n > 0 && buf[n-1].T == p.T { - buf[n-1] = p - } else { - buf = append(buf, p) +// tierBuf is one fixed-resolution ring of a series. Samples land in an open +// bucket and are averaged into the ring only when the next bucket begins, so a +// coarse tier carries one mean per bucket instead of every raw point. +type tierBuf struct { + resolution int + capacity int + samples []MetricSample + open bool + openStart int64 + openSum float64 + openCount int +} + +func (tb *tierBuf) add(unixSec int64, v float64) { + res := int64(tb.resolution) + b := (unixSec / res) * res + if tb.open && b != tb.openStart { + tb.flush() } - if len(buf) > metricCapacityDefault { - buf = buf[len(buf)-metricCapacityDefault:] + tb.open = true + tb.openStart = b + tb.openSum += v + tb.openCount++ +} + +func (tb *tierBuf) flush() { + if tb.openCount == 0 { + tb.open = false + return } - h.metrics[metric] = buf + tb.samples = append(tb.samples, MetricSample{T: tb.openStart, V: tb.openSum / float64(tb.openCount)}) + if len(tb.samples) > tb.capacity { + tb.samples = tb.samples[len(tb.samples)-tb.capacity:] + } + tb.open = false + tb.openStart = 0 + tb.openSum = 0 + tb.openCount = 0 } -// drop removes the entire history for one metric. Used when a node is -// deleted so its old samples don't linger forever in the singleton. -func (h *metricHistory) drop(metric string) { - h.mu.Lock() - delete(h.metrics, metric) - h.mu.Unlock() -} - -// snapshot returns a deep copy of every series, safe to serialize without -// holding the lock during disk I/O. -func (h *metricHistory) snapshot() map[string][]MetricSample { - h.mu.Lock() - defer h.mu.Unlock() - out := make(map[string][]MetricSample, len(h.metrics)) - for k, v := range h.metrics { - cp := make([]MetricSample, len(v)) - copy(cp, v) - out[k] = cp +// readSamples returns a copy of the closed buckets plus the still-open one, so +// the most recent point is visible before its bucket boundary closes. +func (tb *tierBuf) readSamples() []MetricSample { + out := make([]MetricSample, len(tb.samples), len(tb.samples)+1) + copy(out, tb.samples) + if tb.openCount > 0 { + out = append(out, MetricSample{T: tb.openStart, V: tb.openSum / float64(tb.openCount)}) } return out } -// restore replaces the in-memory series with a previously persisted set, -// re-applying the per-series capacity cap so a tampered or oversized file -// can't grow the working set unbounded. -func (h *metricHistory) restore(data map[string][]MetricSample) { - h.mu.Lock() - defer h.mu.Unlock() - for k, v := range data { - if len(v) > metricCapacityDefault { - v = v[len(v)-metricCapacityDefault:] - } - h.metrics[k] = v +// series is the rollup ladder for one metric: a sample is fed to every tier. +type series struct { + tiers []*tierBuf +} + +func newSeries() *series { + s := &series{tiers: make([]*tierBuf, len(metricTiers))} + for i, spec := range metricTiers { + s.tiers[i] = &tierBuf{resolution: spec.resolution, capacity: spec.capacity} + } + return s +} + +func (s *series) add(unixSec int64, v float64) { + for _, tb := range s.tiers { + tb.add(unixSec, v) } } -// aggregate returns up to maxPoints buckets of size bucketSeconds, -// each bucket carrying the arithmetic mean of the underlying samples. -// Bucket alignment is to absolute Unix-second boundaries so two -// concurrent calls (e.g. two browser tabs) see identical x-axes. -func (h *metricHistory) aggregate(metric string, bucketSeconds int, maxPoints int) []map[string]any { - if bucketSeconds <= 0 || maxPoints <= 0 { - return []map[string]any{} - } - cutoff := time.Now().Add(-time.Duration(bucketSeconds*maxPoints) * time.Second).Unix() - - h.mu.Lock() - hist := h.metrics[metric] - startIdx := 0 - for i, h := range slices.Backward(hist) { - if h.T < cutoff { - startIdx = i + 1 - break +// pickTier returns the finest tier whose window covers spanSeconds, falling back +// to the coarsest (longest-window) tier when nothing covers it. +func (s *series) pickTier(spanSeconds int64) *tierBuf { + for _, tb := range s.tiers { + if int64(tb.resolution)*int64(tb.capacity) >= spanSeconds { + return tb } } - if startIdx >= len(hist) { - h.mu.Unlock() - return []map[string]any{} + return s.tiers[len(s.tiers)-1] +} + +// metricHistory is a thread-safe, in-memory store of tiered series keyed by +// arbitrary strings. Three singletons live below: system-wide host metrics, +// per-node metrics, and xray expvar metrics. +type metricHistory struct { + mu sync.Mutex + series map[string]*series +} + +func newMetricHistory() *metricHistory { + return &metricHistory{series: map[string]*series{}} +} + +// append stores a single sample for the given metric across all tiers. +func (h *metricHistory) append(metric string, t time.Time, v float64) { + h.mu.Lock() + defer h.mu.Unlock() + s := h.series[metric] + if s == nil { + s = newSeries() + h.series[metric] = s } - tmp := make([]MetricSample, len(hist)-startIdx) - copy(tmp, hist[startIdx:]) + s.add(t.Unix(), v) +} + +// drop removes the entire history for one metric. Used when a node is deleted so +// its old samples don't linger forever in the singleton. +func (h *metricHistory) drop(metric string) { + h.mu.Lock() + delete(h.series, metric) + h.mu.Unlock() +} + +// aggregate returns up to maxPoints buckets of size bucketSeconds, each carrying +// the arithmetic mean of the underlying samples from the finest tier that covers +// the requested span. Bucket alignment is to absolute Unix-second boundaries so +// two concurrent calls see identical x-axes. +func (h *metricHistory) aggregate(metric string, bucketSeconds int, maxPoints int) []map[string]any { + empty := []map[string]any{} + if bucketSeconds <= 0 || maxPoints <= 0 { + return empty + } + span := int64(bucketSeconds) * int64(maxPoints) + cutoff := time.Now().Unix() - span + + h.mu.Lock() + s := h.series[metric] + if s == nil { + h.mu.Unlock() + return empty + } + raw := s.pickTier(span).readSamples() h.mu.Unlock() + startIdx := len(raw) + for i := len(raw) - 1; i >= 0; i-- { + if raw[i].T < cutoff { + break + } + startIdx = i + } + tmp := raw[startIdx:] if len(tmp) == 0 { - return []map[string]any{} + return empty } bSize := int64(bucketSeconds) @@ -152,24 +209,79 @@ func (h *metricHistory) aggregate(metric string, bucketSeconds int, maxPoints in out = out[len(out)-maxPoints:] } if out == nil { - return []map[string]any{} + return empty } return out } -// systemMetrics holds whole-host time series (cpu, mem, netUp, etc.) -// fed by ServerService.RefreshStatus every 2s. nodeMetrics holds -// per-node CPU/Mem fed by NodeHeartbeatJob every 10s. Both are -// process-local — survival across panel restart is not required. +// persistedTier and persistedSeries are the on-disk shape of a series. Tiers are +// matched back by resolution on restore, so changing the ladder degrades +// gracefully (unmatched layers are dropped) instead of corrupting state. +type persistedTier struct { + Resolution int + Samples []MetricSample +} + +type persistedSeries struct { + Tiers []persistedTier +} + +// snapshot returns a deep copy of every series' closed buckets, safe to +// serialize without holding the lock during disk I/O. +func (h *metricHistory) snapshot() map[string]persistedSeries { + h.mu.Lock() + defer h.mu.Unlock() + out := make(map[string]persistedSeries, len(h.series)) + for k, s := range h.series { + ps := persistedSeries{Tiers: make([]persistedTier, len(s.tiers))} + for i, tb := range s.tiers { + cp := make([]MetricSample, len(tb.samples)) + copy(cp, tb.samples) + ps.Tiers[i] = persistedTier{Resolution: tb.resolution, Samples: cp} + } + out[k] = ps + } + return out +} + +// restore replaces the in-memory series with a previously persisted set, +// re-applying each tier's capacity cap so a tampered or oversized file can't grow +// the working set unbounded. +func (h *metricHistory) restore(data map[string]persistedSeries) { + h.mu.Lock() + defer h.mu.Unlock() + for k, ps := range data { + s := newSeries() + for _, pt := range ps.Tiers { + for _, tb := range s.tiers { + if tb.resolution != pt.Resolution { + continue + } + samples := pt.Samples + if len(samples) > tb.capacity { + samples = samples[len(samples)-tb.capacity:] + } + tb.samples = samples + break + } + } + h.series[k] = s + } +} + +// systemMetrics holds whole-host time series (cpu, mem, netUp, etc.) fed by +// ServerService.RefreshStatus every 2s. nodeMetrics holds per-node CPU/Mem fed +// by NodeHeartbeatJob. xrayMetrics holds xray expvar series. Only systemMetrics +// is persisted; the others rebuild from live connections. var ( systemMetrics = newMetricHistory() nodeMetrics = newMetricHistory() xrayMetrics = newMetricHistory() ) -// SystemMetricKeys lists the metric names ServerService writes on every -// status sample. Exposed for documentation/test purposes; the -// controller validates incoming names against an allow-list. +// SystemMetricKeys lists the metric names ServerService writes on every status +// sample. Exposed for documentation/test purposes; the controller validates +// incoming names against an allow-list. var SystemMetricKeys = []string{ "cpu", "mem", "swap", "netUp", "netDown", "pktUp", "pktDown", "diskRead", "diskWrite", "diskUsage", "tcpCount", "udpCount", "online", "load1", "load5", "load15", } @@ -177,18 +289,13 @@ var SystemMetricKeys = []string{ // NodeMetricKeys lists the per-node metric names NodeHeartbeatJob writes. var NodeMetricKeys = []string{"cpu", "mem", "netUp", "netDown"} -// XrayMetricKeys lists series sourced from xray's /debug/vars expvar -// endpoint. Populated by XrayMetricsService.Sample on the same 2s cadence -// as the system metrics, but only when the xray config has a `metrics` -// block configured. +// XrayMetricKeys lists series sourced from xray's /debug/vars expvar endpoint. var XrayMetricKeys = []string{ "xrAlloc", "xrSys", "xrHeapObjects", "xrNumGC", "xrPauseNs", } // systemMetricsStorePath is where the host time-series is persisted between -// restarts. It lives next to the database so a single volume mount carries -// both. Only systemMetrics is persisted — node and xray series are cheap to -// rebuild and tied to live connections. +// restarts. It lives next to the database so a single volume mount carries both. func systemMetricsStorePath() string { return filepath.Join(config.GetDBFolderPath(), "system_metrics.gob") } @@ -216,8 +323,8 @@ func PersistSystemMetrics() error { } // RestoreSystemMetrics loads a previously persisted host time-series on startup. -// A missing file is not an error (first boot). Aggregation already windows by -// time, so any gap from downtime is handled by the readers. +// A missing file is not an error (first boot). A pre-tier flat snapshot is +// migrated by replaying its samples through the rollup. func RestoreSystemMetrics() { path := systemMetricsStorePath() f, err := os.Open(path) @@ -227,11 +334,36 @@ func RestoreSystemMetrics() { } return } - defer f.Close() - var data map[string][]MetricSample - if err := gob.NewDecoder(f).Decode(&data); err != nil { - logger.Warning("decode system metrics failed:", err) + var data map[string]persistedSeries + decErr := gob.NewDecoder(f).Decode(&data) + f.Close() + if decErr == nil { + systemMetrics.restore(data) return } - systemMetrics.restore(data) + if migrateLegacySystemMetrics(path) { + return + } + logger.Warning("decode system metrics failed:", decErr) +} + +// migrateLegacySystemMetrics loads a pre-tier flat snapshot +// (map[string][]MetricSample) and replays it through append so the new tiers are +// seeded from the existing history instead of starting empty. +func migrateLegacySystemMetrics(path string) bool { + f, err := os.Open(path) + if err != nil { + return false + } + defer f.Close() + var legacy map[string][]MetricSample + if err := gob.NewDecoder(f).Decode(&legacy); err != nil { + return false + } + for metric, samples := range legacy { + for _, p := range samples { + systemMetrics.append(metric, time.Unix(p.T, 0), p.V) + } + } + return true } diff --git a/internal/web/service/metric_history_test.go b/internal/web/service/metric_history_test.go new file mode 100644 index 000000000..3f959b56c --- /dev/null +++ b/internal/web/service/metric_history_test.go @@ -0,0 +1,167 @@ +package service + +import ( + "fmt" + "runtime" + "testing" + "time" +) + +func TestMetricMemoryFootprint(t *testing.T) { + const metrics = 16 + + retained := func(build func() any) uint64 { + runtime.GC() + runtime.GC() + var m0 runtime.MemStats + runtime.ReadMemStats(&m0) + obj := build() + runtime.GC() + var m1 runtime.MemStats + runtime.ReadMemStats(&m1) + runtime.KeepAlive(obj) + if m1.HeapAlloc < m0.HeapAlloc { + return 0 + } + return m1.HeapAlloc - m0.HeapAlloc + } + + fill := func(buf []MetricSample) { + for j := range buf { + buf[j] = MetricSample{T: int64(j), V: float64(j)} + } + } + + oldFlat := retained(func() any { + m := make(map[string][]MetricSample, metrics) + for i := 0; i < metrics; i++ { + buf := make([]MetricSample, 86400) + fill(buf) + m[fmt.Sprintf("m%d", i)] = buf + } + return m + }) + + newTiered := retained(func() any { + h := newMetricHistory() + for i := 0; i < metrics; i++ { + s := newSeries() + for _, tb := range s.tiers { + buf := make([]MetricSample, tb.capacity) + fill(buf) + tb.samples = buf + } + h.series[fmt.Sprintf("m%d", i)] = s + } + return h + }) + + t.Logf("metric history footprint (16 system metrics, full):") + t.Logf(" before (flat 48h@2s): %d KiB", oldFlat/1024) + t.Logf(" after (tiered 7d): %d KiB", newTiered/1024) + if newTiered >= oldFlat { + t.Fatalf("expected tiered footprint smaller: old=%d new=%d", oldFlat, newTiered) + } +} + +func TestTierBufRollupAveragesClosedBuckets(t *testing.T) { + tb := &tierBuf{resolution: 10, capacity: 100} + tb.add(0, 2) + tb.add(2, 4) + tb.add(5, 6) + tb.add(10, 10) + + if len(tb.samples) != 1 || tb.samples[0].T != 0 || tb.samples[0].V != 4 { + t.Fatalf("expected one closed bucket {0,4}, got %+v", tb.samples) + } + + got := tb.readSamples() + if len(got) != 2 || got[1].T != 10 || got[1].V != 10 { + t.Fatalf("expected open bucket {10,10} appended on read, got %+v", got) + } +} + +func TestTierBufRespectsCapacity(t *testing.T) { + tb := &tierBuf{resolution: 1, capacity: 5} + for i := int64(0); i < 20; i++ { + tb.add(i, float64(i)) + } + if len(tb.samples) != 5 { + t.Fatalf("expected closed buckets capped at 5, got %d", len(tb.samples)) + } + if last := tb.samples[len(tb.samples)-1]; last.T != 18 { + t.Fatalf("expected last closed bucket T=18 (19 still open), got %d", last.T) + } +} + +func TestSeriesPickTierBySpan(t *testing.T) { + s := newSeries() + cases := []struct { + span int64 + res int + }{ + {120, 2}, + {3600, 2}, + {7200, 60}, + {172800, 60}, + {604800, 600}, + {9999999, 600}, + } + for _, c := range cases { + if got := s.pickTier(c.span); got.resolution != c.res { + t.Errorf("span %d: expected resolution %d, got %d", c.span, c.res, got.resolution) + } + } +} + +func TestAggregateFineRealtime(t *testing.T) { + h := newMetricHistory() + now := time.Now().Unix() + for i := int64(59); i >= 0; i-- { + h.append("cpu", time.Unix(now-i*2, 0), float64(100-i)) + } + + out := h.aggregate("cpu", 2, 60) + if len(out) == 0 { + t.Fatalf("expected non-empty realtime aggregate") + } + if _, ok := out[len(out)-1]["v"].(float64); !ok { + t.Fatalf("expected float64 value, got %T", out[len(out)-1]["v"]) + } +} + +func TestAggregateLongSpanUsesCoarseTier(t *testing.T) { + h := newMetricHistory() + now := time.Now().Unix() + for i := int64(0); i < 200; i++ { + ts := now - (200-i)*600 + h.append("cpu", time.Unix(ts, 0), float64(i)) + } + + out := h.aggregate("cpu", 10080, 60) + if len(out) == 0 { + t.Fatalf("expected non-empty 7d aggregate from the archive tier") + } +} + +func TestSnapshotRestoreRoundTrip(t *testing.T) { + h := newMetricHistory() + now := time.Now().Unix() + for i := int64(0); i < 10; i++ { + h.append("cpu", time.Unix(now-(9-i)*2, 0), float64(i)) + } + + h2 := newMetricHistory() + h2.restore(h.snapshot()) + + if out := h2.aggregate("cpu", 2, 60); len(out) == 0 { + t.Fatalf("expected restored series to aggregate") + } +} + +func TestAggregateMissingMetricIsEmpty(t *testing.T) { + h := newMetricHistory() + if out := h.aggregate("nope", 2, 60); len(out) != 0 { + t.Fatalf("expected empty result for unknown metric, got %d points", len(out)) + } +} diff --git a/internal/web/service/server.go b/internal/web/service/server.go index c007582bc..da235a1b2 100644 --- a/internal/web/service/server.go +++ b/internal/web/service/server.go @@ -166,15 +166,15 @@ const xrayVersionsCacheTTL = 15 * time.Minute // callers from triggering arbitrary aggregation work and keeps the // frontend's bucket selector self-documenting. var allowedHistoryBuckets = map[int]bool{ - 2: true, // Real-time view - 30: true, // 30s intervals - 60: true, // 1m intervals - 120: true, // 2m intervals - 180: true, // 3m intervals - 300: true, // 5m intervals - 720: true, // 12m intervals - 1440: true, // 24m intervals - 2880: true, // 48m intervals + 2: true, // 2m + 30: true, // 30m + 60: true, // 1h + 180: true, // 3h + 360: true, // 6h + 720: true, // 12h + 1440: true, // 24h + 2880: true, // 2d + 10080: true, // 7d } // IsAllowedHistoryBucket reports whether a bucket-seconds value is in the diff --git a/internal/web/web.go b/internal/web/web.go index 517833942..a61b45d3e 100644 --- a/internal/web/web.go +++ b/internal/web/web.go @@ -398,6 +398,10 @@ func (s *Server) startTask(restartXray bool) { if mins := sys.MemoryReleaseIntervalMinutes(); mins > 0 { s.cron.AddJob(fmt.Sprintf("@every %dm", mins), job.NewMemoryReleaseJob()) + go func() { + time.Sleep(time.Minute) + job.NewMemoryReleaseJob().Run() + }() } }