fix(sub): resolve subscription clients and stats from normalized tables

A subscription fetch inside a large inbound cost seconds because every
layer re-parsed the inbound's full settings JSON: getInboundsBySubId
preloaded the whole client_traffics table of each matched inbound,
matchingClients parsed all clients to filter by subId, and then every
per-protocol generator (raw links, JSON outbounds, Clash proxies) parsed
the blob again per link — once to find the client by email and once for
inbound-level fields like encryption or method. At 500k clients in one
inbound that was 13s per raw fetch and 8.5s per JSON fetch; at 100k,
2.6s/1.7s. After this change both cost ~70ms at 100k.

matchingClients now resolves through the indexed clients/client_inbounds
tables (ListForInboundBySubId, ordered by clients.id like ListForInbound
— the same source the running Xray users are built from), and the
per-request SubService carries two caches: clientsByInbound, primed by
matchingClients/inboundLinks so clientForLink resolves a client without
parsing settings (with the old full-parse as fallback, which also fixes
the export-all-links path that re-parsed the blob once per client), and
settingsByInbound, a once-per-request shallow decode that skips
materializing the clients array entirely. The ClientStats preload is
replaced by loading only the subscriber's traffic rows (indexed
clients.sub_id); statsForClient's per-email DB fallback (#5567) covers
any miss, and the case-insensitive email dedupe keeps the #5134
guarantee for case-differing duplicate rows.
This commit is contained in:
MHSanaei
2026-07-02 16:58:00 +02:00
parent c0d17e132d
commit 7c12700c7d
9 changed files with 257 additions and 105 deletions
+3 -6
View File
@@ -240,8 +240,7 @@ func (s *SubClashService) buildProxy(subReq *SubService, inbound *model.Inbound,
case model.VLESS:
proxy["type"] = "vless"
proxy["uuid"] = applyVlessRoute(client.ID, hostVlessRoute(ep))
var inboundSettings map[string]any
_ = json.Unmarshal([]byte(inbound.Settings), &inboundSettings)
inboundSettings := subReq.linkSettings(inbound)
streamSecurity, _ := stream["security"].(string)
if client.Flow != "" && vlessFlowAllowed(network, streamSecurity, inboundSettings) {
proxy["flow"] = client.Flow
@@ -258,8 +257,7 @@ func (s *SubClashService) buildProxy(subReq *SubService, inbound *model.Inbound,
case model.Shadowsocks:
proxy["type"] = "ss"
proxy["password"] = client.Password
var inboundSettings map[string]any
_ = json.Unmarshal([]byte(inbound.Settings), &inboundSettings)
inboundSettings := subReq.linkSettings(inbound)
method, _ := inboundSettings["method"].(string)
if method == "" {
return nil
@@ -288,8 +286,7 @@ func (s *SubClashService) buildProxy(subReq *SubService, inbound *model.Inbound,
// helpers prune fields (like `allowInsecure` / the salamander obfs
// block) that the hysteria proxy wants preserved.
func (s *SubClashService) buildHysteriaProxy(subReq *SubService, inbound *model.Inbound, client model.Client, ep map[string]any) map[string]any {
var inboundSettings map[string]any
_ = json.Unmarshal([]byte(inbound.Settings), &inboundSettings)
inboundSettings := subReq.linkSettings(inbound)
proxyType := "hysteria2"
authKey := "password"
+8 -10
View File
@@ -212,11 +212,11 @@ func (s *SubJsonService) getConfig(subReq *SubService, inbound *model.Inbound, c
case "vless":
vc := client
vc.ID = applyVlessRoute(client.ID, hostVlessRoute(extPrxy))
newOutbounds = append(newOutbounds, s.genVless(inbound, streamSettings, vc, jsonMux(mux, hostMux)))
newOutbounds = append(newOutbounds, s.genVless(subReq, inbound, streamSettings, vc, jsonMux(mux, hostMux)))
case "trojan", "shadowsocks":
newOutbounds = append(newOutbounds, s.genServer(inbound, streamSettings, client, jsonMux(mux, hostMux)))
newOutbounds = append(newOutbounds, s.genServer(subReq, inbound, streamSettings, client, jsonMux(mux, hostMux)))
case "hysteria":
newOutbounds = append(newOutbounds, s.genHy(inbound, newStream, client, jsonMux(mux, hostMux)))
newOutbounds = append(newOutbounds, s.genHy(subReq, inbound, newStream, client, jsonMux(mux, hostMux)))
}
newOutbounds = append(newOutbounds, s.defaultOutbounds...)
@@ -393,7 +393,7 @@ func (s *SubJsonService) genVnext(inbound *model.Inbound, streamSettings json_ut
return result
}
func (s *SubJsonService) genVless(inbound *model.Inbound, streamSettings json_util.RawMessage, client model.Client, mux string) json_util.RawMessage {
func (s *SubJsonService) genVless(subReq *SubService, inbound *model.Inbound, streamSettings json_util.RawMessage, client model.Client, mux string) json_util.RawMessage {
outbound := Outbound{}
outbound.Protocol = string(inbound.Protocol)
outbound.Tag = "proxy"
@@ -403,8 +403,7 @@ func (s *SubJsonService) genVless(inbound *model.Inbound, streamSettings json_ut
outbound.StreamSettings = streamSettings
// Add encryption for VLESS outbound from inbound settings
var inboundSettings map[string]any
_ = json.Unmarshal([]byte(inbound.Settings), &inboundSettings)
inboundSettings := subReq.linkSettings(inbound)
encryption, _ := inboundSettings["encryption"].(string)
settings := map[string]any{
@@ -422,7 +421,7 @@ func (s *SubJsonService) genVless(inbound *model.Inbound, streamSettings json_ut
return result
}
func (s *SubJsonService) genServer(inbound *model.Inbound, streamSettings json_util.RawMessage, client model.Client, mux string) json_util.RawMessage {
func (s *SubJsonService) genServer(subReq *SubService, inbound *model.Inbound, streamSettings json_util.RawMessage, client model.Client, mux string) json_util.RawMessage {
outbound := Outbound{}
serverData := make([]ServerSetting, 1)
@@ -434,8 +433,7 @@ func (s *SubJsonService) genServer(inbound *model.Inbound, streamSettings json_u
}
if inbound.Protocol == model.Shadowsocks {
var inboundSettings map[string]any
_ = json.Unmarshal([]byte(inbound.Settings), &inboundSettings)
inboundSettings := subReq.linkSettings(inbound)
method, _ := inboundSettings["method"].(string)
serverData[0].Method = method
@@ -475,7 +473,7 @@ func (s *SubJsonService) genServer(inbound *model.Inbound, streamSettings json_u
return result
}
func (s *SubJsonService) genHy(inbound *model.Inbound, newStream map[string]any, client model.Client, mux string) json_util.RawMessage {
func (s *SubJsonService) genHy(subReq *SubService, inbound *model.Inbound, newStream map[string]any, client model.Client, mux string) json_util.RawMessage {
outbound := Outbound{}
outbound.Protocol = string(inbound.Protocol)
+5 -5
View File
@@ -122,7 +122,7 @@ func TestSubJsonServiceVlessFlattened(t *testing.T) {
inbound := &model.Inbound{Listen: "1.2.3.4", Port: 443, Protocol: model.VLESS, Settings: `{"encryption":"none"}`}
client := model.Client{ID: "uuid-1", Flow: "xtls-rprx-vision"}
settings := outboundSettings(t, NewSubJsonService("", "", "", nil).genVless(inbound, nil, client, ""))
settings := outboundSettings(t, NewSubJsonService("", "", "", nil).genVless(&SubService{}, inbound, nil, client, ""))
if _, ok := settings["vnext"]; ok {
t.Fatal("vless outbound must not use vnext")
}
@@ -151,7 +151,7 @@ func TestSubJsonServiceServerUsesServersArray(t *testing.T) {
trojan := &model.Inbound{Listen: "1.2.3.4", Port: 443, Protocol: model.Trojan, Settings: `{}`}
client := model.Client{Password: "p4ss"}
settings := outboundSettings(t, NewSubJsonService("", "", "", nil).genServer(trojan, nil, client, ""))
settings := outboundSettings(t, NewSubJsonService("", "", "", nil).genServer(&SubService{}, trojan, nil, client, ""))
server := firstServer(settings)
if server == nil {
t.Fatalf("trojan outbound must use a servers array, got: %#v", settings)
@@ -164,7 +164,7 @@ func TestSubJsonServiceServerUsesServersArray(t *testing.T) {
}
ss := &model.Inbound{Listen: "1.2.3.4", Port: 443, Protocol: model.Shadowsocks, Settings: `{"method":"aes-256-gcm"}`}
ssSettings := outboundSettings(t, NewSubJsonService("", "", "", nil).genServer(ss, nil, client, ""))
ssSettings := outboundSettings(t, NewSubJsonService("", "", "", nil).genServer(&SubService{}, ss, nil, client, ""))
ssServer := firstServer(ssSettings)
if ssServer == nil {
t.Fatalf("shadowsocks outbound must use a servers array, got: %#v", ssSettings)
@@ -194,7 +194,7 @@ func TestSubJsonServiceXmuxSuppressesGlobalMux(t *testing.T) {
inbound := &model.Inbound{Listen: "1.2.3.4", Port: 443, Protocol: model.VLESS, Settings: `{"encryption":"none"}`}
client := model.Client{ID: "uuid-1"}
raw := svc.genVless(inbound, streamSettings, client, mux)
raw := svc.genVless(&SubService{}, inbound, streamSettings, client, mux)
var ob map[string]any
if err := json.Unmarshal(raw, &ob); err != nil {
t.Fatalf("unmarshal outbound: %v", err)
@@ -240,7 +240,7 @@ func TestSubJsonServiceGlobalMuxWhenNoXmux(t *testing.T) {
inbound := &model.Inbound{Listen: "1.2.3.4", Port: 443, Protocol: model.VLESS, Settings: `{"encryption":"none"}`}
client := model.Client{ID: "uuid-1"}
raw := svc.genVless(inbound, streamSettings, client, mux)
raw := svc.genVless(&SubService{}, inbound, streamSettings, client, mux)
var ob map[string]any
if err := json.Unmarshal(raw, &ob); err != nil {
t.Fatalf("unmarshal outbound: %v", err)
+4 -4
View File
@@ -67,11 +67,11 @@ func TestSubJsonService_MuxAttachedWhenConfigured(t *testing.T) {
protocol model.Protocol
}{
{"vmess mux", NewSubJsonService(mux, "", "", nil).genVnext(&model.Inbound{Protocol: model.VMESS, Settings: `{}`}, nil, client, mux), true, model.VMESS},
{"vless mux", NewSubJsonService(mux, "", "", nil).genVless(&model.Inbound{Protocol: model.VLESS, Settings: `{}`}, nil, client, mux), true, model.VLESS},
{"server mux", NewSubJsonService(mux, "", "", nil).genServer(&model.Inbound{Protocol: model.Trojan, Settings: `{}`}, nil, client, mux), true, model.Trojan},
{"vless mux", NewSubJsonService(mux, "", "", nil).genVless(&SubService{}, &model.Inbound{Protocol: model.VLESS, Settings: `{}`}, nil, client, mux), true, model.VLESS},
{"server mux", NewSubJsonService(mux, "", "", nil).genServer(&SubService{}, &model.Inbound{Protocol: model.Trojan, Settings: `{}`}, nil, client, mux), true, model.Trojan},
{"vmess no mux", NewSubJsonService("", "", "", nil).genVnext(&model.Inbound{Protocol: model.VMESS, Settings: `{}`}, nil, client, ""), false, model.VMESS},
{"vless no mux", NewSubJsonService("", "", "", nil).genVless(&model.Inbound{Protocol: model.VLESS, Settings: `{}`}, nil, client, ""), false, model.VLESS},
{"server no mux", NewSubJsonService("", "", "", nil).genServer(&model.Inbound{Protocol: model.Trojan, Settings: `{}`}, nil, client, ""), false, model.Trojan},
{"vless no mux", NewSubJsonService("", "", "", nil).genVless(&SubService{}, &model.Inbound{Protocol: model.VLESS, Settings: `{}`}, nil, client, ""), false, model.VLESS},
{"server no mux", NewSubJsonService("", "", "", nil).genServer(&SubService{}, &model.Inbound{Protocol: model.Trojan, Settings: `{}`}, nil, client, ""), false, model.Trojan},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
+2 -5
View File
@@ -459,11 +459,8 @@ func (s *SubService) statsForClient(inbound *model.Inbound, client model.Client)
// needed when a global remark template references client-only tokens. Falls back
// to an email-only client if not found.
func (s *SubService) lookupClient(inbound *model.Inbound, email string) model.Client {
clients, _ := s.inboundService.GetClients(inbound)
for _, c := range clients {
if c.Email == email {
return c
}
if c, ok := s.clientForLink(inbound, email); ok {
return c
}
return model.Client{Email: email}
}
+171 -65
View File
@@ -54,6 +54,17 @@ type SubService struct {
// doesn't own its row (multi-inbound subscriptions). Filled in
// getInboundsBySubId; reset per request in PrepareForRequest.
statsByEmail map[string]xray.ClientTraffic
// clientsByInbound caches clients resolved for this request keyed by
// inbound id then email, so the per-protocol link generators look a client
// up without re-parsing the inbound's settings JSON per link.
// fullyPrimedInbounds marks inbounds whose complete client list is cached
// (a miss there is authoritative). Reset per request in PrepareForRequest.
clientsByInbound map[int]map[string]model.Client
fullyPrimedInbounds map[int]bool
// settingsByInbound caches each inbound's settings decoded once per request
// with the clients array left out; generators read only inbound-level
// fields (encryption, method, version, …) from it.
settingsByInbound map[int]map[string]any
}
// NewSubService creates a new subscription service with the given configuration.
@@ -86,10 +97,96 @@ func (s *SubService) PrepareForRequest(host string) {
s.address = host
s.usageShown = map[string]bool{}
s.statsByEmail = map[string]xray.ClientTraffic{}
s.clientsByInbound = map[int]map[string]model.Client{}
s.fullyPrimedInbounds = map[int]bool{}
s.settingsByInbound = map[int]map[string]any{}
s.loadNodes()
s.loadRemarkSettings()
}
// primeLinkClients caches clients (first occurrence per email, matching the
// old settings-JSON iteration order) so clientForLink resolves them without a
// parse. complete marks the inbound's whole client list as cached.
func (s *SubService) primeLinkClients(inboundId int, clients []model.Client, complete bool) {
if inboundId <= 0 {
return
}
if s.clientsByInbound == nil {
s.clientsByInbound = map[int]map[string]model.Client{}
}
m := s.clientsByInbound[inboundId]
if m == nil {
m = make(map[string]model.Client, len(clients))
s.clientsByInbound[inboundId] = m
}
for _, c := range clients {
if _, exists := m[c.Email]; !exists {
m[c.Email] = c
}
}
if complete {
if s.fullyPrimedInbounds == nil {
s.fullyPrimedInbounds = map[int]bool{}
}
s.fullyPrimedInbounds[inboundId] = true
}
}
// clientForLink resolves one client of an inbound by email for link
// generation: from the per-request cache when primed, otherwise by parsing
// the settings JSON once and caching every client from it.
func (s *SubService) clientForLink(inbound *model.Inbound, email string) (model.Client, bool) {
if m, ok := s.clientsByInbound[inbound.Id]; ok {
if c, hit := m[email]; hit {
return c, true
}
if s.fullyPrimedInbounds[inbound.Id] {
return model.Client{}, false
}
}
clients, err := s.inboundService.GetClients(inbound)
if err != nil {
return model.Client{}, false
}
s.primeLinkClients(inbound.Id, clients, true)
for i := range clients {
if clients[i].Email == email {
return clients[i], true
}
}
return model.Client{}, false
}
// linkSettings returns the inbound's settings decoded once per request with
// the clients array left out — the link generators read only inbound-level
// fields from it and resolve clients via clientForLink. The shallow
// RawMessage pass skips materializing a huge clients array entirely.
func (s *SubService) linkSettings(inbound *model.Inbound) map[string]any {
if inbound.Id > 0 {
if cached, ok := s.settingsByInbound[inbound.Id]; ok {
return cached
}
}
shallow := map[string]json.RawMessage{}
_ = json.Unmarshal([]byte(inbound.Settings), &shallow)
out := make(map[string]any, len(shallow))
for key, raw := range shallow {
if key == "clients" {
continue
}
var value any
_ = json.Unmarshal(raw, &value)
out[key] = value
}
if inbound.Id > 0 {
if s.settingsByInbound == nil {
s.settingsByInbound = map[int]map[string]any{}
}
s.settingsByInbound[inbound.Id] = out
}
return out
}
// loadRemarkSettings populates the per-request remark formatting state so
// every subscription format — raw, JSON, Clash — renders remarks the same way
// (the date formatter reads datepicker). Loading it only in getSubs left
@@ -144,25 +241,23 @@ func listenIsInternalOnly(listen string) bool {
}
// matchingClients returns the inbound's clients whose SubID equals subId,
// deduplicated by email. settings.clients can accumulate duplicate entries
// for the same client (multi-node sync/import drift, old DBs): SyncInbound
// dedupes the normalized client_inbounds rows on write but never rewrites
// the legacy JSON, and the subscription builders iterate that JSON — so
// without this guard every duplicate became a duplicate profile in the
// output (#5134). Link generation keys purely on (inbound, email), so
// same-email entries are pure duplicates and dropping them is lossless.
// resolved from the normalized clients/client_inbounds tables (both filter
// columns indexed) instead of parsing the settings JSON — at large client
// counts that parse made every subscription fetch cost seconds. The
// case-insensitive email dedupe stays as cheap insurance even though
// clients.email is unique, preserving the #5134 guarantee that duplicate
// settings entries never fan out into duplicate profiles. Resolved clients
// are primed into the per-request cache so the link generators don't parse
// settings either.
func (s *SubService) matchingClients(inbound *model.Inbound, subId string) []model.Client {
clients, err := s.inboundService.GetClients(inbound)
clients, err := s.inboundService.GetClientsBySubId(inbound.Id, subId)
if err != nil {
logger.Error("SubService - GetClients: Unable to get clients from inbound")
logger.Error("SubService - GetClientsBySubId: Unable to get clients from inbound")
return nil
}
var out []model.Client
seen := make(map[string]struct{}, len(clients))
for _, client := range clients {
if client.SubID != subId {
continue
}
key := strings.ToLower(client.Email)
if _, dup := seen[key]; dup {
continue
@@ -170,6 +265,7 @@ func (s *SubService) matchingClients(inbound *model.Inbound, subId string) []mod
seen[key] = struct{}{}
out = append(out, client)
}
s.primeLinkClients(inbound.Id, out, false)
return out
}
@@ -253,6 +349,7 @@ func (s *SubService) inboundLinks(inbound *model.Inbound) []string {
if err != nil {
return nil
}
s.primeLinkClients(inbound.Id, clients, true)
s.projectThroughFallbackMaster(inbound)
hostEps := s.hostEndpoints(inbound, "raw")
var out []string
@@ -362,7 +459,7 @@ func subscriptionExpiryFromClient(nowMs, expiryTime int64) int64 {
func (s *SubService) getInboundsBySubId(subId string) ([]*model.Inbound, error) {
db := database.GetDB()
var inbounds []*model.Inbound
err := db.Model(model.Inbound{}).Preload("ClientStats").Where(`id in (
err := db.Model(model.Inbound{}).Where(`id in (
SELECT DISTINCT inbounds.id
FROM inbounds
JOIN client_inbounds ON client_inbounds.inbound_id = inbounds.id
@@ -374,19 +471,34 @@ func (s *SubService) getInboundsBySubId(subId string) ([]*model.Inbound, error)
if err != nil {
return nil, err
}
s.indexStatsByEmail(inbounds)
s.indexStatsBySubId(subId)
return inbounds, nil
}
// indexStatsByEmail records every loaded inbound's client traffic rows keyed by
// email so statsForClient can resolve a client's usage even on an inbound that
// doesn't own its (globally unique) client_traffics row. See statsByEmail.
func (s *SubService) indexStatsByEmail(inbounds []*model.Inbound) {
// indexStatsBySubId loads the traffic rows for just this subscriber's clients
// into statsByEmail so statsForClient can resolve a client's usage on any of
// its inbounds. It replaces preloading every matched inbound's ClientStats,
// which read the entire client_traffics table on every subscription fetch of
// a large inbound; statsForClient's per-email DB fallback covers any miss.
func (s *SubService) indexStatsBySubId(subId string) {
if s.statsByEmail == nil {
s.statsByEmail = map[string]xray.ClientTraffic{}
}
for _, inbound := range inbounds {
for _, st := range inbound.ClientStats {
db := database.GetDB()
var emails []string
if err := db.Model(&model.ClientRecord{}).Where("sub_id = ?", subId).Pluck("email", &emails).Error; err != nil {
logger.Error("SubService - indexStatsBySubId: load emails:", err)
return
}
const chunk = 400
for lo := 0; lo < len(emails); lo += chunk {
hi := min(lo+chunk, len(emails))
var rows []xray.ClientTraffic
if err := db.Where("email IN ?", emails[lo:hi]).Find(&rows).Error; err != nil {
logger.Error("SubService - indexStatsBySubId: load traffics:", err)
return
}
for _, st := range rows {
s.statsByEmail[st.Email] = st
}
}
@@ -516,21 +628,14 @@ func (s *SubService) genWireguardLink(inbound *model.Inbound, email string) stri
if inbound.Protocol != model.WireGuard {
return ""
}
settings := map[string]any{}
_ = json.Unmarshal([]byte(inbound.Settings), &settings)
settings := s.linkSettings(inbound)
secretKey, _ := settings["secretKey"].(string)
clients, _ := s.inboundService.GetClients(inbound)
var client *model.Client
for i := range clients {
if clients[i].Email == email {
client = &clients[i]
break
}
}
if client == nil || client.PrivateKey == "" {
resolved, ok := s.clientForLink(inbound, email)
if !ok || resolved.PrivateKey == "" {
return ""
}
client := &resolved
link := fmt.Sprintf("wireguard://%s@%s", encodeUserinfo(client.PrivateKey), joinHostPort(s.resolveInboundAddress(inbound), inbound.Port))
params := make(map[string]string)
@@ -607,10 +712,12 @@ func (s *SubService) genVmessLink(inbound *model.Inbound, email string) string {
applyVmessTLSParams(stream, obj)
}
clients, _ := s.inboundService.GetClients(inbound)
clientIndex := findClientIndex(clients, email)
obj["id"] = clients[clientIndex].ID
obj["scy"] = clients[clientIndex].Security
client, ok := s.clientForLink(inbound, email)
if !ok {
return ""
}
obj["id"] = client.ID
obj["scy"] = client.Security
externalProxies, _ := stream["externalProxy"].([]any)
@@ -659,17 +766,18 @@ func (s *SubService) genVlessLink(inbound *model.Inbound, email string) string {
}
address := s.resolveInboundAddress(inbound)
stream := unmarshalStreamSettings(inbound.StreamSettings)
clients, _ := s.inboundService.GetClients(inbound)
clientIndex := findClientIndex(clients, email)
uuid := clients[clientIndex].ID
client, ok := s.clientForLink(inbound, email)
if !ok {
return ""
}
uuid := client.ID
port := inbound.Port
streamNetwork := stream["network"].(string)
params := make(map[string]string)
params["type"] = streamNetwork
// Add encryption parameter for VLESS from inbound settings
var settings map[string]any
_ = json.Unmarshal([]byte(inbound.Settings), &settings)
settings := s.linkSettings(inbound)
if encryption, ok := settings["encryption"].(string); ok {
params["encryption"] = encryption
}
@@ -683,12 +791,12 @@ func (s *SubService) genVlessLink(inbound *model.Inbound, email string) string {
case "tls":
applyShareTLSParams(stream, params)
case "reality":
applyShareRealityParams(stream, params, subKey(clients[clientIndex]))
applyShareRealityParams(stream, params, subKey(client))
default:
params["security"] = "none"
}
if len(clients[clientIndex].Flow) > 0 && vlessFlowAllowed(streamNetwork, security, settings) {
params["flow"] = clients[clientIndex].Flow
if len(client.Flow) > 0 && vlessFlowAllowed(streamNetwork, security, settings) {
params["flow"] = client.Flow
}
externalProxies, _ := stream["externalProxy"].([]any)
@@ -717,9 +825,11 @@ func (s *SubService) genTrojanLink(inbound *model.Inbound, email string) string
}
address := s.resolveInboundAddress(inbound)
stream := unmarshalStreamSettings(inbound.StreamSettings)
clients, _ := s.inboundService.GetClients(inbound)
clientIndex := findClientIndex(clients, email)
password := encodeUserinfo(clients[clientIndex].Password)
client, ok := s.clientForLink(inbound, email)
if !ok {
return ""
}
password := encodeUserinfo(client.Password)
port := inbound.Port
streamNetwork := stream["network"].(string)
params := make(map[string]string)
@@ -734,9 +844,9 @@ func (s *SubService) genTrojanLink(inbound *model.Inbound, email string) string
case "tls":
applyShareTLSParams(stream, params)
case "reality":
applyShareRealityParams(stream, params, subKey(clients[clientIndex]))
if streamNetwork == "tcp" && len(clients[clientIndex].Flow) > 0 {
params["flow"] = clients[clientIndex].Flow
applyShareRealityParams(stream, params, subKey(client))
if streamNetwork == "tcp" && len(client.Flow) > 0 {
params["flow"] = client.Flow
}
default:
params["security"] = "none"
@@ -788,13 +898,14 @@ func (s *SubService) genShadowsocksLink(inbound *model.Inbound, email string) st
}
address := s.resolveInboundAddress(inbound)
stream := unmarshalStreamSettings(inbound.StreamSettings)
clients, _ := s.inboundService.GetClients(inbound)
client, ok := s.clientForLink(inbound, email)
if !ok {
return ""
}
var settings map[string]any
_ = json.Unmarshal([]byte(inbound.Settings), &settings)
settings := s.linkSettings(inbound)
inboundPassword := settings["password"].(string)
method := settings["method"].(string)
clientIndex := findClientIndex(clients, email)
streamNetwork := stream["network"].(string)
params := make(map[string]string)
params["type"] = streamNetwork
@@ -828,9 +939,9 @@ func (s *SubService) genShadowsocksLink(inbound *model.Inbound, email string) st
userInfo = fmt.Sprintf("%s:%s:%s",
url.QueryEscape(method),
url.QueryEscape(inboundPassword),
url.QueryEscape(clients[clientIndex].Password))
url.QueryEscape(client.Password))
} else {
userInfo = base64.RawURLEncoding.EncodeToString(fmt.Appendf(nil, "%s:%s", method, clients[clientIndex].Password))
userInfo = base64.RawURLEncoding.EncodeToString(fmt.Appendf(nil, "%s:%s", method, client.Password))
}
externalProxies, _ := stream["externalProxy"].([]any)
@@ -861,15 +972,11 @@ func (s *SubService) genHysteriaLink(inbound *model.Inbound, email string) strin
}
var stream map[string]any
_ = json.Unmarshal([]byte(inbound.StreamSettings), &stream)
clients, _ := s.inboundService.GetClients(inbound)
clientIndex := -1
for i, client := range clients {
if client.Email == email {
clientIndex = i
break
}
client, ok := s.clientForLink(inbound, email)
if !ok {
return ""
}
auth := encodeUserinfo(clients[clientIndex].Auth)
auth := encodeUserinfo(client.Auth)
params := make(map[string]string)
params["security"] = "tls"
@@ -928,8 +1035,7 @@ func (s *SubService) genHysteriaLink(inbound *model.Inbound, email string) strin
}
}
var settings map[string]any
_ = json.Unmarshal([]byte(inbound.Settings), &settings)
settings := s.linkSettings(inbound)
version, _ := settings["version"].(float64)
protocol := "hysteria2"
if int(version) == 1 {
+26 -10
View File
@@ -69,19 +69,35 @@ func TestGetSubs_DuplicateSettingsClients_Deduped(t *testing.T) {
}
}
// TestMatchingClients_DedupsCaseInsensitiveEmail pins the dedup KEY, not just the count:
// the two entries differ only by email case, so dropping strings.ToLower (or keying on
// another field) yields two clients. The byte-identical dupes above can't catch that.
// TestMatchingClients_DedupsCaseInsensitiveEmail pins the dedup KEY, not just the count.
// clients.email is unique but case-sensitively so: two rows differing only by email case
// can coexist (import drift), and dropping strings.ToLower (or keying on another field)
// would emit both. The first row by id must win, matching the old settings-JSON order.
func TestMatchingClients_DedupsCaseInsensitiveEmail(t *testing.T) {
dbDir := t.TempDir()
t.Setenv("XUI_DB_FOLDER", dbDir)
if err := database.InitDB(filepath.Join(dbDir, "x-ui.db")); err != nil {
t.Fatalf("InitDB: %v", err)
}
t.Cleanup(func() { _ = database.CloseDB() })
const subId = "s1"
const uuid = "11111111-2222-4333-8444-555555555555"
ib := &model.Inbound{
Protocol: model.VLESS,
Settings: `{"clients":[
{"id":"` + uuid + `","email":"Dup@Example.com","subId":"` + subId + `","enable":true},
{"id":"` + uuid + `","email":"dup@example.com","subId":"` + subId + `","enable":true}
]}`,
db := database.GetDB()
ib := &model.Inbound{Protocol: model.VLESS, Enable: true, Port: 42002, Tag: "dedup-ci", Settings: `{"clients":[]}`}
if err := db.Create(ib).Error; err != nil {
t.Fatalf("seed inbound: %v", err)
}
for _, email := range []string{"Dup@Example.com", "dup@example.com"} {
c := &model.ClientRecord{Email: email, SubID: subId, UUID: uuid, Enable: true}
if err := db.Create(c).Error; err != nil {
t.Fatalf("seed client %q: %v", email, err)
}
if err := db.Create(&model.ClientInbound{ClientId: c.Id, InboundId: ib.Id}).Error; err != nil {
t.Fatalf("seed client_inbound %q: %v", email, err)
}
}
s := &SubService{}
got := s.matchingClients(ib, subId)
if len(got) != 1 {
@@ -90,7 +106,7 @@ func TestMatchingClients_DedupsCaseInsensitiveEmail(t *testing.T) {
if got[0].Email != "Dup@Example.com" {
t.Fatalf("first occurrence must be kept, got %q", got[0].Email)
}
// A wrong subId must still be excluded (guards the subId filter at service.go:127).
// A wrong subId must still be excluded (guards the SQL subId filter).
if other := s.matchingClients(ib, "nope"); len(other) != 0 {
t.Fatalf("non-matching subId must yield 0 clients, got %d", len(other))
}
+31
View File
@@ -197,3 +197,34 @@ func (s *ClientService) ListForInbound(tx *gorm.DB, inboundId int) ([]model.Clie
}
return out, nil
}
// ListForInboundBySubId is ListForInbound narrowed to one subscription id —
// both filter columns are indexed, so the subscription server resolves a
// subscriber's clients without touching the inbound's settings JSON.
func (s *ClientService) ListForInboundBySubId(tx *gorm.DB, inboundId int, subId string) ([]model.Client, error) {
if tx == nil {
tx = database.GetDB()
}
type joinedRow struct {
model.ClientRecord
FlowOverride string
}
var rows []joinedRow
err := tx.Table("clients").
Select("clients.*, client_inbounds.flow_override AS flow_override").
Joins("JOIN client_inbounds ON client_inbounds.client_id = clients.id").
Where("client_inbounds.inbound_id = ? AND clients.sub_id = ?", inboundId, subId).
Order("clients.id ASC").
Find(&rows).Error
if err != nil {
return nil, err
}
out := make([]model.Client, 0, len(rows))
for i := range rows {
c := rows[i].ToClient()
c.Flow = rows[i].FlowOverride
out = append(out, *c)
}
return out, nil
}
+7
View File
@@ -409,6 +409,13 @@ func (s *InboundService) GetClients(inbound *model.Inbound) ([]model.Client, err
return clients, nil
}
// GetClientsBySubId returns the inbound's clients with the given subscription
// id, resolved from the normalized clients tables (the same source the running
// Xray users are built from) instead of parsing the settings JSON blob.
func (s *InboundService) GetClientsBySubId(inboundId int, subId string) ([]model.Client, error) {
return s.clientService.ListForInboundBySubId(nil, inboundId, subId)
}
func (s *InboundService) GetAllEmails() ([]string, error) {
db := database.GetDB()
var emails []string