Files
3x-ui/internal/web/service/inbound_node.go
T
iYuan 2a7342baa9 feat: add inbound share address strategy (#5162)
* feat: add inbound share address strategy

Allow node-managed inbounds to choose whether exported share links use the node address, routable listen address, or a custom endpoint. Preserve locally configured share address fields during remote node traffic sync.

Refs #5161

Refs #4891

* fix: preserve inbound share address settings

Forward share address fields to remote nodes, keep existing values when older update payloads omit them, align localhost handling between frontend and subscriptions, and preserve share address settings when cloning inbounds.

* fix: keep share address strategy out of subscriptions

Limit the new share address strategy to direct exported share links and QR codes. Restore subscription address resolution to the existing panel-owned behavior and update the UI help text accordingly.

* fix: address share address review feedback

* fix: validate custom share address

* fix

---------

Co-authored-by: Sanaei <ho3ein.sanaei@gmail.com>
2026-06-11 20:24:15 +02:00

879 lines
26 KiB
Go

package service
import (
"context"
"errors"
"fmt"
"sort"
"strings"
"sync"
"time"
"github.com/mhsanaei/3x-ui/v3/internal/database"
"github.com/mhsanaei/3x-ui/v3/internal/database/model"
"github.com/mhsanaei/3x-ui/v3/internal/logger"
"github.com/mhsanaei/3x-ui/v3/internal/web/runtime"
"github.com/mhsanaei/3x-ui/v3/internal/xray"
"gorm.io/gorm"
"gorm.io/gorm/clause"
)
var reportedRemoteTagConflict sync.Map
func (s *InboundService) runtimeFor(ib *model.Inbound) (runtime.Runtime, error) {
mgr := runtime.GetManager()
if mgr == nil {
return nil, fmt.Errorf("runtime manager not initialised")
}
return mgr.RuntimeFor(ib.NodeID)
}
func (s *InboundService) nodePushPlan(ib *model.Inbound) (runtime.Runtime, bool, bool, error) {
if ib.NodeID == nil {
rt, err := s.runtimeFor(ib)
if err != nil {
return nil, false, false, nil
}
return rt, true, false, nil
}
nodeSvc := NodeService{}
enabled, status, _, _, err := nodeSvc.NodeSyncState(*ib.NodeID)
if err != nil {
return nil, false, false, err
}
if !enabled || status == "offline" {
return nil, false, true, nil
}
rt, err := s.runtimeFor(ib)
if err != nil {
return nil, false, true, nil
}
return rt, true, false, nil
}
func (s *InboundService) NodeIsPending(nodeID *int) bool {
if nodeID == nil {
return false
}
return (&NodeService{}).IsNodePending(*nodeID)
}
func (s *InboundService) AnyNodePending(inboundIds []int) bool {
if len(inboundIds) == 0 {
return false
}
nodeSvc := NodeService{}
for _, id := range inboundIds {
ib, err := s.GetInbound(id)
if err != nil || ib.NodeID == nil {
continue
}
if nodeSvc.IsNodePending(*ib.NodeID) {
return true
}
}
return false
}
func (s *InboundService) ReconcileNode(ctx context.Context, rt *runtime.Remote, nodeID int) error {
if rt == nil || nodeID <= 0 {
return nil
}
db := database.GetDB()
var inbounds []*model.Inbound
if err := db.Model(model.Inbound{}).Where("node_id = ?", nodeID).Find(&inbounds).Error; err != nil {
return err
}
remoteTags, err := rt.ListRemoteTags(ctx)
if err != nil {
return err
}
prefix := nodeTagPrefix(&nodeID)
desiredTags := make(map[string]struct{}, len(inbounds)*2)
for _, ib := range inbounds {
desiredTags[ib.Tag] = struct{}{}
if prefix != "" {
if stripped, found := strings.CutPrefix(ib.Tag, prefix); found {
desiredTags[stripped] = struct{}{}
} else {
desiredTags[prefix+ib.Tag] = struct{}{}
}
}
if err := rt.UpdateInbound(ctx, ib, ib); err != nil {
return fmt.Errorf("reconcile inbound %q: %w", ib.Tag, err)
}
}
for _, tag := range remoteTags {
if _, want := desiredTags[tag]; want {
continue
}
if err := rt.DelInbound(ctx, &model.Inbound{Tag: tag}); err != nil {
return fmt.Errorf("reconcile delete %q: %w", tag, err)
}
}
return nil
}
const resetGracePeriodMs int64 = 30000
// onlineGracePeriodMs must comfortably exceed the 5s traffic-poll interval —
// Xray's stats counters often report a zero delta for an active session across
// a single poll, so a 5s grace would still drop the client on the next tick.
// ~4 polls of slack keeps idle-but-connected clients visible without lingering
// long after a real disconnect.
const onlineGracePeriodMs int64 = 20000
type nodeTrafficCounter struct {
Up int64
Down int64
}
func (s *InboundService) upsertNodeBaseline(tx *gorm.DB, nodeID int, email string, up, down int64) error {
return tx.Clauses(clause.OnConflict{
Columns: []clause.Column{{Name: "node_id"}, {Name: "email"}},
DoUpdates: clause.AssignmentColumns([]string{"up", "down"}),
}).Create(&model.NodeClientTraffic{NodeId: nodeID, Email: email, Up: up, Down: down}).Error
}
func (s *InboundService) SetRemoteTraffic(nodeID int, snap *runtime.TrafficSnapshot, dirty bool) (bool, error) {
var structuralChange bool
err := submitTrafficWrite(func() error {
var inner error
structuralChange, inner = s.setRemoteTrafficLocked(nodeID, snap, dirty)
return inner
})
return structuralChange, err
}
func (s *InboundService) setRemoteTrafficLocked(nodeID int, snap *runtime.TrafficSnapshot, dirty bool) (bool, error) {
if snap == nil || nodeID <= 0 {
return false, nil
}
db := database.GetDB()
now := time.Now().UnixMilli()
// originGuidFor attributes a synced inbound to the panel that physically
// hosts it: inbounds the node forwards from its own sub-nodes already carry
// a non-empty OriginNodeGuid (kept as-is across hops); the node's own local
// inbounds report empty, so they are attributed to the node's own GUID. An
// empty result (old-build node with no GUID yet) leaves attribution to the
// node_id fallback downstream (#4983).
var nodeRow model.Node
db.Select("guid").Where("id = ?", nodeID).First(&nodeRow)
originGuidFor := func(snapIb *model.Inbound) string {
if snapIb.OriginNodeGuid != "" {
return snapIb.OriginNodeGuid
}
return nodeRow.Guid
}
var central []model.Inbound
if err := db.Model(model.Inbound{}).
Where("node_id = ?", nodeID).
Find(&central).Error; err != nil {
return false, err
}
// Index under the stored tag and its prefix-flipped form so a snap matches
// whether the n<id>- prefix lives on the node side, the central side, or
// neither — a mismatch must never spawn a duplicate central inbound.
tagToCentral := make(map[string]*model.Inbound, len(central)*2)
prefix := nodeTagPrefix(&nodeID)
for i := range central {
tagToCentral[central[i].Tag] = &central[i]
if prefix != "" {
if stripped, found := strings.CutPrefix(central[i].Tag, prefix); found {
tagToCentral[stripped] = &central[i]
} else {
tagToCentral[prefix+central[i].Tag] = &central[i]
}
}
}
var centralClientStats []xray.ClientTraffic
if len(central) > 0 {
ids := make([]int, 0, len(central))
for i := range central {
ids = append(ids, central[i].Id)
}
if err := db.Model(xray.ClientTraffic{}).
Where("inbound_id IN ?", ids).
Find(&centralClientStats).Error; err != nil {
return false, err
}
}
type csKey struct {
inboundID int
email string
}
centralCS := make(map[csKey]*xray.ClientTraffic, len(centralClientStats))
centralCSByEmail := make(map[string]*xray.ClientTraffic, len(centralClientStats))
for i := range centralClientStats {
centralCS[csKey{centralClientStats[i].InboundId, centralClientStats[i].Email}] = &centralClientStats[i]
centralCSByEmail[centralClientStats[i].Email] = &centralClientStats[i]
}
nodeBaselines := make(map[string]nodeTrafficCounter)
var baselineRows []model.NodeClientTraffic
if err := db.Model(&model.NodeClientTraffic{}).
Where("node_id = ?", nodeID).
Find(&baselineRows).Error; err != nil {
return false, err
}
for i := range baselineRows {
nodeBaselines[baselineRows[i].Email] = nodeTrafficCounter{Up: baselineRows[i].Up, Down: baselineRows[i].Down}
}
var existingEmailsList []string
if err := db.Model(xray.ClientTraffic{}).Pluck("email", &existingEmailsList).Error; err != nil {
return false, err
}
existingEmails := make(map[string]struct{}, len(existingEmailsList))
for _, e := range existingEmailsList {
existingEmails[e] = struct{}{}
}
var defaultUserId int
if len(central) > 0 {
defaultUserId = central[0].UserId
} else {
var u model.User
if err := db.Model(model.User{}).Order("id asc").First(&u).Error; err == nil {
defaultUserId = u.Id
} else {
defaultUserId = 1
}
}
tx := db.Begin()
committed := false
defer func() {
if !committed {
tx.Rollback()
}
}()
structuralChange := false
snapTags := make(map[string]struct{}, len(snap.Inbounds))
for _, snapIb := range snap.Inbounds {
if snapIb == nil {
continue
}
snapTags[snapIb.Tag] = struct{}{}
// Record the prefix-flipped form too so the orphan sweep below keeps a
// central inbound whether its tag carries the n<id>- prefix or not.
if prefix != "" {
if stripped, found := strings.CutPrefix(snapIb.Tag, prefix); found {
snapTags[stripped] = struct{}{}
} else {
snapTags[prefix+snapIb.Tag] = struct{}{}
}
}
c, ok := tagToCentral[snapIb.Tag]
if !ok {
if dirty {
continue
}
// Try snap.Tag first; on collision fall back to the n<id>-
// prefixed form so local+node can both own the same port.
pickFreeTag := func() (string, error) {
candidates := []string{snapIb.Tag}
if prefix != "" && !strings.HasPrefix(snapIb.Tag, prefix) {
candidates = append(candidates, prefix+snapIb.Tag)
}
for _, t := range candidates {
var owner model.Inbound
err := tx.Where("tag = ?", t).First(&owner).Error
if errors.Is(err, gorm.ErrRecordNotFound) {
return t, nil
}
if err != nil {
return "", err
}
}
return "", nil
}
chosenTag, err := pickFreeTag()
if err != nil {
logger.Warningf("setRemoteTraffic: check tag %q failed: %v", snapIb.Tag, err)
continue
}
if chosenTag == "" {
key := fmt.Sprintf("%d:%s", nodeID, snapIb.Tag)
if _, seen := reportedRemoteTagConflict.LoadOrStore(key, struct{}{}); !seen {
logger.Warningf(
"setRemoteTraffic: tag %q from node %d collides with an existing inbound even after the n%d- prefix — skipping (rename one side to remove the duplicate)",
snapIb.Tag, nodeID, nodeID,
)
}
continue
}
newIb := model.Inbound{
UserId: defaultUserId,
NodeID: &nodeID,
OriginNodeGuid: originGuidFor(snapIb),
Tag: chosenTag,
Listen: snapIb.Listen,
Port: snapIb.Port,
Protocol: snapIb.Protocol,
Settings: snapIb.Settings,
StreamSettings: snapIb.StreamSettings,
Sniffing: snapIb.Sniffing,
TrafficReset: snapIb.TrafficReset,
LastTrafficResetTime: snapIb.LastTrafficResetTime,
Enable: snapIb.Enable,
Remark: snapIb.Remark,
Total: snapIb.Total,
ExpiryTime: snapIb.ExpiryTime,
Up: snapIb.Up,
Down: snapIb.Down,
ShareAddrStrategy: "node",
}
if err := tx.Create(&newIb).Error; err != nil {
logger.Warningf("setRemoteTraffic: create central inbound for tag %q failed: %v", snapIb.Tag, err)
continue
}
tagToCentral[snapIb.Tag] = &newIb
if newIb.Tag != snapIb.Tag {
tagToCentral[newIb.Tag] = &newIb
}
structuralChange = true
continue
}
inGrace := c.LastTrafficResetTime > 0 && now-c.LastTrafficResetTime < resetGracePeriodMs
updates := map[string]any{}
if !dirty {
updates["enable"] = snapIb.Enable
updates["remark"] = snapIb.Remark
updates["listen"] = snapIb.Listen
updates["port"] = snapIb.Port
updates["protocol"] = snapIb.Protocol
updates["total"] = snapIb.Total
updates["expiry_time"] = snapIb.ExpiryTime
updates["settings"] = snapIb.Settings
updates["stream_settings"] = snapIb.StreamSettings
updates["sniffing"] = snapIb.Sniffing
updates["traffic_reset"] = snapIb.TrafficReset
updates["last_traffic_reset_time"] = snapIb.LastTrafficResetTime
}
if !inGrace || (snapIb.Up+snapIb.Down) <= (c.Up+c.Down) {
updates["up"] = snapIb.Up
updates["down"] = snapIb.Down
}
// Physical-home attribution is independent of config-dirty state, so
// keep it current even while the node has pending offline edits. Writes
// once to backfill an existing row, then stays equal (#4983).
if og := originGuidFor(snapIb); c.OriginNodeGuid != og {
updates["origin_node_guid"] = og
}
if !dirty && (c.Settings != snapIb.Settings ||
c.Remark != snapIb.Remark ||
c.Listen != snapIb.Listen ||
c.Port != snapIb.Port ||
c.Total != snapIb.Total ||
c.ExpiryTime != snapIb.ExpiryTime ||
c.Enable != snapIb.Enable) {
structuralChange = true
}
if len(updates) > 0 {
if err := tx.Model(model.Inbound{}).
Where("id = ?", c.Id).
Updates(updates).Error; err != nil {
return false, err
}
}
}
for _, c := range central {
if dirty {
continue
}
if _, kept := snapTags[c.Tag]; kept {
continue
}
var goneEmails []string
if err := tx.Model(xray.ClientTraffic{}).
Where("inbound_id = ?", c.Id).
Pluck("email", &goneEmails).Error; err != nil {
return false, err
}
if len(goneEmails) > 0 {
// Chunk to avoid SQLite bind var limit when a node has many clients
// removed (e.g. after API bulk delete or structural change on node inbound).
for _, batch := range chunkStrings(goneEmails, sqliteMaxVars) {
if err := tx.Where("node_id = ? AND email IN ?", nodeID, batch).
Delete(&model.NodeClientTraffic{}).Error; err != nil {
return false, err
}
}
// The per-email row is the shared accumulator across every inbound
// (and node) the email is attached to. Only drop it when this was the
// email's last inbound — wiping it while a sibling still feeds it
// loses the summed history, and the next node sync would re-seed the
// row with that node's counter alone.
sharedEmails, sErr := s.emailsUsedByOtherInbounds(goneEmails, c.Id)
if sErr != nil {
return false, sErr
}
delEmails := make([]string, 0, len(goneEmails))
for _, e := range goneEmails {
if !sharedEmails[strings.ToLower(strings.TrimSpace(e))] {
delEmails = append(delEmails, e)
}
}
for _, batch := range chunkStrings(delEmails, sqliteMaxVars) {
if err := tx.Where("inbound_id = ? AND email IN ?", c.Id, batch).
Delete(&xray.ClientTraffic{}).Error; err != nil {
return false, err
}
}
}
if err := s.clientService.DetachInbound(tx, c.Id); err != nil {
return false, err
}
if err := tx.Where("id = ?", c.Id).
Delete(&model.Inbound{}).Error; err != nil {
return false, err
}
delete(tagToCentral, c.Tag)
structuralChange = true
}
for _, snapIb := range snap.Inbounds {
if snapIb == nil {
continue
}
c, ok := tagToCentral[snapIb.Tag]
if !ok {
continue
}
snapEmails := make(map[string]struct{}, len(snapIb.ClientStats))
for _, cs := range snapIb.ClientStats {
snapEmails[cs.Email] = struct{}{}
base, seen := nodeBaselines[cs.Email]
var deltaUp, deltaDown int64
if seen {
if deltaUp = cs.Up - base.Up; deltaUp < 0 {
deltaUp = cs.Up
}
if deltaDown = cs.Down - base.Down; deltaDown < 0 {
deltaDown = cs.Down
}
}
if _, rowExists := existingEmails[cs.Email]; !rowExists {
if dirty {
continue
}
row := &xray.ClientTraffic{
InboundId: c.Id,
Email: cs.Email,
Enable: cs.Enable,
Total: cs.Total,
ExpiryTime: cs.ExpiryTime,
Reset: cs.Reset,
Up: cs.Up,
Down: cs.Down,
LastOnline: cs.LastOnline,
}
if err := tx.Clauses(clause.OnConflict{Columns: []clause.Column{{Name: "email"}}, DoNothing: true}).
Create(row).Error; err != nil {
return false, err
}
centralCS[csKey{c.Id, cs.Email}] = row
centralCSByEmail[cs.Email] = row
existingEmails[cs.Email] = struct{}{}
structuralChange = true
if err := s.upsertNodeBaseline(tx, nodeID, cs.Email, cs.Up, cs.Down); err != nil {
return false, err
}
nodeBaselines[cs.Email] = nodeTrafficCounter{Up: cs.Up, Down: cs.Down}
continue
}
if existing := centralCSByEmail[cs.Email]; existing != nil &&
(existing.Enable != cs.Enable ||
existing.Total != cs.Total ||
existing.ExpiryTime != cs.ExpiryTime ||
existing.Reset != cs.Reset) {
structuralChange = true
}
enableExpr := database.ClientTrafficEnableMergeExpr()
if err := tx.Exec(
fmt.Sprintf(
`UPDATE client_traffics
SET up = up + ?, down = down + ?, enable = %s, total = ?, expiry_time = ?, reset = ?,
last_online = %s
WHERE email = ?`,
enableExpr,
database.GreatestExpr("last_online", "?"),
),
deltaUp, deltaDown, cs.Enable, cs.Total, cs.ExpiryTime, cs.Reset,
cs.LastOnline, cs.Email,
).Error; err != nil {
return false, err
}
if err := s.upsertNodeBaseline(tx, nodeID, cs.Email, cs.Up, cs.Down); err != nil {
return false, err
}
nodeBaselines[cs.Email] = nodeTrafficCounter{Up: cs.Up, Down: cs.Down}
}
for k, existing := range centralCS {
if dirty {
continue
}
if k.inboundID != c.Id {
continue
}
if _, kept := snapEmails[k.email]; kept {
continue
}
if err := tx.Where("node_id = ? AND email = ?", nodeID, existing.Email).
Delete(&model.NodeClientTraffic{}).Error; err != nil {
return false, err
}
// Same shared-accumulator rule as the inbound-removal sweep above:
// keep the row while another inbound still references the email.
stillUsed, uErr := s.emailUsedByOtherInbounds(existing.Email, c.Id)
if uErr != nil {
return false, uErr
}
if !stillUsed {
if err := tx.Where("inbound_id = ? AND email = ?", c.Id, existing.Email).
Delete(&xray.ClientTraffic{}).Error; err != nil {
return false, err
}
}
structuralChange = true
}
}
type oldSet struct {
inboundID int
emails map[string]struct{}
}
var perInboundOld []oldSet
for _, snapIb := range snap.Inbounds {
if snapIb == nil {
continue
}
c, ok := tagToCentral[snapIb.Tag]
if !ok {
continue
}
if dirty {
continue
}
var oldEmailsRows []string
if err := tx.Table("clients").
Joins("JOIN client_inbounds ON client_inbounds.client_id = clients.id").
Where("client_inbounds.inbound_id = ?", c.Id).
Pluck("email", &oldEmailsRows).Error; err == nil {
oldEmails := make(map[string]struct{}, len(oldEmailsRows))
for _, e := range oldEmailsRows {
if e != "" {
oldEmails[e] = struct{}{}
}
}
perInboundOld = append(perInboundOld, oldSet{inboundID: c.Id, emails: oldEmails})
}
clients, gcErr := s.GetClients(snapIb)
if gcErr != nil {
logger.Warningf("setRemoteTraffic: parse clients for tag %q failed: %v", snapIb.Tag, gcErr)
continue
}
csEnableByEmail := make(map[string]bool, len(snapIb.ClientStats))
for _, cs := range snapIb.ClientStats {
csEnableByEmail[cs.Email] = cs.Enable
}
filtered := clients[:0]
for i := range clients {
if isClientEmailTombstoned(clients[i].Email) {
continue
}
if cse, hit := csEnableByEmail[clients[i].Email]; hit && !cse {
clients[i].Enable = false
}
filtered = append(filtered, clients[i])
}
localEmails := make([]string, 0, len(filtered))
for i := range filtered {
if filtered[i].Email != "" {
localEmails = append(localEmails, filtered[i].Email)
}
}
if len(localEmails) > 0 {
var localMeta []struct {
Email string
Comment string `gorm:"column:comment"`
}
if err := tx.Table("clients").
Select("email, comment").
Where("email IN ?", localEmails).
Find(&localMeta).Error; err == nil {
commentByEmail := make(map[string]string, len(localMeta))
for _, m := range localMeta {
commentByEmail[m.Email] = m.Comment
}
for i := range filtered {
if cmt, ok := commentByEmail[filtered[i].Email]; ok {
filtered[i].Comment = cmt
}
}
}
}
if err := s.clientService.SyncInbound(tx, c.Id, filtered); err != nil {
logger.Warningf("setRemoteTraffic: sync clients for tag %q failed: %v", snapIb.Tag, err)
}
}
for _, old := range perInboundOld {
var stillAttached []string
if err := tx.Table("clients").
Joins("JOIN client_inbounds ON client_inbounds.client_id = clients.id").
Where("client_inbounds.inbound_id = ?", old.inboundID).
Pluck("email", &stillAttached).Error; err != nil {
continue
}
stillSet := make(map[string]struct{}, len(stillAttached))
for _, e := range stillAttached {
stillSet[e] = struct{}{}
}
for email := range old.emails {
if _, kept := stillSet[email]; kept {
continue
}
var attachmentCount int64
if err := tx.Table("client_inbounds").
Joins("JOIN clients ON clients.id = client_inbounds.client_id").
Where("clients.email = ?", email).
Count(&attachmentCount).Error; err != nil {
continue
}
if attachmentCount > 0 {
continue
}
if err := tx.Where("email = ?", email).Delete(&model.ClientRecord{}).Error; err != nil {
logger.Warningf("setRemoteTraffic: delete ClientRecord %q failed: %v", email, err)
}
if err := tx.Where("email = ?", email).Delete(&xray.ClientTraffic{}).Error; err != nil {
logger.Warningf("setRemoteTraffic: delete ClientTraffic %q failed: %v", email, err)
}
if err := tx.Where("email = ?", email).Delete(&model.NodeClientTraffic{}).Error; err != nil {
logger.Warningf("setRemoteTraffic: delete NodeClientTraffic %q failed: %v", email, err)
}
structuralChange = true
}
}
if err := tx.Commit().Error; err != nil {
return false, err
}
committed = true
if p != nil {
tree := snap.OnlineTree
if len(tree) == 0 && len(snap.OnlineEmails) > 0 {
// Old-build node (no GUID tree): key its flat online list under its
// own effective identity so attribution still works for that branch.
effectiveGuid := nodeRow.Guid
if effectiveGuid == "" {
effectiveGuid = synthNodeGuid(nodeID)
}
tree = map[string][]string{effectiveGuid: snap.OnlineEmails}
}
p.SetNodeOnlineTree(nodeID, tree)
}
return structuralChange, nil
}
func (s *InboundService) restartRemoteNodesOnDisable(nodeIDs []int) {
restartOnDisable, err := (&SettingService{}).GetRestartXrayOnClientDisable()
if err != nil {
logger.Warning("disableInvalidClients: get RestartXrayOnClientDisable failed:", err)
return
}
if !restartOnDisable {
return
}
for _, nodeID := range nodeIDs {
nodeIDCopy := nodeID
rt, rtErr := runtime.GetManager().RuntimeFor(&nodeIDCopy)
if rtErr != nil {
logger.Warning("disableInvalidClients: get runtime for node", nodeID, "failed:", rtErr)
continue
}
if rtErr = rt.RestartXray(context.Background()); rtErr != nil {
logger.Warning("disableInvalidClients: restart xray on node", nodeID, "failed:", rtErr)
}
}
}
func (s *InboundService) GetOnlineClients() []string {
if p == nil {
return []string{}
}
return p.GetOnlineClients()
}
// GetOnlineClientsByGuid returns online emails keyed by the panelGuid of the
// node that physically hosts each set: this panel's own clients under its own
// GUID, plus every node in the tree under its GUID (#4983). Replaces the old
// node-id keying so a client three hops down is attributed to its real node,
// not the intermediate one it was synced through.
func (s *InboundService) GetOnlineClientsByGuid() map[string][]string {
if p == nil {
return map[string][]string{}
}
out := p.GetMergedNodeTrees()
if local := p.GetLocalOnlineClients(); len(local) > 0 {
if guid := s.panelGuid(); guid != "" {
out[guid] = mergeEmails(out[guid], local)
}
}
return out
}
// GetActiveInboundsByGuid returns the inbound tags that carried traffic within
// the grace window for THIS panel, under its own GUID. Remote nodes don't
// report per-inbound activity, so a GUID missing from the map means "don't
// gate" for that node's inbounds.
func (s *InboundService) GetActiveInboundsByGuid() map[string][]string {
if p == nil {
return map[string][]string{}
}
active := p.GetLocalActiveInbounds()
if len(active) == 0 {
return map[string][]string{}
}
guid := s.panelGuid()
if guid == "" {
return map[string][]string{}
}
return map[string][]string{guid: active}
}
func (s *InboundService) SetNodeOnlineTree(nodeID int, tree map[string][]string) {
if p != nil {
p.SetNodeOnlineTree(nodeID, tree)
}
}
func (s *InboundService) ClearNodeOnlineClients(nodeID int) {
if p != nil {
p.ClearNodeOnlineClients(nodeID)
}
}
// panelGuid returns this panel's stable self-identifier, used to key the local
// panel's own clients in the per-node online maps (#4983).
func (s *InboundService) panelGuid() string {
guid, _ := (&SettingService{}).GetPanelGuid()
return guid
}
// synthNodeGuid is the stable per-node fallback identity for a directly-attached
// node whose panel hasn't reported a panelGuid yet (old build). Node ids are
// master-local, so this only composes for direct nodes — exactly the pre-#4983
// flat-topology case where an old-build node appears.
func synthNodeGuid(nodeID int) string {
return fmt.Sprintf("node:%d", nodeID)
}
// mergeEmails returns the deduped union of two email slices.
func mergeEmails(a, b []string) []string {
if len(a) == 0 {
return b
}
seen := make(map[string]struct{}, len(a)+len(b))
out := make([]string, 0, len(a)+len(b))
for _, e := range a {
if _, ok := seen[e]; !ok {
seen[e] = struct{}{}
out = append(out, e)
}
}
for _, e := range b {
if _, ok := seen[e]; !ok {
seen[e] = struct{}{}
out = append(out, e)
}
}
return out
}
func (s *InboundService) GetClientsLastOnline() (map[string]int64, error) {
db := database.GetDB()
var rows []xray.ClientTraffic
err := db.Model(&xray.ClientTraffic{}).Select("email, last_online").Find(&rows).Error
if err != nil && err != gorm.ErrRecordNotFound {
return nil, err
}
result := make(map[string]int64, len(rows))
for _, r := range rows {
result[r.Email] = r.LastOnline
}
return result, nil
}
// RefreshLocalOnlineClients folds the emails and inbound tags active on this
// panel's own xray this poll into the local online/active sets, applying the
// grace window and pruning stale entries. Pass nil to only prune. See
// xray.Process for why the local sets are kept separate from the shared
// last_online column.
func (s *InboundService) RefreshLocalOnlineClients(activeEmails, activeInboundTags []string) {
if p != nil {
p.RefreshLocalOnline(activeEmails, activeInboundTags, time.Now().UnixMilli(), onlineGracePeriodMs)
}
}
func (s *InboundService) FilterAndSortClientEmails(emails []string) ([]string, []string, error) {
db := database.GetDB()
// Step 1: Get ClientTraffic records for emails in the input list.
// Chunked to stay under SQLite's bind-variable limit on huge inputs.
uniqEmails := uniqueNonEmptyStrings(emails)
clients := make([]xray.ClientTraffic, 0, len(uniqEmails))
for _, batch := range chunkStrings(uniqEmails, sqliteMaxVars) {
var page []xray.ClientTraffic
if err := db.Where("email IN ?", batch).Find(&page).Error; err != nil && err != gorm.ErrRecordNotFound {
return nil, nil, err
}
clients = append(clients, page...)
}
// Step 2: Sort clients by (Up + Down) descending
sort.Slice(clients, func(i, j int) bool {
return (clients[i].Up + clients[i].Down) > (clients[j].Up + clients[j].Down)
})
// Step 3: Extract sorted valid emails and track found ones
validEmails := make([]string, 0, len(clients))
found := make(map[string]bool)
for _, client := range clients {
validEmails = append(validEmails, client.Email)
found[client.Email] = true
}
// Step 4: Identify emails that were not found in the database
extraEmails := make([]string, 0)
for _, email := range emails {
if !found[email] {
extraEmails = append(extraEmails, email)
}
}
return validEmails, extraEmails, nil
}