赞
踩
1、如下图
1、生产者将数据发送到 RabbitMQ 的时候,可能数据就在半路给搞丢了,因为网络问题啥的,都有可能。此时可以选择用RabbitMQ 提供的事务功能,就是生产者发送数据之前开启 RabbitMQ 事务channel.txSelect ,然后发送消息,如果消息没有成功RabbitMQ 接收到,那么生产者会收到异常报错,此时就可以回滚事务 channel.txRollback ,然后重试发送消息;如果收到了消息,那么可以提交事务 channel.txCommit
2、但是问题是,RabbitMQ 事务机制(同步)一搞,基本上吞吐量会下来,因为太耗性能。所以一般来说,如果你要确保说 RabbitMQ 的消息别丢,可以开启 confirm 模式,在生产者那里设置开启 confirm 模式之后,你每次写的消息都会分配一个唯一的 id,然后如果写入了 RabbitMQ 中,RabbitMQ 会给你回传一个 ack 消息,告诉你说这个消息 ok 了。
3、如果RabbitMQ 没能处理这个消息,会回调你的一个 nack 接口,告诉你这个消息接收失败,你可以重试。而且你可以结合这个机制自己在内存里维护每个消息 id 的状态,如果超过一定时间还没接收到这个消息的回调,那么你可以重发。
4、事务机制和 confirm 机制最大的不同在于,事务机制是同步的,你提交一个事务之后会阻塞在那儿,但是 confirm 机制是异步的,你发送个消息之后就可以发送下一个消息,然后那个消息 RabbitMQ 接收了之后会异步回调你的一个接口通知你这个消息接收到了。所以一般在生产者这块避免数据丢失,都是用 confirm 机制的。
1、就是 RabbitMQ 自己弄丢了数据,这个你必须开启 RabbitMQ 的持久化,就是消息写入之后会持久化到磁盘,哪怕RabbitMQ 自己挂了,恢复之后会自动读取之前存储的数据,一般数据不会丢。除非极其罕见的是,RabbitMQ 还没持久化,自己就挂了,可能导致少量数据丢失,但是这个概率较小。
设置持久化有两个步骤:
这样就可以保证 RabbitMQ 持久化 queue 的元数据,但是它是不会持久化 queue 里的数据的。
就是将消息设置为持久化,此时 RabbitMQ 就会将消息持久化到磁盘上去。
2、必须要同时设置这两个持久化才行,RabbitMQ 哪怕是挂了,再次重启,也会从磁盘上重启恢复queue,恢复这个 queue 里的数据。注意,哪怕是你给 RabbitMQ 开启了持久化机制,也有一种可能,就是这个消息写到了RabbitMQ 中,但是还没来得及持久化到磁盘上,结果不巧,此时 RabbitMQ 挂了,就会导致内存里的一点点数据丢失。所以,持久化可以跟生产者那边的 confirm 机制配合起来,只有消息被持久化到磁盘之后,才会通知生产者 ack 了,所以哪怕是在持久化到磁盘之前,RabbitMQ 挂了,数据丢了,生产者收不到 ack ,你也是可以自己重发的
RabbitMQ 如果丢失了数据,主要是因为你消费的时候,刚消费到,自己还没处理,结果自己的进程挂了,比如重启了,那么就尴尬了,RabbitMQ 认为你都消费了,这数据就丢了。这个时候得用 RabbitMQ 提供的 ack 机制,简单来说,就是你必须关闭 RabbitMQ 的自动 ack ,可以通过一个 api 来调用就行,然后每次你自己代码里确保处理完的时候,再在程序里 ack 一把。这样的话,如果你还没处理完,不就没有 ack 了?那 RabbitMQ 就认为你还没处理完,这个时候 RabbitMQ 会把这个消费分配给别的 consumer 去处理,消息是不会丢的。
1、模拟业务,我们希望会员服务在多个实例的情况下,每个实例只需要收到一次消息
2、项目结构
3、生产者和消费者引入pom依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
4、生产者和消费者配置rabbitmq信息
spring:
# rabbitmq 配置信息
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
1、RabbitMQ 为我们提供了两种方式用来控制消息的投递可靠性模式。
2、我们先来了解一下 rabbitmq 整个消息投递的路径,如下:
producer—>rabbitmq broker—>exchange—>queue—>consumer
3、消息从 producer 到 exchange 则会返回一个 confirmCallback 。
4、消息从 exchange–>queue 投递失败则会返回一个 returnCallback 。
我们将利用这两个 callback 控制消息的可靠性投递
1、在配置文件中直接开启即可
spring:
# rabbitmq 配置信息
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
#1、确保消息从发送端到服务端投递可靠(分为以下两个步骤)
#1.1、确认消息已发送到交换机(Exchange) 可以把publisher-confirms: true 替换为 publisher-confirm-type: correlate
publisher-confirm-type: correlated
#1.2、确认消息从交换机中到队列中
publisher-returns: true
1、生产者项目结构
2、DirectRabbitConfig
这里根据前面的需求,我们创建Direct
类型的交换器即可,至于其他类型的交换器以及区别,后面有说明
package cn.gxm.producer.config; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @author GXM * @version 1.0.0 * @Description 创建direct类型的交换机 * @createTime 2023年01月03日 */ @Slf4j @Configuration public class DirectRabbitConfig { private static final String QUEUE = "TestDirectQueue"; private static final String EXCHANGE = "TestDirectExchange"; private static final String ROUTING_KEY = "TestDirectRouting"; /** * 创建一个名为TestDirectQueue的队列 * * @return */ @Bean public Queue testDirectQueue() { // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效 // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable // autoDelete:是否自动删除,有消息者订阅本队列,然后所有消费者都解除订阅此队列,会自动删除。 // arguments:队列携带的参数,比如设置队列的死信队列,消息的过期时间等等。 return new Queue(QUEUE, true); } /** * 创建一个名为TestDirectExchange的Direct类型的交换机 * * @return */ @Bean public DirectExchange testDirectExchange() { // durable:是否持久化,默认是false,持久化交换机。 // autoDelete:是否自动删除,交换机先有队列或者其他交换机绑定的时候,然后当该交换机没有队列或其他交换机绑定的时候,会自动删除。 // arguments:交换机设置的参数,比如设置交换机的备用交换机(Alternate Exchange),当消息不能被路由到该交换机绑定的队列上时,会自动路由到备用交换机 return new DirectExchange(EXCHANGE, true, false); } /** * 绑定交换机和队列 * * @return */ @Bean public Binding bindingDirect() { //bind队列to交换机中with路由key(routing key) return BindingBuilder.bind(testDirectQueue()).to(testDirectExchange()).with(ROUTING_KEY); } }
3、RabbitTemplate
配置配置,这里有一个点非常重要,
rabbitTemplate.setMandatory(true);
必须得设置成true,否则无法回调ReturnsCallback
。- Mandatory:为true时,消息通过交换器无法匹配到队列会返回给生产者 并触发MessageReturn,而为false时,匹配不到会直接被丢弃
- 如果配置了发送回调ReturnCallback,插件延迟队列则会回调该方法,因为发送方确实没有投递到队列上,只是在交换器上暂存,等过期时间到了 才会发往队列。并非是BUG,而是有原因的,建议利用if 去拦截这个异常,判断延迟队列交换机名称,然后break;
package cn.gxm.producer.config; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.ReturnedMessage; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Slf4j @Configuration public class RabbitConfig { @Bean public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(); rabbitTemplate.setConnectionFactory(connectionFactory); //设置消息投递失败的策略,有两种策略:自动删除或返回到客户端。 //我们既然要做可靠性,当然是设置为返回到客户端(true是返回客户端,false是自动删除) rabbitTemplate.setMandatory(true); rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (ack) { log.info("ConfirmCallback 关联数据:{},投递成功,确认情况:{}", correlationData, ack); } else { log.info("ConfirmCallback 关联数据:{},投递失败,确认情况:{},原因:{}", correlationData, ack, cause); } } }); rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() { @Override public void returnedMessage(ReturnedMessage returnedMessage) { // 请注意!如果你使用了延迟队列插件,那么一定会调用该callback方法,因为数据并没有提交上去, // 而是提交在交换器中,过期时间到了才提交上去,并非是bug!你可以用if进行判断交换机名称来捕捉该报错 if (exchange.equals("你声明的延迟队列的交换机")) { return; } log.info("ReturnsCallback 消息:{},回应码:{},回应信息:{},交换机:{},路由键:{}" , returnedMessage.getMessage(), returnedMessage.getReplyCode() , returnedMessage.getReplyText(), returnedMessage.getExchange() , returnedMessage.getRoutingKey()); } }); return rabbitTemplate; } }
4、写一个TestController
,用作消息发送,一共发送5条消息,其中一条触发confirmCallback的失败,一条触发returnCallback,但是五条都会触发confirmCallback,他们的关系是并行的.
package cn.gxm.producer.controller; import cn.gxm.producer.vo.User; import com.alibaba.fastjson.JSON; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import java.util.UUID; /** * @author GXM * @version 1.0.0 * @Description TODO * @createTime 2023年01月03日 */ @RestController public class TestController { @Autowired RabbitTemplate rabbitTemplate; @GetMapping("/test") public String test() { return "producer ok"; } @GetMapping("/push") public String push() { for (int i = 1; i <= 5; i++) { //这个参数是用来做消息的唯一标识 //发布消息时使用,存储在消息的headers中 User user = new User(i, "汪涵"); // 关联的数据,可以用在消息投递失败的时候,作为一个线索,比如我把当前用户的id放进去,如果user消息投递失败 // 我后面可以根据id再找到user,再次投递数据 CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString().concat("-") + i); if (i == 2) { //故意把交换机写错,演示 confirmCallback rabbitTemplate.convertAndSend("TestDirectExchange_111", "TestDirectRouting", JSON.toJSONString(user), correlationData); } else if (i == 3) { //故意把路由键写错,演示 returnCallback rabbitTemplate.convertAndSend("TestDirectExchange", "TestDirectRouting_111", JSON.toJSONString(user), correlationData); } else { //正常发送 rabbitTemplate.convertAndSend("TestDirectExchange", "TestDirectRouting", JSON.toJSONString(user), correlationData); } } return "producer push ok"; } }
5、开始请求测试,打印日志如下,你会发现,和测试controller的注释写的一样,这里就不再多说了。
2023-01-03 16:34:14.912 INFO 27080 --- [nio-6072-exec-1] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring DispatcherServlet 'dispatcherServlet'
2023-01-03 16:34:14.912 INFO 27080 --- [nio-6072-exec-1] o.s.web.servlet.DispatcherServlet : Initializing Servlet 'dispatcherServlet'
2023-01-03 16:34:14.913 INFO 27080 --- [nio-6072-exec-1] o.s.web.servlet.DispatcherServlet : Completed initialization in 1 ms
2023-01-03 16:34:14.975 INFO 27080 --- [nio-6072-exec-1] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [127.0.0.1:5672]
2023-01-03 16:34:14.992 INFO 27080 --- [nio-6072-exec-1] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#5f95f1e1:0/SimpleConnection@12b7544d [delegate=amqp://guest@127.0.0.1:5672/, localPort= 63178]
2023-01-03 16:34:15.020 ERROR 27080 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory : Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'TestDirectExchange_111' in vhost '/', class-id=60, method-id=40)
2023-01-03 16:34:15.022 INFO 27080 --- [nectionFactory2] cn.gxm.producer.config.RabbitConfig : ConfirmCallback 关联数据:CorrelationData [id=0ba3b21e-e4fc-44cf-84d7-17461c8c9c18-1],投递成功,确认情况:true
2023-01-03 16:34:15.022 INFO 27080 --- [nectionFactory3] cn.gxm.producer.config.RabbitConfig : ConfirmCallback 关联数据:CorrelationData [id=54c62476-6847-4193-8f11-2a3efd90c284-2],投递失败,确认情况:false,原因:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'TestDirectExchange_111' in vhost '/', class-id=60, method-id=40)
2023-01-03 16:34:15.023 INFO 27080 --- [nectionFactory3] cn.gxm.producer.config.RabbitConfig : ReturnsCallback 消息:(Body:'{"id":3,"name":"汪涵"}' MessageProperties [headers={spring_returned_message_correlation=d91bb792-b6b0-4bea-a909-6eb8d0997b74-3}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]),回应码:312,回应信息:NO_ROUTE,交换机:TestDirectExchange,路由键:TestDirectRouting_111
2023-01-03 16:34:15.024 INFO 27080 --- [nectionFactory2] cn.gxm.producer.config.RabbitConfig : ConfirmCallback 关联数据:CorrelationData [id=d91bb792-b6b0-4bea-a909-6eb8d0997b74-3],投递成功,确认情况:true
2023-01-03 16:34:15.025 INFO 27080 --- [nectionFactory2] cn.gxm.producer.config.RabbitConfig : ConfirmCallback 关联数据:CorrelationData [id=70852973-9d21-49a4-aa07-67ca8f11aed8-4],投递成功,确认情况:true
2023-01-03 16:34:15.026 INFO 27080 --- [nectionFactory2] cn.gxm.producer.config.RabbitConfig : ConfirmCallback 关联数据:CorrelationData [id=9dc23013-5b97-47b4-8ebf-3f842615c2f7-5],投递成功,确认情况:true
6、因为我们此刻是没有启动消费者的,所以,在控制台是能看到三条数据的,id为2和id为3的数据是有问题的,都分别通过confirmCallback
和returnCallback
来进行回调了,则具体的补偿措施,可以根据自身的业务来处理。
1、这个阶段我们只要保证队列,交换机,以及发送数据的持久化就
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。