diff --git a/internal/web/service/inbound_node.go b/internal/web/service/inbound_node.go index 79465dc36..9b6522b7a 100644 --- a/internal/web/service/inbound_node.go +++ b/internal/web/service/inbound_node.go @@ -262,6 +262,22 @@ func (s *InboundService) setRemoteTrafficLocked(nodeID int, snap *runtime.Traffi } } + // Union of every email the snapshot still reports, across all inbounds. + // The (node, email) baseline rows are keyed per node, not per inbound, so + // the sweeps below must only drop one when the email left the node + // entirely — an email whose stats moved to (or always lived under) a + // sibling inbound still needs its baseline for the sibling's delta + // computation (#5202). + snapEmailsAll := make(map[string]struct{}) + for _, snapIb := range snap.Inbounds { + if snapIb == nil { + continue + } + for i := range snapIb.ClientStats { + snapEmailsAll[snapIb.ClientStats[i].Email] = struct{}{} + } + } + tx := db.Begin() committed := false defer func() { @@ -421,9 +437,17 @@ func (s *InboundService) setRemoteTrafficLocked(nodeID int, snap *runtime.Traffi return false, err } if len(goneEmails) > 0 { + // Baselines are per (node, email), not per inbound: keep them for + // emails the snapshot still reports under a sibling inbound (#5202). + baselineGone := make([]string, 0, len(goneEmails)) + for _, e := range goneEmails { + if _, still := snapEmailsAll[e]; !still { + baselineGone = append(baselineGone, e) + } + } // Chunk to avoid SQLite bind var limit when a node has many clients // removed (e.g. after API bulk delete or structural change on node inbound). - for _, batch := range chunkStrings(goneEmails, sqliteMaxVars) { + for _, batch := range chunkStrings(baselineGone, sqliteMaxVars) { if err := tx.Where("node_id = ? AND email IN ?", nodeID, batch). Delete(&model.NodeClientTraffic{}).Error; err != nil { return false, err @@ -554,6 +578,13 @@ func (s *InboundService) setRemoteTrafficLocked(nodeID int, snap *runtime.Traffi if _, kept := snapEmails[k.email]; kept { continue } + // Gone from this inbound's stats but still reported by the node under + // a sibling inbound: both the shared accumulator row and the (node, + // email) baseline must survive, or the sibling's next delta would + // compute against nothing and freeze the counter (#5202). + if _, still := snapEmailsAll[k.email]; still { + continue + } if err := tx.Where("node_id = ? AND email = ?", nodeID, existing.Email). Delete(&model.NodeClientTraffic{}).Error; err != nil { return false, err diff --git a/internal/web/service/node_client_traffic_sum_test.go b/internal/web/service/node_client_traffic_sum_test.go index 0eba143e2..9df6d2a53 100644 --- a/internal/web/service/node_client_traffic_sum_test.go +++ b/internal/web/service/node_client_traffic_sum_test.go @@ -226,6 +226,54 @@ func TestClientGoneFromOneNode_KeepsSharedEmailRow(t *testing.T) { assertUpDown(t, readTraffic(t, db, email), 140, 140, "node 2 keeps accruing") } +// TestStatsUnderSiblingInbound_KeepsNodeBaseline reproduces the recurring +// sweep bug behind #5202: the client is attached to two inbounds of the SAME +// node, the node reports its stats under n1-a only, but the master-side row +// is owned by n1-b's mirror. The per-email sweep for n1-b must not drop the +// (node, email) baseline that n1-a's delta computation needs — doing so every +// cycle froze the client's counter permanently. +func TestStatsUnderSiblingInbound_KeepsNodeBaseline(t *testing.T) { + db := initTrafficTestDB(t) + createNodeInboundWithClient(t, db, 1, "n1-a", 41001, "fresh") + createNodeInbound(t, db, 1, "n1-b", 41002) + svc := &InboundService{} + + const email = "fresh" + var ibB model.Inbound + if err := db.Where("tag = ?", "n1-b").First(&ibB).Error; err != nil { + t.Fatalf("load n1-b: %v", err) + } + // Master-side row created when the client was added on the panel, owned by + // n1-b's mirror (e.g. the client form targeted that inbound). + if err := db.Create(&xray.ClientTraffic{InboundId: ibB.Id, Email: email, Enable: true}).Error; err != nil { + t.Fatalf("seed master row: %v", err) + } + + settings := fmt.Sprintf(`{"clients": [{"email": %q, "enable": true}]}`, email) + sync := func(up, down int64) { + t.Helper() + snap := &runtime.TrafficSnapshot{Inbounds: []*model.Inbound{ + {Tag: "n1-a", Settings: settings, ClientStats: []xray.ClientTraffic{{Email: email, Up: up, Down: down, Enable: true}}}, + {Tag: "n1-b", Settings: `{"clients": []}`}, + }} + if _, err := svc.setRemoteTrafficLocked(1, snap, false); err != nil { + t.Fatalf("sync: %v", err) + } + } + + sync(630, 630) + var baselines int64 + if err := db.Model(&model.NodeClientTraffic{}).Where("node_id = ? AND email = ?", 1, email).Count(&baselines).Error; err != nil { + t.Fatalf("count baselines: %v", err) + } + if baselines != 1 { + t.Fatalf("baseline must survive the sibling-inbound sweep, found %d rows", baselines) + } + + sync(700, 700) + assertUpDown(t, readTraffic(t, db, email), 70, 70, "delta accrues once baseline survives") +} + func TestDelClientStat_CleansNodeBaselines(t *testing.T) { db := initTrafficTestDB(t) svc := &InboundService{}