diff --git a/internal/web/job/node_traffic_sync_job.go b/internal/web/job/node_traffic_sync_job.go index eddc0b20b..e919a2e54 100644 --- a/internal/web/job/node_traffic_sync_job.go +++ b/internal/web/job/node_traffic_sync_job.go @@ -19,6 +19,7 @@ const ( nodeTrafficSyncRequestTimeout = 4 * time.Second nodeReconcileTimeout = 30 * time.Second nodeClientIpSyncInterval = 10 * time.Second + nodeClientIpSyncTimeout = 6 * time.Second nodeGlobalPushInterval = 30 * time.Second ) @@ -204,7 +205,7 @@ func (j *NodeTrafficSyncJob) maybePushGlobals(mgr *runtime.Manager, nodes []*mod } traffics, err := j.inboundService.GetNodeClientTraffics(n.Id) if err != nil { - logger.Warning("node traffic sync: load globals for", n.Name, "failed:", err) + logger.Warningf("node traffic sync: load globals for %s failed: %v", n.Name, err) continue } if len(traffics) == 0 { @@ -222,9 +223,9 @@ func (j *NodeTrafficSyncJob) maybePushGlobals(mgr *runtime.Manager, nodes []*mod // An old-build node without the endpoint answers 404 — not worth a // warning every cycle. if strings.Contains(err.Error(), "HTTP 404") { - logger.Debug("node traffic sync: node", n.Name, "has no global-traffic endpoint (old build)") + logger.Debugf("node traffic sync: node %s has no global-traffic endpoint (old build)", n.Name) } else { - logger.Warning("node traffic sync: push globals to", n.Name, "failed:", err) + logger.Warningf("node traffic sync: push globals to %s failed: %v", n.Name, err) } } }) @@ -235,7 +236,7 @@ func (j *NodeTrafficSyncJob) maybePushGlobals(mgr *runtime.Manager, nodes []*mod func (j *NodeTrafficSyncJob) syncOne(mgr *runtime.Manager, n *model.Node, doIpSync bool) { rt, err := mgr.RemoteFor(n) if err != nil { - logger.Warning("node traffic sync: remote lookup failed for", n.Name, ":", err) + logger.Warningf("node traffic sync: remote lookup failed for %s: %v", n.Name, err) return } @@ -244,11 +245,11 @@ func (j *NodeTrafficSyncJob) syncOne(mgr *runtime.Manager, n *model.Node, doIpSy reconcileErr := j.inboundService.ReconcileNode(reconcileCtx, rt, n) reconcileCancel() if reconcileErr != nil { - logger.Warning("node traffic sync: reconcile for", n.Name, "failed:", reconcileErr) + logger.Warningf("node traffic sync: reconcile for %s failed: %v", n.Name, reconcileErr) return } if clearErr := j.nodeService.ClearNodeDirty(n.Id, n.ConfigDirtyAt); clearErr != nil { - logger.Warning("node traffic sync: clear dirty for", n.Name, "failed:", clearErr) + logger.Warningf("node traffic sync: clear dirty for %s failed: %v", n.Name, clearErr) } j.structural.set() } @@ -258,7 +259,7 @@ func (j *NodeTrafficSyncJob) syncOne(mgr *runtime.Manager, n *model.Node, doIpSy snap, err := rt.FetchTrafficSnapshot(ctx) if err != nil { - logger.Warning("node traffic sync: fetch from", n.Name, "failed:", err) + logger.Warningf("node traffic sync: fetch from %s failed: %v", n.Name, err) j.inboundService.ClearNodeOnlineClients(n.Id) return } @@ -266,7 +267,7 @@ func (j *NodeTrafficSyncJob) syncOne(mgr *runtime.Manager, n *model.Node, doIpSy _, _, dirty, _, _ := j.nodeService.NodeSyncState(n.Id) changed, err := j.inboundService.SetRemoteTraffic(n.Id, snap, dirty) if err != nil { - logger.Warning("node traffic sync: merge for", n.Name, "failed:", err) + logger.Warningf("node traffic sync: merge for %s failed: %v", n.Name, err) return } if changed { @@ -277,34 +278,37 @@ func (j *NodeTrafficSyncJob) syncOne(mgr *runtime.Manager, n *model.Node, doIpSy return } - nodeIps, err := rt.FetchAllClientIps(ctx) + ipCtx, ipCancel := context.WithTimeout(context.Background(), nodeClientIpSyncTimeout) + defer ipCancel() + + nodeIps, err := rt.FetchAllClientIps(ipCtx) if err == nil && len(nodeIps) > 0 { if err := j.inboundService.MergeInboundClientIps(nodeIps); err != nil { - logger.Warning("node traffic sync: merge client ips from", n.Name, "failed:", err) + logger.Warningf("node traffic sync: merge client ips from %s failed: %v", n.Name, err) } } else if err != nil { - logger.Warning("node traffic sync: fetch client ips from", n.Name, "failed:", err) + logger.Warningf("node traffic sync: fetch client ips from %s failed: %v", n.Name, err) } masterIps, err := j.inboundService.GetAllInboundClientIps() if err != nil { - logger.Warning("node traffic sync: load client ips for push to", n.Name, "failed:", err) + logger.Warningf("node traffic sync: load client ips for push to %s failed: %v", n.Name, err) return } if len(masterIps) > 0 { - if err := rt.PushAllClientIps(ctx, masterIps); err != nil { - logger.Warning("node traffic sync: push client ips to", n.Name, "failed:", err) + if err := rt.PushAllClientIps(ipCtx, masterIps); err != nil { + logger.Warningf("node traffic sync: push client ips to %s failed: %v", n.Name, err) } } // Per-node IP attribution: pull the node's guid-keyed subtree (its own // observations plus any descendants) so the master can tell which node each // IP is on. Old nodes without the endpoint just return an error — skip them. - if guidTrees, err := rt.FetchClientIpsByGuid(ctx); err != nil { - logger.Debug("node traffic sync: fetch client ip attribution from", n.Name, "failed:", err) + if guidTrees, err := rt.FetchClientIpsByGuid(ipCtx); err != nil { + logger.Debugf("node traffic sync: fetch client ip attribution from %s failed: %v", n.Name, err) } else if len(guidTrees) > 0 { if err := j.inboundService.MergeClientIpsByGuid(guidTrees); err != nil { - logger.Warning("node traffic sync: merge client ip attribution from", n.Name, "failed:", err) + logger.Warningf("node traffic sync: merge client ip attribution from %s failed: %v", n.Name, err) } } }