release v4.0.3

This commit is contained in:
RockYang 2024-04-15 08:26:07 +08:00
parent a7237fe62f
commit fbfa2a71a9
5 changed files with 23 additions and 62 deletions

View File

@ -1,6 +1,10 @@
# 更新日志 # 更新日志
## v4.0.3 2024-04-15
* Bug修复修复MJ-PLUS 服务会自动删除10分钟前的任务问题
* Bug修复修复MJ 的 U/V 操作会强制使用 Fast 模式 Bug
## 4.0.2
## v4.0.2
* 功能新增:支持前端菜单可以配置 * 功能新增:支持前端菜单可以配置
* 功能优化:在登录和注册界面标题显示软件版本号 * 功能优化:在登录和注册界面标题显示软件版本号

View File

@ -125,7 +125,7 @@ func (h *MidJourneyHandler) Image(c *gin.Context) {
params += fmt.Sprintf(" --c %d", data.Chaos) params += fmt.Sprintf(" --c %d", data.Chaos)
} }
if len(data.ImgArr) > 0 && data.Iw > 0 { if len(data.ImgArr) > 0 && data.Iw > 0 {
params += fmt.Sprintf(" --iw %f", data.Iw) params += fmt.Sprintf(" --iw %.2f", data.Iw)
} }
if data.Raw { if data.Raw {
params += " --style raw" params += " --style raw"

View File

@ -163,7 +163,7 @@ func (c *PlusClient) Upscale(task types.MjTask) (ImageRes, error) {
"customId": fmt.Sprintf("MJ::JOB::upsample::%d::%s", task.Index, task.MessageHash), "customId": fmt.Sprintf("MJ::JOB::upsample::%d::%s", task.Index, task.MessageHash),
"taskId": task.MessageId, "taskId": task.MessageId,
} }
apiURL := fmt.Sprintf("%s/mj/submit/action", c.apiURL) apiURL := fmt.Sprintf("%s/mj-%s/mj/submit/action", c.Config.Mode, c.apiURL)
var res ImageRes var res ImageRes
var errRes ErrRes var errRes ErrRes
r, err := c.client.R(). r, err := c.client.R().
@ -189,7 +189,7 @@ func (c *PlusClient) Variation(task types.MjTask) (ImageRes, error) {
"customId": fmt.Sprintf("MJ::JOB::variation::%d::%s", task.Index, task.MessageHash), "customId": fmt.Sprintf("MJ::JOB::variation::%d::%s", task.Index, task.MessageHash),
"taskId": task.MessageId, "taskId": task.MessageId,
} }
apiURL := fmt.Sprintf("%s/mj/submit/action", c.apiURL) apiURL := fmt.Sprintf("%s/mj-%s/mj/submit/action", c.Config.Mode, c.apiURL)
var res ImageRes var res ImageRes
var errRes ErrRes var errRes ErrRes
r, err := req.C().R(). r, err := req.C().R().

View File

@ -36,7 +36,7 @@ func NewServicePool(db *gorm.DB, redisCli *redis.Client, manager *oss.UploaderMa
} }
cli := NewPlusClient(config) cli := NewPlusClient(config)
name := fmt.Sprintf("mj-plus-service-%d", k) name := fmt.Sprintf("mj-plus-service-%d", k)
service := NewService(name, taskQueue, notifyQueue, 4, 600, db, cli) service := NewService(name, taskQueue, notifyQueue, db, cli)
go func() { go func() {
service.Run() service.Run()
}() }()
@ -49,7 +49,7 @@ func NewServicePool(db *gorm.DB, redisCli *redis.Client, manager *oss.UploaderMa
} }
cli := NewProxyClient(config) cli := NewProxyClient(config)
name := fmt.Sprintf("mj-proxy-service-%d", k) name := fmt.Sprintf("mj-proxy-service-%d", k)
service := NewService(name, taskQueue, notifyQueue, 4, 600, db, cli) service := NewService(name, taskQueue, notifyQueue, db, cli)
go func() { go func() {
service.Run() service.Run()
}() }()

View File

@ -8,7 +8,6 @@ import (
"chatplus/utils" "chatplus/utils"
"fmt" "fmt"
"strings" "strings"
"sync/atomic"
"time" "time"
"gorm.io/gorm" "gorm.io/gorm"
@ -16,41 +15,26 @@ import (
// Service MJ 绘画服务 // Service MJ 绘画服务
type Service struct { type Service struct {
Name string // service Name Name string // service Name
Client Client // MJ Client Client Client // MJ Client
taskQueue *store.RedisQueue taskQueue *store.RedisQueue
notifyQueue *store.RedisQueue notifyQueue *store.RedisQueue
db *gorm.DB db *gorm.DB
maxHandleTaskNum int32 // max task number current service can handle
HandledTaskNum int32 // already handled task number
taskStartTimes map[int]time.Time // task start time, to check if the task is timeout
taskTimeout int64
} }
func NewService(name string, taskQueue *store.RedisQueue, notifyQueue *store.RedisQueue, maxTaskNum int32, timeout int64, db *gorm.DB, cli Client) *Service { func NewService(name string, taskQueue *store.RedisQueue, notifyQueue *store.RedisQueue, db *gorm.DB, cli Client) *Service {
return &Service{ return &Service{
Name: name, Name: name,
db: db, db: db,
taskQueue: taskQueue, taskQueue: taskQueue,
notifyQueue: notifyQueue, notifyQueue: notifyQueue,
Client: cli, Client: cli,
taskTimeout: timeout,
maxHandleTaskNum: maxTaskNum,
taskStartTimes: make(map[int]time.Time, 0),
} }
} }
func (s *Service) Run() { func (s *Service) Run() {
logger.Infof("Starting MidJourney job consumer for %s", s.Name) logger.Infof("Starting MidJourney job consumer for %s", s.Name)
for { for {
s.checkTasks()
if !s.canHandleTask() {
// current service is full, can not handle more task
// waiting for running task finish
time.Sleep(time.Second * 3)
continue
}
var task types.MjTask var task types.MjTask
err := s.taskQueue.LPop(&task) err := s.taskQueue.LPop(&task)
if err != nil { if err != nil {
@ -125,37 +109,14 @@ func (s *Service) Run() {
continue continue
} }
logger.Infof("任务提交成功:%+v", res) logger.Infof("任务提交成功:%+v", res)
// lock the task until the execute timeout
s.taskStartTimes[int(task.Id)] = time.Now()
atomic.AddInt32(&s.HandledTaskNum, 1)
// 更新任务 ID/频道 // 更新任务 ID/频道
job.TaskId = res.Result job.TaskId = res.Result
job.MessageId = res.Result
job.ChannelId = s.Name job.ChannelId = s.Name
s.db.Updates(&job) s.db.Updates(&job)
} }
} }
// check if current service instance can handle more task
func (s *Service) canHandleTask() bool {
handledNum := atomic.LoadInt32(&s.HandledTaskNum)
return handledNum < s.maxHandleTaskNum
}
// remove the timeout tasks
func (s *Service) checkTasks() {
for k, t := range s.taskStartTimes {
if time.Now().Unix()-t.Unix() > s.taskTimeout {
delete(s.taskStartTimes, k)
atomic.AddInt32(&s.HandledTaskNum, -1)
s.db.Model(&model.MidJourneyJob{Id: uint(k)}).UpdateColumns(map[string]interface{}{
"progress": -1,
"err_msg": "任务超时",
})
}
}
}
type CBReq struct { type CBReq struct {
Id string `json:"id"` Id string `json:"id"`
Action string `json:"action"` Action string `json:"action"`
@ -186,6 +147,7 @@ func (s *Service) Notify(job model.MidJourneyJob) error {
"progress": -1, "progress": -1,
"err_msg": task.FailReason, "err_msg": task.FailReason,
}) })
s.notifyQueue.RPush(job.UserId)
return fmt.Errorf("task failed: %v", task.FailReason) return fmt.Errorf("task failed: %v", task.FailReason)
} }
@ -198,15 +160,10 @@ func (s *Service) Notify(job model.MidJourneyJob) error {
if task.ImageUrl != "" { if task.ImageUrl != "" {
job.OrgURL = task.ImageUrl job.OrgURL = task.ImageUrl
} }
job.MessageId = task.Id
tx := s.db.Updates(&job) tx := s.db.Updates(&job)
if tx.Error != nil { if tx.Error != nil {
return fmt.Errorf("error with update database: %v", tx.Error) return fmt.Errorf("error with update database: %v", tx.Error)
} }
if task.Status == "SUCCESS" {
// release lock task
atomic.AddInt32(&s.HandledTaskNum, -1)
}
// 通知前端更新任务进度 // 通知前端更新任务进度
if oldProgress != job.Progress { if oldProgress != job.Progress {
s.notifyQueue.RPush(job.UserId) s.notifyQueue.RPush(job.UserId)