fix(node): stop force-restarting a node's Xray when its clients auto-disable

When a depleted or expired client lived on a node, the master pushed the
updated inbound (client flipped off) to the node and then also told the
node to fully restart Xray. The push alone already applies the disable:
the node updates that one inbound on its running core. The extra restart
dropped every live connection on the node each time any of its clients
crossed a quota or expiry, and a restart that failed to come back left
the node forwarding nothing until someone restarted Xray by hand.

This mirrors e5b56c94, which removed the same forced restart from the
local auto-disable path; remote nodes now get the same graceful
reconcile-by-push treatment.

Closes #5740
This commit is contained in:
MHSanaei
2026-07-02 13:27:36 +02:00
parent 62f303905e
commit 4d6f2ddd97
4 changed files with 18 additions and 52 deletions
+2 -2
View File
@@ -74,7 +74,7 @@ func TestDepletedCond_ProbeGuard(t *testing.T) {
t.Fatalf("empty globals must use the local-only predicate") t.Fatalf("empty globals must use the local-only predicate")
} }
seedClientRow(t, "local-cap", 1, 600, 600, 1000) seedClientRow(t, "local-cap", 1, 600, 600, 1000)
if _, count, _, err := svc.disableInvalidClients(db); err != nil { if _, count, err := svc.disableInvalidClients(db); err != nil {
t.Fatalf("disableInvalidClients: %v", err) t.Fatalf("disableInvalidClients: %v", err)
} else if count != 1 { } else if count != 1 {
t.Fatalf("local over-quota client must be disabled, disabled %d", count) t.Fatalf("local over-quota client must be disabled, disabled %d", count)
@@ -100,7 +100,7 @@ func TestGlobalUsage_DisablesClient(t *testing.T) {
t.Fatalf("AcceptGlobalTraffic: %v", err) t.Fatalf("AcceptGlobalTraffic: %v", err)
} }
if _, count, _, err := svc.disableInvalidClients(db); err != nil { if _, count, err := svc.disableInvalidClients(db); err != nil {
t.Fatalf("disableInvalidClients: %v", err) t.Fatalf("disableInvalidClients: %v", err)
} else if count != 1 { } else if count != 1 {
t.Fatalf("expected 1 client disabled, got %d", count) t.Fatalf("expected 1 client disabled, got %d", count)
+10 -18
View File
@@ -80,7 +80,7 @@ func depletedCond(tx *gorm.DB) string {
return depletedClientsCondLocal return depletedClientsCondLocal
} }
func (s *InboundService) disableInvalidClients(tx *gorm.DB) (bool, int64, []int, error) { func (s *InboundService) disableInvalidClients(tx *gorm.DB) (bool, int64, error) {
now := time.Now().Unix() * 1000 now := time.Now().Unix() * 1000
needRestart := false needRestart := false
cond := depletedCond(tx) cond := depletedCond(tx)
@@ -90,10 +90,10 @@ func (s *InboundService) disableInvalidClients(tx *gorm.DB) (bool, int64, []int,
Where(cond+" AND enable = ?", now, true). Where(cond+" AND enable = ?", now, true).
Find(&depletedRows).Error Find(&depletedRows).Error
if err != nil { if err != nil {
return false, 0, nil, err return false, 0, err
} }
if len(depletedRows) == 0 { if len(depletedRows) == 0 {
return false, 0, nil, nil return false, 0, nil
} }
depletedEmails := make([]string, 0, len(depletedRows)) depletedEmails := make([]string, 0, len(depletedRows))
@@ -121,7 +121,7 @@ func (s *InboundService) disableInvalidClients(tx *gorm.DB) (bool, int64, []int,
WHERE clients.email IN ? WHERE clients.email IN ?
`, depletedEmails).Scan(&targets).Error `, depletedEmails).Scan(&targets).Error
if err != nil { if err != nil {
return false, 0, nil, err return false, 0, err
} }
} }
@@ -168,7 +168,7 @@ func (s *InboundService) disableInvalidClients(tx *gorm.DB) (bool, int64, []int,
err = result.Error err = result.Error
count := result.RowsAffected count := result.RowsAffected
if err != nil { if err != nil {
return needRestart, count, nil, err return needRestart, count, err
} }
if len(depletedEmails) > 0 { if len(depletedEmails) > 0 {
@@ -179,7 +179,6 @@ func (s *InboundService) disableInvalidClients(tx *gorm.DB) (bool, int64, []int,
} }
} }
disabledNodeIDs := make(map[int]struct{})
for inboundID, group := range remoteByInbound { for inboundID, group := range remoteByInbound {
emails := make(map[string]struct{}, len(group)) emails := make(map[string]struct{}, len(group))
for _, t := range group { for _, t := range group {
@@ -188,21 +187,10 @@ func (s *InboundService) disableInvalidClients(tx *gorm.DB) (bool, int64, []int,
if pushErr := s.disableRemoteClients(tx, inboundID, emails); pushErr != nil { if pushErr := s.disableRemoteClients(tx, inboundID, emails); pushErr != nil {
logger.Warning("disableInvalidClients: push to remote failed for inbound", inboundID, ":", pushErr) logger.Warning("disableInvalidClients: push to remote failed for inbound", inboundID, ":", pushErr)
needRestart = true needRestart = true
} else {
for _, t := range group {
if t.NodeID != nil {
disabledNodeIDs[*t.NodeID] = struct{}{}
}
}
} }
} }
nodeIDs := make([]int, 0, len(disabledNodeIDs)) return needRestart, count, nil
for nodeID := range disabledNodeIDs {
nodeIDs = append(nodeIDs, nodeID)
}
return needRestart, count, nodeIDs, nil
} }
// markClientsDisabledInSettings flips client.enable=false in the inbound's // markClientsDisabledInSettings flips client.enable=false in the inbound's
@@ -255,6 +243,10 @@ func (s *InboundService) markClientsDisabledInSettings(tx *gorm.DB, inboundID in
return &snapshot, &ib, nil return &snapshot, &ib, nil
} }
// disableRemoteClients flips the clients off in the inbound's stored settings
// and pushes the updated inbound to its node, which applies it to its own
// running Xray. That push is the whole reconcile — restarting the node's Xray
// afterwards would drop every live connection on the node for nothing (#5740).
func (s *InboundService) disableRemoteClients(tx *gorm.DB, inboundID int, emails map[string]struct{}) error { func (s *InboundService) disableRemoteClients(tx *gorm.DB, inboundID int, emails map[string]struct{}) error {
oldSnapshot, ib, err := s.markClientsDisabledInSettings(tx, inboundID, emails) oldSnapshot, ib, err := s.markClientsDisabledInSettings(tx, inboundID, emails)
if err != nil { if err != nil {
-22
View File
@@ -895,28 +895,6 @@ func (s *InboundService) setRemoteTrafficLocked(nodeID int, snap *runtime.Traffi
return structuralChange, nil return structuralChange, nil
} }
func (s *InboundService) restartRemoteNodesOnDisable(nodeIDs []int) {
restartOnDisable, err := (&SettingService{}).GetRestartXrayOnClientDisable()
if err != nil {
logger.Warning("disableInvalidClients: get RestartXrayOnClientDisable failed:", err)
return
}
if !restartOnDisable {
return
}
for _, nodeID := range nodeIDs {
nodeIDCopy := nodeID
rt, rtErr := runtime.GetManager().RuntimeFor(&nodeIDCopy)
if rtErr != nil {
logger.Warning("disableInvalidClients: get runtime for node", nodeID, "failed:", rtErr)
continue
}
if rtErr = rt.RestartXray(context.Background()); rtErr != nil {
logger.Warning("disableInvalidClients: restart xray on node", nodeID, "failed:", rtErr)
}
}
}
func (s *InboundService) GetOnlineClients() []string { func (s *InboundService) GetOnlineClients() []string {
if p == nil { if p == nil {
return []string{} return []string{}
+6 -10
View File
@@ -19,19 +19,15 @@ import (
) )
func (s *InboundService) AddTraffic(inboundTraffics []*xray.Traffic, clientTraffics []*xray.ClientTraffic) (needRestart bool, clientsDisabled bool, err error) { func (s *InboundService) AddTraffic(inboundTraffics []*xray.Traffic, clientTraffics []*xray.ClientTraffic) (needRestart bool, clientsDisabled bool, err error) {
var disabledNodeIDs []int
err = submitTrafficWrite(func() error { err = submitTrafficWrite(func() error {
var inner error var inner error
needRestart, clientsDisabled, disabledNodeIDs, inner = s.addTrafficLocked(inboundTraffics, clientTraffics) needRestart, clientsDisabled, inner = s.addTrafficLocked(inboundTraffics, clientTraffics)
return inner return inner
}) })
if err == nil && len(disabledNodeIDs) > 0 {
s.restartRemoteNodesOnDisable(disabledNodeIDs)
}
return return
} }
func (s *InboundService) addTrafficLocked(inboundTraffics []*xray.Traffic, clientTraffics []*xray.ClientTraffic) (bool, bool, []int, error) { func (s *InboundService) addTrafficLocked(inboundTraffics []*xray.Traffic, clientTraffics []*xray.ClientTraffic) (bool, bool, error) {
var err error var err error
db := database.GetDB() db := database.GetDB()
tx := db.Begin() tx := db.Begin()
@@ -45,11 +41,11 @@ func (s *InboundService) addTrafficLocked(inboundTraffics []*xray.Traffic, clien
}() }()
err = s.addInboundTraffic(tx, inboundTraffics) err = s.addInboundTraffic(tx, inboundTraffics)
if err != nil { if err != nil {
return false, false, nil, err return false, false, err
} }
err = s.addClientTraffic(tx, clientTraffics) err = s.addClientTraffic(tx, clientTraffics)
if err != nil { if err != nil {
return false, false, nil, err return false, false, err
} }
needRestart0, count, err := s.autoRenewClients(tx) needRestart0, count, err := s.autoRenewClients(tx)
@@ -60,7 +56,7 @@ func (s *InboundService) addTrafficLocked(inboundTraffics []*xray.Traffic, clien
} }
disabledClientsCount := int64(0) disabledClientsCount := int64(0)
needRestart1, count, disabledNodeIDs, err := s.disableInvalidClients(tx) needRestart1, count, err := s.disableInvalidClients(tx)
if err != nil { if err != nil {
logger.Warning("Error in disabling invalid clients:", err) logger.Warning("Error in disabling invalid clients:", err)
} else if count > 0 { } else if count > 0 {
@@ -74,7 +70,7 @@ func (s *InboundService) addTrafficLocked(inboundTraffics []*xray.Traffic, clien
} else if count > 0 { } else if count > 0 {
logger.Debugf("%v inbounds disabled", count) logger.Debugf("%v inbounds disabled", count)
} }
return needRestart0 || needRestart1 || needRestart2, disabledClientsCount > 0, disabledNodeIDs, nil return needRestart0 || needRestart1 || needRestart2, disabledClientsCount > 0, nil
} }
func (s *InboundService) addInboundTraffic(tx *gorm.DB, traffics []*xray.Traffic) error { func (s *InboundService) addInboundTraffic(tx *gorm.DB, traffics []*xray.Traffic) error {