赞
踩
kafka sarama提供producer及consumer相关客户端建立及使用,本文主要研究sarama如何保证客户端使用时的长连接。
一:客户端初始化
NewClient()函数用于完成客户端初始化,更新metadata数据,供consumer和producer使用。其中metadata包含topic下所有partition,主partition信息及主partition所在broker信息。
NewClient()函数主要完成以下工作:
type client struct {
conf *Config
closer, closed chan none n
seedBrokers []*Broker
deadSeeds []*Broker
controllerID int32 // cluster controller broker id
brokers map[int32]*Broker // maps broker ids to brokers
metadata map[string]map[int32]*PartitionMetadata // maps topics to partition ids to metadata
metadataTopics map[string]none // topics that need to collect metadata
}
二:cosumer客户端
consumer客户端主动探测kafka server信息以保证连接的有效性。
kafka 消费客户端源码解析(golang版)详细的介绍了consumer客户端如何消费kafka server中消息,其中ConsumerPartition函数中调用dispatch完成连接异常工作处理。
go dispatcher()函数,监听trigger,每次trigger代表一次消费连接异常,异常发生后,默认每2秒重连一次,直到重连成功
func (child *partitionConsumer) dispatcher() {
for range child.trigger {
select {
case <-child.dying:
close(child.trigger)
case <-time.After
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。