当前位置:   article > 正文

springboot rabbitmq 延时消息、延迟消息、非阻塞重试机制实现_spring mq 延时重试机制

spring mq 延时重试机制

重试的应用场景

比如,系统之间同步数据,A系统发送数据给B系统,因为网络原因或者B系统正在重启,可能收不到信息,为了确保B能收到消息就得重试几次;经典的比如,微信支付回调

对后台通知交互时,如果微信收到商户的应答不符合规范或超时,微信认为通知失败,微信会通过一定的策略定期重新发起通知,尽可能提高通知的成功率,但微信不保证通知最终能成功。(通知频率为15s/15s/30s/3m/10m/20m/30m/30m/30m/60m/3h/3h/3h/6h/6h

  • 总计 24h4m)

支付宝支付回调

程序执行完后必须打印输出 success。如果商家反馈给支付宝的字符不是 success 这 7
个字符,支付宝服务器会不断重发通知,直到超过 24 小时 22 分钟。一般情况下,25 小时以内完成 8
次通知(通知的间隔频率一般是:4m,10m,10m,1h,2h,6h,15h)。

问题背景

spring-retry的@Retryable方式,是阻塞式的,rabbitmq使用这种方式,如果重试次数过多,后边的消息会阻塞一直得不到处理,重试次数过少则不能保证对方收到回调;那提高消费者数量可以吗?也是不行的,最终会耗尽所有消费者。这就相当于你去银行办业务,轮到你时,你要办的业务正好办不了,窗口就一直等着,后边的人无法办业务;如果增加窗口的数量,同样的原因,最终导致全部窗口阻塞;

解决思路

解决思路是,每个消息只分配给一次机会,失败后,放入延迟队列,然后处理下一个消息,到达延迟时间,消息再次入列,这样消息不会阻塞。这就相当于,轮到你时,要办的业务办不了,先让后边的人办,你在后边等一会,x时间后再办;
用一张图概括就是RabbitMQ Non-Blocking Retry Solutions in SpringBoot Solution B — Delay Plugin
在这里插入图片描述

具体步骤

安装延迟插件

首先rabbitmq要安装rabbitmq-delayed-message-exchange插件,我的rabbitmq用的是3.8.27所以使用对应的插件3.8.17,下载后文件名rabbitmq_delayed_message_exchange-3.8.17.8f537ac.ez

rabbitmq装在docker里,先进入docker

docker exec -it containerId  /bin/bash
  • 1

找到放插件的目录

root@xx:/# rabbitmq-plugins directories -s
// 插件在`/opt/rabbitmq/plugins`目录里
Plugin archives directory: /opt/rabbitmq/plugins 
Plugin expansion directory: /var/lib/rabbitmq/mnesia/rabbit@3f09bf7c586f-plugins-expand
Enabled plugins file: /etc/rabbitmq/enabled_plugins

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

所以将宿主机的延迟插件复制到docker里。

docker cp rabbitmq_delayed_message_exchange-3.8.17.8f537ac.ez containerId:/opt/rabbitmq/plugins
  • 1

启用插件

rabbitmq-plugins enable rabbitmq_delayed_message_exchange
  • 1

查看已启用的插件

root@xx:/# rabbitmq-plugins list --enabled
Listing plugins with pattern ".*" ...
 Configured: E = explicitly enabled; e = implicitly enabled
 | Status: * = running on rabbit@f1cb3aad0f70
 |/
[E*] rabbitmq_delayed_message_exchange 3.8.9+1.g8f537ac
[E*] rabbitmq_management               3.8.27
[E*] rabbitmq_prometheus               3.8.27


  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

springboot配置

maven引入依赖

<dependency>
   <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>2.1.7.RELEASE</version>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5

application.yml配置

#RabbitMQ配置
  rabbitmq:
    host: localhost
    port: 5672
    username: username
    password: password
    listener:
      simple:
       # consumer收到消息,不回复ack
        acknowledge-mode: NONE
        # 抛出异常时,不再入队列
        default-requeue-rejected: false

#自定义的mq errorhandler,处理失败时的重试次数
retry:
  # 最大重试次数
  max_retry: 5
  # 延迟(毫秒)
  delay: 1000
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

创建延迟exchange

搜了很多博客,都说要先通过management ui手动创建exchange,其实是不需要的,照下边这样就可以用代码创建exchange。

@Slf4j
@Configuration
public class MqConfig {
    @Autowired
    private AmqpTemplate amqpTemplate;
 /**
     * 延迟交换机名称
     */
    public static final String DELAY_EXCHANGE_NAME="delay-exchange";
    /**
     * 延迟交换机的类型
     */
    public static final String DELAY_EXCHANGE_TYPE = "x-delayed-message";
      /**
     * 错误重试延迟
     */
    private static  Integer DELAY;
    /**
     * 最大重试次数,2880次*30秒=24小时,也就是说,默认重试24小时
     */
    private static  Integer MAX_RETRY;
    @Value("${retry.max_retry}")
    public void setMaxRetry(Integer maxRetry){
        MAX_RETRY = maxRetry;
    }
    @Value("${retry.delay}")
    public void setDelay(Integer delay){
        DELAY = delay;
    }
/**
     * 创建延迟交换机,必须先创建才能监听
     * @return
     */
    @Bean
    public CustomExchange delayExchange() {
        CustomExchange customExchange = new CustomExchange(MqConfig.DELAY_EXCHANGE_NAME,
                MqConfig.DELAY_EXCHANGE_TYPE);
        customExchange.getArguments().put("x-delayed-type",
                ExchangeTypes.DIRECT);
        return customExchange;
    }
// mq的异常处理器
@Bean
    public RabbitListenerErrorHandler retryErrorHandler() {
        RabbitListenerErrorHandler errorHandler = (amqpMessage, message, exception) -> {
            log.error("message监听器出错了",exception);
            MessageProperties messageProperties = amqpMessage.getMessageProperties();
            Map<String, Object> headers = messageProperties.getHeaders();
            Integer xRetryCount = ((Integer) headers.get("retry-count"));
            //根据失败次数,决定是否继续发送到延迟队列
            if (xRetryCount == null) {
                xRetryCount = MAX_RETRY;
            }
            Integer retriedCount = (Integer) headers.get("retried-count");
            if (retriedCount == null) {
                retriedCount = 1;
            }
            log.info("已执行次数:{},最大重试次数:{}",retriedCount,xRetryCount);
            if(retriedCount < xRetryCount){
                log.info("已执行次数小于最大重试次数");
                retriedCount++;
                headers.put("retried-count",retriedCount);
                String routingKey = messageProperties.getConsumerQueue();
                messageProperties.setDelay(DELAY);
                log.info("延迟:{}毫秒", DELAY);
                log.info("路由key:{}",routingKey);
                amqpTemplate.send(DELAY_EXCHANGE_NAME, routingKey, amqpMessage);
                return null;
            }
            log.info("已执行次数达到最大重试次数了,不再进行重试");
            return null;
        };
        return errorHandler;
    }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75

点对点模式下的消费重试

生产者

// 发送到了默认交换机(所有的队列都会绑定到默认交换机)
String message = "";
String routingKey = "队列名称"
amqpTemplate.convertAndSend(routingKey ,message);
  • 1
  • 2
  • 3
  • 4

监听器

 /**
     * 接收到消息后,将消息传给目的地
     * @param message
     */
     //队列又绑定了延迟交换机,默认交换机收到的消息会在此处理
    @RabbitListener(bindings =@QueueBinding(value = @Queue("队列名称"),
            exchange = @Exchange(value = MqConfig.DELAY_EXCHANGE_NAME,
                    type = MqConfig.DELAY_EXCHANGE_TYPE),
            key = "队列名称"),errorHandler = "retryErrorHandler")
    public void doSynDataToThirdApp(String message){
        //http请求发送给第三方,如果出错,则会执行errorHandler  ,从而实现重试
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

简单来说,就是将消息发送到默认交换机,errorHandler捕获到消息失败,如果未达到最大重试次数,则设置延迟时间,发送到延迟交换机;如果已经达到重试次数,则结束,不再入队列;这里要注意,发送是默认交换机,再次入队列时是延迟交换机

广播模式(发布/订阅)下的重试

以上是点对点模式的重试机制,广播模式的重试略有不同,广播模式可参考springboot rabbitmq 发布订阅 广播模式

  • 生产者发送到fanout交换机
  • 再次入队列时要是延迟交换机

生产者

// 发送到fanout交换机
String message = "";
String exchangeName = "fanout交换机名称"
//因为是fanout类型交换机,所以routingkey会被忽略,故此取空字符串
String routingKey = ""
amqpTemplate.convertAndSend(exchangeName ,routingKey ,message);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

监听器

队列要同时绑定fanout交换机和延迟交换机

 /**
* 
  * 队列同时绑定fanout交换机和延迟交换机
  *  @param message
  */
 @RabbitListener(bindings = {
 @QueueBinding(value = @Queue("队列名称"),
         exchange = @Exchange(value = "fanout交换机名称",
                 type = ExchangeTypes.FANOUT)),
@QueueBinding(value = @Queue("队列名称"),
        exchange = @Exchange(value = MqConstant.DELAY_EXCHANGE_NAME,
                type = MqConstant.DELAY_EXCHANGE_TYPE),
        key = "队列名称")
}, errorHandler = "retryErrorHandler")
 public void listen(String message) {
     log.debug("广播模式,收到消息",message);
 }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

总结

重试机制的关键是,错误处理器捕获到错误,发送到延迟交换机,所以队列要与延迟交换机绑定;也就是说,队列要绑定两个交换机,第一个交换机用来接收广播消息,第二个交换机(延迟交换机)用来处理失败时的重试。

参考

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家自动化/article/detail/603428
推荐阅读
相关标签
  

闽ICP备14008679号