当前位置:   article > 正文

消息队列+如何保证消息不丢失+如何保证消息的有序性+如何处理消息堆积_消息队列如何保证消息不丢失

消息队列如何保证消息不丢失

1、消息队列基本概念 

消息队列模型分为:队列模型和发布/订阅模型。

队列模型介绍:生产者向队列发送消息,一个队列可以存储多个生产者发送的消息,一个队列也可以有多个消费者。消费者之间是竞争关系,即每条消息只能被一个消费者消费。

2、发布/订阅模型 

 简介:该模型是将消息发往一个Topic 主题中,所有订阅了这个 Topic 的订阅者都能消费这条消息。

简单来说:一个会议室里,一个人发言,参会人员都能听到讲话内容。

队列模型:每条消息只能被一个消费者消费。

发布/订阅模型:让一条消息可以被多个消费者消费

队列模型也可以将消息存储到多个队列实现一条消息被多个消费者消费,但是存在数据冗余。

3、如何保证消息不丢失

消息生产消费示意图,接下来需要分别从生产者,Broker,消费者分析:

在生产者端:

        生产者发送消息至 Broker ,需要处理 Broker 的响应,不论是同步还是异步发送消息,同步和异步回 调都需要做好 try-catch ,妥善的处理响应,如果 Broker 返回写入失败等错误消息,需要重试发送。 当多次发送失败需要作报警,日志记录等。

        这样就能保证在生产消息阶段消息不会丢失。

Broker存储消息:

        1:存储消息阶段需要在消息刷盘之后再给生产者响应。否则提前给生产者响应后机器宕机,会导致数据丢失

       2::集群部署:

        使用多副本机制,消息不仅仅要写入当前 Broker ,还需要写入副本机中。配置成至少写入两台机子后再给生产者响应。

消费者:

        在消费者真正执行完业务逻辑 之后,再发送给 Broker 消费成功。防止消费过程中机器宕机

4、如何处理重复消息

        产生原因

            1:生产者已经发送消息到broker,broker可能已经写入了,生产者等待broker的响应,等待过程中可能存在网络波动等原因,导致生产者没有收到broker写入之后的响应,然后生产者重发了一次,此时消息重复。

            2:重试机制:RocketMQ 提供了消息重试机制,如果消息在消费过程中发生异常,消费者可能会重新拉取相同的消息进行重试。如果消费者的处理逻辑没有处理幂等性(即多次处理不会产生不一致结果),则消息可能会重复消费。

            3:集群模式重复消费:在 RocketMQ 的集群模式下,如果多个消费者组订阅相同的主题,并且每个消费者组都独立消费消息,那么同一条消息就有可能被不同的消费者组重复消费。

        原因总结:生产者重复发送,重试机制,消费端宕机,集群模式下的消息重复消费

                消费者拿到消息之后执行了业务逻辑,事务也提交了,此时需要更新 Consumer offset,这个时候可能存在消费者挂掉,另外的消费者拿到重复发送的消息又执行了一遍业务逻辑。

        消息重复是无可避免的,可以在业务层面上解决消息重复的问题

        解决办法

           关键点是幂等

           幂等:同样的参数多次调用同一个接口和调用一次产生的结果是一致的。

      具体措施:以下是 rabbitMq的示例代码:         

第一种:在生产者端实现消息的幂等性

        发送端应该保证消息的唯一性,可以在业务层面使用唯一的消息标识来避免重复发送相同的消息。在发送消息时给消息生成一个唯一id。

 

  1. public class MqConst {
  2. /**
  3. * 商品上下架
  4. */
  5. public static final String EXCHANGE_GOODS_DIRECT = "goods.direct";
  6. public static final String ROUTING_GOODS_UPPER = "goods.upper";
  7. public static final String ROUTING_GOODS_LOWER = "goods.lower";
  8. public static final String MESSAGE_ID_PREFIX = "MESSAGE_ID_";
  9. }
  1. //首先创建一个接口
  2. public interface RabbitService {
  3. public boolean sendMessage( String exchange , String routeKey , Object msg);
  4. // 调用该方法
  5. boolean sendMessage(String exchange, String routeKey, Object msg, String msgId);
  6. }
  1. @Service
  2. public class RabbitServiceImpl implements RabbitService {
  3. @Autowired
  4. private RabbitTemplate rabbitTemplate;
  5. @Override
  6. public boolean sendMessage(String exchange, String routeKey, Object msg) {
  7. rabbitTemplate.convertAndSend(exchange, routeKey, msg);
  8. return true;
  9. }
  10. /** 消息发生方法
  11. * exchange : 交换机
  12. * routeKey: 路由
  13. * msg: 发送的消息
  14. * msgId: 消息唯一id
  15. * @param exchange
  16. * @param routeKey
  17. * @param msg
  18. * @param msgId
  19. */
  20. @Override
  21. public boolean sendMessage(String exchange, String routeKey, Object msg, String msgId) {
  22. rabbitTemplate.convertAndSend(exchange, routeKey, msg,new CorrelationData(msgId));
  23. return true;
  24. }
  1. //商品上下架
  2. @Override
  3. public void publish(Long skuId, Integer status) {
  4. //生产者发送消息生成唯一id
  5. String msgId = MqConst.MESSAGE_ID_PREFIX + System.currentTimeMillis()+ UUID.randomUUID().toString();
  6. //存入集合,消费者可以判断
  7. Set<String> processedMessageIds = new HashSet<>();
  8. processedMessageIds.add(msgId);
  9. //存入
  10. if (status == 1) { //上架
  11. SkuInfo skuInfo = baseMapper.selectById(skuId);
  12. skuInfo.setPublishStatus(status);
  13. baseMapper.updateById(skuInfo);
  14. //发送
  15. rabbitService.sendMessage(MqConst.EXCHANGE_GOODS_DIRECT,
  16. MqConst.ROUTING_GOODS_UPPER,
  17. skuId,
  18. msgId);
  19. } else { //下架
  20. SkuInfo skuInfo = baseMapper.selectById(skuId);
  21. skuInfo.setPublishStatus(status);
  22. baseMapper.updateById(skuInfo);
  23. //发送
  24. rabbitService.sendMessage(MqConst.EXCHANGE_GOODS_DIRECT,
  25. MqConst.ROUTING_GOODS_LOWER,
  26. skuId,
  27. msgId);
  28. }
  29. }

消费者拿到MqConst.EXCHANGE_GOODS_DIRECT,获取消息id再做比较即可。

其他方式:

        例如这条 SQL  update table1 set count= 100 where id = 1 and count= 50; 执行多少遍 count都是100,这就叫幂等。 因此需要改造业务处理逻辑,使得在重复消息的情况下也不会影响最终的结果。 可以通过上面我那条 SQL 一样,做了个前置条件判断,即 count= 50情况,并且直接修改。

        更通用 的是做个 version 即版本号控制,对比消息中的版本号和数据库中的版本号。 或者通过数据库的约束例如唯一键,例如 insert into update on duplicate key... 。

具体还是看业务,根据业务具体分析。

5、如何处理重复消息

有序性分:全局有序和部分有序。

全局有序: 如果要保证消息的全局有序,首先只能由一个生产者往 Topic 发送消息,并且一个 Topic 内部只能有 一个队列(分区)。消费者也必须是单线程消费这个队列。这样的消息就是全局有序的。

部分有序:绝大部分的有序需求是部分有序,我们就可以将 Topic 内部划分成我们需要的队列数,把消息通过特定的策略发往固定的队列中,然后每个队列对应一个单线程处理的消费者。这样即完成了部分有序的需求,又可以通过队列数量的并发来提高消息处理效率。

如图所示:将同类的消息发送至同一个队列即可。

6、如何处理消息堆积

        消息的堆积产生原因:因为生产者的生产速度与消费者的消费速度不匹配。有可能是因为消息消费失败反复 重试造成的,也有可能就是消费者消费能力弱,渐渐地消息就积压了。

         因此先定位消费慢的原因,如果是代码出现 bug 则处理 bug 。

        若因为本身消费能力较弱,我们可以优化下消费逻辑,比如之前是一条一条消息消费处理的,这次我们批量处理,比如数据库的插入,一 条一条插和批量插效率是不一样的。

         如果逻辑我们已经都优化了,但还是慢,那就得考虑水平扩容了,增加 Topic 的队列数和消费者数量。

        注意队列数一定要增加,不然新增加的消费者是没东西消费的。一个Topic中,一个队列只会分配给一 个消费者。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/神奇cpp/article/detail/826241
推荐阅读
相关标签
  

闽ICP备14008679号