赞
踩
需要动态创建topic,然后动态创建该topic的监听容器,同时可以指定该监听容器的处理方法,避免增删监听topic时需要重启操作等情况。
很多情况下,使用kafka一般都会主动创建好队列(Topic)和消费者监听(@KafkaListener),特别是监听者,一般都是动态创建好后,然后使用@KafkaListener指定Topic后创建。
上述情况的优点在于:可以明确topic和消费者,启动时程序主动就创建好对应topic的消费容器和消费方法,直接消费即可。
缺点:如果需要监听新的topic,则需要添加@KafkaListener的配置并且重新启动项目,对于灵活性要求高或者线上的程序是比较麻烦的。
所以基于上述情况,为了更加灵活的创建和使用Kafka的topic和listener,专门写了一个kafka相关的工具类:
topic相关的包含了:topic创建、删除、列表、是否存在等方法。
Listener相关包含了:容器的创建、启动、停止、暂停、恢复等方法。
这里有个概念需要先了解下,监听容器里有两个状态,可以简单理解为:一个是容器的运行状态running,一个是容器的监听状态pauseRequest,再容器运行状态开启的基础上,监听状态开启,才能够正常消费消息。
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId
</dependency>
import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.core.*; import java.util.HashMap; import java.util.Map; import java.util.Properties; /** *@ClassName KafkaConfig *@Description: TODO kafka的配置类 **/ @Configuration @EnableKafka public class KafkaConfig { private static final String kafkaServer = "kafka-ip:9092";//kafka地址 /** * @Title producerFactory * @Description TODO 生产者工厂类,设置生产者相关配置 * @return org.springframework.kafka.core.ProducerFactory<java.lang.String,java.lang.Object> */ @Bean public ProducerFactory<String, Object> producerFactory() { Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer);//kafka 地址 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);//序列化 props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);//序列化 props.put(ProducerConfig.ACKS_CONFIG, "all");//确认机制,all是所有副本确认,1是一个副本确认,0是不需要副本确认 props.put(ProducerConfig.BATCH_SIZE_CONFIG, "10");//批量发送大小 props.put(ProducerConfig.LINGER_MS_CONFIG, "1");//批量发送等待时间 和上面的batch-size谁先到先发送 return new DefaultKafkaProducerFactory<>(props); } /** * @Title kafkaTemplate * @Description TODO kafka生产者工具类 * @return org.springframework.kafka.core.KafkaTemplate<java.lang.String,java.lang.Object> */ @Bean public KafkaTemplate<String, Object> kafkaTemplate() { return new KafkaTemplate<>(producerFactory()); } /** * @Title consumerFactory * @Description TODO 消费者工厂类,配置消费者的一些配置 * @return org.springframework.kafka.core.ConsumerFactory<java.lang.String,java.lang.Object> */ @Bean public ConsumerFactory<String, Object> consumerFactory() { Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer); props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 5 * 1024 * 1024);//每次抓取消息的大小 props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);//是否自动提交 props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 50 * 1000 * 1000);//请求超时时间 props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100); props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 60000000); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); return new DefaultKafkaConsumerFactory<>(props); } /** * @Title kafkaListenerContainerFactory * @Description TODO 监听容器的工厂类,创建监听容器时使用 * @return org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory<java.lang.String,java.lang.Object> */ @Bean public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); return factory; } /** * @Title adminClient * @Description TODO kafka客户端 * @return org.apache.kafka.clients.admin.AdminClient */ @Bean public AdminClient adminClient() { Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); AdminClient adminClient = AdminClient.create(props); return adminClient; } }
import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.admin.*; import org.apache.kafka.common.config.ConfigResource; import org.apache.kafka.common.config.TopicConfig; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.config.KafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerEndpointRegistry; import org.springframework.kafka.config.MethodKafkaListenerEndpoint; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.listener.MessageListenerContainer; import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory; import org.springframework.stereotype.Component; import java.lang.reflect.Method; import java.util.Collections; import java.util.Set; /** * @ClassName: KafkaUtil * @Description: TODO 用于创建kafka Topic队列和listener监听容器的工具类 **/ @Component @Slf4j public class KafkaUtil { private static AdminClient adminClient; private static KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry; private static KafkaTemplate kafkaTemplate; /** * @Title KafkaUtil * @Description 构造函数注入 * @param adminClient kafka客户端对象 * @param kafkaListenerEndpointRegistry kafka监听容器注册对象 * @param kafkaListenerEndpointRegistry kafka生产者工具类 * @return */ @Autowired public KafkaUtil(AdminClient adminClient, KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry, KafkaTemplate kafkaTemplate) { KafkaUtil.adminClient = adminClient; KafkaUtil.kafkaListenerEndpointRegistry = kafkaListenerEndpointRegistry; KafkaUtil.kafkaTemplate = kafkaTemplate; } //region topic相关方法 /** * @Title createTopic * @Description 创建kafka topic * @param topicName topic名 * @param partitions 分区数 * @param replicas 副本数(short) * @return void */ public static void createTopic(String topicName, int partitions, short replicas) throws Exception { NewTopic newTopic = new NewTopic(topicName, partitions, replicas); CreateTopicsResult topics = adminClient.createTopics(Collections.singleton(newTopic)); topics.all().get(); log.info("【{}】topic创建成功", topicName); } /** * @Title deleteTopic * @Description 删除topic * @param topicName topic名称 * @return void */ public static void deleteTopic(String topicName) throws Exception { DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(Collections.singleton(topicName)); deleteTopicsResult.all().get(); log.info("【{}】topic删除成功", topicName); } /** * @Title updateTopicRetention * @Description 修改topic的过期时间 * @param topicName topic名称 * @param ms 过期时间(毫秒值) * @return void */ public static void updateTopicRetention(String topicName, String ms) throws Exception { ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topicName); ConfigEntry configEntry = new ConfigEntry(TopicConfig.RETENTION_MS_CONFIG, ms); Config config = new Config(Collections.singleton(configEntry)); // 创建AlterConfigsOptions AlterConfigsOptions alterConfigsOptions = new AlterConfigsOptions().timeoutMs(10000); // 执行修改操作 adminClient.alterConfigs(Collections.singletonMap(resource, config), alterConfigsOptions).all().get(); log.info("【{}】topic过期时间设置完成,过期时间为:{}毫秒", topicName, ms); } /** * @Title listTopic * @Description 获取topic列表 * @return java.util.Set<java.lang.String> */ public static Set<String> listTopic() throws Exception { ListTopicsResult listTopicsResult = adminClient.listTopics(); Set<String> strings = listTopicsResult.names().get(); return strings; } /** * @Title existTopic * @Description topic是否存在 * @param topicName topic名称 * @return boolean */ public static boolean existTopic(String topicName) throws Exception { Set<String> strings = listTopic(); if (strings == null || strings.isEmpty()) { return false; } return strings.contains(topicName); } //endregion //region 生产者发送消息示例 /** * @Title sendMsg * @Description 通过注册信息找到对应的容器并启动 * @param topic 队列名称 * @param msg 消息 * @return void */ public static void sendMsg(String topic, Object msg) throws Exception { kafkaTemplate.send(topic, msg); //kafkaTemplate.send(topic,2,"key",msg);//带有分区和key值的 } //endregion //region 消费者监听容器相关方法 /** * @Title existListenerContainer * @Description TODO 根据ID查询容器是否存在 * @param id 监听容器id * @return boolean */ public static boolean existListenerContainer(String id) throws Exception { Set<String> listenerIds = kafkaListenerEndpointRegistry.getListenerContainerIds(); return listenerIds.contains(id); } /** * @Title registerListener * @Description TODO 创建kafka监听容器并注册到注册信息中,一次可以注册多个topic的监听容器 * @param id 容器id,自定义 * @param consumerGroupId 消费者组id自定义 * @param processBean 处理消息的类 * @param processMethod 处理消息的方法 * @param topics 需要监听的topic数组 * @return void */ public static void registerListenerContainer(String id, String consumerGroupId, Object processBean, Method processMethod, String... topics) throws Exception { //判断id是否存在 if (existListenerContainer(id)) { //如果当前id的容器已存在,不添加 log.info("当前id为{}的容器已存在,不进行添加操作!", id); return; } //判断所有队列是否存在 for (String topic : topics) { if (!existTopic(topic)) { //如果存在topic不存在,不添加 log.info("【{}】topic不存在,不进行添加操作!", topic); return; } } MethodKafkaListenerEndpoint<String, String> endpoint = new MethodKafkaListenerEndpoint<>(); //设置监听器端点相关信息 //设置Id endpoint.setId(id); //设置消费者组 endpoint.setGroupId(consumerGroupId); //设置要监听的topic数组,可以是多个 endpoint.setTopics(topics); //设置每个监听器线程数 endpoint.setConcurrency(3); //设置批量监听 endpoint.setBatchListener(true); //设置消息处理工厂类,这里用的是默认工厂 endpoint.setMessageHandlerMethodFactory(new DefaultMessageHandlerMethodFactory()); //设置实际处理的Bean对象,即实际的对象,比如new Class(); endpoint.setBean(processBean); //设置实际处理的方法(包含方法名和参数) endpoint.setMethod(processMethod); //注册Container并启动,startImmediately表示立马启动 kafkaListenerEndpointRegistry.registerListenerContainer(endpoint, SpringUtil.getBean(KafkaListenerContainerFactory.class), true); log.info("Kafka监听容器操作:ID为{}的容器已【注册】,监听的topics:{}", id, topics); // for (String topicName : topics) { // if (!KafkaConfig.notExistTopicCreateContainerFlag && !nameTopics.contains(topicName)) { // log.info("【{}】topic不存在,不创建容器!", topicName); // continue; // } // //创建一个kafka监听器端点对象 // MethodKafkaListenerEndpoint<String, String> endpoint = new MethodKafkaListenerEndpoint<>(); // //设置监听器端点相关信息 // //设置Id // endpoint.setId(topicName); // //设置消费者组 // endpoint.setGroupId(topicName + "_consumer_group"); // //设置主题 // endpoint.setTopics(topicName); // //设置每个监听器线程数 // endpoint.setConcurrency(3); // //设置批量监听 // endpoint.setBatchListener(true); // //设置默认处理工厂 // endpoint.setMessageHandlerMethodFactory(new DefaultMessageHandlerMethodFactory()); // //设置实际处理的Bean对象 // endpoint.setBean(new ConsumerController()); // //设置实际处理的方法名和参数类型 // endpoint.setMethod(ConsumerController.class.getMethod("consumeMessage", String.class)); // //注册Container并启动 // kafkaListenerEndpointRegistry.registerListenerContainer(endpoint, SpringUtil.getBean(KafkaListenerContainerFactory.class), true); // log.info("Kafka监听容器操作:ID为{}的容器已【注册】", topicName); // } } /** * @Title startListenerContainer * @Description 根据id开启监听容器的运行状态 * @param id 监听容器的id * @return void */ public static void startListenerContainer(String id) throws Exception { MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry.getListenerContainer(id); if (listenerContainer == null) { log.info("Kafka监听容器操作:ID为{}的容器不存在,不操作!", id); return; } listenerContainer.start(); log.info("Kafka监听容器操作:ID为{}的容器已【开启】", id); } /** * @Title stopListenerContainer * @Description TODO 根据id停止监听容器的运行状态 * @param id 监听容器的id * @return void */ public static void stopListenerContainer(String id) throws Exception { MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry.getListenerContainer(id); if (listenerContainer == null) { log.info("Kafka监听容器操作:ID为{}的容器不存在,不操作!", id); return; } listenerContainer.stop(); log.info("Kafka监听容器操作:ID为{}的容器已【停止】", id); } /** * @Title pauseListenerContainer * @Description TODO 根据id暂停监听容器的监听状态 * @param id 监听容器的id * @return void */ public static void pauseListenerContainer(String id) throws Exception { MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry.getListenerContainer(id); if (listenerContainer == null) { log.info("Kafka监听容器操作:ID为{}的容器不存在,不操作!", id); return; } listenerContainer.pause(); log.info("Kafka监听容器操作:ID为{}的容器已【暂停】", id); } /** * @Title resumeListenerContainer * @Description TODO 根据id恢复监听容器的监听状态 * @param id 监听容器的id * @return void */ public static void resumeListenerContainer(String id) throws Exception { MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry.getListenerContainer(id); if (listenerContainer == null) { log.info("Kafka监听容器操作:ID为{}的容器不存在,不操作!", id); return; } listenerContainer.resume(); log.info("Kafka监听容器操作:ID为{}的容器已【恢复】", id); } /** * @Title isNormalStateListenerContainer * @Description 是否是正常状态的容器 * (kafka监听容器的运行状态标志是running,监听状态标志是pauseRequested,停止是关闭了资源,暂停是停止消费) * 只有running是true,并且pauseRequested是false,监听容器才能正常消费消息 * @param id 监听容器的id * @return boolean */ public static boolean isNormalStateListenerContainer(String id) throws Exception { MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry.getListenerContainer(id); //如果不存在此id容器,则返回false if (listenerContainer == null) { return false; } //存在则返回容器的运行状态和非暂停状态 return listenerContainer.isRunning() && !listenerContainer.isPauseRequested(); } /** * @Title getPauseStateListenerContainer * @Description 获取监听容器的暂停状态(监听的状态) * @param id 监听容器id * @return boolean */ public static boolean getPauseStateListenerContainer(String id) throws Exception { MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry.getListenerContainer(id); if (listenerContainer == null) { return true; } return listenerContainer.isPauseRequested(); } /** * @Title getRunningStateListenerContainer * @Description 获取监听容器的运行状态(容器的状态) * @param id 监听容器id * @return boolean */ public static boolean getRunningStateListenerContainer(String id) throws Exception { MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry.getListenerContainer(id); if (listenerContainer == null) { return false; } return listenerContainer.isRunning(); } /** * @Title setStateNormalListenerContainer * @Description 使容器的运行状态和监听状态都是正常 * @param id 监听容器的id * @return boolean 正常返回true,非正常返回false */ public static boolean setStateNormalListenerContainer(String id) throws Exception { if (!existListenerContainer(id)) { log.info("Kafka监听容器操作:ID为{}的容器不存在,不操作!", id); return false; } //先判断容器运行状态是否正常,如果不正常,则开启 if (!getRunningStateListenerContainer(id)) { startListenerContainer(id); } //再判断容器监听状态是否正常,如果不正常,则恢复 if (getPauseStateListenerContainer(id)) { resumeListenerContainer(id); } //设置完后,再查询状态并返回。 return isNormalStateListenerContainer(id); } //endregion }
/** *@ClassName KakfaTest *@Description: TODO *@Date 2024/5/16 15:21 **/ @RestController public class KakfaTest { @RequestMapping("kafkatest") public void test() { try { String topicName = "kafka-test-1"; KafkaUtil.createTopic(topicName, 1, (short) 1);//创建topic KafkaUtil.updateTopicRetention(topicName, String.valueOf(1000000));//更新topic的过期时间 Set<String> strings = KafkaUtil.listTopic();//查出所有topic System.out.println("所有topic:" + strings); boolean b = KafkaUtil.existTopic(topicName);//查询topic是否存在 System.out.println("topic-是否存在:" + b); String listenerID = "kafka-test-listener-1"; //创建监听容器 KafkaUtil.registerListenerContainer(listenerID, "test-consumer-group", new KakfaTest(), KakfaTest.class.getDeclaredMethod("consumerMessage", List.class), topicName); boolean b1 = KafkaUtil.existListenerContainer(listenerID);//查询监听容器是否存在 System.out.println("容器-是否存在:" + b1); boolean normalStateListenerContainer = KafkaUtil.isNormalStateListenerContainer(listenerID);//查询监听容器是否为正常状态 System.out.println("容器-状态:" + normalStateListenerContainer); KafkaUtil.pauseListenerContainer(listenerID);//暂停监听容器的监听状态 boolean pauseStateListenerContainer = KafkaUtil.getPauseStateListenerContainer(listenerID);//查询监听容器的监听状态 System.out.println("容器-监听状态:" + !pauseStateListenerContainer); KafkaUtil.stopListenerContainer(listenerID);//暂停监听容器的监听状态 boolean runningStateListenerContainer = KafkaUtil.getRunningStateListenerContainer(listenerID);//查询监听容器的监听状态 System.out.println("容器-运行状态:" + runningStateListenerContainer); boolean normalStateListenerContainer2 = KafkaUtil.isNormalStateListenerContainer(listenerID);//查询监听容器是否为正常状态 System.out.println("容器-状态:" + normalStateListenerContainer2); boolean b2 = KafkaUtil.setStateNormalListenerContainer(listenerID);//设置监听容器为正常状态 boolean normalStateListenerContainer3 = KafkaUtil.isNormalStateListenerContainer(listenerID);//查询监听容器是否为正常状态 System.out.println("容器-状态:" + normalStateListenerContainer3); } catch (Exception e) { throw new RuntimeException(e); } } @RequestMapping("del") public void deleteTopic() throws Exception { String topicName = "kafka-test-1"; KafkaUtil.deleteTopic(topicName);//删除topic } @RequestMapping("send") public void sendMsg() throws Exception { String topicName = "kafka-test-1"; KafkaUtil.sendMsg(topicName, "haha"); boolean b = KafkaUtil.existTopic(topicName);//查询topic是否存在 System.out.println("topic-是否存在:" + b); } /** * @Title consumerMessage * @Description TODO 消费监听处理消息的方法 * @param message 接受来自kakfa的参数 * @param ack 消息确认参数 * @return void */ public void consumerMessage(List<ConsumerRecord<String, Object>> message, Acknowledgment ack) { System.out.println("收到消息:" + message); //消息确认 ack.acknowledge(); }
① 调用测试(/kafkatest)接口:
② 调用发送消息(/send)接口,消费端日志:
③ 调用删除(/del)接口:
kafka动态的topic创建和设置主要是通过客户端adminClient来操作,而监听容器则是通过监听注册的kafkaListenerEndpointRegistry来进行创建和管理。
通过这个工具类我们可以快速且动态的创建topic、监听容器和定义监听消息的处理方法,非常nice。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。