perf(scale): speed up traffic, auto-renew, and node bulk ops at 50k-100k clients

Local hot paths:
- autoRenewClients: replace the O(clients x expired) inner scan with an
  email->traffic map lookup (quadratic at scale).
- node traffic sync: scope the client_traffics email-membership query to the
  snapshot's emails instead of plucking the whole table every poll.
- add a (expiry_time, reset) index for the per-tick auto-renew filter.
- SQLite: add cache_size/mmap_size/temp_store pragmas (env-tunable); keep the
  single-file DELETE journal and synchronous=FULL defaults.
- scale benchmarks now run on SQLite too via XUI_SCALE_TEST=1 (shared
  setupScaleDB/resetScaleTables helpers), not just Postgres.

Node paths:
- bulk add/delete/adjust on a node-attached inbound folded one HTTP RPC per
  client; above nodeBulkPushThreshold (32) mark the node dirty and let one
  ReconcileNode push converge it instead of O(M) sequential round-trips.
  Small ops keep the live per-client path. Also hoist nodePushPlan out of the
  per-email delete loop.
- ReconcileNode skips inbounds whose wire payload is unchanged (per-tag
  fingerprint on Remote), guarded by node-side tag presence so a restarted
  node is still re-seeded.

Tests: auto-renew multi-inbound correctness, node-path dispatch (large ops
fold to dirty, small ops push live) via a manager runtime override seam, and
reconcile delta-skip.
This commit is contained in:
MHSanaei
2026-06-20 10:35:46 +02:00
parent e079490144
commit 6a032bcb2a
15 changed files with 623 additions and 137 deletions
+34 -8
View File
@@ -6,6 +6,7 @@ import (
"bytes"
"encoding/json"
"errors"
"fmt"
"io"
"log"
"math"
@@ -886,7 +887,10 @@ func InitDB(dbPath string) error {
if err = os.MkdirAll(dir, 0755); err != nil {
return err
}
dsn := dbPath + "?_journal_mode=DELETE&_busy_timeout=10000&_synchronous=FULL&_txlock=immediate"
// Keep journal_mode=DELETE so the DB stays a single file (no -wal/-shm
// sidecars). synchronous defaults to FULL for durability but is tunable.
sync := sqliteSynchronous()
dsn := dbPath + "?_journal_mode=DELETE&_busy_timeout=10000&_synchronous=" + sync + "&_txlock=immediate"
db, err = gorm.Open(sqlite.Open(dsn), c)
if err != nil {
return err
@@ -895,14 +899,21 @@ func InitDB(dbPath string) error {
if err != nil {
return err
}
if _, err := sqlDB.Exec("PRAGMA journal_mode=DELETE"); err != nil {
return err
// Re-assert the DSN pragmas plus scan-friendly ones for large datasets.
// cache_size/mmap_size/temp_store create no extra files, so the single-file
// guarantee holds; they just cut disk I/O on the 50k-row hot paths.
pragmas := []string{
"PRAGMA journal_mode=DELETE",
"PRAGMA busy_timeout=10000",
"PRAGMA synchronous=" + sync,
fmt.Sprintf("PRAGMA cache_size=-%d", envInt("XUI_DB_CACHE_MB", 32)*1024),
fmt.Sprintf("PRAGMA mmap_size=%d", int64(envInt("XUI_DB_MMAP_MB", 256))*1024*1024),
"PRAGMA temp_store=MEMORY",
}
if _, err := sqlDB.Exec("PRAGMA busy_timeout=10000"); err != nil {
return err
}
if _, err := sqlDB.Exec("PRAGMA synchronous=FULL"); err != nil {
return err
for _, p := range pragmas {
if _, err := sqlDB.Exec(p); err != nil {
return err
}
}
}
@@ -939,6 +950,21 @@ func InitDB(dbPath string) error {
return runSeeders(isUsersEmpty)
}
// sqliteSynchronous returns the SQLite synchronous mode, defaulting to FULL.
// Whitelisted because the value is interpolated directly into a PRAGMA string.
func sqliteSynchronous() string {
switch strings.ToUpper(strings.TrimSpace(os.Getenv("XUI_DB_SYNCHRONOUS"))) {
case "OFF":
return "OFF"
case "NORMAL":
return "NORMAL"
case "EXTRA":
return "EXTRA"
default:
return "FULL"
}
}
func envInt(key string, def int) int {
v := strings.TrimSpace(os.Getenv(key))
if v == "" {
+1
View File
@@ -31,6 +31,7 @@ func TestAutoMigrateCreatesHotPathIndexes(t *testing.T) {
}{
{&model.ClientRecord{}, "idx_client_record_group"},
{&xray.ClientTraffic{}, "idx_client_traffics_inbound"},
{&xray.ClientTraffic{}, "idx_client_traffics_renew"},
}
for _, c := range cases {
if !db.Migrator().HasIndex(c.model, c.index) {
+21
View File
@@ -17,6 +17,7 @@ type Manager struct {
mu sync.RWMutex
remotes map[int]*Remote
overrides map[int]Runtime // test-only: forces RuntimeFor to return a stub
egressResolver NodeEgressResolver
}
@@ -27,6 +28,22 @@ func NewManager(localDeps LocalDeps) *Manager {
}
}
// SetRuntimeOverride makes RuntimeFor(nodeID) return rt instead of building a
// real Remote. Test seam for exercising node-dispatch paths without a network
// node; pass nil rt to clear.
func (m *Manager) SetRuntimeOverride(nodeID int, rt Runtime) {
m.mu.Lock()
defer m.mu.Unlock()
if rt == nil {
delete(m.overrides, nodeID)
return
}
if m.overrides == nil {
m.overrides = make(map[int]Runtime)
}
m.overrides[nodeID] = rt
}
func (m *Manager) SetNodeEgressResolver(r NodeEgressResolver) {
m.mu.Lock()
defer m.mu.Unlock()
@@ -47,6 +64,10 @@ func (m *Manager) RuntimeFor(nodeID *int) (Runtime, error) {
return m.local, nil
}
m.mu.RLock()
if rt, ok := m.overrides[*nodeID]; ok {
m.mu.RUnlock()
return rt, nil
}
if rt, ok := m.remotes[*nodeID]; ok {
m.mu.RUnlock()
return rt, nil
@@ -0,0 +1,65 @@
package runtime
import (
"context"
"net/http"
"net/http/httptest"
"strings"
"sync/atomic"
"testing"
"github.com/mhsanaei/3x-ui/v3/internal/database/model"
)
// TestReconcileInbound_SkipsUnchanged proves the delta-skip: a second reconcile
// of an unchanged inbound that the node still reports sends no push, while a
// content change or an absent-on-node inbound forces a fresh push.
func TestReconcileInbound_SkipsUnchanged(t *testing.T) {
var pushes atomic.Int32
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method == http.MethodPost && strings.Contains(r.URL.Path, "/panel/api/inbounds/update/") {
pushes.Add(1)
}
w.Header().Set("Content-Type", "application/json")
_, _ = w.Write([]byte(`{"success":true}`))
}))
defer srv.Close()
r := NewRemote(nodeForPlainServer(t, srv, "verify", "tok"), nil)
ib := &model.Inbound{Tag: "in-1", Protocol: model.VLESS, Port: 443, Settings: `{"clients":[]}`}
// Pre-seed the tag→id cache so resolveRemoteID needs no network round-trip.
r.cacheSet(ib.Tag, 7)
// First reconcile: node doesn't report it yet → must push and record the fp.
if pushed, err := r.ReconcileInbound(context.Background(), ib, false); err != nil || !pushed {
t.Fatalf("first reconcile: pushed=%v err=%v, want push", pushed, err)
}
if got := pushes.Load(); got != 1 {
t.Fatalf("after first reconcile pushes=%d, want 1", got)
}
// Second reconcile: unchanged and present on node → skip.
if pushed, err := r.ReconcileInbound(context.Background(), ib, true); err != nil || pushed {
t.Fatalf("second reconcile: pushed=%v err=%v, want skip", pushed, err)
}
if got := pushes.Load(); got != 1 {
t.Fatalf("unchanged reconcile pushed again: pushes=%d, want 1", got)
}
// Content change → push again even though it's present on node.
ib.Settings = `{"clients":[{"email":"a@x"}]}`
if pushed, err := r.ReconcileInbound(context.Background(), ib, true); err != nil || !pushed {
t.Fatalf("changed reconcile: pushed=%v err=%v, want push", pushed, err)
}
if got := pushes.Load(); got != 2 {
t.Fatalf("changed reconcile pushes=%d, want 2", got)
}
// Absent on node (e.g. node restarted/lost it) → re-push even if fp matches.
if pushed, err := r.ReconcileInbound(context.Background(), ib, false); err != nil || !pushed {
t.Fatalf("absent-on-node reconcile: pushed=%v err=%v, want push", pushed, err)
}
if got := pushes.Load(); got != 3 {
t.Fatalf("absent-on-node reconcile pushes=%d, want 3", got)
}
}
+37
View File
@@ -3,6 +3,8 @@ package runtime
import (
"bytes"
"context"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
@@ -71,6 +73,10 @@ type Remote struct {
mu sync.RWMutex
remoteIDByTag map[string]int
// pushedFP holds the fingerprint of the last inbound wire payload successfully
// pushed, keyed by panel-side tag, so reconcile can skip re-sending an
// unchanged inbound. Guarded by mu; dropped with the Remote on node config change.
pushedFP map[string]string
// supportsZstd is learned from the node's X-3x-Node-Caps response header; once
// seen, config pushes to this node are zstd-compressed. Old nodes never set
// it, so they keep receiving plain bodies (mixed-version safe).
@@ -96,6 +102,7 @@ func NewRemote(n *model.Node, r NodeEgressResolver) *Remote {
return &Remote{
node: n,
remoteIDByTag: make(map[string]int),
pushedFP: make(map[string]string),
egressResolver: r,
}
}
@@ -432,6 +439,36 @@ func (r *Remote) UpdateInbound(ctx context.Context, oldIb, newIb *model.Inbound)
return nil
}
// ReconcileInbound pushes ib only when its wire payload differs from the last
// successful push, or when the node no longer reports the tag (existsOnNode
// false) — a node that dropped/restarted must still be re-seeded. Returns
// whether a push actually happened. This turns a full-fleet reconcile from "send
// every inbound's full settings" into "send only what changed".
func (r *Remote) ReconcileInbound(ctx context.Context, ib *model.Inbound, existsOnNode bool) (bool, error) {
fp := wireFingerprint(wireInbound(ib, r.node.Id))
if existsOnNode {
r.mu.RLock()
prev, ok := r.pushedFP[ib.Tag]
r.mu.RUnlock()
if ok && prev == fp {
return false, nil
}
}
if err := r.UpdateInbound(ctx, ib, ib); err != nil {
return false, err
}
r.mu.Lock()
r.pushedFP[ib.Tag] = fp
r.mu.Unlock()
return true, nil
}
// wireFingerprint hashes a wire payload so an unchanged inbound is cheap to detect.
func wireFingerprint(v url.Values) string {
sum := sha256.Sum256([]byte(v.Encode()))
return hex.EncodeToString(sum[:])
}
func (r *Remote) AddUser(ctx context.Context, ib *model.Inbound, _ map[string]any) error {
return r.UpdateInbound(ctx, ib, ib)
}
@@ -2,17 +2,12 @@ package service
import (
"fmt"
"os"
"strings"
"testing"
"time"
"github.com/mhsanaei/3x-ui/v3/internal/database"
"github.com/mhsanaei/3x-ui/v3/internal/database/model"
xuilogger "github.com/mhsanaei/3x-ui/v3/internal/logger"
"github.com/mhsanaei/3x-ui/v3/internal/xray"
"github.com/op/go-logging"
)
func seedClientTraffics(t *testing.T, inboundId int, clients []model.Client) {
@@ -37,14 +32,7 @@ func seedClientTraffics(t *testing.T, inboundId int, clients []model.Client) {
// reachable from the REST API at 100k/200k clients, asserting none crash on the
// PostgreSQL bind-parameter ceiling and logging the wall-clock cost of each.
func TestAllAPIsPostgresScale(t *testing.T) {
if strings.TrimSpace(os.Getenv("XUI_DB_DSN")) == "" || os.Getenv("XUI_DB_TYPE") != "postgres" {
t.Skip("set XUI_DB_TYPE=postgres and XUI_DB_DSN to run the postgres scale benchmark")
}
xuilogger.InitLogger(logging.ERROR)
if err := database.InitDB(""); err != nil {
t.Fatalf("InitDB: %v", err)
}
t.Cleanup(func() { _ = database.CloseDB() })
setupScaleDB(t)
svc := &ClientService{}
inboundSvc := &InboundService{}
@@ -56,9 +44,7 @@ func TestAllAPIsPostgresScale(t *testing.T) {
for _, n := range sizes {
t.Run(fmt.Sprintf("N=%d", n), func(t *testing.T) {
db := database.GetDB()
if err := db.Exec("TRUNCATE TABLE inbounds, clients, client_inbounds, client_traffics, client_groups RESTART IDENTITY CASCADE").Error; err != nil {
t.Fatalf("truncate: %v", err)
}
resetScaleTables(t, db, "inbounds", "clients", "client_inbounds", "client_traffics", "client_groups")
clients := makeScaleClients(n)
exp := time.Now().AddDate(1, 0, 0).UnixMilli()
@@ -150,14 +136,7 @@ func TestAllAPIsPostgresScale(t *testing.T) {
// old path (GetClientByEmail, which parses the inbound's entire settings JSON to
// find one client) vs new path (UUID/subId read from the indexed clients table).
func TestGetClientTrafficByEmailABScale(t *testing.T) {
if strings.TrimSpace(os.Getenv("XUI_DB_DSN")) == "" || os.Getenv("XUI_DB_TYPE") != "postgres" {
t.Skip("set XUI_DB_TYPE=postgres and XUI_DB_DSN to run the postgres scale benchmark")
}
xuilogger.InitLogger(logging.ERROR)
if err := database.InitDB(""); err != nil {
t.Fatalf("InitDB: %v", err)
}
t.Cleanup(func() { _ = database.CloseDB() })
setupScaleDB(t)
svc := &ClientService{}
inboundSvc := &InboundService{}
@@ -179,9 +158,7 @@ func TestGetClientTrafficByEmailABScale(t *testing.T) {
for _, n := range sizes {
t.Run(fmt.Sprintf("N=%d", n), func(t *testing.T) {
db := database.GetDB()
if err := db.Exec("TRUNCATE TABLE inbounds, clients, client_inbounds, client_traffics RESTART IDENTITY CASCADE").Error; err != nil {
t.Fatalf("truncate: %v", err)
}
resetScaleTables(t, db, "inbounds", "clients", "client_inbounds", "client_traffics")
clients := makeScaleClients(n)
ib := &model.Inbound{UserId: 1, Tag: fmt.Sprintf("ctbe-%d", n), Enable: true, Port: 40000, Protocol: model.VLESS, Settings: clientsSettings(t, clients)}
if err := db.Create(ib).Error; err != nil {
+10
View File
@@ -525,6 +525,11 @@ func (s *ClientService) bulkAdjustInboundClients(
if dirty {
markDirty = true
}
// Large batches collapse into one reconcile push rather than M updates.
if push && len(foundEmails) > nodeBulkPushThreshold {
markDirty = true
push = false
}
if push {
for email := range foundEmails {
entry := plan[email]
@@ -911,6 +916,11 @@ func (s *ClientService) bulkDelInboundClients(
if dirty {
markDirty = true
}
// Large batches collapse into one reconcile push rather than M deletes.
if push && len(foundEmails) > nodeBulkPushThreshold {
markDirty = true
push = false
}
if push {
for email := range foundEmails {
if err1 := rt.DeleteUser(context.Background(), oldInbound, email); err1 != nil {
+29 -12
View File
@@ -163,6 +163,25 @@ func (s *ClientService) delInboundClients(inboundSvc *InboundService, inboundId
return needRestart, txErr
}
// Resolve the node push plan once for the whole batch instead of per email.
var nodeRt runtime.Runtime
nodePush := false
if oldInbound.NodeID != nil {
rt, push, dirty, perr := inboundSvc.nodePushPlan(oldInbound)
if perr != nil {
return needRestart, perr
}
if dirty {
markDirty = true
}
nodeRt, nodePush = rt, push
// Large batches collapse into one reconcile push rather than M deletes.
if nodePush && len(targets) > nodeBulkPushThreshold {
markDirty = true
nodePush = false
}
}
// Apply runtime deletes after commit — outside the serialized writer so a
// slow node call can't stall traffic accounting.
for _, t := range targets {
@@ -180,20 +199,11 @@ func (s *ClientService) delInboundClients(inboundSvc *InboundService, inboundId
}
}
}
} else {
rt, push, dirty, perr := inboundSvc.nodePushPlan(oldInbound)
if perr != nil {
return needRestart, perr
}
if dirty {
} else if nodePush {
if err1 := nodeRt.DeleteUser(context.Background(), oldInbound, t.email); err1 != nil {
logger.Warning("Error in deleting client on", nodeRt.Name(), ":", err1)
markDirty = true
}
if push {
if err1 := rt.DeleteUser(context.Background(), oldInbound, t.email); err1 != nil {
logger.Warning("Error in deleting client on", rt.Name(), ":", err1)
markDirty = true
}
}
}
}
@@ -402,6 +412,13 @@ func (s *ClientService) addInboundClient(inboundSvc *InboundService, data *model
}
}
} else {
// Large batches would be M sequential per-client RPCs; the inbound's saved
// settings already hold the final set, so mark dirty and let one reconcile
// push converge the node instead.
if push && len(clients) > nodeBulkPushThreshold {
markDirty = true
push = false
}
for _, client := range clients {
if push {
if err1 := rt.AddClient(context.Background(), oldInbound, client); err1 != nil {
@@ -0,0 +1,104 @@
package service
import (
"testing"
"time"
"github.com/mhsanaei/3x-ui/v3/internal/database"
"github.com/mhsanaei/3x-ui/v3/internal/database/model"
"github.com/mhsanaei/3x-ui/v3/internal/xray"
)
// TestAutoRenewClients_MultiInbound covers the renew loop across more than one
// inbound: every expired client with reset>0 must get a fresh future expiry,
// zeroed usage and re-enabled state, while a non-expiring client is untouched.
// It also guards the map-lookup refactor of the old quadratic inner loop.
func TestAutoRenewClients_MultiInbound(t *testing.T) {
setupBulkDB(t)
svc := &InboundService{}
db := database.GetDB()
past := time.Now().Add(-48 * time.Hour).UnixMilli()
future := time.Now().Add(365 * 24 * time.Hour).UnixMilli()
// Two inbounds, two expiring clients each, plus one client that never expires.
ib1Clients := []model.Client{
{Email: "a@x", ID: "11111111-1111-1111-1111-111111111111", Enable: false, Reset: 30, ExpiryTime: past},
{Email: "b@x", ID: "22222222-2222-2222-2222-222222222222", Enable: false, Reset: 30, ExpiryTime: past},
}
ib2Clients := []model.Client{
{Email: "c@x", ID: "33333333-3333-3333-3333-333333333333", Enable: false, Reset: 7, ExpiryTime: past},
{Email: "keep@x", ID: "44444444-4444-4444-4444-444444444444", Enable: true, Reset: 0, ExpiryTime: future},
}
ib1 := mkInbound(t, 30001, model.VLESS, clientsSettings(t, ib1Clients))
ib2 := mkInbound(t, 30002, model.VLESS, clientsSettings(t, ib2Clients))
if err := svc.clientService.SyncInbound(nil, ib1.Id, ib1Clients); err != nil {
t.Fatalf("SyncInbound ib1: %v", err)
}
if err := svc.clientService.SyncInbound(nil, ib2.Id, ib2Clients); err != nil {
t.Fatalf("SyncInbound ib2: %v", err)
}
// Seed traffic rows: expired+depleted for the three renewable clients, and a
// healthy row for keep@x.
rows := []xray.ClientTraffic{
{InboundId: ib1.Id, Email: "a@x", Enable: false, Up: 100, Down: 200, Reset: 30, ExpiryTime: past},
{InboundId: ib1.Id, Email: "b@x", Enable: false, Up: 300, Down: 400, Reset: 30, ExpiryTime: past},
{InboundId: ib2.Id, Email: "c@x", Enable: false, Up: 500, Down: 600, Reset: 7, ExpiryTime: past},
{InboundId: ib2.Id, Email: "keep@x", Enable: true, Up: 1, Down: 2, Reset: 0, ExpiryTime: future},
}
if err := db.Create(&rows).Error; err != nil {
t.Fatalf("seed client_traffics: %v", err)
}
if _, count, err := svc.autoRenewClients(db); err != nil {
t.Fatalf("autoRenewClients: %v", err)
} else if count != 3 {
t.Fatalf("renewed count = %d, want 3", count)
}
now := time.Now().UnixMilli()
for _, email := range []string{"a@x", "b@x", "c@x"} {
var row xray.ClientTraffic
if err := db.Where("email = ?", email).First(&row).Error; err != nil {
t.Fatalf("read %s: %v", email, err)
}
if row.Up != 0 || row.Down != 0 {
t.Errorf("%s: usage not reset: up=%d down=%d", email, row.Up, row.Down)
}
if !row.Enable {
t.Errorf("%s: not re-enabled", email)
}
if row.ExpiryTime <= now {
t.Errorf("%s: expiry not advanced: got %d, now %d", email, row.ExpiryTime, now)
}
}
// The non-expiring client must be left exactly as seeded.
var keep xray.ClientTraffic
if err := db.Where("email = ?", "keep@x").First(&keep).Error; err != nil {
t.Fatalf("read keep@x: %v", err)
}
if keep.Up != 1 || keep.Down != 2 || keep.ExpiryTime != future {
t.Errorf("keep@x was modified: %+v", keep)
}
// The renewed state must also be reflected in the inbound settings JSON.
reloaded, err := svc.GetInbound(ib1.Id)
if err != nil {
t.Fatalf("GetInbound ib1: %v", err)
}
cs, err := svc.GetClients(reloaded)
if err != nil {
t.Fatalf("GetClients ib1: %v", err)
}
for _, c := range cs {
if !c.Enable {
t.Errorf("settings client %s still disabled after renew", c.Email)
}
if c.ExpiryTime <= now {
t.Errorf("settings client %s expiry not advanced: %d", c.Email, c.ExpiryTime)
}
}
}
+40 -10
View File
@@ -21,6 +21,12 @@ import (
var reportedRemoteTagConflict sync.Map
// nodeBulkPushThreshold caps how many per-client RPCs a single operation will
// stream to a remote node. Above it, the panel marks the node dirty instead and
// lets one ReconcileNode push converge the whole inbound — far cheaper than M
// sequential round-trips. Small ops stay on the live per-client path.
const nodeBulkPushThreshold = 32
func (s *InboundService) runtimeFor(ib *model.Inbound) (runtime.Runtime, error) {
mgr := runtime.GetManager()
if mgr == nil {
@@ -90,18 +96,31 @@ func (s *InboundService) ReconcileNode(ctx context.Context, rt *runtime.Remote,
if err != nil {
return err
}
remoteTagSet := make(map[string]struct{}, len(remoteTags))
for _, tag := range remoteTags {
remoteTagSet[tag] = struct{}{}
}
prefix := nodeTagPrefix(&nodeID)
desiredTags := make(map[string]struct{}, len(inbounds)*2)
for _, ib := range inbounds {
desiredTags[ib.Tag] = struct{}{}
// existsOnNode: does the node already report this inbound under any of the
// tag forms it may be stored as? If so, an unchanged push can be skipped.
_, existsOnNode := remoteTagSet[ib.Tag]
if prefix != "" {
if stripped, found := strings.CutPrefix(ib.Tag, prefix); found {
desiredTags[stripped] = struct{}{}
if _, ok := remoteTagSet[stripped]; ok {
existsOnNode = true
}
} else {
desiredTags[prefix+ib.Tag] = struct{}{}
if _, ok := remoteTagSet[prefix+ib.Tag]; ok {
existsOnNode = true
}
}
}
if err := rt.UpdateInbound(ctx, ib, ib); err != nil {
if _, err := rt.ReconcileInbound(ctx, ib, existsOnNode); err != nil {
return fmt.Errorf("reconcile inbound %q: %w", ib.Tag, err)
}
}
@@ -260,15 +279,6 @@ func (s *InboundService) setRemoteTrafficLocked(nodeID int, snap *runtime.Traffi
nodeBaselines[baselineRows[i].Email] = nodeTrafficCounter{Up: baselineRows[i].Up, Down: baselineRows[i].Down}
}
var existingEmailsList []string
if err := db.Model(xray.ClientTraffic{}).Pluck("email", &existingEmailsList).Error; err != nil {
return false, err
}
existingEmails := make(map[string]struct{}, len(existingEmailsList))
for _, e := range existingEmailsList {
existingEmails[e] = struct{}{}
}
var defaultUserId int
if len(central) > 0 {
defaultUserId = central[0].UserId
@@ -312,6 +322,26 @@ func (s *InboundService) setRemoteTrafficLocked(nodeID int, snap *runtime.Traffi
}
}
// Membership set for the rowExists checks below. Only the snapshot's emails
// are ever probed, so scope the lookup to those instead of plucking the whole
// client_traffics table (50k+ rows) on every node poll.
existingEmails := make(map[string]struct{}, len(snapEmailsAll))
if len(snapEmailsAll) > 0 {
snapEmailList := make([]string, 0, len(snapEmailsAll))
for email := range snapEmailsAll {
snapEmailList = append(snapEmailList, email)
}
for _, batch := range chunkStrings(snapEmailList, sqliteMaxVars) {
var found []string
if err := db.Model(xray.ClientTraffic{}).Where("email IN ?", batch).Pluck("email", &found).Error; err != nil {
return false, err
}
for _, e := range found {
existingEmails[e] = struct{}{}
}
}
}
tx := db.Begin()
committed := false
defer func() {
+34 -27
View File
@@ -348,6 +348,13 @@ func (s *InboundService) autoRenewClients(tx *gorm.DB) (bool, int64, error) {
}
inbounds = append(inbounds, page...)
}
// Index the expired traffics by email so each client is an O(1) lookup
// instead of a linear scan of every expired row (O(clients × expired) per
// inbound, quadratic at scale). Pointers keep the in-place mutation below.
trafficByEmail := make(map[string]*xray.ClientTraffic, len(traffics))
for i := range traffics {
trafficByEmail[traffics[i].Email] = traffics[i]
}
for inbound_index := range inbounds {
settings := map[string]any{}
json.Unmarshal([]byte(inbounds[inbound_index].Settings), &settings)
@@ -357,34 +364,34 @@ func (s *InboundService) autoRenewClients(tx *gorm.DB) (bool, int64, error) {
}
for client_index := range clients {
c := clients[client_index].(map[string]any)
for traffic_index, traffic := range traffics {
if traffic.Email == c["email"].(string) {
newExpiryTime := traffic.ExpiryTime
for newExpiryTime < now {
newExpiryTime += (int64(traffic.Reset) * 86400000)
}
c["expiryTime"] = newExpiryTime
traffics[traffic_index].ExpiryTime = newExpiryTime
traffics[traffic_index].Down = 0
traffics[traffic_index].Up = 0
if !traffic.Enable {
traffics[traffic_index].Enable = true
c["enable"] = true
clientsToAdd = append(clientsToAdd,
struct {
protocol string
tag string
client map[string]any
}{
protocol: string(inbounds[inbound_index].Protocol),
tag: inbounds[inbound_index].Tag,
client: c,
})
}
clients[client_index] = any(c)
break
}
email, _ := c["email"].(string)
traffic, ok := trafficByEmail[email]
if !ok {
continue
}
newExpiryTime := traffic.ExpiryTime
for newExpiryTime < now {
newExpiryTime += (int64(traffic.Reset) * 86400000)
}
c["expiryTime"] = newExpiryTime
traffic.ExpiryTime = newExpiryTime
traffic.Down = 0
traffic.Up = 0
if !traffic.Enable {
traffic.Enable = true
c["enable"] = true
clientsToAdd = append(clientsToAdd,
struct {
protocol string
tag string
client map[string]any
}{
protocol: string(inbounds[inbound_index].Protocol),
tag: inbounds[inbound_index].Tag,
client: c,
})
}
clients[client_index] = any(c)
}
settings["clients"] = clients
newSettings, err := json.MarshalIndent(settings, "", " ")
@@ -0,0 +1,168 @@
package service
import (
"context"
"fmt"
"sync/atomic"
"testing"
"github.com/google/uuid"
"github.com/mhsanaei/3x-ui/v3/internal/database"
"github.com/mhsanaei/3x-ui/v3/internal/database/model"
"github.com/mhsanaei/3x-ui/v3/internal/web/runtime"
)
// fakeNodeRuntime is a runtime.Runtime stub that counts the per-client dispatch
// calls so a test can assert a bulk op does NOT stream one RPC per client.
type fakeNodeRuntime struct {
addClient atomic.Int32
deleteUser atomic.Int32
updateUser atomic.Int32
}
func (f *fakeNodeRuntime) Name() string { return "fake-node" }
func (f *fakeNodeRuntime) AddInbound(context.Context, *model.Inbound) error { return nil }
func (f *fakeNodeRuntime) DelInbound(context.Context, *model.Inbound) error { return nil }
func (f *fakeNodeRuntime) UpdateInbound(context.Context, *model.Inbound, *model.Inbound) error {
return nil
}
func (f *fakeNodeRuntime) AddUser(context.Context, *model.Inbound, map[string]any) error { return nil }
func (f *fakeNodeRuntime) RemoveUser(context.Context, *model.Inbound, string) error { return nil }
func (f *fakeNodeRuntime) UpdateUser(context.Context, *model.Inbound, string, model.Client) error {
f.updateUser.Add(1)
return nil
}
func (f *fakeNodeRuntime) DeleteUser(context.Context, *model.Inbound, string) error {
f.deleteUser.Add(1)
return nil
}
func (f *fakeNodeRuntime) AddClient(context.Context, *model.Inbound, model.Client) error {
f.addClient.Add(1)
return nil
}
func (f *fakeNodeRuntime) RestartXray(context.Context) error { return nil }
func (f *fakeNodeRuntime) ResetClientTraffic(context.Context, *model.Inbound, string) error {
return nil
}
func (f *fakeNodeRuntime) ResetInboundTraffic(context.Context, *model.Inbound) error { return nil }
func (f *fakeNodeRuntime) ResetAllTraffics(context.Context) error { return nil }
// setupNodeRuntime wires an online node + a fake runtime override and returns the
// node id and the fake so a test can drive the service node-dispatch path without
// a network node.
func setupNodeRuntime(t *testing.T) (int, *fakeNodeRuntime) {
t.Helper()
prev := runtime.GetManager()
mgr := runtime.NewManager(runtime.LocalDeps{APIPort: func() int { return 0 }, SetNeedRestart: func() {}})
runtime.SetManager(mgr)
t.Cleanup(func() { runtime.SetManager(prev) })
node := &model.Node{Name: "n1", Address: "127.0.0.1", Port: 2096, ApiToken: "tok", Enable: true, Status: "online"}
if err := database.GetDB().Create(node).Error; err != nil {
t.Fatalf("create node: %v", err)
}
fake := &fakeNodeRuntime{}
mgr.SetRuntimeOverride(node.Id, fake)
return node.Id, fake
}
func nodeInbound(t *testing.T, nodeID, port int, clients []model.Client) *model.Inbound {
t.Helper()
if clients == nil {
clients = []model.Client{}
}
ib := &model.Inbound{
UserId: 1, NodeID: &nodeID, Tag: fmt.Sprintf("in-%d", port), Enable: true,
Port: port, Protocol: model.VLESS, Settings: clientsSettings(t, clients),
}
if err := database.GetDB().Create(ib).Error; err != nil {
t.Fatalf("create node inbound: %v", err)
}
if err := (&ClientService{}).SyncInbound(nil, ib.Id, clients); err != nil {
t.Fatalf("seed SyncInbound: %v", err)
}
return ib
}
func makeNodeClients(n int) []model.Client {
out := make([]model.Client, n)
for i := range n {
out[i] = model.Client{ID: uuid.NewString(), Email: fmt.Sprintf("nu-%05d@x", i), Enable: true}
}
return out
}
// TestNodeBulk_LargeAddFoldsToDirty: adding more than the threshold of clients to
// an online node inbound must NOT stream one AddClient RPC per client; it marks
// the node dirty so a single reconcile push converges it instead.
func TestNodeBulk_LargeAddFoldsToDirty(t *testing.T) {
setupBulkDB(t)
nodeID, fake := setupNodeRuntime(t)
ib := nodeInbound(t, nodeID, 30001, nil)
svc := &ClientService{}
inboundSvc := &InboundService{}
add := makeNodeClients(nodeBulkPushThreshold + 10)
if _, err := svc.AddInboundClient(inboundSvc, &model.Inbound{Id: ib.Id, Protocol: model.VLESS, Settings: clientsSettings(t, add)}); err != nil {
t.Fatalf("AddInboundClient: %v", err)
}
if got := fake.addClient.Load(); got != 0 {
t.Fatalf("large add streamed %d AddClient RPCs, want 0 (should fold to dirty)", got)
}
if _, _, dirty, _, err := (&NodeService{}).NodeSyncState(nodeID); err != nil {
t.Fatalf("NodeSyncState: %v", err)
} else if !dirty {
t.Fatal("large add must mark the node dirty")
}
}
// TestNodeBulk_SmallAddPushesLive: a small add stays on the live per-client path.
func TestNodeBulk_SmallAddPushesLive(t *testing.T) {
setupBulkDB(t)
nodeID, fake := setupNodeRuntime(t)
ib := nodeInbound(t, nodeID, 30002, nil)
svc := &ClientService{}
inboundSvc := &InboundService{}
const small = 3
add := makeNodeClients(small)
if _, err := svc.AddInboundClient(inboundSvc, &model.Inbound{Id: ib.Id, Protocol: model.VLESS, Settings: clientsSettings(t, add)}); err != nil {
t.Fatalf("AddInboundClient: %v", err)
}
if got := fake.addClient.Load(); got != int32(small) {
t.Fatalf("small add streamed %d AddClient RPCs, want %d", got, small)
}
}
// TestNodeBulk_LargeDeleteFoldsToDirty: deleting more than the threshold from an
// online node inbound must fold into a reconcile rather than per-client deletes.
func TestNodeBulk_LargeDeleteFoldsToDirty(t *testing.T) {
setupBulkDB(t)
nodeID, fake := setupNodeRuntime(t)
seed := makeNodeClients(nodeBulkPushThreshold + 10)
nodeInbound(t, nodeID, 30003, seed)
svc := &ClientService{}
inboundSvc := &InboundService{}
emails := make([]string, len(seed))
for i := range seed {
emails[i] = seed[i].Email
}
if _, _, err := svc.BulkDelete(inboundSvc, emails, false); err != nil {
t.Fatalf("BulkDelete: %v", err)
}
if got := fake.deleteUser.Load(); got != 0 {
t.Fatalf("large delete streamed %d DeleteUser RPCs, want 0 (should fold to dirty)", got)
}
if _, _, dirty, _, err := (&NodeService{}).NodeSyncState(nodeID); err != nil {
t.Fatalf("NodeSyncState: %v", err)
} else if !dirty {
t.Fatal("large delete must mark the node dirty")
}
}
@@ -0,0 +1,64 @@
package service
import (
"os"
"path/filepath"
"strings"
"testing"
"github.com/mhsanaei/3x-ui/v3/internal/config"
"github.com/mhsanaei/3x-ui/v3/internal/database"
xuilogger "github.com/mhsanaei/3x-ui/v3/internal/logger"
"github.com/op/go-logging"
"gorm.io/gorm"
)
// setupScaleDB initializes the DB for a scale benchmark on either Postgres
// (XUI_DB_TYPE=postgres + XUI_DB_DSN) or SQLite (XUI_SCALE_TEST=1, temp file),
// and registers cleanup. Skips the test when neither backend is configured.
func setupScaleDB(t *testing.T) {
t.Helper()
xuilogger.InitLogger(logging.ERROR)
if os.Getenv("XUI_DB_TYPE") == "postgres" && strings.TrimSpace(os.Getenv("XUI_DB_DSN")) != "" {
if err := database.InitDB(""); err != nil {
t.Fatalf("InitDB(postgres): %v", err)
}
t.Cleanup(func() { _ = database.CloseDB() })
return
}
switch strings.ToLower(strings.TrimSpace(os.Getenv("XUI_SCALE_TEST"))) {
case "1", "true", "yes":
dbPath := filepath.Join(t.TempDir(), "scale.db")
if err := database.InitDB(dbPath); err != nil {
t.Fatalf("InitDB(sqlite): %v", err)
}
t.Cleanup(func() { _ = database.CloseDB() })
return
}
t.Skip("set XUI_SCALE_TEST=1 (sqlite) or XUI_DB_TYPE=postgres + XUI_DB_DSN (postgres) to run the scale benchmark")
}
// resetScaleTables empties the given tables between sub-sizes. Postgres uses a
// single TRUNCATE ... CASCADE; SQLite deletes per table and clears the
// autoincrement counters so ids restart like RESTART IDENTITY.
func resetScaleTables(t *testing.T, db *gorm.DB, tables ...string) {
t.Helper()
if config.GetDBKind() == "postgres" {
stmt := "TRUNCATE TABLE " + strings.Join(tables, ", ") + " RESTART IDENTITY CASCADE"
if err := db.Exec(stmt).Error; err != nil {
t.Fatalf("truncate: %v", err)
}
return
}
for _, tbl := range tables {
if err := db.Exec("DELETE FROM " + tbl).Error; err != nil {
t.Fatalf("delete %s: %v", tbl, err)
}
}
// Best-effort id reset; sqlite_sequence is absent until the first insert.
db.Exec("DELETE FROM sqlite_sequence")
}
@@ -3,7 +3,6 @@ package service
import (
"errors"
"fmt"
"os"
"strings"
"testing"
"time"
@@ -82,13 +81,7 @@ func makeScaleClients(n int) []model.Client {
}
func TestSyncInboundPostgresScale(t *testing.T) {
if strings.TrimSpace(os.Getenv("XUI_DB_DSN")) == "" || os.Getenv("XUI_DB_TYPE") != "postgres" {
t.Skip("set XUI_DB_TYPE=postgres and XUI_DB_DSN to run the postgres scale benchmark")
}
if err := database.InitDB(""); err != nil {
t.Fatalf("InitDB: %v", err)
}
t.Cleanup(func() { _ = database.CloseDB() })
setupScaleDB(t)
svc := &ClientService{}
sizes := []int{5000, 10000, 20000, 50000, 100000, 200000}
@@ -96,9 +89,7 @@ func TestSyncInboundPostgresScale(t *testing.T) {
for _, n := range sizes {
t.Run(fmt.Sprintf("N=%d", n), func(t *testing.T) {
db := database.GetDB()
if err := db.Exec("TRUNCATE TABLE inbounds, clients, client_inbounds RESTART IDENTITY CASCADE").Error; err != nil {
t.Fatalf("truncate: %v", err)
}
resetScaleTables(t, db, "inbounds", "clients", "client_inbounds")
clients := makeScaleClients(n)
ib := &model.Inbound{
@@ -168,13 +159,7 @@ func maxDur(d, floor time.Duration) time.Duration {
}
func TestAddDelClientPostgresScale(t *testing.T) {
if strings.TrimSpace(os.Getenv("XUI_DB_DSN")) == "" || os.Getenv("XUI_DB_TYPE") != "postgres" {
t.Skip("set XUI_DB_TYPE=postgres and XUI_DB_DSN to run the postgres scale benchmark")
}
if err := database.InitDB(""); err != nil {
t.Fatalf("InitDB: %v", err)
}
t.Cleanup(func() { _ = database.CloseDB() })
setupScaleDB(t)
svc := &ClientService{}
inboundSvc := &InboundService{}
@@ -183,9 +168,7 @@ func TestAddDelClientPostgresScale(t *testing.T) {
for _, n := range sizes {
t.Run(fmt.Sprintf("N=%d", n), func(t *testing.T) {
db := database.GetDB()
if err := db.Exec("TRUNCATE TABLE inbounds, clients, client_inbounds, client_traffics RESTART IDENTITY CASCADE").Error; err != nil {
t.Fatalf("truncate: %v", err)
}
resetScaleTables(t, db, "inbounds", "clients", "client_inbounds", "client_traffics")
clients := makeScaleClients(n)
ib := &model.Inbound{
@@ -233,13 +216,7 @@ func TestAddDelClientPostgresScale(t *testing.T) {
}
func TestGroupAndListPostgresScale(t *testing.T) {
if strings.TrimSpace(os.Getenv("XUI_DB_DSN")) == "" || os.Getenv("XUI_DB_TYPE") != "postgres" {
t.Skip("set XUI_DB_TYPE=postgres and XUI_DB_DSN to run the postgres scale benchmark")
}
if err := database.InitDB(""); err != nil {
t.Fatalf("InitDB: %v", err)
}
t.Cleanup(func() { _ = database.CloseDB() })
setupScaleDB(t)
svc := &ClientService{}
sizes := []int{5000, 100000}
@@ -247,9 +224,7 @@ func TestGroupAndListPostgresScale(t *testing.T) {
for _, n := range sizes {
t.Run(fmt.Sprintf("N=%d", n), func(t *testing.T) {
db := database.GetDB()
if err := db.Exec("TRUNCATE TABLE inbounds, clients, client_inbounds, client_traffics RESTART IDENTITY CASCADE").Error; err != nil {
t.Fatalf("truncate: %v", err)
}
resetScaleTables(t, db, "inbounds", "clients", "client_inbounds", "client_traffics")
clients := makeScaleClients(n)
ib := &model.Inbound{Tag: fmt.Sprintf("grp-%d", n), Enable: true, Port: 40000, Protocol: model.VLESS, Settings: clientsSettings(t, clients)}
if err := db.Create(ib).Error; err != nil {
@@ -293,13 +268,7 @@ func TestGroupAndListPostgresScale(t *testing.T) {
}
func TestDelAllClientsPostgresScale(t *testing.T) {
if strings.TrimSpace(os.Getenv("XUI_DB_DSN")) == "" || os.Getenv("XUI_DB_TYPE") != "postgres" {
t.Skip("set XUI_DB_TYPE=postgres and XUI_DB_DSN to run the postgres scale benchmark")
}
if err := database.InitDB(""); err != nil {
t.Fatalf("InitDB: %v", err)
}
t.Cleanup(func() { _ = database.CloseDB() })
setupScaleDB(t)
svc := &ClientService{}
inboundSvc := &InboundService{}
@@ -308,9 +277,7 @@ func TestDelAllClientsPostgresScale(t *testing.T) {
for _, n := range sizes {
t.Run(fmt.Sprintf("N=%d", n), func(t *testing.T) {
db := database.GetDB()
if err := db.Exec("TRUNCATE TABLE inbounds, clients, client_inbounds, client_traffics RESTART IDENTITY CASCADE").Error; err != nil {
t.Fatalf("truncate: %v", err)
}
resetScaleTables(t, db, "inbounds", "clients", "client_inbounds", "client_traffics")
clients := makeScaleClients(n)
ib := &model.Inbound{Tag: fmt.Sprintf("delall-%d", n), Enable: true, Port: 40000, Protocol: model.VLESS, Settings: clientsSettings(t, clients)}
if err := db.Create(ib).Error; err != nil {
@@ -343,13 +310,7 @@ func TestDelAllClientsPostgresScale(t *testing.T) {
}
func TestBulkOpsPostgresScale(t *testing.T) {
if strings.TrimSpace(os.Getenv("XUI_DB_DSN")) == "" || os.Getenv("XUI_DB_TYPE") != "postgres" {
t.Skip("set XUI_DB_TYPE=postgres and XUI_DB_DSN to run the postgres scale benchmark")
}
if err := database.InitDB(""); err != nil {
t.Fatalf("InitDB: %v", err)
}
t.Cleanup(func() { _ = database.CloseDB() })
setupScaleDB(t)
svc := &ClientService{}
inboundSvc := &InboundService{}
@@ -359,9 +320,7 @@ func TestBulkOpsPostgresScale(t *testing.T) {
for _, n := range sizes {
t.Run(fmt.Sprintf("N=%d", n), func(t *testing.T) {
db := database.GetDB()
if err := db.Exec("TRUNCATE TABLE inbounds, clients, client_inbounds, client_traffics RESTART IDENTITY CASCADE").Error; err != nil {
t.Fatalf("truncate: %v", err)
}
resetScaleTables(t, db, "inbounds", "clients", "client_inbounds", "client_traffics")
clients := makeScaleClients(n)
exp := time.Now().AddDate(1, 0, 0).UnixMilli()
+2 -2
View File
@@ -11,8 +11,8 @@ type ClientTraffic struct {
SubId string `json:"subId" form:"subId" gorm:"-" example:"i7tvdpeffi0hvvf1"`
Up int64 `json:"up" form:"up" example:"1048576"`
Down int64 `json:"down" form:"down" example:"2097152"`
ExpiryTime int64 `json:"expiryTime" form:"expiryTime" example:"1735689600000"`
ExpiryTime int64 `json:"expiryTime" form:"expiryTime" gorm:"index:idx_client_traffics_renew,priority:1" example:"1735689600000"`
Total int64 `json:"total" form:"total" example:"10737418240"`
Reset int `json:"reset" form:"reset" gorm:"default:0" example:"0"`
Reset int `json:"reset" form:"reset" gorm:"default:0;index:idx_client_traffics_renew,priority:2" example:"0"`
LastOnline int64 `json:"lastOnline" form:"lastOnline" gorm:"default:0" example:"1735680000000"`
}