当前位置:   article > 正文

RabbitMQ高级 -- 分布式事务_rabbitmq实现分布式事务

rabbitmq实现分布式事务

一、引言

      前端时间因为忙着学吉他~  就耽误正事了~  现在就赶紧补回来~~

二、目录

目录

1.1 什么是分布式事务 ?事务的作用 ? 什么情况下会用到分布式事务 ?

概述:

什么情况下回用到分布式事务呢 ?

1.2 可靠生产和确认推送

概念:

可靠生产的问题:

代码实现:

1.2 可靠消费

a.怎么才能保证消息一定被可靠消费呢 ?

b.处理流程:

c.代码实现: 


1.1 什么是分布式事务 ?事务的作用 ? 什么情况下会用到分布式事务 ?

概述:

分布式事务是指事务的操作位于不同的节点上,需要保证事务的ACID特性 ~

(所谓分布式事务作用:在不同的系统之间,如何保证数据的一致性。

什么情况下回用到分布式事务呢 ?

1.主要是应用在我们 springCloud(微服务)当中 ~  具体是怎么应用的呢 ,看下图 !

解释:

如上图所示,现在有订单服务和配送中心两个服务,每个服务都有自己独立的数据库,假如有一个用户在订单服务下了下单,需要往订单数据库插入数据, 然后给配送中心配送订单,插入配送数据库。

问题

但是假设,给订单数据库保存数据成功了,在调用配送中心的时候发生了异常,这个时候,是不会回滚订单服务的。 就会造成数据的不一致性 !!这个时候就会用的我们的分布式事务.

注意:Spring 提供的也有事务的方法,比如 @Transactional 注解,但是在微服务当中,每个服务都是一个独立的jvm,两个jvm 之间没办法相互去控制。

1.2 可靠生产和确认推送

概念:

   可靠生产就是生产者发送一个消息,确认消息成功投递到了队列当中,那怎么能保证消息成功投递到队列当中呢 ? , RabbitMQ 提供的有确认机制,就是当生产者消息成功投递到队列当中,会给你回值,告诉你这个消息已成功投递,简称为  "可靠生产" 。

 

                                               图 19-7

上图大致流程:

第一步:生产者发送消息到交换机,交换机分发给队列。

第二步:消息的确认机制,判断消息是否成功投递到队列。

第三步:

             如果消息投递成功, MQ队列会给你个状态,告诉你消息投递成功。

             如果消息投递失败,有可能是服务器原因造成,就将投递失败的消息进行两次重发,如果两次投递消息都失败,证明这条消息有问题。

可靠生产的问题:

                                                                    图 19-8

上图说明:

   当有一个用户下了订单,发送消息可能会出现不可靠,可能就是MQ宕机了,为了防止这种情况,在我们的业务当中会加张冗余表,利用交换机的确认机制判断消息是否发送成功。如果发送成功,就把冗余表的状态改成成功(0和1代表成功都可)。如果失败,就通过定时器重发消息。

代码实现:

注意:MQ配置的时候把确认机制类型:correlated 才能生效。

  1. spring:
  2. # RabbitMQ 配置
  3. rabbitmq:
  4. username: admin
  5. password: 123456
  6. virtual-host: /
  7. addresses: 81.70.97.167:5672
  8. # 确认机制生效
  9. publisher-confirm-type: correlated

a.先保存消息到订单表和消息冗余表

  1. /**
  2. * @author WangYan
  3. * @date 2022/3/2 21:04
  4. */
  5. @Service
  6. public class DownOrderServiceImpl implements DownOrderService {
  7. @Autowired
  8. DownOrderMapper downOrderMapper;
  9. @Autowired
  10. DownOrderMessageMapper orderMessageMapper;
  11. @Autowired
  12. RabbitTemplate rabbitTemplate;
  13. @Autowired
  14. private RestTemplate restTemplate;
  15. @Override
  16. public String downOrder(DownOrderPO po) {
  17. // 1.保存到订单表
  18. downOrderMapper.insert(po);
  19. // 2.保存到订单冗余表
  20. this.saveRedundancy(po);
  21. // 3.发送消息到MQ
  22. this.sendMessage(po.getUuid());
  23. return "成功!";
  24. }
  25. /**
  26. * 保存到消息冗余表
  27. * @param po
  28. */
  29. public void saveRedundancy(DownOrderPO po){
  30. DownOrderMessagePO orderMessage = new DownOrderMessagePO();
  31. orderMessage.setUuid(po.getUuid());
  32. orderMessage.setCommodity(po.getCommodity());
  33. orderMessage.setAmount(po.getAmount());
  34. orderMessage.setOrderPerson(po.getOrderPerson());
  35. // 0 未发送
  36. orderMessage.setStatus(0);
  37. orderMessageMapper.insert(orderMessage);
  38. }
  39. /**
  40. * 发送消息到MQ
  41. */
  42. public void sendMessage(String orderId){
  43. rabbitTemplate.convertAndSend("amq.fanout","",orderId,new CorrelationData(orderId));
  44. }
  45. }

b.MQ的确认机制

注意:Springboot是个Web项目,一直在运行,下完订单之后,当前线程最好睡眠两毫秒,这样确认机制有一个好的回值。

  1. /**
  2. * @author WangYan
  3. * @date 2022/4/10 18:12
  4. */
  5. @Service
  6. public class MQService {
  7. @Autowired
  8. RabbitTemplate rabbitTemplate;
  9. @Autowired
  10. DownOrderMessageMapper orderMessageMapper;
  11. /**
  12. * @PostConstruct
  13. * 不是Spring提供的注解,而是Java自己的注解。
  14. * Java中该注解说明: @PostConstruc 该注解被用来修饰一个非静态的 void() 方法。
  15. * 被 @PostConstruc 修饰的方法会在服务器加载Servlet的时候运行。并且只会被服务器执行一次, PostConstruct 在构造函数之后执行, init() 方法之前。
  16. */
  17. @PostConstruct
  18. public void regCallback(){
  19. // 消息发送成功以后,给与生产者的消息回复,以此来确保生产者的可靠性
  20. rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
  21. @Override
  22. public void confirm(CorrelationData correlationData, boolean ack, String cause) {
  23. // 如果ack为true 代表消息已经收到
  24. String orderId = correlationData.getId();
  25. if (!ack){
  26. // 消息投递失败
  27. System.out.println("MQ队列应答失败,orderId是: " + orderId);
  28. }
  29. QueryWrapper<DownOrderMessagePO> queryWrapper = new QueryWrapper<>();
  30. queryWrapper.lambda().eq(DownOrderMessagePO::getUuid,orderId);
  31. DownOrderMessagePO orderMessagePO = orderMessageMapper.selectOne(queryWrapper);
  32. orderMessagePO.setStatus(1);
  33. int insert = orderMessageMapper.updateById(orderMessagePO);
  34. System.out.println("消息投递成功!!");
  35. }
  36. });
  37. }
  38. }

1.2 可靠消费

a.怎么才能保证消息一定被可靠消费呢 ?

解决方案:

第一种:

1.队列绑定一个死信队列。

2.当消息消费的时候遇到异常,利用MQ的nack机制把消息丢弃到死信队列。

3.监听死信队列中的消息进行业务处理或者人工干预。

第二种:

1.利用MQ的消息重发。

注意:暂时推荐第一种,比较好的解决方案。

注意:消费服务如果出现报错,会出现死循环,可能会把服务器磁盘消耗殆尽,导致服务宕机。

                                                               图列20-1

上图流程说明:

1.利用MQ的ack机制,由消费者自身控制消息的重发、清除和丢弃。

2.问题:

a.幂等性问题:定时重发会造成消息的重复发送。可以使用唯一主键,或者redis 的

分布式锁。

b.处理流程:

                                                                图例 20-2 

上图简单说明:

1.监听队列,获取消息,开始处理消息

2.当消息消费的时候遇到异常,利用MQ的nack机制把消息丢弃到死信队列。

3.监听死信队列中的消息进行业务处理或者人工干预。

c.代码实现: 

yml配置:

  1. spring:
  2. # RabbitMQ 配置
  3. rabbitmq:
  4. username: admin
  5. password: 123456
  6. virtual-host: /
  7. addresses: 81.70.97.167:5672
  8. listener:
  9. simple:
  10. acknowledge-mode: manual # 开启手动ack,让程序去控制MQ的消息重发和删除和转移
  11. retry:
  12. enabled: true #开启重试
  13. max-attempts: 2 #重试最大次数 (默认是3次)
  14. initial-interval: 2000ms #重试间隔时间

代码:

1.绑定死信交换机

  1. /**
  2. * @author WangYan
  3. * @date 2022/2/13 10:36
  4. * 绑定死信交换机
  5. */
  6. @Configuration
  7. public class TTLExChangebinding {
  8. /**
  9. * 声明注册交换机
  10. * @return
  11. */
  12. @Bean
  13. public FanoutExchange deadExChange(){
  14. return new FanoutExchange("dead_fanout_exChange",true,false);
  15. }
  16. /**
  17. * 声明队列
  18. * @return
  19. */
  20. @Bean
  21. public Queue deadQueue(){
  22. return new Queue("deadQueue",true);
  23. }
  24. /**
  25. * 绑定关系
  26. * @return
  27. */
  28. @Bean
  29. public Binding ttlBinding(){
  30. return BindingBuilder.bind(deadQueue()).to(deadExChange());
  31. }
  32. @Bean
  33. public FanoutExchange orderExChange(){
  34. return new FanoutExchange("amq.fanout",true,false);
  35. }
  36. @Bean
  37. public Queue orderQueue(){
  38. Map<String, Object> args = new HashMap<>();
  39. args.put("x-dead-letter-exchange","dead_fanout_exChange");
  40. return new Queue("吕洞宾",true,false,false,args);
  41. }
  42. @Bean
  43. public Binding orderBinding(){
  44. return BindingBuilder.bind(orderQueue()).to(orderExChange());
  45. }
  46. }

2.监听队列,消息处理异常丢到死信队列中

重点:

a.如果配置文件中设置了重发次数,但是代码中加了try+catch、channel.basicNack(tag,false,true),这个时候配置文件当中的重发次数是不会生效的,会造成死循环。

b.如果需要配置文件当中的重发次数生效,去掉try+catch,加上channel.basicNack(tag,false,false);

  1. /**
  2. * @author WangYan
  3. * @date 2022/3/5 16:13
  4. */
  5. @Service
  6. public class OrderMqConsumer {
  7. @Autowired
  8. TransportOrderMapper orderMapper;
  9. @RabbitListener(queues = {"吕洞宾"})
  10. public void getMessage(String message, Channel channel,
  11. CorrelationData correlationData,
  12. @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
  13. try {
  14. // 1.获取队列消息
  15. System.out.println("收到的MQ消息:" + message);
  16. // 2.保存运单消息
  17. TransportOrderPO orderPO = new TransportOrderPO();
  18. orderPO.setOrderId(Integer.parseInt(message));
  19. orderPO.setTransportDatetime(LocalDateTime.now());
  20. orderMapper.insert(orderPO);
  21. // 2.手动ack告诉mq消息已经正常消费
  22. System.out.println(1 / 0);
  23. channel.basicAck(tag,false);
  24. }catch (Exception ex){
  25. /**
  26. * @param : requeue
  27. * false 不会重发,会把消息打入到死信队列
  28. * true 会死循环重发.建议如果使用true的话,不加try/catch 否则的话就会死循环。
  29. *
  30. * tag 是消息的标签,类似唯一主键
  31. */
  32. channel.basicNack(tag,false,false);
  33. }
  34. }
  35. }

3.监听死信队列

  1. /**
  2. * @author WangYan
  3. * @date 2022/3/5 16:13
  4. * 死信队列
  5. */
  6. @Service
  7. public class deadMqConsumer {
  8. @Autowired
  9. TransportOrderMapper orderMapper;
  10. @RabbitListener(queues = {"deadQueue"})
  11. public void getMessage(String message, Channel channel,
  12. CorrelationData correlationData,
  13. @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
  14. try {
  15. // 1.获取队列消息
  16. System.out.println("收到死信队列的MQ消息:" + message);
  17. // 2.业务处理
  18. TransportOrderPO orderPO = new TransportOrderPO();
  19. orderPO.setOrderId(Integer.parseInt(message));
  20. orderPO.setTransportDatetime(LocalDateTime.now());
  21. orderMapper.insert(orderPO);
  22. // 3.手动ack告诉mq消息已经正常消费
  23. channel.basicAck(tag,false);
  24. }catch (Exception ex){
  25. // 4.处理异常死信队列中的消息
  26. System.out.println("人工干预,或者保存到数据库,根据你的业务进行处理死信队列的消息!!");
  27. // 5.将消息从死信队列中移除
  28. channel.basicNack(tag,false,false);
  29. }
  30. }
  31. }

 拜拜~

有任何问题欢迎大家指出~

Thank You !

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Gausst松鼠会/article/detail/127022
推荐阅读
相关标签
  

闽ICP备14008679号