fix(nodes): normalize node-inbound speed by elapsed time to avoid recovery spikes

Adversarial review found that a node's cumulative inbound counter keeps climbing while the master can't reach it, so the first delta after a gap (node outage, skipped poll, slow node) spans more than one 5s window. That delta was still divided by the dashboard's fixed 5s, rendering an impossible one-tick speed spike on recovery (and a 2x over-report after a single skipped poll). Now each delta is normalized to the fixed window using the real elapsed time since the inbound's counter last changed, so a backlog shows the true average rate over the gap. The change timestamp advances only on actual movement, so idle stretches average correctly when traffic resumes; a counter reset rebaselines to the new, lower value. Also moves the maybePushGlobals doc comment back onto its function.
This commit is contained in:
MHSanaei
2026-06-22 18:17:02 +02:00
parent 2ce5891f5f
commit eedb65bcc1
+67 -28
View File
@@ -22,8 +22,18 @@ const (
nodeClientIpSyncInterval = 10 * time.Second
nodeClientIpSyncTimeout = 6 * time.Second
nodeGlobalPushInterval = 30 * time.Second
// nodeInboundSpeedWindowMs is the poll window node-inbound speed deltas are
// normalized to; it MUST match the dashboard's TRAFFIC_POLL_INTERVAL_S (5s),
// the fixed divisor the frontend applies to turn a delta into a rate.
nodeInboundSpeedWindowMs int64 = 5000
)
// inboundSample records the most recent cumulative counters observed for one
// node inbound together with the moment they last moved, so that a later poll
// can turn the difference into a speed normalized over the real elapsed time.
type inboundSample struct {
	up   int64 // cumulative uplink total at the last observation
	down int64 // cumulative downlink total at the last observation
	at   int64 // unix millis at which the counters last changed
}
type NodeTrafficSyncJob struct {
nodeService service.NodeService
inboundService service.InboundService
@@ -38,10 +48,11 @@ type NodeTrafficSyncJob struct {
// noGuidIpEndpoint tracks nodes (by id) whose client-IP attribution endpoint
// returned 404, so an old-build node is noted once instead of every cycle.
noGuidIpEndpoint sync.Map
// prevInboundTotals holds the previous poll's cumulative up/down per node
// inbound tag, so the next poll can derive a per-inbound speed delta (node
// inbounds have no local Xray poll). Touched only from Run (serialized).
prevInboundTotals map[string][2]int64
// prevInboundTotals holds the previous poll's cumulative up/down (and the time
// the counter last changed) per node inbound tag, so the next poll can derive
// a per-inbound speed delta — node inbounds have no local Xray poll. Touched
// only from Run (serialized).
prevInboundTotals map[string]inboundSample
}
type atomicBool struct {
@@ -191,42 +202,70 @@ func (j *NodeTrafficSyncJob) Run() {
}
}
// maybePushGlobals broadcasts this panel's aggregated per-client usage to its
// online nodes so each node can display the client's cross-panel total and
// enforce its quota locally (see InboundService.AcceptGlobalTraffic). Scoped
// per node to the clients that node actually hosts, and throttled — the
// aggregates only need to reach nodes on a human timescale, not every poll.
// nodeInboundSpeed diffs the current node-inbound cumulative totals against the
// previous poll's and returns per-inbound byte deltas (clamped at 0 on a reset),
// keyed by the central tag the dashboard matches. It updates the baseline for
// the next poll. A node that failed to sync this tick keeps its stale total, so
// its delta is 0 — no phantom speed — and a tag seen for the first time yields
// no delta until the next poll.
// nodeInboundSpeed derives a per-node-inbound speed delta by diffing the current
// cumulative up/down against the previous poll's, keyed by the central tag the
// dashboard matches. The node's counter keeps climbing while the master can't
// reach it, so the first delta after a gap (node outage, skipped poll, slow
// node) spans more than one poll window; it is normalized to the fixed
// nodeInboundSpeedWindowMs using the real elapsed time so the dashboard's fixed
// divisor yields the true average rate over the gap instead of an impossible
// one-tick spike. The change timestamp only advances when the value actually
// moves, so an idle stretch is averaged correctly when traffic resumes. A reset
// rebaselines to the lower value; a first-seen tag yields no delta until the
// next poll.
func (j *NodeTrafficSyncJob) nodeInboundSpeed() []*xray.Traffic {
totals, err := j.inboundService.GetNodeInboundTrafficTotals()
if err != nil {
return nil
}
now := time.Now().UnixMilli()
deltas := make([]*xray.Traffic, 0, len(totals))
next := make(map[string]inboundSample, len(totals))
for tag, cur := range totals {
if prev, ok := j.prevInboundTotals[tag]; ok {
dUp := cur[0] - prev[0]
dDown := cur[1] - prev[1]
if dUp < 0 {
dUp = 0
}
if dDown < 0 {
dDown = 0
}
if dUp > 0 || dDown > 0 {
deltas = append(deltas, &xray.Traffic{Tag: tag, IsInbound: true, Up: dUp, Down: dDown})
}
prev, ok := j.prevInboundTotals[tag]
if !ok {
next[tag] = inboundSample{up: cur[0], down: cur[1], at: now}
continue
}
dUp := cur[0] - prev.up
dDown := cur[1] - prev.down
if dUp <= 0 && dDown <= 0 {
// No movement, or a counter reset: hold the change timestamp so a
// later jump is averaged over the real elapsed window, not shown as a
// spike. Adopt the lower value on a reset.
if cur[0] < prev.up || cur[1] < prev.down {
next[tag] = inboundSample{up: cur[0], down: cur[1], at: now}
} else {
next[tag] = prev
}
continue
}
if dUp < 0 {
dUp = 0
}
if dDown < 0 {
dDown = 0
}
elapsed := now - prev.at
if elapsed < nodeInboundSpeedWindowMs {
elapsed = nodeInboundSpeedWindowMs
}
up := dUp * nodeInboundSpeedWindowMs / elapsed
down := dDown * nodeInboundSpeedWindowMs / elapsed
if up > 0 || down > 0 {
deltas = append(deltas, &xray.Traffic{Tag: tag, IsInbound: true, Up: up, Down: down})
}
next[tag] = inboundSample{up: cur[0], down: cur[1], at: now}
}
j.prevInboundTotals = totals
j.prevInboundTotals = next
return deltas
}
// maybePushGlobals broadcasts this panel's aggregated per-client usage to its
// online nodes so each node can display the client's cross-panel total and
// enforce its quota locally (see InboundService.AcceptGlobalTraffic). Scoped
// per node to the clients that node actually hosts, and throttled — the
// aggregates only need to reach nodes on a human timescale, not every poll.
func (j *NodeTrafficSyncJob) maybePushGlobals(mgr *runtime.Manager, nodes []*model.Node) {
j.globalPushMu.Lock()
now := time.Now().Unix()