当前位置:   article > 正文

kafka Consumer 消费者使用多线程并发执行,并保证顺序消费, 第一种使用纯线程方式、第二种使用Executors线程池_kafka消费者多线程消费

kafka消费者多线程消费

需求内容

单个消费者,每秒需要处理1000条数据,每条数据的处理时间为500ms,相同accNum(客户账号)的数据需要保证消费的顺序。

注意点

1、如果1秒钟生产1000条数据,消费者处理时,每条数据需要500毫秒,则消费者每次拉取数据的条数最好能控制在500条以上,这样1秒内的数据可以拉取两次,每次使用500个线程进行处理,每次耗时500ms,
2*500ms=1秒,基本可以保证1000条数据能够在1秒内处理完成。
如果消费者每100ms拉取一次,每次拉取100条数据,消费者使用100个线程处理这100条数据,耗时500ms,第二次再拉取100条,耗时500ms...这样处理完1秒内的1000条数据将一共需要
10次*500ms=5秒钟,出现较大延迟。
同时,还要注意,一批数据中存在相同的accNum(客户账号)的情况,如果存在2条相同的accNum,因为需要顺序执行,一条执行需要500ms,两条顺序执行完成将花费1秒,这批数据的整体完成时间将变为1秒。
注意这三个参数的调整:
// fetch.max.bytes:一次拉取的最小可返回数据量:1Bety
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 100 * 1024);
// fetch.max.wait.ms:一次拉取的最大等待时间:500ms
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);
// max.poll.records: 一次拉取的最大条数
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 2000);
注意消费者的拉取延迟时间:
tKafkaConsumer.poll(500);
2、每批次数据处理时,创建的线程数,会根据每次拉取的数据条数自动调整,最大线程数为消费者每次允许拉取的最大数据条数。这样系统可以根据数据量大小自动调整创建的线程数,线程池中的空闲线程可以在一定时间后自动释放。可以保证不同accNum(客户账号)的数据每次都分配一个线程单独处理,从而保证处理的时间(500ms)。

第一种使用纯线程方式(Thread+Callable+FutureTask)

因为每次处理都创建新的线程,造成大量线程同时创建和销毁,线程数波动剧烈,GC频繁,系统各项指标均不平稳。

  1. package org.fiend.kafka.config.kafka;
  2. import org.apache.kafka.clients.consumer.*;
  3. import org.apache.kafka.common.TopicPartition;
  4. import org.apache.kafka.common.serialization.StringDeserializer;
  5. import org.slf4j.Logger;
  6. import org.slf4j.LoggerFactory;
  7. import java.time.Duration;
  8. import java.util.*;
  9. import java.util.concurrent.ExecutionException;
  10. import java.util.concurrent.FutureTask;
  11. /**
  12. * @author 86133 2023-07-06 15:42:52
  13. */
  14. public class CustomConsumer {
  15. private static Logger log = LoggerFactory.getLogger(CustomConsumer.class);
  16. public static void main(String[] args) throws InterruptedException {
  17. Properties props = new Properties();
  18. // bootstrap.servers:kafka集群地址
  19. props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
  20. // 消费者组id
  21. props.put("group.id", "test_consumer_group"); // 消费者组
  22. // key.deserializer:key的反序列化器
  23. props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  24. // value.deserializer:value的反序列化器
  25. props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  26. // fetch.max.bytes:一次拉取的最小可返回数据量:1Byte
  27. props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 100 * 1024);
  28. // fetch.max.bytes:一次拉取的最大数据量:50M
  29. props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 50 * 1024 * 1024);
  30. // fetch.max.wait.ms:一次拉取的最大等待时间:500ms
  31. props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);
  32. // max.poll.records: 一次拉取的最大条数
  33. props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 2000);
  34. // max.partition.fetch.bytes:一次拉取时,每个分区最大拉取数据量,默认1M
  35. props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 1 * 1024 * 1024);
  36. // auto.offset.reset:当 Kafka 中没有初始偏移量或当前偏移量在服务器中不存在 (如,数据被删除了)时,自动设置开始消费的偏移量位置,默认latest。
  37. // earliest:自动重置偏移量到最早的偏移量(从头开始消费)。
  38. // latest:默认,自动重置偏移量为最新的偏移量(从最新的接收到的数据开始消费)。
  39. // none:如果消费组原来的(previous)偏移量不存在,则向消费者抛异常。 anything:向消费者抛异常。
  40. props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
  41. // enable.auto.commit:是否允许自动提交offset,默认是。
  42. props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
  43. // auto.commit.interval.ms:自动提交offset的时间间隔,默认5秒。
  44. props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);
  45. // heartbeat.interval.ms:消费者心跳检测时间间隔,默认3秒。
  46. props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 3000);
  47. // session.timeout.ms:session过期时间,默认10秒。
  48. props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);
  49. // max.poll.interval.ms:一批次数据最大可以执行时间,默认5分钟。
  50. props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);
  51. // partition.assignment.strategy:分区分配策略,默认5分钟。
  52. props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RangeAssignor.class.getName());
  53. KafkaConsumer<String, String> tKafkaConsumer = new KafkaConsumer<>(props);
  54. tKafkaConsumer.subscribe(Arrays.asList("riskMoniterTopic"));
  55. HashMap<String, List<String>> hashMap;
  56. while (true) {
  57. long start = System.currentTimeMillis();
  58. log.info("[开始]-consumer拉取数据");
  59. ConsumerRecords<String, String> records = tKafkaConsumer.poll(Duration.ofMillis(500));
  60. int dataCount = records.count();
  61. log.info("[完成]-consumer拉取数据-条数=[{}]-耗时=[{}]", dataCount, System.currentTimeMillis() - start);
  62. // 拉取的数据条数大于0时,才进行处理操作
  63. if (dataCount > 0) {
  64. // 初始化hashMap:容量可以设置为拉取数据条数的1倍,或者2倍,2倍更加分散
  65. // 消费者参数中设置一次拉取的最大条数为2000,基本不会超过该值。
  66. // hashMap的hash碰撞概率较低,2000条数据,分布到4000容量的hashMap中时,基本不会出现碰撞,只有相同的key会在一起,导致整体执行时间为相同多个key顺序执行的时间
  67. // [线程执行完成]消费者线程:consumer-thread-VV0039-已处理数据数量=3-已处理的所有客户账号=VV0039,VV0039,VV0039,
  68. // [线程执行完成]消费者线程:consumer-thread-AG0097-已处理数据数量=2-已处理的所有客户账号=AG0097,AG0097,
  69. // [线程执行完成]消费者线程:consumer-thread-ID0045-已处理数据数量=1-已处理的所有客户账号=ID0045,
  70. int arrListCapacity = dataCount * 2;
  71. hashMap = new HashMap<>(arrListCapacity);
  72. // 将拉取的数据按客户号码分散到HashMap中
  73. for (ConsumerRecord<String, String> record : records) {
  74. String value = record.value();
  75. }
  76. ArrayList<FutureTask<String>> tFutureTaskArrayList = new ArrayList<>(dataCount);
  77. // 循环hashMap,每个value开启一个线程循环处理该List中的全部数据,保证数据处理的顺序
  78. hashMap.forEach((k, v) -> {
  79. List<String> list = v;
  80. String threadName = "";
  81. if (list.size() > 0) {
  82. threadName = "consumer-thread-" + k;
  83. // 使用Callable执行一组数据
  84. FutureTask<String> futureTask = new FutureTask<>(() -> {
  85. String threadName1 = Thread.currentThread().getName();
  86. // log.info("[获取]-消费者线程:{}-获取到待处理数据数量:{}", threadName, busiDataEntities.size());
  87. for (String dataStr : list) {
  88. try {
  89. // 模拟业务处理时间,默认500ms
  90. Thread.sleep(500);
  91. } catch (InterruptedException e) {
  92. e.printStackTrace();
  93. }
  94. }
  95. return "消费者线程:" + threadName1 + "-已处理数据数量=" + list.size() + "-已处理的所有客户账号 = " + list;
  96. });
  97. // 启动一个线程执行一组数据
  98. new Thread(futureTask, threadName).start();
  99. // 将每个线程的futureTask都放入同一个ArrayList中
  100. tFutureTaskArrayList.add(futureTask);
  101. }
  102. });
  103. // 循环tFutureTaskArrayList,检查所有futureTask是否都已经返回,没返回的阻塞等待,等都返回后证明所有线程都执行完成,提交offset
  104. // 因为每次处理都创建新的线程,大量线程同时创建和销毁,线程数波动剧烈,考虑通过线程池进行优化
  105. for (int i = 0; i < tFutureTaskArrayList.size(); i++) {
  106. try {
  107. String returnStr = tFutureTaskArrayList.get(i).get();
  108. log.info("[线程执行完成]" + returnStr);
  109. } catch (ExecutionException e) {
  110. e.printStackTrace();
  111. }
  112. }
  113. }
  114. // 同步提交offset
  115. // tKafkaConsumer.commitSync();
  116. // 异步提交
  117. tKafkaConsumer.commitAsync(new OffsetCommitCallback() {
  118. @Override
  119. public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
  120. if (exception != null) {
  121. log.error("[失败]-提交offset失败!" + offsets);
  122. } else {
  123. log.info("[成功]-提交offset成功!");
  124. }
  125. }
  126. });
  127. log.info("【完成处理数据】-条数=[{}]-耗时=[{}]", dataCount, System.currentTimeMillis() - start);
  128. }
  129. }
  130. }

测试结果:

// [开始]-consumer拉取数据
// [完成]-consumer拉取数据-条数=[2000]-耗时=[5]
// [成功]-提交offset成功!
// 【完成处理数据】-条数=[2000]-耗时=[1731]
// [开始]-consumer拉取数据
// [完成]-consumer拉取数据-条数=[2000]-耗时=[4]
// [成功]-提交offset成功!
// 【完成处理数据】-条数=[2000]-耗时=[1678]
// [开始]-consumer拉取数据
// [完成]-consumer拉取数据-条数=[2000]-耗时=[23]
// [成功]-提交offset成功!
// 【完成处理数据】-条数=[2000]-耗时=[1637]
// 测试结果:2000条可以在2秒处理完成,则可以保证1000条时可以在1秒能处理完成,满足需求内容。
// 因为每次处理都创建新的线程,造成大量线程同时创建和销毁,线程数波动剧烈,GC频繁,系统各项指标均不平稳。

第二种使用Executors线程池(Executors+Callable+FutureTask)

通过线程池进行处理,线程数一直保持在2000个左右。

  1. package com.autoee.demo.kafka.main;
  2. import ch.qos.logback.classic.Level;
  3. import ch.qos.logback.classic.LoggerContext;
  4. import cn.hutool.core.date.DateUtil;
  5. import cn.hutool.core.date.TimeInterval;
  6. import cn.hutool.json.JSONUtil;
  7. import com.autoee.demo.riskmonitor.BusiDataEntity;
  8. import org.apache.kafka.clients.consumer.*;
  9. import org.apache.kafka.common.TopicPartition;
  10. import org.apache.kafka.common.serialization.StringDeserializer;
  11. import org.slf4j.Logger;
  12. import org.slf4j.LoggerFactory;
  13. import java.util.*;
  14. import java.util.concurrent.*;
  15. import java.util.concurrent.atomic.AtomicInteger;
  16. /**
  17. * Title: <br>
  18. * Desc: <br>
  19. * Date: 2022-8-19 <br>
  20. * @author Double
  21. * @version 1.0.0
  22. */
  23. public class KafkaConsumerMutiThreadsTest4_Executors_HashMap {
  24. private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerMutiThreadsTest4_Executors_HashMap.class);
  25. // 设置main方法执行时的日志输出级别
  26. static {
  27. LoggerContext loggerContext = (LoggerContext) LoggerFactory.getILoggerFactory();
  28. List<ch.qos.logback.classic.Logger> loggerList = loggerContext.getLoggerList();
  29. loggerList.forEach(logger -> {
  30. logger.setLevel(Level.INFO);
  31. });
  32. }
  33. // 需求内容:单个消费者,每秒需要处理1000条数据,每条数据的处理时间为500ms,相同accNum的数据需要保证消费的顺序
  34. // 测试极限情况:数据已存在大量积压,启动消费者进行消费
  35. // 每次拉取都达到设置的单次可以拉取的最大条数:2000条
  36. public static void main(String[] args) throws InterruptedException {
  37. Properties props = new Properties();
  38. // bootstrap.servers:kafka集群地址
  39. props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
  40. // 消费者组id
  41. props.put("group.id", "test_consumer_group"); //消费者组
  42. // key.deserializer:key的反序列化器
  43. props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  44. // value.deserializer:value的反序列化器
  45. props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  46. // fetch.max.bytes:一次拉取的最小可返回数据量:1Bety
  47. props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 100 * 1024);
  48. // fetch.max.bytes:一次拉取的最大数据量:50M
  49. props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 50 * 1024 * 1024);
  50. // fetch.max.wait.ms:一次拉取的最大等待时间:500ms
  51. props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);
  52. // max.poll.records: 一次拉取的最大条数
  53. props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 2000);
  54. // max.partition.fetch.bytes:一次拉取时,每个分区最大拉取数据量,默认1M
  55. props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 1 * 1024 * 1024);
  56. // auto.offset.reset:当 Kafka 中没有初始偏移量或当前偏移量在服务器中不存在 (如,数据被删除了)时,自动设置开始消费的偏移量位置,默认latest。
  57. // earliest:自动重置偏移量到最早的偏移量(从头开始消费)。
  58. // latest:默认,自动重置偏移量为最新的偏移量(从最新的接收到的数据开始消费)。
  59. // none:如果消费组原来的(previous)偏移量不存在,则向消费者抛异常。 anything:向消费者抛异常。
  60. props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
  61. // enable.auto.commit:是否允许自动提交offset,默认是。
  62. props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
  63. // auto.commit.interval.ms:自动提交offset的时间间隔,默认5秒。
  64. props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);
  65. // heartbeat.interval.ms:消费者心跳检测时间间隔,默认3秒。
  66. props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 3000);
  67. // session.timeout.ms:session过期时间,默认10秒。
  68. props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);
  69. // max.poll.interval.ms:一批次数据最大可以执行时间,默认5分钟。
  70. props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);
  71. // partition.assignment.strategy:分区分配策略,默认5分钟。
  72. props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RangeAssignor.class.getName());
  73. KafkaConsumer<String, String> tKafkaConsumer = new KafkaConsumer<String, String>(props);
  74. tKafkaConsumer.subscribe(Arrays.asList("riskMoniterTopic"));
  75. // 使用Executors中的CachedThreadPool,初始核心线程数为0,最大线程数为无限大,线程最大空闲时间为60秒
  76. // corePoolSize=0
  77. // maximumPoolSize=Integer.MAX_VALUE,即2147483647,基本属于无界。
  78. // keepAliveTime=60秒
  79. // 工作队列使用没有容量的 SynchronousQueue,来一个任务处理一个任务,不进行缓存。如果提交任务速度高于线程池中线程处理任务的速度,则会不断创建新线程。极端情况下会创建过多的线程,耗尽 CPU 和内存资源。
  80. // 可以自定义线程池进行优化
  81. ExecutorService executorService = Executors.newCachedThreadPool();
  82. HashMap<String, List<BusiDataEntity>> busiDataHashMap;
  83. while (true) {
  84. TimeInterval timer = DateUtil.timer();
  85. logger.info("[开始]-consumer拉取数据");
  86. ConsumerRecords<String, String> records = tKafkaConsumer.poll(500);
  87. int dataCount = records.count();
  88. AtomicInteger tAtomicInteger = new AtomicInteger();
  89. logger.info("[完成]-consumer拉取数据-条数=[{}]-耗时=[{}]", dataCount, timer.intervalMs());
  90. // 拉取的数据条数大于0时,才进行处理操作
  91. timer = DateUtil.timer();
  92. if (dataCount > 0) {
  93. // 初始化hashMap:容量可以设置为拉取数据条数的1倍,或者2倍,2倍更加分散
  94. // 消费者参数中设置一次拉取的最大条数为2000,基本不会超过该值。
  95. // hashMap的hash碰撞概率较低,2000条数据,分布到4000容量的hashMap中时,基本不会出现碰撞,只有相同的key会在一起,导致整体执行时间为相同多个key顺序执行的时间
  96. // [线程执行完成]消费者线程:pool-1-thread-1898-已处理数据数量=3-已处理的所有客户账号=GW0032,GW0032,GW0032,
  97. // [线程执行完成]消费者线程:pool-1-thread-1193-已处理数据数量=2-已处理的所有客户账号=KE0055,KE0055,
  98. // [线程执行完成]消费者线程:pool-1-thread-1187-已处理数据数量=2-已处理的所有客户账号=0E0005,0E0005,
  99. int capacity = dataCount * 2;
  100. busiDataHashMap = new HashMap<>(capacity);
  101. // 将拉取的数据按客户号码分散到HashMap中
  102. for (ConsumerRecord<String, String> record : records) {
  103. Object value = record.value();
  104. String jsonStr = JSONUtil.toJsonStr(value);
  105. // logger.info("[获取]-传入报文=[{}]", jsonStr);
  106. BusiDataEntity busiDataEntity = JSONUtil.toBean(jsonStr, BusiDataEntity.class);
  107. String accNum = busiDataEntity.getAccNum();
  108. if (busiDataHashMap.containsKey(accNum)) {
  109. busiDataHashMap.get(accNum).add(busiDataEntity);
  110. } else {
  111. List<BusiDataEntity> newList = new ArrayList<>();
  112. newList.add(busiDataEntity);
  113. busiDataHashMap.put(accNum, newList);
  114. }
  115. }
  116. ArrayList<FutureTask<String>> tFutureTaskArrayList = new ArrayList<>(dataCount);
  117. // 循环hashMap,每个value开启一个线程循环处理该List中的全部数据,保证数据处理的顺序
  118. int num = 0;
  119. busiDataHashMap.forEach((k, v) -> {
  120. List<BusiDataEntity> busiDataEntities = v;
  121. String threadName = "";
  122. if (busiDataEntities.size() > 0) {
  123. threadName = k;
  124. // 使用Callable执行同一个Key下的一组数据
  125. FutureTask<String> futureTask = new FutureTask<>(new Callable<String>() {
  126. @Override
  127. public String call() {
  128. String threadName = Thread.currentThread().getName();
  129. // logger.info("[获取]-消费者线程:{}-获取到待处理数据数量:{}", threadName, busiDataEntities.size());
  130. String allAccNum = "";
  131. for (BusiDataEntity busiDataEntity : busiDataEntities) {
  132. allAccNum = allAccNum + busiDataEntity.getAccNum() + ",";
  133. try {
  134. // 模拟业务处理时间,默认500ms
  135. Thread.sleep(500);
  136. } catch (InterruptedException e) {
  137. e.printStackTrace();
  138. }
  139. }
  140. return "消费者线程:" + threadName + "-已处理数据数量=" + busiDataEntities.size() + "-已处理的所有客户账号=" + allAccNum;
  141. }
  142. });
  143. // 通过线程池进行任务处理
  144. executorService.submit(futureTask);
  145. // 将每个线程的futureTask都放入同一个ArrayList中
  146. tFutureTaskArrayList.add(futureTask);
  147. }
  148. });
  149. // 循环tFutureTaskArrayList,检查所有futureTask是否都已经返回,没返回的阻塞等待,等都返回后证明所有线程都执行完成,提交offset
  150. // 使用线程池后,线程数一直保持在2000个左右。
  151. for (int i = 0; i < tFutureTaskArrayList.size(); i++) {
  152. try {
  153. String returnStr = tFutureTaskArrayList.get(i).get();
  154. logger.info("[线程执行完成]" + returnStr);
  155. } catch (ExecutionException e) {
  156. e.printStackTrace();
  157. }
  158. }
  159. }
  160. //同步提交offset
  161. // tKafkaConsumer.commitSync();
  162. //异步提交
  163. tKafkaConsumer.commitAsync(new OffsetCommitCallback() {
  164. @Override
  165. public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
  166. if (exception != null) {
  167. logger.error("[失败]-提交offset失败!" + offsets);
  168. } else {
  169. logger.info("[成功]-提交offset成功!");
  170. }
  171. }
  172. });
  173. logger.info("【完成处理数据】-条数=[{}]-耗时=[{}]", dataCount, timer.intervalMs());
  174. }
  175. }
  176. }

测试结果:

// [开始]-consumer拉取数据
// [完成]-consumer拉取数据-条数=[2000]-耗时=[5]
// [成功]-提交offset成功!
// 【完成处理数据】-条数=[2000]-耗时=[1731]
// [开始]-consumer拉取数据
// [完成]-consumer拉取数据-条数=[2000]-耗时=[4]
// [成功]-提交offset成功!
// 【完成处理数据】-条数=[2000]-耗时=[1678]
// [开始]-consumer拉取数据
// [完成]-consumer拉取数据-条数=[2000]-耗时=[23]
// [成功]-提交offset成功!
// 【完成处理数据】-条数=[2000]-耗时=[1637]
// 测试结果:2000条可以在2秒处理完成,则可以保证1000条时可以在1秒能处理完成,满足需求内容。
// 使用线程池后,线程数一直保持在2000个左右

第三种使用Executors线程池(Executors+Runnable+CountDownLatch)

  1. package com.autoee.demo.kafka.main;
  2. import ch.qos.logback.classic.Level;
  3. import ch.qos.logback.classic.LoggerContext;
  4. import cn.hutool.core.date.DateUtil;
  5. import cn.hutool.core.date.TimeInterval;
  6. import cn.hutool.json.JSONUtil;
  7. import com.autoee.demo.riskmonitor.BusiDataEntity;
  8. import org.apache.kafka.clients.consumer.*;
  9. import org.apache.kafka.common.TopicPartition;
  10. import org.apache.kafka.common.serialization.StringDeserializer;
  11. import org.slf4j.Logger;
  12. import org.slf4j.LoggerFactory;
  13. import java.util.*;
  14. import java.util.concurrent.*;
  15. import java.util.concurrent.atomic.AtomicInteger;
  16. /**
  17. * Title: <br>
  18. * Desc: <br>
  19. * Date: 2022-8-19 <br>
  20. * @author Double
  21. * @version 1.0.0
  22. */
  23. public class KafkaConsumerMutiThreadsTest5 {
  24. private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerMutiThreadsTest5.class);
  25. // 设置main方法执行时的日志输出级别
  26. static {
  27. LoggerContext loggerContext = (LoggerContext) LoggerFactory.getILoggerFactory();
  28. List<ch.qos.logback.classic.Logger> loggerList = loggerContext.getLoggerList();
  29. loggerList.forEach(logger -> {
  30. logger.setLevel(Level.INFO);
  31. });
  32. }
  33. // 需求内容:单个消费者,每秒需要处理1000条数据,每条数据的处理时间为500ms,相同accNum的数据需要保证消费的顺序
  34. // 测试极限情况:数据已存在大量积压,启动消费者进行消费
  35. // 每次拉取都达到设置的单次可以拉取的最大条数:2000条
  36. // [开始]-consumer拉取数据
  37. // [完成]-consumer拉取数据-条数=[2000]-耗时=[5]
  38. // [成功]-提交offset成功!
  39. // 【完成处理数据】-条数=[2000]-耗时=[1731]
  40. // [开始]-consumer拉取数据
  41. // [完成]-consumer拉取数据-条数=[2000]-耗时=[4]
  42. // [成功]-提交offset成功!
  43. // 【完成处理数据】-条数=[2000]-耗时=[1678]
  44. // [开始]-consumer拉取数据
  45. // [完成]-consumer拉取数据-条数=[2000]-耗时=[23]
  46. // [成功]-提交offset成功!
  47. // 【完成处理数据】-条数=[2000]-耗时=[1637]
  48. // 测试结果:2000条可以在2秒处理完成,则可以保证1000条时可以在1秒能处理完成,满足需求内容。
  49. // 通过线程池进行处理,线程数非常平稳,而且只需要十个左右线程就能处理每次2000条的数据。
  50. public static void main(String[] args) throws InterruptedException {
  51. Properties props = new Properties();
  52. // bootstrap.servers:kafka集群地址
  53. props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
  54. // 消费者组id
  55. props.put("group.id", "test_consumer_group"); //消费者组
  56. // key.deserializer:key的反序列化器
  57. props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  58. // value.deserializer:value的反序列化器
  59. props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  60. // fetch.max.bytes:一次拉取的最小可返回数据量:1Bety
  61. props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 100 * 1024);
  62. // fetch.max.bytes:一次拉取的最大数据量:50M
  63. props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 50 * 1024 * 1024);
  64. // fetch.max.wait.ms:一次拉取的最大等待时间:500ms
  65. props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);
  66. // max.poll.records: 一次拉取的最大条数
  67. props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 2000);
  68. // max.partition.fetch.bytes:一次拉取时,每个分区最大拉取数据量,默认1M
  69. props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 1 * 1024 * 1024);
  70. // auto.offset.reset:当 Kafka 中没有初始偏移量或当前偏移量在服务器中不存在 (如,数据被删除了)时,自动设置开始消费的偏移量位置,默认latest。
  71. // earliest:自动重置偏移量到最早的偏移量(从头开始消费)。
  72. // latest:默认,自动重置偏移量为最新的偏移量(从最新的接收到的数据开始消费)。
  73. // none:如果消费组原来的(previous)偏移量不存在,则向消费者抛异常。 anything:向消费者抛异常。
  74. props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
  75. // enable.auto.commit:是否允许自动提交offset,默认是。
  76. props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
  77. // auto.commit.interval.ms:自动提交offset的时间间隔,默认5秒。
  78. props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);
  79. // heartbeat.interval.ms:消费者心跳检测时间间隔,默认3秒。
  80. props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 3000);
  81. // session.timeout.ms:session过期时间,默认10秒。
  82. props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);
  83. // max.poll.interval.ms:一批次数据最大可以执行时间,默认5分钟。
  84. props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);
  85. // partition.assignment.strategy:分区分配策略,默认5分钟。
  86. props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RangeAssignor.class.getName());
  87. KafkaConsumer<String, String> tKafkaConsumer = new KafkaConsumer<String, String>(props);
  88. tKafkaConsumer.subscribe(Arrays.asList("riskMoniterTopic"));
  89. // 使用Executors中的CachedThreadPool,初始核心线程数为0,最大线程数为无限大,线程最大空闲时间为60秒
  90. // corePoolSize=0
  91. // maximumPoolSize=Integer.MAX_VALUE,即2147483647,基本属于无界。
  92. // keepAliveTime=60秒
  93. // 工作队列使用没有容量的 SynchronousQueue,来一个任务处理一个任务,不进行缓存。如果提交任务速度高于线程池中线程处理任务的速度,则会不断创建新线程。极端情况下会创建过多的线程,耗尽 CPU 和内存资源。
  94. // 可以自定义线程池进行优化
  95. ExecutorService executorService = Executors.newCachedThreadPool();
  96. HashMap<String, List<BusiDataEntity>> busiDataHashMap;
  97. while (true) {
  98. TimeInterval timer = DateUtil.timer();
  99. logger.info("[开始]-consumer拉取数据");
  100. ConsumerRecords<String, String> records = tKafkaConsumer.poll(500);
  101. int dataCount = records.count();
  102. AtomicInteger tAtomicInteger = new AtomicInteger();
  103. logger.info("[完成]-consumer拉取数据-条数=[{}]-耗时=[{}]", dataCount, timer.intervalMs());
  104. // 拉取的数据条数大于0时,才进行处理操作
  105. timer = DateUtil.timer();
  106. if (dataCount > 0) {
  107. // 初始化hashMap:容量可以设置为拉取数据条数的1倍,或者2倍,2倍更加分散
  108. // 消费者参数中设置一次拉取的最大条数为2000,基本不会超过该值。
  109. // hashMap的hash碰撞概率较低,2000条数据,分布到4000容量的hashMap中时,基本不会出现碰撞,只有相同的key会在一起,导致整体执行时间为相同多个key顺序执行的时间
  110. // [线程执行完成]消费者线程:pool-1-thread-1898-已处理数据数量=3-已处理的所有客户账号=GW0032,GW0032,GW0032,
  111. // [线程执行完成]消费者线程:pool-1-thread-1193-已处理数据数量=2-已处理的所有客户账号=KE0055,KE0055,
  112. // [线程执行完成]消费者线程:pool-1-thread-1187-已处理数据数量=2-已处理的所有客户账号=0E0005,0E0005,
  113. int capacity = dataCount * 2;
  114. busiDataHashMap = new HashMap<>(capacity);
  115. // 将拉取的数据按客户号码分散到ArrayList中
  116. for (ConsumerRecord<String, String> record : records) {
  117. Object value = record.value();
  118. String jsonStr = JSONUtil.toJsonStr(value);
  119. // logger.info("[获取]-传入报文=[{}]", jsonStr);
  120. BusiDataEntity busiDataEntity = JSONUtil.toBean(jsonStr, BusiDataEntity.class);
  121. String accNum = busiDataEntity.getAccNum();
  122. if (busiDataHashMap.containsKey(accNum)) {
  123. busiDataHashMap.get(accNum).add(busiDataEntity);
  124. } else {
  125. List<BusiDataEntity> newList = new ArrayList<>();
  126. newList.add(busiDataEntity);
  127. busiDataHashMap.put(accNum, newList);
  128. }
  129. }
  130. // 循环ArrayList,每个下标中的List数据条数大于0时,开启一个线程循环处理该List中的全部数据,保证数据处理的顺序
  131. int num = 0;
  132. int busiDataHashMapSize = busiDataHashMap.keySet().size();
  133. // 使用CountDownLatch判断是否所有子线程都已执行完成,子线程个数等于busiDataHashMap中key的个数
  134. CountDownLatch tCountDownLatch = new CountDownLatch(busiDataHashMapSize);
  135. busiDataHashMap.forEach((k, v) -> {
  136. List<BusiDataEntity> busiDataEntities = v;
  137. String threadName = "";
  138. if (busiDataEntities.size() > 0) {
  139. threadName = k;
  140. // 使用Runnable执行同一个Key下的一组数据
  141. Runnable runnableTask = new Runnable() {
  142. @Override
  143. public void run() {
  144. String threadName = Thread.currentThread().getName();
  145. // logger.info("[获取]-消费者线程:{}-获取到待处理数据数量:{}", threadName, busiDataEntities.size());
  146. String allAccNum = "";
  147. String allBatchNo = "";
  148. for (BusiDataEntity busiDataEntity : busiDataEntities) {
  149. allAccNum = allAccNum + busiDataEntity.getAccNum() + ",";
  150. allBatchNo = allBatchNo + busiDataEntity.getBatchNo() + ",";
  151. try {
  152. // 模拟业务处理时间,默认500ms
  153. Thread.sleep(500);
  154. } catch (InterruptedException e) {
  155. e.printStackTrace();
  156. }
  157. }
  158. logger.info("[线程执行完成]-消费者线程:" + threadName + "-已处理数据数量=" + busiDataEntities.size() + "-已处理的所有客户账号=" + allAccNum + "-已处理的所有批次号=" + allBatchNo);
  159. // 每个线程处理完成后,将tCountDownLatch减1
  160. tCountDownLatch.countDown();
  161. }
  162. };
  163. // 通过线程池进行任务处理
  164. executorService.submit(runnableTask);
  165. }
  166. });
  167. // 通过CountDownLatch阻塞等待,等待所有线程都执行完成,提交offset
  168. tCountDownLatch.await();
  169. //同步提交offset
  170. // tKafkaConsumer.commitSync();
  171. //异步提交
  172. tKafkaConsumer.commitAsync(new OffsetCommitCallback() {
  173. @Override
  174. public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
  175. if (exception != null) {
  176. logger.error("[失败]-提交offset失败!" + offsets);
  177. } else {
  178. logger.info("[成功]-提交offset成功!");
  179. }
  180. }
  181. });
  182. logger.info("【完成处理数据】-条数=[{}]-耗时=[{}]", dataCount, timer.intervalMs());
  183. logger.info("----------------------------------------------------------------------------------------------------------------------------------------");
  184. }
  185. }
  186. }
  187. }

测试结果:

和第二种的执行时间差不多,但是各项性能指标好像更加平稳了,但是很出现线程阻塞的情况。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家小花儿/article/detail/419876
推荐阅读
相关标签
  

闽ICP备14008679号