赞
踩
基于@KafkaListener注解的kafka监听代码可以手动指定要消费的kafka集群,这对于需要访问多套kafka集群的程序来说,是有效的解决方案。这里需要注意的是,此时的消费者配置信息需使用原生kafka的配置信息格式(如:拉取消息数量上限为 ConsumerConfig.MAX_POLL_RECORDS_CONFIG = "max.poll.records"),与自动装载KafkaConsumer时的配置信息格式(如:拉取消息数量上限为 spring.kafka.consumer.max-poll-records)不同——尽管 DefaultKafkaConsumerFactory(java.util.Map&lt;java.lang.String,java.lang.Object&gt; configs) 本身来自 spring-kafka,其构造参数仍采用原生kafka格式。详情如下:
配置参数及其含义,参见《@KafkaListener的配置使用》
特别说明:spring-kafka 已经传递依赖了 kafka-clients,通常无需再单独引入后者。
<!-- spring-kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.6.0</version>
</dependency>
<!-- kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.6.0</version>
</dependency>
@Component @Slf4j public class KafKaProducer { @Autowired private KafkaTemplate kafkaTemplate; public void sendMessage(String topic, Object object) { /* * 这里的 ListenableFuture 类是 spring 对 java 原生 Future 的扩展增强,是一个泛型接口,用于监听异步方法的回调 而对于 * kafka send 方法返回值而言,这里的泛型所代表的实际类型就是 SendResult<K, V>,而这里 K,V 的泛型实际上 被用于 * ProducerRecord<K, V> producerRecord,即生产者发送消息的 key,value 类型 */ ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, object); future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() { @Override public void onFailure(Throwable throwable) { log.error("发送消息失败:" + throwable.getMessage()); } @Override public void onSuccess(SendResult<String, Object> sendResult){ // log.info("发送消息成功:" + sendResult.toString()); } }); } }
其中可配置多个kafka集群,每个kafka集群生成一个KafkaListenerContainerFactory实例
@Data @Slf4j @Configuration public class KafkaConfig { @Resource Environment environment; @Bean public Map<String, Object> consumerProperties() { String servers = environment.getProperty("kafka.servers", "127.0.0.1:9092"); String groupId = environment.getProperty("kafka.groupId", "consumer-group"); String maxPollRecords = environment.getProperty("kafka.max.poll.records", "100"); String maxPollInterval = environment.getProperty("kafka.max.poll.interval", "600000"); String sessionTimeout = environment.getProperty("kafka.session.timeout.ms", "60000"); String jaasConfig = environment.getProperty("kafka.sasl.jaas.config"); /// 注意这里,配置信息基于原生kafka的配置信息格式 Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords); props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollInterval); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); /// props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout); props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 180000); props.put("security.protocol", "SASL_PLAINTEXT"); props.put("sasl.mechanism", "SCRAM-SHA-256"); props.put("sasl.jaas.config", jaasConfig); return props; } @Bean public KafkaListenerContainerFactory<?> containerFactory() { Integer concurrency = environment.getProperty("kafka.concurrency", Integer.class, 1); Integer pollTimeout = environment.getProperty("kafka.poll.timeout", Integer.class, 3000); ConcurrentKafkaListenerContainerFactory<String, String> containerFactory = new ConcurrentKafkaListenerContainerFactory<>(); 
containerFactory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(this.consumerProperties())); containerFactory.setConcurrency(concurrency); // 消费并发数量 containerFactory.setBatchListener(true); // 批量监听消息 containerFactory.getContainerProperties().setAckMode(ContainerProperties.AckMode.BATCH); // 批量提交偏移 containerFactory.getContainerProperties().setPollTimeout(pollTimeout); // 消息拉取时限 return containerFactory; } }
@KafkaListener注解的containerFactory参数引用上述配置类中定义的KafkaListenerContainerFactory实例(bean),也就指定了对应的kafka集群
@Slf4j @Component public class KafkaConsumerListen implements BatchMessageListener<String, String> { @Autowired private Environment environment; @Autowired private KafkaMsgHandleService msgHandleService; @Autowired private ThreadPoolTaskExecutor taskExecutor; /************************ * 接收消息 ************************/ @Override @KafkaListener( containerFactory = "containerFactory", groupId = "${kafka.groupId}", topics = "#{'${kafka.topics}'.split(',')}", concurrency = "${kafka.concurrency}") public void onMessage(List<ConsumerRecord<String, String>> records) { try { final List<String> msgs = records.stream().map(ConsumerRecord::value).collect(Collectors.toList()); log.info("收到消息体:size={} content:{}", msgs.size(), JSON.toJSONString(msgs)); /// 处理消息 msgs.forEach(this::processRecord); } catch (Exception e) { log.error("KafkaListener_kafka_consume_error.", e); } } /************************ * 处理消息 ************************/ private void processRecord(String msg) { taskExecutor.submit(() -> { if (!environment.getProperty("kafka1.switch", Boolean.class,true)) { log.warn("KafkaListener_turn_off_drop_message."); return; } msgHandleService.handle(msg); }); } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。