fix(xray): reconcile client auto-disable through the API instead of a forced restart

When a client expired or hit its traffic limit, XrayTrafficJob called
RestartXray(true), stopping the whole process and dropping every live
connection on every inbound (#5712 reported this as XHTTP on 443 dying) —
even though disableInvalidClients had already removed the user from the
running core over gRPC. The force restart existed only to re-sync the
process's config snapshot.

Switch the job to a non-forced restart and teach ComputeHotDiff to express
a client-only inbound change as per-user AlterInbound operations for
vless/vmess/trojan, so the reconcile is a no-op RemoveUser plus a snapshot
update rather than a handler swap that would still blip that inbound's
listener. Anything beyond the clients list still falls back to handler
replacement or a full restart as before.

Closes #5712
This commit is contained in:
MHSanaei
2026-07-02 09:26:53 +02:00
parent 1153d5db8c
commit e5b56c9444
5 changed files with 220 additions and 4 deletions
+2 -2
View File
@@ -50,8 +50,8 @@ func (j *XrayTrafficJob) Run() {
logger.Warning("get RestartXrayOnClientDisable failed:", settingErr) logger.Warning("get RestartXrayOnClientDisable failed:", settingErr)
} }
if restartOnDisable { if restartOnDisable {
if err := j.xrayService.RestartXray(true); err != nil { if err := j.xrayService.RestartXray(false); err != nil {
logger.Warning("restart xray after disabling clients failed:", err) logger.Warning("reconcile xray after disabling clients failed:", err)
j.xrayService.SetToNeedRestart() j.xrayService.SetToNeedRestart()
} }
} }
+25
View File
@@ -1004,6 +1004,12 @@ func (s *XrayService) tryHotApply(newCfg *xray.Config) bool {
// Removals first so changed handlers and port swaps never collide with // Removals first so changed handlers and port swaps never collide with
// the additions that follow. // the additions that follow.
for _, u := range diff.RemovedUsers {
if err := hotAPI.RemoveUser(u.Tag, u.Email); err != nil && !xray.IsMissingHandlerErr(err) {
logger.Info("hot apply: remove user [", u.Email, "] from [", u.Tag, "] failed:", err)
return false
}
}
for _, tag := range diff.RemovedInboundTags { for _, tag := range diff.RemovedInboundTags {
if err := hotAPI.DelInbound(tag); err != nil && !xray.IsMissingHandlerErr(err) { if err := hotAPI.DelInbound(tag); err != nil && !xray.IsMissingHandlerErr(err) {
logger.Info("hot apply: remove inbound [", tag, "] failed:", err) logger.Info("hot apply: remove inbound [", tag, "] failed:", err)
@@ -1028,6 +1034,12 @@ func (s *XrayService) tryHotApply(newCfg *xray.Config) bool {
return false return false
} }
} }
for _, u := range diff.AddedUsers {
if err := addUserReconciling(&hotAPI, u); err != nil {
logger.Info("hot apply: add user [", u.Email, "] to [", u.Tag, "] failed:", err)
return false
}
}
if diff.RoutingConfig != nil { if diff.RoutingConfig != nil {
if err := hotAPI.ApplyRoutingConfig(diff.RoutingConfig); err != nil { if err := hotAPI.ApplyRoutingConfig(diff.RoutingConfig); err != nil {
logger.Info("hot apply: apply routing config failed:", err) logger.Info("hot apply: apply routing config failed:", err)
@@ -1039,6 +1051,19 @@ func (s *XrayService) tryHotApply(newCfg *xray.Config) bool {
return true return true
} }
// addUserReconciling adds a user, and on an email conflict (the user was
// already applied through the runtime API) replaces the existing user instead.
func addUserReconciling(api *xray.XrayAPI, u xray.UserOp) error {
err := api.AddUser(u.Protocol, u.Tag, u.User)
if err == nil || !xray.IsUserExistsErr(err) {
return err
}
if delErr := api.RemoveUser(u.Tag, u.Email); delErr != nil && !xray.IsMissingHandlerErr(delErr) {
return delErr
}
return api.AddUser(u.Protocol, u.Tag, u.User)
}
// addInboundReconciling adds an inbound, and on a tag conflict (the handler // addInboundReconciling adds an inbound, and on a tag conflict (the handler
// was already created through the runtime API while the stored snapshot was // was already created through the runtime API while the stored snapshot was
// stale) replaces the existing handler instead. // stale) replaces the existing handler instead.
+9
View File
@@ -397,6 +397,15 @@ func IsExistingTagErr(err error) bool {
return strings.Contains(strings.ToLower(err.Error()), "existing tag") return strings.Contains(strings.ToLower(err.Error()), "existing tag")
} }
// IsUserExistsErr reports whether err is xray's response to adding a user whose
// email is already registered on the inbound.
func IsUserExistsErr(err error) bool {
if err == nil {
return false
}
return strings.Contains(strings.ToLower(err.Error()), "already exists")
}
// ensureXrayAssetLocation makes geoip.dat/geosite.dat resolvable when xray-core // ensureXrayAssetLocation makes geoip.dat/geosite.dat resolvable when xray-core
// config builders run inside the panel process. The xray binary resolves assets // config builders run inside the panel process. The xray binary resolves assets
// relative to its own executable, but the panel binary lives one level above // relative to its own executable, but the panel binary lives one level above
+108
View File
@@ -15,15 +15,27 @@ import (
type HotDiff struct { type HotDiff struct {
RemovedInboundTags []string RemovedInboundTags []string
AddedInbounds [][]byte AddedInbounds [][]byte
RemovedUsers []UserOp
AddedUsers []UserOp
RemovedOutboundTags []string RemovedOutboundTags []string
AddedOutbounds [][]byte AddedOutbounds [][]byte
RoutingConfig []byte // full new routing section; nil when unchanged RoutingConfig []byte // full new routing section; nil when unchanged
} }
// UserOp is a per-user AlterInbound operation; User is nil for removals.
type UserOp struct {
Tag string
Protocol string
Email string
User map[string]any
}
// Empty reports whether the diff contains no operations. // Empty reports whether the diff contains no operations.
func (d *HotDiff) Empty() bool { func (d *HotDiff) Empty() bool {
return len(d.RemovedInboundTags) == 0 && return len(d.RemovedInboundTags) == 0 &&
len(d.AddedInbounds) == 0 && len(d.AddedInbounds) == 0 &&
len(d.RemovedUsers) == 0 &&
len(d.AddedUsers) == 0 &&
len(d.RemovedOutboundTags) == 0 && len(d.RemovedOutboundTags) == 0 &&
len(d.AddedOutbounds) == 0 && len(d.AddedOutbounds) == 0 &&
d.RoutingConfig == nil d.RoutingConfig == nil
@@ -112,6 +124,9 @@ func diffInbounds(oldCfg, newCfg *Config, diff *HotDiff) bool {
logger.Debug("hot diff: inbound [", oldIb.Tag, "] carries a reverse-tagged client, forcing a full restart instead of a hot swap") logger.Debug("hot diff: inbound [", oldIb.Tag, "] carries a reverse-tagged client, forcing a full restart instead of a hot swap")
return false return false
} }
if exists && diffInboundUsers(oldIb, newIb, diff) {
continue
}
diff.RemovedInboundTags = append(diff.RemovedInboundTags, oldIb.Tag) diff.RemovedInboundTags = append(diff.RemovedInboundTags, oldIb.Tag)
if exists { if exists {
raw, err := json.Marshal(newIb) raw, err := json.Marshal(newIb)
@@ -138,6 +153,99 @@ func diffInbounds(oldCfg, newCfg *Config, diff *HotDiff) bool {
return true return true
} }
var userDiffableProtocols = map[string]struct{}{"vless": {}, "vmess": {}, "trojan": {}}
// diffInboundUsers emits per-user AlterInbound ops when two same-tag inbounds
// differ only in settings.clients, so the handler (and its listener) survives.
func diffInboundUsers(oldIb, newIb *InboundConfig, diff *HotDiff) bool {
if oldIb.Port != newIb.Port || oldIb.Protocol != newIb.Protocol || oldIb.Tag != newIb.Tag {
return false
}
if _, ok := userDiffableProtocols[oldIb.Protocol]; !ok {
return false
}
if !rawEqualNormalized(oldIb.Listen, newIb.Listen) ||
!rawEqualNormalized(oldIb.StreamSettings, newIb.StreamSettings) ||
!rawEqualNormalized(oldIb.Sniffing, newIb.Sniffing) {
return false
}
oldClients, oldRest, ok := splitSettingsClients(oldIb.Settings)
if !ok {
return false
}
newClients, newRest, ok := splitSettingsClients(newIb.Settings)
if !ok {
return false
}
if !bytes.Equal(oldRest, newRest) {
return false
}
for email, oldC := range oldClients {
newC, exists := newClients[email]
if exists && bytes.Equal(oldC.norm, newC.norm) {
continue
}
diff.RemovedUsers = append(diff.RemovedUsers, UserOp{Tag: oldIb.Tag, Protocol: oldIb.Protocol, Email: email})
if exists {
diff.AddedUsers = append(diff.AddedUsers, UserOp{Tag: oldIb.Tag, Protocol: oldIb.Protocol, Email: email, User: newC.user})
}
}
for email, newC := range newClients {
if _, exists := oldClients[email]; !exists {
diff.AddedUsers = append(diff.AddedUsers, UserOp{Tag: oldIb.Tag, Protocol: oldIb.Protocol, Email: email, User: newC.user})
}
}
return true
}
type clientEntry struct {
user map[string]any
norm []byte
}
// splitSettingsClients indexes settings.clients by email and returns the rest of
// the settings in canonical form; ok is false when a client has no unique email.
func splitSettingsClients(raw json_util.RawMessage) (map[string]clientEntry, []byte, bool) {
if len(raw) == 0 {
return nil, nil, false
}
settings := map[string]any{}
decoder := json.NewDecoder(bytes.NewReader(raw))
decoder.UseNumber()
if err := decoder.Decode(&settings); err != nil {
return nil, nil, false
}
clientsRaw, hasClients := settings["clients"].([]any)
if !hasClients {
return nil, nil, false
}
clients := make(map[string]clientEntry, len(clientsRaw))
for _, c := range clientsRaw {
obj, ok := c.(map[string]any)
if !ok {
return nil, nil, false
}
email, _ := obj["email"].(string)
if email == "" {
return nil, nil, false
}
if _, dup := clients[email]; dup {
return nil, nil, false
}
norm, err := json.Marshal(obj)
if err != nil {
return nil, nil, false
}
clients[email] = clientEntry{user: obj, norm: norm}
}
delete(settings, "clients")
rest, err := json.Marshal(settings)
if err != nil {
return nil, nil, false
}
return clients, rest, true
}
func inboundHasReverseClient(ib *InboundConfig) bool { func inboundHasReverseClient(ib *InboundConfig) bool {
if ib == nil { if ib == nil {
return false return false
+76 -2
View File
@@ -148,8 +148,8 @@ func TestComputeHotDiff_StaticSectionChangeNeedsRestart(t *testing.T) {
func TestComputeHotDiff_InboundAddRemoveChange(t *testing.T) { func TestComputeHotDiff_InboundAddRemoveChange(t *testing.T) {
oldCfg := makeHotConfig() oldCfg := makeHotConfig()
newCfg := makeHotConfig() newCfg := makeHotConfig()
// change existing // change existing beyond the clients list, so no user-level shortcut applies
newCfg.InboundConfigs[1].Settings = json_util.RawMessage(`{"clients":[{"email":"a"}]}`) newCfg.InboundConfigs[1].Settings = json_util.RawMessage(`{"clients":[],"decryption":"none"}`)
// add new // add new
newCfg.InboundConfigs = append(newCfg.InboundConfigs, InboundConfig{ newCfg.InboundConfigs = append(newCfg.InboundConfigs, InboundConfig{
Port: 2080, Protocol: "vmess", Tag: "inbound-2080", Port: 2080, Protocol: "vmess", Tag: "inbound-2080",
@@ -171,6 +171,80 @@ func TestComputeHotDiff_InboundAddRemoveChange(t *testing.T) {
} }
} }
func TestComputeHotDiff_ClientOnlyChangeUsesUserOps(t *testing.T) {
oldCfg := makeHotConfig()
oldCfg.InboundConfigs[1].Settings = json_util.RawMessage(`{"clients":[{"email":"a","id":"uuid-a"},{"email":"b","id":"uuid-b"}],"decryption":"none"}`)
newCfg := makeHotConfig()
// b expired and is stripped from the generated config (#5712); a's id rotated.
newCfg.InboundConfigs[1].Settings = json_util.RawMessage(`{"clients":[{"email":"a","id":"uuid-a2"},{"email":"c","id":"uuid-c"}],"decryption":"none"}`)
diff, ok := ComputeHotDiff(oldCfg, newCfg)
if !ok {
t.Fatal("client-only change must be hot-appliable")
}
if len(diff.RemovedInboundTags) != 0 || len(diff.AddedInbounds) != 0 {
t.Fatalf("client-only change must not replace the handler, got %+v", diff)
}
removed := map[string]bool{}
for _, u := range diff.RemovedUsers {
if u.Tag != "inbound-1080" || u.Protocol != "vless" {
t.Fatalf("removed user op has wrong target: %+v", u)
}
removed[u.Email] = true
}
if len(removed) != 2 || !removed["a"] || !removed["b"] {
t.Fatalf("expected users a (changed) and b (gone) removed, got %v", removed)
}
added := map[string]string{}
for _, u := range diff.AddedUsers {
id, _ := u.User["id"].(string)
added[u.Email] = id
}
if len(added) != 2 || added["a"] != "uuid-a2" || added["c"] != "uuid-c" {
t.Fatalf("expected users a (new id) and c added, got %v", added)
}
}
func TestComputeHotDiff_ClientChangeFallsBackToReplace(t *testing.T) {
cases := []struct {
name string
mutate func(cfg *Config)
}{
{
name: "unsupported protocol",
mutate: func(cfg *Config) {
cfg.InboundConfigs[1].Protocol = "shadowsocks"
},
},
{
name: "client without email",
mutate: func(cfg *Config) {
cfg.InboundConfigs[1].Settings = json_util.RawMessage(`{"clients":[{"id":"uuid-a"}]}`)
},
},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
oldCfg := makeHotConfig()
newCfg := makeHotConfig()
tc.mutate(oldCfg)
tc.mutate(newCfg)
newCfg.InboundConfigs[1].Settings = json_util.RawMessage(`{"clients":[{"email":"x","id":"uuid-x","password":"pw"}]}`)
diff, ok := ComputeHotDiff(oldCfg, newCfg)
if !ok {
t.Fatal("change must still be hot-appliable via handler replacement")
}
if len(diff.RemovedUsers) != 0 || len(diff.AddedUsers) != 0 {
t.Fatalf("expected no user ops, got %+v", diff)
}
if len(diff.RemovedInboundTags) != 1 || len(diff.AddedInbounds) != 1 {
t.Fatalf("expected handler replacement, got %+v", diff)
}
})
}
}
func TestComputeHotDiff_ApiInboundChangeNeedsRestart(t *testing.T) { func TestComputeHotDiff_ApiInboundChangeNeedsRestart(t *testing.T) {
newCfg := makeHotConfig() newCfg := makeHotConfig()
newCfg.InboundConfigs[0].Port = 62790 newCfg.InboundConfigs[0].Port = 62790