赞
踩
写在前面:各位看到此博客的小伙伴,如有不对的地方请及时通过私信我或者评论此博客的方式指出,以免误人子弟。多谢!
默认情况下,当项目启动时,监听器就开始工作(监听消费发送到指定 topic 的消息)。如果我们想让监听器在程序运行的过程中能够动态地开启、关闭监听器,可以借助 KafkaListenerEndpointRegistry 实现,只需要定义两个 controller 接口分别通过 KafkaListenerEndpointRegistry 来控制监听器的开启、关闭即可,以下记录下测试代码:
首先改下消费者监听,消费者这边代码没有什么特别的,主要是设置了个消费者 ID(监听器 ID),开启、关闭监听时根据这个ID进行,代码如下:
- @KafkaListener(topics = {"mytest3"},groupId = "test-consumer-group",
- containerFactory = "batchFactory",id = "myListener1")
- public void test(List<String> message){
- System.out.println("接收到的消息:" + message);
-
- }
然后新增两个接口控制监听器的开启和关闭,代码如下:
- @Autowired
- private KafkaListenerEndpointRegistry registry;
-
- /**
- * 开启监听
- */
- @GetMapping("/start")
- public void start() {
- // 判断监听容器是否启动,未启动则将其启动
- if (!registry.getListenerContainer("myListener1").isRunning()) {
- registry.getListenerContainer("myListener1").start();
- }
- // 将其恢复
- registry.getListenerContainer("myListener1").resume();
- }
-
- /**
- * 关闭监听
- */
- @GetMapping("/stop")
- public void stop() {
- // 暂停监听
- registry.getListenerContainer("myListener1").pause();
- }
注意:
1.KafkaListenerEndpointRegistry 在 SpringIO 中已经被注册为 Bean,直接注入使用即可。
2.还需要注意一下启动监听容器的方法,resume 是恢复的意思不是启动的意思。所以我们需要判断容器是否运行,如果运行则调用 resume 方法,否则调用 start 方法。
测试一下,启动项目访问http://localhost:8080/send15 ,控制台打印如下:
访问http://localhost:8080/stop 关闭监听,再次访问http://localhost:8080/send15 ,控制台不再打印消息,说明已经关闭监听。
访问http://localhost:8080/start 再次开启监听,再次访问http://localhost:8080/send15 ,控制台又打印了接收到的消息。
开始说过,默认情况下,当项目启动时,监听器就开始工作,也可以通过配置在消费者中配置factory.setAutoStartup(false);来禁止消费者监听器自启动,贴一下代码,就不再测试了
- @Bean
- public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Object>> batchFactory() {
- ConcurrentKafkaListenerContainerFactory<String, Object> factory =
- new ConcurrentKafkaListenerContainerFactory<>();
- factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
- //并发数量
- factory.setConcurrency(concurrency);
- //开启批量监听
- factory.setBatchListener(type);
- // 被过滤的消息将被丢弃
- factory.setAckDiscarded(true);
- // 设置记录筛选策略
- factory.setRecordFilterStrategy(new RecordFilterStrategy() {
- @Override
- public boolean filter(ConsumerRecord consumerRecord) {
- String msg = consumerRecord.value().toString();
- if(Integer.parseInt(msg.substring(msg.length() - 1)) % 2 == 0){
- return false;
- }
- // 返回true消息将会被丢弃
- return true;
- }
- });
- // ack模式
- factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
- // 禁止消费者监听器自启动
- factory.setAutoStartup(false);
- return factory;
- }
补充:以上是通过接口开启、关闭监听器,也可以使用定时自动开启、关闭监听,例如使用@Scheduled注解实现等。
另外消费者也有pause()和resume()相关方法,看下void pause(Collection<TopicPartition> partitions);从方法注释可以知道这个方法可以暂停从请求的某个分区进行抓取数据(暂停消费某个分区的数据),感兴趣的测试下。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。