mirror of
https://github.com/MHSanaei/3x-ui.git
synced 2026-06-28 00:24:19 +00:00
8258a26fbf
client_traffics is the per-email accumulator shared across every inbound and node the client is attached to. setRemoteTrafficLocked deleted it unguarded in two sweeps — when a node inbound vanished from the snapshot (node reinstall, tag change, another master's reconcile on a shared node) and when an email left one inbound's stats — even though the email was still attached elsewhere. The next sync then re-seeded the row with that node's counter alone, so the panel showed the last changed panel's number instead of the summed total. Guard both sweeps with emailUsedByOtherInbounds, matching what the manual-edit path (updateClientTraffics) already does. Truly removed clients are still cleaned up by the zero-attachment sweep.
878 lines
26 KiB
Go
878 lines
26 KiB
Go
package service
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"sort"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/mhsanaei/3x-ui/v3/internal/database"
|
|
"github.com/mhsanaei/3x-ui/v3/internal/database/model"
|
|
"github.com/mhsanaei/3x-ui/v3/internal/logger"
|
|
"github.com/mhsanaei/3x-ui/v3/internal/web/runtime"
|
|
"github.com/mhsanaei/3x-ui/v3/internal/xray"
|
|
|
|
"gorm.io/gorm"
|
|
"gorm.io/gorm/clause"
|
|
)
|
|
|
|
var reportedRemoteTagConflict sync.Map
|
|
|
|
func (s *InboundService) runtimeFor(ib *model.Inbound) (runtime.Runtime, error) {
|
|
mgr := runtime.GetManager()
|
|
if mgr == nil {
|
|
return nil, fmt.Errorf("runtime manager not initialised")
|
|
}
|
|
return mgr.RuntimeFor(ib.NodeID)
|
|
}
|
|
|
|
func (s *InboundService) nodePushPlan(ib *model.Inbound) (runtime.Runtime, bool, bool, error) {
|
|
if ib.NodeID == nil {
|
|
rt, err := s.runtimeFor(ib)
|
|
if err != nil {
|
|
return nil, false, false, nil
|
|
}
|
|
return rt, true, false, nil
|
|
}
|
|
nodeSvc := NodeService{}
|
|
enabled, status, _, _, err := nodeSvc.NodeSyncState(*ib.NodeID)
|
|
if err != nil {
|
|
return nil, false, false, err
|
|
}
|
|
if !enabled || status == "offline" {
|
|
return nil, false, true, nil
|
|
}
|
|
rt, err := s.runtimeFor(ib)
|
|
if err != nil {
|
|
return nil, false, true, nil
|
|
}
|
|
return rt, true, false, nil
|
|
}
|
|
|
|
func (s *InboundService) NodeIsPending(nodeID *int) bool {
|
|
if nodeID == nil {
|
|
return false
|
|
}
|
|
return (&NodeService{}).IsNodePending(*nodeID)
|
|
}
|
|
|
|
func (s *InboundService) AnyNodePending(inboundIds []int) bool {
|
|
if len(inboundIds) == 0 {
|
|
return false
|
|
}
|
|
nodeSvc := NodeService{}
|
|
for _, id := range inboundIds {
|
|
ib, err := s.GetInbound(id)
|
|
if err != nil || ib.NodeID == nil {
|
|
continue
|
|
}
|
|
if nodeSvc.IsNodePending(*ib.NodeID) {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
func (s *InboundService) ReconcileNode(ctx context.Context, rt *runtime.Remote, nodeID int) error {
|
|
if rt == nil || nodeID <= 0 {
|
|
return nil
|
|
}
|
|
db := database.GetDB()
|
|
var inbounds []*model.Inbound
|
|
if err := db.Model(model.Inbound{}).Where("node_id = ?", nodeID).Find(&inbounds).Error; err != nil {
|
|
return err
|
|
}
|
|
remoteTags, err := rt.ListRemoteTags(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
prefix := nodeTagPrefix(&nodeID)
|
|
desiredTags := make(map[string]struct{}, len(inbounds)*2)
|
|
for _, ib := range inbounds {
|
|
desiredTags[ib.Tag] = struct{}{}
|
|
if prefix != "" {
|
|
if stripped, found := strings.CutPrefix(ib.Tag, prefix); found {
|
|
desiredTags[stripped] = struct{}{}
|
|
} else {
|
|
desiredTags[prefix+ib.Tag] = struct{}{}
|
|
}
|
|
}
|
|
if err := rt.UpdateInbound(ctx, ib, ib); err != nil {
|
|
return fmt.Errorf("reconcile inbound %q: %w", ib.Tag, err)
|
|
}
|
|
}
|
|
for _, tag := range remoteTags {
|
|
if _, want := desiredTags[tag]; want {
|
|
continue
|
|
}
|
|
if err := rt.DelInbound(ctx, &model.Inbound{Tag: tag}); err != nil {
|
|
return fmt.Errorf("reconcile delete %q: %w", tag, err)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
const resetGracePeriodMs int64 = 30000
|
|
|
|
// onlineGracePeriodMs must comfortably exceed the 5s traffic-poll interval —
|
|
// Xray's stats counters often report a zero delta for an active session across
|
|
// a single poll, so a 5s grace would still drop the client on the next tick.
|
|
// ~4 polls of slack keeps idle-but-connected clients visible without lingering
|
|
// long after a real disconnect.
|
|
const onlineGracePeriodMs int64 = 20000
|
|
|
|
type nodeTrafficCounter struct {
|
|
Up int64
|
|
Down int64
|
|
}
|
|
|
|
func (s *InboundService) upsertNodeBaseline(tx *gorm.DB, nodeID int, email string, up, down int64) error {
|
|
return tx.Clauses(clause.OnConflict{
|
|
Columns: []clause.Column{{Name: "node_id"}, {Name: "email"}},
|
|
DoUpdates: clause.AssignmentColumns([]string{"up", "down"}),
|
|
}).Create(&model.NodeClientTraffic{NodeId: nodeID, Email: email, Up: up, Down: down}).Error
|
|
}
|
|
|
|
func (s *InboundService) SetRemoteTraffic(nodeID int, snap *runtime.TrafficSnapshot, dirty bool) (bool, error) {
|
|
var structuralChange bool
|
|
err := submitTrafficWrite(func() error {
|
|
var inner error
|
|
structuralChange, inner = s.setRemoteTrafficLocked(nodeID, snap, dirty)
|
|
return inner
|
|
})
|
|
return structuralChange, err
|
|
}
|
|
|
|
func (s *InboundService) setRemoteTrafficLocked(nodeID int, snap *runtime.TrafficSnapshot, dirty bool) (bool, error) {
|
|
if snap == nil || nodeID <= 0 {
|
|
return false, nil
|
|
}
|
|
db := database.GetDB()
|
|
now := time.Now().UnixMilli()
|
|
|
|
// originGuidFor attributes a synced inbound to the panel that physically
|
|
// hosts it: inbounds the node forwards from its own sub-nodes already carry
|
|
// a non-empty OriginNodeGuid (kept as-is across hops); the node's own local
|
|
// inbounds report empty, so they are attributed to the node's own GUID. An
|
|
// empty result (old-build node with no GUID yet) leaves attribution to the
|
|
// node_id fallback downstream (#4983).
|
|
var nodeRow model.Node
|
|
db.Select("guid").Where("id = ?", nodeID).First(&nodeRow)
|
|
originGuidFor := func(snapIb *model.Inbound) string {
|
|
if snapIb.OriginNodeGuid != "" {
|
|
return snapIb.OriginNodeGuid
|
|
}
|
|
return nodeRow.Guid
|
|
}
|
|
|
|
var central []model.Inbound
|
|
if err := db.Model(model.Inbound{}).
|
|
Where("node_id = ?", nodeID).
|
|
Find(¢ral).Error; err != nil {
|
|
return false, err
|
|
}
|
|
// Index under the stored tag and its prefix-flipped form so a snap matches
|
|
// whether the n<id>- prefix lives on the node side, the central side, or
|
|
// neither — a mismatch must never spawn a duplicate central inbound.
|
|
tagToCentral := make(map[string]*model.Inbound, len(central)*2)
|
|
prefix := nodeTagPrefix(&nodeID)
|
|
for i := range central {
|
|
tagToCentral[central[i].Tag] = ¢ral[i]
|
|
if prefix != "" {
|
|
if stripped, found := strings.CutPrefix(central[i].Tag, prefix); found {
|
|
tagToCentral[stripped] = ¢ral[i]
|
|
} else {
|
|
tagToCentral[prefix+central[i].Tag] = ¢ral[i]
|
|
}
|
|
}
|
|
}
|
|
|
|
var centralClientStats []xray.ClientTraffic
|
|
if len(central) > 0 {
|
|
ids := make([]int, 0, len(central))
|
|
for i := range central {
|
|
ids = append(ids, central[i].Id)
|
|
}
|
|
if err := db.Model(xray.ClientTraffic{}).
|
|
Where("inbound_id IN ?", ids).
|
|
Find(¢ralClientStats).Error; err != nil {
|
|
return false, err
|
|
}
|
|
}
|
|
type csKey struct {
|
|
inboundID int
|
|
email string
|
|
}
|
|
centralCS := make(map[csKey]*xray.ClientTraffic, len(centralClientStats))
|
|
centralCSByEmail := make(map[string]*xray.ClientTraffic, len(centralClientStats))
|
|
for i := range centralClientStats {
|
|
centralCS[csKey{centralClientStats[i].InboundId, centralClientStats[i].Email}] = ¢ralClientStats[i]
|
|
centralCSByEmail[centralClientStats[i].Email] = ¢ralClientStats[i]
|
|
}
|
|
|
|
nodeBaselines := make(map[string]nodeTrafficCounter)
|
|
var baselineRows []model.NodeClientTraffic
|
|
if err := db.Model(&model.NodeClientTraffic{}).
|
|
Where("node_id = ?", nodeID).
|
|
Find(&baselineRows).Error; err != nil {
|
|
return false, err
|
|
}
|
|
for i := range baselineRows {
|
|
nodeBaselines[baselineRows[i].Email] = nodeTrafficCounter{Up: baselineRows[i].Up, Down: baselineRows[i].Down}
|
|
}
|
|
|
|
var existingEmailsList []string
|
|
if err := db.Model(xray.ClientTraffic{}).Pluck("email", &existingEmailsList).Error; err != nil {
|
|
return false, err
|
|
}
|
|
existingEmails := make(map[string]struct{}, len(existingEmailsList))
|
|
for _, e := range existingEmailsList {
|
|
existingEmails[e] = struct{}{}
|
|
}
|
|
|
|
var defaultUserId int
|
|
if len(central) > 0 {
|
|
defaultUserId = central[0].UserId
|
|
} else {
|
|
var u model.User
|
|
if err := db.Model(model.User{}).Order("id asc").First(&u).Error; err == nil {
|
|
defaultUserId = u.Id
|
|
} else {
|
|
defaultUserId = 1
|
|
}
|
|
}
|
|
|
|
tx := db.Begin()
|
|
committed := false
|
|
defer func() {
|
|
if !committed {
|
|
tx.Rollback()
|
|
}
|
|
}()
|
|
|
|
structuralChange := false
|
|
|
|
snapTags := make(map[string]struct{}, len(snap.Inbounds))
|
|
for _, snapIb := range snap.Inbounds {
|
|
if snapIb == nil {
|
|
continue
|
|
}
|
|
snapTags[snapIb.Tag] = struct{}{}
|
|
// Record the prefix-flipped form too so the orphan sweep below keeps a
|
|
// central inbound whether its tag carries the n<id>- prefix or not.
|
|
if prefix != "" {
|
|
if stripped, found := strings.CutPrefix(snapIb.Tag, prefix); found {
|
|
snapTags[stripped] = struct{}{}
|
|
} else {
|
|
snapTags[prefix+snapIb.Tag] = struct{}{}
|
|
}
|
|
}
|
|
|
|
c, ok := tagToCentral[snapIb.Tag]
|
|
if !ok {
|
|
if dirty {
|
|
continue
|
|
}
|
|
// Try snap.Tag first; on collision fall back to the n<id>-
|
|
// prefixed form so local+node can both own the same port.
|
|
pickFreeTag := func() (string, error) {
|
|
candidates := []string{snapIb.Tag}
|
|
if prefix != "" && !strings.HasPrefix(snapIb.Tag, prefix) {
|
|
candidates = append(candidates, prefix+snapIb.Tag)
|
|
}
|
|
for _, t := range candidates {
|
|
var owner model.Inbound
|
|
err := tx.Where("tag = ?", t).First(&owner).Error
|
|
if errors.Is(err, gorm.ErrRecordNotFound) {
|
|
return t, nil
|
|
}
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
}
|
|
return "", nil
|
|
}
|
|
chosenTag, err := pickFreeTag()
|
|
if err != nil {
|
|
logger.Warningf("setRemoteTraffic: check tag %q failed: %v", snapIb.Tag, err)
|
|
continue
|
|
}
|
|
if chosenTag == "" {
|
|
key := fmt.Sprintf("%d:%s", nodeID, snapIb.Tag)
|
|
if _, seen := reportedRemoteTagConflict.LoadOrStore(key, struct{}{}); !seen {
|
|
logger.Warningf(
|
|
"setRemoteTraffic: tag %q from node %d collides with an existing inbound even after the n%d- prefix — skipping (rename one side to remove the duplicate)",
|
|
snapIb.Tag, nodeID, nodeID,
|
|
)
|
|
}
|
|
continue
|
|
}
|
|
newIb := model.Inbound{
|
|
UserId: defaultUserId,
|
|
NodeID: &nodeID,
|
|
OriginNodeGuid: originGuidFor(snapIb),
|
|
Tag: chosenTag,
|
|
Listen: snapIb.Listen,
|
|
Port: snapIb.Port,
|
|
Protocol: snapIb.Protocol,
|
|
Settings: snapIb.Settings,
|
|
StreamSettings: snapIb.StreamSettings,
|
|
Sniffing: snapIb.Sniffing,
|
|
TrafficReset: snapIb.TrafficReset,
|
|
LastTrafficResetTime: snapIb.LastTrafficResetTime,
|
|
Enable: snapIb.Enable,
|
|
Remark: snapIb.Remark,
|
|
Total: snapIb.Total,
|
|
ExpiryTime: snapIb.ExpiryTime,
|
|
Up: snapIb.Up,
|
|
Down: snapIb.Down,
|
|
}
|
|
if err := tx.Create(&newIb).Error; err != nil {
|
|
logger.Warningf("setRemoteTraffic: create central inbound for tag %q failed: %v", snapIb.Tag, err)
|
|
continue
|
|
}
|
|
tagToCentral[snapIb.Tag] = &newIb
|
|
if newIb.Tag != snapIb.Tag {
|
|
tagToCentral[newIb.Tag] = &newIb
|
|
}
|
|
structuralChange = true
|
|
continue
|
|
}
|
|
|
|
inGrace := c.LastTrafficResetTime > 0 && now-c.LastTrafficResetTime < resetGracePeriodMs
|
|
|
|
updates := map[string]any{}
|
|
if !dirty {
|
|
updates["enable"] = snapIb.Enable
|
|
updates["remark"] = snapIb.Remark
|
|
updates["listen"] = snapIb.Listen
|
|
updates["port"] = snapIb.Port
|
|
updates["protocol"] = snapIb.Protocol
|
|
updates["total"] = snapIb.Total
|
|
updates["expiry_time"] = snapIb.ExpiryTime
|
|
updates["settings"] = snapIb.Settings
|
|
updates["stream_settings"] = snapIb.StreamSettings
|
|
updates["sniffing"] = snapIb.Sniffing
|
|
updates["traffic_reset"] = snapIb.TrafficReset
|
|
updates["last_traffic_reset_time"] = snapIb.LastTrafficResetTime
|
|
}
|
|
if !inGrace || (snapIb.Up+snapIb.Down) <= (c.Up+c.Down) {
|
|
updates["up"] = snapIb.Up
|
|
updates["down"] = snapIb.Down
|
|
}
|
|
// Physical-home attribution is independent of config-dirty state, so
|
|
// keep it current even while the node has pending offline edits. Writes
|
|
// once to backfill an existing row, then stays equal (#4983).
|
|
if og := originGuidFor(snapIb); c.OriginNodeGuid != og {
|
|
updates["origin_node_guid"] = og
|
|
}
|
|
|
|
if !dirty && (c.Settings != snapIb.Settings ||
|
|
c.Remark != snapIb.Remark ||
|
|
c.Listen != snapIb.Listen ||
|
|
c.Port != snapIb.Port ||
|
|
c.Total != snapIb.Total ||
|
|
c.ExpiryTime != snapIb.ExpiryTime ||
|
|
c.Enable != snapIb.Enable) {
|
|
structuralChange = true
|
|
}
|
|
|
|
if len(updates) > 0 {
|
|
if err := tx.Model(model.Inbound{}).
|
|
Where("id = ?", c.Id).
|
|
Updates(updates).Error; err != nil {
|
|
return false, err
|
|
}
|
|
}
|
|
}
|
|
|
|
for _, c := range central {
|
|
if dirty {
|
|
continue
|
|
}
|
|
if _, kept := snapTags[c.Tag]; kept {
|
|
continue
|
|
}
|
|
var goneEmails []string
|
|
if err := tx.Model(xray.ClientTraffic{}).
|
|
Where("inbound_id = ?", c.Id).
|
|
Pluck("email", &goneEmails).Error; err != nil {
|
|
return false, err
|
|
}
|
|
if len(goneEmails) > 0 {
|
|
// Chunk to avoid SQLite bind var limit when a node has many clients
|
|
// removed (e.g. after API bulk delete or structural change on node inbound).
|
|
for _, batch := range chunkStrings(goneEmails, sqliteMaxVars) {
|
|
if err := tx.Where("node_id = ? AND email IN ?", nodeID, batch).
|
|
Delete(&model.NodeClientTraffic{}).Error; err != nil {
|
|
return false, err
|
|
}
|
|
}
|
|
// The per-email row is the shared accumulator across every inbound
|
|
// (and node) the email is attached to. Only drop it when this was the
|
|
// email's last inbound — wiping it while a sibling still feeds it
|
|
// loses the summed history, and the next node sync would re-seed the
|
|
// row with that node's counter alone.
|
|
sharedEmails, sErr := s.emailsUsedByOtherInbounds(goneEmails, c.Id)
|
|
if sErr != nil {
|
|
return false, sErr
|
|
}
|
|
delEmails := make([]string, 0, len(goneEmails))
|
|
for _, e := range goneEmails {
|
|
if !sharedEmails[strings.ToLower(strings.TrimSpace(e))] {
|
|
delEmails = append(delEmails, e)
|
|
}
|
|
}
|
|
for _, batch := range chunkStrings(delEmails, sqliteMaxVars) {
|
|
if err := tx.Where("inbound_id = ? AND email IN ?", c.Id, batch).
|
|
Delete(&xray.ClientTraffic{}).Error; err != nil {
|
|
return false, err
|
|
}
|
|
}
|
|
}
|
|
if err := s.clientService.DetachInbound(tx, c.Id); err != nil {
|
|
return false, err
|
|
}
|
|
if err := tx.Where("id = ?", c.Id).
|
|
Delete(&model.Inbound{}).Error; err != nil {
|
|
return false, err
|
|
}
|
|
delete(tagToCentral, c.Tag)
|
|
structuralChange = true
|
|
}
|
|
|
|
for _, snapIb := range snap.Inbounds {
|
|
if snapIb == nil {
|
|
continue
|
|
}
|
|
c, ok := tagToCentral[snapIb.Tag]
|
|
if !ok {
|
|
continue
|
|
}
|
|
snapEmails := make(map[string]struct{}, len(snapIb.ClientStats))
|
|
for _, cs := range snapIb.ClientStats {
|
|
snapEmails[cs.Email] = struct{}{}
|
|
|
|
base, seen := nodeBaselines[cs.Email]
|
|
var deltaUp, deltaDown int64
|
|
if seen {
|
|
if deltaUp = cs.Up - base.Up; deltaUp < 0 {
|
|
deltaUp = cs.Up
|
|
}
|
|
if deltaDown = cs.Down - base.Down; deltaDown < 0 {
|
|
deltaDown = cs.Down
|
|
}
|
|
}
|
|
|
|
if _, rowExists := existingEmails[cs.Email]; !rowExists {
|
|
if dirty {
|
|
continue
|
|
}
|
|
row := &xray.ClientTraffic{
|
|
InboundId: c.Id,
|
|
Email: cs.Email,
|
|
Enable: cs.Enable,
|
|
Total: cs.Total,
|
|
ExpiryTime: cs.ExpiryTime,
|
|
Reset: cs.Reset,
|
|
Up: cs.Up,
|
|
Down: cs.Down,
|
|
LastOnline: cs.LastOnline,
|
|
}
|
|
if err := tx.Clauses(clause.OnConflict{Columns: []clause.Column{{Name: "email"}}, DoNothing: true}).
|
|
Create(row).Error; err != nil {
|
|
return false, err
|
|
}
|
|
centralCS[csKey{c.Id, cs.Email}] = row
|
|
centralCSByEmail[cs.Email] = row
|
|
existingEmails[cs.Email] = struct{}{}
|
|
structuralChange = true
|
|
if err := s.upsertNodeBaseline(tx, nodeID, cs.Email, cs.Up, cs.Down); err != nil {
|
|
return false, err
|
|
}
|
|
nodeBaselines[cs.Email] = nodeTrafficCounter{Up: cs.Up, Down: cs.Down}
|
|
continue
|
|
}
|
|
|
|
if existing := centralCSByEmail[cs.Email]; existing != nil &&
|
|
(existing.Enable != cs.Enable ||
|
|
existing.Total != cs.Total ||
|
|
existing.ExpiryTime != cs.ExpiryTime ||
|
|
existing.Reset != cs.Reset) {
|
|
structuralChange = true
|
|
}
|
|
|
|
enableExpr := database.ClientTrafficEnableMergeExpr()
|
|
if err := tx.Exec(
|
|
fmt.Sprintf(
|
|
`UPDATE client_traffics
|
|
SET up = up + ?, down = down + ?, enable = %s, total = ?, expiry_time = ?, reset = ?,
|
|
last_online = %s
|
|
WHERE email = ?`,
|
|
enableExpr,
|
|
database.GreatestExpr("last_online", "?"),
|
|
),
|
|
deltaUp, deltaDown, cs.Enable, cs.Total, cs.ExpiryTime, cs.Reset,
|
|
cs.LastOnline, cs.Email,
|
|
).Error; err != nil {
|
|
return false, err
|
|
}
|
|
if err := s.upsertNodeBaseline(tx, nodeID, cs.Email, cs.Up, cs.Down); err != nil {
|
|
return false, err
|
|
}
|
|
nodeBaselines[cs.Email] = nodeTrafficCounter{Up: cs.Up, Down: cs.Down}
|
|
}
|
|
|
|
for k, existing := range centralCS {
|
|
if dirty {
|
|
continue
|
|
}
|
|
if k.inboundID != c.Id {
|
|
continue
|
|
}
|
|
if _, kept := snapEmails[k.email]; kept {
|
|
continue
|
|
}
|
|
if err := tx.Where("node_id = ? AND email = ?", nodeID, existing.Email).
|
|
Delete(&model.NodeClientTraffic{}).Error; err != nil {
|
|
return false, err
|
|
}
|
|
// Same shared-accumulator rule as the inbound-removal sweep above:
|
|
// keep the row while another inbound still references the email.
|
|
stillUsed, uErr := s.emailUsedByOtherInbounds(existing.Email, c.Id)
|
|
if uErr != nil {
|
|
return false, uErr
|
|
}
|
|
if !stillUsed {
|
|
if err := tx.Where("inbound_id = ? AND email = ?", c.Id, existing.Email).
|
|
Delete(&xray.ClientTraffic{}).Error; err != nil {
|
|
return false, err
|
|
}
|
|
}
|
|
structuralChange = true
|
|
}
|
|
}
|
|
|
|
type oldSet struct {
|
|
inboundID int
|
|
emails map[string]struct{}
|
|
}
|
|
var perInboundOld []oldSet
|
|
for _, snapIb := range snap.Inbounds {
|
|
if snapIb == nil {
|
|
continue
|
|
}
|
|
c, ok := tagToCentral[snapIb.Tag]
|
|
if !ok {
|
|
continue
|
|
}
|
|
if dirty {
|
|
continue
|
|
}
|
|
var oldEmailsRows []string
|
|
if err := tx.Table("clients").
|
|
Joins("JOIN client_inbounds ON client_inbounds.client_id = clients.id").
|
|
Where("client_inbounds.inbound_id = ?", c.Id).
|
|
Pluck("email", &oldEmailsRows).Error; err == nil {
|
|
oldEmails := make(map[string]struct{}, len(oldEmailsRows))
|
|
for _, e := range oldEmailsRows {
|
|
if e != "" {
|
|
oldEmails[e] = struct{}{}
|
|
}
|
|
}
|
|
perInboundOld = append(perInboundOld, oldSet{inboundID: c.Id, emails: oldEmails})
|
|
}
|
|
|
|
clients, gcErr := s.GetClients(snapIb)
|
|
if gcErr != nil {
|
|
logger.Warningf("setRemoteTraffic: parse clients for tag %q failed: %v", snapIb.Tag, gcErr)
|
|
continue
|
|
}
|
|
csEnableByEmail := make(map[string]bool, len(snapIb.ClientStats))
|
|
for _, cs := range snapIb.ClientStats {
|
|
csEnableByEmail[cs.Email] = cs.Enable
|
|
}
|
|
filtered := clients[:0]
|
|
for i := range clients {
|
|
if isClientEmailTombstoned(clients[i].Email) {
|
|
continue
|
|
}
|
|
if cse, hit := csEnableByEmail[clients[i].Email]; hit && !cse {
|
|
clients[i].Enable = false
|
|
}
|
|
filtered = append(filtered, clients[i])
|
|
}
|
|
localEmails := make([]string, 0, len(filtered))
|
|
for i := range filtered {
|
|
if filtered[i].Email != "" {
|
|
localEmails = append(localEmails, filtered[i].Email)
|
|
}
|
|
}
|
|
if len(localEmails) > 0 {
|
|
var localMeta []struct {
|
|
Email string
|
|
Comment string `gorm:"column:comment"`
|
|
}
|
|
if err := tx.Table("clients").
|
|
Select("email, comment").
|
|
Where("email IN ?", localEmails).
|
|
Find(&localMeta).Error; err == nil {
|
|
commentByEmail := make(map[string]string, len(localMeta))
|
|
for _, m := range localMeta {
|
|
commentByEmail[m.Email] = m.Comment
|
|
}
|
|
for i := range filtered {
|
|
if cmt, ok := commentByEmail[filtered[i].Email]; ok {
|
|
filtered[i].Comment = cmt
|
|
}
|
|
}
|
|
}
|
|
}
|
|
if err := s.clientService.SyncInbound(tx, c.Id, filtered); err != nil {
|
|
logger.Warningf("setRemoteTraffic: sync clients for tag %q failed: %v", snapIb.Tag, err)
|
|
}
|
|
}
|
|
|
|
for _, old := range perInboundOld {
|
|
var stillAttached []string
|
|
if err := tx.Table("clients").
|
|
Joins("JOIN client_inbounds ON client_inbounds.client_id = clients.id").
|
|
Where("client_inbounds.inbound_id = ?", old.inboundID).
|
|
Pluck("email", &stillAttached).Error; err != nil {
|
|
continue
|
|
}
|
|
stillSet := make(map[string]struct{}, len(stillAttached))
|
|
for _, e := range stillAttached {
|
|
stillSet[e] = struct{}{}
|
|
}
|
|
for email := range old.emails {
|
|
if _, kept := stillSet[email]; kept {
|
|
continue
|
|
}
|
|
var attachmentCount int64
|
|
if err := tx.Table("client_inbounds").
|
|
Joins("JOIN clients ON clients.id = client_inbounds.client_id").
|
|
Where("clients.email = ?", email).
|
|
Count(&attachmentCount).Error; err != nil {
|
|
continue
|
|
}
|
|
if attachmentCount > 0 {
|
|
continue
|
|
}
|
|
if err := tx.Where("email = ?", email).Delete(&model.ClientRecord{}).Error; err != nil {
|
|
logger.Warningf("setRemoteTraffic: delete ClientRecord %q failed: %v", email, err)
|
|
}
|
|
if err := tx.Where("email = ?", email).Delete(&xray.ClientTraffic{}).Error; err != nil {
|
|
logger.Warningf("setRemoteTraffic: delete ClientTraffic %q failed: %v", email, err)
|
|
}
|
|
if err := tx.Where("email = ?", email).Delete(&model.NodeClientTraffic{}).Error; err != nil {
|
|
logger.Warningf("setRemoteTraffic: delete NodeClientTraffic %q failed: %v", email, err)
|
|
}
|
|
structuralChange = true
|
|
}
|
|
}
|
|
|
|
if err := tx.Commit().Error; err != nil {
|
|
return false, err
|
|
}
|
|
committed = true
|
|
|
|
if p != nil {
|
|
tree := snap.OnlineTree
|
|
if len(tree) == 0 && len(snap.OnlineEmails) > 0 {
|
|
// Old-build node (no GUID tree): key its flat online list under its
|
|
// own effective identity so attribution still works for that branch.
|
|
effectiveGuid := nodeRow.Guid
|
|
if effectiveGuid == "" {
|
|
effectiveGuid = synthNodeGuid(nodeID)
|
|
}
|
|
tree = map[string][]string{effectiveGuid: snap.OnlineEmails}
|
|
}
|
|
p.SetNodeOnlineTree(nodeID, tree)
|
|
}
|
|
|
|
return structuralChange, nil
|
|
}
|
|
|
|
func (s *InboundService) restartRemoteNodesOnDisable(nodeIDs []int) {
|
|
restartOnDisable, err := (&SettingService{}).GetRestartXrayOnClientDisable()
|
|
if err != nil {
|
|
logger.Warning("disableInvalidClients: get RestartXrayOnClientDisable failed:", err)
|
|
return
|
|
}
|
|
if !restartOnDisable {
|
|
return
|
|
}
|
|
for _, nodeID := range nodeIDs {
|
|
nodeIDCopy := nodeID
|
|
rt, rtErr := runtime.GetManager().RuntimeFor(&nodeIDCopy)
|
|
if rtErr != nil {
|
|
logger.Warning("disableInvalidClients: get runtime for node", nodeID, "failed:", rtErr)
|
|
continue
|
|
}
|
|
if rtErr = rt.RestartXray(context.Background()); rtErr != nil {
|
|
logger.Warning("disableInvalidClients: restart xray on node", nodeID, "failed:", rtErr)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *InboundService) GetOnlineClients() []string {
|
|
if p == nil {
|
|
return []string{}
|
|
}
|
|
return p.GetOnlineClients()
|
|
}
|
|
|
|
// GetOnlineClientsByGuid returns online emails keyed by the panelGuid of the
|
|
// node that physically hosts each set: this panel's own clients under its own
|
|
// GUID, plus every node in the tree under its GUID (#4983). Replaces the old
|
|
// node-id keying so a client three hops down is attributed to its real node,
|
|
// not the intermediate one it was synced through.
|
|
func (s *InboundService) GetOnlineClientsByGuid() map[string][]string {
|
|
if p == nil {
|
|
return map[string][]string{}
|
|
}
|
|
out := p.GetMergedNodeTrees()
|
|
if local := p.GetLocalOnlineClients(); len(local) > 0 {
|
|
if guid := s.panelGuid(); guid != "" {
|
|
out[guid] = mergeEmails(out[guid], local)
|
|
}
|
|
}
|
|
return out
|
|
}
|
|
|
|
// GetActiveInboundsByGuid returns the inbound tags that carried traffic within
|
|
// the grace window for THIS panel, under its own GUID. Remote nodes don't
|
|
// report per-inbound activity, so a GUID missing from the map means "don't
|
|
// gate" for that node's inbounds.
|
|
func (s *InboundService) GetActiveInboundsByGuid() map[string][]string {
|
|
if p == nil {
|
|
return map[string][]string{}
|
|
}
|
|
active := p.GetLocalActiveInbounds()
|
|
if len(active) == 0 {
|
|
return map[string][]string{}
|
|
}
|
|
guid := s.panelGuid()
|
|
if guid == "" {
|
|
return map[string][]string{}
|
|
}
|
|
return map[string][]string{guid: active}
|
|
}
|
|
|
|
func (s *InboundService) SetNodeOnlineTree(nodeID int, tree map[string][]string) {
|
|
if p != nil {
|
|
p.SetNodeOnlineTree(nodeID, tree)
|
|
}
|
|
}
|
|
|
|
func (s *InboundService) ClearNodeOnlineClients(nodeID int) {
|
|
if p != nil {
|
|
p.ClearNodeOnlineClients(nodeID)
|
|
}
|
|
}
|
|
|
|
// panelGuid returns this panel's stable self-identifier, used to key the local
|
|
// panel's own clients in the per-node online maps (#4983).
|
|
func (s *InboundService) panelGuid() string {
|
|
guid, _ := (&SettingService{}).GetPanelGuid()
|
|
return guid
|
|
}
|
|
|
|
// synthNodeGuid is the stable per-node fallback identity for a directly-attached
|
|
// node whose panel hasn't reported a panelGuid yet (old build). Node ids are
|
|
// master-local, so this only composes for direct nodes — exactly the pre-#4983
|
|
// flat-topology case where an old-build node appears.
|
|
func synthNodeGuid(nodeID int) string {
|
|
return fmt.Sprintf("node:%d", nodeID)
|
|
}
|
|
|
|
// mergeEmails returns the deduped union of two email slices.
|
|
func mergeEmails(a, b []string) []string {
|
|
if len(a) == 0 {
|
|
return b
|
|
}
|
|
seen := make(map[string]struct{}, len(a)+len(b))
|
|
out := make([]string, 0, len(a)+len(b))
|
|
for _, e := range a {
|
|
if _, ok := seen[e]; !ok {
|
|
seen[e] = struct{}{}
|
|
out = append(out, e)
|
|
}
|
|
}
|
|
for _, e := range b {
|
|
if _, ok := seen[e]; !ok {
|
|
seen[e] = struct{}{}
|
|
out = append(out, e)
|
|
}
|
|
}
|
|
return out
|
|
}
|
|
|
|
func (s *InboundService) GetClientsLastOnline() (map[string]int64, error) {
|
|
db := database.GetDB()
|
|
var rows []xray.ClientTraffic
|
|
err := db.Model(&xray.ClientTraffic{}).Select("email, last_online").Find(&rows).Error
|
|
if err != nil && err != gorm.ErrRecordNotFound {
|
|
return nil, err
|
|
}
|
|
result := make(map[string]int64, len(rows))
|
|
for _, r := range rows {
|
|
result[r.Email] = r.LastOnline
|
|
}
|
|
return result, nil
|
|
}
|
|
|
|
// RefreshLocalOnlineClients folds the emails and inbound tags active on this
|
|
// panel's own xray this poll into the local online/active sets, applying the
|
|
// grace window and pruning stale entries. Pass nil to only prune. See
|
|
// xray.Process for why the local sets are kept separate from the shared
|
|
// last_online column.
|
|
func (s *InboundService) RefreshLocalOnlineClients(activeEmails, activeInboundTags []string) {
|
|
if p != nil {
|
|
p.RefreshLocalOnline(activeEmails, activeInboundTags, time.Now().UnixMilli(), onlineGracePeriodMs)
|
|
}
|
|
}
|
|
|
|
func (s *InboundService) FilterAndSortClientEmails(emails []string) ([]string, []string, error) {
|
|
db := database.GetDB()
|
|
|
|
// Step 1: Get ClientTraffic records for emails in the input list.
|
|
// Chunked to stay under SQLite's bind-variable limit on huge inputs.
|
|
uniqEmails := uniqueNonEmptyStrings(emails)
|
|
clients := make([]xray.ClientTraffic, 0, len(uniqEmails))
|
|
for _, batch := range chunkStrings(uniqEmails, sqliteMaxVars) {
|
|
var page []xray.ClientTraffic
|
|
if err := db.Where("email IN ?", batch).Find(&page).Error; err != nil && err != gorm.ErrRecordNotFound {
|
|
return nil, nil, err
|
|
}
|
|
clients = append(clients, page...)
|
|
}
|
|
|
|
// Step 2: Sort clients by (Up + Down) descending
|
|
sort.Slice(clients, func(i, j int) bool {
|
|
return (clients[i].Up + clients[i].Down) > (clients[j].Up + clients[j].Down)
|
|
})
|
|
|
|
// Step 3: Extract sorted valid emails and track found ones
|
|
validEmails := make([]string, 0, len(clients))
|
|
found := make(map[string]bool)
|
|
for _, client := range clients {
|
|
validEmails = append(validEmails, client.Email)
|
|
found[client.Email] = true
|
|
}
|
|
|
|
// Step 4: Identify emails that were not found in the database
|
|
extraEmails := make([]string, 0)
|
|
for _, email := range emails {
|
|
if !found[email] {
|
|
extraEmails = append(extraEmails, email)
|
|
}
|
|
}
|
|
|
|
return validEmails, extraEmails, nil
|
|
}
|