赞
踩
当使用Kafka时可以使用@KafkaListener很方便的对topic进行监听。但是对于在项目启动时,动态增加topic的监听,这种方式就无法实现,因此需要一种动态监听kafka topic的方式。
这种方式需要读取新增的kafka topic,这个不是难点,使用@Schedule注解轮询就可实现,难点在于如何通过代码监听,实现和@KafkaListener同样的效果。
从图中不难理解@KafkaListener从启动到拉取消息的过程,可以看到最终是调用KafkaMessageListenerContainer的start()方法,启动线程调用kafkaConsumer的poll()方法和被注解的方法。
从上面已经可以看出最终是调用KafkaMessageListenerContainer的start()方法进行监听kafka topic的消息,那么我们将动态变化的kafka配置生成一个KafkaMessageListenerContainer,并启动即可。
以下源码是KafkaMessageListenerContainer的构造函数
public KafkaMessageListenerContainer(ConsumerFactory<? super K, ? super V> consumerFactory,
ContainerProperties containerProperties) {
this(null, consumerFactory, containerProperties, (TopicPartitionInitialOffset[]) null);
}
因此我们需要构建ConsumerFactory和ContainerProperties,对于ConsumerFactory,其实现类为DefaultKafkaConsumerFactory,构造函数为:
public DefaultKafkaConsumerFactory(Map<String, Object> configs,
@Nullable Deserializer<K> keyDeserializer,
@Nullable Deserializer<V> valueDeserializer) {
this.configs = new HashMap<>(configs);
this.keyDeserializer = keyDeserializer;
this.valueDeserializer = valueDeserializer;
}
通过kafka的属性和序列化方式即可初始化DefaultKafkaConsumerFactory。
ContainerProperties存放了kafka监听器运行时的相关属性,因此在初始化后,还需要将kafka的相关属性赋值进去。
最后示例代码:
// consumer配置 Map<String, Object> configMap = Maps.newHashMap(); // 采用手动提交的方式 configMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); configMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 5000); configMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); configMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000); configMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "xxxxxx"); configMap.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group"); // kafka监听器 Deserializer<String> stringDeserializer = new StringDeserializer(); DefaultKafkaConsumerFactory<String, String> factory = new DefaultKafkaConsumerFactory<>(configMap, stringDeserializer, stringDeserializer); ContainerProperties props = new ContainerProperties("test-topic"); props.setMessageListener(new CustomerMsgHandler("test-topic")); props.setGroupId(configMap.get(ConsumerConfig.GROUP_ID_CONFIG).toString()); props.setAckMode(ContainerProperties.AckMode.MANUAL); KafkaMessageListenerContainer<String, String> container = new KafkaMessageListenerContainer<>(factory, props); // 启动 container.start();
对于自己处理消息的类,需要实现AcknowledgingMessageListener的onMessage方法:
@Slf4j public class CustomerMsgHandler implements AcknowledgingMessageListener<String, String> { private String topic; public CustomerMsgHandler(String topic) { this.topic = topic; } @Override public void onMessage(ConsumerRecord<String, String> data, Acknowledgment acknowledgment) { // doSomething // 因为前面设置了手动提交ack的方式,这里需要在消息处理完成后提交ack acknowledgment.acknowledge(); } }
以上,可以通过读取配置,实例化KafkaMessageListenerContainer并调用其start()方法,实现动态kafka topic的监听。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。