mirror of
https://github.com/MHSanaei/3x-ui.git
synced 2026-07-04 11:54:24 +00:00
fix(sub): resolve subscription clients and stats from normalized tables
A subscription fetch inside a large inbound cost seconds because every layer re-parsed the inbound's full settings JSON: getInboundsBySubId preloaded the whole client_traffics table of each matched inbound, matchingClients parsed all clients to filter by subId, and then every per-protocol generator (raw links, JSON outbounds, Clash proxies) parsed the blob again per link — once to find the client by email and once for inbound-level fields like encryption or method. At 500k clients in one inbound that was 13s per raw fetch and 8.5s per JSON fetch; at 100k, 2.6s/1.7s. After this change both cost ~70ms at 100k. matchingClients now resolves through the indexed clients/client_inbounds tables (ListForInboundBySubId, ordered by clients.id like ListForInbound — the same source the running Xray users are built from), and the per-request SubService carries two caches: clientsByInbound, primed by matchingClients/inboundLinks so clientForLink resolves a client without parsing settings (with the old full-parse as fallback, which also fixes the export-all-links path that re-parsed the blob once per client), and settingsByInbound, a once-per-request shallow decode that skips materializing the clients array entirely. The ClientStats preload is replaced by loading only the subscriber's traffic rows (indexed clients.sub_id); statsForClient's per-email DB fallback (#5567) covers any miss, and the case-insensitive email dedupe keeps the #5134 guarantee for case-differing duplicate rows.
This commit is contained in:
@@ -240,8 +240,7 @@ func (s *SubClashService) buildProxy(subReq *SubService, inbound *model.Inbound,
|
||||
case model.VLESS:
|
||||
proxy["type"] = "vless"
|
||||
proxy["uuid"] = applyVlessRoute(client.ID, hostVlessRoute(ep))
|
||||
var inboundSettings map[string]any
|
||||
_ = json.Unmarshal([]byte(inbound.Settings), &inboundSettings)
|
||||
inboundSettings := subReq.linkSettings(inbound)
|
||||
streamSecurity, _ := stream["security"].(string)
|
||||
if client.Flow != "" && vlessFlowAllowed(network, streamSecurity, inboundSettings) {
|
||||
proxy["flow"] = client.Flow
|
||||
@@ -258,8 +257,7 @@ func (s *SubClashService) buildProxy(subReq *SubService, inbound *model.Inbound,
|
||||
case model.Shadowsocks:
|
||||
proxy["type"] = "ss"
|
||||
proxy["password"] = client.Password
|
||||
var inboundSettings map[string]any
|
||||
_ = json.Unmarshal([]byte(inbound.Settings), &inboundSettings)
|
||||
inboundSettings := subReq.linkSettings(inbound)
|
||||
method, _ := inboundSettings["method"].(string)
|
||||
if method == "" {
|
||||
return nil
|
||||
@@ -288,8 +286,7 @@ func (s *SubClashService) buildProxy(subReq *SubService, inbound *model.Inbound,
|
||||
// helpers prune fields (like `allowInsecure` / the salamander obfs
|
||||
// block) that the hysteria proxy wants preserved.
|
||||
func (s *SubClashService) buildHysteriaProxy(subReq *SubService, inbound *model.Inbound, client model.Client, ep map[string]any) map[string]any {
|
||||
var inboundSettings map[string]any
|
||||
_ = json.Unmarshal([]byte(inbound.Settings), &inboundSettings)
|
||||
inboundSettings := subReq.linkSettings(inbound)
|
||||
|
||||
proxyType := "hysteria2"
|
||||
authKey := "password"
|
||||
|
||||
@@ -212,11 +212,11 @@ func (s *SubJsonService) getConfig(subReq *SubService, inbound *model.Inbound, c
|
||||
case "vless":
|
||||
vc := client
|
||||
vc.ID = applyVlessRoute(client.ID, hostVlessRoute(extPrxy))
|
||||
newOutbounds = append(newOutbounds, s.genVless(inbound, streamSettings, vc, jsonMux(mux, hostMux)))
|
||||
newOutbounds = append(newOutbounds, s.genVless(subReq, inbound, streamSettings, vc, jsonMux(mux, hostMux)))
|
||||
case "trojan", "shadowsocks":
|
||||
newOutbounds = append(newOutbounds, s.genServer(inbound, streamSettings, client, jsonMux(mux, hostMux)))
|
||||
newOutbounds = append(newOutbounds, s.genServer(subReq, inbound, streamSettings, client, jsonMux(mux, hostMux)))
|
||||
case "hysteria":
|
||||
newOutbounds = append(newOutbounds, s.genHy(inbound, newStream, client, jsonMux(mux, hostMux)))
|
||||
newOutbounds = append(newOutbounds, s.genHy(subReq, inbound, newStream, client, jsonMux(mux, hostMux)))
|
||||
}
|
||||
|
||||
newOutbounds = append(newOutbounds, s.defaultOutbounds...)
|
||||
@@ -393,7 +393,7 @@ func (s *SubJsonService) genVnext(inbound *model.Inbound, streamSettings json_ut
|
||||
return result
|
||||
}
|
||||
|
||||
func (s *SubJsonService) genVless(inbound *model.Inbound, streamSettings json_util.RawMessage, client model.Client, mux string) json_util.RawMessage {
|
||||
func (s *SubJsonService) genVless(subReq *SubService, inbound *model.Inbound, streamSettings json_util.RawMessage, client model.Client, mux string) json_util.RawMessage {
|
||||
outbound := Outbound{}
|
||||
outbound.Protocol = string(inbound.Protocol)
|
||||
outbound.Tag = "proxy"
|
||||
@@ -403,8 +403,7 @@ func (s *SubJsonService) genVless(inbound *model.Inbound, streamSettings json_ut
|
||||
outbound.StreamSettings = streamSettings
|
||||
|
||||
// Add encryption for VLESS outbound from inbound settings
|
||||
var inboundSettings map[string]any
|
||||
_ = json.Unmarshal([]byte(inbound.Settings), &inboundSettings)
|
||||
inboundSettings := subReq.linkSettings(inbound)
|
||||
encryption, _ := inboundSettings["encryption"].(string)
|
||||
|
||||
settings := map[string]any{
|
||||
@@ -422,7 +421,7 @@ func (s *SubJsonService) genVless(inbound *model.Inbound, streamSettings json_ut
|
||||
return result
|
||||
}
|
||||
|
||||
func (s *SubJsonService) genServer(inbound *model.Inbound, streamSettings json_util.RawMessage, client model.Client, mux string) json_util.RawMessage {
|
||||
func (s *SubJsonService) genServer(subReq *SubService, inbound *model.Inbound, streamSettings json_util.RawMessage, client model.Client, mux string) json_util.RawMessage {
|
||||
outbound := Outbound{}
|
||||
|
||||
serverData := make([]ServerSetting, 1)
|
||||
@@ -434,8 +433,7 @@ func (s *SubJsonService) genServer(inbound *model.Inbound, streamSettings json_u
|
||||
}
|
||||
|
||||
if inbound.Protocol == model.Shadowsocks {
|
||||
var inboundSettings map[string]any
|
||||
_ = json.Unmarshal([]byte(inbound.Settings), &inboundSettings)
|
||||
inboundSettings := subReq.linkSettings(inbound)
|
||||
method, _ := inboundSettings["method"].(string)
|
||||
serverData[0].Method = method
|
||||
|
||||
@@ -475,7 +473,7 @@ func (s *SubJsonService) genServer(inbound *model.Inbound, streamSettings json_u
|
||||
return result
|
||||
}
|
||||
|
||||
func (s *SubJsonService) genHy(inbound *model.Inbound, newStream map[string]any, client model.Client, mux string) json_util.RawMessage {
|
||||
func (s *SubJsonService) genHy(subReq *SubService, inbound *model.Inbound, newStream map[string]any, client model.Client, mux string) json_util.RawMessage {
|
||||
outbound := Outbound{}
|
||||
|
||||
outbound.Protocol = string(inbound.Protocol)
|
||||
|
||||
@@ -122,7 +122,7 @@ func TestSubJsonServiceVlessFlattened(t *testing.T) {
|
||||
inbound := &model.Inbound{Listen: "1.2.3.4", Port: 443, Protocol: model.VLESS, Settings: `{"encryption":"none"}`}
|
||||
client := model.Client{ID: "uuid-1", Flow: "xtls-rprx-vision"}
|
||||
|
||||
settings := outboundSettings(t, NewSubJsonService("", "", "", nil).genVless(inbound, nil, client, ""))
|
||||
settings := outboundSettings(t, NewSubJsonService("", "", "", nil).genVless(&SubService{}, inbound, nil, client, ""))
|
||||
if _, ok := settings["vnext"]; ok {
|
||||
t.Fatal("vless outbound must not use vnext")
|
||||
}
|
||||
@@ -151,7 +151,7 @@ func TestSubJsonServiceServerUsesServersArray(t *testing.T) {
|
||||
trojan := &model.Inbound{Listen: "1.2.3.4", Port: 443, Protocol: model.Trojan, Settings: `{}`}
|
||||
client := model.Client{Password: "p4ss"}
|
||||
|
||||
settings := outboundSettings(t, NewSubJsonService("", "", "", nil).genServer(trojan, nil, client, ""))
|
||||
settings := outboundSettings(t, NewSubJsonService("", "", "", nil).genServer(&SubService{}, trojan, nil, client, ""))
|
||||
server := firstServer(settings)
|
||||
if server == nil {
|
||||
t.Fatalf("trojan outbound must use a servers array, got: %#v", settings)
|
||||
@@ -164,7 +164,7 @@ func TestSubJsonServiceServerUsesServersArray(t *testing.T) {
|
||||
}
|
||||
|
||||
ss := &model.Inbound{Listen: "1.2.3.4", Port: 443, Protocol: model.Shadowsocks, Settings: `{"method":"aes-256-gcm"}`}
|
||||
ssSettings := outboundSettings(t, NewSubJsonService("", "", "", nil).genServer(ss, nil, client, ""))
|
||||
ssSettings := outboundSettings(t, NewSubJsonService("", "", "", nil).genServer(&SubService{}, ss, nil, client, ""))
|
||||
ssServer := firstServer(ssSettings)
|
||||
if ssServer == nil {
|
||||
t.Fatalf("shadowsocks outbound must use a servers array, got: %#v", ssSettings)
|
||||
@@ -194,7 +194,7 @@ func TestSubJsonServiceXmuxSuppressesGlobalMux(t *testing.T) {
|
||||
inbound := &model.Inbound{Listen: "1.2.3.4", Port: 443, Protocol: model.VLESS, Settings: `{"encryption":"none"}`}
|
||||
client := model.Client{ID: "uuid-1"}
|
||||
|
||||
raw := svc.genVless(inbound, streamSettings, client, mux)
|
||||
raw := svc.genVless(&SubService{}, inbound, streamSettings, client, mux)
|
||||
var ob map[string]any
|
||||
if err := json.Unmarshal(raw, &ob); err != nil {
|
||||
t.Fatalf("unmarshal outbound: %v", err)
|
||||
@@ -240,7 +240,7 @@ func TestSubJsonServiceGlobalMuxWhenNoXmux(t *testing.T) {
|
||||
inbound := &model.Inbound{Listen: "1.2.3.4", Port: 443, Protocol: model.VLESS, Settings: `{"encryption":"none"}`}
|
||||
client := model.Client{ID: "uuid-1"}
|
||||
|
||||
raw := svc.genVless(inbound, streamSettings, client, mux)
|
||||
raw := svc.genVless(&SubService{}, inbound, streamSettings, client, mux)
|
||||
var ob map[string]any
|
||||
if err := json.Unmarshal(raw, &ob); err != nil {
|
||||
t.Fatalf("unmarshal outbound: %v", err)
|
||||
|
||||
@@ -67,11 +67,11 @@ func TestSubJsonService_MuxAttachedWhenConfigured(t *testing.T) {
|
||||
protocol model.Protocol
|
||||
}{
|
||||
{"vmess mux", NewSubJsonService(mux, "", "", nil).genVnext(&model.Inbound{Protocol: model.VMESS, Settings: `{}`}, nil, client, mux), true, model.VMESS},
|
||||
{"vless mux", NewSubJsonService(mux, "", "", nil).genVless(&model.Inbound{Protocol: model.VLESS, Settings: `{}`}, nil, client, mux), true, model.VLESS},
|
||||
{"server mux", NewSubJsonService(mux, "", "", nil).genServer(&model.Inbound{Protocol: model.Trojan, Settings: `{}`}, nil, client, mux), true, model.Trojan},
|
||||
{"vless mux", NewSubJsonService(mux, "", "", nil).genVless(&SubService{}, &model.Inbound{Protocol: model.VLESS, Settings: `{}`}, nil, client, mux), true, model.VLESS},
|
||||
{"server mux", NewSubJsonService(mux, "", "", nil).genServer(&SubService{}, &model.Inbound{Protocol: model.Trojan, Settings: `{}`}, nil, client, mux), true, model.Trojan},
|
||||
{"vmess no mux", NewSubJsonService("", "", "", nil).genVnext(&model.Inbound{Protocol: model.VMESS, Settings: `{}`}, nil, client, ""), false, model.VMESS},
|
||||
{"vless no mux", NewSubJsonService("", "", "", nil).genVless(&model.Inbound{Protocol: model.VLESS, Settings: `{}`}, nil, client, ""), false, model.VLESS},
|
||||
{"server no mux", NewSubJsonService("", "", "", nil).genServer(&model.Inbound{Protocol: model.Trojan, Settings: `{}`}, nil, client, ""), false, model.Trojan},
|
||||
{"vless no mux", NewSubJsonService("", "", "", nil).genVless(&SubService{}, &model.Inbound{Protocol: model.VLESS, Settings: `{}`}, nil, client, ""), false, model.VLESS},
|
||||
{"server no mux", NewSubJsonService("", "", "", nil).genServer(&SubService{}, &model.Inbound{Protocol: model.Trojan, Settings: `{}`}, nil, client, ""), false, model.Trojan},
|
||||
}
|
||||
for _, tc := range cases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
|
||||
@@ -459,11 +459,8 @@ func (s *SubService) statsForClient(inbound *model.Inbound, client model.Client)
|
||||
// needed when a global remark template references client-only tokens. Falls back
|
||||
// to an email-only client if not found.
|
||||
func (s *SubService) lookupClient(inbound *model.Inbound, email string) model.Client {
|
||||
clients, _ := s.inboundService.GetClients(inbound)
|
||||
for _, c := range clients {
|
||||
if c.Email == email {
|
||||
return c
|
||||
}
|
||||
if c, ok := s.clientForLink(inbound, email); ok {
|
||||
return c
|
||||
}
|
||||
return model.Client{Email: email}
|
||||
}
|
||||
|
||||
+171
-65
@@ -54,6 +54,17 @@ type SubService struct {
|
||||
// doesn't own its row (multi-inbound subscriptions). Filled in
|
||||
// getInboundsBySubId; reset per request in PrepareForRequest.
|
||||
statsByEmail map[string]xray.ClientTraffic
|
||||
// clientsByInbound caches clients resolved for this request keyed by
|
||||
// inbound id then email, so the per-protocol link generators look a client
|
||||
// up without re-parsing the inbound's settings JSON per link.
|
||||
// fullyPrimedInbounds marks inbounds whose complete client list is cached
|
||||
// (a miss there is authoritative). Reset per request in PrepareForRequest.
|
||||
clientsByInbound map[int]map[string]model.Client
|
||||
fullyPrimedInbounds map[int]bool
|
||||
// settingsByInbound caches each inbound's settings decoded once per request
|
||||
// with the clients array left out; generators read only inbound-level
|
||||
// fields (encryption, method, version, …) from it.
|
||||
settingsByInbound map[int]map[string]any
|
||||
}
|
||||
|
||||
// NewSubService creates a new subscription service with the given configuration.
|
||||
@@ -86,10 +97,96 @@ func (s *SubService) PrepareForRequest(host string) {
|
||||
s.address = host
|
||||
s.usageShown = map[string]bool{}
|
||||
s.statsByEmail = map[string]xray.ClientTraffic{}
|
||||
s.clientsByInbound = map[int]map[string]model.Client{}
|
||||
s.fullyPrimedInbounds = map[int]bool{}
|
||||
s.settingsByInbound = map[int]map[string]any{}
|
||||
s.loadNodes()
|
||||
s.loadRemarkSettings()
|
||||
}
|
||||
|
||||
// primeLinkClients caches clients (first occurrence per email, matching the
|
||||
// old settings-JSON iteration order) so clientForLink resolves them without a
|
||||
// parse. complete marks the inbound's whole client list as cached.
|
||||
func (s *SubService) primeLinkClients(inboundId int, clients []model.Client, complete bool) {
|
||||
if inboundId <= 0 {
|
||||
return
|
||||
}
|
||||
if s.clientsByInbound == nil {
|
||||
s.clientsByInbound = map[int]map[string]model.Client{}
|
||||
}
|
||||
m := s.clientsByInbound[inboundId]
|
||||
if m == nil {
|
||||
m = make(map[string]model.Client, len(clients))
|
||||
s.clientsByInbound[inboundId] = m
|
||||
}
|
||||
for _, c := range clients {
|
||||
if _, exists := m[c.Email]; !exists {
|
||||
m[c.Email] = c
|
||||
}
|
||||
}
|
||||
if complete {
|
||||
if s.fullyPrimedInbounds == nil {
|
||||
s.fullyPrimedInbounds = map[int]bool{}
|
||||
}
|
||||
s.fullyPrimedInbounds[inboundId] = true
|
||||
}
|
||||
}
|
||||
|
||||
// clientForLink resolves one client of an inbound by email for link
|
||||
// generation: from the per-request cache when primed, otherwise by parsing
|
||||
// the settings JSON once and caching every client from it.
|
||||
func (s *SubService) clientForLink(inbound *model.Inbound, email string) (model.Client, bool) {
|
||||
if m, ok := s.clientsByInbound[inbound.Id]; ok {
|
||||
if c, hit := m[email]; hit {
|
||||
return c, true
|
||||
}
|
||||
if s.fullyPrimedInbounds[inbound.Id] {
|
||||
return model.Client{}, false
|
||||
}
|
||||
}
|
||||
clients, err := s.inboundService.GetClients(inbound)
|
||||
if err != nil {
|
||||
return model.Client{}, false
|
||||
}
|
||||
s.primeLinkClients(inbound.Id, clients, true)
|
||||
for i := range clients {
|
||||
if clients[i].Email == email {
|
||||
return clients[i], true
|
||||
}
|
||||
}
|
||||
return model.Client{}, false
|
||||
}
|
||||
|
||||
// linkSettings returns the inbound's settings decoded once per request with
|
||||
// the clients array left out — the link generators read only inbound-level
|
||||
// fields from it and resolve clients via clientForLink. The shallow
|
||||
// RawMessage pass skips materializing a huge clients array entirely.
|
||||
func (s *SubService) linkSettings(inbound *model.Inbound) map[string]any {
|
||||
if inbound.Id > 0 {
|
||||
if cached, ok := s.settingsByInbound[inbound.Id]; ok {
|
||||
return cached
|
||||
}
|
||||
}
|
||||
shallow := map[string]json.RawMessage{}
|
||||
_ = json.Unmarshal([]byte(inbound.Settings), &shallow)
|
||||
out := make(map[string]any, len(shallow))
|
||||
for key, raw := range shallow {
|
||||
if key == "clients" {
|
||||
continue
|
||||
}
|
||||
var value any
|
||||
_ = json.Unmarshal(raw, &value)
|
||||
out[key] = value
|
||||
}
|
||||
if inbound.Id > 0 {
|
||||
if s.settingsByInbound == nil {
|
||||
s.settingsByInbound = map[int]map[string]any{}
|
||||
}
|
||||
s.settingsByInbound[inbound.Id] = out
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
// loadRemarkSettings populates the per-request remark formatting state so
|
||||
// every subscription format — raw, JSON, Clash — renders remarks the same way
|
||||
// (the date formatter reads datepicker). Loading it only in getSubs left
|
||||
@@ -144,25 +241,23 @@ func listenIsInternalOnly(listen string) bool {
|
||||
}
|
||||
|
||||
// matchingClients returns the inbound's clients whose SubID equals subId,
|
||||
// deduplicated by email. settings.clients can accumulate duplicate entries
|
||||
// for the same client (multi-node sync/import drift, old DBs): SyncInbound
|
||||
// dedupes the normalized client_inbounds rows on write but never rewrites
|
||||
// the legacy JSON, and the subscription builders iterate that JSON — so
|
||||
// without this guard every duplicate became a duplicate profile in the
|
||||
// output (#5134). Link generation keys purely on (inbound, email), so
|
||||
// same-email entries are pure duplicates and dropping them is lossless.
|
||||
// resolved from the normalized clients/client_inbounds tables (both filter
|
||||
// columns indexed) instead of parsing the settings JSON — at large client
|
||||
// counts that parse made every subscription fetch cost seconds. The
|
||||
// case-insensitive email dedupe stays as cheap insurance even though
|
||||
// clients.email is unique, preserving the #5134 guarantee that duplicate
|
||||
// settings entries never fan out into duplicate profiles. Resolved clients
|
||||
// are primed into the per-request cache so the link generators don't parse
|
||||
// settings either.
|
||||
func (s *SubService) matchingClients(inbound *model.Inbound, subId string) []model.Client {
|
||||
clients, err := s.inboundService.GetClients(inbound)
|
||||
clients, err := s.inboundService.GetClientsBySubId(inbound.Id, subId)
|
||||
if err != nil {
|
||||
logger.Error("SubService - GetClients: Unable to get clients from inbound")
|
||||
logger.Error("SubService - GetClientsBySubId: Unable to get clients from inbound")
|
||||
return nil
|
||||
}
|
||||
var out []model.Client
|
||||
seen := make(map[string]struct{}, len(clients))
|
||||
for _, client := range clients {
|
||||
if client.SubID != subId {
|
||||
continue
|
||||
}
|
||||
key := strings.ToLower(client.Email)
|
||||
if _, dup := seen[key]; dup {
|
||||
continue
|
||||
@@ -170,6 +265,7 @@ func (s *SubService) matchingClients(inbound *model.Inbound, subId string) []mod
|
||||
seen[key] = struct{}{}
|
||||
out = append(out, client)
|
||||
}
|
||||
s.primeLinkClients(inbound.Id, out, false)
|
||||
return out
|
||||
}
|
||||
|
||||
@@ -253,6 +349,7 @@ func (s *SubService) inboundLinks(inbound *model.Inbound) []string {
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
s.primeLinkClients(inbound.Id, clients, true)
|
||||
s.projectThroughFallbackMaster(inbound)
|
||||
hostEps := s.hostEndpoints(inbound, "raw")
|
||||
var out []string
|
||||
@@ -362,7 +459,7 @@ func subscriptionExpiryFromClient(nowMs, expiryTime int64) int64 {
|
||||
func (s *SubService) getInboundsBySubId(subId string) ([]*model.Inbound, error) {
|
||||
db := database.GetDB()
|
||||
var inbounds []*model.Inbound
|
||||
err := db.Model(model.Inbound{}).Preload("ClientStats").Where(`id in (
|
||||
err := db.Model(model.Inbound{}).Where(`id in (
|
||||
SELECT DISTINCT inbounds.id
|
||||
FROM inbounds
|
||||
JOIN client_inbounds ON client_inbounds.inbound_id = inbounds.id
|
||||
@@ -374,19 +471,34 @@ func (s *SubService) getInboundsBySubId(subId string) ([]*model.Inbound, error)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
s.indexStatsByEmail(inbounds)
|
||||
s.indexStatsBySubId(subId)
|
||||
return inbounds, nil
|
||||
}
|
||||
|
||||
// indexStatsByEmail records every loaded inbound's client traffic rows keyed by
|
||||
// email so statsForClient can resolve a client's usage even on an inbound that
|
||||
// doesn't own its (globally unique) client_traffics row. See statsByEmail.
|
||||
func (s *SubService) indexStatsByEmail(inbounds []*model.Inbound) {
|
||||
// indexStatsBySubId loads the traffic rows for just this subscriber's clients
|
||||
// into statsByEmail so statsForClient can resolve a client's usage on any of
|
||||
// its inbounds. It replaces preloading every matched inbound's ClientStats,
|
||||
// which read the entire client_traffics table on every subscription fetch of
|
||||
// a large inbound; statsForClient's per-email DB fallback covers any miss.
|
||||
func (s *SubService) indexStatsBySubId(subId string) {
|
||||
if s.statsByEmail == nil {
|
||||
s.statsByEmail = map[string]xray.ClientTraffic{}
|
||||
}
|
||||
for _, inbound := range inbounds {
|
||||
for _, st := range inbound.ClientStats {
|
||||
db := database.GetDB()
|
||||
var emails []string
|
||||
if err := db.Model(&model.ClientRecord{}).Where("sub_id = ?", subId).Pluck("email", &emails).Error; err != nil {
|
||||
logger.Error("SubService - indexStatsBySubId: load emails:", err)
|
||||
return
|
||||
}
|
||||
const chunk = 400
|
||||
for lo := 0; lo < len(emails); lo += chunk {
|
||||
hi := min(lo+chunk, len(emails))
|
||||
var rows []xray.ClientTraffic
|
||||
if err := db.Where("email IN ?", emails[lo:hi]).Find(&rows).Error; err != nil {
|
||||
logger.Error("SubService - indexStatsBySubId: load traffics:", err)
|
||||
return
|
||||
}
|
||||
for _, st := range rows {
|
||||
s.statsByEmail[st.Email] = st
|
||||
}
|
||||
}
|
||||
@@ -516,21 +628,14 @@ func (s *SubService) genWireguardLink(inbound *model.Inbound, email string) stri
|
||||
if inbound.Protocol != model.WireGuard {
|
||||
return ""
|
||||
}
|
||||
settings := map[string]any{}
|
||||
_ = json.Unmarshal([]byte(inbound.Settings), &settings)
|
||||
settings := s.linkSettings(inbound)
|
||||
secretKey, _ := settings["secretKey"].(string)
|
||||
|
||||
clients, _ := s.inboundService.GetClients(inbound)
|
||||
var client *model.Client
|
||||
for i := range clients {
|
||||
if clients[i].Email == email {
|
||||
client = &clients[i]
|
||||
break
|
||||
}
|
||||
}
|
||||
if client == nil || client.PrivateKey == "" {
|
||||
resolved, ok := s.clientForLink(inbound, email)
|
||||
if !ok || resolved.PrivateKey == "" {
|
||||
return ""
|
||||
}
|
||||
client := &resolved
|
||||
|
||||
link := fmt.Sprintf("wireguard://%s@%s", encodeUserinfo(client.PrivateKey), joinHostPort(s.resolveInboundAddress(inbound), inbound.Port))
|
||||
params := make(map[string]string)
|
||||
@@ -607,10 +712,12 @@ func (s *SubService) genVmessLink(inbound *model.Inbound, email string) string {
|
||||
applyVmessTLSParams(stream, obj)
|
||||
}
|
||||
|
||||
clients, _ := s.inboundService.GetClients(inbound)
|
||||
clientIndex := findClientIndex(clients, email)
|
||||
obj["id"] = clients[clientIndex].ID
|
||||
obj["scy"] = clients[clientIndex].Security
|
||||
client, ok := s.clientForLink(inbound, email)
|
||||
if !ok {
|
||||
return ""
|
||||
}
|
||||
obj["id"] = client.ID
|
||||
obj["scy"] = client.Security
|
||||
|
||||
externalProxies, _ := stream["externalProxy"].([]any)
|
||||
|
||||
@@ -659,17 +766,18 @@ func (s *SubService) genVlessLink(inbound *model.Inbound, email string) string {
|
||||
}
|
||||
address := s.resolveInboundAddress(inbound)
|
||||
stream := unmarshalStreamSettings(inbound.StreamSettings)
|
||||
clients, _ := s.inboundService.GetClients(inbound)
|
||||
clientIndex := findClientIndex(clients, email)
|
||||
uuid := clients[clientIndex].ID
|
||||
client, ok := s.clientForLink(inbound, email)
|
||||
if !ok {
|
||||
return ""
|
||||
}
|
||||
uuid := client.ID
|
||||
port := inbound.Port
|
||||
streamNetwork := stream["network"].(string)
|
||||
params := make(map[string]string)
|
||||
params["type"] = streamNetwork
|
||||
|
||||
// Add encryption parameter for VLESS from inbound settings
|
||||
var settings map[string]any
|
||||
_ = json.Unmarshal([]byte(inbound.Settings), &settings)
|
||||
settings := s.linkSettings(inbound)
|
||||
if encryption, ok := settings["encryption"].(string); ok {
|
||||
params["encryption"] = encryption
|
||||
}
|
||||
@@ -683,12 +791,12 @@ func (s *SubService) genVlessLink(inbound *model.Inbound, email string) string {
|
||||
case "tls":
|
||||
applyShareTLSParams(stream, params)
|
||||
case "reality":
|
||||
applyShareRealityParams(stream, params, subKey(clients[clientIndex]))
|
||||
applyShareRealityParams(stream, params, subKey(client))
|
||||
default:
|
||||
params["security"] = "none"
|
||||
}
|
||||
if len(clients[clientIndex].Flow) > 0 && vlessFlowAllowed(streamNetwork, security, settings) {
|
||||
params["flow"] = clients[clientIndex].Flow
|
||||
if len(client.Flow) > 0 && vlessFlowAllowed(streamNetwork, security, settings) {
|
||||
params["flow"] = client.Flow
|
||||
}
|
||||
|
||||
externalProxies, _ := stream["externalProxy"].([]any)
|
||||
@@ -717,9 +825,11 @@ func (s *SubService) genTrojanLink(inbound *model.Inbound, email string) string
|
||||
}
|
||||
address := s.resolveInboundAddress(inbound)
|
||||
stream := unmarshalStreamSettings(inbound.StreamSettings)
|
||||
clients, _ := s.inboundService.GetClients(inbound)
|
||||
clientIndex := findClientIndex(clients, email)
|
||||
password := encodeUserinfo(clients[clientIndex].Password)
|
||||
client, ok := s.clientForLink(inbound, email)
|
||||
if !ok {
|
||||
return ""
|
||||
}
|
||||
password := encodeUserinfo(client.Password)
|
||||
port := inbound.Port
|
||||
streamNetwork := stream["network"].(string)
|
||||
params := make(map[string]string)
|
||||
@@ -734,9 +844,9 @@ func (s *SubService) genTrojanLink(inbound *model.Inbound, email string) string
|
||||
case "tls":
|
||||
applyShareTLSParams(stream, params)
|
||||
case "reality":
|
||||
applyShareRealityParams(stream, params, subKey(clients[clientIndex]))
|
||||
if streamNetwork == "tcp" && len(clients[clientIndex].Flow) > 0 {
|
||||
params["flow"] = clients[clientIndex].Flow
|
||||
applyShareRealityParams(stream, params, subKey(client))
|
||||
if streamNetwork == "tcp" && len(client.Flow) > 0 {
|
||||
params["flow"] = client.Flow
|
||||
}
|
||||
default:
|
||||
params["security"] = "none"
|
||||
@@ -788,13 +898,14 @@ func (s *SubService) genShadowsocksLink(inbound *model.Inbound, email string) st
|
||||
}
|
||||
address := s.resolveInboundAddress(inbound)
|
||||
stream := unmarshalStreamSettings(inbound.StreamSettings)
|
||||
clients, _ := s.inboundService.GetClients(inbound)
|
||||
client, ok := s.clientForLink(inbound, email)
|
||||
if !ok {
|
||||
return ""
|
||||
}
|
||||
|
||||
var settings map[string]any
|
||||
_ = json.Unmarshal([]byte(inbound.Settings), &settings)
|
||||
settings := s.linkSettings(inbound)
|
||||
inboundPassword := settings["password"].(string)
|
||||
method := settings["method"].(string)
|
||||
clientIndex := findClientIndex(clients, email)
|
||||
streamNetwork := stream["network"].(string)
|
||||
params := make(map[string]string)
|
||||
params["type"] = streamNetwork
|
||||
@@ -828,9 +939,9 @@ func (s *SubService) genShadowsocksLink(inbound *model.Inbound, email string) st
|
||||
userInfo = fmt.Sprintf("%s:%s:%s",
|
||||
url.QueryEscape(method),
|
||||
url.QueryEscape(inboundPassword),
|
||||
url.QueryEscape(clients[clientIndex].Password))
|
||||
url.QueryEscape(client.Password))
|
||||
} else {
|
||||
userInfo = base64.RawURLEncoding.EncodeToString(fmt.Appendf(nil, "%s:%s", method, clients[clientIndex].Password))
|
||||
userInfo = base64.RawURLEncoding.EncodeToString(fmt.Appendf(nil, "%s:%s", method, client.Password))
|
||||
}
|
||||
|
||||
externalProxies, _ := stream["externalProxy"].([]any)
|
||||
@@ -861,15 +972,11 @@ func (s *SubService) genHysteriaLink(inbound *model.Inbound, email string) strin
|
||||
}
|
||||
var stream map[string]any
|
||||
_ = json.Unmarshal([]byte(inbound.StreamSettings), &stream)
|
||||
clients, _ := s.inboundService.GetClients(inbound)
|
||||
clientIndex := -1
|
||||
for i, client := range clients {
|
||||
if client.Email == email {
|
||||
clientIndex = i
|
||||
break
|
||||
}
|
||||
client, ok := s.clientForLink(inbound, email)
|
||||
if !ok {
|
||||
return ""
|
||||
}
|
||||
auth := encodeUserinfo(clients[clientIndex].Auth)
|
||||
auth := encodeUserinfo(client.Auth)
|
||||
params := make(map[string]string)
|
||||
|
||||
params["security"] = "tls"
|
||||
@@ -928,8 +1035,7 @@ func (s *SubService) genHysteriaLink(inbound *model.Inbound, email string) strin
|
||||
}
|
||||
}
|
||||
|
||||
var settings map[string]any
|
||||
_ = json.Unmarshal([]byte(inbound.Settings), &settings)
|
||||
settings := s.linkSettings(inbound)
|
||||
version, _ := settings["version"].(float64)
|
||||
protocol := "hysteria2"
|
||||
if int(version) == 1 {
|
||||
|
||||
@@ -69,19 +69,35 @@ func TestGetSubs_DuplicateSettingsClients_Deduped(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestMatchingClients_DedupsCaseInsensitiveEmail pins the dedup KEY, not just the count:
|
||||
// the two entries differ only by email case, so dropping strings.ToLower (or keying on
|
||||
// another field) yields two clients. The byte-identical dupes above can't catch that.
|
||||
// TestMatchingClients_DedupsCaseInsensitiveEmail pins the dedup KEY, not just the count.
|
||||
// clients.email is unique but case-sensitively so: two rows differing only by email case
|
||||
// can coexist (import drift), and dropping strings.ToLower (or keying on another field)
|
||||
// would emit both. The first row by id must win, matching the old settings-JSON order.
|
||||
func TestMatchingClients_DedupsCaseInsensitiveEmail(t *testing.T) {
|
||||
dbDir := t.TempDir()
|
||||
t.Setenv("XUI_DB_FOLDER", dbDir)
|
||||
if err := database.InitDB(filepath.Join(dbDir, "x-ui.db")); err != nil {
|
||||
t.Fatalf("InitDB: %v", err)
|
||||
}
|
||||
t.Cleanup(func() { _ = database.CloseDB() })
|
||||
|
||||
const subId = "s1"
|
||||
const uuid = "11111111-2222-4333-8444-555555555555"
|
||||
ib := &model.Inbound{
|
||||
Protocol: model.VLESS,
|
||||
Settings: `{"clients":[
|
||||
{"id":"` + uuid + `","email":"Dup@Example.com","subId":"` + subId + `","enable":true},
|
||||
{"id":"` + uuid + `","email":"dup@example.com","subId":"` + subId + `","enable":true}
|
||||
]}`,
|
||||
db := database.GetDB()
|
||||
ib := &model.Inbound{Protocol: model.VLESS, Enable: true, Port: 42002, Tag: "dedup-ci", Settings: `{"clients":[]}`}
|
||||
if err := db.Create(ib).Error; err != nil {
|
||||
t.Fatalf("seed inbound: %v", err)
|
||||
}
|
||||
for _, email := range []string{"Dup@Example.com", "dup@example.com"} {
|
||||
c := &model.ClientRecord{Email: email, SubID: subId, UUID: uuid, Enable: true}
|
||||
if err := db.Create(c).Error; err != nil {
|
||||
t.Fatalf("seed client %q: %v", email, err)
|
||||
}
|
||||
if err := db.Create(&model.ClientInbound{ClientId: c.Id, InboundId: ib.Id}).Error; err != nil {
|
||||
t.Fatalf("seed client_inbound %q: %v", email, err)
|
||||
}
|
||||
}
|
||||
|
||||
s := &SubService{}
|
||||
got := s.matchingClients(ib, subId)
|
||||
if len(got) != 1 {
|
||||
@@ -90,7 +106,7 @@ func TestMatchingClients_DedupsCaseInsensitiveEmail(t *testing.T) {
|
||||
if got[0].Email != "Dup@Example.com" {
|
||||
t.Fatalf("first occurrence must be kept, got %q", got[0].Email)
|
||||
}
|
||||
// A wrong subId must still be excluded (guards the subId filter at service.go:127).
|
||||
// A wrong subId must still be excluded (guards the SQL subId filter).
|
||||
if other := s.matchingClients(ib, "nope"); len(other) != 0 {
|
||||
t.Fatalf("non-matching subId must yield 0 clients, got %d", len(other))
|
||||
}
|
||||
|
||||
@@ -197,3 +197,34 @@ func (s *ClientService) ListForInbound(tx *gorm.DB, inboundId int) ([]model.Clie
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
// ListForInboundBySubId is ListForInbound narrowed to one subscription id —
|
||||
// both filter columns are indexed, so the subscription server resolves a
|
||||
// subscriber's clients without touching the inbound's settings JSON.
|
||||
func (s *ClientService) ListForInboundBySubId(tx *gorm.DB, inboundId int, subId string) ([]model.Client, error) {
|
||||
if tx == nil {
|
||||
tx = database.GetDB()
|
||||
}
|
||||
type joinedRow struct {
|
||||
model.ClientRecord
|
||||
FlowOverride string
|
||||
}
|
||||
var rows []joinedRow
|
||||
err := tx.Table("clients").
|
||||
Select("clients.*, client_inbounds.flow_override AS flow_override").
|
||||
Joins("JOIN client_inbounds ON client_inbounds.client_id = clients.id").
|
||||
Where("client_inbounds.inbound_id = ? AND clients.sub_id = ?", inboundId, subId).
|
||||
Order("clients.id ASC").
|
||||
Find(&rows).Error
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
out := make([]model.Client, 0, len(rows))
|
||||
for i := range rows {
|
||||
c := rows[i].ToClient()
|
||||
c.Flow = rows[i].FlowOverride
|
||||
out = append(out, *c)
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
@@ -409,6 +409,13 @@ func (s *InboundService) GetClients(inbound *model.Inbound) ([]model.Client, err
|
||||
return clients, nil
|
||||
}
|
||||
|
||||
// GetClientsBySubId returns the inbound's clients with the given subscription
|
||||
// id, resolved from the normalized clients tables (the same source the running
|
||||
// Xray users are built from) instead of parsing the settings JSON blob.
|
||||
func (s *InboundService) GetClientsBySubId(inboundId int, subId string) ([]model.Client, error) {
|
||||
return s.clientService.ListForInboundBySubId(nil, inboundId, subId)
|
||||
}
|
||||
|
||||
func (s *InboundService) GetAllEmails() ([]string, error) {
|
||||
db := database.GetDB()
|
||||
var emails []string
|
||||
|
||||
Reference in New Issue
Block a user