diff --git a/internal/sub/sub_scale_test.go b/internal/sub/sub_scale_test.go new file mode 100644 index 000000000..c9858779e --- /dev/null +++ b/internal/sub/sub_scale_test.go @@ -0,0 +1,241 @@ +package sub + +import ( + "encoding/json" + "fmt" + "os" + "path/filepath" + "strconv" + "strings" + "testing" + "time" + + "github.com/google/uuid" + "github.com/op/go-logging" + "gorm.io/gorm" + + "github.com/mhsanaei/3x-ui/v3/internal/config" + "github.com/mhsanaei/3x-ui/v3/internal/database" + "github.com/mhsanaei/3x-ui/v3/internal/database/model" + xuilogger "github.com/mhsanaei/3x-ui/v3/internal/logger" + "github.com/mhsanaei/3x-ui/v3/internal/xray" +) + +const scaleTargetSubId = "scale-target-sub" + +// setupScaleSubDB mirrors the service package's scale gating: Postgres via +// XUI_DB_TYPE/XUI_DB_DSN, SQLite via XUI_SCALE_TEST=1, skip otherwise. +func setupScaleSubDB(t *testing.T) { + t.Helper() + xuilogger.InitLogger(logging.ERROR) + + if os.Getenv("XUI_DB_TYPE") == "postgres" && strings.TrimSpace(os.Getenv("XUI_DB_DSN")) != "" { + if err := database.InitDB(""); err != nil { + t.Fatalf("InitDB(postgres): %v", err) + } + t.Cleanup(func() { _ = database.CloseDB() }) + return + } + switch strings.ToLower(strings.TrimSpace(os.Getenv("XUI_SCALE_TEST"))) { + case "1", "true", "yes": + if err := database.InitDB(filepath.Join(t.TempDir(), "scale.db")); err != nil { + t.Fatalf("InitDB(sqlite): %v", err) + } + t.Cleanup(func() { _ = database.CloseDB() }) + return + } + t.Skip("set XUI_SCALE_TEST=1 (sqlite) or XUI_DB_TYPE=postgres + XUI_DB_DSN (postgres) to run the scale benchmark") +} + +func scaleSubSizes(t *testing.T, def ...int) []int { + t.Helper() + raw := strings.TrimSpace(os.Getenv("XUI_SCALE_SIZES")) + if raw == "" { + return def + } + var out []int + for _, part := range strings.Split(raw, ",") { + part = strings.TrimSpace(part) + if part == "" { + continue + } + n, err := strconv.Atoi(part) + if err != nil || n <= 0 { + t.Fatalf("XUI_SCALE_SIZES: invalid size %q", part) + } + out = append(out, n) + } + if len(out) == 0 { + return def + } + return out +} + +func resetScaleSubTables(t *testing.T, db *gorm.DB) { + t.Helper() + if config.GetDBKind() == "postgres" { + if err := db.Exec("TRUNCATE TABLE inbounds, clients, client_inbounds, client_traffics RESTART IDENTITY CASCADE").Error; err != nil { + t.Fatalf("truncate: %v", err) + } + } else { + for _, tbl := range []string{"inbounds", "clients", "client_inbounds", "client_traffics"} { + if err := db.Exec("DELETE FROM " + tbl).Error; err != nil { + t.Fatalf("delete %s: %v", tbl, err) + } + } + db.Exec("DELETE FROM sqlite_sequence") + } + if err := db.Where("1 = 1").Delete(&model.ClientExternalLink{}).Error; err != nil { + t.Fatalf("clear client_external_links: %v", err) + } +} + +// seedScaleSubDataset seeds one VLESS inbound holding n clients (the sub +// server's worst case: matchingClients parses the whole settings blob and +// getInboundsBySubId preloads every ClientStats row). Three clients share +// scaleTargetSubId; everyone else gets a unique subId. +func seedScaleSubDataset(t *testing.T, n int) { + t.Helper() + db := database.GetDB() + resetScaleSubTables(t, db) + + clients := make([]model.Client, n) + exp := time.Now().AddDate(1, 0, 0).UnixMilli() + targets := map[int]bool{n / 4: true, n / 2: true, 3 * n / 4: true} + for i := range n { + subId := fmt.Sprintf("sub-%07d", i) + if targets[i] { + subId = scaleTargetSubId + } + clients[i] = model.Client{ + ID: uuid.NewString(), + Email: fmt.Sprintf("user-%07d@subscale", i), + SubID: subId, + Enable: true, + ExpiryTime: exp, + TotalGB: 100 << 30, + } + } + + settingsMap := map[string]any{"clients": clients, "decryption": "none"} + settings, err := json.Marshal(settingsMap) + if err != nil { + t.Fatalf("marshal settings: %v", err) + } + + tx := db.Begin() + if tx.Error != nil { + t.Fatalf("begin seed tx: %v", tx.Error) + } + committed := false + defer func() { + if !committed { + tx.Rollback() + } + }() + + ib := &model.Inbound{ + UserId: 1, + Tag: fmt.Sprintf("subscale-%d", n), + Remark: "subscale", + Enable: true, + Listen: "203.0.113.1", + Port: 443, + Protocol: model.VLESS, + Settings: string(settings), + StreamSettings: `{"network":"tcp","security":"none"}`, + } + if err := tx.Create(ib).Error; err != nil { + t.Fatalf("seed inbound: %v", err) + } + + records := make([]*model.ClientRecord, n) + for i := range clients { + records[i] = clients[i].ToRecord() + } + if err := tx.CreateInBatches(records, 500).Error; err != nil { + t.Fatalf("seed clients: %v", err) + } + links := make([]model.ClientInbound, n) + for i := range records { + links[i] = model.ClientInbound{ClientId: records[i].Id, InboundId: ib.Id} + } + if err := tx.CreateInBatches(links, 1000).Error; err != nil { + t.Fatalf("seed client_inbounds: %v", err) + } + traffics := make([]xray.ClientTraffic, n) + for i := range clients { + traffics[i] = xray.ClientTraffic{ + InboundId: ib.Id, + Email: clients[i].Email, + Enable: true, + Total: clients[i].TotalGB, + ExpiryTime: clients[i].ExpiryTime, + } + } + if err := tx.CreateInBatches(traffics, 1000).Error; err != nil { + t.Fatalf("seed client_traffics: %v", err) + } + + if err := tx.Commit().Error; err != nil { + t.Fatalf("commit seed tx: %v", err) + } + committed = true + db.Exec("ANALYZE") +} + +// TestGetSubsScale measures one subscription fetch (raw and JSON format) for a +// 3-client subId living inside an n-client inbound, plus a subId miss — the +// per-request cost every subscriber pays. +func TestGetSubsScale(t *testing.T) { + for _, n := range scaleSubSizes(t, 10000, 100000) { + t.Run(fmt.Sprintf("N=%d", n), func(t *testing.T) { + setupScaleSubDB(t) + seedScaleSubDataset(t, n) + + svc := &SubService{} + const reps = 5 + start := time.Now() + var links []string + for range reps { + var err error + links, _, _, _, err = svc.GetSubs(scaleTargetSubId, "sub.example.com") + if err != nil { + t.Fatalf("GetSubs: %v", err) + } + } + rawDur := time.Since(start) / reps + if len(links) != 3 { + t.Fatalf("GetSubs links = %d, want 3", len(links)) + } + + jsonSvc := NewSubJsonService("", "", "", &SubService{}) + start = time.Now() + for range reps { + body, _, err := jsonSvc.GetJson(scaleTargetSubId, "sub.example.com") + if err != nil { + t.Fatalf("GetJson: %v", err) + } + if body == "" { + t.Fatal("GetJson returned empty body") + } + } + jsonDur := time.Since(start) / reps + + start = time.Now() + for range reps { + missLinks, _, _, _, err := svc.GetSubs("no-such-sub", "sub.example.com") + if err != nil { + t.Fatalf("GetSubs miss: %v", err) + } + if len(missLinks) != 0 { + t.Fatalf("GetSubs miss links = %d, want 0", len(missLinks)) + } + } + missDur := time.Since(start) / reps + + t.Logf("N=%-7d raw=%-10v json=%-10v miss=%v", + n, rawDur.Round(time.Millisecond), jsonDur.Round(time.Millisecond), missDur.Round(time.Millisecond)) + }) + } +} diff --git a/internal/web/job/check_client_ip_scale_test.go b/internal/web/job/check_client_ip_scale_test.go new file mode 100644 index 000000000..3760e121e --- /dev/null +++ b/internal/web/job/check_client_ip_scale_test.go @@ -0,0 +1,229 @@ +package job + +import ( + "encoding/json" + "fmt" + "os" + "path/filepath" + "strconv" + "strings" + "testing" + "time" + + "github.com/op/go-logging" + "gorm.io/gorm" + + "github.com/mhsanaei/3x-ui/v3/internal/config" + "github.com/mhsanaei/3x-ui/v3/internal/database" + "github.com/mhsanaei/3x-ui/v3/internal/database/model" + xuilogger "github.com/mhsanaei/3x-ui/v3/internal/logger" +) + +// setupScaleJobDB mirrors the service package's scale gating: Postgres via +// XUI_DB_TYPE/XUI_DB_DSN, SQLite via XUI_SCALE_TEST=1, skip otherwise. +func setupScaleJobDB(t *testing.T) { + t.Helper() + loggerInitOnce.Do(func() { xuilogger.InitLogger(logging.ERROR) }) + t.Setenv("XUI_LOG_FOLDER", t.TempDir()) + + if os.Getenv("XUI_DB_TYPE") == "postgres" && strings.TrimSpace(os.Getenv("XUI_DB_DSN")) != "" { + if err := database.InitDB(""); err != nil { + t.Fatalf("InitDB(postgres): %v", err) + } + t.Cleanup(func() { _ = database.CloseDB() }) + return + } + switch strings.ToLower(strings.TrimSpace(os.Getenv("XUI_SCALE_TEST"))) { + case "1", "true", "yes": + if err := database.InitDB(filepath.Join(t.TempDir(), "scale.db")); err != nil { + t.Fatalf("InitDB(sqlite): %v", err) + } + t.Cleanup(func() { _ = database.CloseDB() }) + return + } + t.Skip("set XUI_SCALE_TEST=1 (sqlite) or XUI_DB_TYPE=postgres + XUI_DB_DSN (postgres) to run the scale benchmark") +} + +func scaleJobSizes(t *testing.T, def ...int) []int { + t.Helper() + raw := strings.TrimSpace(os.Getenv("XUI_SCALE_SIZES")) + if raw == "" { + return def + } + var out []int + for _, part := range strings.Split(raw, ",") { + part = strings.TrimSpace(part) + if part == "" { + continue + } + n, err := strconv.Atoi(part) + if err != nil || n <= 0 { + t.Fatalf("XUI_SCALE_SIZES: invalid size %q", part) + } + out = append(out, n) + } + if len(out) == 0 { + return def + } + return out +} + +func resetScaleJobTables(t *testing.T, db *gorm.DB) { + t.Helper() + if config.GetDBKind() == "postgres" { + if err := db.Exec("TRUNCATE TABLE inbounds, clients, client_inbounds RESTART IDENTITY CASCADE").Error; err != nil { + t.Fatalf("truncate: %v", err) + } + } else { + for _, tbl := range []string{"inbounds", "clients", "client_inbounds"} { + if err := db.Exec("DELETE FROM " + tbl).Error; err != nil { + t.Fatalf("delete %s: %v", tbl, err) + } + } + db.Exec("DELETE FROM sqlite_sequence") + } + if err := db.Where("1 = 1").Delete(&model.InboundClientIps{}).Error; err != nil { + t.Fatalf("clear inbound client ips: %v", err) + } + if err := db.Where("1 = 1").Delete(&model.NodeClientIp{}).Error; err != nil { + t.Fatalf("clear node client ips: %v", err) + } +} + +// seedScaleIPDataset seeds n clients across numInbounds inbounds. Every client +// in the LAST inbound carries limitIp=3 (and 0 elsewhere), so hasLimitIp pays +// its full scan cost before finding a hit, and the returned emails all resolve +// to that last inbound for the processObserved measurement. +func seedScaleIPDataset(t *testing.T, n, numInbounds int) []string { + t.Helper() + db := database.GetDB() + resetScaleJobTables(t, db) + + tx := db.Begin() + if tx.Error != nil { + t.Fatalf("begin seed tx: %v", tx.Error) + } + committed := false + defer func() { + if !committed { + tx.Rollback() + } + }() + + var limitedEmails []string + per := n / numInbounds + for i := range numInbounds { + lo, hi := i*per, (i+1)*per + if i == numInbounds-1 { + hi = n + } + limitIp := 0 + if i == numInbounds-1 { + limitIp = 3 + } + clients := make([]model.Client, 0, hi-lo) + records := make([]*model.ClientRecord, 0, hi-lo) + for j := lo; j < hi; j++ { + email := fmt.Sprintf("user-%07d@ipscale", j) + clients = append(clients, model.Client{Email: email, LimitIP: limitIp, Enable: true}) + records = append(records, &model.ClientRecord{Email: email, LimitIP: limitIp, Enable: true}) + if limitIp > 0 { + limitedEmails = append(limitedEmails, email) + } + } + settings, err := json.Marshal(map[string][]model.Client{"clients": clients}) + if err != nil { + t.Fatalf("marshal settings: %v", err) + } + ib := &model.Inbound{ + UserId: 1, + Tag: fmt.Sprintf("ipscale-%d-%d", n, i), + Enable: true, + Port: 42000 + i, + Protocol: model.VLESS, + Settings: string(settings), + } + if err := tx.Create(ib).Error; err != nil { + t.Fatalf("seed inbound %d: %v", i, err) + } + if err := tx.CreateInBatches(records, 500).Error; err != nil { + t.Fatalf("seed clients %d: %v", i, err) + } + links := make([]model.ClientInbound, len(records)) + for j := range records { + links[j] = model.ClientInbound{ClientId: records[j].Id, InboundId: ib.Id} + } + if err := tx.CreateInBatches(links, 1000).Error; err != nil { + t.Fatalf("seed client_inbounds %d: %v", i, err) + } + } + + if err := tx.Commit().Error; err != nil { + t.Fatalf("commit seed tx: %v", err) + } + committed = true + db.Exec("ANALYZE") + return limitedEmails +} + +// TestCheckClientIpScale measures the @every 10s ip-limit job pieces: the +// hasLimitIp gate (settings LIKE scan + full JSON parse of every matching +// inbound) and processObserved with M online users (per-email inbound lookup, +// settings parse and autocommit save). Run twice: first scan half add / half +// update, second scan all update path. +func TestCheckClientIpScale(t *testing.T) { + shapes := []struct { + name string + inbounds int + observed int + }{{"single", 1, 50}, {"spread50", 50, 1000}} + + for _, n := range scaleJobSizes(t, 10000, 100000) { + for _, shape := range shapes { + t.Run(fmt.Sprintf("N=%d_%s", n, shape.name), func(t *testing.T) { + setupScaleJobDB(t) + limited := seedScaleIPDataset(t, n, shape.inbounds) + m := min(shape.observed, len(limited)) + + j := NewCheckClientIpJob() + const reps = 3 + start := time.Now() + for range reps { + if !j.hasLimitIp() { + t.Fatal("hasLimitIp = false, want true") + } + } + t.Logf("N=%-7d shape=%-8s hasLimitIp=%v/call", n, shape.name, (time.Since(start) / reps).Round(time.Millisecond)) + + now := time.Now().Unix() + observed := make(map[string]map[string]int64, m) + for i := range m { + observed[limited[i]] = map[string]int64{ + fmt.Sprintf("10.0.%d.%d", i/250, i%250+1): now, + } + } + for i := range m / 2 { + seedClientIps(t, limited[i], []IPWithTimestamp{{IP: "10.99.0.1", Timestamp: now - 60}}) + } + + start = time.Now() + j.processObserved(observed, true, true) + firstScan := time.Since(start) + start = time.Now() + j.processObserved(observed, true, true) + secondScan := time.Since(start) + t.Logf("N=%-7d shape=%-8s processObserved M=%-5d first=%-10v second=%-10v (%.1fms/email)", + n, shape.name, m, firstScan.Round(time.Millisecond), secondScan.Round(time.Millisecond), + float64(secondScan.Milliseconds())/float64(m)) + + var rows int64 + if err := database.GetDB().Model(&model.InboundClientIps{}).Count(&rows).Error; err != nil { + t.Fatalf("count ip rows: %v", err) + } + if rows != int64(m) { + t.Fatalf("inbound_client_ips rows = %d, want %d", rows, m) + } + }) + } + } +} diff --git a/internal/web/service/scale_helpers_test.go b/internal/web/service/scale_helpers_test.go index 20b01ac8b..894c3a990 100644 --- a/internal/web/service/scale_helpers_test.go +++ b/internal/web/service/scale_helpers_test.go @@ -1,22 +1,28 @@ package service import ( + "fmt" "os" "path/filepath" + "strconv" "strings" "testing" + "time" "github.com/mhsanaei/3x-ui/v3/internal/config" "github.com/mhsanaei/3x-ui/v3/internal/database" + "github.com/mhsanaei/3x-ui/v3/internal/database/model" xuilogger "github.com/mhsanaei/3x-ui/v3/internal/logger" + "github.com/mhsanaei/3x-ui/v3/internal/xray" "github.com/op/go-logging" "gorm.io/gorm" ) // setupScaleDB initializes the DB for a scale benchmark on either Postgres -// (XUI_DB_TYPE=postgres + XUI_DB_DSN) or SQLite (XUI_SCALE_TEST=1, temp file), -// and registers cleanup. Skips the test when neither backend is configured. +// (XUI_DB_TYPE=postgres + XUI_DB_DSN) or SQLite (XUI_SCALE_TEST=1, temp file; +// XUI_SCALE_DB_PATH persists the DB for manual smoke runs), and registers +// cleanup. Skips the test when neither backend is configured. func setupScaleDB(t *testing.T) { t.Helper() xuilogger.InitLogger(logging.ERROR) @@ -31,7 +37,10 @@ func setupScaleDB(t *testing.T) { switch strings.ToLower(strings.TrimSpace(os.Getenv("XUI_SCALE_TEST"))) { case "1", "true", "yes": - dbPath := filepath.Join(t.TempDir(), "scale.db") + dbPath := strings.TrimSpace(os.Getenv("XUI_SCALE_DB_PATH")) + if dbPath == "" { + dbPath = filepath.Join(t.TempDir(), "scale.db") + } if err := database.InitDB(dbPath); err != nil { t.Fatalf("InitDB(sqlite): %v", err) } @@ -42,6 +51,144 @@ func setupScaleDB(t *testing.T) { t.Skip("set XUI_SCALE_TEST=1 (sqlite) or XUI_DB_TYPE=postgres + XUI_DB_DSN (postgres) to run the scale benchmark") } +// scaleSizes returns the default size ladder unless XUI_SCALE_SIZES overrides +// it with a comma-separated list (e.g. "500000" or "10000,100000,500000"). +func scaleSizes(t *testing.T, def ...int) []int { + t.Helper() + raw := strings.TrimSpace(os.Getenv("XUI_SCALE_SIZES")) + if raw == "" { + return def + } + var out []int + for _, part := range strings.Split(raw, ",") { + part = strings.TrimSpace(part) + if part == "" { + continue + } + n, err := strconv.Atoi(part) + if err != nil || n <= 0 { + t.Fatalf("XUI_SCALE_SIZES: invalid size %q", part) + } + out = append(out, n) + } + if len(out) == 0 { + return def + } + return out +} + +type scaleDataset struct { + inboundIds []int + tags []string + emails []string + perInbound [][]model.Client +} + +// seedScaleDataset seeds n healthy clients (future expiry, unfilled quota) +// spread across numInbounds inbounds, writing inbounds, clients, +// client_inbounds and client_traffics directly in one transaction — orders of +// magnitude faster than SyncInbound and one fsync instead of thousands. +func seedScaleDataset(t *testing.T, n, numInbounds int) scaleDataset { + t.Helper() + db := database.GetDB() + resetScaleTables(t, db, "inbounds", "clients", "client_inbounds", "client_traffics") + + clients := makeScaleClients(n) + exp := time.Now().AddDate(1, 0, 0).UnixMilli() + for i := range clients { + clients[i].ExpiryTime = exp + clients[i].TotalGB = 100 << 30 + } + + ds := scaleDataset{emails: emailsOf(clients)} + start := time.Now() + tx := db.Begin() + if tx.Error != nil { + t.Fatalf("begin seed tx: %v", tx.Error) + } + committed := false + defer func() { + if !committed { + tx.Rollback() + } + }() + + per := n / numInbounds + for i := range numInbounds { + lo, hi := i*per, (i+1)*per + if i == numInbounds-1 { + hi = n + } + chunk := clients[lo:hi] + ib := &model.Inbound{ + UserId: 1, + Tag: fmt.Sprintf("scale-%d-%d", n, i), + Enable: true, + Port: 41000 + i, + Protocol: model.VLESS, + Settings: clientsSettings(t, chunk), + } + if err := tx.Create(ib).Error; err != nil { + t.Fatalf("seed inbound %d: %v", i, err) + } + + records := make([]*model.ClientRecord, len(chunk)) + for j := range chunk { + records[j] = chunk[j].ToRecord() + } + if err := tx.CreateInBatches(records, 500).Error; err != nil { + t.Fatalf("seed clients %d: %v", i, err) + } + + links := make([]model.ClientInbound, len(records)) + for j := range records { + links[j] = model.ClientInbound{ClientId: records[j].Id, InboundId: ib.Id} + } + if err := tx.CreateInBatches(links, 1000).Error; err != nil { + t.Fatalf("seed client_inbounds %d: %v", i, err) + } + + traffics := make([]xray.ClientTraffic, len(chunk)) + for j := range chunk { + traffics[j] = xray.ClientTraffic{ + InboundId: ib.Id, + Email: chunk[j].Email, + Enable: true, + Total: chunk[j].TotalGB, + ExpiryTime: chunk[j].ExpiryTime, + } + } + if err := tx.CreateInBatches(traffics, 1000).Error; err != nil { + t.Fatalf("seed client_traffics %d: %v", i, err) + } + + ds.inboundIds = append(ds.inboundIds, ib.Id) + ds.tags = append(ds.tags, ib.Tag) + ds.perInbound = append(ds.perInbound, chunk) + } + + if err := tx.Commit().Error; err != nil { + t.Fatalf("commit seed tx: %v", err) + } + committed = true + db.Exec("ANALYZE") + t.Logf("seeded N=%d across %d inbound(s) in %v", n, numInbounds, time.Since(start).Round(time.Millisecond)) + return ds +} + +// sampleEmails picks k evenly spaced emails so active clients span the id range. +func sampleEmails(emails []string, k int) []string { + if k >= len(emails) { + return emails + } + out := make([]string, 0, k) + step := len(emails) / k + for i := 0; i < len(emails) && len(out) < k; i += step { + out = append(out, emails[i]) + } + return out +} + // resetScaleTables empties the given tables between sub-sizes. Postgres uses a // single TRUNCATE ... CASCADE; SQLite deletes per table and clears the // autoincrement counters so ids restart like RESTART IDENTITY. diff --git a/internal/web/service/traffic_poll_scale_test.go b/internal/web/service/traffic_poll_scale_test.go new file mode 100644 index 000000000..6266edd8a --- /dev/null +++ b/internal/web/service/traffic_poll_scale_test.go @@ -0,0 +1,106 @@ +package service + +import ( + "fmt" + "testing" + "time" + + "github.com/mhsanaei/3x-ui/v3/internal/database" + "github.com/mhsanaei/3x-ui/v3/internal/xray" +) + +func pollReport(ds scaleDataset, k int) ([]*xray.Traffic, []*xray.ClientTraffic) { + traffics := make([]*xray.Traffic, 0, len(ds.tags)) + for _, tag := range ds.tags { + traffics = append(traffics, &xray.Traffic{IsInbound: true, Tag: tag, Up: 1 << 20, Down: 2 << 20}) + } + clientTraffics := make([]*xray.ClientTraffic, 0, k) + for _, email := range sampleEmails(ds.emails, k) { + clientTraffics = append(clientTraffics, &xray.ClientTraffic{Email: email, Up: 1 << 20, Down: 2 << 20}) + } + return traffics, clientTraffics +} + +// TestAddTrafficPollScale measures one full traffic-poll cycle (the @every 5s +// job): per-client delta UPDATEs, the auto-renew probe and the depleted-client +// scan, in steady state and with 100 clients depleting / renewing. +func TestAddTrafficPollScale(t *testing.T) { + setupScaleDB(t) + svc := &InboundService{} + shapes := []struct { + name string + inbounds int + }{{"single", 1}, {"spread50", 50}} + + for _, n := range scaleSizes(t, 10000, 100000) { + for _, shape := range shapes { + t.Run(fmt.Sprintf("N=%d_%s", n, shape.name), func(t *testing.T) { + db := database.GetDB() + ds := seedScaleDataset(t, n, shape.inbounds) + + for _, k := range []int{1000, 10000} { + if k > n { + continue + } + traffics, clientTraffics := pollReport(ds, k) + const reps = 3 + start := time.Now() + for range reps { + if _, _, err := svc.AddTraffic(traffics, clientTraffics); err != nil { + t.Fatalf("AddTraffic steady: %v", err) + } + } + perPoll := time.Since(start) / reps + t.Logf("N=%-7d shape=%-8s K=%-6d steady=%v/poll", n, shape.name, k, perPoll.Round(time.Millisecond)) + + var probe xray.ClientTraffic + if err := db.Where("email = ?", clientTraffics[0].Email).First(&probe).Error; err != nil { + t.Fatalf("load probe row: %v", err) + } + if probe.Up == 0 || probe.Down == 0 { + t.Fatalf("steady polls did not accumulate traffic: up=%d down=%d", probe.Up, probe.Down) + } + } + + depleted := ds.perInbound[0][:100] + if err := db.Model(&xray.ClientTraffic{}). + Where("email IN ?", emailsOf(depleted)). + Updates(map[string]any{"up": int64(100 << 30), "down": int64(0)}).Error; err != nil { + t.Fatalf("mark depleted: %v", err) + } + start := time.Now() + if _, _, err := svc.AddTraffic(nil, nil); err != nil { + t.Fatalf("AddTraffic disable: %v", err) + } + t.Logf("N=%-7d shape=%-8s disable100=%v", n, shape.name, time.Since(start).Round(time.Millisecond)) + var disabledCount int64 + if err := db.Model(&xray.ClientTraffic{}).Where("enable = ?", false).Count(&disabledCount).Error; err != nil { + t.Fatalf("count disabled: %v", err) + } + if disabledCount != 100 { + t.Fatalf("disable100: got %d disabled rows, want 100", disabledCount) + } + + renew := ds.perInbound[0][100:200] + past := time.Now().Add(-time.Hour).UnixMilli() + if err := db.Model(&xray.ClientTraffic{}). + Where("email IN ?", emailsOf(renew)). + Updates(map[string]any{"reset": 30, "expiry_time": past, "up": int64(1 << 30)}).Error; err != nil { + t.Fatalf("mark renewable: %v", err) + } + start = time.Now() + if _, _, err := svc.AddTraffic(nil, nil); err != nil { + t.Fatalf("AddTraffic renew: %v", err) + } + t.Logf("N=%-7d shape=%-8s renew100=%v", n, shape.name, time.Since(start).Round(time.Millisecond)) + var renewed xray.ClientTraffic + if err := db.Where("email = ?", renew[0].Email).First(&renewed).Error; err != nil { + t.Fatalf("load renewed row: %v", err) + } + if renewed.ExpiryTime <= past || renewed.Up != 0 { + t.Fatalf("renew100 did not renew: expiry=%d up=%d", renewed.ExpiryTime, renewed.Up) + } + }) + } + } +} diff --git a/internal/web/service/ws_payload_scale_test.go b/internal/web/service/ws_payload_scale_test.go new file mode 100644 index 000000000..c3c0e2ca9 --- /dev/null +++ b/internal/web/service/ws_payload_scale_test.go @@ -0,0 +1,93 @@ +package service + +import ( + "encoding/json" + "fmt" + "testing" + "time" +) + +// TestWsPayloadScale compares the websocket broadcast payload paths the +// traffic job runs every 5s while a browser is connected: the full snapshot +// (GetAllClientTraffics + full last-online map) against the delta variant +// (GetActiveClientTraffics on this poll's active emails). Payload sizes are +// logged against the hub's 10MB drop threshold. +func TestWsPayloadScale(t *testing.T) { + setupScaleDB(t) + svc := &InboundService{} + const hubPayloadLimit = 10 * 1024 * 1024 + + for _, n := range scaleSizes(t, 10000, 100000) { + t.Run(fmt.Sprintf("N=%d", n), func(t *testing.T) { + ds := seedScaleDataset(t, n, 1) + k := min(10000, n) + activeEmails := sampleEmails(ds.emails, k) + + start := time.Now() + all, err := svc.GetAllClientTraffics() + if err != nil { + t.Fatalf("GetAllClientTraffics: %v", err) + } + fetchAll := time.Since(start) + if len(all) != n { + t.Fatalf("GetAllClientTraffics rows = %d, want %d", len(all), n) + } + start = time.Now() + snapshot, err := json.Marshal(map[string]any{"clients": all}) + if err != nil { + t.Fatalf("marshal snapshot: %v", err) + } + marshalAll := time.Since(start) + verdict := "delivered" + if len(snapshot) > hubPayloadLimit { + verdict = "DROPPED by hub (>10MB)" + } + t.Logf("N=%-7d snapshot: fetch=%-9v marshal=%-9v payload=%.1fMB %s", + n, fetchAll.Round(time.Millisecond), marshalAll.Round(time.Millisecond), + float64(len(snapshot))/(1<<20), verdict) + + start = time.Now() + active, err := svc.GetActiveClientTraffics(activeEmails) + if err != nil { + t.Fatalf("GetActiveClientTraffics: %v", err) + } + fetchActive := time.Since(start) + if len(active) != k { + t.Fatalf("GetActiveClientTraffics rows = %d, want %d", len(active), k) + } + start = time.Now() + delta, err := json.Marshal(map[string]any{"clients": active}) + if err != nil { + t.Fatalf("marshal delta: %v", err) + } + marshalActive := time.Since(start) + t.Logf("N=%-7d delta(K=%d): fetch=%-9v marshal=%-9v payload=%.1fMB", + n, k, fetchActive.Round(time.Millisecond), marshalActive.Round(time.Millisecond), + float64(len(delta))/(1<<20)) + + start = time.Now() + lastOnline, err := svc.GetClientsLastOnline() + if err != nil { + t.Fatalf("GetClientsLastOnline: %v", err) + } + fullMap := time.Since(start) + if len(lastOnline) != n { + t.Fatalf("GetClientsLastOnline entries = %d, want %d", len(lastOnline), n) + } + start = time.Now() + activeLastOnline := make(map[string]int64, len(active)) + for _, ct := range active { + activeLastOnline[ct.Email] = ct.LastOnline + } + activeMap := time.Since(start) + t.Logf("N=%-7d lastOnline: fullMap=%-9v activeMap(K=%d)=%v", + n, fullMap.Round(time.Millisecond), k, activeMap.Round(time.Microsecond)) + + start = time.Now() + if _, err := svc.GetInboundsTrafficSummary(); err != nil { + t.Fatalf("GetInboundsTrafficSummary: %v", err) + } + t.Logf("N=%-7d inboundsSummary=%v", n, time.Since(start).Round(time.Millisecond)) + }) + } +} diff --git a/internal/web/service/xray_config_scale_test.go b/internal/web/service/xray_config_scale_test.go new file mode 100644 index 000000000..6a0433f1f --- /dev/null +++ b/internal/web/service/xray_config_scale_test.go @@ -0,0 +1,66 @@ +package service + +import ( + "encoding/json" + "fmt" + "testing" + "time" + + "github.com/mhsanaei/3x-ui/v3/internal/database" + "github.com/mhsanaei/3x-ui/v3/internal/database/model" +) + +// TestGetXrayConfigScale measures building the full Xray config (the path +// every restart/reconcile takes) and, separately, how much of the per-inbound +// settings rebuild is spent on indented vs plain JSON marshaling. +func TestGetXrayConfigScale(t *testing.T) { + setupScaleDB(t) + svc := &XrayService{} + shapes := []struct { + name string + inbounds int + }{{"single", 1}, {"spread50", 50}} + + for _, n := range scaleSizes(t, 10000, 100000) { + for _, shape := range shapes { + t.Run(fmt.Sprintf("N=%d_%s", n, shape.name), func(t *testing.T) { + ds := seedScaleDataset(t, n, shape.inbounds) + + const reps = 3 + start := time.Now() + for range reps { + cfg, err := svc.GetXrayConfig() + if err != nil { + t.Fatalf("GetXrayConfig: %v", err) + } + if len(cfg.InboundConfigs) < shape.inbounds { + t.Fatalf("config has %d inbounds, want >= %d", len(cfg.InboundConfigs), shape.inbounds) + } + } + t.Logf("N=%-7d shape=%-8s GetXrayConfig=%v/run", + n, shape.name, (time.Since(start) / reps).Round(time.Millisecond)) + + var ib model.Inbound + if err := database.GetDB().First(&ib, ds.inboundIds[0]).Error; err != nil { + t.Fatalf("load inbound: %v", err) + } + settings := map[string]any{} + if err := json.Unmarshal([]byte(ib.Settings), &settings); err != nil { + t.Fatalf("unmarshal settings: %v", err) + } + start = time.Now() + if _, err := json.Marshal(settings); err != nil { + t.Fatalf("marshal settings: %v", err) + } + plain := time.Since(start) + start = time.Now() + if _, err := json.MarshalIndent(settings, "", " "); err != nil { + t.Fatalf("marshal indent settings: %v", err) + } + indented := time.Since(start) + t.Logf("N=%-7d shape=%-8s settingsMarshal plain=%-9v indent=%v", + n, shape.name, plain.Round(time.Millisecond), indented.Round(time.Millisecond)) + }) + } + } +}