diff --git a/internal/database/db.go b/internal/database/db.go index 05ff98e5e..1c034bb60 100644 --- a/internal/database/db.go +++ b/internal/database/db.go @@ -113,6 +113,9 @@ func initModels() error { if err := normalizeInboundSubSortIndex(); err != nil { return err } + if err := migrateLegacySocksInboundsToMixed(); err != nil { + return err + } if IsPostgres() { if err := resyncPostgresSequences(db, models); err != nil { log.Printf("Error resyncing postgres sequences: %v", err) @@ -483,6 +486,23 @@ func pruneOrphanedClientInbounds() error { return nil } +// migrateLegacySocksInboundsToMixed renames legacy socks inbounds to mixed. +// The protocol enum dropped socks in favor of mixed (identical settings shape, +// same behavior plus HTTP on the shared port), so rows predating the rename +// fail model validation — most visibly when pushed to a node, where one legacy +// inbound stalled the entire node's config and traffic sync (#5685). +func migrateLegacySocksInboundsToMixed() error { + res := db.Exec("UPDATE inbounds SET protocol = 'mixed' WHERE protocol = 'socks'") + if res.Error != nil { + log.Printf("Error migrating legacy socks inbounds to mixed: %v", res.Error) + return res.Error + } + if res.RowsAffected > 0 { + log.Printf("Migrated %d legacy socks inbound(s) to mixed", res.RowsAffected) + } + return nil +} + // normalizeInboundSubSortIndex lifts sub_sort_index values below the 1-based // minimum (rows written by builds that defaulted the column to 0, or by nodes // predating the field) so they cannot sort ahead of explicitly ranked inbounds. diff --git a/internal/web/job/node_traffic_sync_job.go b/internal/web/job/node_traffic_sync_job.go index e180a75ee..56913f080 100644 --- a/internal/web/job/node_traffic_sync_job.go +++ b/internal/web/job/node_traffic_sync_job.go @@ -368,13 +368,16 @@ func (j *NodeTrafficSyncJob) syncOne(mgr *runtime.Manager, n *model.Node, doIpSy reconcileErr := j.inboundService.ReconcileNode(reconcileCtx, rt, n) reconcileCancel() if reconcileErr != nil { - logger.Warningf("node traffic sync: reconcile for %s failed: %v", n.Name, reconcileErr) - return nil + // The dirty flag stays set so reconcile retries next tick, but traffic + // accounting must keep flowing: one rejected inbound used to starve the + // whole node's traffic/online sync forever (#5685). + logger.Warningf("node traffic sync: reconcile for %s failed, continuing with traffic pull: %v", n.Name, reconcileErr) + } else { + if clearErr := j.nodeService.ClearNodeDirty(n.Id, n.ConfigDirtyAt); clearErr != nil { + logger.Warningf("node traffic sync: clear dirty for %s failed: %v", n.Name, clearErr) + } + j.structural.set() } - if clearErr := j.nodeService.ClearNodeDirty(n.Id, n.ConfigDirtyAt); clearErr != nil { - logger.Warningf("node traffic sync: clear dirty for %s failed: %v", n.Name, clearErr) - } - j.structural.set() } ctx, cancel := context.WithTimeout(context.Background(), nodeTrafficSyncRequestTimeout) diff --git a/internal/web/service/inbound_node.go b/internal/web/service/inbound_node.go index af46d001c..f3881b039 100644 --- a/internal/web/service/inbound_node.go +++ b/internal/web/service/inbound_node.go @@ -82,6 +82,10 @@ func (s *InboundService) AnyNodePending(inboundIds []int) bool { return false } +// ReconcileNode pushes every inbound and sweeps undesired remote tags even when +// individual operations fail, returning the failures joined: one inbound the +// node rejects (e.g. a legacy protocol failing validation, #5685) must not +// stall the rest of the node's config — or, via syncOne, its traffic sync. func (s *InboundService) ReconcileNode(ctx context.Context, rt *runtime.Remote, n *model.Node) error { if rt == nil || n == nil || n.Id <= 0 { return nil @@ -102,6 +106,7 @@ func (s *InboundService) ReconcileNode(ctx context.Context, rt *runtime.Remote, } prefix := nodeTagPrefix(&nodeID) desiredTags := make(map[string]struct{}, len(inbounds)*2) + var errs []error for _, ib := range inbounds { desiredTags[ib.Tag] = struct{}{} // existsOnNode: does the node already report this inbound under any of the @@ -121,7 +126,7 @@ func (s *InboundService) ReconcileNode(ctx context.Context, rt *runtime.Remote, } } if _, err := rt.ReconcileInbound(ctx, ib, existsOnNode); err != nil { - return fmt.Errorf("reconcile inbound %q: %w", ib.Tag, err) + errs = append(errs, fmt.Errorf("reconcile inbound %q: %w", ib.Tag, err)) } } // In "selected" sync mode the panel only manages the selected tags: the @@ -145,10 +150,10 @@ func (s *InboundService) ReconcileNode(ctx context.Context, rt *runtime.Remote, } } if err := rt.DelInbound(ctx, &model.Inbound{Tag: tag}); err != nil { - return fmt.Errorf("reconcile delete %q: %w", tag, err) + errs = append(errs, fmt.Errorf("reconcile delete %q: %w", tag, err)) } } - return nil + return errors.Join(errs...) } const resetGracePeriodMs int64 = 30000 diff --git a/internal/web/service/inbound_node_reconcile_test.go b/internal/web/service/inbound_node_reconcile_test.go index 353392f38..6f99c21e6 100644 --- a/internal/web/service/inbound_node_reconcile_test.go +++ b/internal/web/service/inbound_node_reconcile_test.go @@ -143,6 +143,88 @@ func TestReconcileNode_AllModeDeletesUndesiredRemoteInbounds(t *testing.T) { } } +// One inbound the node rejects (e.g. a legacy protocol failing the node's +// request validation, #5685) must not abort the reconcile: the healthy inbound +// is still pushed, the delete sweep still runs, and the returned error names +// the failed tag so the caller keeps the dirty flag set for retry. +func TestReconcileNode_ContinuesPastFailedInbound(t *testing.T) { + setupConflictDB(t) + + var mu sync.Mutex + updated := map[int]int{} + var deleted []int + tagToID := map[string]int{"legacy": 1, "healthy": 2, "gone": 3} + writeOK := func(w http.ResponseWriter, obj any) { + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(map[string]any{"success": true, "msg": "", "obj": obj}) + } + mux := http.NewServeMux() + mux.HandleFunc("/panel/api/inbounds/list", func(w http.ResponseWriter, _ *http.Request) { + type row struct { + Id int `json:"id"` + Tag string `json:"tag"` + } + rows := make([]row, 0, len(tagToID)) + for tag, id := range tagToID { + rows = append(rows, row{Id: id, Tag: tag}) + } + writeOK(w, rows) + }) + mux.HandleFunc("/panel/api/inbounds/update/", func(w http.ResponseWriter, r *http.Request) { + id, err := strconv.Atoi(strings.TrimPrefix(r.URL.Path, "/panel/api/inbounds/update/")) + if err != nil { + http.Error(w, "bad id", http.StatusBadRequest) + return + } + if id == tagToID["legacy"] { + http.Error(w, "request body failed validation", http.StatusBadRequest) + return + } + mu.Lock() + updated[id]++ + mu.Unlock() + writeOK(w, nil) + }) + mux.HandleFunc("/panel/api/inbounds/del/", func(w http.ResponseWriter, r *http.Request) { + id, err := strconv.Atoi(strings.TrimPrefix(r.URL.Path, "/panel/api/inbounds/del/")) + if err != nil { + http.Error(w, "bad id", http.StatusBadRequest) + return + } + mu.Lock() + deleted = append(deleted, id) + mu.Unlock() + writeOK(w, nil) + }) + ts := httptest.NewServer(mux) + t.Cleanup(ts.Close) + + node := reconcileTestNode(t, ts, "half-broken-node", "all", nil) + seedInboundConflictNode(t, "legacy", "", 1080, model.Protocol("socks"), ``, `{"auth":"noauth"}`, &node.Id) + seedInboundConflictNode(t, "healthy", "", 443, model.VLESS, `{"network":"tcp"}`, `{"clients":[]}`, &node.Id) + + svc := InboundService{} + err := svc.ReconcileNode(context.Background(), runtime.NewRemote(node, nil), node) + if err == nil { + t.Fatal("ReconcileNode: want an error naming the rejected inbound, got nil") + } + if !strings.Contains(err.Error(), `reconcile inbound "legacy"`) { + t.Fatalf("ReconcileNode error = %q, want it to name inbound \"legacy\"", err) + } + + mu.Lock() + healthyPushes := updated[tagToID["healthy"]] + gotDeleted := append([]int(nil), deleted...) + mu.Unlock() + if healthyPushes != 1 { + t.Fatalf("healthy inbound pushed %d times, want 1", healthyPushes) + } + sort.Ints(gotDeleted) + if len(gotDeleted) != 1 || gotDeleted[0] != tagToID["gone"] { + t.Fatalf("deleted remote ids = %v, want [%d] (sweep must still run past the failure)", gotDeleted, tagToID["gone"]) + } +} + func TestEnsureInboundTagAllowed(t *testing.T) { setupConflictDB(t) db := database.GetDB()