From d105b2741c62b4e6901f84e3ff8e0765aa8ec6e1 Mon Sep 17 00:00:00 2001 From: MHSanaei Date: Fri, 3 Jul 2026 09:47:30 +0200 Subject: [PATCH] fix(node): stop one rejected inbound from starving a node's traffic sync A legacy socks inbound (predating the socks-to-mixed protocol rename) fails the node's request validation when pushed. ReconcileNode aborted on the first failed inbound and syncOne then skipped the traffic snapshot entirely and never cleared ConfigDirty, so the whole node re-failed every tick and the master stopped deducting traffic for every client on that node, exactly as reported in #5685. Three-part fix: ReconcileNode now pushes every inbound and runs the delete sweep even past individual failures, returning the failures joined; syncOne logs a failed reconcile but continues with the traffic pull (dirty stays set, so reconcile retries and the merge stays in its conservative mode); and a migration renames legacy socks inbounds to mixed, which has an identical settings shape, removing the known trigger. Closes #5685 --- internal/database/db.go | 20 +++++ internal/web/job/node_traffic_sync_job.go | 15 ++-- internal/web/service/inbound_node.go | 11 ++- .../service/inbound_node_reconcile_test.go | 82 +++++++++++++++++++ 4 files changed, 119 insertions(+), 9 deletions(-) diff --git a/internal/database/db.go b/internal/database/db.go index 05ff98e5e..1c034bb60 100644 --- a/internal/database/db.go +++ b/internal/database/db.go @@ -113,6 +113,9 @@ func initModels() error { if err := normalizeInboundSubSortIndex(); err != nil { return err } + if err := migrateLegacySocksInboundsToMixed(); err != nil { + return err + } if IsPostgres() { if err := resyncPostgresSequences(db, models); err != nil { log.Printf("Error resyncing postgres sequences: %v", err) @@ -483,6 +486,23 @@ func pruneOrphanedClientInbounds() error { return nil } +// migrateLegacySocksInboundsToMixed renames legacy socks inbounds to mixed. +// The protocol enum dropped socks in favor of mixed (identical settings shape, +// same behavior plus HTTP on the shared port), so rows predating the rename +// fail model validation — most visibly when pushed to a node, where one legacy +// inbound stalled the entire node's config and traffic sync (#5685). +func migrateLegacySocksInboundsToMixed() error { + res := db.Exec("UPDATE inbounds SET protocol = 'mixed' WHERE protocol = 'socks'") + if res.Error != nil { + log.Printf("Error migrating legacy socks inbounds to mixed: %v", res.Error) + return res.Error + } + if res.RowsAffected > 0 { + log.Printf("Migrated %d legacy socks inbound(s) to mixed", res.RowsAffected) + } + return nil +} + // normalizeInboundSubSortIndex lifts sub_sort_index values below the 1-based // minimum (rows written by builds that defaulted the column to 0, or by nodes // predating the field) so they cannot sort ahead of explicitly ranked inbounds. diff --git a/internal/web/job/node_traffic_sync_job.go b/internal/web/job/node_traffic_sync_job.go index e180a75ee..56913f080 100644 --- a/internal/web/job/node_traffic_sync_job.go +++ b/internal/web/job/node_traffic_sync_job.go @@ -368,13 +368,16 @@ func (j *NodeTrafficSyncJob) syncOne(mgr *runtime.Manager, n *model.Node, doIpSy reconcileErr := j.inboundService.ReconcileNode(reconcileCtx, rt, n) reconcileCancel() if reconcileErr != nil { - logger.Warningf("node traffic sync: reconcile for %s failed: %v", n.Name, reconcileErr) - return nil + // The dirty flag stays set so reconcile retries next tick, but traffic + // accounting must keep flowing: one rejected inbound used to starve the + // whole node's traffic/online sync forever (#5685). + logger.Warningf("node traffic sync: reconcile for %s failed, continuing with traffic pull: %v", n.Name, reconcileErr) + } else { + if clearErr := j.nodeService.ClearNodeDirty(n.Id, n.ConfigDirtyAt); clearErr != nil { + logger.Warningf("node traffic sync: clear dirty for %s failed: %v", n.Name, clearErr) + } + j.structural.set() } - if clearErr := j.nodeService.ClearNodeDirty(n.Id, n.ConfigDirtyAt); clearErr != nil { - logger.Warningf("node traffic sync: clear dirty for %s failed: %v", n.Name, clearErr) - } - j.structural.set() } ctx, cancel := context.WithTimeout(context.Background(), nodeTrafficSyncRequestTimeout) diff --git a/internal/web/service/inbound_node.go b/internal/web/service/inbound_node.go index af46d001c..f3881b039 100644 --- a/internal/web/service/inbound_node.go +++ b/internal/web/service/inbound_node.go @@ -82,6 +82,10 @@ func (s *InboundService) AnyNodePending(inboundIds []int) bool { return false } +// ReconcileNode pushes every inbound and sweeps undesired remote tags even when +// individual operations fail, returning the failures joined: one inbound the +// node rejects (e.g. a legacy protocol failing validation, #5685) must not +// stall the rest of the node's config — or, via syncOne, its traffic sync. func (s *InboundService) ReconcileNode(ctx context.Context, rt *runtime.Remote, n *model.Node) error { if rt == nil || n == nil || n.Id <= 0 { return nil @@ -102,6 +106,7 @@ func (s *InboundService) ReconcileNode(ctx context.Context, rt *runtime.Remote, } prefix := nodeTagPrefix(&nodeID) desiredTags := make(map[string]struct{}, len(inbounds)*2) + var errs []error for _, ib := range inbounds { desiredTags[ib.Tag] = struct{}{} // existsOnNode: does the node already report this inbound under any of the @@ -121,7 +126,7 @@ func (s *InboundService) ReconcileNode(ctx context.Context, rt *runtime.Remote, } } if _, err := rt.ReconcileInbound(ctx, ib, existsOnNode); err != nil { - return fmt.Errorf("reconcile inbound %q: %w", ib.Tag, err) + errs = append(errs, fmt.Errorf("reconcile inbound %q: %w", ib.Tag, err)) } } // In "selected" sync mode the panel only manages the selected tags: the @@ -145,10 +150,10 @@ func (s *InboundService) ReconcileNode(ctx context.Context, rt *runtime.Remote, } } if err := rt.DelInbound(ctx, &model.Inbound{Tag: tag}); err != nil { - return fmt.Errorf("reconcile delete %q: %w", tag, err) + errs = append(errs, fmt.Errorf("reconcile delete %q: %w", tag, err)) } } - return nil + return errors.Join(errs...) } const resetGracePeriodMs int64 = 30000 diff --git a/internal/web/service/inbound_node_reconcile_test.go b/internal/web/service/inbound_node_reconcile_test.go index 353392f38..6f99c21e6 100644 --- a/internal/web/service/inbound_node_reconcile_test.go +++ b/internal/web/service/inbound_node_reconcile_test.go @@ -143,6 +143,88 @@ func TestReconcileNode_AllModeDeletesUndesiredRemoteInbounds(t *testing.T) { } } +// One inbound the node rejects (e.g. a legacy protocol failing the node's +// request validation, #5685) must not abort the reconcile: the healthy inbound +// is still pushed, the delete sweep still runs, and the returned error names +// the failed tag so the caller keeps the dirty flag set for retry. +func TestReconcileNode_ContinuesPastFailedInbound(t *testing.T) { + setupConflictDB(t) + + var mu sync.Mutex + updated := map[int]int{} + var deleted []int + tagToID := map[string]int{"legacy": 1, "healthy": 2, "gone": 3} + writeOK := func(w http.ResponseWriter, obj any) { + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(map[string]any{"success": true, "msg": "", "obj": obj}) + } + mux := http.NewServeMux() + mux.HandleFunc("/panel/api/inbounds/list", func(w http.ResponseWriter, _ *http.Request) { + type row struct { + Id int `json:"id"` + Tag string `json:"tag"` + } + rows := make([]row, 0, len(tagToID)) + for tag, id := range tagToID { + rows = append(rows, row{Id: id, Tag: tag}) + } + writeOK(w, rows) + }) + mux.HandleFunc("/panel/api/inbounds/update/", func(w http.ResponseWriter, r *http.Request) { + id, err := strconv.Atoi(strings.TrimPrefix(r.URL.Path, "/panel/api/inbounds/update/")) + if err != nil { + http.Error(w, "bad id", http.StatusBadRequest) + return + } + if id == tagToID["legacy"] { + http.Error(w, "request body failed validation", http.StatusBadRequest) + return + } + mu.Lock() + updated[id]++ + mu.Unlock() + writeOK(w, nil) + }) + mux.HandleFunc("/panel/api/inbounds/del/", func(w http.ResponseWriter, r *http.Request) { + id, err := strconv.Atoi(strings.TrimPrefix(r.URL.Path, "/panel/api/inbounds/del/")) + if err != nil { + http.Error(w, "bad id", http.StatusBadRequest) + return + } + mu.Lock() + deleted = append(deleted, id) + mu.Unlock() + writeOK(w, nil) + }) + ts := httptest.NewServer(mux) + t.Cleanup(ts.Close) + + node := reconcileTestNode(t, ts, "half-broken-node", "all", nil) + seedInboundConflictNode(t, "legacy", "", 1080, model.Protocol("socks"), ``, `{"auth":"noauth"}`, &node.Id) + seedInboundConflictNode(t, "healthy", "", 443, model.VLESS, `{"network":"tcp"}`, `{"clients":[]}`, &node.Id) + + svc := InboundService{} + err := svc.ReconcileNode(context.Background(), runtime.NewRemote(node, nil), node) + if err == nil { + t.Fatal("ReconcileNode: want an error naming the rejected inbound, got nil") + } + if !strings.Contains(err.Error(), `reconcile inbound "legacy"`) { + t.Fatalf("ReconcileNode error = %q, want it to name inbound \"legacy\"", err) + } + + mu.Lock() + healthyPushes := updated[tagToID["healthy"]] + gotDeleted := append([]int(nil), deleted...) + mu.Unlock() + if healthyPushes != 1 { + t.Fatalf("healthy inbound pushed %d times, want 1", healthyPushes) + } + sort.Ints(gotDeleted) + if len(gotDeleted) != 1 || gotDeleted[0] != tagToID["gone"] { + t.Fatalf("deleted remote ids = %v, want [%d] (sweep must still run past the failure)", gotDeleted, tagToID["gone"]) + } +} + func TestEnsureInboundTagAllowed(t *testing.T) { setupConflictDB(t) db := database.GetDB()