Files
3x-ui/internal/web/service/inbound_node.go
T
MHSanaei b32837e523 fix(node): import per-client traffic history on first sync of a node-hosted inbound
On the first sync of a node-hosted inbound, the central inbound adopted the
node's full lifetime counter but every client_traffics row was seeded at 0 (with
the delta baseline set to the node's current counter). So adding or migrating a
node that already had traffic kept the inbound total correct while every
per-client counter restarted from zero, and the master under-reported per-client
usage by the entire pre-attach history.

Seed a new client_traffics row from the node counter only when the inbound was
created during the same sync (a genuine node-add / inbound re-import); a client
reappearing under a pre-existing inbound still seeds 0, preserving the ghost
protection in TestGhostData_NoPhantomTraffic. The seed is additionally gated on
the delete tombstone so a just-deleted client cannot be resurrected if its
inbound is recreated. Baseline still equals the seeded value, so the next sync
delta is 0 and no traffic is double counted.

Adds TestNodeAdd_ImportsClientHistoryWithNewInbound and
TestNodeAdd_TombstonedClientNotResurrected.
2026-06-25 21:19:27 +02:00

1061 lines
34 KiB
Go

package service
import (
"context"
"errors"
"fmt"
"sort"
"strings"
"sync"
"time"
"github.com/mhsanaei/3x-ui/v3/internal/database"
"github.com/mhsanaei/3x-ui/v3/internal/database/model"
"github.com/mhsanaei/3x-ui/v3/internal/logger"
"github.com/mhsanaei/3x-ui/v3/internal/web/runtime"
"github.com/mhsanaei/3x-ui/v3/internal/xray"
"gorm.io/gorm"
"gorm.io/gorm/clause"
)
// reportedRemoteTagConflict remembers which "<nodeID>:<tag>" collisions
// setRemoteTrafficLocked has already warned about, so a tag clash that persists
// is logged only the first time it is seen. The entry is removed again as soon
// as a free tag is found for that node/tag pair.
var reportedRemoteTagConflict sync.Map
// nodeBulkPushThreshold caps how many per-client RPCs a single operation will
// stream to a remote node. Above it, the panel marks the node dirty instead and
// lets one ReconcileNode push converge the whole inbound — far cheaper than M
// sequential round-trips. Small ops stay on the live per-client path.
const nodeBulkPushThreshold = 32
// runtimeFor resolves the runtime that serves ib by handing ib.NodeID to the
// manager returned by runtime.GetManager. It fails when that manager is nil.
func (s *InboundService) runtimeFor(ib *model.Inbound) (runtime.Runtime, error) {
	if manager := runtime.GetManager(); manager != nil {
		return manager.RuntimeFor(ib.NodeID)
	}
	return nil, fmt.Errorf("runtime manager not initialised")
}
// nodePushPlan decides how a change to ib can be delivered.
//
// Results, in order:
//   - the runtime to push through (nil when none can be used);
//   - true exactly when a runtime is returned;
//   - true when ib is node-hosted and its node is disabled, reports status
//     "offline", or has no runtime available — presumably the caller's cue to
//     defer the change (confirm against callers);
//   - a non-nil error only when the node's sync state could not be read.
//
// A runtime lookup failure is never surfaced as an error: it is folded into
// the boolean results instead.
func (s *InboundService) nodePushPlan(ib *model.Inbound) (runtime.Runtime, bool, bool, error) {
	onNode := ib.NodeID != nil
	if onNode {
		svc := NodeService{}
		enabled, status, _, _, err := svc.NodeSyncState(*ib.NodeID)
		switch {
		case err != nil:
			return nil, false, false, err
		case !enabled, status == "offline":
			return nil, false, true, nil
		}
	}
	rt, err := s.runtimeFor(ib)
	if err != nil {
		// Local inbounds report (false, false); node-hosted ones (false, true).
		return nil, false, onNode, nil
	}
	return rt, true, false, nil
}
// NodeIsPending reports what NodeService.IsNodePending says about the node
// nodeID points at. A nil nodeID is never pending.
func (s *InboundService) NodeIsPending(nodeID *int) bool {
	if nodeID != nil {
		svc := &NodeService{}
		return svc.IsNodePending(*nodeID)
	}
	return false
}
// AnyNodePending reports whether at least one of the given inbounds is hosted
// on a node that NodeService.IsNodePending flags as pending. Inbounds that
// cannot be loaded, or that have no NodeID, are ignored.
func (s *InboundService) AnyNodePending(inboundIds []int) bool {
	var nodes NodeService
	for i := 0; i < len(inboundIds); i++ {
		inbound, err := s.GetInbound(inboundIds[i])
		if err == nil && inbound.NodeID != nil && nodes.IsNodePending(*inbound.NodeID) {
			return true
		}
	}
	return false
}
// ReconcileNode pushes the panel's view of node n onto the node through rt.
//
// Every inbound stored centrally for the node is handed to rt.ReconcileInbound,
// together with a flag telling whether the node already lists it under either
// tag form (with or without the node tag prefix). Afterwards, remote tags that
// match no central inbound are deleted from the node; in "selected" sync mode
// only tags listed in n.InboundTags are eligible for that deletion.
//
// A nil rt, nil n, or non-positive n.Id is a no-op. The first failing step
// aborts the reconcile and its error is returned.
func (s *InboundService) ReconcileNode(ctx context.Context, rt *runtime.Remote, n *model.Node) error {
	if rt == nil || n == nil || n.Id <= 0 {
		return nil
	}
	nodeID := n.Id

	var hosted []*model.Inbound
	if err := database.GetDB().Model(model.Inbound{}).Where("node_id = ?", nodeID).Find(&hosted).Error; err != nil {
		return err
	}

	remoteTags, err := rt.ListRemoteTags(ctx)
	if err != nil {
		return err
	}
	onNode := make(map[string]struct{}, len(remoteTags))
	for i := range remoteTags {
		onNode[remoteTags[i]] = struct{}{}
	}

	prefix := nodeTagPrefix(&nodeID)
	wanted := make(map[string]struct{}, len(hosted)*2)
	for _, inbound := range hosted {
		// Collect the tag forms this inbound may be stored under on the node:
		// the tag as-is, plus its prefix-flipped twin when a prefix applies.
		forms := []string{inbound.Tag}
		if prefix != "" {
			if bare, hadPrefix := strings.CutPrefix(inbound.Tag, prefix); hadPrefix {
				forms = append(forms, bare)
			} else {
				forms = append(forms, prefix+inbound.Tag)
			}
		}

		// present: the node already reports the inbound under one of the
		// forms, which lets an unchanged push be skipped downstream.
		present := false
		for _, form := range forms {
			wanted[form] = struct{}{}
			if _, ok := onNode[form]; ok {
				present = true
			}
		}

		if _, err := rt.ReconcileInbound(ctx, inbound, present); err != nil {
			return fmt.Errorf("reconcile inbound %q: %w", inbound.Tag, err)
		}
	}

	// In "selected" sync mode the panel manages only the selected tags; the
	// others were never imported, so their absence from the local DB must not
	// remove them from the node. managed stays nil in every other mode.
	var managed map[string]struct{}
	if n.InboundSyncMode == "selected" {
		managed = make(map[string]struct{}, len(n.InboundTags))
		for _, tag := range n.InboundTags {
			managed[tag] = struct{}{}
		}
	}

	for _, tag := range remoteTags {
		_, keep := wanted[tag]
		_, isManaged := managed[tag]
		if keep || (managed != nil && !isManaged) {
			continue
		}
		if err := rt.DelInbound(ctx, &model.Inbound{Tag: tag}); err != nil {
			return fmt.Errorf("reconcile delete %q: %w", tag, err)
		}
	}
	return nil
}
// resetGracePeriodMs is the window, in milliseconds, following an inbound's
// LastTrafficResetTime during which setRemoteTrafficLocked adopts a
// node-reported up/down total only if it does not exceed the total already
// stored centrally.
const resetGracePeriodMs int64 = 30000
// onlineGracePeriodMs must comfortably exceed the 5s traffic-poll interval —
// Xray's stats counters often report a zero delta for an active session across
// a single poll, so a 5s grace would still drop the client on the next tick.
// ~4 polls of slack keeps idle-but-connected clients visible without lingering
// long after a real disconnect.
const onlineGracePeriodMs int64 = 20000
// nodeTrafficCounter is an up/down traffic counter pair. This file uses it both
// for the per-(node, email) baselines and for the per-email maxima folded out
// of a node snapshot.
type nodeTrafficCounter struct {
	Up, Down int64
}
// upsertNodeBaseline stores (up, down) as the baseline for the given node and
// email inside tx. A row that already exists for the same (node_id, email) has
// its up and down columns overwritten instead of a second row being inserted.
func (s *InboundService) upsertNodeBaseline(tx *gorm.DB, nodeID int, email string, up, down int64) error {
	baseline := model.NodeClientTraffic{NodeId: nodeID, Email: email, Up: up, Down: down}
	onDuplicate := clause.OnConflict{
		Columns:   []clause.Column{{Name: "node_id"}, {Name: "email"}},
		DoUpdates: clause.AssignmentColumns([]string{"up", "down"}),
	}
	return tx.Clauses(onDuplicate).Create(&baseline).Error
}
// mergeActivationExpiry picks the expiry to keep when a node reports one for a
// client whose expiry is already stored on the master.
//
// With "start after first connect", the stored value starts out as a negative
// duration and each node turns it into an absolute deadline (now+duration) on
// the client's first connection there. Because the per-email client_traffics
// row is shared by all nodes, a node that has not seen that first connection
// keeps sending the negative duration, and that must not undo a deadline some
// other node has already activated.
//
// Rules:
//   - a positive node value always wins (a traffic reset or auto-renew may
//     legitimately push an activated deadline forward);
//   - a non-positive node value wins only while no absolute deadline is stored;
//   - otherwise the stored deadline is kept.
//
// Must stay in lockstep with the SQL CASE in setRemoteTrafficLocked.
func mergeActivationExpiry(existing, node int64) int64 {
	if node > 0 || existing <= 0 {
		return node
	}
	return existing
}
// SetRemoteTraffic applies a node's traffic snapshot by running
// setRemoteTrafficLocked through submitTrafficWrite. It returns the
// structural-change flag reported by that call together with whatever error
// submitTrafficWrite yields.
func (s *InboundService) SetRemoteTraffic(nodeID int, snap *runtime.TrafficSnapshot, dirty bool) (bool, error) {
	changed := false
	apply := func() (err error) {
		changed, err = s.setRemoteTrafficLocked(nodeID, snap, dirty)
		return err
	}
	err := submitTrafficWrite(apply)
	return changed, err
}
// GetNodeInboundTrafficTotals reads the stored up/down counters of every
// inbound that has a node_id and returns them keyed by tag, as {up, down}.
// The node sync diffs successive results to derive per-inbound speed for the
// dashboard, since node inbounds have no local Xray poll producing live deltas.
func (s *InboundService) GetNodeInboundTrafficTotals() (map[string][2]int64, error) {
	type totalsRow struct {
		Tag  string
		Up   int64
		Down int64
	}

	var fetched []totalsRow
	query := database.GetDB().
		Table("inbounds").
		Select("tag, up, down").
		Where("node_id IS NOT NULL")
	if err := query.Scan(&fetched).Error; err != nil {
		return nil, err
	}

	totals := make(map[string][2]int64, len(fetched))
	for i := range fetched {
		totals[fetched[i].Tag] = [2]int64{fetched[i].Up, fetched[i].Down}
	}
	return totals, nil
}
// setRemoteTrafficLocked merges one node's traffic snapshot into the central
// database and reports whether anything structural changed (an inbound or
// client row was created or removed, or a compared config field differed).
//
// Inside a single transaction it:
//  1. creates a central inbound for every snapshot inbound that has none, and
//     updates the rows that already exist;
//  2. deletes central inbounds of this node that the snapshot no longer lists;
//  3. adds each client's traffic delta (node counter minus the stored
//     per-(node, email) baseline, clamped at zero) to client_traffics and moves
//     the baseline forward, seeding rows for emails seen for the first time;
//  4. syncs client records and attachments from each snapshot inbound;
//  5. purges clients left without any inbound attachment.
//
// After a successful commit the node's online tree is handed to p.
//
// When dirty is true nothing is created or deleted and config fields of
// existing inbounds are left alone; only traffic counters, the per-client
// merge and origin attribution are still written.
//
// A nil snap or non-positive nodeID returns (false, nil). On error the
// transaction is rolled back and false is returned with the error.
//
// NOTE(review): the "Locked" suffix presumably refers to the serialisation
// done by submitTrafficWrite in SetRemoteTraffic — confirm before calling this
// from anywhere else.
func (s *InboundService) setRemoteTrafficLocked(nodeID int, snap *runtime.TrafficSnapshot, dirty bool) (bool, error) {
if snap == nil || nodeID <= 0 {
return false, nil
}
db := database.GetDB()
now := time.Now().UnixMilli()
// originGuidFor attributes a synced inbound to the panel that physically
// hosts it. A node's OWN inbounds report either an empty origin or — on
// builds that set it locally — the node's own panelGuid; both resolve to
// selfKey, which is the node's panelGuid unless that GUID is ambiguous
// (shared with another node or the master, i.e. a cloned server), in which
// case it falls back to the node-unique id so #4983 attribution doesn't
// collapse two physical nodes into one bucket. Only a DIFFERENT, non-empty
// origin (an inbound the node forwards from its own sub-node) is kept as-is,
// so a chained Node1->Node2->Node3 still attributes Node3's inbounds to Node3.
var nodeRow model.Node
// NOTE(review): the lookup error is not checked; if the node row cannot be
// read, nodeRow.Guid stays empty and selfKey is derived from that.
db.Select("guid").Where("id = ?", nodeID).First(&nodeRow)
selfKey := effectiveNodeKey(&model.Node{Id: nodeID, Guid: nodeRow.Guid})
guidShared := nodeRow.Guid != "" && selfKey != nodeRow.Guid
originGuidFor := func(snapIb *model.Inbound) string {
if snapIb.OriginNodeGuid != "" && snapIb.OriginNodeGuid != nodeRow.Guid {
return snapIb.OriginNodeGuid
}
return selfKey
}
var central []model.Inbound
if err := db.Model(model.Inbound{}).
Where("node_id = ?", nodeID).
Find(&central).Error; err != nil {
return false, err
}
// Index under the stored tag and its prefix-flipped form so a snap matches
// whether the n<id>- prefix lives on the node side, the central side, or
// neither — a mismatch must never spawn a duplicate central inbound.
tagToCentral := make(map[string]*model.Inbound, len(central)*2)
prefix := nodeTagPrefix(&nodeID)
for i := range central {
tagToCentral[central[i].Tag] = &central[i]
if prefix != "" {
if stripped, found := strings.CutPrefix(central[i].Tag, prefix); found {
tagToCentral[stripped] = &central[i]
} else {
tagToCentral[prefix+central[i].Tag] = &central[i]
}
}
}
// Load the client_traffics rows that belong to this node's central inbounds
// and index them both by (inbound, email) and by email alone.
var centralClientStats []xray.ClientTraffic
if len(central) > 0 {
ids := make([]int, 0, len(central))
for i := range central {
ids = append(ids, central[i].Id)
}
if err := db.Model(xray.ClientTraffic{}).
Where("inbound_id IN ?", ids).
Find(&centralClientStats).Error; err != nil {
return false, err
}
}
type csKey struct {
inboundID int
email string
}
centralCS := make(map[csKey]*xray.ClientTraffic, len(centralClientStats))
centralCSByEmail := make(map[string]*xray.ClientTraffic, len(centralClientStats))
for i := range centralClientStats {
centralCS[csKey{centralClientStats[i].InboundId, centralClientStats[i].Email}] = &centralClientStats[i]
centralCSByEmail[centralClientStats[i].Email] = &centralClientStats[i]
}
// nodeBaselines holds the counters last recorded for each email on this
// node; deltas below are computed against them.
nodeBaselines := make(map[string]nodeTrafficCounter)
var baselineRows []model.NodeClientTraffic
if err := db.Model(&model.NodeClientTraffic{}).
Where("node_id = ?", nodeID).
Find(&baselineRows).Error; err != nil {
return false, err
}
for i := range baselineRows {
nodeBaselines[baselineRows[i].Email] = nodeTrafficCounter{Up: baselineRows[i].Up, Down: baselineRows[i].Down}
}
// Owner for inbounds created below: the owner of the node's first existing
// central inbound, else the user with the lowest id, else 1.
var defaultUserId int
if len(central) > 0 {
defaultUserId = central[0].UserId
} else {
var u model.User
if err := db.Model(model.User{}).Order("id asc").First(&u).Error; err == nil {
defaultUserId = u.Id
} else {
defaultUserId = 1
}
}
// Union of every email the snapshot still reports, across all inbounds.
// The (node, email) baseline rows are keyed per node, not per inbound, so
// the sweeps below must only drop one when the email left the node
// entirely — an email whose stats moved to (or always lived under) a
// sibling inbound still needs its baseline for the sibling's delta
// computation (#5202).
//
// Xray counts traffic per email, not per inbound, so a multi-attached
// client's shared counter is copied onto every inbound it's on. Fold each
// email to its per-field max (nodeEmailTotals) so divergent copies can't make
// the reset clamp re-add a lower sibling as fresh traffic (#5274).
snapEmailsAll := make(map[string]struct{})
nodeEmailTotals := make(map[string]nodeTrafficCounter)
for _, snapIb := range snap.Inbounds {
if snapIb == nil {
continue
}
for i := range snapIb.ClientStats {
email := snapIb.ClientStats[i].Email
snapEmailsAll[email] = struct{}{}
cur := nodeEmailTotals[email]
if snapIb.ClientStats[i].Up > cur.Up {
cur.Up = snapIb.ClientStats[i].Up
}
if snapIb.ClientStats[i].Down > cur.Down {
cur.Down = snapIb.ClientStats[i].Down
}
nodeEmailTotals[email] = cur
}
}
// Membership set for the rowExists checks below. Only the snapshot's emails
// are ever probed, so scope the lookup to those instead of plucking the whole
// client_traffics table (50k+ rows) on every node poll.
existingEmails := make(map[string]struct{}, len(snapEmailsAll))
if len(snapEmailsAll) > 0 {
snapEmailList := make([]string, 0, len(snapEmailsAll))
for email := range snapEmailsAll {
snapEmailList = append(snapEmailList, email)
}
for _, batch := range chunkStrings(snapEmailList, sqliteMaxVars) {
var found []string
if err := db.Model(xray.ClientTraffic{}).Where("email IN ?", batch).Pluck("email", &found).Error; err != nil {
return false, err
}
for _, e := range found {
existingEmails[e] = struct{}{}
}
}
}
// Every write from here on goes through tx. The deferred func rolls the
// transaction back on any return that happens before Commit has succeeded.
tx := db.Begin()
committed := false
defer func() {
if !committed {
tx.Rollback()
}
}()
structuralChange := false
// newInboundIDs records the central inbounds created during this call; it
// gates the traffic-history seeding of new client rows further down.
newInboundIDs := make(map[int]struct{})
snapTags := make(map[string]struct{}, len(snap.Inbounds))
// Phase 1: create or update the central inbound for each snapshot inbound.
for _, snapIb := range snap.Inbounds {
if snapIb == nil {
continue
}
snapTags[snapIb.Tag] = struct{}{}
// Record the prefix-flipped form too so the orphan sweep below keeps a
// central inbound whether its tag carries the n<id>- prefix or not.
if prefix != "" {
if stripped, found := strings.CutPrefix(snapIb.Tag, prefix); found {
snapTags[stripped] = struct{}{}
} else {
snapTags[prefix+snapIb.Tag] = struct{}{}
}
}
c, ok := tagToCentral[snapIb.Tag]
if !ok {
// Unknown tag: import it as a new central inbound, unless the node has
// pending local edits (dirty), in which case nothing is created.
if dirty {
continue
}
// Try snap.Tag first; on collision fall back to the n<id>-
// prefixed form so local+node can both own the same port.
pickFreeTag := func() (string, error) {
candidates := []string{snapIb.Tag}
if prefix != "" && !strings.HasPrefix(snapIb.Tag, prefix) {
candidates = append(candidates, prefix+snapIb.Tag)
}
for _, t := range candidates {
var owner model.Inbound
err := tx.Where("tag = ?", t).First(&owner).Error
if errors.Is(err, gorm.ErrRecordNotFound) {
return t, nil
}
if err != nil {
return "", err
}
}
// Every candidate is already taken by another inbound.
return "", nil
}
chosenTag, err := pickFreeTag()
if err != nil {
logger.Warningf("setRemoteTraffic: check tag %q failed: %v", snapIb.Tag, err)
continue
}
if chosenTag == "" {
// Warn once per node/tag pair; see reportedRemoteTagConflict.
key := fmt.Sprintf("%d:%s", nodeID, snapIb.Tag)
if _, seen := reportedRemoteTagConflict.LoadOrStore(key, struct{}{}); !seen {
logger.Warningf(
"setRemoteTraffic: tag %q from node %d collides with an existing inbound even after the n%d- prefix — skipping (rename one side to remove the duplicate)",
snapIb.Tag, nodeID, nodeID,
)
}
continue
}
reportedRemoteTagConflict.Delete(fmt.Sprintf("%d:%s", nodeID, snapIb.Tag))
newIb := model.Inbound{
UserId: defaultUserId,
NodeID: &nodeID,
OriginNodeGuid: originGuidFor(snapIb),
Tag: chosenTag,
Listen: snapIb.Listen,
Port: snapIb.Port,
Protocol: snapIb.Protocol,
Settings: snapIb.Settings,
StreamSettings: snapIb.StreamSettings,
Sniffing: snapIb.Sniffing,
TrafficReset: snapIb.TrafficReset,
LastTrafficResetTime: snapIb.LastTrafficResetTime,
Enable: snapIb.Enable,
Remark: snapIb.Remark,
SubSortIndex: normalizeSubSortIndex(snapIb.SubSortIndex),
Total: snapIb.Total,
ExpiryTime: snapIb.ExpiryTime,
Up: snapIb.Up,
Down: snapIb.Down,
ShareAddrStrategy: "node",
}
// A failed insert is logged and the inbound skipped; it does not abort
// the rest of the sync.
if err := tx.Create(&newIb).Error; err != nil {
logger.Warningf("setRemoteTraffic: create central inbound for tag %q failed: %v", snapIb.Tag, err)
continue
}
tagToCentral[snapIb.Tag] = &newIb
if newIb.Tag != snapIb.Tag {
tagToCentral[newIb.Tag] = &newIb
}
newInboundIDs[newIb.Id] = struct{}{}
structuralChange = true
continue
}
// Known tag: refresh the existing central row.
inGrace := c.LastTrafficResetTime > 0 && now-c.LastTrafficResetTime < resetGracePeriodMs
updates := map[string]any{}
if !dirty {
updates["enable"] = snapIb.Enable
updates["remark"] = snapIb.Remark
updates["sub_sort_index"] = normalizeSubSortIndex(snapIb.SubSortIndex)
updates["listen"] = snapIb.Listen
updates["port"] = snapIb.Port
updates["protocol"] = snapIb.Protocol
updates["total"] = snapIb.Total
updates["expiry_time"] = snapIb.ExpiryTime
updates["settings"] = snapIb.Settings
updates["stream_settings"] = snapIb.StreamSettings
updates["sniffing"] = snapIb.Sniffing
updates["traffic_reset"] = snapIb.TrafficReset
updates["last_traffic_reset_time"] = snapIb.LastTrafficResetTime
}
// Within the reset grace window the node's counters are adopted only when
// they do not exceed the central total.
if !inGrace || (snapIb.Up+snapIb.Down) <= (c.Up+c.Down) {
updates["up"] = snapIb.Up
updates["down"] = snapIb.Down
}
// Physical-home attribution is independent of config-dirty state, so
// keep it current even while the node has pending offline edits. Writes
// once to backfill an existing row, then stays equal (#4983).
if og := originGuidFor(snapIb); c.OriginNodeGuid != og {
updates["origin_node_guid"] = og
}
if !dirty && (c.Settings != snapIb.Settings ||
c.Remark != snapIb.Remark ||
c.Listen != snapIb.Listen ||
c.Port != snapIb.Port ||
c.Total != snapIb.Total ||
c.ExpiryTime != snapIb.ExpiryTime ||
c.Enable != snapIb.Enable) {
structuralChange = true
}
if len(updates) > 0 {
if err := tx.Model(model.Inbound{}).
Where("id = ?", c.Id).
Updates(updates).Error; err != nil {
return false, err
}
}
}
// Phase 2: delete central inbounds the snapshot no longer reports. Skipped
// entirely while dirty.
for _, c := range central {
if dirty {
continue
}
if len(snapTags) == 0 {
// A node mid-restart or with a transient DB error can return an empty
// inbound list with success=true. Treat "zero inbounds reported" as
// "nothing to say", not "delete all my inbounds" — otherwise a blip
// wipes the node's central inbounds and every client on them (and
// resets traffic history on re-create). A real per-inbound deletion
// still sweeps, because the node keeps reporting its other inbounds.
continue
}
if _, kept := snapTags[c.Tag]; kept {
continue
}
var goneEmails []string
if err := tx.Model(xray.ClientTraffic{}).
Where("inbound_id = ?", c.Id).
Pluck("email", &goneEmails).Error; err != nil {
return false, err
}
if len(goneEmails) > 0 {
// Baselines are per (node, email), not per inbound: keep them for
// emails the snapshot still reports under a sibling inbound (#5202).
baselineGone := make([]string, 0, len(goneEmails))
for _, e := range goneEmails {
if _, still := snapEmailsAll[e]; !still {
baselineGone = append(baselineGone, e)
}
}
// Chunk to avoid SQLite bind var limit when a node has many clients
// removed (e.g. after API bulk delete or structural change on node inbound).
for _, batch := range chunkStrings(baselineGone, sqliteMaxVars) {
if err := tx.Where("node_id = ? AND email IN ?", nodeID, batch).
Delete(&model.NodeClientTraffic{}).Error; err != nil {
return false, err
}
}
// The per-email row is the shared accumulator across every inbound
// (and node) the email is attached to. Only drop it when this was the
// email's last inbound — wiping it while a sibling still feeds it
// loses the summed history, and the next node sync would re-seed the
// row with that node's counter alone.
sharedEmails, sErr := s.emailsUsedByOtherInbounds(goneEmails, c.Id)
if sErr != nil {
return false, sErr
}
delEmails := make([]string, 0, len(goneEmails))
for _, e := range goneEmails {
if !sharedEmails[strings.ToLower(strings.TrimSpace(e))] {
delEmails = append(delEmails, e)
}
}
for _, batch := range chunkStrings(delEmails, sqliteMaxVars) {
if err := tx.Where("inbound_id = ? AND email IN ?", c.Id, batch).
Delete(&xray.ClientTraffic{}).Error; err != nil {
return false, err
}
}
}
if err := s.clientService.DetachInbound(tx, c.Id); err != nil {
return false, err
}
if err := tx.Where("id = ?", c.Id).
Delete(&model.Inbound{}).Error; err != nil {
return false, err
}
delete(tagToCentral, c.Tag)
structuralChange = true
}
// Phase 3: fold per-client counters into client_traffics and advance the
// per-(node, email) baselines.
for _, snapIb := range snap.Inbounds {
if snapIb == nil {
continue
}
c, ok := tagToCentral[snapIb.Tag]
if !ok {
continue
}
snapEmails := make(map[string]struct{}, len(snapIb.ClientStats))
for _, cs := range snapIb.ClientStats {
snapEmails[cs.Email] = struct{}{}
// Node-wide total, not this inbound's possibly-stale copy (#5274).
canon := nodeEmailTotals[cs.Email]
base, seen := nodeBaselines[cs.Email]
// Delta since the last recorded baseline, clamped at zero so a node-side
// counter that went backwards never subtracts traffic. With no baseline
// the delta stays zero.
var deltaUp, deltaDown int64
if seen {
if deltaUp = canon.Up - base.Up; deltaUp < 0 {
deltaUp = 0
}
if deltaDown = canon.Down - base.Down; deltaDown < 0 {
deltaDown = 0
}
}
if _, rowExists := existingEmails[cs.Email]; !rowExists {
// No client_traffics row for this email yet. While dirty, nothing is
// created. Otherwise a row is inserted: its counters start from the
// node's totals only when the inbound itself was created in this call
// and the email is not tombstoned, else from zero. Either way the
// baseline is set to the node's totals, so the next delta starts at 0.
if dirty {
continue
}
var seedUp, seedDown int64
if _, isNewInbound := newInboundIDs[c.Id]; isNewInbound && !isClientEmailTombstoned(cs.Email) {
seedUp, seedDown = canon.Up, canon.Down
}
row := &xray.ClientTraffic{
InboundId: c.Id,
Email: cs.Email,
Enable: cs.Enable,
Total: cs.Total,
ExpiryTime: cs.ExpiryTime,
Reset: cs.Reset,
Up: seedUp,
Down: seedDown,
LastOnline: cs.LastOnline,
}
if err := tx.Clauses(clause.OnConflict{Columns: []clause.Column{{Name: "email"}}, DoNothing: true}).
Create(row).Error; err != nil {
return false, err
}
centralCS[csKey{c.Id, cs.Email}] = row
centralCSByEmail[cs.Email] = row
existingEmails[cs.Email] = struct{}{}
structuralChange = true
if err := s.upsertNodeBaseline(tx, nodeID, cs.Email, canon.Up, canon.Down); err != nil {
return false, err
}
nodeBaselines[cs.Email] = nodeTrafficCounter{Up: canon.Up, Down: canon.Down}
continue
}
if existing := centralCSByEmail[cs.Email]; existing != nil &&
(existing.Enable != cs.Enable ||
existing.Total != cs.Total ||
existing.ExpiryTime != mergeActivationExpiry(existing.ExpiryTime, cs.ExpiryTime) ||
existing.Reset != cs.Reset) {
structuralChange = true
}
enableExpr := database.ClientTrafficEnableMergeExpr()
// expiry_time merge mirrors mergeActivationExpiry: a node that has not
// yet seen the client's first connection keeps reporting the negative
// "start after first connect" duration, which must never reset the
// absolute deadline another node already activated. A positive node
// value is still adopted (e.g. auto-renew moves the deadline forward).
// CAST(? AS BIGINT): in the `<= 0` comparison Postgres would otherwise
// infer int4 from the literal and overflow on real expiry values.
if err := tx.Exec(
fmt.Sprintf(
`UPDATE client_traffics
SET up = up + ?, down = down + ?, enable = %s, total = ?,
expiry_time = CASE WHEN expiry_time > 0 AND CAST(? AS BIGINT) <= 0 THEN expiry_time ELSE CAST(? AS BIGINT) END,
reset = ?, last_online = %s
WHERE email = ?`,
enableExpr,
database.GreatestExpr("last_online", "?"),
),
deltaUp, deltaDown, cs.Enable, cs.Total,
cs.ExpiryTime, cs.ExpiryTime, cs.Reset,
cs.LastOnline, cs.Email,
).Error; err != nil {
return false, err
}
if err := s.upsertNodeBaseline(tx, nodeID, cs.Email, canon.Up, canon.Down); err != nil {
return false, err
}
nodeBaselines[cs.Email] = nodeTrafficCounter{Up: canon.Up, Down: canon.Down}
}
// Drop rows for clients that were on this inbound before the sync but are
// missing from its snapshot stats (skipped while dirty).
for k, existing := range centralCS {
if dirty {
continue
}
if k.inboundID != c.Id {
continue
}
if _, kept := snapEmails[k.email]; kept {
continue
}
// Gone from this inbound's stats but still reported by the node under
// a sibling inbound: both the shared accumulator row and the (node,
// email) baseline must survive, or the sibling's next delta would
// compute against nothing and freeze the counter (#5202).
if _, still := snapEmailsAll[k.email]; still {
continue
}
if err := tx.Where("node_id = ? AND email = ?", nodeID, existing.Email).
Delete(&model.NodeClientTraffic{}).Error; err != nil {
return false, err
}
// Same shared-accumulator rule as the inbound-removal sweep above:
// keep the row while another inbound still references the email.
stillUsed, uErr := s.emailUsedByOtherInbounds(existing.Email, c.Id)
if uErr != nil {
return false, uErr
}
if !stillUsed {
if err := tx.Where("inbound_id = ? AND email = ?", c.Id, existing.Email).
Delete(&xray.ClientTraffic{}).Error; err != nil {
return false, err
}
}
structuralChange = true
}
}
// Phase 4: sync client records and inbound attachments from each snapshot
// inbound (skipped while dirty). The emails attached before the sync are
// remembered per inbound so phase 5 can tell which ones were detached.
type oldSet struct {
inboundID int
emails map[string]struct{}
}
var perInboundOld []oldSet
for _, snapIb := range snap.Inbounds {
if snapIb == nil {
continue
}
c, ok := tagToCentral[snapIb.Tag]
if !ok {
continue
}
if dirty {
continue
}
var oldEmailsRows []string
if err := tx.Table("clients").
Joins("JOIN client_inbounds ON client_inbounds.client_id = clients.id").
Where("client_inbounds.inbound_id = ?", c.Id).
Pluck("email", &oldEmailsRows).Error; err == nil {
oldEmails := make(map[string]struct{}, len(oldEmailsRows))
for _, e := range oldEmailsRows {
if e != "" {
oldEmails[e] = struct{}{}
}
}
perInboundOld = append(perInboundOld, oldSet{inboundID: c.Id, emails: oldEmails})
}
clients, gcErr := s.GetClients(snapIb)
if gcErr != nil {
logger.Warningf("setRemoteTraffic: parse clients for tag %q failed: %v", snapIb.Tag, gcErr)
continue
}
csEnableByEmail := make(map[string]bool, len(snapIb.ClientStats))
for _, cs := range snapIb.ClientStats {
csEnableByEmail[cs.Email] = cs.Enable
}
// Filter in place (filtered shares clients' backing array): tombstoned
// emails are dropped, and a client whose stats row reports Enable=false
// is forced to disabled.
filtered := clients[:0]
for i := range clients {
if isClientEmailTombstoned(clients[i].Email) {
continue
}
if cse, hit := csEnableByEmail[clients[i].Email]; hit && !cse {
clients[i].Enable = false
}
filtered = append(filtered, clients[i])
}
localEmails := make([]string, 0, len(filtered))
for i := range filtered {
if filtered[i].Email != "" {
localEmails = append(localEmails, filtered[i].Email)
}
}
// Comments already stored in the central clients table take precedence
// over whatever the snapshot carries for the same email.
if len(localEmails) > 0 {
var localMeta []struct {
Email string
Comment string `gorm:"column:comment"`
}
if err := tx.Table("clients").
Select("email, comment").
Where("email IN ?", localEmails).
Find(&localMeta).Error; err == nil {
commentByEmail := make(map[string]string, len(localMeta))
for _, m := range localMeta {
commentByEmail[m.Email] = m.Comment
}
for i := range filtered {
if cmt, ok := commentByEmail[filtered[i].Email]; ok {
filtered[i].Comment = cmt
}
}
}
}
// A failed client sync is logged only; it does not abort the transaction.
if err := s.clientService.SyncInbound(tx, c.Id, filtered); err != nil {
logger.Warningf("setRemoteTraffic: sync clients for tag %q failed: %v", snapIb.Tag, err)
}
}
// Phase 5: for every email that was attached to an inbound before phase 4
// and no longer is, delete its client record, client_traffics row and
// baselines — but only when it has no attachment left on any inbound.
// Failures in this phase are logged or skipped, never returned.
for _, old := range perInboundOld {
var stillAttached []string
if err := tx.Table("clients").
Joins("JOIN client_inbounds ON client_inbounds.client_id = clients.id").
Where("client_inbounds.inbound_id = ?", old.inboundID).
Pluck("email", &stillAttached).Error; err != nil {
continue
}
stillSet := make(map[string]struct{}, len(stillAttached))
for _, e := range stillAttached {
stillSet[e] = struct{}{}
}
for email := range old.emails {
if _, kept := stillSet[email]; kept {
continue
}
var attachmentCount int64
if err := tx.Table("client_inbounds").
Joins("JOIN clients ON clients.id = client_inbounds.client_id").
Where("clients.email = ?", email).
Count(&attachmentCount).Error; err != nil {
continue
}
if attachmentCount > 0 {
continue
}
if err := tx.Where("email = ?", email).Delete(&model.ClientRecord{}).Error; err != nil {
logger.Warningf("setRemoteTraffic: delete ClientRecord %q failed: %v", email, err)
}
if err := tx.Where("email = ?", email).Delete(&xray.ClientTraffic{}).Error; err != nil {
logger.Warningf("setRemoteTraffic: delete ClientTraffic %q failed: %v", email, err)
}
// Not scoped by node_id: this removes the email's baselines for every node.
if err := tx.Where("email = ?", email).Delete(&model.NodeClientTraffic{}).Error; err != nil {
logger.Warningf("setRemoteTraffic: delete NodeClientTraffic %q failed: %v", email, err)
}
structuralChange = true
}
}
if err := tx.Commit().Error; err != nil {
return false, err
}
committed = true
// With the data committed, publish the node's online tree to p, which is
// declared outside this block; nothing is published when p is nil.
if p != nil {
tree := snap.OnlineTree
switch {
case len(tree) == 0 && len(snap.OnlineEmails) > 0:
// Old-build node (no GUID tree): key its flat online list under its
// own effective identity so attribution still works for that branch.
tree = map[string][]string{selfKey: snap.OnlineEmails}
case guidShared && len(tree) > 0:
// Newer cloned node: its own clients arrive keyed under the shared
// panelGuid. Remap just that entry to the node-unique key so the
// clones don't merge; descendant subtrees keep their distinct GUIDs.
if _, ok := tree[nodeRow.Guid]; ok {
remapped := make(map[string][]string, len(tree))
for g, emails := range tree {
if g == nodeRow.Guid {
g = selfKey
}
remapped[g] = emails
}
tree = remapped
}
}
p.SetNodeOnlineTree(nodeID, tree)
}
return structuralChange, nil
}
// restartRemoteNodesOnDisable restarts Xray on each of the given nodes, but
// only when the RestartXrayOnClientDisable setting is on. The work is
// best-effort: a setting that cannot be read, a runtime that cannot be
// resolved and a restart that fails are each logged as a warning, and the
// remaining nodes are still attempted.
func (s *InboundService) restartRemoteNodesOnDisable(nodeIDs []int) {
	settings := &SettingService{}
	wanted, err := settings.GetRestartXrayOnClientDisable()
	if err != nil {
		logger.Warning("disableInvalidClients: get RestartXrayOnClientDisable failed:", err)
		return
	}
	if !wanted {
		return
	}

	for i := range nodeIDs {
		// Fresh variable per iteration: RuntimeFor takes a pointer to it.
		id := nodeIDs[i]
		remote, lookupErr := runtime.GetManager().RuntimeFor(&id)
		if lookupErr != nil {
			logger.Warning("disableInvalidClients: get runtime for node", nodeIDs[i], "failed:", lookupErr)
			continue
		}
		if restartErr := remote.RestartXray(context.Background()); restartErr != nil {
			logger.Warning("disableInvalidClients: restart xray on node", nodeIDs[i], "failed:", restartErr)
		}
	}
}
// GetOnlineClients returns p.GetOnlineClients(), or an empty non-nil slice
// when p is nil.
func (s *InboundService) GetOnlineClients() []string {
	if p != nil {
		return p.GetOnlineClients()
	}
	return []string{}
}
// GetOnlineClientsByGuid maps each panelGuid to the emails online on the node
// that physically hosts them (#4983): the merged node trees reported by p,
// with this panel's own online clients merged in under its own GUID. Keying by
// GUID rather than node id attributes a client several hops down to its real
// node instead of the intermediate one it was synced through.
//
// The local clients are left out when there are none or when this panel's
// GUID is empty. A nil p yields an empty map.
func (s *InboundService) GetOnlineClientsByGuid() map[string][]string {
	if p == nil {
		return map[string][]string{}
	}
	byGuid := p.GetMergedNodeTrees()

	local := p.GetLocalOnlineClients()
	if len(local) == 0 {
		return byGuid
	}
	self := s.panelGuid()
	if self == "" {
		return byGuid
	}
	byGuid[self] = mergeEmails(byGuid[self], local)
	return byGuid
}
// GetActiveInboundsByGuid returns this panel's active inbound tags, as
// reported by p.GetLocalActiveInbounds, under the panel's own GUID. Remote
// nodes don't report per-inbound activity, so a GUID absent from the map means
// "don't gate" for that node's inbounds.
//
// The map is empty when p is nil, when nothing is active, or when the panel
// GUID is empty.
func (s *InboundService) GetActiveInboundsByGuid() map[string][]string {
	byGuid := map[string][]string{}
	if p == nil {
		return byGuid
	}
	tags := p.GetLocalActiveInbounds()
	if len(tags) == 0 {
		return byGuid
	}
	if self := s.panelGuid(); self != "" {
		byGuid[self] = tags
	}
	return byGuid
}
// SetNodeOnlineTree forwards the node's online tree to p. It does nothing
// when p is nil.
func (s *InboundService) SetNodeOnlineTree(nodeID int, tree map[string][]string) {
	if p == nil {
		return
	}
	p.SetNodeOnlineTree(nodeID, tree)
}
// ClearNodeOnlineClients asks p to clear the online clients recorded for the
// node. It does nothing when p is nil.
func (s *InboundService) ClearNodeOnlineClients(nodeID int) {
	if p == nil {
		return
	}
	p.ClearNodeOnlineClients(nodeID)
}
// panelGuid returns the GUID that SettingService.GetPanelGuid reports for this
// panel. It keys the local panel's own clients in the per-node online maps
// (#4983).
func (s *InboundService) panelGuid() string {
	settings := &SettingService{}
	// The error is dropped on purpose: callers in this file only test the
	// returned value against "" and skip their work when it is empty.
	id, _ := settings.GetPanelGuid()
	return id
}
// synthNodeGuid builds the fallback identity "node:<id>" for a
// directly-attached node whose panel has not reported a panelGuid yet (old
// build). Node ids are master-local, so the result is only meaningful for
// direct nodes — the pre-#4983 flat topology where such old builds appear.
func synthNodeGuid(nodeID int) string {
	const layout = "node:%d"
	return fmt.Sprintf(layout, nodeID)
}
// mergeEmails returns the deduplicated union of a and b, keeping first-seen
// order (all of a, then whatever b adds).
//
// Unless both inputs are empty the result is a freshly allocated slice, so it
// never aliases a caller's slice and is deduplicated even when a is empty.
// When both inputs are empty, b is returned unchanged, which preserves its
// nil-ness.
func mergeEmails(a, b []string) []string {
	total := len(a) + len(b)
	if total == 0 {
		return b
	}
	seen := make(map[string]struct{}, total)
	out := make([]string, 0, total)
	for _, list := range [2][]string{a, b} {
		for _, email := range list {
			if _, dup := seen[email]; dup {
				continue
			}
			seen[email] = struct{}{}
			out = append(out, email)
		}
	}
	return out
}
// GetClientsLastOnline returns the stored last_online value of every
// client_traffics row, keyed by email. An empty table yields an empty,
// non-nil map.
//
// gorm.ErrRecordNotFound is tolerated and treated as "no rows". It is matched
// with errors.Is, like the other check in this file, so a wrapped sentinel is
// recognised too.
func (s *InboundService) GetClientsLastOnline() (map[string]int64, error) {
	var rows []xray.ClientTraffic
	err := database.GetDB().
		Model(&xray.ClientTraffic{}).
		Select("email, last_online").
		Find(&rows).Error
	if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) {
		return nil, err
	}

	result := make(map[string]int64, len(rows))
	for i := range rows {
		result[rows[i].Email] = rows[i].LastOnline
	}
	return result, nil
}
// RefreshLocalOnlineClients hands the emails and inbound tags that were active
// on this panel's own xray during the current poll to p.RefreshLocalOnline,
// together with the current time in milliseconds and onlineGracePeriodMs, so
// the local online/active sets are updated and stale entries pruned. Pass nil
// to only prune. See xray.Process for why the local sets are kept separate
// from the shared last_online column. Does nothing when p is nil.
func (s *InboundService) RefreshLocalOnlineClients(activeEmails, activeInboundTags []string) {
	if p == nil {
		return
	}
	nowMs := time.Now().UnixMilli()
	p.RefreshLocalOnline(activeEmails, activeInboundTags, nowMs, onlineGracePeriodMs)
}
// FilterAndSortClientEmails splits emails into those that have a
// client_traffics row and those that do not.
//
// Returns:
//   - the emails of the rows found, ordered by total traffic (Up + Down)
//     descending; ties come back in no guaranteed order;
//   - every entry of emails, in input order and with duplicates preserved,
//     whose exact value matched no fetched row (always non-nil);
//   - the first database error encountered, in which case both slices are nil.
//
// gorm.ErrRecordNotFound is tolerated and matched with errors.Is, like the
// other check in this file.
func (s *InboundService) FilterAndSortClientEmails(emails []string) ([]string, []string, error) {
	db := database.GetDB()

	// Step 1: fetch the ClientTraffic rows for the requested emails, chunked
	// to stay under SQLite's bind-variable limit on huge inputs.
	uniqEmails := uniqueNonEmptyStrings(emails)
	clients := make([]xray.ClientTraffic, 0, len(uniqEmails))
	for _, batch := range chunkStrings(uniqEmails, sqliteMaxVars) {
		var page []xray.ClientTraffic
		err := db.Where("email IN ?", batch).Find(&page).Error
		if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) {
			return nil, nil, err
		}
		clients = append(clients, page...)
	}

	// Step 2: heaviest users first.
	sort.Slice(clients, func(i, j int) bool {
		return (clients[i].Up + clients[i].Down) > (clients[j].Up + clients[j].Down)
	})

	// Step 3: collect the sorted emails and remember which ones were found.
	validEmails := make([]string, 0, len(clients))
	found := make(map[string]struct{}, len(clients))
	for i := range clients {
		validEmails = append(validEmails, clients[i].Email)
		found[clients[i].Email] = struct{}{}
	}

	// Step 4: everything from the input that matched no row.
	extraEmails := make([]string, 0)
	for _, email := range emails {
		if _, ok := found[email]; !ok {
			extraEmails = append(extraEmails, email)
		}
	}
	return validEmails, extraEmails, nil
}