Files
3x-ui/internal/web/service/node_bulk_dispatch_test.go
T
MHSanaei a0989e0f4d fix(node): stop client edits from tearing down node inbounds and harden reconcile fingerprints
A client save on the master always stamped a fresh updated_at, marked
the node dirty, and let the 5s sync push a full inbounds/update to the
node, where applying it removes and re-adds the Xray handler - killing
live traffic on every edit, including no-op saves (open the editor,
click Save). Nodes stayed online with Xray running while forwarding
nothing until a manual Xray restart.

- No-op client saves preserve the client's updated_at and return before
  any DB write, runtime RPC, or node dirty mark when the effective
  settings did not change.
- Successful per-client add/update/delete pushes advance the node's
  reconcile-skip fingerprint only when the recorded fingerprint proves
  the node held the exact pre-edit payload and every push in the edit
  succeeded (Remote.AdvancePushedInbound). Anything unproven keeps the
  stale fingerprint so the dirty reconcile still sends the full inbound.
  Unconditional stamping would certify folded bulk changes (threshold,
  flow change, offline edit) or partially failed batches as delivered:
  a folded 41->6 bulk delete followed by one live edit left the node
  permanently serving all 41 clients in end-to-end testing, with the
  snapshot adoption then resurrecting the deleted clients on the master.
- DeleteUser treats only an envelope-level not-found as already deleted;
  an HTTP 404 from an old node build without the detach endpoint
  surfaces as an error instead of certifying an undelivered delete.
  cacheDel drops the fingerprint alongside the id cache so DelInbound
  and tag renames leave no stale skip entry.
- Adopting the node's own settings serialization into the master row now
  also stamps the fingerprint (RecordAdoptedInbound). Without it the
  serialization round-trip invalidated the fingerprint one sync tick
  after every push, so each edit degraded back to a full teardown push.
- UpdateInboundClient applies the Shadowsocks method normalization
  before the no-op comparison (real method changes bump updated_at, SS
  no-op edits are detected) and syncs the generated subId into the
  pushed client so the node cannot mint a different one.

Verified with a two-panel docker deployment: no-op saves produce zero
node requests, real edits send one lightweight clients/update RPC with
zero full inbound updates and zero handler teardowns, and folded bulk
deletes still converge.

Based on PR #5778 by @rqzbeh.

Closes #5764
Closes #5771
2026-07-05 02:06:58 +02:00

247 lines
8.0 KiB
Go

package service
import (
"context"
"fmt"
"sync/atomic"
"testing"
"github.com/google/uuid"
"github.com/mhsanaei/3x-ui/v3/internal/database"
"github.com/mhsanaei/3x-ui/v3/internal/database/model"
"github.com/mhsanaei/3x-ui/v3/internal/web/runtime"
)
// fakeNodeRuntime stands in for a runtime.Runtime in tests. It tallies the
// per-client dispatch calls (AddClient, DeleteUser, UpdateUser) so a test can
// assert that a bulk operation did NOT issue one RPC per client.
type fakeNodeRuntime struct {
	// Call counters, bumped once per invocation of the matching method.
	addClient, deleteUser, updateUser atomic.Int32
}
// Name returns the fixed identifier of the fake runtime.
func (f *fakeNodeRuntime) Name() string {
	return "fake-node"
}

// AddInbound is a no-op stub that always succeeds.
func (f *fakeNodeRuntime) AddInbound(_ context.Context, _ *model.Inbound) error {
	return nil
}

// DelInbound is a no-op stub that always succeeds.
func (f *fakeNodeRuntime) DelInbound(_ context.Context, _ *model.Inbound) error {
	return nil
}

// UpdateInbound is a no-op stub that always succeeds.
func (f *fakeNodeRuntime) UpdateInbound(_ context.Context, _, _ *model.Inbound) error {
	return nil
}

// AddUser is a no-op stub that always succeeds.
func (f *fakeNodeRuntime) AddUser(_ context.Context, _ *model.Inbound, _ map[string]any) error {
	return nil
}

// RemoveUser is a no-op stub that always succeeds.
func (f *fakeNodeRuntime) RemoveUser(_ context.Context, _ *model.Inbound, _ string) error {
	return nil
}

// UpdateUser records one call on the updateUser counter and succeeds.
func (f *fakeNodeRuntime) UpdateUser(_ context.Context, _ *model.Inbound, _ string, _ model.Client) error {
	f.updateUser.Add(1)
	return nil
}

// DeleteUser records one call on the deleteUser counter and succeeds.
func (f *fakeNodeRuntime) DeleteUser(_ context.Context, _ *model.Inbound, _ string) error {
	f.deleteUser.Add(1)
	return nil
}

// AddClient records one call on the addClient counter and succeeds.
func (f *fakeNodeRuntime) AddClient(_ context.Context, _ *model.Inbound, _ model.Client) error {
	f.addClient.Add(1)
	return nil
}

// RestartXray is a no-op stub that always succeeds.
func (f *fakeNodeRuntime) RestartXray(_ context.Context) error {
	return nil
}

// ResetClientTraffic is a no-op stub that always succeeds.
func (f *fakeNodeRuntime) ResetClientTraffic(_ context.Context, _ *model.Inbound, _ string) error {
	return nil
}

// ResetInboundTraffic is a no-op stub that always succeeds.
func (f *fakeNodeRuntime) ResetInboundTraffic(_ context.Context, _ *model.Inbound) error {
	return nil
}

// ResetAllTraffics is a no-op stub that always succeeds.
func (f *fakeNodeRuntime) ResetAllTraffics(_ context.Context) error {
	return nil
}
// setupNodeRuntime installs a fresh runtime manager for the duration of the
// test, persists an enabled node whose status is "online", and registers a
// fakeNodeRuntime as that node's runtime override. It returns the node id and
// the fake so the caller can exercise the service node-dispatch path without a
// real network node. The previously installed manager is restored on cleanup.
func setupNodeRuntime(t *testing.T) (int, *fakeNodeRuntime) {
	t.Helper()

	// Swap in a private manager and put the old one back when the test ends.
	previous := runtime.GetManager()
	manager := runtime.NewManager(runtime.LocalDeps{
		APIPort:        func() int { return 0 },
		SetNeedRestart: func() {},
	})
	runtime.SetManager(manager)
	t.Cleanup(func() {
		runtime.SetManager(previous)
	})

	record := &model.Node{
		Name:     "n1",
		Address:  "127.0.0.1",
		Port:     2096,
		ApiToken: "tok",
		Enable:   true,
		Status:   "online",
	}
	err := database.GetDB().Create(record).Error
	if err != nil {
		t.Fatalf("create node: %v", err)
	}

	stub := &fakeNodeRuntime{}
	manager.SetRuntimeOverride(record.Id, stub)
	return record.Id, stub
}
// nodeInbound persists an enabled VLESS inbound attached to nodeID on the
// given port, with its settings built from clients, then seeds the client
// records through ClientService.SyncInbound. A nil clients slice is replaced
// by an empty one. Any failure aborts the test.
func nodeInbound(t *testing.T, nodeID, port int, clients []model.Client) *model.Inbound {
	t.Helper()

	// NOTE(review): presumably normalized so the settings carry an empty list
	// rather than a nil one — confirm against clientsSettings.
	if clients == nil {
		clients = []model.Client{}
	}

	inbound := &model.Inbound{
		UserId:   1,
		NodeID:   &nodeID,
		Tag:      fmt.Sprintf("in-%d", port),
		Enable:   true,
		Port:     port,
		Protocol: model.VLESS,
		Settings: clientsSettings(t, clients),
	}

	createErr := database.GetDB().Create(inbound).Error
	if createErr != nil {
		t.Fatalf("create node inbound: %v", createErr)
	}

	clientSvc := &ClientService{}
	syncErr := clientSvc.SyncInbound(nil, inbound.Id, clients)
	if syncErr != nil {
		t.Fatalf("seed SyncInbound: %v", syncErr)
	}
	return inbound
}
// makeNodeClients returns n enabled clients, each with a freshly generated
// UUID as its ID and a sequential e-mail of the form "nu-00000@x".
func makeNodeClients(n int) []model.Client {
	clients := make([]model.Client, n)
	for idx := range clients {
		clients[idx] = model.Client{
			ID:     uuid.NewString(),
			Email:  fmt.Sprintf("nu-%05d@x", idx),
			Enable: true,
		}
	}
	return clients
}
// TestNodeBulk_LargeAddFoldsToDirty checks that adding more clients than
// nodeBulkPushThreshold to an online node inbound does NOT issue one AddClient
// RPC per client; instead the node must end up marked dirty so a single
// reconcile push can converge it.
func TestNodeBulk_LargeAddFoldsToDirty(t *testing.T) {
	setupBulkDB(t)
	nodeID, rt := setupNodeRuntime(t)
	target := nodeInbound(t, nodeID, 30001, nil)

	clientSvc := &ClientService{}
	ibSvc := &InboundService{}

	// Ten clients over the threshold is enough to take the bulk path.
	batch := makeNodeClients(nodeBulkPushThreshold + 10)
	payload := &model.Inbound{
		Id:       target.Id,
		Protocol: model.VLESS,
		Settings: clientsSettings(t, batch),
	}
	_, err := clientSvc.AddInboundClient(ibSvc, payload)
	if err != nil {
		t.Fatalf("AddInboundClient: %v", err)
	}

	if calls := rt.addClient.Load(); calls != 0 {
		t.Fatalf("large add streamed %d AddClient RPCs, want 0 (should fold to dirty)", calls)
	}

	_, _, dirty, _, err := (&NodeService{}).NodeSyncState(nodeID)
	if err != nil {
		t.Fatalf("NodeSyncState: %v", err)
	}
	if !dirty {
		t.Fatal("large add must mark the node dirty")
	}
}
// TestNodeBulk_SmallAddPushesLive checks that a small add keeps using the live
// per-client path: every added client produces exactly one AddClient RPC.
func TestNodeBulk_SmallAddPushesLive(t *testing.T) {
	const clientCount = 3

	setupBulkDB(t)
	nodeID, rt := setupNodeRuntime(t)
	target := nodeInbound(t, nodeID, 30002, nil)

	clientSvc := &ClientService{}
	ibSvc := &InboundService{}

	payload := &model.Inbound{
		Id:       target.Id,
		Protocol: model.VLESS,
		Settings: clientsSettings(t, makeNodeClients(clientCount)),
	}
	_, err := clientSvc.AddInboundClient(ibSvc, payload)
	if err != nil {
		t.Fatalf("AddInboundClient: %v", err)
	}

	calls := rt.addClient.Load()
	if calls != int32(clientCount) {
		t.Fatalf("small add streamed %d AddClient RPCs, want %d", calls, clientCount)
	}
}
// TestNodeUpdateInboundClientNoopSkipsRuntimeAndDirty re-submits a client
// exactly as it was seeded and checks that the update sends no UpdateUser RPC,
// leaves the node not dirty, and does not rewrite the stored inbound settings.
func TestNodeUpdateInboundClientNoopSkipsRuntimeAndDirty(t *testing.T) {
	setupBulkDB(t)
	nodeID, rt := setupNodeRuntime(t)

	original := model.Client{
		ID:        uuid.NewString(),
		Email:     "noop@x",
		SubID:     "sub-noop",
		Enable:    true,
		CreatedAt: 111,
		UpdatedAt: 222,
	}
	seeded := nodeInbound(t, nodeID, 30020, []model.Client{original})

	clientSvc := &ClientService{}
	ibSvc := &InboundService{}

	// Submit the very same client again: nothing effective changes.
	payload := &model.Inbound{
		Id:       seeded.Id,
		Protocol: model.VLESS,
		Settings: clientsSettings(t, []model.Client{original}),
	}
	_, err := clientSvc.UpdateInboundClient(ibSvc, payload, original.Email)
	if err != nil {
		t.Fatalf("UpdateInboundClient: %v", err)
	}

	if calls := rt.updateUser.Load(); calls != 0 {
		t.Fatalf("no-op update streamed %d UpdateUser RPCs, want 0", calls)
	}

	_, _, dirty, _, err := (&NodeService{}).NodeSyncState(nodeID)
	if err != nil {
		t.Fatalf("NodeSyncState: %v", err)
	}
	if dirty {
		t.Fatal("no-op update must not mark the node dirty")
	}

	stored, err := ibSvc.GetInbound(seeded.Id)
	if err != nil {
		t.Fatalf("GetInbound: %v", err)
	}
	if stored.Settings != seeded.Settings {
		t.Fatal("no-op update rewrote inbound settings")
	}
}
// TestNodeUpdateInboundClientLivePushKeepsDirtyBackup changes a seeded
// client's comment and checks that the edit sends exactly one UpdateUser RPC
// while the node is still reported dirty afterwards, so a reconcile remains
// available as a backup.
func TestNodeUpdateInboundClientLivePushKeepsDirtyBackup(t *testing.T) {
	setupBulkDB(t)
	nodeID, rt := setupNodeRuntime(t)

	original := model.Client{
		ID:        uuid.NewString(),
		Email:     "edit@x",
		SubID:     "sub-edit",
		Enable:    true,
		CreatedAt: 111,
		UpdatedAt: 222,
	}
	seeded := nodeInbound(t, nodeID, 30021, []model.Client{original})

	// A real change: same client, different comment.
	modified := original
	modified.Comment = "changed"

	clientSvc := &ClientService{}
	ibSvc := &InboundService{}

	payload := &model.Inbound{
		Id:       seeded.Id,
		Protocol: model.VLESS,
		Settings: clientsSettings(t, []model.Client{modified}),
	}
	_, err := clientSvc.UpdateInboundClient(ibSvc, payload, original.Email)
	if err != nil {
		t.Fatalf("UpdateInboundClient: %v", err)
	}

	if calls := rt.updateUser.Load(); calls != 1 {
		t.Fatalf("edit streamed %d UpdateUser RPCs, want 1", calls)
	}

	_, _, dirty, _, err := (&NodeService{}).NodeSyncState(nodeID)
	if err != nil {
		t.Fatalf("NodeSyncState: %v", err)
	}
	if !dirty {
		t.Fatal("successful live update should keep node dirty as reconcile backup")
	}
}
// TestNodeBulk_LargeDeleteFoldsToDirty checks that deleting more clients than
// nodeBulkPushThreshold from an online node inbound issues no per-client
// DeleteUser RPCs and instead leaves the node marked dirty for a reconcile.
func TestNodeBulk_LargeDeleteFoldsToDirty(t *testing.T) {
	setupBulkDB(t)
	nodeID, rt := setupNodeRuntime(t)

	seeded := makeNodeClients(nodeBulkPushThreshold + 10)
	nodeInbound(t, nodeID, 30003, seeded)

	clientSvc := &ClientService{}
	ibSvc := &InboundService{}

	// Delete every seeded client in one bulk call.
	targets := make([]string, 0, len(seeded))
	for _, c := range seeded {
		targets = append(targets, c.Email)
	}
	_, _, err := clientSvc.BulkDelete(ibSvc, targets, false)
	if err != nil {
		t.Fatalf("BulkDelete: %v", err)
	}

	if calls := rt.deleteUser.Load(); calls != 0 {
		t.Fatalf("large delete streamed %d DeleteUser RPCs, want 0 (should fold to dirty)", calls)
	}

	_, _, dirty, _, err := (&NodeService{}).NodeSyncState(nodeID)
	if err != nil {
		t.Fatalf("NodeSyncState: %v", err)
	}
	if !dirty {
		t.Fatal("large delete must mark the node dirty")
	}
}