mirror of
https://github.com/MHSanaei/3x-ui.git
synced 2026-07-03 19:34:34 +00:00
41645255f1
* refactor(service): split client.go into focused files
client.go had grown to 4455 lines mixing ~10 responsibilities. Split it
verbatim into cohesive same-package files (no behavior change):
client.go foundation: ClientService, ClientWithAttachments,
ClientCreatePayload, ErrClientNotInInbound, sqlInChunk
client_locks.go inbound mutation locks, delete tombstones, compactOrphans
client_lookup.go read-only lookups (GetByID, List, EffectiveFlow, ...)
client_link.go inbound association sync (SyncInbound, DetachInbound, ...)
client_crud.go single-client CRUD + validation + protocol defaults
client_inbound_apply.go low-level inbound-settings mutators + by-email setters
client_bulk.go bulk attach/detach/adjust/delete/create + DelDepleted
client_traffic.go traffic-reset paths
client_groups.go client group management
client_paging.go paged listing, filtering, sorting, summary
Every declaration moved unchanged (verified: identical func/type/const/var
signature set before vs after). Imports redistributed per file via goimports.
go build ./..., go vet, and go test ./web/service/... all pass.
* refactor(service): split inbound.go into focused files
inbound.go was 4100 lines. Split it verbatim into cohesive same-package
files (no behavior change):
inbound.go core inbound CRUD + InboundService (keeps pkg doc)
inbound_protocol.go protocol / stream capability helpers
inbound_node.go node/runtime/remote coordination + online tracking
inbound_traffic.go traffic accounting, reset, client stats
inbound_client_ips.go per-client IP tracking
inbound_clients.go client lookups within inbounds + copy-clients
inbound_disable.go auto-disable invalid inbounds/clients
inbound_migration.go DB migrations
inbound_sublink.go subscription link providers
inbound_util.go generic slice/string helpers
Identical func/type/const/var signature set before vs after; package doc
comment preserved on inbound.go. Imports redistributed via goimports.
Build, vet, and go test ./web/service/... all pass.
* refactor(service): split tgbot.go into focused files
tgbot.go was 3738 lines dominated by a 1246-line answerCallback. Split it
verbatim into cohesive same-package files (no behavior change):
tgbot.go lifecycle, bot setup, caches, small utils
tgbot_router.go incoming update / command / callback dispatch
tgbot_send.go outbound messaging primitives
tgbot_client.go client views, actions, subscription links
tgbot_inbound.go inbound listing / pickers
tgbot_report.go server usage, exhausted, online, backups, notifications
Identical func/type/const/var signature set before vs after. Imports
redistributed via goimports. Build, vet, and go test ./web/service/... pass.
* refactor(client): dedupe single-field by-email setters
ResetClientIpLimitByEmail, ResetClientExpiryTimeByEmail, and
ResetClientTrafficLimitByEmail shared an identical ~50-line body that
resolves the inbound by email, confirms the client exists, rewrites a
single-client settings payload, and delegates to UpdateInboundClient.
Extract that into applyClientFieldByEmail(inboundSvc, email, mutate) and
reduce each setter to a 3-line wrapper. Behavior is unchanged: same checks
and error strings, same single-client payload contract, same totalGB guard.
SetClientTelegramUserID (resolves by traffic id, different error text) and
ToggleClientEnableByEmail/SetClientEnableByEmail (different return shape and
a pre-read of the old state) intentionally keep their own bodies.
* refactor(service): extract panel/ subpackage
Move the panel-administration leaf services out of the flat service
package into web/service/panel/ (package panel):
user.go UserService (auth / 2FA / LDAP)
panel.go PanelService (restart / self-update) + version helpers
panel_other.go non-unix RestartPanel
panel_unix.go unix RestartPanel
api_token.go ApiTokenService
websocket.go WebSocketService
panel_test.go version/shellQuote unit tests
These are leaves: they depend on core (SettingService, Release) but no
core file references them, so the extraction creates no import cycle.
Core references are now qualified (service.SettingService, service.Release);
callers in main.go, web/web.go, and web/controller/* updated to panel.*.
Build, vet, and go test ./web/... pass.
* refactor(service): extract integration/ subpackage
Move the external-provider integration leaves into web/service/integration/
(package integration):
warp.go WarpService (Cloudflare WARP)
nord.go NordService (NordVPN)
custom_geo.go CustomGeoService (custom geo asset management)
*_test.go custom_geo / panel-proxy tests
These depend on core (SettingService, ServerService, XraySettingService) but
no core file references them. xray_setting.go stays in core because it calls
the unexported SettingService.saveSetting. The shared isBlockedIP SSRF helper
(used by core url_safety.go and by custom_geo) now has a small copy in each
package rather than being exported. Core references qualified; callers in
web/web.go, web/job/*, and web/controller/* updated to integration.*.
Build, vet, and go test ./web/... pass.
* refactor(service): extract tgbot/ subpackage
Move the Telegram bot (6 files + test) into web/service/tgbot/ (package
tgbot). It is a leaf: it embeds five core services (Inbound/Client/Setting/
Server/Xray) and the core never references it, so no import cycle.
To support the package boundary without changing behavior:
- core exposes XrayProcess() *xray.Process so tgbot keeps calling the
exact same running-process methods it used via the package-level `p`;
- three core methods tgbot calls are exported:
ClientService.checkIsEnabledByEmail -> CheckIsEnabledByEmail,
InboundService.getAllEmails -> GetAllEmails (callers updated in-package);
- tgbot's embedded-field types and the few core type refs (Status,
ClientCreatePayload, SanitizePublicHTTPURL) are now service-qualified.
Callers in main.go, web/web.go, web/job/*, and web/controller/* updated to
tgbot.*. Build, vet, and go test ./web/... pass.
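
The accessor approach mentioned above can be sketched as follows. The `Process` type and its `IsRunning` method are hypothetical placeholders for xray.Process; only the pattern of keeping the package-level handle unexported while exposing a read accessor is taken from the message.

```go
package main

import "fmt"

// Process is a hypothetical stand-in for xray.Process.
type Process struct{ running bool }

// IsRunning is an illustrative method, not necessarily part of the real type.
func (p *Process) IsRunning() bool { return p != nil && p.running }

// p mimics the package-level process handle, which stays unexported.
var p = &Process{running: true}

// XrayProcess hands the same handle to other packages without exporting p.
func XrayProcess() *Process { return p }

func main() {
	fmt.Println(XrayProcess().IsRunning())
}
```

Because the accessor returns the same pointer, callers across the package boundary observe exactly the state that in-package code sees through `p`.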
* refactor(service): extract outbound/ subpackage
OutboundService (outbound.go) imports only neutral packages (config,
database, model, xray) and its production code is referenced by no core or
sibling service file — only by web/controller/xray_setting.go and
web/job/xray_traffic_job.go. Move it to web/service/outbound/ (package
outbound); no core qualification needed inside. Callers updated to outbound.*.
The one coupling was a tiny pure test helper, outboundsContainTag, used by
both outbound.go and the core outbound_subscription_test.go; it now has a
small copy in that test file rather than being shared across the boundary.
Build, vet, and go test ./web/... pass.
* refactor(util): move wireguard into its own subpackage
util/wireguard.go was the lone file of the root `util` package (24 lines,
one exported func GenerateWireguardKeypair), while every other util concern
lives in a focused subpackage (util/common, util/crypto, util/netsafe, ...).
Move it to util/wireguard/ (package wireguard) for consistency; its only
importer, web/service/integration/warp.go, is updated. The root `util`
package no longer exists.
* refactor(sub): drop redundant sub prefix from filenames
Inside package sub the subXxx.go prefix just repeats the package name
(like client_*.go did inside service). Rename for consistency; content and
type names are unchanged:
subController.go -> controller.go
subService.go -> service.go
subClashService.go -> clash_service.go
subJsonService.go -> json_service.go
(+ matching _test.go files)
* refactor(controller): rename xui.go -> spa.go
XUIController serves the panel's single-page-app shell; spa.go names that
role plainly (the other controller files are domain-named). File rename only
— the type stays XUIController. api_docs_test.go keys route base paths by
filename, so its "xui.go" case is updated to "spa.go".
* refactor: move backend packages under internal/
Adopt the idiomatic Go application layout: the backend packages now live
under internal/ (a boundary the toolchain enforces), signalling private
implementation instead of a library-style flat root. No runtime behavior
changes — only import paths and a few build/config paths move.
Moved: config, database, logger, mtproto, sub, util, web, xray -> internal/.
main.go stays at the repo root and tools/openapigen stays under tools/ (both
still import internal/* because the internal rule keys off the module root).
The module path github.com/mhsanaei/3x-ui/v3 is unchanged; 149 .go files had
their import prefix rewritten to .../internal/<pkg>.
Couplings the Go compiler can't see, updated to the new layout:
- frontend i18n imports of web/translation (react.ts, setup.components.ts)
- vite outDir + eslint/tsconfig ignore globs -> internal/web/dist
- Dockerfile COPY paths for web/dist and web/translation
- locale.go os.DirFS("web") disk fallback -> "internal/web"
- .gitignore and ci.yml go:embed stub for internal/web/dist
- api_docs_test.go repo-root relative walk (one level deeper)
- tools/openapigen filesystem package paths; ApiTokenView repointed to the
web/service/panel subpackage and codegen regenerated (clears a stale
type the ci.yml codegen check was failing on)
Verified: go build/vet/test (all packages), and frontend typecheck, lint,
vitest (478 tests), and production build into internal/web/dist.
* fix(config): keep test runs from writing logs into the source tree
GetLogFolder() returns a CWD-relative "./log" on Windows. Under `go test`
the working directory is each package's own folder, so InitLogger (called by
tests in web/job, web/service, xray, web/websocket) created stray log/
directories scattered through the source tree (e.g. internal/web/job/log/).
Redirect to a shared temp folder when testing.Testing() reports a test run.
Production behavior is unchanged: Windows still uses ./log next to the binary
and Linux /var/log/x-ui. The log files were always gitignored (*.log) and
never committed; this just stops the noise at the source.
* docs: move subscription-template guide out of root into docs/
sub_templates/ was a top-level folder holding only a README and no actual
templates (3x-ui ships none by design), referenced nowhere and unlinked from
any doc — it read like an empty placeholder cluttering the repo root.
Move the guide to docs/custom-subscription-templates.md (a proper docs home),
reword its intro to read as documentation rather than a folder note, link it
from the Features list in README.md, and drop the empty sub_templates/ folder.
* fix: update stale web/ path references after the internal/ move
The internal/ migration rewrote Go import paths but left some references to
the old top-level layout in docs, comments, and a few runtime disk paths.
Functional (dev-mode only): the disk-serving fallbacks that read the Vite
build from disk when running from source still pointed at web/dist/, which
moved to internal/web/dist/ — so `os.DirFS`/`os.Stat`/`os.ReadFile` in
internal/web/web.go and internal/sub/{sub,controller}.go are corrected.
Production was unaffected (it serves the embedded FS; verified by the Docker
build), but `go run` with a live frontend build silently fell back to embed.
Docs/comments: frontend/README.md, CONTRIBUTING.md, the claude-issue-bot and
release workflows, the openapigen -root help text, and assorted Go comments
now reference internal/web, internal/database, internal/sub, internal/xray,
etc. Package-name mentions (the "web" package), root paths (main.go,
frontend/, install scripts, /etc/x-ui), routes (/panel/api/xray), and the
historical "web/assets no longer exists" note were intentionally left as-is.
* refactor(web): remove the legacy /xui -> /panel redirect middleware
RedirectMiddleware existed only for backward compatibility with the old
`/xui` URL scheme (301-redirecting /xui and /xui/API to /panel and
/panel/api). That cutover was long ago, so drop the middleware, its
registration in initRouter, and the now-inaccurate "URL redirection"
mention in the middleware package doc. Old /xui URLs now 404 like any other
unknown path. HTTPS auto-redirect and auth redirects are unrelated and stay.
* build: fix .dockerignore for internal/ layout and exclude runtime dir
- web/dist -> internal/web/dist: the embedded frontend moved under internal/,
so the stale exclude no longer matched and the locally-built dist could be
sent to the build context (the frontend stage rebuilds it fresh anyway).
- exclude x-ui/: the local runtime directory (SQLite db, geo .dat files, xray
binaries, certs — ~150MB) was being shipped into the build context for no
reason. Verified the pattern excludes only the directory and still keeps
x-ui.sh, which the Dockerfile copies to /usr/bin/x-ui.
853 lines
25 KiB
Go
package service

import (
	"context"
	"errors"
	"fmt"
	"sort"
	"strings"
	"sync"
	"time"

	"github.com/mhsanaei/3x-ui/v3/internal/database"
	"github.com/mhsanaei/3x-ui/v3/internal/database/model"
	"github.com/mhsanaei/3x-ui/v3/internal/logger"
	"github.com/mhsanaei/3x-ui/v3/internal/web/runtime"
	"github.com/mhsanaei/3x-ui/v3/internal/xray"

	"gorm.io/gorm"
	"gorm.io/gorm/clause"
)

var reportedRemoteTagConflict sync.Map

func (s *InboundService) runtimeFor(ib *model.Inbound) (runtime.Runtime, error) {
	mgr := runtime.GetManager()
	if mgr == nil {
		return nil, fmt.Errorf("runtime manager not initialised")
	}
	return mgr.RuntimeFor(ib.NodeID)
}

func (s *InboundService) nodePushPlan(ib *model.Inbound) (runtime.Runtime, bool, bool, error) {
	if ib.NodeID == nil {
		rt, err := s.runtimeFor(ib)
		if err != nil {
			return nil, false, false, nil
		}
		return rt, true, false, nil
	}
	nodeSvc := NodeService{}
	enabled, status, _, _, err := nodeSvc.NodeSyncState(*ib.NodeID)
	if err != nil {
		return nil, false, false, err
	}
	if !enabled || status == "offline" {
		return nil, false, true, nil
	}
	rt, err := s.runtimeFor(ib)
	if err != nil {
		return nil, false, true, nil
	}
	return rt, true, false, nil
}

func (s *InboundService) NodeIsPending(nodeID *int) bool {
	if nodeID == nil {
		return false
	}
	return (&NodeService{}).IsNodePending(*nodeID)
}

func (s *InboundService) AnyNodePending(inboundIds []int) bool {
	if len(inboundIds) == 0 {
		return false
	}
	nodeSvc := NodeService{}
	for _, id := range inboundIds {
		ib, err := s.GetInbound(id)
		if err != nil || ib.NodeID == nil {
			continue
		}
		if nodeSvc.IsNodePending(*ib.NodeID) {
			return true
		}
	}
	return false
}

func (s *InboundService) ReconcileNode(ctx context.Context, rt *runtime.Remote, nodeID int) error {
	if rt == nil || nodeID <= 0 {
		return nil
	}
	db := database.GetDB()
	var inbounds []*model.Inbound
	if err := db.Model(model.Inbound{}).Where("node_id = ?", nodeID).Find(&inbounds).Error; err != nil {
		return err
	}
	remoteTags, err := rt.ListRemoteTags(ctx)
	if err != nil {
		return err
	}
	prefix := nodeTagPrefix(&nodeID)
	desiredTags := make(map[string]struct{}, len(inbounds)*2)
	for _, ib := range inbounds {
		desiredTags[ib.Tag] = struct{}{}
		if prefix != "" {
			if stripped, found := strings.CutPrefix(ib.Tag, prefix); found {
				desiredTags[stripped] = struct{}{}
			} else {
				desiredTags[prefix+ib.Tag] = struct{}{}
			}
		}
		if err := rt.UpdateInbound(ctx, ib, ib); err != nil {
			return fmt.Errorf("reconcile inbound %q: %w", ib.Tag, err)
		}
	}
	for _, tag := range remoteTags {
		if _, want := desiredTags[tag]; want {
			continue
		}
		if err := rt.DelInbound(ctx, &model.Inbound{Tag: tag}); err != nil {
			return fmt.Errorf("reconcile delete %q: %w", tag, err)
		}
	}
	return nil
}

const resetGracePeriodMs int64 = 30000

// onlineGracePeriodMs must comfortably exceed the 5s traffic-poll interval —
// Xray's stats counters often report a zero delta for an active session across
// a single poll, so a 5s grace would still drop the client on the next tick.
// ~4 polls of slack keeps idle-but-connected clients visible without lingering
// long after a real disconnect.
const onlineGracePeriodMs int64 = 20000

type nodeTrafficCounter struct {
	Up   int64
	Down int64
}

func (s *InboundService) upsertNodeBaseline(tx *gorm.DB, nodeID int, email string, up, down int64) error {
	return tx.Clauses(clause.OnConflict{
		Columns:   []clause.Column{{Name: "node_id"}, {Name: "email"}},
		DoUpdates: clause.AssignmentColumns([]string{"up", "down"}),
	}).Create(&model.NodeClientTraffic{NodeId: nodeID, Email: email, Up: up, Down: down}).Error
}

func (s *InboundService) SetRemoteTraffic(nodeID int, snap *runtime.TrafficSnapshot, dirty bool) (bool, error) {
	var structuralChange bool
	err := submitTrafficWrite(func() error {
		var inner error
		structuralChange, inner = s.setRemoteTrafficLocked(nodeID, snap, dirty)
		return inner
	})
	return structuralChange, err
}

func (s *InboundService) setRemoteTrafficLocked(nodeID int, snap *runtime.TrafficSnapshot, dirty bool) (bool, error) {
	if snap == nil || nodeID <= 0 {
		return false, nil
	}
	db := database.GetDB()
	now := time.Now().UnixMilli()

	// originGuidFor attributes a synced inbound to the panel that physically
	// hosts it: inbounds the node forwards from its own sub-nodes already carry
	// a non-empty OriginNodeGuid (kept as-is across hops); the node's own local
	// inbounds report empty, so they are attributed to the node's own GUID. An
	// empty result (old-build node with no GUID yet) leaves attribution to the
	// node_id fallback downstream (#4983).
	var nodeRow model.Node
	db.Select("guid").Where("id = ?", nodeID).First(&nodeRow)
	originGuidFor := func(snapIb *model.Inbound) string {
		if snapIb.OriginNodeGuid != "" {
			return snapIb.OriginNodeGuid
		}
		return nodeRow.Guid
	}

	var central []model.Inbound
	if err := db.Model(model.Inbound{}).
		Where("node_id = ?", nodeID).
		Find(&central).Error; err != nil {
		return false, err
	}
	// Index under the stored tag and its prefix-flipped form so a snap matches
	// whether the n<id>- prefix lives on the node side, the central side, or
	// neither — a mismatch must never spawn a duplicate central inbound.
	tagToCentral := make(map[string]*model.Inbound, len(central)*2)
	prefix := nodeTagPrefix(&nodeID)
	for i := range central {
		tagToCentral[central[i].Tag] = &central[i]
		if prefix != "" {
			if stripped, found := strings.CutPrefix(central[i].Tag, prefix); found {
				tagToCentral[stripped] = &central[i]
			} else {
				tagToCentral[prefix+central[i].Tag] = &central[i]
			}
		}
	}

	var centralClientStats []xray.ClientTraffic
	if len(central) > 0 {
		ids := make([]int, 0, len(central))
		for i := range central {
			ids = append(ids, central[i].Id)
		}
		if err := db.Model(xray.ClientTraffic{}).
			Where("inbound_id IN ?", ids).
			Find(&centralClientStats).Error; err != nil {
			return false, err
		}
	}
	type csKey struct {
		inboundID int
		email     string
	}
	centralCS := make(map[csKey]*xray.ClientTraffic, len(centralClientStats))
	centralCSByEmail := make(map[string]*xray.ClientTraffic, len(centralClientStats))
	for i := range centralClientStats {
		centralCS[csKey{centralClientStats[i].InboundId, centralClientStats[i].Email}] = &centralClientStats[i]
		centralCSByEmail[centralClientStats[i].Email] = &centralClientStats[i]
	}

	nodeBaselines := make(map[string]nodeTrafficCounter)
	var baselineRows []model.NodeClientTraffic
	if err := db.Model(&model.NodeClientTraffic{}).
		Where("node_id = ?", nodeID).
		Find(&baselineRows).Error; err != nil {
		return false, err
	}
	for i := range baselineRows {
		nodeBaselines[baselineRows[i].Email] = nodeTrafficCounter{Up: baselineRows[i].Up, Down: baselineRows[i].Down}
	}

	var existingEmailsList []string
	if err := db.Model(xray.ClientTraffic{}).Pluck("email", &existingEmailsList).Error; err != nil {
		return false, err
	}
	existingEmails := make(map[string]struct{}, len(existingEmailsList))
	for _, e := range existingEmailsList {
		existingEmails[e] = struct{}{}
	}

	var defaultUserId int
	if len(central) > 0 {
		defaultUserId = central[0].UserId
	} else {
		var u model.User
		if err := db.Model(model.User{}).Order("id asc").First(&u).Error; err == nil {
			defaultUserId = u.Id
		} else {
			defaultUserId = 1
		}
	}

	tx := db.Begin()
	committed := false
	defer func() {
		if !committed {
			tx.Rollback()
		}
	}()

	structuralChange := false

	snapTags := make(map[string]struct{}, len(snap.Inbounds))
	for _, snapIb := range snap.Inbounds {
		if snapIb == nil {
			continue
		}
		snapTags[snapIb.Tag] = struct{}{}
		// Record the prefix-flipped form too so the orphan sweep below keeps a
		// central inbound whether its tag carries the n<id>- prefix or not.
		if prefix != "" {
			if stripped, found := strings.CutPrefix(snapIb.Tag, prefix); found {
				snapTags[stripped] = struct{}{}
			} else {
				snapTags[prefix+snapIb.Tag] = struct{}{}
			}
		}

		c, ok := tagToCentral[snapIb.Tag]
		if !ok {
			if dirty {
				continue
			}
			// Try snap.Tag first; on collision fall back to the n<id>-
			// prefixed form so local+node can both own the same port.
			pickFreeTag := func() (string, error) {
				candidates := []string{snapIb.Tag}
				if prefix != "" && !strings.HasPrefix(snapIb.Tag, prefix) {
					candidates = append(candidates, prefix+snapIb.Tag)
				}
				for _, t := range candidates {
					var owner model.Inbound
					err := tx.Where("tag = ?", t).First(&owner).Error
					if errors.Is(err, gorm.ErrRecordNotFound) {
						return t, nil
					}
					if err != nil {
						return "", err
					}
				}
				return "", nil
			}
			chosenTag, err := pickFreeTag()
			if err != nil {
				logger.Warningf("setRemoteTraffic: check tag %q failed: %v", snapIb.Tag, err)
				continue
			}
			if chosenTag == "" {
				key := fmt.Sprintf("%d:%s", nodeID, snapIb.Tag)
				if _, seen := reportedRemoteTagConflict.LoadOrStore(key, struct{}{}); !seen {
					logger.Warningf(
						"setRemoteTraffic: tag %q from node %d collides with an existing inbound even after the n%d- prefix — skipping (rename one side to remove the duplicate)",
						snapIb.Tag, nodeID, nodeID,
					)
				}
				continue
			}
			newIb := model.Inbound{
				UserId:               defaultUserId,
				NodeID:               &nodeID,
				OriginNodeGuid:       originGuidFor(snapIb),
				Tag:                  chosenTag,
				Listen:               snapIb.Listen,
				Port:                 snapIb.Port,
				Protocol:             snapIb.Protocol,
				Settings:             snapIb.Settings,
				StreamSettings:       snapIb.StreamSettings,
				Sniffing:             snapIb.Sniffing,
				TrafficReset:         snapIb.TrafficReset,
				LastTrafficResetTime: snapIb.LastTrafficResetTime,
				Enable:               snapIb.Enable,
				Remark:               snapIb.Remark,
				Total:                snapIb.Total,
				ExpiryTime:           snapIb.ExpiryTime,
				Up:                   snapIb.Up,
				Down:                 snapIb.Down,
			}
			if err := tx.Create(&newIb).Error; err != nil {
				logger.Warningf("setRemoteTraffic: create central inbound for tag %q failed: %v", snapIb.Tag, err)
				continue
			}
			tagToCentral[snapIb.Tag] = &newIb
			if newIb.Tag != snapIb.Tag {
				tagToCentral[newIb.Tag] = &newIb
			}
			structuralChange = true
			continue
		}

		inGrace := c.LastTrafficResetTime > 0 && now-c.LastTrafficResetTime < resetGracePeriodMs

		updates := map[string]any{}
		if !dirty {
			updates["enable"] = snapIb.Enable
			updates["remark"] = snapIb.Remark
			updates["listen"] = snapIb.Listen
			updates["port"] = snapIb.Port
			updates["protocol"] = snapIb.Protocol
			updates["total"] = snapIb.Total
			updates["expiry_time"] = snapIb.ExpiryTime
			updates["settings"] = snapIb.Settings
			updates["stream_settings"] = snapIb.StreamSettings
			updates["sniffing"] = snapIb.Sniffing
			updates["traffic_reset"] = snapIb.TrafficReset
			updates["last_traffic_reset_time"] = snapIb.LastTrafficResetTime
		}
		if !inGrace || (snapIb.Up+snapIb.Down) <= (c.Up+c.Down) {
			updates["up"] = snapIb.Up
			updates["down"] = snapIb.Down
		}
		// Physical-home attribution is independent of config-dirty state, so
		// keep it current even while the node has pending offline edits. Writes
		// once to backfill an existing row, then stays equal (#4983).
		if og := originGuidFor(snapIb); c.OriginNodeGuid != og {
			updates["origin_node_guid"] = og
		}

		if !dirty && (c.Settings != snapIb.Settings ||
			c.Remark != snapIb.Remark ||
			c.Listen != snapIb.Listen ||
			c.Port != snapIb.Port ||
			c.Total != snapIb.Total ||
			c.ExpiryTime != snapIb.ExpiryTime ||
			c.Enable != snapIb.Enable) {
			structuralChange = true
		}

		if len(updates) > 0 {
			if err := tx.Model(model.Inbound{}).
				Where("id = ?", c.Id).
				Updates(updates).Error; err != nil {
				return false, err
			}
		}
	}

	for _, c := range central {
		if dirty {
			continue
		}
		if _, kept := snapTags[c.Tag]; kept {
			continue
		}
		var goneEmails []string
		if err := tx.Model(xray.ClientTraffic{}).
			Where("inbound_id = ?", c.Id).
			Pluck("email", &goneEmails).Error; err != nil {
			return false, err
		}
		if len(goneEmails) > 0 {
			// Chunk to avoid SQLite bind var limit when a node has many clients
			// removed (e.g. after API bulk delete or structural change on node inbound).
			for _, batch := range chunkStrings(goneEmails, sqliteMaxVars) {
				if err := tx.Where("node_id = ? AND email IN ?", nodeID, batch).
					Delete(&model.NodeClientTraffic{}).Error; err != nil {
					return false, err
				}
			}
		}
		if err := tx.Where("inbound_id = ?", c.Id).
			Delete(&xray.ClientTraffic{}).Error; err != nil {
			return false, err
		}
		if err := s.clientService.DetachInbound(tx, c.Id); err != nil {
			return false, err
		}
		if err := tx.Where("id = ?", c.Id).
			Delete(&model.Inbound{}).Error; err != nil {
			return false, err
		}
		delete(tagToCentral, c.Tag)
		structuralChange = true
	}

	for _, snapIb := range snap.Inbounds {
		if snapIb == nil {
			continue
		}
		c, ok := tagToCentral[snapIb.Tag]
		if !ok {
			continue
		}
		snapEmails := make(map[string]struct{}, len(snapIb.ClientStats))
		for _, cs := range snapIb.ClientStats {
			snapEmails[cs.Email] = struct{}{}

			base, seen := nodeBaselines[cs.Email]
			var deltaUp, deltaDown int64
			if seen {
				if deltaUp = cs.Up - base.Up; deltaUp < 0 {
					deltaUp = cs.Up
				}
				if deltaDown = cs.Down - base.Down; deltaDown < 0 {
					deltaDown = cs.Down
				}
			}

			if _, rowExists := existingEmails[cs.Email]; !rowExists {
				if dirty {
					continue
				}
				row := &xray.ClientTraffic{
					InboundId:  c.Id,
					Email:      cs.Email,
					Enable:     cs.Enable,
					Total:      cs.Total,
					ExpiryTime: cs.ExpiryTime,
					Reset:      cs.Reset,
					Up:         cs.Up,
					Down:       cs.Down,
					LastOnline: cs.LastOnline,
				}
				if err := tx.Clauses(clause.OnConflict{Columns: []clause.Column{{Name: "email"}}, DoNothing: true}).
					Create(row).Error; err != nil {
					return false, err
				}
				centralCS[csKey{c.Id, cs.Email}] = row
				centralCSByEmail[cs.Email] = row
				existingEmails[cs.Email] = struct{}{}
				structuralChange = true
				if err := s.upsertNodeBaseline(tx, nodeID, cs.Email, cs.Up, cs.Down); err != nil {
					return false, err
				}
				nodeBaselines[cs.Email] = nodeTrafficCounter{Up: cs.Up, Down: cs.Down}
				continue
			}

			if existing := centralCSByEmail[cs.Email]; existing != nil &&
				(existing.Enable != cs.Enable ||
					existing.Total != cs.Total ||
					existing.ExpiryTime != cs.ExpiryTime ||
					existing.Reset != cs.Reset) {
				structuralChange = true
			}

			enableExpr := database.ClientTrafficEnableMergeExpr()
			if err := tx.Exec(
				fmt.Sprintf(
					`UPDATE client_traffics
						SET up = up + ?, down = down + ?, enable = %s, total = ?, expiry_time = ?, reset = ?,
						last_online = %s
						WHERE email = ?`,
					enableExpr,
					database.GreatestExpr("last_online", "?"),
				),
				deltaUp, deltaDown, cs.Enable, cs.Total, cs.ExpiryTime, cs.Reset,
				cs.LastOnline, cs.Email,
			).Error; err != nil {
				return false, err
			}
			if err := s.upsertNodeBaseline(tx, nodeID, cs.Email, cs.Up, cs.Down); err != nil {
				return false, err
			}
			nodeBaselines[cs.Email] = nodeTrafficCounter{Up: cs.Up, Down: cs.Down}
		}

		for k, existing := range centralCS {
			if dirty {
				continue
			}
			if k.inboundID != c.Id {
				continue
			}
			if _, kept := snapEmails[k.email]; kept {
				continue
			}
			if err := tx.Where("node_id = ? AND email = ?", nodeID, existing.Email).
				Delete(&model.NodeClientTraffic{}).Error; err != nil {
				return false, err
			}
			if err := tx.Where("inbound_id = ? AND email = ?", c.Id, existing.Email).
				Delete(&xray.ClientTraffic{}).Error; err != nil {
				return false, err
			}
			structuralChange = true
		}
	}

	type oldSet struct {
		inboundID int
		emails    map[string]struct{}
	}
	var perInboundOld []oldSet
	for _, snapIb := range snap.Inbounds {
		if snapIb == nil {
			continue
		}
		c, ok := tagToCentral[snapIb.Tag]
		if !ok {
			continue
		}
		if dirty {
			continue
		}
		var oldEmailsRows []string
		if err := tx.Table("clients").
			Joins("JOIN client_inbounds ON client_inbounds.client_id = clients.id").
			Where("client_inbounds.inbound_id = ?", c.Id).
			Pluck("email", &oldEmailsRows).Error; err == nil {
			oldEmails := make(map[string]struct{}, len(oldEmailsRows))
			for _, e := range oldEmailsRows {
				if e != "" {
					oldEmails[e] = struct{}{}
				}
			}
			perInboundOld = append(perInboundOld, oldSet{inboundID: c.Id, emails: oldEmails})
		}

		clients, gcErr := s.GetClients(snapIb)
		if gcErr != nil {
			logger.Warningf("setRemoteTraffic: parse clients for tag %q failed: %v", snapIb.Tag, gcErr)
			continue
		}
		csEnableByEmail := make(map[string]bool, len(snapIb.ClientStats))
		for _, cs := range snapIb.ClientStats {
			csEnableByEmail[cs.Email] = cs.Enable
		}
		filtered := clients[:0]
		for i := range clients {
			if isClientEmailTombstoned(clients[i].Email) {
				continue
			}
			if cse, hit := csEnableByEmail[clients[i].Email]; hit && !cse {
				clients[i].Enable = false
			}
			filtered = append(filtered, clients[i])
		}
		localEmails := make([]string, 0, len(filtered))
		for i := range filtered {
			if filtered[i].Email != "" {
				localEmails = append(localEmails, filtered[i].Email)
			}
		}
		if len(localEmails) > 0 {
			var localMeta []struct {
				Email   string
				Comment string `gorm:"column:comment"`
			}
			if err := tx.Table("clients").
				Select("email, comment").
				Where("email IN ?", localEmails).
				Find(&localMeta).Error; err == nil {
				commentByEmail := make(map[string]string, len(localMeta))
				for _, m := range localMeta {
					commentByEmail[m.Email] = m.Comment
				}
				for i := range filtered {
					if cmt, ok := commentByEmail[filtered[i].Email]; ok {
						filtered[i].Comment = cmt
					}
				}
			}
		}
		if err := s.clientService.SyncInbound(tx, c.Id, filtered); err != nil {
			logger.Warningf("setRemoteTraffic: sync clients for tag %q failed: %v", snapIb.Tag, err)
		}
	}

	for _, old := range perInboundOld {
		var stillAttached []string
		if err := tx.Table("clients").
			Joins("JOIN client_inbounds ON client_inbounds.client_id = clients.id").
			Where("client_inbounds.inbound_id = ?", old.inboundID).
			Pluck("email", &stillAttached).Error; err != nil {
			continue
		}
		stillSet := make(map[string]struct{}, len(stillAttached))
		for _, e := range stillAttached {
			stillSet[e] = struct{}{}
		}
		for email := range old.emails {
			if _, kept := stillSet[email]; kept {
				continue
			}
			var attachmentCount int64
			if err := tx.Table("client_inbounds").
				Joins("JOIN clients ON clients.id = client_inbounds.client_id").
				Where("clients.email = ?", email).
				Count(&attachmentCount).Error; err != nil {
				continue
			}
			if attachmentCount > 0 {
				continue
			}
			if err := tx.Where("email = ?", email).Delete(&model.ClientRecord{}).Error; err != nil {
				logger.Warningf("setRemoteTraffic: delete ClientRecord %q failed: %v", email, err)
			}
			if err := tx.Where("email = ?", email).Delete(&xray.ClientTraffic{}).Error; err != nil {
				logger.Warningf("setRemoteTraffic: delete ClientTraffic %q failed: %v", email, err)
			}
			if err := tx.Where("email = ?", email).Delete(&model.NodeClientTraffic{}).Error; err != nil {
				logger.Warningf("setRemoteTraffic: delete NodeClientTraffic %q failed: %v", email, err)
			}
			structuralChange = true
		}
	}

	if err := tx.Commit().Error; err != nil {
		return false, err
	}
	committed = true

	if p != nil {
		tree := snap.OnlineTree
		if len(tree) == 0 && len(snap.OnlineEmails) > 0 {
			// Old-build node (no GUID tree): key its flat online list under its
			// own effective identity so attribution still works for that branch.
			effectiveGuid := nodeRow.Guid
			if effectiveGuid == "" {
				effectiveGuid = synthNodeGuid(nodeID)
			}
			tree = map[string][]string{effectiveGuid: snap.OnlineEmails}
		}
		p.SetNodeOnlineTree(nodeID, tree)
	}

	return structuralChange, nil
}

// restartRemoteNodesOnDisable restarts xray on each of the given nodes when the
// RestartXrayOnClientDisable setting is enabled. A node whose runtime lookup or
// restart fails is logged and skipped.
func (s *InboundService) restartRemoteNodesOnDisable(nodeIDs []int) {
	restartOnDisable, err := (&SettingService{}).GetRestartXrayOnClientDisable()
	if err != nil {
		logger.Warning("disableInvalidClients: get RestartXrayOnClientDisable failed:", err)
		return
	}
	if !restartOnDisable {
		return
	}
	for _, nodeID := range nodeIDs {
		nodeIDCopy := nodeID
		rt, rtErr := runtime.GetManager().RuntimeFor(&nodeIDCopy)
		if rtErr != nil {
			logger.Warning("disableInvalidClients: get runtime for node", nodeID, "failed:", rtErr)
			continue
		}
		if rtErr = rt.RestartXray(context.Background()); rtErr != nil {
			logger.Warning("disableInvalidClients: restart xray on node", nodeID, "failed:", rtErr)
		}
	}
}

// GetOnlineClients returns the online client emails reported by p, or an empty
// slice when p is nil.
func (s *InboundService) GetOnlineClients() []string {
	if p == nil {
		return []string{}
	}
	return p.GetOnlineClients()
}

// GetOnlineClientsByGuid returns online emails keyed by the panelGuid of the
// node that physically hosts each set: this panel's own clients under its own
// GUID, plus every node in the tree under its GUID (#4983). Replaces the old
// node-id keying so a client three hops down is attributed to its real node,
// not the intermediate one it was synced through.
func (s *InboundService) GetOnlineClientsByGuid() map[string][]string {
	if p == nil {
		return map[string][]string{}
	}
	out := p.GetMergedNodeTrees()
	if local := p.GetLocalOnlineClients(); len(local) > 0 {
		if guid := s.panelGuid(); guid != "" {
			out[guid] = mergeEmails(out[guid], local)
		}
	}
	return out
}

// GetActiveInboundsByGuid returns the inbound tags that carried traffic within
// the grace window for THIS panel, under its own GUID. Remote nodes don't
// report per-inbound activity, so a GUID missing from the map means "don't
// gate" for that node's inbounds.
func (s *InboundService) GetActiveInboundsByGuid() map[string][]string {
	if p == nil {
		return map[string][]string{}
	}
	active := p.GetLocalActiveInbounds()
	if len(active) == 0 {
		return map[string][]string{}
	}
	guid := s.panelGuid()
	if guid == "" {
		return map[string][]string{}
	}
	return map[string][]string{guid: active}
}

// SetNodeOnlineTree forwards the node's online tree to p. It is a no-op when
// p is nil.
func (s *InboundService) SetNodeOnlineTree(nodeID int, tree map[string][]string) {
	if p != nil {
		p.SetNodeOnlineTree(nodeID, tree)
	}
}

// ClearNodeOnlineClients forwards the clear request for the given node to p.
// It is a no-op when p is nil.
func (s *InboundService) ClearNodeOnlineClients(nodeID int) {
	if p != nil {
		p.ClearNodeOnlineClients(nodeID)
	}
}

// panelGuid returns this panel's stable self-identifier, used to key the local
// panel's own clients in the per-node online maps (#4983). A lookup error is
// discarded, so callers must treat an empty string as "GUID unavailable".
func (s *InboundService) panelGuid() string {
	guid, _ := (&SettingService{}).GetPanelGuid()
	return guid
}

// synthNodeGuid is the stable per-node fallback identity for a directly-attached
// node whose panel hasn't reported a panelGuid yet (old build). Node ids are
// master-local, so this only composes for direct nodes, which is exactly the
// pre-#4983 flat-topology case where an old-build node appears.
func synthNodeGuid(nodeID int) string {
	return fmt.Sprintf("node:%d", nodeID)
}

// mergeEmails returns the deduped union of two email slices, keeping first-seen
// order (a's entries, then b's). When a is empty, b is returned as-is: it is
// neither copied nor deduped.
func mergeEmails(a, b []string) []string {
	if len(a) == 0 {
		return b
	}
	seen := make(map[string]struct{}, len(a)+len(b))
	out := make([]string, 0, len(a)+len(b))
	for _, e := range a {
		if _, ok := seen[e]; !ok {
			seen[e] = struct{}{}
			out = append(out, e)
		}
	}
	for _, e := range b {
		if _, ok := seen[e]; !ok {
			seen[e] = struct{}{}
			out = append(out, e)
		}
	}
	return out
}

// GetClientsLastOnline returns the last_online value of every ClientTraffic
// row, keyed by client email.
func (s *InboundService) GetClientsLastOnline() (map[string]int64, error) {
	db := database.GetDB()
	var rows []xray.ClientTraffic
	err := db.Model(&xray.ClientTraffic{}).Select("email, last_online").Find(&rows).Error
	if err != nil && err != gorm.ErrRecordNotFound {
		return nil, err
	}
	result := make(map[string]int64, len(rows))
	for _, r := range rows {
		result[r.Email] = r.LastOnline
	}
	return result, nil
}

// RefreshLocalOnlineClients folds the emails and inbound tags active on this
// panel's own xray this poll into the local online/active sets, applying the
// grace window and pruning stale entries. Pass nil to only prune. See
// xray.Process for why the local sets are kept separate from the shared
// last_online column.
func (s *InboundService) RefreshLocalOnlineClients(activeEmails, activeInboundTags []string) {
	if p != nil {
		p.RefreshLocalOnline(activeEmails, activeInboundTags, time.Now().UnixMilli(), onlineGracePeriodMs)
	}
}

// FilterAndSortClientEmails splits emails into two lists: those that have a
// ClientTraffic record, sorted by total traffic (Up + Down) descending, and
// those that have none, kept in input order.
func (s *InboundService) FilterAndSortClientEmails(emails []string) ([]string, []string, error) {
	db := database.GetDB()

	// Step 1: Get ClientTraffic records for emails in the input list.
	// Chunked to stay under SQLite's bind-variable limit on huge inputs.
	uniqEmails := uniqueNonEmptyStrings(emails)
	clients := make([]xray.ClientTraffic, 0, len(uniqEmails))
	for _, batch := range chunkStrings(uniqEmails, sqliteMaxVars) {
		var page []xray.ClientTraffic
		if err := db.Where("email IN ?", batch).Find(&page).Error; err != nil && err != gorm.ErrRecordNotFound {
			return nil, nil, err
		}
		clients = append(clients, page...)
	}

	// Step 2: Sort clients by (Up + Down) descending.
	sort.Slice(clients, func(i, j int) bool {
		return (clients[i].Up + clients[i].Down) > (clients[j].Up + clients[j].Down)
	})

	// Step 3: Extract sorted valid emails and track found ones.
	validEmails := make([]string, 0, len(clients))
	found := make(map[string]bool)
	for _, client := range clients {
		validEmails = append(validEmails, client.Email)
		found[client.Email] = true
	}

	// Step 4: Identify emails that were not found in the database.
	extraEmails := make([]string, 0)
	for _, email := range emails {
		if !found[email] {
			extraEmails = append(extraEmails, email)
		}
	}

	return validEmails, extraEmails, nil
}