diff --git a/internal/sub/service_property_test.go b/internal/sub/service_property_test.go index c8995daf8..f324cce17 100644 --- a/internal/sub/service_property_test.go +++ b/internal/sub/service_property_test.go @@ -86,4 +86,3 @@ func TestProp_SplitLinkLines_Invariants(t *testing.T) { } }) } - diff --git a/internal/util/link/outbound_helpers_test.go b/internal/util/link/outbound_helpers_test.go index c6a02356b..66f46b011 100644 --- a/internal/util/link/outbound_helpers_test.go +++ b/internal/util/link/outbound_helpers_test.go @@ -15,9 +15,9 @@ func TestDefaultPort(t *testing.T) { }{ {"", 443, 443}, {"8080", 443, 8080}, - {"0", 443, 443}, // non-positive falls back - {"-1", 443, 443}, // negative falls back - {"abc", 443, 443}, // unparseable falls back + {"0", 443, 443}, // non-positive falls back + {"-1", 443, 443}, // negative falls back + {"abc", 443, 443}, // unparseable falls back {"65535", 443, 65535}, } for _, c := range cases { diff --git a/internal/web/service/inbound_node.go b/internal/web/service/inbound_node.go index efa50eff7..822b24296 100644 --- a/internal/web/service/inbound_node.go +++ b/internal/web/service/inbound_node.go @@ -287,13 +287,28 @@ func (s *InboundService) setRemoteTrafficLocked(nodeID int, snap *runtime.Traffi // entirely — an email whose stats moved to (or always lived under) a // sibling inbound still needs its baseline for the sibling's delta // computation (#5202). + // + // Xray counts traffic per email, not per inbound, so a multi-attached + // client's shared counter is copied onto every inbound it's on. Fold each + // email to its per-field max (nodeEmailTotals) so divergent copies can't make + // the reset clamp re-add a lower sibling as fresh traffic (#5274). snapEmailsAll := make(map[string]struct{}) + nodeEmailTotals := make(map[string]nodeTrafficCounter) for _, snapIb := range snap.Inbounds { if snapIb == nil { continue } for i := range snapIb.ClientStats { - snapEmailsAll[snapIb.ClientStats[i].Email] = struct{}{} + email := snapIb.ClientStats[i].Email + snapEmailsAll[email] = struct{}{} + cur := nodeEmailTotals[email] + if snapIb.ClientStats[i].Up > cur.Up { + cur.Up = snapIb.ClientStats[i].Up + } + if snapIb.ClientStats[i].Down > cur.Down { + cur.Down = snapIb.ClientStats[i].Down + } + nodeEmailTotals[email] = cur } } @@ -519,14 +534,17 @@ func (s *InboundService) setRemoteTrafficLocked(nodeID int, snap *runtime.Traffi for _, cs := range snapIb.ClientStats { snapEmails[cs.Email] = struct{}{} + // Node-wide total, not this inbound's possibly-stale copy (#5274). + canon := nodeEmailTotals[cs.Email] + base, seen := nodeBaselines[cs.Email] var deltaUp, deltaDown int64 if seen { - if deltaUp = cs.Up - base.Up; deltaUp < 0 { - deltaUp = cs.Up + if deltaUp = canon.Up - base.Up; deltaUp < 0 { + deltaUp = canon.Up } - if deltaDown = cs.Down - base.Down; deltaDown < 0 { - deltaDown = cs.Down + if deltaDown = canon.Down - base.Down; deltaDown < 0 { + deltaDown = canon.Down } } @@ -541,8 +559,8 @@ func (s *InboundService) setRemoteTrafficLocked(nodeID int, snap *runtime.Traffi Total: cs.Total, ExpiryTime: cs.ExpiryTime, Reset: cs.Reset, - Up: cs.Up, - Down: cs.Down, + Up: canon.Up, + Down: canon.Down, LastOnline: cs.LastOnline, } if err := tx.Clauses(clause.OnConflict{Columns: []clause.Column{{Name: "email"}}, DoNothing: true}). @@ -553,10 +571,10 @@ func (s *InboundService) setRemoteTrafficLocked(nodeID int, snap *runtime.Traffi centralCSByEmail[cs.Email] = row existingEmails[cs.Email] = struct{}{} structuralChange = true - if err := s.upsertNodeBaseline(tx, nodeID, cs.Email, cs.Up, cs.Down); err != nil { + if err := s.upsertNodeBaseline(tx, nodeID, cs.Email, canon.Up, canon.Down); err != nil { return false, err } - nodeBaselines[cs.Email] = nodeTrafficCounter{Up: cs.Up, Down: cs.Down} + nodeBaselines[cs.Email] = nodeTrafficCounter{Up: canon.Up, Down: canon.Down} continue } @@ -592,10 +610,10 @@ func (s *InboundService) setRemoteTrafficLocked(nodeID int, snap *runtime.Traffi ).Error; err != nil { return false, err } - if err := s.upsertNodeBaseline(tx, nodeID, cs.Email, cs.Up, cs.Down); err != nil { + if err := s.upsertNodeBaseline(tx, nodeID, cs.Email, canon.Up, canon.Down); err != nil { return false, err } - nodeBaselines[cs.Email] = nodeTrafficCounter{Up: cs.Up, Down: cs.Down} + nodeBaselines[cs.Email] = nodeTrafficCounter{Up: canon.Up, Down: canon.Down} } for k, existing := range centralCS { diff --git a/internal/web/service/node_client_traffic_sum_test.go b/internal/web/service/node_client_traffic_sum_test.go index 9df6d2a53..6d75277f9 100644 --- a/internal/web/service/node_client_traffic_sum_test.go +++ b/internal/web/service/node_client_traffic_sum_test.go @@ -274,6 +274,49 @@ func TestStatsUnderSiblingInbound_KeepsNodeBaseline(t *testing.T) { assertUpDown(t, readTraffic(t, db, email), 70, 70, "delta accrues once baseline survives") } +// TestMultiAttach_SameNode_DivergentSiblings reproduces #5274: a client is +// attached to several inbounds of the SAME node. Xray reports client traffic +// globally per email, so the node's enriched inbound list copies one shared +// counter onto every inbound the client is on. When those copies diverge — a +// legacy per-inbound row surviving the v3.2.x→v3.3.x upgrade, or any drift — +// the per-inbound delta loop used to treat the lower sibling as a node-counter +// reset and re-add its full value, inflating the client far past real usage. +// The merge must collapse the email to its node-wide total and count it once. +func TestMultiAttach_SameNode_DivergentSiblings(t *testing.T) { + db := initTrafficTestDB(t) + createNodeInboundWithClient(t, db, 1, "n1-a", 41001, "multi") + createNodeInboundWithClient(t, db, 1, "n1-b", 41002, "multi") + createNodeInboundWithClient(t, db, 1, "n1-c", 41003, "multi") + svc := &InboundService{} + + const email = "multi" + settings := fmt.Sprintf(`{"clients": [{"email": %q, "enable": true}]}`, email) + + // The three inbounds report the same email with diverging values; the + // node's true per-email total is the largest (the shared global counter). + sync := func(a, b, c int64) { + t.Helper() + snap := &runtime.TrafficSnapshot{Inbounds: []*model.Inbound{ + {Tag: "n1-a", Settings: settings, ClientStats: []xray.ClientTraffic{{Email: email, Up: a, Down: a, Enable: true}}}, + {Tag: "n1-b", Settings: settings, ClientStats: []xray.ClientTraffic{{Email: email, Up: b, Down: b, Enable: true}}}, + {Tag: "n1-c", Settings: settings, ClientStats: []xray.ClientTraffic{{Email: email, Up: c, Down: c, Enable: true}}}, + }} + if _, err := svc.setRemoteTrafficLocked(1, snap, false); err != nil { + t.Fatalf("sync: %v", err) + } + } + + sync(100, 50, 80) + assertUpDown(t, readTraffic(t, db, email), 100, 100, "first sync counts the node total once, not the sum") + + sync(150, 60, 90) + assertUpDown(t, readTraffic(t, db, email), 150, 150, "second sync: grew by 50, not by every sibling") + + // Equal siblings (the healthy current-schema case) must still accrue once. + sync(200, 200, 200) + assertUpDown(t, readTraffic(t, db, email), 200, 200, "equal siblings accrue the single increment") +} + func TestDelClientStat_CleansNodeBaselines(t *testing.T) { db := initTrafficTestDB(t) svc := &InboundService{}