mirror of
https://github.com/bufanyun/hotgo.git
synced 2025-11-10 19:23:44 +08:00
golangci-lint run
This commit is contained in:
@@ -5,24 +5,24 @@ import (
|
||||
"sync"
|
||||
)
|
||||
|
||||
// consumerStrategy 消费者策略,实现该接口即可加入到消费队列中
|
||||
type consumerStrategy interface {
|
||||
// Consumer 消费者接口,实现该接口即可加入到消费队列中
|
||||
type Consumer interface {
|
||||
GetTopic() string // 获取消费主题
|
||||
Handle(ctx context.Context, mqMsg MqMsg) (err error) // 处理消息
|
||||
Handle(ctx context.Context, mqMsg MqMsg) (err error) // 处理消息的方法
|
||||
}
|
||||
|
||||
// consumerManager 消费者管理
|
||||
type consumerManager struct {
|
||||
sync.Mutex
|
||||
list map[string]consumerStrategy // 维护的消费者列表
|
||||
list map[string]Consumer // 维护的消费者列表
|
||||
}
|
||||
|
||||
var consumers = &consumerManager{
|
||||
list: make(map[string]consumerStrategy),
|
||||
list: make(map[string]Consumer),
|
||||
}
|
||||
|
||||
// RegisterConsumer 注册任务到消费者队列
|
||||
func RegisterConsumer(cs consumerStrategy) {
|
||||
func RegisterConsumer(cs Consumer) {
|
||||
consumers.Lock()
|
||||
defer consumers.Unlock()
|
||||
topic := cs.GetTopic()
|
||||
@@ -35,18 +35,18 @@ func RegisterConsumer(cs consumerStrategy) {
|
||||
|
||||
// StartConsumersListener 启动所有已注册的消费者监听
|
||||
func StartConsumersListener(ctx context.Context) {
|
||||
for _, consumer := range consumers.list {
|
||||
go func(consumer consumerStrategy) {
|
||||
consumerListen(ctx, consumer)
|
||||
}(consumer)
|
||||
for _, c := range consumers.list {
|
||||
go func(c Consumer) {
|
||||
consumerListen(ctx, c)
|
||||
}(c)
|
||||
}
|
||||
}
|
||||
|
||||
// consumerListen 消费者监听
|
||||
func consumerListen(ctx context.Context, job consumerStrategy) {
|
||||
func consumerListen(ctx context.Context, job Consumer) {
|
||||
var (
|
||||
topic = job.GetTopic()
|
||||
consumer, err = InstanceConsumer()
|
||||
topic = job.GetTopic()
|
||||
c, err = InstanceConsumer()
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
@@ -54,17 +54,16 @@ func consumerListen(ctx context.Context, job consumerStrategy) {
|
||||
return
|
||||
}
|
||||
|
||||
if listenErr := consumer.ListenReceiveMsgDo(topic, func(mqMsg MqMsg) {
|
||||
if listenErr := c.ListenReceiveMsgDo(topic, func(mqMsg MqMsg) {
|
||||
err = job.Handle(ctx, mqMsg)
|
||||
|
||||
//if err != nil {
|
||||
// if err != nil {
|
||||
// // 遇到错误,重新加入到队列
|
||||
// //queue.Push(topic, mqMsg.Body)
|
||||
//}
|
||||
// }
|
||||
|
||||
// 记录消费队列日志
|
||||
ConsumerLog(ctx, topic, mqMsg, err)
|
||||
|
||||
}); listenErr != nil {
|
||||
Logger().Fatalf(ctx, "消费队列:%s 监听失败, err:%+v", topic, listenErr)
|
||||
}
|
||||
|
||||
@@ -117,8 +117,9 @@ func (r *reader) close() {
|
||||
// sync index and offset
|
||||
func (r *reader) sync() {
|
||||
name := path.Join(r.config.Path, indexFile)
|
||||
data, _ := json.Marshal(&r.checkpoint)
|
||||
_ = os.WriteFile(name, data, filePerm)
|
||||
if data, err := json.Marshal(&r.checkpoint); err == nil {
|
||||
_ = os.WriteFile(name, data, filePerm)
|
||||
}
|
||||
}
|
||||
|
||||
// restore index and offset
|
||||
|
||||
@@ -85,7 +85,7 @@ func (r *KafkaMq) ListenReceiveMsgDo(topic string, receiveDo func(mqMsg MqMsg))
|
||||
return gerror.New("queue kafka consumer not register")
|
||||
}
|
||||
|
||||
consumer := Consumer{
|
||||
consumer := KaConsumer{
|
||||
ready: make(chan bool),
|
||||
receiveDoFun: receiveDo,
|
||||
}
|
||||
@@ -219,26 +219,25 @@ func validateVersion(version sarama.KafkaVersion) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
type Consumer struct {
|
||||
type KaConsumer struct {
|
||||
ready chan bool
|
||||
receiveDoFun func(mqMsg MqMsg)
|
||||
}
|
||||
|
||||
// Setup is run at the beginning of a new session, before ConsumeClaim
|
||||
func (consumer *Consumer) Setup(sarama.ConsumerGroupSession) error {
|
||||
func (consumer *KaConsumer) Setup(sarama.ConsumerGroupSession) error {
|
||||
// Mark the consumer as ready
|
||||
close(consumer.ready)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
|
||||
func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
|
||||
func (consumer *KaConsumer) Cleanup(sarama.ConsumerGroupSession) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
|
||||
func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
|
||||
|
||||
func (consumer *KaConsumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
|
||||
// NOTE:
|
||||
// Do not move the code below to a goroutine.
|
||||
// The `ConsumeClaim` itself is called within a goroutine, see:
|
||||
|
||||
Reference in New Issue
Block a user