当前位置:   article > 正文

kafka长连接(golang)_golang kafka重连

golang kafka重连

kafka sarama提供producer及consumer相关客户端建立及使用,本文主要研究sarama如何保证客户端使用时的长连接。
一:客户端初始化
NewClient()函数用于完成客户端初始化,更新metadata数据,供consumer和producer使用。其中metadata包含topic下所有partition,主partition信息及主partition所在broker信息。
NewClient()函数主要完成以下工作:

  • 根据Config与addrs生成新的client结构体,结构体参数如下:

type client struct {
   
conf *Config
closer, closed chan none n 
seedBrokers []*Broker
deadSeeds []*Broker
 
controllerID int32 // cluster controller broker id
brokers map[int32]*Broker // maps broker ids to brokers
metadata map[string]map[int32]*PartitionMetadata // maps topics to partition ids to metadata
metadataTopics map[string]none // topics that need to collect metadata
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 后台不停刷新kafka server集群的metadata信息,Metadata.RefreshFrequency默认为10分钟
  • 若broker集群处于leader选举期间连不上,默认重试3次,每次等待250ms
  • config中keepAlive字段,表示连接broker的保活间隔

二:cosumer客户端
consumer客户端主动探测kafka server信息以保证连接的有效性。
kafka 消费客户端源码解析(golang版)详细的介绍了consumer客户端如何消费kafka server中消息,其中ConsumerPartition函数中调用dispatch完成连接异常工作处理。
go dispatcher()函数,监听trigger,每次trigger代表一次消费连接异常,异常发生后,默认每2秒重连一次,直到重连成功

func (child *partitionConsumer) dispatcher() {
   
   for range child.trigger {
   
      select {
   
      case <-child.dying:
         close(child.trigger)
      case <-time.After
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/盐析白兔/article/detail/500121
推荐阅读
相关标签
  

闽ICP备14008679号