当前位置:   article > 正文

mq系列rabbitmq-03开发基础概念(重试队列,死信队列等)_rabitmq消息重试是在哪个版本

rabitmq消息重试是在哪个版本

描述

消息队列(MQ)的基本概念,很多时候都要了解清楚,这样在学消息队列中间件就比较能够游刃有余,遇到不清楚的也可以重新翻来看看,加深理解。这里有关于:优先级队列、延迟队列、死信队列、重试队列、消息回溯、消息堆积、消息追踪/消息轨迹、消息过滤、消息审计、消息路由等的介绍。

01.优先级队列

优先级队列不同于先进先出队列,优先级高的消息具备优先被消费的特权,这样可以为下游提供不同消息级别的保证。不过这个优先级也是需要有一个前提的:如果消费者的消费速度大于生产者的速度,并且消息中间件服务器(一般简单的称之为Broker)中没有消息堆积,那么对于发送的消息设置优先级也就没有什么实质性的意义了,因为生产者刚发送完一条消息就被消费者消费了,那么就相当于Broker中至多只有一条消息,对于单条消息来说优先级是没有什么意义的。

优先级队列,顾名思义,具有更高优先级的队列具有较高的优先权,优先级高的消息具备优先被消费的特权。
可以通过RabbitMQ管理界面配置队列的优先级属性,如下图的x-max-priority.
在这里插入图片描述

也可以通过代码去实现,比如:

Map<String,Object> args = new HashMap<String,Object>();
args.put("x-max-priority", 10);
channel.queueDeclare("queue_priority", true, false, false, args);
  • 1
  • 2
  • 3

配置了队列优先级的属性之后,可以在管理页面看到Pri的标记:
在这里插入图片描述
上面配置的是一个队列queue的最大优先级。之后要在发送的消息中设置消息本身的优先级,如下:

AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.priority(5);
AMQP.BasicProperties properties = builder.build();
channel.basicPublish("exchange_priority","rk_priority",properties,("messages").getBytes());
  • 1
  • 2
  • 3
  • 4

02.延迟队列

当你在网上购物的时候是否会遇到这样的提示:“三十分钟之内未付款,订单自动取消”,。在我们签收商品后,物流系统会在七天后延时发送一个消息给支付系统,通知支付系统将款打给商家,这个过程持续七天,就是使用了消息中间件的延迟推送功能;相应的,自动评价也是类似的,这些都是延迟队列的典型应用场景。延迟队列存储的是对应的延迟消息,所谓“延迟消息”是指当消息被发送以后,并不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费。延迟队列一般分为两种:基于消息的延迟和基于队列的延迟。基于消息的延迟是指为每条消息设置不同的延迟时间,那么每当队列中有新消息进入的时候就会重新根据延迟时间排序,当然这也会对性能造成极大的影响。实际应用中大多采用基于队列的延迟,设置不同延迟级别的队列,比如5s、10s、30s、1min、5mins、10mins等,每个队列中消息的延迟时间都是相同的,这样免去了延迟排序所要承受的性能之苦,通过一定的扫描策略(比如定时)即可投递超时的消息。

常用延迟队列实现方式

  1. Java自带的DelayQueue队列(底层代码DelayQueue,而Delayed继承了Comparable,所以,可以实现一个排序效果)

​ 这是java本身提供的一种延时队列,如果项目业务复杂性不高可以考虑这种方式。它是使用jvm内存来实现的,停机会丢失数据(需要自行持久化),扩展性不强。

  1. 使用redis监听key的过期来实现

​ 当用户下订单后把订单信息设置为redis的key,30分钟失效,程序编写监听redis的key失效,然后处理订单(我也尝试过这种方式)。这种方式最大的弊端就是只能监听一台redis的key失效,集群下将无法实现,也有人监听集群下的每个redis节点的key,但我认为这样做很不合适。如果项目业务复杂性不高,redis单机部署,就可以考虑这种方式。

  1. 死信队列+TTL过期时间方式
    ​ 新建一个队列,设置ttl时间,没有消费者消费,超过ttl时间后自动添加到死信队列中,订阅到死信队列的消费者获取消息,达到延期队列的效果。

而其他的解决方案,重点讲解延迟插件
在 RabbitMQ3.6.x 之前我们一般采用死信队列+TTL过期时间来实现延迟队列,我们这里不做过多介绍,网上很多文章都有过介绍。在 RabbitMQ 3.6.x 开始,RabbitMQ 官方提供了延迟队列的插件,可以下载放置到 RabbitMQ 根目录下的 plugins 下。

本人使用的RabbitMQ是3.7.7版本,rabbitmq_delayed_message_exchange-3.8.0.ez这个插件放到RabbitMQ安装目录的plugins文件中 在RabbitMQ 安装目的sbin用cmd使用命令

rabbitmq-plugins enable rabbitmq_delayed_message_exchange
  • 1

开启插件后,启动RabbitMQ,访问登录后访问http://localhost:15672,用guest/guest登录后,在交换机exchanges的tab下,底部新增将看到如下图设置,则表示插件已启动,以后直接就可以使用了。
在这里插入图片描述
插件下载地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange
延迟插件底层简单原理图:
在这里插入图片描述
实现原理

原始的DLX + TTL 的模式,消息首先会路由到一个正常的队列,根据设置的 TTL 进入死信队列,与之不同的是通过 x-delayed-message 声明的交换机(具体代码请看下面config下的配置类交换机定义参数),它的消息在发布之后不会立即进入队列,先将消息保存至 Mnesia(一个分布式数据库管理系统,适合于电信和其它需要持续运行和具备软实时特性的 Erlang 应用。目前资料介绍的不是很多)。

这个插件将会尝试确认消息是否过期,首先要确保消息的延迟范围是 Delay > 0, Delay =< ?ERL_MAX_T(在 Erlang 中可以被设置的范围为 (2^32)-1 毫秒),如果消息过期通过 x-delayed-type 类型标记的交换机投递至目标队列,整个消息的投递过程也就完成了。
在这里插入图片描述
主题和直连,广播模式经过测试也可以使用
配置交换机,队列和routingkey,对应上图中的名称

package com.woniuxy.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * @author: mayuhang  <br/>
 * Date: 2021/3/17:16:03  <br/>
 * Description:延迟队列配置类
 */
@Configuration
public class LazyExchangeConfig {
    public static final String LAZY_EXCHANGE="Ex.LazyExchange";
    public static final String LAZY_QUEUE="MQ.LazyQueue";
    public static final String LAZY_KEY="lazy.#";//这个lazy.#表示设置lazy超时时间,生产者发布消息时如果设置routingkey=lazy.1000表示延迟1s
    @Bean
    public CustomExchange lazyExchange() {
        //第一种设置方法  设置延迟交换机配置
        Map<String, Object> pros = new HashMap<>();
          //设置交换机支持延迟消息推送
        pros.put("x-delayed-type", "topic");
        CustomExchange exchange = new CustomExchange(LAZY_EXCHANGE, "x-delayed-message",false, true,pros);
        return exchange;
    }
    @Bean
    public Queue lazyQueue(){
        return new Queue(LAZY_QUEUE,false);
    }
    @Bean
    public Binding lazyBinding(){
        return BindingBuilder.bind(lazyQueue()).to(lazyExchange()).with(LAZY_KEY).noargs();
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38

延迟的时间由上述定义的LAZY_KEY决定,生产者发布消息时如果设置routingkey=lazy.1000表示延迟1s

03.死信队列

由于某些原因消息无法被正确的投递,为了确保消息不会被无故的丢弃,一般将其置于一个特殊角色的队列,这个队列一般称之为死信队列。与此对应的还有一个“回退队列”的概念,试想如果消费者在消费时发生了异常,那么就不会对这一次消费进行确认(Ack),进而发生回滚消息的操作之后消息始终会放在队列的顶部,然后不断被处理和回滚,导致队列陷入死循环。为了解决这个问题,可以为每个队列设置一个回退队列,它和死信队列都是为异常的处理提供的一种机制保障。实际情况下,回退队列的角色可以由死信队列和重试队列来扮演。

死信队列产生的三种情况

  1. 消息被拒绝(basic.reject或basic.nack)并且requeue=false.(spring的参数是:default-requeue-rejected)
    默认当消费者将读取消息的应答模式设置为手动ack时,默认如果某个消息推送给某个消费者,消息的状态会自动变为nack,当收到消费者的ack信息后消息被删除,如果消费者不掉线不回应ack,消息会一直处于nack状态,如果消费者调用会自动回撤到ready状态,其他消费者可消费,这是requeue=true默认的情况,如果希望消费者掉线后没有ack进入死信队列可以设置requeue=false。
    或者通过手动拒绝:channel.basicReject(deliveryTag,requeue),也会进入死信队列。
  2. 消息TTL过期
    给队列中的消息设置个ttl的参数,超过指定时间未消费自动进入死信队列。
  3. 队列达到最大长度(队列满了,无法再添加数据到mq中)
    队列的长度大于设置的长度,自动进入死信队列。

演示个例子
配置死信队列

@Configuration
public class RabbitMQConfig {

    // 声明业务Exchange
    @Bean
    public TopicExchange businessExchange(){
        return new TopicExchange("businessExchange");
    }

    // 声明业务队列A
    @Bean
    public Queue businessQueue(){
        Map<String, Object> args = new HashMap<>();
//       x-dead-letter-exchange    这里声明当前队列绑定的死信交换机,也就是当这个业务队列出现上述三种状态时自动讲消息写入这个交换机并且routingkey是dle.err
        args.put("x-dead-letter-exchange", "deadLetterExchange");
//       x-dead-letter-routing-key  这里声明当前队列的死信路由key
        args.put("x-dead-letter-routing-key", "dle.err");

        return new Queue("businessQueue",true,false,false,args);
    }

    // 声明业务队列A绑定关系
    @Bean
    public Binding businessBinding(Queue businessQueue, TopicExchange businessExchange){
        return BindingBuilder.bind(businessQueue).to(businessExchange).with("emp.*");
    }


    //声明死信Exchange
   @Bean
    public TopicExchange deadLetterExchange(){
        return new TopicExchange("deadLetterExchange");
    }

    // 声明死信队列A
    @Bean
    public Queue deadLetterQueue(){
        return new Queue("dle-queue");
    }
    //将队列绑定到死信交换机上,这就是个死信队列。
   @Bean
    public Binding deadLetterQueueBinding(Queue deadLetterQueue, TopicExchange deadLetterExchange){
        return BindingBuilder.bind(deadLetterQueue).to(deadLetterExchange).with("dle.*");
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46

YML配置

spring:
  rabbitmq:
    host: 192.168.0.211
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    listener:
      simple:
        acknowledge-mode: manual # 设置手动ack
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

消费者nack消息

@Component
public class DedaLetterListener {

    // 监听业务队列
    @RabbitListener(queues = "businessQueue")
    public void businessQueue(String msg, Channel channel, Message message) throws IOException {
     channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
    }

    // 监听死信队列
    @RabbitListener(queues = "dle-queue")
    public void deadLetterQueue(String msg, Channel channel, Message message) throws IOException {
        System.out.println("死信队列消费消息:" + msg);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

注意basicNack的参数

// deliveryTag:该消息的index
// multiple:是否批量.true:将一次性ack所有小于deliveryTag的消息。
public void basicAck(long deliveryTag, boolean multiple)
    
//deliveryTag:该消息的index
//multiple:是否批量.true:将一次性拒绝所有小于deliveryTag的消息。
//requeue:是否重新入队列 ,应该设置为false就会进入死信队列  
public void basicNack(long deliveryTag, boolean multiple, boolean requeue) 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

死信队列也可以用于延时队列,可以给队列消息设置超时时间,指定时间到了之后未消费也可进入死信队列
配置延时队列

@Configuration
public class RabbitMQConfigTTL {

    // 声明业务Exchange
    @Bean
    public TopicExchange businessExchange(){
        return new TopicExchange("ttl-Exchange");
    }

    // 创建延时队列1
    @Bean
    public Queue businessQueue1(){
        Map<String, Object> args = new HashMap<>();
        args.put("x-dead-letter-exchange", "deadLetterExchange");
        args.put("x-dead-letter-routing-key", "dle.err");
        args.put("x-message-ttl", 5000);   // 超时时间是5s
        return new Queue("5-queue",true,false,false,args);
    }

    // 创建延时队列2
    @Bean
    public Queue businessQueue2(){
        Map<String, Object> args = new HashMap<>();
        args.put("x-dead-letter-exchange", "deadLetterExchange");
        args.put("x-dead-letter-routing-key", "dle.err");
        args.put("x-message-ttl", 20000); //  // 超时时间是20s
        return new Queue("20-queue",true,false,false,args);
    }

    // 延时队列绑定关系
    @Bean
    public Binding businessBinding1(Queue businessQueue1, TopicExchange businessExchange){
        return BindingBuilder.bind(businessQueue1).to(businessExchange).with("emp.*");
    }

     // 延时队列绑定
    @Bean
    public Binding businessBinding2(Queue businessQueue2, TopicExchange businessExchange){
        return BindingBuilder.bind(businessQueue2).to(businessExchange).with("user.*");
    }


    //声明死信Exchange
   @Bean
    public TopicExchange deadLetterExchange(){
        return new TopicExchange("deadLetterExchange");
    }

    // 声明死信队列
    @Bean
    public Queue deadLetterQueue(){
        return new Queue("dle-queue",true,false,false,null);
    }

    // 死信队列绑定交换机
   @Bean
    public Binding deadLetterQueueBinding(Queue deadLetterQueue, TopicExchange deadLetterExchange){
        return BindingBuilder.bind(deadLetterQueue).to(deadLetterExchange).with("dle.*");
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60

YAML配置

spring:
  rabbitmq:
    host: 192.168.193.88
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    listener:
      simple:
        acknowledge-mode: manual # 设置手动ack
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

不添加延时队列的消费者,让他自动超时,
同时订阅死信队列确认是否进入死信队列

 @RabbitListener(queues = "dle-queue")
    public void dleQueue(String msg, Channel channel, Message message) throws IOException {
        System.out.println("dleQueue1:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
  • 1
  • 2
  • 3
  • 4
  • 5

模拟发送消息

 @RequestMapping("/test")
    public void test1(String msg) {
        System.out.println("p:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
        if ("5".equals(msg)) { // 添加到5s队列
            rabbitTemplate.convertAndSend("ttl-Exchange", "emp.add", msg);
        } else if ("20".equals(msg)) { // 添加到20s队列中
            rabbitTemplate.convertAndSend("ttl-Exchange", "user.add", msg);
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

执行 http://ip:端口/test?msg=5或者20发送消息

04.重试队列

重试队列其实可以看成是一种回退队列,具体指消费端消费消息失败时,为防止消息无故丢失而重新将消息回滚到Broker中。与回退队列不同的是重试队列一般分成多个重试等级,每个重试等级一般也会设置重新投递延时,重试次数越多投递延时就越大。举个例子:消息第一次消费失败入重试队列Q1,Q1的重新投递延迟为5s,在5s过后重新投递该消息;如果消息再次消费失败则入重试队列Q2,Q2的重新投递延迟为10s,在10s过后再次投递该消息。以此类推,重试越多次重新投递的时间就越久,为此需要设置一个上限,超过投递次数就入死信队列。重试队列与延迟队列有相同的地方,都是需要设置延迟级别,它们彼此的区别是:延迟队列动作由内部触发,重试队列动作由外部消费端触发;延迟队列作用一次,而重试队列的作用范围会向后传递。

05.消费模式之推模式push

对于kafka而言,由Broker主动推送消息至消费端,实时性较好,不过需要一定的流制机制来确保服务端推送过来的消息不会压垮消费端。

06.消费模式之拉模式pull

对于kafka而言,消费端主动向Broker端请求拉取(一般是定时或者定量)消息,实时性较推模式差,但是可以根据自身的处理能力而控制拉取的消息量。

07.消息回溯

一般消息在消费完成之后就被处理了,之后再也不能消费到该条消息。消息回溯正好相反,是指消息在消费完成之后,还能消费到之前被消费掉的消息。对于消息而言,经常面临的问题是“消息丢失”,至于是真正由于消息中间件的缺陷丢失还是由于使用方的误用而丢失一般很难追查,如果消息中间件本身具备消息回溯功能的话,可以通过回溯消费复现“丢失的”消息进而查出问题的源头之所在。消息回溯的作用远不止与此,比如还有索引恢复、本地缓存重建,有些业务补偿方案也可以采用回溯的方式来实现。

08.消息堆积

流量削峰是消息中间件的一个非常重要的功能,而这个功能其实得益于其消息堆积能力。从某种意义上来讲,如果一个消息中间件不具备消息堆积的能力,那么就不能把它看做是一个合格的消息中间件。消息堆积分内存式堆积和磁盘式堆积。

09.消息追踪/轨迹

对于分布式架构系统中的链路追踪(trace)而言,大家一定不会陌生。对于消息中间件而言,消息的链路追踪(以下简称消息追踪)同样重要。对于消息追踪最通俗的理解就是要知道消息从哪来,存在哪里以及发往哪里去。基于此功能下,我们可以对发送或者消费完的消息进行链路追踪服务,进而可以进行问题的快速定位与排查。想要知道消息发送成功了吗?发送的消息在消费端为什么消费不到?为什么又会重复消费?等等问题。引入消息轨迹可以知道消息从生产者触发,经由broker等代理存储,再到消费者消费的整个过程,各个节点的状态、时间、地点等数据汇聚而成完整的链路信息。

10.消息过滤

消息过滤是指按照既定的过滤规则为下游用户提供指定类别的消息。就以kafka而言,完全可以将不同类别的消息发送至不同的topic中,由此可以实现某种意义的消息过滤,或者Kafka还可以根据分区对同一个topic中的消息进行分类。不过更加严格意义上的消息过滤应该是对既定的消息采取一定的方式按照一定的过滤规则进行过滤。同样以Kafka为例,可以通过客户端提供的ConsumerInterceptor接口或者Kafka Stream的filter功能进行消息过滤。对于rocketmq来说,支持Tag、SQL92和类过滤器(新版去除)等3种模式。

11.消息审计

消息审计是指在消息在生产、存储和消费的整个过程之间对消息个数及延迟的审计,以此来检测是否有数据丢失、是否有数据重复、端到端的延迟又是多少等。有关产品:Uber的Chaperone、LinkedIn的kafka monitor、Confluent Control Center等,有需要或感兴趣可自行通过网络了解下。

12.消息路由

将消息路由到指定的队列中,消费者消费队列里的消息。RabbitMQ可以从交换器Exchanger根据路由键路由到指定一个或多个队列。kafka默认是按照消息主题进行路由,消息路由在kafka中使用场景较少,使用起来也比较麻烦,如无特殊需要,一般不推荐使用。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小小林熬夜学编程/article/detail/694372
推荐阅读
相关标签
  

闽ICP备14008679号