赞
踩
在上述的整个流程中,消息丢失的情况分为以几种可能性:
1、producer 端 发送消息给kafka server 端,中间网络出现问题,消息无法送达
2、kafka server端 在收到消息以后,保存消息时发生异常,异常分为三种
(1)可重试错误,通过重试来解决
(2) 网络连接错误
(3)无主(no leader)错误
3、consumer 端 在消费消息时发生异常,导致consumer端消费失败
注:当然这里还可能发生另一种错误,就是在producer发送消息到kafka server端时,消息体过大,producer client 直接抛出异常,导致发送失败
我们先来了解一下,producer端发送消息的方式:
ProducerRecord<String,String> record = new ProducerRecord<>("topicName","key","value");
try{
//这里只是把消息放进了一个缓冲区中,然后使用单独的线程将消息发送到服务端
producer.send(record);
}
catch(Exception){
e.printStackTrace();
}
ProducerRecord<String,String> record = new ProducerRecord<>("topicName","key","value");
try{
//send方法返回的是Future<RecordMetaData> 对象,然后我们可以调用get()方法等待响应
Future<RecordMetaData> future = producer.send(record);
future.get();
}
catch(Exception){
e.printStackTrace();
}
private class DemoProducerCallback implements Callback{
@override
public void onCompletion(RecordMetadata recordMetadata,Exception e){
//发生错误的回调方法,可以写入日志,或写入DB通过其它线程重重试,保证最终的数据送达
}
}
ProducerRecord<String,String> record = new ProducerRecord<>("topicName","key","value");
producer.send(record,new DemoProducerCallback()))
总结:从以上的三种发送方式中,我们可以知道,采用第一种方式发送时,消息丢失时我们的应用程序是无感知的,如果需要保证消息的不丢失,那么必须要选择第二种或者第三种(需要配合下一节中讲到的acks 参数),当然这里更推荐第三方种方式。
在producer 端的配置项中有很多的配置项,我们摘出几种比较重要的来一一解读:
acks:该参数指定了,kafka server的多少个副本收到消息以后才算真的正消息发送成功。取值范围:
acks = 0 表示producer 在将消息成功写入到 kafka server 之前不会收任消息
acks = 1 表示只要kafka server 集群中的leader节点收到消息,producer 端就会收到kafka server的成功响应
acks = all 表示只有当消息到leader节点,并且这条数据也同步到了所有副本中,producer 才会收到kafka server的成功响应。
buffer.memory:生产端 缓冲区的大小设置
compression type:生产端采用的数据压缩方式,取值 snappy,gzip,lz4,默认不会压缩。(启用压缩意味着,需要producer 和kafka server要占用更多的cpu资源)
retries:生产端发送消息到kafka server时,发生临时性错误以后,生产者发送消息到kafka server端重试的次数。如果重试超过该次数,则发生异常
batch.size: 当多个消息被发送至同一分区时,生产者会把它们发送到同一批。该参数指定了同一批次可以使用的内存大小,按字节数计算(而不是消息条数)。
linger.ms:该参数指定了生产者在发送批次之前等待更多消息加入批次的时间,producer client 会在批次填满(batch.size) 或linger.ms 到上限时,将消息发送至kafka server.
max.in.flight.requests.per.connection:该参数指定了生产者在收到kafka server 的成功响应之前,可以发送多少消息。(可以利用该配置让kafka server中的消息变得有序)
max.request.size:该参数用来控制生产者发送单个请求的数据大小。对于消费端也有相同的配置(message.max.bytes),建议两边设置相同。
总结:我们的问题,可以通过设置配置项 acks 、retries 来保证数据的不丢失。acks=1时,lead节点只要收到消息就会告诉producer消息接收成功,假如此时lead 挂掉了开始重新选主,选主成功后之前lead收到的那条消息就会丢失,如果需要保证消息的绝对不丢失,建议设置 acks =all
这里需要补充一个知识点,kafka的server端同一个topic下有多个分区,单个分区会有不同的副本。如果producer 发送消息么kafka server端,leader收到了消息以后,告诉producer 发送成功,此时再同步消息到多个副本,但由于某一个副本同步较慢,此时leader挂了,需要选主,选主的过程中,一旦那个较慢的副本成为新的leader,那么新的leader中就不包含了原leader收到的那条最新数据,导致消息丢失。
broker中的配置项,unclean.leader.election.enable = false,表示不允许非ISR中的副本被选举为首领,以免数据丢失。
ISR:是指与leader保持一定程度(这种范围是可通过参数进行配置的)同步的副本和 leader 共同被称为ISR
OSR:与leader同步时,滞后很多的副本(不包括leader)被称为OSR
AR,分区中所有的副本统称为AR。AR = ISR + OSR
1、设置 enable.auto.commit = false
2、在consumer端消费消息操作完成以后 再提交 offset,类似于上文中的代码示例
q圈子:755389584
我们来看一下,enale.auto.commit = false时,如何手动提交的
public void consumerMsg(){ while(true){ //这里的poll(100)指的是kafka server端没有消息时,连接等待的时间,超过该时间立即返回空给consumer ConsumerRecords<String,String> records = consomer.poll(100); for(ConsumerRecord<String,String> record : records){ // 这里是消费消息的逻辑(简单逻辑输入到控制台) System.out.printIn(record.value)); //提交偏移量 try{ consumer.commitSync(); //同步提交 如果异步的话,可以使用 consumer.commitAsync(); } catch(CommitFailedException ex){ log.error("commit fail"); } } } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。