mirror of
https://github.com/MHSanaei/3x-ui.git
synced 2026-06-28 00:24:19 +00:00
fix(node-sync): give client-IP sync its own deadline; fix log spacing
The IP-sync phase shared a single 4s context with the traffic-snapshot fetch that runs before it. On high-latency nodes the snapshot's round-trips drained that budget, so FetchAllClientIps/PushAllClientIps/FetchClientIpsByGuid failed with 'context deadline exceeded' every cycle, silently breaking cross-node client-IP sync. Give the phase its own fresh context (nodeClientIpSyncTimeout=6s), mirroring maybePushGlobals. Also convert node-name log lines to Warningf/Debugf: fmt.Sprint inserts no space between adjacent string args, so messages rendered as 'push client ips toUS1failed:'.
This commit is contained in:
@@ -19,6 +19,7 @@ const (
|
||||
nodeTrafficSyncRequestTimeout = 4 * time.Second
|
||||
nodeReconcileTimeout = 30 * time.Second
|
||||
nodeClientIpSyncInterval = 10 * time.Second
|
||||
nodeClientIpSyncTimeout = 6 * time.Second
|
||||
nodeGlobalPushInterval = 30 * time.Second
|
||||
)
|
||||
|
||||
@@ -204,7 +205,7 @@ func (j *NodeTrafficSyncJob) maybePushGlobals(mgr *runtime.Manager, nodes []*mod
|
||||
}
|
||||
traffics, err := j.inboundService.GetNodeClientTraffics(n.Id)
|
||||
if err != nil {
|
||||
logger.Warning("node traffic sync: load globals for", n.Name, "failed:", err)
|
||||
logger.Warningf("node traffic sync: load globals for %s failed: %v", n.Name, err)
|
||||
continue
|
||||
}
|
||||
if len(traffics) == 0 {
|
||||
@@ -222,9 +223,9 @@ func (j *NodeTrafficSyncJob) maybePushGlobals(mgr *runtime.Manager, nodes []*mod
|
||||
// An old-build node without the endpoint answers 404 — not worth a
|
||||
// warning every cycle.
|
||||
if strings.Contains(err.Error(), "HTTP 404") {
|
||||
logger.Debug("node traffic sync: node", n.Name, "has no global-traffic endpoint (old build)")
|
||||
logger.Debugf("node traffic sync: node %s has no global-traffic endpoint (old build)", n.Name)
|
||||
} else {
|
||||
logger.Warning("node traffic sync: push globals to", n.Name, "failed:", err)
|
||||
logger.Warningf("node traffic sync: push globals to %s failed: %v", n.Name, err)
|
||||
}
|
||||
}
|
||||
})
|
||||
@@ -235,7 +236,7 @@ func (j *NodeTrafficSyncJob) maybePushGlobals(mgr *runtime.Manager, nodes []*mod
|
||||
func (j *NodeTrafficSyncJob) syncOne(mgr *runtime.Manager, n *model.Node, doIpSync bool) {
|
||||
rt, err := mgr.RemoteFor(n)
|
||||
if err != nil {
|
||||
logger.Warning("node traffic sync: remote lookup failed for", n.Name, ":", err)
|
||||
logger.Warningf("node traffic sync: remote lookup failed for %s: %v", n.Name, err)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -244,11 +245,11 @@ func (j *NodeTrafficSyncJob) syncOne(mgr *runtime.Manager, n *model.Node, doIpSy
|
||||
reconcileErr := j.inboundService.ReconcileNode(reconcileCtx, rt, n)
|
||||
reconcileCancel()
|
||||
if reconcileErr != nil {
|
||||
logger.Warning("node traffic sync: reconcile for", n.Name, "failed:", reconcileErr)
|
||||
logger.Warningf("node traffic sync: reconcile for %s failed: %v", n.Name, reconcileErr)
|
||||
return
|
||||
}
|
||||
if clearErr := j.nodeService.ClearNodeDirty(n.Id, n.ConfigDirtyAt); clearErr != nil {
|
||||
logger.Warning("node traffic sync: clear dirty for", n.Name, "failed:", clearErr)
|
||||
logger.Warningf("node traffic sync: clear dirty for %s failed: %v", n.Name, clearErr)
|
||||
}
|
||||
j.structural.set()
|
||||
}
|
||||
@@ -258,7 +259,7 @@ func (j *NodeTrafficSyncJob) syncOne(mgr *runtime.Manager, n *model.Node, doIpSy
|
||||
|
||||
snap, err := rt.FetchTrafficSnapshot(ctx)
|
||||
if err != nil {
|
||||
logger.Warning("node traffic sync: fetch from", n.Name, "failed:", err)
|
||||
logger.Warningf("node traffic sync: fetch from %s failed: %v", n.Name, err)
|
||||
j.inboundService.ClearNodeOnlineClients(n.Id)
|
||||
return
|
||||
}
|
||||
@@ -266,7 +267,7 @@ func (j *NodeTrafficSyncJob) syncOne(mgr *runtime.Manager, n *model.Node, doIpSy
|
||||
_, _, dirty, _, _ := j.nodeService.NodeSyncState(n.Id)
|
||||
changed, err := j.inboundService.SetRemoteTraffic(n.Id, snap, dirty)
|
||||
if err != nil {
|
||||
logger.Warning("node traffic sync: merge for", n.Name, "failed:", err)
|
||||
logger.Warningf("node traffic sync: merge for %s failed: %v", n.Name, err)
|
||||
return
|
||||
}
|
||||
if changed {
|
||||
@@ -277,34 +278,37 @@ func (j *NodeTrafficSyncJob) syncOne(mgr *runtime.Manager, n *model.Node, doIpSy
|
||||
return
|
||||
}
|
||||
|
||||
nodeIps, err := rt.FetchAllClientIps(ctx)
|
||||
ipCtx, ipCancel := context.WithTimeout(context.Background(), nodeClientIpSyncTimeout)
|
||||
defer ipCancel()
|
||||
|
||||
nodeIps, err := rt.FetchAllClientIps(ipCtx)
|
||||
if err == nil && len(nodeIps) > 0 {
|
||||
if err := j.inboundService.MergeInboundClientIps(nodeIps); err != nil {
|
||||
logger.Warning("node traffic sync: merge client ips from", n.Name, "failed:", err)
|
||||
logger.Warningf("node traffic sync: merge client ips from %s failed: %v", n.Name, err)
|
||||
}
|
||||
} else if err != nil {
|
||||
logger.Warning("node traffic sync: fetch client ips from", n.Name, "failed:", err)
|
||||
logger.Warningf("node traffic sync: fetch client ips from %s failed: %v", n.Name, err)
|
||||
}
|
||||
|
||||
masterIps, err := j.inboundService.GetAllInboundClientIps()
|
||||
if err != nil {
|
||||
logger.Warning("node traffic sync: load client ips for push to", n.Name, "failed:", err)
|
||||
logger.Warningf("node traffic sync: load client ips for push to %s failed: %v", n.Name, err)
|
||||
return
|
||||
}
|
||||
if len(masterIps) > 0 {
|
||||
if err := rt.PushAllClientIps(ctx, masterIps); err != nil {
|
||||
logger.Warning("node traffic sync: push client ips to", n.Name, "failed:", err)
|
||||
if err := rt.PushAllClientIps(ipCtx, masterIps); err != nil {
|
||||
logger.Warningf("node traffic sync: push client ips to %s failed: %v", n.Name, err)
|
||||
}
|
||||
}
|
||||
|
||||
// Per-node IP attribution: pull the node's guid-keyed subtree (its own
|
||||
// observations plus any descendants) so the master can tell which node each
|
||||
// IP is on. Old nodes without the endpoint just return an error — skip them.
|
||||
if guidTrees, err := rt.FetchClientIpsByGuid(ctx); err != nil {
|
||||
logger.Debug("node traffic sync: fetch client ip attribution from", n.Name, "failed:", err)
|
||||
if guidTrees, err := rt.FetchClientIpsByGuid(ipCtx); err != nil {
|
||||
logger.Debugf("node traffic sync: fetch client ip attribution from %s failed: %v", n.Name, err)
|
||||
} else if len(guidTrees) > 0 {
|
||||
if err := j.inboundService.MergeClientIpsByGuid(guidTrees); err != nil {
|
||||
logger.Warning("node traffic sync: merge client ip attribution from", n.Name, "failed:", err)
|
||||
logger.Warningf("node traffic sync: merge client ip attribution from %s failed: %v", n.Name, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user