Files
3x-ui/internal/web/service/inbound_migration.go
T
Rouzbeh† 82600936d6 fix(flow): restore XTLS Vision when an inbound becomes flow-eligible (#5520)
* fix(flow): restore XTLS Vision when an inbound becomes flow-eligible

clientWithInboundFlow strips Vision from a VLESS client whenever the target
inbound is not flow-eligible at client-write time — e.g. an XHTTP inbound
before its vlessenc (ML-KEM) encryption is set, or a client attached to such
an inbound. Nothing restored the flow once the inbound later became eligible:
an inbound edit stores its settings verbatim and never re-gates the clients.
So enabling encryption on an existing XHTTP inbound left every client without
flow, and the generated configs, share links and subscriptions silently
dropped flow=xtls-rprx-vision — most visibly on node inbounds and on any
inbound where encryption was turned on after the clients existed.

Restore the flow at the two points where an inbound can become eligible:

- UpdateInbound: after the new stream/settings are final, re-add Vision to
  clients that currently carry no flow but whose intended flow (their
  flow_override on a sibling inbound, via EffectiveFlowByEmail) is Vision —
  only when the inbound is now flow-eligible.
- MigrationRestoreVisionFlow: a one-time, idempotent boot migration that
  applies the same repair to existing installs and refreshes flow_override
  via SyncInbound.

The repair is conservative: it never invents a flow for a client that has
none anywhere, never overwrites an explicit flow, and is a no-op on healthy
installs. Adds EffectiveFlowByEmail and a unit test covering keep/skip/no-op
cases.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

* style(flow): serialize restored settings with MarshalIndent

Match the indented JSON used by the adjacent timestamp block in UpdateInbound
and the externalProxy migration, so a restored inbound's settings column keeps
the same multi-line format as everything else (review nit on #5520).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

* perf(flow): batch the intended-flow lookup and run it on the active tx

restoreVisionFlowForEligibleInbound resolved each empty-flow client's intended
flow with EffectiveFlowByEmail, which issued two queries per client
(GetRecordByEmail + EffectiveFlow). A client that genuinely uses no Vision keeps
an empty flow forever, so it was re-queried on every UpdateInbound and every
boot — O(clients) queries per save on a Reality/TCP or XHTTP+vlessenc inbound
carrying many non-Vision clients, executed inside the serialized writer
transaction.

Replace it with EffectiveFlowsByEmails: collect every empty-flow email first and
resolve them in a single batched join over client_inbounds + clients (lowest
inbound_id wins, same rule as before), chunked for the SQLite bind-var limit.

Also thread the active tx through restoreVisionFlowForEligibleInbound so the
read runs on the writer's own connection while it holds the lock instead of a
separate pooled connection (UpdateInbound passes its tx; the boot migration
passes nil → GetDB() as before).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-24 13:02:42 +02:00

298 lines
9.8 KiB
Go

package service
import (
"encoding/json"
"fmt"
"strconv"
"strings"
"time"
"github.com/mhsanaei/3x-ui/v3/internal/database"
"github.com/mhsanaei/3x-ui/v3/internal/database/model"
"github.com/mhsanaei/3x-ui/v3/internal/logger"
"github.com/mhsanaei/3x-ui/v3/internal/xray"
"gorm.io/gorm"
)
func (s *InboundService) MigrationRemoveOrphanedTraffics() {
db := database.GetDB()
query := fmt.Sprintf(
"DELETE FROM client_traffics WHERE email NOT IN (SELECT %s %s)",
database.JSONFieldText("client.value", "email"),
database.JSONClientsFromInbound(),
)
db.Exec(query)
}
func (s *InboundService) MigrationRequirements() {
db := database.GetDB()
tx := db.Begin()
var err error
defer func() {
if err == nil {
tx.Commit()
if !database.IsPostgres() {
if dbErr := db.Exec(`VACUUM "main"`).Error; dbErr != nil {
logger.Warningf("VACUUM failed: %v", dbErr)
}
}
} else {
tx.Rollback()
}
}()
if tx.Migrator().HasColumn(&model.Inbound{}, "all_time") {
if err = tx.Migrator().DropColumn(&model.Inbound{}, "all_time"); err != nil {
return
}
}
if tx.Migrator().HasColumn(&xray.ClientTraffic{}, "all_time") {
if err = tx.Migrator().DropColumn(&xray.ClientTraffic{}, "all_time"); err != nil {
return
}
}
if err = normalizeInboundShareAddressColumns(tx); err != nil {
return
}
// Normalize "enable" columns to boolean on Postgres. Legacy SQLite data
// (0/1 integers), partial migrations, or mixed write paths (public API
// inbound updates that flow through UpdateClientStat + client syncs, plus
// node traffic merge deltas) can leave the column as integer or with mixed
// interpretation. This (combined with the dialect-aware
// ClientTrafficEnableMergeExpr) prevents type problems in the node traffic
// sync merge (SetRemoteTraffic) and makes the sync robust even when
// inbounds are updated via the public API (incl. ones carrying
// externalProxy in streamSettings). The same expression is also safe on
// SQLite (no PG :: casts).
if database.IsPostgres() {
// Use DO block so it is idempotent and doesn't fail if already boolean.
normalizeBool := func(table, col string) {
tx.Exec(fmt.Sprintf(`
DO $$
BEGIN
IF EXISTS (
SELECT 1 FROM information_schema.columns
WHERE table_name = '%s' AND column_name = '%s'
AND data_type <> 'boolean'
) THEN
ALTER TABLE %s ALTER COLUMN %s
TYPE boolean USING (CASE WHEN %s::text IN ('1','true','t','yes') THEN true ELSE false END);
END IF;
END $$;`, table, col, table, col, col))
}
normalizeBool("inbounds", "enable")
normalizeBool("client_traffics", "enable")
normalizeBool("nodes", "enable")
normalizeBool("clients", "enable")
normalizeBool("api_tokens", "enabled")
normalizeBool("outbound_subscriptions", "enabled")
}
// Fix inbounds based problems
var inbounds []*model.Inbound
err = tx.Model(model.Inbound{}).Where("protocol IN (?)", []string{"vmess", "vless", "trojan", "shadowsocks", "hysteria"}).Find(&inbounds).Error
if err != nil && err != gorm.ErrRecordNotFound {
return
}
for inbound_index := range inbounds {
settings := map[string]any{}
json.Unmarshal([]byte(inbounds[inbound_index].Settings), &settings)
if raw, exists := settings["clients"]; exists && raw == nil {
settings["clients"] = []any{}
}
clients, ok := settings["clients"].([]any)
if ok {
// Fix Client configuration problems
newClients := make([]any, 0, len(clients))
hasVisionFlow := false
for client_index := range clients {
c := clients[client_index].(map[string]any)
// Add email='' if it is not exists
if _, ok := c["email"]; !ok {
c["email"] = ""
}
// Convert string tgId to int64
if _, ok := c["tgId"]; ok {
var tgId any = c["tgId"]
if tgIdStr, ok2 := tgId.(string); ok2 {
tgIdInt64, err := strconv.ParseInt(strings.ReplaceAll(tgIdStr, " ", ""), 10, 64)
if err == nil {
c["tgId"] = tgIdInt64
}
}
}
// Remove "flow": "xtls-rprx-direct"
if _, ok := c["flow"]; ok {
if c["flow"] == "xtls-rprx-direct" {
c["flow"] = ""
}
}
if flow, _ := c["flow"].(string); flow == "xtls-rprx-vision" {
hasVisionFlow = true
}
// Backfill created_at and updated_at
if _, ok := c["created_at"]; !ok {
c["created_at"] = time.Now().Unix() * 1000
}
c["updated_at"] = time.Now().Unix() * 1000
newClients = append(newClients, any(c))
}
settings["clients"] = newClients
// Drop orphaned testseed: VLESS-only field, only meaningful when at least
// one client uses the exact xtls-rprx-vision flow. Older versions saved it
// for any non-empty flow (including the UDP variant) or kept it after the
// flow was cleared from the client modal — clean those up here.
if inbounds[inbound_index].Protocol == model.VLESS && !hasVisionFlow {
delete(settings, "testseed")
}
modifiedSettings, err := json.MarshalIndent(settings, "", " ")
if err != nil {
return
}
inbounds[inbound_index].Settings = string(modifiedSettings)
}
// Add client traffic row for all clients which has email
modelClients, err := s.GetClients(inbounds[inbound_index])
if err != nil {
return
}
for _, modelClient := range modelClients {
if len(modelClient.Email) > 0 {
var count int64
tx.Model(xray.ClientTraffic{}).Where("email = ?", modelClient.Email).Count(&count)
if count == 0 {
s.AddClientStat(tx, inbounds[inbound_index].Id, &modelClient)
}
}
}
// Heal clients table for installs where the one-shot seeder
// skipped clients due to a tgId-string unmarshal error.
if syncErr := s.clientService.SyncInbound(tx, inbounds[inbound_index].Id, modelClients); syncErr != nil {
logger.Warning("MigrationRequirements sync clients failed:", syncErr)
}
}
tx.Save(inbounds)
// Remove orphaned traffics
tx.Where("inbound_id = 0").Delete(xray.ClientTraffic{})
// Migrate old MultiDomain to External Proxy
var externalProxy []struct {
Id int
Port int
StreamSettings string // text column on both DBs; safer than []byte for cross-DB scan
}
externalProxyQuery := `select id, port, stream_settings
from inbounds
WHERE protocol in ('vmess','vless','trojan')
AND json_extract(stream_settings, '$.security') = 'tls'
AND json_extract(stream_settings, '$.tlsSettings.settings.domains') IS NOT NULL`
if database.IsPostgres() {
externalProxyQuery = `select id, port, stream_settings
from inbounds
WHERE protocol in ('vmess','vless','trojan')
AND NULLIF(stream_settings, '')::jsonb #>> '{security}' = 'tls'
AND NULLIF(stream_settings, '')::jsonb #> '{tlsSettings,settings,domains}' IS NOT NULL`
}
err = tx.Raw(externalProxyQuery).Scan(&externalProxy).Error
if err != nil || len(externalProxy) == 0 {
return
}
for _, ep := range externalProxy {
var reverses any
var stream map[string]any
json.Unmarshal([]byte(ep.StreamSettings), &stream)
if tlsSettings, ok := stream["tlsSettings"].(map[string]any); ok {
if settings, ok := tlsSettings["settings"].(map[string]any); ok {
if domains, ok := settings["domains"].([]any); ok {
for _, domain := range domains {
if domainMap, ok := domain.(map[string]any); ok {
domainMap["forceTls"] = "same"
domainMap["port"] = ep.Port
domainMap["dest"] = domainMap["domain"].(string)
delete(domainMap, "domain")
}
}
}
reverses = settings["domains"]
delete(settings, "domains")
}
}
stream["externalProxy"] = reverses
newStream, _ := json.MarshalIndent(stream, " ", " ")
tx.Model(model.Inbound{}).Where("id = ?", ep.Id).Update("stream_settings", newStream)
}
// Legacy tag cleanup for old auto-generated tags (e.g. "0.0.0.0:443-...").
// Must be cross-DB: INSTR/REPLACE work on SQLite; Postgres needs position().
tagCleanup := `UPDATE inbounds
SET tag = REPLACE(tag, '0.0.0.0:', '')
WHERE INSTR(tag, '0.0.0.0:') > 0;`
if database.IsPostgres() {
tagCleanup = `UPDATE inbounds
SET tag = REPLACE(tag, '0.0.0.0:', '')
WHERE position('0.0.0.0:' in tag) > 0;`
}
err = tx.Exec(tagCleanup).Error
if err != nil {
return
}
}
func (s *InboundService) MigrateDB() {
s.MigrationRequirements()
s.MigrationRemoveOrphanedTraffics()
s.MigrationRestoreVisionFlow()
}
// MigrationRestoreVisionFlow repairs VLESS inbounds whose clients lost their
// XTLS Vision flow because the inbound was not flow-eligible when the client was
// written (e.g. an XHTTP inbound whose vlessenc encryption was enabled only
// later). For each now-eligible inbound it restores flow=xtls-rprx-vision on
// clients whose intended flow (their flow_override on a sibling inbound) is
// Vision. Idempotent: once a client carries the flow it is skipped, so this is a
// no-op on healthy installs and on subsequent boots.
func (s *InboundService) MigrationRestoreVisionFlow() {
db := database.GetDB()
var inbounds []*model.Inbound
if err := db.Model(&model.Inbound{}).
Where("protocol = ?", model.VLESS).
Find(&inbounds).Error; err != nil {
logger.Warning("MigrationRestoreVisionFlow: load inbounds failed:", err)
return
}
for _, ib := range inbounds {
restored, changed := s.restoreVisionFlowForEligibleInbound(nil, ib.Settings, ib.StreamSettings, ib.Protocol)
if !changed {
continue
}
clients, err := s.GetClients(&model.Inbound{Settings: restored})
if err != nil {
logger.Warning("MigrationRestoreVisionFlow: parse clients for inbound", ib.Id, "failed:", err)
continue
}
err = db.Transaction(func(tx *gorm.DB) error {
if e := tx.Model(&model.Inbound{}).Where("id = ?", ib.Id).Update("settings", restored).Error; e != nil {
return e
}
return s.clientService.SyncInbound(tx, ib.Id, clients)
})
if err != nil {
logger.Warning("MigrationRestoreVisionFlow: update inbound", ib.Id, "failed:", err)
continue
}
logger.Info("MigrationRestoreVisionFlow: restored XTLS Vision flow on inbound", ib.Id)
}
}