赞
踩
目录
1) 采用workqueue,多个消费者监听同一队列。
2)接收到消息以后,而是通过线程池,异步消费。
如何保证消息的可靠性呢?也就是说如何保证消息不丢失呢?
如果在消费者消费之前,MQ就宕机了,消息就没了?可以将消息进行持久化。要将消息持久化,前提是:队列、Exchange都持久化。
交换机持久化:
注意,虽然我们标记了消息是需要持久化的,但RabbitMQ接收到消息-持久化到磁盘仍然需要一定时间,这就意味着消息可能在缓存,依然有丢失的可能。不过对于简单的任务队列这也够用了,如果还需要更强的保证消息不丢失,则需要使用"发布者确认”pubLisher、confirms。
消息持久化保证了消息正常投递后,消息不丢失。但是如果消息没有正常投递呢?
生产者消息确认机制来监视消息有没有安全的投递。
在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。RabbitMQ 为我们提供了两种方式用来控制消息的投递可靠性模式。
rabbitmq整个消息投递的路径为:
producer--->rabbitmq broker--->exchange--->queue--->consumer
我们将利用这两个 callback 控制消息的可靠性投递。
(1)application.properties:
- # 开启生产者确认模式:(confirm),投递到交换机,不论失败或者成功都回调
- spring.rabbitmq.publisher-confirms=true
-
(2)生产者:
- package com.baiqi.test;
-
- import org.junit.Test;
- import org.junit.runner.RunWith;
- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.rabbit.connection.CorrelationData;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.test.context.ContextConfiguration;
- import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
-
- /**
- * @author CMS
- */
- @RunWith(SpringJUnit4ClassRunner.class)
- public class ProducerTest {
-
- @Autowired
- private RabbitTemplate rabbitTemplate;
-
- //测试 Confirm 模式
- @Test
- public void testConfirm() {
-
- //定义回调
- rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
- /**
- *
- * @param correlationData 相关配置信息
- * @param ack exchange交换机 是否成功收到了消息。true 成功,false代表失败
- * @param cause 失败原因
- */
- @Override
- public void confirm(CorrelationData correlationData, boolean ack, String cause) {
- //System.out.println("confirm方法被执行了...."+correlationData.getId());
-
- //ack 为 true表示 消息已经到达交换机
- if (ack) {
- //接收成功
- System.out.println("接收成功消息" + cause);
- } else {
- //接收失败
- System.out.println("接收失败消息" + cause);
- //做一些处理,让消息再次发送。
- }
- }
- });
-
- //进行消息发送
- for (int i = 0; i < 5; i++) {
- rabbitTemplate.convertAndSend("test_exchange_confirm","confirm","message Confirm...");
-
- }
-
- //进行睡眠操作(原因:发送消息之后有个回调,回调需要时间,所以此处阻塞1s)
- try {
- Thread.sleep(1000);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
(3)运行程序及结果
1、正常运行
2、模拟confirm
(1)application.properties:
- # 开启生产者回退模式:(returns),交换机将消息路由到队列,出现异常则回调
- spring.rabbitmq.publisher-returns=true
(2)生产者
- //测试 return模式
- @Test
- public void testReturn() {
-
- //设置交换机处理失败消息的模式 为true的时候,消息达到不了 队列时,会将消息重新返回给生产者
- rabbitTemplate.setMandatory(true);
-
- //定义回调
- rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
- /**
- *
- * @param message 消息对象
- * @param replyCode 错误码
- * @param replyText 错误信息
- * @param exchange 交换机
- * @param routingKey 路由键
- */
- @Override
- public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
- System.out.println("return 执行了....");
-
- System.out.println("message:"+message);
- System.out.println("replyCode:"+replyCode);
- System.out.println("replyText:"+replyText);
- System.out.println("exchange:"+exchange);
- System.out.println("routingKey:"+routingKey);
-
- //处理
- }
- });
- //进行消息发送
- rabbitTemplate.convertAndSend("test_exchange_confirm","confirm","message return...");
-
- //进行睡眠操作
- try {
- Thread.sleep(5000);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
(3)运行程序及结果
1、程序正常运行
2、模拟return
3、注释掉rabbitTemplate.setMandatory(true)
application.properties:
- # 开启生产者确认模式:(confirm),投递到交换机,不论失败或者成功都回调
- spring.rabbitmq.publisher-confirms=true
- # 开启生产者回退模式:(returns),交换机将消息路由到队列,出现异常则回调
- spring.rabbitmq.publisher-returns=true
confirm(config配置):
这种写法的好处:rabbitTemplate作为整个项目的一个‘单例对象’,只能绑定一个confirm和return回调。如果项目中多处用到了rabbitTemplate,那么为了保证消息可靠性,我们只需要在全局声明一次rabbitTemplate和confirm、return的绑定就好。
- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.rabbit.connection.CorrelationData;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Component;
- import javax.annotation.PostConstruct;
-
- @Component
- public class MessageConfirmCallback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
- @Autowired
- private RabbitTemplate rabbitTemplate;
- /**
- * * 创建RabbitTemplate对象之后执行当前方法,为模板对象设置回调确认方法
- * * 设置消息确认回调方法
- * * 设置消息回退回调方法
- *
- */
- @PostConstruct
- public void initRabbitTemplate() {
- //设置消息确认回调方法
- rabbitTemplate.setConfirmCallback(this::confirm);
- rabbitTemplate.setReturnCallback(this::returnedMessage);
- }
- /**
- * 投递到交换机,不论投递成功还是失败都回调次方法
- * @param correlationData 投递相关数据
- * @param ack 是否投递到交换机
- * @param cause 投递失败原因
- *
- */
- @Override
- public void confirm(CorrelationData correlationData, boolean ack, String cause) {
- if (ack) {
- System.out.println("消息进入交换机成功");
- } else {
- System.out.println("消息进入交换机失败, 失败原因:" + cause);
- }
- }
- /**
- * 当消息投递到交换机,交换机路由到消息队列中出现异常,执行returnedMessaged方法
- * @param message 投递消息内容
- * @param replyCode 返回错误状态码
- * @param replyText 返回错误内容
- * @param exchange 交换机名称
- * @param routingKey 路由键
- *
- */
- @Override
- public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
- System.out.println("交换机路由至消息队列出错:>>>>>>>");
- System.out.println("交换机:" + exchange);
- System.out.println("路由键:" + routingKey);
- System.out.println("错误状态码:" + replyCode);
- System.out.println("错误原因:" + replyText);
- System.out.println("发送消息内容:" + message.toString());
- System.out.println("<<<<<<<<");
- }
- }
消费者的手动ACK机制。可以防止消费者丢失消息。
如果是自动ACK,那么当消费者接收到消息就会立马自动签收,如果这时消费者还没有来得及消费就宕机或者消费出现异常都会导致消息丢失;
ack指Acknowledge,确认。 表示消费端收到消息后的确认方式。
有三种确认方式:
其中自动确认是指,当消息一旦被Consumer接收到,则自动确认收到,并将相应 message 从 RabbitMQ 的消息缓存中移除。但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失。如果设置了手动确认方式,则需要在业务处理成功后,调用channel.basicAck(),手动签收,如果出现异常,则调用channel.basicNack()或者basicReject()方法,让其自动重新发送消息。
application.properties:
- # 配置开启手动签收
- # 简单模式的开启手动签收
- spring.rabbitmq.listener.simple.acknowledge-mode=manual
- # 路由模式开启手动签收
- spring.rabbitmq.listener.direct.acknowledge-mode=manual
- # 是否支持重试
- spring.rabbitmq.listener.direct.retry.enabled=true
消费者:
- import com.rabbitmq.client.Channel;
- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.core.MessageProperties;
- import org.springframework.amqp.rabbit.annotation.RabbitHandler;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.stereotype.Component;
-
- @Component
- @RabbitListener(queues = "routing_queue1")
- public class RoutingListener01 {
- @RabbitHandler
- public void simpleHandler(String msg, Message message, Channel channel) throws IOException {
- System.out.println("routing_queue1: " + msg);
- //获取投递标签
- MessageProperties messageProperties = message.getMessageProperties();
- long deliveryTag = messageProperties.getDeliveryTag();
- try {
- // 模拟异常
- // if (msg.contains("苹果")) {
- // throw new RuntimeException("不允许卖苹果手机!!!");
- //}
- /**
- * 手动签收消息
- * 参数1:消息投递标签
- * 参数2:是否批量签收:true一次性签收所有,false,只签收当前消息
- */
- channel.basicAck(deliveryTag, false);
- System.out.println("手动签收完成:{}");
- } catch (Exception ex) {
- /**
- * 手动拒绝签收
- * 参数1:当前消息的投递标签
- * 参数2:是否批量签收:true一次性签收所有,false,只签收当前消息
- * 参数3:是否重回队列,true为重回队列,false为不重回
- */
- channel.basicNack(deliveryTag, false, true);
- System.out.println("拒绝签收,重回队列:{}" + ex);
- }
- }
- }
CachingConnectionFactory.java:1517) o.s.a.r.c.CachingConnectionFactory - Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=80)
原因:
当采用‘springboot’方式开发rabbitmq的时候,计划使用‘收到ack’;然而,代码中的确进行了ack(调用了channel.basicAck),但是yaml配置中没有声明:spring.rabbitmq.listener.simple.acknowledge-mode=manual。
分析:
根本原因是我们调用了两次channel.basicAck:一次是我,一次是springboot默认的自动提交;
如何把springboot的默认自动提交忽略掉:yaml配置中声明:spring.rabbitmq.listener.simple.acknowledge-mode=manual。(springboot底层有个判断,如果发现配置了manual,则不会再次basicAck)。
1、持久化
2、生产方确认Confirm
3、消费方确认Ack
4、Broker高可用
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。