当前位置:   article > 正文

springboot的kafka动态工具类(动态创建topic、监听和监听方法)_kafka动态监听topic

kafka动态监听topic

springboot的kafka动态工具类(动态创建topic、监听和监听方法)

一、使用场景

需要动态创建topic,然后动态创建该topic的监听容器,同时可以指定该监听容器的处理方法,避免增删监听topic时需要重启操作等情况。

很多情况下,使用kafka一般都会主动创建好队列(Topic)和消费者监听(@KafkaListener),特别是监听者,一般都是动态创建好后,然后使用@KafkaListener指定Topic后创建。

上述情况的优点在于:可以明确topic和消费者,启动时程序主动就创建好对应topic的消费容器和消费方法,直接消费即可。

缺点:如果需要监听新的topic,则需要添加@KafkaListener的配置并且重新启动项目,对于灵活性要求高或者线上的程序是比较麻烦的。

二、工具类概述

所以基于上述情况,为了更加灵活的创建和使用Kafka的topic和listener,专门写了一个kafka相关的工具类:

topic相关的包含了:topic创建、删除、列表、是否存在等方法。

Listener相关包含了:容器的创建、启动、停止、暂停、恢复等方法。

这里有个概念需要先了解下,监听容器里有两个状态,可以简单理解为:一个是容器的运行状态running,一个是容器的监听状态pauseRequest,再容器运行状态开启的基础上,监听状态开启,才能够正常消费消息。

三、代码展示

  1. 那么老规矩,万事先依赖:
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId
</dependency>
  • 1
  • 2
  • 3
  • 4
  1. 然后是配置文件类KafkaConfig,设置kafka相关配置
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.*;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/**
 *@ClassName KafkaConfig
 *@Description: TODO kafka的配置类
 **/

@Configuration
@EnableKafka
public class KafkaConfig {

    private static final String kafkaServer = "kafka-ip:9092";//kafka地址


    /**
     * @Title producerFactory
     * @Description TODO 生产者工厂类,设置生产者相关配置
     * @return org.springframework.kafka.core.ProducerFactory<java.lang.String,java.lang.Object>
     */
    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer);//kafka 地址
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);//序列化
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);//序列化
        props.put(ProducerConfig.ACKS_CONFIG, "all");//确认机制,all是所有副本确认,1是一个副本确认,0是不需要副本确认
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, "10");//批量发送大小
        props.put(ProducerConfig.LINGER_MS_CONFIG, "1");//批量发送等待时间  和上面的batch-size谁先到先发送
        return new DefaultKafkaProducerFactory<>(props);
    }

    /**
     * @Title kafkaTemplate
     * @Description TODO kafka生产者工具类
     * @return org.springframework.kafka.core.KafkaTemplate<java.lang.String,java.lang.Object>
     */
    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }



    /**
     * @Title consumerFactory
     * @Description TODO 消费者工厂类,配置消费者的一些配置
     * @return org.springframework.kafka.core.ConsumerFactory<java.lang.String,java.lang.Object>
     */
    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer);
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 5 * 1024 * 1024);//每次抓取消息的大小
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);//是否自动提交
        props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 50 * 1000 * 1000);//请求超时时间
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 60000000);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);

    }

    /**
     * @Title kafkaListenerContainerFactory
     * @Description TODO 监听容器的工厂类,创建监听容器时使用
     * @return org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory<java.lang.String,java.lang.Object>
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    /**
     * @Title adminClient
     * @Description TODO kafka客户端
     * @return org.apache.kafka.clients.admin.AdminClient
     */
    @Bean
    public AdminClient adminClient() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        AdminClient adminClient = AdminClient.create(props);
        return adminClient;
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  1. kafka的相关配置设置好了,就可以使用工具类KafkaUtil了
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.TopicConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.config.MethodKafkaListenerEndpoint;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;
import org.springframework.stereotype.Component;

import java.lang.reflect.Method;
import java.util.Collections;
import java.util.Set;

/**
 * @ClassName: KafkaUtil
 * @Description: TODO 用于创建kafka Topic队列和listener监听容器的工具类
 **/

@Component
@Slf4j
public class KafkaUtil {

    private static AdminClient adminClient;

    private static KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
    private static KafkaTemplate kafkaTemplate;

    /**
     * @Title KafkaUtil
     * @Description 构造函数注入
     * @param adminClient kafka客户端对象
     * @param kafkaListenerEndpointRegistry kafka监听容器注册对象
     * @param kafkaListenerEndpointRegistry kafka生产者工具类
     * @return
     */
    @Autowired
    public KafkaUtil(AdminClient adminClient, KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry, KafkaTemplate kafkaTemplate) {
        KafkaUtil.adminClient = adminClient;
        KafkaUtil.kafkaListenerEndpointRegistry = kafkaListenerEndpointRegistry;
        KafkaUtil.kafkaTemplate = kafkaTemplate;
    }

    //region topic相关方法


    /**
     * @Title createTopic
     * @Description 创建kafka topic
     * @param topicName topic名
     * @param partitions 分区数
     * @param replicas 副本数(short)
     * @return void
     */
    public static void createTopic(String topicName, int partitions, short replicas) throws Exception {
        NewTopic newTopic = new NewTopic(topicName, partitions, replicas);
        CreateTopicsResult topics = adminClient.createTopics(Collections.singleton(newTopic));
        topics.all().get();
        log.info("【{}】topic创建成功", topicName);
    }

    /**
     * @Title deleteTopic
     * @Description 删除topic
     * @param topicName  topic名称
     * @return void
     */
    public static void deleteTopic(String topicName) throws Exception {
        DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(Collections.singleton(topicName));
        deleteTopicsResult.all().get();
        log.info("【{}】topic删除成功", topicName);

    }

    /**
     * @Title updateTopicRetention
     * @Description 修改topic的过期时间
     * @param topicName  topic名称
     * @param ms  过期时间(毫秒值)
     * @return void
     */
    public static void updateTopicRetention(String topicName, String ms) throws Exception {
        ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topicName);
        ConfigEntry configEntry = new ConfigEntry(TopicConfig.RETENTION_MS_CONFIG, ms);
        Config config = new Config(Collections.singleton(configEntry));
        // 创建AlterConfigsOptions
        AlterConfigsOptions alterConfigsOptions = new AlterConfigsOptions().timeoutMs(10000);
        // 执行修改操作
        adminClient.alterConfigs(Collections.singletonMap(resource, config), alterConfigsOptions).all().get();
        log.info("【{}】topic过期时间设置完成,过期时间为:{}毫秒", topicName, ms);
    }


    /**
     * @Title listTopic
     * @Description 获取topic列表
     * @return java.util.Set<java.lang.String>
     */
    public static Set<String> listTopic() throws Exception {
        ListTopicsResult listTopicsResult = adminClient.listTopics();
        Set<String> strings = listTopicsResult.names().get();
        return strings;
    }


    /**
     * @Title existTopic
     * @Description topic是否存在
     * @param topicName topic名称
     * @return boolean
     */
    public static boolean existTopic(String topicName) throws Exception {
        Set<String> strings = listTopic();
        if (strings == null || strings.isEmpty()) {
            return false;
        }
        return strings.contains(topicName);
    }


    //endregion

    //region 生产者发送消息示例

    /**
     * @Title sendMsg
     * @Description 通过注册信息找到对应的容器并启动
     * @param topic 队列名称
     * @param msg 消息
     * @return void
     */
    public static void sendMsg(String topic, Object msg) throws Exception {
        kafkaTemplate.send(topic, msg);
        //kafkaTemplate.send(topic,2,"key",msg);//带有分区和key值的
    }
    //endregion

    //region 消费者监听容器相关方法


    /**
     * @Title existListenerContainer
     * @Description TODO 根据ID查询容器是否存在
     * @param id 监听容器id
     * @return boolean
     */
    public static boolean existListenerContainer(String id) throws Exception {
        Set<String> listenerIds = kafkaListenerEndpointRegistry.getListenerContainerIds();
        return listenerIds.contains(id);
    }


    /**
     * @Title registerListener
     * @Description TODO  创建kafka监听容器并注册到注册信息中,一次可以注册多个topic的监听容器
     * @param id 容器id,自定义
     * @param consumerGroupId 消费者组id自定义
     * @param processBean 处理消息的类
     * @param processMethod 处理消息的方法
     * @param topics 需要监听的topic数组
     * @return void
     */
    public static void registerListenerContainer(String id, String consumerGroupId, Object processBean, Method processMethod, String... topics) throws Exception {
        //判断id是否存在
        if (existListenerContainer(id)) {
            //如果当前id的容器已存在,不添加
            log.info("当前id为{}的容器已存在,不进行添加操作!", id);
            return;
        }
        //判断所有队列是否存在
        for (String topic : topics) {
            if (!existTopic(topic)) {
                //如果存在topic不存在,不添加
                log.info("【{}】topic不存在,不进行添加操作!", topic);
                return;
            }
        }
        MethodKafkaListenerEndpoint<String, String> endpoint = new MethodKafkaListenerEndpoint<>();
        //设置监听器端点相关信息
        //设置Id
        endpoint.setId(id);
        //设置消费者组
        endpoint.setGroupId(consumerGroupId);
        //设置要监听的topic数组,可以是多个
        endpoint.setTopics(topics);
        //设置每个监听器线程数
        endpoint.setConcurrency(3);
        //设置批量监听
        endpoint.setBatchListener(true);
        //设置消息处理工厂类,这里用的是默认工厂
        endpoint.setMessageHandlerMethodFactory(new DefaultMessageHandlerMethodFactory());
        //设置实际处理的Bean对象,即实际的对象,比如new Class();
        endpoint.setBean(processBean);
        //设置实际处理的方法(包含方法名和参数)
        endpoint.setMethod(processMethod);
        //注册Container并启动,startImmediately表示立马启动
        kafkaListenerEndpointRegistry.registerListenerContainer(endpoint, SpringUtil.getBean(KafkaListenerContainerFactory.class), true);
        log.info("Kafka监听容器操作:ID为{}的容器已【注册】,监听的topics:{}", id, topics);


//        for (String topicName : topics) {
//            if (!KafkaConfig.notExistTopicCreateContainerFlag && !nameTopics.contains(topicName)) {
//                log.info("【{}】topic不存在,不创建容器!", topicName);
//                continue;
//            }
//            //创建一个kafka监听器端点对象
//            MethodKafkaListenerEndpoint<String, String> endpoint = new MethodKafkaListenerEndpoint<>();
//            //设置监听器端点相关信息
//            //设置Id
//            endpoint.setId(topicName);
//            //设置消费者组
//            endpoint.setGroupId(topicName + "_consumer_group");
//            //设置主题
//            endpoint.setTopics(topicName);
//            //设置每个监听器线程数
//            endpoint.setConcurrency(3);
//            //设置批量监听
//            endpoint.setBatchListener(true);
//            //设置默认处理工厂
//            endpoint.setMessageHandlerMethodFactory(new DefaultMessageHandlerMethodFactory());
//            //设置实际处理的Bean对象
//            endpoint.setBean(new ConsumerController());
//            //设置实际处理的方法名和参数类型
//            endpoint.setMethod(ConsumerController.class.getMethod("consumeMessage", String.class));
//            //注册Container并启动
//            kafkaListenerEndpointRegistry.registerListenerContainer(endpoint, SpringUtil.getBean(KafkaListenerContainerFactory.class), true);
//            log.info("Kafka监听容器操作:ID为{}的容器已【注册】", topicName);
//        }
    }


    /**
     * @Title startListenerContainer
     * @Description 根据id开启监听容器的运行状态
     * @param id 监听容器的id
     * @return void
     */
    public static void startListenerContainer(String id) throws Exception {
        MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry.getListenerContainer(id);
        if (listenerContainer == null) {
            log.info("Kafka监听容器操作:ID为{}的容器不存在,不操作!", id);
            return;
        }
        listenerContainer.start();
        log.info("Kafka监听容器操作:ID为{}的容器已【开启】", id);
    }


    /**
     * @Title stopListenerContainer
     * @Description TODO 根据id停止监听容器的运行状态
     * @param id 监听容器的id
     * @return void
     */
    public static void stopListenerContainer(String id) throws Exception {
        MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry.getListenerContainer(id);
        if (listenerContainer == null) {
            log.info("Kafka监听容器操作:ID为{}的容器不存在,不操作!", id);
            return;
        }
        listenerContainer.stop();
        log.info("Kafka监听容器操作:ID为{}的容器已【停止】", id);
    }


    /**
     * @Title pauseListenerContainer
     * @Description TODO 根据id暂停监听容器的监听状态
     * @param id 监听容器的id
     * @return void
     */
    public static void pauseListenerContainer(String id) throws Exception {
        MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry.getListenerContainer(id);
        if (listenerContainer == null) {
            log.info("Kafka监听容器操作:ID为{}的容器不存在,不操作!", id);
            return;
        }
        listenerContainer.pause();
        log.info("Kafka监听容器操作:ID为{}的容器已【暂停】", id);
    }

    /**
     * @Title resumeListenerContainer
     * @Description TODO  根据id恢复监听容器的监听状态
     * @param id 监听容器的id
     * @return void
     */
    public static void resumeListenerContainer(String id) throws Exception {
        MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry.getListenerContainer(id);
        if (listenerContainer == null) {
            log.info("Kafka监听容器操作:ID为{}的容器不存在,不操作!", id);
            return;
        }
        listenerContainer.resume();
        log.info("Kafka监听容器操作:ID为{}的容器已【恢复】", id);
    }


    /**
     * @Title isNormalStateListenerContainer
     * @Description 是否是正常状态的容器
     * (kafka监听容器的运行状态标志是running,监听状态标志是pauseRequested,停止是关闭了资源,暂停是停止消费)
     *  只有running是true,并且pauseRequested是false,监听容器才能正常消费消息
     * @param id 监听容器的id
     * @return boolean
     */
    public static boolean isNormalStateListenerContainer(String id) throws Exception {
        MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry.getListenerContainer(id);
        //如果不存在此id容器,则返回false
        if (listenerContainer == null) {
            return false;
        }
        //存在则返回容器的运行状态和非暂停状态
        return listenerContainer.isRunning() && !listenerContainer.isPauseRequested();
    }


    /**
     * @Title getPauseStateListenerContainer
     * @Description 获取监听容器的暂停状态(监听的状态)
     * @param id 监听容器id
     * @return boolean
     */
    public static boolean getPauseStateListenerContainer(String id) throws Exception {
        MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry.getListenerContainer(id);
        if (listenerContainer == null) {
            return true;
        }
        return listenerContainer.isPauseRequested();

    }

    /**
     * @Title getRunningStateListenerContainer
     * @Description 获取监听容器的运行状态(容器的状态)
     * @param id 监听容器id
     * @return boolean
     */
    public static boolean getRunningStateListenerContainer(String id) throws Exception {
        MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry.getListenerContainer(id);
        if (listenerContainer == null) {
            return false;
        }
        return listenerContainer.isRunning();
    }

    /**
     * @Title setStateNormalListenerContainer
     * @Description 使容器的运行状态和监听状态都是正常
     * @param id 监听容器的id
     * @return boolean 正常返回true,非正常返回false
     */
    public static boolean setStateNormalListenerContainer(String id) throws Exception {
        if (!existListenerContainer(id)) {
            log.info("Kafka监听容器操作:ID为{}的容器不存在,不操作!", id);
            return false;
        }
        //先判断容器运行状态是否正常,如果不正常,则开启
        if (!getRunningStateListenerContainer(id)) {
            startListenerContainer(id);
        }
        //再判断容器监听状态是否正常,如果不正常,则恢复
        if (getPauseStateListenerContainer(id)) {
            resumeListenerContainer(id);
        }
        //设置完后,再查询状态并返回。
        return isNormalStateListenerContainer(id);
    }

    //endregion

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157
  • 158
  • 159
  • 160
  • 161
  • 162
  • 163
  • 164
  • 165
  • 166
  • 167
  • 168
  • 169
  • 170
  • 171
  • 172
  • 173
  • 174
  • 175
  • 176
  • 177
  • 178
  • 179
  • 180
  • 181
  • 182
  • 183
  • 184
  • 185
  • 186
  • 187
  • 188
  • 189
  • 190
  • 191
  • 192
  • 193
  • 194
  • 195
  • 196
  • 197
  • 198
  • 199
  • 200
  • 201
  • 202
  • 203
  • 204
  • 205
  • 206
  • 207
  • 208
  • 209
  • 210
  • 211
  • 212
  • 213
  • 214
  • 215
  • 216
  • 217
  • 218
  • 219
  • 220
  • 221
  • 222
  • 223
  • 224
  • 225
  • 226
  • 227
  • 228
  • 229
  • 230
  • 231
  • 232
  • 233
  • 234
  • 235
  • 236
  • 237
  • 238
  • 239
  • 240
  • 241
  • 242
  • 243
  • 244
  • 245
  • 246
  • 247
  • 248
  • 249
  • 250
  • 251
  • 252
  • 253
  • 254
  • 255
  • 256
  • 257
  • 258
  • 259
  • 260
  • 261
  • 262
  • 263
  • 264
  • 265
  • 266
  • 267
  • 268
  • 269
  • 270
  • 271
  • 272
  • 273
  • 274
  • 275
  • 276
  • 277
  • 278
  • 279
  • 280
  • 281
  • 282
  • 283
  • 284
  • 285
  • 286
  • 287
  • 288
  • 289
  • 290
  • 291
  • 292
  • 293
  • 294
  • 295
  • 296
  • 297
  • 298
  • 299
  • 300
  • 301
  • 302
  • 303
  • 304
  • 305
  • 306
  • 307
  • 308
  • 309
  • 310
  • 311
  • 312
  • 313
  • 314
  • 315
  • 316
  • 317
  • 318
  • 319
  • 320
  • 321
  • 322
  • 323
  • 324
  • 325
  • 326
  • 327
  • 328
  • 329
  • 330
  • 331
  • 332
  • 333
  • 334
  • 335
  • 336
  • 337
  • 338
  • 339
  • 340
  • 341
  • 342
  • 343
  • 344
  • 345
  • 346
  • 347
  • 348
  • 349
  • 350
  • 351
  • 352
  • 353
  • 354
  • 355
  • 356
  • 357
  • 358
  • 359
  • 360
  • 361
  • 362
  • 363
  • 364
  • 365
  • 366
  • 367
  • 368
  • 369
  • 370
  • 371
  • 372
  • 373
  • 374
  • 375
  • 376
  1. 然后编写KakfaTest测试类开始测试
/**
 *@ClassName KakfaTest
 *@Description: TODO
 *@Date 2024/5/16 15:21
 **/

@RestController
public class KakfaTest {


    @RequestMapping("kafkatest")
    public void test() {

        try {
            String topicName = "kafka-test-1";
            KafkaUtil.createTopic(topicName, 1, (short) 1);//创建topic
            KafkaUtil.updateTopicRetention(topicName, String.valueOf(1000000));//更新topic的过期时间

            Set<String> strings = KafkaUtil.listTopic();//查出所有topic
            System.out.println("所有topic:" + strings);

            boolean b = KafkaUtil.existTopic(topicName);//查询topic是否存在
            System.out.println("topic-是否存在:" + b);


            String listenerID = "kafka-test-listener-1";

            //创建监听容器
            KafkaUtil.registerListenerContainer(listenerID, "test-consumer-group", new KakfaTest(), KakfaTest.class.getDeclaredMethod("consumerMessage", List.class), topicName);

            boolean b1 = KafkaUtil.existListenerContainer(listenerID);//查询监听容器是否存在
            System.out.println("容器-是否存在:" + b1);

            boolean normalStateListenerContainer = KafkaUtil.isNormalStateListenerContainer(listenerID);//查询监听容器是否为正常状态
            System.out.println("容器-状态:" + normalStateListenerContainer);

            KafkaUtil.pauseListenerContainer(listenerID);//暂停监听容器的监听状态
            boolean pauseStateListenerContainer = KafkaUtil.getPauseStateListenerContainer(listenerID);//查询监听容器的监听状态
            System.out.println("容器-监听状态:" + !pauseStateListenerContainer);


            KafkaUtil.stopListenerContainer(listenerID);//暂停监听容器的监听状态
            boolean runningStateListenerContainer = KafkaUtil.getRunningStateListenerContainer(listenerID);//查询监听容器的监听状态
            System.out.println("容器-运行状态:" + runningStateListenerContainer);


            boolean normalStateListenerContainer2 = KafkaUtil.isNormalStateListenerContainer(listenerID);//查询监听容器是否为正常状态
            System.out.println("容器-状态:" + normalStateListenerContainer2);


            boolean b2 = KafkaUtil.setStateNormalListenerContainer(listenerID);//设置监听容器为正常状态
            boolean normalStateListenerContainer3 = KafkaUtil.isNormalStateListenerContainer(listenerID);//查询监听容器是否为正常状态
            System.out.println("容器-状态:" + normalStateListenerContainer3);


        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }


    @RequestMapping("del")
    public void deleteTopic() throws Exception {
        String topicName = "kafka-test-1";
        KafkaUtil.deleteTopic(topicName);//删除topic
    }


    @RequestMapping("send")
    public void sendMsg() throws Exception {
        String topicName = "kafka-test-1";
        KafkaUtil.sendMsg(topicName, "haha");
        boolean b = KafkaUtil.existTopic(topicName);//查询topic是否存在
        System.out.println("topic-是否存在:" + b);
    }


    /**
     * @Title consumerMessage
     * @Description TODO 消费监听处理消息的方法
     * @param message 接受来自kakfa的参数
     * @param ack 消息确认参数
     * @return void
     */
    public void consumerMessage(List<ConsumerRecord<String, Object>> message, Acknowledgment ack) {
        System.out.println("收到消息:" + message);
        //消息确认
        ack.acknowledge();

    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  1. 测试日志查看:

① 调用测试(/kafkatest)接口:
在这里插入图片描述

② 调用发送消息(/send)接口,消费端日志:

在这里插入图片描述

③ 调用删除(/del)接口:
在这里插入图片描述

四、总结:

kafka动态的topic创建和设置主要是通过客户端adminClient来操作,而监听容器则是通过监听注册的kafkaListenerEndpointRegistry来进行创建和管理。
通过这个工具类我们可以快速且动态的创建topic、监听容器和定义监听消息的处理方法,非常nice。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/一键难忘520/article/detail/846940
推荐阅读
相关标签