赞
踩
ack机制能够保证数据的不丢失,如果ack设置为0,风险很大,一般不建议设置为0。即使设置为1,也会随着leader宕机丢失数据。
producer.type=sync
request.required.acks=1
也会考虑ack的状态,除此之外,异步模式下的有个buffer,通过buffer来进行控制数据的发送,有两个值来进行控制,时间阈值与消息的数量阈值,如果buffer满了数据还没有发送出去,有个选项是配置是否立即清空buffer。可以设置为-1,永久阻塞,也就数据不再生产。异步模式下,即使设置为-1。也可能因为程序员的不科学操作,操作数据丢失,比如kill -9,但这是特别的例外情况。
producer.type=async
request.required.acks=1
queue.buffering.max.ms=5000
queue.buffering.max.messages=10000
queue.enqueue.timeout.ms = -1
batch.num.messages=200
结论:producer有丢数据的可能,但是可以通过配置保证消息的不丢失。
通过 offset commit 来保证数据的不丢失,Kafka自己记录了每次消费的offset数值,下次继续消费的时候,会接着上次的offset进行消费。
而offset的信息在Kafka0.8版本之前保存在Zookeeper中,在0.8版本之后保存到topic中,即使消费者在运行过程中挂掉了,再次启动的时候会找到offset的值,找到之前消费消息的位置,接着消费,由于 offset的信息写入的时候并不是每条消息消费完成后都写入的,所以这种情况有可能会造成重复消费,但是不会丢失消息。
唯一例外的情况是,我们在程序中给原本做不同功能的两个consumer组设置
Kafka SpoutConfig.bulider.setGroupid
的时候设置成了一样的 groupid,这种情况会导致这两个组共享同一份数据,就会产生组A消费 partition1,partition2 中的消息,组B消费 partition3 的消息,这样每个组消费的消息都会丢失,都是不完整的。 为了保证每个组都独享一份消息数据,groupid一定不要重复才行。
每个broker中的partition我们一般都会设置有replication(副本)的个数,生产者写入的时候首先根据分发策略(有partition按partition,有key按key,都没有轮询)写入到leader中,follower(副本)再跟leader同步数据,这样有了备份,也可以保证消息数据的不丢失。
要确定Kafka的消息是否丢失或重复,从两个方面分析入手:消息发送和消息消费。
Kafka消息发送有两种方式:同步(sync)和异步(async),默认是同步方式,可通过 producer.type
属性进行配置。
Kafka通过配置 request.required.acks
属性来确认消息的生产:
0---表示不进行消息接收是否成功的确认;
1---表示当Leader接收成功时确认;
-1---表示Leader和Follower都接收成功时确认;
综上所述,有6种消息生产的情况,下面分情况来分析消息丢失的场景:
(1) acks=0
,不和 Kafka 集群进行消息接收确认,则当网络异常、缓冲区满了等情况时,消息可能丢失;
(2) acks=1
、同步模式下,只有Leader确认接收成功后但挂掉了,副本没有同步,数据可能丢失;
Kafka 消息消费有两个 consumer 接口,Low-level API 和 High-level API:
Low-level API:消费者自己维护 offset 等值,可以实现对 Kafka 的完全控制;
High-level API:封装了对 parition 和 offset 的管理,使用简单;
如果使用高级接口High-level API,可能存在一个问题就是当消息消费者从集群中把消息取出来、并提交了新的消息offset值后,还没来得及消费就挂掉了,那么下次再消费时之前没消费成功的消息就“ 诡异”的消失了;
解决办法:
针对消息丢失:
同步模式下,确认机制设置为-1,即让消息写入Leader和Follower之后再确认消息发送成功;
异步模式下,为防止缓冲区满,可以在配置文件设置不限制阻塞超时时间,当缓冲区满时让生产者一直处于阻塞状态;
针对消息重复:将消息的唯一标识保存到外部介质中,每次消费时判断是否处理过即可。
消息重复消费及解决参考: 如何保证消息不被重复消费?(如何保证消息消费时的幂等性)-Java知音
消息的重复消费问题,需要考虑幂等性,如果消费类型天生幂等,那么就没有必要去考虑重复消费的问题。
但是一般上 MQ 的任务都是一些比较耗时的任务,比如说调用第三方服务,此时可以采用第三方记录(redis),保存消息的唯一 id,定义消息的唯一id 以及时间戳的消费类型(待处理,过程中,失败/完成),如果是消费过程丢失,会导致消息长时间处于待处理的状态,我们可以另起一个定时任务,轮询一定时间间隔的任务,将它生产到兜底服务,由兜底服务去判断是否需要再次消费(比如说,一个 sms 服务,可以去查询当前消息 id 的短信是否已经成功发出,如果成功发出,则标记当前任务成功消费。)
Kafka 将消息以主题为单位进行归纳。
主要有两个条件
生产者直接将数据发送到 broker 的 leader(主节点),不需要在多个节点进行分发,为了 帮助 producer 做到这点,所有的 Kafka 节点都可以及时的告知,哪些节点是活动的,目标 topic 目标分区的 leader 在哪。这样 producer 就可以直接将消息发送到目的地了。
Kafka 消费消息时,向 broker 发出 “fetch” 请求去消费特定分区的消息,同时,consumer 还指定消息在日志中的偏移量(offset),这样就可以消费从这个位置开始的消息。
消费者拥有 了 offset 的控制权,可以向后回滚去重新消费之前的消息,这是很有意义的。
Kafka 遵循了一种大部分消息系统共同的传统 的设计:producer 将消息推送到 broker,consumer 从 broker 拉取消息 。
消息由一个固定长度的头部和可变长度的字节数组组成。头部包含了一个版本号和 CRC32 校验码。
在Kafka发送数据的时候,每次发送消息都会有一个确认反馈机制,确保消息正常被收到。
可以通过设置request.required.acks
决定 ack 策略。
0
: 生产者不会等待 broker 的 ack,这个延迟最低但是存储的保证最弱。当 server 挂掉的时候就会丢数据;1
:服务端会等待 ack 值。leader 副本确认接收到消息后发送 ack 。如果 leader 挂掉后,不确保是否复制完成,也就是说新 leader 可能会丢失数据;-1
:同样在 1 的基础上,服务端会等所有的 follower 的副本收到数据后, leader 才会发出 的 ack,这样数据就不会丢失。
消费者每次消费数据的时候,都会记录消费的物理偏移量(offset), 等到下次消费时,他会接着上次位置继续消费 。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。