赞
踩
这里分析的是Apache官方客户端代码,源码位于:https://github.com/apache/kafka/tree/trunk/clients
从git clone 后 需要手动切换到对应分支。
本文所有案例工程在文末
消息发送的整体流程如下:生产者主要由两个线程协调运行。这两条线程分别为main线程和sender线程(发送线程)
我们可以看源码跟踪一下:从Producer 入口进入即可。
Producer<String, String> producer = new KafkaProducer<String, String>(pros);
进入构造方法,我们可以发现在初始化的时候,创建了一个Sender对象,并且启动了一个IO线程。(我这边是在第188行,如果找不到的话直接搜索即可)
this.sender = this.newSender(logContext, kafkaClient, this.metadata);
String ioThreadName = "kafka-producer-network-thread | " + this.clientId;
this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
this.ioThread.start();
接下来是拦截器的执行,在 producer.send 方法中:
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
return this.doSend(interceptedRecord, callback);
}
拦截器的作用是实现消息的定制化(类似于:Spring Interceptor、Mybatis 插件、Quartz的监听器等)
那这个拦截器是在哪里定义的呢?我们可以自己实现一下:
// 添加拦截器
List<String> interceptors = new ArrayList<>();
interceptors.add("com.demo.interceptor.ChargingInterceptor");
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
可以在生产者的属性中指定多个拦截器,形成拦截器链。
举个栗子,假设发消息的时候需要扣钱,发一条消息一分钱,就可以使用拦截器实现。
public class ChargingInterceptor implements ProducerInterceptor<String, String> { // 发送消息的时候触发 @Override public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) { System.out.println("我要开始扣钱啦~"); return record; } // 收到服务端的ACK的时候触发 @Override public void onAcknowledgement(RecordMetadata metadata, Exception exception) { System.out.println("消息被服务端接收啦"); } @Override public void close() { System.out.println("生产者关闭了"); } // 用键值对配置的时候触发 @Override public void configure(Map<String, ?> configs) { System.out.println("configure..."); } }
我们只需要将该拦截器配置进参数中,当生产者发送消息的时候就会触发对应的方法。
调用send方法后,第二步是利用指定的工具对key和value进行序列化:(我这边是363行)
byte[] serializedKey;
try {
serializedKey = this.keySerializer.serialize(record.topic(), record.headers(), record.key());
} catch (ClassCastException var21) {
throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() + " to class " + this.producerConfig.getClass("key.serializer").getName() + " specified in key.serializer", var21);
}
byte[] serializedValue;
try {
serializedValue = this.valueSerializer.serialize(record.topic(), record.headers(), record.value());
} catch (ClassCastException var20) {
throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() + " to class " + this.producerConfig.getClass("value.serializer").getName() + " specified in value.serializer", var20);
}
Serializer.java 针对不同的数据类型自带了相应的序列化工具:
除了自带的序列化工具之外,可以使用如JSON,Protobuf等,或者使用自定义类型的序列化器来实现,实现serialzer接口接口。
我们可以自定义一个序列化接口,然后发送消息的时候添加相关参数即可。
props.put("value.serializer", "com.demo.serializer.ProtobufSerializer");
看过序列化之后,就来到了路由指定。(377)
int partition = this.partition(record, serializedKey, serializedValue, cluster);
一条消息会发送到那个partition呢?他返回的是一个分区的编号,从0开始。
首先我们将分区分为四种情况:
partition数量可以自行去配置文件中修改。
第一种情况:
指定partition的情况下,直接将指定的值直接作为partition值。
for (int i = 0; i < 10; i++) {
ProducerRecord<String, Integer> producerRecord = new ProducerRecord<String, Integer>(topic, i, null, i);
RecordMetadata metadata = producer.send(producerRecord).get();
System.out.println("Sent to partition: " + metadata.partition() + ", offset: " + metadata.offset());
}
第二种情况:
自定义分区器,将使用自定义的分区器算法选择分区,比如我们自定一个分区器,然后指定即可。
public class SimplePartitioner implements Partitioner { public SimplePartitioner() { } @Override public void configure(Map<String, ?> configs) { } @Override public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { String k = (String) key; System.out.println(k); if (Integer.parseInt(k) % 2 == 0) { return 0; } else { return 1; } } @Override public void close() { } }
指定自定义分区器:
props.put("partitioner.class", "com.demo.partition.SimplePartitioner");
第三种情况:
没有指定partition值但是有key的情况下,使用默认分区DefaultPartitioner,将key的hash值与topic的partition数进行取余得到partition值;
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
第四种情况:
既没有partition值但有没有key值的情况下,第一次调用时随机生成一个整数(后面每次调用在这个数上自增),将这个值与topic可用的partition总数取余得到partition值,也就是常说的轮询算法。
public int partition(String topic, Cluster cluster) {
Integer part = (Integer)this.indexCache.get(topic);
return part == null ? this.nextPartition(topic, cluster, -1) : part;
}
选择分区以后并没有直接发送消息,而是把消息放入了消息累加器(390);
RecordAppendResult result = this.accumulator.append(tp, timestamp, serializedKey, serializedValue, headers, interceptCallback, remainingWaitMs, true, nowMs);
RecordAccumulator 本质上是一个ConcurentMap;
private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;
一个partition一个batch。Batch满了之后会唤醒Sender线程,发送消息。(408)
if (result.batchIsFull || result.newBatchCreated) {
this.log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
this.sender.wakeup();
}
我们可以在拦截器里面自定义消息处理逻辑,也可以选择自己喜欢的序列化工具,还可以自由选择分区。
生产者的消息是不是发出去就完事了?如果说网络出问题了,或者说Kafka服务端接受的时候出了问题,这个消息发送失败了,生产者是不知道的。
所以,Kafka服务端应该要有一种响应客户端的方式,只有在服务端确认以后,生产者才会发下一轮的消息,否则重新发送数据。
那么服务端什么时候才算接受成功呢?因为消息是存储在不同的partition里面的,所有是写入到partition之后才会相应生产者。
当然,单个partition(leader)写入成功,还是不够可靠,如果有多个副本,follower也要写入成功才可以。
服务端发送ACK给生产者总体上有两种思路:
第一种是需要有半数以上的follower节点同步完成,这样的话客户端等待的时间就短一些,延迟低。(所以我们通常部署节点的数量都是奇数,如果是偶数,两边一样就很尴尬。)
第二种是需要所有的follower节点全部完成同步,才发送ACK给客户端,延迟来说相对高一些,但是节点挂掉的可能性比较小,因为所有的节点数据都是完整的。
Kafka会选择那种方案呢?
Kafka选择了第二种。部署同样机器数量的情况下,第二种方案可靠性更高,同时网络延迟对Kafka的影响不是很大。
如果直接采用第二种思路,不考虑网络延迟,有没有别的问题呢?
假设leader收到数据,所有follower都开始同步数据,但是有一个follower出了问题,没有办法从leader同步数据。按照这个规则,leader就要一直等待,无法发送ACK…
从概率的角度来说,这种问题肯定是会出现的,就是某个follower出问题了,怎么解决这种问题呢?
所以我们的规则就不能那么粗暴了,不能因为一个follower的问题导致无法发送ACK;我们把规则改一下,不是所有的follower都有权力让leader等待,而是只有那些正常工作的follower同步数据的时候leader才会等待。
我们应该把那些正常和leader保持同步的replica维护起来,放到一个动态list里面,这个就叫做in-sync replica set(ISR)。现在只要ISR里面的follower同步完数据之后,leader就给客户端发送ACK。
如果一个follower长时间不同步数据,就将其从ISR中移除。那么到底多久没有同步数据才会被剔除呢?这个是由参数replica.lag.time.max.ms
决定,默认是30秒。当然了如果follower活过来了,则还能进入ISR中。
如果leader挂了,ISR会重新选择leader,这部分下文再说。
Kafka为客户端提供了三种可靠性机制,用户根据对可靠性和延迟的要求自行权衡,选择相应的配置。
参数配置如下:
pros.put("acks", "1");
举例:topic的partition0有三个副本。
ack = 0
producer不等待broker的ACK,这一操作提供了一个最低的延迟,broker一接收到还没有写入磁盘就已经返回,当broker故障时有可能丢失数据。
ack = 1 (默认)
producer 等到 broker 的ack,partition 的 leader 落盘成功后返回ack,如果在follower同步成功之前leader故障,那么将会丢失数据。
ack = -1 (all)
producer 等待broker的ack,partition 的leader 和follower 全部落盘成功后才返回ack。
这种方案是完美的吗?会出现问题吗?
如果在follower同步完成后,broker发送ack之前,leader发生故障,没有给生产者发送ACK,那么会遭成数据重复。
在这种情况下,把reties 设置成0(不重发),才不会重复。
三种机制,性能依次递减(producer吞吐量降低),数据健壮性则依次递增。我们可以根据业务场景选择合适的参数。
路径设置:config/server.propertise中的logs.dir配置
默认/tmp/kafka/logs
为了实现横向扩展,把不同的数据存放在不同的Broker上,同时降低单台服务器的访问压力,我们把一个topic中的数据分割成多个partition。
一个partition中消息是有序的,顺序写入,但是全局不一定有序。
在服务器上,每个partition都有一个物理目录,topic名字后面的数据标号则代表分区。
为了提高分区的可靠性,Kafka有设计了副本机制。
创建Topic的时候,通过指定 replication-factor 确定副本的数。
注意:副本数必须小于等于节点数,而不能大于Broker的数量,否则会保存。
错误示范:
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 4 --partitions 1 --topic overrep
这样就可以保证,绝对不会有一个分区的两个副本分布在同一个节点上,不然副本机制也失去了备份的意义了。
这些所有的副本分为两种角色,leader对外提供读写服务,follower唯一的任务就是从leader异步拉取数据。
为什么只有leader提供读写服务呢?而不是像mysql一样读写分离?
答:这个是设计思想的不同。读写都发生在leader节点上,就不存在读写分离带来的一致性问题,这个叫做单调读一致性。
问题来了,如果分区有多个副本,哪一个节点上的副本是leader呢?
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --topic a3part3rep
怎么查看所有的副本中谁是leader?(需要集群环境)
./kafka-topics.sh --topic a3part3rep --describe --zookeeper localhost:2181
解释:
这个topic有三个分区三个副本。
第一个分区的3个副本编号 1 , 2 ,3 (注意副本的编号是从1开始的),同步中的也是 1 ,2 ,3 。第一个副本是leader。
假设 topic 有 4个分区2个副本呢?
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 4 --topic a4part2rep
查看:
./kafka-topics.sh --topic a4part2rep --describe --zookeeper localhost:2181
解释:
这个分区有4个分区两个2副本;第一个分区的2个副本编号2,3 ;同步中的也是2,3。第三个副本是leader。
为什么第一个分区两个副本选择在2,3broker;第二个分区两个副本选择1,3;第三个分区的副本选择1,2broker呢?
副本在Broker的分布有什么规则吗?
a4part2rep这个topic,4个分区2个副本,一共8个副本,怎么分布到3台机器?
结果刚刚我们已经看到过了,大家也可以自行前往tmp/kafka-logs中查看对应的数据信息。
实际上,这种分配策略是由AdminUtils.scala的assignReplicasToBrokers函数决定的。
规则如下:
first of all
副本因子不能大于Broker的个数
第一个分区(编号为0的分区)的第一个副本位置是随机从brokerList选择的;
其他分区的第一个副本放置位置相对于第0个分区依次往后移。
也就是说:如果我们有5个Broker,5个分区,假设第一个分区的第一个副本放在第四个Broker上,那么第2个分区的第一个副本将会放在第5个Broker上;第三个分区的第一个副本将会放在第一个Broker上;第四个分区的第一个副本将会放在第二个Broker上,以此类推;
每个分区剩余副本数相对于第一个副本位置其实是由nextReplicaShift决定的,这个数也是随机生成的。
为什么这么设计呢?
在每个分区的第一个副本错开之后,一般第一个分区的第一个副本(按Broker编号排序)都是leader。leader是错开的,以至于broker挂了之后影响太大。
bin目录下的kafka-reassign-partition.sh可以根据Broker数量变化情况重新分配分区。
一个分区是不是只有一个文件呢?也就是说,消息日志文件会不会无限变大?
为了防止log不断追加导致文件过大,导致检索消息效率变低,一个partition又被划分成多个segment来组织数据。
在磁盘上,每个segment由一个log文件和2个index文件组成。
这三个文件是成套出现的。(其他文件先忽略)
leader-epoch-checkpoint 中保存了每一任leader开始写入消息时的offset。
log日志文件
在一个新的segment文件里面,日志时被追加写入的。如果满足一定条件,就会切分日志文件,产生一个新的segment。什么时候会触segment的切分呢?
第一种是根据日志文件大小。当一个segment写满以后,会创建一个新的segment,用最新的offset作为名称。这个例子可以通过往一个topic发送大量消息产生。
segment的默认大小是1G,通过以下参数控制:
log.segment.bytes
第二种是根据消息的最大时间戳和当前系统时间戳的差值。
有一个默认的参数:168小时 (一周)
log.roll.hours=168
意味着,如果服务器上次写入时间是一周之前,旧的segment就不写了,重新创建一个segment;
还可以从更加精细的时间单位进行控制,如果配置了毫秒级别的日志切分时间间隔会优先使用这个单位,否则就用小时的。
log.roll.ms
第三种情况就是offset索引文件或者timestamp索引文件达到了一定的大小,默认是10M。如果要减少日志文件的切分,可以把这个值调大一点。
log.index.size.max.bytes
意思就是:索引文件写满了,数据文件也要跟着拆分,不然这一套东西对不上。
由于一个segment的文件里面可能存放很多消息,如果要根据offset获取消息,必须要有一种快速检索消息的机制。这个就是索引。
在Kafka中设计了两种索引。
偏移量索引文件记录的是offset和消息物理地址(在log文件中的位置)的映射关系。时间戳索引文件记录是时间戳和offset的关系。
当然,内容是二进制的 文件,不能以纯文本的形式查看,bin目录下有dumplog工具。
./kafka-dump-log.sh --files /tmp/kafka-logs/mytopic-0/000000.index|head -n 10
注意Kafka的索引并不是每一条消息都会建立索引,二是一种稀疏索引。
稀疏索引的稀疏程度是根据消息的大小来控制的,默认是4KB;
log.index.interval.bytes=4096
只要写入的信息超过了4KB,偏移量索引文件和时间戳索引文件就会增加一条记录。
这个值设置的越小,索引则越密集;值越大则索引越稀疏。
相对来说越稠密的索引数据检索更快,但是会消耗更多的空间存储。
稀疏消费的空间少,但是插入和删除时开销比较大。
第二种所以是时间戳索引。
为什么会有时间戳索引文件呢?光有offset索引还不够吗?会根据时间戳来查找消息吗?
首先消息是必须要记录时间戳的。客户端封装的ProducerRecord就有timestamp属性。
为什么需要呢?
既然都记录时间戳了,那干脆就可以直接设计一个时间戳索引,可以根据时间戳查询。
注意创建时间戳有两种:一种是创建消息的时间戳,一种是消费在Broker追加写入的时间。我们改用那个参数?这个也可以通过参数控制:
log.message.timestamp.type=CreateTime
默认是创建时间,如果要改成日志追加时间,则修改为LogAppendTime;
快速检索
Kafka是如何基于索引快速检索呢?比如我要检索偏移量是10002673的消息。
为什么不用B+树?
因为Kafka是写多查少。如果用B+树,首先会出现大量的B+树,大量的插入会非常消耗性能。
都知道Kafka是将数据保存在磁盘中的,那么很多的旧数据我们该怎么办?
清理策略默认是开启的:
log.cleaner.enable=true
kafka里面提供了两种方式,一种是直接删除,一种是对日志进行压缩。默认是直接删除。
log.cleanup.policy=delete
如果是删除,什么时候删除呢?日志删除是通过定时任务实现的。默认5分钟执行一次。
log.retention.check.interval.ms=300000
那么从哪里删除呢?当然是从老数据开始。那么什么才是老数据呢?
通过以下参数进行控制:
log.retention.hours
默认值是168小时(一周),也就是时间戳超过一周的数据才会删除。
Kafka也提供了另外粒度更细的配置:分钟和毫秒。
这里还有一种情况,假设Kafka产生消息的速度是不均匀的,有的时候一周几百万条数据,有的时候一周几千条数据,那这个按照时间删除就不太合理了,所以第二种删除策略就是根据日志文件大小删除,先删旧的数据,一直删到不超过这个大小为止。
log.retention.bytes
默认值是-1,代表不限制大小,想写多少就写多少。它指的是所有文件的大小,我们也可以对单个segment文件大小进行限制。
log.segment.bytes
默认是1G.
第二种策略是不删除,对日志数据进行压缩。
问题:如果同一个key重复写入多次,会存储多次还是更新?
比如用来存储唯一的这个特殊topic:_consumer_offsets,存储的是消费者ID和partition的offset关系,消费者不断消费信息commit的时候是直接更新原来的offset,还是不断的写入呢?
答案是肯定存储多次,不然我们怎么实现顺序写呢。
当有了这些key相同value不同的消息的时候,存储空间就被浪费了。压缩就是将相同的Key合并为最后一个value.
当创建添加一个的分区或者分区增加了副本的时候,都要从所有副本中选举一个新的leader出来。
那么我们怎么进行选举呢?
通过ZK实现吗?通过ZK的watch机制来实现吗?
这种方法虽然简单,但是存在一定的弊端。如果分区和副本数量过多,所有的副本都直接进行选举的话,一旦某个出现节点的增,就会遭成大量的watch事件被触发,ZK的负载就会过重。
Kafka早期版本就是这么实现的,后来换了一种实现方式。
不是所有的replica都参与leader选举,而是由其中的一个Broker统一来指挥,这个Broker的角色就叫做Controller。类似redis集群中的哨兵机制。
所有的Broker会尝试在ZK中创建临时节点,只有一个能创建成功(先到先得)。
如果Controller挂掉了或者网络出现了问题,ZK上的临时节点会消息。其他的Broker通过watch监听到了Controller下线的消息后,开始竞选新的controller。方法跟之前还是一样的,谁先在ZK里面写入一个controller节点,谁就成为新的controller。
一个节点成为controller,它肩上的责任也比别人重了几分。
Controller确定以后,就可以开始做分区选主的事情了。显然每个replica都想推荐自己,但是所有的replica都有竞选资格吗?
并不是。这里有几个概念。
一个分区所有的副本,叫做Assigned-Replicas(AR)。
这些所有的副本中,跟leader数据保持一定程度同步的,叫做In-Sync Replicas(ISR)。
跟leader同步滞后过多的副本,叫做Out-Sync-Replicas(OSR)。
AR = ISR + OSR 。正常情况下OSR是空的,大家都同步,AR = ISR。
谁能够参加选举呢?肯定不是AR ,也不是OSR,而是ISR。而且这个ISR不是固定不动的,还是一个动态列表。
前面我们说过,如果同步延迟超过30秒,就剔除ISR,进入OSR。如果赶上了就加入ISR。
默认情况下,当leader副本发生故障时,只有在ISR集合中的副本才有资格被选举成新的leader。
如果ISR为空呢?在这种情况下,可以让ISR之外的副本参与选举。允许ISR之外的副本参与选举,叫做unclean leader election.
unable.leader.election.enable=false;
把这个参数改成true(一般情况不建议开启,会造成数据丢失。)
选举规则
分布式系统中常见的选举协议有哪些?
ZAB(ZK),Raft(Redis)思想归纳起来都是:先到先得,少数服从多数。
但是Kafka没有用到这些办法,而是用了一种自己实现的算法。
为什么呢?比如zab协议,可能会出现脑裂(节点不能互通的时候,出现多个leader,)惊群效应(大量watch事件被触发)
Kafka的选举实现类似与微软的PacificA算法。
在这种算法中,默认是让ISR中第一个replica变成leader。比如ISR是1,5,8.优先让1成为leader。
leader确定以后,客户端的读写操作只能操作leader节点。follower需要向leader同步数据。
不同的replica的offset是不一样的,到底怎么同步呢?
又要看几个概念了。。。。
LEO(Log End Offset):下一条等待写入的消息的offset(最新的offset+1);途中分别是9.8.6;
HW(Hign Watermark) :ISR中最小的LEO.,leader会改管理所有ISR中最小的LEO作为HW,目前是6.
**consumer最多只能消费到HW之前的位置(消费到offset5的消息)。**也就是说:其他的副本没有同步过去的消息,是不能被消费的。
为什么要这么设计呢?如果在同步成功之前就被消费了,consumer group的offset会偏大。如果leader崩溃,中间会缺失消息。
有了这两个offset之后,我们再来看看消息怎么同步。
follower1同步了1条消息,follower2同步了两条消息,此时HW推进了2,变成了8。
follower1同步了0条消息,follower2同步了1条消息。此时HW推进了1,变成了9。LEO和HW重叠了,所有的消息都可以消费了。
这里我们关注以下,从节点怎么跟主节点保持同步?
这中独特的ISR复制,可以在保障数据一致性情况下又可以提供高吞吐量。
follower故障
首先follower发生故障,会被踢出ISR。
follower恢复之后,从哪里开始同步数据呢?假设第一个replica宕机。(中间这个)
恢复以后,首先根据之前记录的HW(6),把高于HW的消息截掉(6,7).然后向leader同步消息。追上leader之后,重新加入ISR。
leader故障
假设图中leader发生故障。
首先选一个leader。因为replica1优先(中间这个),所以它成为leader。
为了保证数据一致性,其他的follower需要把高于HW的消息截取掉(这里没有消息需要截取。)
然后replica2开始同步数据。
注意:这种机制只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复。
我们首先研究以下消费者怎么消费。
我们知道在partition中,消息是不会删除的,所以在可以追加写入,写入的消息是连续存在的。
这种特性决定了Kafka是可以消费历史消息的而且按照消息的顺序消费指定消息,而不是只能消费对头的消息。
正常情况下,我们希望消费没有被消费过的数据,而且是从最先发送的开始消费。(这样才是有序和公平的);
对于一个partition,消费者组怎么才能做到接着上次消费的位置(offset)继续消费呢?我们肯定需要把这个对应关系保存起来,下次消费的时候直接查找就可以了。
(对应上文的索引文件)
这个对应关系到底保存在哪里呢?首先肯定不是保存在消费者这端的。为什么?因为所有的消费者都可以使用这个consumer group id,放在本地是做不到统一维护的,肯定需要放到服务端。
Kafka早期的版本把消费者和partition的offset直接维护在ZK中,但是读写的性能消耗太大了。后来就放在一个特殊的topic中,名字叫_consumer_offsets,默认有50个分区(offsets.topic.num.partitions默认是50),每个分区默认一个replication。
这样的一个特殊的Topic怎么存储消费者组对于分区的偏移量呢?
Topic里面是可以存放对象类型的value的(经过序列化和反序列化)。这个topic里面主要存储两种对象:
GroupMetadata
保存了消费者组中各个消费者的信息(每个消费者有编号)
OffsetAndMetadata
保存了消费者组和各个partition的offset位移信息元数据。
大致结构如下:
我们怎么知道offset会放在那个分区呢?
System.out.println(Math.abs("test".hashCode() % 50));
当然,这个是Broker有记录offset的情况,如果说新增了一个消费者组去消费一个topic的某个partition,没有offset的记录,这个时候我们应该从哪里开始消费?
什么情况下会找不到offset?就是没有消费过,没有把当前的offset上报给Broker。
消费者的代码中有一个参数用来控制如果找不到偏移量的时候从哪里开始消费。
auto.offset.reset
latest 默认值
也就是从最新的消息开始消费(最后发送的消息)。历史消息是不能消费的。
earliest
代表从最早的消息开始消费(最先发送的消息)。可以消费到历史消息。
none
如果消费组在服务端找不到offset会报错。
前面我们讲了,消费者组的offset是保存在broker的,但是是由消费者上报给broker的。并不是消费者组消费了消息,offset就会更新,消费者必须要有一个commit(提交)的动作,就跟RabbitMQ中消费者的ACK一样。
消费者可以自动提交或者手动提交,通过以下参数控制:
enable.auto.commit
默认为true。true代表消费者消费消息以后自动提交,此时Broker会更新消费者组的offset。
另外还可以通过配置参数来控制自动提交的频率:
auto.commit.interval.ms
默认是5秒。
如果我们要在消费完消息做完业务逻辑处理之后才commit,就要把这个值改成false。
如果是false,消费者就必须要调用一个方法让Broker更新offset。
有两种方式:
演示代码如下:
// 手动提交
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d ,key =%s, value= %s, partition= %s%n", record.offset(), record.key(), record.value(), record.partition());
buffer.add(record);
}
if (buffer.size() >= minBatchSize) {
// 同步提交
consumer.commitSync();
buffer.clear();
}
}
如果不提交或者提交失败,Broker的offset不会更新,消费者下次消费的时候会消费到重复的消息。
前面我们讲过,一个消费者里面的一个消费,只能消费Topic的一个分区。
如果分区数量跟消费者的数量一样,那就一人消费一个。如果是消费者比分区多,或者消费者比分区小,这个时候消费者跟分区的关系是怎么样的呢?
例如:2个消费者消费5个分区,怎么分配呢?
我们首先创建一个5个分区的topic。
sh bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 5 --topic ass5part
启动两个消费者消费(同一个消费者,目标是消费同一个partition,不同的clientid)
// 两个消费者消费5个分区
KafkaConsumer<String, String> consumer1 = new KafkaConsumer<String, String>(props);
KafkaConsumer<String, String> consumer2 = new KafkaConsumer<String, String>(props);
// 订阅队列
consumer1.subscribe(Arrays.asList("ass5part"));
consumer2.subscribe(Arrays.asList("ass5part"));
给5个分区分别发送一条消息:
producer.send(new ProducerRecord<String, String>("ass5part", 0, "0", "0"));
producer.send(new ProducerRecord<String, String>("ass5part", 1, "1", "1"));
producer.send(new ProducerRecord<String, String>("ass5part", 2, "2", "2"));
producer.send(new ProducerRecord<String, String>("ass5part", 3, "3", "3"));
producer.send(new ProducerRecord<String, String>("ass5part", 4, "4", "4"));
结果(打印顺序不一定)
他是按照范围连续分配的,你一部分我一部分。
他实际上采用了默认的策略:RangeAssignor。
我们也可以通过配置参数使用其他的消费策略。
props.put("partition.assignment.strategy","org.apache.kafka.clients.consumer.RoundRobinAssignor");
另外两种策略的查询及如果如下:
StickyAssignor:这种策略比较复杂,但是相对来说均匀一点(每次的结果可能不一样)。
原则:
consumer可以指定topic的某个分区消费吗?比如我就喜欢坐在讲台旁的宝座,我可以去做吗?
这个时候我们需要使用打assgin而不是subscribe接口。subscribe会自动分配分区,而assign是由我们自己指定分区消费,相当于comsumer group id失效了。
// 订阅topic,消费指定parptition
TopicPartition tp = new TopicPartition("ass5part", 0);
之前已经说过,在第一次消费的时候,一个组的消费者和分区的消费就已经确定了,如果分配策略没动,关系是不会改变的。那什么时候才会重新分配呢?
有两种情况需要重新分配分区和消费者的关系:
为了让分区分配尽量均衡,这个时候会触发rebalance机制。
大致分为以下几步:
总结起来,主要是四点:磁盘顺序io,索引机制,批量操作和压紧,零拷贝。
磁盘IO
随机I/O读写的数据在磁盘上分散的,寻址会狠耗时。
顺序I/O读写的数据在磁盘上是集中的,不需要寻址的过程。
所以顺序IO是被随机IO快的多的。
索引在上文已经讲过了。
批量读写
Kafka将所有的消息变成一个批量文件,减少网络IO损耗。
零拷贝
我觉得从下面那个图应该可以看出零拷贝大大提升了文件传输的性能。
传统IO模型:
零拷贝模式:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。