package com.ljf.spring.boot.demo.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;

/**
 * @ClassName: ConsumerTopicDemo
 * @Description: Subscribe to a whole topic and print every record polled.
 * @Author: liujianfu
 * @Date: 2022/04/10 14:02:05
 * @Version: V1.0
 **/
public class ConsumerTopicDemo {
    public static void main(String[] args) {
        // 1. Create the consumer configuration object
        Properties properties = new Properties();
        // 2. Add parameters: the bootstrap servers to connect to
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "192.168.152.136:9092,192.168.152.138:9092,192.168.152.140:9092");
        // Key/value deserializers (required)
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        // Consumer group id (required); the name is arbitrary
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "beijing");
        // Create the consumer
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
        // Register the topic(s) to consume (more than one topic is allowed)
        ArrayList<String> topics = new ArrayList<>();
        topics.add("kafka-ljf");
        kafkaConsumer.subscribe(topics);
        // Poll and print records
        while (true) {
            // Wait up to 1 s for each batch of records
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord);
            }
        }
    }
}
3. Run the producer to generate data (a minimal producer sketch follows below).
4. Consume the data and observe the output.
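The producer itself is not shown in this excerpt. Below is a minimal sketch of one, assuming the same brokers and topic as the consumer demos; the class name and message payloads are illustrative, not from the original.

package com.ljf.spring.boot.demo.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class ProducerTopicDemo {
    public static void main(String[] args) {
        Properties properties = new Properties();
        // Same brokers as the consumer demos
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "192.168.152.136:9092,192.168.152.138:9092,192.168.152.140:9092");
        // Key/value serializers (required)
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        // Send a few test messages to the kafka-ljf topic
        for (int i = 0; i < 5; i++) {
            producer.send(new ProducerRecord<>("kafka-ljf", "message-" + i));
        }
        // Flush pending sends and release resources
        producer.close();
    }
}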
1. Requirement: create a standalone consumer that consumes only partition 0 of the kafka-ljf topic.
2. Code:
package com.ljf.spring.boot.demo.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;

/**
 * @ClassName: ConsumerPartitionDemo
 * @Description: Consume a single partition of a topic via assign().
 * @Author: liujianfu
 * @Date: 2022/04/10 14:55:31
 * @Version: V1.0
 **/
public class ConsumerPartitionDemo {
    public static void main(String[] args) {
        // 1. Create the consumer configuration object
        Properties properties = new Properties();
        // 2. Add parameters: the bootstrap servers to connect to
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "192.168.152.136:9092,192.168.152.138:9092,192.168.152.140:9092");
        // Key/value deserializers (required)
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        // Consumer group id (required); the name is arbitrary
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "beijing");
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
        // Consume a single partition of the topic: partition 0
        ArrayList<TopicPartition> topicPartitions = new ArrayList<>();
        topicPartitions.add(new TopicPartition("kafka-ljf", 0));
        kafkaConsumer.assign(topicPartitions);
        while (true) {
            ConsumerRecords<String, String> consumerRecords =
                    kafkaConsumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord);
            }
        }
    }
}
3. The producer produces data.
4. The consumer consumes it.
You can see that only data from partition 0 is consumed (each printed ConsumerRecord carries its partition and offset, which makes this easy to verify). Note that assign() pins the consumer to explicit partitions and bypasses consumer-group rebalancing, unlike subscribe().
1. Copy the consumer code so that there are two consumers.
2. Consumer 2:
3. Producer:
4. Check what consumer 1 consumed.
5. Check what consumer 2 consumed.
Conclusion: the two consumers consume different partitions; consumer 1 consumes partition 1's data and consumer 2 consumes partition 2's data. To verify the assignment programmatically, see the sketch below.
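To watch the coordinator hand out partitions, either consumer can subscribe with a ConsumerRebalanceListener. This is an optional sketch, not part of the original demo; it replaces the plain kafkaConsumer.subscribe(topics) call and additionally needs imports for org.apache.kafka.clients.consumer.ConsumerRebalanceListener, org.apache.kafka.common.TopicPartition, and java.util.Collection.

// Optional: log which partitions this consumer owns after each rebalance
kafkaConsumer.subscribe(topics, new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        System.out.println("Partitions revoked: " + partitions);
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        System.out.println("Partitions assigned: " + partitions);
    }
});

Starting the second consumer triggers a rebalance, and each consumer's log then shows the partitions it ended up with.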
(4) Start consuming from an arbitrarily specified offset
Code:
package com.ljf.spring.boot.demo.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;

/**
 * @ClassName: ConsumerSeekDemo
 * @Description: Start consuming from an arbitrary offset via seek().
 * @Author: liujianfu
 * @Date: 2022/04/10 16:08:01
 * @Version: V1.0
 **/
public class ConsumerSeekDemo {
    public static void main(String[] args) {
        // 0. Configuration
        Properties properties = new Properties();
        // Connection
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "192.168.152.136:9092,192.168.152.138:9092,192.168.152.140:9092");
        // Key/value deserializers
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "beijing");
        // 1. Create a consumer
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
        // 2. Subscribe to a topic
        ArrayList<String> topics = new ArrayList<>();
        topics.add("kafka-ljf");
        kafkaConsumer.subscribe(topics);
        // Wait until the consumer has been assigned partitions; consumption
        // (and seek) can only start once the assignment exists
        Set<TopicPartition> assignment = new HashSet<>();
        while (assignment.size() == 0) {
            kafkaConsumer.poll(Duration.ofSeconds(1));
            assignment = kafkaConsumer.assignment();
        }
        // Walk every assigned partition and start consuming at offset 10
        for (TopicPartition tp : assignment) {
            kafkaConsumer.seek(tp, 10);
        }
        // 3. Consume the topic's data
        while (true) {
            ConsumerRecords<String, String> consumerRecords =
                    kafkaConsumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord);
            }
        }
    }
}
Result:
You can see that in both partitions 0 and 1, consumption starts at offset 10.
auto.offset.reset = earliest | latest | none, with latest as the default; this example sets it to earliest.
Note: auto.offset.reset only takes effect when the consumer group has no committed offset, so change the consumer group name before every run; each run needs a fresh group name (see the sketch below).
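One way to avoid editing the group name by hand, a convenience that is not in the original, is to append a random suffix so that every run starts as a brand-new group with no committed offsets:

// Hypothetical convenience: a fresh group id per run, so auto.offset.reset
// always applies. Requires: import java.util.UUID;
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "shanghai-" + UUID.randomUUID());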
Code:
package com.ljf.spring.boot.demo.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;

/**
 * @ClassName: ConsumerDefineOffset
 * @Description: Control the starting position with auto.offset.reset.
 * @Author: liujianfu
 * @Date: 2022/04/10 16:30:01
 * @Version: V1.0
 **/
public class ConsumerDefineOffset {
    public static void main(String[] args) {
        // 1. Create the consumer configuration object
        Properties properties = new Properties();
        // 2. Add parameters: the bootstrap servers to connect to
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "192.168.152.136:9092,192.168.152.138:9092,192.168.152.140:9092");
        // Key/value deserializers (required)
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        // Where to start reading when the group has no committed offset:
        // earliest | latest | none (default: latest)
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        // Consumer group id (required); note: change the group name after
        // every run so the reset policy applies again
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "shanghai");
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
        // Consume a single partition of the topic: partition 0
        ArrayList<TopicPartition> topicPartitions = new ArrayList<>();
        topicPartitions.add(new TopicPartition("kafka-ljf", 0));
        kafkaConsumer.assign(topicPartitions);
        while (true) {
            ConsumerRecords<String, String> consumerRecords =
                    kafkaConsumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord);
            }
        }
    }
}
3. Execution result
The last example consumes data starting from a point in time: it maps a timestamp to a per-partition offset with offsetsForTimes() and then seek()s to those offsets. Code:
package com.ljf.spring.boot.demo.consumer;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.*;

/**
 * @ClassName: ConsumerRangeTime
 * @Description: Start consuming from a point in time via offsetsForTimes().
 * @Author: liujianfu
 * @Date: 2022/04/10 16:53:36
 * @Version: V1.0
 **/
public class ConsumerRangeTime {
    public static void main(String[] args) {
        Properties properties = new Properties();
        // Connection
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "192.168.152.136:9092,192.168.152.138:9092,192.168.152.140:9092");
        // Key/value deserializers
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-time");
        // 1. Create a consumer
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
        // 2. Subscribe to a topic
        ArrayList<String> topics = new ArrayList<>();
        topics.add("kafka-ljf");
        kafkaConsumer.subscribe(topics);
        // Wait until the consumer has been assigned partitions; seek() can
        // only target partitions this consumer actually owns
        Set<TopicPartition> assignment = new HashSet<>();
        while (assignment.size() == 0) {
            kafkaConsumer.poll(Duration.ofSeconds(1));
            assignment = kafkaConsumer.assignment();
        }
        // Map every assigned partition to the target timestamp: 5 days ago
        // (the long literal avoids int overflow in the millisecond arithmetic)
        HashMap<TopicPartition, Long> timestampToSearch = new HashMap<>();
        for (TopicPartition topicPartition : assignment) {
            timestampToSearch.put(topicPartition, System.currentTimeMillis() - 5L * 24 * 3600 * 1000);
        }
        // For each partition, look up the earliest offset whose timestamp is
        // >= the target time (null if no such message exists)
        Map<TopicPartition, OffsetAndTimestamp> offsets =
                kafkaConsumer.offsetsForTimes(timestampToSearch);
        // Seek every partition to the offset found for its timestamp
        for (TopicPartition topicPartition : assignment) {
            OffsetAndTimestamp offsetAndTimestamp = offsets.get(topicPartition);
            if (offsetAndTimestamp != null) {
                kafkaConsumer.seek(topicPartition, offsetAndTimestamp.offset());
            }
        }
        // 3. Consume the topic's data
        while (true) {
            ConsumerRecords<String, String> consumerRecords =
                    kafkaConsumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord);
            }
        }
    }
}
Result: