当前位置:   article > 正文

@KafkaListener指定kafka集群

@kafkalistener

基于@KafkaListener注解的kafka监听代码可以手动指定要消费的kafka集群,这对于需要访问多套kafka集群的程序来说,是有效的解决方案。 这里需要注意的是,此时的消费者配置信息需使用原生kafka的配置信息格式(如:拉取消息数量上限为 ConsumerConfig.MAX_POLL_RECORDS_CONFIG = “max.poll.records”),与自动装载KafkaConsumer时的配置信息格式不同(如:拉取消息数量上限为 spring.kafka.comsumer.max-poll-records),虽然 DefaultKafkaConsumerFactory(java.util.Map<java.lang.String,java.lang.Object> configs)来自spring-kafka 。详情如下:

配置文件

配置参数及其含义,参见《@KafkaListener的配置使用》

依赖项

特别说明下,其实spring-kafka已包含了kafka-clients

<!-- spring-kafka --> 
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.6.0</version>
</dependency>
<!-- kafka-clients --> 
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.6.0</version>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

生产者代码

@Component
@Slf4j
public class KafKaProducer {

	@Autowired
	private KafkaTemplate kafkaTemplate;

	public void sendMessage(String topic, Object object) {

		/*
		 * 这里的 ListenableFuture 类是 spring 对 java 原生 Future 的扩展增强,是一个泛型接口,用于监听异步方法的回调 而对于
		 * kafka send 方法返回值而言,这里的泛型所代表的实际类型就是 SendResult<K, V>,而这里 K,V 的泛型实际上 被用于
		 * ProducerRecord<K, V> producerRecord,即生产者发送消息的 key,value 类型
		 */
		ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, object);

		future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
			@Override
			public void onFailure(Throwable throwable) {
				log.error("发送消息失败:" + throwable.getMessage());
			}
			@Override
			public void onSuccess(SendResult<String, Object> sendResult){
			    // log.info("发送消息成功:" + sendResult.toString());
			}
		});
	}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

KafkaListenerContainerFactory 配置类

其中可配置多个kafka集群,每个kafka集群生成一个KafkaListenerContainerFactory实例

@Data
@Slf4j
@Configuration
public class KafkaConfig {

    @Resource
    Environment environment;

    @Bean
    public Map<String, Object> consumerProperties() {

        String servers          = environment.getProperty("kafka.servers", "127.0.0.1:9092");
        String groupId          = environment.getProperty("kafka.groupId", "consumer-group");
        String maxPollRecords   = environment.getProperty("kafka.max.poll.records", "100");
        String maxPollInterval  = environment.getProperty("kafka.max.poll.interval", "600000");
        String sessionTimeout   = environment.getProperty("kafka.session.timeout.ms", "60000");
        String jaasConfig       = environment.getProperty("kafka.sasl.jaas.config");

        /// 注意这里,配置信息基于原生kafka的配置信息格式
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);

        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollInterval);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        /// props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
        props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 180000);

        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "SCRAM-SHA-256");
        props.put("sasl.jaas.config", jaasConfig);

        return props;
    }
    
    @Bean
    public KafkaListenerContainerFactory<?> containerFactory() {

        Integer concurrency = environment.getProperty("kafka.concurrency", Integer.class, 1);
        Integer pollTimeout = environment.getProperty("kafka.poll.timeout", Integer.class, 3000);

        ConcurrentKafkaListenerContainerFactory<String, String> containerFactory = new ConcurrentKafkaListenerContainerFactory<>();
        containerFactory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(this.consumerProperties()));
        containerFactory.setConcurrency(concurrency); // 消费并发数量
        containerFactory.setBatchListener(true);      // 批量监听消息
        containerFactory.getContainerProperties().setAckMode(ContainerProperties.AckMode.BATCH); // 批量提交偏移
        containerFactory.getContainerProperties().setPollTimeout(pollTimeout); // 消息拉取时限
        return containerFactory;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55

消费者代码

@KafkaListener注解的containerFactory参数引用上述配置类中定义的KafkaListenerContainerFactory实例(bean),也就指定了对应的kafka集群

@Slf4j
@Component
public class KafkaConsumerListen implements BatchMessageListener<String, String> {

    @Autowired
    private Environment environment;

    @Autowired
    private KafkaMsgHandleService msgHandleService;

    @Autowired
    private ThreadPoolTaskExecutor taskExecutor;

    /************************
     *      接收消息
     ************************/
    @Override
    @KafkaListener( containerFactory = "containerFactory", 
    				groupId = "${kafka.groupId}", 
    				topics = "#{'${kafka.topics}'.split(',')}", 
    				concurrency = "${kafka.concurrency}")
    public void onMessage(List<ConsumerRecord<String, String>> records) {
        try {
            final List<String> msgs = records.stream().map(ConsumerRecord::value).collect(Collectors.toList());
            log.info("收到消息体:size={} content:{}", msgs.size(), JSON.toJSONString(msgs));
            /// 处理消息
            msgs.forEach(this::processRecord);
        } catch (Exception e) {
            log.error("KafkaListener_kafka_consume_error.", e);
        }
    }

    /************************
     *      处理消息
     ************************/
    private void processRecord(String msg) {
        taskExecutor.submit(() -> {
            if (!environment.getProperty("kafka1.switch", Boolean.class,true)) {
                log.warn("KafkaListener_turn_off_drop_message.");
                return;
            }
            msgHandleService.handle(msg);
        });
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/weixin_40725706/article/detail/483976
推荐阅读
相关标签
  

闽ICP备14008679号