diff --git a/internal/web/service/client_bulk.go b/internal/web/service/client_bulk.go index 322f3bfee..5a21119ee 100644 --- a/internal/web/service/client_bulk.go +++ b/internal/web/service/client_bulk.go @@ -545,8 +545,9 @@ func (s *ClientService) bulkAdjustInboundClients( } } - db := database.GetDB() - txErr := db.Transaction(func(tx *gorm.DB) error { + // Serialize against the traffic poll to avoid the cross-transaction + // lock-order deadlock on inbounds/client_records (runSerializedTx). + txErr := runSerializedTx(func(tx *gorm.DB) error { if err := tx.Save(oldInbound).Error; err != nil { return err } @@ -685,25 +686,32 @@ func (s *ClientService) BulkDelete(inboundSvc *InboundService, emails []string, } if len(successIds) > 0 { - for _, batch := range chunkInts(successIds, sqlInChunk) { - if err := db.Where("client_id IN ?", batch).Delete(&model.ClientInbound{}).Error; err != nil { - return result, needRestart, err - } - } - if !keepTraffic && len(successEmails) > 0 { - for _, batch := range chunkStrings(successEmails, sqlInChunk) { - if err := db.Where("email IN ?", batch).Delete(&xray.ClientTraffic{}).Error; err != nil { - return result, needRestart, err - } - if err := db.Where("client_email IN ?", batch).Delete(&model.InboundClientIps{}).Error; err != nil { - return result, needRestart, err + // Serialize the row cleanup against the traffic poll to avoid the + // cross-transaction lock-order deadlock on client_traffics/inbounds. + if err := runSerializedTx(func(tx *gorm.DB) error { + for _, batch := range chunkInts(successIds, sqlInChunk) { + if e := tx.Where("client_id IN ?", batch).Delete(&model.ClientInbound{}).Error; e != nil { + return e } } - } - for _, batch := range chunkInts(successIds, sqlInChunk) { - if err := db.Where("id IN ?", batch).Delete(&model.ClientRecord{}).Error; err != nil { - return result, needRestart, err + if !keepTraffic && len(successEmails) > 0 { + for _, batch := range chunkStrings(successEmails, sqlInChunk) { + if e := tx.Where("email IN ?", batch).Delete(&xray.ClientTraffic{}).Error; e != nil { + return e + } + if e := tx.Where("client_email IN ?", batch).Delete(&model.InboundClientIps{}).Error; e != nil { + return e + } + } } + for _, batch := range chunkInts(successIds, sqlInChunk) { + if e := tx.Where("id IN ?", batch).Delete(&model.ClientRecord{}).Error; e != nil { + return e + } + } + return nil + }); err != nil { + return result, needRestart, err } } @@ -850,14 +858,19 @@ func (s *ClientService) bulkDelInboundClients( } } if len(purge) > 0 { - if delErr := inboundSvc.delClientIPsByEmails(db, purge); delErr != nil { - logger.Error("Error in delete client IPs") - for _, email := range purge { - res.perEmailSkipped[email] = delErr.Error() - delete(foundEmails, email) + // Serialize the IP/stat purge against the traffic poll to avoid the + // cross-transaction lock-order deadlock on client_traffics. + if delErr := runSerializedTx(func(tx *gorm.DB) error { + if e := inboundSvc.delClientIPsByEmails(tx, purge); e != nil { + logger.Error("Error in delete client IPs") + return e } - } else if delErr := inboundSvc.delClientStatsByEmails(db, purge); delErr != nil { - logger.Error("Delete stats Data Error") + if e := inboundSvc.delClientStatsByEmails(tx, purge); e != nil { + logger.Error("Delete stats Data Error") + return e + } + return nil + }); delErr != nil { for _, email := range purge { res.perEmailSkipped[email] = delErr.Error() delete(foundEmails, email) @@ -909,7 +922,9 @@ func (s *ClientService) bulkDelInboundClients( } } - txErr := db.Transaction(func(tx *gorm.DB) error { + // Serialize against the traffic poll to avoid the cross-transaction + // lock-order deadlock on inbounds/client_records (runSerializedTx). + txErr := runSerializedTx(func(tx *gorm.DB) error { if err := tx.Save(oldInbound).Error; err != nil { return err } diff --git a/internal/web/service/client_inbound_apply.go b/internal/web/service/client_inbound_apply.go index 881f0e1ca..59b5eb19d 100644 --- a/internal/web/service/client_inbound_apply.go +++ b/internal/web/service/client_inbound_apply.go @@ -12,7 +12,10 @@ import ( "github.com/mhsanaei/3x-ui/v3/internal/logger" "github.com/mhsanaei/3x-ui/v3/internal/util/common" "github.com/mhsanaei/3x-ui/v3/internal/util/random" + "github.com/mhsanaei/3x-ui/v3/internal/web/runtime" "github.com/mhsanaei/3x-ui/v3/internal/xray" + + "gorm.io/gorm" ) // delInboundClients removes several clients from a single inbound in one pass: @@ -105,40 +108,79 @@ func (s *ClientService) delInboundClients(inboundSvc *InboundService, inboundId needRestart := false markDirty := false + + // Read each client's live state before the DB write (DelClientStat would + // erase the enable flag we need to decide on a runtime removal). + type delTarget struct { + email string + emailShared bool + notDepleted bool + needApiDel bool + } + targets := make([]delTarget, 0, len(removed)) for _, r := range removed { email := r.email emailShared := sharedSet[strings.ToLower(strings.TrimSpace(email))] - if !emailShared && !keepTraffic { - if err := inboundSvc.DelClientIPs(db, email); err != nil { - logger.Error("Error in delete client IPs") - return needRestart, err - } - } + notDepleted := false if len(email) > 0 { var enables []bool if err := db.Model(xray.ClientTraffic{}).Where("email = ?", email).Limit(1).Pluck("enable", &enables).Error; err != nil { logger.Error("Get stats error") return needRestart, err } - notDepleted := len(enables) > 0 && enables[0] - if !emailShared && !keepTraffic { - if err := inboundSvc.DelClientStat(db, email); err != nil { + notDepleted = len(enables) > 0 && enables[0] + } + targets = append(targets, delTarget{email: email, emailShared: emailShared, notDepleted: notDepleted, needApiDel: r.needApiDel}) + } + + // Persist the batch deletion atomically, serialized against the traffic poll + // to avoid the cross-transaction lock-order deadlock (runSerializedTx). + if txErr := runSerializedTx(func(tx *gorm.DB) error { + for _, t := range targets { + if t.emailShared || keepTraffic { + continue + } + if e := inboundSvc.DelClientIPs(tx, t.email); e != nil { + logger.Error("Error in delete client IPs") + return e + } + if len(t.email) > 0 { + if e := inboundSvc.DelClientStat(tx, t.email); e != nil { logger.Error("Delete stats Data Error") - return needRestart, err + return e } } - if r.needApiDel && notDepleted && oldInbound.NodeID == nil { + } + if e := tx.Save(oldInbound).Error; e != nil { + return e + } + finalClients, gcErr := inboundSvc.GetClients(oldInbound) + if gcErr != nil { + return gcErr + } + return s.SyncInbound(tx, inboundId, finalClients) + }); txErr != nil { + return needRestart, txErr + } + + // Apply runtime deletes after commit — outside the serialized writer so a + // slow node call can't stall traffic accounting. + for _, t := range targets { + if len(t.email) == 0 { + continue + } + if oldInbound.NodeID == nil { + if t.needApiDel && t.notDepleted { rt, rterr := inboundSvc.runtimeFor(oldInbound) if rterr != nil { needRestart = true - } else if err1 := rt.RemoveUser(context.Background(), oldInbound, email); err1 != nil { - if !strings.Contains(err1.Error(), fmt.Sprintf("User %s not found.", email)) { + } else if err1 := rt.RemoveUser(context.Background(), oldInbound, t.email); err1 != nil { + if !strings.Contains(err1.Error(), fmt.Sprintf("User %s not found.", t.email)) { needRestart = true } } } - } - if oldInbound.NodeID != nil && len(email) > 0 { + } else { rt, push, dirty, perr := inboundSvc.nodePushPlan(oldInbound) if perr != nil { return needRestart, perr @@ -147,7 +189,7 @@ func (s *ClientService) delInboundClients(inboundSvc *InboundService, inboundId markDirty = true } if push { - if err1 := rt.DeleteUser(context.Background(), oldInbound, email); err1 != nil { + if err1 := rt.DeleteUser(context.Background(), oldInbound, t.email); err1 != nil { logger.Warning("Error in deleting client on", rt.Name(), ":", err1) markDirty = true } @@ -155,16 +197,6 @@ func (s *ClientService) delInboundClients(inboundSvc *InboundService, inboundId } } - if err := db.Save(oldInbound).Error; err != nil { - return needRestart, err - } - finalClients, gcErr := inboundSvc.GetClients(oldInbound) - if gcErr != nil { - return needRestart, gcErr - } - if err := s.SyncInbound(db, inboundId, finalClients); err != nil { - return needRestart, err - } if markDirty && oldInbound.NodeID != nil { if dErr := (&NodeService{}).MarkNodeDirty(*oldInbound.NodeID); dErr != nil { logger.Warning("mark node dirty failed:", dErr) @@ -300,32 +332,42 @@ func (s *ClientService) addInboundClient(inboundSvc *InboundService, data *model oldInbound.Settings = string(newSettings) - db := database.GetDB() - tx := db.Begin() - - markDirty := false - defer func() { - if err != nil { - tx.Rollback() - return - } - tx.Commit() - if markDirty && oldInbound.NodeID != nil { - if dErr := (&NodeService{}).MarkNodeDirty(*oldInbound.NodeID); dErr != nil { - logger.Warning("mark node dirty failed:", dErr) - } - } - }() - needRestart := false + markDirty := false + rt, push, dirty, perr := inboundSvc.nodePushPlan(oldInbound) if perr != nil { - err = perr - return false, err + return false, perr } if dirty { markDirty = true } + + // Persist client stats + inbound atomically, serialized against the traffic + // poll to avoid the cross-transaction lock-order deadlock (runSerializedTx). + if txErr := runSerializedTx(func(tx *gorm.DB) error { + for i := range clients { + if len(clients[i].Email) == 0 { + continue + } + if e := inboundSvc.AddClientStat(tx, data.Id, &clients[i]); e != nil { + return e + } + } + if e := tx.Save(oldInbound).Error; e != nil { + return e + } + finalClients, gcErr := inboundSvc.GetClients(oldInbound) + if gcErr != nil { + return gcErr + } + return s.SyncInbound(tx, oldInbound.Id, finalClients) + }); txErr != nil { + return false, txErr + } + + // Apply to the running runtime after commit — outside the serialized writer + // so a slow node call can't stall traffic accounting. if oldInbound.NodeID == nil { if !push { needRestart = true @@ -335,7 +377,6 @@ func (s *ClientService) addInboundClient(inboundSvc *InboundService, data *model needRestart = true continue } - inboundSvc.AddClientStat(tx, data.Id, &client) if !client.Enable { continue } @@ -362,9 +403,6 @@ func (s *ClientService) addInboundClient(inboundSvc *InboundService, data *model } } else { for _, client := range clients { - if len(client.Email) > 0 { - inboundSvc.AddClientStat(tx, data.Id, &client) - } if push { if err1 := rt.AddClient(context.Background(), oldInbound, client); err1 != nil { logger.Warning("Error in adding client on", rt.Name(), ":", err1) @@ -375,16 +413,10 @@ func (s *ClientService) addInboundClient(inboundSvc *InboundService, data *model } } - if err = tx.Save(oldInbound).Error; err != nil { - return false, err - } - finalClients, gcErr := inboundSvc.GetClients(oldInbound) - if gcErr != nil { - err = gcErr - return false, err - } - if err = s.SyncInbound(tx, oldInbound.Id, finalClients); err != nil { - return false, err + if markDirty && oldInbound.NodeID != nil { + if dErr := (&NodeService{}).MarkNodeDirty(*oldInbound.NodeID); dErr != nil { + logger.Warning("mark node dirty failed:", dErr) + } } return needRestart, nil } @@ -519,87 +551,98 @@ func (s *ClientService) UpdateInboundClient(inboundSvc *InboundService, data *mo } oldInbound.Settings = string(newSettings) - db := database.GetDB() - tx := db.Begin() - markDirty := false - defer func() { - if err != nil { - tx.Rollback() - return - } - tx.Commit() - if markDirty && oldInbound.NodeID != nil { - if dErr := (&NodeService{}).MarkNodeDirty(*oldInbound.NodeID); dErr != nil { - logger.Warning("mark node dirty failed:", dErr) - } - } - }() - - if len(clients[0].Email) > 0 { - if len(oldEmail) > 0 { - emailUnchanged := strings.EqualFold(oldEmail, clients[0].Email) - targetExists := int64(0) - if !emailUnchanged { - if err = tx.Model(xray.ClientTraffic{}).Where("email = ?", clients[0].Email).Count(&targetExists).Error; err != nil { - return false, err - } - } - if emailUnchanged || targetExists == 0 { - err = inboundSvc.UpdateClientStat(tx, oldEmail, &clients[0]) - if err != nil { - return false, err - } - err = inboundSvc.UpdateClientIPs(tx, oldEmail, clients[0].Email) - if err != nil { - return false, err - } - } else { - stillUsed, sErr := inboundSvc.emailUsedByOtherInbounds(oldEmail, data.Id) - if sErr != nil { - return false, sErr - } - if !stillUsed { - if err = inboundSvc.DelClientStat(tx, oldEmail); err != nil { - return false, err - } - if err = inboundSvc.DelClientIPs(tx, oldEmail); err != nil { - return false, err - } - } - if err = inboundSvc.UpdateClientStat(tx, clients[0].Email, &clients[0]); err != nil { - return false, err - } - } - } else { - inboundSvc.AddClientStat(tx, data.Id, &clients[0]) - } - } else { - stillUsed, err := inboundSvc.emailUsedByOtherInbounds(oldEmail, data.Id) - if err != nil { - return false, err - } - if !stillUsed { - err = inboundSvc.DelClientStat(tx, oldEmail) - if err != nil { - return false, err - } - err = inboundSvc.DelClientIPs(tx, oldEmail) - if err != nil { - return false, err - } - } - } needRestart := false + markDirty := false + + // Resolve the push plan before the DB write so a node-state lookup failure + // still aborts the whole update without committing anything (it used to roll + // the transaction back). nodePushPlan only reads, so order doesn't matter. + var rt runtime.Runtime + var push bool if len(oldEmail) > 0 { - rt, push, dirty, perr := inboundSvc.nodePushPlan(oldInbound) + var dirty bool + var perr error + rt, push, dirty, perr = inboundSvc.nodePushPlan(oldInbound) if perr != nil { - err = perr - return false, err + return false, perr } if dirty { markDirty = true } + } + + // Persist client stats + inbound atomically, serialized against the traffic + // poll to avoid the cross-transaction lock-order deadlock (runSerializedTx). + if txErr := runSerializedTx(func(tx *gorm.DB) error { + if len(clients[0].Email) > 0 { + if len(oldEmail) > 0 { + emailUnchanged := strings.EqualFold(oldEmail, clients[0].Email) + targetExists := int64(0) + if !emailUnchanged { + if e := tx.Model(xray.ClientTraffic{}).Where("email = ?", clients[0].Email).Count(&targetExists).Error; e != nil { + return e + } + } + if emailUnchanged || targetExists == 0 { + if e := inboundSvc.UpdateClientStat(tx, oldEmail, &clients[0]); e != nil { + return e + } + if e := inboundSvc.UpdateClientIPs(tx, oldEmail, clients[0].Email); e != nil { + return e + } + } else { + stillUsed, sErr := inboundSvc.emailUsedByOtherInbounds(oldEmail, data.Id) + if sErr != nil { + return sErr + } + if !stillUsed { + if e := inboundSvc.DelClientStat(tx, oldEmail); e != nil { + return e + } + if e := inboundSvc.DelClientIPs(tx, oldEmail); e != nil { + return e + } + } + if e := inboundSvc.UpdateClientStat(tx, clients[0].Email, &clients[0]); e != nil { + return e + } + } + } else { + if e := inboundSvc.AddClientStat(tx, data.Id, &clients[0]); e != nil { + return e + } + } + } else { + stillUsed, sErr := inboundSvc.emailUsedByOtherInbounds(oldEmail, data.Id) + if sErr != nil { + return sErr + } + if !stillUsed { + if e := inboundSvc.DelClientStat(tx, oldEmail); e != nil { + return e + } + if e := inboundSvc.DelClientIPs(tx, oldEmail); e != nil { + return e + } + } + } + + if e := tx.Save(oldInbound).Error; e != nil { + return e + } + finalClients, gcErr := inboundSvc.GetClients(oldInbound) + if gcErr != nil { + return gcErr + } + return s.SyncInbound(tx, oldInbound.Id, finalClients) + }); txErr != nil { + return false, txErr + } + + // Apply to the running runtime after the DB is committed — outside the + // serialized writer so a slow node call can't stall traffic accounting. + if len(oldEmail) > 0 { if oldInbound.NodeID == nil { if !push { needRestart = true @@ -647,16 +690,11 @@ func (s *ClientService) UpdateInboundClient(inboundSvc *InboundService, data *mo logger.Debug("Client old email not found") needRestart = true } - if err = tx.Save(oldInbound).Error; err != nil { - return false, err - } - finalClients, gcErr := inboundSvc.GetClients(oldInbound) - if gcErr != nil { - err = gcErr - return false, err - } - if err = s.SyncInbound(tx, oldInbound.Id, finalClients); err != nil { - return false, err + + if markDirty && oldInbound.NodeID != nil { + if dErr := (&NodeService{}).MarkNodeDirty(*oldInbound.NodeID); dErr != nil { + logger.Warning("mark node dirty failed:", dErr) + } } return needRestart, nil } @@ -718,41 +756,67 @@ func (s *ClientService) DelInboundClientByEmail(inboundSvc *InboundService, inbo return false, err } - if !emailShared && !keepTraffic { - if err := inboundSvc.DelClientIPs(db, email); err != nil { - logger.Error("Error in delete client IPs") - return false, err - } - } - needRestart := false markDirty := false - if len(email) > 0 && !emailShared { - if !keepTraffic { - traffic, err := inboundSvc.GetClientTrafficByEmail(email) - if err != nil { - return false, err - } - if traffic != nil { - if err := inboundSvc.DelClientStat(db, email); err != nil { - logger.Error("Delete stats Data Error") - return false, err - } + // Decide what to delete and the push plan before the serialized DB write — + // these are reads, and nodePushPlan failing should abort before committing. + delStat := false + if len(email) > 0 && !emailShared && !keepTraffic { + traffic, tErr := inboundSvc.GetClientTrafficByEmail(email) + if tErr != nil { + return false, tErr + } + delStat = traffic != nil + } + + var rt runtime.Runtime + var push bool + if len(email) > 0 && !emailShared && (oldInbound.NodeID != nil || needApiDel) { + r, p, dirty, perr := inboundSvc.nodePushPlan(oldInbound) + if perr != nil { + return false, perr + } + rt, push = r, p + if dirty { + markDirty = true + } + } + + // Persist the deletion atomically, serialized against the traffic poll to + // avoid the cross-transaction lock-order deadlock (runSerializedTx). + if txErr := runSerializedTx(func(tx *gorm.DB) error { + if !emailShared && !keepTraffic { + if e := inboundSvc.DelClientIPs(tx, email); e != nil { + logger.Error("Error in delete client IPs") + return e } } + if delStat { + if e := inboundSvc.DelClientStat(tx, email); e != nil { + logger.Error("Delete stats Data Error") + return e + } + } + if e := tx.Save(oldInbound).Error; e != nil { + return e + } + finalClients, gcErr := inboundSvc.GetClients(oldInbound) + if gcErr != nil { + return gcErr + } + return s.SyncInbound(tx, inboundId, finalClients) + }); txErr != nil { + return false, txErr + } + // Apply the runtime delete after commit — outside the serialized writer so a + // slow node call can't stall traffic accounting. + if len(email) > 0 && !emailShared { if oldInbound.NodeID == nil { // Local inbound: a disabled client isn't in the running Xray, so only // a live one (needApiDel) needs an API removal. if needApiDel { - rt, push, dirty, perr := inboundSvc.nodePushPlan(oldInbound) - if perr != nil { - return false, perr - } - if dirty { - markDirty = true - } if !push { needRestart = true } else if err1 := rt.RemoveUser(context.Background(), oldInbound, email); err1 == nil { @@ -769,13 +833,6 @@ func (s *ClientService) DelInboundClientByEmail(inboundSvc *InboundService, inbo // Node inbound: propagate the delete regardless of the enable flag — // the node's own DB still carries a disabled client and would // resurrect it on the next snapshot otherwise. - rt, push, dirty, perr := inboundSvc.nodePushPlan(oldInbound) - if perr != nil { - return false, perr - } - if dirty { - markDirty = true - } if push { if err1 := rt.DeleteUser(context.Background(), oldInbound, email); err1 != nil { logger.Warning("Error in deleting client on", rt.Name(), ":", err1) @@ -785,16 +842,6 @@ func (s *ClientService) DelInboundClientByEmail(inboundSvc *InboundService, inbo } } - if err := db.Save(oldInbound).Error; err != nil { - return false, err - } - finalClients, gcErr := inboundSvc.GetClients(oldInbound) - if gcErr != nil { - return false, gcErr - } - if err := s.SyncInbound(db, inboundId, finalClients); err != nil { - return false, err - } if markDirty && oldInbound.NodeID != nil { if dErr := (&NodeService{}).MarkNodeDirty(*oldInbound.NodeID); dErr != nil { logger.Warning("mark node dirty failed:", dErr) diff --git a/internal/web/service/inbound.go b/internal/web/service/inbound.go index 39cfd003f..b73dcda9d 100644 --- a/internal/web/service/inbound.go +++ b/internal/web/service/inbound.go @@ -966,195 +966,202 @@ func (s *InboundService) UpdateInbound(inbound *model.Inbound) (*model.Inbound, oldBits := inboundTransports(oldInbound.Protocol, oldInbound.StreamSettings, oldInbound.Settings) oldTagWasAuto := isAutoGeneratedTag(tag, oldInbound.Port, oldInbound.NodeID, oldBits) - db := database.GetDB() - tx := db.Begin() - - markDirty := false - defer func() { - if err != nil { - tx.Rollback() - return - } - tx.Commit() - if markDirty && oldInbound.NodeID != nil { - if dErr := (&NodeService{}).MarkNodeDirty(*oldInbound.NodeID); dErr != nil { - logger.Warning("mark node dirty failed:", dErr) - } - } - }() - - err = s.updateClientTraffics(tx, oldInbound, inbound) - if err != nil { - return inbound, false, err - } - - // Ensure created_at and updated_at exist in inbound.Settings clients - { - var oldSettings map[string]any - _ = json.Unmarshal([]byte(oldInbound.Settings), &oldSettings) - emailToCreated := map[string]int64{} - emailToUpdated := map[string]int64{} - if oldSettings != nil { - if oc, ok := oldSettings["clients"].([]any); ok { - for _, it := range oc { - if m, ok2 := it.(map[string]any); ok2 { - if email, ok3 := m["email"].(string); ok3 { - switch v := m["created_at"].(type) { - case float64: - emailToCreated[email] = int64(v) - case int64: - emailToCreated[email] = v - } - switch v := m["updated_at"].(type) { - case float64: - emailToUpdated[email] = int64(v) - case int64: - emailToUpdated[email] = v - } - } - } - } - } - } - var newSettings map[string]any - if err2 := json.Unmarshal([]byte(inbound.Settings), &newSettings); err2 == nil && newSettings != nil { - now := time.Now().Unix() * 1000 - if nSlice, ok := newSettings["clients"].([]any); ok { - for i := range nSlice { - if m, ok2 := nSlice[i].(map[string]any); ok2 { - email, _ := m["email"].(string) - if _, ok3 := m["created_at"]; !ok3 { - if v, ok4 := emailToCreated[email]; ok4 && v > 0 { - m["created_at"] = v - } else { - m["created_at"] = now - } - } - // Preserve client's updated_at if present; do not bump on parent inbound update - if _, hasUpdated := m["updated_at"]; !hasUpdated { - if v, ok4 := emailToUpdated[email]; ok4 && v > 0 { - m["updated_at"] = v - } - } - nSlice[i] = m - } - } - newSettings["clients"] = nSlice - if bs, err3 := json.MarshalIndent(newSettings, "", " "); err3 == nil { - inbound.Settings = string(bs) - } - } - } - } - - // A Shadowsocks-2022 method change resizes the key, but existing client PSKs - // keep their old length and would be rejected by xray. Regenerate mismatched - // client keys so the inbound stays connectable. - if normalized, changed := normalizeShadowsocksClientKeys(inbound.Settings); changed { - inbound.Settings = normalized - logger.Warning("Shadowsocks inbound", inbound.Id, "method change resized keys; regenerated mismatched client PSK(s)") - } - - oldInbound.Total = inbound.Total - oldInbound.Remark = inbound.Remark - oldInbound.SubSortIndex = inbound.SubSortIndex - oldInbound.Enable = inbound.Enable - oldInbound.ExpiryTime = inbound.ExpiryTime - oldInbound.TrafficReset = inbound.TrafficReset - oldInbound.Listen = inbound.Listen - oldInbound.Port = inbound.Port - oldInbound.Protocol = inbound.Protocol - oldInbound.Settings = inbound.Settings - oldInbound.StreamSettings = inbound.StreamSettings - oldInbound.Sniffing = inbound.Sniffing - if strings.TrimSpace(inbound.ShareAddrStrategy) == "" { - normalizeInboundShareAddress(oldInbound) - inbound.ShareAddrStrategy = oldInbound.ShareAddrStrategy - inbound.ShareAddr = oldInbound.ShareAddr - } else { - if err := normalizeInboundShareAddressStrict(inbound); err != nil { - return inbound, false, err - } - oldInbound.ShareAddrStrategy = inbound.ShareAddrStrategy - oldInbound.ShareAddr = inbound.ShareAddr - } - if oldTagWasAuto && inbound.Tag == tag { - inbound.Tag = "" - } - oldInbound.Tag, err = s.resolveInboundTag(inbound, inbound.Id) - if err != nil { - return inbound, false, err - } - inbound.Tag = oldInbound.Tag - needRestart := false - rt, push, dirty, perr := s.nodePushPlan(oldInbound) - if perr != nil { - err = perr - return inbound, false, err - } - if dirty { - markDirty = true - } - if oldInbound.NodeID == nil { - if !push { - needRestart = true - } else { - oldSnapshot := *oldInbound - oldSnapshot.Tag = tag - if err2 := rt.DelInbound(context.Background(), &oldSnapshot); err2 == nil { - logger.Debug("Old inbound deleted on", rt.Name(), ":", tag) + markDirty := false + + // Persist the client-stat sync, settings munging, runtime push and inbound + // save as one transaction routed through the serial traffic writer, so it + // never runs concurrently with the @every 5s traffic poll. Both touch + // client_traffics and inbounds in opposite order, which Postgres aborts as a + // deadlock (40P01); serializing removes the contention (runSerializedTx). + // + // The runtime push stays inside the transaction here (unlike the client-edit + // paths that apply it after commit): EnsureInboundTagAllowed must reach the + // node before the central row is committed, or a "selected"-mode node would + // sweep the renamed inbound on its next pull. Inbound edits are rare, so + // holding the writer across the node call is an acceptable trade. + txErr := runSerializedTx(func(tx *gorm.DB) error { + if err := s.updateClientTraffics(tx, oldInbound, inbound); err != nil { + return err + } + + // Ensure created_at and updated_at exist in inbound.Settings clients + { + var oldSettings map[string]any + _ = json.Unmarshal([]byte(oldInbound.Settings), &oldSettings) + emailToCreated := map[string]int64{} + emailToUpdated := map[string]int64{} + if oldSettings != nil { + if oc, ok := oldSettings["clients"].([]any); ok { + for _, it := range oc { + if m, ok2 := it.(map[string]any); ok2 { + if email, ok3 := m["email"].(string); ok3 { + switch v := m["created_at"].(type) { + case float64: + emailToCreated[email] = int64(v) + case int64: + emailToCreated[email] = v + } + switch v := m["updated_at"].(type) { + case float64: + emailToUpdated[email] = int64(v) + case int64: + emailToUpdated[email] = v + } + } + } + } + } } - if inbound.Enable { - runtimeInbound, err2 := s.buildRuntimeInboundForAPI(tx, oldInbound) - if err2 != nil { - logger.Debug("Unable to prepare runtime inbound config:", err2) - needRestart = true - } else if err2 := rt.AddInbound(context.Background(), runtimeInbound); err2 == nil { - logger.Debug("Updated inbound added on", rt.Name(), ":", oldInbound.Tag) - } else { - logger.Debug("Unable to update inbound on", rt.Name(), ":", err2) - needRestart = true + var newSettings map[string]any + if err2 := json.Unmarshal([]byte(inbound.Settings), &newSettings); err2 == nil && newSettings != nil { + now := time.Now().Unix() * 1000 + if nSlice, ok := newSettings["clients"].([]any); ok { + for i := range nSlice { + if m, ok2 := nSlice[i].(map[string]any); ok2 { + email, _ := m["email"].(string) + if _, ok3 := m["created_at"]; !ok3 { + if v, ok4 := emailToCreated[email]; ok4 && v > 0 { + m["created_at"] = v + } else { + m["created_at"] = now + } + } + // Preserve client's updated_at if present; do not bump on parent inbound update + if _, hasUpdated := m["updated_at"]; !hasUpdated { + if v, ok4 := emailToUpdated[email]; ok4 && v > 0 { + m["updated_at"] = v + } + } + nSlice[i] = m + } + } + newSettings["clients"] = nSlice + if bs, err3 := json.MarshalIndent(newSettings, "", " "); err3 == nil { + inbound.Settings = string(bs) + } } } } - } else if push { - oldSnapshot := *oldInbound - oldSnapshot.Tag = tag - if !inbound.Enable { - if err2 := rt.DelInbound(context.Background(), &oldSnapshot); err2 != nil { - logger.Warning("Unable to disable inbound on", rt.Name(), ":", err2) - markDirty = true + + // A Shadowsocks-2022 method change resizes the key, but existing client PSKs + // keep their old length and would be rejected by xray. Regenerate mismatched + // client keys so the inbound stays connectable. + if normalized, changed := normalizeShadowsocksClientKeys(inbound.Settings); changed { + inbound.Settings = normalized + logger.Warning("Shadowsocks inbound", inbound.Id, "method change resized keys; regenerated mismatched client PSK(s)") + } + + oldInbound.Total = inbound.Total + oldInbound.Remark = inbound.Remark + oldInbound.SubSortIndex = inbound.SubSortIndex + oldInbound.Enable = inbound.Enable + oldInbound.ExpiryTime = inbound.ExpiryTime + oldInbound.TrafficReset = inbound.TrafficReset + oldInbound.Listen = inbound.Listen + oldInbound.Port = inbound.Port + oldInbound.Protocol = inbound.Protocol + oldInbound.Settings = inbound.Settings + oldInbound.StreamSettings = inbound.StreamSettings + oldInbound.Sniffing = inbound.Sniffing + if strings.TrimSpace(inbound.ShareAddrStrategy) == "" { + normalizeInboundShareAddress(oldInbound) + inbound.ShareAddrStrategy = oldInbound.ShareAddrStrategy + inbound.ShareAddr = oldInbound.ShareAddr + } else { + if err := normalizeInboundShareAddressStrict(inbound); err != nil { + return err } - } else if err2 := rt.UpdateInbound(context.Background(), &oldSnapshot, oldInbound); err2 != nil { - logger.Warning("Unable to update inbound on", rt.Name(), ":", err2) + oldInbound.ShareAddrStrategy = inbound.ShareAddrStrategy + oldInbound.ShareAddr = inbound.ShareAddr + } + if oldTagWasAuto && inbound.Tag == tag { + inbound.Tag = "" + } + resolvedTag, err := s.resolveInboundTag(inbound, inbound.Id) + if err != nil { + return err + } + oldInbound.Tag = resolvedTag + inbound.Tag = oldInbound.Tag + + rt, push, dirty, perr := s.nodePushPlan(oldInbound) + if perr != nil { + return perr + } + if dirty { markDirty = true } - } - - // A rename must allow the new tag before the deferred commit, or a node in - // "selected" sync mode would sweep the renamed central row on the next pull. - if oldInbound.NodeID != nil { - if aErr := (&NodeService{}).EnsureInboundTagAllowed(*oldInbound.NodeID, oldInbound.Tag); aErr != nil { - logger.Warning("allow inbound tag on node failed:", aErr) + if oldInbound.NodeID == nil { + if !push { + needRestart = true + } else { + oldSnapshot := *oldInbound + oldSnapshot.Tag = tag + if err2 := rt.DelInbound(context.Background(), &oldSnapshot); err2 == nil { + logger.Debug("Old inbound deleted on", rt.Name(), ":", tag) + } + if inbound.Enable { + runtimeInbound, err2 := s.buildRuntimeInboundForAPI(tx, oldInbound) + if err2 != nil { + logger.Debug("Unable to prepare runtime inbound config:", err2) + needRestart = true + } else if err2 := rt.AddInbound(context.Background(), runtimeInbound); err2 == nil { + logger.Debug("Updated inbound added on", rt.Name(), ":", oldInbound.Tag) + } else { + logger.Debug("Unable to update inbound on", rt.Name(), ":", err2) + needRestart = true + } + } + } + } else if push { + oldSnapshot := *oldInbound + oldSnapshot.Tag = tag + if !inbound.Enable { + if err2 := rt.DelInbound(context.Background(), &oldSnapshot); err2 != nil { + logger.Warning("Unable to disable inbound on", rt.Name(), ":", err2) + markDirty = true + } + } else if err2 := rt.UpdateInbound(context.Background(), &oldSnapshot, oldInbound); err2 != nil { + logger.Warning("Unable to update inbound on", rt.Name(), ":", err2) + markDirty = true + } } - } - if err = tx.Save(oldInbound).Error; err != nil { - return inbound, false, err + // A rename must allow the new tag before the inbound row is committed, or a + // node in "selected" sync mode would sweep the renamed central row on the + // next pull. + if oldInbound.NodeID != nil { + if aErr := (&NodeService{}).EnsureInboundTagAllowed(*oldInbound.NodeID, oldInbound.Tag); aErr != nil { + logger.Warning("allow inbound tag on node failed:", aErr) + } + } + + if err := tx.Save(oldInbound).Error; err != nil { + return err + } + newClients, gcErr := s.GetClients(oldInbound) + if gcErr != nil { + return gcErr + } + if err := s.clientService.SyncInbound(tx, oldInbound.Id, newClients); err != nil { + return err + } + // (Re)generate the Xray config whenever routing was or is now enabled, so + // the egress SOCKS bridge is added, moved, or dropped to match the new + // settings. + if mtprotoRoutesThroughXray(inbound) || oldRoutedMtproto { + needRestart = true + } + return nil + }) + if txErr != nil { + return inbound, false, txErr } - newClients, gcErr := s.GetClients(oldInbound) - if gcErr != nil { - err = gcErr - return inbound, false, err - } - if err = s.clientService.SyncInbound(tx, oldInbound.Id, newClients); err != nil { - return inbound, false, err - } - // (Re)generate the Xray config whenever routing was or is now enabled, so the - // egress SOCKS bridge is added, moved, or dropped to match the new settings. - if mtprotoRoutesThroughXray(inbound) || oldRoutedMtproto { - needRestart = true + if markDirty && oldInbound.NodeID != nil { + if dErr := (&NodeService{}).MarkNodeDirty(*oldInbound.NodeID); dErr != nil { + logger.Warning("mark node dirty failed:", dErr) + } } return inbound, needRestart, nil } diff --git a/internal/web/service/traffic_writer.go b/internal/web/service/traffic_writer.go index 4c1235e0d..be0cea8bd 100644 --- a/internal/web/service/traffic_writer.go +++ b/internal/web/service/traffic_writer.go @@ -7,7 +7,10 @@ import ( "sync" "time" + "github.com/mhsanaei/3x-ui/v3/internal/database" "github.com/mhsanaei/3x-ui/v3/internal/logger" + + "gorm.io/gorm" ) const ( @@ -109,6 +112,23 @@ func runTrafficWriter(ctx context.Context, queue chan *trafficWriteRequest, done } } +// runSerializedTx runs fn inside one DB transaction on the shared serial +// traffic-writer goroutine, so it can never execute concurrently with the +// @every 5s traffic poll (AddTraffic). Both touch the hot client_traffics and +// inbounds rows, and they acquire them in opposite order (the poll locks +// inbounds then client_traffics; an admin client/inbound mutation does the +// reverse), which Postgres aborts as a deadlock (SQLSTATE 40P01). Routing every +// such mutation through this single writer removes that contention entirely. +// +// Keep network I/O (node pushes) OUT of fn: holding the single writer across a +// remote node call would stall all traffic accounting for up to the remote +// timeout. Apply runtime changes after this returns. +func runSerializedTx(fn func(tx *gorm.DB) error) error { + return submitTrafficWrite(func() error { + return database.GetDB().Transaction(fn) + }) +} + func safeApply(fn func() error) (err error) { defer func() { if r := recover(); r != nil {