refactor(job): drop access log from IP limiting, wipe it daily instead

The IP-limit job tracks per-client IPs via the core's online-stats API; the access-log parser only ran as a fallback for cores predating that API (which the panel never bundles). Remove the parser, the availability check, and the hourly rotation that truncated a log the job no longer reads.

Move the user-enabled access-log wipe to the daily clear-logs job, guarded so a disabled ('none') or missing log is left alone. Retire the now-unwritten 3xipl-ap persistent-log machinery.

Also resolve IP-limit clients via the exact clients/client_inbounds relation instead of a fragile settings LIKE '%email%' substring, keeping the JSON scan only as a fallback (carried from #5496).
This commit is contained in:
MHSanaei
2026-06-23 11:42:00 +02:00
parent a2961fd046
commit 42cd351e4e
5 changed files with 200 additions and 222 deletions
+48 -143
View File
@@ -1,14 +1,11 @@
package job
import (
"bufio"
"encoding/json"
"errors"
"io"
"log"
"os"
"os/exec"
"regexp"
"runtime"
"sort"
"time"
@@ -30,10 +27,9 @@ type IPWithTimestamp struct {
// CheckClientIpJob monitors client IP addresses and manages IP blocking based
// on configured limits. The per-client IPs come from the core's online-stats
// API when the running core supports it (no access log needed), falling back
// to access-log parsing on older cores.
// API; no access log is involved. On a core too old to expose that API the job
// simply skips the run (the bundled core always supports it).
type CheckClientIpJob struct {
lastClear int64
disAllowedIps []string
xrayService service.XrayService
}
@@ -51,41 +47,24 @@ func NewCheckClientIpJob() *CheckClientIpJob {
}
func (j *CheckClientIpJob) Run() {
if j.lastClear == 0 {
j.lastClear = time.Now().Unix()
observed, apiMode := j.collectFromOnlineAPI()
if !apiMode {
// xray is down or predates the online-stats API. There is no access-log
// fallback anymore, so there is nothing to do this run.
logger.Debug("[LimitIP] online-stats API unavailable this run; skipping")
return
}
fail2BanEnabled := isFail2BanEnabled()
hasLimit := fail2BanEnabled && j.hasLimitIp()
if !isFail2BanEnabled() {
return
}
hasLimit := j.hasLimitIp()
f2bInstalled := false
if hasLimit {
f2bInstalled = j.checkFail2BanInstalled()
}
if observed, apiMode := j.collectFromOnlineAPI(); apiMode {
if fail2BanEnabled {
j.processObserved(observed, j.resolveEnforce(hasLimit, f2bInstalled), true)
}
// The core tracks online IPs itself, so no access log is needed in this
// mode; still rotate a user-configured access log hourly so it doesn't
// grow unboundedly. The enforcement-triggered rotation is skipped —
// nothing here reads the log.
if j.checkAccessLogAvailable(false) && time.Now().Unix()-j.lastClear > 3600 {
j.clearAccessLog()
}
return
}
shouldClearAccessLog := false
isAccessLogAvailable := j.checkAccessLogAvailable(hasLimit)
if fail2BanEnabled && isAccessLogAvailable {
shouldClearAccessLog = j.processLogFile(j.resolveEnforce(hasLimit, f2bInstalled))
}
if shouldClearAccessLog || (isAccessLogAvailable && time.Now().Unix()-j.lastClear > 3600) {
j.clearAccessLog()
}
j.processObserved(observed, j.resolveEnforce(hasLimit, f2bInstalled), true)
}
// resolveEnforce decides whether limits can actually be enforced this run.
@@ -102,7 +81,7 @@ func (j *CheckClientIpJob) resolveEnforce(hasLimit, f2bInstalled bool) bool {
// collectFromOnlineAPI builds per-email IP observations (email -> ip ->
// last-seen unix seconds) from the core's online-stats API. ok=false means the
// API is unavailable — xray not running, an older core, or a transient gRPC
// failure — and the caller must fall back to access-log parsing.
// failure — and the caller skips the run (there is no access-log fallback).
func (j *CheckClientIpJob) collectFromOnlineAPI() (map[string]map[string]int64, bool) {
onlineUsers, ok, err := j.xrayService.GetOnlineUsers()
if err != nil {
@@ -133,27 +112,6 @@ func (j *CheckClientIpJob) collectFromOnlineAPI() (map[string]map[string]int64,
return observed, true
}
func (j *CheckClientIpJob) clearAccessLog() {
logAccessP, err := os.OpenFile(xray.GetAccessPersistentLogPath(), os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0o644)
j.checkError(err)
defer logAccessP.Close()
accessLogPath, err := xray.GetAccessLogPath()
j.checkError(err)
file, err := os.Open(accessLogPath)
j.checkError(err)
defer file.Close()
_, err = io.Copy(logAccessP, file)
j.checkError(err)
err = os.Truncate(accessLogPath, 0)
j.checkError(err)
j.lastClear = time.Now().Unix()
}
func (j *CheckClientIpJob) hasLimitIp() bool {
db := database.GetDB()
var inbounds []*model.Inbound
@@ -183,74 +141,11 @@ func (j *CheckClientIpJob) hasLimitIp() bool {
return false
}
func (j *CheckClientIpJob) processLogFile(enforce bool) bool {
ipRegex := regexp.MustCompile(`from (?:tcp:|udp:)?\[?([0-9a-fA-F\.:]+)\]?:\d+ accepted`)
emailRegex := regexp.MustCompile(`email: (.+)$`)
timestampRegex := regexp.MustCompile(`^(\d{4}/\d{2}/\d{2} \d{2}:\d{2}:\d{2})`)
accessLogPath, _ := xray.GetAccessLogPath()
file, _ := os.Open(accessLogPath)
defer file.Close()
// Track IPs with their last seen timestamp
inboundClientIps := make(map[string]map[string]int64, 100)
scanner := bufio.NewScanner(file)
for scanner.Scan() {
line := scanner.Text()
ipMatches := ipRegex.FindStringSubmatch(line)
if len(ipMatches) < 2 {
continue
}
ip := ipMatches[1]
if ip == "127.0.0.1" || ip == "::1" {
continue
}
emailMatches := emailRegex.FindStringSubmatch(line)
if len(emailMatches) < 2 {
continue
}
email := emailMatches[1]
// Extract timestamp from log line
var timestamp int64
timestampMatches := timestampRegex.FindStringSubmatch(line)
if len(timestampMatches) >= 2 {
t, err := time.ParseInLocation("2006/01/02 15:04:05", timestampMatches[1], time.Local)
if err == nil {
timestamp = t.Unix()
} else {
timestamp = time.Now().Unix()
}
} else {
timestamp = time.Now().Unix()
}
if _, exists := inboundClientIps[email]; !exists {
inboundClientIps[email] = make(map[string]int64)
}
// Update timestamp - keep the latest
if existingTime, ok := inboundClientIps[email][ip]; !ok || timestamp > existingTime {
inboundClientIps[email][ip] = timestamp
}
}
if err := scanner.Err(); err != nil {
j.checkError(err)
}
return j.processObserved(inboundClientIps, enforce, false)
}
// processObserved runs collection + enforcement for one scan's observations
// (email -> ip -> last-seen unix seconds). observedAreLive marks the
// observations as live connections (online-stats API) rather than recent log
// lines: live entries bypass the stale cutoff, since a connection that opened
// hours ago is still live even though its timestamp is old.
// observations as live connections, which bypass the stale cutoff: a connection
// that opened hours ago is still live even though its timestamp is old. The
// online-stats API always reports live connections, so the job passes true.
func (j *CheckClientIpJob) processObserved(observed map[string]map[string]int64, enforce, observedAreLive bool) bool {
shouldCleanLog := false
now := time.Now().Unix()
@@ -391,22 +286,6 @@ func isFail2BanEnabled() bool {
return !ok || value == "true"
}
func (j *CheckClientIpJob) checkAccessLogAvailable(iplimitActive bool) bool {
accessLogPath, err := xray.GetAccessLogPath()
if err != nil {
return false
}
if accessLogPath == "none" || accessLogPath == "" {
if iplimitActive {
logger.Warning("[LimitIP] Access log path is not set, Please configure the access log path in Xray configs.")
}
return false
}
return true
}
func (j *CheckClientIpJob) checkError(e error) {
if e != nil {
logger.Warning("client ip job err:", e)
@@ -682,14 +561,40 @@ func getAPIPortFromConfigData(configData []byte) (int, error) {
return 0, errors.New("api inbound port not found")
}
// getInboundByEmail resolves the inbound that owns a client email. It prefers
// the exact clients/client_inbounds relation; a substring "settings LIKE
// %email%" can match the wrong inbound (an email that is a substring of another,
// or text that merely appears elsewhere in the settings JSON). The LIKE + JSON
// scan stays only as a fallback for clients not yet present in the relation, so
// nothing regresses when the join finds no row.
func (j *CheckClientIpJob) getInboundByEmail(clientEmail string) (*model.Inbound, error) {
db := database.GetDB()
inbound := &model.Inbound{}
err := db.Model(&model.Inbound{}).Where("settings LIKE ?", "%"+clientEmail+"%").First(inbound).Error
if err != nil {
return nil, err
err := db.Model(&model.Inbound{}).
Joins("JOIN client_inbounds ON client_inbounds.inbound_id = inbounds.id").
Joins("JOIN clients ON clients.id = client_inbounds.client_id").
Where("clients.email = ?", clientEmail).
First(inbound).Error
if err == nil {
return inbound, nil
}
return inbound, nil
var candidates []model.Inbound
if listErr := db.Model(&model.Inbound{}).Where("settings LIKE ?", "%"+clientEmail+"%").Find(&candidates).Error; listErr != nil {
return nil, listErr
}
for i := range candidates {
settings := map[string][]model.Client{}
if jsonErr := json.Unmarshal([]byte(candidates[i].Settings), &settings); jsonErr != nil {
continue
}
for _, client := range settings["clients"] {
if client.Email == clientEmail {
return &candidates[i], nil
}
}
}
return nil, err
}
@@ -59,6 +59,11 @@ func setupIntegrationDB(t *testing.T) {
// seed an inbound whose settings json has a single client with the
// given email and ip limit.
func seedInboundWithClient(t *testing.T, tag, email string, limitIp int) {
t.Helper()
seedInboundOnlyWithClient(t, tag, email, limitIp)
}
func seedInboundOnlyWithClient(t *testing.T, tag, email string, limitIp int) *model.Inbound {
t.Helper()
settings := map[string]any{
"clients": []map[string]any{
@@ -83,6 +88,21 @@ func seedInboundWithClient(t *testing.T, tag, email string, limitIp int) {
if err := database.GetDB().Create(inbound).Error; err != nil {
t.Fatalf("seed inbound: %v", err)
}
return inbound
}
func seedLinkedInboundWithClient(t *testing.T, tag, email string, limitIp int) *model.Inbound {
t.Helper()
inbound := seedInboundOnlyWithClient(t, tag, email, limitIp)
client := &model.ClientRecord{Email: email}
if err := database.GetDB().Create(client).Error; err != nil {
t.Fatalf("seed client record: %v", err)
}
link := &model.ClientInbound{ClientId: client.Id, InboundId: inbound.Id}
if err := database.GetDB().Create(link).Error; err != nil {
t.Fatalf("seed client inbound link: %v", err)
}
return inbound
}
// seed an InboundClientIps row with the given blob.
@@ -128,46 +148,32 @@ func ipSet(entries []IPWithTimestamp) map[string]int64 {
return out
}
func TestRun_DisabledFail2BanSkipsProbeAndBanLog(t *testing.T) {
// With the access-log fallback removed, an unavailable online-stats API (xray
// down, as in this unit test) must make Run a clean no-op: no fail2ban probe, no
// ban log, and no inbound_client_ips rows — never a crash or partial work.
func TestRun_NoOpWhenOnlineApiUnavailable(t *testing.T) {
setupIntegrationDB(t)
t.Setenv("XUI_ENABLE_FAIL2BAN", "false")
t.Setenv("XUI_ENABLE_FAIL2BAN", "true")
marker := fakeFail2BanClient(t)
const email = "disabled-fail2ban"
seedInboundWithClient(t, "inbound-disabled-fail2ban", email, 1)
const email = "no-api-user"
seedInboundWithClient(t, "inbound-no-api", email, 1)
binDir := t.TempDir()
accessLog := filepath.Join(t.TempDir(), "access.log")
t.Setenv("XUI_BIN_FOLDER", binDir)
configData, err := json.Marshal(map[string]any{
"log": map[string]any{"access": accessLog},
})
if err != nil {
t.Fatalf("marshal xray config: %v", err)
}
if err := os.WriteFile(filepath.Join(binDir, "config.json"), configData, 0644); err != nil {
t.Fatalf("write xray config: %v", err)
}
if err := os.WriteFile(accessLog, []byte("2026/05/26 12:00:00 from tcp:203.0.113.10:443 accepted tcp:example.com:443 email: disabled-fail2ban\n"), 0644); err != nil {
t.Fatalf("write access log: %v", err)
}
j := NewCheckClientIpJob()
j.Run()
NewCheckClientIpJob().Run()
if _, err := os.Stat(marker); !os.IsNotExist(err) {
t.Fatalf("fail2ban-client should not have been executed, stat error: %v", err)
t.Fatalf("fail2ban-client should not have been probed when the online API is unavailable, stat error: %v", err)
}
if info, err := os.Stat(readIpLimitLogPath()); err == nil && info.Size() > 0 {
body, _ := os.ReadFile(readIpLimitLogPath())
t.Fatalf("3xipl.log should be empty when fail2ban is disabled, got:\n%s", body)
t.Fatalf("3xipl.log should be empty when Run no-ops, got:\n%s", body)
}
var count int64
if err := database.GetDB().Model(&model.InboundClientIps{}).Where("client_email = ?", email).Count(&count).Error; err != nil {
t.Fatalf("count InboundClientIps: %v", err)
}
if count != 0 {
t.Fatalf("disabled fail2ban should not persist IP-limit rows, got %d", count)
t.Fatalf("no IP-limit rows should be persisted when Run no-ops, got %d", count)
}
}
@@ -280,47 +286,24 @@ func TestUpdateInboundClientIps_ExcessLiveIpIsStillBanned(t *testing.T) {
}
}
// writeXrayAccessLog points bin/config.json at a fresh access.log holding a
// single default-format Xray line (`from tcp:<ip>:<port> accepted … email: <e>`)
// for the given client, so Run() has something to scrape.
func writeXrayAccessLog(t *testing.T, email, ip string) {
t.Helper()
binDir := t.TempDir()
accessLog := filepath.Join(t.TempDir(), "access.log")
t.Setenv("XUI_BIN_FOLDER", binDir)
configData, err := json.Marshal(map[string]any{
"log": map[string]any{"access": accessLog},
})
if err != nil {
t.Fatalf("marshal xray config: %v", err)
}
if err := os.WriteFile(filepath.Join(binDir, "config.json"), configData, 0644); err != nil {
t.Fatalf("write xray config: %v", err)
}
line := "2026/06/02 13:35:53 from tcp:" + ip + ":2387 accepted tcp:example.com:443 email: " + email + "\n"
if err := os.WriteFile(accessLog, []byte(line), 0644); err != nil {
t.Fatalf("write access log: %v", err)
}
}
// #4800: the per-client IP log must populate even when no client has an IP
// limit. Before the fix, Run() only scraped the access log when an IP limit
// was active, so a limit-free install always showed an empty IP log despite
// valid access-log lines. No ban may be written since there's no limit.
func TestRun_CollectsIpsWithoutLimit(t *testing.T) {
// #4800: per-client IP tracking must populate even when no client has an IP
// limit. processObserved records observed IPs for the panel regardless of any
// limit; only enforcement is gated, so a limit-free install still shows IPs. No
// ban may be written since there's no limit.
func TestProcessObserved_CollectsIpsWithoutLimit(t *testing.T) {
setupIntegrationDB(t)
t.Setenv("XUI_ENABLE_FAIL2BAN", "true")
fakeFail2BanClient(t)
const email = "no-limit-user"
seedInboundWithClient(t, "inbound-no-limit", email, 0) // limitIp = 0
writeXrayAccessLog(t, email, "203.0.113.10")
NewCheckClientIpJob().Run()
observed := map[string]map[string]int64{
email: {"203.0.113.10": time.Now().Unix()},
}
NewCheckClientIpJob().processObserved(observed, true, true)
ips := readClientIps(t, email)
if len(ips) != 1 || ips[0].IP != "203.0.113.10" {
t.Fatalf("expected the access-log IP to be collected without a limit, got %v", ips)
t.Fatalf("expected the observed IP to be collected without a limit, got %v", ips)
}
if info, err := os.Stat(readIpLimitLogPath()); err == nil && info.Size() > 0 {
@@ -329,22 +312,21 @@ func TestRun_CollectsIpsWithoutLimit(t *testing.T) {
}
}
// #4963: a stale access-log entry for a renamed/deleted client (its email no
// longer maps to any inbound) must not create or resurrect an
// inbound_client_ips row, and must drop any orphan left behind — instead of
// spamming "failed to fetch inbound settings" every run.
func TestRun_StaleAccessLogEmailIsSkippedAndOrphanDropped(t *testing.T) {
// #4963: an observed IP for a renamed/deleted client (its email no longer maps
// to any inbound) must not create or resurrect an inbound_client_ips row, and
// must drop any orphan left behind — instead of erroring every run.
func TestProcessObserved_StaleEmailIsSkippedAndOrphanDropped(t *testing.T) {
setupIntegrationDB(t)
t.Setenv("XUI_ENABLE_FAIL2BAN", "true")
fakeFail2BanClient(t)
const staleEmail = "renamed-away"
// No inbound references staleEmail. Pre-seed an orphan tracking row to
// confirm the job removes it rather than leaving it to error forever.
seedClientIps(t, staleEmail, []IPWithTimestamp{{IP: "203.0.113.5", Timestamp: time.Now().Unix()}})
writeXrayAccessLog(t, staleEmail, "203.0.113.5")
NewCheckClientIpJob().Run()
observed := map[string]map[string]int64{
staleEmail: {"203.0.113.5": time.Now().Unix()},
}
NewCheckClientIpJob().processObserved(observed, true, true)
var count int64
if err := database.GetDB().Model(&model.InboundClientIps{}).Where("client_email = ?", staleEmail).Count(&count).Error; err != nil {
@@ -375,3 +357,33 @@ func contains(haystack, needle string) bool {
}
return false
}
// the exact clients/client_inbounds relation must win over the substring scan,
// so a client is resolved to its own inbound even when another inbound holds a
// superstring email.
func TestGetInboundByEmailUsesClientInboundLink(t *testing.T) {
setupIntegrationDB(t)
want := seedLinkedInboundWithClient(t, "linked-inbound", "exact@example.com", 1)
seedInboundOnlyWithClient(t, "other-inbound", "not-exact@example.com", 1)
got, err := (&CheckClientIpJob{}).getInboundByEmail("exact@example.com")
if err != nil {
t.Fatalf("getInboundByEmail returned error: %v", err)
}
if got.Id != want.Id {
t.Fatalf("getInboundByEmail returned inbound %d, want %d", got.Id, want.Id)
}
}
// the substring fallback must still verify the exact email inside settings, so
// "ann@example.com" does not match an inbound holding "joann@example.com".
func TestGetInboundByEmailRejectsSubstringFallbackMatch(t *testing.T) {
setupIntegrationDB(t)
seedInboundOnlyWithClient(t, "substring-only", "joann@example.com", 1)
if got, err := (&CheckClientIpJob{}).getInboundByEmail("ann@example.com"); err == nil {
t.Fatalf("substring email matched inbound %d; want no exact match", got.Id)
}
}
+18 -2
View File
@@ -34,8 +34,8 @@ func ensureFileExists(path string) error {
// Here Run is an interface method of the Job interface
func (j *ClearLogsJob) Run() {
logFiles := []string{xray.GetIPLimitLogPath(), xray.GetIPLimitBannedLogPath(), xray.GetAccessPersistentLogPath()}
logFilesPrev := []string{xray.GetIPLimitBannedPrevLogPath(), xray.GetAccessPersistentPrevLogPath()}
logFiles := []string{xray.GetIPLimitLogPath(), xray.GetIPLimitBannedLogPath()}
logFilesPrev := []string{xray.GetIPLimitBannedPrevLogPath()}
// Ensure all log files and their paths exist
for _, path := range append(logFiles, logFilesPrev...) {
@@ -75,4 +75,20 @@ func (j *ClearLogsJob) Run() {
logger.Warning("Failed to truncate log file:", logFiles[i], "-", err)
}
}
wipeAccessLog()
}
// wipeAccessLog truncates the user-configured Xray access log so it can't grow
// unbounded. The IP-limit job no longer reads or rotates it, so this daily wipe
// is the only thing that caps it. A disabled ("none") or unset access log is
// left alone, and a missing file is fine — there's nothing to wipe.
func wipeAccessLog() {
accessLogPath, err := xray.GetAccessLogPath()
if err != nil || accessLogPath == "none" || accessLogPath == "" {
return
}
if err := os.Truncate(accessLogPath, 0); err != nil && !os.IsNotExist(err) {
logger.Warning("Failed to truncate access log:", accessLogPath, "-", err)
}
}
+55
View File
@@ -0,0 +1,55 @@
package job
import (
"encoding/json"
"os"
"path/filepath"
"testing"
)
// writeAccessLogConfig points bin/config.json at the given access log path (use
// "none" to disable), so GetAccessLogPath resolves it the way the job does.
func writeAccessLogConfig(t *testing.T, accessPath string) {
t.Helper()
binDir := t.TempDir()
t.Setenv("XUI_BIN_FOLDER", binDir)
configData, err := json.Marshal(map[string]any{
"log": map[string]any{"access": accessPath},
})
if err != nil {
t.Fatalf("marshal xray config: %v", err)
}
if err := os.WriteFile(filepath.Join(binDir, "config.json"), configData, 0644); err != nil {
t.Fatalf("write xray config: %v", err)
}
}
func TestWipeAccessLog_TruncatesEnabledLog(t *testing.T) {
accessLog := filepath.Join(t.TempDir(), "access.log")
if err := os.WriteFile(accessLog, []byte("2026/06/23 12:00:00 from tcp:203.0.113.10:443 accepted\n"), 0644); err != nil {
t.Fatalf("seed access log: %v", err)
}
writeAccessLogConfig(t, accessLog)
wipeAccessLog()
info, err := os.Stat(accessLog)
if err != nil {
t.Fatalf("access log should still exist: %v", err)
}
if info.Size() != 0 {
t.Fatalf("access log should be truncated to 0, got %d bytes", info.Size())
}
}
func TestWipeAccessLog_LeavesDisabledLogAlone(t *testing.T) {
writeAccessLogConfig(t, "none")
// Must not panic or create a file literally named "none".
wipeAccessLog()
if _, err := os.Stat("none"); err == nil {
os.Remove("none")
t.Fatal(`wipeAccessLog must not create a file named "none"`)
}
}
-10
View File
@@ -65,16 +65,6 @@ func GetIPLimitBannedPrevLogPath() string {
return config.GetLogFolder() + "/3xipl-banned.prev.log"
}
// GetAccessPersistentLogPath returns the path to the persistent access log file.
func GetAccessPersistentLogPath() string {
return config.GetLogFolder() + "/3xipl-ap.log"
}
// GetAccessPersistentPrevLogPath returns the path to the previous persistent access log file.
func GetAccessPersistentPrevLogPath() string {
return config.GetLogFolder() + "/3xipl-ap.prev.log"
}
// GetAccessLogPath reads the Xray config and returns the access log file path.
func GetAccessLogPath() (string, error) {
config, err := os.ReadFile(GetConfigPath())