From 6a032bcb2a8c5baf7c758dda255158aaf7108d16 Mon Sep 17 00:00:00 2001 From: MHSanaei Date: Sat, 20 Jun 2026 10:35:46 +0200 Subject: [PATCH] perf(scale): speed up traffic, auto-renew, and node bulk ops at 50k-100k clients Local hot paths: - autoRenewClients: replace the O(clients x expired) inner scan with an email->traffic map lookup (quadratic at scale). - node traffic sync: scope the client_traffics email-membership query to the snapshot's emails instead of plucking the whole table every poll. - add a (expiry_time, reset) index for the per-tick auto-renew filter. - SQLite: add cache_size/mmap_size/temp_store pragmas (env-tunable); keep the single-file DELETE journal and synchronous=FULL defaults. - scale benchmarks now run on SQLite too via XUI_SCALE_TEST=1 (shared setupScaleDB/resetScaleTables helpers), not just Postgres. Node paths: - bulk add/delete/adjust on a node-attached inbound folded one HTTP RPC per client; above nodeBulkPushThreshold (32) mark the node dirty and let one ReconcileNode push converge it instead of O(M) sequential round-trips. Small ops keep the live per-client path. Also hoist nodePushPlan out of the per-email delete loop. - ReconcileNode skips inbounds whose wire payload is unchanged (per-tag fingerprint on Remote), guarded by node-side tag presence so a restarted node is still re-seeded. Tests: auto-renew multi-inbound correctness, node-path dispatch (large ops fold to dirty, small ops push live) via a manager runtime override seam, and reconcile delta-skip. --- internal/database/db.go | 42 ++++- internal/database/index_tags_test.go | 1 + internal/web/runtime/manager.go | 21 +++ internal/web/runtime/reconcile_skip_test.go | 65 +++++++ internal/web/runtime/remote.go | 37 ++++ .../web/service/api_scale_postgres_test.go | 31 +--- internal/web/service/client_bulk.go | 10 ++ internal/web/service/client_inbound_apply.go | 41 +++-- .../web/service/inbound_autorenew_test.go | 104 +++++++++++ internal/web/service/inbound_node.go | 50 ++++-- internal/web/service/inbound_traffic.go | 61 ++++--- .../web/service/node_bulk_dispatch_test.go | 168 ++++++++++++++++++ internal/web/service/scale_helpers_test.go | 64 +++++++ .../web/service/sync_scale_postgres_test.go | 61 ++----- internal/xray/client_traffic.go | 4 +- 15 files changed, 623 insertions(+), 137 deletions(-) create mode 100644 internal/web/runtime/reconcile_skip_test.go create mode 100644 internal/web/service/inbound_autorenew_test.go create mode 100644 internal/web/service/node_bulk_dispatch_test.go create mode 100644 internal/web/service/scale_helpers_test.go diff --git a/internal/database/db.go b/internal/database/db.go index f249cbd88..c1db84430 100644 --- a/internal/database/db.go +++ b/internal/database/db.go @@ -6,6 +6,7 @@ import ( "bytes" "encoding/json" "errors" + "fmt" "io" "log" "math" @@ -886,7 +887,10 @@ func InitDB(dbPath string) error { if err = os.MkdirAll(dir, 0755); err != nil { return err } - dsn := dbPath + "?_journal_mode=DELETE&_busy_timeout=10000&_synchronous=FULL&_txlock=immediate" + // Keep journal_mode=DELETE so the DB stays a single file (no -wal/-shm + // sidecars). synchronous defaults to FULL for durability but is tunable. + sync := sqliteSynchronous() + dsn := dbPath + "?_journal_mode=DELETE&_busy_timeout=10000&_synchronous=" + sync + "&_txlock=immediate" db, err = gorm.Open(sqlite.Open(dsn), c) if err != nil { return err @@ -895,14 +899,21 @@ func InitDB(dbPath string) error { if err != nil { return err } - if _, err := sqlDB.Exec("PRAGMA journal_mode=DELETE"); err != nil { - return err + // Re-assert the DSN pragmas plus scan-friendly ones for large datasets. + // cache_size/mmap_size/temp_store create no extra files, so the single-file + // guarantee holds; they just cut disk I/O on the 50k-row hot paths. + pragmas := []string{ + "PRAGMA journal_mode=DELETE", + "PRAGMA busy_timeout=10000", + "PRAGMA synchronous=" + sync, + fmt.Sprintf("PRAGMA cache_size=-%d", envInt("XUI_DB_CACHE_MB", 32)*1024), + fmt.Sprintf("PRAGMA mmap_size=%d", int64(envInt("XUI_DB_MMAP_MB", 256))*1024*1024), + "PRAGMA temp_store=MEMORY", } - if _, err := sqlDB.Exec("PRAGMA busy_timeout=10000"); err != nil { - return err - } - if _, err := sqlDB.Exec("PRAGMA synchronous=FULL"); err != nil { - return err + for _, p := range pragmas { + if _, err := sqlDB.Exec(p); err != nil { + return err + } } } @@ -939,6 +950,21 @@ func InitDB(dbPath string) error { return runSeeders(isUsersEmpty) } +// sqliteSynchronous returns the SQLite synchronous mode, defaulting to FULL. +// Whitelisted because the value is interpolated directly into a PRAGMA string. +func sqliteSynchronous() string { + switch strings.ToUpper(strings.TrimSpace(os.Getenv("XUI_DB_SYNCHRONOUS"))) { + case "OFF": + return "OFF" + case "NORMAL": + return "NORMAL" + case "EXTRA": + return "EXTRA" + default: + return "FULL" + } +} + func envInt(key string, def int) int { v := strings.TrimSpace(os.Getenv(key)) if v == "" { diff --git a/internal/database/index_tags_test.go b/internal/database/index_tags_test.go index e7a27c1ca..03396119c 100644 --- a/internal/database/index_tags_test.go +++ b/internal/database/index_tags_test.go @@ -31,6 +31,7 @@ func TestAutoMigrateCreatesHotPathIndexes(t *testing.T) { }{ {&model.ClientRecord{}, "idx_client_record_group"}, {&xray.ClientTraffic{}, "idx_client_traffics_inbound"}, + {&xray.ClientTraffic{}, "idx_client_traffics_renew"}, } for _, c := range cases { if !db.Migrator().HasIndex(c.model, c.index) { diff --git a/internal/web/runtime/manager.go b/internal/web/runtime/manager.go index 2105d2b68..19cd12017 100644 --- a/internal/web/runtime/manager.go +++ b/internal/web/runtime/manager.go @@ -17,6 +17,7 @@ type Manager struct { mu sync.RWMutex remotes map[int]*Remote + overrides map[int]Runtime // test-only: forces RuntimeFor to return a stub egressResolver NodeEgressResolver } @@ -27,6 +28,22 @@ func NewManager(localDeps LocalDeps) *Manager { } } +// SetRuntimeOverride makes RuntimeFor(nodeID) return rt instead of building a +// real Remote. Test seam for exercising node-dispatch paths without a network +// node; pass nil rt to clear. +func (m *Manager) SetRuntimeOverride(nodeID int, rt Runtime) { + m.mu.Lock() + defer m.mu.Unlock() + if rt == nil { + delete(m.overrides, nodeID) + return + } + if m.overrides == nil { + m.overrides = make(map[int]Runtime) + } + m.overrides[nodeID] = rt +} + func (m *Manager) SetNodeEgressResolver(r NodeEgressResolver) { m.mu.Lock() defer m.mu.Unlock() @@ -47,6 +64,10 @@ func (m *Manager) RuntimeFor(nodeID *int) (Runtime, error) { return m.local, nil } m.mu.RLock() + if rt, ok := m.overrides[*nodeID]; ok { + m.mu.RUnlock() + return rt, nil + } if rt, ok := m.remotes[*nodeID]; ok { m.mu.RUnlock() return rt, nil diff --git a/internal/web/runtime/reconcile_skip_test.go b/internal/web/runtime/reconcile_skip_test.go new file mode 100644 index 000000000..74da152b0 --- /dev/null +++ b/internal/web/runtime/reconcile_skip_test.go @@ -0,0 +1,65 @@ +package runtime + +import ( + "context" + "net/http" + "net/http/httptest" + "strings" + "sync/atomic" + "testing" + + "github.com/mhsanaei/3x-ui/v3/internal/database/model" +) + +// TestReconcileInbound_SkipsUnchanged proves the delta-skip: a second reconcile +// of an unchanged inbound that the node still reports sends no push, while a +// content change or an absent-on-node inbound forces a fresh push. +func TestReconcileInbound_SkipsUnchanged(t *testing.T) { + var pushes atomic.Int32 + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Method == http.MethodPost && strings.Contains(r.URL.Path, "/panel/api/inbounds/update/") { + pushes.Add(1) + } + w.Header().Set("Content-Type", "application/json") + _, _ = w.Write([]byte(`{"success":true}`)) + })) + defer srv.Close() + + r := NewRemote(nodeForPlainServer(t, srv, "verify", "tok"), nil) + ib := &model.Inbound{Tag: "in-1", Protocol: model.VLESS, Port: 443, Settings: `{"clients":[]}`} + // Pre-seed the tag→id cache so resolveRemoteID needs no network round-trip. + r.cacheSet(ib.Tag, 7) + + // First reconcile: node doesn't report it yet → must push and record the fp. + if pushed, err := r.ReconcileInbound(context.Background(), ib, false); err != nil || !pushed { + t.Fatalf("first reconcile: pushed=%v err=%v, want push", pushed, err) + } + if got := pushes.Load(); got != 1 { + t.Fatalf("after first reconcile pushes=%d, want 1", got) + } + + // Second reconcile: unchanged and present on node → skip. + if pushed, err := r.ReconcileInbound(context.Background(), ib, true); err != nil || pushed { + t.Fatalf("second reconcile: pushed=%v err=%v, want skip", pushed, err) + } + if got := pushes.Load(); got != 1 { + t.Fatalf("unchanged reconcile pushed again: pushes=%d, want 1", got) + } + + // Content change → push again even though it's present on node. + ib.Settings = `{"clients":[{"email":"a@x"}]}` + if pushed, err := r.ReconcileInbound(context.Background(), ib, true); err != nil || !pushed { + t.Fatalf("changed reconcile: pushed=%v err=%v, want push", pushed, err) + } + if got := pushes.Load(); got != 2 { + t.Fatalf("changed reconcile pushes=%d, want 2", got) + } + + // Absent on node (e.g. node restarted/lost it) → re-push even if fp matches. + if pushed, err := r.ReconcileInbound(context.Background(), ib, false); err != nil || !pushed { + t.Fatalf("absent-on-node reconcile: pushed=%v err=%v, want push", pushed, err) + } + if got := pushes.Load(); got != 3 { + t.Fatalf("absent-on-node reconcile pushes=%d, want 3", got) + } +} diff --git a/internal/web/runtime/remote.go b/internal/web/runtime/remote.go index 94d78c465..74303f090 100644 --- a/internal/web/runtime/remote.go +++ b/internal/web/runtime/remote.go @@ -3,6 +3,8 @@ package runtime import ( "bytes" "context" + "crypto/sha256" + "encoding/hex" "encoding/json" "errors" "fmt" @@ -71,6 +73,10 @@ type Remote struct { mu sync.RWMutex remoteIDByTag map[string]int + // pushedFP holds the fingerprint of the last inbound wire payload successfully + // pushed, keyed by panel-side tag, so reconcile can skip re-sending an + // unchanged inbound. Guarded by mu; dropped with the Remote on node config change. + pushedFP map[string]string // supportsZstd is learned from the node's X-3x-Node-Caps response header; once // seen, config pushes to this node are zstd-compressed. Old nodes never set // it, so they keep receiving plain bodies (mixed-version safe). @@ -96,6 +102,7 @@ func NewRemote(n *model.Node, r NodeEgressResolver) *Remote { return &Remote{ node: n, remoteIDByTag: make(map[string]int), + pushedFP: make(map[string]string), egressResolver: r, } } @@ -432,6 +439,36 @@ func (r *Remote) UpdateInbound(ctx context.Context, oldIb, newIb *model.Inbound) return nil } +// ReconcileInbound pushes ib only when its wire payload differs from the last +// successful push, or when the node no longer reports the tag (existsOnNode +// false) — a node that dropped/restarted must still be re-seeded. Returns +// whether a push actually happened. This turns a full-fleet reconcile from "send +// every inbound's full settings" into "send only what changed". +func (r *Remote) ReconcileInbound(ctx context.Context, ib *model.Inbound, existsOnNode bool) (bool, error) { + fp := wireFingerprint(wireInbound(ib, r.node.Id)) + if existsOnNode { + r.mu.RLock() + prev, ok := r.pushedFP[ib.Tag] + r.mu.RUnlock() + if ok && prev == fp { + return false, nil + } + } + if err := r.UpdateInbound(ctx, ib, ib); err != nil { + return false, err + } + r.mu.Lock() + r.pushedFP[ib.Tag] = fp + r.mu.Unlock() + return true, nil +} + +// wireFingerprint hashes a wire payload so an unchanged inbound is cheap to detect. +func wireFingerprint(v url.Values) string { + sum := sha256.Sum256([]byte(v.Encode())) + return hex.EncodeToString(sum[:]) +} + func (r *Remote) AddUser(ctx context.Context, ib *model.Inbound, _ map[string]any) error { return r.UpdateInbound(ctx, ib, ib) } diff --git a/internal/web/service/api_scale_postgres_test.go b/internal/web/service/api_scale_postgres_test.go index 37a791e06..39ece795d 100644 --- a/internal/web/service/api_scale_postgres_test.go +++ b/internal/web/service/api_scale_postgres_test.go @@ -2,17 +2,12 @@ package service import ( "fmt" - "os" - "strings" "testing" "time" "github.com/mhsanaei/3x-ui/v3/internal/database" "github.com/mhsanaei/3x-ui/v3/internal/database/model" - xuilogger "github.com/mhsanaei/3x-ui/v3/internal/logger" "github.com/mhsanaei/3x-ui/v3/internal/xray" - - "github.com/op/go-logging" ) func seedClientTraffics(t *testing.T, inboundId int, clients []model.Client) { @@ -37,14 +32,7 @@ func seedClientTraffics(t *testing.T, inboundId int, clients []model.Client) { // reachable from the REST API at 100k/200k clients, asserting none crash on the // PostgreSQL bind-parameter ceiling and logging the wall-clock cost of each. func TestAllAPIsPostgresScale(t *testing.T) { - if strings.TrimSpace(os.Getenv("XUI_DB_DSN")) == "" || os.Getenv("XUI_DB_TYPE") != "postgres" { - t.Skip("set XUI_DB_TYPE=postgres and XUI_DB_DSN to run the postgres scale benchmark") - } - xuilogger.InitLogger(logging.ERROR) - if err := database.InitDB(""); err != nil { - t.Fatalf("InitDB: %v", err) - } - t.Cleanup(func() { _ = database.CloseDB() }) + setupScaleDB(t) svc := &ClientService{} inboundSvc := &InboundService{} @@ -56,9 +44,7 @@ func TestAllAPIsPostgresScale(t *testing.T) { for _, n := range sizes { t.Run(fmt.Sprintf("N=%d", n), func(t *testing.T) { db := database.GetDB() - if err := db.Exec("TRUNCATE TABLE inbounds, clients, client_inbounds, client_traffics, client_groups RESTART IDENTITY CASCADE").Error; err != nil { - t.Fatalf("truncate: %v", err) - } + resetScaleTables(t, db, "inbounds", "clients", "client_inbounds", "client_traffics", "client_groups") clients := makeScaleClients(n) exp := time.Now().AddDate(1, 0, 0).UnixMilli() @@ -150,14 +136,7 @@ func TestAllAPIsPostgresScale(t *testing.T) { // old path (GetClientByEmail, which parses the inbound's entire settings JSON to // find one client) vs new path (UUID/subId read from the indexed clients table). func TestGetClientTrafficByEmailABScale(t *testing.T) { - if strings.TrimSpace(os.Getenv("XUI_DB_DSN")) == "" || os.Getenv("XUI_DB_TYPE") != "postgres" { - t.Skip("set XUI_DB_TYPE=postgres and XUI_DB_DSN to run the postgres scale benchmark") - } - xuilogger.InitLogger(logging.ERROR) - if err := database.InitDB(""); err != nil { - t.Fatalf("InitDB: %v", err) - } - t.Cleanup(func() { _ = database.CloseDB() }) + setupScaleDB(t) svc := &ClientService{} inboundSvc := &InboundService{} @@ -179,9 +158,7 @@ func TestGetClientTrafficByEmailABScale(t *testing.T) { for _, n := range sizes { t.Run(fmt.Sprintf("N=%d", n), func(t *testing.T) { db := database.GetDB() - if err := db.Exec("TRUNCATE TABLE inbounds, clients, client_inbounds, client_traffics RESTART IDENTITY CASCADE").Error; err != nil { - t.Fatalf("truncate: %v", err) - } + resetScaleTables(t, db, "inbounds", "clients", "client_inbounds", "client_traffics") clients := makeScaleClients(n) ib := &model.Inbound{UserId: 1, Tag: fmt.Sprintf("ctbe-%d", n), Enable: true, Port: 40000, Protocol: model.VLESS, Settings: clientsSettings(t, clients)} if err := db.Create(ib).Error; err != nil { diff --git a/internal/web/service/client_bulk.go b/internal/web/service/client_bulk.go index 5a21119ee..52c663cef 100644 --- a/internal/web/service/client_bulk.go +++ b/internal/web/service/client_bulk.go @@ -525,6 +525,11 @@ func (s *ClientService) bulkAdjustInboundClients( if dirty { markDirty = true } + // Large batches collapse into one reconcile push rather than M updates. + if push && len(foundEmails) > nodeBulkPushThreshold { + markDirty = true + push = false + } if push { for email := range foundEmails { entry := plan[email] @@ -911,6 +916,11 @@ func (s *ClientService) bulkDelInboundClients( if dirty { markDirty = true } + // Large batches collapse into one reconcile push rather than M deletes. + if push && len(foundEmails) > nodeBulkPushThreshold { + markDirty = true + push = false + } if push { for email := range foundEmails { if err1 := rt.DeleteUser(context.Background(), oldInbound, email); err1 != nil { diff --git a/internal/web/service/client_inbound_apply.go b/internal/web/service/client_inbound_apply.go index 59b5eb19d..1661be383 100644 --- a/internal/web/service/client_inbound_apply.go +++ b/internal/web/service/client_inbound_apply.go @@ -163,6 +163,25 @@ func (s *ClientService) delInboundClients(inboundSvc *InboundService, inboundId return needRestart, txErr } + // Resolve the node push plan once for the whole batch instead of per email. + var nodeRt runtime.Runtime + nodePush := false + if oldInbound.NodeID != nil { + rt, push, dirty, perr := inboundSvc.nodePushPlan(oldInbound) + if perr != nil { + return needRestart, perr + } + if dirty { + markDirty = true + } + nodeRt, nodePush = rt, push + // Large batches collapse into one reconcile push rather than M deletes. + if nodePush && len(targets) > nodeBulkPushThreshold { + markDirty = true + nodePush = false + } + } + // Apply runtime deletes after commit — outside the serialized writer so a // slow node call can't stall traffic accounting. for _, t := range targets { @@ -180,20 +199,11 @@ func (s *ClientService) delInboundClients(inboundSvc *InboundService, inboundId } } } - } else { - rt, push, dirty, perr := inboundSvc.nodePushPlan(oldInbound) - if perr != nil { - return needRestart, perr - } - if dirty { + } else if nodePush { + if err1 := nodeRt.DeleteUser(context.Background(), oldInbound, t.email); err1 != nil { + logger.Warning("Error in deleting client on", nodeRt.Name(), ":", err1) markDirty = true } - if push { - if err1 := rt.DeleteUser(context.Background(), oldInbound, t.email); err1 != nil { - logger.Warning("Error in deleting client on", rt.Name(), ":", err1) - markDirty = true - } - } } } @@ -402,6 +412,13 @@ func (s *ClientService) addInboundClient(inboundSvc *InboundService, data *model } } } else { + // Large batches would be M sequential per-client RPCs; the inbound's saved + // settings already hold the final set, so mark dirty and let one reconcile + // push converge the node instead. + if push && len(clients) > nodeBulkPushThreshold { + markDirty = true + push = false + } for _, client := range clients { if push { if err1 := rt.AddClient(context.Background(), oldInbound, client); err1 != nil { diff --git a/internal/web/service/inbound_autorenew_test.go b/internal/web/service/inbound_autorenew_test.go new file mode 100644 index 000000000..34f6ba0ed --- /dev/null +++ b/internal/web/service/inbound_autorenew_test.go @@ -0,0 +1,104 @@ +package service + +import ( + "testing" + "time" + + "github.com/mhsanaei/3x-ui/v3/internal/database" + "github.com/mhsanaei/3x-ui/v3/internal/database/model" + "github.com/mhsanaei/3x-ui/v3/internal/xray" +) + +// TestAutoRenewClients_MultiInbound covers the renew loop across more than one +// inbound: every expired client with reset>0 must get a fresh future expiry, +// zeroed usage and re-enabled state, while a non-expiring client is untouched. +// It also guards the map-lookup refactor of the old quadratic inner loop. +func TestAutoRenewClients_MultiInbound(t *testing.T) { + setupBulkDB(t) + svc := &InboundService{} + db := database.GetDB() + + past := time.Now().Add(-48 * time.Hour).UnixMilli() + future := time.Now().Add(365 * 24 * time.Hour).UnixMilli() + + // Two inbounds, two expiring clients each, plus one client that never expires. + ib1Clients := []model.Client{ + {Email: "a@x", ID: "11111111-1111-1111-1111-111111111111", Enable: false, Reset: 30, ExpiryTime: past}, + {Email: "b@x", ID: "22222222-2222-2222-2222-222222222222", Enable: false, Reset: 30, ExpiryTime: past}, + } + ib2Clients := []model.Client{ + {Email: "c@x", ID: "33333333-3333-3333-3333-333333333333", Enable: false, Reset: 7, ExpiryTime: past}, + {Email: "keep@x", ID: "44444444-4444-4444-4444-444444444444", Enable: true, Reset: 0, ExpiryTime: future}, + } + + ib1 := mkInbound(t, 30001, model.VLESS, clientsSettings(t, ib1Clients)) + ib2 := mkInbound(t, 30002, model.VLESS, clientsSettings(t, ib2Clients)) + if err := svc.clientService.SyncInbound(nil, ib1.Id, ib1Clients); err != nil { + t.Fatalf("SyncInbound ib1: %v", err) + } + if err := svc.clientService.SyncInbound(nil, ib2.Id, ib2Clients); err != nil { + t.Fatalf("SyncInbound ib2: %v", err) + } + + // Seed traffic rows: expired+depleted for the three renewable clients, and a + // healthy row for keep@x. + rows := []xray.ClientTraffic{ + {InboundId: ib1.Id, Email: "a@x", Enable: false, Up: 100, Down: 200, Reset: 30, ExpiryTime: past}, + {InboundId: ib1.Id, Email: "b@x", Enable: false, Up: 300, Down: 400, Reset: 30, ExpiryTime: past}, + {InboundId: ib2.Id, Email: "c@x", Enable: false, Up: 500, Down: 600, Reset: 7, ExpiryTime: past}, + {InboundId: ib2.Id, Email: "keep@x", Enable: true, Up: 1, Down: 2, Reset: 0, ExpiryTime: future}, + } + if err := db.Create(&rows).Error; err != nil { + t.Fatalf("seed client_traffics: %v", err) + } + + if _, count, err := svc.autoRenewClients(db); err != nil { + t.Fatalf("autoRenewClients: %v", err) + } else if count != 3 { + t.Fatalf("renewed count = %d, want 3", count) + } + + now := time.Now().UnixMilli() + for _, email := range []string{"a@x", "b@x", "c@x"} { + var row xray.ClientTraffic + if err := db.Where("email = ?", email).First(&row).Error; err != nil { + t.Fatalf("read %s: %v", email, err) + } + if row.Up != 0 || row.Down != 0 { + t.Errorf("%s: usage not reset: up=%d down=%d", email, row.Up, row.Down) + } + if !row.Enable { + t.Errorf("%s: not re-enabled", email) + } + if row.ExpiryTime <= now { + t.Errorf("%s: expiry not advanced: got %d, now %d", email, row.ExpiryTime, now) + } + } + + // The non-expiring client must be left exactly as seeded. + var keep xray.ClientTraffic + if err := db.Where("email = ?", "keep@x").First(&keep).Error; err != nil { + t.Fatalf("read keep@x: %v", err) + } + if keep.Up != 1 || keep.Down != 2 || keep.ExpiryTime != future { + t.Errorf("keep@x was modified: %+v", keep) + } + + // The renewed state must also be reflected in the inbound settings JSON. + reloaded, err := svc.GetInbound(ib1.Id) + if err != nil { + t.Fatalf("GetInbound ib1: %v", err) + } + cs, err := svc.GetClients(reloaded) + if err != nil { + t.Fatalf("GetClients ib1: %v", err) + } + for _, c := range cs { + if !c.Enable { + t.Errorf("settings client %s still disabled after renew", c.Email) + } + if c.ExpiryTime <= now { + t.Errorf("settings client %s expiry not advanced: %d", c.Email, c.ExpiryTime) + } + } +} diff --git a/internal/web/service/inbound_node.go b/internal/web/service/inbound_node.go index aeeab2eff..9d1becef4 100644 --- a/internal/web/service/inbound_node.go +++ b/internal/web/service/inbound_node.go @@ -21,6 +21,12 @@ import ( var reportedRemoteTagConflict sync.Map +// nodeBulkPushThreshold caps how many per-client RPCs a single operation will +// stream to a remote node. Above it, the panel marks the node dirty instead and +// lets one ReconcileNode push converge the whole inbound — far cheaper than M +// sequential round-trips. Small ops stay on the live per-client path. +const nodeBulkPushThreshold = 32 + func (s *InboundService) runtimeFor(ib *model.Inbound) (runtime.Runtime, error) { mgr := runtime.GetManager() if mgr == nil { @@ -90,18 +96,31 @@ func (s *InboundService) ReconcileNode(ctx context.Context, rt *runtime.Remote, if err != nil { return err } + remoteTagSet := make(map[string]struct{}, len(remoteTags)) + for _, tag := range remoteTags { + remoteTagSet[tag] = struct{}{} + } prefix := nodeTagPrefix(&nodeID) desiredTags := make(map[string]struct{}, len(inbounds)*2) for _, ib := range inbounds { desiredTags[ib.Tag] = struct{}{} + // existsOnNode: does the node already report this inbound under any of the + // tag forms it may be stored as? If so, an unchanged push can be skipped. + _, existsOnNode := remoteTagSet[ib.Tag] if prefix != "" { if stripped, found := strings.CutPrefix(ib.Tag, prefix); found { desiredTags[stripped] = struct{}{} + if _, ok := remoteTagSet[stripped]; ok { + existsOnNode = true + } } else { desiredTags[prefix+ib.Tag] = struct{}{} + if _, ok := remoteTagSet[prefix+ib.Tag]; ok { + existsOnNode = true + } } } - if err := rt.UpdateInbound(ctx, ib, ib); err != nil { + if _, err := rt.ReconcileInbound(ctx, ib, existsOnNode); err != nil { return fmt.Errorf("reconcile inbound %q: %w", ib.Tag, err) } } @@ -260,15 +279,6 @@ func (s *InboundService) setRemoteTrafficLocked(nodeID int, snap *runtime.Traffi nodeBaselines[baselineRows[i].Email] = nodeTrafficCounter{Up: baselineRows[i].Up, Down: baselineRows[i].Down} } - var existingEmailsList []string - if err := db.Model(xray.ClientTraffic{}).Pluck("email", &existingEmailsList).Error; err != nil { - return false, err - } - existingEmails := make(map[string]struct{}, len(existingEmailsList)) - for _, e := range existingEmailsList { - existingEmails[e] = struct{}{} - } - var defaultUserId int if len(central) > 0 { defaultUserId = central[0].UserId @@ -312,6 +322,26 @@ func (s *InboundService) setRemoteTrafficLocked(nodeID int, snap *runtime.Traffi } } + // Membership set for the rowExists checks below. Only the snapshot's emails + // are ever probed, so scope the lookup to those instead of plucking the whole + // client_traffics table (50k+ rows) on every node poll. + existingEmails := make(map[string]struct{}, len(snapEmailsAll)) + if len(snapEmailsAll) > 0 { + snapEmailList := make([]string, 0, len(snapEmailsAll)) + for email := range snapEmailsAll { + snapEmailList = append(snapEmailList, email) + } + for _, batch := range chunkStrings(snapEmailList, sqliteMaxVars) { + var found []string + if err := db.Model(xray.ClientTraffic{}).Where("email IN ?", batch).Pluck("email", &found).Error; err != nil { + return false, err + } + for _, e := range found { + existingEmails[e] = struct{}{} + } + } + } + tx := db.Begin() committed := false defer func() { diff --git a/internal/web/service/inbound_traffic.go b/internal/web/service/inbound_traffic.go index 244f7ad99..261a2596a 100644 --- a/internal/web/service/inbound_traffic.go +++ b/internal/web/service/inbound_traffic.go @@ -348,6 +348,13 @@ func (s *InboundService) autoRenewClients(tx *gorm.DB) (bool, int64, error) { } inbounds = append(inbounds, page...) } + // Index the expired traffics by email so each client is an O(1) lookup + // instead of a linear scan of every expired row (O(clients × expired) per + // inbound, quadratic at scale). Pointers keep the in-place mutation below. + trafficByEmail := make(map[string]*xray.ClientTraffic, len(traffics)) + for i := range traffics { + trafficByEmail[traffics[i].Email] = traffics[i] + } for inbound_index := range inbounds { settings := map[string]any{} json.Unmarshal([]byte(inbounds[inbound_index].Settings), &settings) @@ -357,34 +364,34 @@ func (s *InboundService) autoRenewClients(tx *gorm.DB) (bool, int64, error) { } for client_index := range clients { c := clients[client_index].(map[string]any) - for traffic_index, traffic := range traffics { - if traffic.Email == c["email"].(string) { - newExpiryTime := traffic.ExpiryTime - for newExpiryTime < now { - newExpiryTime += (int64(traffic.Reset) * 86400000) - } - c["expiryTime"] = newExpiryTime - traffics[traffic_index].ExpiryTime = newExpiryTime - traffics[traffic_index].Down = 0 - traffics[traffic_index].Up = 0 - if !traffic.Enable { - traffics[traffic_index].Enable = true - c["enable"] = true - clientsToAdd = append(clientsToAdd, - struct { - protocol string - tag string - client map[string]any - }{ - protocol: string(inbounds[inbound_index].Protocol), - tag: inbounds[inbound_index].Tag, - client: c, - }) - } - clients[client_index] = any(c) - break - } + email, _ := c["email"].(string) + traffic, ok := trafficByEmail[email] + if !ok { + continue } + newExpiryTime := traffic.ExpiryTime + for newExpiryTime < now { + newExpiryTime += (int64(traffic.Reset) * 86400000) + } + c["expiryTime"] = newExpiryTime + traffic.ExpiryTime = newExpiryTime + traffic.Down = 0 + traffic.Up = 0 + if !traffic.Enable { + traffic.Enable = true + c["enable"] = true + clientsToAdd = append(clientsToAdd, + struct { + protocol string + tag string + client map[string]any + }{ + protocol: string(inbounds[inbound_index].Protocol), + tag: inbounds[inbound_index].Tag, + client: c, + }) + } + clients[client_index] = any(c) } settings["clients"] = clients newSettings, err := json.MarshalIndent(settings, "", " ") diff --git a/internal/web/service/node_bulk_dispatch_test.go b/internal/web/service/node_bulk_dispatch_test.go new file mode 100644 index 000000000..ec9ec2bd9 --- /dev/null +++ b/internal/web/service/node_bulk_dispatch_test.go @@ -0,0 +1,168 @@ +package service + +import ( + "context" + "fmt" + "sync/atomic" + "testing" + + "github.com/google/uuid" + "github.com/mhsanaei/3x-ui/v3/internal/database" + "github.com/mhsanaei/3x-ui/v3/internal/database/model" + "github.com/mhsanaei/3x-ui/v3/internal/web/runtime" +) + +// fakeNodeRuntime is a runtime.Runtime stub that counts the per-client dispatch +// calls so a test can assert a bulk op does NOT stream one RPC per client. +type fakeNodeRuntime struct { + addClient atomic.Int32 + deleteUser atomic.Int32 + updateUser atomic.Int32 +} + +func (f *fakeNodeRuntime) Name() string { return "fake-node" } + +func (f *fakeNodeRuntime) AddInbound(context.Context, *model.Inbound) error { return nil } +func (f *fakeNodeRuntime) DelInbound(context.Context, *model.Inbound) error { return nil } +func (f *fakeNodeRuntime) UpdateInbound(context.Context, *model.Inbound, *model.Inbound) error { + return nil +} +func (f *fakeNodeRuntime) AddUser(context.Context, *model.Inbound, map[string]any) error { return nil } +func (f *fakeNodeRuntime) RemoveUser(context.Context, *model.Inbound, string) error { return nil } +func (f *fakeNodeRuntime) UpdateUser(context.Context, *model.Inbound, string, model.Client) error { + f.updateUser.Add(1) + return nil +} +func (f *fakeNodeRuntime) DeleteUser(context.Context, *model.Inbound, string) error { + f.deleteUser.Add(1) + return nil +} +func (f *fakeNodeRuntime) AddClient(context.Context, *model.Inbound, model.Client) error { + f.addClient.Add(1) + return nil +} +func (f *fakeNodeRuntime) RestartXray(context.Context) error { return nil } +func (f *fakeNodeRuntime) ResetClientTraffic(context.Context, *model.Inbound, string) error { + return nil +} +func (f *fakeNodeRuntime) ResetInboundTraffic(context.Context, *model.Inbound) error { return nil } +func (f *fakeNodeRuntime) ResetAllTraffics(context.Context) error { return nil } + +// setupNodeRuntime wires an online node + a fake runtime override and returns the +// node id and the fake so a test can drive the service node-dispatch path without +// a network node. +func setupNodeRuntime(t *testing.T) (int, *fakeNodeRuntime) { + t.Helper() + prev := runtime.GetManager() + mgr := runtime.NewManager(runtime.LocalDeps{APIPort: func() int { return 0 }, SetNeedRestart: func() {}}) + runtime.SetManager(mgr) + t.Cleanup(func() { runtime.SetManager(prev) }) + + node := &model.Node{Name: "n1", Address: "127.0.0.1", Port: 2096, ApiToken: "tok", Enable: true, Status: "online"} + if err := database.GetDB().Create(node).Error; err != nil { + t.Fatalf("create node: %v", err) + } + fake := &fakeNodeRuntime{} + mgr.SetRuntimeOverride(node.Id, fake) + return node.Id, fake +} + +func nodeInbound(t *testing.T, nodeID, port int, clients []model.Client) *model.Inbound { + t.Helper() + if clients == nil { + clients = []model.Client{} + } + ib := &model.Inbound{ + UserId: 1, NodeID: &nodeID, Tag: fmt.Sprintf("in-%d", port), Enable: true, + Port: port, Protocol: model.VLESS, Settings: clientsSettings(t, clients), + } + if err := database.GetDB().Create(ib).Error; err != nil { + t.Fatalf("create node inbound: %v", err) + } + if err := (&ClientService{}).SyncInbound(nil, ib.Id, clients); err != nil { + t.Fatalf("seed SyncInbound: %v", err) + } + return ib +} + +func makeNodeClients(n int) []model.Client { + out := make([]model.Client, n) + for i := range n { + out[i] = model.Client{ID: uuid.NewString(), Email: fmt.Sprintf("nu-%05d@x", i), Enable: true} + } + return out +} + +// TestNodeBulk_LargeAddFoldsToDirty: adding more than the threshold of clients to +// an online node inbound must NOT stream one AddClient RPC per client; it marks +// the node dirty so a single reconcile push converges it instead. +func TestNodeBulk_LargeAddFoldsToDirty(t *testing.T) { + setupBulkDB(t) + nodeID, fake := setupNodeRuntime(t) + ib := nodeInbound(t, nodeID, 30001, nil) + + svc := &ClientService{} + inboundSvc := &InboundService{} + + add := makeNodeClients(nodeBulkPushThreshold + 10) + if _, err := svc.AddInboundClient(inboundSvc, &model.Inbound{Id: ib.Id, Protocol: model.VLESS, Settings: clientsSettings(t, add)}); err != nil { + t.Fatalf("AddInboundClient: %v", err) + } + + if got := fake.addClient.Load(); got != 0 { + t.Fatalf("large add streamed %d AddClient RPCs, want 0 (should fold to dirty)", got) + } + if _, _, dirty, _, err := (&NodeService{}).NodeSyncState(nodeID); err != nil { + t.Fatalf("NodeSyncState: %v", err) + } else if !dirty { + t.Fatal("large add must mark the node dirty") + } +} + +// TestNodeBulk_SmallAddPushesLive: a small add stays on the live per-client path. +func TestNodeBulk_SmallAddPushesLive(t *testing.T) { + setupBulkDB(t) + nodeID, fake := setupNodeRuntime(t) + ib := nodeInbound(t, nodeID, 30002, nil) + + svc := &ClientService{} + inboundSvc := &InboundService{} + + const small = 3 + add := makeNodeClients(small) + if _, err := svc.AddInboundClient(inboundSvc, &model.Inbound{Id: ib.Id, Protocol: model.VLESS, Settings: clientsSettings(t, add)}); err != nil { + t.Fatalf("AddInboundClient: %v", err) + } + if got := fake.addClient.Load(); got != int32(small) { + t.Fatalf("small add streamed %d AddClient RPCs, want %d", got, small) + } +} + +// TestNodeBulk_LargeDeleteFoldsToDirty: deleting more than the threshold from an +// online node inbound must fold into a reconcile rather than per-client deletes. +func TestNodeBulk_LargeDeleteFoldsToDirty(t *testing.T) { + setupBulkDB(t) + nodeID, fake := setupNodeRuntime(t) + + seed := makeNodeClients(nodeBulkPushThreshold + 10) + nodeInbound(t, nodeID, 30003, seed) + + svc := &ClientService{} + inboundSvc := &InboundService{} + emails := make([]string, len(seed)) + for i := range seed { + emails[i] = seed[i].Email + } + if _, _, err := svc.BulkDelete(inboundSvc, emails, false); err != nil { + t.Fatalf("BulkDelete: %v", err) + } + + if got := fake.deleteUser.Load(); got != 0 { + t.Fatalf("large delete streamed %d DeleteUser RPCs, want 0 (should fold to dirty)", got) + } + if _, _, dirty, _, err := (&NodeService{}).NodeSyncState(nodeID); err != nil { + t.Fatalf("NodeSyncState: %v", err) + } else if !dirty { + t.Fatal("large delete must mark the node dirty") + } +} diff --git a/internal/web/service/scale_helpers_test.go b/internal/web/service/scale_helpers_test.go new file mode 100644 index 000000000..20b01ac8b --- /dev/null +++ b/internal/web/service/scale_helpers_test.go @@ -0,0 +1,64 @@ +package service + +import ( + "os" + "path/filepath" + "strings" + "testing" + + "github.com/mhsanaei/3x-ui/v3/internal/config" + "github.com/mhsanaei/3x-ui/v3/internal/database" + xuilogger "github.com/mhsanaei/3x-ui/v3/internal/logger" + + "github.com/op/go-logging" + "gorm.io/gorm" +) + +// setupScaleDB initializes the DB for a scale benchmark on either Postgres +// (XUI_DB_TYPE=postgres + XUI_DB_DSN) or SQLite (XUI_SCALE_TEST=1, temp file), +// and registers cleanup. Skips the test when neither backend is configured. +func setupScaleDB(t *testing.T) { + t.Helper() + xuilogger.InitLogger(logging.ERROR) + + if os.Getenv("XUI_DB_TYPE") == "postgres" && strings.TrimSpace(os.Getenv("XUI_DB_DSN")) != "" { + if err := database.InitDB(""); err != nil { + t.Fatalf("InitDB(postgres): %v", err) + } + t.Cleanup(func() { _ = database.CloseDB() }) + return + } + + switch strings.ToLower(strings.TrimSpace(os.Getenv("XUI_SCALE_TEST"))) { + case "1", "true", "yes": + dbPath := filepath.Join(t.TempDir(), "scale.db") + if err := database.InitDB(dbPath); err != nil { + t.Fatalf("InitDB(sqlite): %v", err) + } + t.Cleanup(func() { _ = database.CloseDB() }) + return + } + + t.Skip("set XUI_SCALE_TEST=1 (sqlite) or XUI_DB_TYPE=postgres + XUI_DB_DSN (postgres) to run the scale benchmark") +} + +// resetScaleTables empties the given tables between sub-sizes. Postgres uses a +// single TRUNCATE ... CASCADE; SQLite deletes per table and clears the +// autoincrement counters so ids restart like RESTART IDENTITY. +func resetScaleTables(t *testing.T, db *gorm.DB, tables ...string) { + t.Helper() + if config.GetDBKind() == "postgres" { + stmt := "TRUNCATE TABLE " + strings.Join(tables, ", ") + " RESTART IDENTITY CASCADE" + if err := db.Exec(stmt).Error; err != nil { + t.Fatalf("truncate: %v", err) + } + return + } + for _, tbl := range tables { + if err := db.Exec("DELETE FROM " + tbl).Error; err != nil { + t.Fatalf("delete %s: %v", tbl, err) + } + } + // Best-effort id reset; sqlite_sequence is absent until the first insert. + db.Exec("DELETE FROM sqlite_sequence") +} diff --git a/internal/web/service/sync_scale_postgres_test.go b/internal/web/service/sync_scale_postgres_test.go index 3df6bc28a..894cfc68f 100644 --- a/internal/web/service/sync_scale_postgres_test.go +++ b/internal/web/service/sync_scale_postgres_test.go @@ -3,7 +3,6 @@ package service import ( "errors" "fmt" - "os" "strings" "testing" "time" @@ -82,13 +81,7 @@ func makeScaleClients(n int) []model.Client { } func TestSyncInboundPostgresScale(t *testing.T) { - if strings.TrimSpace(os.Getenv("XUI_DB_DSN")) == "" || os.Getenv("XUI_DB_TYPE") != "postgres" { - t.Skip("set XUI_DB_TYPE=postgres and XUI_DB_DSN to run the postgres scale benchmark") - } - if err := database.InitDB(""); err != nil { - t.Fatalf("InitDB: %v", err) - } - t.Cleanup(func() { _ = database.CloseDB() }) + setupScaleDB(t) svc := &ClientService{} sizes := []int{5000, 10000, 20000, 50000, 100000, 200000} @@ -96,9 +89,7 @@ func TestSyncInboundPostgresScale(t *testing.T) { for _, n := range sizes { t.Run(fmt.Sprintf("N=%d", n), func(t *testing.T) { db := database.GetDB() - if err := db.Exec("TRUNCATE TABLE inbounds, clients, client_inbounds RESTART IDENTITY CASCADE").Error; err != nil { - t.Fatalf("truncate: %v", err) - } + resetScaleTables(t, db, "inbounds", "clients", "client_inbounds") clients := makeScaleClients(n) ib := &model.Inbound{ @@ -168,13 +159,7 @@ func maxDur(d, floor time.Duration) time.Duration { } func TestAddDelClientPostgresScale(t *testing.T) { - if strings.TrimSpace(os.Getenv("XUI_DB_DSN")) == "" || os.Getenv("XUI_DB_TYPE") != "postgres" { - t.Skip("set XUI_DB_TYPE=postgres and XUI_DB_DSN to run the postgres scale benchmark") - } - if err := database.InitDB(""); err != nil { - t.Fatalf("InitDB: %v", err) - } - t.Cleanup(func() { _ = database.CloseDB() }) + setupScaleDB(t) svc := &ClientService{} inboundSvc := &InboundService{} @@ -183,9 +168,7 @@ func TestAddDelClientPostgresScale(t *testing.T) { for _, n := range sizes { t.Run(fmt.Sprintf("N=%d", n), func(t *testing.T) { db := database.GetDB() - if err := db.Exec("TRUNCATE TABLE inbounds, clients, client_inbounds, client_traffics RESTART IDENTITY CASCADE").Error; err != nil { - t.Fatalf("truncate: %v", err) - } + resetScaleTables(t, db, "inbounds", "clients", "client_inbounds", "client_traffics") clients := makeScaleClients(n) ib := &model.Inbound{ @@ -233,13 +216,7 @@ func TestAddDelClientPostgresScale(t *testing.T) { } func TestGroupAndListPostgresScale(t *testing.T) { - if strings.TrimSpace(os.Getenv("XUI_DB_DSN")) == "" || os.Getenv("XUI_DB_TYPE") != "postgres" { - t.Skip("set XUI_DB_TYPE=postgres and XUI_DB_DSN to run the postgres scale benchmark") - } - if err := database.InitDB(""); err != nil { - t.Fatalf("InitDB: %v", err) - } - t.Cleanup(func() { _ = database.CloseDB() }) + setupScaleDB(t) svc := &ClientService{} sizes := []int{5000, 100000} @@ -247,9 +224,7 @@ func TestGroupAndListPostgresScale(t *testing.T) { for _, n := range sizes { t.Run(fmt.Sprintf("N=%d", n), func(t *testing.T) { db := database.GetDB() - if err := db.Exec("TRUNCATE TABLE inbounds, clients, client_inbounds, client_traffics RESTART IDENTITY CASCADE").Error; err != nil { - t.Fatalf("truncate: %v", err) - } + resetScaleTables(t, db, "inbounds", "clients", "client_inbounds", "client_traffics") clients := makeScaleClients(n) ib := &model.Inbound{Tag: fmt.Sprintf("grp-%d", n), Enable: true, Port: 40000, Protocol: model.VLESS, Settings: clientsSettings(t, clients)} if err := db.Create(ib).Error; err != nil { @@ -293,13 +268,7 @@ func TestGroupAndListPostgresScale(t *testing.T) { } func TestDelAllClientsPostgresScale(t *testing.T) { - if strings.TrimSpace(os.Getenv("XUI_DB_DSN")) == "" || os.Getenv("XUI_DB_TYPE") != "postgres" { - t.Skip("set XUI_DB_TYPE=postgres and XUI_DB_DSN to run the postgres scale benchmark") - } - if err := database.InitDB(""); err != nil { - t.Fatalf("InitDB: %v", err) - } - t.Cleanup(func() { _ = database.CloseDB() }) + setupScaleDB(t) svc := &ClientService{} inboundSvc := &InboundService{} @@ -308,9 +277,7 @@ func TestDelAllClientsPostgresScale(t *testing.T) { for _, n := range sizes { t.Run(fmt.Sprintf("N=%d", n), func(t *testing.T) { db := database.GetDB() - if err := db.Exec("TRUNCATE TABLE inbounds, clients, client_inbounds, client_traffics RESTART IDENTITY CASCADE").Error; err != nil { - t.Fatalf("truncate: %v", err) - } + resetScaleTables(t, db, "inbounds", "clients", "client_inbounds", "client_traffics") clients := makeScaleClients(n) ib := &model.Inbound{Tag: fmt.Sprintf("delall-%d", n), Enable: true, Port: 40000, Protocol: model.VLESS, Settings: clientsSettings(t, clients)} if err := db.Create(ib).Error; err != nil { @@ -343,13 +310,7 @@ func TestDelAllClientsPostgresScale(t *testing.T) { } func TestBulkOpsPostgresScale(t *testing.T) { - if strings.TrimSpace(os.Getenv("XUI_DB_DSN")) == "" || os.Getenv("XUI_DB_TYPE") != "postgres" { - t.Skip("set XUI_DB_TYPE=postgres and XUI_DB_DSN to run the postgres scale benchmark") - } - if err := database.InitDB(""); err != nil { - t.Fatalf("InitDB: %v", err) - } - t.Cleanup(func() { _ = database.CloseDB() }) + setupScaleDB(t) svc := &ClientService{} inboundSvc := &InboundService{} @@ -359,9 +320,7 @@ func TestBulkOpsPostgresScale(t *testing.T) { for _, n := range sizes { t.Run(fmt.Sprintf("N=%d", n), func(t *testing.T) { db := database.GetDB() - if err := db.Exec("TRUNCATE TABLE inbounds, clients, client_inbounds, client_traffics RESTART IDENTITY CASCADE").Error; err != nil { - t.Fatalf("truncate: %v", err) - } + resetScaleTables(t, db, "inbounds", "clients", "client_inbounds", "client_traffics") clients := makeScaleClients(n) exp := time.Now().AddDate(1, 0, 0).UnixMilli() diff --git a/internal/xray/client_traffic.go b/internal/xray/client_traffic.go index 9c75a12d2..147878df7 100644 --- a/internal/xray/client_traffic.go +++ b/internal/xray/client_traffic.go @@ -11,8 +11,8 @@ type ClientTraffic struct { SubId string `json:"subId" form:"subId" gorm:"-" example:"i7tvdpeffi0hvvf1"` Up int64 `json:"up" form:"up" example:"1048576"` Down int64 `json:"down" form:"down" example:"2097152"` - ExpiryTime int64 `json:"expiryTime" form:"expiryTime" example:"1735689600000"` + ExpiryTime int64 `json:"expiryTime" form:"expiryTime" gorm:"index:idx_client_traffics_renew,priority:1" example:"1735689600000"` Total int64 `json:"total" form:"total" example:"10737418240"` - Reset int `json:"reset" form:"reset" gorm:"default:0" example:"0"` + Reset int `json:"reset" form:"reset" gorm:"default:0;index:idx_client_traffics_renew,priority:2" example:"0"` LastOnline int64 `json:"lastOnline" form:"lastOnline" gorm:"default:0" example:"1735680000000"` }