diff --git a/frontend/public/openapi.json b/frontend/public/openapi.json index fae0f5ff2..7d9505cd9 100644 --- a/frontend/public/openapi.json +++ b/frontend/public/openapi.json @@ -4060,6 +4060,79 @@ } } }, + "/panel/api/server/clientIps": { + "get": { + "tags": [ + "Server" + ], + "summary": "Fetch the fully aggregated inbound_client_ips database table. Used by nodes to sync recently active IPs across the cluster.", + "operationId": "get_panel_api_server_clientIps", + "responses": { + "200": { + "description": "Successful response", + "content": { + "application/json": { + "schema": { + "type": "object", + "properties": { + "success": { + "type": "boolean" + }, + "msg": { + "type": "string" + }, + "obj": { + "type": "array", + "items": { + "$ref": "#/components/schemas/InboundClientIps" + } + } + } + }, + "example": { + "success": true, + "obj": [ + { + "clientEmail": "", + "id": 0, + "ips": null + } + ] + } + } + } + } + } + }, + "post": { + "tags": [ + "Server" + ], + "summary": "Submit a list of recently active IP timestamps. The panel merges them with the existing database to maintain a unified global IP-limit view.", + "operationId": "post_panel_api_server_clientIps", + "responses": { + "200": { + "description": "Successful response", + "content": { + "application/json": { + "schema": { + "type": "object", + "properties": { + "success": { + "type": "boolean" + }, + "msg": { + "type": "string" + }, + "obj": {} + } + } + } + } + } + } + } + }, "/panel/api/clients/list": { "get": { "tags": [ diff --git a/frontend/src/pages/api-docs/endpoints.ts b/frontend/src/pages/api-docs/endpoints.ts index e2a71cc5c..140d96804 100644 --- a/frontend/src/pages/api-docs/endpoints.ts +++ b/frontend/src/pages/api-docs/endpoints.ts @@ -442,6 +442,21 @@ export const sections: readonly Section[] = [ body: 'sni=example.com', response: '{\n "success": true,\n "obj": {\n "echKeySet": "...",\n "echServerKeys": [...],\n "echConfigList": "..."\n }\n}', }, + { + method: 'GET', + path: '/panel/api/server/clientIps', + summary: 'Fetch the fully aggregated inbound_client_ips database table. Used by nodes to sync recently active IPs across the cluster.', + responseSchema: 'InboundClientIps', + responseSchemaArray: true, + }, + { + method: 'POST', + path: '/panel/api/server/clientIps', + summary: 'Submit a list of recently active IP timestamps. The panel merges them with the existing database to maintain a unified global IP-limit view.', + params: [ + { name: 'ips', in: 'body (json)', type: 'object[]', desc: 'Array of InboundClientIps to merge.' }, + ], + }, ], }, diff --git a/web/controller/server.go b/web/controller/server.go index 0f27bc614..9125ace12 100644 --- a/web/controller/server.go +++ b/web/controller/server.go @@ -9,6 +9,7 @@ import ( "time" "github.com/mhsanaei/3x-ui/v3/database" + "github.com/mhsanaei/3x-ui/v3/database/model" "github.com/mhsanaei/3x-ui/v3/logger" "github.com/mhsanaei/3x-ui/v3/web/entity" "github.com/mhsanaei/3x-ui/v3/web/global" @@ -61,6 +62,7 @@ func (a *ServerController) initRouter(g *gin.RouterGroup) { g.GET("/getNewmldsa65", a.getNewmldsa65) g.GET("/getNewmlkem768", a.getNewmlkem768) g.GET("/getNewVlessEnc", a.getNewVlessEnc) + g.GET("/clientIps", a.getClientIps) g.POST("/stopXrayService", a.stopXrayService) g.POST("/restartXrayService", a.restartXrayService) @@ -72,6 +74,7 @@ func (a *ServerController) initRouter(g *gin.RouterGroup) { g.POST("/xraylogs/:count", a.getXrayLogs) g.POST("/importDB", a.importDB) g.POST("/getNewEchCert", a.getNewEchCert) + g.POST("/clientIps", a.setClientIps) } // startTask registers the @2s ticker that refreshes server status, samples @@ -420,3 +423,18 @@ func (a *ServerController) getNewmlkem768(c *gin.Context) { } jsonObj(c, out, nil) } + +func (a *ServerController) getClientIps(c *gin.Context) { + ips, err := (&service.InboundService{}).GetAllInboundClientIps() + jsonObj(c, ips, err) +} + +func (a *ServerController) setClientIps(c *gin.Context) { + var ips []model.InboundClientIps + if err := c.ShouldBindJSON(&ips); err != nil { + jsonMsg(c, "invalid data", err) + return + } + err := (&service.InboundService{}).MergeInboundClientIps(ips) + jsonMsg(c, "Client IPs merged", err) +} diff --git a/web/job/check_client_ip_job.go b/web/job/check_client_ip_job.go index 41f2484e2..d32d84605 100644 --- a/web/job/check_client_ip_job.go +++ b/web/job/check_client_ip_job.go @@ -242,9 +242,13 @@ func mergeClientIps(old, new []IPWithTimestamp, staleCutoff int64) map[string]in func partitionLiveIps(ipMap map[string]int64, observedThisScan map[string]bool) (live, historical []IPWithTimestamp) { live = make([]IPWithTimestamp, 0, len(observedThisScan)) historical = make([]IPWithTimestamp, 0, len(ipMap)) + now := time.Now().Unix() for ip, ts := range ipMap { entry := IPWithTimestamp{IP: ip, Timestamp: ts} - if observedThisScan[ip] { + // Consider an IP "live" if it was seen locally in this scan, OR if its + // timestamp from the synced database is very recent (e.g. within 2 minutes). + // This ensures cluster-wide limits work even if the IP was seen on another node. + if observedThisScan[ip] || now-ts < 120 { live = append(live, entry) } else { historical = append(historical, entry) diff --git a/web/job/check_client_ip_job_test.go b/web/job/check_client_ip_job_test.go index b6967d032..a55059117 100644 --- a/web/job/check_client_ip_job_test.go +++ b/web/job/check_client_ip_job_test.go @@ -6,6 +6,7 @@ import ( "reflect" "runtime" "testing" + "time" ) func TestMergeClientIps_EvictsStaleOldEntries(t *testing.T) { @@ -149,6 +150,26 @@ func TestPartitionLiveIps_EmptyScanLeavesDbIntact(t *testing.T) { } } +func TestPartitionLiveIps_RecentSyncedIpIsLive(t *testing.T) { + // Synced IPs from other nodes within 2 minutes should be counted as live + // even if they weren't observed in the local scan. + now := time.Now().Unix() + ipMap := map[string]int64{ + "A": now - 30, // synced 30s ago -> live + "B": now - 150, // synced 2m30s ago -> historical + } + observed := map[string]bool{} + + live, historical := partitionLiveIps(ipMap, observed) + + if got := collectIps(live); !reflect.DeepEqual(got, []string{"A"}) { + t.Fatalf("recent IP should be live\ngot: %v\nwant: [A]", got) + } + if got := collectIps(historical); !reflect.DeepEqual(got, []string{"B"}) { + t.Fatalf("older IP should be historical\ngot: %v\nwant: [B]", got) + } +} + func TestCheckFail2BanInstalled_DisabledEnvSkipsClientProbe(t *testing.T) { t.Setenv("XUI_ENABLE_FAIL2BAN", "false") marker := fakeFail2BanClient(t) diff --git a/web/job/node_traffic_sync_job.go b/web/job/node_traffic_sync_job.go index 126b5a0ec..29b639f88 100644 --- a/web/job/node_traffic_sync_job.go +++ b/web/job/node_traffic_sync_job.go @@ -16,6 +16,7 @@ const ( nodeTrafficSyncConcurrency = 8 nodeTrafficSyncRequestTimeout = 4 * time.Second nodeReconcileTimeout = 30 * time.Second + nodeClientIpSyncInterval = 10 * time.Second ) type NodeTrafficSyncJob struct { @@ -25,6 +26,8 @@ type NodeTrafficSyncJob struct { xrayService service.XrayService running sync.Mutex structural atomicBool + ipSyncMu sync.Mutex + lastIpSync int64 } type atomicBool struct { @@ -70,6 +73,16 @@ func (j *NodeTrafficSyncJob) Run() { return } + // Decide once per tick whether this run also syncs client IPs, and stamp the + // clock before the loop so two back-to-back 5s ticks can't both qualify. + doIpSync := false + j.ipSyncMu.Lock() + if now := time.Now().Unix(); now-j.lastIpSync >= int64(nodeClientIpSyncInterval/time.Second) { + doIpSync = true + j.lastIpSync = now + } + j.ipSyncMu.Unlock() + sem := make(chan struct{}, nodeTrafficSyncConcurrency) var wg sync.WaitGroup for _, n := range nodes { @@ -81,7 +94,7 @@ func (j *NodeTrafficSyncJob) Run() { go func(n *model.Node) { defer wg.Done() defer func() { <-sem }() - j.syncOne(mgr, n) + j.syncOne(mgr, n, doIpSync) }(n) } wg.Wait() @@ -151,7 +164,7 @@ func (j *NodeTrafficSyncJob) Run() { } } -func (j *NodeTrafficSyncJob) syncOne(mgr *runtime.Manager, n *model.Node) { +func (j *NodeTrafficSyncJob) syncOne(mgr *runtime.Manager, n *model.Node, doIpSync bool) { rt, err := mgr.RemoteFor(n) if err != nil { logger.Warning("node traffic sync: remote lookup failed for", n.Name, ":", err) @@ -190,4 +203,28 @@ func (j *NodeTrafficSyncJob) syncOne(mgr *runtime.Manager, n *model.Node) { if changed { j.structural.set() } + + if !doIpSync { + return + } + + nodeIps, err := rt.FetchAllClientIps(ctx) + if err == nil && len(nodeIps) > 0 { + if err := j.inboundService.MergeInboundClientIps(nodeIps); err != nil { + logger.Warning("node traffic sync: merge client ips from", n.Name, "failed:", err) + } + } else if err != nil { + logger.Warning("node traffic sync: fetch client ips from", n.Name, "failed:", err) + } + + masterIps, err := j.inboundService.GetAllInboundClientIps() + if err != nil { + logger.Warning("node traffic sync: load client ips for push to", n.Name, "failed:", err) + return + } + if len(masterIps) > 0 { + if err := rt.PushAllClientIps(ctx, masterIps); err != nil { + logger.Warning("node traffic sync: push client ips to", n.Name, "failed:", err) + } + } } diff --git a/web/runtime/remote.go b/web/runtime/remote.go index 596e07b62..fc18c2a2c 100644 --- a/web/runtime/remote.go +++ b/web/runtime/remote.go @@ -533,3 +533,22 @@ func isNonEmptySlice(v any) bool { s, ok := v.([]any) return ok && len(s) > 0 } + +func (r *Remote) FetchAllClientIps(ctx context.Context) ([]model.InboundClientIps, error) { + env, err := r.do(ctx, http.MethodGet, "panel/api/server/clientIps", nil) + if err != nil { + return nil, err + } + var ips []model.InboundClientIps + if len(env.Obj) > 0 { + if err := json.Unmarshal(env.Obj, &ips); err != nil { + return nil, fmt.Errorf("decode client ips: %w", err) + } + } + return ips, nil +} + +func (r *Remote) PushAllClientIps(ctx context.Context, ips []model.InboundClientIps) error { + _, err := r.do(ctx, http.MethodPost, "panel/api/server/clientIps", ips) + return err +} diff --git a/web/service/inbound.go b/web/service/inbound.go index f05f6ea4b..b49aacfc1 100644 --- a/web/service/inbound.go +++ b/web/service/inbound.go @@ -399,6 +399,137 @@ func (s *InboundService) GetInboundOptions(userId int) ([]InboundOption, error) return out, nil } +func (s *InboundService) GetAllInboundClientIps() ([]model.InboundClientIps, error) { + db := database.GetDB() + var ips []model.InboundClientIps + err := db.Model(&model.InboundClientIps{}).Find(&ips).Error + return ips, err +} + +// clientIpStaleAfterSeconds mirrors job.ipStaleAfterSeconds: client IPs older than +// 30 minutes are evicted. Applying the same cutoff inside the cross-node merge keeps +// the synced blob bounded and stops the master's push-back from resurrecting IPs that +// a node has already pruned (otherwise the merge defeats the eviction cluster-wide). +const clientIpStaleAfterSeconds = int64(30 * 60) + +// clientIpEntry is the on-disk shape of each element of InboundClientIps.Ips. Tags +// match job.IPWithTimestamp so the blob round-trips with the access.log scanner. +type clientIpEntry struct { + IP string `json:"ip"` + Timestamp int64 `json:"timestamp"` +} + +// mergeClientIpEntries unions old and incoming IP observations, dropping anything +// older than cutoff, keeping the most recent timestamp per IP, and returning the +// result sorted newest-first. +func mergeClientIpEntries(old, incoming []clientIpEntry, cutoff int64) []clientIpEntry { + ipMap := make(map[string]int64, len(old)+len(incoming)) + for _, e := range old { + if e.Timestamp < cutoff { + continue + } + ipMap[e.IP] = e.Timestamp + } + for _, e := range incoming { + if e.Timestamp < cutoff { + continue + } + if cur, ok := ipMap[e.IP]; !ok || e.Timestamp > cur { + ipMap[e.IP] = e.Timestamp + } + } + out := make([]clientIpEntry, 0, len(ipMap)) + for ip, ts := range ipMap { + out = append(out, clientIpEntry{IP: ip, Timestamp: ts}) + } + sort.Slice(out, func(i, j int) bool { return out[i].Timestamp > out[j].Timestamp }) + return out +} + +// MergeInboundClientIps folds client IPs synced from another node into the local +// inbound_client_ips table without double-counting an IP seen on multiple nodes and +// without resurrecting stale entries. Existing rows are updated in place; brand-new +// clients (typically node-only clients with no local row) are created with a fresh +// local id. +func (s *InboundService) MergeInboundClientIps(incomingIps []model.InboundClientIps) error { + db := database.GetDB() + var currentIps []model.InboundClientIps + if err := db.Model(&model.InboundClientIps{}).Find(¤tIps).Error; err != nil { + return err + } + + currentMap := make(map[string]*model.InboundClientIps, len(currentIps)) + for i := range currentIps { + currentMap[currentIps[i].ClientEmail] = ¤tIps[i] + } + + now := time.Now().Unix() + cutoff := now - clientIpStaleAfterSeconds + + tx := db.Begin() + defer func() { + if r := recover(); r != nil { + tx.Rollback() + } + }() + + for _, incoming := range incomingIps { + if incoming.ClientEmail == "" || incoming.Ips == "" { + continue + } + + var incomingEntries []clientIpEntry + _ = json.Unmarshal([]byte(incoming.Ips), &incomingEntries) + + current, exists := currentMap[incoming.ClientEmail] + if !exists { + // New client we've never seen locally. Drop stale entries up front and + // skip the row entirely if nothing is fresh, so we don't persist a row + // that is dead on arrival. + fresh := mergeClientIpEntries(nil, incomingEntries, cutoff) + if len(fresh) == 0 { + continue + } + b, _ := json.Marshal(fresh) + incoming.Ips = string(b) + // Never carry the remote node's primary key into the local table: id + // spaces are independent across nodes and the remote id would collide + // with an unrelated local row. OnConflict guards the race where + // check_client_ip_job creates the same brand-new email between the + // snapshot above and this insert. + incoming.Id = 0 + if err := tx.Clauses(clause.OnConflict{ + Columns: []clause.Column{{Name: "client_email"}}, + DoNothing: true, + }).Create(&incoming).Error; err != nil { + tx.Rollback() + return err + } + continue + } + + var oldEntries []clientIpEntry + if current.Ips != "" { + _ = json.Unmarshal([]byte(current.Ips), &oldEntries) + } + + merged := mergeClientIpEntries(oldEntries, incomingEntries, cutoff) + b, _ := json.Marshal(merged) + mergedStr := string(b) + + // A concurrent check_client_ip_job db.Save on the same row can interleave + // with this update (benign last-writer-wins; any dropped IP reappears on the + // next scan/sync), so only write when the blob actually changed. + if current.Ips != mergedStr { + if err := tx.Model(&model.InboundClientIps{}).Where("id = ?", current.Id).Update("ips", mergedStr).Error; err != nil { + tx.Rollback() + return err + } + } + } + return tx.Commit().Error +} + // inboundShadowsocksMethod extracts settings.method for Shadowsocks inbounds so // the client UI can generate a valid PSK (base64 of the method's key length) // for Shadowsocks 2022 ciphers. Returns "" for non-Shadowsocks inbounds. diff --git a/web/service/inbound_client_ips_merge_test.go b/web/service/inbound_client_ips_merge_test.go new file mode 100644 index 000000000..9d5bec757 --- /dev/null +++ b/web/service/inbound_client_ips_merge_test.go @@ -0,0 +1,211 @@ +package service + +import ( + "encoding/json" + "path/filepath" + "testing" + "time" + + "github.com/mhsanaei/3x-ui/v3/database" + "github.com/mhsanaei/3x-ui/v3/database/model" +) + +// setupClientIpTestDB spins up a throwaway SQLite database (migrations + seeders) +// for a single test, mirroring the harness used by the other service tests. +func setupClientIpTestDB(t *testing.T) { + t.Helper() + dbDir := t.TempDir() + t.Setenv("XUI_DB_FOLDER", dbDir) + if err := database.InitDB(filepath.Join(dbDir, "x-ui.db")); err != nil { + t.Fatalf("InitDB: %v", err) + } + t.Cleanup(func() { _ = database.CloseDB() }) +} + +func marshalIps(t *testing.T, entries ...clientIpEntry) string { + t.Helper() + b, err := json.Marshal(entries) + if err != nil { + t.Fatalf("marshal ips: %v", err) + } + return string(b) +} + +// readClientIps returns the stored IP entries for an email as a map[ip]timestamp, +// plus whether the row exists at all. +func readClientIps(t *testing.T, email string) (map[string]int64, bool) { + t.Helper() + var row model.InboundClientIps + err := database.GetDB().Where("client_email = ?", email).First(&row).Error + if database.IsNotFound(err) { + return nil, false + } + if err != nil { + t.Fatalf("read client ips for %s: %v", email, err) + } + var entries []clientIpEntry + if row.Ips != "" { + if err := json.Unmarshal([]byte(row.Ips), &entries); err != nil { + t.Fatalf("unmarshal stored ips for %s: %v", email, err) + } + } + out := make(map[string]int64, len(entries)) + for _, e := range entries { + out[e.IP] = e.Timestamp + } + return out, true +} + +func TestMergeInboundClientIps_CreatesNodeOnlyRowIgnoringRemoteId(t *testing.T) { + setupClientIpTestDB(t) + db := database.GetDB() + now := time.Now().Unix() + + // Local client occupies id 1. + local := &model.InboundClientIps{ClientEmail: "local@x", Ips: marshalIps(t, clientIpEntry{IP: "1.1.1.1", Timestamp: now})} + if err := db.Create(local).Error; err != nil { + t.Fatalf("seed local row: %v", err) + } + + // Incoming node-only client carries the remote node's id 1, which must not + // collide with the local row. + incoming := []model.InboundClientIps{{ + Id: 1, + ClientEmail: "node@x", + Ips: marshalIps(t, clientIpEntry{IP: "2.2.2.2", Timestamp: now}), + }} + if err := (&InboundService{}).MergeInboundClientIps(incoming); err != nil { + t.Fatalf("merge: %v", err) + } + + // Local row is untouched. + if ips, ok := readClientIps(t, "local@x"); !ok || ips["1.1.1.1"] != now { + t.Fatalf("local@x changed unexpectedly: %v (exists=%v)", ips, ok) + } + + // Node row exists with its own ip and a freshly assigned id (not the remote 1). + var nodeRow model.InboundClientIps + if err := db.Where("client_email = ?", "node@x").First(&nodeRow).Error; err != nil { + t.Fatalf("node@x not created: %v", err) + } + if nodeRow.Id == local.Id { + t.Fatalf("node@x reused local id %d instead of a fresh one", nodeRow.Id) + } + if ips, _ := readClientIps(t, "node@x"); ips["2.2.2.2"] != now { + t.Fatalf("node@x missing expected ip: %v", ips) + } +} + +func TestMergeInboundClientIps_DedupKeepsMaxTimestamp(t *testing.T) { + setupClientIpTestDB(t) + db := database.GetDB() + now := time.Now().Unix() + + if err := db.Create(&model.InboundClientIps{ + ClientEmail: "a@x", + Ips: marshalIps(t, clientIpEntry{IP: "1.1.1.1", Timestamp: now - 100}), + }).Error; err != nil { + t.Fatalf("seed: %v", err) + } + + incoming := []model.InboundClientIps{{ + ClientEmail: "a@x", + Ips: marshalIps(t, + clientIpEntry{IP: "1.1.1.1", Timestamp: now - 50}, // newer than stored -> wins + clientIpEntry{IP: "2.2.2.2", Timestamp: now - 10}, + ), + }} + if err := (&InboundService{}).MergeInboundClientIps(incoming); err != nil { + t.Fatalf("merge: %v", err) + } + + ips, _ := readClientIps(t, "a@x") + if len(ips) != 2 { + t.Fatalf("want 2 ips, got %v", ips) + } + if ips["1.1.1.1"] != now-50 { + t.Fatalf("1.1.1.1 should keep max timestamp %d, got %d", now-50, ips["1.1.1.1"]) + } + if ips["2.2.2.2"] != now-10 { + t.Fatalf("2.2.2.2 missing/incorrect: %d", ips["2.2.2.2"]) + } +} + +func TestMergeInboundClientIps_DropsStaleIps(t *testing.T) { + setupClientIpTestDB(t) + db := database.GetDB() + now := time.Now().Unix() + + if err := db.Create(&model.InboundClientIps{ + ClientEmail: "a@x", + Ips: marshalIps(t, + clientIpEntry{IP: "old", Timestamp: now - 3600}, // > 30m -> stale + clientIpEntry{IP: "fresh", Timestamp: now - 60}, + ), + }).Error; err != nil { + t.Fatalf("seed: %v", err) + } + + incoming := []model.InboundClientIps{{ + ClientEmail: "a@x", + Ips: marshalIps(t, + clientIpEntry{IP: "incStale", Timestamp: now - 4000}, // > 30m -> stale + clientIpEntry{IP: "incFresh", Timestamp: now - 10}, + ), + }} + if err := (&InboundService{}).MergeInboundClientIps(incoming); err != nil { + t.Fatalf("merge: %v", err) + } + + ips, _ := readClientIps(t, "a@x") + if len(ips) != 2 { + t.Fatalf("want only fresh ips, got %v", ips) + } + if _, ok := ips["old"]; ok { + t.Fatalf("stale local ip not dropped: %v", ips) + } + if _, ok := ips["incStale"]; ok { + t.Fatalf("stale incoming ip not dropped: %v", ips) + } + if ips["fresh"] != now-60 || ips["incFresh"] != now-10 { + t.Fatalf("fresh ips wrong: %v", ips) + } +} + +func TestMergeInboundClientIps_SkipsAllStaleCreate(t *testing.T) { + setupClientIpTestDB(t) + now := time.Now().Unix() + + incoming := []model.InboundClientIps{{ + ClientEmail: "b@x", + Ips: marshalIps(t, clientIpEntry{IP: "1.1.1.1", Timestamp: now - 9999}), + }} + if err := (&InboundService{}).MergeInboundClientIps(incoming); err != nil { + t.Fatalf("merge: %v", err) + } + + if _, ok := readClientIps(t, "b@x"); ok { + t.Fatalf("all-stale node-only client should not create a row") + } +} + +func TestMergeInboundClientIps_SkipsBlankRows(t *testing.T) { + setupClientIpTestDB(t) + now := time.Now().Unix() + + incoming := []model.InboundClientIps{ + {ClientEmail: "", Ips: marshalIps(t, clientIpEntry{IP: "1.1.1.1", Timestamp: now})}, + {ClientEmail: "c@x", Ips: ""}, + } + if err := (&InboundService{}).MergeInboundClientIps(incoming); err != nil { + t.Fatalf("merge: %v", err) + } + + var count int64 + if err := database.GetDB().Model(&model.InboundClientIps{}).Count(&count).Error; err != nil { + t.Fatalf("count: %v", err) + } + if count != 0 { + t.Fatalf("blank rows should be skipped, but %d row(s) created", count) + } +}