赞
踩
开发过程中,使用kafka批量消费,发现拉取数量一直为1,如何提高批量拉取数量,记录下踩坑记录。
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.8.11</version>
</dependency>
@Configuration public class KafkaBlukConfig { @Value("${spring.kafka.bootstrap-servers}") private String bootstrapServers; @Value("${spring.kafka.consumer.max-poll-records:30}") private Integer maxPollRecords; @Value("${spring.kafka.consumer.groupId:group1}") private String group; /** * 消费者配置信息 */ @Bean public Map<String, Object> consumerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); props.put(ConsumerConfig.GROUP_ID_CONFIG, group); props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 120000); props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 180000); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); return props; } /** * 消费者批量⼯程 */ @Bean public KafkaListenerContainerFactory<?> batchFactory() { ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs())); //设置为批量消费,每个批次数量在Kafka配置参数中设置ConsumerConfig.MAX_POLL_RECORDS_CONFIG factory.setBatchListener(true); return factory; } }
@Component
public class KafkaBatchConsumer {
private static final Logger log = LoggerFactory.getLogger(KafkaBatchConsumer.class);
@KafkaListener(id = "consumer1", topics = "topic2", containerFactory = "batchFactory")
public void consume(List<ConsumerRecord<String, String>> record) throws Exception {
log.info("KafkaBatchConsumer recode size : {} ", record.size());
}
}
spring:
kafka:
bootstrap-servers: 192.168.56.112:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
@Slf4j @RestController @RequestMapping("/kafka") public class KafkaProducer { // 自定义的主题名称 public static final String TOPIC_NAME = "topic2"; private KafkaTemplate<String, String> kafkaTemplate; /** * http://localhost:8080/kafka/send?msg=a * @param msg */ @RequestMapping("/send") public String send(@RequestParam("msg") String msg) { log.info("准备发送消息为:{}", msg); // 1.发送消息 ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(TOPIC_NAME, msg); future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() { @Override public void onFailure(Throwable throwable) { // 2.发送失败的处理 log.error("生产者 发送消息失败:" + throwable.getMessage()); } @Override public void onSuccess(SendResult<String, String> stringObjectSendResult) { // 3.发送成功的处理 log.info("生产者 发送消息成功:" + stringObjectSendResult.toString()); } }); return "接口调用成功"; } }
http://localhost:9999/kafka/send?msg=a
多次调用发现如下:
发现拉取消息的大小始终为1
@Bean public Map<String, Object> consumerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); props.put(ConsumerConfig.GROUP_ID_CONFIG, group); props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 120000); props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 180000); ################ 添加下面两行 ########### props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024 * 1024); props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 3000); ###################################### props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); return props; }
可以看到批量消费成功。
消费者从服务器获取记录的最小字节数,broker 收到消费者拉取数据的请求的时候,如果可用数据量小于设置的值,那么 broker 将会等待有足够可用的数据的时候才返回给消费者,这样可以降低消费者和 broker 的工作负载。如果消费者的数量比较多,把该属性的值设置得大一点可以降低 broker 的工作负载。
如果 Kafka 仅仅参考 fetch.min.bytes 参数的要求,那么有可能会因为获取不到足够大小的消息而一直阻塞等待,从而无法发送响应给 Consumer,显然这是不合理的。fetch.max.wait.ms 参数用于指定 等待 FetchResponse 的最长时间,服务端根据此时间决定何时进行响应,默认值为 500(ms)。如果 Kafka 中没有足够多的消息而满足不了 fetch.min.bytes 参数的要求,那么最终会等待 500ms 再响应消费者请求。这个参数的设定需要参考 Consumer 与 Kafka 之间的延迟大小,如果业务应用对延迟敏感,那么可以适当调小这个参数。
/** * 能够立即返回给客户端的4种情况 * 1. fetch请求没有大于0的wait时间,参考fetch.max.wait.ms设置 * 2. fetch请求要拉取的分区为空 * 3. 根据fetch.min.bytes的设置,有足够的数据返回 * 4. 出现异常 */ if (timeout <= 0 || fetchInfos.isEmpty || bytesReadable >= fetchMinBytes || errorReadingData) { // fetchPartitionData是一个TopicPartition -> FetchPartitionData 的map集合 val fetchPartitionData = logReadResults.map { case (tp, result) => tp -> FetchPartitionData(result.error, result.highWatermark, result.leaderLogStartOffset, result.info.records, result.lastStableOffset, result.info.abortedTransactions) } // 调用响应回调函数 responseCallback(fetchPartitionData) }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。