test(scale): cover traffic poll, ws payloads, ip-limit job, sub and xray config at 500k

The paths that run continuously in production had no scale coverage: the
5s traffic poll (AddTraffic with its auto-renew and depleted scans), the
websocket snapshot the job broadcasts while a browser is connected, the
10s ip-limit job (hasLimitIp LIKE scan + per-email settings parse), a
subscription fetch inside a huge inbound, and the full Xray config build.

New benchmarks reuse the XUI_SCALE_TEST / XUI_DB_TYPE gating and stay
log-only. Sizes default to 10k/100k; XUI_SCALE_SIZES=500000 raises the
ladder without editing code. seedScaleDataset writes inbounds, clients,
client_inbounds and client_traffics directly in one transaction instead
of SyncInbound, so a 500k seed takes seconds. XUI_SCALE_DB_PATH persists
the seeded SQLite file for manual smoke runs against a live panel.
This commit is contained in:
MHSanaei
2026-07-02 16:12:46 +02:00
parent 28f7690224
commit 4fc301682f
6 changed files with 885 additions and 3 deletions
+241
View File
@@ -0,0 +1,241 @@
package sub
import (
"encoding/json"
"fmt"
"os"
"path/filepath"
"strconv"
"strings"
"testing"
"time"
"github.com/google/uuid"
"github.com/op/go-logging"
"gorm.io/gorm"
"github.com/mhsanaei/3x-ui/v3/internal/config"
"github.com/mhsanaei/3x-ui/v3/internal/database"
"github.com/mhsanaei/3x-ui/v3/internal/database/model"
xuilogger "github.com/mhsanaei/3x-ui/v3/internal/logger"
"github.com/mhsanaei/3x-ui/v3/internal/xray"
)
const scaleTargetSubId = "scale-target-sub"
// setupScaleSubDB mirrors the service package's scale gating: Postgres via
// XUI_DB_TYPE/XUI_DB_DSN, SQLite via XUI_SCALE_TEST=1, skip otherwise.
func setupScaleSubDB(t *testing.T) {
t.Helper()
xuilogger.InitLogger(logging.ERROR)
if os.Getenv("XUI_DB_TYPE") == "postgres" && strings.TrimSpace(os.Getenv("XUI_DB_DSN")) != "" {
if err := database.InitDB(""); err != nil {
t.Fatalf("InitDB(postgres): %v", err)
}
t.Cleanup(func() { _ = database.CloseDB() })
return
}
switch strings.ToLower(strings.TrimSpace(os.Getenv("XUI_SCALE_TEST"))) {
case "1", "true", "yes":
if err := database.InitDB(filepath.Join(t.TempDir(), "scale.db")); err != nil {
t.Fatalf("InitDB(sqlite): %v", err)
}
t.Cleanup(func() { _ = database.CloseDB() })
return
}
t.Skip("set XUI_SCALE_TEST=1 (sqlite) or XUI_DB_TYPE=postgres + XUI_DB_DSN (postgres) to run the scale benchmark")
}
func scaleSubSizes(t *testing.T, def ...int) []int {
t.Helper()
raw := strings.TrimSpace(os.Getenv("XUI_SCALE_SIZES"))
if raw == "" {
return def
}
var out []int
for _, part := range strings.Split(raw, ",") {
part = strings.TrimSpace(part)
if part == "" {
continue
}
n, err := strconv.Atoi(part)
if err != nil || n <= 0 {
t.Fatalf("XUI_SCALE_SIZES: invalid size %q", part)
}
out = append(out, n)
}
if len(out) == 0 {
return def
}
return out
}
func resetScaleSubTables(t *testing.T, db *gorm.DB) {
t.Helper()
if config.GetDBKind() == "postgres" {
if err := db.Exec("TRUNCATE TABLE inbounds, clients, client_inbounds, client_traffics RESTART IDENTITY CASCADE").Error; err != nil {
t.Fatalf("truncate: %v", err)
}
} else {
for _, tbl := range []string{"inbounds", "clients", "client_inbounds", "client_traffics"} {
if err := db.Exec("DELETE FROM " + tbl).Error; err != nil {
t.Fatalf("delete %s: %v", tbl, err)
}
}
db.Exec("DELETE FROM sqlite_sequence")
}
if err := db.Where("1 = 1").Delete(&model.ClientExternalLink{}).Error; err != nil {
t.Fatalf("clear client_external_links: %v", err)
}
}
// seedScaleSubDataset seeds one VLESS inbound holding n clients (the sub
// server's worst case: matchingClients parses the whole settings blob and
// getInboundsBySubId preloads every ClientStats row). Three clients share
// scaleTargetSubId; everyone else gets a unique subId.
func seedScaleSubDataset(t *testing.T, n int) {
t.Helper()
db := database.GetDB()
resetScaleSubTables(t, db)
clients := make([]model.Client, n)
exp := time.Now().AddDate(1, 0, 0).UnixMilli()
targets := map[int]bool{n / 4: true, n / 2: true, 3 * n / 4: true}
for i := range n {
subId := fmt.Sprintf("sub-%07d", i)
if targets[i] {
subId = scaleTargetSubId
}
clients[i] = model.Client{
ID: uuid.NewString(),
Email: fmt.Sprintf("user-%07d@subscale", i),
SubID: subId,
Enable: true,
ExpiryTime: exp,
TotalGB: 100 << 30,
}
}
settingsMap := map[string]any{"clients": clients, "decryption": "none"}
settings, err := json.Marshal(settingsMap)
if err != nil {
t.Fatalf("marshal settings: %v", err)
}
tx := db.Begin()
if tx.Error != nil {
t.Fatalf("begin seed tx: %v", tx.Error)
}
committed := false
defer func() {
if !committed {
tx.Rollback()
}
}()
ib := &model.Inbound{
UserId: 1,
Tag: fmt.Sprintf("subscale-%d", n),
Remark: "subscale",
Enable: true,
Listen: "203.0.113.1",
Port: 443,
Protocol: model.VLESS,
Settings: string(settings),
StreamSettings: `{"network":"tcp","security":"none"}`,
}
if err := tx.Create(ib).Error; err != nil {
t.Fatalf("seed inbound: %v", err)
}
records := make([]*model.ClientRecord, n)
for i := range clients {
records[i] = clients[i].ToRecord()
}
if err := tx.CreateInBatches(records, 500).Error; err != nil {
t.Fatalf("seed clients: %v", err)
}
links := make([]model.ClientInbound, n)
for i := range records {
links[i] = model.ClientInbound{ClientId: records[i].Id, InboundId: ib.Id}
}
if err := tx.CreateInBatches(links, 1000).Error; err != nil {
t.Fatalf("seed client_inbounds: %v", err)
}
traffics := make([]xray.ClientTraffic, n)
for i := range clients {
traffics[i] = xray.ClientTraffic{
InboundId: ib.Id,
Email: clients[i].Email,
Enable: true,
Total: clients[i].TotalGB,
ExpiryTime: clients[i].ExpiryTime,
}
}
if err := tx.CreateInBatches(traffics, 1000).Error; err != nil {
t.Fatalf("seed client_traffics: %v", err)
}
if err := tx.Commit().Error; err != nil {
t.Fatalf("commit seed tx: %v", err)
}
committed = true
db.Exec("ANALYZE")
}
// TestGetSubsScale measures one subscription fetch (raw and JSON format) for a
// 3-client subId living inside an n-client inbound, plus a subId miss — the
// per-request cost every subscriber pays.
func TestGetSubsScale(t *testing.T) {
for _, n := range scaleSubSizes(t, 10000, 100000) {
t.Run(fmt.Sprintf("N=%d", n), func(t *testing.T) {
setupScaleSubDB(t)
seedScaleSubDataset(t, n)
svc := &SubService{}
const reps = 5
start := time.Now()
var links []string
for range reps {
var err error
links, _, _, _, err = svc.GetSubs(scaleTargetSubId, "sub.example.com")
if err != nil {
t.Fatalf("GetSubs: %v", err)
}
}
rawDur := time.Since(start) / reps
if len(links) != 3 {
t.Fatalf("GetSubs links = %d, want 3", len(links))
}
jsonSvc := NewSubJsonService("", "", "", &SubService{})
start = time.Now()
for range reps {
body, _, err := jsonSvc.GetJson(scaleTargetSubId, "sub.example.com")
if err != nil {
t.Fatalf("GetJson: %v", err)
}
if body == "" {
t.Fatal("GetJson returned empty body")
}
}
jsonDur := time.Since(start) / reps
start = time.Now()
for range reps {
missLinks, _, _, _, err := svc.GetSubs("no-such-sub", "sub.example.com")
if err != nil {
t.Fatalf("GetSubs miss: %v", err)
}
if len(missLinks) != 0 {
t.Fatalf("GetSubs miss links = %d, want 0", len(missLinks))
}
}
missDur := time.Since(start) / reps
t.Logf("N=%-7d raw=%-10v json=%-10v miss=%v",
n, rawDur.Round(time.Millisecond), jsonDur.Round(time.Millisecond), missDur.Round(time.Millisecond))
})
}
}
@@ -0,0 +1,229 @@
package job
import (
"encoding/json"
"fmt"
"os"
"path/filepath"
"strconv"
"strings"
"testing"
"time"
"github.com/op/go-logging"
"gorm.io/gorm"
"github.com/mhsanaei/3x-ui/v3/internal/config"
"github.com/mhsanaei/3x-ui/v3/internal/database"
"github.com/mhsanaei/3x-ui/v3/internal/database/model"
xuilogger "github.com/mhsanaei/3x-ui/v3/internal/logger"
)
// setupScaleJobDB mirrors the service package's scale gating: Postgres via
// XUI_DB_TYPE/XUI_DB_DSN, SQLite via XUI_SCALE_TEST=1, skip otherwise.
func setupScaleJobDB(t *testing.T) {
t.Helper()
loggerInitOnce.Do(func() { xuilogger.InitLogger(logging.ERROR) })
t.Setenv("XUI_LOG_FOLDER", t.TempDir())
if os.Getenv("XUI_DB_TYPE") == "postgres" && strings.TrimSpace(os.Getenv("XUI_DB_DSN")) != "" {
if err := database.InitDB(""); err != nil {
t.Fatalf("InitDB(postgres): %v", err)
}
t.Cleanup(func() { _ = database.CloseDB() })
return
}
switch strings.ToLower(strings.TrimSpace(os.Getenv("XUI_SCALE_TEST"))) {
case "1", "true", "yes":
if err := database.InitDB(filepath.Join(t.TempDir(), "scale.db")); err != nil {
t.Fatalf("InitDB(sqlite): %v", err)
}
t.Cleanup(func() { _ = database.CloseDB() })
return
}
t.Skip("set XUI_SCALE_TEST=1 (sqlite) or XUI_DB_TYPE=postgres + XUI_DB_DSN (postgres) to run the scale benchmark")
}
func scaleJobSizes(t *testing.T, def ...int) []int {
t.Helper()
raw := strings.TrimSpace(os.Getenv("XUI_SCALE_SIZES"))
if raw == "" {
return def
}
var out []int
for _, part := range strings.Split(raw, ",") {
part = strings.TrimSpace(part)
if part == "" {
continue
}
n, err := strconv.Atoi(part)
if err != nil || n <= 0 {
t.Fatalf("XUI_SCALE_SIZES: invalid size %q", part)
}
out = append(out, n)
}
if len(out) == 0 {
return def
}
return out
}
func resetScaleJobTables(t *testing.T, db *gorm.DB) {
t.Helper()
if config.GetDBKind() == "postgres" {
if err := db.Exec("TRUNCATE TABLE inbounds, clients, client_inbounds RESTART IDENTITY CASCADE").Error; err != nil {
t.Fatalf("truncate: %v", err)
}
} else {
for _, tbl := range []string{"inbounds", "clients", "client_inbounds"} {
if err := db.Exec("DELETE FROM " + tbl).Error; err != nil {
t.Fatalf("delete %s: %v", tbl, err)
}
}
db.Exec("DELETE FROM sqlite_sequence")
}
if err := db.Where("1 = 1").Delete(&model.InboundClientIps{}).Error; err != nil {
t.Fatalf("clear inbound client ips: %v", err)
}
if err := db.Where("1 = 1").Delete(&model.NodeClientIp{}).Error; err != nil {
t.Fatalf("clear node client ips: %v", err)
}
}
// seedScaleIPDataset seeds n clients across numInbounds inbounds. Every client
// in the LAST inbound carries limitIp=3 (and 0 elsewhere), so hasLimitIp pays
// its full scan cost before finding a hit, and the returned emails all resolve
// to that last inbound for the processObserved measurement.
func seedScaleIPDataset(t *testing.T, n, numInbounds int) []string {
t.Helper()
db := database.GetDB()
resetScaleJobTables(t, db)
tx := db.Begin()
if tx.Error != nil {
t.Fatalf("begin seed tx: %v", tx.Error)
}
committed := false
defer func() {
if !committed {
tx.Rollback()
}
}()
var limitedEmails []string
per := n / numInbounds
for i := range numInbounds {
lo, hi := i*per, (i+1)*per
if i == numInbounds-1 {
hi = n
}
limitIp := 0
if i == numInbounds-1 {
limitIp = 3
}
clients := make([]model.Client, 0, hi-lo)
records := make([]*model.ClientRecord, 0, hi-lo)
for j := lo; j < hi; j++ {
email := fmt.Sprintf("user-%07d@ipscale", j)
clients = append(clients, model.Client{Email: email, LimitIP: limitIp, Enable: true})
records = append(records, &model.ClientRecord{Email: email, LimitIP: limitIp, Enable: true})
if limitIp > 0 {
limitedEmails = append(limitedEmails, email)
}
}
settings, err := json.Marshal(map[string][]model.Client{"clients": clients})
if err != nil {
t.Fatalf("marshal settings: %v", err)
}
ib := &model.Inbound{
UserId: 1,
Tag: fmt.Sprintf("ipscale-%d-%d", n, i),
Enable: true,
Port: 42000 + i,
Protocol: model.VLESS,
Settings: string(settings),
}
if err := tx.Create(ib).Error; err != nil {
t.Fatalf("seed inbound %d: %v", i, err)
}
if err := tx.CreateInBatches(records, 500).Error; err != nil {
t.Fatalf("seed clients %d: %v", i, err)
}
links := make([]model.ClientInbound, len(records))
for j := range records {
links[j] = model.ClientInbound{ClientId: records[j].Id, InboundId: ib.Id}
}
if err := tx.CreateInBatches(links, 1000).Error; err != nil {
t.Fatalf("seed client_inbounds %d: %v", i, err)
}
}
if err := tx.Commit().Error; err != nil {
t.Fatalf("commit seed tx: %v", err)
}
committed = true
db.Exec("ANALYZE")
return limitedEmails
}
// TestCheckClientIpScale measures the @every 10s ip-limit job pieces: the
// hasLimitIp gate (settings LIKE scan + full JSON parse of every matching
// inbound) and processObserved with M online users (per-email inbound lookup,
// settings parse and autocommit save). Run twice: first scan half add / half
// update, second scan all update path.
func TestCheckClientIpScale(t *testing.T) {
shapes := []struct {
name string
inbounds int
observed int
}{{"single", 1, 50}, {"spread50", 50, 1000}}
for _, n := range scaleJobSizes(t, 10000, 100000) {
for _, shape := range shapes {
t.Run(fmt.Sprintf("N=%d_%s", n, shape.name), func(t *testing.T) {
setupScaleJobDB(t)
limited := seedScaleIPDataset(t, n, shape.inbounds)
m := min(shape.observed, len(limited))
j := NewCheckClientIpJob()
const reps = 3
start := time.Now()
for range reps {
if !j.hasLimitIp() {
t.Fatal("hasLimitIp = false, want true")
}
}
t.Logf("N=%-7d shape=%-8s hasLimitIp=%v/call", n, shape.name, (time.Since(start) / reps).Round(time.Millisecond))
now := time.Now().Unix()
observed := make(map[string]map[string]int64, m)
for i := range m {
observed[limited[i]] = map[string]int64{
fmt.Sprintf("10.0.%d.%d", i/250, i%250+1): now,
}
}
for i := range m / 2 {
seedClientIps(t, limited[i], []IPWithTimestamp{{IP: "10.99.0.1", Timestamp: now - 60}})
}
start = time.Now()
j.processObserved(observed, true, true)
firstScan := time.Since(start)
start = time.Now()
j.processObserved(observed, true, true)
secondScan := time.Since(start)
t.Logf("N=%-7d shape=%-8s processObserved M=%-5d first=%-10v second=%-10v (%.1fms/email)",
n, shape.name, m, firstScan.Round(time.Millisecond), secondScan.Round(time.Millisecond),
float64(secondScan.Milliseconds())/float64(m))
var rows int64
if err := database.GetDB().Model(&model.InboundClientIps{}).Count(&rows).Error; err != nil {
t.Fatalf("count ip rows: %v", err)
}
if rows != int64(m) {
t.Fatalf("inbound_client_ips rows = %d, want %d", rows, m)
}
})
}
}
}
+150 -3
View File
@@ -1,22 +1,28 @@
package service
import (
"fmt"
"os"
"path/filepath"
"strconv"
"strings"
"testing"
"time"
"github.com/mhsanaei/3x-ui/v3/internal/config"
"github.com/mhsanaei/3x-ui/v3/internal/database"
"github.com/mhsanaei/3x-ui/v3/internal/database/model"
xuilogger "github.com/mhsanaei/3x-ui/v3/internal/logger"
"github.com/mhsanaei/3x-ui/v3/internal/xray"
"github.com/op/go-logging"
"gorm.io/gorm"
)
// setupScaleDB initializes the DB for a scale benchmark on either Postgres
// (XUI_DB_TYPE=postgres + XUI_DB_DSN) or SQLite (XUI_SCALE_TEST=1, temp file),
// and registers cleanup. Skips the test when neither backend is configured.
// (XUI_DB_TYPE=postgres + XUI_DB_DSN) or SQLite (XUI_SCALE_TEST=1, temp file;
// XUI_SCALE_DB_PATH persists the DB for manual smoke runs), and registers
// cleanup. Skips the test when neither backend is configured.
func setupScaleDB(t *testing.T) {
t.Helper()
xuilogger.InitLogger(logging.ERROR)
@@ -31,7 +37,10 @@ func setupScaleDB(t *testing.T) {
switch strings.ToLower(strings.TrimSpace(os.Getenv("XUI_SCALE_TEST"))) {
case "1", "true", "yes":
dbPath := filepath.Join(t.TempDir(), "scale.db")
dbPath := strings.TrimSpace(os.Getenv("XUI_SCALE_DB_PATH"))
if dbPath == "" {
dbPath = filepath.Join(t.TempDir(), "scale.db")
}
if err := database.InitDB(dbPath); err != nil {
t.Fatalf("InitDB(sqlite): %v", err)
}
@@ -42,6 +51,144 @@ func setupScaleDB(t *testing.T) {
t.Skip("set XUI_SCALE_TEST=1 (sqlite) or XUI_DB_TYPE=postgres + XUI_DB_DSN (postgres) to run the scale benchmark")
}
// scaleSizes returns the default size ladder unless XUI_SCALE_SIZES overrides
// it with a comma-separated list (e.g. "500000" or "10000,100000,500000").
func scaleSizes(t *testing.T, def ...int) []int {
t.Helper()
raw := strings.TrimSpace(os.Getenv("XUI_SCALE_SIZES"))
if raw == "" {
return def
}
var out []int
for _, part := range strings.Split(raw, ",") {
part = strings.TrimSpace(part)
if part == "" {
continue
}
n, err := strconv.Atoi(part)
if err != nil || n <= 0 {
t.Fatalf("XUI_SCALE_SIZES: invalid size %q", part)
}
out = append(out, n)
}
if len(out) == 0 {
return def
}
return out
}
type scaleDataset struct {
inboundIds []int
tags []string
emails []string
perInbound [][]model.Client
}
// seedScaleDataset seeds n healthy clients (future expiry, unfilled quota)
// spread across numInbounds inbounds, writing inbounds, clients,
// client_inbounds and client_traffics directly in one transaction — orders of
// magnitude faster than SyncInbound and one fsync instead of thousands.
func seedScaleDataset(t *testing.T, n, numInbounds int) scaleDataset {
t.Helper()
db := database.GetDB()
resetScaleTables(t, db, "inbounds", "clients", "client_inbounds", "client_traffics")
clients := makeScaleClients(n)
exp := time.Now().AddDate(1, 0, 0).UnixMilli()
for i := range clients {
clients[i].ExpiryTime = exp
clients[i].TotalGB = 100 << 30
}
ds := scaleDataset{emails: emailsOf(clients)}
start := time.Now()
tx := db.Begin()
if tx.Error != nil {
t.Fatalf("begin seed tx: %v", tx.Error)
}
committed := false
defer func() {
if !committed {
tx.Rollback()
}
}()
per := n / numInbounds
for i := range numInbounds {
lo, hi := i*per, (i+1)*per
if i == numInbounds-1 {
hi = n
}
chunk := clients[lo:hi]
ib := &model.Inbound{
UserId: 1,
Tag: fmt.Sprintf("scale-%d-%d", n, i),
Enable: true,
Port: 41000 + i,
Protocol: model.VLESS,
Settings: clientsSettings(t, chunk),
}
if err := tx.Create(ib).Error; err != nil {
t.Fatalf("seed inbound %d: %v", i, err)
}
records := make([]*model.ClientRecord, len(chunk))
for j := range chunk {
records[j] = chunk[j].ToRecord()
}
if err := tx.CreateInBatches(records, 500).Error; err != nil {
t.Fatalf("seed clients %d: %v", i, err)
}
links := make([]model.ClientInbound, len(records))
for j := range records {
links[j] = model.ClientInbound{ClientId: records[j].Id, InboundId: ib.Id}
}
if err := tx.CreateInBatches(links, 1000).Error; err != nil {
t.Fatalf("seed client_inbounds %d: %v", i, err)
}
traffics := make([]xray.ClientTraffic, len(chunk))
for j := range chunk {
traffics[j] = xray.ClientTraffic{
InboundId: ib.Id,
Email: chunk[j].Email,
Enable: true,
Total: chunk[j].TotalGB,
ExpiryTime: chunk[j].ExpiryTime,
}
}
if err := tx.CreateInBatches(traffics, 1000).Error; err != nil {
t.Fatalf("seed client_traffics %d: %v", i, err)
}
ds.inboundIds = append(ds.inboundIds, ib.Id)
ds.tags = append(ds.tags, ib.Tag)
ds.perInbound = append(ds.perInbound, chunk)
}
if err := tx.Commit().Error; err != nil {
t.Fatalf("commit seed tx: %v", err)
}
committed = true
db.Exec("ANALYZE")
t.Logf("seeded N=%d across %d inbound(s) in %v", n, numInbounds, time.Since(start).Round(time.Millisecond))
return ds
}
// sampleEmails picks k evenly spaced emails so active clients span the id range.
func sampleEmails(emails []string, k int) []string {
if k >= len(emails) {
return emails
}
out := make([]string, 0, k)
step := len(emails) / k
for i := 0; i < len(emails) && len(out) < k; i += step {
out = append(out, emails[i])
}
return out
}
// resetScaleTables empties the given tables between sub-sizes. Postgres uses a
// single TRUNCATE ... CASCADE; SQLite deletes per table and clears the
// autoincrement counters so ids restart like RESTART IDENTITY.
@@ -0,0 +1,106 @@
package service
import (
"fmt"
"testing"
"time"
"github.com/mhsanaei/3x-ui/v3/internal/database"
"github.com/mhsanaei/3x-ui/v3/internal/xray"
)
func pollReport(ds scaleDataset, k int) ([]*xray.Traffic, []*xray.ClientTraffic) {
traffics := make([]*xray.Traffic, 0, len(ds.tags))
for _, tag := range ds.tags {
traffics = append(traffics, &xray.Traffic{IsInbound: true, Tag: tag, Up: 1 << 20, Down: 2 << 20})
}
clientTraffics := make([]*xray.ClientTraffic, 0, k)
for _, email := range sampleEmails(ds.emails, k) {
clientTraffics = append(clientTraffics, &xray.ClientTraffic{Email: email, Up: 1 << 20, Down: 2 << 20})
}
return traffics, clientTraffics
}
// TestAddTrafficPollScale measures one full traffic-poll cycle (the @every 5s
// job): per-client delta UPDATEs, the auto-renew probe and the depleted-client
// scan, in steady state and with 100 clients depleting / renewing.
func TestAddTrafficPollScale(t *testing.T) {
setupScaleDB(t)
svc := &InboundService{}
shapes := []struct {
name string
inbounds int
}{{"single", 1}, {"spread50", 50}}
for _, n := range scaleSizes(t, 10000, 100000) {
for _, shape := range shapes {
t.Run(fmt.Sprintf("N=%d_%s", n, shape.name), func(t *testing.T) {
db := database.GetDB()
ds := seedScaleDataset(t, n, shape.inbounds)
for _, k := range []int{1000, 10000} {
if k > n {
continue
}
traffics, clientTraffics := pollReport(ds, k)
const reps = 3
start := time.Now()
for range reps {
if _, _, err := svc.AddTraffic(traffics, clientTraffics); err != nil {
t.Fatalf("AddTraffic steady: %v", err)
}
}
perPoll := time.Since(start) / reps
t.Logf("N=%-7d shape=%-8s K=%-6d steady=%v/poll", n, shape.name, k, perPoll.Round(time.Millisecond))
var probe xray.ClientTraffic
if err := db.Where("email = ?", clientTraffics[0].Email).First(&probe).Error; err != nil {
t.Fatalf("load probe row: %v", err)
}
if probe.Up == 0 || probe.Down == 0 {
t.Fatalf("steady polls did not accumulate traffic: up=%d down=%d", probe.Up, probe.Down)
}
}
depleted := ds.perInbound[0][:100]
if err := db.Model(&xray.ClientTraffic{}).
Where("email IN ?", emailsOf(depleted)).
Updates(map[string]any{"up": int64(100 << 30), "down": int64(0)}).Error; err != nil {
t.Fatalf("mark depleted: %v", err)
}
start := time.Now()
if _, _, err := svc.AddTraffic(nil, nil); err != nil {
t.Fatalf("AddTraffic disable: %v", err)
}
t.Logf("N=%-7d shape=%-8s disable100=%v", n, shape.name, time.Since(start).Round(time.Millisecond))
var disabledCount int64
if err := db.Model(&xray.ClientTraffic{}).Where("enable = ?", false).Count(&disabledCount).Error; err != nil {
t.Fatalf("count disabled: %v", err)
}
if disabledCount != 100 {
t.Fatalf("disable100: got %d disabled rows, want 100", disabledCount)
}
renew := ds.perInbound[0][100:200]
past := time.Now().Add(-time.Hour).UnixMilli()
if err := db.Model(&xray.ClientTraffic{}).
Where("email IN ?", emailsOf(renew)).
Updates(map[string]any{"reset": 30, "expiry_time": past, "up": int64(1 << 30)}).Error; err != nil {
t.Fatalf("mark renewable: %v", err)
}
start = time.Now()
if _, _, err := svc.AddTraffic(nil, nil); err != nil {
t.Fatalf("AddTraffic renew: %v", err)
}
t.Logf("N=%-7d shape=%-8s renew100=%v", n, shape.name, time.Since(start).Round(time.Millisecond))
var renewed xray.ClientTraffic
if err := db.Where("email = ?", renew[0].Email).First(&renewed).Error; err != nil {
t.Fatalf("load renewed row: %v", err)
}
if renewed.ExpiryTime <= past || renewed.Up != 0 {
t.Fatalf("renew100 did not renew: expiry=%d up=%d", renewed.ExpiryTime, renewed.Up)
}
})
}
}
}
@@ -0,0 +1,93 @@
package service
import (
"encoding/json"
"fmt"
"testing"
"time"
)
// TestWsPayloadScale compares the websocket broadcast payload paths the
// traffic job runs every 5s while a browser is connected: the full snapshot
// (GetAllClientTraffics + full last-online map) against the delta variant
// (GetActiveClientTraffics on this poll's active emails). Payload sizes are
// logged against the hub's 10MB drop threshold.
func TestWsPayloadScale(t *testing.T) {
setupScaleDB(t)
svc := &InboundService{}
const hubPayloadLimit = 10 * 1024 * 1024
for _, n := range scaleSizes(t, 10000, 100000) {
t.Run(fmt.Sprintf("N=%d", n), func(t *testing.T) {
ds := seedScaleDataset(t, n, 1)
k := min(10000, n)
activeEmails := sampleEmails(ds.emails, k)
start := time.Now()
all, err := svc.GetAllClientTraffics()
if err != nil {
t.Fatalf("GetAllClientTraffics: %v", err)
}
fetchAll := time.Since(start)
if len(all) != n {
t.Fatalf("GetAllClientTraffics rows = %d, want %d", len(all), n)
}
start = time.Now()
snapshot, err := json.Marshal(map[string]any{"clients": all})
if err != nil {
t.Fatalf("marshal snapshot: %v", err)
}
marshalAll := time.Since(start)
verdict := "delivered"
if len(snapshot) > hubPayloadLimit {
verdict = "DROPPED by hub (>10MB)"
}
t.Logf("N=%-7d snapshot: fetch=%-9v marshal=%-9v payload=%.1fMB %s",
n, fetchAll.Round(time.Millisecond), marshalAll.Round(time.Millisecond),
float64(len(snapshot))/(1<<20), verdict)
start = time.Now()
active, err := svc.GetActiveClientTraffics(activeEmails)
if err != nil {
t.Fatalf("GetActiveClientTraffics: %v", err)
}
fetchActive := time.Since(start)
if len(active) != k {
t.Fatalf("GetActiveClientTraffics rows = %d, want %d", len(active), k)
}
start = time.Now()
delta, err := json.Marshal(map[string]any{"clients": active})
if err != nil {
t.Fatalf("marshal delta: %v", err)
}
marshalActive := time.Since(start)
t.Logf("N=%-7d delta(K=%d): fetch=%-9v marshal=%-9v payload=%.1fMB",
n, k, fetchActive.Round(time.Millisecond), marshalActive.Round(time.Millisecond),
float64(len(delta))/(1<<20))
start = time.Now()
lastOnline, err := svc.GetClientsLastOnline()
if err != nil {
t.Fatalf("GetClientsLastOnline: %v", err)
}
fullMap := time.Since(start)
if len(lastOnline) != n {
t.Fatalf("GetClientsLastOnline entries = %d, want %d", len(lastOnline), n)
}
start = time.Now()
activeLastOnline := make(map[string]int64, len(active))
for _, ct := range active {
activeLastOnline[ct.Email] = ct.LastOnline
}
activeMap := time.Since(start)
t.Logf("N=%-7d lastOnline: fullMap=%-9v activeMap(K=%d)=%v",
n, fullMap.Round(time.Millisecond), k, activeMap.Round(time.Microsecond))
start = time.Now()
if _, err := svc.GetInboundsTrafficSummary(); err != nil {
t.Fatalf("GetInboundsTrafficSummary: %v", err)
}
t.Logf("N=%-7d inboundsSummary=%v", n, time.Since(start).Round(time.Millisecond))
})
}
}
@@ -0,0 +1,66 @@
package service
import (
"encoding/json"
"fmt"
"testing"
"time"
"github.com/mhsanaei/3x-ui/v3/internal/database"
"github.com/mhsanaei/3x-ui/v3/internal/database/model"
)
// TestGetXrayConfigScale measures building the full Xray config (the path
// every restart/reconcile takes) and, separately, how much of the per-inbound
// settings rebuild is spent on indented vs plain JSON marshaling.
func TestGetXrayConfigScale(t *testing.T) {
setupScaleDB(t)
svc := &XrayService{}
shapes := []struct {
name string
inbounds int
}{{"single", 1}, {"spread50", 50}}
for _, n := range scaleSizes(t, 10000, 100000) {
for _, shape := range shapes {
t.Run(fmt.Sprintf("N=%d_%s", n, shape.name), func(t *testing.T) {
ds := seedScaleDataset(t, n, shape.inbounds)
const reps = 3
start := time.Now()
for range reps {
cfg, err := svc.GetXrayConfig()
if err != nil {
t.Fatalf("GetXrayConfig: %v", err)
}
if len(cfg.InboundConfigs) < shape.inbounds {
t.Fatalf("config has %d inbounds, want >= %d", len(cfg.InboundConfigs), shape.inbounds)
}
}
t.Logf("N=%-7d shape=%-8s GetXrayConfig=%v/run",
n, shape.name, (time.Since(start) / reps).Round(time.Millisecond))
var ib model.Inbound
if err := database.GetDB().First(&ib, ds.inboundIds[0]).Error; err != nil {
t.Fatalf("load inbound: %v", err)
}
settings := map[string]any{}
if err := json.Unmarshal([]byte(ib.Settings), &settings); err != nil {
t.Fatalf("unmarshal settings: %v", err)
}
start = time.Now()
if _, err := json.Marshal(settings); err != nil {
t.Fatalf("marshal settings: %v", err)
}
plain := time.Since(start)
start = time.Now()
if _, err := json.MarshalIndent(settings, "", " "); err != nil {
t.Fatalf("marshal indent settings: %v", err)
}
indented := time.Since(start)
t.Logf("N=%-7d shape=%-8s settingsMarshal plain=%-9v indent=%v",
n, shape.name, plain.Round(time.Millisecond), indented.Round(time.Millisecond))
})
}
}
}