当前位置:   article > 正文

Kafka丢失数据问题_kafka消费丢单

kafka消费丢单

问题一:

单批数据的长度超过限制会丢失数据
报错:
kafka.common.MessageSizeTooLargeException 
  • 1
  • 2
  • 3

问题原因:

默认单条消息最大为1M,当单条消息长度超过1M时,就会出现发送到broker失败,从而导致消息在producer的队列中一直累积,直到撑爆生产者的内存
  • 1

解决方式:

1.修改kafka的broker配置:`message.max.bytes(默认:1000000B)`,这个参数表示单条消息的最大长度。在使用kafka的时候,应该预估单条消息的最大长度,不然导致发送失败。
  • 1

2.修改kafka的broker配置:replica.fetch.max.bytes (默认: 1MB),broker可复制的消息的最大字节数。这个值应该比message.max.bytes大,否则broker会接收此消息,但无法将此消息复制出去,从而造成数据丢失。

3.修改消费者程序端配置:fetch.message.max.bytes (默认 1MB) – 消费者能读取的最大消息。这个值应该大于或等于message.max.bytes。如果不调节这个参数,就会导致消费者无法消费到消息,并且不会爆出异常或者警告,导致消息在broker中累积。

注意:
1.从性能上考虑:通过性能测试,kafka在消息为10K时吞吐量达到最大,更大的消息会降低吞吐量,在设计集群的容量时,尤其要考虑这点。

2.确保 分区数最大的消息不会超过服务器的内存,否则会报OOM错误。同样地,消费端的fetch.message.max.bytes指定了最大消息需要的内存空间,同样,分区数最大需要内存空间 不能超过服务器的内存。

3.垃圾回收:更大的消息会让GC的时间更长(因为broker需要分配更大的块),随时关注GC的日志和服务器的日志信息。如果长时间的GC导致kafka丢失了zookeeper的会话,则需要配置zookeeper.session.timeout.ms参数为更大的超时时间。

问题二:

出现断电或者机器故障,丢失数据
  • 1

问题原因:

kafka的数据一开始就是存储在PageCache上的,定期flush到磁盘上的,也就是说,不是每个消息都被存储在磁盘了,如果出现断电或者机器故障等,PageCache上的数据就丢失了。
  • 1

解决方式:

1.可以通过log.flush.interval.messages和log.flush.interval.ms来配置flush间隔,interval大丢的数据多些,小会影响性能但在0.8版本,可以通过replica机制保证数据不丢,代价就是需要更多资源,尤其是磁盘资源,
2.使用GZip和Snappy压缩,来缓解这个问题 
  • 1
  • 2

问题三:

自动更新offset
  • 1

问题原因:

当broker能接收消息的最大字节数比消费端能消费的最大字节数小,broker就会因为消费端无法使用这个消息而挂起。
  • 1

解决方式:

关闭自动更新offset,等到数据被处理后再手动跟新offset。在消费前做验证前拿取的数据是否是接着上回消费的数据,不正确则return先行处理排错。一般来说zookeeper只要稳定的情况下记录的offset是没有问题,除非是多个consumer group 同时消费一个分区的数据,其中一个先提交了,另一个就丢失了。
  • 1

问题三:

消费者设置手动提交offset , 并在处理逻辑中连接外部数据库 , 导致消费数据卡顿。
报错:
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records
  • 1
  • 2
  • 3

问题原因:

处理耗时,导致超过了Kafka的sessiontimeout时间(0.10.x版本默认是30秒),那么就会re-blance重平衡,此时有一定几率offset没提交,会导致重平衡后重复消费。如果在close之前调用了consumer.unsubscribe()则有可能部分offset没提交,下次重启会重复消费。
  • 1

解决方式:

1.减少单次拉取消息条数,增加最大拉取间隔时间。
	消费者配置中,减少单次拉取消息条数max.poll.records,增加获取消息后提交偏移量的最大时间max.poll.interval.ms。
	max.poll.records默认较大,容易产生消费积压导致超过设定的时间(默认5分钟),服务端会认为该消费者失效。

2.增加超时时间。
	消费者配置中,增加超时时间session.timeout.ms。
	session.timeout.ms是心跳检测时间,kafka消费者默认3秒发送一次心跳,若服务端在session.timeout.ms内未检测心跳,会认为该消费者失效。
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/爱喝兽奶帝天荒/article/detail/964391
推荐阅读
相关标签
  

闽ICP备14008679号