赞
踩
1.为什么使用kafka?以实际业务中用户提现为例,比如一笔交易发生,这笔交易是否是欺诈交易,这笔交易是否已经达到限额了,相关用户是否在黑名单中,交易结果联动短信、Email等渠道通知用户……等等,也许一笔简单的交易,就要与反欺诈系统、黑名单系统、限额或授信管控系统、短信邮件系统进行通讯
(1)解耦,避免子系统之间过度依赖
(2)异步处理,使主程序关注自己的业务点(即正常交易逻辑的处理),将其他附加操作发送给消息接收方来处理
(3)广播,一个生产者可以向多个消费者发生消息(各子系统对应的消费者),topic的的数据发送给每一个group,但是topic下的每一个分区只会将自己的数据发送给该group下的一个customer,如果想要广播,只需要将不同的customer放在不同的group种即可
(4)错峰、缓冲,在业务量巨大的情况下,消息以队列的形成存储,且通过分布式架构,kafka集群可以横向扩容很多的broker,或者在topic中通过增加partition的数量来进行横向扩容。单个parition内是保证消息有序。以此解决生产者与消费者处理速度不一致的问题,消费者可以根据自己的处理能力从broker中拉取消息,避免大量的请求打在服务上,造成服务崩溃
zookeeper会存储元数据信息,如kafka各broker节点信息,topic信息及partition各分区、每个分区的leader等信息都会存储在zookeeper中,同时zookeeper会配合controller对整个kafka集群进行管理,controller是从broker中进行选取的。首先说一下controller是如何产生的,zookeeper中节点分持久节点和临时节点,当创建节点的客户端与zookeeper断开连接后节点还会存在,但是当创建临时节点的客户端断开连接后临时节点会被删除。当多个broker启动时,都会向zookeeper去注册controller临时节点,注册成功的将成为controller,其余的broker会在这个临时节点上注册watcher监听器,用于监听该节点是否还存在当成为controller的broker出现故障与zookeeper断开连接后,其他broker会再去竞争创建controller临时节点。这一过程与利用zookeeper实现分布式锁相类似,不过分布式锁是使用顺序临时节点实现的,它会按照顺序监听前一个分布式锁的请求者Zookeeper实现分布式锁_kongmin_123的博客-CSDN博客_zookeeper实现分布式锁
producer:
1.Topic创建
kafka-topics --create --zookeeper localhost:2181 --replication-factor 2 --partitions 2 --topic test1
该命令创建了一个名为test1的topic,有2个分区,每个分区2个副本,partition两个副本会分为一主一从,下面说一下partition的主从副本是如何再broker上分布的,它有一套默认的分配规则,尽可能的保证partition的副本均匀的分布在不同的broker上,防止因为单个broker宕机导致某partition完全不可用,如下
Partition leader和follow的分布规则,将所有存活的N个Brokers和topic下待分配的Partition排序
1)将第i个Partition的leader分配到第(i mod n)个Broker上,初始时一般副本0即为leader
2)将第i个Partition的第j个Replica分配到第((i + j) mode n)个Broker上
上面这个其实是错误的,这样会发生所有topic下的相同partition编号的leader的都在同一个broker上,且这样分配永远都是broker编号越小分配的partition数量就越多。分配并不均匀
//Topic的partiton在broker分配方式 在kafka源码中有一个很重要的方法 assignReplicasToBrokers(
int nPartitions, //topic下分区的数量
int replicationFactor,//分区的复制因子数量
List<int> brokers, //broker编号列表
int fixedStartIndex, //第一个副本的分配位置
int startPartitionId) //起始分区id 默认第0个分区
在方法中会按照broker数量生成两个随机数,startIndex和nextReplicaShift,startIndex表示topic第0个分区第0个副本的在broker中的位置,之后分区的第0个副本依次排列在后一个broker上;而对于同一个分区的其他副本,则会计算出一个偏移量,以第0个副本的位置为基准找到自己的broker
2. 比较重要的几个参数配置
producer.batch-size 16k producer端为每一个partition维护一个缓冲区,大小为16k
producer.linger.ms 0ms 当partition缓存区不满时等待多久发送,默认0ms
producer.buffer.memory 32M procuder端缓冲区大小默认为32M
Producer.retries 1 发送失败后重试次数
消息确认机制 ack 0 、1、 -1/all
大家可能听过kafka生产者有异步可同步发送消息两种模式,从0.8之后的版本producer均为异步发送模式,producer端缓冲区默认为32M当消息产生速度大于消息发送速度,导致缓存区被装满后,需要等待缓冲区中消息都被发送后才能继续向缓冲区添加消息。但是大家平常使用kafkaTemplate.send方法时,一般都会感觉执行方法后会立即同步发送消息,这种现象出现的原因主要是由batch-size和linger.ms配置有关,前者表示对每一个partition都有一个缓冲区,默认为16K,当缓存区满了以后才会将这些消息批量发送到该partition中,但是又因为linger.ms默认为0,它表示partition缓冲区不满时,需要等待多久发送,它的默认是0,所以才会发生一有消息到来就立即发送。
同时producer还支持ack消息确认机制,当我们向一个partition发送消息时,会将该消息交由分区的leader进行存储,而它的follower会从leader拉取消息。当ack为0时,表示producer只管发送,并不保证消息被partition接收;当ack为1时,生产者需要等待消息被分区leader存储,并提交确认后,再继续发送下一个消息;当ack为-1时,表示生产者需要等待消息被leader和所有follower存储并确认后再发送下一条消息。配置从0到-1可以保证消息不易在发送过程中丢失,但是吞吐量也会变低,一般用1就可以了。
3. send方法如何选择将消息发送到topic下的具体partition
当我们使用send方法发送消息时,一般是send(topicName, message),但是我们还可以选择partition和key,只要指定了partition时,会向该partition发送消息;如果没有指定partition则会使用Partitioner.class接口的partition方法获取partition编号(kafka中它的默认实现类为DefaultPartitioner.class),它的返回值是int,表示分配的partition编号。在该方法中,如果指定了key,那么会计算key的byte数组,然后通过hash获得一个值最后对当前的partition数量partitionNum取模,选择发送的partition编号;当不指定时,会获取一个topicCounterMap,key是topic,value是一个AtomicInteger类型对象,如果是首次获取它的话会随机生成一个数字创建AtomicInteger对象,之后会通过getAndIncrement()生成数字,获取当前可用的partition列表,如果这个可用list的长度大于0时,会根据获取的值对可用列表的size取模获得n,然后list.get(n)选择发送的分区,若可用list的size为0时,则会用AtomicInteger值与partitionNum取模获取partition编号。
customer:
costomer从各分区leader处读取消息
1. 较为重要的配置
consumer.enable-auto-commit true
设为false后还是自动提交
Listener.ack-mode: BATCH -> MANUAL
consumer.auto-commit-interval 5000ms
customer消息消费后需要提交offset,默认为自动提交,,间隔为5000ms。我们可以将配置enable-auto-commit设为false,从而将offset的提交方式改为手动提交。但是当我们在spring-boot项目中使用kafka时,如果单单将配置改为false是不起作用的,offset还是自动提交,这是因为spring-boot帮我们在代码里进行了优化,KafkaMessageListenerContainer中processCommit防止我们误操作将提交方式改为手动后,却没有提交offset,如果相要生效的话,还需要将ack-mode改为MANUAL,它默认为BATCH。可以使用Acknowledgement调用acknowledge()方法进行手动提交。
2.customer拉取数据的时候如何选择partition
customer的分区分配机制与PartitionAssignor.class接口有关,AbstractPartitionAssignor.class抽象类实现了该接口,而该抽象类有3种实现方式,分别为RangeAssignor.class、RoundRobinAssignor.class、StickyAssignor.class三种,它们都实现了assign方法用于向customer分配partition,RangeAssigner是kafka的customer端partition的默认选取策略。因为topic的一个partition只能分配给一个group下的一个customer,下面举得例子中customer均在同一group。
RangeAssignor是以单个topic的partition为基础的。它会把当前topic的partition按编号排序,将customer按字典顺排序,然后将partition数除以当前订阅它的customer数,如果不能整除,排在前面的customer会多一个分区,有customer1,customer2,都订阅了topic1,topic1 有1-0,1-1,1-2, 1-3,1-4,5个分区,则c1有1-0,1-1,1-2,c2有1-3,1-4。
RoundRobinAssignor 则是会将所有topic的所有partition进行编号排序,customer也会进行字段排序,然后按照轮询的方式分配,如有customer1,customer2,订阅了topic1,topic2,topic1 有1-0,1-1,1-2, 1-3,1-4,5个分区,topic-2 有2-0,2-1,2-2 ,2-3,2-4 5个分区,则
C1 1-0 1-2 1-4. 2-1 2-3
C2 1-1 1-3. 2-0 2-2 2-4
需要注意的是range和roundRobin这两种方式,当某个customer不可用进行partition再平衡时,其他正常的customer的partition也需要进行再分配。
StickyAssignor:
有两个原则
1.尽量保证每个customer的partition数量差距不大 ,使分配尽量平衡
2.当某customer不可用,进行重分配时,正常customer的partition不再参与重分配,尽可能保持现有分配,减少一个paritition从一个customer转移分配到另一个customer所造成的开销,且第一个原则优先级高于第二个。
stickyAssigner和roundRobinAssgner在分配partition时有些相似,例如现在有C1、C2、C3均订阅T0-T3,T0、T1、T2、T3 它们分别有2个分区,则stickyAssigner和roundRobinAssgner分配均如下所示:
C1 :T0-0 T1-1 T3-0
C2 :T0-1 T2-0 T3-1
C3 :T1-0 T2-1
当C2被删除时,会发生partition的再平衡,roundRobinAssgner此时为
C1 T0-0 T1-0 T2-0 T3-0
C3 T0-1 T1-1 T2-1. T3-1
而stickyAssigner会产生,它保留了C1和C3原来分配的partition,并且保证了C1C3 partition数量的平衡
C1 :T0-0 T1-1 T3-0 T2-0
C3 :T1-0 T2-1 T0-1 T3-1
若现在有C1、C2、C3,T0、T1、T2,分别有1、2、3个分区,若C1订阅T0,C2订阅T0、T1,C2订阅T0、T1、T2,
则在roundRobinAssigner中
C1 T0-0
C2 T1-0
C3 T1-1 T2-0 T2-1 T2-2
在StrckyAssigner中使每一个customer分配的partition数量尽量平衡
C1 T0-0
C2 T1-0 T1-1
C3 T2-0 T2-1 T2-2
假如现在删除了C1,在roundRobinAssigner中
C2 T0-0 T1-1
C3 T1-0 T2-0 T2-1 T2-2
而在strckyAssigneer中有,它使customer获取partition数量平衡,且保持了原有partition不被重分配
C2 T1-0 T1-1 T0-0
C3 T2-0 T2-1 T2-2
kafka高可用机制:
高可用是指出现故障的时候,能够尽可能减少停工时间,保持服务高度的可用性。
在kafka中有多个broker节点,对每一个topic而言,也有多个分区partition,创建topic时可以指定每个分区有几个副本,在这些副本中,需要选出一个leader节点,其余为follower节点。topic的所有分区的leader会均匀的分布在不同的broker节点中,它们的follower也会均匀的分布在不同的broker中,以避免因为单一broker宕机,而引起一个partition的所有副本全部失效。
同时producer发送消息时,是发送到partition的leader副本,follower分区会从主分区来拉取数据进行备份,每一个分区的leader都会维护一个ISR集合,集合中的各从分区都保持着与主分区消息的同步,当所有ISR从分区写入成功(为了提高效率从节点只是获取消息,并没有将其写入到磁盘中)后会向主分区发送一个ack通知,只有被ISR集合中所有副本存储成功的消息,customer才能从leader获取。如果ISR中的一个分区长时间没有从leader拉取数据,就会被移除出ISR集群,等到它恢复正常后才会被重新加入。如果leader所在broker出现故障的话会从ISR集合中选举一个新的leader从而保证可用性。
kafka在版本0.8之前一个副本是否被移除isr集合是由replica.lag.time.max.ms(从leader拉取消息的时间间隔)、replica.lag.max.messages(落后leader的消息条数)这两个参数去控制的,在0.9及之后的版本replica.lag.max.messages参数被移除,仅有replica.lag.time.max.ms拉取消息时间间隔用于判断是否被移除isr集合。
kafka消息存储
kafka的消息是以日志的形式进行存储的,在kafka的配置文件server.properties中有配置log.dirs指明日志目录,mac的默认地址为/usr/local/var/lib/kafka-logs。我们知道topic的partition的各个副本是均匀分布在每个broker上的,只要partition有副本在该broker,那么日志目录下就会有该partition的一个文件夹,以topic的名字+partition编号命名,其中包括有log文件和对应的index文件。本broker的该partition副本获取的消息都会追加到log文件,log中的数据会根据配置,保存一段时间后删除,默认是7天,当一个log文件大小超过一定限制时,将会新建一个log文件,我用的2.2.1默认大小是1G。
log是一个日志文件,index是索引文件,他俩组成一个segment,一个partition由多个segemnt组成,segment又称为段,log和index的文件名一样只是后缀不同,全局的第一个segment名从0开始,其余 的segment名为上一个segment最后一个消息的offset值。index文件中记录了消息在log文件中的编号和在log中开始的位置(起始偏移量),index并没有为每一条消息都建立这种key-value对,主要是为了避免索引文件占太大的存储空间;log文件中有消息体,消息在partition中的offset,消息的长度等信息。
那么通过partition的offset如何查找消息呢?
首先根据offset值通过对log文件名二分查找确定它所在的log文件,然后通过想要寻找的offset减去文件名的offset值获取它在文件中对应的编号,进入它的index文件再次通过二分查找查找确定该offset的具体位置或在log文件的哪个范围内,然后通过遍历找到对应的消息。
大家可能好奇customer-offset文件夹中存的是什么,其实它里面存的就是topic的每个partition被哪个group下customer的消费后提交的offset,可以使用kafka提供的kafka-customer-offset-checker来查看
- Group Topic Pid Offset logSize Lag Owner
- test-consumer-group stable-test 0 601808 601808 0 none
- test-consumer-group stable-test 1 602826 602828 2 none
- test-consumer-group stable-test 2 602136 602136 0 none
而partition目录下的timeindex文件看名字就知道其实它和index一样也是一个索引文件,只不过index存储的是消息在文件中编号-消息在文件中起始偏移量的key-value对,而timeindex中索引的key是写入时间。
leader-epoch-checkpoint文件中存储的是该partition对应leader换届后第一次被写入消息的offset,如 0 0,1 200 ,意思就是第一届leader写入的第一个消息offset是0,它一共写入了200个消息后出现故障,第二届leader第一个写入的消息offset'是200.它主要与leader换届后保证消息的完整性,避免消息丢失和离散。与其相关的还有leo和hw及水印备份机制。
HW&LEO
LEO(last end offset):日志末端位移,记录了该partition副本下一条消息的offeset位移值
HW(high watermark):该值叫高水印值,HW 一定不会大于 LEO 值,小于 HW 值的消息被认为是“已提交”或“已备份”的消息,并对消费者可见
leo和hw在partition的所有副本中都存在,且leader副本不光维护自己的leo还会存储isr中follower副本的leo
(1)当leader接收到producter发送的消息时,其leo会+1
(2)当isr中follower向leader发送拉取消息的请求时,会带上自己当前的leo值
(3)leader收到follow拉取消息的请求后会根据自己的HW值和所有isr集合副本的leo取最小值作为自己的hw,并更新leader本地存储的对应follow副本的leo值
(4)leader将新消息返回,并带上自己的hw
(5)follower写入消息并更新自己的leo,同时与leader的hw值进行比较,获取较小的值作为自己的hw,可以看出来follower的hw不会大于leader的hw
关于follower副本同步的过程中,还有两个关键的概念,HW(HighWatermark)和LEO(Log End Offset). 这两个参数跟ISR集合紧密关联。HW标记了一个特殊的offset,当消费者处理消息的时候,只能拉取到leader HW之前的消息,HW之后的消息对消费者来说是不可见的。也就是说,取partition对应ISR中最小的LEO作为HW,consumer只能消费到HW之前的位置。每个replica都有HW,leader和follower各自维护更新自己的HW的状态。对于leader新写入的消息,consumer不能立刻消费,leader会等待该消息被所有ISR中的replicas同步、更新HW后,此时消息才能被consumer消费。这样就保证了如果leader副本损坏,该消息仍然可以从新选举的leader中获取。LEO 是所有副本都会有的一个offset标记,它表示追加到当前副本的下一个消息的offset。当生产者向leader副本追加消息的时候,leader副本的LEO标记就会递增;当follower副本成功从leader副本拉取消息并更新到本地的时候,follower副本的LEO就会增加。
线程中的活锁是指:线程一直处于运行状态,但是其任务却一直无法进展的一种活性故障。
而kafka中的消费者也有可能出现活锁的现象:当一个customer能够订阅了某topic并能够poll(拉取)消息时,该消费者就会占用某一分区,同时需要定时的向zk发送心跳,以证明自己的存活。当消费者占有某一分区后,且能够正常发送心跳,但是不poll消息了,不再进行消息处理了,这种情况下就出现了活锁。可以使用max.poll.interval.ms活跃检测机制, 在此基础上,如果你调用的poll的时间间隔大于最大间隔,则客户端将主动地离开组,以便其他消费者接管该分区。
kafka消息丢失?
(1)对producer而言可以通过设置ack消息确认参数来降低消息发送到topic分区时丢失的概率;
设置Producer.retries当发送消息失败时进行重试;
使用KafkaProducer.send(record, callback),自定义发送回调函数,如果失败了怎么处理等。
(2)在broker端,topic每个分区的副本数必须大于1,即replication.factor参数要大于1;要设置min.insync.replicas>1,即ISR集合的数量要大于1,因为在极端情况下集合中可能只有partition的leader,follower都被剔除出去了,当producer将消息发送到该partition时会直接报错,这样才能保证当leader挂了还能从ISR选举出新的leader;此时topic的partition的leader和follower副本均匀分布在每一个broker上,且为leader副本维护一个ISR集合,避免因单个broker宕机造成整个partition不可用,当leader副本所在broker出现故障时可以及时从isr集合中选取新的leader
(3)customer端:如果customer的offset为自动提交,如果消息处理时间过长,而offset被提交,这时customer出现故障消息还没有被处理完成,当customer重新启动时,就会跳过该消息造成消息丢失,可用将offset的提交方式改为手动提交,当消息处理完成后再提交offset
如何避免重复消费消息?
这一现象是当消息处理完成,提交offset之前customer出现故障,当重启后会再次消费该消息,可以在业务代码中进行处理,比如在发送的数据中获取一个唯一标识,如果是入库操作,则查一下数据库,如果不是入库可以在redis中进行存储,当进行操作时先进行一次查询用于判断是否是重复消费
topic副本主从同步:漫游Kafka设计篇之主从同步_磊磊崔的博客-CSDN博客_kafka 主从
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。