mirror of
https://github.com/yangjian102621/geekai.git
synced 2025-11-08 02:03:42 +08:00
add sync lock for sub or add user's power
This commit is contained in:
@@ -35,9 +35,10 @@ type Service struct {
|
||||
taskQueue *store.RedisQueue
|
||||
notifyQueue *store.RedisQueue
|
||||
Clients *types.LMap[uint, *types.WsClient] // UserId => Client
|
||||
userService *service.UserService
|
||||
}
|
||||
|
||||
func NewService(db *gorm.DB, manager *oss.UploaderManager, redisCli *redis.Client) *Service {
|
||||
func NewService(db *gorm.DB, manager *oss.UploaderManager, redisCli *redis.Client, userService *service.UserService) *Service {
|
||||
return &Service{
|
||||
httpClient: req.C().SetTimeout(time.Minute * 3),
|
||||
db: db,
|
||||
@@ -45,6 +46,7 @@ func NewService(db *gorm.DB, manager *oss.UploaderManager, redisCli *redis.Clien
|
||||
notifyQueue: store.NewRedisQueue("DallE_Notify_Queue", redisCli),
|
||||
Clients: types.NewLMap[uint, *types.WsClient](),
|
||||
uploadManager: manager,
|
||||
userService: userService,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -122,32 +124,23 @@ func (s *Service) Image(task types.DallTask, sync bool) (string, error) {
|
||||
return "", errors.New("insufficient of power")
|
||||
}
|
||||
|
||||
// 更新用户算力
|
||||
tx := s.db.Model(&model.User{}).Where("id", user.Id).UpdateColumn("power", gorm.Expr("power - ?", task.Power))
|
||||
// 记录算力变化日志
|
||||
if tx.Error == nil && tx.RowsAffected > 0 {
|
||||
var u model.User
|
||||
s.db.Where("id", user.Id).First(&u)
|
||||
s.db.Create(&model.PowerLog{
|
||||
UserId: user.Id,
|
||||
Username: user.Username,
|
||||
Type: types.PowerConsume,
|
||||
Amount: task.Power,
|
||||
Balance: u.Power,
|
||||
Mark: types.PowerSub,
|
||||
Model: "dall-e-3",
|
||||
Remark: fmt.Sprintf("绘画提示词:%s", utils.CutWords(task.Prompt, 10)),
|
||||
CreatedAt: time.Now(),
|
||||
})
|
||||
// 扣减算力
|
||||
err := s.userService.DecreasePower(int(user.Id), task.Power, model.PowerLog{
|
||||
Type: types.PowerConsume,
|
||||
Model: "dall-e-3",
|
||||
Remark: fmt.Sprintf("绘画提示词:%s", utils.CutWords(task.Prompt, 10)),
|
||||
})
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("error with decrease power: %v", err)
|
||||
}
|
||||
|
||||
// get image generation API KEY
|
||||
var apiKey model.ApiKey
|
||||
tx = s.db.Where("type", "dalle").
|
||||
err = s.db.Where("type", "dalle").
|
||||
Where("enabled", true).
|
||||
Order("last_used_at ASC").First(&apiKey)
|
||||
if tx.Error != nil {
|
||||
return "", fmt.Errorf("no available DALL-E api key: %v", tx.Error)
|
||||
Order("last_used_at ASC").First(&apiKey).Error
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("no available DALL-E api key: %v", err)
|
||||
}
|
||||
|
||||
var res imgRes
|
||||
@@ -181,13 +174,13 @@ func (s *Service) Image(task types.DallTask, sync bool) (string, error) {
|
||||
// update the api key last use time
|
||||
s.db.Model(&apiKey).UpdateColumn("last_used_at", time.Now().Unix())
|
||||
// update task progress
|
||||
tx = s.db.Model(&model.DallJob{Id: task.JobId}).UpdateColumns(map[string]interface{}{
|
||||
err = s.db.Model(&model.DallJob{Id: task.JobId}).UpdateColumns(map[string]interface{}{
|
||||
"progress": 100,
|
||||
"org_url": res.Data[0].Url,
|
||||
"prompt": prompt,
|
||||
})
|
||||
if tx.Error != nil {
|
||||
return "", fmt.Errorf("err with update database: %v", tx.Error)
|
||||
}).Error
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("err with update database: %v", err)
|
||||
}
|
||||
|
||||
s.notifyQueue.RPush(service.NotifyMessage{UserId: int(task.UserId), JobId: int(task.JobId), Message: service.TaskStatusFailed})
|
||||
|
||||
83
api/service/user_service.go
Normal file
83
api/service/user_service.go
Normal file
@@ -0,0 +1,83 @@
|
||||
package service
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"geekai/core/types"
|
||||
"geekai/store/model"
|
||||
"gorm.io/gorm"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
type UserService struct {
|
||||
db *gorm.DB
|
||||
lock sync.Mutex
|
||||
}
|
||||
|
||||
func NewUserService(db *gorm.DB) *UserService {
|
||||
return &UserService{db: db, lock: sync.Mutex{}}
|
||||
}
|
||||
|
||||
// IncreasePower 增加用户算力
|
||||
func (s *UserService) IncreasePower(userId int, power int, log model.PowerLog) error {
|
||||
s.lock.Lock()
|
||||
defer s.lock.Unlock()
|
||||
|
||||
tx := s.db.Begin()
|
||||
err := tx.Model(&model.User{}).Where("id", userId).UpdateColumn("power", gorm.Expr("power + ?", power)).Error
|
||||
if err != nil {
|
||||
tx.Rollback()
|
||||
return err
|
||||
}
|
||||
var user model.User
|
||||
tx.Where("id", userId).First(&user)
|
||||
err = tx.Create(&model.PowerLog{
|
||||
UserId: user.Id,
|
||||
Username: user.Username,
|
||||
Type: log.Type,
|
||||
Amount: power,
|
||||
Balance: user.Power,
|
||||
Mark: types.PowerAdd,
|
||||
Model: log.Model,
|
||||
Remark: log.Remark,
|
||||
CreatedAt: time.Now(),
|
||||
}).Error
|
||||
if err != nil {
|
||||
tx.Rollback()
|
||||
return err
|
||||
}
|
||||
tx.Commit()
|
||||
return nil
|
||||
}
|
||||
|
||||
// DecreasePower 减少用户算力
|
||||
func (s *UserService) DecreasePower(userId int, power int, log model.PowerLog) error {
|
||||
s.lock.Lock()
|
||||
defer s.lock.Unlock()
|
||||
|
||||
tx := s.db.Begin()
|
||||
err := tx.Model(&model.User{}).Where("id", userId).UpdateColumn("power", gorm.Expr("power - ?", power)).Error
|
||||
if err != nil {
|
||||
tx.Rollback()
|
||||
return fmt.Errorf("扣减算力失败:%v", err)
|
||||
}
|
||||
var user model.User
|
||||
tx.Where("id", userId).First(&user)
|
||||
err = tx.Create(&model.PowerLog{
|
||||
UserId: user.Id,
|
||||
Username: user.Username,
|
||||
Type: log.Type,
|
||||
Amount: power,
|
||||
Balance: user.Power,
|
||||
Mark: types.PowerSub,
|
||||
Model: log.Model,
|
||||
Remark: log.Remark,
|
||||
CreatedAt: time.Now(),
|
||||
}).Error
|
||||
if err != nil {
|
||||
tx.Rollback()
|
||||
return fmt.Errorf("记录算力日志失败:%v", err)
|
||||
}
|
||||
tx.Commit()
|
||||
return nil
|
||||
}
|
||||
@@ -81,54 +81,6 @@ func (e *XXLJobExecutor) ClearOrders(cxt context.Context, param *xxl.RunReq) (ms
|
||||
// 自动将 VIP 会员的算力补充到每月赠送的最大值
|
||||
func (e *XXLJobExecutor) ResetVipPower(cxt context.Context, param *xxl.RunReq) (msg string) {
|
||||
logger.Info("开始进行月底账号盘点...")
|
||||
var users []model.User
|
||||
res := e.db.Where("vip", 1).Where("status", 1).Find(&users)
|
||||
if res.Error != nil {
|
||||
return "No vip users found"
|
||||
}
|
||||
|
||||
var sysConfig model.Config
|
||||
res = e.db.Where("marker", "system").First(&sysConfig)
|
||||
if res.Error != nil {
|
||||
return "error with get system config: " + res.Error.Error()
|
||||
}
|
||||
|
||||
var config types.SystemConfig
|
||||
err := utils.JsonDecode(sysConfig.Config, &config)
|
||||
if err != nil {
|
||||
return "error with decode system config: " + err.Error()
|
||||
}
|
||||
|
||||
for _, u := range users {
|
||||
// 处理过期的 VIP
|
||||
if u.ExpiredTime > 0 && u.ExpiredTime <= time.Now().Unix() {
|
||||
u.Vip = false
|
||||
e.db.Model(&model.User{}).Where("id", u.Id).UpdateColumn("vip", false)
|
||||
continue
|
||||
}
|
||||
if u.Power < config.VipMonthPower {
|
||||
power := config.VipMonthPower - u.Power
|
||||
// update user
|
||||
tx := e.db.Model(&model.User{}).Where("id", u.Id).UpdateColumn("power", gorm.Expr("power + ?", power))
|
||||
// 记录算力变动日志
|
||||
if tx.Error == nil {
|
||||
var user model.User
|
||||
e.db.Where("id", u.Id).First(&user)
|
||||
e.db.Create(&model.PowerLog{
|
||||
UserId: u.Id,
|
||||
Username: u.Username,
|
||||
Type: types.PowerRecharge,
|
||||
Amount: power,
|
||||
Mark: types.PowerAdd,
|
||||
Balance: user.Power,
|
||||
Model: "系统盘点",
|
||||
Remark: fmt.Sprintf("VIP会员每月算力派发,:%d", config.VipMonthPower),
|
||||
CreatedAt: time.Now(),
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
logger.Info("月底盘点完成!")
|
||||
return "success"
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user