mirror of
https://github.com/bufanyun/hotgo.git
synced 2025-10-12 04:53:47 +08:00
版本预发布
This commit is contained in:
@@ -48,12 +48,13 @@ func (r *KafkaMq) SendByteMsg(topic string, body []byte) (mqMsg MqMsg, err error
|
||||
}
|
||||
|
||||
if r.producerIns == nil {
|
||||
return mqMsg, gerror.New("queue kafka producerIns is nil")
|
||||
err = gerror.New("queue kafka producerIns is nil")
|
||||
return
|
||||
}
|
||||
|
||||
r.producerIns.Input() <- msg
|
||||
ctx, cancle := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancle()
|
||||
sendCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
|
||||
select {
|
||||
case info := <-r.producerIns.Successes():
|
||||
@@ -68,7 +69,7 @@ func (r *KafkaMq) SendByteMsg(topic string, body []byte) (mqMsg MqMsg, err error
|
||||
if nil != fail {
|
||||
return mqMsg, fail.Err
|
||||
}
|
||||
case <-ctx.Done():
|
||||
case <-sendCtx.Done():
|
||||
return mqMsg, gerror.New("send mqMst timeout")
|
||||
}
|
||||
|
||||
@@ -86,22 +87,23 @@ func (r *KafkaMq) ListenReceiveMsgDo(topic string, receiveDo func(mqMsg MqMsg))
|
||||
receiveDoFun: receiveDo,
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
go func(ctx context.Context) {
|
||||
consumerCtx, cancel := context.WithCancel(context.Background())
|
||||
go func(consumerCtx context.Context) {
|
||||
for {
|
||||
if err := r.consumerIns.Consume(ctx, []string{topic}, &consumer); err != nil {
|
||||
if err = r.consumerIns.Consume(consumerCtx, []string{topic}, &consumer); err != nil {
|
||||
g.Log().Fatalf(ctx, "kafka Error from consumer, err%+v", err)
|
||||
}
|
||||
|
||||
if ctx.Err() != nil {
|
||||
g.Log().Debugf(ctx, fmt.Sprintf("kafka consoumer stop : %v", ctx.Err()))
|
||||
if consumerCtx.Err() != nil {
|
||||
g.Log().Debugf(ctx, fmt.Sprintf("kafka consoumer stop : %v", consumerCtx.Err()))
|
||||
return
|
||||
}
|
||||
consumer.ready = make(chan bool)
|
||||
}
|
||||
}(ctx)
|
||||
}(consumerCtx)
|
||||
|
||||
<-consumer.ready // Await till the consumer has been set up
|
||||
// await till the consumer has been set up
|
||||
<-consumer.ready
|
||||
g.Log().Debug(ctx, "kafka consumer up and running!...")
|
||||
|
||||
signal.AppDefer(func() {
|
||||
@@ -115,85 +117,94 @@ func (r *KafkaMq) ListenReceiveMsgDo(topic string, receiveDo func(mqMsg MqMsg))
|
||||
return
|
||||
}
|
||||
|
||||
// RegisterKafkaMqConsumerMust 注册消费者
|
||||
func RegisterKafkaMqConsumerMust(connOpt KafkaConfig) (client MqConsumer) {
|
||||
// RegisterKafkaMqConsumer 注册消费者
|
||||
func RegisterKafkaMqConsumer(connOpt KafkaConfig) (client MqConsumer, err error) {
|
||||
mqIns := &KafkaMq{}
|
||||
kfkVersion, _ := sarama.ParseKafkaVersion(connOpt.Version)
|
||||
kfkVersion, err := sarama.ParseKafkaVersion(connOpt.Version)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
if validateVersion(kfkVersion) == false {
|
||||
kfkVersion = sarama.V2_4_0_0
|
||||
}
|
||||
|
||||
brokers := connOpt.Brokers
|
||||
config := sarama.NewConfig()
|
||||
config.Consumer.Return.Errors = true
|
||||
config.Version = kfkVersion
|
||||
conf := sarama.NewConfig()
|
||||
conf.Consumer.Return.Errors = true
|
||||
conf.Version = kfkVersion
|
||||
if connOpt.UserName != "" {
|
||||
config.Net.SASL.Enable = true
|
||||
config.Net.SASL.User = connOpt.UserName
|
||||
config.Net.SASL.Password = connOpt.Password
|
||||
conf.Net.SASL.Enable = true
|
||||
conf.Net.SASL.User = connOpt.UserName
|
||||
conf.Net.SASL.Password = connOpt.Password
|
||||
}
|
||||
|
||||
// 默认按随机方式消费
|
||||
config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange
|
||||
config.Consumer.Offsets.Initial = sarama.OffsetNewest
|
||||
config.Consumer.Offsets.AutoCommit.Interval = 10 * time.Millisecond
|
||||
config.ClientID = connOpt.ClientId
|
||||
conf.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange
|
||||
conf.Consumer.Offsets.Initial = sarama.OffsetNewest
|
||||
conf.Consumer.Offsets.AutoCommit.Interval = 10 * time.Millisecond
|
||||
conf.ClientID = connOpt.ClientId
|
||||
|
||||
consumerClient, err := sarama.NewConsumerGroup(brokers, connOpt.GroupID, config)
|
||||
consumerClient, err := sarama.NewConsumerGroup(brokers, connOpt.GroupID, conf)
|
||||
if err != nil {
|
||||
g.Log().Fatal(ctx, err)
|
||||
return
|
||||
}
|
||||
mqIns.consumerIns = consumerClient
|
||||
return mqIns
|
||||
return mqIns, err
|
||||
}
|
||||
|
||||
// RegisterKafkaProducerMust 注册并启动生产者接口实现
|
||||
func RegisterKafkaProducerMust(connOpt KafkaConfig) (client MqProducer) {
|
||||
// RegisterKafkaProducer 注册并启动生产者接口实现
|
||||
func RegisterKafkaProducer(connOpt KafkaConfig) (client MqProducer, err error) {
|
||||
mqIns := &KafkaMq{}
|
||||
|
||||
connOpt.ClientId = "HOTGO-Producer"
|
||||
RegisterKafkaProducer(connOpt, mqIns) //这里如果使用go程需要处理chan同步问题
|
||||
|
||||
return mqIns
|
||||
// 这里如果使用go程需要处理chan同步问题
|
||||
if err = doRegisterKafkaProducer(connOpt, mqIns); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return mqIns, nil
|
||||
}
|
||||
|
||||
// RegisterKafkaProducer 注册同步类型实例
|
||||
func RegisterKafkaProducer(connOpt KafkaConfig, mqIns *KafkaMq) {
|
||||
kfkVersion, _ := sarama.ParseKafkaVersion(connOpt.Version)
|
||||
// doRegisterKafkaProducer 注册同步类型实例
|
||||
func doRegisterKafkaProducer(connOpt KafkaConfig, mqIns *KafkaMq) (err error) {
|
||||
kfkVersion, err := sarama.ParseKafkaVersion(connOpt.Version)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
if validateVersion(kfkVersion) == false {
|
||||
kfkVersion = sarama.V2_4_0_0
|
||||
}
|
||||
|
||||
brokers := connOpt.Brokers
|
||||
config := sarama.NewConfig()
|
||||
conf := sarama.NewConfig()
|
||||
// 等待服务器所有副本都保存成功后的响应
|
||||
config.Producer.RequiredAcks = sarama.WaitForAll
|
||||
conf.Producer.RequiredAcks = sarama.WaitForAll
|
||||
// 随机向partition发送消息
|
||||
config.Producer.Partitioner = sarama.NewRandomPartitioner
|
||||
conf.Producer.Partitioner = sarama.NewRandomPartitioner
|
||||
// 是否等待成功和失败后的响应,只有上面的RequireAcks设置不是NoReponse这里才有用.
|
||||
config.Producer.Return.Successes = true
|
||||
conf.Producer.Return.Successes = true
|
||||
|
||||
config.Producer.Return.Errors = true
|
||||
config.Producer.Compression = sarama.CompressionNone
|
||||
config.ClientID = connOpt.ClientId
|
||||
conf.Producer.Return.Errors = true
|
||||
conf.Producer.Compression = sarama.CompressionNone
|
||||
conf.ClientID = connOpt.ClientId
|
||||
|
||||
config.Version = kfkVersion
|
||||
conf.Version = kfkVersion
|
||||
if connOpt.UserName != "" {
|
||||
config.Net.SASL.Enable = true
|
||||
config.Net.SASL.User = connOpt.UserName
|
||||
config.Net.SASL.Password = connOpt.Password
|
||||
conf.Net.SASL.Enable = true
|
||||
conf.Net.SASL.User = connOpt.UserName
|
||||
conf.Net.SASL.Password = connOpt.Password
|
||||
}
|
||||
|
||||
var err error
|
||||
mqIns.producerIns, err = sarama.NewAsyncProducer(brokers, config)
|
||||
mqIns.producerIns, err = sarama.NewAsyncProducer(brokers, conf)
|
||||
if err != nil {
|
||||
g.Log().Fatal(ctx, err)
|
||||
return
|
||||
}
|
||||
|
||||
signal.AppDefer(func() {
|
||||
g.Log().Debug(ctx, "kafka producer AsyncClose...")
|
||||
mqIns.producerIns.AsyncClose()
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
// validateVersion 验证版本是否有效
|
||||
|
Reference in New Issue
Block a user