赞
踩
消息队列中,有些消息可能一直不会被消费,可以设置过期时间,防止消息队列堵塞,可以通过两种方式设置,一种是xml配置的方式,一种是代码的方式,这里写一下代码配置的方式
/**
过期消息
* 该消息投递任何交换机或队列中的时候;如果到了过期时间则将从该队列中删除
*/
@Test
public void ttlMessageTest(){
MessageProperties messageProperties = new MessageProperties();
//设置消息的过期时间,5 秒
messageProperties.setExpiration("5000");
Message message = new Message("测试过期消息,5 秒钟过期".getBytes(), messageProperties);
//路由键与队列同名
rabbitTemplate.convertAndSend("my_ttl_queue", message);
}
如果xml和代码中同时配置了过期时间,则两者中较小的那个才会起作用。
如果是xml的配置方式需要再启动类中注入配置文件
<rabbit:queue id="my_ttl_queue" name="my_ttl_queue" auto-declare="true">
<rabbit:queue-arguments>
<!--投递到该队列的消息如果没有消费都将在 6 秒之后被删除-->
<entry key="x-message-ttl" value-type="long" value="6000"/>
</rabbit:queue>
上面一种是直接设置消息在队列中存在多久以后直接删除,但是有些需求不能删除,需要做特殊处理,咋办?这就诞生了死信队列,死信队列是别人不要的丢到里面,然后进行特殊的处理。
第一种:在规定时间内,没有被消费掉
第二种:消息队列太长了,设置一定的长度,达到长度以后,就会把先前的消息丢到死信队列,
配置如下:
<!--定义定向交换机中的持久化死信队列,不存在则自动创建--> <rabbit:queue id="my_dlx_queue" name="my_dlx_queue" auto-declare="true"/> <!--定义广播类型交换机;并绑定上述两个队列--> <rabbit:direct-exchange id="my_dlx_exchange" name="my_dlx_exchange" auto-declare="true"> <rabbit:bindings> <!--绑定路由键my_ttl_dlx、my_max_dlx,可以将过期的消息转移到my_dlx_queue队列--> <rabbit:binding key="my_ttl_dlx" queue="my_dlx_queue"/> <rabbit:binding key="my_max_dlx" queue="my_dlx_queue"/> </rabbit:bindings> </rabbit:direct-exchange> <!--定义过期队列及其属性,不存在则自动创建--> <rabbit:queue id="my_ttl_dlx_queue" name="my_ttl_dlx_queue" auto-declare="true"> <rabbit:queue-arguments> <!--投递到该队列的消息如果没有消费都将在6秒之后被投递到死信交换机--> <entry key="x-message-ttl" value-type="long" value="6000"/> <!--设置当消息过期后投递到对应的死信交换机--> <entry key="x-dead-letter-exchange" value="my_dlx_exchange"/> </rabbit:queue-arguments> </rabbit:queue> <!--定义限制长度的队列及其属性,不存在则自动创建--> <rabbit:queue id="my_max_dlx_queue" name="my_max_dlx_queue" auto-declare="true"> <rabbit:queue-arguments> <!--投递到该队列的消息最多2个消息,如果超过则最早的消息被删除投递到死信交换机--> <entry key="x-max-length" value-type="long" value="2"/> <!--设置当消息过期后投递到对应的死信交换机--> <entry key="x-dead-letter-exchange" value="my_dlx_exchange"/> </rabbit:queue-arguments> </rabbit:queue> <!--定义定向交换机 根据不同的路由key投递消息--> <rabbit:direct-exchange id="my_normal_exchange" name="my_normal_exchange" auto-declare="true"> <rabbit:bindings> <rabbit:binding key="my_ttl_dlx" queue="my_ttl_dlx_queue"/> <rabbit:binding key="my_max_dlx" queue="my_max_dlx_queue"/> </rabbit:bindings> </rabbit:direct-exchange>
测试代码:
/** * 过期消息投递到死信队列 * 投递到一个正常的队列,但是该队列有设置过期时间,到过期时间之后消息会被投递到死信交换机(队列) */ @Test public void dlxTTLMessageTest(){ rabbitTemplate.convertAndSend( "my_normal_exchange", "my_ttl_dlx", "测试过期消息;6秒过期后会被投递到死信交换机2222"); } /** * 消息长度超过2,会投递到死信队列中 */ @Test public void dlxMaxMessageTest(){ rabbitTemplate.convertAndSend( "my_normal_exchange", "my_max_dlx", "发送消息4:消息长度超过2,会被投递到死信队列中!"); rabbitTemplate.convertAndSend( "my_normal_exchange", "my_max_dlx", "发送消息5:消息长度超过2,会被投递到死信队列中!"); rabbitTemplate.convertAndSend( "my_normal_exchange", "my_max_dlx", "发送消息6:消息长度超过2,会被投递到死信队列中!"); }
有时候不想让消费者立马拿到消息消费,就涉及到延迟队列的使用,使用场景比如典型的付款,提交订单以后,用户如果迟迟没有支付,设置一个过期时间半小时,半小时后进入死信队列进行处理,由系统自动取消订单,所以延迟队列的实现可以使用消息过期+死信队列的方式来实现。
确认并且保证消息被送达,提供了两种方式:发布确认和事务。(两者不可同时使用)在 channel 为事务时,不可引入确认模式;同样 channel 为确认模式下,不可使用事务。
几个需要用到的配置
第一:连接配置
publisher-confirms="true" 事务是否发送出去确认
publisher-returns="true" 事务处理成功或者失败返回
-->
<rabbit:connection-factory id="connectionFactory" host="192.168.126.131"
port="5672"
username="huali"
password="huali"
virtual-host="huali"
publisher-confirms="true"
publisher-returns="true"
/>
第二:消息处理类配置
<!-- 消息回调处理类 -->
<bean id="confirmCallback" class="com.huali.rabbitmq.MsgSendConfirmCallBack"/>
<!-- 消息失败回调类 -->
<bean id="sendReturnCallback" class="com.huali.rabbitmq.MsgSendReturnCallback"/>
第三:整合配置
<!--定义rabbitTemplate对象操作可以在代码中方便发送消息-->
<!-- confirm-callback="confirmCallback" 表示:消息失败回调 -->
<!-- return-callback="sendReturnCallback" 表示:消息失败回调 ,同时需配置mandatory="true",否则消息则丢失-->
<!-- channel-transacted="true" 表示:支持事务操作 -->
<rabbit:template id="rabbitTemplate"
connection-factory="connectionFactory"
confirm-callback="confirmCallback"
return-callback="sendReturnCallback"
mandatory="true"
/>
第四:处理类
package com.huali.rabbitmq; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; /** * 消息确认与回退,确认消息是否发出去了 */ public class MsgSendConfirmCallBack implements RabbitTemplate.ConfirmCallback { public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (ack) { System.out.println("消息确认成功...."); } else { //处理丢失的消息 System.out.println("消息确认失败," + cause); } } } package com.huali.rabbitmq; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.core.RabbitTemplate; //消息发送回调-确认处理成功或者失败 public class MsgSendReturnCallback implements RabbitTemplate.ReturnCallback { public void returnedMessage(Message message, int i, String s, String s1, String s2) { String msgJson = new String(message.getBody()); System.out.println("Returned Message:"+msgJson); } }
第五:测试
/**
* 消息确认,测试消息发送确认代码是否正确
*/
@Test
public void queueTest(){
//路由键与队列同名
rabbitTemplate.convertAndSend("spring_queue", "只发队列spring_queue的消息。");
}
//消息回调测试
@Test
public void testFailQueueTest() throws InterruptedException {
//exchange 正确,queue 错误 ,confirm被回调, ack=true; return被回调 replyText:NO_ROUTE
rabbitTemplate.convertAndSend("test_fail_exchange", "", "测试消息发送失败进行确认应答。");
}
场景:业务处理伴随消息的发送,业务处理失败(事务回滚)后要求消息不发送。rabbitmq 使用调用者的外部事务,通常是首选,因为它是非侵入性的(低耦合)。方法比较简单,在上面配置的接触上,去掉第一的 publisher-confirms=“true” publisher-returns=“true”,因为二者不能共存,然后在第三 整合配置的时候加上支持事务, channel-transacted=“true”,再加平台事务管理。然后在代码方法上加上事务注解即可、
第一:配置
<rabbit:connection-factory id="connectionFactory" host="192.168.126.131" port="5672" username="huali" password="huali" virtual-host="huali" /> <!-- 消息回调处理类 --> <bean id="confirmCallback" class="com.huali.rabbitmq.MsgSendConfirmCallBack"/> <!-- 消息失败回调类 --> <bean id="sendReturnCallback" class="com.huali.rabbitmq.MsgSendReturnCallback"/> <!--定义rabbitTemplate对象操作可以在代码中方便发送消息--> <!-- confirm-callback="confirmCallback" 表示:消息失败回调 --> <!-- return-callback="sendReturnCallback" 表示:消息失败回调 ,同时需配置mandatory="true",否则消息则丢失--> <!-- channel-transacted="true" 表示:支持事务操作 --> <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory" confirm-callback="confirmCallback" return-callback="sendReturnCallback" mandatory="true" channel-transacted="true" /> <!--平台事务管理器--> <bean id="transactionManager" class="org.springframework.amqp.rabbit.transaction.RabbitTransactionManager"> <property name="connectionFactory" ref="connectionFactory"/> </bean>
第二:测试类
@Test
@Transactional //开启事务
//@Rollback(false)//在测试的时候,需要手动的方式制定回滚的策略
public void queueTest2(){
//路由键与队列同名
rabbitTemplate.convertAndSend("spring_queue", "只发队列spring_queue的消息--02222222222222222222。");
//System.out.println("----------------dosoming:可以是数据库的操作,也可以是其他业务类型的操作---------------");
//模拟业务处理失败
//System.out.println(1/0);
//rabbitTemplate.convertAndSend("spring_queue", "只发队列spring_queue的消息--02。");
}
消息中心的消息追踪需要使用 Trace 实现,Trace 是 Rabbitmq 用于记录每一次发送的消息,方便使用 Rabbitmq 的开发者调试、排错。可通过插件形式提供可视化界面。Trace 启动后会自动创建系统 Exchange:amq.rabbitmq.trace ,每个队列会自动绑定该 Exchange,绑定后发送到队列的消息都会记录到 Trace 日志。
需要先 用rabbitmq 启用插件,再打开开关才能使用
命令集 | 描述 |
---|---|
rabbitmq-plugins list | 查看插件列表 |
rabbitmq-plugins enable rabbitmq_tracing | rabbitmq 启用 trace 插件 |
rabbitmqctl trace_on | 打开 trace 的开关 |
rabbitmqctl trace_on -p huali | 打开 trace 的开关(huali为需要日志追踪的 vhost) |
rabbitmqctl trace_off | 关闭 trace 的开关 |
rabbitmq-plugins disable rabbitmq_tracing | rabbitmq 关闭 Trace 插件 |
rabbitmqctl set_user_tags heima administrator | 只有 administrator 的角色才能查看日志界面 |
安装插件并开启 trace_on 之后,会发现多个 exchange:amq.rabbitmq.trace ,类型为:topic。
手动文件log文件,然后发送一条消息
发送一条消息
rabbitTemplate.convertAndSend("spring_queue", "只发队列 spring_queue 的消息--01。");
即可看到刚刚发送的消息,这里乱码了,暂时不管。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。