当前位置:   article > 正文

rabbitmq安装延时队列插件实现延时队列_rabbitmq延迟队列插件

rabbitmq延迟队列插件

前言

这个很难查看消息堆积的情况,因为他把要发送的延时消息存在本地的分布式mnesia数据库中,其次过期时间为最大int值,超过这个值(大概49天)得代码判定重复过期设置。

下载插件地址

要注意和自己的rabbitmq的版本对应起来
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

我的mq是docker安装的3.9.7的
在这里插入图片描述

在这里插入图片描述

下载完之后把插件copymqplugin目录下,然后启用插件。之后重启容器,我这里是docker-compose安装的

rabbitmq-plugins enable rabbitmq_delayed_message_exchange
  • 1
docker-compose restart
  • 1

在这里插入图片描述
在这里插入图片描述

进入rabbitmq管理页面查看插件是否安装成功

Type里面查看是否有x-delayed-message选项,如果存在就代表插件安装成功。
在这里插入图片描述

使用mq延时队列插件下springboot实现延时队列

yaml配置mq,然后在mq管理页面创建虚拟host:fchan

spring:
  rabbitmq:
    host: 110.40.181.73
    port: 35672
    username: root
    password: 10086
    virtual-host: /fchan
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

在这里插入图片描述

配置延时队列和延时交换机的绑定

package com.fchan.mq.mqDelay;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class MqDelayConfig {

    //最后经过死信队列转发后实际消费的交换机
    private static final String EXCHANGE_NAME = "delayed_exchange";
    //最后经过死信队列转发后实际消费的队列
    private static final String QUEUE_NAME = "delayed_queue";
    //最后经过死信队列转发后实际消费的路由key
    private static final String ROUTE_KEY = "delayed_key";

    /**
     * 交换机
     */
    @Bean
    CustomExchange exchange() {
        //通过x-delayed-type参数设置fanout /direct / topic / header 类型
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct");
        return new CustomExchange(EXCHANGE_NAME, "x-delayed-message",true, false,args);
    }

    /**
     * 队列
     */
    @Bean
    public Queue queue() {
        return new Queue(QUEUE_NAME,true,false,false);
    }

    /**
     * 将队列绑定到交换机
     */
    @Bean
    public Binding binding(CustomExchange exchange, Queue queue) {
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with(ROUTE_KEY)
                .noargs();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53

消息生产者

package com.fchan.mq.mqDelay;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;


@Component
public class MyRabbitSender {

    Logger log = LoggerFactory.getLogger(MyRabbitSender.class);

    private static final String ROUTE_KEY = "delayed_key";
    private static final String EXCHANGE_NAME = "delayed_exchange";

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * @param msg   消息
     * @param delay 延时时间,秒
     */
    public void send2(String msg, int delay) {
        log.info("RabbitSender.send() msg = {}", msg);
        rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTE_KEY, msg, message -> {
            message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);  //消息持久化
            message.getMessageProperties().setDelay(delay * 1000);   // 单位为毫秒
            return message;
        });
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34

消息消费者

package com.fchan.mq.mqDelay;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class MyRabbitConsume {
    Logger log = LoggerFactory.getLogger(MyRabbitConsume.class);

    @RabbitListener(queues = "delayed_queue")
    public void infoConsumption(String data) throws Exception {
        log.info("收到信息:{}",data);
        log.info("然后进行一系列逻辑处理 Thanks♪(・ω・)ノ");
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

在这里插入图片描述

参考了大佬的博文
https://juejin.cn/post/6977516798828609567#heading-13

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/从前慢现在也慢/article/detail/400974
推荐阅读
相关标签
  

闽ICP备14008679号