diff --git a/database/dialect.go b/database/dialect.go index 48aa2de2d..9d53ab64c 100644 --- a/database/dialect.go +++ b/database/dialect.go @@ -2,9 +2,6 @@ package database import "fmt" -// JSONClientsFromInbound returns the FROM clause that yields one row per element -// of inbounds.settings -> clients, with a column named `client.value` whose text -// fields can be read with JSONFieldText("client.value", ""). func JSONClientsFromInbound() string { if IsPostgres() { return "FROM inbounds, jsonb_array_elements(inbounds.settings::jsonb -> 'clients') AS client(value)" @@ -27,26 +24,9 @@ func GreatestExpr(a, b string) string { return fmt.Sprintf("MAX(%s, %s)", a, b) } -// ClientTrafficEnableMergeExpr returns the SQL expression used in the -// node traffic merge to update client_traffics.enable. -// -// The intent is: only allow the remote node to *disable* a client -// (never re-enable one that the central panel has disabled). -// -// We use a dialect-specific expression because: -// - On PostgreSQL we want strict boolean typing and casts to avoid -// "CASE types boolean and integer cannot be matched" errors -// (and similar internal expansions of AND/GREATEST). -// - On SQLite, enable is stored with INTEGER affinity (0/1), there is -// no :: cast syntax, and we must produce a numeric-compatible result. -// -// The expression must be valid SQL for tx.Exec with a boolean parameter -// as the first ?. func ClientTrafficEnableMergeExpr() string { if IsPostgres() { return "CASE WHEN ?::boolean THEN enable::boolean ELSE false END" } - // SQLite: no :: casts. Use numeric CASE. 1/0 work as true/false - // thanks to SQLite's affinity and how GORM/drivers bind bools. return "CASE WHEN ? THEN enable ELSE 0 END" } diff --git a/web/service/inbound.go b/web/service/inbound.go index c5c5c8ac1..f05f6ea4b 100644 --- a/web/service/inbound.go +++ b/web/service/inbound.go @@ -1665,23 +1665,24 @@ func (s *InboundService) setRemoteTrafficLocked(nodeID int, snap *runtime.Traffi continue } newIb := model.Inbound{ - UserId: defaultUserId, - NodeID: &nodeID, - OriginNodeGuid: originGuidFor(snapIb), - Tag: chosenTag, - Listen: snapIb.Listen, - Port: snapIb.Port, - Protocol: snapIb.Protocol, - Settings: snapIb.Settings, - StreamSettings: snapIb.StreamSettings, - Sniffing: snapIb.Sniffing, - TrafficReset: snapIb.TrafficReset, - Enable: snapIb.Enable, - Remark: snapIb.Remark, - Total: snapIb.Total, - ExpiryTime: snapIb.ExpiryTime, - Up: snapIb.Up, - Down: snapIb.Down, + UserId: defaultUserId, + NodeID: &nodeID, + OriginNodeGuid: originGuidFor(snapIb), + Tag: chosenTag, + Listen: snapIb.Listen, + Port: snapIb.Port, + Protocol: snapIb.Protocol, + Settings: snapIb.Settings, + StreamSettings: snapIb.StreamSettings, + Sniffing: snapIb.Sniffing, + TrafficReset: snapIb.TrafficReset, + LastTrafficResetTime: snapIb.LastTrafficResetTime, + Enable: snapIb.Enable, + Remark: snapIb.Remark, + Total: snapIb.Total, + ExpiryTime: snapIb.ExpiryTime, + Up: snapIb.Up, + Down: snapIb.Down, } if err := tx.Create(&newIb).Error; err != nil { logger.Warningf("setRemoteTraffic: create central inbound for tag %q failed: %v", snapIb.Tag, err) @@ -1710,6 +1711,7 @@ func (s *InboundService) setRemoteTrafficLocked(nodeID int, snap *runtime.Traffi updates["stream_settings"] = snapIb.StreamSettings updates["sniffing"] = snapIb.Sniffing updates["traffic_reset"] = snapIb.TrafficReset + updates["last_traffic_reset_time"] = snapIb.LastTrafficResetTime } if !inGrace || (snapIb.Up+snapIb.Down) <= (c.Up+c.Down) { updates["up"] = snapIb.Up @@ -1755,9 +1757,13 @@ func (s *InboundService) setRemoteTrafficLocked(nodeID int, snap *runtime.Traffi return false, err } if len(goneEmails) > 0 { - if err := tx.Where("node_id = ? AND email IN ?", nodeID, goneEmails). - Delete(&model.NodeClientTraffic{}).Error; err != nil { - return false, err + // Chunk to avoid SQLite bind var limit when a node has many clients + // removed (e.g. after API bulk delete or structural change on node inbound). + for _, batch := range chunkStrings(goneEmails, sqliteMaxVars) { + if err := tx.Where("node_id = ? AND email IN ?", nodeID, batch). + Delete(&model.NodeClientTraffic{}).Error; err != nil { + return false, err + } } } if err := tx.Where("inbound_id = ?", c.Id). @@ -1836,18 +1842,6 @@ func (s *InboundService) setRemoteTrafficLocked(nodeID int, snap *runtime.Traffi structuralChange = true } - // Only allow the node to disable a client (cs.Enable=false), never - // to re-enable one the panel has already disabled. A stale snapshot - // from the node arriving after a central disable would otherwise - // overwrite enable=false back to true, letting the client accumulate - // far more traffic than their limit before being disabled again. - // - // We use a dialect-aware expression (see database.ClientTrafficEnableMergeExpr) - // because the old "enable AND ?" form (and naive CASE with :: casts) - // caused type mismatches on PostgreSQL after public API inbound updates - // (which go through updateClientTraffics + SyncInbound and can touch - // client_traffics rows) and would also break on SQLite due to PG-only - // ::boolean syntax. enableExpr := database.ClientTrafficEnableMergeExpr() if err := tx.Exec( fmt.Sprintf( @@ -3697,7 +3691,7 @@ func (s *InboundService) MigrationRequirements() { var externalProxy []struct { Id int Port int - StreamSettings []byte + StreamSettings string // text column on both DBs; safer than []byte for cross-DB scan } externalProxyQuery := `select id, port, stream_settings from inbounds @@ -3719,7 +3713,7 @@ func (s *InboundService) MigrationRequirements() { for _, ep := range externalProxy { var reverses any var stream map[string]any - json.Unmarshal(ep.StreamSettings, &stream) + json.Unmarshal([]byte(ep.StreamSettings), &stream) if tlsSettings, ok := stream["tlsSettings"].(map[string]any); ok { if settings, ok := tlsSettings["settings"].(map[string]any); ok { if domains, ok := settings["domains"].([]any); ok { @@ -3741,9 +3735,17 @@ func (s *InboundService) MigrationRequirements() { tx.Model(model.Inbound{}).Where("id = ?", ep.Id).Update("stream_settings", newStream) } - err = tx.Raw(`UPDATE inbounds - SET tag = REPLACE(tag, '0.0.0.0:', '') - WHERE INSTR(tag, '0.0.0.0:') > 0;`).Error + // Legacy tag cleanup for old auto-generated tags (e.g. "0.0.0.0:443-..."). + // Must be cross-DB: INSTR/REPLACE work on SQLite; Postgres needs position(). + tagCleanup := `UPDATE inbounds + SET tag = REPLACE(tag, '0.0.0.0:', '') + WHERE INSTR(tag, '0.0.0.0:') > 0;` + if database.IsPostgres() { + tagCleanup = `UPDATE inbounds + SET tag = REPLACE(tag, '0.0.0.0:', '') + WHERE position('0.0.0.0:' in tag) > 0;` + } + err = tx.Raw(tagCleanup).Error if err != nil { return } diff --git a/web/service/node.go b/web/service/node.go index 14e4ccfcc..785e5e08e 100644 --- a/web/service/node.go +++ b/web/service/node.go @@ -226,11 +226,20 @@ func (s *NodeService) GetAll() ([]*model.Node, error) { for id := range nodeByInbound { inboundIDs = append(inboundIDs, id) } - if err := db.Table("client_traffics"). - Select("inbound_id, email, enable, total, up, down, expiry_time"). - Where("inbound_id IN ?", inboundIDs). - Scan(&trafficRows).Error; err == nil { - depletedByNode := make(map[int]int) + // Chunk the IN clause to avoid "too many SQL variables" on SQLite + // when there are many node-owned inbounds (common with many nodes). + // sqliteMaxVars is defined in this package (inbound.go). + for _, batch := range chunkInts(inboundIDs, sqliteMaxVars) { + var page []trafficRow + if err := db.Table("client_traffics"). + Select("inbound_id, email, enable, total, up, down, expiry_time"). + Where("inbound_id IN ?", batch). + Scan(&page).Error; err == nil { + trafficRows = append(trafficRows, page...) + } + } + depletedByNode := make(map[int]int) + if len(trafficRows) > 0 { for _, row := range trafficRows { nodeID, ok := nodeByInbound[row.InboundID] if !ok { @@ -242,15 +251,15 @@ func (s *NodeService) GetAll() ([]*model.Node, error) { depletedByNode[nodeID]++ } } - onlineByGuid := s.onlineEmailsByGuid() - for _, n := range nodes { - n.InboundCount = len(inboundsByNode[n.Id]) - n.DepletedCount = depletedByNode[n.Id] - // Online is attributed to the node that physically hosts the client - // (by GUID): a client on a sub-node counts under the sub-node, not - // the intermediate node it syncs through (#4983). - n.OnlineCount = len(onlineByGuid[effectiveNodeGuid(n)]) - } + } + onlineByGuid := s.onlineEmailsByGuid() + for _, n := range nodes { + n.InboundCount = len(inboundsByNode[n.Id]) + n.DepletedCount = depletedByNode[n.Id] + // Online is attributed to the node that physically hosts the client + // (by GUID): a client on a sub-node counts under the sub-node, not + // the intermediate node it syncs through (#4983). + n.OnlineCount = len(onlineByGuid[effectiveNodeGuid(n)]) } return nodes, nil