diff --git a/frontend/public/openapi.json b/frontend/public/openapi.json index f82e88ba0..1026144bd 100644 --- a/frontend/public/openapi.json +++ b/frontend/public/openapi.json @@ -2709,6 +2709,59 @@ } } }, + "/panel/api/inbounds/pushClientTraffics": { + "post": { + "tags": [ + "Inbounds" + ], + "summary": "Receive a master panel's aggregated per-client usage, keyed by the master's GUID. Stored in a side table used only for the UI display overlay and local quota enforcement — never folded into the local counters that masters poll, so delta accounting stays intact. Called panel-to-panel by the node traffic sync job.", + "operationId": "post_panel_api_inbounds_pushClientTraffics", + "requestBody": { + "required": true, + "content": { + "application/json": { + "schema": { + "type": "object" + }, + "example": { + "masterGuid": "9f6c2d-…", + "traffics": [ + { + "email": "alice", + "up": 1048576, + "down": 2097152 + } + ] + } + } + } + }, + "responses": { + "200": { + "description": "Successful response", + "content": { + "application/json": { + "schema": { + "type": "object", + "properties": { + "success": { + "type": "boolean" + }, + "msg": { + "type": "string" + }, + "obj": {} + } + }, + "example": { + "success": true + } + } + } + } + } + } + }, "/panel/api/inbounds/{id}/fallbacks": { "get": { "tags": [ diff --git a/frontend/src/pages/api-docs/endpoints.ts b/frontend/src/pages/api-docs/endpoints.ts index f553eeb9f..f948bcb71 100644 --- a/frontend/src/pages/api-docs/endpoints.ts +++ b/frontend/src/pages/api-docs/endpoints.ts @@ -205,6 +205,17 @@ export const sections: readonly Section[] = [ { name: 'data', in: 'body (form)', type: 'string', desc: 'JSON-encoded inbound payload.' }, ], }, + { + method: 'POST', + path: '/panel/api/inbounds/pushClientTraffics', + summary: 'Receive a master panel\'s aggregated per-client usage, keyed by the master\'s GUID. Stored in a side table used only for the UI display overlay and local quota enforcement — never folded into the local counters that masters poll, so delta accounting stays intact. Called panel-to-panel by the node traffic sync job.', + params: [ + { name: 'masterGuid', in: 'body (json)', type: 'string', desc: 'Stable GUID of the pushing master panel.' }, + { name: 'traffics', in: 'body (json)', type: 'object[]', desc: 'Client traffic rows; only email/up/down are read.' }, + ], + body: '{\n "masterGuid": "9f6c2d-…",\n "traffics": [\n { "email": "alice", "up": 1048576, "down": 2097152 }\n ]\n}', + response: '{\n "success": true\n}', + }, { method: 'GET', path: '/panel/api/inbounds/:id/fallbacks', diff --git a/internal/database/db.go b/internal/database/db.go index b328e5749..15cf2debf 100644 --- a/internal/database/db.go +++ b/internal/database/db.go @@ -72,6 +72,7 @@ func initModels() error { &model.ClientGroup{}, &model.InboundFallback{}, &model.NodeClientTraffic{}, + &model.ClientGlobalTraffic{}, &model.OutboundSubscription{}, } for _, mdl := range models { diff --git a/internal/database/model/client_global_traffic.go b/internal/database/model/client_global_traffic.go new file mode 100644 index 000000000..407051e46 --- /dev/null +++ b/internal/database/model/client_global_traffic.go @@ -0,0 +1,20 @@ +package model + +// ClientGlobalTraffic mirrors a master panel's aggregated (global) usage for a +// client hosted on this panel. Masters push one row per (master, email) so the +// node can display the client's true cross-panel total and enforce its quota +// locally. The values never feed back into client_traffics — that table keeps +// this panel's local-only counters, which is what keeps every master's +// delta-baseline accounting over our snapshot correct. +// +// Rows are overwritten in place on every push (not max-merged), so a traffic +// reset on the master propagates here within one push cycle. Readers that need +// a single number fold the per-master rows with MAX. +type ClientGlobalTraffic struct { + Id int `json:"id" gorm:"primaryKey;autoIncrement"` + MasterGuid string `json:"masterGuid" gorm:"uniqueIndex:idx_master_email,priority:1;not null"` + Email string `json:"email" gorm:"uniqueIndex:idx_master_email,priority:2;not null"` + Up int64 `json:"up"` + Down int64 `json:"down"` + UpdatedAt int64 `json:"updatedAt" gorm:"autoUpdateTime:milli"` +} diff --git a/internal/web/controller/inbound.go b/internal/web/controller/inbound.go index eb77118c8..d6322f56c 100644 --- a/internal/web/controller/inbound.go +++ b/internal/web/controller/inbound.go @@ -11,6 +11,7 @@ import ( "github.com/mhsanaei/3x-ui/v3/internal/web/service" "github.com/mhsanaei/3x-ui/v3/internal/web/session" "github.com/mhsanaei/3x-ui/v3/internal/web/websocket" + "github.com/mhsanaei/3x-ui/v3/internal/xray" "github.com/gin-gonic/gin" ) @@ -77,6 +78,7 @@ func (a *InboundController) initRouter(g *gin.RouterGroup) { g.POST("/resetAllTraffics", a.resetAllTraffics) g.POST("/import", a.importInbound) g.POST("/:id/fallbacks", a.setFallbacks) + g.POST("/pushClientTraffics", a.pushClientTraffics) } // getInbounds retrieves the list of inbounds for the logged-in user. @@ -339,6 +341,24 @@ func (a *InboundController) resetAllTraffics(c *gin.Context) { jsonMsg(c, I18nWeb(c, "pages.inbounds.toasts.resetAllTrafficSuccess"), nil) } +// pushClientTraffics receives a master panel's aggregated per-client usage +// (see InboundService.AcceptGlobalTraffic for the storage semantics). +func (a *InboundController) pushClientTraffics(c *gin.Context) { + var req struct { + MasterGuid string `json:"masterGuid"` + Traffics []*xray.ClientTraffic `json:"traffics"` + } + if err := c.ShouldBindJSON(&req); err != nil { + jsonMsg(c, I18nWeb(c, "somethingWentWrong"), err) + return + } + if err := a.inboundService.AcceptGlobalTraffic(req.MasterGuid, req.Traffics); err != nil { + jsonMsg(c, I18nWeb(c, "somethingWentWrong"), err) + return + } + jsonMsg(c, "success", nil) +} + // importInbound imports an inbound configuration from provided data. func (a *InboundController) importInbound(c *gin.Context) { inbound := &model.Inbound{} diff --git a/internal/web/job/node_traffic_sync_job.go b/internal/web/job/node_traffic_sync_job.go index 4df57be42..85ecf99e8 100644 --- a/internal/web/job/node_traffic_sync_job.go +++ b/internal/web/job/node_traffic_sync_job.go @@ -2,6 +2,7 @@ package job import ( "context" + "strings" "sync" "time" @@ -10,6 +11,7 @@ import ( "github.com/mhsanaei/3x-ui/v3/internal/web/runtime" "github.com/mhsanaei/3x-ui/v3/internal/web/service" "github.com/mhsanaei/3x-ui/v3/internal/web/websocket" + "github.com/mhsanaei/3x-ui/v3/internal/xray" ) const ( @@ -17,6 +19,7 @@ const ( nodeTrafficSyncRequestTimeout = 4 * time.Second nodeReconcileTimeout = 30 * time.Second nodeClientIpSyncInterval = 10 * time.Second + nodeGlobalPushInterval = 30 * time.Second ) type NodeTrafficSyncJob struct { @@ -28,6 +31,8 @@ type NodeTrafficSyncJob struct { structural atomicBool ipSyncMu sync.Mutex lastIpSync int64 + globalPushMu sync.Mutex + lastGlobalPush int64 } type atomicBool struct { @@ -115,6 +120,8 @@ func (j *NodeTrafficSyncJob) Run() { j.structural.set() } + j.maybePushGlobals(mgr, nodes) + lastOnline, err := j.inboundService.GetClientsLastOnline() if err != nil { logger.Warning("node traffic sync: get last-online failed:", err) @@ -164,6 +171,65 @@ func (j *NodeTrafficSyncJob) Run() { } } +// maybePushGlobals broadcasts this panel's aggregated per-client usage to its +// online nodes so each node can display the client's cross-panel total and +// enforce its quota locally (see InboundService.AcceptGlobalTraffic). Scoped +// per node to the clients that node actually hosts, and throttled — the +// aggregates only need to reach nodes on a human timescale, not every poll. +func (j *NodeTrafficSyncJob) maybePushGlobals(mgr *runtime.Manager, nodes []*model.Node) { + j.globalPushMu.Lock() + now := time.Now().Unix() + if now-j.lastGlobalPush < int64(nodeGlobalPushInterval/time.Second) { + j.globalPushMu.Unlock() + return + } + j.lastGlobalPush = now + j.globalPushMu.Unlock() + + masterGuid, err := j.settingService.GetPanelGuid() + if err != nil || masterGuid == "" { + return + } + + sem := make(chan struct{}, nodeTrafficSyncConcurrency) + var wg sync.WaitGroup + for _, n := range nodes { + if !n.Enable || n.Status != "online" { + continue + } + remote, err := mgr.RemoteFor(n) + if err != nil { + continue + } + traffics, err := j.inboundService.GetNodeClientTraffics(n.Id) + if err != nil { + logger.Warning("node traffic sync: load globals for", n.Name, "failed:", err) + continue + } + if len(traffics) == 0 { + continue + } + wg.Add(1) + sem <- struct{}{} + go func(n *model.Node, remote *runtime.Remote, traffics []*xray.ClientTraffic) { + defer wg.Done() + defer func() { <-sem }() + ctx, cancel := context.WithTimeout(context.Background(), nodeTrafficSyncRequestTimeout) + defer cancel() + if err := remote.PushGlobalClientTraffics(ctx, masterGuid, traffics); err != nil { + // An old-build node without the endpoint answers 404 — not worth a + // warning every cycle. + if strings.Contains(err.Error(), "HTTP 404") { + logger.Debug("node traffic sync: node", n.Name, "has no global-traffic endpoint (old build)") + } else { + logger.Warning("node traffic sync: push globals to", n.Name, "failed:", err) + } + } + }(n, remote, traffics) + } + wg.Wait() +} + func (j *NodeTrafficSyncJob) syncOne(mgr *runtime.Manager, n *model.Node, doIpSync bool) { rt, err := mgr.RemoteFor(n) if err != nil { diff --git a/internal/web/runtime/remote.go b/internal/web/runtime/remote.go index cd8a6f859..3bbc695b2 100644 --- a/internal/web/runtime/remote.go +++ b/internal/web/runtime/remote.go @@ -18,6 +18,7 @@ import ( "github.com/mhsanaei/3x-ui/v3/internal/database/model" "github.com/mhsanaei/3x-ui/v3/internal/logger" "github.com/mhsanaei/3x-ui/v3/internal/util/netsafe" + "github.com/mhsanaei/3x-ui/v3/internal/xray" ) const remoteHTTPTimeout = 10 * time.Second @@ -452,6 +453,20 @@ func (r *Remote) FetchTrafficSnapshot(ctx context.Context) (*TrafficSnapshot, er return snap, nil } +// PushGlobalClientTraffics sends this panel's aggregated per-client usage to +// the node, tagged with this panel's GUID so the node keeps one row per +// pushing master. Display/enforcement input on the node only — the node never +// folds these into the counters it reports back, so this panel's (and any +// other master's) delta accounting over the node snapshot stays intact. +func (r *Remote) PushGlobalClientTraffics(ctx context.Context, masterGuid string, traffics []*xray.ClientTraffic) error { + payload := map[string]any{ + "masterGuid": masterGuid, + "traffics": traffics, + } + _, err := r.do(ctx, http.MethodPost, "panel/api/inbounds/pushClientTraffics", payload) + return err +} + func wireInbound(ib *model.Inbound) url.Values { v := url.Values{} v.Set("total", strconv.FormatInt(ib.Total, 10)) diff --git a/internal/web/service/client_crud.go b/internal/web/service/client_crud.go index 6b27719a0..1fcdca71d 100644 --- a/internal/web/service/client_crud.go +++ b/internal/web/service/client_crud.go @@ -411,6 +411,9 @@ func (s *ClientService) Delete(inboundSvc *InboundService, id int, keepTraffic b if err := db.Where("email = ?", existing.Email).Delete(&xray.ClientTraffic{}).Error; err != nil { return needRestart, err } + if err := clearGlobalTraffic(db, existing.Email); err != nil { + return needRestart, err + } if err := db.Where("client_email = ?", existing.Email).Delete(&model.InboundClientIps{}).Error; err != nil { return needRestart, err } @@ -550,6 +553,9 @@ func (s *ClientService) DeleteByEmail(inboundSvc *InboundService, email string, if err := db.Where("email = ?", email).Delete(&xray.ClientTraffic{}).Error; err != nil { return needRestart, err } + if err := clearGlobalTraffic(db, email); err != nil { + return needRestart, err + } if err := db.Where("client_email = ?", email).Delete(&model.InboundClientIps{}).Error; err != nil { return needRestart, err } diff --git a/internal/web/service/client_lookup.go b/internal/web/service/client_lookup.go index 2a45b791f..cd423989a 100644 --- a/internal/web/service/client_lookup.go +++ b/internal/web/service/client_lookup.go @@ -125,6 +125,7 @@ func (s *ClientService) List() ([]ClientWithAttachments, error) { } stats = append(stats, batchStats...) } + overlayGlobalTrafficValues(db, stats) for i := range stats { trafficByEmail[stats[i].Email] = &stats[i] } diff --git a/internal/web/service/client_traffic.go b/internal/web/service/client_traffic.go index f75cb60ff..a5b9e0abb 100644 --- a/internal/web/service/client_traffic.go +++ b/internal/web/service/client_traffic.go @@ -101,7 +101,7 @@ func (s *ClientService) BulkResetTraffic(inboundSvc *InboundService, emails []st } affected += int(res.RowsAffected) } - return nil + return clearGlobalTraffic(tx, cleanEmails...) }) }) if err != nil { @@ -128,6 +128,13 @@ func (s *ClientService) resetAllClientTrafficsLocked(id int) error { whereText += " = ?" } + var resetEmails []string + if err := tx.Model(xray.ClientTraffic{}). + Where(whereText, id). + Pluck("email", &resetEmails).Error; err != nil { + return err + } + result := tx.Model(xray.ClientTraffic{}). Where(whereText, id). Updates(map[string]any{"enable": true, "up": 0, "down": 0}) @@ -136,6 +143,10 @@ func (s *ClientService) resetAllClientTrafficsLocked(id int) error { return result.Error } + if err := clearGlobalTraffic(tx, resetEmails...); err != nil { + return err + } + inboundWhereText := "id " if id == -1 { inboundWhereText += " > ?" @@ -155,11 +166,15 @@ func (s *ClientService) resetAllClientTrafficsLocked(id int) error { } func (s *ClientService) ResetAllTraffics() (bool, error) { - res := database.GetDB().Model(&xray.ClientTraffic{}). + db := database.GetDB() + res := db.Model(&xray.ClientTraffic{}). Where("1 = 1"). Updates(map[string]any{"up": 0, "down": 0}) if res.Error != nil { return false, res.Error } + if err := db.Where("1 = 1").Delete(&model.ClientGlobalTraffic{}).Error; err != nil { + return false, err + } return res.RowsAffected > 0, nil } diff --git a/internal/web/service/global_traffic_test.go b/internal/web/service/global_traffic_test.go new file mode 100644 index 000000000..b46c2a140 --- /dev/null +++ b/internal/web/service/global_traffic_test.go @@ -0,0 +1,142 @@ +package service + +import ( + "testing" + + "github.com/mhsanaei/3x-ui/v3/internal/database" + "github.com/mhsanaei/3x-ui/v3/internal/database/model" + "github.com/mhsanaei/3x-ui/v3/internal/xray" +) + +func seedClientRow(t *testing.T, email string, inboundId int, up, down, total int64) { + t.Helper() + db := database.GetDB() + if err := db.Create(&xray.ClientTraffic{InboundId: inboundId, Email: email, Enable: true, Up: up, Down: down, Total: total}).Error; err != nil { + t.Fatalf("seed client_traffics %q: %v", email, err) + } +} + +func TestAcceptGlobalTraffic_SideTableOnly(t *testing.T) { + db := initTrafficTestDB(t) + svc := &InboundService{} + seedClientRow(t, "alice", 1, 100, 100, 0) + + err := svc.AcceptGlobalTraffic("master-a", []*xray.ClientTraffic{ + {Email: "alice", Up: 900, Down: 800}, + {Email: "ghost", Up: 5, Down: 5}, // not hosted here — must be dropped + }) + if err != nil { + t.Fatalf("AcceptGlobalTraffic: %v", err) + } + + local := readTraffic(t, db, "alice") + if local.Up != 100 || local.Down != 100 { + t.Errorf("local counters must stay pure, got up=%d down=%d", local.Up, local.Down) + } + var globals []model.ClientGlobalTraffic + if err := db.Find(&globals).Error; err != nil { + t.Fatalf("read globals: %v", err) + } + if len(globals) != 1 || globals[0].Email != "alice" || globals[0].Up != 900 || globals[0].Down != 800 { + t.Errorf("unexpected globals: %+v", globals) + } +} + +func TestAcceptGlobalTraffic_OverwriteAndMultiMaster(t *testing.T) { + db := initTrafficTestDB(t) + svc := &InboundService{} + seedClientRow(t, "alice", 1, 0, 0, 0) + + must := func(guid string, up, down int64) { + t.Helper() + if err := svc.AcceptGlobalTraffic(guid, []*xray.ClientTraffic{{Email: "alice", Up: up, Down: down}}); err != nil { + t.Fatalf("AcceptGlobalTraffic(%s): %v", guid, err) + } + } + must("master-a", 900, 900) + must("master-a", 50, 50) // a master-side reset propagates by overwrite + must("master-b", 500, 400) + + rows := []*xray.ClientTraffic{{Email: "alice", Up: 10, Down: 10}} + overlayGlobalTraffic(db, rows) + if rows[0].Up != 500 || rows[0].Down != 400 { + t.Errorf("overlay should fold per-master max, got up=%d down=%d", rows[0].Up, rows[0].Down) + } +} + +func TestGlobalUsage_DisablesClient(t *testing.T) { + db := initTrafficTestDB(t) + svc := &InboundService{} + // 200 of 1000 used locally — local check alone would never trip. + seedClientRow(t, "cap", 1, 100, 100, 1000) + + if err := svc.AcceptGlobalTraffic("master-a", []*xray.ClientTraffic{{Email: "cap", Up: 800, Down: 700}}); err != nil { + t.Fatalf("AcceptGlobalTraffic: %v", err) + } + + if _, count, _, err := svc.disableInvalidClients(db); err != nil { + t.Fatalf("disableInvalidClients: %v", err) + } else if count != 1 { + t.Fatalf("expected 1 client disabled, got %d", count) + } + if got := readTraffic(t, db, "cap"); got.Enable { + t.Error("client should be disabled by global usage exceeding its quota") + } +} + +func TestGlobalRows_ClearedOnReset(t *testing.T) { + db := initTrafficTestDB(t) + svc := &InboundService{} + seedClientRow(t, "alice", 1, 50, 50, 1000) + if err := svc.AcceptGlobalTraffic("master-a", []*xray.ClientTraffic{{Email: "alice", Up: 999, Down: 999}}); err != nil { + t.Fatalf("AcceptGlobalTraffic: %v", err) + } + if err := svc.ResetClientTrafficByEmail("alice"); err != nil { + t.Fatalf("ResetClientTrafficByEmail: %v", err) + } + var cnt int64 + db.Model(&model.ClientGlobalTraffic{}).Count(&cnt) + if cnt != 0 { + t.Errorf("global rows should be cleared on reset, found %d", cnt) + } +} + +// The full inbound list doubles as the traffic snapshot masters poll, so it +// must report pure local counters; the slim list only feeds this panel's UI, +// so it carries the cross-panel overlay. +func TestSnapshotListNotOverlaid_SlimOverlaid(t *testing.T) { + db := initTrafficTestDB(t) + svc := &InboundService{} + + settings := `{"clients": [{"email": "alice", "enable": true}]}` + ib := &model.Inbound{UserId: 1, Tag: "in-a", Enable: true, Port: 42001, Protocol: model.VLESS, Settings: settings} + if err := db.Create(ib).Error; err != nil { + t.Fatalf("create inbound: %v", err) + } + seedClientRow(t, "alice", ib.Id, 100, 100, 0) + if err := svc.AcceptGlobalTraffic("master-a", []*xray.ClientTraffic{{Email: "alice", Up: 900, Down: 900}}); err != nil { + t.Fatalf("AcceptGlobalTraffic: %v", err) + } + + full, err := svc.GetInbounds(1) + if err != nil { + t.Fatalf("GetInbounds: %v", err) + } + if len(full) != 1 || len(full[0].ClientStats) != 1 { + t.Fatalf("unexpected full list shape: %d inbounds", len(full)) + } + if full[0].ClientStats[0].Up != 100 { + t.Errorf("full list (master snapshot) must stay un-overlaid, got up=%d", full[0].ClientStats[0].Up) + } + + slim, err := svc.GetInboundsSlim(1) + if err != nil { + t.Fatalf("GetInboundsSlim: %v", err) + } + if len(slim) != 1 || len(slim[0].ClientStats) != 1 { + t.Fatalf("unexpected slim list shape: %d inbounds", len(slim)) + } + if slim[0].ClientStats[0].Up != 900 { + t.Errorf("slim list should carry the global overlay, got up=%d", slim[0].ClientStats[0].Up) + } +} diff --git a/internal/web/service/inbound.go b/internal/web/service/inbound.go index e710827b2..99953bfea 100644 --- a/internal/web/service/inbound.go +++ b/internal/web/service/inbound.go @@ -82,6 +82,9 @@ func (s *InboundService) GetInboundsSlim(userId int) ([]*model.Inbound, error) { // so the list's depleted/expiring badges see every client; the UUID/SubId // enrichment stays skipped. Must run before slimming strips the settings. s.backfillClientStats(db, inbounds) + // Slim feeds the panel UI only (masters poll the full list), so the badge + // math may see the cross-panel totals a master pushed. + s.overlayInboundsClientStats(db, inbounds) for _, ib := range inbounds { ib.Settings = slimSettingsClients(ib.Settings) } @@ -571,6 +574,7 @@ func (s *InboundService) GetInboundDetail(id int) (*model.Inbound, error) { return nil, err } s.enrichClientStats(db, []*model.Inbound{inbound}) + s.overlayInboundsClientStats(db, []*model.Inbound{inbound}) return inbound, nil } diff --git a/internal/web/service/inbound_disable.go b/internal/web/service/inbound_disable.go index 12d3d5d67..660abcbee 100644 --- a/internal/web/service/inbound_disable.go +++ b/internal/web/service/inbound_disable.go @@ -48,13 +48,25 @@ func (s *InboundService) disableInvalidInbounds(tx *gorm.DB) (bool, int64, error return needRestart, count, err } +// depletedClientsCond matches clients that exhausted their quota or expired. +// Besides the local counters it also trips on the cross-panel usage a master +// pushed into client_global_traffics — that's what lets a node cut a client +// whose combined usage exceeds the quota even though the local share doesn't +// (placeholders: now). +const depletedClientsCond = `((total > 0 AND up + down >= total) + OR (expiry_time > 0 AND expiry_time <= ?) + OR (total > 0 AND EXISTS ( + SELECT 1 FROM client_global_traffics g + WHERE g.email = client_traffics.email AND g.up + g.down >= client_traffics.total + )))` + func (s *InboundService) disableInvalidClients(tx *gorm.DB) (bool, int64, []int, error) { now := time.Now().Unix() * 1000 needRestart := false var depletedRows []xray.ClientTraffic err := tx.Model(xray.ClientTraffic{}). - Where("((total > 0 AND up + down >= total) OR (expiry_time > 0 AND expiry_time <= ?)) AND enable = ?", now, true). + Where(depletedClientsCond+" AND enable = ?", now, true). Find(&depletedRows).Error if err != nil { return false, 0, nil, err @@ -130,7 +142,7 @@ func (s *InboundService) disableInvalidClients(tx *gorm.DB) (bool, int64, []int, } result := tx.Model(xray.ClientTraffic{}). - Where("((total > 0 and up + down >= total) or (expiry_time > 0 and expiry_time <= ?)) and enable = ?", now, true). + Where(depletedClientsCond+" AND enable = ?", now, true). Update("enable", false) err = result.Error count := result.RowsAffected diff --git a/internal/web/service/inbound_traffic.go b/internal/web/service/inbound_traffic.go index d0595237a..d3ea2378b 100644 --- a/internal/web/service/inbound_traffic.go +++ b/internal/web/service/inbound_traffic.go @@ -386,6 +386,11 @@ func (s *InboundService) autoRenewClients(tx *gorm.DB) (bool, int64, error) { if err != nil { return false, 0, err } + // A renewed client starts a fresh quota window: drop the cross-panel rows + // too, or the stale pushed totals would re-deplete it immediately. + if err = clearGlobalTraffic(tx, renewEmails...); err != nil { + return false, 0, err + } if p != nil { err1 = s.xrayApi.Init(p.GetAPIPort()) if err1 != nil { @@ -436,6 +441,9 @@ func (s *InboundService) DelClientStat(tx *gorm.DB, email string) error { if err := tx.Where("email = ?", email).Delete(xray.ClientTraffic{}).Error; err != nil { return err } + if err := clearGlobalTraffic(tx, email); err != nil { + return err + } return tx.Where("email = ?", email).Delete(&model.NodeClientTraffic{}).Error } @@ -447,6 +455,9 @@ func (s *InboundService) delClientStatsByEmails(tx *gorm.DB, emails []string) er if err := tx.Where("email IN ?", batch).Delete(xray.ClientTraffic{}).Error; err != nil { return err } + if err := tx.Where("email IN ?", batch).Delete(&model.ClientGlobalTraffic{}).Error; err != nil { + return err + } if err := tx.Where("email IN ?", batch).Delete(&model.NodeClientTraffic{}).Error; err != nil { return err } @@ -457,6 +468,9 @@ func (s *InboundService) delClientStatsByEmails(tx *gorm.DB, emails []string) er func (s *InboundService) ResetClientTrafficByEmail(clientEmail string) error { return submitTrafficWrite(func() error { db := database.GetDB() + if err := clearGlobalTraffic(db, clientEmail); err != nil { + return err + } return db.Model(xray.ClientTraffic{}). Where("email = ?", clientEmail). Updates(map[string]any{"enable": true, "up": 0, "down": 0}).Error @@ -550,6 +564,9 @@ func (s *InboundService) resetClientTrafficLocked(id int, clientEmail string) (b if err != nil { return false, err } + if err := clearGlobalTraffic(db, clientEmail); err != nil { + return false, err + } now := time.Now().UnixMilli() _ = db.Model(model.Inbound{}). @@ -848,6 +865,7 @@ func (s *InboundService) GetAllClientTraffics() ([]*xray.ClientTraffic, error) { if err := db.Model(xray.ClientTraffic{}).Find(&traffics).Error; err != nil { return nil, err } + overlayGlobalTraffic(db, traffics) return traffics, nil } @@ -880,6 +898,7 @@ func (s *InboundService) GetClientTrafficByEmail(email string) (traffic *xray.Cl if len(traffics) == 0 { return nil, nil } + overlayGlobalTraffic(db, traffics) t := traffics[0] if rec, rErr := s.clientService.GetRecordByEmail(db, email); rErr == nil && rec != nil { diff --git a/internal/web/service/inbound_traffic_global.go b/internal/web/service/inbound_traffic_global.go new file mode 100644 index 000000000..6fbb75b72 --- /dev/null +++ b/internal/web/service/inbound_traffic_global.go @@ -0,0 +1,215 @@ +package service + +import ( + "strings" + "time" + + "github.com/mhsanaei/3x-ui/v3/internal/database" + "github.com/mhsanaei/3x-ui/v3/internal/database/model" + "github.com/mhsanaei/3x-ui/v3/internal/logger" + "github.com/mhsanaei/3x-ui/v3/internal/xray" + + "gorm.io/gorm" + "gorm.io/gorm/clause" +) + +// AcceptGlobalTraffic ingests a master panel's aggregated per-client usage +// into client_global_traffics, keyed by (masterGuid, email). The numbers are +// display/enforcement inputs only — client_traffics keeps this panel's +// local-only counters, so the pushing master's (and any other master's) +// delta accounting over our snapshot stays correct. +// +// Rows are overwritten, not max-merged: a reset on the master propagates here +// on its next push. Emails this panel doesn't host are dropped. +func (s *InboundService) AcceptGlobalTraffic(masterGuid string, traffics []*xray.ClientTraffic) error { + masterGuid = strings.TrimSpace(masterGuid) + if masterGuid == "" { + return nil + } + emails := make([]string, 0, len(traffics)) + byEmail := make(map[string]*xray.ClientTraffic, len(traffics)) + for _, t := range traffics { + if t == nil || t.Email == "" { + continue + } + if _, dup := byEmail[t.Email]; !dup { + emails = append(emails, t.Email) + } + byEmail[t.Email] = t + } + if len(emails) == 0 { + return nil + } + + return submitTrafficWrite(func() error { + db := database.GetDB() + known := make([]string, 0, len(emails)) + for _, batch := range chunkStrings(emails, sqlInChunk) { + var page []string + if err := db.Model(xray.ClientTraffic{}). + Where("email IN ?", batch). + Pluck("email", &page).Error; err != nil { + return err + } + known = append(known, page...) + } + if len(known) == 0 { + return nil + } + + now := time.Now().UnixMilli() + rows := make([]model.ClientGlobalTraffic, 0, len(known)) + for _, email := range known { + t := byEmail[email] + if t == nil { + continue + } + rows = append(rows, model.ClientGlobalTraffic{ + MasterGuid: masterGuid, + Email: email, + Up: t.Up, + Down: t.Down, + UpdatedAt: now, + }) + } + + return db.Transaction(func(tx *gorm.DB) error { + for _, batch := range chunkGlobalRows(rows, 200) { + if err := tx.Clauses(clause.OnConflict{ + Columns: []clause.Column{{Name: "master_guid"}, {Name: "email"}}, + DoUpdates: clause.AssignmentColumns([]string{"up", "down", "updated_at"}), + }).Create(&batch).Error; err != nil { + return err + } + } + return nil + }) + }) +} + +func chunkGlobalRows(rows []model.ClientGlobalTraffic, size int) [][]model.ClientGlobalTraffic { + if len(rows) == 0 { + return nil + } + out := make([][]model.ClientGlobalTraffic, 0, (len(rows)+size-1)/size) + for start := 0; start < len(rows); start += size { + end := min(start+size, len(rows)) + out = append(out, rows[start:end]) + } + return out +} + +// overlayGlobalTraffic raises Up/Down on the given rows to the largest global +// value any master pushed for that email. Read-path only — callers hand it +// rows about to be serialized for display; the stored counters are untouched. +func overlayGlobalTraffic(db *gorm.DB, rows []*xray.ClientTraffic) { + if len(rows) == 0 { + return + } + // Cheap short-circuit for the common case (a panel no master pushes to). + var probe int64 + if err := db.Model(&model.ClientGlobalTraffic{}).Limit(1).Count(&probe).Error; err != nil || probe == 0 { + return + } + + emails := make([]string, 0, len(rows)) + byEmail := make(map[string][]*xray.ClientTraffic, len(rows)) + for _, r := range rows { + if r == nil || r.Email == "" { + continue + } + key := strings.ToLower(r.Email) + if _, ok := byEmail[key]; !ok { + emails = append(emails, r.Email) + } + byEmail[key] = append(byEmail[key], r) + } + for _, batch := range chunkStrings(emails, sqlInChunk) { + var globals []model.ClientGlobalTraffic + if err := db.Where("email IN ?", batch).Find(&globals).Error; err != nil { + logger.Warning("overlayGlobalTraffic:", err) + return + } + for i := range globals { + for _, r := range byEmail[strings.ToLower(globals[i].Email)] { + if globals[i].Up > r.Up { + r.Up = globals[i].Up + } + if globals[i].Down > r.Down { + r.Down = globals[i].Down + } + } + } + } +} + +// overlayGlobalTrafficValues is overlayGlobalTraffic for value slices. +func overlayGlobalTrafficValues(db *gorm.DB, rows []xray.ClientTraffic) { + if len(rows) == 0 { + return + } + ptrs := make([]*xray.ClientTraffic, 0, len(rows)) + for i := range rows { + ptrs = append(ptrs, &rows[i]) + } + overlayGlobalTraffic(db, ptrs) +} + +// GetNodeClientTraffics returns this panel's aggregated traffic rows for the +// clients known to live on the given node (those with a delta baseline) — +// the payload for Remote.PushGlobalClientTraffics. The rows carry the global +// overlay so a mid-chain panel forwards the widest view it has seen, not just +// its own aggregate. +func (s *InboundService) GetNodeClientTraffics(nodeID int) ([]*xray.ClientTraffic, error) { + db := database.GetDB() + var emails []string + if err := db.Model(&model.NodeClientTraffic{}). + Where("node_id = ?", nodeID). + Pluck("email", &emails).Error; err != nil { + return nil, err + } + if len(emails) == 0 { + return nil, nil + } + out := make([]*xray.ClientTraffic, 0, len(emails)) + for _, batch := range chunkStrings(emails, sqlInChunk) { + var page []*xray.ClientTraffic + if err := db.Model(xray.ClientTraffic{}).Where("email IN ?", batch).Find(&page).Error; err != nil { + return nil, err + } + out = append(out, page...) + } + overlayGlobalTraffic(db, out) + return out, nil +} + +// overlayInboundsClientStats applies the global overlay to every preloaded +// ClientStats row across the given inbounds. UI read paths only — never the +// full /panel/api/inbounds/list payload, which doubles as the traffic +// snapshot masters poll: overlaying that would leak pushed globals back into +// the masters' delta accounting. +func (s *InboundService) overlayInboundsClientStats(db *gorm.DB, inbounds []*model.Inbound) { + rows := make([]*xray.ClientTraffic, 0) + for _, ib := range inbounds { + for j := range ib.ClientStats { + rows = append(rows, &ib.ClientStats[j]) + } + } + overlayGlobalTraffic(db, rows) +} + +// clearGlobalTraffic drops every master's pushed rows for the given emails. +// Used by client deletion and traffic-reset flows: after a node-local reset +// the next master push restores the master's (authoritative) numbers, and +// after a master-side reset that push carries the reset values anyway. +func clearGlobalTraffic(tx *gorm.DB, emails ...string) error { + if len(emails) == 0 { + return nil + } + for _, batch := range chunkStrings(emails, sqlInChunk) { + if err := tx.Where("email IN ?", batch).Delete(&model.ClientGlobalTraffic{}).Error; err != nil { + return err + } + } + return nil +}