当前位置:   article > 正文

实现 Kafka 分区内消费者多线程顺序消费_kafka消费顺序

kafka消费顺序

在1个topic中,有3个partition,那么如何保证数据的顺序消费?

生产者在写的时候,可以指定一个 key,被分发到同一个 partition 中去,而且这个 partition 中的数据一定是有顺序的。

消费者从 partition 中取出来数据的时候,也一定是有顺序的。到这里,顺序还是没有错乱的。

但是消费者里可能会有多个线程来并发处理消息,而多个线程并发处理的话,顺序可能就乱掉了。

解决方案

写 n 个 queue,将具有相同key的数据都存储在同一个 queue,然后对于 n 个线程,每个线程分别消费一个 queue 即可,并手动提交位点。由于 kafka consumer 实例不支持多线程同时提交位点,这里采取全局记数器的方式,在每一批次记录的消费过程中,每消费完一条记录则全局记数器加 1,全局记数器等于这一批记录的总条数时提交位点。

在Java中,可以使用多线程和队列来实现对具有相同 key 的数据进行消费,并通过手动提交位点来保证数据的消费。以下是一个带有手动位点提交的解决方案的示例代码:

  1. import java.util.HashMap;
  2. import java.util.Map;
  3. import java.util.concurrent.BlockingQueue;
  4. import java.util.concurrent.LinkedBlockingQueue;
  5. public class DataConsumer {
  6. private Map<String, BlockingQueue<String>> queues;
  7. private Map<String, Integer> offsets;
  8. public DataConsumer(int numThreads) {
  9. queues = new HashMap<>();
  10. offsets = new HashMap<>();
  11. // 创建N个队列和位点
  12. for (int i = 0; i < numThreads; i++) {
  13. BlockingQueue<String> queue = new LinkedBlockingQueue<>();
  14. String key = Integer.toString(i);
  15. queues.put(key, queue);
  16. offsets.put(key, 0);
  17. // 创建并启动消费线程
  18. Thread consumerThread = new Thread(new Consumer(queue, key));
  19. consumerThread.start();
  20. }
  21. }
  22. public void consumeData(String key, String data) {
  23. BlockingQueue<String> queue = queues.get(key);
  24. if (queue != null) {
  25. try {
  26. // 将数据放入对应的队列
  27. queue.put(data);
  28. } catch (InterruptedException e) {
  29. Thread.currentThread().interrupt();
  30. }
  31. }
  32. }
  33. public void commitOffset(String key, int offset) {
  34. offsets.put(key, offset);
  35. System.out.println("Committed offset for key " + key + ": " + offset);
  36. }
  37. private static class Consumer implements Runnable {
  38. private final BlockingQueue<String> queue;
  39. private final String key;
  40. private int offset;
  41. public Consumer(BlockingQueue<String> queue, String key) {
  42. this.queue = queue;
  43. this.key = key;
  44. this.offset = 0;
  45. }
  46. @Override
  47. public void run() {
  48. // 消费队列中的数据
  49. while (!Thread.currentThread().isInterrupted()) {
  50. try {
  51. String data = queue.take();
  52. // 进行消费逻辑
  53. System.out.println("Consumed data: " + data);
  54. offset++;
  55. // 模拟提交位点
  56. if (offset % 10 == 0) {
  57. DataConsumer.getInstance().commitOffset(key, offset);
  58. }
  59. } catch (InterruptedException e) {
  60. Thread.currentThread().interrupt();
  61. }
  62. }
  63. }
  64. }
  65. private static DataConsumer instance;
  66. public static synchronized DataConsumer getInstance() {
  67. if (instance == null) {
  68. instance = new DataConsumer(3);
  69. }
  70. return instance;
  71. }
  72. public static void main(String[] args) {
  73. DataConsumer dataConsumer = DataConsumer.getInstance();
  74. // 模拟产生数据
  75. for (int i = 0; i < 30; i++) {
  76. dataConsumer.consumeData(Integer.toString(i % 3), "Data " + (i + 1));
  77. }
  78. }
  79. }

在以上代码中,DataConsumer 类维护了一个 Map 来存储队列和位点的关系。每个消费者线程都有一个对应的位点来记录消费的进度。

在 commitOffset 方法中,根据 key 提交位点的偏移值。

消费线程在每次成功消费一条数据后,更新位点,并判断是否满足提交位点的条件。这里模拟每消费10条数据提交一次位点。

在 main 方法中,通过 consumeData 方法模拟了产生了30条数据,并将它们放入不同的队列中进行消费。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Monodyee/article/detail/419858
推荐阅读
相关标签
  

闽ICP备14008679号