当前位置:   article > 正文

Springboot 2.X 集成 RabbitMQ 实现延时消息队列功能_springboot rabbitmq 有时生效

springboot rabbitmq 有时生效


1 摘要

延时消息,即生产者发送一条消息,消费者在等待指定的时间之后再进行消费。常见应用场景如商品下单后,当用户没有在固定时间内支付即会销毁订单。RabbitMQ 实现延时消息的原理是通过 TTL + dead message (消息存活时间 + 死信)组合来实现的。RabbitMQ 3.8 开始,提供了专门的延时队列插件。SpringBoot 1.6 开始支持 RabbitMQ 的延时插件。本文将基于 Springboot 2.X 集成 RabbitMQ 实现延时消息队列功能。

Spring boot 集成 RabbitMQ: SpringBoot 快速整合 RabbitMQ 消息队列框架

2 安装 RabbitMQ 延时消息插件

2.1 下载插件

插件下载地址:

https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
  • 1

选择与 RabbitMQ 版本对应的插件

rabbitmq-delayed-message-plugin

2.2 安装插件

RabbitMQ 的插件目录:

操作系统插件目录
Linux/usr/lib/rabbitmq/plugins and /usr/lib/rabbitmq/lib/rabbitmq_server-{version}/plugins
Windows默认地址C:\Program Files\RabbitMQ\rabbitmq_server-{version}\plugins,如果为自定义安装,则根据安装目录而定
macOS(Homebrew)/usr/local/Cellar/rabbitmq/{version}/plugins

作者使用的为 Linux 操作系统(centOS),其目录为:

/usr/lib/rabbitmq/lib/rabbitmq_server-3.8.0-beta.4/plugins
  • 1

将下载好的插件复制到插件目录即可

2.3 启用插件

执行启用插件命令:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange
  • 1

3 核心代码

3.1 配置定义
./demo-base-web/src/main/java/com/ljq/demo/springboot/baseweb/config/RabbitMQConfig.java
  • 1
package com.ljq.demo.springboot.baseweb.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * @Description: tabbitmq 配置信息
 * @Author: junqiang.lu
 * @Date: 2019/1/21
 */
@Configuration
public class RabbitMQConfig {

    /**
     * 消息队列名称、消息队列路由键、消费者消费队列路由键
     */
    public static final String QUEUE_NAME_DELAY_CART = "rabbitmq_delay_cart";

    /**
     * 交换机名称
     */
    public static final String DIRECT_EXCHANGE_NAME_DELAY_CART = "rabbitmq_direct_exchange_delay_cart";

    /**
     * 交换机代理的路由键
     */
    public static final String DIRECT_EXCHANGE_ROUT_KEY_DELAY_CART = "rabbitmq.spring.boot.cart";

    /**
     * 生产者发送路由键
     */
    public static final String QUEUE_SENDER_ROUTING_KEY_DELAY_CART = "rabbitmq.spring.boot.cart";

    /**
     * 延时时长(单位:毫秒)
     */
    public static final int QUEUE_DELAY_TIME_CART = 30000;


    /**
     * 定义延时队列(cart)
     *
     * @return
     */
    @Bean("queueDelayCart")
    public Queue queueDelayCart() {
        return new Queue(QUEUE_NAME_DELAY_CART, true, false, true);
    }

    /**
     * 定义延时交换机(cart)
     * @return
     */
    @Bean("delayExchangeCart")
    public CustomExchange delayExchangeCart() {
        Map<String, Object> args = new HashMap<>(16);
        args.put("x-delayed-type", "direct");
        CustomExchange exchange = new CustomExchange(DIRECT_EXCHANGE_NAME_DELAY_CART,"x-delayed-message",
                false, true, args);
        return exchange;
    }

    /**
     * 绑定延时交换机与队列(cart)
     *
     * @param queue
     * @param customExchange
     * @return
     */
    @Bean
    public Binding bindingDirectExchangeDelayCart(@Qualifier("queueDelayCart") Queue queue,
                                                  @Qualifier("delayExchangeCart") CustomExchange customExchange) {
        return BindingBuilder.bind(queue).to(customExchange).with(DIRECT_EXCHANGE_ROUT_KEY_DELAY_CART).noargs();
    }



}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
3.2 消息队列生产者
./demo-base-web/src/main/java/com/ljq/demo/springboot/baseweb/rabbitmq/RabbitMQSender.java
  • 1
package com.ljq.demo.springboot.baseweb.rabbitmq;

import com.ljq.demo.springboot.baseweb.config.RabbitMQConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * @Description: rabbitMQ 消息发送者
 * @Author: junqiang.lu
 * @Date: 2019/1/21
 */
@Service
public class RabbitMQSender {

    private static final Logger logger = LoggerFactory.getLogger(RabbitMQSender.class);

    @Autowired
    private AmqpTemplate rabbitTemplate;

    /**
     * 使用直连交换机发送延时消息
     * 交换机类型: {@link org.springframework.amqp.core.DirectExchange}
     * 交换机名称: {@link RabbitMQConfig#DIRECT_EXCHANGE_ROUT_KEY_DELAY_CART}
     * @param message
     */
    public void sendDirectDelayCart(String message) {
        logger.info("exchangeName = {}, queue sender outing key = {}, message = {}",
                RabbitMQConfig.DIRECT_EXCHANGE_NAME_DELAY_CART, RabbitMQConfig.QUEUE_SENDER_ROUTING_KEY_DELAY_CART,
                message);
        rabbitTemplate.convertAndSend(RabbitMQConfig.DIRECT_EXCHANGE_NAME_DELAY_CART,
                RabbitMQConfig.QUEUE_SENDER_ROUTING_KEY_DELAY_CART, message, message1 -> {
                    message1.getMessageProperties().setDelay(RabbitMQConfig.QUEUE_DELAY_TIME_CART);
                    return message1;
                });
    }


}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
3.3 消息队列消费者
./demo-base-web/src/main/java/com/ljq/demo/springboot/baseweb/rabbitmq/RabbitMQReceiver.java
  • 1
package com.ljq.demo.springboot.baseweb.rabbitmq;

import com.ljq.demo.springboot.baseweb.config.RabbitMQConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

/**
 * @Description: RabbitMQ 消息队列消费者
 * @Author: junqiang.lu
 * @Date: 2019/1/21
 */
@Service
public class RabbitMQReceiver {

    private static final Logger logger = LoggerFactory.getLogger(RabbitMQReceiver.class);

    /**
     * 消息接收
     *
     * @param message
     */
    @RabbitHandler
    @RabbitListener(queues = {RabbitMQConfig.QUEUE_NAME_DELAY_CART})
    public void receiveDelayCart(String message) {
        logger.info("Received queueName = {}, message = {}",RabbitMQConfig.QUEUE_NAME_DELAY_CART, message);
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
3.4 使用示例(Controller)
./demo-web/src/main/java/com/ljq/demo/springboot/web/controller/RabbitMQController.java
  • 1
package com.ljq.demo.springboot.web.controller;

import com.ljq.demo.springboot.baseweb.api.ApiResult;
import com.ljq.demo.springboot.baseweb.rabbitmq.RabbitMQSender;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @Description: RabbitMQ 消息队列测试 controller
 * @Author: junqiang.lu
 * @Date: 2019/1/21
 */
@RestController
@RequestMapping(value = "api/demo/rabbitmq")
public class RabbitMQController {

    @Autowired
    private RabbitMQSender rabbitMQSender;

    /**
     * 发送测试-延时队列
     *
     * @return
     */
    @GetMapping(value = "/send/delay")
    public ApiResult sendDelay(){
        rabbitMQSender.sendDirectDelayCart("send direct delay cart");

        return ApiResult.success();
    }


}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35

4 测试

启动项目,访问测试接口:

http://127.0.0.1:8088/api/demo/rabbitmq/send/delay
  • 1

服务器日志:

2021-10-13 16:31:37 | INFO  | http-nio-8088-exec-1 | c.ljq.demo.springboot.web.acpect.SimpleInterceptor 29| preHandle
2021-10-13 16:31:37 | INFO  | http-nio-8088-exec-1 | com.ljq.demo.springboot.web.acpect.LogAspect 66| [AOP-LOG-START]
	requestMark: 70e83f06-9ec4-427e-9a4d-d8ebacfab13b
	requestIP: 127.0.0.1
	contentType:null
	requestUrl: http://127.0.0.1:8088/api/demo/rabbitmq/send/delay
	requestMethod: GET
	requestParams: 
	targetClassAndMethod: com.ljq.demo.springboot.web.controller.RabbitMQController#sendDelay
2021-10-13 16:31:37 | INFO  | http-nio-8088-exec-1 | c.l.d.springboot.baseweb.rabbitmq.RabbitMQSender 108| exchangeName = rabbitmq_direct_exchange_delay_cart, queue sender outing key = rabbitmq.spring.boot.cart, message = send direct delay cart
2021-10-13 16:31:37 | INFO  | http-nio-8088-exec-1 | com.ljq.demo.springboot.web.acpect.LogAspect 72| [AOP-LOG-END]
	requestMark: 70e83f06-9ec4-427e-9a4d-d8ebacfab13b
	requestUrl: http://127.0.0.1:8088/api/demo/rabbitmq/send/delay
	response: ApiResult(code=200, msg=成功, data=null, extraData=null, timestamp=1634113897994)
2021-10-13 16:31:38 | INFO  | http-nio-8088-exec-1 | com.ljq.demo.springboot.baseweb.log.LogService 44| [LOG-RESPONSE]
	requestIp: 127.0.0.1
	requestUrl: http://127.0.0.1:8088/api/demo/rabbitmq/send/delay
	response: ApiResult(code=200, msg=成功, data=null, extraData=null, timestamp=1634113897994)
2021-10-13 16:31:38 | INFO  | http-nio-8088-exec-1 | c.ljq.demo.springboot.web.acpect.SimpleInterceptor 38| postHandle
2021-10-13 16:31:38 | INFO  | http-nio-8088-exec-1 | c.ljq.demo.springboot.web.acpect.SimpleInterceptor 44| afterCompletion
2021-10-13 16:32:08 | INFO  | SimpleAsyncTaskExecutor-1 | c.l.d.springboot.baseweb.rabbitmq.RabbitMQReceiver 50| Received queueName = rabbitmq_delay_cart, message = send direct delay cart
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

rabbitmq-delayed-message-log

从日志可以看出在消息发送 30 秒后被消费。

至此,基于 SpringBoot 集成 RabbitMQ 实现延时消息队列的功能已经完成。

5 推荐参考资料

Scheduling Messages with RabbitMQ(官方文档)

RabbitMQ-Community Plugins(官方文档)

RabbitMQ-Installing Additional Plugins(官方文档)

Spring AMQP 1.6.0 Milestone 1 (and 1.5.4) Available

How to public message with delay time rabbitmq implement in spring boot

6 Github 源码

Gtihub 源码地址 : https://github.com/Flying9001/springBootDemo

个人公众号:404Code,分享半个互联网人的技术与思考,感兴趣的可以关注.
404Code

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/酷酷是懒虫/article/detail/811397
推荐阅读
相关标签
  

闽ICP备14008679号