fix(service): serialize client/inbound writes to prevent Postgres deadlock

Client/inbound mutations opened their own transactions that locked
client_traffics before inbounds, while the @every 5s traffic poll
(AddTraffic, already serialized through the traffic writer) locks them in
the opposite order. Concurrently these formed an ABBA lock cycle that
Postgres aborted as "deadlock detected" (SQLSTATE 40P01), failing client
updates.

Route those DB writes through the same single-goroutine traffic writer via
a new runSerializedTx helper, so they can never run concurrently with the
poll. For the client-edit paths the runtime (node) push is moved after the
commit, keeping network I/O out of the serialized section. UpdateInbound
keeps its push inside the transaction because EnsureInboundTagAllowed must
reach the node before the central row is committed.

Covers UpdateInboundClient/addInboundClient/DelInboundClientByEmail/
delInboundClients, the bulk adjust/delete transactions, and UpdateInbound.
This commit is contained in:
MHSanaei
2026-06-17 15:55:47 +02:00
parent 340d0df9fc
commit c5d31de4e9
4 changed files with 477 additions and 388 deletions
+41 -26
View File
@@ -545,8 +545,9 @@ func (s *ClientService) bulkAdjustInboundClients(
}
}
db := database.GetDB()
txErr := db.Transaction(func(tx *gorm.DB) error {
// Serialize against the traffic poll to avoid the cross-transaction
// lock-order deadlock on inbounds/client_records (runSerializedTx).
txErr := runSerializedTx(func(tx *gorm.DB) error {
if err := tx.Save(oldInbound).Error; err != nil {
return err
}
@@ -685,25 +686,32 @@ func (s *ClientService) BulkDelete(inboundSvc *InboundService, emails []string,
}
if len(successIds) > 0 {
for _, batch := range chunkInts(successIds, sqlInChunk) {
if err := db.Where("client_id IN ?", batch).Delete(&model.ClientInbound{}).Error; err != nil {
return result, needRestart, err
}
}
if !keepTraffic && len(successEmails) > 0 {
for _, batch := range chunkStrings(successEmails, sqlInChunk) {
if err := db.Where("email IN ?", batch).Delete(&xray.ClientTraffic{}).Error; err != nil {
return result, needRestart, err
}
if err := db.Where("client_email IN ?", batch).Delete(&model.InboundClientIps{}).Error; err != nil {
return result, needRestart, err
// Serialize the row cleanup against the traffic poll to avoid the
// cross-transaction lock-order deadlock on client_traffics/inbounds.
if err := runSerializedTx(func(tx *gorm.DB) error {
for _, batch := range chunkInts(successIds, sqlInChunk) {
if e := tx.Where("client_id IN ?", batch).Delete(&model.ClientInbound{}).Error; e != nil {
return e
}
}
}
for _, batch := range chunkInts(successIds, sqlInChunk) {
if err := db.Where("id IN ?", batch).Delete(&model.ClientRecord{}).Error; err != nil {
return result, needRestart, err
if !keepTraffic && len(successEmails) > 0 {
for _, batch := range chunkStrings(successEmails, sqlInChunk) {
if e := tx.Where("email IN ?", batch).Delete(&xray.ClientTraffic{}).Error; e != nil {
return e
}
if e := tx.Where("client_email IN ?", batch).Delete(&model.InboundClientIps{}).Error; e != nil {
return e
}
}
}
for _, batch := range chunkInts(successIds, sqlInChunk) {
if e := tx.Where("id IN ?", batch).Delete(&model.ClientRecord{}).Error; e != nil {
return e
}
}
return nil
}); err != nil {
return result, needRestart, err
}
}
@@ -850,14 +858,19 @@ func (s *ClientService) bulkDelInboundClients(
}
}
if len(purge) > 0 {
if delErr := inboundSvc.delClientIPsByEmails(db, purge); delErr != nil {
logger.Error("Error in delete client IPs")
for _, email := range purge {
res.perEmailSkipped[email] = delErr.Error()
delete(foundEmails, email)
// Serialize the IP/stat purge against the traffic poll to avoid the
// cross-transaction lock-order deadlock on client_traffics.
if delErr := runSerializedTx(func(tx *gorm.DB) error {
if e := inboundSvc.delClientIPsByEmails(tx, purge); e != nil {
logger.Error("Error in delete client IPs")
return e
}
} else if delErr := inboundSvc.delClientStatsByEmails(db, purge); delErr != nil {
logger.Error("Delete stats Data Error")
if e := inboundSvc.delClientStatsByEmails(tx, purge); e != nil {
logger.Error("Delete stats Data Error")
return e
}
return nil
}); delErr != nil {
for _, email := range purge {
res.perEmailSkipped[email] = delErr.Error()
delete(foundEmails, email)
@@ -909,7 +922,9 @@ func (s *ClientService) bulkDelInboundClients(
}
}
txErr := db.Transaction(func(tx *gorm.DB) error {
// Serialize against the traffic poll to avoid the cross-transaction
// lock-order deadlock on inbounds/client_records (runSerializedTx).
txErr := runSerializedTx(func(tx *gorm.DB) error {
if err := tx.Save(oldInbound).Error; err != nil {
return err
}
+231 -184
View File
@@ -12,7 +12,10 @@ import (
"github.com/mhsanaei/3x-ui/v3/internal/logger"
"github.com/mhsanaei/3x-ui/v3/internal/util/common"
"github.com/mhsanaei/3x-ui/v3/internal/util/random"
"github.com/mhsanaei/3x-ui/v3/internal/web/runtime"
"github.com/mhsanaei/3x-ui/v3/internal/xray"
"gorm.io/gorm"
)
// delInboundClients removes several clients from a single inbound in one pass:
@@ -105,40 +108,79 @@ func (s *ClientService) delInboundClients(inboundSvc *InboundService, inboundId
needRestart := false
markDirty := false
// Read each client's live state before the DB write (DelClientStat would
// erase the enable flag we need to decide on a runtime removal).
type delTarget struct {
email string
emailShared bool
notDepleted bool
needApiDel bool
}
targets := make([]delTarget, 0, len(removed))
for _, r := range removed {
email := r.email
emailShared := sharedSet[strings.ToLower(strings.TrimSpace(email))]
if !emailShared && !keepTraffic {
if err := inboundSvc.DelClientIPs(db, email); err != nil {
logger.Error("Error in delete client IPs")
return needRestart, err
}
}
notDepleted := false
if len(email) > 0 {
var enables []bool
if err := db.Model(xray.ClientTraffic{}).Where("email = ?", email).Limit(1).Pluck("enable", &enables).Error; err != nil {
logger.Error("Get stats error")
return needRestart, err
}
notDepleted := len(enables) > 0 && enables[0]
if !emailShared && !keepTraffic {
if err := inboundSvc.DelClientStat(db, email); err != nil {
notDepleted = len(enables) > 0 && enables[0]
}
targets = append(targets, delTarget{email: email, emailShared: emailShared, notDepleted: notDepleted, needApiDel: r.needApiDel})
}
// Persist the batch deletion atomically, serialized against the traffic poll
// to avoid the cross-transaction lock-order deadlock (runSerializedTx).
if txErr := runSerializedTx(func(tx *gorm.DB) error {
for _, t := range targets {
if t.emailShared || keepTraffic {
continue
}
if e := inboundSvc.DelClientIPs(tx, t.email); e != nil {
logger.Error("Error in delete client IPs")
return e
}
if len(t.email) > 0 {
if e := inboundSvc.DelClientStat(tx, t.email); e != nil {
logger.Error("Delete stats Data Error")
return needRestart, err
return e
}
}
if r.needApiDel && notDepleted && oldInbound.NodeID == nil {
}
if e := tx.Save(oldInbound).Error; e != nil {
return e
}
finalClients, gcErr := inboundSvc.GetClients(oldInbound)
if gcErr != nil {
return gcErr
}
return s.SyncInbound(tx, inboundId, finalClients)
}); txErr != nil {
return needRestart, txErr
}
// Apply runtime deletes after commit — outside the serialized writer so a
// slow node call can't stall traffic accounting.
for _, t := range targets {
if len(t.email) == 0 {
continue
}
if oldInbound.NodeID == nil {
if t.needApiDel && t.notDepleted {
rt, rterr := inboundSvc.runtimeFor(oldInbound)
if rterr != nil {
needRestart = true
} else if err1 := rt.RemoveUser(context.Background(), oldInbound, email); err1 != nil {
if !strings.Contains(err1.Error(), fmt.Sprintf("User %s not found.", email)) {
} else if err1 := rt.RemoveUser(context.Background(), oldInbound, t.email); err1 != nil {
if !strings.Contains(err1.Error(), fmt.Sprintf("User %s not found.", t.email)) {
needRestart = true
}
}
}
}
if oldInbound.NodeID != nil && len(email) > 0 {
} else {
rt, push, dirty, perr := inboundSvc.nodePushPlan(oldInbound)
if perr != nil {
return needRestart, perr
@@ -147,7 +189,7 @@ func (s *ClientService) delInboundClients(inboundSvc *InboundService, inboundId
markDirty = true
}
if push {
if err1 := rt.DeleteUser(context.Background(), oldInbound, email); err1 != nil {
if err1 := rt.DeleteUser(context.Background(), oldInbound, t.email); err1 != nil {
logger.Warning("Error in deleting client on", rt.Name(), ":", err1)
markDirty = true
}
@@ -155,16 +197,6 @@ func (s *ClientService) delInboundClients(inboundSvc *InboundService, inboundId
}
}
if err := db.Save(oldInbound).Error; err != nil {
return needRestart, err
}
finalClients, gcErr := inboundSvc.GetClients(oldInbound)
if gcErr != nil {
return needRestart, gcErr
}
if err := s.SyncInbound(db, inboundId, finalClients); err != nil {
return needRestart, err
}
if markDirty && oldInbound.NodeID != nil {
if dErr := (&NodeService{}).MarkNodeDirty(*oldInbound.NodeID); dErr != nil {
logger.Warning("mark node dirty failed:", dErr)
@@ -300,32 +332,42 @@ func (s *ClientService) addInboundClient(inboundSvc *InboundService, data *model
oldInbound.Settings = string(newSettings)
db := database.GetDB()
tx := db.Begin()
markDirty := false
defer func() {
if err != nil {
tx.Rollback()
return
}
tx.Commit()
if markDirty && oldInbound.NodeID != nil {
if dErr := (&NodeService{}).MarkNodeDirty(*oldInbound.NodeID); dErr != nil {
logger.Warning("mark node dirty failed:", dErr)
}
}
}()
needRestart := false
markDirty := false
rt, push, dirty, perr := inboundSvc.nodePushPlan(oldInbound)
if perr != nil {
err = perr
return false, err
return false, perr
}
if dirty {
markDirty = true
}
// Persist client stats + inbound atomically, serialized against the traffic
// poll to avoid the cross-transaction lock-order deadlock (runSerializedTx).
if txErr := runSerializedTx(func(tx *gorm.DB) error {
for i := range clients {
if len(clients[i].Email) == 0 {
continue
}
if e := inboundSvc.AddClientStat(tx, data.Id, &clients[i]); e != nil {
return e
}
}
if e := tx.Save(oldInbound).Error; e != nil {
return e
}
finalClients, gcErr := inboundSvc.GetClients(oldInbound)
if gcErr != nil {
return gcErr
}
return s.SyncInbound(tx, oldInbound.Id, finalClients)
}); txErr != nil {
return false, txErr
}
// Apply to the running runtime after commit — outside the serialized writer
// so a slow node call can't stall traffic accounting.
if oldInbound.NodeID == nil {
if !push {
needRestart = true
@@ -335,7 +377,6 @@ func (s *ClientService) addInboundClient(inboundSvc *InboundService, data *model
needRestart = true
continue
}
inboundSvc.AddClientStat(tx, data.Id, &client)
if !client.Enable {
continue
}
@@ -362,9 +403,6 @@ func (s *ClientService) addInboundClient(inboundSvc *InboundService, data *model
}
} else {
for _, client := range clients {
if len(client.Email) > 0 {
inboundSvc.AddClientStat(tx, data.Id, &client)
}
if push {
if err1 := rt.AddClient(context.Background(), oldInbound, client); err1 != nil {
logger.Warning("Error in adding client on", rt.Name(), ":", err1)
@@ -375,16 +413,10 @@ func (s *ClientService) addInboundClient(inboundSvc *InboundService, data *model
}
}
if err = tx.Save(oldInbound).Error; err != nil {
return false, err
}
finalClients, gcErr := inboundSvc.GetClients(oldInbound)
if gcErr != nil {
err = gcErr
return false, err
}
if err = s.SyncInbound(tx, oldInbound.Id, finalClients); err != nil {
return false, err
if markDirty && oldInbound.NodeID != nil {
if dErr := (&NodeService{}).MarkNodeDirty(*oldInbound.NodeID); dErr != nil {
logger.Warning("mark node dirty failed:", dErr)
}
}
return needRestart, nil
}
@@ -519,87 +551,98 @@ func (s *ClientService) UpdateInboundClient(inboundSvc *InboundService, data *mo
}
oldInbound.Settings = string(newSettings)
db := database.GetDB()
tx := db.Begin()
markDirty := false
defer func() {
if err != nil {
tx.Rollback()
return
}
tx.Commit()
if markDirty && oldInbound.NodeID != nil {
if dErr := (&NodeService{}).MarkNodeDirty(*oldInbound.NodeID); dErr != nil {
logger.Warning("mark node dirty failed:", dErr)
}
}
}()
if len(clients[0].Email) > 0 {
if len(oldEmail) > 0 {
emailUnchanged := strings.EqualFold(oldEmail, clients[0].Email)
targetExists := int64(0)
if !emailUnchanged {
if err = tx.Model(xray.ClientTraffic{}).Where("email = ?", clients[0].Email).Count(&targetExists).Error; err != nil {
return false, err
}
}
if emailUnchanged || targetExists == 0 {
err = inboundSvc.UpdateClientStat(tx, oldEmail, &clients[0])
if err != nil {
return false, err
}
err = inboundSvc.UpdateClientIPs(tx, oldEmail, clients[0].Email)
if err != nil {
return false, err
}
} else {
stillUsed, sErr := inboundSvc.emailUsedByOtherInbounds(oldEmail, data.Id)
if sErr != nil {
return false, sErr
}
if !stillUsed {
if err = inboundSvc.DelClientStat(tx, oldEmail); err != nil {
return false, err
}
if err = inboundSvc.DelClientIPs(tx, oldEmail); err != nil {
return false, err
}
}
if err = inboundSvc.UpdateClientStat(tx, clients[0].Email, &clients[0]); err != nil {
return false, err
}
}
} else {
inboundSvc.AddClientStat(tx, data.Id, &clients[0])
}
} else {
stillUsed, err := inboundSvc.emailUsedByOtherInbounds(oldEmail, data.Id)
if err != nil {
return false, err
}
if !stillUsed {
err = inboundSvc.DelClientStat(tx, oldEmail)
if err != nil {
return false, err
}
err = inboundSvc.DelClientIPs(tx, oldEmail)
if err != nil {
return false, err
}
}
}
needRestart := false
markDirty := false
// Resolve the push plan before the DB write so a node-state lookup failure
// still aborts the whole update without committing anything (it used to roll
// the transaction back). nodePushPlan only reads, so order doesn't matter.
var rt runtime.Runtime
var push bool
if len(oldEmail) > 0 {
rt, push, dirty, perr := inboundSvc.nodePushPlan(oldInbound)
var dirty bool
var perr error
rt, push, dirty, perr = inboundSvc.nodePushPlan(oldInbound)
if perr != nil {
err = perr
return false, err
return false, perr
}
if dirty {
markDirty = true
}
}
// Persist client stats + inbound atomically, serialized against the traffic
// poll to avoid the cross-transaction lock-order deadlock (runSerializedTx).
if txErr := runSerializedTx(func(tx *gorm.DB) error {
if len(clients[0].Email) > 0 {
if len(oldEmail) > 0 {
emailUnchanged := strings.EqualFold(oldEmail, clients[0].Email)
targetExists := int64(0)
if !emailUnchanged {
if e := tx.Model(xray.ClientTraffic{}).Where("email = ?", clients[0].Email).Count(&targetExists).Error; e != nil {
return e
}
}
if emailUnchanged || targetExists == 0 {
if e := inboundSvc.UpdateClientStat(tx, oldEmail, &clients[0]); e != nil {
return e
}
if e := inboundSvc.UpdateClientIPs(tx, oldEmail, clients[0].Email); e != nil {
return e
}
} else {
stillUsed, sErr := inboundSvc.emailUsedByOtherInbounds(oldEmail, data.Id)
if sErr != nil {
return sErr
}
if !stillUsed {
if e := inboundSvc.DelClientStat(tx, oldEmail); e != nil {
return e
}
if e := inboundSvc.DelClientIPs(tx, oldEmail); e != nil {
return e
}
}
if e := inboundSvc.UpdateClientStat(tx, clients[0].Email, &clients[0]); e != nil {
return e
}
}
} else {
if e := inboundSvc.AddClientStat(tx, data.Id, &clients[0]); e != nil {
return e
}
}
} else {
stillUsed, sErr := inboundSvc.emailUsedByOtherInbounds(oldEmail, data.Id)
if sErr != nil {
return sErr
}
if !stillUsed {
if e := inboundSvc.DelClientStat(tx, oldEmail); e != nil {
return e
}
if e := inboundSvc.DelClientIPs(tx, oldEmail); e != nil {
return e
}
}
}
if e := tx.Save(oldInbound).Error; e != nil {
return e
}
finalClients, gcErr := inboundSvc.GetClients(oldInbound)
if gcErr != nil {
return gcErr
}
return s.SyncInbound(tx, oldInbound.Id, finalClients)
}); txErr != nil {
return false, txErr
}
// Apply to the running runtime after the DB is committed — outside the
// serialized writer so a slow node call can't stall traffic accounting.
if len(oldEmail) > 0 {
if oldInbound.NodeID == nil {
if !push {
needRestart = true
@@ -647,16 +690,11 @@ func (s *ClientService) UpdateInboundClient(inboundSvc *InboundService, data *mo
logger.Debug("Client old email not found")
needRestart = true
}
if err = tx.Save(oldInbound).Error; err != nil {
return false, err
}
finalClients, gcErr := inboundSvc.GetClients(oldInbound)
if gcErr != nil {
err = gcErr
return false, err
}
if err = s.SyncInbound(tx, oldInbound.Id, finalClients); err != nil {
return false, err
if markDirty && oldInbound.NodeID != nil {
if dErr := (&NodeService{}).MarkNodeDirty(*oldInbound.NodeID); dErr != nil {
logger.Warning("mark node dirty failed:", dErr)
}
}
return needRestart, nil
}
@@ -718,41 +756,67 @@ func (s *ClientService) DelInboundClientByEmail(inboundSvc *InboundService, inbo
return false, err
}
if !emailShared && !keepTraffic {
if err := inboundSvc.DelClientIPs(db, email); err != nil {
logger.Error("Error in delete client IPs")
return false, err
}
}
needRestart := false
markDirty := false
if len(email) > 0 && !emailShared {
if !keepTraffic {
traffic, err := inboundSvc.GetClientTrafficByEmail(email)
if err != nil {
return false, err
}
if traffic != nil {
if err := inboundSvc.DelClientStat(db, email); err != nil {
logger.Error("Delete stats Data Error")
return false, err
}
// Decide what to delete and the push plan before the serialized DB write —
// these are reads, and nodePushPlan failing should abort before committing.
delStat := false
if len(email) > 0 && !emailShared && !keepTraffic {
traffic, tErr := inboundSvc.GetClientTrafficByEmail(email)
if tErr != nil {
return false, tErr
}
delStat = traffic != nil
}
var rt runtime.Runtime
var push bool
if len(email) > 0 && !emailShared && (oldInbound.NodeID != nil || needApiDel) {
r, p, dirty, perr := inboundSvc.nodePushPlan(oldInbound)
if perr != nil {
return false, perr
}
rt, push = r, p
if dirty {
markDirty = true
}
}
// Persist the deletion atomically, serialized against the traffic poll to
// avoid the cross-transaction lock-order deadlock (runSerializedTx).
if txErr := runSerializedTx(func(tx *gorm.DB) error {
if !emailShared && !keepTraffic {
if e := inboundSvc.DelClientIPs(tx, email); e != nil {
logger.Error("Error in delete client IPs")
return e
}
}
if delStat {
if e := inboundSvc.DelClientStat(tx, email); e != nil {
logger.Error("Delete stats Data Error")
return e
}
}
if e := tx.Save(oldInbound).Error; e != nil {
return e
}
finalClients, gcErr := inboundSvc.GetClients(oldInbound)
if gcErr != nil {
return gcErr
}
return s.SyncInbound(tx, inboundId, finalClients)
}); txErr != nil {
return false, txErr
}
// Apply the runtime delete after commit — outside the serialized writer so a
// slow node call can't stall traffic accounting.
if len(email) > 0 && !emailShared {
if oldInbound.NodeID == nil {
// Local inbound: a disabled client isn't in the running Xray, so only
// a live one (needApiDel) needs an API removal.
if needApiDel {
rt, push, dirty, perr := inboundSvc.nodePushPlan(oldInbound)
if perr != nil {
return false, perr
}
if dirty {
markDirty = true
}
if !push {
needRestart = true
} else if err1 := rt.RemoveUser(context.Background(), oldInbound, email); err1 == nil {
@@ -769,13 +833,6 @@ func (s *ClientService) DelInboundClientByEmail(inboundSvc *InboundService, inbo
// Node inbound: propagate the delete regardless of the enable flag —
// the node's own DB still carries a disabled client and would
// resurrect it on the next snapshot otherwise.
rt, push, dirty, perr := inboundSvc.nodePushPlan(oldInbound)
if perr != nil {
return false, perr
}
if dirty {
markDirty = true
}
if push {
if err1 := rt.DeleteUser(context.Background(), oldInbound, email); err1 != nil {
logger.Warning("Error in deleting client on", rt.Name(), ":", err1)
@@ -785,16 +842,6 @@ func (s *ClientService) DelInboundClientByEmail(inboundSvc *InboundService, inbo
}
}
if err := db.Save(oldInbound).Error; err != nil {
return false, err
}
finalClients, gcErr := inboundSvc.GetClients(oldInbound)
if gcErr != nil {
return false, gcErr
}
if err := s.SyncInbound(db, inboundId, finalClients); err != nil {
return false, err
}
if markDirty && oldInbound.NodeID != nil {
if dErr := (&NodeService{}).MarkNodeDirty(*oldInbound.NodeID); dErr != nil {
logger.Warning("mark node dirty failed:", dErr)
+185 -178
View File
@@ -966,195 +966,202 @@ func (s *InboundService) UpdateInbound(inbound *model.Inbound) (*model.Inbound,
oldBits := inboundTransports(oldInbound.Protocol, oldInbound.StreamSettings, oldInbound.Settings)
oldTagWasAuto := isAutoGeneratedTag(tag, oldInbound.Port, oldInbound.NodeID, oldBits)
db := database.GetDB()
tx := db.Begin()
markDirty := false
defer func() {
if err != nil {
tx.Rollback()
return
}
tx.Commit()
if markDirty && oldInbound.NodeID != nil {
if dErr := (&NodeService{}).MarkNodeDirty(*oldInbound.NodeID); dErr != nil {
logger.Warning("mark node dirty failed:", dErr)
}
}
}()
err = s.updateClientTraffics(tx, oldInbound, inbound)
if err != nil {
return inbound, false, err
}
// Ensure created_at and updated_at exist in inbound.Settings clients
{
var oldSettings map[string]any
_ = json.Unmarshal([]byte(oldInbound.Settings), &oldSettings)
emailToCreated := map[string]int64{}
emailToUpdated := map[string]int64{}
if oldSettings != nil {
if oc, ok := oldSettings["clients"].([]any); ok {
for _, it := range oc {
if m, ok2 := it.(map[string]any); ok2 {
if email, ok3 := m["email"].(string); ok3 {
switch v := m["created_at"].(type) {
case float64:
emailToCreated[email] = int64(v)
case int64:
emailToCreated[email] = v
}
switch v := m["updated_at"].(type) {
case float64:
emailToUpdated[email] = int64(v)
case int64:
emailToUpdated[email] = v
}
}
}
}
}
}
var newSettings map[string]any
if err2 := json.Unmarshal([]byte(inbound.Settings), &newSettings); err2 == nil && newSettings != nil {
now := time.Now().Unix() * 1000
if nSlice, ok := newSettings["clients"].([]any); ok {
for i := range nSlice {
if m, ok2 := nSlice[i].(map[string]any); ok2 {
email, _ := m["email"].(string)
if _, ok3 := m["created_at"]; !ok3 {
if v, ok4 := emailToCreated[email]; ok4 && v > 0 {
m["created_at"] = v
} else {
m["created_at"] = now
}
}
// Preserve client's updated_at if present; do not bump on parent inbound update
if _, hasUpdated := m["updated_at"]; !hasUpdated {
if v, ok4 := emailToUpdated[email]; ok4 && v > 0 {
m["updated_at"] = v
}
}
nSlice[i] = m
}
}
newSettings["clients"] = nSlice
if bs, err3 := json.MarshalIndent(newSettings, "", " "); err3 == nil {
inbound.Settings = string(bs)
}
}
}
}
// A Shadowsocks-2022 method change resizes the key, but existing client PSKs
// keep their old length and would be rejected by xray. Regenerate mismatched
// client keys so the inbound stays connectable.
if normalized, changed := normalizeShadowsocksClientKeys(inbound.Settings); changed {
inbound.Settings = normalized
logger.Warning("Shadowsocks inbound", inbound.Id, "method change resized keys; regenerated mismatched client PSK(s)")
}
oldInbound.Total = inbound.Total
oldInbound.Remark = inbound.Remark
oldInbound.SubSortIndex = inbound.SubSortIndex
oldInbound.Enable = inbound.Enable
oldInbound.ExpiryTime = inbound.ExpiryTime
oldInbound.TrafficReset = inbound.TrafficReset
oldInbound.Listen = inbound.Listen
oldInbound.Port = inbound.Port
oldInbound.Protocol = inbound.Protocol
oldInbound.Settings = inbound.Settings
oldInbound.StreamSettings = inbound.StreamSettings
oldInbound.Sniffing = inbound.Sniffing
if strings.TrimSpace(inbound.ShareAddrStrategy) == "" {
normalizeInboundShareAddress(oldInbound)
inbound.ShareAddrStrategy = oldInbound.ShareAddrStrategy
inbound.ShareAddr = oldInbound.ShareAddr
} else {
if err := normalizeInboundShareAddressStrict(inbound); err != nil {
return inbound, false, err
}
oldInbound.ShareAddrStrategy = inbound.ShareAddrStrategy
oldInbound.ShareAddr = inbound.ShareAddr
}
if oldTagWasAuto && inbound.Tag == tag {
inbound.Tag = ""
}
oldInbound.Tag, err = s.resolveInboundTag(inbound, inbound.Id)
if err != nil {
return inbound, false, err
}
inbound.Tag = oldInbound.Tag
needRestart := false
rt, push, dirty, perr := s.nodePushPlan(oldInbound)
if perr != nil {
err = perr
return inbound, false, err
}
if dirty {
markDirty = true
}
if oldInbound.NodeID == nil {
if !push {
needRestart = true
} else {
oldSnapshot := *oldInbound
oldSnapshot.Tag = tag
if err2 := rt.DelInbound(context.Background(), &oldSnapshot); err2 == nil {
logger.Debug("Old inbound deleted on", rt.Name(), ":", tag)
markDirty := false
// Persist the client-stat sync, settings munging, runtime push and inbound
// save as one transaction routed through the serial traffic writer, so it
// never runs concurrently with the @every 5s traffic poll. Both touch
// client_traffics and inbounds in opposite order, which Postgres aborts as a
// deadlock (40P01); serializing removes the contention (runSerializedTx).
//
// The runtime push stays inside the transaction here (unlike the client-edit
// paths that apply it after commit): EnsureInboundTagAllowed must reach the
// node before the central row is committed, or a "selected"-mode node would
// sweep the renamed inbound on its next pull. Inbound edits are rare, so
// holding the writer across the node call is an acceptable trade.
txErr := runSerializedTx(func(tx *gorm.DB) error {
if err := s.updateClientTraffics(tx, oldInbound, inbound); err != nil {
return err
}
// Ensure created_at and updated_at exist in inbound.Settings clients
{
var oldSettings map[string]any
_ = json.Unmarshal([]byte(oldInbound.Settings), &oldSettings)
emailToCreated := map[string]int64{}
emailToUpdated := map[string]int64{}
if oldSettings != nil {
if oc, ok := oldSettings["clients"].([]any); ok {
for _, it := range oc {
if m, ok2 := it.(map[string]any); ok2 {
if email, ok3 := m["email"].(string); ok3 {
switch v := m["created_at"].(type) {
case float64:
emailToCreated[email] = int64(v)
case int64:
emailToCreated[email] = v
}
switch v := m["updated_at"].(type) {
case float64:
emailToUpdated[email] = int64(v)
case int64:
emailToUpdated[email] = v
}
}
}
}
}
}
if inbound.Enable {
runtimeInbound, err2 := s.buildRuntimeInboundForAPI(tx, oldInbound)
if err2 != nil {
logger.Debug("Unable to prepare runtime inbound config:", err2)
needRestart = true
} else if err2 := rt.AddInbound(context.Background(), runtimeInbound); err2 == nil {
logger.Debug("Updated inbound added on", rt.Name(), ":", oldInbound.Tag)
} else {
logger.Debug("Unable to update inbound on", rt.Name(), ":", err2)
needRestart = true
var newSettings map[string]any
if err2 := json.Unmarshal([]byte(inbound.Settings), &newSettings); err2 == nil && newSettings != nil {
now := time.Now().Unix() * 1000
if nSlice, ok := newSettings["clients"].([]any); ok {
for i := range nSlice {
if m, ok2 := nSlice[i].(map[string]any); ok2 {
email, _ := m["email"].(string)
if _, ok3 := m["created_at"]; !ok3 {
if v, ok4 := emailToCreated[email]; ok4 && v > 0 {
m["created_at"] = v
} else {
m["created_at"] = now
}
}
// Preserve client's updated_at if present; do not bump on parent inbound update
if _, hasUpdated := m["updated_at"]; !hasUpdated {
if v, ok4 := emailToUpdated[email]; ok4 && v > 0 {
m["updated_at"] = v
}
}
nSlice[i] = m
}
}
newSettings["clients"] = nSlice
if bs, err3 := json.MarshalIndent(newSettings, "", " "); err3 == nil {
inbound.Settings = string(bs)
}
}
}
}
} else if push {
oldSnapshot := *oldInbound
oldSnapshot.Tag = tag
if !inbound.Enable {
if err2 := rt.DelInbound(context.Background(), &oldSnapshot); err2 != nil {
logger.Warning("Unable to disable inbound on", rt.Name(), ":", err2)
markDirty = true
// A Shadowsocks-2022 method change resizes the key, but existing client PSKs
// keep their old length and would be rejected by xray. Regenerate mismatched
// client keys so the inbound stays connectable.
if normalized, changed := normalizeShadowsocksClientKeys(inbound.Settings); changed {
inbound.Settings = normalized
logger.Warning("Shadowsocks inbound", inbound.Id, "method change resized keys; regenerated mismatched client PSK(s)")
}
oldInbound.Total = inbound.Total
oldInbound.Remark = inbound.Remark
oldInbound.SubSortIndex = inbound.SubSortIndex
oldInbound.Enable = inbound.Enable
oldInbound.ExpiryTime = inbound.ExpiryTime
oldInbound.TrafficReset = inbound.TrafficReset
oldInbound.Listen = inbound.Listen
oldInbound.Port = inbound.Port
oldInbound.Protocol = inbound.Protocol
oldInbound.Settings = inbound.Settings
oldInbound.StreamSettings = inbound.StreamSettings
oldInbound.Sniffing = inbound.Sniffing
if strings.TrimSpace(inbound.ShareAddrStrategy) == "" {
normalizeInboundShareAddress(oldInbound)
inbound.ShareAddrStrategy = oldInbound.ShareAddrStrategy
inbound.ShareAddr = oldInbound.ShareAddr
} else {
if err := normalizeInboundShareAddressStrict(inbound); err != nil {
return err
}
} else if err2 := rt.UpdateInbound(context.Background(), &oldSnapshot, oldInbound); err2 != nil {
logger.Warning("Unable to update inbound on", rt.Name(), ":", err2)
oldInbound.ShareAddrStrategy = inbound.ShareAddrStrategy
oldInbound.ShareAddr = inbound.ShareAddr
}
if oldTagWasAuto && inbound.Tag == tag {
inbound.Tag = ""
}
resolvedTag, err := s.resolveInboundTag(inbound, inbound.Id)
if err != nil {
return err
}
oldInbound.Tag = resolvedTag
inbound.Tag = oldInbound.Tag
rt, push, dirty, perr := s.nodePushPlan(oldInbound)
if perr != nil {
return perr
}
if dirty {
markDirty = true
}
}
// A rename must allow the new tag before the deferred commit, or a node in
// "selected" sync mode would sweep the renamed central row on the next pull.
if oldInbound.NodeID != nil {
if aErr := (&NodeService{}).EnsureInboundTagAllowed(*oldInbound.NodeID, oldInbound.Tag); aErr != nil {
logger.Warning("allow inbound tag on node failed:", aErr)
if oldInbound.NodeID == nil {
if !push {
needRestart = true
} else {
oldSnapshot := *oldInbound
oldSnapshot.Tag = tag
if err2 := rt.DelInbound(context.Background(), &oldSnapshot); err2 == nil {
logger.Debug("Old inbound deleted on", rt.Name(), ":", tag)
}
if inbound.Enable {
runtimeInbound, err2 := s.buildRuntimeInboundForAPI(tx, oldInbound)
if err2 != nil {
logger.Debug("Unable to prepare runtime inbound config:", err2)
needRestart = true
} else if err2 := rt.AddInbound(context.Background(), runtimeInbound); err2 == nil {
logger.Debug("Updated inbound added on", rt.Name(), ":", oldInbound.Tag)
} else {
logger.Debug("Unable to update inbound on", rt.Name(), ":", err2)
needRestart = true
}
}
}
} else if push {
oldSnapshot := *oldInbound
oldSnapshot.Tag = tag
if !inbound.Enable {
if err2 := rt.DelInbound(context.Background(), &oldSnapshot); err2 != nil {
logger.Warning("Unable to disable inbound on", rt.Name(), ":", err2)
markDirty = true
}
} else if err2 := rt.UpdateInbound(context.Background(), &oldSnapshot, oldInbound); err2 != nil {
logger.Warning("Unable to update inbound on", rt.Name(), ":", err2)
markDirty = true
}
}
}
if err = tx.Save(oldInbound).Error; err != nil {
return inbound, false, err
// A rename must allow the new tag before the inbound row is committed, or a
// node in "selected" sync mode would sweep the renamed central row on the
// next pull.
if oldInbound.NodeID != nil {
if aErr := (&NodeService{}).EnsureInboundTagAllowed(*oldInbound.NodeID, oldInbound.Tag); aErr != nil {
logger.Warning("allow inbound tag on node failed:", aErr)
}
}
if err := tx.Save(oldInbound).Error; err != nil {
return err
}
newClients, gcErr := s.GetClients(oldInbound)
if gcErr != nil {
return gcErr
}
if err := s.clientService.SyncInbound(tx, oldInbound.Id, newClients); err != nil {
return err
}
// (Re)generate the Xray config whenever routing was or is now enabled, so
// the egress SOCKS bridge is added, moved, or dropped to match the new
// settings.
if mtprotoRoutesThroughXray(inbound) || oldRoutedMtproto {
needRestart = true
}
return nil
})
if txErr != nil {
return inbound, false, txErr
}
newClients, gcErr := s.GetClients(oldInbound)
if gcErr != nil {
err = gcErr
return inbound, false, err
}
if err = s.clientService.SyncInbound(tx, oldInbound.Id, newClients); err != nil {
return inbound, false, err
}
// (Re)generate the Xray config whenever routing was or is now enabled, so the
// egress SOCKS bridge is added, moved, or dropped to match the new settings.
if mtprotoRoutesThroughXray(inbound) || oldRoutedMtproto {
needRestart = true
if markDirty && oldInbound.NodeID != nil {
if dErr := (&NodeService{}).MarkNodeDirty(*oldInbound.NodeID); dErr != nil {
logger.Warning("mark node dirty failed:", dErr)
}
}
return inbound, needRestart, nil
}
+20
View File
@@ -7,7 +7,10 @@ import (
"sync"
"time"
"github.com/mhsanaei/3x-ui/v3/internal/database"
"github.com/mhsanaei/3x-ui/v3/internal/logger"
"gorm.io/gorm"
)
const (
@@ -109,6 +112,23 @@ func runTrafficWriter(ctx context.Context, queue chan *trafficWriteRequest, done
}
}
// runSerializedTx runs fn inside one DB transaction on the shared serial
// traffic-writer goroutine, so it can never execute concurrently with the
// @every 5s traffic poll (AddTraffic). Both touch the hot client_traffics and
// inbounds rows, and they acquire them in opposite order (the poll locks
// inbounds then client_traffics; an admin client/inbound mutation does the
// reverse), which Postgres aborts as a deadlock (SQLSTATE 40P01). Routing every
// such mutation through this single writer removes that contention entirely.
//
// Keep network I/O (node pushes) OUT of fn: holding the single writer across a
// remote node call would stall all traffic accounting for up to the remote
// timeout. Apply runtime changes after this returns.
func runSerializedTx(fn func(tx *gorm.DB) error) error {
return submitTrafficWrite(func() error {
return database.GetDB().Transaction(fn)
})
}
func safeApply(fn func() error) (err error) {
defer func() {
if r := recover(); r != nil {