当前位置:   article > 正文

RocketMQ 位移提交源码分析

rocketmq 位移

向大家提个问题:

RocketMQ 消息消费进度是如何提交的,并发消费的时候,一次从 一个队列拉 32 条消息,这 32 条消息会提交到线程池中处理,如果偏移量  m5 比 m4 先执行完成,消息消费后,提交的消费进度是哪个?是提交消息 m5 的偏移量?

下面跟着我的节奏,撸一波源码。

RocketMQ 每次拉取完消息都会将消息存储到 PullRequest 对象中的 ProcessQueue 中:

org.apache.rocketmq.client.consumer.PullCallback#onSuccess

boolean dispathToConsume = processQueue.putMessage(pullResult.getMsgFoundList());

接着将消息放进消费线程中去执行:

org.apache.rocketmq.client.consumer.PullCallback#onSuccess

  1. DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(//
  2. pullResult.getMsgFoundList(), //
  3. processQueue, //
  4. pullRequest.getMessageQueue(), //
  5. dispathToConsume);

ConsumeMessageService 类实现消息消费的逻辑,它有两个实现类:

  1. // 并发消息消费逻辑实现类
  2. org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService;
  3. // 顺序消息消费逻辑实现类
  4. org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService;

这里我们只分析并发消费:

org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService#submitConsumeRequest

  1. ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);
  2. try {
  3. this.consumeExecutor.submit(consumeRequest);
  4. } catch (RejectedExecutionException e) {
  5. // ...
  6. }

将消息消费任务封装成 ConsumeRequest 对象,然后将其交给消费线程池中去执行。

org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService.ConsumeRequest#run:

  1. if (!processQueue.isDropped()) {
  2. ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
  3. } else {
  4. log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
  5. }

ConsumeRequest 是一个实现了 Runnable 的类,因此消息消费的核心逻辑都写在了 run 方法中,如上代码是提交已消费位移的逻辑,当 ProcessQueue 没有被丢弃,则进行已消费位移的提交。

org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService#processConsumeResult

  1. // 移除已消费的消息,并返回已消费的
  2. long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
  3. if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
  4. this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
  5. }

移除已消费的位移,并返回最小位移量,如果最小位移量大于 0,并且 ProcessQueue 没有被丢弃,则更新本地缓存,

org.apache.rocketmq.client.impl.consumer.ProcessQueue#removeMessage

  1. public long removeMessage(final List<MessageExt> msgs) {
  2. long result = -1;
  3. final long now = System.currentTimeMillis();
  4. try {
  5. this.lockTreeMap.writeLock().lockInterruptibly();
  6. this.lastConsumeTimestamp = now;
  7. try {
  8. if (!msgTreeMap.isEmpty()) {
  9. result = this.queueOffsetMax + 1;
  10. int removedCnt = 0;
  11. // 移除已消费的消息
  12. for (MessageExt msg : msgs) {
  13. MessageExt prev = msgTreeMap.remove(msg.getQueueOffset());
  14. if (prev != null) {
  15. removedCnt--;
  16. }
  17. }
  18. // 消息总量累加
  19. msgCount.addAndGet(removedCnt);
  20. // 返回消息容器中最小元素 key
  21. if (!msgTreeMap.isEmpty()) {
  22. result = msgTreeMap.firstKey();
  23. }
  24. }
  25. } finally {
  26. this.lockTreeMap.writeLock().unlock();
  27. }
  28. } catch (Throwable t) {
  29. log.error("removeMessage exception", t);
  30. }
  31. return result;
  32. }

以上方法就是解答文章开头问题的关键,由于该方法是各个消费线程并发执行,因此需要对其进行加锁操作,msgTreeMap 是 ProcessQueue 的消息容器,它的格式如下:

private final TreeMap<Long, MessageExt> msgTreeMap = new TreeMap<>();

它是一个 TreeMap 结构,key 为消息位移,value 为消息数据,消息容器中,消息可以按照位移进行排序,那也就意味着,当消息消费完,只需要在消息容器中移除即可,然后返回消息容器中最小元素(最小位移),如下:

640?wx_fmt=png

由于消息是按照位移进行排序,因此我们只需移除已消费的消息,并且确保不会将未消费的位移提交,就可避免了位移大的消息先消费导致消息丢失的问题了。

org.apache.rocketmq.client.consumer.store.RemoteBrokerOffsetStore#updateOffset:

  1. public void updateOffset(MessageQueue mq, long offset, boolean increaseOnly) {
  2. if (mq != null) {
  3. AtomicLong offsetOld = this.offsetTable.get(mq);
  4. if (null == offsetOld) {
  5. offsetOld = this.offsetTable.putIfAbsent(mq, new AtomicLong(offset));
  6. }
  7. if (null != offsetOld) {
  8. if (increaseOnly) {
  9. MixAll.compareAndIncreaseOnly(offsetOld, offset);
  10. } else {
  11. offsetOld.set(offset);
  12. }
  13. }
  14. }
  15. }

offsetTable 为本地位移缓存容器,它的结构如下:

private ConcurrentMap<MessageQueue, AtomicLong> offsetTable = new ConcurrentHashMap<>();

它是一个 ConcurrentMap,一个线程安全容器,key 为 MessageQueue,value 为当前 MessageQueue 的消费位移,从源码看出,当前消费位移的更新,只能是递增更新。

在更新完本地缓存之后,RocketMQ 是如何将其提交到 broker 的呢?

org.apache.rocketmq.client.impl.factory.MQClientInstance#startScheduledTask:

  1. this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
  2. @Override
  3. public void run() {
  4. try {
  5. MQClientInstance.this.persistAllConsumerOffset();
  6. } catch (Exception e) {
  7. log.error("ScheduledTask persistAllConsumerOffset exception", e);
  8. }
  9. }
  10. }, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);

以上,消费者在启动的时候,开启了一个定时任务,定时将本地缓存提交到broker。

org.apache.rocketmq.client.consumer.store.RemoteBrokerOffsetStore#persistAll:

  1. // 参数mqs是当前分配的队列
  2. public void persistAll(Set<MessageQueue> mqs) {
  3. if (null == mqs || mqs.isEmpty())
  4. return;
  5. final HashSet<MessageQueue> unusedMQ = new HashSet<MessageQueue>();
  6. if (!mqs.isEmpty()) {
  7. // 遍历位移缓存容器
  8. for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) {
  9. MessageQueue mq = entry.getKey();
  10. AtomicLong offset = entry.getValue();
  11. if (offset != null) {
  12. // 位移缓存容器包含在当前分配队列,则进行消费位移提交
  13. if (mqs.contains(mq)) {
  14. try {
  15. // 提交消费位移
  16. this.updateConsumeOffsetToBroker(mq, offset.get());
  17. } catch (Exception e) {
  18. log.error("updateConsumeOffsetToBroker exception, " + mq.toString(), e);
  19. }
  20. } else {
  21. unusedMQ.add(mq);
  22. }
  23. }
  24. }
  25. }
  26. // 将未分配的队列从位移缓存中移除
  27. if (!unusedMQ.isEmpty()) {
  28. for (MessageQueue mq : unusedMQ) {
  29. this.offsetTable.remove(mq);
  30. log.info("remove unused mq, {}, {}", mq, this.groupName);
  31. }
  32. }
  33. }

最终会调用以上方法,RocketMQ 会从重平衡那里获取当前消费者已分配的队列,如果位移缓存容器包含在当前分配队列,则进行消费位移提交,否则将从位移缓存容器中移除。

broker 端处理:

org.apache.rocketmq.broker.offset.ConsumerOffsetManager#commitOffset

  1. private void commitOffset(final String clientHost, final String key, final int queueId, final long offset) {
  2. ConcurrentMap<Integer, Long> map = this.offsetTable.get(key);
  3. if (null == map) {
  4. map = new ConcurrentHashMap<Integer, Long>(32);
  5. map.put(queueId, offset);
  6. this.offsetTable.put(key, map);
  7. } else {
  8. Long storeOffset = map.put(queueId, offset);
  9. if (storeOffset != null && offset < storeOffset) {
  10. log.warn("[NOTIFYME]update consumer offset less than store. clientHost={}, key={}, queueId={}, requestOffset={}, storeOffset={}", clientHost, key, queueId, offset, storeOffset);
  11. }
  12. }
  13. }

以上,offsetTable 为 broker 端的消费位移缓存容器,它的结构如下:

  1. private ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer, Long>> offsetTable =
  2. new ConcurrentHashMap<>(512);

它同样是一个 ConcurrentMap,一个线程安全容器,key 为的形式为 “topic@group”,value 也是一个 ConcurrentMap 它的 key 为 queueId,value 为位移,它会以 json 的形式持久化到磁盘 ${ROCKETMQ_HOME}/store/config/consumerOffset.json 文件中,具体格式如下:

  1. {
  2. "offsetTable": {
  3. "test-topic@test-group": {
  4. "0": 88526,
  5. "1": 88528,
  6. "2": 88532,
  7. "3": 88537
  8. }
  9. }
  10. }

 

近期热文

 

![公众号「后端进阶」,专注后端技术分享!](https://gitee.com/objcoding/md-picture/raw/master/img/official_accounts.jpg)

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/笔触狂放9/article/detail/936697
推荐阅读
相关标签
  

闽ICP备14008679号