Files
3x-ui/internal/web/runtime/manager.go
T
MHSanaei 6a032bcb2a perf(scale): speed up traffic, auto-renew, and node bulk ops at 50k-100k clients
Local hot paths:
- autoRenewClients: replace the O(clients x expired) inner scan with an
  email->traffic map lookup (quadratic at scale).
- node traffic sync: scope the client_traffics email-membership query to the
  snapshot's emails instead of plucking the whole table every poll.
- add a (expiry_time, reset) index for the per-tick auto-renew filter.
- SQLite: add cache_size/mmap_size/temp_store pragmas (env-tunable); keep the
  single-file DELETE journal and synchronous=FULL defaults.
- scale benchmarks now run on SQLite too via XUI_SCALE_TEST=1 (shared
  setupScaleDB/resetScaleTables helpers), not just Postgres.

Node paths:
- bulk add/delete/adjust on a node-attached inbound previously issued one HTTP
  RPC per client; above nodeBulkPushThreshold (32), mark the node dirty and let
  one ReconcileNode push converge it instead of O(M) sequential round-trips.
  Small ops keep the live per-client path. Also hoist nodePushPlan out of the
  per-email delete loop.
- ReconcileNode skips inbounds whose wire payload is unchanged (per-tag
  fingerprint on Remote), guarded by node-side tag presence so a restarted
  node is still re-seeded.

Tests: auto-renew multi-inbound correctness, node-path dispatch (large ops
fold to dirty, small ops push live) via a manager runtime override seam, and
reconcile delta-skip.
2026-06-20 10:35:46 +02:00

148 lines
2.9 KiB
Go

package runtime
import (
"errors"
"sync"
"github.com/mhsanaei/3x-ui/v3/internal/database"
"github.com/mhsanaei/3x-ui/v3/internal/database/model"
)
// NodeEgressResolver looks up the egress proxy URL associated with a node ID.
// A Manager stores one resolver and passes it to every Remote it constructs.
type NodeEgressResolver interface {
// NodeEgressProxyURL returns the egress proxy URL string for nodeID.
// NOTE(review): an empty string presumably means "no proxy" — confirm
// against the implementations.
NodeEgressProxyURL(nodeID int) string
}
// Manager hands out the Runtime for a node: the local runtime when no node ID
// is given, otherwise a Remote per node ID that is created on demand and
// cached.
type Manager struct {
// local is the runtime returned when no node ID is supplied.
local Runtime
// mu guards remotes, overrides and egressResolver.
mu sync.RWMutex
// remotes caches one Remote per node ID.
remotes map[int]*Remote
overrides map[int]Runtime // test-only: forces RuntimeFor to return a stub
// egressResolver is passed to every Remote this Manager constructs; it is
// nil until SetNodeEgressResolver is called.
egressResolver NodeEgressResolver
}
// NewManager returns a Manager whose local runtime is built from localDeps and
// whose cache of remotes starts out empty. No overrides or egress resolver are
// configured.
func NewManager(localDeps LocalDeps) *Manager {
	mgr := new(Manager)
	mgr.local = NewLocal(localDeps)
	mgr.remotes = map[int]*Remote{}
	return mgr
}
// SetRuntimeOverride registers rt as the Runtime that RuntimeFor(nodeID)
// returns, bypassing construction of a real Remote. It exists as a test seam
// so node-dispatch paths can be exercised without a network node. Passing a
// nil rt removes any override registered for nodeID.
func (m *Manager) SetRuntimeOverride(nodeID int, rt Runtime) {
	m.mu.Lock()
	defer m.mu.Unlock()

	switch {
	case rt == nil:
		// Deleting from a nil map is a no-op, so no allocation is needed here.
		delete(m.overrides, nodeID)
	case m.overrides == nil:
		// The map is allocated lazily on the first registration.
		m.overrides = map[int]Runtime{nodeID: rt}
	default:
		m.overrides[nodeID] = rt
	}
}
// SetNodeEgressResolver installs r as the resolver used by NodeEgressProxyURL
// and handed to Remotes constructed from now on. Remotes that are already
// cached are not touched. A nil r clears the resolver.
func (m *Manager) SetNodeEgressResolver(r NodeEgressResolver) {
	m.mu.Lock()
	m.egressResolver = r
	m.mu.Unlock()
}
// NodeEgressProxyURL returns the egress proxy URL that the configured resolver
// reports for nodeID, or "" when no resolver has been set.
//
// The resolver is read under the read lock but invoked only after the lock has
// been released. That way a slow resolver does not hold up writers waiting on
// m.mu, and a resolver that calls back into the Manager cannot deadlock on the
// (non-reentrant) mutex.
func (m *Manager) NodeEgressProxyURL(nodeID int) string {
	m.mu.RLock()
	resolver := m.egressResolver
	m.mu.RUnlock()

	if resolver == nil {
		return ""
	}
	return resolver.NodeEgressProxyURL(nodeID)
}
// RuntimeFor returns the Runtime that serves the given node.
//
// A nil nodeID selects the local runtime. Otherwise an override registered
// through SetRuntimeOverride takes precedence, followed by a cached Remote. If
// neither exists, the node is loaded, and a new Remote is built, cached and
// returned.
//
// It returns the load error when the node cannot be read and an error when the
// node is disabled; nothing is cached in either case.
func (m *Manager) RuntimeFor(nodeID *int) (Runtime, error) {
	if nodeID == nil {
		return m.local, nil
	}
	id := *nodeID

	// Fast path: both lookups only need the read lock.
	m.mu.RLock()
	override, hasOverride := m.overrides[id]
	cached, hasCached := m.remotes[id]
	m.mu.RUnlock()
	if hasOverride {
		return override, nil
	}
	if hasCached {
		return cached, nil
	}

	m.mu.Lock()
	defer m.mu.Unlock()

	// Re-check both maps: between releasing the read lock and acquiring the
	// write lock another goroutine may have registered an override or cached a
	// Remote for this node. The override must keep its precedence here too.
	if rt, ok := m.overrides[id]; ok {
		return rt, nil
	}
	if rt, ok := m.remotes[id]; ok {
		return rt, nil
	}

	n, err := loadNode(id)
	if err != nil {
		return nil, err
	}
	if !n.Enable {
		return nil, errors.New("node " + n.Name + " is disabled")
	}

	// Writing to a nil map panics; allocate lazily so a Manager that was not
	// built by NewManager still works.
	if m.remotes == nil {
		m.remotes = make(map[int]*Remote)
	}
	remote := NewRemote(n, m.egressResolver)
	m.remotes[id] = remote
	return remote, nil
}
// Local returns the Manager's local runtime, the same value RuntimeFor yields
// for a nil node ID.
func (m *Manager) Local() Runtime {
	return m.local
}
// RemoteFor returns the cached Remote for node, building and caching one from
// the supplied node value when none exists yet. Unlike RuntimeFor it neither
// consults overrides nor loads the node itself. It returns an error only when
// node is nil.
func (m *Manager) RemoteFor(node *model.Node) (*Remote, error) {
	if node == nil {
		return nil, errors.New("node is nil")
	}

	// Fast path: a cache hit only needs the read lock.
	m.mu.RLock()
	existing, found := m.remotes[node.Id]
	m.mu.RUnlock()
	if found {
		return existing, nil
	}

	m.mu.Lock()
	defer m.mu.Unlock()

	// Look again now that the write lock is held: another goroutine may have
	// inserted an entry after the read lock was released.
	if existing, found = m.remotes[node.Id]; found {
		return existing, nil
	}

	created := NewRemote(node, m.egressResolver)
	m.remotes[node.Id] = created
	return created, nil
}
// InvalidateNode drops the cached Remote for nodeID, if any, so that the next
// RuntimeFor or RemoteFor call for that node builds a fresh one. Overrides are
// left untouched.
func (m *Manager) InvalidateNode(nodeID int) {
	m.mu.Lock()
	delete(m.remotes, nodeID)
	m.mu.Unlock()
}
// loadNode reads the node row whose id column equals id through the shared
// database handle. On failure it returns a nil node together with the query
// error (presumably a not-found error when no row matches — verify against the
// database layer).
func loadNode(id int) (*model.Node, error) {
	var node model.Node
	query := database.GetDB().Model(model.Node{}).Where("id = ?", id)
	if err := query.First(&node).Error; err != nil {
		return nil, err
	}
	return &node, nil
}
// Package-level Manager instance, replaced via SetManager and read via
// GetManager.
var (
// managerMu guards manager.
managerMu sync.RWMutex
// manager is the current instance; it stays nil until SetManager stores a
// non-nil value.
manager *Manager
)
// SetManager replaces the package-level Manager with m. Passing nil clears it.
func SetManager(m *Manager) {
	managerMu.Lock()
	manager = m
	managerMu.Unlock()
}
// GetManager returns the package-level Manager most recently stored by
// SetManager, or nil when none has been stored.
func GetManager() *Manager {
	managerMu.RLock()
	current := manager
	managerMu.RUnlock()
	return current
}