fix(traffic): persist delayed-start expiry only for converted clients

addClientTraffic's second pass wrote expiry_time for every polled row via
UPDATE ... WHERE expiry_time < 0 — a no-op statement per active client on
every 5s poll, since almost all rows carry a positive expiry. At 10k
active clients that was 10k pointless indexed UPDATEs per poll.

adjustTraffics now returns the emails it actually converted this tick and
the persistence pass writes exactly those, in sorted order to keep
concurrent writers lock-compatible on Postgres. Behavior is unchanged:
unconverted rows never matched the WHERE clause anyway.
This commit is contained in:
MHSanaei
2026-07-02 16:24:18 +02:00
parent 4fc301682f
commit fb1d055b06
2 changed files with 93 additions and 17 deletions
@@ -157,3 +157,77 @@ func TestAdjustTraffics_DelayedStartConvertsDespiteStaleInboundId(t *testing.T)
t.Errorf("inbound settings expiry not converted: %#v", cs)
}
}
// TestAddClientTraffic_ExpiryWriteOnlyForConvertedClients locks in that the
// delayed-start persistence pass touches only clients adjustTraffics actually
// converted this poll: the delayed client's negative expiry becomes an absolute
// deadline while an already-absolute expiry passes through byte-identical.
// Before the fix every polled row got its own no-op expiry UPDATE.
func TestAddClientTraffic_ExpiryWriteOnlyForConvertedClients(t *testing.T) {
dbDir := t.TempDir()
t.Setenv("XUI_DB_FOLDER", dbDir)
if err := database.InitDB(filepath.Join(dbDir, "x-ui.db")); err != nil {
t.Fatalf("InitDB: %v", err)
}
t.Cleanup(func() { _ = database.CloseDB() })
db := database.GetDB()
const delayedEmail = "delayed-mixed-user"
const normalEmail = "normal-mixed-user"
const delayedUID = "ce8d33df-3a64-4f10-8f9b-91c3a8e0d002"
const normalUID = "ce8d33df-3a64-4f10-8f9b-91c3a8e0d003"
const sevenDays = int64(7 * 86400000)
normalExpiry := time.Now().AddDate(0, 1, 0).UnixMilli()
clients := []model.Client{
{Email: delayedEmail, ID: delayedUID, Enable: true, ExpiryTime: -sevenDays},
{Email: normalEmail, ID: normalUID, Enable: true, ExpiryTime: normalExpiry},
}
inbound := &model.Inbound{
Tag: "vless-mixed", Enable: true, Port: 45002, Protocol: model.VLESS,
Settings: clientsSettings(t, clients),
}
if err := db.Create(inbound).Error; err != nil {
t.Fatalf("create inbound: %v", err)
}
svc := InboundService{}
if err := svc.clientService.SyncInbound(db, inbound.Id, clients); err != nil {
t.Fatalf("SyncInbound: %v", err)
}
if err := db.Create(&xray.ClientTraffic{InboundId: inbound.Id, Email: delayedEmail, Enable: true, ExpiryTime: -sevenDays}).Error; err != nil {
t.Fatalf("create delayed traffic row: %v", err)
}
if err := db.Create(&xray.ClientTraffic{InboundId: inbound.Id, Email: normalEmail, Enable: true, ExpiryTime: normalExpiry}).Error; err != nil {
t.Fatalf("create normal traffic row: %v", err)
}
before := time.Now().UnixMilli()
err := svc.addClientTraffic(db, []*xray.ClientTraffic{
{Email: delayedEmail, Up: 10, Down: 20},
{Email: normalEmail, Up: 30, Down: 40},
})
if err != nil {
t.Fatalf("addClientTraffic: %v", err)
}
var delayed xray.ClientTraffic
if err := db.Model(xray.ClientTraffic{}).Where("email = ?", delayedEmail).First(&delayed).Error; err != nil {
t.Fatalf("reload delayed row: %v", err)
}
if delayed.ExpiryTime < before+sevenDays-5000 || delayed.ExpiryTime > before+sevenDays+5000 {
t.Errorf("delayed expiry = %d, want ~now+7d (%d)", delayed.ExpiryTime, before+sevenDays)
}
var normal xray.ClientTraffic
if err := db.Model(xray.ClientTraffic{}).Where("email = ?", normalEmail).First(&normal).Error; err != nil {
t.Fatalf("reload normal row: %v", err)
}
if normal.ExpiryTime != normalExpiry {
t.Errorf("normal expiry changed: %d, want %d", normal.ExpiryTime, normalExpiry)
}
if normal.Up != 30 || normal.Down != 40 {
t.Errorf("normal traffic not applied: up=%d down=%d, want 30/40", normal.Up, normal.Down)
}
}
+19 -17
View File
@@ -5,6 +5,8 @@ import (
"encoding/json"
"errors"
"fmt"
"maps"
"slices"
"strings"
"time"
@@ -125,7 +127,7 @@ func (s *InboundService) addClientTraffic(tx *gorm.DB, traffics []*xray.ClientTr
return nil
}
dbClientTraffics, err = s.adjustTraffics(tx, dbClientTraffics)
dbClientTraffics, convertedExpiryByEmail, err := s.adjustTraffics(tx, dbClientTraffics)
if err != nil {
return err
}
@@ -161,22 +163,22 @@ func (s *InboundService) addClientTraffic(tx *gorm.DB, traffics []*xray.ClientTr
// adjustTraffics converts delayed-start rows (negative ExpiryTime → absolute
// deadline) in-memory. Persist that conversion now since the traffic UPDATE
// above only touches up/down/last_online.
for _, ct := range dbClientTraffics {
if ct.ExpiryTime > 0 {
if err = tx.Exec(
`UPDATE client_traffics SET expiry_time = ? WHERE email = ? AND expiry_time < 0`,
ct.ExpiryTime, ct.Email,
).Error; err != nil {
logger.Warning("AddClientTraffic update expiry_time ", err)
}
// above only touches up/down/last_online. Only converted emails are written:
// updating every polled row issued one no-op UPDATE per active client per
// poll. Sorted order keeps concurrent writers lock-compatible on Postgres.
for _, email := range slices.Sorted(maps.Keys(convertedExpiryByEmail)) {
if err = tx.Exec(
`UPDATE client_traffics SET expiry_time = ? WHERE email = ? AND expiry_time < 0`,
convertedExpiryByEmail[email], email,
).Error; err != nil {
logger.Warning("AddClientTraffic update expiry_time ", err)
}
}
return nil
}
func (s *InboundService) adjustTraffics(tx *gorm.DB, dbClientTraffics []*xray.ClientTraffic) ([]*xray.ClientTraffic, error) {
func (s *InboundService) adjustTraffics(tx *gorm.DB, dbClientTraffics []*xray.ClientTraffic) ([]*xray.ClientTraffic, map[string]int64, error) {
now := time.Now().UnixMilli()
// "Start After First Use" stores a negative expiry (the duration). On the
@@ -190,7 +192,7 @@ func (s *InboundService) adjustTraffics(tx *gorm.DB, dbClientTraffics []*xray.Cl
}
}
if len(newExpiryByEmail) == 0 {
return dbClientTraffics, nil
return dbClientTraffics, nil, nil
}
delayedEmails := make([]string, 0, len(newExpiryByEmail))
@@ -208,16 +210,16 @@ func (s *InboundService) adjustTraffics(tx *gorm.DB, dbClientTraffics []*xray.Cl
Distinct().
Pluck("client_inbounds.inbound_id", &inboundIds).Error
if err != nil {
return nil, err
return nil, nil, err
}
if len(inboundIds) == 0 {
return dbClientTraffics, nil
return dbClientTraffics, nil, nil
}
var inbounds []*model.Inbound
err = tx.Model(model.Inbound{}).Where("id IN (?)", inboundIds).Find(&inbounds).Error
if err != nil {
return nil, err
return nil, nil, err
}
for inbound_index := range inbounds {
settings := map[string]any{}
@@ -243,7 +245,7 @@ func (s *InboundService) adjustTraffics(tx *gorm.DB, dbClientTraffics []*xray.Cl
settings["clients"] = newClients
modifiedSettings, err := json.MarshalIndent(settings, "", " ")
if err != nil {
return nil, err
return nil, nil, err
}
inbounds[inbound_index].Settings = string(modifiedSettings)
@@ -276,7 +278,7 @@ func (s *InboundService) adjustTraffics(tx *gorm.DB, dbClientTraffics []*xray.Cl
}
}
return dbClientTraffics, nil
return dbClientTraffics, newExpiryByEmail, nil
}
func (s *InboundService) autoRenewClients(tx *gorm.DB) (bool, int64, error) {