赞
踩
目录
用户对于同一操作发起的一次请求或者多次请求的结果是一致的。
如果一个插入操作的消费者,插入完成之后的代码发生了异常,这样就抛出异常,然后rabbitmq就会使用重试机制,这样这个消费者就会不断的往数据库里插入该订单数据,这样就会产生多条同样的订单数据,造成结果不一致的。
重试机制实现原理:
重试机制:
当我们消费者处理执行我们业务代码的时候,如果抛出异常:
在这时候mq会自动触发重试机制,默认的情况下rabbitmq是无限次数的重试,需要人为指定重试次数限制问题。
- spring:
- rabbitmq:
- # 连接地址
- host: 127.0.0.1
- # 端口号
- port: 5672
- # 账号
- username: guest
- # 密码
- password: guest
- # 地址(类似于数据库的概念)
- virtual-host: /admin_vhost
- # 消费者监听相关配置
- listener:
- simple:
- retry:
- # 开启消费者(程序出现异常)重试机制,默认开启并一直重试
- enabled: true
- # 最大重试次数
- max-attempts: 5
- # 重试间隔时间(毫秒)
- initial-interval: 3000
哪些情况需要重试?哪些情况不需要?
A、消费者获取消息后,调用第三方接口,但是调用第三方接口失败呢?是否需要重试?
该情况下需要实现重试策略,网络延迟只是暂时调用不通,重试多次有可能会调用通。
B、消费者获取消息后,因为代码问题抛出数据异常,是否需要重试?
该情况下是不需要实现重试策略,就算重试多次,最终还是失败的。可以将日志存放起来,后期通过定时任务或者人工补偿形式。
还可以使用第三方接口自动进行补偿(补偿,就行将没有消费的消息按照消费逻辑处理):RabbitMQ在消费者消费发生异常时,消费者 会调用第三方接口进行补偿,可以根据返回结果判断是否成功:
模拟会发生重试的消费者:
- @Slf4j
- @Component
- @RabbitListener(queues = "fanout_order_queue")
- public class FanoutOrderConsumer {
- @Autowired
- private OrderManager orderManager;
- @Autowired
- private OrderMapper orderMapper;
-
- @RabbitHandler
- public void process(OrderEntity orderEntity, Message message, Channel channel) throws IOException {
- log.info(">>orderEntity:{}<<", orderEntity.toString());
- //保存订单
- int result = orderManager.addOrder(orderEntity);
- //执行到这里会抛出异常 此时rabbitmq会发生重试 这样该订单就会在数据库中存入多份
- int i=1/0;
- log.info(">>插入数据库中数据成功<<");
- }
- }
投递消息前,生成一个全局的订单id,然后封装到orderEntiry实体类中。消费者插入订单前,先通过该订单id去数据库中查询这个订单,如果存在就结束 不存在,就往数据库中插入。
- import com.alibaba.fastjson.JSONObject;
- import com.mayikt.entity.OrderEntity;
- import com.mayikt.manager.OrderManager;
- import com.mayikt.mapper.OrderMapper;
- import com.rabbitmq.client.Channel;
- import lombok.extern.slf4j.Slf4j;
- import org.apache.commons.lang3.StringUtils;
- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.rabbit.annotation.RabbitHandler;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Component;
- import java.io.IOException;
-
-
- @Slf4j
- @Component
- @RabbitListener(queues = "fanout_order_queue")
- public class FanoutOrderConsumer {
-
- @Autowired
- private OrderManager orderManager;
- @Autowired
- private OrderMapper orderMapper;
-
- @RabbitHandler
- public void process(OrderEntity orderEntity, Message message, Channel channel) throws IOException {
- try {
- log.info(">>orderEntity:{}<<", orderEntity.toString());
-
- //获取订单号,
- String orderId = orderEntity.getOrderId();
- //订单号为空直接结束
- if (StringUtils.isEmpty(orderId)) {
- return;
- }
-
- //防止重复消费 全局id
- OrderEntity dbOrderEntity = orderMapper.getOrder(orderId);
- //如果发生重试 会先看刚订单是否已经插入到数据库 如果存在 就 renturn;结束掉
- if (dbOrderEntity != null) {
- log.info("另外消费者已经处理过该业务逻辑");
- //手动确认消息 已经消费
- channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
- return;
- }
-
- //保存订单
- int result = orderManager.addOrder(orderEntity);
- log.info(">>插入数据库中数据成功<<");
-
- //手动确认消息 已经消费
- channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
- } catch (Exception e) {
- //1、记录该消息日志形式 存放数据库db中、后期通过定时任务实现消息补偿、人工实现补偿
-
- //2、将该消息存放到死信队列中,单独写一个死信消费者实现消费。
-
- //3、也可以调用第三方接口进行补偿,如果补偿失败 就抛异常,让rabbitmq进行重试
- }
- }
- }
如果消费者处理消息时,因为代码原因抛出异常是需要从新发布版本才能解决的,那么就不需要重试,重试也解决不了该问题的。存放到死信队列或者是数据库表记录、后期人工实现补偿。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。