From 7d23a2c15b64b99e9337f27e0347d7a02260e155 Mon Sep 17 00:00:00 2001 From: MHSanaei Date: Mon, 22 Jun 2026 02:48:58 +0200 Subject: [PATCH] perf: prevent cron job overlap, auto-set GOMEMLIMIT, fix tgbot userStates race cron: SkipIfStillRunning stops a slow 5s/10s job from overlapping itself and racing the shared xrayAPI (grpc conn leak) and the StatsLastValues map (fatal concurrent map write). memlimit: auto-detect a Go soft memory limit from XUI_MEMORY_LIMIT, the cgroup limit, or system RAM (about 90 percent); opt-in pprof via XUI_PPROF. tgbot: userStates now goes through a mutex-guarded store with TTL pruning (was raced by worker-pool and delayed-delete goroutines). check_client_ip: prefilter inbounds by settings LIKE limitIp instead of loading and JSON-parsing all of them every scan. minor: prune StatsLastValues, RateLimiter.lastSent, reportedRemoteTagConflict. docker-compose: document the memory knobs. --- docker-compose.yml | 8 +++ internal/eventbus/filter.go | 29 ++++++-- internal/util/sys/memlimit.go | 78 ++++++++++++++++++++++ internal/web/job/check_client_ip_job.go | 2 +- internal/web/service/inbound_node.go | 1 + internal/web/service/tgbot/tgbot.go | 62 ++++++++++++++++- internal/web/service/tgbot/tgbot_router.go | 33 ++++----- internal/web/service/tgbot/tgbot_send.go | 2 +- internal/web/web.go | 16 ++++- internal/xray/api.go | 13 ++++ main.go | 17 +++++ 11 files changed, 234 insertions(+), 27 deletions(-) create mode 100644 internal/util/sys/memlimit.go diff --git a/docker-compose.yml b/docker-compose.yml index 168f8be09..1cfd65212 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -5,6 +5,9 @@ services: dockerfile: ./Dockerfile container_name: 3xui_app # hostname: yourhostname <- optional + # Optional hard memory cap. When set, the panel auto-derives its Go soft + # limit (GOMEMLIMIT, ~90%) from this so it GCs before the OOM killer fires. + # mem_limit: 512m # The bundled Fail2ban (XUI_ENABLE_FAIL2BAN below) enforces the IP limit # with iptables, which needs NET_ADMIN. Without these caps a ban is logged # and shown in fail2ban status but never actually applied. NET_RAW covers @@ -18,6 +21,11 @@ services: environment: XRAY_VMESS_AEAD_FORCED: "false" XUI_ENABLE_FAIL2BAN: "true" + # Go memory soft limit. If neither is set, the panel auto-detects the + # cgroup/host limit and targets ~90%. Pin it explicitly with one of: + # XUI_MEMORY_LIMIT: "400" # in MiB + # GOMEMLIMIT: "400MiB" # Go syntax, takes precedence + # XUI_PPROF: "true" # expose pprof on 127.0.0.1:6060 for profiling # XUI_INIT_WEB_BASE_PATH: "/" # XUI_PORT: "8080" # To use PostgreSQL instead of the default SQLite, run: diff --git a/internal/eventbus/filter.go b/internal/eventbus/filter.go index 71ad253dc..b418d6427 100644 --- a/internal/eventbus/filter.go +++ b/internal/eventbus/filter.go @@ -7,9 +7,10 @@ import ( // RateLimiter prevents notification spam from flapping events. type RateLimiter struct { - mu sync.Mutex - lastSent map[string]time.Time - cooldown time.Duration + mu sync.Mutex + lastSent map[string]time.Time + cooldown time.Duration + lastPrune time.Time } // NewRateLimiter creates a rate limiter with the given cooldown period. @@ -23,11 +24,29 @@ func NewRateLimiter(cooldown time.Duration) *RateLimiter { // Allow returns true if the event should be sent (cooldown has elapsed). func (r *RateLimiter) Allow(eventType EventType, source string) bool { key := string(eventType) + ":" + source + now := time.Now() r.mu.Lock() defer r.mu.Unlock() - if time.Since(r.lastSent[key]) < r.cooldown { + r.pruneLocked(now) + if now.Sub(r.lastSent[key]) < r.cooldown { return false } - r.lastSent[key] = time.Now() + r.lastSent[key] = now return true } + +// pruneLocked drops keys whose cooldown has elapsed. Such an entry no longer +// affects Allow's result, so removing it is safe and keeps the map from +// retaining one entry per (eventType, source) ever seen. Throttled to once per +// cooldown so a busy bus doesn't sweep the whole map on every event. +func (r *RateLimiter) pruneLocked(now time.Time) { + if now.Sub(r.lastPrune) < r.cooldown { + return + } + r.lastPrune = now + for k, v := range r.lastSent { + if now.Sub(v) >= r.cooldown { + delete(r.lastSent, k) + } + } +} diff --git a/internal/util/sys/memlimit.go b/internal/util/sys/memlimit.go new file mode 100644 index 000000000..892172eb1 --- /dev/null +++ b/internal/util/sys/memlimit.go @@ -0,0 +1,78 @@ +package sys + +import ( + "os" + "runtime/debug" + "strconv" + "strings" + + "github.com/shirou/gopsutil/v4/mem" +) + +// memLimitHeadroomPercent is the share of detected memory used for the soft +// limit, leaving room for non-heap (stacks, mmap, the xray child) before the OS +// OOM-kills the process. +const memLimitHeadroomPercent = 90 + +// ApplyMemoryLimit sets a Go soft memory limit (the runtime's GOMEMLIMIT) when +// one is not already configured, so a long-running panel in a memory-capped +// container or VPS triggers GC as it approaches the cap instead of growing RSS +// until the OS OOM-kills it. Precedence: an explicit GOMEMLIMIT env is left to +// the runtime; otherwise XUI_MEMORY_LIMIT (in MiB) wins; otherwise the limit is +// derived from the cgroup memory limit, falling back to total system RAM. +// Returns the limit applied in bytes (0 when none) and a short source label. +func ApplyMemoryLimit() (int64, string) { + if strings.TrimSpace(os.Getenv("GOMEMLIMIT")) != "" { + return 0, "GOMEMLIMIT env (handled by the Go runtime)" + } + + if v := strings.TrimSpace(os.Getenv("XUI_MEMORY_LIMIT")); v != "" { + if mb, err := strconv.ParseInt(v, 10, 64); err == nil && mb > 0 { + limit := mb << 20 + debug.SetMemoryLimit(limit) + return limit, "XUI_MEMORY_LIMIT=" + v + "MiB" + } + } + + total, source := detectAvailableMemory() + if total <= 0 { + return 0, "undetectable; left at Go default" + } + limit := total / 100 * memLimitHeadroomPercent + debug.SetMemoryLimit(limit) + return limit, source +} + +func detectAvailableMemory() (int64, string) { + if v, ok := cgroupMemoryLimit(); ok { + return v, "cgroup limit" + } + if vm, err := mem.VirtualMemory(); err == nil && vm.Total > 0 { + return int64(vm.Total), "system RAM" + } + return 0, "" +} + +// cgroupMemoryLimit reads the container memory limit from cgroup v2 then v1. +// A "max" value or the v1 unlimited sentinel (~8 EiB) means no limit at this +// level, so it reports not-found and the caller falls back to system RAM. The +// files are absent off Linux, which also yields not-found. +func cgroupMemoryLimit() (int64, bool) { + const unlimited = int64(1) << 62 + + if b, err := os.ReadFile("/sys/fs/cgroup/memory.max"); err == nil { + if s := strings.TrimSpace(string(b)); s != "" && s != "max" { + if v, err := strconv.ParseInt(s, 10, 64); err == nil && v > 0 && v < unlimited { + return v, true + } + } + } + + if b, err := os.ReadFile("/sys/fs/cgroup/memory/memory.limit_in_bytes"); err == nil { + if v, err := strconv.ParseInt(strings.TrimSpace(string(b)), 10, 64); err == nil && v > 0 && v < unlimited { + return v, true + } + } + + return 0, false +} diff --git a/internal/web/job/check_client_ip_job.go b/internal/web/job/check_client_ip_job.go index 2ffa51850..854dc783e 100644 --- a/internal/web/job/check_client_ip_job.go +++ b/internal/web/job/check_client_ip_job.go @@ -157,7 +157,7 @@ func (j *CheckClientIpJob) hasLimitIp() bool { db := database.GetDB() var inbounds []*model.Inbound - err := db.Model(model.Inbound{}).Find(&inbounds).Error + err := db.Model(model.Inbound{}).Where("settings LIKE ?", "%limitIp%").Find(&inbounds).Error if err != nil { return false } diff --git a/internal/web/service/inbound_node.go b/internal/web/service/inbound_node.go index 0836203ad..34a437d2b 100644 --- a/internal/web/service/inbound_node.go +++ b/internal/web/service/inbound_node.go @@ -407,6 +407,7 @@ func (s *InboundService) setRemoteTrafficLocked(nodeID int, snap *runtime.Traffi } continue } + reportedRemoteTagConflict.Delete(fmt.Sprintf("%d:%s", nodeID, snapIb.Tag)) newIb := model.Inbound{ UserId: defaultUserId, NodeID: &nodeID, diff --git a/internal/web/service/tgbot/tgbot.go b/internal/web/service/tgbot/tgbot.go index c6b45c17d..d76f78c03 100644 --- a/internal/web/service/tgbot/tgbot.go +++ b/internal/web/service/tgbot/tgbot.go @@ -83,7 +83,65 @@ var ( client_Reset int ) -var userStates = make(map[int64]string) +// userStateStore guards the per-chat conversation states. The Telegram command +// and callback handlers run on a worker-pool goroutine while the message handler +// runs on the dispatch goroutine, so a bare map would be a concurrent-map-write +// crash. It also expires abandoned conversations so a user who starts a flow and +// goes silent doesn't leave an entry forever. +type userStateStore struct { + mu sync.Mutex + states map[int64]userStateEntry + lastPrune time.Time +} + +type userStateEntry struct { + state string + at time.Time +} + +var userStateMgr = &userStateStore{states: make(map[int64]userStateEntry)} + +func (s *userStateStore) set(chatID int64, state string) { + s.mu.Lock() + s.states[chatID] = userStateEntry{state: state, at: time.Now()} + s.mu.Unlock() +} + +func (s *userStateStore) get(chatID int64) (string, bool) { + s.mu.Lock() + defer s.mu.Unlock() + e, ok := s.states[chatID] + return e.state, ok +} + +func (s *userStateStore) clear(chatID int64) { + s.mu.Lock() + delete(s.states, chatID) + s.mu.Unlock() +} + +func (s *userStateStore) reset() { + s.mu.Lock() + s.states = make(map[int64]userStateEntry) + s.mu.Unlock() +} + +// maybePrune drops conversations older than maxAge, at most once per maxAge so a +// busy bot doesn't sweep the whole map on every message. +func (s *userStateStore) maybePrune(maxAge time.Duration) { + s.mu.Lock() + defer s.mu.Unlock() + now := time.Now() + if now.Sub(s.lastPrune) < maxAge { + return + } + s.lastPrune = now + for id, e := range s.states { + if now.Sub(e.at) > maxAge { + delete(s.states, id) + } + } +} // LoginStatus represents the result of a login attempt. type LoginStatus byte @@ -411,6 +469,8 @@ func StopBot() { isRunning = false tgBotMutex.Unlock() + userStateMgr.reset() + if handler != nil { handler.Stop() } diff --git a/internal/web/service/tgbot/tgbot_router.go b/internal/web/service/tgbot/tgbot_router.go index 4d6581525..75f89be6f 100644 --- a/internal/web/service/tgbot/tgbot_router.go +++ b/internal/web/service/tgbot/tgbot_router.go @@ -47,7 +47,7 @@ func (t *Tgbot) OnReceive() { tgBotMutex.Unlock() h.HandleMessage(func(ctx *th.Context, message telego.Message) error { - delete(userStates, message.Chat.ID) + userStateMgr.clear(message.Chat.ID) t.SendMsgToTgbot(message.Chat.ID, t.I18nBot("tgbot.keyboardClosed"), tu.ReplyKeyboardRemove()) return nil }, th.TextEqual(t.I18nBot("tgbot.buttons.closeKeyboard"))) @@ -62,7 +62,7 @@ func (t *Tgbot) OnReceive() { messageWorkerPool <- struct{}{} // Acquire worker defer func() { <-messageWorkerPool }() // Release worker - delete(userStates, message.Chat.ID) + userStateMgr.clear(message.Chat.ID) t.answerCommand(&message, message.Chat.ID, checkAdmin(message.From.ID)) }() return nil @@ -74,25 +74,26 @@ func (t *Tgbot) OnReceive() { messageWorkerPool <- struct{}{} // Acquire worker defer func() { <-messageWorkerPool }() // Release worker - delete(userStates, query.Message.GetChat().ID) + userStateMgr.clear(query.Message.GetChat().ID) t.answerCallback(&query, checkAdmin(query.From.ID)) }() return nil }, th.AnyCallbackQueryWithMessage()) h.HandleMessage(func(ctx *th.Context, message telego.Message) error { - if userState, exists := userStates[message.Chat.ID]; exists { + userStateMgr.maybePrune(time.Hour) + if userState, exists := userStateMgr.get(message.Chat.ID); exists { switch userState { case "awaiting_email": if client_Email == strings.TrimSpace(message.Text) { t.SendMsgToTgbotDeleteAfter(message.Chat.ID, t.I18nBot("tgbot.messages.using_default_value"), 3, tu.ReplyKeyboardRemove()) - delete(userStates, message.Chat.ID) + userStateMgr.clear(message.Chat.ID) return nil } client_Email = strings.TrimSpace(message.Text) if t.isSingleWord(client_Email) { - userStates[message.Chat.ID] = "awaiting_email" + userStateMgr.set(message.Chat.ID, "awaiting_email") cancel_btn_markup := tu.InlineKeyboard( tu.InlineKeyboardRow( @@ -103,26 +104,26 @@ func (t *Tgbot) OnReceive() { t.SendMsgToTgbot(message.Chat.ID, t.I18nBot("tgbot.messages.incorrect_input"), cancel_btn_markup) } else { t.SendMsgToTgbotDeleteAfter(message.Chat.ID, t.I18nBot("tgbot.messages.received_email"), 3, tu.ReplyKeyboardRemove()) - delete(userStates, message.Chat.ID) + userStateMgr.clear(message.Chat.ID) t.addClient(message.Chat.ID, t.BuildClientDraftMessage()) } case "awaiting_comment": if client_Comment == strings.TrimSpace(message.Text) { t.SendMsgToTgbotDeleteAfter(message.Chat.ID, t.I18nBot("tgbot.messages.using_default_value"), 3, tu.ReplyKeyboardRemove()) - delete(userStates, message.Chat.ID) + userStateMgr.clear(message.Chat.ID) return nil } client_Comment = strings.TrimSpace(message.Text) t.SendMsgToTgbotDeleteAfter(message.Chat.ID, t.I18nBot("tgbot.messages.received_comment"), 3, tu.ReplyKeyboardRemove()) - delete(userStates, message.Chat.ID) + userStateMgr.clear(message.Chat.ID) t.addClient(message.Chat.ID, t.BuildClientDraftMessage()) case "awaiting_tg_id": input := strings.TrimSpace(message.Text) if input == "" || input == "-" || strings.EqualFold(input, "none") { client_TgID = "" t.SendMsgToTgbotDeleteAfter(message.Chat.ID, t.I18nBot("tgbot.messages.using_default_value"), 3, tu.ReplyKeyboardRemove()) - delete(userStates, message.Chat.ID) + userStateMgr.clear(message.Chat.ID) t.addClient(message.Chat.ID, t.BuildClientDraftMessage()) return nil } @@ -137,7 +138,7 @@ func (t *Tgbot) OnReceive() { } client_TgID = input t.SendMsgToTgbotDeleteAfter(message.Chat.ID, t.I18nBot("tgbot.messages.userSaved"), 3, tu.ReplyKeyboardRemove()) - delete(userStates, message.Chat.ID) + userStateMgr.clear(message.Chat.ID) t.addClient(message.Chat.ID, t.BuildClientDraftMessage()) } @@ -1236,7 +1237,7 @@ func (t *Tgbot) answerCallback(callbackQuery *telego.CallbackQuery, isAdmin bool t.SendMsgToTgbot(chatId, t.I18nBot("tgbot.answers.chooseInbound"), inbounds) case "add_client_ch_default_email": t.deleteMessageTgBot(chatId, callbackQuery.Message.GetMessageID()) - userStates[chatId] = "awaiting_email" + userStateMgr.set(chatId, "awaiting_email") cancel_btn_markup := tu.InlineKeyboard( tu.InlineKeyboardRow( tu.InlineKeyboardButton(t.I18nBot("tgbot.buttons.use_default")).WithCallbackData("add_client_default_info"), @@ -1246,7 +1247,7 @@ func (t *Tgbot) answerCallback(callbackQuery *telego.CallbackQuery, isAdmin bool t.SendMsgToTgbot(chatId, prompt_message, cancel_btn_markup) case "add_client_ch_default_comment": t.deleteMessageTgBot(chatId, callbackQuery.Message.GetMessageID()) - userStates[chatId] = "awaiting_comment" + userStateMgr.set(chatId, "awaiting_comment") cancel_btn_markup := tu.InlineKeyboard( tu.InlineKeyboardRow( tu.InlineKeyboardButton(t.I18nBot("tgbot.buttons.use_default")).WithCallbackData("add_client_default_info"), @@ -1256,7 +1257,7 @@ func (t *Tgbot) answerCallback(callbackQuery *telego.CallbackQuery, isAdmin bool t.SendMsgToTgbot(chatId, prompt_message, cancel_btn_markup) case "add_client_ch_default_tg_id": t.deleteMessageTgBot(chatId, callbackQuery.Message.GetMessageID()) - userStates[chatId] = "awaiting_tg_id" + userStateMgr.set(chatId, "awaiting_tg_id") cancel_btn_markup := tu.InlineKeyboard( tu.InlineKeyboardRow( tu.InlineKeyboardButton(t.I18nBot("tgbot.buttons.use_default")).WithCallbackData("add_client_default_info"), @@ -1357,10 +1358,10 @@ func (t *Tgbot) answerCallback(callbackQuery *telego.CallbackQuery, isAdmin bool case "add_client_default_info": t.deleteMessageTgBot(chatId, callbackQuery.Message.GetMessageID()) t.SendMsgToTgbotDeleteAfter(chatId, t.I18nBot("tgbot.messages.using_default_value"), 3, tu.ReplyKeyboardRemove()) - delete(userStates, chatId) + userStateMgr.clear(chatId) t.addClient(chatId, t.BuildClientDraftMessage()) case "add_client_cancel": - delete(userStates, chatId) + userStateMgr.clear(chatId) receiver_inbound_ID = 0 receiver_inbound_IDs = nil t.deleteMessageTgBot(chatId, callbackQuery.Message.GetMessageID()) diff --git a/internal/web/service/tgbot/tgbot_send.go b/internal/web/service/tgbot/tgbot_send.go index c38a4a22c..4578619d0 100644 --- a/internal/web/service/tgbot/tgbot_send.go +++ b/internal/web/service/tgbot/tgbot_send.go @@ -232,7 +232,7 @@ func (t *Tgbot) SendMsgToTgbotDeleteAfter(chatId int64, msg string, delayInSecon go func() { time.Sleep(time.Duration(delayInSeconds) * time.Second) // Wait for the specified delay t.deleteMessageTgBot(chatId, sentMsg.MessageID) // Delete the message - delete(userStates, chatId) + userStateMgr.clear(chatId) }() } diff --git a/internal/web/web.go b/internal/web/web.go index ec776847e..e4f754431 100644 --- a/internal/web/web.go +++ b/internal/web/web.go @@ -476,9 +476,19 @@ func (s *Server) start(restartXray bool, startTgBot bool) (err error) { } service.StartTrafficWriter() - // cron.Recover wraps every job so a panic is logged and the scheduler keeps - // running, instead of the panic taking down the whole panel process. - s.cron = cron.New(cron.WithLocation(loc), cron.WithSeconds(), cron.WithChain(cron.Recover(cron.PrintfLogger(cronPanicLogger{})))) + // SkipIfStillRunning stops a slow job (e.g. the 5s traffic poll on a large + // install) from overlapping itself: two concurrent runs of the same job race + // the shared xrayAPI — leaking a grpc connection — and the StatsLastValues + // map, whose concurrent write is a fatal runtime throw cron.Recover can't + // catch. cron.Recover then logs any panic and keeps the scheduler alive. + s.cron = cron.New( + cron.WithLocation(loc), + cron.WithSeconds(), + cron.WithChain( + cron.SkipIfStillRunning(cron.DiscardLogger), + cron.Recover(cron.PrintfLogger(cronPanicLogger{})), + ), + ) s.cron.Start() // Wire the inbound-runtime manager once so InboundService can route diff --git a/internal/xray/api.go b/internal/xray/api.go index 8bb8d9a2e..96a3dd1fc 100644 --- a/internal/xray/api.go +++ b/internal/xray/api.go @@ -594,6 +594,19 @@ func (x *XrayAPI) GetTraffic() ([]*Traffic, []*ClientTraffic, error) { processClientTraffic(matches, value, emailTrafficMap) } } + + // Drop delta baselines for stats that no longer exist (deleted inbounds or + // clients), which otherwise linger until the next Xray restart. Only rebuild + // when the map has drifted past 2x the live set, so the steady-state hot path + // stays allocation-free. + if n := len(resp.GetStat()); n > 0 && len(x.StatsLastValues) > 2*n { + pruned := make(map[string]int64, n) + for _, stat := range resp.GetStat() { + pruned[stat.Name] = x.StatsLastValues[stat.Name] + } + x.StatsLastValues = pruned + } + return mapToSlice(tagTrafficMap), mapToSlice(emailTrafficMap), nil } diff --git a/main.go b/main.go index 09ea0d9f1..a2dca3136 100644 --- a/main.go +++ b/main.go @@ -6,6 +6,8 @@ import ( "flag" "fmt" "log" + "net/http" + _ "net/http/pprof" "os" "os/signal" "syscall" @@ -49,6 +51,21 @@ func runWebServer() { godotenv.Load() + if limit, source := sys.ApplyMemoryLimit(); limit > 0 { + logger.Infof("Go memory soft limit set to %d MiB (%s)", limit>>20, source) + } else { + logger.Info("Go memory soft limit not enforced: ", source) + } + + if os.Getenv("XUI_PPROF") == "true" { + go func() { + logger.Info("pprof profiling server listening on 127.0.0.1:6060") + if err := http.ListenAndServe("127.0.0.1:6060", nil); err != nil { + logger.Warning("pprof server stopped: ", err) + } + }() + } + err := database.InitDB(config.GetDBPath()) if err != nil { log.Fatalf("Error initializing database: %v", err)