当前位置:   article > 正文

使用Rabbitmq进行异步消费和延迟消费_rabbitmq异步消费消息

rabbitmq异步消费消息

作者RabbitmQ版本

erlang 版本:21.3

RabbitMQ 版本:3.7.14

要是用延迟消费的话,需要去下载一个插件:rabbitmq_delayed_message_exchange插件

下载地址:Community Plugins — RabbitMQ

rabbitmq-delayed-message-exchange v3.8版本适用于RabbitMQ3.7.X版本,插件要与RabbitMQ版本对应,不然会出现莫名其妙的错误

下载好后放到rabbitmq安装目录下的:plugins中

接下来直接上代码:

pom中引入需要的架包:

  1. <!--RabbitMq-->
  2. <dependency>
  3. <groupId>org.springframework.boot</groupId>
  4. <artifactId>spring-boot-starter-amqp</artifactId>
  5. </dependency>

配置文件中新增RabbitMq连接信息

  1. spring:
  2. rabbitmq:
  3. host: 127.0.0.1
  4. port: 5672
  5. username: guest
  6. password: guest
  7. virtual-host: /

配置RabbitMq

  1. @Data
  2. @Configuration
  3. @ConfigurationProperties(prefix = "spring.rabbitmq")
  4. public class RabbitMqConfig {
  5. private String host;
  6. private int port;
  7. private String userName;
  8. private String password;
  9. @Bean
  10. public ConnectionFactory connectionFactory() {
  11. CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(host,port);
  12. cachingConnectionFactory.setUsername(userName);
  13. cachingConnectionFactory.setPassword(password);
  14. cachingConnectionFactory.setVirtualHost("/");
  15. cachingConnectionFactory.setPublisherConfirms(true);
  16. return cachingConnectionFactory;
  17. }
  18. @Bean
  19. public RabbitTemplate rabbitTemplate() {
  20. RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
  21. return rabbitTemplate;
  22. }
  23. }
  1. @Configuration
  2. public class QueueConfig {
  3. /**
  4. * 分派普通消息交换机
  5. */
  6. @Bean
  7. public TopicExchange topicExchange(){
  8. return new TopicExchange("ordinary_exchange",true,false);
  9. }
  10. @Bean
  11. public Queue queue() {
  12. Queue queue = new Queue("ordinary_queue", true);
  13. return queue;
  14. }
  15. @Bean
  16. public Binding binding() {
  17. return BindingBuilder.bind(queue()).to(topicExchange()).with("ordinary_queue");
  18. }
  19. /**
  20. * 分派延迟消息交换机
  21. */
  22. @Bean
  23. public CustomExchange delayExchange(){
  24. Map<String, Object> paramMap = new HashMap<String, Object>();
  25. paramMap.put("x-delayed-type","direct");
  26. return new CustomExchange("delay_exchange","x-delayed-message",true,false,paramMap);
  27. }
  28. @Bean
  29. public Queue delayQueue(){
  30. Queue delayQueue = new Queue("delay_queue", true);
  31. return delayQueue;
  32. }
  33. @Bean
  34. public Binding delayMessagebinding(){
  35. return BindingBuilder.bind(delayQueue()).to(delayExchange()).with("delay_queue").noargs();
  36. }
  37. }
  1. @Service
  2. @Slf4j
  3. public class MessageServiceImpl implements MessageService {
  4. @Autowired
  5. private RabbitTemplate rabbitTemplate;
  6. @Override
  7. public void sendMsg(String queueName,String msg) {
  8. SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  9. System.out.println("消息发送时间:"+sdf.format(new Date()));
  10. rabbitTemplate.convertAndSend("ordinary_exchange", queueName, msg);
  11. }
  12. /**
  13. * 实现延迟消息
  14. * @param queueName
  15. * @param message
  16. */
  17. @Override
  18. public void sendDelayMessage(String queueName, String message) {
  19. SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  20. log.debug("消息发送时间:{}",sdf.format(new Date()));
  21. rabbitTemplate.convertAndSend("delay_exchange", queueName, message, new MessagePostProcessor() {
  22. @Override
  23. public Message postProcessMessage(Message message) throws AmqpException {
  24. message.getMessageProperties().setHeader("x-delay",3000);
  25. return message;
  26. }
  27. });
  28. }
  29. }
  1. @Component
  2. @Slf4j
  3. public class MessageReceiver {
  4. @RabbitListener(queues = "ordinary_queue")
  5. public void receive(String msg) {
  6. SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  7. log.debug("消息接收时间:{},接收到的消息:{}",sdf.format(new Date()),msg);
  8. }
  9. @RabbitListener(queues = "delay_queue")
  10. public void receiveDelayMessage(String message) {
  11. SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  12. log.debug("消息接收时间:{},接收到的消息:{}",sdf.format(new Date()),message);
  13. }
  14. }
  1. @RestController
  2. public class MessageController extends BaseController {
  3. @Autowired
  4. private MessageService messageService;
  5. @GetMapping("send")
  6. public ApiRest sendMessage(String queueName, String msg){
  7. messageService.sendMsg(queueName,msg);
  8. return this.success();
  9. }
  10. @GetMapping("delaySend")
  11. public ApiRest sendDelayMessage(String queueName,String message){
  12. messageService.sendDelayMessage(queueName, message);
  13. return this.success();
  14. }
  15. }

接下来测试就搞定延迟消费

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/繁依Fanyi0/article/detail/689513
推荐阅读
相关标签
  

闽ICP备14008679号