From fbfa2a71a92e832f1fe4e7282eefbb6c74f00504 Mon Sep 17 00:00:00 2001 From: RockYang Date: Mon, 15 Apr 2024 08:26:07 +0800 Subject: [PATCH] release v4.0.3 --- CHANGELOG.md | 6 ++- api/handler/mj_handler.go | 2 +- api/service/mj/plus_client.go | 4 +- api/service/mj/pool.go | 4 +- api/service/mj/service.go | 69 +++++++---------------------------- 5 files changed, 23 insertions(+), 62 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 19b561db..e3cdc2f3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,10 @@ # 更新日志 +## v4.0.3 2024-04-15 +* Bug修复:修复MJ-PLUS 服务会自动删除10分钟前的任务问题 +* Bug修复:修复MJ 的 U/V 操作会强制使用 Fast 模式 Bug -## 4.0.2 + +## v4.0.2 * 功能新增:支持前端菜单可以配置 * 功能优化:在登录和注册界面标题显示软件版本号 diff --git a/api/handler/mj_handler.go b/api/handler/mj_handler.go index 7988a380..b659c5b0 100644 --- a/api/handler/mj_handler.go +++ b/api/handler/mj_handler.go @@ -125,7 +125,7 @@ func (h *MidJourneyHandler) Image(c *gin.Context) { params += fmt.Sprintf(" --c %d", data.Chaos) } if len(data.ImgArr) > 0 && data.Iw > 0 { - params += fmt.Sprintf(" --iw %f", data.Iw) + params += fmt.Sprintf(" --iw %.2f", data.Iw) } if data.Raw { params += " --style raw" diff --git a/api/service/mj/plus_client.go b/api/service/mj/plus_client.go index 89bca627..52846208 100644 --- a/api/service/mj/plus_client.go +++ b/api/service/mj/plus_client.go @@ -163,7 +163,7 @@ func (c *PlusClient) Upscale(task types.MjTask) (ImageRes, error) { "customId": fmt.Sprintf("MJ::JOB::upsample::%d::%s", task.Index, task.MessageHash), "taskId": task.MessageId, } - apiURL := fmt.Sprintf("%s/mj/submit/action", c.apiURL) + apiURL := fmt.Sprintf("%s/mj-%s/mj/submit/action", c.Config.Mode, c.apiURL) var res ImageRes var errRes ErrRes r, err := c.client.R(). @@ -189,7 +189,7 @@ func (c *PlusClient) Variation(task types.MjTask) (ImageRes, error) { "customId": fmt.Sprintf("MJ::JOB::variation::%d::%s", task.Index, task.MessageHash), "taskId": task.MessageId, } - apiURL := fmt.Sprintf("%s/mj/submit/action", c.apiURL) + apiURL := fmt.Sprintf("%s/mj-%s/mj/submit/action", c.Config.Mode, c.apiURL) var res ImageRes var errRes ErrRes r, err := req.C().R(). diff --git a/api/service/mj/pool.go b/api/service/mj/pool.go index 0143467f..7404021e 100644 --- a/api/service/mj/pool.go +++ b/api/service/mj/pool.go @@ -36,7 +36,7 @@ func NewServicePool(db *gorm.DB, redisCli *redis.Client, manager *oss.UploaderMa } cli := NewPlusClient(config) name := fmt.Sprintf("mj-plus-service-%d", k) - service := NewService(name, taskQueue, notifyQueue, 4, 600, db, cli) + service := NewService(name, taskQueue, notifyQueue, db, cli) go func() { service.Run() }() @@ -49,7 +49,7 @@ func NewServicePool(db *gorm.DB, redisCli *redis.Client, manager *oss.UploaderMa } cli := NewProxyClient(config) name := fmt.Sprintf("mj-proxy-service-%d", k) - service := NewService(name, taskQueue, notifyQueue, 4, 600, db, cli) + service := NewService(name, taskQueue, notifyQueue, db, cli) go func() { service.Run() }() diff --git a/api/service/mj/service.go b/api/service/mj/service.go index 3c870a17..ad118308 100644 --- a/api/service/mj/service.go +++ b/api/service/mj/service.go @@ -8,7 +8,6 @@ import ( "chatplus/utils" "fmt" "strings" - "sync/atomic" "time" "gorm.io/gorm" @@ -16,41 +15,26 @@ import ( // Service MJ 绘画服务 type Service struct { - Name string // service Name - Client Client // MJ Client - taskQueue *store.RedisQueue - notifyQueue *store.RedisQueue - db *gorm.DB - maxHandleTaskNum int32 // max task number current service can handle - HandledTaskNum int32 // already handled task number - taskStartTimes map[int]time.Time // task start time, to check if the task is timeout - taskTimeout int64 + Name string // service Name + Client Client // MJ Client + taskQueue *store.RedisQueue + notifyQueue *store.RedisQueue + db *gorm.DB } -func NewService(name string, taskQueue *store.RedisQueue, notifyQueue *store.RedisQueue, maxTaskNum int32, timeout int64, db *gorm.DB, cli Client) *Service { +func NewService(name string, taskQueue *store.RedisQueue, notifyQueue *store.RedisQueue, db *gorm.DB, cli Client) *Service { return &Service{ - Name: name, - db: db, - taskQueue: taskQueue, - notifyQueue: notifyQueue, - Client: cli, - taskTimeout: timeout, - maxHandleTaskNum: maxTaskNum, - taskStartTimes: make(map[int]time.Time, 0), + Name: name, + db: db, + taskQueue: taskQueue, + notifyQueue: notifyQueue, + Client: cli, } } func (s *Service) Run() { logger.Infof("Starting MidJourney job consumer for %s", s.Name) for { - s.checkTasks() - if !s.canHandleTask() { - // current service is full, can not handle more task - // waiting for running task finish - time.Sleep(time.Second * 3) - continue - } - var task types.MjTask err := s.taskQueue.LPop(&task) if err != nil { @@ -125,37 +109,14 @@ func (s *Service) Run() { continue } logger.Infof("任务提交成功:%+v", res) - // lock the task until the execute timeout - s.taskStartTimes[int(task.Id)] = time.Now() - atomic.AddInt32(&s.HandledTaskNum, 1) // 更新任务 ID/频道 job.TaskId = res.Result + job.MessageId = res.Result job.ChannelId = s.Name s.db.Updates(&job) } } -// check if current service instance can handle more task -func (s *Service) canHandleTask() bool { - handledNum := atomic.LoadInt32(&s.HandledTaskNum) - return handledNum < s.maxHandleTaskNum -} - -// remove the timeout tasks -func (s *Service) checkTasks() { - for k, t := range s.taskStartTimes { - if time.Now().Unix()-t.Unix() > s.taskTimeout { - delete(s.taskStartTimes, k) - atomic.AddInt32(&s.HandledTaskNum, -1) - - s.db.Model(&model.MidJourneyJob{Id: uint(k)}).UpdateColumns(map[string]interface{}{ - "progress": -1, - "err_msg": "任务超时", - }) - } - } -} - type CBReq struct { Id string `json:"id"` Action string `json:"action"` @@ -186,6 +147,7 @@ func (s *Service) Notify(job model.MidJourneyJob) error { "progress": -1, "err_msg": task.FailReason, }) + s.notifyQueue.RPush(job.UserId) return fmt.Errorf("task failed: %v", task.FailReason) } @@ -198,15 +160,10 @@ func (s *Service) Notify(job model.MidJourneyJob) error { if task.ImageUrl != "" { job.OrgURL = task.ImageUrl } - job.MessageId = task.Id tx := s.db.Updates(&job) if tx.Error != nil { return fmt.Errorf("error with update database: %v", tx.Error) } - if task.Status == "SUCCESS" { - // release lock task - atomic.AddInt32(&s.HandledTaskNum, -1) - } // 通知前端更新任务进度 if oldProgress != job.Progress { s.notifyQueue.RPush(job.UserId)