赞
踩
生产者发送消息附带路由key:XA 》 交换机收到路由XA后》交换机把消息交给QA队列==》QA队列收到消息后会把消息交给死信交换机 》死信交换机到期时会把消息交给队列QD》此时消费者监听的QD会收到消息,以此来实现死信队列
生产者发送消息附带路由key:XB 》 交换机收到路由XB后》交换机把消息交给QB队列==》QB队列收到消息后会把消息交给死信交换机 》死信交换机到期时会把消息交给队列QD》此时消费者监听的QD会收到消息,以此来实现死信队列
生产者发送消息附带路由key:XC 》 交换机收到路由XC后》交换机把消息交给QC队列==》QC队列收到消息后会把消息交给死信交换机 》死信交换机到期时会把消息交给队列QD》此时消费者监听的QD会收到消息,以此来实现死信队列
QA和QB和QC都会通过路由key YD绑定到死信交换机Y上。
1、引入依赖
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.6.6</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.example</groupId> <artifactId>demo</artifactId> <version>0.0.1-SNAPSHOT</version> <name>demo</name> <description>Demo project for Spring Boot</description> <properties> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <!-- Spring Boot --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
2、编写配置文件
package com.example.demo.configuration; import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; @Configuration public class QueueTtlConfig { //普通交换机 private final static String X_EXCHANGE="X"; //死信交换机 private final static String Y_DEAD_EXCHANGE="Y"; //普通队列 private final static String QUEUE_A="QA"; private final static String QUEUE_B="QB"; private final static String QUEUE_C="QC"; //死信队列 private final static String DEAD_QUEUE_D="QD"; /** * 声明普通交换机 */ @Bean("xExchange") public DirectExchange xExchange(){ return new DirectExchange(X_EXCHANGE); } /** * 声明死信交换机 */ @Bean("yDeadExchange") public DirectExchange yDeadExchange(){ return new DirectExchange(Y_DEAD_EXCHANGE); } /** * 声明普通队列 */ @Bean("queueA") public Queue queueA(){ Map<String, Object> withArguments = new HashMap<>(3); withArguments.put("x-dead-letter-exchange",Y_DEAD_EXCHANGE); withArguments.put("x-dead-letter-routing-key","YD"); withArguments.put("x-message-ttl",10000); return QueueBuilder.durable(QUEUE_A).withArguments(withArguments).build(); } /** * 声明普通队列 */ @Bean("queueB") public Queue queueB(){ Map<String, Object> withArguments = new HashMap<>(3); withArguments.put("x-dead-letter-exchange",Y_DEAD_EXCHANGE); withArguments.put("x-dead-letter-routing-key","YD"); withArguments.put("x-message-ttl",40000); return QueueBuilder.durable(QUEUE_B).withArguments(withArguments).build(); } /** * 声明普通队列 */ @Bean("queueC") public Queue queueC(){ Map<String, Object> withArguments = new HashMap<>(3); withArguments.put("x-dead-letter-exchange",Y_DEAD_EXCHANGE); withArguments.put("x-dead-letter-routing-key","YD"); return QueueBuilder.durable(QUEUE_C).withArguments(withArguments).build(); } /** * 声明死信队列 */ @Bean("deadQueueD") public Queue deadQueueD(){ return QueueBuilder.durable(DEAD_QUEUE_D).build(); } /** * 绑定队列与交换机queueA--xExchange */ @Bean public Binding queueABindingX(@Qualifier("queueA") Queue queue,@Qualifier("xExchange") DirectExchange exchange){ return BindingBuilder.bind(queue).to(exchange).with("XA"); } /** * 绑定队列与交换机queueB--xExchange */ @Bean public Binding queueBBindingX(@Qualifier("queueB") Queue queue,@Qualifier("xExchange") DirectExchange exchange){ return BindingBuilder.bind(queue).to(exchange).with("XB"); } /** * 绑定队列与交换机queueC--xExchange */ @Bean public Binding queueCBindingX(@Qualifier("queueC") Queue queue,@Qualifier("xExchange") DirectExchange exchange){ return BindingBuilder.bind(queue).to(exchange).with("XC"); } /** * 绑定队列与交换机queueD--yDeadExchange */ @Bean public Binding queueDBindingX(@Qualifier("deadQueueD") Queue queue,@Qualifier("yDeadExchange") DirectExchange exchange){ return BindingBuilder.bind(queue).to(exchange).with("YD"); } }
3、编写消费者
package com.example.demo.rabbitmq; import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class Consumer { @RabbitListener(queues = "QD") public void receiveMessage(Message message, Channel channel) { String s = new String(message.getBody()); System.out.println("接收消息: " + s); } }
4、编写生产者
package com.example.demo.rabbitmq; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component public class Producer implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback { private final static String X_EXCHANGE="X"; @Autowired private RabbitTemplate rabbitTemplate; public void sendMessage(String message) { rabbitTemplate.setConfirmCallback(this); rabbitTemplate.setReturnCallback(this); rabbitTemplate.convertAndSend(X_EXCHANGE,"XA","10秒消息延迟XA:"+message); rabbitTemplate.convertAndSend(X_EXCHANGE,"XB","40秒消息延迟XB:"+message); rabbitTemplate.convertAndSend(X_EXCHANGE,"XC","消息延迟XC:"+message,message1 -> { message1.getMessageProperties().setExpiration("2000"); return message1; }); } @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (ack) { System.out.println("消息成功发送到队列"); } else { System.out.println("消息发送失败:" + cause); } } @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { System.out.println("消息发送失败,返回原因:" + replyText); } }
5、编写测试类
package com.example.demo; import com.example.demo.rabbitmq.Producer; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; @SpringBootTest class DemoApplicationTests { @Autowired private Producer producer; @Test public void testRabbitMQ() { String message = "Hello, RabbitMQ!"; producer.sendMessage(message); } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。