赞
踩
在当今信息爆炸的时代,实时数据处理已经成为许多应用的核心需求。而Kafka作为一个高性能的消息队列系统,为我们提供了一个实现实时数据处理的利器。而Spring Boot,则是一个强大的框架,可以帮助我们快速搭建应用。本文将带你进入这个充满挑战的领域,探索如何利用Spring Boot整合Kafka实现消费者,实现实时数据的处理和分析。
<spring.boot.version>2.7.8</spring.boot.version> <kafka.version>2.8.2</kafka.version> <netty.version>4.1.73.Final</netty.version> <lombok.version>1.18.24</lombok.version> <!-- 版本号自己添加 --> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> </dependency>
package fun.acowbo.config; import lombok.Data; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import java.util.HashMap; import java.util.Map; /** * @author xiaobo */ @Data @Configuration @EnableKafka @ConfigurationProperties(prefix = "spring.kafka.consumer") public class KafkaConsumerConfig { @Value("${spring.kafka.bootstrap-servers}") private String bootstrapServers; private String groupId; private String autoCommitInterval; private String autoOffsetReset; private String enableAutoCommit; private String keyDeserializer; private String valueDeserializer; @Bean public ConsumerFactory<String, String> consumerFactory() { Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit); props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer); return new DefaultKafkaConsumerFactory<>(props); } @Bean public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); // 启用批量消费 factory.setBatchListener(true); // 设置并发消费者数量 factory.setConcurrency(3); return factory; } }
现在让我解释一下这个方法的主要内容:
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
: 创建了一个 ConcurrentKafkaListenerContainerFactory
的实例,用于配置 Kafka 消息监听容器的工厂。
factory.setConsumerFactory(consumerFactory());
: 设置了消费者工厂,即将上一步定义的 consumerFactory()
方法返回的消费者工厂设置给消息监听容器工厂,以便创建 Kafka 消费者实例。
factory.setBatchListener(true);
: 启用了批量消费模式。设置为 true
表示消息监听器将以批量的方式处理接收到的消息,从而提高消费效率。
factory.setConcurrency(3);
: 设置了并发消费者数量为 3。这表示每个监听容器会创建 3 个消费者实例来并发地处理消息,以提高消息处理的吞吐量。这里要看你的分区数是多少,如果是3那么刚好,如果是2,就有多余,浪费。
return factory;
: 返回了配置完成的消息监听容器工厂实例。
总的来说,这个方法的作用是创建并配置一个 Kafka 消息监听容器工厂,设置了消费者工厂、启用了批量消费模式,并设置了并发消费者数量,以便提高消息处理的效率和吞吐量。
package fun.acowbo.service; import fun.acowbo.utils.BoCommonUtil; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; import java.util.List; @Component @Slf4j public class BatchKafkaConsumer { public static final String CONSUMER_FILE_PATH = "/Users/xiaobo/Downloads/con-group.log"; @KafkaListener(topics = "acowbo", groupId = "consumer-two-group", containerFactory = "kafkaListenerContainerFactory") // , Acknowledgment ack public void listen(List<ConsumerRecord<String, String>> records) { log.info("Received {} records" , records.size()); for (ConsumerRecord<String, String> record : records) { try { // 消息消费处理 BoCommonUtil.writeFile(record.value(), CONSUMER_FILE_PATH); } catch (Exception e) { log.error("消费失败!{}",e.getMessage()); } } // ack.acknowledge(); // 手动提交位移 } }
package fun.acowbo.config; import lombok.Data; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import java.util.HashMap; import java.util.Map; /** * @author todoitbo * @date 2024/3/15 */ @Data @Configuration @ConfigurationProperties(prefix = "spring.kafka.consumer") public class ConsumerTwoConfig { @Value("${spring.kafka.bootstrap-servers}") private String bootstrapServers; private String groupId; private String autoOffsetReset; private String enableAutoCommit; private String keyDeserializer; private String valueDeserializer; @Bean public ConsumerFactory<String, String> consumerFactory() { Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer); return new DefaultKafkaConsumerFactory<>(props); } }
package fun.acowbo.service; import fun.acowbo.utils.BoCommonUtil; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.stereotype.Service; import javax.annotation.Resource; import java.time.Duration; import java.util.Collections; /** * @author todoitbo * @date 2024/3/15 */ @Slf4j @Service public class ConsumerTwoService { @Resource private ConsumerFactory<String, String> consumerFactory; public static final String CONSUMER_FILE_PATH = "/Users/xiaobo/Downloads/con-two.log"; public void pollMessages() { Consumer<String, String> consumer = consumerFactory.createConsumer(); new Thread(() -> { try { consumer.subscribe(Collections.singletonList("acowbo")); while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); // 处理拉取到的消息 records.forEach(record -> { try { BoCommonUtil.writeFile(record.key() + record.value(), CONSUMER_FILE_PATH); } catch (Exception e) { log.error("Received message: key = {}, value = {}", record.key(), record.value()); } }); // 使用异步提交规避阻塞 consumer.commitAsync(); } } catch (Exception e) { log.error("consumer error", e); } finally { try { // 最后一次提交使用同步阻塞式提交 consumer.commitSync(); } finally { consumer.close(); } } }).start(); } }
在 Kafka 消费者中,使用 consumer.commitAsync()
和 consumer.commitSync()
来手动提交消费位移(offset)的方法。让我逐步解释它们的作用以及在生产环境中的好处:
consumer.commitAsync()
方法:consumer.commitAsync()
是异步提交消费位移的方法。它会在后台线程中提交消费位移,而不会阻塞当前线程的执行。consumer.commitSync()
方法:consumer.commitSync()
是同步阻塞式提交消费位移的方法。它会阻塞当前线程,直到消费位移提交成功或失败,并返回提交结果。commitAsync()
和 commitSync()
的好处:在生产环境中,使用 commitAsync()
和 commitSync()
可以避免因消费位移未提交或提交失败而导致的消息丢失或重复消费的问题,同时确保消费者的性能和可靠性。
在这段代码中,同时使用了 consumer.commitAsync()
和 consumer.commitSync()
是为了兼顾性能和可靠性:
使用 consumer.commitAsync()
:
使用 consumer.commitSync()
:
在最后一次提交消费位移时,使用了 consumer.commitSync()
,是为了确保程序即将关闭时的消费位移提交的可靠性。虽然同步阻塞式提交会阻塞当前线程,但在应用即将关闭的情况下,可以接受这种阻塞以保证数据的完整性和一致性。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。