当前位置:   article > 正文

rocketMQ 工作原理 知识点扩充_rocketmq 消费者默认不报错就ack

rocketmq 消费者默认不报错就ack

首先列出mq产品各方面对比如下:在这里插入图片描述
选择rocketmq的原因

  • 性能是首要考虑的要素。
  • 开发语言是java,主要是为了方便二次开发。
  • 对于高并发的业务场景是需要支持分布式架构的设计。
  • 功能全面,由于不同的业务场景,可能会用到顺序消息、事务消息等。

为什么要使用mq

  1. 实现分布式系统之间的解耦调用
    在分布式系统中,经常会出现一个服务会有多个消费端调用,而且可能每个消费方需要接入的逻辑不一致,又或者随着项目的不断发展,我们需要接口的未来维护和发展确定一套可扩展的规范,引入消息系统,在处理过程中间插入了一个隐含的、基于数据的接口层,两边的处理过程都要实现这一接口。这允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。

  2. 实现异步调用
    有时候我们会遇到这样的场景,用户在客户端提交了一个请求,后端处理这个请求的业务相对比较复杂,如果这个请求使用的是同步调用,客户端就会出现发送请求后过了很久才相应的情况,这对用户体验来说是十分致命的.如果说用户并不关心请求是否处理,对于一些耗时的非事务性的业务处理,我们可以使用mq异步请求的方式,将处理信息放入队列,由后端监听队列自行处理,在将消息存入队列后直接返回客户端相应,加快响应速度.

  3. 削峰(队列),解决高并发问题
    例如秒杀活动,可能在短时间内会有很大请求同时到后端,如果后端对每个请求都执行业务操作,例如查询数据库和写数据库,会造成服务器压力过大,同时,在同一时间进行大量数据库操作,可能会出现数据异常,我们可以使用mq实现缓冲,将所有请求先放入消息队列中,服务端每次处理业务先从消息队列获取.

使用mq的缺点

  1. 系统可用性降低
    你想啊,本来其他系统只要运行好好的,那你的系统就是正常的。现在你非要加个消息队列进去,那消息队列挂了,你的系统不是呵呵了。因此,系统可用性降低,这个问题我们可以通过部署高可用mq解决,但这又会引发下面的问题.

  2. 系统复杂性增加
    要多考虑很多方面的问题,比如一致性问题、如何保证消息不被重复消费,如何保证保证消息可靠传输,出现问题需要排查的范围扩大。因此,需要考虑的东西更多,系统复杂性增大。

1 工作流程:

  1. 启动NameServer,NameServer启动后开始监听端口等待broker,producer,consumer连接
  2. 启动broker时,broker会和所有的NameServer建立并保持长链接然后每隔30s向NameServer定时发送心跳包
  3. 发送消息前,可以先创建Topic,创建Topic时需要指定该topic要存储在哪些broker上,同时也会将topic和broker的关系写入到NameServer,不过这步并不是必需的,也可以在发送消息时自动创建topic

手动创建Topic有两种模式:
1)集群模式:该模式下创建的Topic在该集群中,所有Broker的Queue数量是相同的。
2)Broker模式:该模式下创建的Topic在该集群中,每个Broker的Queue数量可以不同。
自动创建Topic时,默认采用Broker模式,会为每个Broker默认创建4个Queue

  1. producer发送消息,启动时与NameServer集群中的其中一台建立长链接,并从NameServer中获取路由信息,即当前发送的Topic和Queue与Broker的地址(IP+Port)的映射关系。然后根据算法策略从队列选择一个Queue,与队列所在的broker建立长链接从而向broker发送消息。在获取到路由信息后,Producer会首先将路由信息缓存到本地,再每隔30s从NameServer更新一个路由信息
  2. 、Consumer和Producer类似。和其中一个NameServer建立长链接,获取其所订阅的Topic信息,然后根据算法策略从路由信息中获取到所要消费的Queue,然后和broker建立长链接,开始消费其中的消息。Consumer在获取到路由信息后,也会每隔30s从NameServer更新一次路由信息,不过和Producer不同的是Consumer还会向Broker发送心跳,以确保Broker的存活状态

2 RocketMQ为什么速度快

  • 是因为使用了顺序存储、Page Cache和异步刷盘。
  • 我们在写入commitlog的时候是顺序写入的,这样比随机写入的性能就会提高很多,写入commitlog的时候并不是直接写入磁盘,而是先写入操作系统的PageCache
  • 最后由操作系统异步将缓存中的数据刷到磁盘

3 如何保证消息的顺序性

多个queue只能保证单个queue里的顺序,queue是典型的FIFO,天然顺序。多个queue同时消费是无法绝对保证消息的有序性的。

所以总结如下:

  1. Topic只创建一个queue,发消息的时候一个线程去发送消息,消费的时候 一个线程去消费一个queue里的消息。
  2. Rocket MQ给提供了MessageQueueSelector接口,可以自己重写里面的接口,通过业务需要发送到指定的queue队列

4 复制和刷盘

复制策略: 指Broker的Master和Slave间的数据同步方式,分为同步复制和异步复制

  • 同步复制:消息写入到Master后,Master会等待Salve同步数据成功后才向Producer返回ACK
  • 异步复制:消息写入到Master后,Master会立即向Producer返回成功ACK,无需等待Salve同步数据成功

异步复制策略可以降低系统的写入延迟,RT变小,提高系统的吞吐量

刷盘策略: 指Broker中的消息数据从内存持久化到磁盘的落盘方式,分为同步刷盘和异步刷盘

  • 同步刷盘:当消息持久化到Broker的磁盘后才是消息写入成功
  • 异步刷盘:当消息写入到Broker的内存后即表示消息写入成功,无序等待消息持久化到磁盘

概念延伸
1)异步刷盘策略会降低系统写入延迟,RT变小,提高系统的吞吐量
2)消息写入到Broker的内存,一般是写入到PageCache
3)异步刷盘策略,消息会写入到PageCache后立刻返回成功ACK,但并不会立刻做落盘操作,而是当PageCache达到一定量时会自动进行落盘
在这里插入图片描述

5 异步发送,消息可靠性怎么保证

消息丢失可能发生在生产者发送消息、MQ本身丢失消息、消费者丢失消息3个方面。

生产者丢失

生产者丢失消息的可能点在于程序发送失败抛异常了没有重试处理,或者发送的过程成功但是网络异常MQ没收到,消息就丢失了。

由于同步发送的一般不会出现这样使用方式,所以我们就不考虑同步发送的问题,我们基于异步发送的场景来说。

异步发送分为两个方式:异步有回调和异步无回调,无回调的方式,生产者发送完后不管结果可能就会造成消息丢失,而通过异步发送+回调通知+本地消息表的形式我们就可以做出一个解决方案。以下单的场景举例。

下单后先保存本地数据和MQ消息表,这时候消息的状态是发送中,如果本地事务失败,那么下单失败,事务回滚。

  1. 下单成功,直接返回客户端成功,异步发送MQ消息

  2. MQ回调通知消息发送结果,对应更新数据库MQ发送状态

  3. JOB轮询超过一定时间(时间根据业务配置)还未发送成功的消息去重试

在监控平台配置或者JOB程序处理超过一定次数一直发送不成功的消息,告警,人工介入。一般而言,对于大部分场景来说异步回调的形式就可以了,只有那种需要完全保证不能丢失消息的场景我们做一套完整的解决方案。

MQ丢失

如果生产者保证消息发送到MQ,而MQ收到消息后还在内存中,这时候宕机了又没来得及同步给从节点,就有可能导致消息丢失。
RocketMQ分为同步刷盘和异步刷盘两种方式,默认的是异步刷盘,就有可能导致消息还未刷到硬盘上就丢失了,可以通过设置为同步刷盘的方式来保证消息可靠性,这样即使MQ挂了,恢复的时候也可以从磁盘中去恢复消息。

虽然可以通过配置的方式来达到MQ本身高可用的目的,但是都对性能有损耗,怎样配置需要根据业务做出权衡。

消费者丢失

消费者丢失消息的场景:消费者刚收到消息,此时服务器宕机,MQ认为消费者已经消费,不会重复发送消息,造成消息丢失。

RocketMQ默认是需要消费者回复ack确认,而kafka需要手动开启配置关闭自动offset。消费方不返回ack确认,重发的机制根据MQ类型的不同发送时间间隔、次数都不相同,如果重试超过次数之后会进入死信队列,需要手工来处理了

6 消费者消费失败的问题,一直消费失败导致消息积压

因为考虑到消费者消费一直出错的问题,从以下几个角度来考虑:

(1)消费者出错,肯定是程序或者其他问题导致的,如果容易修复,先把问题修复,让consumer恢复正常消费

(1)如果时间来不及处理很麻烦,做转发处理,写一个临时的consumer消费方案,先把消息消费,然后再转发到一个新的topic和MQ资源,这个新的topic的机器资源单独申请,要能承载住当前积压的消息,处理完积压数据后,修复consumer,去消费新的MQ和现有的MQ数据,新MQ消费完成后恢复原状

7 消费消息是push还是pull

RocketMQ没有真正意义的push,都是pull,虽然有push类,但实际底层实现采用的是长轮询机制,即拉取方式
broker端属性 longPollingEnable 标记是否开启长轮询。默认开启
追问:为什么要主动拉取消息而不使用事件监听方式?
事件驱动方式是建立好长连接,由事件(发送数据)的方式来实时推送。

如果broker主动推送消息的话有可能push速度快,消费速度慢的情况,那么就会造成消息在consumer端堆积过多,同时又不能被其他consumer消费的情况。而pull的方式可以根据当前自身情况来pull,不会造成过多的压力而造成瓶颈所以采取了pull的方式

8 broker处理拉取请求

  1. Consumer首次请求Broker
  2. Broker中是否有符合条件的消息,有的话响应Consumer,等待下次Consumer的请求
  3. 没有的话挂起consumer的请求,即不断开连接,也不返回数据
  4. 使用consumer的offset, DefaultMessageStore#ReputMessageService#run方法
  5. 每隔1ms检查commitLog中是否有新消息,有的话写入到pullRequestTable
  6. 当有新消息的时候返回请求
  7. PullRequestHoldService 来Hold连接,每个5s执行一次检查pullRequestTable有没有消息,有的话立即推送

9 RocketMQ的负载均衡

通过Topic在多Broker中分布式存储实现。

producer端,默认策略是随机选择:
producer维护一个index,每次取节点会自增,index向所有broker个数取余,自带容错策略
我们也可以自主的实现:发送端指定message queue发送消息到相应的broker,来达到写入时的负载均衡。提升写入吞吐量,但是当多个producer同时向一个broker写入数据的时候,性能会下降。
消息分布在多broker中,为负载消费做准备

consumer端
采用的是平均分配算法来进行负载均衡

  1. queue个数大于Consumer个数, 那么Consumer会平均分配queue。会有多个consumer消费多个queue的情况
  2. queue个数小于Consumer个数,那么会有Consumer闲置,就是浪费掉了,其余Consumer平均分配到queue上。
  3. 一个queue只会被同一个groop下的一个consumer消费

其他负载均衡算法:

  • 平均分配策略(默认)(AllocateMessageQueueAveragely):是前面讲的默认方式
  • 环形分配策略(AllocateMessageQueueAveragelyByCircle):每个消费者依次消费一个partition,环状。
  • 手动配置分配策略(AllocateMessageQueueByConfig):是通过配置的方式
  • 一致性哈希分配策略(AllocateMessageQueueConsistentHash):一致性hash算法
  • 靠近机房策略(AllocateMachineRoomNearby):就近元则,离的近的消费

10 RocketMQ在分布式事务支持的底层原理

你们用的是RocketMQ?RocketMQ很大的一个特点是对分布式事务的支持,你说说他在分布式事务支持这块机制的底层原理?

分布式系统中的事务可以使用TCC(Try、Confirm、Cancel)、2pc来解决分布式系统中的消息原子性

RocketMQ 4.3+提供分布事务功能,通过 RocketMQ 事务消息能达到分布式事务的最终一致

RocketMQ实现方式:

Half Message: 预处理消息,当broker收到此类消息后,会存储到RMQ_SYS_TRANS_HALF_TOPIC的消息消费队列中

检查事务状态: Broker会开启一个定时任务,消费RMQ_SYS_TRANS_HALF_TOPIC队列中的消息,每次执行任务会向消息发送者确认事务执行状态(提交、回滚、未知),如果是未知,Broker会定时去回调在重新检查。

超时: 如果超过回查次数,默认回滚消息。

也就是他并未真正进入Topic的queue,而是用了临时queue来放所谓的half message,等提交事务后才会真正的将half message转移到topic下的queue。

注意: mq的事务在消息生产者方,消息生产者发送事务消息,broker向消息生产者查询执行状态,broker执行回滚或者提交操作

11 mq一些时间和次数上的规则:

  • 消息在消费失败后会进入重试队列,16次(默认16次)才会进入死信队列 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

  • Broker每隔30秒(此时间无法更改)向所有nameserver发送心跳,心跳包含了自身的topic配置信息。

  • nameserver每隔10秒钟(此时间无法更改),扫描所有还存活的broker连接,若某个连接2分钟内(当前时间与最后更新时间差值超过2分钟,此时间无法更改)没有发送心跳数据,则断开连接。

  • 同步双写保证消息不会丢失,如果5s内未完成消息复制,则给生产者Producer返回结果:数据同步到Slave服务器超时

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/从前慢现在也慢/article/detail/712853
推荐阅读
相关标签
  

闽ICP备14008679号