赞
踩
创建队列、交换机和绑定关系
- package com.test.config;
-
- import org.springframework.amqp.core.Binding;
- import org.springframework.amqp.core.Exchange;
- import org.springframework.amqp.core.Queue;
- import org.springframework.amqp.core.TopicExchange;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- import java.util.HashMap;
- import java.util.Map;
-
- /**
-  * Declares the RabbitMQ queues, exchange and bindings for the delayed order flow.
-  *
-  * <p>Everything is registered through {@code @Bean} methods: if the RabbitMQ server does not
-  * have these objects yet, the Queue / Exchange / Binding beans in the container are created
-  * on the server automatically once the application connects.
-  *
-  * <p>Topology declared here: messages routed with {@code order.create.order} land in
-  * {@code order.delay.queue}; that queue dead-letters to the same exchange with routing key
-  * {@code order.release.order}, which is bound to {@code order.release.order.queue}.
-  *
-  * @Author : Yang Kai
-  * @create 2023/1/3 10:00
-  */
- @Configuration
- public class MyMQConfig {
-
-     /** Topic exchange shared by both queues. */
-     private static final String EVENT_EXCHANGE = "order-event-exchange";
-     /** Queue that holds messages until their TTL expires. */
-     private static final String DELAY_QUEUE = "order.delay.queue";
-     /** Queue that receives the messages dead-lettered by the delay queue. */
-     private static final String RELEASE_QUEUE = "order.release.order.queue";
-     /** Routing key bound to the delay queue. */
-     private static final String CREATE_ROUTING_KEY = "order.create.order";
-     /** Routing key bound to the release queue; also used as the dead-letter routing key. */
-     private static final String RELEASE_ROUTING_KEY = "order.release.order";
-     /** Per-queue message TTL passed as {@code x-message-ttl} (milliseconds). */
-     private static final int DELAY_TTL_MILLIS = 60000;
-
-     /**
-      * Delay queue: durable, non-exclusive, not auto-deleted, configured with
-      * dead-letter arguments.
-      *
-      * @return the {@code order.delay.queue} declaration
-      */
-     @Bean
-     public Queue orderDelayQueue() {
-         Map<String, Object> deadLetterArgs = new HashMap<>();
-         // Exchange that receives messages dead-lettered by this queue.
-         deadLetterArgs.put("x-dead-letter-exchange", EVENT_EXCHANGE);
-         // Routing key applied to the dead-lettered messages.
-         deadLetterArgs.put("x-dead-letter-routing-key", RELEASE_ROUTING_KEY);
-         // How long a message may stay in this queue before it is dead-lettered.
-         deadLetterArgs.put("x-message-ttl", DELAY_TTL_MILLIS);
-         // Queue(name, durable, exclusive, autoDelete, arguments)
-         return new Queue(DELAY_QUEUE, true, false, false, deadLetterArgs);
-     }
-
-     /**
-      * Plain queue (durable, non-exclusive, not auto-deleted) with no extra arguments.
-      *
-      * @return the {@code order.release.order.queue} declaration
-      */
-     @Bean
-     public Queue orderReleaseOrderQueue() {
-         return new Queue(RELEASE_QUEUE, true, false, false);
-     }
-
-     /**
-      * Durable, non-auto-delete topic exchange.
-      *
-      * @return the {@code order-event-exchange} declaration
-      */
-     @Bean
-     public Exchange orderEventExchange() {
-         // TopicExchange(name, durable, autoDelete)
-         return new TopicExchange(EVENT_EXCHANGE, true, false);
-     }
-
-     /**
-      * Binds the delay queue to the exchange under {@code order.create.order}.
-      *
-      * @return the binding declaration
-      */
-     @Bean
-     public Binding orderCreateOrderBinding() {
-         return queueBinding(DELAY_QUEUE, CREATE_ROUTING_KEY);
-     }
-
-     /**
-      * Binds the release queue to the exchange under {@code order.release.order}.
-      *
-      * @return the binding declaration
-      */
-     @Bean
-     public Binding orderReleaseOrderQueueBinding() {
-         return queueBinding(RELEASE_QUEUE, RELEASE_ROUTING_KEY);
-     }
-
-     /** Builds a queue-to-exchange binding on {@code order-event-exchange} with no arguments. */
-     private Binding queueBinding(String queueName, String routingKey) {
-         // Binding(destination, destinationType, exchange, routingKey, arguments)
-         return new Binding(queueName,
-                 Binding.DestinationType.QUEUE,
-                 EVENT_EXCHANGE,
-                 routingKey,
-                 null);
-     }
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
监听正常队列:消息在延迟队列中过期后会成为死信,并被投递到这个正常队列
- /**
-  * Listens on {@code order.release.order.queue}, the queue that receives messages
-  * dead-lettered by the delay queue once they expire.
-  *
-  * <p>The value of the {@code spring_returned_message_correlation} header is used as the
-  * message id. Ids that were already handled are recorded as fields of the Redis hash
-  * {@code rabbitmq_log}; a message whose id is already present is acknowledged and skipped.
-  * The check and the later write are two separate Redis calls, so they are not atomic.
-  *
-  * @param message     raw AMQP message, used to read the headers
-  * @param student     converted message payload
-  * @param channel     channel used to ack / nack the delivery explicitly
-  * @param deliveryTag delivery tag of this message
-  * @throws IOException if an ack / nack call on the channel fails
-  */
- @RabbitListener(queues = "order.release.order.queue")
- public void listener(Message message, Student student, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {
-     System.out.println("message:" + message);
-     Map<String, Object> headers = message.getMessageProperties().getHeaders();
-     Object correlation = headers.get("spring_returned_message_correlation");
-     if (correlation == null) {
-         // Without an id the message cannot be de-duplicated. Calling toString() on the
-         // missing header used to throw before any ack/nack was sent, so reject the
-         // message explicitly and do not requeue it.
-         System.out.println("消息缺少 spring_returned_message_correlation 头,无法去重,拒绝该消息");
-         channel.basicNack(deliveryTag, false, false);
-         return;
-     }
-     String messageId = correlation.toString();
-     System.out.println("message_id: " + messageId);
-     // hasKey checks the single hash field instead of loading the whole hash with entries().
-     if (Boolean.TRUE.equals(stringRedisTemplate.opsForHash().hasKey("rabbitmq_log", messageId))) {
-         // The id is already in Redis, so this message was consumed before.
-         System.out.println(messageId + "消息已经被消费过一次");
-         // Acknowledge so the duplicate is removed from the queue.
-         channel.basicAck(deliveryTag, false);
-         return;
-     }
-     try {
-         Date date = new Date();
-         System.out.println("接收到死信队列的消息 = " + student);
-         System.out.println("时间 = :" + date);
-         // Record the id before acking so a redelivery is recognised as a duplicate.
-         stringRedisTemplate.opsForHash().put("rabbitmq_log", messageId, "v");
-         channel.basicAck(deliveryTag, false);
-         System.out.println(messageId + "消息消费成功");
-     } catch (Exception e) {
-         e.printStackTrace();
-         System.out.println("消息消费异常");
-         // Negative-acknowledge this single delivery and put it back on the queue.
-         channel.basicNack(deliveryTag, false, true);
-     }
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
发送消息
- /**
-  * Dead-letter queue test: inserts a send-log row for a new message id, then publishes a
-  * {@code Student} to {@code order-event-exchange} with routing key
-  * {@code order.create.order}, passing the id as {@code CorrelationData}.
-  */
- @Test
- public void contextLoadsxx() {
-     String msgId = UUID.randomUUID().toString();
-     System.out.println("1msgId:" + msgId);
-
-     RabbitmqSendLog rabbitmqSendLog = new RabbitmqSendLog();
-     rabbitmqSendLog.setMessageId(msgId);
-     // NOTE(review): the logged exchange / route key differ from the ones used in
-     // convertAndSend below ("order-event-exchange" / "order.create.order") - confirm
-     // which pair is intended before relying on this row for retries.
-     rabbitmqSendLog.setExchange("exchange-directs-user");
-     rabbitmqSendLog.setRouteKey("route.user");
-     rabbitmqSendLog.setCount(1);
-     rabbitmqSendLog.setStatus(0);
-     // Retry time is one minute from now. The previous expression added
-     // 1000 + 60 + 1 milliseconds (about one second) instead of multiplying.
-     rabbitmqSendLog.setTryTime(new Date(System.currentTimeMillis() + 60 * 1000L));
-
-     Student student = new Student();
-     student.setAddress("杭州1");
-     student.setAge("112");
-     student.setName("小明");
-     rabbitmqSendLog.setContent(JSON.toJSONString(student));
-     rabbitmqSendLogMapper.insert(rabbitmqSendLog);
-
-     try {
-         rabbitTemplate.convertAndSend("order-event-exchange", "order.create.order", student, new CorrelationData(msgId));
-     } catch (Exception e) {
-         // The send-log row was already inserted above. Report the failure instead of
-         // swallowing it; a real system would notify by e-mail or another channel here
-         // when the broker is unreachable.
-         System.out.println("消息发送失败, msgId:" + msgId);
-         e.printStackTrace();
-     }
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
发送时间
收到消息时间
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。