当前位置:   article > 正文

4.2.8 Kafka 延时队列, 重试队列(结合redis实现)_fetch.min.isr

fetch.min.isr

目录

2.7 延时队列

2.8 重试队列

代码实现


 

Kafka 高级特性-延时/重试队列

 

2.7 延时队列

两个follower副本都已经拉取到了leader副本的最新位置,此时又向leader副本发送拉取请求,而leader副本并没有新的消息写入,那么此时leader副本该如何处理呢?可以直接返回空的拉取结果给follower副本,不过在leader副本一直没有新消息写入的情况下,follower副本会一直发送拉取请求,并且总收到空的拉取结果,消耗资源。

Kafka在处理拉取请求时,会先读取一次日志文件,如果收集不到足够多(fetchMinBytes,由参数fetch.min.bytes配置,默认值为1)的消息,那么就会创建一个延时拉取操作(DelayedFetch)以等待拉取到足够数量的消息。当延时拉取操作执行时,会再读取一次日志文件,然后将拉取结果返回给follower副本。

延迟操作不只是拉取消息时的特有操作,在Kafka中有多种延时操作,比如延时数据删除、延时生产等。

对于延时生产(消息)而言,如果在使用生产者客户端发送消息的时候将acks参数设置为-1,那么就意味着需要等待ISR集合中的所有副本都确认收到消息之后才能正确地收到响应的结果,或者捕获超时异常。

 

 

假设某个分区有3个副本:leader、follower1和follower2,它们都在分区的ISR集合中。不考虑ISR变动的情况,Kafka在收到客户端的生产请求后,将消息3和消息4写入leader副本的本地日志文件。

由于客户端设置了acks为-1,那么需要等到follower1和follower2两个副本都收到消息3和消息4后才能告知客户端正确地接收了所发送的消息。如果在一定的时间内,follower1副本或follower2副本没能够完全拉取到消息3和消息4,那么就需要返回超时异常给客户端。生产请求的超时时间由参数request.timeout.ms配置,默认值为30000,即30s。

那么这里等待消息3和消息4写入follower1副本和follower2副本,并返回相应的响应结果给客户端的动作是由谁来执行的呢?在将消息写入leader副本的本地日志文件之后,Kafka会创建一个延时的生产操作(DelayedProduce),用来处理消息正常写入所有副本或超时的情况,以返回相应的响应结果给客户端。

延时操作需要延时返回响应的结果,首先它必须有一个超时时间(delayMs),如果在这个超时时间内没有完成既定的任务,那么就需要强制完成以返回响应结果给客户端。其次,延时操作不同于定时操作,定时操作是指在特定时间之后执行的操作,而延时操作可以在所设定的超时时间之前完成,所以延时操作能够支持外部事件的触发。

就延时生产操作而言,它的外部事件是所要写入消息的某个分区的HW(高水位)发生增长。也就是说,随着follower副本不断地与leader副本进行消息同步,进而促使HW进一步增长,HW每增长一次都会检测是否能够完成此次延时生产操作,如果可以就执行以此返回响应结果给客户端;如果在超时时间内始终无法完成,则强制执行。

延时拉取操作,是由超时触发或外部事件触发而被执行的。超时触发很好理解,就是等到超时时间之后触发第二次读取日志文件的操作。外部事件触发就稍复杂了一些,因为拉取请求不单单由follower副本发起,也可以由消费者客户端发起,两种情况所对应的外部事件也是不同的。如果是follower副本的延时拉取,它的外部事件就是消息追加到了leader副本的本地日志文件中;如果是消费者客户端的延时拉取,它的外部事件可以简单地理解为HW的增长。

 

时间轮实现延时队列。
TimeWheel。size,每个单元格的时间,   每个单元格都代表一个时间,size*每个单元格的时间就是一个周期。

 

 

 

2.8 重试队列

kafka没有重试机制不支持消息重试,也没有死信队列,因此使用kafka做消息队列时,需要自己实现消息重试的功能。

实现

创建新的kafka主题作为重试队列:
1. 创建一个topic作为重试topic,用于接收等待重试的消息。
2. 普通topic消费者设置待重试消息的下一个重试topic。
3. 从重试topic获取待重试消息储存到redis的zset中,并以下一次消费时间排序
4. 定时任务从redis获取到达消费事件的消息,并把消息发送到对应的topic
5. 同一个消息重试次数过多则不再重试

 

代码实现

1. 新建springboot项目

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  3. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
  4. <modelVersion>4.0.0</modelVersion>
  5. <parent>
  6. <groupId>org.springframework.boot</groupId>
  7. <artifactId>spring-boot-starter-parent</artifactId>
  8. <version>2.2.8.RELEASE</version>
  9. <relativePath/> <!-- lookup parent from repository -->
  10. </parent>
  11. <groupId>com.lagou.kafka.demo</groupId>
  12. <artifactId>demo-retryqueue</artifactId>
  13. <version>0.0.1-SNAPSHOT</version>
  14. <name>demo-retryqueue</name>
  15. <description>Demo project for Spring Boot</description>
  16. <properties>
  17. <java.version>1.8</java.version>
  18. </properties>
  19. <dependencies>
  20. <dependency>
  21. <groupId>org.springframework.boot</groupId>
  22. <artifactId>spring-boot-starter-data-redis</artifactId>
  23. </dependency>
  24. <dependency>
  25. <groupId>org.springframework.boot</groupId>
  26. <artifactId>spring-boot-starter-web</artifactId>
  27. </dependency>
  28. <dependency>
  29. <groupId>org.springframework.kafka</groupId>
  30. <artifactId>spring-kafka</artifactId>
  31. </dependency>
  32. <dependency>
  33. <groupId>com.alibaba</groupId>
  34. <artifactId>fastjson</artifactId>
  35. <version>1.2.73</version>
  36. </dependency>
  37. <dependency>
  38. <groupId>org.springframework.boot</groupId>
  39. <artifactId>spring-boot-starter-test</artifactId>
  40. <scope>test</scope>
  41. <exclusions>
  42. <exclusion>
  43. <groupId>org.junit.vintage</groupId>
  44. <artifactId>junit-vintage-engine</artifactId>
  45. </exclusion>
  46. </exclusions>
  47. </dependency>
  48. <dependency>
  49. <groupId>io.projectreactor</groupId>
  50. <artifactId>reactor-test</artifactId>
  51. <scope>test</scope>
  52. </dependency>
  53. <dependency>
  54. <groupId>org.springframework.kafka</groupId>
  55. <artifactId>spring-kafka-test</artifactId>
  56. <scope>test</scope>
  57. </dependency>
  58. </dependencies>
  59. <build>
  60. <plugins>
  61. <plugin>
  62. <groupId>org.springframework.boot</groupId>
  63. <artifactId>spring-boot-maven-plugin</artifactId>
  64. </plugin>
  65. </plugins>
  66. </build>
  67. </project>

2. 添加application.properties

  1. # bootstrap.servers
  2. spring.kafka.bootstrap-servers=node1:9092
  3. # key序列化器
  4. spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
  5. # value序列化器
  6. spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
  7. # 消费组id:group.id
  8. spring.kafka.consumer.group-id=retryGroup
  9. # key反序列化器
  10. spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
  11. # value反序列化器
  12. spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
  13. # redis数据库编号
  14. spring.redis.database=0
  15. # redis主机地址
  16. spring.redis.host=node1
  17. # redis端口
  18. spring.redis.port=6379
  19. # Redis服务器连接密码(默认为空)
  20. spring.redis.password=
  21. # 连接池最大连接数(使用负值表示没有限制)
  22. spring.redis.jedis.pool.max-active=20
  23. # 连接池最大阻塞等待时间(使用负值表示没有限制)
  24. spring.redis.jedis.pool.max-wait=-1
  25. # 连接池中的最大空闲连接
  26. spring.redis.jedis.pool.max-idle=10
  27. # 连接池中的最小空闲连接
  28. spring.redis.jedis.pool.min-idle=0
  29. # 连接超时时间(毫秒)
  30. spring.redis.timeout=1000
  31. # Kafka主题名称,业务主题
  32. spring.kafka.topics.test=tp_demo_retry_01
  33. # 重试队列,重试主题
  34. spring.kafka.topics.retry=tp_demo_retry_02

3. RetryqueueApplication.java

  1. package com.lagou.kafka.demo;
  2. import org.springframework.boot.SpringApplication;
  3. import org.springframework.boot.autoconfigure.SpringBootApplication;
  4. @SpringBootApplication
  5. public class RetryqueueApplication {
  6. public static void main(String[] args) {
  7. SpringApplication.run(RetryqueueApplication.class, args);
  8. }
  9. }

4. AppConfig.java

  1. package com.lagou.kafka.demo.config;
  2. import org.springframework.context.annotation.Bean;
  3. import org.springframework.context.annotation.Configuration;
  4. import org.springframework.data.redis.connection.RedisConnectionFactory;
  5. import org.springframework.data.redis.core.RedisTemplate;
  6. // 配置redis template
  7. @Configuration
  8. public class AppConfig {
  9. @Bean
  10. public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
  11. RedisTemplate<String, Object> template = new RedisTemplate<>();
  12. // 配置连接工厂
  13. template.setConnectionFactory(factory);
  14. return template;
  15. }
  16. }

5. KafkaController.java

  1. package com.lagou.kafka.demo.controller;
  2. import com.lagou.kafka.demo.service.KafkaService;
  3. import org.apache.kafka.clients.producer.ProducerRecord;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.beans.factory.annotation.Value;
  6. import org.springframework.web.bind.annotation.PathVariable;
  7. import org.springframework.web.bind.annotation.RequestMapping;
  8. import org.springframework.web.bind.annotation.RestController;
  9. import java.util.concurrent.ExecutionException;
  10. @RestController
  11. public class RetryController {
  12. @Autowired
  13. private KafkaService kafkaService;
  14. @Value("${spring.kafka.topics.test}")
  15. private String topic;
  16. @RequestMapping("/send/{message}")
  17. public String sendMessage(@PathVariable String message) throws ExecutionException, InterruptedException {
  18. ProducerRecord<String, String> record = new ProducerRecord<>(
  19. topic,
  20. message
  21. );
  22. // 向业务主题发送消息
  23. String result = kafkaService.sendMessage(record);
  24. return result;
  25. }
  26. }

6. KafkaService.java

  1. package com.lagou.kafka.demo.service;
  2. import org.apache.kafka.clients.producer.ProducerRecord;
  3. import org.apache.kafka.clients.producer.RecordMetadata;
  4. import org.slf4j.Logger;
  5. import org.slf4j.LoggerFactory;
  6. import org.springframework.beans.factory.annotation.Autowired;
  7. import org.springframework.kafka.core.KafkaTemplate;
  8. import org.springframework.kafka.support.SendResult;
  9. import org.springframework.stereotype.Service;
  10. import java.util.concurrent.ExecutionException;
  11. @Service
  12. public class KafkaService {
  13. private Logger log = LoggerFactory.getLogger(KafkaService.class);
  14. // 标红不管
  15. @Autowired
  16. private KafkaTemplate<String, String> kafkaTemplate;
  17. public String sendMessage(ProducerRecord<String, String> record) throws ExecutionException, InterruptedException {
  18. SendResult<String, String> result = this.kafkaTemplate.send(record).get();
  19. RecordMetadata metadata = result.getRecordMetadata();
  20. String returnResult = metadata.topic() + "\t" + metadata.partition() + "\t" + metadata.offset();
  21. log.info("发送消息成功:" + returnResult);
  22. return returnResult;
  23. }
  24. }

7. ConsumerListener.java

  1. package com.lagou.kafka.demo.listener;
  2. import com.lagou.kafka.demo.service.RetryService;
  3. import org.apache.kafka.clients.consumer.ConsumerRecord;
  4. import org.slf4j.Logger;
  5. import org.slf4j.LoggerFactory;
  6. import org.springframework.beans.factory.annotation.Autowired;
  7. import org.springframework.kafka.annotation.KafkaListener;
  8. import org.springframework.stereotype.Component;
  9. @Component
  10. public class ConsumerListener {
  11. private static final Logger log = LoggerFactory.getLogger(ConsumerListener.class);
  12. @Autowired
  13. private RetryService kafkaRetryService;
  14. private static int index = 0;
  15. // 拉取下面主题的消息
  16. @KafkaListener(topics = "${spring.kafka.topics.test}", groupId = "${spring.kafka.consumer.group-id}")
  17. public void consume(ConsumerRecord<String, String> record) {
  18. try {
  19. // 业务处理
  20. log.info("消费的消息:" + record);
  21. index++;
  22. if (index % 2 == 0) {
  23. throw new Exception("该重发了");
  24. }
  25. } catch (Exception e) {
  26. log.error(e.getMessage());
  27. // 消息重试,实际上先将消息放到redis, 再从redis放到消息队列中
  28. kafkaRetryService.consumerLater(record);
  29. }
  30. }
  31. }

8. KafkaRetryService.java

  1. package com.lagou.kafka.demo.service;
  2. import com.alibaba.fastjson.JSON;
  3. import com.lagou.kafka.demo.entity.RetryRecord;
  4. import org.apache.kafka.clients.consumer.ConsumerRecord;
  5. import org.apache.kafka.common.header.Header;
  6. import org.slf4j.Logger;
  7. import org.slf4j.LoggerFactory;
  8. import org.springframework.beans.factory.annotation.Autowired;
  9. import org.springframework.beans.factory.annotation.Value;
  10. import org.springframework.kafka.core.KafkaTemplate;
  11. import org.springframework.stereotype.Service;
  12. import java.nio.ByteBuffer;
  13. import java.util.Calendar;
  14. import java.util.Date;
  15. @Service
  16. public class RetryService {
  17. private static final Logger log = LoggerFactory.getLogger(RetryService.class);
  18. /**
  19. * 消息消费失败后下一次消费的延迟时间(秒)
  20. * 第一次重试延迟10秒;第 二次延迟30秒,第三次延迟1分钟...
  21. */
  22. private static final int[] RETRY_INTERVAL_SECONDS = {10, 30, 1*60, 2*60, 5*60, 10*60, 30*60, 1*60*60, 2*60*60};
  23. /**
  24. * 重试topic
  25. */
  26. @Value("${spring.kafka.topics.retry}")
  27. private String retryTopic;
  28. @Autowired
  29. private KafkaTemplate<String, String> kafkaTemplate;
  30. public void consumerLater(ConsumerRecord<String, String> record){
  31. // 获取消息的已重试次数
  32. int retryTimes = getRetryTimes(record);
  33. Date nextConsumerTime = getNextConsumerTime(retryTimes);
  34. // 如果达到重试次数,则不再重试
  35. if(nextConsumerTime == null) {
  36. return;
  37. }
  38. // 组织消息
  39. RetryRecord retryRecord = new RetryRecord();
  40. retryRecord.setNextTime(nextConsumerTime.getTime());
  41. retryRecord.setTopic(record.topic());
  42. retryRecord.setRetryTimes(retryTimes);
  43. retryRecord.setKey(record.key());
  44. retryRecord.setValue(record.value());
  45. // 转换为字符串
  46. String value = JSON.toJSONString(retryRecord);
  47. // 发送到重试队列
  48. kafkaTemplate.send(retryTopic, null, value);
  49. }
  50. /**
  51. * 获取消息的已重试次数
  52. */
  53. private int getRetryTimes(ConsumerRecord record){
  54. int retryTimes = -1;
  55. for(Header header : record.headers()){
  56. if(RetryRecord.KEY_RETRY_TIMES.equals(header.key())){
  57. ByteBuffer buffer = ByteBuffer.wrap(header.value());
  58. retryTimes = buffer.getInt();
  59. }
  60. }
  61. retryTimes++;
  62. return retryTimes;
  63. }
  64. /**
  65. * 获取待重试消息的下一次消费时间
  66. */
  67. private Date getNextConsumerTime(int retryTimes){
  68. // 重试次数超过上限,不再重试
  69. if(RETRY_INTERVAL_SECONDS.length < retryTimes) {
  70. return null;
  71. }
  72. Calendar calendar = Calendar.getInstance();
  73. calendar.add(Calendar.SECOND, RETRY_INTERVAL_SECONDS[retryTimes]);
  74. return calendar.getTime();
  75. }
  76. }

9. RetryListener.java

  1. package com.lagou.kafka.demo.listener;
  2. import com.alibaba.fastjson.JSON;
  3. import com.lagou.kafka.demo.entity.RetryRecord;
  4. import org.apache.kafka.clients.consumer.ConsumerRecord;
  5. import org.apache.kafka.clients.producer.ProducerRecord;
  6. import org.slf4j.Logger;
  7. import org.slf4j.LoggerFactory;
  8. import org.springframework.beans.factory.annotation.Autowired;
  9. import org.springframework.beans.factory.annotation.Value;
  10. import org.springframework.data.redis.core.RedisTemplate;
  11. import org.springframework.data.redis.core.ZSetOperations;
  12. import org.springframework.kafka.annotation.KafkaListener;
  13. import org.springframework.kafka.core.KafkaTemplate;
  14. import org.springframework.scheduling.annotation.EnableScheduling;
  15. import org.springframework.scheduling.annotation.Scheduled;
  16. import org.springframework.stereotype.Component;
  17. import java.util.Set;
  18. import java.util.UUID;
  19. @Component
  20. // 下面注解表示开启调度
  21. @EnableScheduling
  22. public class RetryListener {
  23. private Logger log = LoggerFactory.getLogger(RetryListener.class);
  24. private static final String RETRY_KEY_ZSET = "_retry_key"; // 时间
  25. private static final String RETRY_VALUE_MAP = "_retry_value"; // 消息
  26. @Autowired
  27. private RedisTemplate<String,Object> redisTemplate;
  28. @Autowired
  29. private KafkaTemplate<String, String> kafkaTemplate;
  30. //
  31. @Value("${spring.kafka.topics.test}")
  32. private String bizTopic;
  33. @KafkaListener(topics = "${spring.kafka.topics.retry}") // 只要有消息就取出来
  34. // public void consume(List<ConsumerRecord<String, String>> list) {
  35. // for(ConsumerRecord<String, String> record : list){
  36. public void consume(ConsumerRecord<String, String> record) {
  37. System.out.println("需要重试的消息:" + record);
  38. RetryRecord retryRecord = JSON.parseObject(record.value(), RetryRecord.class);
  39. /**
  40. * 防止待重试消息太多撑爆redis,可以将待重试消息按下一次重试时间分开存储放到不同介质
  41. * 例如下一次重试时间在半小时以后的消息储存到mysql,并定时从mysql读取即将重试的消息储储存到redis
  42. */
  43. // 通过redis的zset进行时间排序
  44. String key = UUID.randomUUID().toString();
  45. redisTemplate.opsForHash().put(RETRY_VALUE_MAP, key, record.value());
  46. redisTemplate.opsForZSet().add(RETRY_KEY_ZSET, key, retryRecord.getNextTime());
  47. }
  48. // }
  49. /**
  50. * 定时任务从redis读取到达重试时间的消息,发送到对应的topic
  51. */
  52. // @Scheduled(cron="2 * * * * *")
  53. @Scheduled(fixedDelay = 2000)
  54. public void retryFromRedis() {
  55. log.warn("retryFromRedis----begin");
  56. long currentTime = System.currentTimeMillis();
  57. // 根据时间倒序获取
  58. Set<ZSetOperations.TypedTuple<Object>> typedTuples =
  59. redisTemplate.opsForZSet().reverseRangeByScoreWithScores(RETRY_KEY_ZSET, 0, currentTime);
  60. // 移除取出的消息
  61. redisTemplate.opsForZSet().removeRangeByScore(RETRY_KEY_ZSET, 0, currentTime);
  62. for(ZSetOperations.TypedTuple<Object> tuple : typedTuples){
  63. String key = tuple.getValue().toString();
  64. String value = redisTemplate.opsForHash().get(RETRY_VALUE_MAP, key).toString();
  65. redisTemplate.opsForHash().delete(RETRY_VALUE_MAP, key);
  66. RetryRecord retryRecord = JSON.parseObject(value, RetryRecord.class);
  67. ProducerRecord record = retryRecord.parse();
  68. ProducerRecord recordReal = new ProducerRecord(
  69. bizTopic,
  70. record.partition(),
  71. record.timestamp(),
  72. record.key(),
  73. record.value(),
  74. record.headers()
  75. );
  76. kafkaTemplate.send(recordReal);
  77. }
  78. // todo 发生异常将发送失败的消息重新发送到redis
  79. }
  80. }

10. RetryRecord.java

  1. package com.lagou.kafka.demo.entity;
  2. import org.apache.kafka.clients.producer.ProducerRecord;
  3. import org.apache.kafka.common.header.Header;
  4. import org.apache.kafka.common.header.internals.RecordHeader;
  5. import java.nio.ByteBuffer;
  6. import java.util.ArrayList;
  7. import java.util.List;
  8. public class RetryRecord {
  9. public static final String KEY_RETRY_TIMES = "retryTimes";
  10. private String key;
  11. private String value;
  12. private Integer retryTimes;
  13. private String topic;
  14. private Long nextTime;
  15. public RetryRecord() {
  16. }
  17. public String getKey() {
  18. return key;
  19. }
  20. public void setKey(String key) {
  21. this.key = key;
  22. }
  23. public String getValue() {
  24. return value;
  25. }
  26. public void setValue(String value) {
  27. this.value = value;
  28. }
  29. public Integer getRetryTimes() {
  30. return retryTimes;
  31. }
  32. public void setRetryTimes(Integer retryTimes) {
  33. this.retryTimes = retryTimes;
  34. }
  35. public String getTopic() {
  36. return topic;
  37. }
  38. public void setTopic(String topic) {
  39. this.topic = topic;
  40. }
  41. public Long getNextTime() {
  42. return nextTime;
  43. }
  44. public void setNextTime(Long nextTime) {
  45. this.nextTime = nextTime;
  46. }
  47. public ProducerRecord parse() { // 解析成ProducerRecord
  48. Integer partition = null;
  49. Long timestamp = System.currentTimeMillis();
  50. List<Header> headers = new ArrayList<>();
  51. ByteBuffer retryTimesBuffer = ByteBuffer.allocate(4);
  52. retryTimesBuffer.putInt(retryTimes);
  53. retryTimesBuffer.flip();
  54. headers.add(new RecordHeader(RetryRecord.KEY_RETRY_TIMES, retryTimesBuffer));
  55. ProducerRecord sendRecord = new ProducerRecord(
  56. topic, partition, timestamp, key, value, headers);
  57. return sendRecord;
  58. }
  59. }

本文内容由网友自发贡献,转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号