当前位置:   article > 正文

如何使用Java和RabbitMQ实现延迟队列?

如何使用Java和RabbitMQ实现延迟队列?

前言

今天我们使用Java和RabbitMQ实现消息队列的延迟功能。

前期准备,需要安装好docker、docker-compose的运行环境。

需要安装RabbitMQ的可以看下面这篇文章。

如何使用PHP和RabbitMQ实现消息队列?-CSDN博客

今天讲的是依赖RabbitMQ的延迟插件实现消息队列的延迟功能。

如何安装RabbitMQ的延迟插件并且启用,可以看下面的这篇文章。

如何使用PHP和RabbitMQ实现延迟队列(方式一)?_php调rabbit 设置延时-CSDN博客

一、编写代码

1、使用springboot框架快速搭建一个项目。

2、在 pom.xml 中添加 Spring Boot AMQP 的依赖,内容如下。

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-amqp</artifactId>
  4. </dependency>

3、在 application.yml 中配置 RabbitMQ 的连接信息,内容如下。

  1. spring:
  2. rabbitmq:
  3. host: localhost
  4. port: 5672
  5. username: guest
  6. password: guest

4、在配置类中定义交换机、队列和绑定,内容如下。

  1. package com.ayzen.hello;
  2. import java.util.HashMap;
  3. import java.util.Map;
  4. import org.springframework.amqp.core.*;
  5. import org.springframework.context.annotation.Bean;
  6. import org.springframework.context.annotation.Configuration;
  7. @Configuration
  8. public class RabbitMqConfig {
  9. public static final String DELAYED_EXCHANGE = "delayed_exchange";
  10. public static final String DELAYED_QUEUE = "delayed_queue";
  11. public static final String ROUTING_KEY = "delayed_key";
  12. @Bean
  13. CustomExchange delayedExchange() {
  14. Map<String, Object> args = new HashMap<>();
  15. args.put("x-delayed-type", "direct");
  16. return new CustomExchange(DELAYED_EXCHANGE, "x-delayed-message", true, false, args);
  17. }
  18. @Bean
  19. Queue delayedQueue() {
  20. return new Queue(DELAYED_QUEUE, true);
  21. }
  22. @Bean
  23. Binding binding(Queue delayedQueue, CustomExchange delayedExchange) {
  24. return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(ROUTING_KEY).noargs();
  25. }
  26. }

5、创建一个生产者,发送一个带有延迟属性的消息,内容如下。

  1. package com.ayzen.hello;
  2. import org.slf4j.Logger;
  3. import org.slf4j.LoggerFactory;
  4. import org.springframework.amqp.core.Message;
  5. import org.springframework.amqp.core.MessageBuilder;
  6. import org.springframework.amqp.core.MessageProperties;
  7. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  8. import org.springframework.beans.factory.annotation.Autowired;
  9. import org.springframework.http.ResponseEntity;
  10. import org.springframework.web.bind.annotation.GetMapping;
  11. import org.springframework.web.bind.annotation.RequestMapping;
  12. import org.springframework.web.bind.annotation.RestController;
  13. @RestController
  14. @RequestMapping("/test")
  15. public class TestController {
  16. final Logger logger = LoggerFactory.getLogger(getClass());
  17. @Autowired
  18. RabbitTemplate rabbitTemplate;
  19. @GetMapping("/send")
  20. public ResponseEntity<Object> send() {
  21. this.sendDelayMessage("sendDelayMessage", 5000);
  22. return ResponseEntity.ok(ResponseDto.success("ok"));
  23. }
  24. private void sendDelayMessage(String message, long ttlInMilliseconds) {
  25. MessageProperties messageProperties = new MessageProperties();
  26. messageProperties.setHeader("x-delay", ttlInMilliseconds);
  27. Message msg = MessageBuilder.withBody(message.getBytes()).andProperties(messageProperties).build();
  28. rabbitTemplate.convertAndSend("delayed_exchange", "delayed_key", msg);
  29. logger.info("send message to rabbitmq.");
  30. }
  31. }

6、创建一个消息者,监听接收队列中的消息,内容如下。

  1. package com.ayzen.hello;
  2. import org.slf4j.Logger;
  3. import org.slf4j.LoggerFactory;
  4. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  5. import org.springframework.stereotype.Component;
  6. @Component
  7. public class TestService {
  8. final Logger logger = LoggerFactory.getLogger(getClass());
  9. @RabbitListener(queues = "delayed_queue")
  10. public void process(String message) {
  11. logger.info("process message from rabbitmq,message={}", message);
  12. }
  13. }

7、至此,测试项目代码已完成,下一步将进行验证。

二、测试验证

1、启动服务。

2、调用生产者,执行如下代码。

curl http://127.0.0.1:8080/test/send

3、查看日志,正常情况会返回如下内容。

如上图所示,在2024-04-07T22:32:47.489+08:00接收到生产者的请求,然后在2024-04-07T22:32:52.588+08:00执行消费动作,延迟5秒。

4、至此,使用Java和RabbitMQ实现延迟队列的功能已验证完毕。

总结

用Java和RabbitMQ实现消息队列的延迟功能,其实依靠的是RabbitMQ的一个延迟插件,主要有以下几个步骤。

1、安装RabbitMQ延迟插件。

2、编写Java测试项目。

3、进行测试验证。

上面的代码只是做个简单的示例,如果运用到实际的项目当中需要做进一步的优化。

最后因本人能力有限,有什么不对的地方望各位大佬指出好让我改进,多多包含,谢谢大家。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Cpp五条/article/detail/384372
推荐阅读
相关标签
  

闽ICP备14008679号