当前位置:   article > 正文

主流消息队列RocketMq,RabbitMq比对使用_rockernq跟rabbitmq那个好

rockernq跟rabbitmq那个好

首先整理这个文章是因为我正好有机会实战了一下rocketmq,阿里巴巴的一个开源消息中间件。所以就与以往中rabbitmq进行小小的比较一下。这里主线的根据常见面试问题进行整理。

1.消息队列常用的场景

1.1.削峰

例如我们做得考试系统中,用户通过人脸识别登录系统,考虑到考试系统的特殊性,三万名考生参加考试,需要记录人脸识别登录照片。从考试完结果上看,用户最大并发数在4000,于是我们采用rocketMq来进行异步消费用户人脸识别图片,当时统计rocketMq每秒1000消费消息。及时反馈了考生人脸识别登录成功,对数据库写操作也起到很大的缓冲功能。

1.2.解耦

如常用ABCD系统中,BCD系统都需要从A系统中调用接口返回数据,这时候突然来了E系统,也需要A系统数,又或者C系统不想要用这个接口数据了,而且A系统还得考虑,如果BCD接收不到数据,接收失败咋整之类的问题。

如果基于消息队列,这些问题就迎刃而解了。

A系统直接把数据扔到Mq中,BCDE系统直接从Mq中消费,如果消费失败,则重试消费。

1.3.异步

比如下订单系统中,会调用库存系统,会调用仓库系统,积分系统等,用户订单操作会直接返回给用户信息,提示订单完成,至于库存减少,或者仓库发货又或者积分的增加等,都是异步完成。极大的提高用户响应速度。

2.各种消息队列优缺点

1.1.rabbitMq

rabbitMq 几万级数据量,基于erlang语言开发,因此响应速度快些,并且社区活跃度比较活跃,可视化界面。缺点就是数据吞吐量相对与小一些,并且是基于erlang语言开发,比较重的问题难以维护。

1.2.rocketMq

rocketMq几十万级别数据量,基于Java开发,应对了淘宝双十一考验,并且文档十分的完善,拥有一些其他消息队列不具备的高级特性,如定时推送,其他消息队列是延迟推送,如rabbitMq通过设置expire字段设置延迟推送时间。又比如rocketmq实现分布式事务,比较可靠的。

1.3.kafka

kafka真正的大规模分布式消息队列,提供的核心功能比较少。基于zookeeper实现的分布式消息订阅。

3.消息队列常使用的注意事项或者面试时候经常问道的功能点

3.1.如何保证系统的高可用

就rabbitMq而言,有镜像模式概念,就是用户在发送数据时候,发送到mq机器上,并且持久化磁盘,然后通过设置镜像的queue,把数的持久化地址对应表同步到另外mq机器上。这种就有效防止一台mq挂了以后,另外的mq可以直接对外提供消费功能。

就rocketMq而言,分为多主集群结构,多主多备异步复制结构,多主多备同步复制结构。

3.2.如何保证消息不会丢失

就rabbitmq而言,从生产者,消费者,消息队列角度分析。生产者而言,发送消息如果失败,则定义重试次数,一般设置成五次。两种解决方式1.通过设置事务,进行事务回滚重试。2.通过发送者确认模式开启。

方式一:channel.waitForConfirms()普通发送方确认模式;

  方式二:channel.waitForConfirmsOrDie()批量确认模式;

  方式三:channel.addConfirmListener()异步监听发送方确认模式;

  1.     // ======== 批量确认模式 end ======
  2.     // 发送消息
  3.     while(num-- > 0) {
  4.         // 发送一个持久化消息到特定的交换机
  5.         channel.basicPublish(EXCHANGE_NAME, routingkey, MessageProperties,PERSISTENT_TEXT_PLAIN, message.getBates());
  6.         System.out.println(" [SimpleConfirmSend] Sent " + message);
  7.     }
  8.     // 批量等待确认: 返回true: 如果所有的消息都收到有确认应答,没有消息被拒绝
  9.     if(!channel.waitForConfirms()){
  10.         System.out.println("Not all message have arrived broker" );
  11.         // 实际应用中,需要在这是添加发送消息失败的处理逻辑: 如重发等等
  12.         // 在这种的模式中,如果发送N条消息,如果有一条失败,则所有的消息都需要重新推送
  13.     }

就mq本身而言,需要做队列的持久化到磁盘的操作。1.queque队列的持久化,通过channel.queue_declare(queue='hello', durable=True);设置

3.3.设置消息的持久化,通过delivery_mode=2来进行设置。

mq消费者而言,开启手动ACK模式,也就是需要真正的消费者入库成功,才会进行消费成功的确认。

总结就是一句话:发送者确认模式开启,消息持久化默认开启,消费者消费开启手动ack

rocketMq而言,生产者发送消息,生产者默认模式

  rocketMq持久化方式中,消息持久化通过如下配置。

  1. #Broker 的角色
  2. #- ASYNC MASTER 异步复制Master
  3. #- SYNC MASTER 同步双写Master
  4. #- SLAVE
  5. brokerRole=ASYNC MASTER
  6. #刷盘方式
  7. #- ASYNC ELUSH 异步刷盘
  8. #- SYNC ELUSH 同步刷盘
  9. flushDiskType=ASYNC FLUSH
  10. #checkTransactionMessageEnable=false#发消息线程池数量
  11. #sendMessageThreadPoolNums=12 8
  12. #拉消息线程池数量
  13. #pul1MessageThreadPoolNums=128

3.4.消费者幂等消费问题

感觉rabbitmq和rocketmq出现重复消费场景差不多

3.4.1.发送时消息重复

当一条消息已被成功发送到服务端并完成持久化,此时出现了网络闪断或者客户端宕机,导致服务端对客户端应答失败。 如果此时生产者意识到消息发送失败并尝试再次发送消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。

3.4.2.投递时消息重复

消息消费的场景下,消息已投递到消费者并完成业务处理,当客户端给服务端反馈应答的时候网络闪断。 为了保证消息至少被消费一次,消息队列 RocketMQ 的服务端将在网络恢复后再次尝试投递之前已被处理过的消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。

3.4.3.负载均衡时消息重复(包括但不限于网络抖动、Broker 重启以及订阅方应用重启)

当消息队列 RocketMQ 的 Broker 或客户端重启、扩容或缩容时,会触发 Rebalance,此时消费者可能会收到重复消息。

解决方式的话,通过messageId,作为数据库业务主键,重复插入会报错主键冲突问题。

或者通过redis唯一性,messageId作为key存入,去重重复的数据,在从redis中刷到数据库里面。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小蓝xlanll/article/detail/347087
推荐阅读
相关标签
  

闽ICP备14008679号