mirror of
https://github.com/yangjian102621/geekai.git
synced 2025-11-08 10:13:44 +08:00
refactor: 重构项目,为所有的 AI 工具都引入算力,采用算力统一结算各个工具的调用次数和权限
This commit is contained in:
@@ -96,12 +96,6 @@ func (s *Service) Run() {
|
||||
s.db.Updates(&job)
|
||||
// 任务失败,通知前端
|
||||
s.notifyQueue.RPush(task.UserId)
|
||||
// restore img_call quota
|
||||
if task.Type.String() != types.TaskUpscale.String() {
|
||||
s.db.Model(&model.User{}).Where("id = ?", task.UserId).UpdateColumn("img_calls", gorm.Expr("img_calls + ?", 1))
|
||||
}
|
||||
|
||||
// TODO: 任务提交失败,加入队列重试
|
||||
continue
|
||||
}
|
||||
logger.Infof("任务提交成功:%+v", res)
|
||||
|
||||
@@ -191,28 +191,43 @@ func (p *ServicePool) SyncTaskProgress() {
|
||||
go func() {
|
||||
var items []model.MidJourneyJob
|
||||
for {
|
||||
res := p.db.Where("progress >= ? AND progress < ?", 0, 100).Find(&items)
|
||||
res := p.db.Where("progress < ?", 100).Find(&items)
|
||||
if res.Error != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
for _, v := range items {
|
||||
// 30 分钟还没完成的任务直接删除
|
||||
if time.Now().Sub(v.CreatedAt) > time.Minute*30 {
|
||||
p.db.Delete(&v)
|
||||
// 非放大任务,退回绘图次数
|
||||
if v.Type != types.TaskUpscale.String() {
|
||||
p.db.Model(&model.User{}).Where("id = ?", v.UserId).UpdateColumn("img_calls", gorm.Expr("img_calls + ?", 1))
|
||||
for _, job := range items {
|
||||
// 失败或者 30 分钟还没完成的任务删除并退回算力
|
||||
if time.Now().Sub(job.CreatedAt) > time.Minute*30 || job.Progress == -1 {
|
||||
p.db.Delete(&job)
|
||||
// 略过 Upscale 任务
|
||||
if job.Type != types.TaskUpscale.String() {
|
||||
continue
|
||||
}
|
||||
tx := p.db.Model(&model.User{}).Where("id = ?", job.UserId).UpdateColumn("power", gorm.Expr("power + ?", job.Power))
|
||||
if tx.Error == nil && tx.RowsAffected > 0 {
|
||||
var user model.User
|
||||
p.db.Where("id = ?", job.UserId).First(&user)
|
||||
p.db.Create(&model.PowerLog{
|
||||
UserId: user.Id,
|
||||
Username: user.Username,
|
||||
Type: types.PowerConsume,
|
||||
Amount: job.Power,
|
||||
Balance: user.Power + job.Power,
|
||||
Mark: types.PowerAdd,
|
||||
Model: "mid-journey",
|
||||
Remark: fmt.Sprintf("绘画任务失败,退回算力。任务ID:%s", job.TaskId),
|
||||
CreatedAt: time.Now(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
if !strings.HasPrefix(job.ChannelId, "mj-service-plus") {
|
||||
continue
|
||||
}
|
||||
|
||||
if !strings.HasPrefix(v.ChannelId, "mj-service-plus") {
|
||||
continue
|
||||
}
|
||||
|
||||
if servicePlus := p.getServicePlus(v.ChannelId); servicePlus != nil {
|
||||
_ = servicePlus.Notify(v)
|
||||
if servicePlus := p.getServicePlus(job.ChannelId); servicePlus != nil {
|
||||
_ = servicePlus.Notify(job)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -84,7 +84,7 @@ func (s *Service) Run() {
|
||||
if err != nil {
|
||||
logger.Error("绘画任务执行失败:", err.Error())
|
||||
// update the task progress
|
||||
s.db.Model(&model.MidJourneyJob{Id: uint(task.Id)}).UpdateColumns(map[string]interface{}{
|
||||
s.db.Model(&model.MidJourneyJob{Id: task.Id}).UpdateColumns(map[string]interface{}{
|
||||
"progress": -1,
|
||||
"err_msg": err.Error(),
|
||||
})
|
||||
|
||||
@@ -4,7 +4,9 @@ import (
|
||||
"chatplus/core/types"
|
||||
"chatplus/service/oss"
|
||||
"chatplus/store"
|
||||
"chatplus/store/model"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/go-redis/redis/v8"
|
||||
"gorm.io/gorm"
|
||||
@@ -14,6 +16,7 @@ type ServicePool struct {
|
||||
services []*Service
|
||||
taskQueue *store.RedisQueue
|
||||
notifyQueue *store.RedisQueue
|
||||
db *gorm.DB
|
||||
Clients *types.LMap[uint, *types.WsClient] // UserId => Client
|
||||
}
|
||||
|
||||
@@ -42,6 +45,7 @@ func NewServicePool(db *gorm.DB, redisCli *redis.Client, manager *oss.UploaderMa
|
||||
taskQueue: taskQueue,
|
||||
notifyQueue: notifyQueue,
|
||||
services: services,
|
||||
db: db,
|
||||
Clients: types.NewLMap[uint, *types.WsClient](),
|
||||
}
|
||||
}
|
||||
@@ -72,6 +76,46 @@ func (p *ServicePool) CheckTaskNotify() {
|
||||
}()
|
||||
}
|
||||
|
||||
// CheckTaskStatus 检查任务状态,自动删除过期或者失败的任务
|
||||
func (p *ServicePool) CheckTaskStatus() {
|
||||
go func() {
|
||||
for {
|
||||
var jobs []model.SdJob
|
||||
res := p.db.Where("progress < ?", 100).Find(&jobs)
|
||||
if res.Error != nil {
|
||||
time.Sleep(5 * time.Second)
|
||||
continue
|
||||
}
|
||||
|
||||
for _, job := range jobs {
|
||||
// 5 分钟还没完成的任务直接删除
|
||||
if time.Now().Sub(job.CreatedAt) > time.Minute*5 || job.Progress == -1 {
|
||||
p.db.Delete(&job)
|
||||
var user model.User
|
||||
p.db.Where("id = ?", job.UserId).First(&user)
|
||||
// 退回绘图次数
|
||||
res = p.db.Model(&model.User{}).Where("id = ?", job.UserId).UpdateColumn("power", gorm.Expr("power + ?", job.Power))
|
||||
if res.Error == nil && res.RowsAffected > 0 {
|
||||
p.db.Create(&model.PowerLog{
|
||||
UserId: user.Id,
|
||||
Username: user.Username,
|
||||
Type: types.PowerConsume,
|
||||
Amount: job.Power,
|
||||
Balance: user.Power + job.Power,
|
||||
Mark: types.PowerAdd,
|
||||
Model: "stable-diffusion",
|
||||
Remark: fmt.Sprintf("任务失败,退回算力。任务ID:%s", job.TaskId),
|
||||
CreatedAt: time.Now(),
|
||||
})
|
||||
}
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// HasAvailableService check if it has available mj service in pool
|
||||
func (p *ServicePool) HasAvailableService() bool {
|
||||
return len(p.services) > 0
|
||||
|
||||
@@ -74,8 +74,6 @@ func (s *Service) Run() {
|
||||
"progress": -1,
|
||||
"err_msg": err.Error(),
|
||||
})
|
||||
// restore img_call quota
|
||||
s.db.Model(&model.User{}).Where("id = ?", task.UserId).UpdateColumn("img_calls", gorm.Expr("img_calls + ?", 1))
|
||||
// release task num
|
||||
atomic.AddInt32(&s.handledTaskNum, -1)
|
||||
// 通知前端,任务失败
|
||||
@@ -307,8 +305,6 @@ func (s *Service) callback(data CBReq) {
|
||||
"progress": -1,
|
||||
"err_msg": data.Message,
|
||||
})
|
||||
// restore img_calls
|
||||
s.db.Model(&model.User{}).Where("id = ? AND img_calls > 0", data.UserId).UpdateColumn("img_calls", gorm.Expr("img_calls + ?", 1))
|
||||
}
|
||||
|
||||
// 发送更新状态信号
|
||||
|
||||
@@ -39,7 +39,7 @@ func NewXXLJobExecutor(config *types.AppConfig, db *gorm.DB) *XXLJobExecutor {
|
||||
|
||||
func (e *XXLJobExecutor) Run() error {
|
||||
e.executor.RegTask("ClearOrders", e.ClearOrders)
|
||||
e.executor.RegTask("ResetVipCalls", e.ResetVipCalls)
|
||||
e.executor.RegTask("ResetVipPower", e.ResetVipPower)
|
||||
return e.executor.Run()
|
||||
}
|
||||
|
||||
@@ -68,8 +68,8 @@ func (e *XXLJobExecutor) ClearOrders(cxt context.Context, param *xxl.RunReq) (ms
|
||||
return fmt.Sprintf("Clear order successfully, affect rows: %d", res.RowsAffected)
|
||||
}
|
||||
|
||||
// ResetVipCalls 清理过期的 VIP 会员
|
||||
func (e *XXLJobExecutor) ResetVipCalls(cxt context.Context, param *xxl.RunReq) (msg string) {
|
||||
// ResetVipPower 清理过期的 VIP 会员
|
||||
func (e *XXLJobExecutor) ResetVipPower(cxt context.Context, param *xxl.RunReq) (msg string) {
|
||||
logger.Info("开始进行月底账号盘点...")
|
||||
var users []model.User
|
||||
res := e.db.Where("vip = ?", 1).Find(&users)
|
||||
@@ -89,55 +89,27 @@ func (e *XXLJobExecutor) ResetVipCalls(cxt context.Context, param *xxl.RunReq) (
|
||||
return "error with decode system config: " + err.Error()
|
||||
}
|
||||
|
||||
// 获取本月月初时间
|
||||
currentTime := time.Now()
|
||||
year, month, _ := currentTime.Date()
|
||||
firstOfMonth := time.Date(year, month, 1, 0, 0, 0, 0, currentTime.Location()).Unix()
|
||||
for _, u := range users {
|
||||
// 账号到期,直接清零
|
||||
if u.ExpiredTime <= currentTime.Unix() {
|
||||
logger.Info("账号过期:", u.Username)
|
||||
u.Calls = 0
|
||||
u.Vip = false
|
||||
} else {
|
||||
if u.Calls <= 0 {
|
||||
u.Calls = 0
|
||||
}
|
||||
if u.ImgCalls <= 0 {
|
||||
u.ImgCalls = 0
|
||||
}
|
||||
// 如果该用户当月有充值点卡,则将点卡中未用完的点数结余到下个月
|
||||
var orders []model.Order
|
||||
e.db.Debug().Where("user_id = ? AND pay_time > ?", u.Id, firstOfMonth).Find(&orders)
|
||||
var calls = 0
|
||||
var imgCalls = 0
|
||||
for _, o := range orders {
|
||||
var remark types.OrderRemark
|
||||
err = utils.JsonDecode(o.Remark, &remark)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
if remark.Days > 0 { // 会员续费
|
||||
continue
|
||||
}
|
||||
calls += remark.Calls
|
||||
imgCalls += remark.ImgCalls
|
||||
}
|
||||
if u.Calls > calls { // 本月套餐没有用完
|
||||
u.Calls = calls + config.VipMonthCalls
|
||||
} else {
|
||||
u.Calls = u.Calls + config.VipMonthCalls
|
||||
}
|
||||
if u.ImgCalls > imgCalls { // 本月套餐没有用完
|
||||
u.ImgCalls = imgCalls + config.VipMonthImgCalls
|
||||
} else {
|
||||
u.ImgCalls = u.ImgCalls + config.VipMonthImgCalls
|
||||
}
|
||||
logger.Infof("%s 点卡结余:%d", u.Username, calls)
|
||||
if u.Power <= 0 {
|
||||
u.Power = 0
|
||||
}
|
||||
u.Tokens = 0
|
||||
u.Power += config.VipMonthPower
|
||||
// update user
|
||||
e.db.Updates(&u)
|
||||
tx := e.db.Updates(&u)
|
||||
// 记录算力充值日志
|
||||
if tx.Error == nil {
|
||||
e.db.Create(&model.PowerLog{
|
||||
UserId: u.Id,
|
||||
Username: u.Username,
|
||||
Type: types.PowerRecharge,
|
||||
Amount: config.VipMonthPower,
|
||||
Mark: types.PowerAdd,
|
||||
Balance: u.Power + config.VipMonthPower,
|
||||
Model: "",
|
||||
Remark: fmt.Sprintf("月底盘点,会员每月赠送算力:%d", config.VipMonthPower),
|
||||
CreatedAt: time.Now(),
|
||||
})
|
||||
}
|
||||
}
|
||||
logger.Info("月底盘点完成!")
|
||||
return "success"
|
||||
|
||||
Reference in New Issue
Block a user