当前位置:   article > 正文

Spring Boot 集成 Kafka 遇到问题_kafka has pass since batch

kafka has pass since batch

问题1:nested exception is org.apache.kafka.common.errors.RecordTooLargeException:

异常类: nested exception is org.apache.kafka.common.errors.RecordTooLargeException:

很明显,消息体目前过大。需要提高Kafka服务器能够接收的最大数据大小。

解决方法:提高Kafka服务器单条消息最大大小。配置参数 message.max.bytes

我的方法:生产环境大概每份文件不超过5M,调整服务器broker参数message.max.bytes=510241024

问题2:Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 101: org.apache.kafka.common.errors.DisconnectException.

异常类:org.apache.kafka.common.errors.DisconnectException

异常类描述:Server disconnected before a request could be completed.

[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO o.a.k.clients.FetchSessionHandler[442] - [Consumer clientId=consumer-1, groupId=auto-dev] Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 101: org.apache.kafka.common.errors.DisconnectException.

问题定位:由于消息读写频率较高,因此kafka服务器的负载达到上限。所以有些消费者实例一直就poll不到数据。达到配置的时间【session.timeout.ms】、【request.timeout.ms】,消费者会被踢出Kafka。因此日志不断打印该错误。(其实另一方面也提示消费时间过长)

解决方法:提高consumer参数【session.timeout.ms】、【request.timeout.ms】的值。

我的方法:在application.yaml中为【session.timeout.ms】、【request.timeout.ms】这两个参数均调整到4分钟,即240000ms。

问题3:TimeoutException: Expiring 1 record(s) … has passed since batch creation

问题定位:消息已经投递到kafka,但是kafka来不及处理。提示kafka写性能达到瓶颈。

我的方法:可以从两方面修改。

(1)控制代码发送频率,即控制调用kafkaTemplate.send方法的频率。作为有经验的开发人员,不要问我怎么控制发送频率。代码里加入Thread.sleep就可以不是么

(2)在生产者参数中提高【delivery.timeout.ms】的时间。默认2分钟kafka没有响应则报异常。我们可以增加该时间,单位毫秒。但是有上限,不能超过【request.timeout.ms】与【linger.ms】之和。

声明:本文内容由网友自发贡献,转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号