From c5d31de4e96850787815f21757fb746dd328416e Mon Sep 17 00:00:00 2001 From: MHSanaei Date: Wed, 17 Jun 2026 15:55:47 +0200 Subject: [PATCH] fix(service): serialize client/inbound writes to prevent Postgres deadlock Client/inbound mutations opened their own transactions that locked client_traffics before inbounds, while the @every 5s traffic poll (AddTraffic, already serialized through the traffic writer) locks them in the opposite order. Concurrently these formed an ABBA lock cycle that Postgres aborted as "deadlock detected" (SQLSTATE 40P01), failing client updates. Route those DB writes through the same single-goroutine traffic writer via a new runSerializedTx helper, so they can never run concurrently with the poll. For the client-edit paths the runtime (node) push is moved after the commit, keeping network I/O out of the serialized section. UpdateInbound keeps its push inside the transaction because EnsureInboundTagAllowed must reach the node before the central row is committed. Covers UpdateInboundClient/addInboundClient/DelInboundClientByEmail/ delInboundClients, the bulk adjust/delete transactions, and UpdateInbound. --- internal/web/service/client_bulk.go | 67 +-- internal/web/service/client_inbound_apply.go | 415 +++++++++++-------- internal/web/service/inbound.go | 363 ++++++++-------- internal/web/service/traffic_writer.go | 20 + 4 files changed, 477 insertions(+), 388 deletions(-) diff --git a/internal/web/service/client_bulk.go b/internal/web/service/client_bulk.go index 322f3bfee..5a21119ee 100644 --- a/internal/web/service/client_bulk.go +++ b/internal/web/service/client_bulk.go @@ -545,8 +545,9 @@ func (s *ClientService) bulkAdjustInboundClients( } } - db := database.GetDB() - txErr := db.Transaction(func(tx *gorm.DB) error { + // Serialize against the traffic poll to avoid the cross-transaction + // lock-order deadlock on inbounds/client_records (runSerializedTx). + txErr := runSerializedTx(func(tx *gorm.DB) error { if err := tx.Save(oldInbound).Error; err != nil { return err } @@ -685,25 +686,32 @@ func (s *ClientService) BulkDelete(inboundSvc *InboundService, emails []string, } if len(successIds) > 0 { - for _, batch := range chunkInts(successIds, sqlInChunk) { - if err := db.Where("client_id IN ?", batch).Delete(&model.ClientInbound{}).Error; err != nil { - return result, needRestart, err - } - } - if !keepTraffic && len(successEmails) > 0 { - for _, batch := range chunkStrings(successEmails, sqlInChunk) { - if err := db.Where("email IN ?", batch).Delete(&xray.ClientTraffic{}).Error; err != nil { - return result, needRestart, err - } - if err := db.Where("client_email IN ?", batch).Delete(&model.InboundClientIps{}).Error; err != nil { - return result, needRestart, err + // Serialize the row cleanup against the traffic poll to avoid the + // cross-transaction lock-order deadlock on client_traffics/inbounds. + if err := runSerializedTx(func(tx *gorm.DB) error { + for _, batch := range chunkInts(successIds, sqlInChunk) { + if e := tx.Where("client_id IN ?", batch).Delete(&model.ClientInbound{}).Error; e != nil { + return e } } - } - for _, batch := range chunkInts(successIds, sqlInChunk) { - if err := db.Where("id IN ?", batch).Delete(&model.ClientRecord{}).Error; err != nil { - return result, needRestart, err + if !keepTraffic && len(successEmails) > 0 { + for _, batch := range chunkStrings(successEmails, sqlInChunk) { + if e := tx.Where("email IN ?", batch).Delete(&xray.ClientTraffic{}).Error; e != nil { + return e + } + if e := tx.Where("client_email IN ?", batch).Delete(&model.InboundClientIps{}).Error; e != nil { + return e + } + } } + for _, batch := range chunkInts(successIds, sqlInChunk) { + if e := tx.Where("id IN ?", batch).Delete(&model.ClientRecord{}).Error; e != nil { + return e + } + } + return nil + }); err != nil { + return result, needRestart, err } } @@ -850,14 +858,19 @@ func (s *ClientService) bulkDelInboundClients( } } if len(purge) > 0 { - if delErr := inboundSvc.delClientIPsByEmails(db, purge); delErr != nil { - logger.Error("Error in delete client IPs") - for _, email := range purge { - res.perEmailSkipped[email] = delErr.Error() - delete(foundEmails, email) + // Serialize the IP/stat purge against the traffic poll to avoid the + // cross-transaction lock-order deadlock on client_traffics. + if delErr := runSerializedTx(func(tx *gorm.DB) error { + if e := inboundSvc.delClientIPsByEmails(tx, purge); e != nil { + logger.Error("Error in delete client IPs") + return e } - } else if delErr := inboundSvc.delClientStatsByEmails(db, purge); delErr != nil { - logger.Error("Delete stats Data Error") + if e := inboundSvc.delClientStatsByEmails(tx, purge); e != nil { + logger.Error("Delete stats Data Error") + return e + } + return nil + }); delErr != nil { for _, email := range purge { res.perEmailSkipped[email] = delErr.Error() delete(foundEmails, email) @@ -909,7 +922,9 @@ func (s *ClientService) bulkDelInboundClients( } } - txErr := db.Transaction(func(tx *gorm.DB) error { + // Serialize against the traffic poll to avoid the cross-transaction + // lock-order deadlock on inbounds/client_records (runSerializedTx). + txErr := runSerializedTx(func(tx *gorm.DB) error { if err := tx.Save(oldInbound).Error; err != nil { return err } diff --git a/internal/web/service/client_inbound_apply.go b/internal/web/service/client_inbound_apply.go index 881f0e1ca..59b5eb19d 100644 --- a/internal/web/service/client_inbound_apply.go +++ b/internal/web/service/client_inbound_apply.go @@ -12,7 +12,10 @@ import ( "github.com/mhsanaei/3x-ui/v3/internal/logger" "github.com/mhsanaei/3x-ui/v3/internal/util/common" "github.com/mhsanaei/3x-ui/v3/internal/util/random" + "github.com/mhsanaei/3x-ui/v3/internal/web/runtime" "github.com/mhsanaei/3x-ui/v3/internal/xray" + + "gorm.io/gorm" ) // delInboundClients removes several clients from a single inbound in one pass: @@ -105,40 +108,79 @@ func (s *ClientService) delInboundClients(inboundSvc *InboundService, inboundId needRestart := false markDirty := false + + // Read each client's live state before the DB write (DelClientStat would + // erase the enable flag we need to decide on a runtime removal). + type delTarget struct { + email string + emailShared bool + notDepleted bool + needApiDel bool + } + targets := make([]delTarget, 0, len(removed)) for _, r := range removed { email := r.email emailShared := sharedSet[strings.ToLower(strings.TrimSpace(email))] - if !emailShared && !keepTraffic { - if err := inboundSvc.DelClientIPs(db, email); err != nil { - logger.Error("Error in delete client IPs") - return needRestart, err - } - } + notDepleted := false if len(email) > 0 { var enables []bool if err := db.Model(xray.ClientTraffic{}).Where("email = ?", email).Limit(1).Pluck("enable", &enables).Error; err != nil { logger.Error("Get stats error") return needRestart, err } - notDepleted := len(enables) > 0 && enables[0] - if !emailShared && !keepTraffic { - if err := inboundSvc.DelClientStat(db, email); err != nil { + notDepleted = len(enables) > 0 && enables[0] + } + targets = append(targets, delTarget{email: email, emailShared: emailShared, notDepleted: notDepleted, needApiDel: r.needApiDel}) + } + + // Persist the batch deletion atomically, serialized against the traffic poll + // to avoid the cross-transaction lock-order deadlock (runSerializedTx). + if txErr := runSerializedTx(func(tx *gorm.DB) error { + for _, t := range targets { + if t.emailShared || keepTraffic { + continue + } + if e := inboundSvc.DelClientIPs(tx, t.email); e != nil { + logger.Error("Error in delete client IPs") + return e + } + if len(t.email) > 0 { + if e := inboundSvc.DelClientStat(tx, t.email); e != nil { logger.Error("Delete stats Data Error") - return needRestart, err + return e } } - if r.needApiDel && notDepleted && oldInbound.NodeID == nil { + } + if e := tx.Save(oldInbound).Error; e != nil { + return e + } + finalClients, gcErr := inboundSvc.GetClients(oldInbound) + if gcErr != nil { + return gcErr + } + return s.SyncInbound(tx, inboundId, finalClients) + }); txErr != nil { + return needRestart, txErr + } + + // Apply runtime deletes after commit — outside the serialized writer so a + // slow node call can't stall traffic accounting. + for _, t := range targets { + if len(t.email) == 0 { + continue + } + if oldInbound.NodeID == nil { + if t.needApiDel && t.notDepleted { rt, rterr := inboundSvc.runtimeFor(oldInbound) if rterr != nil { needRestart = true - } else if err1 := rt.RemoveUser(context.Background(), oldInbound, email); err1 != nil { - if !strings.Contains(err1.Error(), fmt.Sprintf("User %s not found.", email)) { + } else if err1 := rt.RemoveUser(context.Background(), oldInbound, t.email); err1 != nil { + if !strings.Contains(err1.Error(), fmt.Sprintf("User %s not found.", t.email)) { needRestart = true } } } - } - if oldInbound.NodeID != nil && len(email) > 0 { + } else { rt, push, dirty, perr := inboundSvc.nodePushPlan(oldInbound) if perr != nil { return needRestart, perr @@ -147,7 +189,7 @@ func (s *ClientService) delInboundClients(inboundSvc *InboundService, inboundId markDirty = true } if push { - if err1 := rt.DeleteUser(context.Background(), oldInbound, email); err1 != nil { + if err1 := rt.DeleteUser(context.Background(), oldInbound, t.email); err1 != nil { logger.Warning("Error in deleting client on", rt.Name(), ":", err1) markDirty = true } @@ -155,16 +197,6 @@ func (s *ClientService) delInboundClients(inboundSvc *InboundService, inboundId } } - if err := db.Save(oldInbound).Error; err != nil { - return needRestart, err - } - finalClients, gcErr := inboundSvc.GetClients(oldInbound) - if gcErr != nil { - return needRestart, gcErr - } - if err := s.SyncInbound(db, inboundId, finalClients); err != nil { - return needRestart, err - } if markDirty && oldInbound.NodeID != nil { if dErr := (&NodeService{}).MarkNodeDirty(*oldInbound.NodeID); dErr != nil { logger.Warning("mark node dirty failed:", dErr) @@ -300,32 +332,42 @@ func (s *ClientService) addInboundClient(inboundSvc *InboundService, data *model oldInbound.Settings = string(newSettings) - db := database.GetDB() - tx := db.Begin() - - markDirty := false - defer func() { - if err != nil { - tx.Rollback() - return - } - tx.Commit() - if markDirty && oldInbound.NodeID != nil { - if dErr := (&NodeService{}).MarkNodeDirty(*oldInbound.NodeID); dErr != nil { - logger.Warning("mark node dirty failed:", dErr) - } - } - }() - needRestart := false + markDirty := false + rt, push, dirty, perr := inboundSvc.nodePushPlan(oldInbound) if perr != nil { - err = perr - return false, err + return false, perr } if dirty { markDirty = true } + + // Persist client stats + inbound atomically, serialized against the traffic + // poll to avoid the cross-transaction lock-order deadlock (runSerializedTx). + if txErr := runSerializedTx(func(tx *gorm.DB) error { + for i := range clients { + if len(clients[i].Email) == 0 { + continue + } + if e := inboundSvc.AddClientStat(tx, data.Id, &clients[i]); e != nil { + return e + } + } + if e := tx.Save(oldInbound).Error; e != nil { + return e + } + finalClients, gcErr := inboundSvc.GetClients(oldInbound) + if gcErr != nil { + return gcErr + } + return s.SyncInbound(tx, oldInbound.Id, finalClients) + }); txErr != nil { + return false, txErr + } + + // Apply to the running runtime after commit — outside the serialized writer + // so a slow node call can't stall traffic accounting. if oldInbound.NodeID == nil { if !push { needRestart = true @@ -335,7 +377,6 @@ func (s *ClientService) addInboundClient(inboundSvc *InboundService, data *model needRestart = true continue } - inboundSvc.AddClientStat(tx, data.Id, &client) if !client.Enable { continue } @@ -362,9 +403,6 @@ func (s *ClientService) addInboundClient(inboundSvc *InboundService, data *model } } else { for _, client := range clients { - if len(client.Email) > 0 { - inboundSvc.AddClientStat(tx, data.Id, &client) - } if push { if err1 := rt.AddClient(context.Background(), oldInbound, client); err1 != nil { logger.Warning("Error in adding client on", rt.Name(), ":", err1) @@ -375,16 +413,10 @@ func (s *ClientService) addInboundClient(inboundSvc *InboundService, data *model } } - if err = tx.Save(oldInbound).Error; err != nil { - return false, err - } - finalClients, gcErr := inboundSvc.GetClients(oldInbound) - if gcErr != nil { - err = gcErr - return false, err - } - if err = s.SyncInbound(tx, oldInbound.Id, finalClients); err != nil { - return false, err + if markDirty && oldInbound.NodeID != nil { + if dErr := (&NodeService{}).MarkNodeDirty(*oldInbound.NodeID); dErr != nil { + logger.Warning("mark node dirty failed:", dErr) + } } return needRestart, nil } @@ -519,87 +551,98 @@ func (s *ClientService) UpdateInboundClient(inboundSvc *InboundService, data *mo } oldInbound.Settings = string(newSettings) - db := database.GetDB() - tx := db.Begin() - markDirty := false - defer func() { - if err != nil { - tx.Rollback() - return - } - tx.Commit() - if markDirty && oldInbound.NodeID != nil { - if dErr := (&NodeService{}).MarkNodeDirty(*oldInbound.NodeID); dErr != nil { - logger.Warning("mark node dirty failed:", dErr) - } - } - }() - - if len(clients[0].Email) > 0 { - if len(oldEmail) > 0 { - emailUnchanged := strings.EqualFold(oldEmail, clients[0].Email) - targetExists := int64(0) - if !emailUnchanged { - if err = tx.Model(xray.ClientTraffic{}).Where("email = ?", clients[0].Email).Count(&targetExists).Error; err != nil { - return false, err - } - } - if emailUnchanged || targetExists == 0 { - err = inboundSvc.UpdateClientStat(tx, oldEmail, &clients[0]) - if err != nil { - return false, err - } - err = inboundSvc.UpdateClientIPs(tx, oldEmail, clients[0].Email) - if err != nil { - return false, err - } - } else { - stillUsed, sErr := inboundSvc.emailUsedByOtherInbounds(oldEmail, data.Id) - if sErr != nil { - return false, sErr - } - if !stillUsed { - if err = inboundSvc.DelClientStat(tx, oldEmail); err != nil { - return false, err - } - if err = inboundSvc.DelClientIPs(tx, oldEmail); err != nil { - return false, err - } - } - if err = inboundSvc.UpdateClientStat(tx, clients[0].Email, &clients[0]); err != nil { - return false, err - } - } - } else { - inboundSvc.AddClientStat(tx, data.Id, &clients[0]) - } - } else { - stillUsed, err := inboundSvc.emailUsedByOtherInbounds(oldEmail, data.Id) - if err != nil { - return false, err - } - if !stillUsed { - err = inboundSvc.DelClientStat(tx, oldEmail) - if err != nil { - return false, err - } - err = inboundSvc.DelClientIPs(tx, oldEmail) - if err != nil { - return false, err - } - } - } needRestart := false + markDirty := false + + // Resolve the push plan before the DB write so a node-state lookup failure + // still aborts the whole update without committing anything (it used to roll + // the transaction back). nodePushPlan only reads, so order doesn't matter. + var rt runtime.Runtime + var push bool if len(oldEmail) > 0 { - rt, push, dirty, perr := inboundSvc.nodePushPlan(oldInbound) + var dirty bool + var perr error + rt, push, dirty, perr = inboundSvc.nodePushPlan(oldInbound) if perr != nil { - err = perr - return false, err + return false, perr } if dirty { markDirty = true } + } + + // Persist client stats + inbound atomically, serialized against the traffic + // poll to avoid the cross-transaction lock-order deadlock (runSerializedTx). + if txErr := runSerializedTx(func(tx *gorm.DB) error { + if len(clients[0].Email) > 0 { + if len(oldEmail) > 0 { + emailUnchanged := strings.EqualFold(oldEmail, clients[0].Email) + targetExists := int64(0) + if !emailUnchanged { + if e := tx.Model(xray.ClientTraffic{}).Where("email = ?", clients[0].Email).Count(&targetExists).Error; e != nil { + return e + } + } + if emailUnchanged || targetExists == 0 { + if e := inboundSvc.UpdateClientStat(tx, oldEmail, &clients[0]); e != nil { + return e + } + if e := inboundSvc.UpdateClientIPs(tx, oldEmail, clients[0].Email); e != nil { + return e + } + } else { + stillUsed, sErr := inboundSvc.emailUsedByOtherInbounds(oldEmail, data.Id) + if sErr != nil { + return sErr + } + if !stillUsed { + if e := inboundSvc.DelClientStat(tx, oldEmail); e != nil { + return e + } + if e := inboundSvc.DelClientIPs(tx, oldEmail); e != nil { + return e + } + } + if e := inboundSvc.UpdateClientStat(tx, clients[0].Email, &clients[0]); e != nil { + return e + } + } + } else { + if e := inboundSvc.AddClientStat(tx, data.Id, &clients[0]); e != nil { + return e + } + } + } else { + stillUsed, sErr := inboundSvc.emailUsedByOtherInbounds(oldEmail, data.Id) + if sErr != nil { + return sErr + } + if !stillUsed { + if e := inboundSvc.DelClientStat(tx, oldEmail); e != nil { + return e + } + if e := inboundSvc.DelClientIPs(tx, oldEmail); e != nil { + return e + } + } + } + + if e := tx.Save(oldInbound).Error; e != nil { + return e + } + finalClients, gcErr := inboundSvc.GetClients(oldInbound) + if gcErr != nil { + return gcErr + } + return s.SyncInbound(tx, oldInbound.Id, finalClients) + }); txErr != nil { + return false, txErr + } + + // Apply to the running runtime after the DB is committed — outside the + // serialized writer so a slow node call can't stall traffic accounting. + if len(oldEmail) > 0 { if oldInbound.NodeID == nil { if !push { needRestart = true @@ -647,16 +690,11 @@ func (s *ClientService) UpdateInboundClient(inboundSvc *InboundService, data *mo logger.Debug("Client old email not found") needRestart = true } - if err = tx.Save(oldInbound).Error; err != nil { - return false, err - } - finalClients, gcErr := inboundSvc.GetClients(oldInbound) - if gcErr != nil { - err = gcErr - return false, err - } - if err = s.SyncInbound(tx, oldInbound.Id, finalClients); err != nil { - return false, err + + if markDirty && oldInbound.NodeID != nil { + if dErr := (&NodeService{}).MarkNodeDirty(*oldInbound.NodeID); dErr != nil { + logger.Warning("mark node dirty failed:", dErr) + } } return needRestart, nil } @@ -718,41 +756,67 @@ func (s *ClientService) DelInboundClientByEmail(inboundSvc *InboundService, inbo return false, err } - if !emailShared && !keepTraffic { - if err := inboundSvc.DelClientIPs(db, email); err != nil { - logger.Error("Error in delete client IPs") - return false, err - } - } - needRestart := false markDirty := false - if len(email) > 0 && !emailShared { - if !keepTraffic { - traffic, err := inboundSvc.GetClientTrafficByEmail(email) - if err != nil { - return false, err - } - if traffic != nil { - if err := inboundSvc.DelClientStat(db, email); err != nil { - logger.Error("Delete stats Data Error") - return false, err - } + // Decide what to delete and the push plan before the serialized DB write — + // these are reads, and nodePushPlan failing should abort before committing. + delStat := false + if len(email) > 0 && !emailShared && !keepTraffic { + traffic, tErr := inboundSvc.GetClientTrafficByEmail(email) + if tErr != nil { + return false, tErr + } + delStat = traffic != nil + } + + var rt runtime.Runtime + var push bool + if len(email) > 0 && !emailShared && (oldInbound.NodeID != nil || needApiDel) { + r, p, dirty, perr := inboundSvc.nodePushPlan(oldInbound) + if perr != nil { + return false, perr + } + rt, push = r, p + if dirty { + markDirty = true + } + } + + // Persist the deletion atomically, serialized against the traffic poll to + // avoid the cross-transaction lock-order deadlock (runSerializedTx). + if txErr := runSerializedTx(func(tx *gorm.DB) error { + if !emailShared && !keepTraffic { + if e := inboundSvc.DelClientIPs(tx, email); e != nil { + logger.Error("Error in delete client IPs") + return e } } + if delStat { + if e := inboundSvc.DelClientStat(tx, email); e != nil { + logger.Error("Delete stats Data Error") + return e + } + } + if e := tx.Save(oldInbound).Error; e != nil { + return e + } + finalClients, gcErr := inboundSvc.GetClients(oldInbound) + if gcErr != nil { + return gcErr + } + return s.SyncInbound(tx, inboundId, finalClients) + }); txErr != nil { + return false, txErr + } + // Apply the runtime delete after commit — outside the serialized writer so a + // slow node call can't stall traffic accounting. + if len(email) > 0 && !emailShared { if oldInbound.NodeID == nil { // Local inbound: a disabled client isn't in the running Xray, so only // a live one (needApiDel) needs an API removal. if needApiDel { - rt, push, dirty, perr := inboundSvc.nodePushPlan(oldInbound) - if perr != nil { - return false, perr - } - if dirty { - markDirty = true - } if !push { needRestart = true } else if err1 := rt.RemoveUser(context.Background(), oldInbound, email); err1 == nil { @@ -769,13 +833,6 @@ func (s *ClientService) DelInboundClientByEmail(inboundSvc *InboundService, inbo // Node inbound: propagate the delete regardless of the enable flag — // the node's own DB still carries a disabled client and would // resurrect it on the next snapshot otherwise. - rt, push, dirty, perr := inboundSvc.nodePushPlan(oldInbound) - if perr != nil { - return false, perr - } - if dirty { - markDirty = true - } if push { if err1 := rt.DeleteUser(context.Background(), oldInbound, email); err1 != nil { logger.Warning("Error in deleting client on", rt.Name(), ":", err1) @@ -785,16 +842,6 @@ func (s *ClientService) DelInboundClientByEmail(inboundSvc *InboundService, inbo } } - if err := db.Save(oldInbound).Error; err != nil { - return false, err - } - finalClients, gcErr := inboundSvc.GetClients(oldInbound) - if gcErr != nil { - return false, gcErr - } - if err := s.SyncInbound(db, inboundId, finalClients); err != nil { - return false, err - } if markDirty && oldInbound.NodeID != nil { if dErr := (&NodeService{}).MarkNodeDirty(*oldInbound.NodeID); dErr != nil { logger.Warning("mark node dirty failed:", dErr) diff --git a/internal/web/service/inbound.go b/internal/web/service/inbound.go index 39cfd003f..b73dcda9d 100644 --- a/internal/web/service/inbound.go +++ b/internal/web/service/inbound.go @@ -966,195 +966,202 @@ func (s *InboundService) UpdateInbound(inbound *model.Inbound) (*model.Inbound, oldBits := inboundTransports(oldInbound.Protocol, oldInbound.StreamSettings, oldInbound.Settings) oldTagWasAuto := isAutoGeneratedTag(tag, oldInbound.Port, oldInbound.NodeID, oldBits) - db := database.GetDB() - tx := db.Begin() - - markDirty := false - defer func() { - if err != nil { - tx.Rollback() - return - } - tx.Commit() - if markDirty && oldInbound.NodeID != nil { - if dErr := (&NodeService{}).MarkNodeDirty(*oldInbound.NodeID); dErr != nil { - logger.Warning("mark node dirty failed:", dErr) - } - } - }() - - err = s.updateClientTraffics(tx, oldInbound, inbound) - if err != nil { - return inbound, false, err - } - - // Ensure created_at and updated_at exist in inbound.Settings clients - { - var oldSettings map[string]any - _ = json.Unmarshal([]byte(oldInbound.Settings), &oldSettings) - emailToCreated := map[string]int64{} - emailToUpdated := map[string]int64{} - if oldSettings != nil { - if oc, ok := oldSettings["clients"].([]any); ok { - for _, it := range oc { - if m, ok2 := it.(map[string]any); ok2 { - if email, ok3 := m["email"].(string); ok3 { - switch v := m["created_at"].(type) { - case float64: - emailToCreated[email] = int64(v) - case int64: - emailToCreated[email] = v - } - switch v := m["updated_at"].(type) { - case float64: - emailToUpdated[email] = int64(v) - case int64: - emailToUpdated[email] = v - } - } - } - } - } - } - var newSettings map[string]any - if err2 := json.Unmarshal([]byte(inbound.Settings), &newSettings); err2 == nil && newSettings != nil { - now := time.Now().Unix() * 1000 - if nSlice, ok := newSettings["clients"].([]any); ok { - for i := range nSlice { - if m, ok2 := nSlice[i].(map[string]any); ok2 { - email, _ := m["email"].(string) - if _, ok3 := m["created_at"]; !ok3 { - if v, ok4 := emailToCreated[email]; ok4 && v > 0 { - m["created_at"] = v - } else { - m["created_at"] = now - } - } - // Preserve client's updated_at if present; do not bump on parent inbound update - if _, hasUpdated := m["updated_at"]; !hasUpdated { - if v, ok4 := emailToUpdated[email]; ok4 && v > 0 { - m["updated_at"] = v - } - } - nSlice[i] = m - } - } - newSettings["clients"] = nSlice - if bs, err3 := json.MarshalIndent(newSettings, "", " "); err3 == nil { - inbound.Settings = string(bs) - } - } - } - } - - // A Shadowsocks-2022 method change resizes the key, but existing client PSKs - // keep their old length and would be rejected by xray. Regenerate mismatched - // client keys so the inbound stays connectable. - if normalized, changed := normalizeShadowsocksClientKeys(inbound.Settings); changed { - inbound.Settings = normalized - logger.Warning("Shadowsocks inbound", inbound.Id, "method change resized keys; regenerated mismatched client PSK(s)") - } - - oldInbound.Total = inbound.Total - oldInbound.Remark = inbound.Remark - oldInbound.SubSortIndex = inbound.SubSortIndex - oldInbound.Enable = inbound.Enable - oldInbound.ExpiryTime = inbound.ExpiryTime - oldInbound.TrafficReset = inbound.TrafficReset - oldInbound.Listen = inbound.Listen - oldInbound.Port = inbound.Port - oldInbound.Protocol = inbound.Protocol - oldInbound.Settings = inbound.Settings - oldInbound.StreamSettings = inbound.StreamSettings - oldInbound.Sniffing = inbound.Sniffing - if strings.TrimSpace(inbound.ShareAddrStrategy) == "" { - normalizeInboundShareAddress(oldInbound) - inbound.ShareAddrStrategy = oldInbound.ShareAddrStrategy - inbound.ShareAddr = oldInbound.ShareAddr - } else { - if err := normalizeInboundShareAddressStrict(inbound); err != nil { - return inbound, false, err - } - oldInbound.ShareAddrStrategy = inbound.ShareAddrStrategy - oldInbound.ShareAddr = inbound.ShareAddr - } - if oldTagWasAuto && inbound.Tag == tag { - inbound.Tag = "" - } - oldInbound.Tag, err = s.resolveInboundTag(inbound, inbound.Id) - if err != nil { - return inbound, false, err - } - inbound.Tag = oldInbound.Tag - needRestart := false - rt, push, dirty, perr := s.nodePushPlan(oldInbound) - if perr != nil { - err = perr - return inbound, false, err - } - if dirty { - markDirty = true - } - if oldInbound.NodeID == nil { - if !push { - needRestart = true - } else { - oldSnapshot := *oldInbound - oldSnapshot.Tag = tag - if err2 := rt.DelInbound(context.Background(), &oldSnapshot); err2 == nil { - logger.Debug("Old inbound deleted on", rt.Name(), ":", tag) + markDirty := false + + // Persist the client-stat sync, settings munging, runtime push and inbound + // save as one transaction routed through the serial traffic writer, so it + // never runs concurrently with the @every 5s traffic poll. Both touch + // client_traffics and inbounds in opposite order, which Postgres aborts as a + // deadlock (40P01); serializing removes the contention (runSerializedTx). + // + // The runtime push stays inside the transaction here (unlike the client-edit + // paths that apply it after commit): EnsureInboundTagAllowed must reach the + // node before the central row is committed, or a "selected"-mode node would + // sweep the renamed inbound on its next pull. Inbound edits are rare, so + // holding the writer across the node call is an acceptable trade. + txErr := runSerializedTx(func(tx *gorm.DB) error { + if err := s.updateClientTraffics(tx, oldInbound, inbound); err != nil { + return err + } + + // Ensure created_at and updated_at exist in inbound.Settings clients + { + var oldSettings map[string]any + _ = json.Unmarshal([]byte(oldInbound.Settings), &oldSettings) + emailToCreated := map[string]int64{} + emailToUpdated := map[string]int64{} + if oldSettings != nil { + if oc, ok := oldSettings["clients"].([]any); ok { + for _, it := range oc { + if m, ok2 := it.(map[string]any); ok2 { + if email, ok3 := m["email"].(string); ok3 { + switch v := m["created_at"].(type) { + case float64: + emailToCreated[email] = int64(v) + case int64: + emailToCreated[email] = v + } + switch v := m["updated_at"].(type) { + case float64: + emailToUpdated[email] = int64(v) + case int64: + emailToUpdated[email] = v + } + } + } + } + } } - if inbound.Enable { - runtimeInbound, err2 := s.buildRuntimeInboundForAPI(tx, oldInbound) - if err2 != nil { - logger.Debug("Unable to prepare runtime inbound config:", err2) - needRestart = true - } else if err2 := rt.AddInbound(context.Background(), runtimeInbound); err2 == nil { - logger.Debug("Updated inbound added on", rt.Name(), ":", oldInbound.Tag) - } else { - logger.Debug("Unable to update inbound on", rt.Name(), ":", err2) - needRestart = true + var newSettings map[string]any + if err2 := json.Unmarshal([]byte(inbound.Settings), &newSettings); err2 == nil && newSettings != nil { + now := time.Now().Unix() * 1000 + if nSlice, ok := newSettings["clients"].([]any); ok { + for i := range nSlice { + if m, ok2 := nSlice[i].(map[string]any); ok2 { + email, _ := m["email"].(string) + if _, ok3 := m["created_at"]; !ok3 { + if v, ok4 := emailToCreated[email]; ok4 && v > 0 { + m["created_at"] = v + } else { + m["created_at"] = now + } + } + // Preserve client's updated_at if present; do not bump on parent inbound update + if _, hasUpdated := m["updated_at"]; !hasUpdated { + if v, ok4 := emailToUpdated[email]; ok4 && v > 0 { + m["updated_at"] = v + } + } + nSlice[i] = m + } + } + newSettings["clients"] = nSlice + if bs, err3 := json.MarshalIndent(newSettings, "", " "); err3 == nil { + inbound.Settings = string(bs) + } } } } - } else if push { - oldSnapshot := *oldInbound - oldSnapshot.Tag = tag - if !inbound.Enable { - if err2 := rt.DelInbound(context.Background(), &oldSnapshot); err2 != nil { - logger.Warning("Unable to disable inbound on", rt.Name(), ":", err2) - markDirty = true + + // A Shadowsocks-2022 method change resizes the key, but existing client PSKs + // keep their old length and would be rejected by xray. Regenerate mismatched + // client keys so the inbound stays connectable. + if normalized, changed := normalizeShadowsocksClientKeys(inbound.Settings); changed { + inbound.Settings = normalized + logger.Warning("Shadowsocks inbound", inbound.Id, "method change resized keys; regenerated mismatched client PSK(s)") + } + + oldInbound.Total = inbound.Total + oldInbound.Remark = inbound.Remark + oldInbound.SubSortIndex = inbound.SubSortIndex + oldInbound.Enable = inbound.Enable + oldInbound.ExpiryTime = inbound.ExpiryTime + oldInbound.TrafficReset = inbound.TrafficReset + oldInbound.Listen = inbound.Listen + oldInbound.Port = inbound.Port + oldInbound.Protocol = inbound.Protocol + oldInbound.Settings = inbound.Settings + oldInbound.StreamSettings = inbound.StreamSettings + oldInbound.Sniffing = inbound.Sniffing + if strings.TrimSpace(inbound.ShareAddrStrategy) == "" { + normalizeInboundShareAddress(oldInbound) + inbound.ShareAddrStrategy = oldInbound.ShareAddrStrategy + inbound.ShareAddr = oldInbound.ShareAddr + } else { + if err := normalizeInboundShareAddressStrict(inbound); err != nil { + return err } - } else if err2 := rt.UpdateInbound(context.Background(), &oldSnapshot, oldInbound); err2 != nil { - logger.Warning("Unable to update inbound on", rt.Name(), ":", err2) + oldInbound.ShareAddrStrategy = inbound.ShareAddrStrategy + oldInbound.ShareAddr = inbound.ShareAddr + } + if oldTagWasAuto && inbound.Tag == tag { + inbound.Tag = "" + } + resolvedTag, err := s.resolveInboundTag(inbound, inbound.Id) + if err != nil { + return err + } + oldInbound.Tag = resolvedTag + inbound.Tag = oldInbound.Tag + + rt, push, dirty, perr := s.nodePushPlan(oldInbound) + if perr != nil { + return perr + } + if dirty { markDirty = true } - } - - // A rename must allow the new tag before the deferred commit, or a node in - // "selected" sync mode would sweep the renamed central row on the next pull. - if oldInbound.NodeID != nil { - if aErr := (&NodeService{}).EnsureInboundTagAllowed(*oldInbound.NodeID, oldInbound.Tag); aErr != nil { - logger.Warning("allow inbound tag on node failed:", aErr) + if oldInbound.NodeID == nil { + if !push { + needRestart = true + } else { + oldSnapshot := *oldInbound + oldSnapshot.Tag = tag + if err2 := rt.DelInbound(context.Background(), &oldSnapshot); err2 == nil { + logger.Debug("Old inbound deleted on", rt.Name(), ":", tag) + } + if inbound.Enable { + runtimeInbound, err2 := s.buildRuntimeInboundForAPI(tx, oldInbound) + if err2 != nil { + logger.Debug("Unable to prepare runtime inbound config:", err2) + needRestart = true + } else if err2 := rt.AddInbound(context.Background(), runtimeInbound); err2 == nil { + logger.Debug("Updated inbound added on", rt.Name(), ":", oldInbound.Tag) + } else { + logger.Debug("Unable to update inbound on", rt.Name(), ":", err2) + needRestart = true + } + } + } + } else if push { + oldSnapshot := *oldInbound + oldSnapshot.Tag = tag + if !inbound.Enable { + if err2 := rt.DelInbound(context.Background(), &oldSnapshot); err2 != nil { + logger.Warning("Unable to disable inbound on", rt.Name(), ":", err2) + markDirty = true + } + } else if err2 := rt.UpdateInbound(context.Background(), &oldSnapshot, oldInbound); err2 != nil { + logger.Warning("Unable to update inbound on", rt.Name(), ":", err2) + markDirty = true + } } - } - if err = tx.Save(oldInbound).Error; err != nil { - return inbound, false, err + // A rename must allow the new tag before the inbound row is committed, or a + // node in "selected" sync mode would sweep the renamed central row on the + // next pull. + if oldInbound.NodeID != nil { + if aErr := (&NodeService{}).EnsureInboundTagAllowed(*oldInbound.NodeID, oldInbound.Tag); aErr != nil { + logger.Warning("allow inbound tag on node failed:", aErr) + } + } + + if err := tx.Save(oldInbound).Error; err != nil { + return err + } + newClients, gcErr := s.GetClients(oldInbound) + if gcErr != nil { + return gcErr + } + if err := s.clientService.SyncInbound(tx, oldInbound.Id, newClients); err != nil { + return err + } + // (Re)generate the Xray config whenever routing was or is now enabled, so + // the egress SOCKS bridge is added, moved, or dropped to match the new + // settings. + if mtprotoRoutesThroughXray(inbound) || oldRoutedMtproto { + needRestart = true + } + return nil + }) + if txErr != nil { + return inbound, false, txErr } - newClients, gcErr := s.GetClients(oldInbound) - if gcErr != nil { - err = gcErr - return inbound, false, err - } - if err = s.clientService.SyncInbound(tx, oldInbound.Id, newClients); err != nil { - return inbound, false, err - } - // (Re)generate the Xray config whenever routing was or is now enabled, so the - // egress SOCKS bridge is added, moved, or dropped to match the new settings. - if mtprotoRoutesThroughXray(inbound) || oldRoutedMtproto { - needRestart = true + if markDirty && oldInbound.NodeID != nil { + if dErr := (&NodeService{}).MarkNodeDirty(*oldInbound.NodeID); dErr != nil { + logger.Warning("mark node dirty failed:", dErr) + } } return inbound, needRestart, nil } diff --git a/internal/web/service/traffic_writer.go b/internal/web/service/traffic_writer.go index 4c1235e0d..be0cea8bd 100644 --- a/internal/web/service/traffic_writer.go +++ b/internal/web/service/traffic_writer.go @@ -7,7 +7,10 @@ import ( "sync" "time" + "github.com/mhsanaei/3x-ui/v3/internal/database" "github.com/mhsanaei/3x-ui/v3/internal/logger" + + "gorm.io/gorm" ) const ( @@ -109,6 +112,23 @@ func runTrafficWriter(ctx context.Context, queue chan *trafficWriteRequest, done } } +// runSerializedTx runs fn inside one DB transaction on the shared serial +// traffic-writer goroutine, so it can never execute concurrently with the +// @every 5s traffic poll (AddTraffic). Both touch the hot client_traffics and +// inbounds rows, and they acquire them in opposite order (the poll locks +// inbounds then client_traffics; an admin client/inbound mutation does the +// reverse), which Postgres aborts as a deadlock (SQLSTATE 40P01). Routing every +// such mutation through this single writer removes that contention entirely. +// +// Keep network I/O (node pushes) OUT of fn: holding the single writer across a +// remote node call would stall all traffic accounting for up to the remote +// timeout. Apply runtime changes after this returns. +func runSerializedTx(fn func(tx *gorm.DB) error) error { + return submitTrafficWrite(func() error { + return database.GetDB().Transaction(fn) + }) +} + func safeApply(fn func() error) (err error) { defer func() { if r := recover(); r != nil {