diff --git a/frontend/public/openapi.json b/frontend/public/openapi.json index 517d37ec9..6c9939b49 100644 --- a/frontend/public/openapi.json +++ b/frontend/public/openapi.json @@ -1826,6 +1826,10 @@ "Node": { "description": "Node represents a remote 3x-ui panel registered with the central panel.\nThe central panel polls each node's existing /panel/api/server/status\nendpoint over HTTP using the per-node ApiToken to populate the runtime\nstatus fields below.", "properties": { + "activeCount": { + "example": 23, + "type": "integer" + }, "address": { "example": "node1.example.com", "type": "string" @@ -1863,6 +1867,10 @@ "example": 1, "type": "integer" }, + "disabledCount": { + "example": 3, + "type": "integer" + }, "enable": { "example": true, "type": "boolean" @@ -1993,6 +2001,7 @@ } }, "required": [ + "activeCount", "address", "allowPrivateAddress", "apiToken", @@ -2003,6 +2012,7 @@ "cpuPct", "createdAt", "depletedCount", + "disabledCount", "enable", "guid", "id", @@ -6683,6 +6693,7 @@ "success": true, "obj": [ { + "activeCount": 23, "address": "node1.example.com", "allowPrivateAddress": false, "apiToken": "abcdef0123456789", @@ -6693,6 +6704,7 @@ "cpuPct": 23.5, "createdAt": 1700000000, "depletedCount": 1, + "disabledCount": 3, "enable": true, "guid": "", "id": 1, diff --git a/frontend/src/generated/examples.ts b/frontend/src/generated/examples.ts index e476514f1..8314d55cd 100644 --- a/frontend/src/generated/examples.ts +++ b/frontend/src/generated/examples.ts @@ -396,6 +396,7 @@ export const EXAMPLES: Record = { "success": false }, "Node": { + "activeCount": 23, "address": "node1.example.com", "allowPrivateAddress": false, "apiToken": "abcdef0123456789", @@ -406,6 +407,7 @@ export const EXAMPLES: Record = { "cpuPct": 23.5, "createdAt": 1700000000, "depletedCount": 1, + "disabledCount": 3, "enable": true, "guid": "", "id": 1, diff --git a/frontend/src/generated/schemas.ts b/frontend/src/generated/schemas.ts index 7fed98bd3..d59caa7a2 100644 --- a/frontend/src/generated/schemas.ts +++ b/frontend/src/generated/schemas.ts @@ -1800,6 +1800,10 @@ export const SCHEMAS: Record = { "Node": { "description": "Node represents a remote 3x-ui panel registered with the central panel.\nThe central panel polls each node's existing /panel/api/server/status\nendpoint over HTTP using the per-node ApiToken to populate the runtime\nstatus fields below.", "properties": { + "activeCount": { + "example": 23, + "type": "integer" + }, "address": { "example": "node1.example.com", "type": "string" @@ -1837,6 +1841,10 @@ export const SCHEMAS: Record = { "example": 1, "type": "integer" }, + "disabledCount": { + "example": 3, + "type": "integer" + }, "enable": { "example": true, "type": "boolean" @@ -1967,6 +1975,7 @@ export const SCHEMAS: Record = { } }, "required": [ + "activeCount", "address", "allowPrivateAddress", "apiToken", @@ -1977,6 +1986,7 @@ export const SCHEMAS: Record = { "cpuPct", "createdAt", "depletedCount", + "disabledCount", "enable", "guid", "id", diff --git a/frontend/src/generated/types.ts b/frontend/src/generated/types.ts index b8c8f45d6..7fd234581 100644 --- a/frontend/src/generated/types.ts +++ b/frontend/src/generated/types.ts @@ -394,6 +394,7 @@ export interface Msg { } export interface Node { + activeCount: number; address: string; allowPrivateAddress: boolean; apiToken: string; @@ -404,6 +405,7 @@ export interface Node { cpuPct: number; createdAt: number; depletedCount: number; + disabledCount: number; enable: boolean; guid: string; id: number; diff --git a/frontend/src/generated/zod.ts b/frontend/src/generated/zod.ts index 458e2f99b..c6a725bcf 100644 --- a/frontend/src/generated/zod.ts +++ b/frontend/src/generated/zod.ts @@ -423,6 +423,7 @@ export const MsgSchema = z.object({ export type Msg = z.infer; export const NodeSchema = z.object({ + activeCount: z.number().int(), address: z.string(), allowPrivateAddress: z.boolean(), apiToken: z.string(), @@ -433,6 +434,7 @@ export const NodeSchema = z.object({ cpuPct: z.number(), createdAt: z.number().int(), depletedCount: z.number().int(), + disabledCount: z.number().int(), enable: z.boolean(), guid: z.string(), id: z.number().int(), diff --git a/frontend/src/pages/inbounds/list/InboundList.tsx b/frontend/src/pages/inbounds/list/InboundList.tsx index c17d074e8..b233456a8 100644 --- a/frontend/src/pages/inbounds/list/InboundList.tsx +++ b/frontend/src/pages/inbounds/list/InboundList.tsx @@ -133,6 +133,11 @@ export default function InboundList({ onSwitchEnable, }); + const tableScrollX = useMemo( + () => columns.reduce((sum, c) => sum + (typeof c.width === 'number' ? c.width : 0), 0), + [columns], + ); + const paginationFor = (rows: DBInboundRecord[]) => { const size = pageSize > 0 ? pageSize : rows.length || 1; return { pageSize: size, showSizeChanger: false, hideOnSinglePage: true }; @@ -252,7 +257,7 @@ export default function InboundList({ onChange: (keys: Key[]) => setSelectedRowKeys(keys as number[]), }} pagination={paginationFor(visibleInbounds)} - scroll={{ x: 1000 }} + scroll={{ x: tableScrollX }} style={{ marginTop: 10 }} size="small" locale={{ diff --git a/frontend/src/pages/inbounds/list/useInboundColumns.tsx b/frontend/src/pages/inbounds/list/useInboundColumns.tsx index 4839ed213..4bb02275a 100644 --- a/frontend/src/pages/inbounds/list/useInboundColumns.tsx +++ b/frontend/src/pages/inbounds/list/useInboundColumns.tsx @@ -57,13 +57,13 @@ export function useInboundColumns({ dataIndex: 'id', key: 'id', align: 'right', - width: 30, + width: 60, }, { title: t('pages.inbounds.operate'), key: 'action', align: 'center', - width: 60, + width: 70, render: (_, record) => ( ( { if (record.nodeId == null) { return {t('pages.inbounds.localPanel')}; @@ -128,7 +128,7 @@ export function useInboundColumns({ dataIndex: 'subSortIndex', key: 'subSortIndex', align: 'right', - width: 70, + width: 90, }); } @@ -138,13 +138,13 @@ export function useInboundColumns({ dataIndex: 'port', key: 'port', align: 'center', - width: 40, + width: 80, }, { title: t('pages.inbounds.protocol'), key: 'protocol', align: 'left', - width: 130, + width: 190, render: (_, record) => { const tags: ReactElement[] = [{record.protocol}]; if (record.isWireguard || record.isHysteria) { @@ -172,7 +172,7 @@ export function useInboundColumns({ title: t('clients'), key: 'clients', align: 'left', - width: 110, + width: 200, render: (_, record) => { const cc = clientCount[record.id]; if (!cc) return null; @@ -237,7 +237,7 @@ export function useInboundColumns({ title: t('pages.inbounds.traffic'), key: 'traffic', align: 'center', - width: 90, + width: 140, render: (_, record) => ( { const speed = inboundSpeed[record.id]; if (!isActiveSpeed(speed)) { @@ -282,7 +282,7 @@ export function useInboundColumns({ title: t('pages.inbounds.expireDate'), key: 'expiryTime', align: 'center', - width: 40, + width: 100, render: (_, record) => { if (record.expiryTime > 0) { return ( diff --git a/frontend/src/pages/inbounds/useInbounds.ts b/frontend/src/pages/inbounds/useInbounds.ts index 723e5182f..f37a68bb8 100644 --- a/frontend/src/pages/inbounds/useInbounds.ts +++ b/frontend/src/pages/inbounds/useInbounds.ts @@ -32,6 +32,14 @@ type DBInboundInstance = InstanceType; // deltas accumulated over this window, so dividing by it yields bytes/sec. const TRAFFIC_POLL_INTERVAL_S = 5; +// Speed is delta-derived, so it can't be recomputed until the first poll after +// mount; navigating away and back would otherwise blank the column for up to one +// poll. Cache the last speed map across mounts (module scope) and reseed from it +// while recent, so returning to the page shows the last throughput immediately +// and the next poll refreshes it. +const SPEED_CACHE_TTL_MS = 15000; +let inboundSpeedCache: { at: number; data: Record } = { at: 0, data: {} }; + interface TrafficDelta { Tag: string; Up: number; @@ -191,7 +199,12 @@ export function useInbounds() { const [clientCount, setClientCount] = useState>({}); const [statsVersion, setStatsVersion] = useState(0); - const [inboundSpeed, setInboundSpeed] = useState>({}); + const [inboundSpeed, setInboundSpeed] = useState>(() => + Date.now() - inboundSpeedCache.at < SPEED_CACHE_TTL_MS ? inboundSpeedCache.data : {}, + ); + useEffect(() => { + inboundSpeedCache = { at: Date.now(), data: inboundSpeed }; + }, [inboundSpeed]); const [onlineClients, setOnlineClients] = useState([]); const onlineClientsRef = useRef([]); @@ -399,6 +412,7 @@ export function useInbounds() { if (!payload || typeof payload !== 'object') return; const p = payload as { traffics?: TrafficDelta[]; + nodeTraffics?: TrafficDelta[]; onlineClients?: string[]; onlineByGuid?: Record; activeInbounds?: Record; @@ -417,26 +431,40 @@ export function useInbounds() { if (p.lastOnlineMap && typeof p.lastOnlineMap === 'object') { setLastOnlineMap((prev) => ({ ...prev, ...p.lastOnlineMap! })); } - // Full-replace each poll so idle inbounds (and an empty array after an - // Xray stat reset) clear their speed instead of showing a stale value. - if (Array.isArray(p.traffics)) { + // Speed arrives from two independent 5s polls: the local Xray poll sends + // `traffics` (local inbounds) and the node sync sends `nodeTraffics` (node + // inbounds). Each replaces speed only within its own scope so the two don't + // clobber each other; an idle in-scope inbound — absent from its payload — + // clears instead of showing a stale value. + const applyTraffics = ( + traffics: TrafficDelta[], + inScope: (ib: DBInboundInstance) => boolean, + ) => { const byTag = new Map(); - for (const tr of p.traffics) { + for (const tr of traffics) { if (!tr || typeof tr.Tag !== 'string') continue; if (tr.IsInbound === false) continue; byTag.set(tr.Tag, tr); } - const nextSpeed: Record = {}; - for (const ib of dbInboundsRef.current) { - const delta = byTag.get(ib.tag); - if (!delta) continue; - nextSpeed[ib.id] = { - up: (delta.Up || 0) / TRAFFIC_POLL_INTERVAL_S, - down: (delta.Down || 0) / TRAFFIC_POLL_INTERVAL_S, - }; - } - setInboundSpeed(nextSpeed); - } + setInboundSpeed((prev) => { + const next = { ...prev }; + for (const ib of dbInboundsRef.current) { + if (!inScope(ib)) continue; + const delta = byTag.get(ib.tag); + if (delta) { + next[ib.id] = { + up: (delta.Up || 0) / TRAFFIC_POLL_INTERVAL_S, + down: (delta.Down || 0) / TRAFFIC_POLL_INTERVAL_S, + }; + } else { + delete next[ib.id]; + } + } + return next; + }); + }; + if (Array.isArray(p.traffics)) applyTraffics(p.traffics, (ib) => ib.nodeId == null); + if (Array.isArray(p.nodeTraffics)) applyTraffics(p.nodeTraffics, (ib) => ib.nodeId != null); rebuildClientCount(); }, [rebuildClientCount], diff --git a/frontend/src/pages/nodes/NodeList.tsx b/frontend/src/pages/nodes/NodeList.tsx index 1348f82e6..c07b29f43 100644 --- a/frontend/src/pages/nodes/NodeList.tsx +++ b/frontend/src/pages/nodes/NodeList.tsx @@ -28,6 +28,7 @@ import { PlusOutlined, RightOutlined, SafetyCertificateOutlined, + TeamOutlined, ThunderboltOutlined, } from '@ant-design/icons'; @@ -384,15 +385,29 @@ export default function NodeList({ { title: t('clients'), align: 'center', - width: 160, + width: 180, render: (_value, record) => ( - - {record.clientCount || 0} - {record.onlineCount ? ( - {record.onlineCount} {t('online')} + + {record.clientCount || 0} + {record.activeCount ? ( + + {record.activeCount} + + ) : null} + {record.disabledCount ? ( + + {record.disabledCount} + ) : null} {record.depletedCount ? ( - {record.depletedCount} {t('depleted')} + + {record.depletedCount} + + ) : null} + {record.onlineCount ? ( + + {record.onlineCount} + ) : null} ), @@ -587,13 +602,19 @@ export default function NodeList({
{t('clients')} - {statsNode.clientCount || 0} - {statsNode.onlineCount ? ( - {statsNode.onlineCount} {t('online')} + {statsNode.clientCount || 0} + {statsNode.activeCount ? ( + {statsNode.activeCount} {t('subscription.active')} + ) : null} + {statsNode.disabledCount ? ( + {statsNode.disabledCount} {t('disabled')} ) : null} {statsNode.depletedCount ? ( {statsNode.depletedCount} {t('depleted')} ) : null} + {statsNode.onlineCount ? ( + {statsNode.onlineCount} {t('online')} + ) : null}
{t('pages.nodes.lastHeartbeat')} diff --git a/frontend/src/schemas/node.ts b/frontend/src/schemas/node.ts index 52b640c52..6fb24a469 100644 --- a/frontend/src/schemas/node.ts +++ b/frontend/src/schemas/node.ts @@ -20,6 +20,8 @@ export const NodeRecordSchema = z.object({ inboundCount: z.number().optional(), clientCount: z.number().optional(), onlineCount: z.number().optional(), + activeCount: z.number().optional(), + disabledCount: z.number().optional(), depletedCount: z.number().optional(), lastHeartbeat: z.number().optional(), lastError: z.string().optional(), diff --git a/internal/database/model/model.go b/internal/database/model/model.go index 33083f531..0b5604298 100644 --- a/internal/database/model/model.go +++ b/internal/database/model/model.go @@ -545,6 +545,8 @@ type Node struct { InboundCount int `json:"inboundCount" gorm:"-" example:"5"` ClientCount int `json:"clientCount" gorm:"-" example:"27"` OnlineCount int `json:"onlineCount" gorm:"-" example:"3"` + ActiveCount int `json:"activeCount" gorm:"-" example:"23"` + DisabledCount int `json:"disabledCount" gorm:"-" example:"3"` DepletedCount int `json:"depletedCount" gorm:"-" example:"1"` // ParentGuid + Transitive are set only when a node is surfaced as part of a diff --git a/internal/web/job/node_traffic_sync_job.go b/internal/web/job/node_traffic_sync_job.go index e919a2e54..2618cd7b9 100644 --- a/internal/web/job/node_traffic_sync_job.go +++ b/internal/web/job/node_traffic_sync_job.go @@ -12,6 +12,7 @@ import ( "github.com/mhsanaei/3x-ui/v3/internal/web/runtime" "github.com/mhsanaei/3x-ui/v3/internal/web/service" "github.com/mhsanaei/3x-ui/v3/internal/web/websocket" + "github.com/mhsanaei/3x-ui/v3/internal/xray" ) const ( @@ -21,8 +22,18 @@ const ( nodeClientIpSyncInterval = 10 * time.Second nodeClientIpSyncTimeout = 6 * time.Second nodeGlobalPushInterval = 30 * time.Second + // nodeInboundSpeedWindowMs is the poll window node-inbound speed deltas are + // normalized to; it MUST match the dashboard's TRAFFIC_POLL_INTERVAL_S (5s), + // the fixed divisor the frontend applies to turn a delta into a rate. + nodeInboundSpeedWindowMs int64 = 5000 ) +// inboundSample is a node inbound's last-seen cumulative up/down and the time +// (unix millis) its counter last changed, used to derive a normalized speed. +type inboundSample struct { + up, down, at int64 +} + type NodeTrafficSyncJob struct { nodeService service.NodeService inboundService service.InboundService @@ -34,6 +45,14 @@ type NodeTrafficSyncJob struct { lastIpSync int64 globalPushMu sync.Mutex lastGlobalPush int64 + // noGuidIpEndpoint tracks nodes (by id) whose client-IP attribution endpoint + // returned 404, so an old-build node is noted once instead of every cycle. + noGuidIpEndpoint sync.Map + // prevInboundTotals holds the previous poll's cumulative up/down (and the time + // the counter last changed) per node inbound tag, so the next poll can derive + // a per-inbound speed delta — node inbounds have no local Xray poll. Touched + // only from Run (serialized). + prevInboundTotals map[string]inboundSample } type atomicBool struct { @@ -137,6 +156,10 @@ func (j *NodeTrafficSyncJob) Run() { // xray's clients and inbounds still age out between traffic polls. j.inboundService.RefreshLocalOnlineClients(nil, nil) + // Derive per-node-inbound speed every tick (keeps the baseline fresh even + // with no dashboard open); only broadcast it when someone is watching. + inboundSpeed := j.nodeInboundSpeed() + if !websocket.HasClients() { return } @@ -145,12 +168,18 @@ func (j *NodeTrafficSyncJob) Run() { if online == nil { online = []string{} } - websocket.BroadcastTraffic(map[string]any{ + trafficPayload := map[string]any{ "onlineClients": online, "onlineByGuid": j.inboundService.GetOnlineClientsByGuid(), "activeInbounds": j.inboundService.GetActiveInboundsByGuid(), "lastOnlineMap": lastOnline, - }) + } + // Always send the key so the dashboard clears node inbounds that went idle + // this tick. A nil result (query error) marshals to null and is skipped + // client-side, leaving the last shown value untouched; an empty (non-nil) + // slice marshals to [] and clears stale speeds. + trafficPayload["nodeTraffics"] = inboundSpeed + websocket.BroadcastTraffic(trafficPayload) clientStats := map[string]any{} if stats, err := j.inboundService.GetAllClientTraffics(); err != nil { @@ -173,6 +202,65 @@ func (j *NodeTrafficSyncJob) Run() { } } +// nodeInboundSpeed derives a per-node-inbound speed delta by diffing the current +// cumulative up/down against the previous poll's, keyed by the central tag the +// dashboard matches. The node's counter keeps climbing while the master can't +// reach it, so the first delta after a gap (node outage, skipped poll, slow +// node) spans more than one poll window; it is normalized to the fixed +// nodeInboundSpeedWindowMs using the real elapsed time so the dashboard's fixed +// divisor yields the true average rate over the gap instead of an impossible +// one-tick spike. The change timestamp only advances when the value actually +// moves, so an idle stretch is averaged correctly when traffic resumes. A reset +// rebaselines to the lower value; a first-seen tag yields no delta until the +// next poll. +func (j *NodeTrafficSyncJob) nodeInboundSpeed() []*xray.Traffic { + totals, err := j.inboundService.GetNodeInboundTrafficTotals() + if err != nil { + return nil + } + now := time.Now().UnixMilli() + deltas := make([]*xray.Traffic, 0, len(totals)) + next := make(map[string]inboundSample, len(totals)) + for tag, cur := range totals { + prev, ok := j.prevInboundTotals[tag] + if !ok { + next[tag] = inboundSample{up: cur[0], down: cur[1], at: now} + continue + } + dUp := cur[0] - prev.up + dDown := cur[1] - prev.down + if dUp <= 0 && dDown <= 0 { + // No movement, or a counter reset: hold the change timestamp so a + // later jump is averaged over the real elapsed window, not shown as a + // spike. Adopt the lower value on a reset. + if cur[0] < prev.up || cur[1] < prev.down { + next[tag] = inboundSample{up: cur[0], down: cur[1], at: now} + } else { + next[tag] = prev + } + continue + } + if dUp < 0 { + dUp = 0 + } + if dDown < 0 { + dDown = 0 + } + elapsed := now - prev.at + if elapsed < nodeInboundSpeedWindowMs { + elapsed = nodeInboundSpeedWindowMs + } + up := dUp * nodeInboundSpeedWindowMs / elapsed + down := dDown * nodeInboundSpeedWindowMs / elapsed + if up > 0 || down > 0 { + deltas = append(deltas, &xray.Traffic{Tag: tag, IsInbound: true, Up: up, Down: down}) + } + next[tag] = inboundSample{up: cur[0], down: cur[1], at: now} + } + j.prevInboundTotals = next + return deltas +} + // maybePushGlobals broadcasts this panel's aggregated per-client usage to its // online nodes so each node can display the client's cross-panel total and // enforce its quota locally (see InboundService.AcceptGlobalTraffic). Scoped @@ -303,12 +391,22 @@ func (j *NodeTrafficSyncJob) syncOne(mgr *runtime.Manager, n *model.Node, doIpSy // Per-node IP attribution: pull the node's guid-keyed subtree (its own // observations plus any descendants) so the master can tell which node each - // IP is on. Old nodes without the endpoint just return an error — skip them. + // IP is on. Old nodes without the endpoint return HTTP 404 every cycle — note + // it once per node (re-armed on recovery) instead of flooding the log. if guidTrees, err := rt.FetchClientIpsByGuid(ipCtx); err != nil { - logger.Debugf("node traffic sync: fetch client ip attribution from %s failed: %v", n.Name, err) - } else if len(guidTrees) > 0 { - if err := j.inboundService.MergeClientIpsByGuid(guidTrees); err != nil { - logger.Warningf("node traffic sync: merge client ip attribution from %s failed: %v", n.Name, err) + if strings.Contains(err.Error(), "HTTP 404") { + if _, seen := j.noGuidIpEndpoint.LoadOrStore(n.Id, true); !seen { + logger.Debugf("node traffic sync: node %s has no client-IP attribution endpoint (old build)", n.Name) + } + } else { + logger.Debugf("node traffic sync: fetch client ip attribution from %s failed: %v", n.Name, err) + } + } else { + j.noGuidIpEndpoint.Delete(n.Id) + if len(guidTrees) > 0 { + if err := j.inboundService.MergeClientIpsByGuid(n, guidTrees); err != nil { + logger.Warningf("node traffic sync: merge client ip attribution from %s failed: %v", n.Name, err) + } } } } diff --git a/internal/web/service/email/subscriber.go b/internal/web/service/email/subscriber.go index bb4a319e9..bf8b483fd 100644 --- a/internal/web/service/email/subscriber.go +++ b/internal/web/service/email/subscriber.go @@ -31,6 +31,9 @@ func NewSubscriber(settingService service.SettingService, emailService *EmailSer // HandleEvent is the eventbus subscriber callback. func (s *Subscriber) HandleEvent(e eventbus.Event) { + if on, err := s.settingService.GetSmtpEnable(); err != nil || !on { + return + } if !s.isEventEnabled(e.Type) { return } diff --git a/internal/web/service/inbound_node.go b/internal/web/service/inbound_node.go index 34a437d2b..c7782f962 100644 --- a/internal/web/service/inbound_node.go +++ b/internal/web/service/inbound_node.go @@ -201,6 +201,29 @@ func (s *InboundService) SetRemoteTraffic(nodeID int, snap *runtime.TrafficSnaps return structuralChange, err } +// GetNodeInboundTrafficTotals returns the current cumulative up/down for every +// node-hosted inbound, keyed by tag. The node sync diffs successive snapshots of +// this to derive per-inbound speed for the dashboard — node inbounds have no +// local Xray poll to produce live deltas the way local inbounds do. +func (s *InboundService) GetNodeInboundTrafficTotals() (map[string][2]int64, error) { + var rows []struct { + Tag string + Up int64 + Down int64 + } + if err := database.GetDB().Table("inbounds"). + Select("tag, up, down"). + Where("node_id IS NOT NULL"). + Scan(&rows).Error; err != nil { + return nil, err + } + out := make(map[string][2]int64, len(rows)) + for _, r := range rows { + out[r.Tag] = [2]int64{r.Up, r.Down} + } + return out, nil +} + func (s *InboundService) setRemoteTrafficLocked(nodeID int, snap *runtime.TrafficSnapshot, dirty bool) (bool, error) { if snap == nil || nodeID <= 0 { return false, nil @@ -209,18 +232,23 @@ func (s *InboundService) setRemoteTrafficLocked(nodeID int, snap *runtime.Traffi now := time.Now().UnixMilli() // originGuidFor attributes a synced inbound to the panel that physically - // hosts it: inbounds the node forwards from its own sub-nodes already carry - // a non-empty OriginNodeGuid (kept as-is across hops); the node's own local - // inbounds report empty, so they are attributed to the node's own GUID. An - // empty result (old-build node with no GUID yet) leaves attribution to the - // node_id fallback downstream (#4983). + // hosts it. A node's OWN inbounds report either an empty origin or — on + // builds that set it locally — the node's own panelGuid; both resolve to + // selfKey, which is the node's panelGuid unless that GUID is ambiguous + // (shared with another node or the master, i.e. a cloned server), in which + // case it falls back to the node-unique id so #4983 attribution doesn't + // collapse two physical nodes into one bucket. Only a DIFFERENT, non-empty + // origin (an inbound the node forwards from its own sub-node) is kept as-is, + // so a chained Node1->Node2->Node3 still attributes Node3's inbounds to Node3. var nodeRow model.Node db.Select("guid").Where("id = ?", nodeID).First(&nodeRow) + selfKey := effectiveNodeKey(&model.Node{Id: nodeID, Guid: nodeRow.Guid}) + guidShared := nodeRow.Guid != "" && selfKey != nodeRow.Guid originGuidFor := func(snapIb *model.Inbound) string { - if snapIb.OriginNodeGuid != "" { + if snapIb.OriginNodeGuid != "" && snapIb.OriginNodeGuid != nodeRow.Guid { return snapIb.OriginNodeGuid } - return nodeRow.Guid + return selfKey } var central []model.Inbound @@ -494,6 +522,15 @@ func (s *InboundService) setRemoteTrafficLocked(nodeID int, snap *runtime.Traffi if dirty { continue } + if len(snapTags) == 0 { + // A node mid-restart or with a transient DB error can return an empty + // inbound list with success=true. Treat "zero inbounds reported" as + // "nothing to say", not "delete all my inbounds" — otherwise a blip + // wipes the node's central inbounds and every client on them (and + // resets traffic history on re-create). A real per-inbound deletion + // still sweeps, because the node keeps reporting its other inbounds. + continue + } if _, kept := snapTags[c.Tag]; kept { continue } @@ -810,14 +847,25 @@ func (s *InboundService) setRemoteTrafficLocked(nodeID int, snap *runtime.Traffi if p != nil { tree := snap.OnlineTree - if len(tree) == 0 && len(snap.OnlineEmails) > 0 { + switch { + case len(tree) == 0 && len(snap.OnlineEmails) > 0: // Old-build node (no GUID tree): key its flat online list under its // own effective identity so attribution still works for that branch. - effectiveGuid := nodeRow.Guid - if effectiveGuid == "" { - effectiveGuid = synthNodeGuid(nodeID) + tree = map[string][]string{selfKey: snap.OnlineEmails} + case guidShared && len(tree) > 0: + // Newer cloned node: its own clients arrive keyed under the shared + // panelGuid. Remap just that entry to the node-unique key so the + // clones don't merge; descendant subtrees keep their distinct GUIDs. + if _, ok := tree[nodeRow.Guid]; ok { + remapped := make(map[string][]string, len(tree)) + for g, emails := range tree { + if g == nodeRow.Guid { + g = selfKey + } + remapped[g] = emails + } + tree = remapped } - tree = map[string][]string{effectiveGuid: snap.OnlineEmails} } p.SetNodeOnlineTree(nodeID, tree) } diff --git a/internal/web/service/inbound_node_ips.go b/internal/web/service/inbound_node_ips.go index a173f3f19..13950db15 100644 --- a/internal/web/service/inbound_node_ips.go +++ b/internal/web/service/inbound_node_ips.go @@ -113,8 +113,26 @@ func (s *InboundService) RecordLocalClientIps(panelGuid string, observed map[str // MergeClientIpsByGuid folds a node's guid-keyed attribution report (its own // panelGuid subtree plus any descendants) into the local table, preserving which -// physical node each IP is on across a chain. -func (s *InboundService) MergeClientIpsByGuid(trees map[string]map[string][]model.ClientIpEntry) error { +// physical node each IP is on across a chain. When node is non-nil and its own +// panelGuid is ambiguous (shared with another node or the master — a cloned +// server), the node's own subtree is remapped to its node-unique key so two +// clones don't collapse into one attribution row; descendant subtrees keep their +// distinct GUIDs. A nil node merges the report verbatim. +func (s *InboundService) MergeClientIpsByGuid(node *model.Node, trees map[string]map[string][]model.ClientIpEntry) error { + if node != nil && node.Guid != "" { + if eff := effectiveNodeKey(node); eff != node.Guid { + if sub, ok := trees[node.Guid]; ok { + delete(trees, node.Guid) + if existing, ok := trees[eff]; ok { + for email, ips := range sub { + existing[email] = append(existing[email], ips...) + } + } else { + trees[eff] = sub + } + } + } + } for guid, perEmail := range trees { if err := upsertNodeClientIps(guid, perEmail); err != nil { return err @@ -242,18 +260,24 @@ func (s *InboundService) GetClientIpsWithNodes(email string) ([]ClientIpInfo, er return out, nil } -// nodeGuidNameMap maps each known node's stable guid to its display name. +// nodeGuidNameMap maps each known node's attribution key to its display name, +// keyed by effectiveNodeGuid so a cloned node's IPs (stored under its node-unique +// key) still resolve to the right name instead of colliding under a shared GUID. func (s *InboundService) nodeGuidNameMap() map[string]string { db := database.GetDB() var nodes []model.Node if err := db.Model(&model.Node{}).Find(&nodes).Error; err != nil { return map[string]string{} } + ptrs := make([]*model.Node, len(nodes)) + for i := range nodes { + ptrs[i] = &nodes[i] + } + selfGuid, _ := (&SettingService{}).GetPanelGuid() + ambiguous := ambiguousNodeGuids(ptrs, selfGuid) m := make(map[string]string, len(nodes)) - for _, n := range nodes { - if n.Guid != "" { - m[n.Guid] = n.Name - } + for i := range nodes { + m[effectiveNodeGuid(&nodes[i], ambiguous)] = nodes[i].Name } return m } diff --git a/internal/web/service/inbound_node_ips_test.go b/internal/web/service/inbound_node_ips_test.go index ba4f3769c..c534b14a1 100644 --- a/internal/web/service/inbound_node_ips_test.go +++ b/internal/web/service/inbound_node_ips_test.go @@ -103,7 +103,7 @@ func TestGetClientIpNodeAttribution_NewestGuidWins(t *testing.T) { }); err != nil { t.Fatalf("record gA: %v", err) } - if err := svc.MergeClientIpsByGuid(map[string]map[string][]model.ClientIpEntry{ + if err := svc.MergeClientIpsByGuid(nil, map[string]map[string][]model.ClientIpEntry{ "gB": {"u@x": {{IP: "9.9.9.9", Timestamp: now}}}, }); err != nil { t.Fatalf("merge gB: %v", err) @@ -145,7 +145,7 @@ func TestGetClientIpsWithNodes_LabelsNodes(t *testing.T) { }); err != nil { t.Fatalf("record local: %v", err) } - if err := svc.MergeClientIpsByGuid(map[string]map[string][]model.ClientIpEntry{ + if err := svc.MergeClientIpsByGuid(nil, map[string]map[string][]model.ClientIpEntry{ "node-guid": {"u@x": {{IP: "2.2.2.2", Timestamp: now}}}, }); err != nil { t.Fatalf("merge node: %v", err) diff --git a/internal/web/service/node.go b/internal/web/service/node.go index 3106cac13..6dc53d8b3 100644 --- a/internal/web/service/node.go +++ b/internal/web/service/node.go @@ -14,6 +14,7 @@ import ( "slices" "strconv" "strings" + "sync" "time" "github.com/mhsanaei/3x-ui/v3/internal/database" @@ -122,10 +123,8 @@ func (s *NodeService) GetAll() ([]*model.Node, error) { return nodes, nil } inboundsByNode := make(map[int][]int, len(nodes)) - nodeByInbound := make(map[int]int, len(inboundRows)) for _, row := range inboundRows { inboundsByNode[row.NodeID] = append(inboundsByNode[row.NodeID], row.Id) - nodeByInbound[row.Id] = row.NodeID } type clientCountRow struct { @@ -150,60 +149,105 @@ func (s *NodeService) GetAll() ([]*model.Node, error) { } } - now := time.Now().UnixMilli() - type trafficRow struct { - InboundID int `gorm:"column:inbound_id"` - Email string - Enable bool - Total int64 - Up int64 - Down int64 - ExpiryTime int64 `gorm:"column:expiry_time"` - } - var trafficRows []trafficRow - inboundIDs := make([]int, 0, len(nodeByInbound)) - for id := range nodeByInbound { - inboundIDs = append(inboundIDs, id) - } - // Chunk the IN clause to avoid "too many SQL variables" on SQLite - // when there are many node-owned inbounds (common with many nodes). - // sqliteMaxVars is defined in this package (inbound.go). - for _, batch := range chunkInts(inboundIDs, sqliteMaxVars) { - var page []trafficRow - if err := db.Table("client_traffics"). - Select("inbound_id, email, enable, total, up, down, expiry_time"). - Where("inbound_id IN ?", batch). - Scan(&page).Error; err == nil { - trafficRows = append(trafficRows, page...) - } - } depletedByNode := make(map[int]int) - if len(trafficRows) > 0 { - for _, row := range trafficRows { - nodeID, ok := nodeByInbound[row.InboundID] - if !ok { - continue - } - expired := row.ExpiryTime > 0 && row.ExpiryTime <= now - exhausted := row.Total > 0 && row.Up+row.Down >= row.Total - if expired || exhausted || !row.Enable { - depletedByNode[nodeID]++ - } + disabledByNode := make(map[int]int) + activeByNode := make(map[int]int) + statuses, _ := s.nodeClientStatuses() + seen := make(map[int]map[int]struct{}, len(nodes)) + for _, st := range statuses { + clientsSeen := seen[st.NodeID] + if clientsSeen == nil { + clientsSeen = make(map[int]struct{}) + seen[st.NodeID] = clientsSeen + } + if _, dup := clientsSeen[st.ClientID]; dup { + // A client attached to several inbounds of one node counts once, + // matching the distinct ClientCount above. + continue + } + clientsSeen[st.ClientID] = struct{}{} + switch { + case st.Depleted: + depletedByNode[st.NodeID]++ + case st.Disabled: + disabledByNode[st.NodeID]++ + default: + activeByNode[st.NodeID]++ } } onlineByGuid := s.onlineEmailsByGuid() + selfGuid, _ := (&SettingService{}).GetPanelGuid() + ambiguous := ambiguousNodeGuids(nodes, selfGuid) for _, n := range nodes { n.InboundCount = len(inboundsByNode[n.Id]) n.DepletedCount = depletedByNode[n.Id] + n.DisabledCount = disabledByNode[n.Id] + n.ActiveCount = activeByNode[n.Id] // Online is attributed to the node that physically hosts the client // (by GUID): a client on a sub-node counts under the sub-node, not // the intermediate node it syncs through (#4983). - n.OnlineCount = len(onlineByGuid[effectiveNodeGuid(n)]) + n.OnlineCount = len(onlineByGuid[effectiveNodeGuid(n, ambiguous)]) } return nodes, nil } +// nodeClientStatus is one node-hosted client's classification, carrying enough +// identity for callers to bucket it by node id or by attribution GUID. +type nodeClientStatus struct { + InboundID int + NodeID int + ClientID int + Depleted bool + Disabled bool +} + +// nodeClientStatuses classifies every client attached to a node-hosted inbound as +// depleted / disabled / active, matching client_traffics by EMAIL rather than by +// inbound_id. client_traffics.inbound_id goes stale after an inbound is +// delete+recreated, so filtering by it silently drops most rows; the +// client_inbounds -> clients join is the reliable client set and the email join +// pulls each client's live counters. Precedence matches the inbound page: +// depleted (expired/exhausted) wins over disabled. +func (s *NodeService) nodeClientStatuses() ([]nodeClientStatus, error) { + type row struct { + InboundID int `gorm:"column:inbound_id"` + NodeID int `gorm:"column:node_id"` + ClientID int `gorm:"column:client_id"` + Enable bool `gorm:"column:enable"` + Total int64 `gorm:"column:total"` + Up int64 `gorm:"column:up"` + Down int64 `gorm:"column:down"` + ExpiryTime int64 `gorm:"column:expiry_time"` + } + var rows []row + if err := database.GetDB().Table("inbounds"). + Select("inbounds.id AS inbound_id, inbounds.node_id AS node_id, clients.id AS client_id, " + + "clients.enable AS enable, ct.total AS total, ct.up AS up, ct.down AS down, ct.expiry_time AS expiry_time"). + Joins("JOIN client_inbounds ON client_inbounds.inbound_id = inbounds.id"). + Joins("JOIN clients ON clients.id = client_inbounds.client_id"). + Joins("LEFT JOIN client_traffics ct ON ct.email = clients.email"). + Where("inbounds.node_id IS NOT NULL"). + Scan(&rows).Error; err != nil { + return nil, err + } + now := time.Now().UnixMilli() + out := make([]nodeClientStatus, 0, len(rows)) + for _, r := range rows { + st := nodeClientStatus{InboundID: r.InboundID, NodeID: r.NodeID, ClientID: r.ClientID} + expired := r.ExpiryTime > 0 && r.ExpiryTime <= now + exhausted := r.Total > 0 && r.Up+r.Down >= r.Total + switch { + case expired || exhausted: + st.Depleted = true + case !r.Enable: + st.Disabled = true + } + out = append(out, st) + } + return out, nil +} + func (s *NodeService) onlineEmailsByGuid() map[string]map[string]struct{} { svc := InboundService{} byGuid := svc.GetOnlineClientsByGuid() @@ -218,14 +262,69 @@ func (s *NodeService) onlineEmailsByGuid() map[string]map[string]struct{} { return out } -// effectiveNodeGuid is a node's stable online-attribution key: its reported -// panelGuid, or a master-local synthetic id when the node is an old build that -// hasn't reported one yet (#4983). -func effectiveNodeGuid(n *model.Node) string { - if n.Guid != "" { - return n.Guid +// effectiveNodeGuid is a node's stable online/inbound attribution key: its +// reported panelGuid, or a master-local synthetic node-id fallback when the node +// has no GUID yet (old build) or its GUID is ambiguous. ambiguous comes from +// ambiguousNodeGuids. +func effectiveNodeGuid(n *model.Node, ambiguous map[string]struct{}) string { + if n.Guid == "" { + return synthNodeGuid(n.Id) } - return synthNodeGuid(n.Id) + if n.Id > 0 { + if _, bad := ambiguous[n.Guid]; bad { + return synthNodeGuid(n.Id) + } + } + return n.Guid +} + +// ambiguousNodeGuids returns the panelGuids a node must not be attributed under +// directly, because doing so would merge two distinct identities: a GUID +// reported by more than one of this master's direct nodes (cloned node servers +// ship the same panelGuid in their copied settings), or a GUID equal to the +// master's own panelGuid (a node cloned from the master). A node holding such a +// GUID falls back to its node-unique synthNodeGuid. Transitive sub-nodes (Id 0) +// carry distinct descendant GUIDs by construction and are excluded. +func ambiguousNodeGuids(nodes []*model.Node, selfGuid string) map[string]struct{} { + counts := make(map[string]int, len(nodes)) + for _, n := range nodes { + if n.Id > 0 && n.Guid != "" { + counts[n.Guid]++ + } + } + ambiguous := make(map[string]struct{}) + for guid, c := range counts { + if c > 1 { + ambiguous[guid] = struct{}{} + } + } + if selfGuid != "" { + if _, ok := counts[selfGuid]; ok { + ambiguous[selfGuid] = struct{}{} + } + } + return ambiguous +} + +// effectiveNodeKey returns one node's attribution key without a preloaded node +// list — its panelGuid when that GUID uniquely identifies it among the master's +// nodes and differs from the master's own, otherwise its node-unique +// synthNodeGuid. Same rule as effectiveNodeGuid + ambiguousNodeGuids, for the +// write paths that handle a single node (online tree, IP attribution). +func effectiveNodeKey(node *model.Node) string { + if node == nil { + return "" + } + if node.Guid == "" { + return synthNodeGuid(node.Id) + } + var sameGuid int64 + database.GetDB().Model(&model.Node{}).Where("guid = ?", node.Guid).Count(&sameGuid) + masterGuid, _ := (&SettingService{}).GetPanelGuid() + if sameGuid > 1 || node.Guid == masterGuid { + return synthNodeGuid(node.Id) + } + return node.Guid } func (s *NodeService) GetById(id int) (*model.Node, error) { @@ -456,7 +555,9 @@ func (s *NodeService) Delete(id int) error { return common.NewError(fmt.Sprintf("cannot delete node: %d inbound(s) still attached to it; detach or delete them first", attached)) } // Capture the node's guid before deleting the row so we can drop its per-node - // IP attribution (NodeClientIp is keyed by guid, not node id). + // IP attribution. NodeClientIp is keyed by the node's attribution key, which + // is its guid normally but its node-unique key for a cloned/ambiguous-guid + // node (see effectiveNodeKey) — so we purge both below. var guid string var n model.Node if err := db.Select("guid").Where("id = ?", id).First(&n).Error; err == nil { @@ -470,10 +571,12 @@ func (s *NodeService) Delete(id int) error { if err := tx.Where("node_id = ?", id).Delete(&model.NodeClientTraffic{}).Error; err != nil { return err } + guids := []string{synthNodeGuid(id)} if guid != "" { - if err := tx.Where("node_guid = ?", guid).Delete(&model.NodeClientIp{}).Error; err != nil { - return err - } + guids = append(guids, guid) + } + if err := tx.Where("node_guid IN ?", guids).Delete(&model.NodeClientIp{}).Error; err != nil { + return err } return tx.Where("id = ?", id).Delete(&model.Node{}).Error }); err != nil { @@ -593,6 +696,7 @@ func (s *NodeService) UpdateHeartbeat(id int, p HeartbeatPatch) error { // failed probe) reports none, so the stable identity survives blips. if p.Guid != "" { updates["guid"] = p.Guid + s.warnOnDuplicateGuid(id, p.Guid) } if err := db.Model(model.Node{}).Where("id = ?", id).Updates(updates).Error; err != nil { return err @@ -607,6 +711,30 @@ func (s *NodeService) UpdateHeartbeat(id int, p HeartbeatPatch) error { return nil } +// warnedDupGuid remembers the (nodeID -> guid) pairs already warned about so a +// cloned-server collision is logged once, not every heartbeat. +var warnedDupGuid sync.Map + +// warnOnDuplicateGuid logs once when a node reports a panelGuid already held by +// another node or by the master itself (the cloned-server footgun). Attribution +// still works — it falls back to node-unique keys — but the operator should +// regenerate the duplicate panelGuid to restore real identity and per-node IP +// attribution. Re-arms if the collision later clears. +func (s *NodeService) warnOnDuplicateGuid(id int, guid string) { + var clash int64 + database.GetDB().Model(&model.Node{}).Where("guid = ? AND id <> ?", guid, id).Count(&clash) + masterGuid, _ := (&SettingService{}).GetPanelGuid() + if clash == 0 && guid != masterGuid { + warnedDupGuid.Delete(id) + return + } + if prev, ok := warnedDupGuid.Load(id); ok && prev == guid { + return + } + warnedDupGuid.Store(id, guid) + logger.Warningf("node %d reports panelGuid %s already used by another node or the master (cloned server?) — regenerate it on that node so online and IP attribution stay per-node", id, guid) +} + func (s *NodeService) MarkNodeDirty(id int) error { if id <= 0 { return nil diff --git a/internal/web/service/node_client_breakdown_test.go b/internal/web/service/node_client_breakdown_test.go new file mode 100644 index 000000000..09c17bcde --- /dev/null +++ b/internal/web/service/node_client_breakdown_test.go @@ -0,0 +1,87 @@ +package service + +import ( + "testing" + + "github.com/mhsanaei/3x-ui/v3/internal/database" + "github.com/mhsanaei/3x-ui/v3/internal/database/model" + "github.com/mhsanaei/3x-ui/v3/internal/xray" +) + +// The node-page client breakdown must classify by EMAIL, not by +// client_traffics.inbound_id — that column goes stale after an inbound is +// delete+recreated, so filtering by it drops almost every row and undercounts +// active/disabled/ended. Here the traffic rows carry a bogus inbound_id (999) +// yet must still be matched to the node's clients by email. +func TestGetAll_ClientBreakdownMatchesByEmailNotStaleInboundId(t *testing.T) { + setupConflictDB(t) + db := database.GetDB() + svc := NodeService{} + + if err := db.Create(&model.Node{Id: 1, Name: "n", Address: "10.0.0.1", Port: 2053, ApiToken: "t", Guid: "g"}).Error; err != nil { + t.Fatalf("create node: %v", err) + } + nid := 1 + ib := &model.Inbound{Tag: "n1-in", Enable: true, Port: 443, Protocol: model.VLESS, Settings: `{"clients":[]}`, NodeID: &nid} + if err := db.Create(ib).Error; err != nil { + t.Fatalf("create inbound: %v", err) + } + + mk := func(id int, email string, enable bool) { + if err := db.Create(&model.ClientRecord{Id: id, Email: email, Enable: enable}).Error; err != nil { + t.Fatalf("create client %s: %v", email, err) + } + if !enable { + // Enable has gorm:"default:true", so a zero-value (false) create is + // dropped and the DB applies true — force the disabled state explicitly. + if err := db.Model(&model.ClientRecord{}).Where("id = ?", id).Update("enable", false).Error; err != nil { + t.Fatalf("disable client %s: %v", email, err) + } + } + if err := db.Create(&model.ClientInbound{ClientId: id, InboundId: ib.Id}).Error; err != nil { + t.Fatalf("attach client %s: %v", email, err) + } + } + mk(1, "active@x", true) + mk(2, "disabled@x", false) + mk(3, "depleted@x", true) + + // Traffic rows carry a STALE inbound_id (999), unrelated to the node inbound. + const staleInboundID = 999 + rows := []*xray.ClientTraffic{ + {InboundId: staleInboundID, Email: "active@x", Enable: true}, + {InboundId: staleInboundID, Email: "disabled@x", Enable: false}, + {InboundId: staleInboundID, Email: "depleted@x", Enable: true, Total: 100, Up: 60, Down: 60}, // exhausted + } + for _, r := range rows { + if err := db.Create(r).Error; err != nil { + t.Fatalf("create traffic %s: %v", r.Email, err) + } + } + + nodes, err := svc.GetAll() + if err != nil { + t.Fatalf("GetAll: %v", err) + } + var n *model.Node + for _, x := range nodes { + if x.Id == 1 { + n = x + } + } + if n == nil { + t.Fatal("node 1 not found") + } + if n.ClientCount != 3 { + t.Errorf("ClientCount = %d, want 3", n.ClientCount) + } + if n.ActiveCount != 1 { + t.Errorf("ActiveCount = %d, want 1 (matched by email despite stale inbound_id)", n.ActiveCount) + } + if n.DisabledCount != 1 { + t.Errorf("DisabledCount = %d, want 1", n.DisabledCount) + } + if n.DepletedCount != 1 { + t.Errorf("DepletedCount = %d, want 1", n.DepletedCount) + } +} diff --git a/internal/web/service/node_origin_guid_test.go b/internal/web/service/node_origin_guid_test.go index fa155aa75..672ff34a7 100644 --- a/internal/web/service/node_origin_guid_test.go +++ b/internal/web/service/node_origin_guid_test.go @@ -70,6 +70,104 @@ func TestSetRemoteTraffic_AttributesOriginNodeGuid(t *testing.T) { } } +// A cloned node reports its OWN inbound with its own (duplicated) panelGuid as +// the origin. That must be remapped to the node-unique key, not stored verbatim +// — otherwise origin_node_guid keeps the shared GUID while online is keyed by +// the node-unique key, and the inbound page reads an empty bucket (shows +// offline). A genuinely forwarded sub-node GUID is still kept across the hop. +func TestSetRemoteTraffic_RemapsClonedNodeOwnGuidOrigin(t *testing.T) { + setupConflictDB(t) + db := database.GetDB() + + // Two nodes share one panelGuid (cloned servers). + for _, n := range []*model.Node{ + {Id: 1, Name: "a", Address: "10.0.0.1", Port: 2053, ApiToken: "t", Guid: "dup"}, + {Id: 2, Name: "b", Address: "10.0.0.2", Port: 2053, ApiToken: "t", Guid: "dup"}, + } { + if err := db.Create(n).Error; err != nil { + t.Fatalf("create node %s: %v", n.Name, err) + } + } + + snap := &runtime.TrafficSnapshot{ + Inbounds: []*model.Inbound{ + { // node 1's OWN inbound, reporting its own (shared) panelGuid as origin + Tag: "own-443-tcp", + Enable: true, + Port: 443, + Protocol: model.VLESS, + Settings: `{"clients":[]}`, + OriginNodeGuid: "dup", + }, + { // forwarded from a sub-node with a distinct guid — kept across the hop + Tag: "fwd-8443-tcp", + Enable: true, + Port: 8443, + Protocol: model.VLESS, + Settings: `{"clients":[]}`, + OriginNodeGuid: "child-guid", + }, + }, + } + + svc := InboundService{} + if _, err := svc.setRemoteTrafficLocked(1, snap, false); err != nil { + t.Fatalf("setRemoteTrafficLocked: %v", err) + } + + origin := func(tag string) string { + var ib model.Inbound + if err := db.Where("tag = ?", tag).First(&ib).Error; err != nil { + t.Fatalf("load inbound %q: %v", tag, err) + } + return ib.OriginNodeGuid + } + + if og := origin("own-443-tcp"); og != "node:1" { + t.Fatalf("cloned node's own inbound origin = %q, want node:1 (remapped from shared GUID)", og) + } + if og := origin("fwd-8443-tcp"); og != "child-guid" { + t.Fatalf("forwarded inbound origin = %q, want child-guid (kept across the hop)", og) + } +} + +// A node mid-restart can return an empty inbound list with success=true. The +// sync must NOT treat that as "delete all my inbounds" — otherwise a blip wipes +// the node's central inbounds and every client on them (what happened to the +// Germany node: 0 clients but still online). +func TestSetRemoteTraffic_EmptySnapshotKeepsCentralInbounds(t *testing.T) { + setupConflictDB(t) + db := database.GetDB() + + const nodeID = 1 + if err := db.Create(&model.Node{ + Id: nodeID, Name: "n", Address: "10.0.0.1", Port: 2053, ApiToken: "t", Guid: "g", + }).Error; err != nil { + t.Fatalf("create node: %v", err) + } + nidPtr := nodeID + if err := db.Create(&model.Inbound{ + UserId: 1, NodeID: &nidPtr, Tag: "remote-in", Enable: true, + Port: 443, Protocol: model.VLESS, Settings: `{"clients":[]}`, + }).Error; err != nil { + t.Fatalf("create central inbound: %v", err) + } + + // Empty snapshot — the node reported no inbounds this cycle. + svc := InboundService{} + if _, err := svc.setRemoteTrafficLocked(nodeID, &runtime.TrafficSnapshot{}, false); err != nil { + t.Fatalf("setRemoteTrafficLocked: %v", err) + } + + var count int64 + if err := db.Model(&model.Inbound{}).Where("tag = ?", "remote-in").Count(&count).Error; err != nil { + t.Fatalf("count inbounds: %v", err) + } + if count != 1 { + t.Fatalf("empty snapshot must not delete the central inbound; got count = %d", count) + } +} + func TestSetRemoteTraffic_PreservesLocalShareAddressStrategy(t *testing.T) { setupConflictDB(t) db := database.GetDB() diff --git a/internal/web/service/node_shared_guid_test.go b/internal/web/service/node_shared_guid_test.go new file mode 100644 index 000000000..bb77f1ceb --- /dev/null +++ b/internal/web/service/node_shared_guid_test.go @@ -0,0 +1,162 @@ +package service + +import ( + "fmt" + "testing" + "time" + + "github.com/mhsanaei/3x-ui/v3/internal/database" + "github.com/mhsanaei/3x-ui/v3/internal/database/model" +) + +// Cloned node servers ship an identical panelGuid in their copied settings, and +// a node cloned from the master shares the master's own GUID. effectiveNodeGuid +// must keep each physical node in its own attribution bucket by falling back to +// the node-unique id for both collision kinds, while leaving a uniquely-named +// node on its real GUID and never folding transitive (Id 0) nodes. +func TestEffectiveNodeGuid_DisambiguatesAmbiguousGuids(t *testing.T) { + nodes := []*model.Node{ + {Id: 1, Guid: "dup"}, + {Id: 2, Guid: "dup"}, + {Id: 3, Guid: "uniq"}, + {Id: 4, Guid: ""}, + {Id: 5, Guid: "master"}, + {Id: 0, Guid: "transitive"}, + } + ambiguous := ambiguousNodeGuids(nodes, "master") + + if _, ok := ambiguous["dup"]; !ok { + t.Fatalf("dup must be flagged ambiguous, got %v", ambiguous) + } + if _, ok := ambiguous["master"]; !ok { + t.Fatalf("a node sharing the master GUID must be flagged, got %v", ambiguous) + } + if _, ok := ambiguous["uniq"]; ok { + t.Fatalf("uniq must not be flagged, got %v", ambiguous) + } + if _, ok := ambiguous["transitive"]; ok { + t.Fatalf("transitive (Id 0) must not count, got %v", ambiguous) + } + + cases := map[*model.Node]string{ + nodes[0]: "node:1", + nodes[1]: "node:2", + nodes[2]: "uniq", + nodes[3]: "node:4", + nodes[4]: "node:5", + nodes[5]: "transitive", + } + for n, want := range cases { + if got := effectiveNodeGuid(n, ambiguous); got != want { + t.Errorf("effectiveNodeGuid(Id=%d, Guid=%q) = %q, want %q", n.Id, n.Guid, got, want) + } + } +} + +// effectiveNodeKey (the no-preloaded-list variant used by the write paths) must +// agree with the slice helper: fall back to the node-unique id when a GUID is +// shared with another node or with the master, else keep the real GUID. +func TestEffectiveNodeKey_FallsBackOnCollision(t *testing.T) { + setupConflictDB(t) + db := database.GetDB() + selfGuid, _ := (&SettingService{}).GetPanelGuid() + if selfGuid == "" { + t.Fatal("expected a panel guid") + } + + mk := func(id int, name, guid string) *model.Node { + n := &model.Node{Id: id, Name: name, Address: fmt.Sprintf("10.0.0.%d", id), Port: 2053, ApiToken: "t", Guid: guid, Status: "online"} + if err := db.Create(n).Error; err != nil { + t.Fatalf("create %s: %v", name, err) + } + return n + } + dupA := mk(1, "a", "shared") + mk(2, "b", "shared") + uniq := mk(3, "c", "solo") + masterClone := mk(4, "d", selfGuid) + + if got := effectiveNodeKey(dupA); got != "node:1" { + t.Errorf("node-node collision: got %q, want node:1", got) + } + if got := effectiveNodeKey(uniq); got != "solo" { + t.Errorf("unique node: got %q, want solo", got) + } + if got := effectiveNodeKey(masterClone); got != "node:4" { + t.Errorf("master collision: got %q, want node:4", got) + } +} + +// recountByGuid must split per-node counts even when two direct nodes share a +// GUID and their inbounds still carry that shared GUID as origin (pre-backfill). +func TestRecountByGuid_SplitsClonedNodesWithSharedGuid(t *testing.T) { + setupConflictDB(t) + db := database.GetDB() + svc := NodeService{} + selfGuid, _ := (&SettingService{}).GetPanelGuid() + + n1 := &model.Node{Id: 1, Name: "A", Address: "10.0.0.1", Port: 2053, ApiToken: "t", Guid: "dup", Status: "online"} + n2 := &model.Node{Id: 2, Name: "B", Address: "10.0.0.2", Port: 2053, ApiToken: "t", Guid: "dup", Status: "online"} + n3 := &model.Node{Id: 3, Name: "C", Address: "10.0.0.3", Port: 2053, ApiToken: "t", Guid: "uniq", Status: "online"} + for _, n := range []*model.Node{n1, n2, n3} { + if err := db.Create(n).Error; err != nil { + t.Fatalf("create node %s: %v", n.Name, err) + } + } + + id1, id2, id3 := 1, 2, 3 + inbounds := []*model.Inbound{ + {Tag: "a", Port: 1001, Protocol: model.VLESS, Settings: `{"clients":[]}`, Enable: true, NodeID: &id1, OriginNodeGuid: "dup"}, + {Tag: "b", Port: 1002, Protocol: model.VLESS, Settings: `{"clients":[]}`, Enable: true, NodeID: &id2, OriginNodeGuid: "dup"}, + {Tag: "c", Port: 1003, Protocol: model.VLESS, Settings: `{"clients":[]}`, Enable: true, NodeID: &id3, OriginNodeGuid: "uniq"}, + } + for _, ib := range inbounds { + if err := db.Create(ib).Error; err != nil { + t.Fatalf("create inbound %s: %v", ib.Tag, err) + } + } + + nodes := []*model.Node{n1, n2, n3} + svc.recountByGuid(nodes, selfGuid) + + if n1.InboundCount != 1 || n2.InboundCount != 1 { + t.Errorf("cloned nodes must not share inbound counts: n1=%d n2=%d, want 1,1", n1.InboundCount, n2.InboundCount) + } + if n3.InboundCount != 1 { + t.Errorf("unique node InboundCount = %d, want 1", n3.InboundCount) + } +} + +// A cloned node's IP-attribution subtree must be stored under its node-unique +// key, so a second clone sharing the GUID can't overwrite it in node_client_ips. +func TestMergeClientIpsByGuid_RemapsClonedNodeSubtree(t *testing.T) { + setupClientIpTestDB(t) + db := database.GetDB() + svc := &InboundService{} + now := time.Now().Unix() + + n1 := &model.Node{Id: 1, Name: "A", Address: "10.0.0.1", Port: 2053, ApiToken: "t", Guid: "dup", Status: "online"} + n2 := &model.Node{Id: 2, Name: "B", Address: "10.0.0.2", Port: 2053, ApiToken: "t", Guid: "dup", Status: "online"} + for _, n := range []*model.Node{n1, n2} { + if err := db.Create(n).Error; err != nil { + t.Fatalf("create node: %v", err) + } + } + + if err := svc.MergeClientIpsByGuid(n1, map[string]map[string][]model.ClientIpEntry{ + "dup": {"u@x": {{IP: "1.1.1.1", Timestamp: now}}}, + }); err != nil { + t.Fatalf("merge n1: %v", err) + } + + var rows []model.NodeClientIp + if err := db.Find(&rows).Error; err != nil { + t.Fatalf("load rows: %v", err) + } + if len(rows) != 1 { + t.Fatalf("want 1 attribution row, got %d", len(rows)) + } + if rows[0].NodeGuid != "node:1" { + t.Errorf("cloned node IPs must be stored under node-unique key, got %q", rows[0].NodeGuid) + } +} diff --git a/internal/web/service/node_tree.go b/internal/web/service/node_tree.go index f99452dd5..36cc73518 100644 --- a/internal/web/service/node_tree.go +++ b/internal/web/service/node_tree.go @@ -3,7 +3,6 @@ package service import ( "context" "sync" - "time" "github.com/mhsanaei/3x-ui/v3/internal/database" "github.com/mhsanaei/3x-ui/v3/internal/database/model" @@ -174,9 +173,9 @@ func (s *NodeService) recountByGuid(nodes []*model.Node, selfGuid string) { if err := db.Table("inbounds").Select("id, node_id, origin_node_guid").Scan(&ibRows).Error; err != nil { return } + ambiguous := ambiguousNodeGuids(nodes, selfGuid) effByInbound := make(map[int]string, len(ibRows)) inboundCountByGuid := make(map[string]int) - ids := make([]int, 0, len(ibRows)) for _, r := range ibRows { guid := r.OriginNodeGuid if guid == "" { @@ -185,46 +184,58 @@ func (s *NodeService) recountByGuid(nodes []*model.Node, selfGuid string) { } else { guid = selfGuid } + } else if r.NodeID != nil { + // Origin still holds an ambiguous GUID (cloned server / master-shared, + // not yet re-attributed): bucket under the hosting node's unique id so + // the clones don't merge. + if _, bad := ambiguous[guid]; bad { + guid = synthNodeGuid(*r.NodeID) + } } effByInbound[r.Id] = guid inboundCountByGuid[guid]++ - ids = append(ids, r.Id) } - now := time.Now().UnixMilli() + // Classify by EMAIL (not the stale client_traffics.inbound_id) and bucket + // each client under its inbound's effective attribution GUID, deduping a + // client attached to several inbounds under the same GUID. depletedByGuid := make(map[string]int) - if len(ids) > 0 { - type tRow struct { - InboundID int `gorm:"column:inbound_id"` - Enable bool - Total int64 - Up int64 - Down int64 - ExpiryTime int64 `gorm:"column:expiry_time"` - } - var tRows []tRow - if err := db.Table("client_traffics"). - Select("inbound_id, enable, total, up, down, expiry_time"). - Where("inbound_id IN ?", ids).Scan(&tRows).Error; err == nil { - for _, row := range tRows { - guid, ok := effByInbound[row.InboundID] - if !ok { - continue - } - expired := row.ExpiryTime > 0 && row.ExpiryTime <= now - exhausted := row.Total > 0 && row.Up+row.Down >= row.Total - if expired || exhausted || !row.Enable { - depletedByGuid[guid]++ - } + disabledByGuid := make(map[string]int) + activeByGuid := make(map[string]int) + if statuses, err := s.nodeClientStatuses(); err == nil { + seen := make(map[string]map[int]struct{}) + for _, st := range statuses { + guid, ok := effByInbound[st.InboundID] + if !ok { + continue + } + clientsSeen := seen[guid] + if clientsSeen == nil { + clientsSeen = make(map[int]struct{}) + seen[guid] = clientsSeen + } + if _, dup := clientsSeen[st.ClientID]; dup { + continue + } + clientsSeen[st.ClientID] = struct{}{} + switch { + case st.Depleted: + depletedByGuid[guid]++ + case st.Disabled: + disabledByGuid[guid]++ + default: + activeByGuid[guid]++ } } } onlineByGuid := s.onlineEmailsByGuid() for _, n := range nodes { - guid := effectiveNodeGuid(n) + guid := effectiveNodeGuid(n, ambiguous) n.InboundCount = inboundCountByGuid[guid] n.OnlineCount = len(onlineByGuid[guid]) n.DepletedCount = depletedByGuid[guid] + n.DisabledCount = disabledByGuid[guid] + n.ActiveCount = activeByGuid[guid] } }