Modularize scheduled tasks so they can be registered from within plugins; add log grouping

孟帅
2023-06-05 20:14:57 +08:00
parent 62ecbb7f26
commit 48f8c20d9c
79 changed files with 820 additions and 783 deletions
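The log-grouping half of this change replaces direct g.Log() calls across the queue package with a package-level Logger() helper that returns a named logger instance. A minimal sketch of the pattern follows; the Logger helper matches the code added in the diff below, while the demo caller and its message are purely illustrative:

package queue

import (
	"context"

	"github.com/gogf/gf/v2/frame/g"
	"github.com/gogf/gf/v2/os/glog"
)

// Logger returns the dedicated logger for the "queue" group, so queue
// output can be leveled and filed separately from the default logger.
func Logger() *glog.Logger {
	return g.Log("queue")
}

// demo is illustrative only: call sites log through Logger() instead of g.Log().
func demo(ctx context.Context) {
	Logger().Infof(ctx, "queue consumer started, topic:%v", "default")
}

The file-by-file changes below then switch each g.Log() call site to Logger() mechanically.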

View File

@@ -2,7 +2,6 @@ package queue
import (
"context"
"github.com/gogf/gf/v2/frame/g"
"sync"
)
@@ -28,7 +27,7 @@ func RegisterConsumer(cs consumerStrategy) {
defer consumers.Unlock()
topic := cs.GetTopic()
if _, ok := consumers.list[topic]; ok {
g.Log().Debugf(ctx, "queue.RegisterConsumer topic:%v duplicate registration.", topic)
Logger().Debugf(ctx, "queue.RegisterConsumer topic:%v duplicate registration.", topic)
return
}
consumers.list[topic] = cs
@@ -51,7 +50,7 @@ func consumerListen(ctx context.Context, job consumerStrategy) {
)
if err != nil {
g.Log().Fatalf(ctx, "InstanceConsumer %s err:%+v", topic, err)
Logger().Fatalf(ctx, "InstanceConsumer %s err:%+v", topic, err)
return
}
@@ -67,6 +66,6 @@ func consumerListen(ctx context.Context, job consumerStrategy) {
ConsumerLog(ctx, topic, mqMsg, err)
}); listenErr != nil {
g.Log().Fatalf(ctx, "消费队列:%s 监听失败, err:%+v", topic, listenErr)
Logger().Fatalf(ctx, "消费队列:%s 监听失败, err:%+v", topic, listenErr)
}
}

View File

@@ -4,7 +4,6 @@ import (
"encoding/json"
"fmt"
"github.com/gogf/gf/v2/errors/gerror"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/os/gfile"
"hotgo/internal/library/queue/disk"
"sync"
@@ -45,7 +44,7 @@ func (q *DiskConsumerMq) ListenReceiveMsgDo(topic string, receiveDo func(mqMsg M
if index, offset, data, err := queue.Read(); err == nil {
var mqMsg MqMsg
if err = json.Unmarshal(data, &mqMsg); err != nil {
g.Log().Warningf(ctx, "disk.ListenReceiveMsgDo Unmarshal err:%+v, topic%v, data:%+v .", err, topic, string(data))
Logger().Warningf(ctx, "disk.ListenReceiveMsgDo Unmarshal err:%+v, topic%v, data:%+v .", err, topic, string(data))
continue
}
if mqMsg.MsgId != "" {
@@ -130,14 +129,14 @@ func NewDiskQueue(topic string, config *disk.Config) *disk.Queue {
if !gfile.Exists(conf.Path) {
if err := gfile.Mkdir(conf.Path); err != nil {
g.Log().Errorf(ctx, "NewDiskQueue Failed to create the cache directory. Procedure, err:%+v", err)
Logger().Errorf(ctx, "NewDiskQueue Failed to create the cache directory. Procedure, err:%+v", err)
return nil
}
}
queue, err := disk.New(conf)
if err != nil {
g.Log().Errorf(ctx, "NewDiskQueue err:%v", err)
Logger().Errorf(ctx, "NewDiskQueue err:%v", err)
return nil
}
return queue

View File

@@ -71,7 +71,6 @@ func (r *KafkaMq) SendByteMsg(topic string, body []byte) (mqMsg MqMsg, err error
case <-sendCtx.Done():
return mqMsg, gerror.New("send mqMsg timeout")
}
return mqMsg, nil
}
@@ -95,11 +94,11 @@ func (r *KafkaMq) ListenReceiveMsgDo(topic string, receiveDo func(mqMsg MqMsg))
go func(consumerCtx context.Context) {
for {
if err = r.consumerIns.Consume(consumerCtx, []string{topic}, &consumer); err != nil {
g.Log().Fatalf(ctx, "kafka Error from consumer, err%+v", err)
Logger().Fatalf(ctx, "kafka Error from consumer, err%+v", err)
}
if consumerCtx.Err() != nil {
g.Log().Debugf(ctx, fmt.Sprintf("kafka consumer stop: %v", consumerCtx.Err()))
Logger().Debugf(ctx, fmt.Sprintf("kafka consumer stop: %v", consumerCtx.Err()))
return
}
consumer.ready = make(chan bool)
@@ -108,13 +107,13 @@ func (r *KafkaMq) ListenReceiveMsgDo(topic string, receiveDo func(mqMsg MqMsg))
// wait until the consumer has been set up
<-consumer.ready
g.Log().Debug(ctx, "kafka consumer up and running!...")
Logger().Debug(ctx, "kafka consumer up and running!...")
gproc.AddSigHandlerShutdown(func(sig os.Signal) {
g.Log().Debug(ctx, "kafka consumer close...")
Logger().Debug(ctx, "kafka consumer close...")
cancel()
if err = r.consumerIns.Close(); err != nil {
g.Log().Fatalf(ctx, "kafka Error closing client, err:%+v", err)
Logger().Fatalf(ctx, "kafka Error closing client, err:%+v", err)
}
})
return
@@ -256,6 +255,5 @@ func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, clai
})
session.MarkMessage(message, "")
}
return nil
}

View File

@@ -8,6 +8,7 @@ package queue
import (
"context"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/os/glog"
)
const (
@@ -15,16 +16,20 @@ const (
ProducerLogErrFormat = "produce [%s] failed, body:%+v, err:%+v"
)
func Logger() *glog.Logger {
return g.Log("queue")
}
// ConsumerLog logs message consumption errors
func ConsumerLog(ctx context.Context, topic string, mqMsg MqMsg, err error) {
if err != nil {
g.Log().Errorf(ctx, ConsumerLogErrFormat, topic, string(mqMsg.Body), err)
Logger().Errorf(ctx, ConsumerLogErrFormat, topic, string(mqMsg.Body), err)
}
}
// ProducerLog logs message production errors
func ProducerLog(ctx context.Context, topic string, mqMsg MqMsg, err error) {
if err != nil {
g.Log().Errorf(ctx, ProducerLogErrFormat, topic, string(mqMsg.Body), err)
Logger().Errorf(ctx, ProducerLogErrFormat, topic, string(mqMsg.Body), err)
}
}
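Under GoFrame v2, a named logger such as g.Log("queue") picks up its settings from the logger.queue configuration node when one is defined, so the group can be given its own path, file and level without touching the default logger. The snippet below is a hedged sketch of tuning the group programmatically; the path and level values are assumptions, not part of this commit:

// configureQueueLogger is illustrative only and would live in package queue,
// alongside the Logger helper above; in practice the group is usually driven
// by the logger.queue node in the application config instead.
func configureQueueLogger() {
	_ = Logger().SetLevelStr("all")    // accept every level for the queue group
	_ = Logger().SetPath("logs/queue") // write the group's log files under logs/queue
}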

View File

@@ -79,7 +79,7 @@ func init() {
mqProducerInstanceMap = make(map[string]MqProducer)
mqConsumerInstanceMap = make(map[string]MqConsumer)
if err := g.Cfg().MustGet(ctx, "queue").Scan(&config); err != nil {
g.Log().Warningf(ctx, "queue init err:%+v", err)
Logger().Warningf(ctx, "queue init err:%+v", err)
}
}
@@ -207,7 +207,6 @@ func NewConsumer(groupName string) (mqClient MqConsumer, err error) {
mutex.Lock()
defer mutex.Unlock()
mqConsumerInstanceMap[groupName] = mqClient
return
}

View File

@@ -148,7 +148,7 @@ func (r *RedisMq) ListenReceiveMsgDo(topic string, receiveDo func(mqMsg MqMsg))
}
for err = range errCh {
if err != nil && err != context.Canceled && err != context.DeadlineExceeded {
g.Log().Infof(ctx, "ListenReceiveMsgDo Delay topic:%v, err:%+v", topic, err)
Logger().Infof(ctx, "ListenReceiveMsgDo Delay topic:%v, err:%+v", topic, err)
}
}
}()
@@ -166,7 +166,7 @@ func (r *RedisMq) loopReadQueue(key string) (mqMsgList []MqMsg) {
for {
data, err := conn.Do(ctx, "RPOP", key)
if err != nil {
g.Log().Warningf(ctx, "loopReadQueue redis RPOP err:%+v", err)
Logger().Warningf(ctx, "loopReadQueue redis RPOP err:%+v", err)
break
}
@@ -176,7 +176,7 @@ func (r *RedisMq) loopReadQueue(key string) (mqMsgList []MqMsg) {
var mqMsg MqMsg
if err = data.Scan(&mqMsg); err != nil {
g.Log().Warningf(ctx, "loopReadQueue Scan err:%+v", err)
Logger().Warningf(ctx, "loopReadQueue Scan err:%+v", err)
break
}

View File

@@ -3,13 +3,8 @@
// @Copyright Copyright (c) 2023 HotGo CLI
// @Author Ms <133814250@qq.com>
// @License https://github.com/bufanyun/hotgo/blob/master/LICENSE
//
package queue
import (
"github.com/gogf/gf/v2/frame/g"
)
type RocketMqLogger struct {
Flag string
LevelLog string
@@ -22,18 +17,17 @@ func (l *RocketMqLogger) Debug(msg string, fields map[string]interface{}) {
if msg == "" && len(fields) == 0 {
return
}
if l.LevelLog == "debug" || l.LevelLog == "all" {
g.Log().Debug(ctx, msg)
Logger().Debug(ctx, msg)
}
}
func (l *RocketMqLogger) Level(level string) {
g.Log().Info(ctx, level)
Logger().Info(ctx, level)
}
func (l *RocketMqLogger) OutputPath(path string) (err error) {
g.Log().Info(ctx, path)
Logger().Info(ctx, path)
return nil
}
@@ -46,7 +40,7 @@ func (l *RocketMqLogger) Info(msg string, fields map[string]interface{}) {
}
if l.LevelLog == "info" || l.LevelLog == "all" {
g.Log().Info(ctx, msg)
Logger().Info(ctx, msg)
}
}
@@ -59,7 +53,7 @@ func (l *RocketMqLogger) Warning(msg string, fields map[string]interface{}) {
}
if l.LevelLog == "warn" || l.LevelLog == "all" {
g.Log().Warning(ctx, msg)
Logger().Warning(ctx, msg)
}
}
@@ -71,7 +65,7 @@ func (l *RocketMqLogger) Error(msg string, fields map[string]interface{}) {
return
}
if l.LevelLog == "error" || l.LevelLog == "all" {
g.Log().Error(ctx, msg)
Logger().Error(ctx, msg)
}
}
@@ -84,6 +78,6 @@ func (l *RocketMqLogger) Fatal(msg string, fields map[string]interface{}) {
}
if l.LevelLog == "fatal" || l.LevelLog == "all" {
g.Log().Fatal(ctx, msg)
Logger().Fatal(ctx, msg)
}
}