mirror of
https://github.com/MHSanaei/3x-ui.git
synced 2026-06-28 00:24:19 +00:00
21143a6d72
The orphan sweeps in setRemoteTrafficLocked deleted the (node, email) baseline row unconditionally whenever an email was missing from one inbound's snapshot stats — even though baselines are keyed per node, not per inbound. For a client attached to two inbounds of the same node whose stats the node reports under only one of them, the sweep for the other inbound deleted the baseline at the end of every sync cycle. Depending on inbound order, the baseline written earlier in the same transaction was wiped each time, so the next cycle computed delta against a missing baseline (zero) and the client's traffic froze permanently. Scope both sweeps to the union of emails across the whole snapshot: a baseline is only dropped when the email left the node entirely.
333 lines
14 KiB
Go
333 lines
14 KiB
Go
package service
|
|
|
|
import (
|
|
"fmt"
|
|
"path/filepath"
|
|
"testing"
|
|
|
|
"github.com/mhsanaei/3x-ui/v3/internal/database"
|
|
"github.com/mhsanaei/3x-ui/v3/internal/database/model"
|
|
"github.com/mhsanaei/3x-ui/v3/internal/web/runtime"
|
|
"github.com/mhsanaei/3x-ui/v3/internal/xray"
|
|
"gorm.io/gorm"
|
|
)
|
|
|
|
func initTrafficTestDB(t *testing.T) *gorm.DB {
|
|
t.Helper()
|
|
dbDir := t.TempDir()
|
|
t.Setenv("XUI_DB_FOLDER", dbDir)
|
|
if err := database.InitDB(filepath.Join(dbDir, "x-ui.db")); err != nil {
|
|
t.Fatalf("InitDB: %v", err)
|
|
}
|
|
t.Cleanup(func() { _ = database.CloseDB() })
|
|
return database.GetDB()
|
|
}
|
|
|
|
func createNodeInbound(t *testing.T, db *gorm.DB, nodeID int, tag string, port int) {
|
|
t.Helper()
|
|
nid := nodeID
|
|
ib := &model.Inbound{UserId: 1, Tag: tag, Enable: true, Port: port, Protocol: model.VLESS, NodeID: &nid}
|
|
if err := db.Create(ib).Error; err != nil {
|
|
t.Fatalf("create node inbound %q: %v", tag, err)
|
|
}
|
|
}
|
|
|
|
// createNodeInboundWithClient mirrors createNodeInbound but stores the client
|
|
// in the settings JSON so emailUsedByOtherInbounds can see the attachment.
|
|
func createNodeInboundWithClient(t *testing.T, db *gorm.DB, nodeID int, tag string, port int, email string) {
|
|
t.Helper()
|
|
nid := nodeID
|
|
settings := fmt.Sprintf(`{"clients": [{"email": %q, "enable": true}]}`, email)
|
|
ib := &model.Inbound{UserId: 1, Tag: tag, Enable: true, Port: port, Protocol: model.VLESS, NodeID: &nid, Settings: settings}
|
|
if err := db.Create(ib).Error; err != nil {
|
|
t.Fatalf("create node inbound %q: %v", tag, err)
|
|
}
|
|
}
|
|
|
|
func syncNode(t *testing.T, svc *InboundService, nodeID int, tag string, stats ...xray.ClientTraffic) {
|
|
t.Helper()
|
|
snap := &runtime.TrafficSnapshot{
|
|
Inbounds: []*model.Inbound{{Tag: tag, ClientStats: stats}},
|
|
}
|
|
if _, err := svc.setRemoteTrafficLocked(nodeID, snap, false); err != nil {
|
|
t.Fatalf("setRemoteTrafficLocked node %d: %v", nodeID, err)
|
|
}
|
|
}
|
|
|
|
// syncNodeWithSettings mirrors syncNode but carries a real settings JSON on
|
|
// the snapshot inbound, like production nodes do — the sync mirrors snapshot
|
|
// settings onto the central row, and the shared-accumulator guard reads the
|
|
// clients out of those settings.
|
|
func syncNodeWithSettings(t *testing.T, svc *InboundService, nodeID int, tag, settings string, stats ...xray.ClientTraffic) {
|
|
t.Helper()
|
|
snap := &runtime.TrafficSnapshot{
|
|
Inbounds: []*model.Inbound{{Tag: tag, Settings: settings, ClientStats: stats}},
|
|
}
|
|
if _, err := svc.setRemoteTrafficLocked(nodeID, snap, false); err != nil {
|
|
t.Fatalf("setRemoteTrafficLocked node %d: %v", nodeID, err)
|
|
}
|
|
}
|
|
|
|
func readTraffic(t *testing.T, db *gorm.DB, email string) xray.ClientTraffic {
|
|
t.Helper()
|
|
var ct xray.ClientTraffic
|
|
if err := db.Model(xray.ClientTraffic{}).Where("email = ?", email).First(&ct).Error; err != nil {
|
|
t.Fatalf("read client_traffics %q: %v", email, err)
|
|
}
|
|
return ct
|
|
}
|
|
|
|
func assertUpDown(t *testing.T, ct xray.ClientTraffic, wantUp, wantDown int64, when string) {
|
|
t.Helper()
|
|
if ct.Up != wantUp || ct.Down != wantDown {
|
|
t.Errorf("%s: up=%d down=%d, want %d/%d", when, ct.Up, ct.Down, wantUp, wantDown)
|
|
}
|
|
}
|
|
|
|
func TestTwoNodesShareEmail_SumsCorrectly(t *testing.T) {
|
|
db := initTrafficTestDB(t)
|
|
createNodeInbound(t, db, 1, "n1-in", 41001)
|
|
createNodeInbound(t, db, 2, "n2-in", 41002)
|
|
svc := &InboundService{}
|
|
|
|
const email = "shared"
|
|
|
|
syncNode(t, svc, 1, "n1-in", xray.ClientTraffic{Email: email, Up: 100, Down: 100, Enable: true})
|
|
syncNode(t, svc, 2, "n2-in", xray.ClientTraffic{Email: email, Up: 200, Down: 200, Enable: true})
|
|
|
|
assertUpDown(t, readTraffic(t, db, email), 100, 100, "after baselines")
|
|
|
|
syncNode(t, svc, 1, "n1-in", xray.ClientTraffic{Email: email, Up: 150, Down: 150, Enable: true})
|
|
syncNode(t, svc, 2, "n2-in", xray.ClientTraffic{Email: email, Up: 260, Down: 260, Enable: true})
|
|
|
|
assertUpDown(t, readTraffic(t, db, email), 210, 210, "after both nodes grow")
|
|
}
|
|
|
|
func TestSingleNode_MirrorsCorrectly(t *testing.T) {
|
|
db := initTrafficTestDB(t)
|
|
createNodeInbound(t, db, 1, "n1-in", 41001)
|
|
svc := &InboundService{}
|
|
|
|
const email = "solo"
|
|
syncNode(t, svc, 1, "n1-in", xray.ClientTraffic{Email: email, Up: 500, Down: 600, Enable: true})
|
|
assertUpDown(t, readTraffic(t, db, email), 500, 600, "first sync")
|
|
|
|
syncNode(t, svc, 1, "n1-in", xray.ClientTraffic{Email: email, Up: 700, Down: 800, Enable: true})
|
|
assertUpDown(t, readTraffic(t, db, email), 700, 800, "second sync mirrors cumulative")
|
|
}
|
|
|
|
func TestUpgrade_PreExistingRow_NoDoubleCount(t *testing.T) {
|
|
db := initTrafficTestDB(t)
|
|
createNodeInbound(t, db, 1, "n1-in", 41001)
|
|
svc := &InboundService{}
|
|
|
|
const email = "legacy"
|
|
var ib model.Inbound
|
|
if err := db.Where("tag = ?", "n1-in").First(&ib).Error; err != nil {
|
|
t.Fatalf("load inbound: %v", err)
|
|
}
|
|
if err := db.Create(&xray.ClientTraffic{InboundId: ib.Id, Email: email, Up: 1000, Down: 2000, Enable: true}).Error; err != nil {
|
|
t.Fatalf("seed pre-existing row: %v", err)
|
|
}
|
|
|
|
syncNode(t, svc, 1, "n1-in", xray.ClientTraffic{Email: email, Up: 1000, Down: 2000, Enable: true})
|
|
assertUpDown(t, readTraffic(t, db, email), 1000, 2000, "first snapshot must not double-count")
|
|
|
|
syncNode(t, svc, 1, "n1-in", xray.ClientTraffic{Email: email, Up: 1100, Down: 2100, Enable: true})
|
|
assertUpDown(t, readTraffic(t, db, email), 1100, 2100, "growth after upgrade accrues")
|
|
}
|
|
|
|
func TestNodeCounterReset_Clamped(t *testing.T) {
|
|
db := initTrafficTestDB(t)
|
|
createNodeInbound(t, db, 1, "n1-in", 41001)
|
|
svc := &InboundService{}
|
|
|
|
const email = "restart"
|
|
syncNode(t, svc, 1, "n1-in", xray.ClientTraffic{Email: email, Up: 900, Down: 900, Enable: true})
|
|
syncNode(t, svc, 1, "n1-in", xray.ClientTraffic{Email: email, Up: 950, Down: 950, Enable: true})
|
|
assertUpDown(t, readTraffic(t, db, email), 950, 950, "before node reset")
|
|
|
|
syncNode(t, svc, 1, "n1-in", xray.ClientTraffic{Email: email, Up: 50, Down: 50, Enable: true})
|
|
ct := readTraffic(t, db, email)
|
|
if ct.Up < 0 || ct.Down < 0 {
|
|
t.Fatalf("row went negative after node reset: up=%d down=%d", ct.Up, ct.Down)
|
|
}
|
|
assertUpDown(t, ct, 1000, 1000, "after node counter reset (clamped)")
|
|
}
|
|
|
|
func TestCentralReset_NoReAdd(t *testing.T) {
|
|
db := initTrafficTestDB(t)
|
|
createNodeInbound(t, db, 1, "n1-in", 41001)
|
|
createNodeInbound(t, db, 2, "n2-in", 41002)
|
|
svc := &InboundService{}
|
|
|
|
const email = "reset"
|
|
syncNode(t, svc, 1, "n1-in", xray.ClientTraffic{Email: email, Up: 100, Down: 100, Enable: true})
|
|
syncNode(t, svc, 2, "n2-in", xray.ClientTraffic{Email: email, Up: 100, Down: 100, Enable: true})
|
|
syncNode(t, svc, 1, "n1-in", xray.ClientTraffic{Email: email, Up: 200, Down: 200, Enable: true})
|
|
syncNode(t, svc, 2, "n2-in", xray.ClientTraffic{Email: email, Up: 200, Down: 200, Enable: true})
|
|
|
|
if err := db.Model(xray.ClientTraffic{}).Where("email = ?", email).
|
|
Updates(map[string]any{"up": 0, "down": 0}).Error; err != nil {
|
|
t.Fatalf("simulate central reset: %v", err)
|
|
}
|
|
|
|
syncNode(t, svc, 1, "n1-in", xray.ClientTraffic{Email: email, Up: 210, Down: 210, Enable: true})
|
|
syncNode(t, svc, 2, "n2-in", xray.ClientTraffic{Email: email, Up: 205, Down: 205, Enable: true})
|
|
|
|
assertUpDown(t, readTraffic(t, db, email), 15, 15, "after central reset only increments accrue")
|
|
}
|
|
|
|
func TestInboundRemoval_KeepsSharedEmailRow(t *testing.T) {
|
|
db := initTrafficTestDB(t)
|
|
createNodeInboundWithClient(t, db, 1, "n1-in", 41001, "shared")
|
|
createNodeInboundWithClient(t, db, 2, "n2-in", 41002, "shared")
|
|
svc := &InboundService{}
|
|
|
|
const email = "shared"
|
|
settings := fmt.Sprintf(`{"clients": [{"email": %q, "enable": true}]}`, email)
|
|
syncNodeWithSettings(t, svc, 1, "n1-in", settings, xray.ClientTraffic{Email: email, Up: 100, Down: 100, Enable: true})
|
|
syncNodeWithSettings(t, svc, 2, "n2-in", settings, xray.ClientTraffic{Email: email, Up: 200, Down: 200, Enable: true})
|
|
syncNodeWithSettings(t, svc, 1, "n1-in", settings, xray.ClientTraffic{Email: email, Up: 150, Down: 150, Enable: true})
|
|
syncNodeWithSettings(t, svc, 2, "n2-in", settings, xray.ClientTraffic{Email: email, Up: 260, Down: 260, Enable: true})
|
|
assertUpDown(t, readTraffic(t, db, email), 210, 210, "baseline sum")
|
|
|
|
// Node 1 rebuilt (reinstall / another master's reconcile): its inbound
|
|
// vanishes from the snapshot. The shared accumulator must survive — losing
|
|
// it would let the next node sync re-seed the row with that node's counter
|
|
// alone, showing only the last panel's number instead of the sum.
|
|
if _, err := svc.setRemoteTrafficLocked(1, &runtime.TrafficSnapshot{}, false); err != nil {
|
|
t.Fatalf("sync node 1 with empty snapshot: %v", err)
|
|
}
|
|
assertUpDown(t, readTraffic(t, db, email), 210, 210, "after node 1 inbound removal")
|
|
|
|
// Node 2 keeps accruing onto the surviving row.
|
|
syncNodeWithSettings(t, svc, 2, "n2-in", settings, xray.ClientTraffic{Email: email, Up: 300, Down: 300, Enable: true})
|
|
assertUpDown(t, readTraffic(t, db, email), 250, 250, "after node 2 grows")
|
|
}
|
|
|
|
func TestClientGoneFromOneNode_KeepsSharedEmailRow(t *testing.T) {
|
|
db := initTrafficTestDB(t)
|
|
createNodeInboundWithClient(t, db, 1, "n1-in", 41001, "shared")
|
|
createNodeInboundWithClient(t, db, 2, "n2-in", 41002, "shared")
|
|
svc := &InboundService{}
|
|
|
|
const email = "shared"
|
|
settings := fmt.Sprintf(`{"clients": [{"email": %q, "enable": true}]}`, email)
|
|
syncNodeWithSettings(t, svc, 1, "n1-in", settings, xray.ClientTraffic{Email: email, Up: 100, Down: 100, Enable: true})
|
|
syncNodeWithSettings(t, svc, 2, "n2-in", settings, xray.ClientTraffic{Email: email, Up: 200, Down: 200, Enable: true})
|
|
|
|
// Client detached from node 1's inbound only: its stats vanish from that
|
|
// inbound's snapshot while node 2 still hosts the email.
|
|
syncNodeWithSettings(t, svc, 1, "n1-in", `{"clients": []}`)
|
|
assertUpDown(t, readTraffic(t, db, email), 100, 100, "after client left node 1")
|
|
|
|
syncNodeWithSettings(t, svc, 2, "n2-in", settings, xray.ClientTraffic{Email: email, Up: 240, Down: 240, Enable: true})
|
|
assertUpDown(t, readTraffic(t, db, email), 140, 140, "node 2 keeps accruing")
|
|
}
|
|
|
|
// TestStatsUnderSiblingInbound_KeepsNodeBaseline reproduces the recurring
|
|
// sweep bug behind #5202: the client is attached to two inbounds of the SAME
|
|
// node, the node reports its stats under n1-a only, but the master-side row
|
|
// is owned by n1-b's mirror. The per-email sweep for n1-b must not drop the
|
|
// (node, email) baseline that n1-a's delta computation needs — doing so every
|
|
// cycle froze the client's counter permanently.
|
|
func TestStatsUnderSiblingInbound_KeepsNodeBaseline(t *testing.T) {
|
|
db := initTrafficTestDB(t)
|
|
createNodeInboundWithClient(t, db, 1, "n1-a", 41001, "fresh")
|
|
createNodeInbound(t, db, 1, "n1-b", 41002)
|
|
svc := &InboundService{}
|
|
|
|
const email = "fresh"
|
|
var ibB model.Inbound
|
|
if err := db.Where("tag = ?", "n1-b").First(&ibB).Error; err != nil {
|
|
t.Fatalf("load n1-b: %v", err)
|
|
}
|
|
// Master-side row created when the client was added on the panel, owned by
|
|
// n1-b's mirror (e.g. the client form targeted that inbound).
|
|
if err := db.Create(&xray.ClientTraffic{InboundId: ibB.Id, Email: email, Enable: true}).Error; err != nil {
|
|
t.Fatalf("seed master row: %v", err)
|
|
}
|
|
|
|
settings := fmt.Sprintf(`{"clients": [{"email": %q, "enable": true}]}`, email)
|
|
sync := func(up, down int64) {
|
|
t.Helper()
|
|
snap := &runtime.TrafficSnapshot{Inbounds: []*model.Inbound{
|
|
{Tag: "n1-a", Settings: settings, ClientStats: []xray.ClientTraffic{{Email: email, Up: up, Down: down, Enable: true}}},
|
|
{Tag: "n1-b", Settings: `{"clients": []}`},
|
|
}}
|
|
if _, err := svc.setRemoteTrafficLocked(1, snap, false); err != nil {
|
|
t.Fatalf("sync: %v", err)
|
|
}
|
|
}
|
|
|
|
sync(630, 630)
|
|
var baselines int64
|
|
if err := db.Model(&model.NodeClientTraffic{}).Where("node_id = ? AND email = ?", 1, email).Count(&baselines).Error; err != nil {
|
|
t.Fatalf("count baselines: %v", err)
|
|
}
|
|
if baselines != 1 {
|
|
t.Fatalf("baseline must survive the sibling-inbound sweep, found %d rows", baselines)
|
|
}
|
|
|
|
sync(700, 700)
|
|
assertUpDown(t, readTraffic(t, db, email), 70, 70, "delta accrues once baseline survives")
|
|
}
|
|
|
|
func TestDelClientStat_CleansNodeBaselines(t *testing.T) {
|
|
db := initTrafficTestDB(t)
|
|
svc := &InboundService{}
|
|
|
|
const email = "gone"
|
|
if err := db.Create(&xray.ClientTraffic{InboundId: 1, Email: email, Enable: true}).Error; err != nil {
|
|
t.Fatalf("seed client_traffics: %v", err)
|
|
}
|
|
if err := db.Create(&model.NodeClientTraffic{NodeId: 1, Email: email, Up: 10, Down: 10}).Error; err != nil {
|
|
t.Fatalf("seed node baseline 1: %v", err)
|
|
}
|
|
if err := db.Create(&model.NodeClientTraffic{NodeId: 2, Email: email, Up: 20, Down: 20}).Error; err != nil {
|
|
t.Fatalf("seed node baseline 2: %v", err)
|
|
}
|
|
|
|
if err := svc.DelClientStat(db, email); err != nil {
|
|
t.Fatalf("DelClientStat: %v", err)
|
|
}
|
|
|
|
var cnt int64
|
|
if err := db.Model(&model.NodeClientTraffic{}).Where("email = ?", email).Count(&cnt).Error; err != nil {
|
|
t.Fatalf("count baselines: %v", err)
|
|
}
|
|
if cnt != 0 {
|
|
t.Errorf("expected node baselines cleaned, found %d", cnt)
|
|
}
|
|
}
|
|
|
|
func TestNodeDelete_CleansNodeBaselines(t *testing.T) {
|
|
db := initTrafficTestDB(t)
|
|
nodeSvc := NodeService{}
|
|
|
|
if err := db.Create(&model.NodeClientTraffic{NodeId: 7, Email: "a", Up: 1, Down: 1}).Error; err != nil {
|
|
t.Fatalf("seed node 7 a: %v", err)
|
|
}
|
|
if err := db.Create(&model.NodeClientTraffic{NodeId: 7, Email: "b", Up: 2, Down: 2}).Error; err != nil {
|
|
t.Fatalf("seed node 7 b: %v", err)
|
|
}
|
|
if err := db.Create(&model.NodeClientTraffic{NodeId: 8, Email: "c", Up: 3, Down: 3}).Error; err != nil {
|
|
t.Fatalf("seed node 8 c: %v", err)
|
|
}
|
|
|
|
if err := nodeSvc.Delete(7); err != nil {
|
|
t.Fatalf("NodeService.Delete(7): %v", err)
|
|
}
|
|
|
|
var sevenCnt, eightCnt int64
|
|
db.Model(&model.NodeClientTraffic{}).Where("node_id = ?", 7).Count(&sevenCnt)
|
|
db.Model(&model.NodeClientTraffic{}).Where("node_id = ?", 8).Count(&eightCnt)
|
|
if sevenCnt != 0 {
|
|
t.Errorf("node 7 baselines not cleaned: %d remain", sevenCnt)
|
|
}
|
|
if eightCnt != 1 {
|
|
t.Errorf("node 8 baseline should survive, found %d", eightCnt)
|
|
}
|
|
}
|