This commit is contained in:
孟帅
2023-05-10 23:54:50 +08:00
parent bbe655a4d8
commit 49a96750bf
314 changed files with 15138 additions and 6244 deletions

View File

@@ -51,7 +51,7 @@ func (q *DiskConsumerMq) ListenReceiveMsgDo(topic string, receiveDo func(mqMsg M
if mqMsg.MsgId != "" {
receiveDo(mqMsg)
queue.Commit(index, offset)
sleep = time.Millisecond * 1
sleep = time.Millisecond * 10
}
} else {
sleep = time.Second
@@ -102,6 +102,11 @@ func (d *DiskProducerMq) SendByteMsg(topic string, body []byte) (mqMsg MqMsg, er
return
}
func (d *DiskProducerMq) SendDelayMsg(topic string, body string, delaySecond int64) (mqMsg MqMsg, err error) {
err = gerror.New("implement me")
return
}
func (d *DiskProducerMq) getProducer(topic string) *disk.Queue {
queue, ok := d.producers[topic]
if ok {

View File

@@ -3,7 +3,6 @@
// @Copyright Copyright (c) 2023 HotGo CLI
// @Author Ms <133814250@qq.com>
// @License https://github.com/bufanyun/hotgo/blob/master/LICENSE
//
package queue
import (
@@ -76,6 +75,11 @@ func (r *KafkaMq) SendByteMsg(topic string, body []byte) (mqMsg MqMsg, err error
return mqMsg, nil
}
func (r *KafkaMq) SendDelayMsg(topic string, body string, delaySecond int64) (mqMsg MqMsg, err error) {
err = gerror.New("implement me")
return
}
// ListenReceiveMsgDo 消费数据
func (r *KafkaMq) ListenReceiveMsgDo(topic string, receiveDo func(mqMsg MqMsg)) (err error) {
if r.consumerIns == nil {

View File

@@ -3,35 +3,28 @@
// @Copyright Copyright (c) 2023 HotGo CLI
// @Author Ms <133814250@qq.com>
// @License https://github.com/bufanyun/hotgo/blob/master/LICENSE
//
package queue
import (
"context"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/util/gconv"
"hotgo/utility/charset"
)
const (
ConsumerLogErrFormat = "消费 [%s] 失败, mqMsgId:%+v, mqMsgData:%+v, err:%+v, stack:%+v"
ProducerLogErrFormat = "生产 [%s] 失败, data:%+v, err:%+v, stack:%+v"
ConsumerLogErrFormat = "消费 [%s] 失败, body:%+v, err:%+v"
ProducerLogErrFormat = "生产 [%s] 失败, body:%+v, err:%+v"
)
// ConsumerLog 消费日志
func ConsumerLog(ctx context.Context, topic string, mqMsg MqMsg, err error) {
if err != nil {
g.Log().Printf(ctx, ConsumerLogErrFormat, topic, mqMsg.MsgId, mqMsg.BodyString(), err, charset.ParseErrStack(err))
} else {
g.Log().Print(ctx, "消费 ["+topic+"] 成功", mqMsg.MsgId)
g.Log().Errorf(ctx, ConsumerLogErrFormat, topic, string(mqMsg.Body), err)
}
}
// ProducerLog 生产日志
func ProducerLog(ctx context.Context, topic string, data interface{}, err error) {
func ProducerLog(ctx context.Context, topic string, mqMsg MqMsg, err error) {
if err != nil {
g.Log().Printf(ctx, ProducerLogErrFormat, topic, gconv.String(data), err, charset.ParseErrStack(err))
} else {
g.Log().Print(ctx, "生产 ["+topic+"] 成功", gconv.String(data))
g.Log().Errorf(ctx, ProducerLogErrFormat, topic, string(mqMsg.Body), err)
}
}

View File

@@ -11,6 +11,17 @@ func Push(topic string, data interface{}) (err error) {
return
}
mqMsg, err := q.SendMsg(topic, gconv.String(data))
ProducerLog(ctx, topic, mqMsg.MsgId, err)
return err
ProducerLog(ctx, topic, mqMsg, err)
return
}
// DelayPush 推送延迟队列
func DelayPush(topic string, data interface{}, second int64) (err error) {
q, err := InstanceProducer()
if err != nil {
return
}
mqMsg, err := q.SendDelayMsg(topic, gconv.String(data), second)
ProducerLog(ctx, topic, mqMsg, err)
return
}

View File

@@ -18,6 +18,7 @@ import (
type MqProducer interface {
SendMsg(topic string, body string) (mqMsg MqMsg, err error)
SendByteMsg(topic string, body []byte) (mqMsg MqMsg, err error)
SendDelayMsg(topic string, body string, delaySecond int64) (mqMsg MqMsg, err error)
}
type MqConsumer interface {
@@ -42,10 +43,7 @@ type Config struct {
}
type RedisConf struct {
Address string `json:"address"`
Db int `json:"db"`
Pass string `json:"pass"`
IdleTimeout int `json:"idleTimeout"`
Timeout int64 `json:"timeout"`
}
type RocketmqConf struct {
Address []string `json:"address"`
@@ -124,19 +122,12 @@ func NewProducer(groupName string) (mqClient MqProducer, err error) {
Version: config.Kafka.Version,
})
case "redis":
address := g.Cfg().MustGet(ctx, "queue.redis.address", nil).String()
if len(address) == 0 {
err = gerror.New("queue redis address is not support")
return
if _, err = g.Redis().Do(ctx, "ping"); err == nil {
mqClient = RegisterRedisMqProducer(RedisOption{
Timeout: config.Redis.Timeout,
}, groupName)
}
mqClient, err = RegisterRedisMqProducer(RedisOption{
Addr: config.Redis.Address,
Passwd: config.Redis.Pass,
DBnum: config.Redis.Db,
Timeout: config.Redis.IdleTimeout,
}, PoolOption{
5, 50, 5,
}, groupName, config.Retry)
case "disk":
config.Disk.GroupName = groupName
mqClient, err = RegisterDiskMqProducer(config.Disk)
@@ -197,19 +188,11 @@ func NewConsumer(groupName string) (mqClient MqConsumer, err error) {
ClientId: clientId,
})
case "redis":
if len(config.Redis.Address) == 0 {
err = gerror.New("queue redis address is not support")
return
if _, err = g.Redis().Do(ctx, "ping"); err == nil {
mqClient = RegisterRedisMqConsumer(RedisOption{
Timeout: config.Redis.Timeout,
}, groupName)
}
mqClient, err = RegisterRedisMqConsumer(RedisOption{
Addr: config.Redis.Address,
Passwd: config.Redis.Pass,
DBnum: config.Redis.Db,
Timeout: config.Redis.IdleTimeout,
}, PoolOption{
5, 50, 5,
}, groupName)
case "disk":
config.Disk.GroupName = groupName
mqClient, err = RegisterDiskMqConsumer(config.Disk)

View File

@@ -1,42 +1,26 @@
package queue
import (
"context"
"encoding/json"
"fmt"
"github.com/bufanyun/pool"
"github.com/gogf/gf/v2/database/gredis"
"github.com/gogf/gf/v2/errors/gerror"
"github.com/gogf/gf/v2/frame/g"
"github.com/gomodule/redigo/redis"
"hotgo/utility/encrypt"
"math/rand"
"strconv"
"time"
)
type RedisMq struct {
poolName string
groupName string
retry int
timeout int
}
type PoolOption struct {
InitCap int
MaxCap int
IdleTimeout int
timeout int64
}
type RedisOption struct {
Addr string
Passwd string
DBnum int
Timeout int
}
var redisPoolMap map[string]pool.Pool
func init() {
redisPoolMap = make(map[string]pool.Pool)
Timeout int64
}
// SendMsg 按字符串类型生产数据
@@ -49,43 +33,88 @@ func (r *RedisMq) SendByteMsg(topic string, body []byte) (mqMsg MqMsg, err error
if r.poolName == "" {
return mqMsg, gerror.New("RedisMq producer not register")
}
if topic == "" {
return mqMsg, gerror.New("RedisMq topic is empty")
}
msgId := getRandMsgId()
rdx, put, err := getRedis(r.poolName, r.retry)
defer put()
if err != nil {
return mqMsg, gerror.New(fmt.Sprint("queue redis 生产者获取redis实例失败:", err))
}
mqMsg = MqMsg{
RunType: SendMsg,
Topic: topic,
MsgId: msgId,
MsgId: getRandMsgId(),
Body: body,
Timestamp: time.Now(),
}
mqMsgJson, err := json.Marshal(mqMsg)
data, err := json.Marshal(mqMsg)
if err != nil {
return mqMsg, gerror.New(fmt.Sprint("queue redis 生产者解析json消息失败:", err))
return
}
queueName := r.genQueueName(r.groupName, topic)
_, err = redis.Int64(rdx.Do("LPUSH", queueName, mqMsgJson))
if err != nil {
return mqMsg, gerror.New(fmt.Sprint("queue redis 生产者添加消息失败:", err))
key := r.genKey(r.groupName, topic)
if _, err = g.Redis().Do(ctx, "LPUSH", key, data); err != nil {
return
}
if r.timeout > 0 {
_, err = rdx.Do("EXPIRE", queueName, r.timeout)
if err != nil {
return mqMsg, gerror.New(fmt.Sprint("queue redis 生产者设置过期时间失败:", err))
if _, err = g.Redis().Do(ctx, "EXPIRE", key, r.timeout); err != nil {
return
}
}
return
}
func (r *RedisMq) SendDelayMsg(topic string, body string, delaySecond int64) (mqMsg MqMsg, err error) {
if delaySecond < 1 {
return r.SendMsg(topic, body)
}
if r.poolName == "" {
err = gerror.New("SendDelayMsg RedisMq not register")
return
}
if topic == "" {
err = gerror.New("SendDelayMsg RedisMq topic is empty")
return
}
mqMsg = MqMsg{
RunType: SendMsg,
Topic: topic,
MsgId: getRandMsgId(),
Body: []byte(body),
Timestamp: time.Now(),
}
data, err := json.Marshal(mqMsg)
if err != nil {
return
}
var (
conn = g.Redis()
key = r.genKey(r.groupName, "delay:"+topic)
expireSecond = time.Now().Unix() + delaySecond
timePiece = fmt.Sprintf("%s:%d", key, expireSecond)
z = gredis.ZAddMember{Score: float64(expireSecond), Member: timePiece}
)
if _, err = conn.ZAdd(ctx, key, &gredis.ZAddOption{}, z); err != nil {
return
}
if _, err = conn.RPush(ctx, timePiece, data); err != nil {
return
}
// consumer will also delete the item
if r.timeout > 0 {
_, _ = conn.Expire(ctx, timePiece, r.timeout+delaySecond)
_, _ = conn.Expire(ctx, key, r.timeout)
}
return
}
@@ -98,190 +127,148 @@ func (r *RedisMq) ListenReceiveMsgDo(topic string, receiveDo func(mqMsg MqMsg))
return gerror.New("RedisMq topic is empty")
}
queueName := r.genQueueName(r.groupName, topic)
var (
key = r.genKey(r.groupName, topic)
key2 = r.genKey(r.groupName, "delay:"+topic)
)
go func() {
for range time.Tick(500 * time.Millisecond) {
mqMsgList := r.loopReadQueue(queueName)
for range time.Tick(300 * time.Millisecond) {
mqMsgList := r.loopReadQueue(key)
for _, mqMsg := range mqMsgList {
receiveDo(mqMsg)
}
}
}()
go func() {
mqMsgCh, errCh := r.loopReadDelayQueue(key2)
for mqMsg := range mqMsgCh {
receiveDo(mqMsg)
}
for err = range errCh {
if err != nil && err != context.Canceled && err != context.DeadlineExceeded {
g.Log().Infof(ctx, "ListenReceiveMsgDo Delay topic:%v, err:%+v", topic, err)
}
}
}()
select {}
}
// 生成队列名称
func (r *RedisMq) genQueueName(groupName string, topic string) string {
// 生成队列key
func (r *RedisMq) genKey(groupName string, topic string) string {
return fmt.Sprintf("queue:%s_%s", groupName, topic)
}
func (r *RedisMq) loopReadQueue(queueName string) (mqMsgList []MqMsg) {
rdx, put, err := getRedis(r.poolName, r.retry)
defer put()
if err != nil {
g.Log().Warningf(ctx, "loopReadQueue getRedis err:%+v", err)
return
}
func (r *RedisMq) loopReadQueue(key string) (mqMsgList []MqMsg) {
conn := g.Redis()
for {
infoByte, err := redis.Bytes(rdx.Do("RPOP", queueName))
if redis.ErrNil == err || len(infoByte) == 0 {
break
}
data, err := conn.Do(ctx, "RPOP", key)
if err != nil {
g.Log().Warningf(ctx, "loopReadQueue redis RPOP err:%+v", err)
break
}
var mqMsg MqMsg
if err = json.Unmarshal(infoByte, &mqMsg); err != nil {
g.Log().Warningf(ctx, "loopReadQueue Unmarshal err:%+v", err)
if data.IsEmpty() {
break
}
var mqMsg MqMsg
if err = data.Scan(&mqMsg); err != nil {
g.Log().Warningf(ctx, "loopReadQueue Scan err:%+v", err)
break
}
if mqMsg.MsgId != "" {
mqMsgList = append(mqMsgList, mqMsg)
}
}
return mqMsgList
}
func RegisterRedisMqProducer(connOpt RedisOption, poolOpt PoolOption, groupName string, retry int) (client MqProducer, err error) {
client, err = RegisterRedisMq(connOpt, poolOpt, groupName, retry)
if err != nil {
err = gerror.Newf("RegisterRedisMqProducer err:%+v", err)
return
}
return
func RegisterRedisMqProducer(connOpt RedisOption, groupName string) (client MqProducer) {
return RegisterRedisMq(connOpt, groupName)
}
// RegisterRedisMqConsumer 注册消费者
func RegisterRedisMqConsumer(connOpt RedisOption, poolOpt PoolOption, groupName string) (client MqConsumer, err error) {
client, err = RegisterRedisMq(connOpt, poolOpt, groupName, 0)
if err != nil {
err = gerror.Newf("RegisterRedisMqConsumer err:%+v", err)
return
}
return
func RegisterRedisMqConsumer(connOpt RedisOption, groupName string) (client MqConsumer) {
return RegisterRedisMq(connOpt, groupName)
}
// RegisterRedisMq 注册redis实例
func RegisterRedisMq(connOpt RedisOption, poolOpt PoolOption, groupName string, retry int) (mqIns *RedisMq, err error) {
poolName, err := registerRedis(connOpt.Addr, connOpt.Passwd, connOpt.DBnum, poolOpt)
if err != nil {
return
}
if retry <= 0 {
retry = 0
}
mqIns = &RedisMq{
poolName: poolName,
func RegisterRedisMq(connOpt RedisOption, groupName string) *RedisMq {
return &RedisMq{
poolName: encrypt.Md5ToString(fmt.Sprintf("%s-%d", groupName, time.Now().UnixNano())),
groupName: groupName,
retry: retry,
timeout: connOpt.Timeout,
}
return mqIns, nil
}
// RegisterRedis 注册一个redis配置
func registerRedis(host, pass string, dbNum int, opt PoolOption) (poolName string, err error) {
poolName = encrypt.Md5ToString(fmt.Sprintf("%s-%s-%d", host, pass, dbNum))
if _, ok := redisPoolMap[poolName]; ok {
return poolName, nil
}
connRedis := func() (interface{}, error) {
conn, err := redis.Dial("tcp", host)
if err != nil {
return nil, err
}
if pass != "" {
if _, err = conn.Do("AUTH", pass); err != nil {
return nil, err
}
}
if dbNum > 0 {
if _, err = conn.Do("SELECT", dbNum); err != nil {
return nil, err
}
}
return conn, err
}
// closeRedis 关闭连接
closeRedis := func(v interface{}) error {
return v.(redis.Conn).Close()
}
// pingRedis 检测连接连通性
pingRedis := func(v interface{}) error {
conn := v.(redis.Conn)
val, err := redis.String(conn.Do("PING"))
if err != nil {
return err
}
if val != "PONG" {
return gerror.New("queue redis ping is error ping => " + val)
}
return nil
}
p, err := pool.NewChannelPool(&pool.Config{
InitialCap: opt.InitCap,
MaxCap: opt.MaxCap,
Factory: connRedis,
Close: closeRedis,
Ping: pingRedis,
IdleTimeout: time.Duration(opt.IdleTimeout) * time.Second,
})
if err != nil {
return poolName, err
}
mutex.Lock()
defer mutex.Unlock()
redisPoolMap[poolName] = p
return poolName, nil
}
// getRedis 获取一个redis db连接
func getRedis(poolName string, retry int) (db redis.Conn, put func(), err error) {
put = func() {}
if _, ok := redisPoolMap[poolName]; ok == false {
return nil, put, gerror.New("db connect is nil")
}
redisPool := redisPoolMap[poolName]
conn, err := redisPool.Get()
for i := 0; i < retry; i++ {
if err == nil {
break
}
conn, err = redisPool.Get()
time.Sleep(time.Second)
}
if err != nil {
return nil, put, err
}
put = func() {
if err = redisPool.Put(conn); err != nil {
return
}
}
db = conn.(redis.Conn)
return db, put, nil
}
func getRandMsgId() string {
rand.Seed(time.Now().UnixNano())
rand.NewSource(time.Now().UnixNano())
radium := rand.Intn(999) + 1
timeCode := time.Now().UnixNano()
return fmt.Sprintf("%d%.4d", timeCode, radium)
}
func (r *RedisMq) loopReadDelayQueue(key string) (resCh chan MqMsg, errCh chan error) {
resCh = make(chan MqMsg, 0)
errCh = make(chan error, 1)
go func() {
defer close(resCh)
defer close(errCh)
conn := g.Redis()
for {
now := time.Now().Unix()
do, err := conn.Do(ctx, "zrangebyscore", key, "0", strconv.FormatInt(now, 10), "limit", 0, 1)
if err != nil {
return
}
val := do.Strings()
if len(val) == 0 {
select {
case <-ctx.Done():
errCh <- ctx.Err()
return
case <-time.After(time.Second):
continue
}
}
for _, listK := range val {
for {
pop, err := conn.LPop(ctx, listK)
if err != nil {
errCh <- err
return
} else if pop.IsEmpty() {
conn.ZRem(ctx, key, listK)
conn.Del(ctx, listK)
break
} else {
var mqMsg MqMsg
if err = pop.Scan(&mqMsg); err != nil {
g.Log().Warningf(ctx, "loopReadDelayQueue Scan err:%+v", err)
break
}
if mqMsg.MsgId == "" {
continue
}
select {
case resCh <- mqMsg:
case <-ctx.Done():
errCh <- ctx.Err()
return
}
}
}
}
}
}()
return resCh, errCh
}

View File

@@ -3,7 +3,6 @@
// @Copyright Copyright (c) 2023 HotGo CLI
// @Author Ms <133814250@qq.com>
// @License https://github.com/bufanyun/hotgo/blob/master/LICENSE
//
package queue
import (
@@ -15,6 +14,7 @@ import (
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/apache/rocketmq-client-go/v2/producer"
"github.com/apache/rocketmq-client-go/v2/rlog"
"github.com/gogf/gf/v2/errors/gerror"
"github.com/gogf/gf/v2/frame/g"
)
@@ -81,6 +81,11 @@ func (r *RocketMq) SendByteMsg(topic string, body []byte) (mqMsg MqMsg, err erro
return mqMsg, nil
}
func (r *RocketMq) SendDelayMsg(topic string, body string, delaySecond int64) (mqMsg MqMsg, err error) {
err = gerror.New("implement me")
return
}
// ListenReceiveMsgDo 消费数据
func (r *RocketMq) ListenReceiveMsgDo(topic string, receiveDo func(mqMsg MqMsg)) (err error) {
if r.consumerIns == nil {