当前位置:   article > 正文

rabbitmq使用死信队列做延迟消息_rabbitmq 利用死信队列实现延时推送消息

rabbitmq 利用死信队列实现延时推送消息

 创建队列  交换机  和绑定关系

  1. package com.test.config;
  2. import org.springframework.amqp.core.Binding;
  3. import org.springframework.amqp.core.Exchange;
  4. import org.springframework.amqp.core.Queue;
  5. import org.springframework.amqp.core.TopicExchange;
  6. import org.springframework.context.annotation.Bean;
  7. import org.springframework.context.annotation.Configuration;
  8. import java.util.HashMap;
  9. import java.util.Map;
  10. /**
  11. * Created by IntelliJ IDEA.
  12. *
  13. * @Author : Yang Kai
  14. * @create 2023/1/3 10:00
  15. */
  16. @Configuration
  17. public class MyMQConfig {
  18. /**
  19. * 用@Bean方式创建,如果rabbitMQ服务器没有这些的话,容器中的 Queue Exchange Binding 都会连上rabbitMQ服务器自动创建
  20. *
  21. * @return
  22. */
  23. //队列
  24. @Bean
  25. public Queue orderDelayQueue() {
  26. /**
  27. * 死信队列的属性
  28. * X-dead-letter-exchange: order-event-exchange
  29. * X-dead-letter-routing-key: order.release.order
  30. * X-message-ttl: 60000
  31. */
  32. Map<String, Object> arguments = new HashMap<>();
  33. //指定交换机
  34. arguments.put("x-dead-letter-exchange", "order-event-exchange");
  35. //死信路由
  36. arguments.put("x-dead-letter-routing-key", "order.release.order");
  37. //死信的存活时间
  38. arguments.put("x-message-ttl", 60000);
  39. //String name, boolean durable, boolean exclusive, boolean autoDelete, @Nullable Map<String, Object> arguments
  40. Queue queue = new Queue("order.delay.queue", true, false, false, arguments);
  41. return queue;
  42. }
  43. //队列
  44. @Bean
  45. public Queue orderReleaseOrderQueue() {
  46. return new Queue("order.release.order.queue", true, false, false);
  47. }
  48. //交换机
  49. @Bean
  50. public Exchange orderEventExchange() {
  51. //String name, boolean durable, boolean autoDelete, Map<String, Object> arguments
  52. return new TopicExchange("order-event-exchange", true, false);
  53. }
  54. //绑定关系
  55. @Bean
  56. public Binding orderCreateOrderBinding() {
  57. //String destination, Binding.DestinationType destinationType, String exchange, String routingKey, @Nullable Map<String, Object> arguments
  58. Binding binding = new Binding("order.delay.queue", //目标队列
  59. Binding.DestinationType.QUEUE,
  60. "order-event-exchange",//交换机
  61. "order.create.order",//队列和交换机绑定的路由
  62. null);
  63. return binding;
  64. }
  65. //绑定关系
  66. @Bean
  67. public Binding orderReleaseOrderQueueBinding() {
  68. Binding binding1 = new Binding("order.release.order.queue", //目标队列
  69. Binding.DestinationType.QUEUE,
  70. "order-event-exchange",//交换机
  71. "order.release.order",//队列和交换机绑定的路由
  72. null);
  73. return binding1;
  74. }
  75. }

监听被死信队列过期的消息投递到正常队列的队

  1. //监听正常队列,这个队列是被死信队列过期后投递的队列
  2. @RabbitListener(queues = "order.release.order.queue")
  3. public void listener(Message message, Student student, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {
  4. System.out.println("message:"+message);
  5. MessageProperties messageProperties = message.getMessageProperties();
  6. long tag = deliveryTag;
  7. Map<String, Object> headers = messageProperties.getHeaders();
  8. String messageId = headers.get("spring_returned_message_correlation").toString();
  9. System.out.println("message_id: "+messageId);
  10. if(stringRedisTemplate.opsForHash().entries("rabbitmq_log").containsKey(messageId)){
  11. //redis中包含该key,说明已经被消费过了
  12. System.out.println(messageId+"消息已经被消费过一次");
  13. //确认消息已被消费
  14. channel.basicAck(tag,false);
  15. return;
  16. }
  17. try {
  18. Date date = new Date();
  19. System.out.println("接收到死信队列的消息 = " + student);
  20. System.out.println("时间 = :" + date);
  21. stringRedisTemplate.opsForHash().put("rabbitmq_log",messageId,"v");
  22. channel.basicAck(tag,false);
  23. System.out.println(messageId+"消息消费成");
  24. }catch (Exception e){
  25. e.printStackTrace();
  26. System.out.println("消息消费异常");
  27. channel.basicNack(tag,false,true);
  28. }
  29. }

发送消息

  1. //测试死信队列
  2. @Test
  3. public void contextLoadsxx() {
  4. String msgId = UUID.randomUUID().toString();
  5. System.out.println("1msgId:"+msgId);
  6. RabbitmqSendLog rabbitmqSendLog = new RabbitmqSendLog();
  7. rabbitmqSendLog.setMessageId(msgId);
  8. rabbitmqSendLog.setExchange("exchange-directs-user");
  9. rabbitmqSendLog.setRouteKey("route.user");
  10. rabbitmqSendLog.setCount(1);
  11. rabbitmqSendLog.setStatus(0);
  12. //1分钟后重试的时间
  13. rabbitmqSendLog.setTryTime(new Date(System.currentTimeMillis()+1000+60+1));
  14. Student student = new Student();
  15. student.setAddress("杭州1");
  16. student.setAge("112");
  17. student.setName("小明");
  18. String s = JSON.toJSONString(student);
  19. rabbitmqSendLog.setContent(s);
  20. int insert = rabbitmqSendLogMapper.insert(rabbitmqSendLog);
  21. try {
  22. rabbitTemplate.convertAndSend("order-event-exchange", "order.create.order", student,new CorrelationData(msgId));
  23. }catch (Exception e){
  24. //如果mq网络原因.发送邮件,活着其他方式通知
  25. // rabbitmqSendLogMapper.insert(rabbitmqSendLog);
  26. }
  27. }

发送时间

 收到消息时间

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/天景科技苑/article/detail/930165
推荐阅读
相关标签
  

闽ICP备14008679号