当前位置:   article > 正文

RabbitMQ延迟队列实现 ( 插件版 )_rabbitmq_delayed_message_exchange

rabbitmq_delayed_message_exchange

一、RabbitMQ延迟队列实现 ( 插件版 )

1.1、下载 rabbitmq-delayed-message-exchange 插件
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
  • 1
  • 由于我的RabbitMQ是再Linux服务器中通过Docker安装的,所以先将rabbitmq_delayed_message_exchange-3.9.0.ez插件上传到Linux中 ( 什么位置都可以 )。

cd

  • 进入放rabbitmq_delayed_message_exchange-3.9.0.ez插件的目录
cd /home/docker
  • 1
  • 将插件拷贝到容器内plugins目录下(cd-rabbitmq是RabbitMQ容器的容器名称,也可以使用容器ID)
docker cp rabbitmq_delayed_message_exchange-3.9.0.ez cd-rabbitmq:/plugins
  • 1
  • 进入 RabbitMQ 容器
docker exec -it cd-rabbitmq /bin/bash
  • 1
  • 查看 rabbitmq_delayed_message_exchange-3.9.0.ez 插件是否存在
cd plugins
ls |grep delay
  • 1
  • 2
  • 在 plugins 内启用 rabbitmq_delayed_message_exchange-3.9.0.ez 插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
  • 1
  • 退出RabbitMQ容器
exit
  • 1
  • 重启 RabbitMQ 容器
docker restart cd-rabbitmq
  • 1

cd

  • 容器启动成功之后,登录RabbitMQ的管理页面,找到ExchangesTab页。点击Add a new exchange,在Type里面查看是否有x-delayed-message选项,如果存在就代表插件安装成功。

cd



1.2、rabbitmq-delayed-message-exchange插件是实现原理

cd

安装完rabbitmq-delayed-message-exchange插件后,会生成一个新的Exchange类型 x-delayed-message ,该类型消息支持延迟投递机制。接收到消息后并不会立即将消息投递至目标队列,而是存储在mnesia table(一个分布式数据库)中,然后检测消息延迟时间,如果达到可投递时间( 过期时间 )后,将其通过 x-delayed-type 类型标记的交换机投递到目标队列中。

原理:DelayExchange需要将一个交换机声明为delayed类型。当我们发送消息到delayExchange时,会判断消息是否具有x-delay属性,如果有x-delay属性,说明是延迟消息,那么这些消息就会存到Mnesia table ( 一个分布式数据库 )中,并读取x-delay属性值作为延迟时间。消息通过计时器调度分发,在x-delay延迟时间到期后,就会重新投递消息到指定队列中。


1.3、配置RabbitMQ连接
#[ RabbitMQ相关配置 ]
#rabbitmq服务器IP
spring.rabbitmq.host=安装RabbitMQ的服务器IP
#rabbitmq服务器端口(默认为5672)
spring.rabbitmq.port=5672
#用户名(默认用户名为guest)
spring.rabbitmq.username=admin
#用户密码(默认密码为guest)
spring.rabbitmq.password=admin
#虚拟主机(一个RabbitMQ服务可以配置多个虚拟主机,每一个虚拟机主机之间是相互隔离,相互独立的,授权用户到指定的virtual-host就可以发送消息到指定队列)
#vhost虚拟主机地址( 默认为/ )
spring.rabbitmq.virtual-host=/
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

1.4、创建自定义RabbitMQ配置类,配置延迟交换机、延迟队列、以及根据路由键配置交换机和队列的绑定关系
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class RabbitMQConfiguration {

    //延迟交换机
    public static final String DELAY_EXCHANGE = "delay_exchange";
    //延迟队列
    public static final String DELAY_QUEUE = "delay_queue";
    //延迟路由键
    public static final String DELAY_QUEUE_ROUTING_KEY = "delay_queue_routing_key";


    //延迟交换机
    @Bean("delayExchange")
    public DirectExchange delayExchange(){
        return ExchangeBuilder
                .directExchange(DELAY_EXCHANGE)
            	//delayed标记当前交换机是一个具备延迟效果的交换机,类型默认是direct直接模式
                .delayed()
                .durable(true)
                .build();
    }

    //延迟队列
    @Bean("delayQueue")
    public Queue delayQueue(){
        return new Queue(DELAY_QUEUE, true, false, false);
    }

    //延迟队列和延迟交换机
    @Bean("delayBinding")
    public Binding delayBinding(@Qualifier("delayQueue")Queue queue,
                                 @Qualifier("delayExchange")DirectExchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUE_ROUTING_KEY);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43

1.5、创建消息生产者发送消息

1)注意:消息头Header中一定要携带上 x-delay 参数,用来指定消息的延迟时间。

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import static com.cd.springbootrabbitmq.config.RabbitMQConfiguration.*;

/**
 * 延迟消息生产者
 */
@Component
public class DelayMessageProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    //发送延迟消息(1 推荐)
    public void sendDelayMessage(String message, Integer delayTime){
        rabbitTemplate.convertAndSend(DELAY_EXCHANGE, DELAY_QUEUE_ROUTING_KEY, message, msg -> {
            //setDelay()的本质是对消息头设置 x-delay 参数,用来指定消息的延迟时间
            msg.getMessageProperties().setDelay(delayTime);
            return msg;
        });
    }

    //发送延迟消息(2)
    public void sendDelayMessage02(String message, Integer delayTime){
        rabbitTemplate.convertAndSend(DELAY_EXCHANGE, DELAY_QUEUE_ROUTING_KEY, message, msg -> {
            //消息头中一定要携带上 x-delay 参数,用来指定消息的延迟时间
            msg.getMessageProperties().setHeader("x-delay", delayTime);
            return msg;
        });
    }

    //发送延迟消息(3)
    public void sendDelayMessage03(String message, Integer delayTime){
        // 1. 准备消息
        Message msg = MessageBuilder
                .withBody(message.getBytes(StandardCharsets.UTF_8))
                .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
                .setHeader("x-delay", delayTime)
                .build();
        // 2. 准备correlationData
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        // 3. 发送消息
        rabbitTemplate.convertAndSend("DELAY_EXCHANGE",
                "DELAY_QUEUE_ROUTING_KEY",
                message,
                correlationData);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50

1.6、创建消息消费者消费消息
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.time.LocalDateTime;

import static com.cd.springbootrabbitmq.config.RabbitMQConfiguration.DELAY_QUEUE;

@Slf4j
@Component
public class DelayQueueConsumer {

    /**
     * 监听延迟队列
     * @param message  接收的信息
     */
    @RabbitListener(queues = DELAY_QUEUE)
    public void receiveMessage(Message message) {
        String msg = new String(message.getBody());
        // 记录日志
        log.info("当前时间:{},从延迟队列中消费的消息:{}", LocalDateTime.now(), msg);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

1.7、创建控制类接收客户请求
import com.cd.springbootrabbitmq.producer.DelayMessageProducer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.time.LocalDateTime;

@Slf4j
@RestController
@RequestMapping("/rabbitmq")
public class RabbitMQController {

    @Autowired
    private DelayMessageProducer producer;

    /**
     * @param message 客户发送的消息
     * @param delayTime 消息延迟时间
     */
    @RequestMapping("/send")
    public void send(String message, Integer delayTime){
        // 记录日志
        log.info("当前时间:{},消息:{},延迟时间:{}", LocalDateTime.now(), message, delayTime );
        // 发送延迟消息
        producer.sendDelayMessage(message, delayTime);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

1.8、测试

在浏览器中先后提交下面两个请求:

  • localhost:8088/rabbitmq/send?message=测试自定义延迟处理60s&delayTime=60000

  • localhost:8088/rabbitmq/send?message=测试自定义延迟处理10s&delayTime=10000

查看idea控制台:

cd

解析:从控制台打印信息可以看出,虽然延迟60s的消息先发送,延迟10s的消息后发送。但延迟10s的消息无需等待延迟60s的消息被释放后,才能被消费。这些都是rabbitmq-delayed-message-exchange插件帮我们实现的。


1.9、提示

消息分发前是存储在节点下的Mnesia table中,通过计时器调度实现分发,官网写到:这个插件的设计并不适合大量延迟消息的情况(例如数百万条)。因为随着mnesia数据库的增长,延迟消息的延时时间变得难以控制,就很难达到预期的效果


1.10、登录rabbitmq management可视化控制台,遇到的错误:Not management user

cd

原因:该用户不是管理员,也就是我们登录的账号没有管理员权限

  • 进入RabbitMQ容器
docker exec -it cd-rabbitmq /bin/bash
  • 1
  • 查看用户列表
rabbitmqctl list_users
  • 1
  • 使用命令 rabbitmqctl set_user_tags 用户名 adminitrator 给用户赋予管理员权限。
rabbitmqctl set_user_tags guest adminitrator
  • 1
  • 然后使用 rabbitmqctl list_users 命令查看用户列表,如果该用户的tags标签变成adminitrator,则表示已经为该用户赋予了管理员权限
rabbitmqctl list_users
  • 1

cd


guest用户始终无法登录问题

我修改了guest用户为管理用户后,发现依然无法使用guest登录,所以新建了一个admin用户登录,创建流程如下:

  • 创建一个用户名为admin,密码为admin的用户
rabbitmqctl add_user admin admin
  • 1
  • 由于该用户刚创建,还没有赋予管理员权限。如果此时使用该用户、密码去登录rabbitmq management可视化控制台。就会报下面这个错误:Not management user 不是管理员用户

cd

  • 为admin用户设置权限,赋予用户默认vhose的全部操作权限:
rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
  • 1
  • 所以需要使用 rabbitmqctl set_user_tags admin administrator 命令为admin用户赋予管理员权限,让这个用户变成管理员用户。
rabbitmqctl set_user_tags admin administrator
  • 1
  • 然后使用 rabbitmqctl list_users 命令查看用户列表,如果该用户的tags标签变成adminitrator,则表示已经为该用户赋予了管理员权限
rabbitmqctl list_users
  • 1
  • exit退出RabbitMQ容器
exit
  • 1

cd

  • 然后这次使用admin用户可以登录成功了,但不明白前面的guest用户为什么登录不了( guest用户也设置成管理用户了!但就是登陆不了 )
本文内容由网友自发贡献,转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号