赞
踩
spring boot 已经对kafka 进行了很好的封装集成,只需要找配置文件中配置相应的配置参数,再配合
@KafkaListener 注解即可监听kafka 消息,但如果想动态监听某一类消息而不是固定的某几个topic 呢?
虽然@KafkaListener 提提供了topicPattern 可以实现简单的正则表达式配置, 一方面如果是没有固定规则的topic 不能监听扩展不友好,另一方面一个 KafkaConsumer 监听多个topic ,批量拉取消息时一批
消息存在多个topic消息,不好做消息的ack ,offsets 提交控制
目前的需求需要满足以下几点:
设计方案 通过建立一个 Kafka client ,用于定时任务扫描卡fkaka 注册的所有topic
维护一个客户端线程池 Consumer pool ,以topic ,group 为 标识维护一个 Consumer 或者一组(看业务可以加分区等创建多个) 。如果池已存在那么久不创建消费者监听,不存在则动态创建一个
Consumer topic 为一个异步线程实现
/*** * * @author wangxiaobo * */ @Log4j2 @Data public class KfakaConsumerRunnable extends Thread { private Map<String, Object> consumerConfigs; private String topicName; private String groupId; private ConsumerMessageHelper consumerMessageHelper; @Override public void run() { log.error("注册开始KfakaConfigRunnable{} ", topicName); if(StringUtils.isNotBlank(groupId)) { consumerConfigs.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); } KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(consumerConfigs); consumer.subscribe(Collections.singletonList(topicName)); try { while (true) { // 从服务器开始拉取数据 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1500)); if (records.isEmpty()) { Thread.sleep(5000); continue; } // do bussiness consumerMessageHelper.hbaseCommonWrite(records, topicName); consumer.commitSync(); /* * consumer.commitAync(((offsets, exception) -> { if (exception == null) { * offsets.forEach((topicPartition, metadata) -> { log.info(topicPartition + * " -> offset=" + metadata.offset()); }); } else { * log.info("消息确认错误,重置偏移",exception); // 如果出错了,同步提交位移 * consumer.commitSync(offsets); } })); */ } } catch (Exception e) { log.error("KfakaConfigRunnable 消费异常:{} ", topicName, e); } finally { //异常先关闭 consumer.close(); KfakaConfig.KFAKACONSUMERRUNNABLE_POOL.remove(topicName); } } }
/*** * * @author wangxiaobo * */ @Configuration @Log4j2 public class KfakaConfig { public static Map<String, KfakaConsumerRunnable> KFAKACONSUMERRUNNABLE_POOL = new ConcurrentHashMap<String, KfakaConsumerRunnable>(); @Value("${spring.kafka.bootstrap-servers}") private String bootstrapServers; @Value("${spring.profiles.active}") private String active; public Map<String, Object> consumerConfigs() { Map<String, Object> propsMap = new HashMap<String, Object>(); propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000); propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 100000); propsMap.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 110000); propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, "kafka2"); propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000); propsMap.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 5000); // 每个批次获取数 return propsMap; } @Bean public AdminClient getAdminClient() { Properties properties = new Properties(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); AdminClient adminClient = AdminClient.create(properties); return adminClient; } }
@Component public class KafkaAutoTableHandler{ @Autowired AdminClient adminClient; @Autowired KfakaConfig kfakaConfig; @Autowired ConsumerMessageHelper consumerMessageHelper; public void initTopicListener() throws Exception{ ListTopicsResult result = adminClient.listTopics(); KafkaFuture<Set<String>> names = result.names(); for (String topic : names.get()) { if(topic.startsWith(HbaseConfig.PREFIX_TABLE) && !KfakaConfig.KFAKACONSUMERRUNNABLE_POOL.containsKey(topic)) { KfakaConsumerRunnable consumerRunnable = new KfakaConsumerRunnable(); consumerRunnable.setConsumerConfigs(kfakaConfig.consumerConfigs()); consumerRunnable.setTopicName(topic); consumerRunnable.setConsumerMessageHelper(consumerMessageHelper); KfakaConfig.KFAKACONSUMERRUNNABLE_POOL.put(topic,consumerRunnable); consumerRunnable.start(); } } } }
这个根据自己的业务实现编写即可
2021-08-13 08:19:20.188 [Thread-17] INFO com.xiaobo.kafka.ConsumerMessageHelper - topic3,size:2
2021-08-13 08:19:20.191 [Thread-38] INFO com.xiaobo.kafka.ConsumerMessageHelper - topic2,:62
2021-08-13 08:19:20.193 [Thread-125] INFO com.xiaobo.kafka.ConsumerMessageHelper - topic1,size:29
2021-08-13 08:19:20.202 [Thread-125] INFO com.xiaobo.kafka.ConsumerMessageHelper - topic1,size:17
2021-08-13 08:19:20.191 [Thread-38] INFO com.xiaobo.kafka.ConsumerMessageHelper - topic2,:162
2021-08-13 08:19:20.188 [Thread-17] INFO com.xiaobo.kafka.ConsumerMessageHelper - topic3,size:7
2021-08-13 08:19:20.207 [Thread-125] INFO com.xiaobo.kafka.ConsumerMessageHelper - topic1,size:19
2021-08-13 08:19:20.212 [Thread-125] INFO com.xiaobo.kafka.ConsumerMessageHelper - topic1,size:17
2021-08-13 08:19:20.217 [Thread-125] INFO com.xiaobo.kafka.ConsumerMessageHelper - topic1,size:21
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。