赞
踩
RabbitMQ:第一章:6 种工作模式以及消息确认机制(理论与代码相结合)
RabbitMQ:第二章:Spring整合RabbitMQ(简单模式,广播模式,路由模式,通配符模式,消息可靠性投递,防止消息丢失,TTL,死信队列,延迟队列,消息积压,消息幂等性)
RabbitMQ:第三章:Springboot集成RabbitMQ(直连模式,工作队列模式,发布订阅模式,路由模式,通配符模式
本文通过实战代码,Spring整合RabbitMQ,项目分二个模块,consumer和produle。
提示:以下是本篇文章正文内容,下面案例可供参考
代码如下(示例):
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.sky</groupId> <artifactId>spring-rabbitmq-produle</artifactId> <version>1.0-SNAPSHOT</version> <dependencies> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>5.1.7.RELEASE</version> </dependency> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>2.1.8.RELEASE</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-test</artifactId> <version>5.1.7.RELEASE</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.8.0</version> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> </plugins> </build> </project>
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context https://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd"> <!--加载配置文件--> <context:property-placeholder location="classpath:rabbitmq.properties"/> <!-- 定义rabbitmq connectionFactory --> <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}" port="${rabbitmq.port}" username="${rabbitmq.username}" password="${rabbitmq.password}" virtual-host="${rabbitmq.virtual-host}"/> <!--定义管理交换机、队列--> <rabbit:admin connection-factory="connectionFactory"/> <!--定义持久化队列,不存在则自动创建;不绑定到交换机则绑定到默认交换机 默认交换机类型为direct,名字为:"",路由键为队列的名称 --> <!-- id:bean的名称 name:queue的名称 auto-declare:自动创建 auto-delete:自动删除。 最后一个消费者和该队列断开连接后,自动删除队列 durable:是否持久化 --> <rabbit:queue id="spring_queue" name="spring_queue" auto-declare="true"/> <!-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~广播;所有队列都能收到消息~~~~~~~~~~~~~~~~~~~~~~~~~~~~ --> <!--定义广播交换机中的持久化队列,不存在则自动创建--> <rabbit:queue id="spring_fanout_queue_1" name="spring_fanout_queue_1" auto-declare="true"/> <!--定义广播交换机中的持久化队列,不存在则自动创建--> <rabbit:queue id="spring_fanout_queue_2" name="spring_fanout_queue_2" auto-declare="true"/> <!--定义广播类型交换机;并绑定上述两个队列--> <rabbit:fanout-exchange id="spring_fanout_exchange" name="spring_fanout_exchange" auto-declare="true"> <rabbit:bindings> <rabbit:binding queue="spring_fanout_queue_1" /> <rabbit:binding queue="spring_fanout_queue_2"/> </rabbit:bindings> </rabbit:fanout-exchange> <!-- 定义队列--> <rabbit:queue id="spring_direct_queue" name="spring_direct_queue" auto-declare="true"/> <!-- 定义 Routing 路由模式 交互机 --> <rabbit:direct-exchange name="spring_direct_exchange" > <rabbit:bindings> <!--direct 类型的交换机绑定队列 key :路由key queue:队列名称--> <rabbit:binding queue="spring_direct_queue" key="direct"></rabbit:binding> </rabbit:bindings> </rabbit:direct-exchange> <!-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~通配符;*匹配一个单词,#匹配多个单词 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~ --> <!--定义广播交换机中的持久化队列,不存在则自动创建--> <rabbit:queue id="spring_topic_queue_one" name="spring_topic_queue_one" auto-declare="true"/> <!--定义广播交换机中的持久化队列,不存在则自动创建--> <rabbit:queue id="spring_topic_queue_two" name="spring_topic_queue_two" auto-declare="true"/> <!--定义广播交换机中的持久化队列,不存在则自动创建--> <rabbit:queue id="spring_topic_queue_three" name="spring_topic_queue_three" auto-declare="true"/> <!-- 声明 topic 类型的交换机 --> <rabbit:topic-exchange id="spring_topic_exchange" name="spring_topic_exchange" auto-declare="true"> <rabbit:bindings> <rabbit:binding pattern="one.*" queue="spring_topic_queue_one"/> <rabbit:binding pattern="two.#" queue="spring_topic_queue_two"/> <rabbit:binding pattern="three.#" queue="spring_topic_queue_three"/> </rabbit:bindings> </rabbit:topic-exchange> <!--定义rabbitTemplate对象操作可以在代码中方便发送消息--> <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/> </beans>
rabbitmq.host=110.42.239.246
rabbitmq.port=5672
rabbitmq.username=guest
rabbitmq.password=guest
rabbitmq.virtual-host=spring
说明:这里免费提供rabbitmq连接方式给大家使用学习
package com.sky.springrabbitmqprodule; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations = "classpath:spring-rabbitmq-producer.xml") public class ProducerTest { @Autowired private RabbitTemplate rabbitTemplate; /** * 简单模式发消息 */ @Test public void testHelloWorld(){ rabbitTemplate.convertAndSend("spring_queue","hello world spring...."); } /** * 广播模式发消息 */ @Test public void testFanout(){ rabbitTemplate.convertAndSend("spring_fanout_exchange","","spring fanout...."); } /** * 路由模式发消息 */ @Test public void testDirect(){ rabbitTemplate.convertAndSend("spring_direct_exchange","direct","spring Direct...."); } /** * 通配符模式发消息 */ @Test public void testTopics(){ rabbitTemplate.convertAndSend("spring_topic_exchange","one.onekey","spring topic one...."); rabbitTemplate.convertAndSend("spring_topic_exchange","two.twokey.topic","spring topic two...."); rabbitTemplate.convertAndSend("spring_topic_exchange","three.threekey.topic","spring topic three...."); } }
代码如下(示例):
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.sky</groupId> <artifactId>spring-rabbitmq-consumer</artifactId> <version>1.0-SNAPSHOT</version> <dependencies> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>5.1.7.RELEASE</version> </dependency> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>2.1.8.RELEASE</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-test</artifactId> <version>5.1.7.RELEASE</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.8.0</version> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> </plugins> </build> </project>
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context https://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd"> <!--加载配置文件--> <context:property-placeholder location="classpath:rabbitmq.properties"/> <!-- 定义rabbitmq connectionFactory --> <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}" port="${rabbitmq.port}" username="${rabbitmq.username}" password="${rabbitmq.password}" virtual-host="${rabbitmq.virtual-host}"/> <!--简单模式--> <!-- <bean id="springQueueListener" class="com.sky.springrabbitmqconsumer.listener.SpringQueueListener"/>--> <!--广播模式--> <!-- <bean id="fanoutListener1" class="com.sky.springrabbitmqconsumer.listener.FanoutListener"/>--> <!-- <bean id="fanoutListener2" class="com.sky.springrabbitmqconsumer.listener.FanoutListener2"/>--> <!--路由模式--> <!-- <bean id="springDirectQueue" class="com.sky.springrabbitmqconsumer.listener.SpringDirectQueue"/>--> <!--通配符模式--> <bean id="topicListenerOne" class="com.sky.springrabbitmqconsumer.listener.TopicListenerOne"/> <bean id="topicListenerTwo" class="com.sky.springrabbitmqconsumer.listener.TopicListenerTwo"/> <bean id="topicListenerThree" class="com.sky.springrabbitmqconsumer.listener.TopicListenerThree"/> <rabbit:listener-container connection-factory="connectionFactory" auto-declare="true"> <!--简单模式--> <!-- <rabbit:listener ref="springQueueListener" queue-names="spring_queue"/>--> <!--广播模式--> <!-- <rabbit:listener ref="fanoutListener1" queue-names="spring_fanout_queue_1"/>--> <!-- <rabbit:listener ref="fanoutListener2" queue-names="spring_fanout_queue_2"/>--> <!--路由模式--> <!-- <rabbit:listener ref="springDirectQueue" queue-names="spring_direct_queue"/>--> <!--通配符模式--> <rabbit:listener ref="topicListenerOne" queue-names="spring_topic_queue_one"/> <rabbit:listener ref="topicListenerTwo" queue-names="spring_topic_queue_two"/> <rabbit:listener ref="topicListenerThree" queue-names="spring_topic_queue_three"/> </rabbit:listener-container> </beans>
rabbitmq.host=110.42.239.246
rabbitmq.port=5672
rabbitmq.username=guest
rabbitmq.password=guest
rabbitmq.virtual-host=spring
说明:配置和生产者的一致
package com.sky.springrabbitmqconsumer.test;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
public class ConsumerTest {
public static void main(String[] args) {
//初始化IOC容器
ApplicationContext ctx = new ClassPathXmlApplicationContext("classpath:spring-rabbitmq-consumer.xml");
}
}
package com.sky.springrabbitmqconsumer.listener;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
public class FanoutListener implements MessageListener {
@Override
public void onMessage(Message message) {
//打印消息
System.out.println(new String(message.getBody()));
}
}
package com.sky.springrabbitmqconsumer.listener;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
public class FanoutListener2 implements MessageListener {
@Override
public void onMessage(Message message) {
//打印消息
System.out.println(new String(message.getBody()));
}
}
package com.sky.springrabbitmqconsumer.listener;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
public class SpringDirectQueue implements MessageListener {
@Override
public void onMessage(Message message) {
//打印消息
System.out.println(new String(message.getBody()));
}
}
package com.sky.springrabbitmqconsumer.listener;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
public class SpringQueueListener implements MessageListener {
@Override
public void onMessage(Message message) {
//打印消息
System.out.println(new String(message.getBody()));
}
}
package com.sky.springrabbitmqconsumer.listener;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
public class TopicListenerOne implements MessageListener {
@Override
public void onMessage(Message message) {
//打印消息
System.out.println(new String(message.getBody()));
}
}
package com.sky.springrabbitmqconsumer.listener;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
public class TopicListenerTwo implements MessageListener {
@Override
public void onMessage(Message message) {
//打印消息
System.out.println(new String(message.getBody()));
}
}
package com.sky.springrabbitmqconsumer.listener;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
public class TopicListenerThree implements MessageListener {
@Override
public void onMessage(Message message) {
//打印消息
System.out.println(new String(message.getBody()));
}
}
上面就是这个项目的所有代码了,下面就是Demo演示内容。
消费者取消注释:
消费者启动服务:
生产者发送消息:
消费者查看消息:
消费者取消注释:
消费者启动服务:
生产者发送消息:
消费者查看消息:
消费者取消注释:
消费者启动服务:
生产者发送消息:
消费者查看消息:
消费者取消注释:
消费者启动服务:
生产者发送消息:
消费者查看消息:
消息可靠性实现需要保证以下几点:
持久化
exchange要持久化
queue要持久化
message要持久化
生产方确认Confirm
消费方确认Ack
Broker高可用
producer—>rabbitmq broker—>exchange—>queue—>consumer
说明:基于上述Spring整合RabbitMQ的代码进行改动
第一处改动:设置确认模式和退回模式
代码:
publisher-confirms="true"
publisher-returns="true"
第二处改动:声明队列和交互机的bean代码:
<!--消息可靠性投递(生产端)-->
<rabbit:queue id="test_queue_confirm" name="test_queue_confirm"></rabbit:queue>
<rabbit:direct-exchange name="test_exchange_confirm">
<rabbit:bindings>
<rabbit:binding queue="test_queue_confirm" key="confirm"></rabbit:binding>
</rabbit:bindings>
</rabbit:direct-exchange>
第三处改动:编写Confirm测试方法
//测试Confirm 模式 @Test public void testConfirm() { //定义回调 rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { /** * @param correlationData 相关配置信息 * @param ack exchange交换机 是否成功收到了消息。true 成功,false代表失败 * @param cause 失败原因 */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { System.out.println("confirm方法被执行了...."); //ack 为 true表示 消息已经到达交换机 if (ack) { //接收成功 System.out.println("接收成功消息" + cause); } else { //如果没有投递到交换机中去就会接收失败,比如:将交换机名称写错 System.out.println("接收失败消息" + cause); //做一些处理,让消息再次发送。 } } }); //进行消息发送 rabbitTemplate.convertAndSend("test_exchange_confirm","confirm","message Confirm..."); //进行睡眠操作 try { Thread.sleep(5000); } catch (Exception e) { e.printStackTrace(); } }
第四处改动:编写Return测试方法
//测试 return模式 @Test public void testReturn() { //设置交换机处理失败消息的模式为true的时候,消息达到不了队列时,会将消息重新返回给生产者 rabbitTemplate.setMandatory(true); //定义回调 rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { /** * @param message 消息对象 * @param replyCode 错误码 * @param replyText 错误信息 * @param exchange 交换机 * @param routingKey 路由键 */ @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { System.out.println("return 执行了...."); System.out.println("message:"+message); System.out.println("replyCode:"+replyCode); System.out.println("replyText:"+replyText); System.out.println("exchange:"+exchange); System.out.println("routingKey:"+routingKey); //处理业务 } }); //进行消息发送 rabbitTemplate.convertAndSend("test_exchange_confirm","confirm","message return..."); //进行睡眠操作 try { Thread.sleep(5000); } catch (Exception e) { e.printStackTrace(); } }
第一处改动:
监听器:AckListener
package com.sky.springrabbitmqconsumer.listener; import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener; import org.springframework.stereotype.Component; @Component public class AckListener implements ChannelAwareMessageListener { @Override public void onMessage(Message message, Channel channel) throws Exception { //1、获取消息的id long deliveryTag = message.getMessageProperties().getDeliveryTag(); try { //2、获取消息 System.out.println("message:"+new String(message.getBody())); //3、进行业务处理 System.out.println("=====进行业务处理===="); //模拟出现异常 int i = 5/0; //4、进行消息签收 channel.basicAck(deliveryTag, true); } catch (Exception e) { //拒绝签收 /* * 第三个参数:requeue:重回队列。如果设置为true,则消息重新回到queue,broker会重新发送该消息给消费端 */ System.out.println("=====业务处理异常,消息重新回到queue,broker会重新发送该消息给消费端===="); channel.basicNack(deliveryTag, true, true); } } }
第二处改动:
原来是通过声明一个个的bean对象,现在改为了扫描某个包下面的类
<context:component-scan base-package="com.sky.springrabbitmqconsumer.listener" />
第三处改动:
在rabbit:listener-container标签中设置acknowledge属性改为手动确认,(限流设置:prefetch属性改为每次抓取2条消息,并且监听自定义的ackListener)
启动生产者Confirm模式:
启动消费者:
启动生产者Return模式:
消费者的控制台就会不停的打印:
生产者Confirm模式:
生产者Return模式:
第一处修改:监听器:QosListener
package com.sky.springrabbitmqconsumer.listener; import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener; import org.springframework.stereotype.Component; @Component public class QosListener implements ChannelAwareMessageListener { @Override public void onMessage(Message message, Channel channel) throws Exception { //获取到的消息 System.out.println(new String(message.getBody())); Thread.sleep(3000); //处理业务逻辑 //进行消息的签收,第二个参数:true表示之前没签收的都给他签收掉 channel.basicAck(message.getMessageProperties().getDeliveryTag(),true); } }
第二处修改:
<!--定义监听器容器
acknowledge="manual":手动签收
自动确认:acknowledge="none"
手动确认:acknowledge="manual"
根据异常情况确认:acknowledge="auto",(这种方式使用麻烦,不作讲解)
prefetch="1":每次抓取多少条消息。只有消息确认签收了,才会拉取下一条,否则不会拉取消息
-->
<rabbit:listener-container connection-factory="connectionFactory"
auto-declare="true"
acknowledge="manual"
prefetch="2">
<rabbit:listener ref="qosListener" queue-names="test_queue_confirm"></rabbit:listener>
批量发送消息测试方法
//批量发送消息,让消费者每次拉去指定的数量
@Test
public void testQos(){
for (int i = 0; i < 10; i++) {
// 发送消息
rabbitTemplate.convertAndSend("test_exchange_confirm", "confirm", "message confirm....");
}
}
说明:每隔3秒打印一条消息
重启消费者和生产者发消息,这个时候会看到,原本发送的十条消息,实际只有二条消息打印在消费者的控制台上面,因为prefetch属性配置了2,所以一次性拉取了二条。
<hr style=" border:solid; width:100px; height:1px;" color=#000000 size=1">
比如我们在下订单的时候,如果超过30分钟未支付,就取消这个订单,把当前商品的库存加回去。
TTL 全称 Time To Live(存活时间/过期时间),当消息到达存活时间后,还没有被消费,会被自动清除。RabbitMQ可以对消息设置过期时间,也可以对整个队列(Queue)设置过期时间。
超过5秒没有消费者消费,就自动失效了。
添加ttl队列
<!--ttl-->
<rabbit:queue name="test_queue_ttl" id="test_queue_ttl">
<!--设置queue的参数-->
<rabbit:queue-arguments>
<!--x-message-ttl指队列的过期时间-->
<entry key="x-message-ttl" value="100000" value-type="java.lang.Integer"></entry>
</rabbit:queue-arguments>
</rabbit:queue>
<rabbit:topic-exchange name="test_exchange_ttl" >
<rabbit:bindings>
<rabbit:binding pattern="ttl.#" queue="test_queue_ttl"></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>
发送消息测试方法
//ttl测试
@Test
public void testTtl(){
for (int i = 0; i < 10; i++) {
// 发送消息
rabbitTemplate.convertAndSend("test_exchange_confirm", "ttl.test", "message confirm....");
}
}
启动测试方法
等待10秒
死信队列,英文缩写:DLX 。Dead Letter Exchange(死信交换机),当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是DLX。
说明:死信交换机和死信队列和普通的没有区别,当消息成为死信后,如果该队列绑定了死信交换机,则消息会被死信交换机重新路由到死信队列。
给队列设置参数: x-dead-letter-exchange 和 x-dead-letter-routing-key
在spring-rabbitmq-producer.xml中添加队列和交换机
<!-- 死信队列: 1. 声明正常的队列(test_queue_dlx)和交换机(test_exchange_dlx) 2. 声明死信队列(queue_dlx)和死信交换机(exchange_dlx) 3. 正常队列绑定死信交换机 设置两个参数: * x-dead-letter-exchange:死信交换机名称 * x-dead-letter-routing-key:发送给死信交换机的routingkey --> <!-- 1. 声明正常的队列(test_queue_dlx)和交换机(test_exchange_dlx) --> <rabbit:queue name="test_queue_dlx" id="test_queue_dlx"> <!--3. 正常队列绑定死信交换机--> <rabbit:queue-arguments> <!--3.1 x-dead-letter-exchange:死信交换机名称--> <entry key="x-dead-letter-exchange" value="exchange_dlx" /> <!--3.2 x-dead-letter-routing-key:发送给死信交换机的routingkey--> <entry key="x-dead-letter-routing-key" value="dlx.test" /> <!--4.1 设置队列的过期时间 ttl--> <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer" /> <!--4.2 设置队列的长度限制 max-length --> <entry key="x-max-length" value="10" value-type="java.lang.Integer" /> </rabbit:queue-arguments> </rabbit:queue> <!-- 正常的交换机绑定正常的队列--> <rabbit:topic-exchange name="test_exchange_dlx"> <rabbit:bindings> <rabbit:binding pattern="test.dlx.#" queue="test_queue_dlx"></rabbit:binding> </rabbit:bindings> </rabbit:topic-exchange> <!-- 2. 声明死信队列(queue_dlx)和死信交换机(exchange_dlx) --> <rabbit:queue name="queue_dlx" id="queue_dlx"></rabbit:queue> <rabbit:topic-exchange name="exchange_dlx"> <rabbit:bindings> <rabbit:binding pattern="dlx.#" queue="queue_dlx"></rabbit:binding> </rabbit:bindings> </rabbit:topic-exchange>
启动生产者:
RabbitMQ管控页面查看
消息拒收同理
延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。在RabbitMQ中并未提供延迟队列功能,但是可以使用:TTL+死信队列 组合实现延迟队列的效果。
下单后,30分钟未支付,取消订单,回滚库存。
新用户注册成功7天后,发送短信问候。
在spring-rabbitmq-producer.xml添加以下代码
<!-- 延迟队列: 1. 定义正常交换机(order_exchange)和队列(order_queue) 2. 定义死信交换机(order_exchange_dlx)和队列(order_queue_dlx) 3. 绑定,设置正常队列过期时间为30分钟 --> <!-- 1. 定义正常交换机(order_exchange)和队列(order_queue)--> <rabbit:queue id="order_queue" name="order_queue"> <!--3. 绑定,设置正常队列过期时间为30分钟--> <rabbit:queue-arguments> <entry key="x-dead-letter-exchange" value="order_exchange_dlx" /> <entry key="x-dead-letter-routing-key" value="dlx.order.cancel" /> <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer" /> </rabbit:queue-arguments> </rabbit:queue> <!-- 订单业务的交换机和队列--> <rabbit:topic-exchange name="order_exchange"> <rabbit:bindings> <rabbit:binding pattern="order.#" queue="order_queue"></rabbit:binding> </rabbit:bindings> </rabbit:topic-exchange> <!--2. 定义死信交换机(order_exchange_dlx)和队列(order_queue_dlx)--> <rabbit:queue id="order_queue_dlx" name="order_queue_dlx"></rabbit:queue> <rabbit:topic-exchange name="order_exchange_dlx"> <rabbit:bindings> <rabbit:binding pattern="dlx.order.#" queue="order_queue_dlx"></rabbit:binding> </rabbit:bindings> </rabbit:topic-exchange>
发消息测试
/*
* 测试延时消息
* */
@Test
public void testDelay() throws InterruptedException {
//1.发送订单消息。 将来是在订单系统中,下单成功后,发送消息
rabbitTemplate.convertAndSend("order_exchange","order.msg","订单信息:id=1,time=2021年10月...");
//2.打印倒计时10秒
for (int i = 10; i > 0 ; i--) {
System.out.println(i+"...");
Thread.sleep(1000);
}
}
spring-rabbitmq-consumer.xml配置
<!--延迟队列效果实现: 一定要监听的是 死信队列!!!-->
<rabbit:listener ref="orderListener" queue-names="order_queue_dlx"></rabbit:listener>
添加监听器OrderListener
package com.sky.springrabbitmqconsumer.listener; import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener; import org.springframework.stereotype.Component; @Component public class OrderListener implements ChannelAwareMessageListener { @Override public void onMessage(Message message, Channel channel) throws Exception { long deliveryTag = message.getMessageProperties().getDeliveryTag(); try { //1.接收转换消息 System.out.println(new String(message.getBody())); //2. 处理业务逻辑 System.out.println("处理业务逻辑..."); System.out.println("根据订单id查询其状态..."); System.out.println("判断状态是否为支付成功"); System.out.println("取消订单,回滚库存...."); //3. 手动签收 channel.basicAck(deliveryTag,true); } catch (Exception e) { //e.printStackTrace(); System.out.println("出现异常,拒绝接受"); //4.拒绝签收,不重回队列 requeue=false channel.basicNack(deliveryTag,true,false); } } }
启动生产者
启动消费者
说明:过了十秒之后才发送消息
上线更多的消费者,进行正常消费,上线专门的队列消费访问,先将消息批量取出来,记录到数据库中,再慢慢处理。
幂等性指一次和多次请求某一个资源,对于资源本身应该具有同样的结果。也就是说,其任意多次执行对资源本身所产生的影响均与一次执行的影响相同。在MQ中指,消费多条相同的消息,得到与消费该消息一次相同的结果。
消息幂等性保障–乐观锁机制
提示:以上就是今天要讲的内容,本文使用Springboot对Rabbitmq进行了整合,并且提供了简单模式,广播模式,路由模式,通配符模式四种模式的Demo演示和代码。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。