diff --git a/frontend/src/generated/types.ts b/frontend/src/generated/types.ts index e32ed7cf8..1cca057e4 100644 --- a/frontend/src/generated/types.ts +++ b/frontend/src/generated/types.ts @@ -1,4 +1,5 @@ // Code generated by tools/openapigen. DO NOT EDIT. +export type OnlineAPISupport = number; export type ProcessState = string; export type Protocol = string; export type SubLinkProvider = unknown; diff --git a/frontend/src/generated/zod.ts b/frontend/src/generated/zod.ts index 436d9f005..34d7e23c3 100644 --- a/frontend/src/generated/zod.ts +++ b/frontend/src/generated/zod.ts @@ -1,5 +1,8 @@ // Code generated by tools/openapigen. DO NOT EDIT. import { z } from 'zod'; +export const OnlineAPISupportSchema = z.number().int(); +export type OnlineAPISupport = z.infer; + export const ProcessStateSchema = z.string(); export type ProcessState = z.infer; diff --git a/frontend/src/pages/clients/ClientBulkAddModal.tsx b/frontend/src/pages/clients/ClientBulkAddModal.tsx index 8d25d850c..fd68ea924 100644 --- a/frontend/src/pages/clients/ClientBulkAddModal.tsx +++ b/frontend/src/pages/clients/ClientBulkAddModal.tsx @@ -21,7 +21,6 @@ const MULTI_CLIENT_PROTOCOLS = new Set([ interface ClientBulkAddModalProps { open: boolean; inbounds: InboundOption[]; - ipLimitEnable?: boolean; groups?: string[]; onOpenChange: (open: boolean) => void; onSaved?: () => void; @@ -52,7 +51,6 @@ function emptyForm(): FormState { export default function ClientBulkAddModal({ open, inbounds, - ipLimitEnable = false, groups = [], onOpenChange, onSaved, @@ -316,11 +314,9 @@ export default function ClientBulkAddModal({ )} - {ipLimitEnable && ( - - update('limitIp', Number(v) || 0)} /> - - )} + + update('limitIp', Number(v) || 0)} /> + update('totalGB', Number(v) || 0)} /> diff --git a/frontend/src/pages/clients/ClientFormModal.tsx b/frontend/src/pages/clients/ClientFormModal.tsx index cb18441bc..93539bf8b 100644 --- a/frontend/src/pages/clients/ClientFormModal.tsx +++ b/frontend/src/pages/clients/ClientFormModal.tsx @@ -12,6 +12,7 @@ import { Select, Space, Switch, + Tabs, Tag, message, } from 'antd'; @@ -64,7 +65,6 @@ interface ClientFormModalProps { client: ClientRecord | null; inbounds: InboundOption[]; attachedIds?: number[]; - ipLimitEnable?: boolean; tgBotEnable?: boolean; groups?: string[]; save: ( @@ -136,7 +136,6 @@ export default function ClientFormModal({ client, inbounds, attachedIds = [], - ipLimitEnable = false, tgBotEnable = false, groups = [], save, @@ -424,214 +423,232 @@ export default function ClientFormModal({ onCancel={close} >
- - - - - update('email', e.target.value)} - /> - + + )} + + ), + }, + { + key: 'config', + label: t('pages.clients.tabConfig'), + children: ( + <> + + + + + update('uuid', e.target.value)} /> + - - )} + + {showFlow && ( + + + update('security', v)} + options={VMESS_SECURITY_OPTIONS.map((k) => ({ value: k, label: k }))} + /> + + + )} + {showReverseTag && ( + + + update('reverseTag', e.target.value)} /> + + + )} + + + ), + }, + ]} + /> diff --git a/frontend/src/pages/clients/ClientsPage.tsx b/frontend/src/pages/clients/ClientsPage.tsx index c0c200b31..478524a83 100644 --- a/frontend/src/pages/clients/ClientsPage.tsx +++ b/frontend/src/pages/clients/ClientsPage.tsx @@ -196,7 +196,7 @@ export default function ClientsPage() { allGroups, setQuery, inbounds, onlines, loading, fetched, fetchError, subSettings, - ipLimitEnable, tgBotEnable, expireDiff, trafficDiff, pageSize, + tgBotEnable, expireDiff, trafficDiff, pageSize, create, update, remove, bulkDelete, bulkAdjust, bulkAddToGroup, bulkRemoveFromGroup, attach, bulkAttach, detach, bulkDetach, resetTraffic, resetAllTraffics, delDepleted, setEnable, applyTrafficEvent, applyClientStatsEvent, @@ -1219,7 +1219,6 @@ export default function ClientsPage() { client={editingClient} attachedIds={editingAttachedIds} inbounds={inbounds} - ipLimitEnable={ipLimitEnable} tgBotEnable={tgBotEnable} groups={allGroups} save={onSave} @@ -1248,7 +1247,6 @@ export default function ClientsPage() { setBulkAddOpen(false)} diff --git a/frontend/src/pages/index/IndexPage.tsx b/frontend/src/pages/index/IndexPage.tsx index 3d12e0de4..6d2a73c90 100644 --- a/frontend/src/pages/index/IndexPage.tsx +++ b/frontend/src/pages/index/IndexPage.tsx @@ -64,7 +64,7 @@ export default function IndexPage() { const [messageApi, messageContextHolder] = message.useMessage(); useEffect(() => { setMessageInstance(messageApi); }, [messageApi]); - const [ipLimitEnable, setIpLimitEnable] = useState(false); + const [accessLogEnable, setAccessLogEnable] = useState(false); const [panelUpdateInfo, setPanelUpdateInfo] = useState({ currentVersion: '', latestVersion: '', @@ -87,8 +87,8 @@ export default function IndexPage() { const [loadingTip, setLoadingTip] = useState(t('loading')); useEffect(() => { - HttpUtil.post<{ ipLimitEnable?: boolean }>('/panel/api/setting/defaultSettings').then((msg) => { - if (msg?.success && msg.obj) setIpLimitEnable(!!msg.obj.ipLimitEnable); + HttpUtil.post<{ accessLogEnable?: boolean }>('/panel/api/setting/defaultSettings').then((msg) => { + if (msg?.success && msg.obj) setAccessLogEnable(!!msg.obj.accessLogEnable); }); HttpUtil.get('/panel/api/server/getPanelUpdateInfo').then((msg) => { if (msg?.success && msg.obj) setPanelUpdateInfo(msg.obj); @@ -186,7 +186,7 @@ export default function IndexPage() { setXrayLogsOpen(true)} diff --git a/frontend/src/pages/index/XrayStatusCard.tsx b/frontend/src/pages/index/XrayStatusCard.tsx index 95c31ffbc..3ae06e2aa 100644 --- a/frontend/src/pages/index/XrayStatusCard.tsx +++ b/frontend/src/pages/index/XrayStatusCard.tsx @@ -14,7 +14,7 @@ import './XrayStatusCard.css'; interface XrayStatusCardProps { status: Status; isMobile: boolean; - ipLimitEnable: boolean; + accessLogEnable: boolean; onStopXray: () => void; onRestartXray: () => void; onOpenLogs: () => void; @@ -31,7 +31,7 @@ const XRAY_STATE_KEYS: Record = { export default function XrayStatusCard({ status, isMobile, - ipLimitEnable, + accessLogEnable, onStopXray, onRestartXray, onOpenLogs, @@ -86,7 +86,9 @@ export default function XrayStatusCard({ ); const actions = [ - ...(ipLimitEnable + // the xray log viewer reads the access log file, so the button only makes + // sense when one is configured (unlike IP limit, which no longer needs it) + ...(accessLogEnable ? [ diff --git a/frontend/src/schemas/defaults.ts b/frontend/src/schemas/defaults.ts index 4f3f3576f..6d990f813 100644 --- a/frontend/src/schemas/defaults.ts +++ b/frontend/src/schemas/defaults.ts @@ -15,6 +15,7 @@ export const DefaultsPayloadSchema = z.object({ remarkModel: z.string().optional(), datepicker: z.enum(['gregorian', 'jalalian']).optional(), ipLimitEnable: z.boolean().optional(), + accessLogEnable: z.boolean().optional(), webDomain: z.string().optional(), subDomain: z.string().optional(), }).loose(); diff --git a/internal/web/job/check_client_ip_job.go b/internal/web/job/check_client_ip_job.go index 75fb8f1cd..497e4fc52 100644 --- a/internal/web/job/check_client_ip_job.go +++ b/internal/web/job/check_client_ip_job.go @@ -16,6 +16,7 @@ import ( "github.com/mhsanaei/3x-ui/v3/internal/database" "github.com/mhsanaei/3x-ui/v3/internal/database/model" "github.com/mhsanaei/3x-ui/v3/internal/logger" + "github.com/mhsanaei/3x-ui/v3/internal/web/service" "github.com/mhsanaei/3x-ui/v3/internal/xray" "gorm.io/gorm" @@ -27,10 +28,14 @@ type IPWithTimestamp struct { Timestamp int64 `json:"timestamp"` } -// CheckClientIpJob monitors client IP addresses from access logs and manages IP blocking based on configured limits. +// CheckClientIpJob monitors client IP addresses and manages IP blocking based +// on configured limits. The per-client IPs come from the core's online-stats +// API when the running core supports it (no access log needed), falling back +// to access-log parsing on older cores. type CheckClientIpJob struct { lastClear int64 disAllowedIps []string + xrayService service.XrayService } var job *CheckClientIpJob @@ -50,22 +55,32 @@ func (j *CheckClientIpJob) Run() { j.lastClear = time.Now().Unix() } - shouldClearAccessLog := false fail2BanEnabled := isFail2BanEnabled() hasLimit := fail2BanEnabled && j.hasLimitIp() f2bInstalled := false if hasLimit { f2bInstalled = j.checkFail2BanInstalled() } + + if observed, apiMode := j.collectFromOnlineAPI(); apiMode { + if fail2BanEnabled { + j.processObserved(observed, j.resolveEnforce(hasLimit, f2bInstalled), true) + } + // The core tracks online IPs itself, so no access log is needed in this + // mode; still rotate a user-configured access log hourly so it doesn't + // grow unboundedly. The enforcement-triggered rotation is skipped — + // nothing here reads the log. + if j.checkAccessLogAvailable(false) && time.Now().Unix()-j.lastClear > 3600 { + j.clearAccessLog() + } + return + } + + shouldClearAccessLog := false isAccessLogAvailable := j.checkAccessLogAvailable(hasLimit) if fail2BanEnabled && isAccessLogAvailable { - enforce := hasLimit - if hasLimit && runtime.GOOS != "windows" && !f2bInstalled { - logger.Warning("[LimitIP] Fail2Ban is not installed, Please install Fail2Ban from the x-ui bash menu.") - enforce = false - } - shouldClearAccessLog = j.processLogFile(enforce) + shouldClearAccessLog = j.processLogFile(j.resolveEnforce(hasLimit, f2bInstalled)) } if shouldClearAccessLog || (isAccessLogAvailable && time.Now().Unix()-j.lastClear > 3600) { @@ -73,6 +88,50 @@ func (j *CheckClientIpJob) Run() { } } +// resolveEnforce decides whether limits can actually be enforced this run, +// warning when fail2ban is missing on a platform that needs it. +func (j *CheckClientIpJob) resolveEnforce(hasLimit, f2bInstalled bool) bool { + if hasLimit && runtime.GOOS != "windows" && !f2bInstalled { + logger.Warning("[LimitIP] Fail2Ban is not installed, Please install Fail2Ban from the x-ui bash menu.") + return false + } + return hasLimit +} + +// collectFromOnlineAPI builds per-email IP observations (email -> ip -> +// last-seen unix seconds) from the core's online-stats API. ok=false means the +// API is unavailable — xray not running, an older core, or a transient gRPC +// failure — and the caller must fall back to access-log parsing. +func (j *CheckClientIpJob) collectFromOnlineAPI() (map[string]map[string]int64, bool) { + onlineUsers, ok, err := j.xrayService.GetOnlineUsers() + if err != nil { + logger.Debug("[LimitIP] online-stats API unavailable this run:", err) + return nil, false + } + if !ok { + return nil, false + } + now := time.Now().Unix() + observed := make(map[string]map[string]int64, len(onlineUsers)) + for _, user := range onlineUsers { + for _, entry := range user.IPs { + // No localhost guard needed here: the core's OnlineMap.AddIP drops + // 127.0.0.1/[::1] itself, so they never reach this list. + ts := entry.LastSeen + if ts <= 0 { + ts = now + } + if _, exists := observed[user.Email]; !exists { + observed[user.Email] = make(map[string]int64) + } + if existing, seen := observed[user.Email][entry.IP]; !seen || ts > existing { + observed[user.Email][entry.IP] = ts + } + } + } + return observed, true +} + func (j *CheckClientIpJob) clearAccessLog() { logAccessP, err := os.OpenFile(xray.GetAccessPersistentLogPath(), os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0o644) j.checkError(err) @@ -183,18 +242,26 @@ func (j *CheckClientIpJob) processLogFile(enforce bool) bool { j.checkError(err) } - shouldCleanLog := false - for email, ipTimestamps := range inboundClientIps { + return j.processObserved(inboundClientIps, enforce, false) +} - // The access log can still reference a client that was just renamed +// processObserved runs collection + enforcement for one scan's observations +// (email -> ip -> last-seen unix seconds). observedAreLive marks the +// observations as live connections (online-stats API) rather than recent log +// lines: live entries bypass the stale cutoff, since a connection that opened +// hours ago is still live even though its timestamp is old. +func (j *CheckClientIpJob) processObserved(observed map[string]map[string]int64, enforce, observedAreLive bool) bool { + shouldCleanLog := false + for email, ipTimestamps := range observed { + + // The observations can still reference a client that was just renamed // or deleted; its email no longer matches any inbound. Skip it (and // drop any orphaned tracking row) instead of recreating a row and - // logging an ERROR every run until the log rotates out the old email - // (#4963). + // logging an ERROR every run (#4963). inbound, err := j.getInboundByEmail(email) if err != nil { if errors.Is(err, gorm.ErrRecordNotFound) { - logger.Debugf("[LimitIP] skipping stale access-log email %q (renamed or deleted)", email) + logger.Debugf("[LimitIP] skipping stale observed email %q (renamed or deleted)", email) j.delInboundClientIps(email) } else { j.checkError(err) @@ -214,13 +281,17 @@ func (j *CheckClientIpJob) processLogFile(enforce bool) bool { continue } - shouldCleanLog = j.updateInboundClientIps(clientIpsRecord, inbound, email, ipsWithTime, enforce) || shouldCleanLog + shouldCleanLog = j.updateInboundClientIps(clientIpsRecord, inbound, email, ipsWithTime, enforce, observedAreLive) || shouldCleanLog } return shouldCleanLog } -func mergeClientIps(old, new []IPWithTimestamp, staleCutoff int64) map[string]int64 { +// mergeClientIps folds this scan's observations into the persisted set, +// dropping entries older than staleCutoff. newAlwaysLive exempts the new +// entries from that cutoff: an API-observed IP is a live connection by +// definition, even when its lastSeen (set at dispatch time) is hours old. +func mergeClientIps(old, new []IPWithTimestamp, staleCutoff int64, newAlwaysLive bool) map[string]int64 { ipMap := make(map[string]int64, len(old)+len(new)) for _, ipTime := range old { if ipTime.Timestamp < staleCutoff { @@ -229,7 +300,7 @@ func mergeClientIps(old, new []IPWithTimestamp, staleCutoff int64) map[string]in ipMap[ipTime.IP] = ipTime.Timestamp } for _, ipTime := range new { - if ipTime.Timestamp < staleCutoff { + if !newAlwaysLive && ipTime.Timestamp < staleCutoff { continue } if existingTime, ok := ipMap[ipTime.IP]; !ok || ipTime.Timestamp > existingTime { @@ -239,6 +310,16 @@ func mergeClientIps(old, new []IPWithTimestamp, staleCutoff int64) map[string]in return ipMap } +// selectIpsToBan splits the live IPs (sorted oldest-first by partitionLiveIps) +// into the newest `limit` entries to keep and the older remainder to ban. +func selectIpsToBan(live []IPWithTimestamp, limit int) (kept, banned []IPWithTimestamp) { + if limit <= 0 || len(live) <= limit { + return live, nil + } + cutoff := len(live) - limit + return live[cutoff:], live[:cutoff] +} + func partitionLiveIps(ipMap map[string]int64, observedThisScan map[string]bool) (live, historical []IPWithTimestamp) { live = make([]IPWithTimestamp, 0, len(observedThisScan)) historical = make([]IPWithTimestamp, 0, len(ipMap)) @@ -343,7 +424,7 @@ func (j *CheckClientIpJob) delInboundClientIps(clientEmail string) { } } -func (j *CheckClientIpJob) updateInboundClientIps(inboundClientIps *model.InboundClientIps, inbound *model.Inbound, clientEmail string, newIpsWithTime []IPWithTimestamp, enforce bool) bool { +func (j *CheckClientIpJob) updateInboundClientIps(inboundClientIps *model.InboundClientIps, inbound *model.Inbound, clientEmail string, newIpsWithTime []IPWithTimestamp, enforce, observedAreLive bool) bool { if inbound.Settings == "" { logger.Debug("wrong data:", inbound) return false @@ -380,7 +461,7 @@ func (j *CheckClientIpJob) updateInboundClientIps(inboundClientIps *model.Inboun json.Unmarshal([]byte(inboundClientIps.Ips), &oldIpsWithTime) } - ipMap := mergeClientIps(oldIpsWithTime, newIpsWithTime, time.Now().Unix()-ipStaleAfterSeconds) + ipMap := mergeClientIps(oldIpsWithTime, newIpsWithTime, time.Now().Unix()-ipStaleAfterSeconds, observedAreLive) // only ips seen in this scan count toward the limit. see // partitionLiveIps. @@ -394,15 +475,10 @@ func (j *CheckClientIpJob) updateInboundClientIps(inboundClientIps *model.Inboun j.disAllowedIps = []string{} // historical db-only ips are excluded from this count on purpose. - var keptLive []IPWithTimestamp - if len(liveIps) > limitIp { + keptLive, bannedLive := selectIpsToBan(liveIps, limitIp) + if len(bannedLive) > 0 { shouldCleanLog = true - // keep the newest live ips, ban older ones. - cutoff := len(liveIps) - limitIp - keptLive = liveIps[cutoff:] - bannedLive := liveIps[:cutoff] - logIpFile, err := os.OpenFile(xray.GetIPLimitLogPath(), os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0644) if err != nil { logger.Errorf("failed to open IP limit log file: %s", err) @@ -422,8 +498,6 @@ func (j *CheckClientIpJob) updateInboundClientIps(inboundClientIps *model.Inboun // force xray to drop existing connections from banned ips j.disconnectClientTemporarily(inbound, clientEmail, clients) - } else { - keptLive = liveIps } // keep kept-live + historical in the blob so the panel keeps showing diff --git a/internal/web/job/check_client_ip_job_integration_test.go b/internal/web/job/check_client_ip_job_integration_test.go index fb0fa828e..d46bee342 100644 --- a/internal/web/job/check_client_ip_job_integration_test.go +++ b/internal/web/job/check_client_ip_job_integration_test.go @@ -199,7 +199,7 @@ func TestUpdateInboundClientIps_LiveIpNotBannedByStillFreshHistoricals(t *testin if err != nil { t.Fatalf("getInboundByEmail: %v", err) } - shouldCleanLog := j.updateInboundClientIps(row, inbound, email, live, true) + shouldCleanLog := j.updateInboundClientIps(row, inbound, email, live, true, false) if shouldCleanLog { t.Fatalf("shouldCleanLog must be false, nothing should have been banned with 1 live ip under limit 3") @@ -252,7 +252,7 @@ func TestUpdateInboundClientIps_ExcessLiveIpIsStillBanned(t *testing.T) { if err != nil { t.Fatalf("getInboundByEmail: %v", err) } - shouldCleanLog := j.updateInboundClientIps(row, inbound, email, live, true) + shouldCleanLog := j.updateInboundClientIps(row, inbound, email, live, true, false) if !shouldCleanLog { t.Fatalf("shouldCleanLog must be true when the live set exceeds the limit") diff --git a/internal/web/job/check_client_ip_job_test.go b/internal/web/job/check_client_ip_job_test.go index a55059117..707689e58 100644 --- a/internal/web/job/check_client_ip_job_test.go +++ b/internal/web/job/check_client_ip_job_test.go @@ -22,7 +22,7 @@ func TestMergeClientIps_EvictsStaleOldEntries(t *testing.T) { {IP: "2.2.2.2", Timestamp: 2000}, // same IP, newer log line } - got := mergeClientIps(old, new, 1000) + got := mergeClientIps(old, new, 1000, false) want := map[string]int64{"2.2.2.2": 2000} if !reflect.DeepEqual(got, want) { @@ -36,7 +36,7 @@ func TestMergeClientIps_KeepsFreshOldEntriesUnchanged(t *testing.T) { old := []IPWithTimestamp{ {IP: "1.1.1.1", Timestamp: 1500}, } - got := mergeClientIps(old, nil, 1000) + got := mergeClientIps(old, nil, 1000, false) want := map[string]int64{"1.1.1.1": 1500} if !reflect.DeepEqual(got, want) { @@ -48,7 +48,7 @@ func TestMergeClientIps_PrefersLaterTimestampForSameIp(t *testing.T) { old := []IPWithTimestamp{{IP: "1.1.1.1", Timestamp: 1500}} new := []IPWithTimestamp{{IP: "1.1.1.1", Timestamp: 1700}} - got := mergeClientIps(old, new, 1000) + got := mergeClientIps(old, new, 1000, false) if got["1.1.1.1"] != 1700 { t.Fatalf("expected latest timestamp 1700, got %d", got["1.1.1.1"]) @@ -59,7 +59,7 @@ func TestMergeClientIps_DropsStaleNewEntries(t *testing.T) { // A log line with a clock-skewed old timestamp must not resurrect a // stale IP past the cutoff. new := []IPWithTimestamp{{IP: "1.1.1.1", Timestamp: 500}} - got := mergeClientIps(nil, new, 1000) + got := mergeClientIps(nil, new, 1000, false) if len(got) != 0 { t.Fatalf("stale new IP should have been dropped, got %v", got) @@ -72,7 +72,7 @@ func TestMergeClientIps_NoStaleCutoffStillWorks(t *testing.T) { old := []IPWithTimestamp{{IP: "1.1.1.1", Timestamp: 100}} new := []IPWithTimestamp{{IP: "2.2.2.2", Timestamp: 200}} - got := mergeClientIps(old, new, 0) + got := mergeClientIps(old, new, 0, false) want := map[string]int64{"1.1.1.1": 100, "2.2.2.2": 200} if !reflect.DeepEqual(got, want) { @@ -80,6 +80,66 @@ func TestMergeClientIps_NoStaleCutoffStillWorks(t *testing.T) { } } +func TestMergeClientIps_LiveObservationsBypassStaleCutoff(t *testing.T) { + // online-API mode: lastSeen is set when the connection was dispatched, so + // a connection held open for hours has an "old" timestamp while being live + // by definition. It must survive the stale cutoff. + new := []IPWithTimestamp{{IP: "1.1.1.1", Timestamp: 500}} // opened long ago, still connected + got := mergeClientIps(nil, new, 1000, true) + + want := map[string]int64{"1.1.1.1": 500} + if !reflect.DeepEqual(got, want) { + t.Fatalf("live observation must bypass the stale cutoff\ngot: %v\nwant: %v", got, want) + } +} + +func TestMergeClientIps_LiveModeStillEvictsStaleOldEntries(t *testing.T) { + // the bypass applies only to this scan's observations — persisted entries + // from past scans still age out as before. + old := []IPWithTimestamp{{IP: "2.2.2.2", Timestamp: 100}} + new := []IPWithTimestamp{{IP: "1.1.1.1", Timestamp: 2000}} + got := mergeClientIps(old, new, 1000, true) + + want := map[string]int64{"1.1.1.1": 2000} + if !reflect.DeepEqual(got, want) { + t.Fatalf("stale db entry must still be evicted in live mode\ngot: %v\nwant: %v", got, want) + } +} + +func TestSelectIpsToBan(t *testing.T) { + live := []IPWithTimestamp{ // sorted oldest-first, as partitionLiveIps returns + {IP: "A", Timestamp: 100}, + {IP: "B", Timestamp: 200}, + {IP: "C", Timestamp: 300}, + } + + // over the limit: oldest connections are banned, newest keep the slots + kept, banned := selectIpsToBan(live, 1) + if got := collectIps(kept); !reflect.DeepEqual(got, []string{"C"}) { + t.Fatalf("newest ip must keep the slot, got %v", got) + } + if got := collectIps(banned); !reflect.DeepEqual(got, []string{"A", "B"}) { + t.Fatalf("older ips must be banned oldest-first, got %v", got) + } + + // at the limit: nothing banned + kept, banned = selectIpsToBan(live, 3) + if len(banned) != 0 || len(kept) != 3 { + t.Fatalf("at-limit set must not ban, kept=%v banned=%v", kept, banned) + } + + // under the limit: nothing banned + kept, banned = selectIpsToBan(live[:1], 3) + if len(banned) != 0 || len(kept) != 1 { + t.Fatalf("under-limit set must not ban, kept=%v banned=%v", kept, banned) + } + + // defensive: non-positive limit never reaches enforcement, but must not panic + if _, banned := selectIpsToBan(live, 0); banned != nil { + t.Fatalf("zero limit must not ban, got %v", banned) + } +} + func collectIps(entries []IPWithTimestamp) []string { out := make([]string, 0, len(entries)) for _, e := range entries { diff --git a/internal/web/job/xray_traffic_job.go b/internal/web/job/xray_traffic_job.go index 131d0a244..dc3f6219d 100644 --- a/internal/web/job/xray_traffic_job.go +++ b/internal/web/job/xray_traffic_job.go @@ -66,21 +66,37 @@ func (j *XrayTrafficJob) Run() { j.xrayService.SetToNeedRestart() } - lastOnlineMap, err := j.inboundService.GetClientsLastOnline() - if err != nil { - logger.Warning("get clients last online failed:", err) - } - if lastOnlineMap == nil { - lastOnlineMap = make(map[string]int64) - } // Derive the local online set from this poll's per-email deltas rather // than the shared last_online column, which remote-node syncs also bump // and would otherwise make a client active only on a remote node appear // online on local inbounds. activeEmails := make([]string, 0, len(clientTraffics)) + deltaActive := make(map[string]bool, len(clientTraffics)) for _, ct := range clientTraffics { if ct != nil && ct.Up+ct.Down > 0 { activeEmails = append(activeEmails, ct.Email) + deltaActive[ct.Email] = true + } + } + // When the core supports the online-stats API, union in connection-based + // onlines. Neither signal alone covers everything: an idle-but-connected + // client moves no bytes between polls (the delta heuristic's blind spot), + // while a short-lived connection can close before this poll yet still show + // in the delta. Older cores fall back to deltas alone. + if onlineUsers, apiMode, ouErr := j.xrayService.GetOnlineUsers(); ouErr != nil { + logger.Debug("get online users from xray api failed:", ouErr) + } else if apiMode { + idleOnline := make([]string, 0, len(onlineUsers)) + for _, u := range onlineUsers { + if !deltaActive[u.Email] { + activeEmails = append(activeEmails, u.Email) + idleOnline = append(idleOnline, u.Email) + } + } + // The traffic path only bumps last_online on a non-zero delta; keep the + // column fresh for clients kept online purely by a live connection. + if err := j.inboundService.BumpClientsLastOnline(idleOnline); err != nil { + logger.Warning("bump last online for connected clients failed:", err) } } // Pair the email signal with the inbound tags that moved bytes this poll. @@ -100,6 +116,13 @@ func (j *XrayTrafficJob) Run() { return } + lastOnlineMap, err := j.inboundService.GetClientsLastOnline() + if err != nil { + logger.Warning("get clients last online failed:", err) + } + if lastOnlineMap == nil { + lastOnlineMap = make(map[string]int64) + } onlineClients := j.inboundService.GetOnlineClients() if onlineClients == nil { onlineClients = []string{} diff --git a/internal/web/service/inbound_traffic.go b/internal/web/service/inbound_traffic.go index d3ea2378b..76a4771d9 100644 --- a/internal/web/service/inbound_traffic.go +++ b/internal/web/service/inbound_traffic.go @@ -837,6 +837,28 @@ func (s *InboundService) GetClientTrafficTgBot(tgId int64) ([]*xray.ClientTraffi return traffics, nil } +// BumpClientsLastOnline sets client_traffics.last_online to now for the given +// emails. Used in online-API mode for clients that hold a live connection but +// moved no bytes this poll — the traffic path (addClientTraffic) only bumps +// last_online on a non-zero delta, so idle-but-connected clients would +// otherwise show a stale "last online" while being reported online. +func (s *InboundService) BumpClientsLastOnline(emails []string) error { + uniq := uniqueNonEmptyStrings(emails) + if len(uniq) == 0 { + return nil + } + now := time.Now().UnixMilli() + return submitTrafficWrite(func() error { + db := database.GetDB() + for _, batch := range chunkStrings(uniq, sqliteMaxVars) { + if err := db.Model(xray.ClientTraffic{}).Where("email IN ?", batch).Update("last_online", now).Error; err != nil { + return err + } + } + return nil + }) +} + func (s *InboundService) GetActiveClientTraffics(emails []string) ([]*xray.ClientTraffic, error) { uniq := uniqueNonEmptyStrings(emails) if len(uniq) == 0 { diff --git a/internal/web/service/setting.go b/internal/web/service/setting.go index 11b9f81d9..ba65f134c 100644 --- a/internal/web/service/setting.go +++ b/internal/web/service/setting.go @@ -798,7 +798,18 @@ func (s *SettingService) SetRestartXrayOnClientDisable(value bool) error { return s.setBool("restartXrayOnClientDisable", value) } +// GetIpLimitEnable reports whether the IP-limit feature is available. Always +// true since the panel enforces limits via the core's online-stats API; on an +// older core the job falls back to access-log parsing and warns there when the +// log is missing, so the UI no longer hides the field behind that condition. func (s *SettingService) GetIpLimitEnable() (bool, error) { + return true, nil +} + +// GetAccessLogEnable reports whether an Xray access log is configured. Used by +// the UI for features that genuinely read the log file (the xray log viewer) — +// distinct from IP limiting, which works without it. +func (s *SettingService) GetAccessLogEnable() (bool, error) { accessLogPath, err := xray.GetAccessLogPath() if err != nil { return false, err @@ -1022,25 +1033,26 @@ func (s *SettingService) BuildSubURIBase(host string) string { func (s *SettingService) GetDefaultSettings(host string) (any, error) { type settingFunc func() (any, error) settings := map[string]settingFunc{ - "expireDiff": func() (any, error) { return s.GetExpireDiff() }, - "trafficDiff": func() (any, error) { return s.GetTrafficDiff() }, - "pageSize": func() (any, error) { return s.GetPageSize() }, - "defaultCert": func() (any, error) { return s.GetCertFile() }, - "defaultKey": func() (any, error) { return s.GetKeyFile() }, - "tgBotEnable": func() (any, error) { return s.GetTgbotEnabled() }, - "subThemeDir": func() (any, error) { return s.GetSubThemeDir() }, - "subEnable": func() (any, error) { return s.GetSubEnable() }, - "subJsonEnable": func() (any, error) { return s.GetSubJsonEnable() }, - "subClashEnable": func() (any, error) { return s.GetSubClashEnable() }, - "subTitle": func() (any, error) { return s.GetSubTitle() }, - "subURI": func() (any, error) { return s.GetSubURI() }, - "subJsonURI": func() (any, error) { return s.GetSubJsonURI() }, - "subClashURI": func() (any, error) { return s.GetSubClashURI() }, - "remarkModel": func() (any, error) { return s.GetRemarkModel() }, - "datepicker": func() (any, error) { return s.GetDatepicker() }, - "ipLimitEnable": func() (any, error) { return s.GetIpLimitEnable() }, - "webDomain": func() (any, error) { return s.GetWebDomain() }, - "subDomain": func() (any, error) { return s.GetSubDomain() }, + "expireDiff": func() (any, error) { return s.GetExpireDiff() }, + "trafficDiff": func() (any, error) { return s.GetTrafficDiff() }, + "pageSize": func() (any, error) { return s.GetPageSize() }, + "defaultCert": func() (any, error) { return s.GetCertFile() }, + "defaultKey": func() (any, error) { return s.GetKeyFile() }, + "tgBotEnable": func() (any, error) { return s.GetTgbotEnabled() }, + "subThemeDir": func() (any, error) { return s.GetSubThemeDir() }, + "subEnable": func() (any, error) { return s.GetSubEnable() }, + "subJsonEnable": func() (any, error) { return s.GetSubJsonEnable() }, + "subClashEnable": func() (any, error) { return s.GetSubClashEnable() }, + "subTitle": func() (any, error) { return s.GetSubTitle() }, + "subURI": func() (any, error) { return s.GetSubURI() }, + "subJsonURI": func() (any, error) { return s.GetSubJsonURI() }, + "subClashURI": func() (any, error) { return s.GetSubClashURI() }, + "remarkModel": func() (any, error) { return s.GetRemarkModel() }, + "datepicker": func() (any, error) { return s.GetDatepicker() }, + "ipLimitEnable": func() (any, error) { return s.GetIpLimitEnable() }, + "accessLogEnable": func() (any, error) { return s.GetAccessLogEnable() }, + "webDomain": func() (any, error) { return s.GetWebDomain() }, + "subDomain": func() (any, error) { return s.GetSubDomain() }, } result := make(map[string]any) diff --git a/internal/web/service/xray.go b/internal/web/service/xray.go index 14f33fb7b..26897a2f7 100644 --- a/internal/web/service/xray.go +++ b/internal/web/service/xray.go @@ -116,6 +116,7 @@ func (s *XrayService) GetXrayConfig() (*xray.Config, error) { } xrayConfig.LogConfig = resolveXrayLogPaths(xrayConfig.LogConfig) xrayConfig.API = ensureAPIServices(xrayConfig.API) + xrayConfig.Policy = ensureStatsPolicy(xrayConfig.Policy) _, _, _ = s.inboundService.AddTraffic(nil, nil) @@ -421,6 +422,51 @@ func ensureAPIServices(api json_util.RawMessage) json_util.RawMessage { return out } +// ensureStatsPolicy guarantees every policy level in the generated config has +// statsUserOnline enabled, so the core tracks per-email online IPs for the +// panel's online view and access-log-free IP limiting. Generated clients carry +// no explicit level, so level "0" is created when absent. The flag is panel +// infrastructure and is forced on even over an explicit false in the template, +// same as the api services above. An entirely missing or unparsable policy +// block is left alone; the stored template itself is never modified — only the +// generated runtime config. +func ensureStatsPolicy(policy json_util.RawMessage) json_util.RawMessage { + if len(policy) == 0 { + return policy + } + var parsed map[string]any + if err := json.Unmarshal(policy, &parsed); err != nil { + return policy + } + levels, _ := parsed["levels"].(map[string]any) + if levels == nil { + levels = make(map[string]any) + } + if _, ok := levels["0"]; !ok { + levels["0"] = map[string]any{} + } + changed := false + for _, raw := range levels { + level, ok := raw.(map[string]any) + if !ok { + continue + } + if enabled, ok := level["statsUserOnline"].(bool); !ok || !enabled { + level["statsUserOnline"] = true + changed = true + } + } + if !changed { + return policy + } + parsed["levels"] = levels + out, err := json.Marshal(parsed) + if err != nil { + return policy + } + return out +} + // resolveXrayLogPaths rewrites relative `log.access` / `log.error` values to // absolute paths under config.GetLogFolder(), so Xray writes those files // alongside the panel's other logs regardless of the working directory the @@ -493,6 +539,43 @@ func (s *XrayService) GetXrayTraffic() ([]*xray.Traffic, []*xray.ClientTraffic, return traffic, clientTraffic, nil } +// GetOnlineUsers returns connection-based online users (email + source IPs) +// from the running core's online-stats API. ok=false means the API is not +// available — xray isn't running or the core predates the online-stats RPCs — +// and callers must use the legacy traffic-delta / access-log paths. The +// capability is probed lazily per process: an Unimplemented answer pins this +// core as unsupported until the next restart, while transient errors leave the +// capability undecided so a flaky poll can't lock in legacy mode. +func (s *XrayService) GetOnlineUsers() ([]xray.OnlineUser, bool, error) { + if !s.IsXrayRunning() { + return nil, false, nil + } + if p.OnlineAPISupport() == xray.OnlineAPIUnsupported { + return nil, false, nil + } + if err := s.xrayAPI.Init(p.GetAPIPort()); err != nil { + logger.Debug("Failed to initialize Xray API:", err) + return nil, false, err + } + defer s.xrayAPI.Close() + + users, err := s.xrayAPI.GetOnlineUsers() + if err != nil { + if xray.IsUnimplementedErr(err) { + p.SetOnlineAPISupport(xray.OnlineAPIUnsupported) + logger.Info("xray core does not support the online-stats API; falling back to traffic-delta onlines and access-log IP limit") + return nil, false, nil + } + logger.Debug("Failed to fetch Xray online users:", err) + return nil, false, err + } + if p.OnlineAPISupport() == xray.OnlineAPIUnknown { + p.SetOnlineAPISupport(xray.OnlineAPISupported) + logger.Info("xray core supports the online-stats API; using connection-based onlines and access-log-free IP limit") + } + return users, true, nil +} + // BalancerStatus is the live view of one balancer for the panel UI. Running // is false when the balancer isn't present in the running core (e.g. xray is // stopped or the balancer hasn't been saved/applied yet). diff --git a/internal/web/service/xray_config_inject_test.go b/internal/web/service/xray_config_inject_test.go index ec3dd2d09..bf763de44 100644 --- a/internal/web/service/xray_config_inject_test.go +++ b/internal/web/service/xray_config_inject_test.go @@ -54,6 +54,71 @@ func TestEnsureAPIServices(t *testing.T) { } } +func TestEnsureStatsPolicy(t *testing.T) { + // default-template shape: level "0" exists with traffic flags — the online + // flag is added and the siblings survive untouched + out := ensureStatsPolicy(json_util.RawMessage(`{"levels":{"0":{"handshake":4,"statsUserUplink":true,"statsUserDownlink":true}},"system":{"statsInboundDownlink":true}}`)) + var parsed struct { + Levels map[string]map[string]any `json:"levels"` + System map[string]any `json:"system"` + } + if err := json.Unmarshal(out, &parsed); err != nil { + t.Fatal(err) + } + level0 := parsed.Levels["0"] + if level0["statsUserOnline"] != true { + t.Fatalf("statsUserOnline must be injected into level 0, got %v", level0) + } + if level0["statsUserUplink"] != true || level0["statsUserDownlink"] != true || level0["handshake"] != float64(4) { + t.Fatalf("sibling keys must be preserved, got %v", level0) + } + if parsed.System["statsInboundDownlink"] != true { + t.Fatalf("system block must be preserved, got %v", parsed.System) + } + + // missing levels block: level "0" is created with the flag + out = ensureStatsPolicy(json_util.RawMessage(`{"system":{}}`)) + if err := json.Unmarshal(out, &parsed); err != nil { + t.Fatal(err) + } + if parsed.Levels["0"]["statsUserOnline"] != true { + t.Fatalf("level 0 must be created with statsUserOnline, got %s", out) + } + + // every level gets the flag, an explicit false included — the flag is + // panel infrastructure, like the api services + out = ensureStatsPolicy(json_util.RawMessage(`{"levels":{"0":{"statsUserOnline":false},"1":{"connIdle":300}}}`)) + if err := json.Unmarshal(out, &parsed); err != nil { + t.Fatal(err) + } + for _, key := range []string{"0", "1"} { + if parsed.Levels[key]["statsUserOnline"] != true { + t.Fatalf("level %s must have statsUserOnline forced on, got %s", key, out) + } + } + if parsed.Levels["1"]["connIdle"] != float64(300) { + t.Fatalf("level 1 siblings must be preserved, got %s", out) + } + + // already-enabled input passes through byte-identical (no marshal churn, + // no spurious restart) + full := json_util.RawMessage(`{"levels":{"0":{"statsUserOnline":true}}}`) + if got := ensureStatsPolicy(full); string(got) != string(full) { + t.Fatalf("already-enabled policy must pass through untouched, got %s", got) + } + + // absent policy block stays absent + if got := ensureStatsPolicy(nil); got != nil { + t.Fatalf("nil policy must stay nil, got %s", got) + } + + // unparsable policy is left untouched + bad := json_util.RawMessage(`{not json`) + if got := ensureStatsPolicy(bad); string(got) != string(bad) { + t.Fatalf("unparsable policy must be left untouched, got %s", got) + } +} + func egressTestConfig() *xray.Config { return &xray.Config{ RouterConfig: json_util.RawMessage(`{"domainStrategy":"AsIs","rules":[{"type":"field","inboundTag":["api"],"outboundTag":"api"}]}`), diff --git a/internal/web/translation/ar-EG.json b/internal/web/translation/ar-EG.json index e0c4e0fc9..7a74a6de9 100644 --- a/internal/web/translation/ar-EG.json +++ b/internal/web/translation/ar-EG.json @@ -627,6 +627,8 @@ } }, "clients": { + "tabBasic": "أساسي", + "tabConfig": "التكوين", "add": "إضافة عميل", "edit": "تعديل العميل", "submitAdd": "إضافة عميل", diff --git a/internal/web/translation/en-US.json b/internal/web/translation/en-US.json index d8441ce4d..d1263fcc5 100644 --- a/internal/web/translation/en-US.json +++ b/internal/web/translation/en-US.json @@ -628,6 +628,8 @@ } }, "clients": { + "tabBasic": "Basic", + "tabConfig": "Config", "add": "Add Client", "edit": "Edit Client", "submitAdd": "Add Client", diff --git a/internal/web/translation/es-ES.json b/internal/web/translation/es-ES.json index 381f84a65..49c475b07 100644 --- a/internal/web/translation/es-ES.json +++ b/internal/web/translation/es-ES.json @@ -627,6 +627,8 @@ } }, "clients": { + "tabBasic": "Básico", + "tabConfig": "Configuración", "add": "Añadir cliente", "edit": "Editar cliente", "submitAdd": "Añadir cliente", diff --git a/internal/web/translation/fa-IR.json b/internal/web/translation/fa-IR.json index b76930788..2ee7c4b17 100644 --- a/internal/web/translation/fa-IR.json +++ b/internal/web/translation/fa-IR.json @@ -627,6 +627,8 @@ } }, "clients": { + "tabBasic": "پایه", + "tabConfig": "پیکربندی", "add": "افزودن کلاینت", "edit": "ویرایش کلاینت", "submitAdd": "افزودن کلاینت", diff --git a/internal/web/translation/id-ID.json b/internal/web/translation/id-ID.json index ebb54b343..5c9ad9557 100644 --- a/internal/web/translation/id-ID.json +++ b/internal/web/translation/id-ID.json @@ -627,6 +627,8 @@ } }, "clients": { + "tabBasic": "Dasar", + "tabConfig": "Konfigurasi", "add": "Tambah klien", "edit": "Ubah klien", "submitAdd": "Tambah klien", diff --git a/internal/web/translation/ja-JP.json b/internal/web/translation/ja-JP.json index 21047d9d6..33b1943c4 100644 --- a/internal/web/translation/ja-JP.json +++ b/internal/web/translation/ja-JP.json @@ -627,6 +627,8 @@ } }, "clients": { + "tabBasic": "基本", + "tabConfig": "設定", "add": "クライアントを追加", "edit": "クライアントを編集", "submitAdd": "クライアントを追加", diff --git a/internal/web/translation/pt-BR.json b/internal/web/translation/pt-BR.json index e9529b8d0..7233f1df8 100644 --- a/internal/web/translation/pt-BR.json +++ b/internal/web/translation/pt-BR.json @@ -627,6 +627,8 @@ } }, "clients": { + "tabBasic": "Básico", + "tabConfig": "Configuração", "add": "Adicionar cliente", "edit": "Editar cliente", "submitAdd": "Adicionar cliente", diff --git a/internal/web/translation/ru-RU.json b/internal/web/translation/ru-RU.json index 7514210bb..5bfc0e5d4 100644 --- a/internal/web/translation/ru-RU.json +++ b/internal/web/translation/ru-RU.json @@ -627,6 +627,8 @@ } }, "clients": { + "tabBasic": "Основные", + "tabConfig": "Конфигурация", "add": "Добавить клиента", "edit": "Изменить клиента", "submitAdd": "Добавить клиента", diff --git a/internal/web/translation/tr-TR.json b/internal/web/translation/tr-TR.json index fd172d9b2..29fc7d6f1 100644 --- a/internal/web/translation/tr-TR.json +++ b/internal/web/translation/tr-TR.json @@ -628,6 +628,8 @@ } }, "clients": { + "tabBasic": "Temel", + "tabConfig": "Yapılandırma", "add": "Kullanıcı Ekle", "edit": "Kullanıcıyı Düzenle", "submitAdd": "Kullanıcı Ekle", diff --git a/internal/web/translation/uk-UA.json b/internal/web/translation/uk-UA.json index 1fe7baf06..7eb7bf6c6 100644 --- a/internal/web/translation/uk-UA.json +++ b/internal/web/translation/uk-UA.json @@ -627,6 +627,8 @@ } }, "clients": { + "tabBasic": "Основні", + "tabConfig": "Конфігурація", "add": "Додати клієнта", "edit": "Редагувати клієнта", "submitAdd": "Додати клієнта", diff --git a/internal/web/translation/vi-VN.json b/internal/web/translation/vi-VN.json index 3098f3a95..dabbf81f0 100644 --- a/internal/web/translation/vi-VN.json +++ b/internal/web/translation/vi-VN.json @@ -627,6 +627,8 @@ } }, "clients": { + "tabBasic": "Cơ bản", + "tabConfig": "Cấu hình", "add": "Thêm khách hàng", "edit": "Chỉnh sửa khách hàng", "submitAdd": "Thêm khách hàng", diff --git a/internal/web/translation/zh-CN.json b/internal/web/translation/zh-CN.json index 3d7710b09..365fbad78 100644 --- a/internal/web/translation/zh-CN.json +++ b/internal/web/translation/zh-CN.json @@ -627,6 +627,8 @@ } }, "clients": { + "tabBasic": "基本", + "tabConfig": "配置", "add": "添加客户端", "edit": "编辑客户端", "submitAdd": "添加客户端", diff --git a/internal/web/translation/zh-TW.json b/internal/web/translation/zh-TW.json index 55ea4d020..24f6b5275 100644 --- a/internal/web/translation/zh-TW.json +++ b/internal/web/translation/zh-TW.json @@ -627,6 +627,8 @@ } }, "clients": { + "tabBasic": "基本", + "tabConfig": "配置", "add": "新增客戶端", "edit": "編輯客戶端", "submitAdd": "新增客戶端", diff --git a/internal/xray/api.go b/internal/xray/api.go index 4c884e985..ea417e4e2 100644 --- a/internal/xray/api.go +++ b/internal/xray/api.go @@ -33,7 +33,9 @@ import ( "github.com/xtls/xray-core/proxy/vless" "github.com/xtls/xray-core/proxy/vmess" "google.golang.org/grpc" + "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/status" ) // XrayAPI is a gRPC client for managing Xray core configuration, inbounds, outbounds, and statistics. @@ -289,8 +291,8 @@ type RouteTestRequest struct { type RouteTestResult struct { // Matched is false when no routing rule matched — traffic would use the // default (first) outbound and OutboundTag is empty. - Matched bool `json:"matched"` - OutboundTag string `json:"outboundTag"` + Matched bool `json:"matched"` + OutboundTag string `json:"outboundTag"` // GroupTags lists the balancer chain the decision went through, when any. GroupTags []string `json:"groupTags,omitempty"` } @@ -571,6 +573,62 @@ func (x *XrayAPI) GetTraffic() ([]*Traffic, []*ClientTraffic, error) { return mapToSlice(tagTrafficMap), mapToSlice(emailTrafficMap), nil } +// OnlineIP is one source address of a live connection, with the unix time (seconds) +// the core last dispatched a link from it. +type OnlineIP struct { + IP string `json:"ip"` + LastSeen int64 `json:"lastSeen"` +} + +// OnlineUser is a client email with at least one live connection and the source +// IPs of those connections, as tracked by Xray's statsUserOnline policy. +type OnlineUser struct { + Email string `json:"email"` + IPs []OnlineIP `json:"ips"` +} + +// GetOnlineUsers returns every user with at least one live connection plus their +// source IPs, via StatsService.GetUsersStats (one RPC covers all users). Requires +// statsUserOnline enabled in the policy levels; older cores return Unimplemented. +func (x *XrayAPI) GetOnlineUsers() ([]OnlineUser, error) { + if x.grpcClient == nil { + return nil, common.NewError("xray api is not initialized") + } + if x.StatsServiceClient == nil { + return nil, common.NewError("xray StatsServiceClient is not initialized") + } + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + + resp, err := (*x.StatsServiceClient).GetUsersStats(ctx, &statsService.GetUsersStatsRequest{}) + if err != nil { + return nil, err + } + + users := make([]OnlineUser, 0, len(resp.GetUsers())) + for _, u := range resp.GetUsers() { + if u == nil || u.GetEmail() == "" { + continue + } + ips := make([]OnlineIP, 0, len(u.GetIps())) + for _, entry := range u.GetIps() { + if entry == nil || entry.GetIp() == "" { + continue + } + ips = append(ips, OnlineIP{IP: entry.GetIp(), LastSeen: entry.GetLastSeen()}) + } + users = append(users, OnlineUser{Email: u.GetEmail(), IPs: ips}) + } + return users, nil +} + +// IsUnimplementedErr reports whether err is the running core saying it lacks an +// RPC (an older Xray binary without the online-stats API). +func IsUnimplementedErr(err error) bool { + return status.Code(err) == codes.Unimplemented +} + // processTraffic aggregates a traffic stat into trafficMap using regex matches and value. func processTraffic(matches []string, value int64, trafficMap map[string]*Traffic) { isInbound := matches[1] == "inbound" diff --git a/internal/xray/api_e2e_test.go b/internal/xray/api_e2e_test.go index c25dd065c..0d01eebf9 100644 --- a/internal/xray/api_e2e_test.go +++ b/internal/xray/api_e2e_test.go @@ -53,8 +53,12 @@ func TestXrayAPI_E2E(t *testing.T) { map[string]any{"type": "field", "inboundTag": []string{"api"}, "outboundTag": "api"}, }, }, - "policy": map[string]any{}, - "stats": map[string]any{}, + "policy": map[string]any{ + "levels": map[string]any{ + "0": map[string]any{"statsUserOnline": true}, + }, + }, + "stats": map[string]any{}, } cfgBytes, err := json.MarshalIndent(cfg, "", " ") if err != nil { @@ -130,6 +134,19 @@ func TestXrayAPI_E2E(t *testing.T) { t.Fatalf("missing inbound error not matched by IsMissingHandlerErr: %q", err) } + // --- online-stats API --- + // statsUserOnline is enabled in the policy above; with no client + // connections the call must succeed and return an empty set. This proves + // the GetUsersStats plumbing against a real core (an older binary would + // return Unimplemented here — see IsUnimplementedErr). + online, err := api.GetOnlineUsers() + if err != nil { + t.Fatalf("GetOnlineUsers: %v", err) + } + if len(online) != 0 { + t.Fatalf("expected no online users on an idle core, got %+v", online) + } + // --- routing (rules + balancers replace) --- newRouting := []byte(`{ "domainStrategy": "AsIs", diff --git a/internal/xray/online_test.go b/internal/xray/online_test.go index 23e8e65a6..0c5a3ed4e 100644 --- a/internal/xray/online_test.go +++ b/internal/xray/online_test.go @@ -129,3 +129,21 @@ func TestClearNodeOnlineClientsDropsNode(t *testing.T) { t.Errorf("node 3's subtree should be absent after ClearNodeOnlineClients") } } + +// TestOnlineAPISupportTriState pins the lazy capability probe contract: a new +// process starts Unknown (so the first caller probes), and the flag holds +// whatever the probe recorded until the process is replaced on restart. +func TestOnlineAPISupportTriState(t *testing.T) { + p := newOnlineTestProcess() + if got := p.OnlineAPISupport(); got != OnlineAPIUnknown { + t.Fatalf("new process must start with OnlineAPIUnknown, got %v", got) + } + p.SetOnlineAPISupport(OnlineAPISupported) + if got := p.OnlineAPISupport(); got != OnlineAPISupported { + t.Fatalf("expected OnlineAPISupported, got %v", got) + } + p.SetOnlineAPISupport(OnlineAPIUnsupported) + if got := p.OnlineAPISupport(); got != OnlineAPIUnsupported { + t.Fatalf("expected OnlineAPIUnsupported, got %v", got) + } +} diff --git a/internal/xray/process.go b/internal/xray/process.go index 22cce81aa..d63dfc280 100644 --- a/internal/xray/process.go +++ b/internal/xray/process.go @@ -172,6 +172,12 @@ type process struct { nodeOnlineTrees map[int]map[string][]string onlineMu sync.RWMutex + // onlineAPISupport caches whether the running core implements the + // online-stats RPCs (GetUsersStats). A new process is created on every + // restart/version switch, so the flag resets to Unknown and is re-probed + // lazily by the first caller. + onlineAPISupport atomic.Int32 + config *Config configPath string // if set, use this path instead of GetConfigPath() and remove on Stop logWriter *LogWriter @@ -181,6 +187,29 @@ type process struct { intentionalStop atomic.Bool } +// OnlineAPISupport describes whether the running Xray core implements the +// online-stats API (statsUserOnline + GetUsersStats). +type OnlineAPISupport int32 + +const ( + // OnlineAPIUnknown means support has not been probed yet for this process. + OnlineAPIUnknown OnlineAPISupport = iota + // OnlineAPISupported means the core answered the online-stats RPC. + OnlineAPISupported + // OnlineAPIUnsupported means the core returned Unimplemented (older binary). + OnlineAPIUnsupported +) + +// OnlineAPISupport returns the cached online-stats capability of this process. +func (p *process) OnlineAPISupport() OnlineAPISupport { + return OnlineAPISupport(p.onlineAPISupport.Load()) +} + +// SetOnlineAPISupport records the probed online-stats capability of this process. +func (p *process) SetOnlineAPISupport(v OnlineAPISupport) { + p.onlineAPISupport.Store(int32(v)) +} + var ( xrayGracefulStopTimeout = 5 * time.Second xrayForceStopTimeout = 2 * time.Second