From eedb65bcc18d0b2203baecbedf8fd221c528c769 Mon Sep 17 00:00:00 2001 From: MHSanaei Date: Mon, 22 Jun 2026 18:17:02 +0200 Subject: [PATCH] fix(nodes): normalize node-inbound speed by elapsed time to avoid recovery spikes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adversarial review found that a node's cumulative inbound counter keeps climbing while the master can't reach it, so the first delta after a gap (node outage, skipped poll, slow node) spans more than one 5s window but was still divided by the dashboard's fixed 5s — rendering an impossible one-tick speed spike on recovery (and a 2x over-report after a skipped poll). Now each delta is normalized to the fixed window using the real elapsed time since the inbound's counter last changed, so a backlog shows the true average rate over the gap. The change timestamp advances only on actual movement, so idle stretches average correctly when traffic resumes; resets rebaseline. Also moves the maybePushGlobals doc comment back onto its function. --- internal/web/job/node_traffic_sync_job.go | 95 ++++++++++++++++------- 1 file changed, 67 insertions(+), 28 deletions(-) diff --git a/internal/web/job/node_traffic_sync_job.go b/internal/web/job/node_traffic_sync_job.go index a91264763..2618cd7b9 100644 --- a/internal/web/job/node_traffic_sync_job.go +++ b/internal/web/job/node_traffic_sync_job.go @@ -22,8 +22,18 @@ const ( nodeClientIpSyncInterval = 10 * time.Second nodeClientIpSyncTimeout = 6 * time.Second nodeGlobalPushInterval = 30 * time.Second + // nodeInboundSpeedWindowMs is the poll window node-inbound speed deltas are + // normalized to; it MUST match the dashboard's TRAFFIC_POLL_INTERVAL_S (5s), + // the fixed divisor the frontend applies to turn a delta into a rate. + nodeInboundSpeedWindowMs int64 = 5000 ) +// inboundSample is a node inbound's last-seen cumulative up/down and the time +// (unix millis) its counter last changed, used to derive a normalized speed. +type inboundSample struct { + up, down, at int64 +} + type NodeTrafficSyncJob struct { nodeService service.NodeService inboundService service.InboundService @@ -38,10 +48,11 @@ type NodeTrafficSyncJob struct { // noGuidIpEndpoint tracks nodes (by id) whose client-IP attribution endpoint // returned 404, so an old-build node is noted once instead of every cycle. noGuidIpEndpoint sync.Map - // prevInboundTotals holds the previous poll's cumulative up/down per node - // inbound tag, so the next poll can derive a per-inbound speed delta (node - // inbounds have no local Xray poll). Touched only from Run (serialized). - prevInboundTotals map[string][2]int64 + // prevInboundTotals holds the previous poll's cumulative up/down (and the time + // the counter last changed) per node inbound tag, so the next poll can derive + // a per-inbound speed delta — node inbounds have no local Xray poll. Touched + // only from Run (serialized). + prevInboundTotals map[string]inboundSample } type atomicBool struct { @@ -191,42 +202,70 @@ func (j *NodeTrafficSyncJob) Run() { } } -// maybePushGlobals broadcasts this panel's aggregated per-client usage to its -// online nodes so each node can display the client's cross-panel total and -// enforce its quota locally (see InboundService.AcceptGlobalTraffic). Scoped -// per node to the clients that node actually hosts, and throttled — the -// aggregates only need to reach nodes on a human timescale, not every poll. -// nodeInboundSpeed diffs the current node-inbound cumulative totals against the -// previous poll's and returns per-inbound byte deltas (clamped at 0 on a reset), -// keyed by the central tag the dashboard matches. It updates the baseline for -// the next poll. A node that failed to sync this tick keeps its stale total, so -// its delta is 0 — no phantom speed — and a tag seen for the first time yields -// no delta until the next poll. +// nodeInboundSpeed derives a per-node-inbound speed delta by diffing the current +// cumulative up/down against the previous poll's, keyed by the central tag the +// dashboard matches. The node's counter keeps climbing while the master can't +// reach it, so the first delta after a gap (node outage, skipped poll, slow +// node) spans more than one poll window; it is normalized to the fixed +// nodeInboundSpeedWindowMs using the real elapsed time so the dashboard's fixed +// divisor yields the true average rate over the gap instead of an impossible +// one-tick spike. The change timestamp only advances when the value actually +// moves, so an idle stretch is averaged correctly when traffic resumes. A reset +// rebaselines to the lower value; a first-seen tag yields no delta until the +// next poll. func (j *NodeTrafficSyncJob) nodeInboundSpeed() []*xray.Traffic { totals, err := j.inboundService.GetNodeInboundTrafficTotals() if err != nil { return nil } + now := time.Now().UnixMilli() deltas := make([]*xray.Traffic, 0, len(totals)) + next := make(map[string]inboundSample, len(totals)) for tag, cur := range totals { - if prev, ok := j.prevInboundTotals[tag]; ok { - dUp := cur[0] - prev[0] - dDown := cur[1] - prev[1] - if dUp < 0 { - dUp = 0 - } - if dDown < 0 { - dDown = 0 - } - if dUp > 0 || dDown > 0 { - deltas = append(deltas, &xray.Traffic{Tag: tag, IsInbound: true, Up: dUp, Down: dDown}) - } + prev, ok := j.prevInboundTotals[tag] + if !ok { + next[tag] = inboundSample{up: cur[0], down: cur[1], at: now} + continue } + dUp := cur[0] - prev.up + dDown := cur[1] - prev.down + if dUp <= 0 && dDown <= 0 { + // No movement, or a counter reset: hold the change timestamp so a + // later jump is averaged over the real elapsed window, not shown as a + // spike. Adopt the lower value on a reset. + if cur[0] < prev.up || cur[1] < prev.down { + next[tag] = inboundSample{up: cur[0], down: cur[1], at: now} + } else { + next[tag] = prev + } + continue + } + if dUp < 0 { + dUp = 0 + } + if dDown < 0 { + dDown = 0 + } + elapsed := now - prev.at + if elapsed < nodeInboundSpeedWindowMs { + elapsed = nodeInboundSpeedWindowMs + } + up := dUp * nodeInboundSpeedWindowMs / elapsed + down := dDown * nodeInboundSpeedWindowMs / elapsed + if up > 0 || down > 0 { + deltas = append(deltas, &xray.Traffic{Tag: tag, IsInbound: true, Up: up, Down: down}) + } + next[tag] = inboundSample{up: cur[0], down: cur[1], at: now} } - j.prevInboundTotals = totals + j.prevInboundTotals = next return deltas } +// maybePushGlobals broadcasts this panel's aggregated per-client usage to its +// online nodes so each node can display the client's cross-panel total and +// enforce its quota locally (see InboundService.AcceptGlobalTraffic). Scoped +// per node to the clients that node actually hosts, and throttled — the +// aggregates only need to reach nodes on a human timescale, not every poll. func (j *NodeTrafficSyncJob) maybePushGlobals(mgr *runtime.Manager, nodes []*model.Node) { j.globalPushMu.Lock() now := time.Now().Unix()