当前位置:   article > 正文

Rocketmq面试(三)消息积压,增加消费者有用么?_rocketmq 增加消费者

rocketmq 增加消费者

目录

一.广播模式和集群模式的不同

二.延迟拉取

三.消费者延迟拉取消息的原因

四.增加消费者后是如何分配MessageQueue(引出负载策略)


一.广播模式和集群模式的不同

首先我们要强调一下。在广播模式(每条消息需要被消费者组中的每个消费者处理,也就是说消费者组内的每隔消费者都会收到订阅Topic的全量消息因此即使扩充消费者的数量也无法提升消费者能力)

 在集群模式下(也就是消息被消费者组中的任何一个消费者消费了就可以),如果消费者数量小于MessageQueue,那么增加消费者是用用的。比如一个 Topic 有 4 个 MessageQueue,2 个消费者进行消费,如果增加一个消费者,明细可以加快拉取消息的频率。

 如果消费者的数量大于等于 MessageQueue 的数量,增加消费者是没有用的。比如一个 Topic 有 4 个 MessageQueue,并且有 4 个消费者进行消费。如下图:

 除此之外,也会有一些特殊情况。

如果消费者本地消费的慢会延迟一段时间取拉取?

那什么情况下会出现延迟一段时间进行拉取呢?

二.延迟拉取

1.ProcessQueue保存的消息数量超过阈值(1000)可配置

2.ProcessQueue保存的消息大小超过阈值(100M)可配置

3.对于非顺序消费的场景,ProcessQueue中保存的最后一条和第一条消息偏移量之差超过了阈值(2000)可以配置 参考(org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl这个类)

4.对于顺序消费,ProcessQueue加锁失败也会导致延迟拉取,默认时间是3s.

三.消费者延迟拉取消息的原因

消费者消费慢,可是能下面的原因:

  • 消费者处理的业务逻辑复杂,耗时很长;

  • 消费者有慢查询,或者数据库负载高导致响应慢;

  • 缓存等中间件响应慢,比如 Redis 响应慢;

  • 调用外部服务接口响应慢。

对于外部服务接口响应慢,我们可以看实际情况,如果调用外部系通只是一个通知,可以采用异步的方式,异步逻辑里采用重试的方式保证接口调成功。

如果返回的结果必须处理,是不是可以考虑接口返回的结果可以缓存默认值,在调用失败后采用快速降级的方式,使用默认值代替返回接口返回值,果这个接口返回结果必须要处理,并且不能缓存,可以把拉取到的消息存入本地然后给 Broker 直接返回 CONSUME_SUCCESS。等外部系统恢复正常后再从本地取出来进行处理。

四.增加消费者后是如何分配MessageQueue(引出负载策略)

Consumer在拉取消息之前,需要对MessageQueue进行负载操作,RocketMQ使用一个定时器来完成负载操作,默认每间隔20s重新负载一次

平均负载策略

  1. 把消费者进行排序;

  2. 计算每个消费者可以平均分配的 MessageQueue 数量;

  3. 如果消费者数量大于 MessageQueue 数量,多出的消费者就分不到;

  4. 如果不可以平分,就使用 MessageQueue 总数量对消费者数量求余数 mod;

  5. 对前 mod 数量消费者,每个消费者加一个,这样就获取到了每个消费者分配的 MessageQueue 数量。

循环分配策略:

这个很容易理解,遍历消费者,把 MessageQueue 分一个给遍历到的消费者,如果 MessageQueue 数量比消费者多,需要进行多次遍历,遍历次数等于 (MessageQueue 数量/消费者数量),还是以 4 个 MessageQueue 和 3 个消费者的情况,如下图:

自定义分配策略

这种策略在消费者启动的时候可以指定消费哪些 MessageQueue。可以参考下面代码:

  1. AllocateMessageQueueByConfig allocateMessageQueueByConfig = new AllocateMessageQueueByConfig();
  2. //绑定消费 messageQueue1
  3. allocateMessageQueueByConfig.setMessageQueueList(Arrays.asList(new MessageQueue("messageQueue1","broker1",0)));
  4. consumer.setAllocateMessageQueueStrategy(allocateMessageQueueByConfig);
  5. consumer.start();

按照机房分配策略

这种方式 Consumer 只消费指定机房的 MessageQueue,如下图:Consumer0、Consumer1、Consumer2 绑定 room1 和 room2 这两个机房,而 room3 这个机房没有消费者。

  1. AllocateMessageQueueByMachineRoom allocateMessageQueueByMachineRoom = new AllocateMessageQueueByMachineRoom();
  2. //绑定消费 room1 和 room2 这两个机房
  3. allocateMessageQueueByMachineRoom.setConsumeridcs(new HashSet<>(Arrays.asList("room1","room2")));
  4. consumer.setAllocateMessageQueueStrategy(allocateMessageQueueByMachineRoom);
  5. consumer.start();

这种策略 broker 的命名必须按照格式:机房名@brokerName,因为消费者分配队列的时候,首先按照机房名称过滤出所有的 MessageQueue,然后再按照平均分配策略进行分配

按照机房就近分配

跟按照机房分配原则相比,就近分配的好处是可以对没有消费者的机房进行分配。如下图,机房 3 的 MessageQueue 也分配到了消费者:

一致性 Hash 算法策略

把所有的消费者经过 Hash 计算分布到 Hash 环上,对所有的 MessageQueue  进行 Hash  计算,找到顺时针方向最近的消费者节点进行绑定。如下图:

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/很楠不爱3/article/detail/712579
推荐阅读
相关标签
  

闽ICP备14008679号