赞
踩
首先说一下延迟队列这个东西,实际上实现他的方法有很多,kafka实现并不是一个最好的选择,例如redis的zset可以实现,rocketmq天然的可以实现,rabbitmq也可以实现。如果切换前几种方案成本高的情况下,那么就使用kafka实现,实际上kafka实现延迟队列也是借用了rocketmq的延迟队列思想,rocketmq的延迟时间是固定的几个,并不是自定义的,但是kafka可以实现自定义的延迟时间,但是不能过多,因为是依据topic实现的,接下来我使用go实现简单的kafka的延迟队列。
1、首先创建两个topic、一个delayTopic、一个realTopic
2、生产者把消息先发送到delayTopic
3、延迟服务再把delayTopic里面的消息超过我们所设置的时间写入到realTopic
4、消费者再消费realTopic里面的数据即可
msg := &sarama.ProducerMessage{
Topic: kafka.DelayTopic,
Timestamp: time.Now(),
Key: sarama.StringEncoder("rta_key"),
Value: sarama.StringEncoder(riStr),
}
partition, offset, err := kafka.KafkaDelayQueue.SendMessage(msg)
const ( DelayTime = time.Minute * 5 DelayTopic = "delayTopic" RealTopic = "realTopic" ) // KafkaDelayQueueProducer 延迟队列生产者,包含了生产者和延迟服务 type KafkaDelayQueueProducer struct { producer sarama.SyncProducer // 生产者 delayTopic string // 延迟服务主题 } // NewKafkaDelayQueueProducer 创建延迟队列生产者 // producer 生产者 // delayServiceConsumerGroup 延迟服务消费者组 // delayTime 延迟时间 // delayTopic 延迟服务主题 // realTopic 真实队列主题 func NewKafkaDelayQueueProducer(producer sarama.SyncProducer, delayServiceConsumerGroup sarama.ConsumerGroup, delayTime time.Duration, delayTopic, realTopic string, log *log) *KafkaDelayQueueProducer { var ( signals = make(chan os.Signal, 1) ) signal.Notify(signals, syscall.SIGTERM, syscall.SIGINT, os.Interrupt) // 启动延迟服务 consumer := NewDelayServiceConsumer(producer, delayTime, realTopic, log) log.Info("[NewKafkaDelayQueueProducer] delay queue consumer start") go func() { for { if err := delayServiceConsumerGroup.Consume(context.Background(), []string{delayTopic}, consumer); err != nil { log.Error("[NewKafkaDelayQueueProducer] delay queue consumer failed,err: ", zap.Error(err)) break } time.Sleep(2 * time.Second) log.Info("[NewKafkaDelayQueueProducer] 检测消费函数是否一直执行") // 检查是否接收到中断信号,如果是则退出循环 select { case sin := <-signals: consumer.Logger.Info("[NewKafkaDelayQueueProducer]get signal,", zap.Any("signal", sin)) return default: } } log.Info("[NewKafkaDelayQueueProducer] consumer func exit") }() log.Info("[NewKafkaDelayQueueProducer] return KafkaDelayQueueProducer") return &KafkaDelayQueueProducer{ producer: producer, delayTopic: delayTopic, } } // SendMessage 发送消息 func (q *KafkaDelayQueueProducer) SendMessage(msg *sarama.ProducerMessage) (partition int32, offset int64, err error) { msg.Topic = q.delayTopic return q.producer.SendMessage(msg) } // DelayServiceConsumer 延迟服务消费者 type DelayServiceConsumer struct { producer sarama.SyncProducer delay time.Duration realTopic string Logger *log.DomobLog } func NewDelayServiceConsumer(producer sarama.SyncProducer, delay time.Duration, realTopic string, log *log.DomobLog) *DelayServiceConsumer { return &DelayServiceConsumer{ producer: producer, delay: delay, realTopic: realTopic, Logger: log, } } func (c *DelayServiceConsumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { c.Logger.Info("[delaye ConsumerClaim] cc") for message := range claim.Messages() { // 如果消息已经超时,把消息发送到真实队列 now := time.Now() c.Logger.Info("[delay ConsumeClaim] out", zap.Any("send real topic res", now.Sub(message.Timestamp) >= c.delay), zap.Any("message.Timestamp", message.Timestamp), zap.Any("c.delay", c.delay), zap.Any("claim.Messages len", len(claim.Messages())), zap.Any("sub:", now.Sub(message.Timestamp)), zap.Any("meskey:", message.Key), zap.Any("message:", string(message.Value)), ) if now.Sub(message.Timestamp) >= c.delay { c.Logger.Info("[delay ConsumeClaim] jinlai", zap.Any("mes", string(message.Value))) _, _, err := c.producer.SendMessage(&sarama.ProducerMessage{ Topic: c.realTopic, Timestamp: message.Timestamp, Key: sarama.ByteEncoder(message.Key), Value: sarama.ByteEncoder(message.Value), }) if err != nil { c.Logger.Info("[delay ConsumeClaim] delay already send to real topic failed", zap.Error(err)) return nil } if err == nil { session.MarkMessage(message, "") c.Logger.Info("[delay ConsumeClaim] delay already send to real topic success") continue } } // 否则休眠一秒 time.Sleep(time.Second) return nil } c.Logger.Info("[delay ConsumeClaim] ph", zap.Any("partitiion", claim.Partition()), zap.Any("HighWaterMarkOffset", claim.HighWaterMarkOffset())) c.Logger.Info("[delay ConsumeClaim] delay consumer end") return nil } func (c *DelayServiceConsumer) Setup(sarama.ConsumerGroupSession) error { return nil } func (c *DelayServiceConsumer) Cleanup(sarama.ConsumerGroupSession) error { return nil }
这个方法整体逻辑就是不断消费延迟队列里面的消息,判断消息时间是否大于现在,如果大于现在说明消息超时了,就把该消息发送到真实的队列里面去了,真实队列是一直在消费的。如果没超时的话就不会标记消息,还会重新消费,消费成功会标记该消息。
重点:我在测试的时候是一秒拉一次消息,但这个也不是太准时,不过最终结果差距不大,想知道具体怎么消费的可以自己debug
type ConsumerRta struct { Logger *log } func ConsumerToRequestRta(consumerGroup sarama.ConsumerGroup, lg *log) { var ( signals = make(chan os.Signal, 1) wg = &sync.WaitGroup{} ) signal.Notify(signals, syscall.SIGTERM, syscall.SIGINT, os.Interrupt) wg.Add(1) // 启动消费者协程 go func() { defer wg.Done() consumer := NewConsumerRta(lg) consumer.Logger.Info("[ConsumerToRequestRta] consumer group start") // 执行消费者组消费 for { if err := consumerGroup.Consume(context.Background(), []string{kafka.RealTopic}, consumer); err != nil { consumer.Logger.Error("[ConsumerToRequestRta] Error from consumer group:", zap.Error(err)) break } time.Sleep(2 * time.Second) // 等待一段时间后重试 // 检查是否接收到中断信号,如果是则退出循环 select { case sin := <-signals: consumer.Logger.Info("get signal,", zap.Any("signal", sin)) return default: } } }() wg.Wait() lg.Info("[ConsumerToRequestRta] consumer end & exit") } func NewConsumerRta(lg *log) *ConsumerRta { return &ConsumerRta{ Logger: lg, } } func (c *ConsumerRta) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { for message := range claim.Messages() { // 消费逻辑 session.MarkMessage(message, "") return nil } return nil } func (c *ConsumerRta) Setup(sarama.ConsumerGroupSession) error { return nil } func (c *ConsumerRta) Cleanup(sarama.ConsumerGroupSession) error { return nil }
type KafkaConfig struct { BrokerList []string Topic []string GroupId []string Cfg *sarama.Config PemPath string KeyPath string CaPemPath string } var ( Producer sarama.SyncProducer ConsumerGroupReal sarama.ConsumerGroup ConsumerGroupDelay sarama.ConsumerGroup KafkaDelayQueue *KafkaDelayQueueProducer ) func NewKafkaConfig(cfg KafkaConfig) (err error) { Producer, err = sarama.NewSyncProducer(cfg.BrokerList, cfg.Cfg) if err != nil { return err } ConsumerGroupReal, err = sarama.NewConsumerGroup(cfg.BrokerList, cfg.GroupId[0], cfg.Cfg) if err != nil { return err } ConsumerGroupDelay, err = sarama.NewConsumerGroup(cfg.BrokerList, cfg.GroupId[1], cfg.Cfg) if err != nil { return err } return nil } func GetKafkaDelayQueue(log *log) { KafkaDelayQueue = NewKafkaDelayQueueProducer(Producer, ConsumerGroupDelay, DelayTime, DelayTopic, RealTopic, log) }
这个里面我没有怎么封装,可以自行封装,使用的是IBM的sarama客户端
基本上就是以上三步实现,里面的一些log日志可以传递自己的log日志即可,使用的是消费者组消费的,添加上自己的topic和groupid即可
重点:以上实现延迟时间可能不是太精准,我使用的时候还是有点小小的误差,不过误差不大,强相关业务还是使用其他专业实现延迟队列mq,或使用自行方案
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。