当前位置:   article > 正文

Springboot 配置使用 Kafka_springboot引入kafka

springboot引入kafka

Springboot 配置使用 Kafka


前言

在学这个之前找了挺多资料,最后终于找到一篇完美的博文,跟着博主的思路,很清晰,跟着博文实际操作了一遍,目前可以正常运行。没有很多的大道理,会用就行了,看了看全网没有比这个更详细的了;

As we all know,当今世界最流行的消息中间件有 RabbitMq、RocketMq、Kafka,其中,应用最广泛的是 RabbitMq,RocketMq 是阿里巴巴的产品,性能超过 RabbitMq,已经经受了多年的双11考验,但是怕哪天阿里不维护了,用的人不多,Kafka 是吞吐量最大的一个,远超前两个,支持事务、可保证消息的不丢失(网上说的事务和消息可靠性不支持是说的旧版,2以后就开始支持了),对比来讲,Kafka相对于前两个,只有一个劣势,不太支持延时队列,其他方面都要优于它们。


一、Linux 安装 Kafka

参考博文:Debian(Linux通用)安装 Kafka 并配置远程访问


二、构建项目

多模块项目构建,这里不讲,如果你不会,就新建两个普通的web项目 KafkaConsumer KafkaProvider 就行。

三、引入依赖

新建一个标准的spring-web项目,额外依赖只需要这一个,网上说的 kafka-client 不是springboot 的东西,那就是个原生的 kafka 客户端, kafka-test也不需要,这个是用代码控制broker的东西。
  1. <dependency>
  2.     <groupId>org.springframework.kafka</groupId>
  3.     <artifactId>spring-kafka</artifactId>
  4. </dependency>

四、配置文件

这两种方式的代码会互相覆盖,而且有些配置只能用 config 方式配置,建议像我一样,两种都写,config 里面的配置参数从 yml 中获取,就可以不影响使用 Nacos 来在线修改 kafka 的配置了。

生产者

yml 方式

  1. server:
  2. port: 8081
  3. spring:
  4. kafka:
  5. producer:
  6. # Kafka服务器
  7. bootstrap-servers: 175.24.228.202:9092
  8. # 开启事务,必须在开启了事务的方法中发送,否则报错
  9. transaction-id-prefix: kafkaTx-
  10. # 发生错误后,消息重发的次数,开启事务必须设置大于0。
  11. retries: 3
  12. # acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。
  13. # acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。
  14. # acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
  15. # 开启事务时,必须设置为all
  16. acks: all
  17. # 当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。
  18. batch-size: 16384
  19. # 生产者内存缓冲区的大小。
  20. buffer-memory: 1024000
  21. # 键的序列化方式
  22. key-serializer: org.springframework.kafka.support.serializer.JsonSerializer
  23. # 值的序列化方式(建议使用Json,这种序列化方式可以无需额外配置传输实体类)
  24. value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

Config 方式

  1. import org.apache.kafka.clients.producer.ProducerConfig;
  2. import org.apache.kafka.common.serialization.StringSerializer;
  3. import org.springframework.beans.factory.annotation.Value;
  4. import org.springframework.boot.SpringBootConfiguration;
  5. import org.springframework.context.annotation.Bean;
  6. import org.springframework.kafka.core.DefaultKafkaProducerFactory;
  7. import org.springframework.kafka.core.KafkaTemplate;
  8. import org.springframework.kafka.core.ProducerFactory;
  9. import org.springframework.kafka.support.serializer.JsonSerializer;
  10. import org.springframework.kafka.transaction.KafkaTransactionManager;
  11. import java.util.HashMap;
  12. import java.util.Map;
  13. /**
  14. * @author 徐一杰
  15. * @date 2022/10/31 18:05
  16. * kafka配置,也可以写在yml,这个文件会覆盖yml
  17. */
  18. @SpringBootConfiguration
  19. public class KafkaProviderConfig {
  20. @Value("${spring.kafka.producer.bootstrap-servers}")
  21. private String bootstrapServers;
  22. @Value("${spring.kafka.producer.transaction-id-prefix}")
  23. private String transactionIdPrefix;
  24. @Value("${spring.kafka.producer.acks}")
  25. private String acks;
  26. @Value("${spring.kafka.producer.retries}")
  27. private String retries;
  28. @Value("${spring.kafka.producer.batch-size}")
  29. private String batchSize;
  30. @Value("${spring.kafka.producer.buffer-memory}")
  31. private String bufferMemory;
  32. @Bean
  33. public Map<String, Object> producerConfigs() {
  34. Map<String, Object> props = new HashMap<>(16);
  35. props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
  36. //acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。
  37. //acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。
  38. //acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
  39. //开启事务必须设为all
  40. props.put(ProducerConfig.ACKS_CONFIG, acks);
  41. //发生错误后,消息重发的次数,开启事务必须大于0
  42. props.put(ProducerConfig.RETRIES_CONFIG, retries);
  43. //当多个消息发送到相同分区时,生产者会将消息打包到一起,以减少请求交互. 而不是一条条发送
  44. //批次的大小可以通过batch.size 参数设置.默认是16KB
  45. //较小的批次大小有可能降低吞吐量(批次大小为0则完全禁用批处理)。
  46. //比如说,kafka里的消息5秒钟Batch才凑满了16KB,才能发送出去。那这些消息的延迟就是5秒钟
  47. //实测batchSize这个参数没有用
  48. props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
  49. //有的时刻消息比较少,过了很久,比如5min也没有凑够16KB,这样延时就很大,所以需要一个参数. 再设置一个时间,到了这个时间,
  50. //即使数据没达到16KB,也将这个批次发送出去
  51. props.put(ProducerConfig.LINGER_MS_CONFIG, "5000");
  52. //生产者内存缓冲区的大小
  53. props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
  54. //反序列化,和生产者的序列化方式对应
  55. props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
  56. props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
  57. return props;
  58. }
  59. @Bean
  60. public ProducerFactory<Object, Object> producerFactory() {
  61. DefaultKafkaProducerFactory<Object, Object> factory = new DefaultKafkaProducerFactory<>(producerConfigs());
  62. //开启事务,会导致 LINGER_MS_CONFIG 配置失效
  63. factory.setTransactionIdPrefix(transactionIdPrefix);
  64. return factory;
  65. }
  66. @Bean
  67. public KafkaTransactionManager<Object, Object> kafkaTransactionManager(ProducerFactory<Object, Object> producerFactory) {
  68. return new KafkaTransactionManager<>(producerFactory);
  69. }
  70. @Bean
  71. public KafkaTemplate<Object, Object> kafkaTemplate() {
  72. return new KafkaTemplate<>(producerFactory());
  73. }
  74. }

消费者

yml 方式

  1. server:
  2. port: 8082
  3. spring:
  4. kafka:
  5. consumer:
  6. # Kafka服务器
  7. bootstrap-servers: 175.24.228.202:9092
  8. group-id: firstGroup
  9. # 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D
  10. #auto-commit-interval: 2s
  11. # 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
  12. # earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费分区的记录
  13. # latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据(在消费者启动之后生成的记录)
  14. # none:当各分区都存在已提交的offset时,从提交的offset开始消费;只要有一个分区不存在已提交的offset,则抛出异常
  15. auto-offset-reset: latest
  16. # 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量
  17. enable-auto-commit: false
  18. # 键的反序列化方式
  19. #key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  20. key-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
  21. # 值的反序列化方式(建议使用Json,这种序列化方式可以无需额外配置传输实体类)
  22. value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
  23. # 配置消费者的 Json 反序列化的可信赖包,反序列化实体类需要
  24. properties:
  25. spring:
  26. json:
  27. trusted:
  28. packages: "*"
  29. # 这个参数定义了poll方法最多可以拉取多少条消息,默认值为500。如果在拉取消息的时候新消息不足500条,那有多少返回多少;如果超过500条,每次只返回500。
  30. # 这个默认值在有些场景下太大,有些场景很难保证能够在5min内处理完500条消息,
  31. # 如果消费者无法在5分钟内处理完500条消息的话就会触发reBalance,
  32. # 然后这批消息会被分配到另一个消费者中,还是会处理不完,这样这批消息就永远也处理不完。
  33. # 要避免出现上述问题,提前评估好处理一条消息最长需要多少时间,然后覆盖默认的max.poll.records参数
  34. # 注:需要开启BatchListener批量监听才会生效,如果不开启BatchListener则不会出现reBalance情况
  35. max-poll-records: 3
  36. properties:
  37. # 两次poll之间的最大间隔,默认值为5分钟。如果超过这个间隔会触发reBalance
  38. max:
  39. poll:
  40. interval:
  41. ms: 600000
  42. # 当broker多久没有收到consumer的心跳请求后就触发reBalance,默认值是10s
  43. session:
  44. timeout:
  45. ms: 10000
  46. listener:
  47. # 在侦听器容器中运行的线程数,一般设置为 机器数*分区数
  48. concurrency: 4
  49. # 自动提交关闭,需要设置手动消息确认
  50. ack-mode: manual_immediate
  51. # 消费监听接口监听的主题不存在时,默认会报错,所以设置为false忽略错误
  52. missing-topics-fatal: false
  53. # 两次poll之间的最大间隔,默认值为5分钟。如果超过这个间隔会触发reBalance
  54. poll-timeout: 600000

Config 方式

  1. import org.apache.kafka.clients.consumer.ConsumerConfig;
  2. import org.springframework.beans.factory.annotation.Value;
  3. import org.springframework.boot.SpringBootConfiguration;
  4. import org.springframework.context.annotation.Bean;
  5. import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
  6. import org.springframework.kafka.config.KafkaListenerContainerFactory;
  7. import org.springframework.kafka.core.ConsumerFactory;
  8. import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
  9. import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
  10. import org.springframework.kafka.listener.ContainerProperties;
  11. import org.springframework.kafka.support.serializer.JsonDeserializer;
  12. import java.util.HashMap;
  13. import java.util.Map;
  14. /**
  15. * @author 徐一杰
  16. * @date 2022/10/31 18:05
  17. * kafka配置,也可以写在yml,这个文件会覆盖yml
  18. */
  19. @SpringBootConfiguration
  20. public class KafkaConsumerConfig {
  21. @Value("${spring.kafka.consumer.bootstrap-servers}")
  22. private String bootstrapServers;
  23. @Value("${spring.kafka.consumer.group-id}")
  24. private String groupId;
  25. @Value("${spring.kafka.consumer.enable-auto-commit}")
  26. private boolean enableAutoCommit;
  27. @Value("${spring.kafka.properties.session.timeout.ms}")
  28. private String sessionTimeout;
  29. @Value("${spring.kafka.properties.max.poll.interval.ms}")
  30. private String maxPollIntervalTime;
  31. @Value("${spring.kafka.consumer.max-poll-records}")
  32. private String maxPollRecords;
  33. @Value("${spring.kafka.consumer.auto-offset-reset}")
  34. private String autoOffsetReset;
  35. @Value("${spring.kafka.listener.concurrency}")
  36. private Integer concurrency;
  37. @Value("${spring.kafka.listener.missing-topics-fatal}")
  38. private boolean missingTopicsFatal;
  39. @Value("${spring.kafka.listener.poll-timeout}")
  40. private long pollTimeout;
  41. @Bean
  42. public Map<String, Object> consumerConfigs() {
  43. Map<String, Object> propsMap = new HashMap<>(16);
  44. propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
  45. propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
  46. //是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量
  47. propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
  48. //自动提交的时间间隔,自动提交开启时生效
  49. propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "2000");
  50. //该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
  51. //earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费分区的记录
  52. //latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据(在消费者启动之后生成的记录)
  53. //none:当各分区都存在已提交的offset时,从提交的offset开始消费;只要有一个分区不存在已提交的offset,则抛出异常
  54. propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
  55. //两次poll之间的最大间隔,默认值为5分钟。如果超过这个间隔会触发reBalance
  56. propsMap.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalTime);
  57. //这个参数定义了poll方法最多可以拉取多少条消息,默认值为500。如果在拉取消息的时候新消息不足500条,那有多少返回多少;如果超过500条,每次只返回500。
  58. //这个默认值在有些场景下太大,有些场景很难保证能够在5min内处理完500条消息,
  59. //如果消费者无法在5分钟内处理完500条消息的话就会触发reBalance,
  60. //然后这批消息会被分配到另一个消费者中,还是会处理不完,这样这批消息就永远也处理不完。
  61. //要避免出现上述问题,提前评估好处理一条消息最长需要多少时间,然后覆盖默认的max.poll.records参数
  62. //注:需要开启BatchListener批量监听才会生效,如果不开启BatchListener则不会出现reBalance情况
  63. propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
  64. //当broker多久没有收到consumer的心跳请求后就触发reBalance,默认值是10s
  65. propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
  66. //序列化(建议使用Json,这种序列化方式可以无需额外配置传输实体类)
  67. propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
  68. propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
  69. return propsMap;
  70. }
  71. @Bean
  72. public ConsumerFactory<Object, Object> consumerFactory() {
  73. //配置消费者的 Json 反序列化的可信赖包,反序列化实体类需要
  74. try(JsonDeserializer<Object> deserializer = new JsonDeserializer<>()) {
  75. deserializer.trustedPackages("*");
  76. return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new JsonDeserializer<>(), deserializer);
  77. }
  78. }
  79. @Bean
  80. public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Object, Object>> kafkaListenerContainerFactory() {
  81. ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
  82. factory.setConsumerFactory(consumerFactory());
  83. //在侦听器容器中运行的线程数,一般设置为 机器数*分区数
  84. factory.setConcurrency(concurrency);
  85. //消费监听接口监听的主题不存在时,默认会报错,所以设置为false忽略错误
  86. factory.setMissingTopicsFatal(missingTopicsFatal);
  87. //自动提交关闭,需要设置手动消息确认
  88. factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
  89. factory.getContainerProperties().setPollTimeout(pollTimeout);
  90. //设置为批量监听,需要用List接收
  91. //factory.setBatchListener(true);
  92. return factory;
  93. }
  94. }

五、开始写代码

下面我们开始写 Kafka 的消息发送代码

生产者

发送

KafkaController用于发送消息到 Kafka

  1. import icu.xuyijie.provider.entity.User;
  2. import icu.xuyijie.provider.handler.KafkaSendResultHandler;
  3. import org.apache.kafka.clients.producer.ProducerRecord;
  4. import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
  5. import org.springframework.kafka.core.KafkaTemplate;
  6. import org.springframework.kafka.support.KafkaHeaders;
  7. import org.springframework.messaging.MessageHeaders;
  8. import org.springframework.messaging.support.GenericMessage;
  9. import org.springframework.transaction.annotation.Transactional;
  10. import org.springframework.web.bind.annotation.PathVariable;
  11. import org.springframework.web.bind.annotation.RequestMapping;
  12. import org.springframework.web.bind.annotation.RestController;
  13. import java.util.HashMap;
  14. import java.util.Map;
  15. import java.util.Objects;
  16. import java.util.concurrent.ExecutionException;
  17. import java.util.concurrent.TimeUnit;
  18. import java.util.concurrent.TimeoutException;
  19. /**
  20. * @author 徐一杰
  21. * @date 2022/10/31 14:05
  22. * kafka发送消息
  23. */
  24. @RestController
  25. @RequestMapping("/provider")
  26. //这个注解代表这个类开启Springboot事务,因为我们在Kafka的配置文件开启了Kafka事务,不然会报错
  27. @Transactional(rollbackFor = RuntimeException.class)
  28. public class KafkaController {
  29. private final KafkaTemplate<Object, Object> kafkaTemplate;
  30. public KafkaController(KafkaTemplate<Object, Object> kafkaTemplate, KafkaSendResultHandler kafkaSendResultHandler) {
  31. this.kafkaTemplate = kafkaTemplate;
  32. //回调方法、异常处理
  33. this.kafkaTemplate.setProducerListener(kafkaSendResultHandler);
  34. }
  35. @RequestMapping("/sendMultiple")
  36. public void sendMultiple() {
  37. String message = "发送到Kafka的消息";
  38. for (int i = 0;i < 10;i++) {
  39. kafkaTemplate.send("topic1", "发送到Kafka的消息" + i);
  40. System.out.println(message + i);
  41. }
  42. }
  43. @RequestMapping("/send")
  44. public void send() {
  45. //这个User的代码我没放出来,自己随便写一个实体类,实体类一定要 implements Serializable
  46. User user = new User(1, "徐一杰");
  47. kafkaTemplate.send("topic1", user);
  48. kafkaTemplate.send("topic2", "发给topic2");
  49. }
  50. /**
  51. * Kafka提供了多种构建消息的方式
  52. * @throws ExecutionException
  53. * @throws InterruptedException
  54. * @throws TimeoutException
  55. */
  56. public void SendDemo() throws ExecutionException, InterruptedException, TimeoutException {
  57. //后面的get代表同步发送,括号内时间可选,代表超过这个时间会抛出超时异常,但是仍会发送成功
  58. kafkaTemplate.send("topic1", "发给topic1").get(1, TimeUnit.MILLISECONDS);
  59. //使用ProducerRecord发送消息
  60. ProducerRecord<Object, Object> producerRecord = new ProducerRecord<>("topic.quick.demo", "use ProducerRecord to send message");
  61. kafkaTemplate.send(producerRecord);
  62. //使用Message发送消息
  63. Map<String, Object> map = new HashMap<>();
  64. map.put(KafkaHeaders.TOPIC, "topic.quick.demo");
  65. map.put(KafkaHeaders.PARTITION_ID, 0);
  66. map.put(KafkaHeaders.MESSAGE_KEY, 0);
  67. GenericMessage<Object> message = new GenericMessage<>("use Message to send message", new MessageHeaders(map));
  68. kafkaTemplate.send(message);
  69. }
  70. }

成功回调和异常处理

KafkaSendResultHandler

  1. import org.apache.kafka.clients.producer.ProducerRecord;
  2. import org.apache.kafka.clients.producer.RecordMetadata;
  3. import org.springframework.kafka.support.ProducerListener;
  4. import org.springframework.lang.Nullable;
  5. import org.springframework.stereotype.Component;
  6. /**
  7. * @author 徐一杰
  8. * @date 2022/10/31 15:41
  9. * kafka消息发送回调
  10. */
  11. @Component
  12. public class KafkaSendResultHandler implements ProducerListener<Object, Object> {
  13. @Override
  14. public void onSuccess(ProducerRecord producerRecord, RecordMetadata recordMetadata) {
  15. System.out.println("消息发送成功:" + producerRecord.toString());
  16. }
  17. @Override
  18. public void onError(ProducerRecord producerRecord, @Nullable RecordMetadata recordMetadata, Exception exception) {
  19. System.out.println("消息发送失败:" + producerRecord.toString() + exception.getMessage());
  20. }
  21. }

消费者

接收

KafkaHandler用于接收 Kafka 里的消息

  1. import org.apache.kafka.clients.consumer.ConsumerRecord;
  2. import org.springframework.kafka.annotation.KafkaListener;
  3. import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
  4. import org.springframework.kafka.support.Acknowledgment;
  5. import org.springframework.web.bind.annotation.PathVariable;
  6. import org.springframework.web.bind.annotation.RequestMapping;
  7. import org.springframework.web.bind.annotation.RestController;
  8. import java.util.List;
  9. import java.util.Objects;
  10. /**
  11. * @author 徐一杰
  12. * @date 2022/10/31 14:04
  13. * kafka监听消息
  14. */
  15. @RestController
  16. public class KafkaHandler {
  17. private final KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
  18. public KafkaHandler(KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry) {
  19. this.kafkaListenerEndpointRegistry = kafkaListenerEndpointRegistry;
  20. }
  21. /**
  22. * 监听kafka消息
  23. *
  24. * @param consumerRecord kafka的消息,用consumerRecord可以接收到更详细的信息,也可以用String message只接收消息
  25. * @param ack kafka的消息确认
  26. * 使用autoStartup = "false"必须指定id
  27. */
  28. @KafkaListener(topics = {"topic1", "topic2"}, errorHandler = "myKafkaListenerErrorHandler")
  29. // @KafkaListener(id = "${spring.kafka.consumer.group-id}", topics = {"topic1", "topic2"}, autoStartup = "false")
  30. public void listen1(ConsumerRecord<Object, Objects> consumerRecord, Acknowledgment ack) {
  31. try {
  32. //用于测试异常处理
  33. //int i = 1 / 0;
  34. System.out.println(consumerRecord.get(0).value());
  35. //手动确认
  36. ack.acknowledge();
  37. } catch (Exception e) {
  38. System.out.println("消费失败:" + e);
  39. }
  40. }
  41. /**
  42. * 下面的方法可以手动操控kafka的队列监听情况
  43. * 先发送一条消息,因为autoStartup = "false",所以并不会看到有消息进入监听器。
  44. * 接着启动监听器,/start/webGroup。可以看到有一条消息进来了。
  45. * pause是暂停监听,resume是继续监听
  46. *
  47. * @param listenerId consumer的group-id
  48. */
  49. @RequestMapping("/pause/{listenerId}")
  50. public void stop(@PathVariable String listenerId) {
  51. Objects.requireNonNull(kafkaListenerEndpointRegistry.getListenerContainer(listenerId)).pause();
  52. }
  53. @RequestMapping("/resume/{listenerId}")
  54. public void resume(@PathVariable String listenerId) {
  55. Objects.requireNonNull(kafkaListenerEndpointRegistry.getListenerContainer(listenerId)).resume();
  56. }
  57. @RequestMapping("/start/{listenerId}")
  58. public void start(@PathVariable String listenerId) {
  59. Objects.requireNonNull(kafkaListenerEndpointRegistry.getListenerContainer(listenerId)).start();
  60. }
  61. }

异常处理

MyKafkaListenerErrorHandler

  1. import org.apache.kafka.clients.consumer.Consumer;
  2. import org.springframework.kafka.listener.KafkaListenerErrorHandler;
  3. import org.springframework.kafka.listener.ListenerExecutionFailedException;
  4. import org.springframework.lang.NonNull;
  5. import org.springframework.messaging.Message;
  6. import org.springframework.stereotype.Component;
  7. /**
  8. * @author 徐一杰
  9. * @date 2022/10/31 15:27
  10. * 异常处理
  11. */
  12. @Component
  13. public class MyKafkaListenerErrorHandler implements KafkaListenerErrorHandler {
  14. @Override
  15. @NonNull
  16. public Object handleError(@NonNull Message<?> message, @NonNull ListenerExecutionFailedException exception) {
  17. return new Object();
  18. }
  19. @Override
  20. @NonNull
  21. public Object handleError(@NonNull Message<?> message, @NonNull ListenerExecutionFailedException exception, Consumer<?, ?> consumer) {
  22. System.out.println("消息详情:" + message);
  23. System.out.println("异常信息::" + exception);
  24. System.out.println("消费者详情::" + consumer.groupMetadata());
  25. System.out.println("监听主题::" + consumer.listTopics());
  26. return KafkaListenerErrorHandler.super.handleError(message, exception, consumer);
  27. }
  28. }

七、开始测试

启动生产者和消费者,消费者控制台打印出我配置的 group-id webGroup id就是启动成功了,如果启动报错不会解决,可以评论区留言

测试普通单条消息

浏览器访问 http://127.0.0.1:8081/provider/send 来调用生产者发送一条消息,生产者控制台打印出回调,消费者控制台输出接收到的消息

测试消费者异常处理

把消费者里的 listen1 方法里的这行代码取消注释
//用于测试异常处理int i =1/0;
重启消费者,访问 http://127.0.0.1:8081/provider/send ,发现消费者虽然报错但是没有抛出异常,而是被我们处理了

测试延时消息

发送延时消息要关闭事务,在生产者的 yml 和 config 配置文件里把下面代码注释掉
  1. # 开启事务,必须在开启了事务的方法中发送,否则报错#
  2. transaction-id-prefix: kafkaTx-
  1. //开启事务,会导致 LINGER_MS_CONFIG 配置失效
  2. //factory.setTransactionIdPrefix(transactionIdPrefix);
然后重新请求 http://127.0.0.1:8081/provider/send ,发现 5s 后消息发出,配置延迟时间的配置是props.put(ProducerConfig.LINGER_MS_CONFIG, "5000"); ,其实这个不是真正的延时消息,Kafka实现真正的延时消息要使用JDK的DelayQueue手动实现。

测试批量消息

打开消费者的 config 配置里 setBatchListener 这一行代码,我们定义的 MAX_POLL_RECORDS_CONFIG 为3,即每次批量读取3条消息,批量监听需要用List接收,listen1 方法的参数加一个List包起来
  1. //设置为批量监听,需要用List接收
  2. factory.setBatchListener(true);
propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
publicvoidlisten1(List<ConsumerRecord<Object,Objects>> consumerRecord,Acknowledgment ack)
注意!!!Debug消费者,因为我们要打断点观察每次接收的条数
调用消费者接口 http://127.0.0.1:8081/provider/sendMultiple 批量发送10条,可以看到消费者每次只接收3条

测试手动控制消费者监听

@KafkaListener 这样写,id 和 autoStartup 是关键
  1. @KafkaListener(id ="${spring.kafka.consumer.group-id}", topics ={
  2. "topic1","topic2"}, autoStartup ="false")
重启消费者,调用生产者接口 http://127.0.0.1:8081/provider/send ,我们发现这次消费者没有接收到消息,因为我们关闭了 autoStartup
要开始接收的话,调用消费者接口 http://127.0.0.1:8082/start/firstGroup ,这个方法可以启动 group-id 为 firstGroup 的 @KafkaListener,然后我们发现消费者控制台接收到消息
http://127.0.0.1:8082/pause/firstGroup 暂停接收
http://127.0.0.1:8082/resume/firstGroup 恢复接收

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/盐析白兔/article/detail/710638
推荐阅读
相关标签
  

闽ICP备14008679号