赞
踩
本文针对 Kafka 的消费者代码,均由 Kafka 原生的客户端 API 构成,而非 spring-kafka 或 SpringCloudStream 等对于原生 API 进行封装的示例。
Kafka 的消费方式,根据不同的需求可以做出多样的选择。
对 Kafka 消费者来说,Subscribe 模式是最简单的方式,往往也是最常用的方式,即仅需要订阅一个主题即可。
- public static void testSubscribe() {
- Properties properties = new Properties();
- // Kafka集群地址
- properties.put("bootstrap.servers", "100.1.4.16:9092,100.1.4.17:9092,100.1.4.18:9092");
- // 消费者组,仅在subscribe模式下生效,用于分区自动再均衡,而assign模式直接指定分区
- properties.put("group.id", "test_group");
- // 反序列化器
- properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
- // 订阅topic
- String topic = "test_topic";
- consumer.subscribe(Pattern.compile(topic));
- while (true) {
- // 每1000ms轮询一次
- ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000L));
- log.info("本次轮询到:{}条", records.count());
- for (ConsumerRecord<String, String> record : records) {
- log.info("-------消息来了:topic={}, partition={}, offset={}, value={}", record.topic(), record.partition(),
- record.offset(), record.value());
- }
- }
- }
通过 Kafka 原生的消费者 API 来消费数据,主要分为三个步骤:
上面说了,除了 Subscribe 模式,还有 Assign 模式,用来手动指定要消费的消息分区。
- public static void testAssignOffset() {
- Properties properties = new Properties();
- properties.put("bootstrap.servers", "100.1.4.16:9092,100.1.4.17:9092,100.1.4.18:9092");
- properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- // 默认每次轮询最多取多少条消息,默认500
- properties.put("max.poll.records", 1);
- KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
- String topic = "test_topic";
- TopicPartition tp = new TopicPartition(topic, 0);
- // 指定分区
- consumer.assign(Collections.singletonList(tp));
- log.info("本topic下所有的分区:{}", consumer.partitionsFor(topic));
- // 获取消费者被分配到的分区(注意,assign模式会直接返回手动指定的分区,而subscribe模式等到自动分配分区后才有返回)
- log.info("本消费者分配到的分区:{}", consumer.assignment());
- // 为某个指定分区任意位置、起始位置、末尾位置为起始消费位置(offset默认从0开始)
- // 注意若分配的offset<分区最小的offset(可能kafka删除策略影响,比如默认删除超过7d的数据导致最小offset值变化),将从最新offset处监听消费
- // consumer.seek(tp, 5);
- // consumer.seekToBeginning(Arrays.asList(tp));
- // consumer.seekToEnd(Collections.singletonList(tp));
- while (true) {
- ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000L));
- log.info("本次轮询到:{}条", records.count());
- for (ConsumerRecord<String, String> record : records) {
- log.info("-------消息来了:topic={}, partition={}, offset={}, value={}", record.topic(), record.partition(),
- record.offset(), record.value());
- }
- }
- }
Assign 模式下,可通过 seek/seekToBeginning/seekToEnd 等 API 来指定偏移量 offset 开始消费。
在 2.2 上一节的代码基础上,打开 seek/seekToBeginning/seekToEnd 等注释,即可指定偏移量进行消费。
Kafka 不仅支持指定偏移量消费,也支持指定消息的时间戳进行消费。不过根本上也是通过偏移量的消费。
- public static void testAssignTimeStamp() throws ParseException {
- Properties properties = new Properties();
- properties.put("bootstrap.servers", "100.1.4.16:9092,100.1.4.17:9092,100.1.4.18:9092");
- properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
- String topic = "test_topic";
- // 设置消费起始时间
- String startTime = "2020-05-19 15:52:41";
- Long startTimestamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(startTime).getTime();
- Map<TopicPartition, Long> timestampMap = new HashMap<>();
- // 获取每一个分区信息
- List<PartitionInfo> partitionInfoLst = consumer.partitionsFor(topic);
- for (PartitionInfo partitionInfo : partitionInfoLst) {
- // 设置每一个分区的起始消费时间为指定时间
- timestampMap.put(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()), startTimestamp);
- }
- // 通过时间戳查找给定分区的偏移量
- Map<TopicPartition, OffsetAndTimestamp> offsetMap = consumer.offsetsForTimes(timestampMap);
- // 指定分区
- consumer.assign(offsetMap.keySet());
- // 设置每一个分区的指定时间对应的消费偏移量
- for (TopicPartition topicPartition : offsetMap.keySet()) {
- consumer.seek(topicPartition, offsetMap.get(topicPartition).offset());
- }
- while (true) {
- ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000L));
- log.info("本次轮询到:{}条", records.count());
- for (ConsumerRecord<String, String> record : records) {
- log.info("-------消息来了:topic={}, partition={}, offset={}, value={}", record.topic(), record.partition(),
- record.offset(), record.value());
- }
- }
- }
指定时间戳消费的关键步骤如下:
上面的指定偏移量也好,指定时间戳的消费方式也罢,都是属于 Assign 模式的。那 Subscribe 模式能否也可以指定偏移量消费呢?答案是可以的。
- public static void testSubscribeOffset() {
- Properties properties = new Properties();
- properties.put("bootstrap.servers", "100.1.4.16:9092,100.1.4.17:9092,100.1.4.18:9092");
- properties.put("group.id", "test_group");
- properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
- String topic = "test_topic";
- Map<TopicPartition, OffsetAndMetadata> hashMaps = new HashMap<TopicPartition, OffsetAndMetadata>();
- hashMaps.put(new TopicPartition(topic, 0), new OffsetAndMetadata(5));
- // 手动提交指定offset作为起始消费offset
- consumer.commitSync(hashMaps);
- consumer.subscribe(Pattern.compile(topic));
- while (true) {
- // 每1000ms轮询一次
- ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000L));
- log.info("本次轮询到:{}条", records.count());
- for (ConsumerRecord<String, String> record : records) {
- log.info("-------消息来了:topic={}, partition={}, offset={}, value={}", record.topic(), record.partition(),
- record.offset(), record.value());
- }
- }
- }
实际上,我们是通过在开启轮询之前,手动提交一次偏移量信息,然后再去轮询消息的方式达到目的。
同样地,若指定分区的偏移量已在分区上不存在(比如受到 Kafka 清除策略的影响),则将从最新 offset 处监听消费。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。