赞
踩
参考阅读地址:
阅读1
阅读2
阅读3
阅读4
阅读5
阅读6
(i % n)
个broker上((i + j) % n)
个Broker上根据上面的分配规则,若replica的数量大于broker的数量,必定会有两个相同的replica分配到同一个broker上,产生冗余。因此replica的数量应该小于或等于broker的数量。
(1)执行生产者代码后,会在JVM中启动一个producer,它会将数据发送到指定的topic。
(2)message不会直接就发送出去,会首先封装成ProducerRecord,构造ProducerRecord实例对象时,可以传入topic、key、value等。当需要指定消息发送到哪个分区,就需要传入key。value里是消息内容,一般是json格式。
(3)消息还需要序列化,因为涉及到数据的磁盘落地,然后又重新从磁盘读取数据,因此需要使用序列化(生产者)和反序列化(消费者)。
(4)序列化后的数据,还会经过分区器,这里可以指定自定义分区器,如果不指定就是默认分区器。分区器决定数据将存在topic哪个分区,那如何知道这个topic有几个分区?知道了又如何确定哪个分区就是leader分区,就算知道leader分区,又如何判断属于哪个broker呢?这一切都需要通过获取broker上的元数据来得到答案
。
在0.8版本,这些元数据是存在zookeeper中的,这样设计是有弊端的,zookeeper本来不是为高并发设计的,如果大量访问涌入zookeeper获取元数据,可能会出问题。在0.10.x之后,这些原数据通过存在某个broker的controller,将从zookeeper获取的元数据都分发到各个broker一份,因此从其中一个broker获取到的数据就是元数据,这样各个broker分摊了zookeeper的压力,将以前从zookeeper获取元数据,分到多个broker去提供了。
(5)接下来数据还不会直接发送出去,会先存入到一个默认是32M大小的内存缓冲区。
(6)缓冲区的数据,会先填入一个又一个的batch,默认一个batch是16K,这个也是可以设置batch.size修改的,需要根据实际情况来配置。batch大小达到指定大小就会发送出去,如果大小没达到16K,还有一个时间限定,可以通过linger.ms来设置,当达到指定的时间不管batch有没有达到指定大小都会发送出去。
producer会有一个专门的sender线程,将满足条件的batch一起发送过去,这样可以将多条消息批量的发送,比一条条的发送更加的节省资源,不用频繁的创建和销毁连接,在0.8版本,是没有batch这个东西的,来一条就发送一条(有改进的空间,仿造批量发送可以提高性能,来自某前辈的经验)。
(7)消息通过sender发送给leader分区,需要经过三层网络架构,然后先写入到broker的os cache里,然后再落地到本地磁盘,落地到磁盘是采用顺序写的方式,一般不会直接写入到磁盘,这样会影响性能(datanode写入数据是直接写入到磁盘的,如果也先写入到os cache,会提高整体性能)。
(1)既没有指定partition又没有指定key的值,第一次调用时会随机生成一个整数(后面每次调用在这个整数上自增),将这个值与topic的 partition数取余得到partition值, 即round-robin 算法,这也是默认的分区分配策略
(2)当key为空时,消息随机发送到各个分区(各个版本会有不同,有的是采用轮询的方式,有的是随机,有的是一定时间内只发送给固定partition,隔一段时间后随机换一个)
(3)当key为有值时,用key的hash值对partition个数取模,决定要把消息发送到哪个partition上
(4)指定分区发送
//1 连接broker props.put("bootstrap.servers","node01:9092,node02:9092"); //2 key和value序列化 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //3 acks // -1 代表所有处于isr列表中的follower partition都会同步写入消息成功 // 0 代表消息只要发送出去就行,其他不管 // 1 代表发送消息到leader partition写入成功就可以 props.put("acks","-1"); //4 重试次数 props.put("retries",3);//大部分问题,设置这个就可以解决,生产环境可以设置多些 5-10次 // 5 隔多久重试一次 props.put("retry.backoff.ms",2000); //6 如果要提升kafka的吞吐量,可以指定压缩类型,如lz4 props.put("compression.type","none"); //7 缓冲区大小,默认是32M props.put("buffer.size",33554432); //8 一个批次batch的大小,默认是16k,需要根据一条消息的大小去调整 props.put("batch.size",323840);//设置为32k //9 如果一个batch没满,达到如下的时间也会发送出去 props.put("linger.ms",200); //10 一条消息最大的大小,默认是1M,生产环境中一般会修改变大,否则会报错 props.put("max.request.size",1048576); //11 一条消息发送出去后,多久还没收到响应,就认为是超时 props.put("request.timeout.ms",5000);
producer.send()
方法中指定具体的分区public void producer1() { Properties properties = new Properties(); properties.put("bootstrap.servers", "127.0.0.1:9092"); properties.put("acks", "all"); properties.put("retries", "3"); properties.put("batch.size", "16384"); properties.put("linger.ms", 1); properties.put("buffer.memory", 33554432); //key和value的序列化 properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties); //发送消息 for (int i = 0; i < 10; i++) { producer.send(new ProducerRecord<String, String>("second", 0, "key ", "value = " + i), new ProducerCallBackV2()); } //关闭连接资源 producer.close(); } class ProducerCallBackV2 implements Callback { public void onCompletion(RecordMetadata metadata, Exception e) { if (e != null) { log.error("发送失败:msg={}", e); return; } System.out.println("offset : " + metadata.offset()); System.out.println("partition : " + metadata.partition()); System.out.println("topic : " + metadata.topic()); System.out.println("==============================="); } }
Partitioner
接口的partition()方法 返回分区编号public class MyPartion implements Partitioner { private final Random random = new Random(); public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { // kafka 消息分发策略 List<PartitionInfo> partitionInfoList = cluster.partitionsForTopic(topic); //指定发送的分区值 int partitionNum = 0; if (key == null) { // 随机分区 partitionNum = random.nextInt(partitionInfoList.size()); } else { partitionNum = Math.abs((key.hashCode()) % partitionInfoList.size()); } System.err.println("topic=" + topic + ",key=" + key + ",value=" + value + ",partitionNum=" + partitionNum); return partitionNum; } public void close() { } public void configure(Map<String, ?> map) { } }
public void producer2() { Properties properties = new Properties(); properties.put("bootstrap.servers", "127.0.0.1:9092"); properties.put("acks", "all"); properties.put("retries", "3"); properties.put("batch.size", "16384"); properties.put("linger.ms", 1); properties.put("buffer.memory", 33554432); //key和value的序列化 properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //添加自定义分区器 properties.put("partitioner.class", "com.example.test.KafkaApiTest.MyPartion"); //构造生产者对象 KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties); //发送消息 for (int i = 0; i < 10; i++) { producer.send(new ProducerRecord<String, String>("second", "congge-self ", "val = " + i), new ProducerCallBackV3()); } //关闭连接资源 producer.close(); }
acks这个配置可以指定三个值,分别是0,1和-1。我们分别来说三者代表什么:
ack=0:producer发送一次就不再发送了,不管是否发送成功,直接发送下一条数据,性能最快
存在问题:发送出去的消息还在半路,或者还没写入磁盘, Partition Leader所在Broker就直接挂了,客户端认为消息发送成功了,此时就会导致这条消息就丢失
ack=1(默认):意味着producer发送数据后,需要等待Leader接收到消息并且写入完成,不管他其他的Follower,就认为成功了,才会发送下一条数据,性能中等
存在问题:万一Partition Leader刚刚接收到消息,Follower还没来得及同步过去,结果Leader所在的broker宕机了,导致数据丢失
ack=-1 (即all):意味着发送的消息写入所有的ISR集合中的副本(注意不是全部副本)后,才会发送下一条数据,性能最慢,但可靠性最强
还有一点值得一提,kafka有一个配置参数,min.insync.replicas
,默认是1(也就是只有leader的情况,实际生产应该调高),该属性规定了最小的ISR数。这意味着当acks为-1(即all)的时候,这个参数规定了必须写入的ISR集中的副本数,如果没达到,那么producer会产生异常。
当ack=all时,会有以下2个问题:
问题一:如果在follower同步完成后,broker发送ack之前,leader发生故障,那么会造成数据重复
数据发送到leader后 ,部分ISR的副本同步,leader此时挂掉。比如follower1和follower2都有可能变成新的leader, producer端会得到返回异常,producer端会重新发送数据,数据可能会重复
重复消费问题,任何队列都无法避免,所以消费端应用程序应该做好幂等性处理。
问题二:acks=all 就可以代表数据一定不会丢失了吗?
Partition只有一个副本,也就是一个Leader,任何Follower都没有.接收完消息后宕机,也会导致数据丢失,acks=all,必须跟ISR列表里至少有2个以上的副本配合使用
在设置request.required.acks=-1的同时,也要min.insync.replicas
这个参数设定 ISR中的最
小副本数是多少,默认值为1,改为 >=2,
如果ISR中的副本数少于min.insync.replicas配置的数量时,客户端会返回异常
幂等性是在0.11版本之后引入的, Producer在生产发送消息时,难免会重复发送消息。Producer进行retry时会产生重试机制,发生消息重复发送。而引入幂等性后,重复发送只会生成一条有效的消息
在每条消息中附带了PID(ProducerID)和SequenceNumber。服务端会记录每个producer对应的当前最大sequence,producerId + sequence。如果新的消息带上的sequence不大于当前的最大sequence就拒绝这条消息,如果消息落盘会同时更新最大sequence,这个时候重发的消息会被服务端拒掉从而避免消息重复。
而多分区的情况,我们需要保证原子性的写入多个分区,即写入到多个分区的消息要么全部成功,要么全部回滚。这时候就需要使用事务,在producer端设置 transcational.id
为一个指定字符串,保证原子性地写入到多个分区。
总结:
默认为关闭,需要修改配置文件: enable.idempotence=true
同时要求 ack=all
且 retries > 1
,这样幂等producer只能保证单分区上无重复消息。
消费者消费数据,需要反序列化数据,且采用了零拷贝的技术,由于消费者和broker都在同一个操作系统下,一般都是linux,因此数据反序列化后读取到了os cache,然后发送到网关就直接被消费者消费,如下图。如果数据反序列化到os cache(理解为数据的内核态),再拷贝一次到用户态(这个状态的数据可以跨系统平台)再消费,在同一平台下这会是一次多余的拷贝,kafka中省略了这个动作,这大大提高了消费者读取数据的速度。
消费者消费某个leader分区的数据,会从消费者offset的下一个位置开始消费,如图所示上一次消费到了offset 7的位置,下一次消费就从offset 8的位置开始消费。在zookeeper 0.8版本前,消费者的offset都保存在zookeeper中的,后面考虑到多个消费者要和zookeeper通信获取offset会增加zookeeper的压力,从1.0.x开始,这些消费者的offset改保存到了__consumer_offset这个主题里,而它分布在多个broker,将压力就分摊了。
注意消费者能消费到的数据offset,需要小于这个分区的HW(高水印值),比如下图这个分区的HW是9,则offset 10开始的数据就不可以消费
Range 策略是对每个主题而言的,首先对同一个主题里面的分区按照序号进行排序。并对消费者按照字母字典顺序进行排序。假设有10个分区,3个消费者,排完序的分区将会是0-9;消费者线程排完序是C0-0, C1-1, C2-2 。然后将partitions的个数除于消费者线程的总数来决定每个消费者线程将会消费几个分区。如果除不尽,则前面的消费者会多消费一个分区。所以最终结果是:C0消费 0-3分区,C1消费4-6分区,C2消费7-9分区。
如果同时消费两个主题的话,分区数相同,消费者相同,此时,C0消费者比其他消费者线程多消费2个分区,这就是Range Strategy 的一个弊端。最好是分区数是消费者的整数倍。
轮询分区策略是把所有的partition 和所有consumer 都列出来,然后按照hashcode进行排序。最后通过轮询算法分配partition给消费者。如果所有consumer实例的订阅都是相同的,那么partition会均匀分布。
使用轮询分区策略必须满足两个条件:
StickyAssignor策略,“sticky”这个单词可以翻译为“粘性的”,Kafka从0.11.x版本开始引入这种分配策略
它主要有两个目的:
- 分区的分配要尽可能的均匀;存在此目的的原因在于 Round Robin 和 Range 分配策略实际上都会导致某几个 consumer 承载过多的分区,从而导致消费压力不均衡;
- 分区的分配尽可能的与上次分配的保持相同。当两者发生冲突时,第一个目标优先于第二个目标。
//1 连接broker props.put("bootstrap.servers","node01:9092,node02:9092,node03:9092"); //2 key和value序列化 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //3 acks // -1 代表所有处于isr列表中的follower partition都会同步写入消息成功 // 0 代表消息只要发送出去就行,其他不管 // 1 代表发送消息到leader partition写入成功就可以 props.put("acks","-1"); //4 重试次数 props.put("retries",3);//大部分问题,设置这个就可以解决,生产环境可以设置多些 5-10次 // 5 隔多久重试一次 props.put("retry.backoff.ms",2000); //6 如果要提升kafka的吞吐量,可以指定压缩类型 props.put("compression.type","none"); //7 缓冲区大小,默认是32M props.put("buffer.size",33554432); //8 一个批次batch的大小,默认是16k,需要根据一条消息的大小去调整 props.put("batch.size",323840);//设置为32k //9 如果一个batch没满,达到如下的时间也会发送出去 props.put("linger.ms",200); //10 一条消息最大的大小,默认是1M,生产环境中一般会修改变大,否则会报错 props.put("max.request.size",1048576); //11 一条消息发送出去后,多久还没收到响应,就认为是超时 props.put("request.timeout.ms",5000); //12 使用自定义分区器 props.put("partitioner.class","com.example.partitioner.MyPartitioner");
消费者有两种模式, 订阅模式和分配模式,:
模式 | 不同之处 | 相同之处 |
---|---|---|
subscribe() | 使用 Kafka Group 管理,自动进行 rebalance 操作 | 可以在 Kafka 保存 offset |
assign() | 用户自己进行相关的处理 | 也可以进行 offset commit,但是尽量保证 group.id 唯一性,如果使用一个与上面模式一样的 group,offset commit 请求将会被拒绝 |
如果使用assign,则表明该consumer是独立consumer(standalone consumer),它不属于任何消费者组。独立consumer可以订阅任何分区,彼此之间也没有关系,即两个独立consumer可以订阅并消费相同的分区
void consumer1() throws InterruptedException { final String topic = "first"; Properties prop = new Properties(); prop.put("bootstrap.servers", "127.0.0.1:9092"); prop.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); prop.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); prop.put("group.id", "001"); prop.put("consumer-id", "test-consumer-001"); Consumer<String, String> consumer = new KafkaConsumer<>(prop); // 1.方式1 consumer.assign(Arrays.asList(new TopicPartition(topic, 2))); //只消费分区号为2的分区 // 2.方式2 consumer.subscribe(Arrays.asList(topic)); //消费topic 消费全部分区 // consumer.seekToBeginning(Arrays.asList(part)); //重头开始消费 // consumer.seek(part, 5); //指定从topic的分区的某个offset开始消费 // 订阅全部分区的功能 // ArrayList<TopicPartition> partitions = new ArrayList<>(); // 用来查询指定主题的元数据信息 // List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic); // if (partitionInfos != null) { // for (PartitionInfo tpInfo : partitionInfos) { // // 获取主题以及分区 // partitions.add(new TopicPartition(tpInfo.topic(), tpInfo.partition())); // } // } // consumer.assign(partitions); while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));//消费一定时间的数据 Thread.sleep(500); for (ConsumerRecord<String, String> record : records) { System.out.println(String.format("key:%s , value:%s , offset:%s", record.key(), record.value(), record.offset())); } } }
- 原因1:强行kill线程,导致消费后的数据,offset没有提交(消费系统宕机、重启等)
- 原因2:设置offset为自动提交,关闭kafka时,如果在close之前,调用 consumer.unsubscribe() 则有可能部分offset没提交,下次重启会重复消费。
- 消费后的数据,当offset还没有提交时,partition就断开连接。比如,通常会遇到消费的数据,处理很耗时,导致超过了Kafka的session timeout时间(0.10.x版本默认是30秒),那么就会reblance重平衡,此时有一定几率offset没提交,会导致重平衡后重复消费。
- 原因4:当消费者重新分配partition的时候,可能出现从头开始消费的情况,导致重发问题。
- 原因5:当消费者消费的速度很慢的时候,可能在一个session周期内还未完成,导致心跳机制检测报告出问题。
- 原因6:并发很大,可能在规定的时间(session.time.out默认30s)内没有消费完,就会可能导致reblance重平衡,导致一部分offset自动提交失败,然后重平衡后重复消费. (这里就涉及到问题是消费者在创建时会有一个属性
max.poll.interval.ms
(默认间隔时间为300s),该属性意思为kafka消费者在每一轮poll()调用之间的最大延迟,消费者在获取更多记录之前可以空闲的时间量的上限。如果此超时时间期满之前poll()没有被再次调用,则消费者被视为失败,并且分组将重新平衡,以便将分区重新分配给别的成员。)
(1)谁来执行Rebalance 以及管理 consumer-group ?
kafka 提供了一个角色:coordinator 来执行对于consumer-group的管理 ,当consumer-group 的第一个 consumer 启动的时候,它会去跟 kafka server 确定谁是他们组的 coordinator 。
之后该group 内所有成员都会和该coordinator 进行协调通信。
整个Rebalance 的过程分为两个步骤,join 和 sync 。
2.JoinGroup 的过程
表示加入到consumer-group 中,在这一步中,所有成员都会向 coordinator 发送joinGroup的请求。一旦所有成员都发送了joinGroup请求,那么 coordinator会选择一个consumer 担任leader
角色,并把consumer-group成员信息、订阅消息、leader和 rebanlance 的版本信息发送消费者,并返回分区策略给leader。
3.Synchronizing GroupGroup 阶段
leader 收到 coordinator 的分区策略后确定分配方案,将消费者对应的partition分配方案同步给consumer-group中的所有consumer。
(2)步骤:
- 当消费者收到协调者的再均衡开始通知时,需要立即提交偏移量;
- 消费者在收到提交偏移量成功的响应后,再发送JoinGroup请求,重新申请加入组,请求中会含有订阅的主题信息;
- 当协调者收到第一个JoinGroup请求时,会把发出请求的消费者指定为Leader消费者,同时等待
rebalance.timeout.ms
,在收集其他消费者的JoinGroup请求中的订阅信息后,将订阅信息放在JoinGroup响应中发送给Leader消费者,并告知他成为了Leader,同时也会发送成功入组的JoinGroup响应给其他消费者;- Leader消费者收到JoinGroup响应后,根据消费者的订阅信息制定分配方案,把方案放在SyncGroup请求中,发送给协调者。其他follower消费者在收到响应后,则直接发送SyncGroup请求,等待Leader的分配方案;
- 协调者收到分配方案后,再通过SyncGroup响应把分配方案发给所有消费组。
- 当所有消费者收到分配方案后,就意味着再均衡的结束,可以正常开始消费工作了。
那协调者和消费者之间是如何交互的?协调者如何掌握消费者的状态,又如何通知再均衡?
这里使用了心跳机制。在消费者端有一个专门的心跳线程负责以heartbeat.interval.ms
的间隔频率发送心跳给协调者,告诉协调者自己还活着。同时协调者会返回一个响应。而当需要开始再均衡时,协调者则会在响应中加入REBALANCE_IN_PROGRESS,当消费者收到响应时,便能知道再均衡要开始了。
影响消费者数量减少的参数有哪些?
session.timeout.ms
:Broker端参数,消费者的存活时间,默认10秒,如果在这段时间内,协调者没收到任何心跳,则认为该消费者已崩溃离组;
heartbeat.interval.ms
:消费者端参数,发送心跳的频率,默认3秒;
max.poll.interval.ms
:消费者端参数,两次调用poll的最大时间间隔,默认5分钟,如果5分钟内无法消费完,则会主动离组。
关系公式:
session.timeout.ms ≥ 3 * heartbeat.interval.ms
由于再平衡的开始依赖于心跳的响应,所以heartbeat.interval.ms除了决定心跳的频率,也决定了再均衡的通知频率。
1、可能重复消费: Consumer被踢出消费组,可能还没有提交offset,Rebalance时会Partition重新分配其它Consumer,会造成重复消费,虽有幂等操作但耗费消费资源,亦增加集群压力
2、集群不稳定:Rebalance扩散到整个ConsumerGroup的所有消费者,因为一个消费者的退出,导致整个Group进行了Rebalance,并在一个比较慢的时间内达到稳定状态,影响面较大
3、影响消费速度:Rebalance 期间整个group暂停消费,频繁的Rebalance反而降低了消息的消费速度,大部分时间都在重复消费和Rebalance
kafka中producer向topic推送消息,而consumer是主动去拉取消息。
- push模式很难适应消费速率不同的消费者,因为消息发送速率是由broker决定的。push模式的目标是尽可能以最快速度传递消息,但是这样很容易造成consumer来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。
- pull模式则可以根据consumer的消费能力以适当的速率消费消息。kafka consumer 采用 pull(拉)模式从 broker 中读取数据, pull 模式不足之处是,如果 kafka 没有数据,消费者可能会陷入循环中,一直返回空数据。针对这一点,Kafka 的消费者在消费数据时会传入一个时长参数
request.timeout.ms
,如果当前没有数据可供消费,consumer 会等待一段时间之后再返回,这段时长即为 timeout。
(1)节点必须可以维护与ZooKeeper的连接,Zookeeper通过心跳机制检查每个节点的连接
(2)如果节点是个follower,他必须能及时的同步leader的写操作,延时不能太久,会将follower从ISR列表剔除
(1)计算分区号:
partition = Math.abs(groupID.hashCode() % offsetsTopicPartitionCount)
根据groupID的哈希值,取余offsetsTopicPartitionCount(内部主题__consumer_offsets的分区数,默认50)的绝对值,其意思就是把消费组哈希散列到内部主题__consumer_offsets的一个分区上。确定协调者为什么要和内部主题扯上关系。这就跟协调者的作用有关了。协调者不仅是负责组成员管理和再均衡,在协调者中还需要负责处理消费者的偏移量提交,而偏移量提交则正是提交到__consumer_offsets*
的一个分区上。所以这里需要取余offsetsTopicPartitionCount
来确定偏移量提交的分区。
(2)找出分区Leader副本所在的Broker
确定了分区就简单了,分区Leader副本所在的Broker上的协调者,就是我们要找的。
这个算法通常用于帮助定位问题。当一个消费组出现问题时,我们可以先确定协调者的Broker,然后查看Broker端的日志来定位问题。
元数据主要解决以下四种问题:
服务器端的元数据通常是指集群Broker的元数据,包括集群有哪些Broker,有哪些topic,每个主题都有哪些分区,而每个分区的Leader副本在哪台Broker上等信息。这些信息保存在ZooKeeper和Controller中。Kafka以ZooKeeper中保存的元数据为权威数据,Controller会从ZooKeeper中获取最新的元数据并缓存在自己的内存中。
任何一个 Broker 在启动之后会存储这些Metadata信息的。而且,Kafka 提供的客户端在获取到 Metadata 信息之后也会将它存储到内存中的。在以下几种情况会更新已经缓存下来的Metadata 信息:
meta‐data.max.age.ms
参数配置的时间过期之后。以上两种情况 Kafka提供的客户端会自动再发送一次 Metadata 请求,这样就可以获取到更新的信息。
Kafka 的 Broker 收到客户端的请求处理完之后,会构造一个 TopicMetadataResponse,并发送给客户端:
上面的输出就可以看到各个分区的leader所在机器、isr以及所有replicas等信息。
kafka使用zookeeper进行元数据管理,保存broker注册信息,包括主题(Topic)、分区(Partition)信息等,选择分区leader,在低版本kafka消费者的offset信息也会保存在zookeeper中。
/Consumers
节点
0.9版本之前用于保存offset信息,0.9版本之后offset存储在kafka的每个broker中。__consumer_offsets 是 kafka 自行创建的,和普通的 topic 相同。它存在的目的之一就是保存 consumer 提交的位移。
可以想象成一个 KV 格式的消息,key 就是一个三元组:group.id+topic+分区号,而 value 就是 offset 的值。
考虑到一个 kafka 生成环境中可能有很多consumer 和 consumer group,如果这些 consumer 同时提交位移,则必将加重
__consumer_offsets
的写入负载,因此 kafka 默认为该 topic 创建了50个分区,并且对每个 group.id做哈希求模运算Math.abs(groupID.hashCode()) % numPartitions
,从而将负载分散到不同的 __consumer_offsets 分区上。
一般情况下,当集群中第一次有消费者消费消息时会自动创建__consumer_offsets,它的副本因子受offsets.topic.replication.factor
参数的约束,默认值为1(注意:该参数的使用限制在0.11.0.0版本发生变化),分区数可以通过offsets.topic.num.partitions
参数设置,默认值为50。
/Controller
和/Controller_epoch
节点
/controller:保存控制器(broker的leader, 这里的leader要和副本的leader区分开,这里的leader是kafka集群中所有broker的leader)对应的brokerId信息等
/controller_epoch:这里用来解决脑裂问题,存放的是一个整形值(纪元编号,也称为隔离令牌),集群中每选举一次控制器,就会通过Zookeeper创建一个数值更大的epoch number,如果有broker收到比这个epoch数值小的数据,就会忽略消息。
其他的节点
/config/topics
:存储动态修改主题级别的配置信息
/config/clients
:存储动态修改客户端级别的配置信息
/config/changes
:动态修改配置时存储相应的信息
/admin/delete_topics
:在对主题进行删除操作时保存待删除主题的信息
/isr_change_notification
:保存Kafka副本ISR列表发生变化时通知的相应路径
先明确一个概念leader选举,因为kafka中涉及多处选举机制,容易搞混,kafka由三个方面会涉及到选举:
在kafka集群中由很多的broker(也叫做控制器),但是他们之间需要选举出一个leader,其他的都是follower。broker的leader有很重要的作用,诸如:创建、删除主题、增加分区并分配leader分区;集群broker管理,包括新增、关闭和故障处理;分区重分配(auto.leader.rebalance.enable=true,后面会介绍);分区leader选举。
过程如下:
现阶段的kakfa集群中,只需要broker的leader在zookeeper去注册相应的监听器,其他的broker很少去监听zookeeper的数据变化,但是每个broker还是需要对/controller进行监听;当/controller节点发生数据变化的时候,每个broker都会更新自身内存中保存的activeControllerId。
当/controller节点被删除时,集群中的broker会进行选举,如果broker在节点被删除前是控制器的话,在选举前还需要有一个下线的操作(关闭相应的资源,比如关闭状态机、注销相应的监听器等)。如果有特殊需要,可以手动删除/controller节点来触发新一轮的选举。当然关闭控制器所对应的broker以及手动向/controller节点写入新的brokerid的所对应的数据同样可以触发新一轮的选举。
副本就是提高数据的可靠性,实现故障自动转移,系统默认副本数量是1,生产环境一般配置数量是2个,保证数据可靠性;否则副本太多会增加磁盘的存储空间,增加网络上的数据传输,降低效率。
kafka的副本分为leader和follower,其中leader数据读写,follower只负责数据同步。关于副本有下面三个概念:
replica.lag.time.max.ms
参数设置的,默认是30s。leader分区副本是用来处理客户端的请求的,也包括follower复制的获取消息请求。
Kafka的高可用性也是通过副本及复制机制实现的(当其中一台包含leader副本的broker奔溃,就会从分布在其他broker的follower分区副本中选举leader副本)。
如果leader发送故障,就会从ISR中选举出新的leader。
ISR副本集合保存的副本的条件是什么?
上面一直说ISR副本集合中的副本就是和leader副本是同步的,那这个同步的标准又是什么呢?跟一个参数有关:replica.lag.time.max.ms
说到follower副本的任务,就是从leader副本拉取消息,如果持续拉取速度慢于leader副本写入速度,慢于时间超过replica.lag.time.max.ms后,它就变成“非同步”副本,就会被踢出ISR副本集合中。但后面如何follower副本的速度慢慢提上来,那就又可能会重新加入ISR副本集合中了。
分区leader的选举由kafka的broker leader(后面文章会以controller代替broker leader的描述)负责具体实施。
当创建分区(创建主题或增加分区都有创建分区的动作)或分区上线(比如分区中原先的leader副本下线,此时分区需要选举一个新的leader上线来对外提供服务)的时候都需要leader选举。选举的时候将会从AR集合中副本的顺序查找第一个存活的副本,并且要保证这个副本在ISR队列中。
另外当分区发生重分配的情况也是需要执行leader选举,此时从重分配的AR列表中找到第一个存活的副本,且这个副本在目前的ISR队列中。
再有就是当某一个borker节点关闭的时候,位于这个节点上的leader副本都会下线,所以与此对应的分区需要执行leader的选举。此时将会从AR列表中找到第一个存活的副本,且这个副本在目前的ISR列表中,另外还要确保这个副本不处于正在被关闭的节点上。
kafka还提供了一个参数配置:unclean.leader.election.enable
,默认是true,参数规定是否允许非ISR的副本成为leader,如果设置为true,当ISR队列是空,ISR为空说明leader和follower都挂掉了,此时将选择那些不在ISR队列中的副本选择为新的leader,这副本同步消息的速度可能远远落后于leader,所以可能会造成丢失数据的风险。生产环境中建议关闭这个参数,设置为false。
出现某些broker宕机,会导致leader都集中在几台broker中,造成读写压力过大,并且就算恢复了宕机的broker,原来的leader也会变成follower并无法分担压力,造成集群负载不均衡。
解决上述问题kafka出现了自动平衡的机制。kafka提供了下面几个参数进行控制:
auto.leader.rebalance.enable
:自动leader parition平衡,默认是trueleader.imbalance.per.broker.percentage
:每个broker允许的不平衡的leader的比率,默认是10%,如果超过这个值,控制器将会触发leader的平衡leader.imbalance.check.interval.seconds
:检查leader负载是否平衡的时间间隔,默认是300秒但是在生产环境中是不开启这个自动平衡,因为触发leader partition的自动平衡会损耗性能,或者可以将触发自动平和的参数leader.imbalance.per.broker.percentage
的值调大点。
更新分区间的副本配比,首先创建一个assign-replicas.json的文件,内容如下:
{
"version": 1,
"partitions": [
{"topic": "test-assign", "partition": 0, "replicas": [1, 2]},
{"topic": "test-assign", "partition": 1, "replicas": [1, 2]},
{"topic": "test-assign", "partition": 2, "replicas": [1, 2]}
]
}
// 执行手动分配
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file assign-replicas.json --execute
// 验证是否正确
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file assign-replicas.json --verify
// 查看topic分区和副本分布
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test-assign
HW: 高水位,HighWatermark的缩写,去一个partition对应的ISR最小的LEO作为作为HW,consumer最多只能消费到HW所在的位置。每个leader和follower都负责更新自己的HW状态。对于leader新写入的消息,consumer不能立即消费,leader会等待该消息被所有的ISR中的副本同步更新HW之后,此时该消息才能消费,这样就保证了如何leader所在的broker失效,该消息任然可以从新选举的leader中获取。
LEO:(Log End Offset)每个分区都会有自己的标记,标记当前分区的最后一条消息(针对Leader就是Leader分区的最后一条,某个Follower就是当前该Follower的最后一条消息)
(1)producer向leader写入数据 (数据的读写操作都在leader上)
(2)分区leader接收到数据写入本地Log
(3)follower从leader pull消息数据,写入自己本地Log
(4)follower向leader发送ACK确认
(5)leader收到ACK之后,会更新自己的HW值,取follower中的最小的LEO值来更新
(6)向producer回传ACK
那kafka是如何移动自己的hw和leo呢?
leader 是如何移动自己的leo?
leader收到producer产生的消息的时候,leo会先位移标志位
其他副本是如何移动自己的leo?
副本异步拉去leader中的数据,然后同步自己的leo,正常情况下,这时候leader和副本之间的leo都是一样的
leader的hw是如何移动的?
副本在拉取leader的leo的时候,会告诉leader自己现在的leo在哪,因此上一步的时候,告诉leader自己的leo是0,所以leader的hw不移动,如果一直没有producer产生消息,那么在下一次同步的时候,副本会告诉leader自己的leo是1,然后leader会将所有副本的信息进行一次保存,并获取最小的leo,作为自己的hw;也就是说副本已经将消息同步过去了,最少同步到某个地方了,因此hw是所有副本中的最小leo
副本的hw是怎么移动的呢?
副本需要去找leader去同步的获取leader的hw,也就是它需要直到当前所有副本中的最小同步位置在哪,然后在定自己的同步位置应该在什么地方,如果获取到leader的hw的值大于自己的leo的话,那自己就是那个最落后的,因此自己的hw就是自己的leo,否则的话,副本的hw和leader的hw保持一致
副本如果宕机重新启动的时候,在0.1.1.0版本之前呢,启动之后,找不到宕机之前的leo,直接将hw作为leo的最后值,这个认为也是一个比较靠谱的策略,但是为什么不计leo呢?
不知道,需要之后进行源码级别的讨论,那么这就会导致,如果副本宕机重启,恢复完之后,开始去拉leader的数据的时候,自己被选为leader了,但是自己的leo却不是最新的,由于leader天生具有权威性,因此会认为自己都是对的,之后原来的leader恢复的时候,要去同步leader的数据,然后将自己的数据更新,发现丢了一些消息,就是副本在恢复的时候副本当时的高水位和原始副本的leo之间的差值。
为了改进这个措施,之后版本对这个操作进行了更新,在恢复的时候,首先去leader上获取先leo值,并和自己的进行对比,如果自己落后的话,可能需要更新否则的话不动,因此需要进行记录自己的leo,如果能对这个管理leo的数据进行管理的话,需要有一个确切的版本号来控制,小的版本号会被废弃,这就是epoch的设计原则,能够实时的直到自己到底是领先还是落后的标示,切换leader;具体是有一个单调增加的版本号。每当副本领导权发生变更时,都会增加该版本号。小版本号的 Leader 被认为是过期 Leader,不能再行使 Leader 权力。只要每次有变化,就+1,然后其他重启的数据都和这个保持了一致,所以数据会同步;副本是否更新leo的值不再依赖hw,而是实实在在的记录值,类似于乐观锁,永远是最新的leo值,所以不会丢数据
当一个副本挂了,如何进行恢复呢?
从日志中查询数据,然后逐渐恢复,如果没有标识可能会落后于原始的进度,导致消息的丢失!
follower副本只是当leader副本挂了后才重新选取leader,follower并不向外提供服务
假如还没同步完成,leader副本就宕机了,怎么办?
kafka中提供了关于follower从leader同步数据的ACK方式(-1, 0, 1)。相当于把分布式CAP理论中的C和A做成配置项ack。可以根据业务做选择使用哪种。
注意: 这只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复。
max.in.flight.requests.per.connection
设置为小于等于5。原因是,Kafka服务端会缓存最近发过来的元数据,等缓存满了5个后就会对这些元数据进行排序,这样就可以保证数据有序了。每个partition分为多个segment,每个segment包括.index
文件、.log
文件和.timeindex
等文件。这些文件位于文件夹下,该文件命名规则为:topic名称+分区号。注意这里面还有一个leader-epoch-checkpoint
文件,保存的是Leader Epoch的值(解决副本数据一致性需要)。
├── test-topic-0
├── 00000000000000000000.index
├── 00000000000000000000.log
├── 00000000000000000000.timeindex
├── 00000000000000000113.index
├── 00000000000000000113.log
├── 00000000000000000113.timeindex
└── leader-epoch-checkpoint
分段文件名规则:分区的第一个segment是从0开始的,后续每个segment文件名为上一个segment文件最后一条消息的offset,ofsset的数值最大为64位(long类型),20位数字字符长度,没有数字用0填充。
log文件默认写满1G后,会进行log rolling形成一个新的分段(segment)来记录消息,这里面的分段大小取决于:
log.segment.bytes
参数决定。
index和timeindex文件在刚使用时会分配10M的大小,当进行log rolling后,它会修剪为实际的大小,所以看到前几个索引文件的大小,只有几百K。
查看log文件内容的方法:
./bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files 00000000000000000000.log --print-data-log
当log文件写入4k(这里可以通过log.index.interval.bytes
设置)数据,就会写入一条索引信息到index文件中,这样的index索引文件就是一个稀疏索引,它并不会每条日志都建立索引信息。
log日志文件是顺序写入,大体上由message+实际offset+position组成,而索引文件的数据结构则是由相对offset(4byte)+position(4byte)组成。
当kafka查询一条offset对应实际消息时,可以通过index进行二分查找,获取最近的低位offset,然后从低位offset对应的position开始,从实际的log文件中开始往后查找对应的消息
时间戳索引文件,它的作用是可以查询某一个时间段内的消息,它的数据结构是:时间戳(8byte)+ 相对offset(4byte),如果要使用这个索引文件,先要通过时间范围找到对应的offset,然后再去找对应的index文件找到position信息,最后在遍历log文件,这个过程也是需要用到index索引文件的
在 Kafka 中大量使用了 PageCache, 这也是 Kafka 能实现高吞吐的重要因素之一。
首先看一下读操作,当一个进程要去读取磁盘上的文件内容时,操作系统会先查看要读取的数据页是否缓冲在PageCache 中,如果存在则直接返回要读取的数据,这就减少了对于磁盘 I/O的 操作;但是如果没有查到,操作系统会向磁盘发起读取请求并将读取的数据页存入 PageCache 中,之后再将数据返回给进程,就和使用redis缓冲是一个道理。
接着写操作和读操作是一样的,如果一个进程需要将数据写入磁盘,操作系统会检查数据页是否在PageCache 中已经存在,如果不存在就在 PageCache中添加相应的数据页,接着将数据写入对应的数据页。另外被修改过后的数据页也就变成了脏页,操作系统会在适当时间将脏页中的数据写入磁盘,以保持数据的一致性。
零拷贝并不是不需要拷贝,而是减少不必要的拷贝次数,通常使用在IO读写过程中。常规应用程序IO过程如下图,会经过四次拷贝:
从上面的流程可以知道内核态和用户态之间的拷贝相当于执行两次无用的操作,之间切换也会花费很多资源;当数据从磁盘经过DMA 拷贝到内核缓存(页缓存)后,为了减少CPU拷贝的性能损耗,操作系统会将该内核缓存与用户层进行共享,减少一次CPU copy过程,同时用户层的读写也会直接访问该共享存储,本身由用户层到Socket缓存的数据拷贝过程也变成了从 内核到内核的CPU拷贝过程,更加的快速,这就是零拷贝,IO流程如下图。
java的JDK NIO中方法transferTo()
方法就能够实现零拷贝操作,这个实现依赖于操作系统底层的sendFile()
实现的:
public void transferTo(long position, long count, WritableByteChannel target);
底层调用的是:
#include <sys/socket.h>
ssize_t sendfile(int out_fd, int in_fd, off_t *offset, size_t count);
但是需要注意零拷贝和系统底层有很大的关系,所以是否可以进行零拷贝的系统调用的看具体的操作系统是否实现。
看一下Java nio的零拷贝例子:
import java.io.*; import java.nio.channels.FileChannel; public class ZeroCopy { public static void main(String[] args) { File source = new File("G:/source.zip"); File target = new File("G:/target.zip"); NioZeroCopy(source, target); } public static void NioZeroCopy(File source, File target) { try ( FileChannel sourceChannel = new FileInputStream(source).getChannel(); FileChannel targetChannel = new FileOutputStream(target).getChannel(); ) { for(long count = sourceChannel.size(); count > 0;) { long transfer = sourceChannel.transferTo(sourceChannel.position(), count, targetChannel); sourceChannel.position(sourceChannel.position() + transfer); count -= transfer; } } catch (IOException e) { System.out.println("异常:" + e.getMessage()); } } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。