赞
踩
今天我们使用Java和RabbitMQ实现消息队列的延迟功能。
前期准备,需要安装好docker、docker-compose的运行环境。
需要安装RabbitMQ的可以看下面这篇文章。
如何使用PHP和RabbitMQ实现消息队列?-CSDN博客
今天讲的是依赖RabbitMQ的延迟插件实现消息队列的延迟功能。
如何安装RabbitMQ的延迟插件并且启用,可以看下面的这篇文章。
如何使用PHP和RabbitMQ实现延迟队列(方式一)?_php调rabbit 设置延时-CSDN博客
1、使用springboot框架快速搭建一个项目。
2、在 pom.xml
中添加 Spring Boot AMQP 的依赖,内容如下。
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- </dependency>
3、在 application.yml
中配置 RabbitMQ 的连接信息,内容如下。
- spring:
- rabbitmq:
- host: localhost
- port: 5672
- username: guest
- password: guest
4、在配置类中定义交换机、队列和绑定,内容如下。
- package com.ayzen.hello;
-
- import java.util.HashMap;
- import java.util.Map;
-
- import org.springframework.amqp.core.*;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- @Configuration
- public class RabbitMqConfig {
-
- public static final String DELAYED_EXCHANGE = "delayed_exchange";
- public static final String DELAYED_QUEUE = "delayed_queue";
- public static final String ROUTING_KEY = "delayed_key";
-
- @Bean
- CustomExchange delayedExchange() {
- Map<String, Object> args = new HashMap<>();
- args.put("x-delayed-type", "direct");
- return new CustomExchange(DELAYED_EXCHANGE, "x-delayed-message", true, false, args);
- }
-
- @Bean
- Queue delayedQueue() {
- return new Queue(DELAYED_QUEUE, true);
- }
-
- @Bean
- Binding binding(Queue delayedQueue, CustomExchange delayedExchange) {
- return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(ROUTING_KEY).noargs();
- }
- }
5、创建一个生产者,发送一个带有延迟属性的消息,内容如下。
- package com.ayzen.hello;
-
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.core.MessageBuilder;
- import org.springframework.amqp.core.MessageProperties;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.http.ResponseEntity;
- import org.springframework.web.bind.annotation.GetMapping;
- import org.springframework.web.bind.annotation.RequestMapping;
- import org.springframework.web.bind.annotation.RestController;
-
- @RestController
- @RequestMapping("/test")
- public class TestController {
- final Logger logger = LoggerFactory.getLogger(getClass());
-
- @Autowired
- RabbitTemplate rabbitTemplate;
-
- @GetMapping("/send")
- public ResponseEntity<Object> send() {
- this.sendDelayMessage("sendDelayMessage", 5000);
- return ResponseEntity.ok(ResponseDto.success("ok"));
- }
-
- private void sendDelayMessage(String message, long ttlInMilliseconds) {
- MessageProperties messageProperties = new MessageProperties();
- messageProperties.setHeader("x-delay", ttlInMilliseconds);
- Message msg = MessageBuilder.withBody(message.getBytes()).andProperties(messageProperties).build();
-
- rabbitTemplate.convertAndSend("delayed_exchange", "delayed_key", msg);
-
- logger.info("send message to rabbitmq.");
- }
- }
6、创建一个消息者,监听接收队列中的消息,内容如下。
- package com.ayzen.hello;
-
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.stereotype.Component;
-
- @Component
- public class TestService {
- final Logger logger = LoggerFactory.getLogger(getClass());
-
- @RabbitListener(queues = "delayed_queue")
- public void process(String message) {
- logger.info("process message from rabbitmq,message={}", message);
- }
- }
7、至此,测试项目代码已完成,下一步将进行验证。
1、启动服务。
2、调用生产者,执行如下代码。
curl http://127.0.0.1:8080/test/send
3、查看日志,正常情况会返回如下内容。
如上图所示,在2024-04-07T22:32:47.489+08:00接收到生产者的请求,然后在2024-04-07T22:32:52.588+08:00执行消费动作,延迟5秒。
4、至此,使用Java和RabbitMQ实现延迟队列的功能已验证完毕。
用Java和RabbitMQ实现消息队列的延迟功能,其实依靠的是RabbitMQ的一个延迟插件,主要有以下几个步骤。
1、安装RabbitMQ延迟插件。
2、编写Java测试项目。
3、进行测试验证。
上面的代码只是做个简单的示例,如果运用到实际的项目当中需要做进一步的优化。
最后因本人能力有限,有什么不对的地方望各位大佬指出好让我改进,多多包含,谢谢大家。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。