mirror of
https://github.com/MHSanaei/3x-ui.git
synced 2026-07-04 20:04:20 +00:00
fix(node): stop force-restarting a node's Xray when its clients auto-disable
When a depleted or expired client lived on a node, the master pushed the
updated inbound (client flipped off) to the node and then also told the
node to fully restart Xray. The push alone already applies the disable:
the node updates that one inbound on its running core. The extra restart
dropped every live connection on the node each time any of its clients
crossed a quota or expiry, and a restart that failed to come back left
the node forwarding nothing until someone restarted Xray by hand.
This mirrors e5b56c94, which removed the same forced restart from the
local auto-disable path; remote nodes now get the same graceful
reconcile-by-push treatment.
Closes #5740
This commit is contained in:
@@ -74,7 +74,7 @@ func TestDepletedCond_ProbeGuard(t *testing.T) {
|
|||||||
t.Fatalf("empty globals must use the local-only predicate")
|
t.Fatalf("empty globals must use the local-only predicate")
|
||||||
}
|
}
|
||||||
seedClientRow(t, "local-cap", 1, 600, 600, 1000)
|
seedClientRow(t, "local-cap", 1, 600, 600, 1000)
|
||||||
if _, count, _, err := svc.disableInvalidClients(db); err != nil {
|
if _, count, err := svc.disableInvalidClients(db); err != nil {
|
||||||
t.Fatalf("disableInvalidClients: %v", err)
|
t.Fatalf("disableInvalidClients: %v", err)
|
||||||
} else if count != 1 {
|
} else if count != 1 {
|
||||||
t.Fatalf("local over-quota client must be disabled, disabled %d", count)
|
t.Fatalf("local over-quota client must be disabled, disabled %d", count)
|
||||||
@@ -100,7 +100,7 @@ func TestGlobalUsage_DisablesClient(t *testing.T) {
|
|||||||
t.Fatalf("AcceptGlobalTraffic: %v", err)
|
t.Fatalf("AcceptGlobalTraffic: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if _, count, _, err := svc.disableInvalidClients(db); err != nil {
|
if _, count, err := svc.disableInvalidClients(db); err != nil {
|
||||||
t.Fatalf("disableInvalidClients: %v", err)
|
t.Fatalf("disableInvalidClients: %v", err)
|
||||||
} else if count != 1 {
|
} else if count != 1 {
|
||||||
t.Fatalf("expected 1 client disabled, got %d", count)
|
t.Fatalf("expected 1 client disabled, got %d", count)
|
||||||
|
|||||||
@@ -80,7 +80,7 @@ func depletedCond(tx *gorm.DB) string {
|
|||||||
return depletedClientsCondLocal
|
return depletedClientsCondLocal
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *InboundService) disableInvalidClients(tx *gorm.DB) (bool, int64, []int, error) {
|
func (s *InboundService) disableInvalidClients(tx *gorm.DB) (bool, int64, error) {
|
||||||
now := time.Now().Unix() * 1000
|
now := time.Now().Unix() * 1000
|
||||||
needRestart := false
|
needRestart := false
|
||||||
cond := depletedCond(tx)
|
cond := depletedCond(tx)
|
||||||
@@ -90,10 +90,10 @@ func (s *InboundService) disableInvalidClients(tx *gorm.DB) (bool, int64, []int,
|
|||||||
Where(cond+" AND enable = ?", now, true).
|
Where(cond+" AND enable = ?", now, true).
|
||||||
Find(&depletedRows).Error
|
Find(&depletedRows).Error
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, 0, nil, err
|
return false, 0, err
|
||||||
}
|
}
|
||||||
if len(depletedRows) == 0 {
|
if len(depletedRows) == 0 {
|
||||||
return false, 0, nil, nil
|
return false, 0, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
depletedEmails := make([]string, 0, len(depletedRows))
|
depletedEmails := make([]string, 0, len(depletedRows))
|
||||||
@@ -121,7 +121,7 @@ func (s *InboundService) disableInvalidClients(tx *gorm.DB) (bool, int64, []int,
|
|||||||
WHERE clients.email IN ?
|
WHERE clients.email IN ?
|
||||||
`, depletedEmails).Scan(&targets).Error
|
`, depletedEmails).Scan(&targets).Error
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, 0, nil, err
|
return false, 0, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -168,7 +168,7 @@ func (s *InboundService) disableInvalidClients(tx *gorm.DB) (bool, int64, []int,
|
|||||||
err = result.Error
|
err = result.Error
|
||||||
count := result.RowsAffected
|
count := result.RowsAffected
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return needRestart, count, nil, err
|
return needRestart, count, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(depletedEmails) > 0 {
|
if len(depletedEmails) > 0 {
|
||||||
@@ -179,7 +179,6 @@ func (s *InboundService) disableInvalidClients(tx *gorm.DB) (bool, int64, []int,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
disabledNodeIDs := make(map[int]struct{})
|
|
||||||
for inboundID, group := range remoteByInbound {
|
for inboundID, group := range remoteByInbound {
|
||||||
emails := make(map[string]struct{}, len(group))
|
emails := make(map[string]struct{}, len(group))
|
||||||
for _, t := range group {
|
for _, t := range group {
|
||||||
@@ -188,21 +187,10 @@ func (s *InboundService) disableInvalidClients(tx *gorm.DB) (bool, int64, []int,
|
|||||||
if pushErr := s.disableRemoteClients(tx, inboundID, emails); pushErr != nil {
|
if pushErr := s.disableRemoteClients(tx, inboundID, emails); pushErr != nil {
|
||||||
logger.Warning("disableInvalidClients: push to remote failed for inbound", inboundID, ":", pushErr)
|
logger.Warning("disableInvalidClients: push to remote failed for inbound", inboundID, ":", pushErr)
|
||||||
needRestart = true
|
needRestart = true
|
||||||
} else {
|
|
||||||
for _, t := range group {
|
|
||||||
if t.NodeID != nil {
|
|
||||||
disabledNodeIDs[*t.NodeID] = struct{}{}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
nodeIDs := make([]int, 0, len(disabledNodeIDs))
|
return needRestart, count, nil
|
||||||
for nodeID := range disabledNodeIDs {
|
|
||||||
nodeIDs = append(nodeIDs, nodeID)
|
|
||||||
}
|
|
||||||
|
|
||||||
return needRestart, count, nodeIDs, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// markClientsDisabledInSettings flips client.enable=false in the inbound's
|
// markClientsDisabledInSettings flips client.enable=false in the inbound's
|
||||||
@@ -255,6 +243,10 @@ func (s *InboundService) markClientsDisabledInSettings(tx *gorm.DB, inboundID in
|
|||||||
return &snapshot, &ib, nil
|
return &snapshot, &ib, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// disableRemoteClients flips the clients off in the inbound's stored settings
|
||||||
|
// and pushes the updated inbound to its node, which applies it to its own
|
||||||
|
// running Xray. That push is the whole reconcile — restarting the node's Xray
|
||||||
|
// afterwards would drop every live connection on the node for nothing (#5740).
|
||||||
func (s *InboundService) disableRemoteClients(tx *gorm.DB, inboundID int, emails map[string]struct{}) error {
|
func (s *InboundService) disableRemoteClients(tx *gorm.DB, inboundID int, emails map[string]struct{}) error {
|
||||||
oldSnapshot, ib, err := s.markClientsDisabledInSettings(tx, inboundID, emails)
|
oldSnapshot, ib, err := s.markClientsDisabledInSettings(tx, inboundID, emails)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|||||||
@@ -895,28 +895,6 @@ func (s *InboundService) setRemoteTrafficLocked(nodeID int, snap *runtime.Traffi
|
|||||||
return structuralChange, nil
|
return structuralChange, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *InboundService) restartRemoteNodesOnDisable(nodeIDs []int) {
|
|
||||||
restartOnDisable, err := (&SettingService{}).GetRestartXrayOnClientDisable()
|
|
||||||
if err != nil {
|
|
||||||
logger.Warning("disableInvalidClients: get RestartXrayOnClientDisable failed:", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if !restartOnDisable {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
for _, nodeID := range nodeIDs {
|
|
||||||
nodeIDCopy := nodeID
|
|
||||||
rt, rtErr := runtime.GetManager().RuntimeFor(&nodeIDCopy)
|
|
||||||
if rtErr != nil {
|
|
||||||
logger.Warning("disableInvalidClients: get runtime for node", nodeID, "failed:", rtErr)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
if rtErr = rt.RestartXray(context.Background()); rtErr != nil {
|
|
||||||
logger.Warning("disableInvalidClients: restart xray on node", nodeID, "failed:", rtErr)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *InboundService) GetOnlineClients() []string {
|
func (s *InboundService) GetOnlineClients() []string {
|
||||||
if p == nil {
|
if p == nil {
|
||||||
return []string{}
|
return []string{}
|
||||||
|
|||||||
@@ -19,19 +19,15 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func (s *InboundService) AddTraffic(inboundTraffics []*xray.Traffic, clientTraffics []*xray.ClientTraffic) (needRestart bool, clientsDisabled bool, err error) {
|
func (s *InboundService) AddTraffic(inboundTraffics []*xray.Traffic, clientTraffics []*xray.ClientTraffic) (needRestart bool, clientsDisabled bool, err error) {
|
||||||
var disabledNodeIDs []int
|
|
||||||
err = submitTrafficWrite(func() error {
|
err = submitTrafficWrite(func() error {
|
||||||
var inner error
|
var inner error
|
||||||
needRestart, clientsDisabled, disabledNodeIDs, inner = s.addTrafficLocked(inboundTraffics, clientTraffics)
|
needRestart, clientsDisabled, inner = s.addTrafficLocked(inboundTraffics, clientTraffics)
|
||||||
return inner
|
return inner
|
||||||
})
|
})
|
||||||
if err == nil && len(disabledNodeIDs) > 0 {
|
|
||||||
s.restartRemoteNodesOnDisable(disabledNodeIDs)
|
|
||||||
}
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *InboundService) addTrafficLocked(inboundTraffics []*xray.Traffic, clientTraffics []*xray.ClientTraffic) (bool, bool, []int, error) {
|
func (s *InboundService) addTrafficLocked(inboundTraffics []*xray.Traffic, clientTraffics []*xray.ClientTraffic) (bool, bool, error) {
|
||||||
var err error
|
var err error
|
||||||
db := database.GetDB()
|
db := database.GetDB()
|
||||||
tx := db.Begin()
|
tx := db.Begin()
|
||||||
@@ -45,11 +41,11 @@ func (s *InboundService) addTrafficLocked(inboundTraffics []*xray.Traffic, clien
|
|||||||
}()
|
}()
|
||||||
err = s.addInboundTraffic(tx, inboundTraffics)
|
err = s.addInboundTraffic(tx, inboundTraffics)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, false, nil, err
|
return false, false, err
|
||||||
}
|
}
|
||||||
err = s.addClientTraffic(tx, clientTraffics)
|
err = s.addClientTraffic(tx, clientTraffics)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, false, nil, err
|
return false, false, err
|
||||||
}
|
}
|
||||||
|
|
||||||
needRestart0, count, err := s.autoRenewClients(tx)
|
needRestart0, count, err := s.autoRenewClients(tx)
|
||||||
@@ -60,7 +56,7 @@ func (s *InboundService) addTrafficLocked(inboundTraffics []*xray.Traffic, clien
|
|||||||
}
|
}
|
||||||
|
|
||||||
disabledClientsCount := int64(0)
|
disabledClientsCount := int64(0)
|
||||||
needRestart1, count, disabledNodeIDs, err := s.disableInvalidClients(tx)
|
needRestart1, count, err := s.disableInvalidClients(tx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Warning("Error in disabling invalid clients:", err)
|
logger.Warning("Error in disabling invalid clients:", err)
|
||||||
} else if count > 0 {
|
} else if count > 0 {
|
||||||
@@ -74,7 +70,7 @@ func (s *InboundService) addTrafficLocked(inboundTraffics []*xray.Traffic, clien
|
|||||||
} else if count > 0 {
|
} else if count > 0 {
|
||||||
logger.Debugf("%v inbounds disabled", count)
|
logger.Debugf("%v inbounds disabled", count)
|
||||||
}
|
}
|
||||||
return needRestart0 || needRestart1 || needRestart2, disabledClientsCount > 0, disabledNodeIDs, nil
|
return needRestart0 || needRestart1 || needRestart2, disabledClientsCount > 0, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *InboundService) addInboundTraffic(tx *gorm.DB, traffics []*xray.Traffic) error {
|
func (s *InboundService) addInboundTraffic(tx *gorm.DB, traffics []*xray.Traffic) error {
|
||||||
|
|||||||
Reference in New Issue
Block a user