feat(node-sync): push global client usage to nodes for display and local enforcement

A client attached to several panels has one aggregated row on each
master, but a node only ever saw its local share: the node UI
under-reported usage, and the node kept serving a client whose
cross-panel total had already exceeded its quota — the master's disable
push doesn't kill established connections unless the node restarts xray
itself.

Masters now push their aggregated per-client counters to each node from
NodeTrafficSyncJob (throttled, scoped to the clients that node hosts).
The node stores them in the new client_global_traffics side table keyed
by (masterGuid, email), overwritten on every push so a master-side
reset propagates, and:

- overlays max(local, pushed) onto UI read paths (slim inbound list,
  inbound detail, clients list, WS stats, per-email lookups). The full
  /panel/api/inbounds/list stays un-overlaid on purpose: it doubles as
  the traffic snapshot masters poll, and overlaying it would corrupt
  every master's delta accounting;
- trips disableInvalidClients when any master's pushed total exceeds
  the client's quota, so the existing RestartXrayOnClientDisable flow
  disconnects the client locally;
- clears the side rows on traffic reset, auto-renew, and client
  delete, keeping a renewed quota window clean.

Supersedes #5204, which folded pushed globals into client_traffics and
compensated with read-back baselines — that double-counted first-sight
emails and could not work with several masters sharing one node.
This commit is contained in:
MHSanaei
2026-06-11 15:14:08 +02:00
parent 8258a26fbf
commit 58905d81a4
15 changed files with 604 additions and 4 deletions
+53
View File
@@ -2709,6 +2709,59 @@
}
}
},
"/panel/api/inbounds/pushClientTraffics": {
"post": {
"tags": [
"Inbounds"
],
"summary": "Receive a master panel's aggregated per-client usage, keyed by the master's GUID. Stored in a side table used only for the UI display overlay and local quota enforcement — never folded into the local counters that masters poll, so delta accounting stays intact. Called panel-to-panel by the node traffic sync job.",
"operationId": "post_panel_api_inbounds_pushClientTraffics",
"requestBody": {
"required": true,
"content": {
"application/json": {
"schema": {
"type": "object"
},
"example": {
"masterGuid": "9f6c2d-…",
"traffics": [
{
"email": "alice",
"up": 1048576,
"down": 2097152
}
]
}
}
}
},
"responses": {
"200": {
"description": "Successful response",
"content": {
"application/json": {
"schema": {
"type": "object",
"properties": {
"success": {
"type": "boolean"
},
"msg": {
"type": "string"
},
"obj": {}
}
},
"example": {
"success": true
}
}
}
}
}
}
},
"/panel/api/inbounds/{id}/fallbacks": {
"get": {
"tags": [
+11
View File
@@ -205,6 +205,17 @@ export const sections: readonly Section[] = [
{ name: 'data', in: 'body (form)', type: 'string', desc: 'JSON-encoded inbound payload.' },
],
},
{
method: 'POST',
path: '/panel/api/inbounds/pushClientTraffics',
summary: 'Receive a master panel\'s aggregated per-client usage, keyed by the master\'s GUID. Stored in a side table used only for the UI display overlay and local quota enforcement — never folded into the local counters that masters poll, so delta accounting stays intact. Called panel-to-panel by the node traffic sync job.',
params: [
{ name: 'masterGuid', in: 'body (json)', type: 'string', desc: 'Stable GUID of the pushing master panel.' },
{ name: 'traffics', in: 'body (json)', type: 'object[]', desc: 'Client traffic rows; only email/up/down are read.' },
],
body: '{\n "masterGuid": "9f6c2d-…",\n "traffics": [\n { "email": "alice", "up": 1048576, "down": 2097152 }\n ]\n}',
response: '{\n "success": true\n}',
},
{
method: 'GET',
path: '/panel/api/inbounds/:id/fallbacks',
+1
View File
@@ -72,6 +72,7 @@ func initModels() error {
&model.ClientGroup{},
&model.InboundFallback{},
&model.NodeClientTraffic{},
&model.ClientGlobalTraffic{},
&model.OutboundSubscription{},
}
for _, mdl := range models {
@@ -0,0 +1,20 @@
package model
// ClientGlobalTraffic mirrors a master panel's aggregated (global) usage for a
// client hosted on this panel. Masters push one row per (master, email) so the
// node can display the client's true cross-panel total and enforce its quota
// locally. The values never feed back into client_traffics — that table keeps
// this panel's local-only counters, which is what keeps every master's
// delta-baseline accounting over our snapshot correct.
//
// Rows are overwritten in place on every push (not max-merged), so a traffic
// reset on the master propagates here within one push cycle. Readers that need
// a single number fold the per-master rows with MAX.
type ClientGlobalTraffic struct {
Id int `json:"id" gorm:"primaryKey;autoIncrement"`
MasterGuid string `json:"masterGuid" gorm:"uniqueIndex:idx_master_email,priority:1;not null"`
Email string `json:"email" gorm:"uniqueIndex:idx_master_email,priority:2;not null"`
Up int64 `json:"up"`
Down int64 `json:"down"`
UpdatedAt int64 `json:"updatedAt" gorm:"autoUpdateTime:milli"`
}
+20
View File
@@ -11,6 +11,7 @@ import (
"github.com/mhsanaei/3x-ui/v3/internal/web/service"
"github.com/mhsanaei/3x-ui/v3/internal/web/session"
"github.com/mhsanaei/3x-ui/v3/internal/web/websocket"
"github.com/mhsanaei/3x-ui/v3/internal/xray"
"github.com/gin-gonic/gin"
)
@@ -77,6 +78,7 @@ func (a *InboundController) initRouter(g *gin.RouterGroup) {
g.POST("/resetAllTraffics", a.resetAllTraffics)
g.POST("/import", a.importInbound)
g.POST("/:id/fallbacks", a.setFallbacks)
g.POST("/pushClientTraffics", a.pushClientTraffics)
}
// getInbounds retrieves the list of inbounds for the logged-in user.
@@ -339,6 +341,24 @@ func (a *InboundController) resetAllTraffics(c *gin.Context) {
jsonMsg(c, I18nWeb(c, "pages.inbounds.toasts.resetAllTrafficSuccess"), nil)
}
// pushClientTraffics receives a master panel's aggregated per-client usage
// (see InboundService.AcceptGlobalTraffic for the storage semantics).
func (a *InboundController) pushClientTraffics(c *gin.Context) {
var req struct {
MasterGuid string `json:"masterGuid"`
Traffics []*xray.ClientTraffic `json:"traffics"`
}
if err := c.ShouldBindJSON(&req); err != nil {
jsonMsg(c, I18nWeb(c, "somethingWentWrong"), err)
return
}
if err := a.inboundService.AcceptGlobalTraffic(req.MasterGuid, req.Traffics); err != nil {
jsonMsg(c, I18nWeb(c, "somethingWentWrong"), err)
return
}
jsonMsg(c, "success", nil)
}
// importInbound imports an inbound configuration from provided data.
func (a *InboundController) importInbound(c *gin.Context) {
inbound := &model.Inbound{}
+66
View File
@@ -2,6 +2,7 @@ package job
import (
"context"
"strings"
"sync"
"time"
@@ -10,6 +11,7 @@ import (
"github.com/mhsanaei/3x-ui/v3/internal/web/runtime"
"github.com/mhsanaei/3x-ui/v3/internal/web/service"
"github.com/mhsanaei/3x-ui/v3/internal/web/websocket"
"github.com/mhsanaei/3x-ui/v3/internal/xray"
)
const (
@@ -17,6 +19,7 @@ const (
nodeTrafficSyncRequestTimeout = 4 * time.Second
nodeReconcileTimeout = 30 * time.Second
nodeClientIpSyncInterval = 10 * time.Second
nodeGlobalPushInterval = 30 * time.Second
)
type NodeTrafficSyncJob struct {
@@ -28,6 +31,8 @@ type NodeTrafficSyncJob struct {
structural atomicBool
ipSyncMu sync.Mutex
lastIpSync int64
globalPushMu sync.Mutex
lastGlobalPush int64
}
type atomicBool struct {
@@ -115,6 +120,8 @@ func (j *NodeTrafficSyncJob) Run() {
j.structural.set()
}
j.maybePushGlobals(mgr, nodes)
lastOnline, err := j.inboundService.GetClientsLastOnline()
if err != nil {
logger.Warning("node traffic sync: get last-online failed:", err)
@@ -164,6 +171,65 @@ func (j *NodeTrafficSyncJob) Run() {
}
}
// maybePushGlobals broadcasts this panel's aggregated per-client usage to its
// online nodes so each node can display the client's cross-panel total and
// enforce its quota locally (see InboundService.AcceptGlobalTraffic). Scoped
// per node to the clients that node actually hosts, and throttled — the
// aggregates only need to reach nodes on a human timescale, not every poll.
func (j *NodeTrafficSyncJob) maybePushGlobals(mgr *runtime.Manager, nodes []*model.Node) {
j.globalPushMu.Lock()
now := time.Now().Unix()
if now-j.lastGlobalPush < int64(nodeGlobalPushInterval/time.Second) {
j.globalPushMu.Unlock()
return
}
j.lastGlobalPush = now
j.globalPushMu.Unlock()
masterGuid, err := j.settingService.GetPanelGuid()
if err != nil || masterGuid == "" {
return
}
sem := make(chan struct{}, nodeTrafficSyncConcurrency)
var wg sync.WaitGroup
for _, n := range nodes {
if !n.Enable || n.Status != "online" {
continue
}
remote, err := mgr.RemoteFor(n)
if err != nil {
continue
}
traffics, err := j.inboundService.GetNodeClientTraffics(n.Id)
if err != nil {
logger.Warning("node traffic sync: load globals for", n.Name, "failed:", err)
continue
}
if len(traffics) == 0 {
continue
}
wg.Add(1)
sem <- struct{}{}
go func(n *model.Node, remote *runtime.Remote, traffics []*xray.ClientTraffic) {
defer wg.Done()
defer func() { <-sem }()
ctx, cancel := context.WithTimeout(context.Background(), nodeTrafficSyncRequestTimeout)
defer cancel()
if err := remote.PushGlobalClientTraffics(ctx, masterGuid, traffics); err != nil {
// An old-build node without the endpoint answers 404 — not worth a
// warning every cycle.
if strings.Contains(err.Error(), "HTTP 404") {
logger.Debug("node traffic sync: node", n.Name, "has no global-traffic endpoint (old build)")
} else {
logger.Warning("node traffic sync: push globals to", n.Name, "failed:", err)
}
}
}(n, remote, traffics)
}
wg.Wait()
}
func (j *NodeTrafficSyncJob) syncOne(mgr *runtime.Manager, n *model.Node, doIpSync bool) {
rt, err := mgr.RemoteFor(n)
if err != nil {
+15
View File
@@ -18,6 +18,7 @@ import (
"github.com/mhsanaei/3x-ui/v3/internal/database/model"
"github.com/mhsanaei/3x-ui/v3/internal/logger"
"github.com/mhsanaei/3x-ui/v3/internal/util/netsafe"
"github.com/mhsanaei/3x-ui/v3/internal/xray"
)
const remoteHTTPTimeout = 10 * time.Second
@@ -452,6 +453,20 @@ func (r *Remote) FetchTrafficSnapshot(ctx context.Context) (*TrafficSnapshot, er
return snap, nil
}
// PushGlobalClientTraffics sends this panel's aggregated per-client usage to
// the node, tagged with this panel's GUID so the node keeps one row per
// pushing master. Display/enforcement input on the node only — the node never
// folds these into the counters it reports back, so this panel's (and any
// other master's) delta accounting over the node snapshot stays intact.
func (r *Remote) PushGlobalClientTraffics(ctx context.Context, masterGuid string, traffics []*xray.ClientTraffic) error {
payload := map[string]any{
"masterGuid": masterGuid,
"traffics": traffics,
}
_, err := r.do(ctx, http.MethodPost, "panel/api/inbounds/pushClientTraffics", payload)
return err
}
func wireInbound(ib *model.Inbound) url.Values {
v := url.Values{}
v.Set("total", strconv.FormatInt(ib.Total, 10))
+6
View File
@@ -411,6 +411,9 @@ func (s *ClientService) Delete(inboundSvc *InboundService, id int, keepTraffic b
if err := db.Where("email = ?", existing.Email).Delete(&xray.ClientTraffic{}).Error; err != nil {
return needRestart, err
}
if err := clearGlobalTraffic(db, existing.Email); err != nil {
return needRestart, err
}
if err := db.Where("client_email = ?", existing.Email).Delete(&model.InboundClientIps{}).Error; err != nil {
return needRestart, err
}
@@ -550,6 +553,9 @@ func (s *ClientService) DeleteByEmail(inboundSvc *InboundService, email string,
if err := db.Where("email = ?", email).Delete(&xray.ClientTraffic{}).Error; err != nil {
return needRestart, err
}
if err := clearGlobalTraffic(db, email); err != nil {
return needRestart, err
}
if err := db.Where("client_email = ?", email).Delete(&model.InboundClientIps{}).Error; err != nil {
return needRestart, err
}
+1
View File
@@ -125,6 +125,7 @@ func (s *ClientService) List() ([]ClientWithAttachments, error) {
}
stats = append(stats, batchStats...)
}
overlayGlobalTrafficValues(db, stats)
for i := range stats {
trafficByEmail[stats[i].Email] = &stats[i]
}
+17 -2
View File
@@ -101,7 +101,7 @@ func (s *ClientService) BulkResetTraffic(inboundSvc *InboundService, emails []st
}
affected += int(res.RowsAffected)
}
return nil
return clearGlobalTraffic(tx, cleanEmails...)
})
})
if err != nil {
@@ -128,6 +128,13 @@ func (s *ClientService) resetAllClientTrafficsLocked(id int) error {
whereText += " = ?"
}
var resetEmails []string
if err := tx.Model(xray.ClientTraffic{}).
Where(whereText, id).
Pluck("email", &resetEmails).Error; err != nil {
return err
}
result := tx.Model(xray.ClientTraffic{}).
Where(whereText, id).
Updates(map[string]any{"enable": true, "up": 0, "down": 0})
@@ -136,6 +143,10 @@ func (s *ClientService) resetAllClientTrafficsLocked(id int) error {
return result.Error
}
if err := clearGlobalTraffic(tx, resetEmails...); err != nil {
return err
}
inboundWhereText := "id "
if id == -1 {
inboundWhereText += " > ?"
@@ -155,11 +166,15 @@ func (s *ClientService) resetAllClientTrafficsLocked(id int) error {
}
func (s *ClientService) ResetAllTraffics() (bool, error) {
res := database.GetDB().Model(&xray.ClientTraffic{}).
db := database.GetDB()
res := db.Model(&xray.ClientTraffic{}).
Where("1 = 1").
Updates(map[string]any{"up": 0, "down": 0})
if res.Error != nil {
return false, res.Error
}
if err := db.Where("1 = 1").Delete(&model.ClientGlobalTraffic{}).Error; err != nil {
return false, err
}
return res.RowsAffected > 0, nil
}
+142
View File
@@ -0,0 +1,142 @@
package service
import (
"testing"
"github.com/mhsanaei/3x-ui/v3/internal/database"
"github.com/mhsanaei/3x-ui/v3/internal/database/model"
"github.com/mhsanaei/3x-ui/v3/internal/xray"
)
func seedClientRow(t *testing.T, email string, inboundId int, up, down, total int64) {
t.Helper()
db := database.GetDB()
if err := db.Create(&xray.ClientTraffic{InboundId: inboundId, Email: email, Enable: true, Up: up, Down: down, Total: total}).Error; err != nil {
t.Fatalf("seed client_traffics %q: %v", email, err)
}
}
func TestAcceptGlobalTraffic_SideTableOnly(t *testing.T) {
db := initTrafficTestDB(t)
svc := &InboundService{}
seedClientRow(t, "alice", 1, 100, 100, 0)
err := svc.AcceptGlobalTraffic("master-a", []*xray.ClientTraffic{
{Email: "alice", Up: 900, Down: 800},
{Email: "ghost", Up: 5, Down: 5}, // not hosted here — must be dropped
})
if err != nil {
t.Fatalf("AcceptGlobalTraffic: %v", err)
}
local := readTraffic(t, db, "alice")
if local.Up != 100 || local.Down != 100 {
t.Errorf("local counters must stay pure, got up=%d down=%d", local.Up, local.Down)
}
var globals []model.ClientGlobalTraffic
if err := db.Find(&globals).Error; err != nil {
t.Fatalf("read globals: %v", err)
}
if len(globals) != 1 || globals[0].Email != "alice" || globals[0].Up != 900 || globals[0].Down != 800 {
t.Errorf("unexpected globals: %+v", globals)
}
}
func TestAcceptGlobalTraffic_OverwriteAndMultiMaster(t *testing.T) {
db := initTrafficTestDB(t)
svc := &InboundService{}
seedClientRow(t, "alice", 1, 0, 0, 0)
must := func(guid string, up, down int64) {
t.Helper()
if err := svc.AcceptGlobalTraffic(guid, []*xray.ClientTraffic{{Email: "alice", Up: up, Down: down}}); err != nil {
t.Fatalf("AcceptGlobalTraffic(%s): %v", guid, err)
}
}
must("master-a", 900, 900)
must("master-a", 50, 50) // a master-side reset propagates by overwrite
must("master-b", 500, 400)
rows := []*xray.ClientTraffic{{Email: "alice", Up: 10, Down: 10}}
overlayGlobalTraffic(db, rows)
if rows[0].Up != 500 || rows[0].Down != 400 {
t.Errorf("overlay should fold per-master max, got up=%d down=%d", rows[0].Up, rows[0].Down)
}
}
func TestGlobalUsage_DisablesClient(t *testing.T) {
db := initTrafficTestDB(t)
svc := &InboundService{}
// 200 of 1000 used locally — local check alone would never trip.
seedClientRow(t, "cap", 1, 100, 100, 1000)
if err := svc.AcceptGlobalTraffic("master-a", []*xray.ClientTraffic{{Email: "cap", Up: 800, Down: 700}}); err != nil {
t.Fatalf("AcceptGlobalTraffic: %v", err)
}
if _, count, _, err := svc.disableInvalidClients(db); err != nil {
t.Fatalf("disableInvalidClients: %v", err)
} else if count != 1 {
t.Fatalf("expected 1 client disabled, got %d", count)
}
if got := readTraffic(t, db, "cap"); got.Enable {
t.Error("client should be disabled by global usage exceeding its quota")
}
}
func TestGlobalRows_ClearedOnReset(t *testing.T) {
db := initTrafficTestDB(t)
svc := &InboundService{}
seedClientRow(t, "alice", 1, 50, 50, 1000)
if err := svc.AcceptGlobalTraffic("master-a", []*xray.ClientTraffic{{Email: "alice", Up: 999, Down: 999}}); err != nil {
t.Fatalf("AcceptGlobalTraffic: %v", err)
}
if err := svc.ResetClientTrafficByEmail("alice"); err != nil {
t.Fatalf("ResetClientTrafficByEmail: %v", err)
}
var cnt int64
db.Model(&model.ClientGlobalTraffic{}).Count(&cnt)
if cnt != 0 {
t.Errorf("global rows should be cleared on reset, found %d", cnt)
}
}
// The full inbound list doubles as the traffic snapshot masters poll, so it
// must report pure local counters; the slim list only feeds this panel's UI,
// so it carries the cross-panel overlay.
func TestSnapshotListNotOverlaid_SlimOverlaid(t *testing.T) {
db := initTrafficTestDB(t)
svc := &InboundService{}
settings := `{"clients": [{"email": "alice", "enable": true}]}`
ib := &model.Inbound{UserId: 1, Tag: "in-a", Enable: true, Port: 42001, Protocol: model.VLESS, Settings: settings}
if err := db.Create(ib).Error; err != nil {
t.Fatalf("create inbound: %v", err)
}
seedClientRow(t, "alice", ib.Id, 100, 100, 0)
if err := svc.AcceptGlobalTraffic("master-a", []*xray.ClientTraffic{{Email: "alice", Up: 900, Down: 900}}); err != nil {
t.Fatalf("AcceptGlobalTraffic: %v", err)
}
full, err := svc.GetInbounds(1)
if err != nil {
t.Fatalf("GetInbounds: %v", err)
}
if len(full) != 1 || len(full[0].ClientStats) != 1 {
t.Fatalf("unexpected full list shape: %d inbounds", len(full))
}
if full[0].ClientStats[0].Up != 100 {
t.Errorf("full list (master snapshot) must stay un-overlaid, got up=%d", full[0].ClientStats[0].Up)
}
slim, err := svc.GetInboundsSlim(1)
if err != nil {
t.Fatalf("GetInboundsSlim: %v", err)
}
if len(slim) != 1 || len(slim[0].ClientStats) != 1 {
t.Fatalf("unexpected slim list shape: %d inbounds", len(slim))
}
if slim[0].ClientStats[0].Up != 900 {
t.Errorf("slim list should carry the global overlay, got up=%d", slim[0].ClientStats[0].Up)
}
}
+4
View File
@@ -82,6 +82,9 @@ func (s *InboundService) GetInboundsSlim(userId int) ([]*model.Inbound, error) {
// so the list's depleted/expiring badges see every client; the UUID/SubId
// enrichment stays skipped. Must run before slimming strips the settings.
s.backfillClientStats(db, inbounds)
// Slim feeds the panel UI only (masters poll the full list), so the badge
// math may see the cross-panel totals a master pushed.
s.overlayInboundsClientStats(db, inbounds)
for _, ib := range inbounds {
ib.Settings = slimSettingsClients(ib.Settings)
}
@@ -571,6 +574,7 @@ func (s *InboundService) GetInboundDetail(id int) (*model.Inbound, error) {
return nil, err
}
s.enrichClientStats(db, []*model.Inbound{inbound})
s.overlayInboundsClientStats(db, []*model.Inbound{inbound})
return inbound, nil
}
+14 -2
View File
@@ -48,13 +48,25 @@ func (s *InboundService) disableInvalidInbounds(tx *gorm.DB) (bool, int64, error
return needRestart, count, err
}
// depletedClientsCond matches clients that exhausted their quota or expired.
// Besides the local counters it also trips on the cross-panel usage a master
// pushed into client_global_traffics — that's what lets a node cut a client
// whose combined usage exceeds the quota even though the local share doesn't
// (placeholders: now).
const depletedClientsCond = `((total > 0 AND up + down >= total)
OR (expiry_time > 0 AND expiry_time <= ?)
OR (total > 0 AND EXISTS (
SELECT 1 FROM client_global_traffics g
WHERE g.email = client_traffics.email AND g.up + g.down >= client_traffics.total
)))`
func (s *InboundService) disableInvalidClients(tx *gorm.DB) (bool, int64, []int, error) {
now := time.Now().Unix() * 1000
needRestart := false
var depletedRows []xray.ClientTraffic
err := tx.Model(xray.ClientTraffic{}).
Where("((total > 0 AND up + down >= total) OR (expiry_time > 0 AND expiry_time <= ?)) AND enable = ?", now, true).
Where(depletedClientsCond+" AND enable = ?", now, true).
Find(&depletedRows).Error
if err != nil {
return false, 0, nil, err
@@ -130,7 +142,7 @@ func (s *InboundService) disableInvalidClients(tx *gorm.DB) (bool, int64, []int,
}
result := tx.Model(xray.ClientTraffic{}).
Where("((total > 0 and up + down >= total) or (expiry_time > 0 and expiry_time <= ?)) and enable = ?", now, true).
Where(depletedClientsCond+" AND enable = ?", now, true).
Update("enable", false)
err = result.Error
count := result.RowsAffected
+19
View File
@@ -386,6 +386,11 @@ func (s *InboundService) autoRenewClients(tx *gorm.DB) (bool, int64, error) {
if err != nil {
return false, 0, err
}
// A renewed client starts a fresh quota window: drop the cross-panel rows
// too, or the stale pushed totals would re-deplete it immediately.
if err = clearGlobalTraffic(tx, renewEmails...); err != nil {
return false, 0, err
}
if p != nil {
err1 = s.xrayApi.Init(p.GetAPIPort())
if err1 != nil {
@@ -436,6 +441,9 @@ func (s *InboundService) DelClientStat(tx *gorm.DB, email string) error {
if err := tx.Where("email = ?", email).Delete(xray.ClientTraffic{}).Error; err != nil {
return err
}
if err := clearGlobalTraffic(tx, email); err != nil {
return err
}
return tx.Where("email = ?", email).Delete(&model.NodeClientTraffic{}).Error
}
@@ -447,6 +455,9 @@ func (s *InboundService) delClientStatsByEmails(tx *gorm.DB, emails []string) er
if err := tx.Where("email IN ?", batch).Delete(xray.ClientTraffic{}).Error; err != nil {
return err
}
if err := tx.Where("email IN ?", batch).Delete(&model.ClientGlobalTraffic{}).Error; err != nil {
return err
}
if err := tx.Where("email IN ?", batch).Delete(&model.NodeClientTraffic{}).Error; err != nil {
return err
}
@@ -457,6 +468,9 @@ func (s *InboundService) delClientStatsByEmails(tx *gorm.DB, emails []string) er
func (s *InboundService) ResetClientTrafficByEmail(clientEmail string) error {
return submitTrafficWrite(func() error {
db := database.GetDB()
if err := clearGlobalTraffic(db, clientEmail); err != nil {
return err
}
return db.Model(xray.ClientTraffic{}).
Where("email = ?", clientEmail).
Updates(map[string]any{"enable": true, "up": 0, "down": 0}).Error
@@ -550,6 +564,9 @@ func (s *InboundService) resetClientTrafficLocked(id int, clientEmail string) (b
if err != nil {
return false, err
}
if err := clearGlobalTraffic(db, clientEmail); err != nil {
return false, err
}
now := time.Now().UnixMilli()
_ = db.Model(model.Inbound{}).
@@ -848,6 +865,7 @@ func (s *InboundService) GetAllClientTraffics() ([]*xray.ClientTraffic, error) {
if err := db.Model(xray.ClientTraffic{}).Find(&traffics).Error; err != nil {
return nil, err
}
overlayGlobalTraffic(db, traffics)
return traffics, nil
}
@@ -880,6 +898,7 @@ func (s *InboundService) GetClientTrafficByEmail(email string) (traffic *xray.Cl
if len(traffics) == 0 {
return nil, nil
}
overlayGlobalTraffic(db, traffics)
t := traffics[0]
if rec, rErr := s.clientService.GetRecordByEmail(db, email); rErr == nil && rec != nil {
@@ -0,0 +1,215 @@
package service
import (
"strings"
"time"
"github.com/mhsanaei/3x-ui/v3/internal/database"
"github.com/mhsanaei/3x-ui/v3/internal/database/model"
"github.com/mhsanaei/3x-ui/v3/internal/logger"
"github.com/mhsanaei/3x-ui/v3/internal/xray"
"gorm.io/gorm"
"gorm.io/gorm/clause"
)
// AcceptGlobalTraffic ingests a master panel's aggregated per-client usage
// into client_global_traffics, keyed by (masterGuid, email). The numbers are
// display/enforcement inputs only — client_traffics keeps this panel's
// local-only counters, so the pushing master's (and any other master's)
// delta accounting over our snapshot stays correct.
//
// Rows are overwritten, not max-merged: a reset on the master propagates here
// on its next push. Emails this panel doesn't host are dropped.
func (s *InboundService) AcceptGlobalTraffic(masterGuid string, traffics []*xray.ClientTraffic) error {
masterGuid = strings.TrimSpace(masterGuid)
if masterGuid == "" {
return nil
}
emails := make([]string, 0, len(traffics))
byEmail := make(map[string]*xray.ClientTraffic, len(traffics))
for _, t := range traffics {
if t == nil || t.Email == "" {
continue
}
if _, dup := byEmail[t.Email]; !dup {
emails = append(emails, t.Email)
}
byEmail[t.Email] = t
}
if len(emails) == 0 {
return nil
}
return submitTrafficWrite(func() error {
db := database.GetDB()
known := make([]string, 0, len(emails))
for _, batch := range chunkStrings(emails, sqlInChunk) {
var page []string
if err := db.Model(xray.ClientTraffic{}).
Where("email IN ?", batch).
Pluck("email", &page).Error; err != nil {
return err
}
known = append(known, page...)
}
if len(known) == 0 {
return nil
}
now := time.Now().UnixMilli()
rows := make([]model.ClientGlobalTraffic, 0, len(known))
for _, email := range known {
t := byEmail[email]
if t == nil {
continue
}
rows = append(rows, model.ClientGlobalTraffic{
MasterGuid: masterGuid,
Email: email,
Up: t.Up,
Down: t.Down,
UpdatedAt: now,
})
}
return db.Transaction(func(tx *gorm.DB) error {
for _, batch := range chunkGlobalRows(rows, 200) {
if err := tx.Clauses(clause.OnConflict{
Columns: []clause.Column{{Name: "master_guid"}, {Name: "email"}},
DoUpdates: clause.AssignmentColumns([]string{"up", "down", "updated_at"}),
}).Create(&batch).Error; err != nil {
return err
}
}
return nil
})
})
}
func chunkGlobalRows(rows []model.ClientGlobalTraffic, size int) [][]model.ClientGlobalTraffic {
if len(rows) == 0 {
return nil
}
out := make([][]model.ClientGlobalTraffic, 0, (len(rows)+size-1)/size)
for start := 0; start < len(rows); start += size {
end := min(start+size, len(rows))
out = append(out, rows[start:end])
}
return out
}
// overlayGlobalTraffic raises Up/Down on the given rows to the largest global
// value any master pushed for that email. Read-path only — callers hand it
// rows about to be serialized for display; the stored counters are untouched.
func overlayGlobalTraffic(db *gorm.DB, rows []*xray.ClientTraffic) {
if len(rows) == 0 {
return
}
// Cheap short-circuit for the common case (a panel no master pushes to).
var probe int64
if err := db.Model(&model.ClientGlobalTraffic{}).Limit(1).Count(&probe).Error; err != nil || probe == 0 {
return
}
emails := make([]string, 0, len(rows))
byEmail := make(map[string][]*xray.ClientTraffic, len(rows))
for _, r := range rows {
if r == nil || r.Email == "" {
continue
}
key := strings.ToLower(r.Email)
if _, ok := byEmail[key]; !ok {
emails = append(emails, r.Email)
}
byEmail[key] = append(byEmail[key], r)
}
for _, batch := range chunkStrings(emails, sqlInChunk) {
var globals []model.ClientGlobalTraffic
if err := db.Where("email IN ?", batch).Find(&globals).Error; err != nil {
logger.Warning("overlayGlobalTraffic:", err)
return
}
for i := range globals {
for _, r := range byEmail[strings.ToLower(globals[i].Email)] {
if globals[i].Up > r.Up {
r.Up = globals[i].Up
}
if globals[i].Down > r.Down {
r.Down = globals[i].Down
}
}
}
}
}
// overlayGlobalTrafficValues is overlayGlobalTraffic for value slices.
func overlayGlobalTrafficValues(db *gorm.DB, rows []xray.ClientTraffic) {
if len(rows) == 0 {
return
}
ptrs := make([]*xray.ClientTraffic, 0, len(rows))
for i := range rows {
ptrs = append(ptrs, &rows[i])
}
overlayGlobalTraffic(db, ptrs)
}
// GetNodeClientTraffics returns this panel's aggregated traffic rows for the
// clients known to live on the given node (those with a delta baseline) —
// the payload for Remote.PushGlobalClientTraffics. The rows carry the global
// overlay so a mid-chain panel forwards the widest view it has seen, not just
// its own aggregate.
func (s *InboundService) GetNodeClientTraffics(nodeID int) ([]*xray.ClientTraffic, error) {
db := database.GetDB()
var emails []string
if err := db.Model(&model.NodeClientTraffic{}).
Where("node_id = ?", nodeID).
Pluck("email", &emails).Error; err != nil {
return nil, err
}
if len(emails) == 0 {
return nil, nil
}
out := make([]*xray.ClientTraffic, 0, len(emails))
for _, batch := range chunkStrings(emails, sqlInChunk) {
var page []*xray.ClientTraffic
if err := db.Model(xray.ClientTraffic{}).Where("email IN ?", batch).Find(&page).Error; err != nil {
return nil, err
}
out = append(out, page...)
}
overlayGlobalTraffic(db, out)
return out, nil
}
// overlayInboundsClientStats applies the global overlay to every preloaded
// ClientStats row across the given inbounds. UI read paths only — never the
// full /panel/api/inbounds/list payload, which doubles as the traffic
// snapshot masters poll: overlaying that would leak pushed globals back into
// the masters' delta accounting.
func (s *InboundService) overlayInboundsClientStats(db *gorm.DB, inbounds []*model.Inbound) {
rows := make([]*xray.ClientTraffic, 0)
for _, ib := range inbounds {
for j := range ib.ClientStats {
rows = append(rows, &ib.ClientStats[j])
}
}
overlayGlobalTraffic(db, rows)
}
// clearGlobalTraffic drops every master's pushed rows for the given emails.
// Used by client deletion and traffic-reset flows: after a node-local reset
// the next master push restores the master's (authoritative) numbers, and
// after a master-side reset that push carries the reset values anyway.
func clearGlobalTraffic(tx *gorm.DB, emails ...string) error {
if len(emails) == 0 {
return nil
}
for _, batch := range chunkStrings(emails, sqlInChunk) {
if err := tx.Where("email IN ?", batch).Delete(&model.ClientGlobalTraffic{}).Error; err != nil {
return err
}
}
return nil
}