赞
踩
目录
在详细介绍各个领域模型之前,首先让我们整体来预览下各个模型之间的关系图。
从图中可以看出整体关系是,生产者发送消息到某个Topic中的某个队列中,消费者通过订阅关系订阅指定Topic中消息。
主题是RocketMQ中消息传输和存储的的顶层容器,用于标识同一类型业务逻辑的消息。主题只是一个逻辑概念,它并不是一个实际的消息容器。
主题的作用有两个:
由于主题(Topic)非常重要,RocketMQ官方建议在生产环境中不能开启自动创建主题的配置,以免产生大量垃圾主题,无法管理和回收浪费系统资源。
RocketMQ官方推荐在RocketMQ 5.0版本下使用 myadmin命令来创建主题,创建命令是:
./bin/mqadmin updateTopic -n <nameserver_address> -t <topic_name> -c <cluster_name> -a +message.type=<message_type>
nameserver_address: 是Nameserver集群的地址,比如:172.31.184.89:9876
topic_name:是主题名称
cluster_name:是broker集群的名称
message_type:表示消息类型,可以填入: Normal ,FIFO ,Delay,Transaction,不填默认就是普通消息。
比如下面创建一个名称为 feige_FIFOTopic的存放顺序消息的主题
./bin/mqadmin updateTopic -n 172.31.184.89:9876 -t feige_FIFOTopic -c rocketmq-cluster -a +message.type=FIFO
队列是RocketMQ中消息传输和存储的实际容器,是RocketMQ中消息存储的最小单元。RocketMQ中所有主题都是由多个队列组成。
队列具有天然的顺序性,即按照消息写入的顺序存入队列中。队头存的是最先写入的消息,队尾存的是最近写入的消息。
消息在队列里的顺序和消息之间的顺序通过位点(offset)来进行标记管理,RocketMQ中消息被消费会记录当前已经消费到的消息的offset,下次则从此offset位点继续消费。
RocketMQ支持在任意位点消费任意数量的消息,RocketMQ中消息在队列中被消费之后并不会随即出队列,而是会默认保留48小时,这也保证了在RocketMQ中消息可以被回溯消费,以及进行消息失败重试等操作。
一个Topic的MessageQueue中的消息只能被一个消费者组中的一个消费者消费。一个MessageQueue中的消息不允许同一个消费者组中的多个消费者同时消费。
消息是RocketMQ中最小的数据传输单元,生产者将需要发送的数据包装成消息发送给RocketMQ的服务端。
消息具有两大特性:
在消息被发送成功之后我们可以看到如下输出信息:
SendResult [sendStatus=SEND_OK, msgId=0A299C7A551414DAD5DC2C3A61960000, offsetMsgId=AC1FB85900002A9F00000000002D9A88, messageQueue=MessageQueue [topic=SQLFilterTest, brokerName=broker-b, queueId=2], queueOffset=2]
RocketMQ限制消息大小,普通消息限制在4MB以内,事务和定时消息限制在 64KB。
用来构建并发送消息到RocketMQ服务端的运行实体,一般是集成到业务系统中。
生产者可以发送普通消息,顺序消息,定时消息以及事务消息。
由于创建和销毁生产比较耗费系统资源, 故RocketMQ官方不建议在单一进程中创建大量生产者。
RocketMQ系统中承载多个消费行为一致的消费者的负载均衡分组,消费者组是一个逻辑概念。
RocketMQ通过消费者分组来实现消费者的管理,同一分组内的消费者共同分摊消息并进行消费,因此,为了保证分组内消息的正常负载和消费。RocketMQ要求同一个消费者分组下所有消费者的消费行为要保持一致。
分组内消费者的投递顺序一致
同一消费者分组下所有消费者的消费投递顺序是相同的,统一都是顺序投递或并发投递,不同业务场景不能混用消费者分组。
分组内消费者的业务类型一致
一般消费者分组和主题对应不同业务域对消息消费的要求不同,因此,不同业务域主题的消费建议使用不同的消费者分组,避免一个消费者分组消费超过10个主题。
消费者是RocketMQ中用来接收并处理消息的运行实体,消费者从RocketMQ服务端获取消息并进行解析。消费者通常被集成到业务系统中。
RocketMQ中提供了推模式的消费者DefaultMQPushConsumer,以及拉模式的消费者
RocketMQ系统中消费者获取消息,处理消息的规则和状态配置。
RocketMQ 的订阅关系按照消费者分组和主题粒度设计,因此,一个订阅关系指的是指定某个消费者分组对于某个主题的订阅,判断原则如下:
不同消费者分组对于同一个主题的订阅相互独立如下图所示,消费者分组Group A和消费者分组Group B分别以不同的订阅关系订阅了同一个主题Topic A,这两个订阅关系互相独立,可以各自定义,不受影响。
同一个消费者分组对于不同主题的订阅也相互独立如下图所示,消费者分组Group A订阅了两个主题Topic A和Topic B,对于Group A中的消费者来说,订阅的Topic A为一个订阅关系,订阅的Topic B为另外一个订阅关系,且这两个订阅关系互相独立,可以各自定义,不受影响。
过滤类型
消息过滤规则的类型。订阅关系中设置消息过滤规则后,系统将按照过滤规则匹配主题中的消息,只将符合条件的消息投递给消费者消费,实现消息的再次分类。
RocketMQ提供了三种集群搭建方式。
2主2从同步复制方式( 2m-2s-sync)
2主2从同步复制方式是本次集群搭建采取的方式。它使用同步复制的方式进行主从之间的数据复制,保证了消息的安全投递,不会丢失,但是会影响吞吐量。一般应用在对消息可靠性要求比较高的场景,比如订单系统,金融系统这种不容许消息数据丢失的场景。
在RocketMQ中可以使用 ./conf/2m-2s-sync
文件夹内的配置文件做集群配置。
在2主2从同步复制场景下,当生产者向broker集群中的某个broker的master节点的队列中写入消息之后,只有当消息被同步到该broker的slave节点之后,broker集群才会给生产者发送ack消息。就像下图中当消息被发送到 broker-a 的master节点之后,只有消息被同步到 broker-a的slave节点之后,broker集群才会向生产者发送ack消息。
2主2从异步复制方式(2m-2s-async)
2主2从异步复制的方式即主从之间的数据复制采取的是异步复制的方式,这种方式相比于同步复制的方式吞吐量有提升,但是可能会丢失消息。
在RocketMQ中可以使用 ./conf/2m-2s-async
文件夹内的配置文件做集群配置。
在2主2从异步复制场景下,当生产者向broker集群中的某个broker的master节点的队列中写入消息之后,broker集群才会给生产者发送ack消息。就像下图中当消息被发送到 broker-a 的master节点之后,broker集群就会向生产者发送ack消息。
2主无从方式(2m-noslave)
2主无从的方式由于没有从服务器,所以,不存在主从之间的数据复制,一般在生产环境不会被采用,因为主服务器一旦宕机,消息就有可能会丢失。
在RocketMQ中可以使用 ./conf/2m-noslave
文件夹内的配置文件做集群配置。
生产者的负载均衡策略其实就是说生产者在发送消息时如何选择队列的。查看RocketMQ的源代码可以发下生产者采取的是轮询的方式。
- int index = Math.abs(sendQueue.incrementAndGet() % messageQueueList.size());
- MessageQueue mq = messageQueueList.get(index);
详细介绍可以查看 【RocketMQ系列七】消费者和生产者的实现细节 本文。
在RocketMQ中,Consumer端的两种消费模式(Pull/Push)都是基于拉模式来获取消息的,而在Push模式只是对Pull模式的一种封装,其本质实现为消息拉取线程在从服务器上拉取到一批消息后,提交到消息消费线程池,然后,又"马不停蹄"继续向服务器再次常识拉取消息。如果没有拉取到消息,则延迟一下又继续拉取。
在两种基于拉模式的消费方式(Pull/Push)中,均需要Consumer端知道从Broker端的哪个消息队列中去获取消息。所以,需要在Consumer端来做负载均衡,即Broker端中多个MessageQueue 分配给同一个ConsumerGroup中的哪些Consumer消息。
Consumer的负载均衡策略可以通过Consumer的api来进行设置。
consumer.setAllocateMessageQueueStrategy(new AllocateMessageQueueAveragelyByCircle());
所有负载均衡策略均实现了AbstractAllocateMessageQueueStrategy接口。RocketMQ提供了如下几个负载均衡策略。
RocketMQ默认使用的是 AllocateMessageQueueAveragely。需要注意的是,在MessageQueue和Consumer之间一旦发生对应关系的改变,就会触发rebalance,进行重新分配。
消息重试是指消费者在消费某条消息失败之后,RocketMQ服务端会根据重试策略重新消费该消息,若超过最大重试次数还未消费成功则不在进行重新消费,而是直接将该消息发送到死信队列中。
消息重试主要是为了解决偶发性的消息消费失败导致的消费完整性问题,这些消费失败的原因包括业务处理逻辑的问题,网络抖动问题。
消费重试应用场景主要有两个:
不要把消息失败来作为条件判断的结果分流,也不要通过使用消息失败来对处理速率限流。
消费重试的状态机如下图所示:会重试的消息可能会经历如下四种状态。
消费失败:
当消息消费失败就会触发消费重试,即消费者没有向RocketMQ服务端返回offset的情况下都被认为是消费失败。都会触发消费重试。
对应的代码没有返回 CONSUME_SUCCESS 的状态是:
- // 4.创建一个回调函数
- consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
- // 5.处理消息
- for (MessageExt msg : msgs) {
- System.out.println(msg);
- System.out.println("收到的消息内容:" + new String(msg.getBody()));
- }
- // 1. 消费监听返回null则会消费重试
- return null;
- //2.消费监听返回RECONSUME_LATER也会消费重试
- return ConsumeConcurrentlyStatus.RECONSUME_LATER;
- });
消息处理超时,包括在PushConsumer中排队超时。
RocketMQ 会为每个消费组都设置一个Topic名称为"%RETRY%+consumerGroup"的重试队列(这里需要注意的是,这个Topic的重试队列是针对消费者组,而不是针对每个Topic设置的),用于暂时保存因为各种异常而消费失败的消息。考虑到异常恢复起来需要一些时间,会为重试队列设置多个重试级别,每个重试级别都有与之对应的重试间隔,随着重试次数的增多,重试间隔也会越来越大。
RocketMQ对于重试消息的处理是先保存至Topic名为 “SCHEDULE_TOPIC_XXXX” 的延迟队列,后台定时任务按照对应的时间进行Delay后重新保存至 “%RETRY%+consumerGroup” 的重试队列中。
与延迟队列的设置相同,消息默认会重试16次,每次重试的时间间隔如下:
10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
前面说某个消息被重试超过最大重试次数16次之后,则会被直接发送到死信队列中。也就是说死信队列用来存放的是无法被正常消费的消息。
RocketMQ将这种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message), 将存放死信消息的队列称为死信队列(Dead-Letter Queue)。可以使用Console控制台对死信队列里的消息进行重发来使得消费者可以进行重新消费。
死信队列具备如下特点:
幂等的定义:幂等性指的是多次操作造成的结果是一致的。在http接口中查询操作是幂等的,
新增操作:非幂等的,每次都会插入新数据
更新操作:幂等的,对同样的数据进行修改
删除操作:根据id删除是幂等的。
那么,非幂等的操作如何保证幂等性呢?
消息队列中,很可能会存在一条消息被重复发送,或者一条消息被多个消费者消费。对于像用户注册等非幂等操作,就需要做幂等性保证。可以将情况概况为如下几种:
生产者重复发送消息:由于网络抖动,导致生产者没有收到broker的ack消息而重发消息,从而造成消息队列中消息重复。
消费者重复消费消息:由于网络抖动,导致消费者没有返回ack给broker,导致消费者重复消费。
rebalance时的重复消费:由于网络抖动,在rebalance重分配时也可能会出现消费者重复消费某条消息的情况。
比如在创建订单场景下,我们在发送消息的时候传入orderId作为业务唯一ID。当消息重复发送或者重复消息的时候可以根据订单ID 来做一个逻辑判断。
为了防止两个消费者同时消费相同重复消息的情况,这时候可以在OderId上加上分布式锁,保证同一时间内相同的消息只能有一个消费者消费。
消息堆积顾名思义就是消息队列中堆积了大量未被处理的消息,主要发生在高并发的场景下,生产者发送消息的速率远大于消费者组消息的速度。在物联网的AIOT场景中比较常见。
在RocketMQ的Console上可以查看某个Topic上消息堆积的情况。
这里有个延迟就表示目前堆积的消息数。
消息堆积的本质原因还是消费者消费消息的速度赶不上生产者发送消息的速度。可能的情况有:
第一种情况: 新上线的消费者的消费逻辑存在Bug,导致消息不能被正常消费。这种场景主要存在于代码逻辑不严谨导致某些消息消费失败,或者消费超时,从而导致消息被大量堆积。
第二种情况:消费者实例宕机或者由于网络的原因不能连上Broker集群。这种情况主要是消费者实例可能是单节点或者机房网络不好的情况。
第三种情况:生产者短时间内大量发送消息到Broker端,消费者的消费能力不足。消费者消费消息往往是一些比较耗时的IO操作,比如操作数据库,调用其他服务。这导致消费者的消费速率远低于生产者发送速率。这种情况也是消息堆积的常见场景。
解决第一种情况:对需要上线的消费者进行严格的测试,确保每种消息的场景都能覆盖到。另外,在上线的时候采用灰度发布,先灰度小范围的用户进行使用,确认没有问题了,在全量放开所有用户使用。
解决第二种情况:在上线消费者实例时需要,采用多实例,异地多活的方式,确保极端的情况下都能有消费者能够正常消费消息。
解决第三种情况:这种情况的解决本质上是如何提高消费者的消费速率。主要可以从如下方面解决:
同一个消费者组下,增加消费者实例。比如Topic中有8个队列,那么可以将消费者数量最多增加到8个。那么有同学会问为啥只增加到8个,我增加到9个,乃至10个行不行?答案是你可以增加10个消费者,但是多余的2个消费者是分不到Queue的。这是因为 在RocketMQ中某个topic下的某个队列只能被同一消费者组中的某个消费者消费。 如果消费者数量少于Queue的数量,那么有可能会出现消费不均的情况。
提高单个消费者的消费并行线程。RocketMQ 支持批量消费消息,可以通过修改DefaultMQPushConsumer 消费者类的consumeThreadMin(最少消费线程数),以及consumeThreadMax(最大消费线程数)来提高单个消费者的消费能力。
批量消费消息:
某些业务流程如果支持批量方式消费,则可以很大程度上提高消费吞吐量,例如订单扣款类应用,一次处理一个订单耗时 1 s,一次处理 10 个订单可能也只耗时 2 s,这样即可大幅度提高消费的吞吐量。建议使用5.x SDK的SimpleConsumer,每次接口调用设置批次大小,一次性拉取消费多条消息。
下面就让我们来看个例子:
生产者:使用的是DefaultMQProducer;
- //4.创建消息
- StopWatch stopWatch = new StopWatch();
- stopWatch.start();
- for (int i = 0; i < 20000; i++) {
- // 创建消息,指定topic,以及消息体
- Message message = new Message("heap_topic", ("消息堆积测试" + i).getBytes());
- //5.发送消息
- SendResult send = defaultMQProducer.send(message);
- System.out.println(send);
- }
- stopWatch.stop();
- System.out.println("生产者发送2万条消息用时="+stopWatch.getTotalTimeSeconds()+"秒");
消费者:使用的是DefaultMQPushConsumer;
- // 4.创建一个回调函数
- consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
- System.out.println("本批次收到的消息数="+msgs.size());
- // 5.处理消息
- for (MessageExt msg : msgs) {
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- System.out.println("当前时间="+System.currentTimeMillis()+" 收到的消息内容:" + new String(msg.getBody()));
- }
- // 返回消费成功的对象
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
- });
生产者329秒内发送了2万条消息,平均60条,
而消费者消费一条消息需要一秒,所以生产者发送完消息之后,两个消费者还在消费。
这里消费者使用的是DefaultMQPushConsumer消费者 每批次Broker端会向消费者推送32条消息,通过pullBatchSize字段设置,而消费者,每次消费1条消息,通过consumeMessageBatchMaxSize字段设置。
当然,官方推荐使用SimpleConsumer进行批量消费消息。
- //每批次拉取16条消息
- int maxMessageNum = 16;
- // Set message invisible duration after it is received.
- Duration invisibleDuration = Duration.ofSeconds(15);
- // Receive message, multi-threading is more recommended.
- do {
- final List<MessageView> messages = consumer.receive(maxMessageNum, invisibleDuration);
- log.info("Received {} message(s)", messages.size());
- for (MessageView message : messages) {
- final MessageId messageId = message.getMessageId();
- try {
- consumer.ack(message);
- log.info("Message is acknowledged successfully, messageId={}", messageId);
- } catch (Throwable t) {
- log.error("Message is failed to be acknowledged, messageId={}", messageId, t);
- }
- }
- } while (true);
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。