add sync lock for sub or add user's power

This commit is contained in:
RockYang
2024-09-03 12:09:36 +08:00
parent aa46fdb113
commit 13bf73e6cf
16 changed files with 341 additions and 423 deletions

View File

@@ -35,9 +35,10 @@ type Service struct {
taskQueue *store.RedisQueue
notifyQueue *store.RedisQueue
Clients *types.LMap[uint, *types.WsClient] // UserId => Client
userService *service.UserService
}
func NewService(db *gorm.DB, manager *oss.UploaderManager, redisCli *redis.Client) *Service {
func NewService(db *gorm.DB, manager *oss.UploaderManager, redisCli *redis.Client, userService *service.UserService) *Service {
return &Service{
httpClient: req.C().SetTimeout(time.Minute * 3),
db: db,
@@ -45,6 +46,7 @@ func NewService(db *gorm.DB, manager *oss.UploaderManager, redisCli *redis.Clien
notifyQueue: store.NewRedisQueue("DallE_Notify_Queue", redisCli),
Clients: types.NewLMap[uint, *types.WsClient](),
uploadManager: manager,
userService: userService,
}
}
@@ -122,32 +124,23 @@ func (s *Service) Image(task types.DallTask, sync bool) (string, error) {
return "", errors.New("insufficient of power")
}
// 更新用户算力
tx := s.db.Model(&model.User{}).Where("id", user.Id).UpdateColumn("power", gorm.Expr("power - ?", task.Power))
// 记录算力变化日志
if tx.Error == nil && tx.RowsAffected > 0 {
var u model.User
s.db.Where("id", user.Id).First(&u)
s.db.Create(&model.PowerLog{
UserId: user.Id,
Username: user.Username,
Type: types.PowerConsume,
Amount: task.Power,
Balance: u.Power,
Mark: types.PowerSub,
Model: "dall-e-3",
Remark: fmt.Sprintf("绘画提示词:%s", utils.CutWords(task.Prompt, 10)),
CreatedAt: time.Now(),
})
// 扣减算力
err := s.userService.DecreasePower(int(user.Id), task.Power, model.PowerLog{
Type: types.PowerConsume,
Model: "dall-e-3",
Remark: fmt.Sprintf("绘画提示词:%s", utils.CutWords(task.Prompt, 10)),
})
if err != nil {
return "", fmt.Errorf("error with decrease power: %v", err)
}
// get image generation API KEY
var apiKey model.ApiKey
tx = s.db.Where("type", "dalle").
err = s.db.Where("type", "dalle").
Where("enabled", true).
Order("last_used_at ASC").First(&apiKey)
if tx.Error != nil {
return "", fmt.Errorf("no available DALL-E api key: %v", tx.Error)
Order("last_used_at ASC").First(&apiKey).Error
if err != nil {
return "", fmt.Errorf("no available DALL-E api key: %v", err)
}
var res imgRes
@@ -181,13 +174,13 @@ func (s *Service) Image(task types.DallTask, sync bool) (string, error) {
// update the api key last use time
s.db.Model(&apiKey).UpdateColumn("last_used_at", time.Now().Unix())
// update task progress
tx = s.db.Model(&model.DallJob{Id: task.JobId}).UpdateColumns(map[string]interface{}{
err = s.db.Model(&model.DallJob{Id: task.JobId}).UpdateColumns(map[string]interface{}{
"progress": 100,
"org_url": res.Data[0].Url,
"prompt": prompt,
})
if tx.Error != nil {
return "", fmt.Errorf("err with update database: %v", tx.Error)
}).Error
if err != nil {
return "", fmt.Errorf("err with update database: %v", err)
}
s.notifyQueue.RPush(service.NotifyMessage{UserId: int(task.UserId), JobId: int(task.JobId), Message: service.TaskStatusFailed})