当前位置:   article > 正文

kafka 消费者进行消费数据的各种场景的API(你值得一看)_kafkaconsumer 的所有api

kafkaconsumer 的所有api

kafka消费端的参数

 

二 实现案例

2.1 订阅某个主题

创建一个独立消费者,消费 kafka-ljf 主题中数据。
注意: 在消费者 API 代码中必须配置消费者组 id 。命令行启动消费者不填写消费者组
id 会被自动填写随机的消费者组 id
2.消费者代码
  1. package com.ljf.spring.boot.demo.consumer;
  2. import org.apache.kafka.clients.consumer.ConsumerConfig;
  3. import org.apache.kafka.clients.consumer.ConsumerRecord;
  4. import org.apache.kafka.clients.consumer.ConsumerRecords;
  5. import org.apache.kafka.clients.consumer.KafkaConsumer;
  6. import org.apache.kafka.common.serialization.StringDeserializer;
  7. import java.time.Duration;
  8. import java.util.ArrayList;
  9. import java.util.Properties;
  10. /**
  11. * @ClassName: ConsumerTopicDemo
  12. * @Description: TODO
  13. * @Author: liujianfu
  14. * @Date: 2022/04/10 14:02:05
  15. * @Version: V1.0
  16. **/
  17. public class ConsumerTopicDemo {
  18. public static void main(String[] args) {
  19. // 1.创建消费者的配置对象
  20. Properties properties = new Properties();
  21. // 2.给消费者配置对象添加参数
  22. properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
  23. "192.168.152.136:9092,192.168.152.138:9092,192.168.152.140:9092");
  24. // 配置序列化 必须
  25. properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
  26. StringDeserializer.class.getName());
  27. properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
  28. StringDeserializer.class.getName());
  29. // 配置消费者组(组名任意起名) 必须
  30. properties.put(ConsumerConfig.GROUP_ID_CONFIG, "beijing");
  31. // 创建消费者对象
  32. KafkaConsumer<String, String> kafkaConsumer = new
  33. KafkaConsumer<String, String>(properties);
  34. // 注册要消费的主题(可以消费多个主题)
  35. ArrayList<String> topics = new ArrayList<>();
  36. topics.add("kafka-ljf");
  37. kafkaConsumer.subscribe(topics);
  38. // 拉取数据打印
  39. while (true) {
  40. // 设置 1s 中消费一批数据
  41. ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
  42. // 打印消费到的数据
  43. for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
  44. System.out.println(consumerRecord);
  45. }
  46. }
  47. }
  48. }

3.执行生产者产生数据

 4.消费数据,观察

2.2 订阅某个主题下的某个分区

需求:创建一个独立消费者,消费 kafka-ljf主题 0 号分区的数据。

2.代码

 

  1. package com.ljf.spring.boot.demo.consumer;
  2. import org.apache.kafka.clients.consumer.ConsumerConfig;
  3. import org.apache.kafka.clients.consumer.ConsumerRecord;
  4. import org.apache.kafka.clients.consumer.ConsumerRecords;
  5. import org.apache.kafka.clients.consumer.KafkaConsumer;
  6. import org.apache.kafka.common.TopicPartition;
  7. import org.apache.kafka.common.serialization.StringDeserializer;
  8. import java.time.Duration;
  9. import java.util.ArrayList;
  10. import java.util.Properties;
  11. /**
  12. * @ClassName: ConsumerPartitionDemo
  13. * @Description: TODO
  14. * @Author: liujianfu
  15. * @Date: 2022/04/10 14:55:31
  16. * @Version: V1.0
  17. **/
  18. public class ConsumerPartitionDemo {
  19. public static void main(String[] args) {
  20. // 1.创建消费者的配置对象
  21. Properties properties = new Properties();
  22. // 2.给消费者配置对象添加参数
  23. properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
  24. "192.168.152.136:9092,192.168.152.138:9092,192.168.152.140:9092");
  25. // 配置序列化 必须
  26. properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
  27. StringDeserializer.class.getName());
  28. properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
  29. StringDeserializer.class.getName());
  30. // 配置消费者组(必须),名字可以任意起
  31. properties.put(ConsumerConfig.GROUP_ID_CONFIG,"beijing");
  32. KafkaConsumer<String, String> kafkaConsumer = new
  33. KafkaConsumer<>(properties);
  34. // 消费某个主题的某个分区数据,0号分区
  35. ArrayList<TopicPartition> topicPartitions = new
  36. ArrayList<>();
  37. topicPartitions.add(new TopicPartition("kafka-ljf", 0));
  38. kafkaConsumer.assign(topicPartitions);
  39. while (true){
  40. ConsumerRecords<String, String> consumerRecords =
  41. kafkaConsumer.poll(Duration.ofSeconds(1));
  42. for (ConsumerRecord<String, String> consumerRecord :
  43. consumerRecords) {
  44. System.out.println(consumerRecord);
  45. }
  46. }
  47. }
  48. }

 3.生产者生产数据

 4.消费者消费

可见只消费了0号分区上的数据 

2.3  消费者组案例

测试同一个主题的分区数据,只能由一个消费者组中的一个消费。

1.consumer代码复制一份,变为两个消费者

 

2. 消费者2:

 3.生产者:

 

4.查看消费者消费信息

 5.查看消费者2消费信息

 结论:即可看到两个消费者在消费不同 分区的数据。消费者一消费分区1的数据,消费者2消费分区2的数据。

2.4  指定offset消费

auto.offset.reset = earliest | latest | none   其中 默认是 latest
Kafka 中没有初始偏移量(消费者组第一次消费)或服务器上不再存在当前偏移量
时(例如该数据已被删除),该怎么办?
1 earliest :自动将偏移量重置为最早的偏移量,相当于   --from-beginning
(2) latest (默认值) :自动将偏移量重置为最新偏移量。
(3) none :如果未找到消费者组的先前偏移量,则向消费者抛出异常。

 (4)任意指定 offset 位移开始消费

代码:

具体代码:

  1. package com.ljf.spring.boot.demo.consumer;
  2. import org.apache.kafka.clients.consumer.ConsumerConfig;
  3. import org.apache.kafka.clients.consumer.ConsumerRecord;
  4. import org.apache.kafka.clients.consumer.ConsumerRecords;
  5. import org.apache.kafka.clients.consumer.KafkaConsumer;
  6. import org.apache.kafka.common.TopicPartition;
  7. import org.apache.kafka.common.serialization.StringDeserializer;
  8. import java.time.Duration;
  9. import java.util.ArrayList;
  10. import java.util.HashSet;
  11. import java.util.Properties;
  12. import java.util.Set;
  13. /**
  14. * @ClassName: ConsumerSeekDemo
  15. * @Description: TODO
  16. * @Author: liujianfu
  17. * @Date: 2022/04/10 16:08:01
  18. * @Version: V1.0
  19. **/
  20. public class ConsumerSeekDemo {
  21. public static void main(String[] args) {
  22. // 0 配置信息
  23. Properties properties = new Properties();
  24. // 连接
  25. properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
  26. "192.168.152.136:9092,192.168.152.138:9092,192.168.152.140:9092");
  27. // key value 反序列化
  28. properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
  29. StringDeserializer.class.getName());
  30. properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
  31. StringDeserializer.class.getName());
  32. properties.put(ConsumerConfig.GROUP_ID_CONFIG, "beijing");
  33. // 1 创建一个消费者
  34. KafkaConsumer<String, String> kafkaConsumer = new
  35. KafkaConsumer<>(properties);
  36. // 2 订阅一个主题
  37. ArrayList<String> topics = new ArrayList<>();
  38. topics.add("kafka-ljf");
  39. kafkaConsumer.subscribe(topics);
  40. Set<TopicPartition> assignment= new HashSet<>();
  41. while (assignment.size() == 0) {
  42. kafkaConsumer.poll(Duration.ofSeconds(1));
  43. // 获取消费者分区分配信息(有了分区分配信息才能开始消费)
  44. assignment = kafkaConsumer.assignment();
  45. }
  46. // 遍历所有分区,并指定 offset 从 10 的位置开始消费
  47. for (TopicPartition tp: assignment) {
  48. kafkaConsumer.seek(tp, 10);
  49. }
  50. // 3 消费该主题数据
  51. while (true) {
  52. ConsumerRecords<String, String> consumerRecords =
  53. kafkaConsumer.poll(Duration.ofSeconds(1));
  54. for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
  55. System.out.println(consumerRecord);
  56. }
  57. }
  58. }
  59. }

 结果:

可以看到都是从0,1分区中,offset为10的位置开始查询的。 

注意:每次执行完,需要修改消费者组名;

2.4  指定offset设置为earliest

auto.offset.reset = earliest | latest | none   其中默认是 latest。本案例设置为earliest。

注意:每次执行完,需要修改消费者组名;每次执行要起一个不同的消费组的名字

代码

 

  1. package com.ljf.spring.boot.demo.consumer;
  2. import org.apache.kafka.clients.consumer.ConsumerConfig;
  3. import org.apache.kafka.clients.consumer.ConsumerRecord;
  4. import org.apache.kafka.clients.consumer.ConsumerRecords;
  5. import org.apache.kafka.clients.consumer.KafkaConsumer;
  6. import org.apache.kafka.common.TopicPartition;
  7. import org.apache.kafka.common.serialization.StringDeserializer;
  8. import java.time.Duration;
  9. import java.util.ArrayList;
  10. import java.util.Properties;
  11. /**
  12. * @ClassName: ConsumerDefineOffset
  13. * @Description: TODO
  14. * @Author: liujianfu
  15. * @Date: 2022/04/10 16:30:01
  16. * @Version: V1.0
  17. **/
  18. public class ConsumerDefineOffset {
  19. public static void main(String[] args) {
  20. // 1.创建消费者的配置对象
  21. Properties properties = new Properties();
  22. // 2.给消费者配置对象添加参数
  23. properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
  24. "192.168.152.136:9092,192.168.152.138:9092,192.168.152.140:9092");
  25. // 配置序列化 必须
  26. properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
  27. StringDeserializer.class.getName());
  28. properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
  29. StringDeserializer.class.getName());
  30. //设置读取的offset的位置
  31. properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
  32. //properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
  33. //properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
  34. // 配置消费者组(必须),名字可以任意起
  35. properties.put(ConsumerConfig.GROUP_ID_CONFIG,"shanghai");//注意:每次执行完,需要修改消费者组名;
  36. KafkaConsumer<String, String> kafkaConsumer = new
  37. KafkaConsumer<>(properties);
  38. // 消费某个主题的某个分区数据,0号分区
  39. ArrayList<TopicPartition> topicPartitions = new
  40. ArrayList<>();
  41. topicPartitions.add(new TopicPartition("kafka-ljf", 0));
  42. kafkaConsumer.assign(topicPartitions);
  43. while (true){
  44. ConsumerRecords<String, String> consumerRecords =
  45. kafkaConsumer.poll(Duration.ofSeconds(1));
  46. for (ConsumerRecord<String, String> consumerRecord :
  47. consumerRecords) {
  48. System.out.println(consumerRecord);
  49. }
  50. }
  51. }
  52. }

3.执行结果

 2.5  指定时间消费数据

在生产环境中,会遇到最近消费的几个小时数据异常,想重新按照时间消费。
例如要求按照时间消费前一天的数据,怎么处理?
  1. package com.ljf.spring.boot.demo.consumer;
  2. import org.apache.kafka.clients.consumer.*;
  3. import org.apache.kafka.common.TopicPartition;
  4. import org.apache.kafka.common.serialization.StringDeserializer;
  5. import java.time.Duration;
  6. import java.util.*;
  7. /**
  8. * @ClassName: ConsumerRangeTime
  9. * @Description: TODO
  10. * @Author: liujianfu
  11. * @Date: 2022/04/10 16:53:36
  12. * @Version: V1.0
  13. **/
  14. public class ConsumerRangeTime {
  15. public static void main(String[] args) {
  16. Properties properties = new Properties();
  17. // 连接
  18. properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
  19. "192.168.152.136:9092,192.168.152.138:9092,192.168.152.140:9092");
  20. // key value 反序列化
  21. properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
  22. StringDeserializer.class.getName());
  23. properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
  24. StringDeserializer.class.getName());
  25. properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-time");
  26. // 1 创建一个消费者
  27. KafkaConsumer<String, String> kafkaConsumer = new
  28. KafkaConsumer<>(properties);
  29. // 2 订阅一个主题
  30. ArrayList<String> topics = new ArrayList<>();
  31. topics.add("kafka-ljf");
  32. kafkaConsumer.subscribe(topics);
  33. Set<TopicPartition> assignment = new HashSet<>();
  34. while (assignment.size() == 0) {
  35. kafkaConsumer.poll(Duration.ofSeconds(1));
  36. // 获取消费者分区分配信息(有了分区分配信息才能开始消费)
  37. assignment = kafkaConsumer.assignment();
  38. }
  39. HashMap<TopicPartition, Long> timestampToSearch = new
  40. HashMap<>();
  41. // 封装集合存储,每个分区对应一天前的数据
  42. for (TopicPartition topicPartition : assignment) {
  43. timestampToSearch.put(topicPartition, System.currentTimeMillis() - 5 * 24 * 3600 * 1000);
  44. }
  45. // 获取从 1 天前开始消费的每个分区的 offset
  46. Map<TopicPartition, OffsetAndTimestamp> offsets =
  47. kafkaConsumer.offsetsForTimes(timestampToSearch);
  48. // 遍历每个分区,对每个分区设置消费时间。
  49. for (TopicPartition topicPartition : assignment) {
  50. OffsetAndTimestamp offsetAndTimestamp =
  51. offsets.get(topicPartition);
  52. // 根据时间指定开始消费的位置
  53. if (offsetAndTimestamp != null){
  54. kafkaConsumer.seek(topicPartition,
  55. offsetAndTimestamp.offset());
  56. }
  57. }
  58. // 3 消费该主题数据
  59. while (true) {
  60. ConsumerRecords<String, String> consumerRecords =
  61. kafkaConsumer.poll(Duration.ofSeconds(1));
  62. for (ConsumerRecord<String, String> consumerRecord :
  63. consumerRecords) {
  64. System.out.println(consumerRecord);
  65. }
  66. }
  67. }
  68. }

结果:

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/笔触狂放9/article/detail/638944
推荐阅读
相关标签
  

闽ICP备14008679号