diff --git a/frontend/src/components/clients/ClientTrafficCell.tsx b/frontend/src/components/clients/ClientTrafficCell.tsx index 6df6ef09c..ed2a5f3fb 100644 --- a/frontend/src/components/clients/ClientTrafficCell.tsx +++ b/frontend/src/components/clients/ClientTrafficCell.tsx @@ -68,7 +68,7 @@ export default function ClientTrafficCell({ showInfo={false} strokeColor={display.strokeColor} status={display.status} - size={compact ? 'small' : 'default'} + size={compact ? 'small' : 'medium'} /> {display.isUnlimited ? ( diff --git a/internal/web/service/inbound_node.go b/internal/web/service/inbound_node.go index 942e4844d..97ace9bdc 100644 --- a/internal/web/service/inbound_node.go +++ b/internal/web/service/inbound_node.go @@ -411,10 +411,27 @@ func (s *InboundService) setRemoteTrafficLocked(nodeID int, snap *runtime.Traffi return false, err } } - } - if err := tx.Where("inbound_id = ?", c.Id). - Delete(&xray.ClientTraffic{}).Error; err != nil { - return false, err + // The per-email row is the shared accumulator across every inbound + // (and node) the email is attached to. Only drop it when this was the + // email's last inbound — wiping it while a sibling still feeds it + // loses the summed history, and the next node sync would re-seed the + // row with that node's counter alone. + sharedEmails, sErr := s.emailsUsedByOtherInbounds(goneEmails, c.Id) + if sErr != nil { + return false, sErr + } + delEmails := make([]string, 0, len(goneEmails)) + for _, e := range goneEmails { + if !sharedEmails[strings.ToLower(strings.TrimSpace(e))] { + delEmails = append(delEmails, e) + } + } + for _, batch := range chunkStrings(delEmails, sqliteMaxVars) { + if err := tx.Where("inbound_id = ? AND email IN ?", c.Id, batch). + Delete(&xray.ClientTraffic{}).Error; err != nil { + return false, err + } + } } if err := s.clientService.DetachInbound(tx, c.Id); err != nil { return false, err @@ -523,9 +540,17 @@ func (s *InboundService) setRemoteTrafficLocked(nodeID int, snap *runtime.Traffi Delete(&model.NodeClientTraffic{}).Error; err != nil { return false, err } - if err := tx.Where("inbound_id = ? AND email = ?", c.Id, existing.Email). - Delete(&xray.ClientTraffic{}).Error; err != nil { - return false, err + // Same shared-accumulator rule as the inbound-removal sweep above: + // keep the row while another inbound still references the email. + stillUsed, uErr := s.emailUsedByOtherInbounds(existing.Email, c.Id) + if uErr != nil { + return false, uErr + } + if !stillUsed { + if err := tx.Where("inbound_id = ? AND email = ?", c.Id, existing.Email). + Delete(&xray.ClientTraffic{}).Error; err != nil { + return false, err + } } structuralChange = true } diff --git a/internal/web/service/node_client_traffic_sum_test.go b/internal/web/service/node_client_traffic_sum_test.go index 9ede1e665..0eba143e2 100644 --- a/internal/web/service/node_client_traffic_sum_test.go +++ b/internal/web/service/node_client_traffic_sum_test.go @@ -1,6 +1,7 @@ package service import ( + "fmt" "path/filepath" "testing" @@ -31,6 +32,18 @@ func createNodeInbound(t *testing.T, db *gorm.DB, nodeID int, tag string, port i } } +// createNodeInboundWithClient mirrors createNodeInbound but stores the client +// in the settings JSON so emailUsedByOtherInbounds can see the attachment. +func createNodeInboundWithClient(t *testing.T, db *gorm.DB, nodeID int, tag string, port int, email string) { + t.Helper() + nid := nodeID + settings := fmt.Sprintf(`{"clients": [{"email": %q, "enable": true}]}`, email) + ib := &model.Inbound{UserId: 1, Tag: tag, Enable: true, Port: port, Protocol: model.VLESS, NodeID: &nid, Settings: settings} + if err := db.Create(ib).Error; err != nil { + t.Fatalf("create node inbound %q: %v", tag, err) + } +} + func syncNode(t *testing.T, svc *InboundService, nodeID int, tag string, stats ...xray.ClientTraffic) { t.Helper() snap := &runtime.TrafficSnapshot{ @@ -41,6 +54,20 @@ func syncNode(t *testing.T, svc *InboundService, nodeID int, tag string, stats . } } +// syncNodeWithSettings mirrors syncNode but carries a real settings JSON on +// the snapshot inbound, like production nodes do — the sync mirrors snapshot +// settings onto the central row, and the shared-accumulator guard reads the +// clients out of those settings. +func syncNodeWithSettings(t *testing.T, svc *InboundService, nodeID int, tag, settings string, stats ...xray.ClientTraffic) { + t.Helper() + snap := &runtime.TrafficSnapshot{ + Inbounds: []*model.Inbound{{Tag: tag, Settings: settings, ClientStats: stats}}, + } + if _, err := svc.setRemoteTrafficLocked(nodeID, snap, false); err != nil { + t.Fatalf("setRemoteTrafficLocked node %d: %v", nodeID, err) + } +} + func readTraffic(t *testing.T, db *gorm.DB, email string) xray.ClientTraffic { t.Helper() var ct xray.ClientTraffic @@ -151,6 +178,54 @@ func TestCentralReset_NoReAdd(t *testing.T) { assertUpDown(t, readTraffic(t, db, email), 15, 15, "after central reset only increments accrue") } +func TestInboundRemoval_KeepsSharedEmailRow(t *testing.T) { + db := initTrafficTestDB(t) + createNodeInboundWithClient(t, db, 1, "n1-in", 41001, "shared") + createNodeInboundWithClient(t, db, 2, "n2-in", 41002, "shared") + svc := &InboundService{} + + const email = "shared" + settings := fmt.Sprintf(`{"clients": [{"email": %q, "enable": true}]}`, email) + syncNodeWithSettings(t, svc, 1, "n1-in", settings, xray.ClientTraffic{Email: email, Up: 100, Down: 100, Enable: true}) + syncNodeWithSettings(t, svc, 2, "n2-in", settings, xray.ClientTraffic{Email: email, Up: 200, Down: 200, Enable: true}) + syncNodeWithSettings(t, svc, 1, "n1-in", settings, xray.ClientTraffic{Email: email, Up: 150, Down: 150, Enable: true}) + syncNodeWithSettings(t, svc, 2, "n2-in", settings, xray.ClientTraffic{Email: email, Up: 260, Down: 260, Enable: true}) + assertUpDown(t, readTraffic(t, db, email), 210, 210, "baseline sum") + + // Node 1 rebuilt (reinstall / another master's reconcile): its inbound + // vanishes from the snapshot. The shared accumulator must survive — losing + // it would let the next node sync re-seed the row with that node's counter + // alone, showing only the last panel's number instead of the sum. + if _, err := svc.setRemoteTrafficLocked(1, &runtime.TrafficSnapshot{}, false); err != nil { + t.Fatalf("sync node 1 with empty snapshot: %v", err) + } + assertUpDown(t, readTraffic(t, db, email), 210, 210, "after node 1 inbound removal") + + // Node 2 keeps accruing onto the surviving row. + syncNodeWithSettings(t, svc, 2, "n2-in", settings, xray.ClientTraffic{Email: email, Up: 300, Down: 300, Enable: true}) + assertUpDown(t, readTraffic(t, db, email), 250, 250, "after node 2 grows") +} + +func TestClientGoneFromOneNode_KeepsSharedEmailRow(t *testing.T) { + db := initTrafficTestDB(t) + createNodeInboundWithClient(t, db, 1, "n1-in", 41001, "shared") + createNodeInboundWithClient(t, db, 2, "n2-in", 41002, "shared") + svc := &InboundService{} + + const email = "shared" + settings := fmt.Sprintf(`{"clients": [{"email": %q, "enable": true}]}`, email) + syncNodeWithSettings(t, svc, 1, "n1-in", settings, xray.ClientTraffic{Email: email, Up: 100, Down: 100, Enable: true}) + syncNodeWithSettings(t, svc, 2, "n2-in", settings, xray.ClientTraffic{Email: email, Up: 200, Down: 200, Enable: true}) + + // Client detached from node 1's inbound only: its stats vanish from that + // inbound's snapshot while node 2 still hosts the email. + syncNodeWithSettings(t, svc, 1, "n1-in", `{"clients": []}`) + assertUpDown(t, readTraffic(t, db, email), 100, 100, "after client left node 1") + + syncNodeWithSettings(t, svc, 2, "n2-in", settings, xray.ClientTraffic{Email: email, Up: 240, Down: 240, Enable: true}) + assertUpDown(t, readTraffic(t, db, email), 140, 140, "node 2 keeps accruing") +} + func TestDelClientStat_CleansNodeBaselines(t *testing.T) { db := initTrafficTestDB(t) svc := &InboundService{}