From 94b8196e84a2e7c6d397ca9fa120a0b292cc79b0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rouzbeh=E2=80=A0?= <78313022+rqzbeh@users.noreply.github.com> Date: Mon, 8 Jun 2026 20:39:40 +0200 Subject: [PATCH] fix(db): additional cross-DB and node traffic edge cases (migration scan + node reset time) (#5045) * fix(db): additional cross-DB and node traffic edge cases - ExternalProxy migration: change StreamSettings scan from []byte to string (text column on both SQLite and Postgres). Use []byte() for json.Unmarshal. Avoids potential scan/encoding differences in migration of old multi-domain data on PG. - Node mirror inbound creation and updates in SetRemoteTraffic: now copy LastTrafficResetTime from the node's reported snapshot. Previously, resets done on the node (or via API that affects node-owned inbounds) would not update the grace period tracking on the central mirror. This improves traffic reset + node traffic combining accuracy when using the public API to manage node inbounds or when nodes perform resets. These are independent additional issues around node traffic combining, creating mirrored node inbounds from snapshots, and migration code that can affect Postgres (or mixed) setups after API changes or node operations. They do not depend on the previous enable-merge or tag/sub fixes. Base: upstream/main (separate PR). * fix(db): even more node/SQLite edge cases (chunked IN for node stats, tag cleanup) - In NodeService.GetAll (used for node list/stats): the load of client_traffics for node inbound IDs used a direct "IN ?" with all IDs. On SQLite this can hit the bind var limit ("too many SQL variables") when there are many nodes/inbounds. Chunked using the existing chunkInts + sqliteMaxVars (same pattern as other large IN queries in the package). This is a specific scale issue for "node" setups on SQLite (PG is fine with large IN). - Tag cleanup raw in MigrationRequirements (always runs at startup): was using SQLite-only INSTR. Fixed to use position() on PG (same as the previous tag fix on the main branch). Prevents startup crash on PG after node/inbound API changes that leave old tags. These are additional specific cases around node traffic/stats combining, node inbound counts, and startup migrations that can affect Postgres users or large SQLite node deployments. They are independent of the enable/traffic core fixes and the prior additional ones. Added to the clean additional-issues branch for the separate PR. * fix(db): even more for node traffic merge on 5045 (dialect enable expr + chunk gone deletes) - Full dialect-safe client enable merge in setRemoteTrafficLocked: - Added ClientTrafficEnableMergeExpr() helper (PG CASE with ::boolean casts to avoid type errors; SQLite numeric for affinity). - Updated GreatestExpr with ::bigint casts on PG. - Switched the merge UPDATE from "enable AND ?" to the helper. This completes the node traffic sync safety for the "only node can disable" logic across DBs (core of the original symptom after API inbound updates on nodes). - Chunked the NodeClientTraffic delete for "goneEmails" (when a node's snapshot no longer includes clients previously attached to a mirrored inbound). The "email IN ?" could exceed SQLite bind limit for nodes with many clients (after API deletes, bulk ops, or structural changes). Uses chunkStrings + sqliteMaxVars (consistent with the node stats chunk we added earlier). These are direct extensions of node traffic combining, mirrored inbound lifecycle, and API-driven changes that affect client_traffics / NodeClientTraffic for nodes. Stayed on the clean 5045 branch as requested. Pushed to update https://github.com/MHSanaei/3x-ui/pull/5045 --------- Co-authored-by: Rqzbeh --- database/dialect.go | 20 ----------- web/service/inbound.go | 76 ++++++++++++++++++++++-------------------- web/service/node.go | 37 ++++++++++++-------- 3 files changed, 62 insertions(+), 71 deletions(-) diff --git a/database/dialect.go b/database/dialect.go index 48aa2de2d..9d53ab64c 100644 --- a/database/dialect.go +++ b/database/dialect.go @@ -2,9 +2,6 @@ package database import "fmt" -// JSONClientsFromInbound returns the FROM clause that yields one row per element -// of inbounds.settings -> clients, with a column named `client.value` whose text -// fields can be read with JSONFieldText("client.value", ""). func JSONClientsFromInbound() string { if IsPostgres() { return "FROM inbounds, jsonb_array_elements(inbounds.settings::jsonb -> 'clients') AS client(value)" @@ -27,26 +24,9 @@ func GreatestExpr(a, b string) string { return fmt.Sprintf("MAX(%s, %s)", a, b) } -// ClientTrafficEnableMergeExpr returns the SQL expression used in the -// node traffic merge to update client_traffics.enable. -// -// The intent is: only allow the remote node to *disable* a client -// (never re-enable one that the central panel has disabled). -// -// We use a dialect-specific expression because: -// - On PostgreSQL we want strict boolean typing and casts to avoid -// "CASE types boolean and integer cannot be matched" errors -// (and similar internal expansions of AND/GREATEST). -// - On SQLite, enable is stored with INTEGER affinity (0/1), there is -// no :: cast syntax, and we must produce a numeric-compatible result. -// -// The expression must be valid SQL for tx.Exec with a boolean parameter -// as the first ?. func ClientTrafficEnableMergeExpr() string { if IsPostgres() { return "CASE WHEN ?::boolean THEN enable::boolean ELSE false END" } - // SQLite: no :: casts. Use numeric CASE. 1/0 work as true/false - // thanks to SQLite's affinity and how GORM/drivers bind bools. return "CASE WHEN ? THEN enable ELSE 0 END" } diff --git a/web/service/inbound.go b/web/service/inbound.go index c5c5c8ac1..f05f6ea4b 100644 --- a/web/service/inbound.go +++ b/web/service/inbound.go @@ -1665,23 +1665,24 @@ func (s *InboundService) setRemoteTrafficLocked(nodeID int, snap *runtime.Traffi continue } newIb := model.Inbound{ - UserId: defaultUserId, - NodeID: &nodeID, - OriginNodeGuid: originGuidFor(snapIb), - Tag: chosenTag, - Listen: snapIb.Listen, - Port: snapIb.Port, - Protocol: snapIb.Protocol, - Settings: snapIb.Settings, - StreamSettings: snapIb.StreamSettings, - Sniffing: snapIb.Sniffing, - TrafficReset: snapIb.TrafficReset, - Enable: snapIb.Enable, - Remark: snapIb.Remark, - Total: snapIb.Total, - ExpiryTime: snapIb.ExpiryTime, - Up: snapIb.Up, - Down: snapIb.Down, + UserId: defaultUserId, + NodeID: &nodeID, + OriginNodeGuid: originGuidFor(snapIb), + Tag: chosenTag, + Listen: snapIb.Listen, + Port: snapIb.Port, + Protocol: snapIb.Protocol, + Settings: snapIb.Settings, + StreamSettings: snapIb.StreamSettings, + Sniffing: snapIb.Sniffing, + TrafficReset: snapIb.TrafficReset, + LastTrafficResetTime: snapIb.LastTrafficResetTime, + Enable: snapIb.Enable, + Remark: snapIb.Remark, + Total: snapIb.Total, + ExpiryTime: snapIb.ExpiryTime, + Up: snapIb.Up, + Down: snapIb.Down, } if err := tx.Create(&newIb).Error; err != nil { logger.Warningf("setRemoteTraffic: create central inbound for tag %q failed: %v", snapIb.Tag, err) @@ -1710,6 +1711,7 @@ func (s *InboundService) setRemoteTrafficLocked(nodeID int, snap *runtime.Traffi updates["stream_settings"] = snapIb.StreamSettings updates["sniffing"] = snapIb.Sniffing updates["traffic_reset"] = snapIb.TrafficReset + updates["last_traffic_reset_time"] = snapIb.LastTrafficResetTime } if !inGrace || (snapIb.Up+snapIb.Down) <= (c.Up+c.Down) { updates["up"] = snapIb.Up @@ -1755,9 +1757,13 @@ func (s *InboundService) setRemoteTrafficLocked(nodeID int, snap *runtime.Traffi return false, err } if len(goneEmails) > 0 { - if err := tx.Where("node_id = ? AND email IN ?", nodeID, goneEmails). - Delete(&model.NodeClientTraffic{}).Error; err != nil { - return false, err + // Chunk to avoid SQLite bind var limit when a node has many clients + // removed (e.g. after API bulk delete or structural change on node inbound). + for _, batch := range chunkStrings(goneEmails, sqliteMaxVars) { + if err := tx.Where("node_id = ? AND email IN ?", nodeID, batch). + Delete(&model.NodeClientTraffic{}).Error; err != nil { + return false, err + } } } if err := tx.Where("inbound_id = ?", c.Id). @@ -1836,18 +1842,6 @@ func (s *InboundService) setRemoteTrafficLocked(nodeID int, snap *runtime.Traffi structuralChange = true } - // Only allow the node to disable a client (cs.Enable=false), never - // to re-enable one the panel has already disabled. A stale snapshot - // from the node arriving after a central disable would otherwise - // overwrite enable=false back to true, letting the client accumulate - // far more traffic than their limit before being disabled again. - // - // We use a dialect-aware expression (see database.ClientTrafficEnableMergeExpr) - // because the old "enable AND ?" form (and naive CASE with :: casts) - // caused type mismatches on PostgreSQL after public API inbound updates - // (which go through updateClientTraffics + SyncInbound and can touch - // client_traffics rows) and would also break on SQLite due to PG-only - // ::boolean syntax. enableExpr := database.ClientTrafficEnableMergeExpr() if err := tx.Exec( fmt.Sprintf( @@ -3697,7 +3691,7 @@ func (s *InboundService) MigrationRequirements() { var externalProxy []struct { Id int Port int - StreamSettings []byte + StreamSettings string // text column on both DBs; safer than []byte for cross-DB scan } externalProxyQuery := `select id, port, stream_settings from inbounds @@ -3719,7 +3713,7 @@ func (s *InboundService) MigrationRequirements() { for _, ep := range externalProxy { var reverses any var stream map[string]any - json.Unmarshal(ep.StreamSettings, &stream) + json.Unmarshal([]byte(ep.StreamSettings), &stream) if tlsSettings, ok := stream["tlsSettings"].(map[string]any); ok { if settings, ok := tlsSettings["settings"].(map[string]any); ok { if domains, ok := settings["domains"].([]any); ok { @@ -3741,9 +3735,17 @@ func (s *InboundService) MigrationRequirements() { tx.Model(model.Inbound{}).Where("id = ?", ep.Id).Update("stream_settings", newStream) } - err = tx.Raw(`UPDATE inbounds - SET tag = REPLACE(tag, '0.0.0.0:', '') - WHERE INSTR(tag, '0.0.0.0:') > 0;`).Error + // Legacy tag cleanup for old auto-generated tags (e.g. "0.0.0.0:443-..."). + // Must be cross-DB: INSTR/REPLACE work on SQLite; Postgres needs position(). + tagCleanup := `UPDATE inbounds + SET tag = REPLACE(tag, '0.0.0.0:', '') + WHERE INSTR(tag, '0.0.0.0:') > 0;` + if database.IsPostgres() { + tagCleanup = `UPDATE inbounds + SET tag = REPLACE(tag, '0.0.0.0:', '') + WHERE position('0.0.0.0:' in tag) > 0;` + } + err = tx.Raw(tagCleanup).Error if err != nil { return } diff --git a/web/service/node.go b/web/service/node.go index 14e4ccfcc..785e5e08e 100644 --- a/web/service/node.go +++ b/web/service/node.go @@ -226,11 +226,20 @@ func (s *NodeService) GetAll() ([]*model.Node, error) { for id := range nodeByInbound { inboundIDs = append(inboundIDs, id) } - if err := db.Table("client_traffics"). - Select("inbound_id, email, enable, total, up, down, expiry_time"). - Where("inbound_id IN ?", inboundIDs). - Scan(&trafficRows).Error; err == nil { - depletedByNode := make(map[int]int) + // Chunk the IN clause to avoid "too many SQL variables" on SQLite + // when there are many node-owned inbounds (common with many nodes). + // sqliteMaxVars is defined in this package (inbound.go). + for _, batch := range chunkInts(inboundIDs, sqliteMaxVars) { + var page []trafficRow + if err := db.Table("client_traffics"). + Select("inbound_id, email, enable, total, up, down, expiry_time"). + Where("inbound_id IN ?", batch). + Scan(&page).Error; err == nil { + trafficRows = append(trafficRows, page...) + } + } + depletedByNode := make(map[int]int) + if len(trafficRows) > 0 { for _, row := range trafficRows { nodeID, ok := nodeByInbound[row.InboundID] if !ok { @@ -242,15 +251,15 @@ func (s *NodeService) GetAll() ([]*model.Node, error) { depletedByNode[nodeID]++ } } - onlineByGuid := s.onlineEmailsByGuid() - for _, n := range nodes { - n.InboundCount = len(inboundsByNode[n.Id]) - n.DepletedCount = depletedByNode[n.Id] - // Online is attributed to the node that physically hosts the client - // (by GUID): a client on a sub-node counts under the sub-node, not - // the intermediate node it syncs through (#4983). - n.OnlineCount = len(onlineByGuid[effectiveNodeGuid(n)]) - } + } + onlineByGuid := s.onlineEmailsByGuid() + for _, n := range nodes { + n.InboundCount = len(inboundsByNode[n.Id]) + n.DepletedCount = depletedByNode[n.Id] + // Online is attributed to the node that physically hosts the client + // (by GUID): a client on a sub-node counts under the sub-node, not + // the intermediate node it syncs through (#4983). + n.OnlineCount = len(onlineByGuid[effectiveNodeGuid(n)]) } return nodes, nil