Message Middleware RocketMQ Source Code Analysis: Sending and Receiving Messages

Abstract: Original source http://www.iocoder.cn/RocketMQ/message-send-and-receive/ (「芋道源码」). Reposting is welcome; please keep this abstract. Thanks!

This article is based mainly on the RocketMQ 4.0.x release.



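1. Overview

  1. Producer sends a message. This mainly covers the source code for synchronous sends and touches on asynchronous/Oneway sends; transactional messages are skipped.
  2. Broker receives the message. (Storing the message is covered in "RocketMQ Source Code Analysis: Message Storage".)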

Figure: global sequence diagram of the Producer sending a message

2. Producer sends a message

Figure: sequence diagram of the Producer sending a message

DefaultMQProducer#send(Message)

  1. 1: @Override
  2. 2: public SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
  3. 3: return this.defaultMQProducerImpl.send(msg);
  4. 4: }
  • Note: sends a message synchronously. DefaultMQProducer#send(Message) wraps DefaultMQProducerImpl#send(Message).

DefaultMQProducerImpl#sendDefaultImpl()

  1. 1: public SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
  2. 2: return send(msg, this.defaultMQProducer.getSendMsgTimeout());
  3. 3: }
  4. 4:
  5. 5: public SendResult send(Message msg, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
  6. 6: return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
  7. 7: }
  8. 8:
  9. 9: private SendResult sendDefaultImpl(//
  10. 10: Message msg, //
  11. 11: final CommunicationMode communicationMode, //
  12. 12: final SendCallback sendCallback, //
  13. 13: final long timeout//
  14. 14: ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
  15. 15: // Verify that the Producer is in the running state
  16. 16: this.makeSureStateOK();
  17. 17: // Validate the message format
  18. 18: Validators.checkMessage(msg, this.defaultMQProducer);
  19. 19: //
  20. 20: final long invokeID = random.nextLong(); // Invocation ID; used only by the log statements below to tag one send call
  21. 21: long beginTimestampFirst = System.currentTimeMillis();
  22. 22: long beginTimestampPrev = beginTimestampFirst;
  23. 23: long endTimestamp = beginTimestampFirst;
  24. 24: // Fetch the Topic route info
  25. 25: TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
  26. 26: if (topicPublishInfo != null && topicPublishInfo.ok()) {
  27. 27: MessageQueue mq = null; // The queue finally chosen for this message
  28. 28: Exception exception = null;
  29. 29: SendResult sendResult = null; // Result of the last send attempt
  30. 30: int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1; // Only SYNC sends are attempted more than once
  31. 31: int times = 0; // Index of the current attempt
  32. 32: String[] brokersSent = new String[timesTotal]; // Broker name chosen on each attempt
  33. 33: // Loop, sending the message until it succeeds
  34. 34: for (; times < timesTotal; times++) {
  35. 35: String lastBrokerName = null == mq ? null : mq.getBrokerName();
  36. 36: MessageQueue tmpmq = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName); // Select the queue to send to
  37. 37: if (tmpmq != null) {
  38. 38: mq = tmpmq;
  39. 39: brokersSent[times] = mq.getBrokerName();
  40. 40: try {
  41. 41: beginTimestampPrev = System.currentTimeMillis();
  42. 42: // Call the core send method
  43. 43: sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout);
  44. 44: endTimestamp = System.currentTimeMillis();
  45. 45: // Update the Broker availability info
  46. 46: this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
  47. 47: switch (communicationMode) {
  48. 48: case ASYNC:
  49. 49: return null;
  50. 50: case ONEWAY:
  51. 51: return null;
  52. 52: case SYNC:
  53. 53: if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
  54. 54: if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) { // Retry when the sync send returned but storage was not OK && the retry-on-store-failure switch is on
  55. 55: continue;
  56. 56: }
  57. 57: }
  58. 58: return sendResult;
  59. 59: default:
  60. 60: break;
  61. 61: }
  62. 62: } catch (RemotingException e) { // Log the exception, update the Broker availability info, continue the loop
  63. 63: endTimestamp = System.currentTimeMillis();
  64. 64: this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
  65. 65: log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
  66. 66: log.warn(msg.toString());
  67. 67: exception = e;
  68. 68: continue;
  69. 69: } catch (MQClientException e) { // Log the exception, update the Broker availability info, continue the loop
  70. 70: endTimestamp = System.currentTimeMillis();
  71. 71: this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
  72. 72: log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
  73. 73: log.warn(msg.toString());
  74. 74: exception = e;
  75. 75: continue;
  76. 76: } catch (MQBrokerException e) { // Log the exception, update the Broker availability info; for some response codes return or rethrow directly, ending the loop
  77. 77: endTimestamp = System.currentTimeMillis();
  78. 78: this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
  79. 79: log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
  80. 80: log.warn(msg.toString());
  81. 81: exception = e;
  82. 82: switch (e.getResponseCode()) {
  83. 83: // The following response codes continue the loop and retry the send
  84. 84: case ResponseCode.TOPIC_NOT_EXIST:
  85. 85: case ResponseCode.SERVICE_NOT_AVAILABLE:
  86. 86: case ResponseCode.SYSTEM_ERROR:
  87. 87: case ResponseCode.NO_PERMISSION:
  88. 88: case ResponseCode.NO_BUYER_ID:
  89. 89: case ResponseCode.NOT_IN_CURRENT_UNIT:
  90. 90: continue;
  91. 91: // Otherwise return the send result if there is one, or else rethrow the exception
  92. 92: default:
  93. 93: if (sendResult != null) {
  94. 94: return sendResult;
  95. 95: }
  96. 96: throw e;
  97. 97: }
  98. 98: } catch (InterruptedException e) {
  99. 99: endTimestamp = System.currentTimeMillis();
  100. 100: this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
  101. 101: log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
  102. 102: log.warn(msg.toString());
  103. 103: throw e;
  104. 104: }
  105. 105: } else {
  106. 106: break;
  107. 107: }
  108. 108: }
  109. 109: // Return the send result
  110. 110: if (sendResult != null) {
  111. 111: return sendResult;
  112. 112: }
  113. 113: // Throw an exception whose response code matches the kind of failure
  114. 114: String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s", times, System.currentTimeMillis() - beginTimestampFirst,
  115. 115: msg.getTopic(), Arrays.toString(brokersSent)) + FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);
  116. 116: MQClientException mqClientException = new MQClientException(info, exception);
  117. 117: if (exception instanceof MQBrokerException) {
  118. 118: mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());
  119. 119: } else if (exception instanceof RemotingConnectException) {
  120. 120: mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);
  121. 121: } else if (exception instanceof RemotingTimeoutException) {
  122. 122: mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);
  123. 123: } else if (exception instanceof MQClientException) {
  124. 124: mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);
  125. 125: }
  126. 126: throw mqClientException;
  127. 127: }
  128. 128: // No Namesrv address available
  129. 129: List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList();
  130. 130: if (null == nsList || nsList.isEmpty()) {
  131. 131: throw new MQClientException(
  132. 132: "No name server address, please set it." + FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null).setResponseCode(ClientErrorCode.NO_NAME_SERVER_EXCEPTION);
  133. 133: }
  134. 134: // Topic route info not found
  135. 135: throw new MQClientException("No route info of this topic, " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
  136. 136: null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
  137. 137: }
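  • Note: sends the message. Steps: fetch the Topic route info, select the message queue to send to, call the core send method, and wrap the send result for return.
  • Lines 1 to 7: the send(...) overloads wrap sendDefaultImpl(...).
  • Line 20: invokeID is used only for logging and has no business purpose.
  • Line 25: fetches the Topic route info. See DefaultMQProducerImpl#tryToFindTopicPublishInfo() for details.
  • Lines 30 & 34: compute the maximum number of attempts and loop until the send succeeds. As line 30 shows, only synchronous sends are attempted more than once in this loop: 1 + retryTimesWhenSendFailed attempts, which is 3 with the default configuration. Asynchronous and Oneway sends get a single attempt here.
  • Line 36: selects the queue to send to. See MQFaultStrategy for details.
  • Line 43: calls the core send method. See DefaultMQProducerImpl#sendKernelImpl() for details.
  • Line 46: updates the Broker availability info. Queue selection takes the Broker's send latency into account. See MQFaultStrategy for details.
  • Lines 62 to 68: when a RemotingException is thrown and the send is retried, the message may be delivered more than once. For example, the send times out (RemotingTimeoutException) although the Broker actually received and processed the message successfully. Consumers therefore need to consume idempotently.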
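
Because a retried send can produce duplicates, the consumer side needs some form of deduplication. The following is only a minimal sketch of that idea, not RocketMQ code: `IdempotentHandler`, `handleOnce` and the business key `order-1001` are hypothetical names, and the in-memory set stands in for whatever durable store a real system would use.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical in-memory deduplicator; a real system would persist the keys. */
final class IdempotentHandler {
    private final Set<String> processedKeys = new HashSet<>();

    /**
     * Runs the action only the first time a business key is seen.
     *
     * @return true if the action ran, false if the key was a duplicate
     */
    synchronized boolean handleOnce(String businessKey, Runnable action) {
        if (!processedKeys.add(businessKey)) {
            return false; // duplicate delivery, e.g. caused by a producer retry
        }
        action.run();
        return true;
    }

    public static void main(String[] args) {
        IdempotentHandler handler = new IdempotentHandler();
        int[] applied = {0};
        handler.handleOnce("order-1001", () -> applied[0]++);
        handler.handleOnce("order-1001", () -> applied[0]++); // redelivered copy is skipped
        System.out.println("applied = " + applied[0]);
    }
}
```

The key should identify the business operation (an order number, for instance) rather than the delivery, since a retried send may reach the Broker as a separate message.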

DefaultMQProducerImpl#tryToFindTopicPublishInfo()

  1. 1: private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
  2. 2: // Look up the Topic publish info in the cache
  3. 3: TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
  4. 4: // When no usable Topic publish info is cached, fetch it once from the Namesrv
  5. 5: if (null == topicPublishInfo || !topicPublishInfo.ok()) {
  6. 6: this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
  7. 7: this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
  8. 8: topicPublishInfo = this.topicPublishInfoTable.get(topic);
  9. 9: }
  10. 10: // If the fetched Topic publish info is usable, return it
  11. 11: if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
  12. 12: return topicPublishInfo;
  13. 13: } else { // Use the Topic publish info of {@link DefaultMQProducer#createTopicKey}. Applies when the Topic publish info does not exist && the Broker supports auto-creating Topics
  14. 14: this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
  15. 15: topicPublishInfo = this.topicPublishInfoTable.get(topic);
  16. 16: return topicPublishInfo;
  17. 17: }
  18. 18: }
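  • Note: gets the Topic publish info, first from the cache topicPublishInfoTable and then from the Namesrv.
  • Line 3: reads the Topic publish info from the cache topicPublishInfoTable.
  • Lines 5 to 9: fetches the Topic publish info from the Namesrv.
  • Lines 13 to 17: when the Namesrv has nothing for the Topic, uses the Topic publish info of {@link DefaultMQProducer#createTopicKey}. The point is that when the Broker has auto-create-Topic switched on, it creates the Topic automatically after receiving the message. See "RocketMQ Source Code Analysis: Topic" for details.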
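
The lookup order described above (cache, then name server, then the default Topic's route) can be sketched in isolation. This is a simplified illustration, not the RocketMQ implementation: `RouteCache`, the `Function`-based name server lookup, and the use of plain strings as route info are all assumptions made for the example.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Hypothetical three-step route lookup: cache, name server, default topic. */
final class RouteCache {
    private final Map<String, String> cache = new ConcurrentHashMap<>();
    private final Function<String, String> nameServerLookup; // returns null for unknown topics
    private final String defaultTopicRoute;                  // route of the auto-create topic, may be null

    RouteCache(Function<String, String> nameServerLookup, String defaultTopicRoute) {
        this.nameServerLookup = nameServerLookup;
        this.defaultTopicRoute = defaultTopicRoute;
    }

    String find(String topic) {
        String route = cache.get(topic);            // 1. cache
        if (route != null) {
            return route;
        }
        route = nameServerLookup.apply(topic);      // 2. name server
        if (route == null) {
            route = defaultTopicRoute;              // 3. fall back to the default topic's route
        }
        if (route != null) {
            cache.put(topic, route);                // remember it for the next send
        }
        return route;
    }

    public static void main(String[] args) {
        RouteCache routes = new RouteCache(
                topic -> "orders".equals(topic) ? "broker-a" : null, "default-route");
        System.out.println(routes.find("orders"));
        System.out.println(routes.find("brand-new-topic"));
    }
}
```
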
MQFaultStrategy

Figure: Latency class diagram

MQFaultStrategy
  1. 1: public class MQFaultStrategy {
  2. 2: private final static Logger log = ClientLogger.getLog();
  3. 3:
  4. 4: /**
  5. 5: * Latency fault tolerance; tracks the send latency of each Broker
  6. 6: * key: brokerName
  7. 7: */
  8. 8: private final LatencyFaultTolerance<String> latencyFaultTolerance = new LatencyFaultToleranceImpl();
  9. 9: /**
  10. 10: * Switch for send-latency fault tolerance
  11. 11: */
  12. 12: private boolean sendLatencyFaultEnable = false;
  13. 13: /**
  14. 14: * Latency level array
  15. 15: */
  16. 16: private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
  17. 17: /**
  18. 18: * Unavailable-duration array
  19. 19: */
  20. 20: private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};
  21. 21:
  22. 22: /**
  23. 23: * Select a message queue based on the Topic publish info
  24. 24: *
  25. 25: * @param tpInfo Topic publish info
  26. 26: * @param lastBrokerName brokerName
  27. 27: * @return message queue
  28. 28: */
  29. 29: public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
  30. 30: if (this.sendLatencyFaultEnable) {
  31. 31: try {
  32. 32: // Get an available message queue whose brokerName equals lastBrokerName (any available queue when lastBrokerName is null)
  33. 33: int index = tpInfo.getSendWhichQueue().getAndIncrement();
  34. 34: for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
  35. 35: int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
  36. 36: if (pos < 0)
  37. 37: pos = 0;
  38. 38: MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
  39. 39: if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
  40. 40: if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
  41. 41: return mq;
  42. 42: }
  43. 43: }
  44. 44: // Pick a relatively good broker and take one of its message queues, without checking that queue's availability
  45. 45: final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
  46. 46: int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
  47. 47: if (writeQueueNums > 0) {
  48. 48: final MessageQueue mq = tpInfo.selectOneMessageQueue();
  49. 49: if (notBestBroker != null) {
  50. 50: mq.setBrokerName(notBestBroker);
  51. 51: mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
  52. 52: }
  53. 53: return mq;
  54. 54: } else {
  55. 55: latencyFaultTolerance.remove(notBestBroker);
  56. 56: }
  57. 57: } catch (Exception e) {
  58. 58: log.error("Error occurred when selecting message queue", e);
  59. 59: }
  60. 60: // Select a message queue without considering availability
  61. 61: return tpInfo.selectOneMessageQueue();
  62. 62: }
  63. 63: // Fault tolerance disabled: select a message queue based on lastBrokerName, without considering availability
  64. 64: return tpInfo.selectOneMessageQueue(lastBrokerName);
  65. 65: }
  66. 66:
  67. 67: /**
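  68. 68: * Update the latency fault-tolerance info
  69. 69: *
  70. 70: * @param brokerName brokerName
  71. 71: * @param currentLatency latency
  72. 72: * @param isolation whether to isolate. When isolation is on, the latency is treated as 30000. Currently used mainly when a send throws an exception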
  73. 73: */
  74. 74: public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
  75. 75: if (this.sendLatencyFaultEnable) {
  76. 76: long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
  77. 77: this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
  78. 78: }
  79. 79: }
  80. 80:
  81. 81: /**
  82. 82: * Compute the unavailable duration for a latency
  83. 83: *
  84. 84: * @param currentLatency latency
  85. 85: * @return unavailable duration
  86. 86: */
  87. 87: private long computeNotAvailableDuration(final long currentLatency) {
  88. 88: for (int i = latencyMax.length - 1; i >= 0; i--) {
  89. 89: if (currentLatency >= latencyMax[i])
  90. 90: return this.notAvailableDuration[i];
  91. 91: }
  92. 92: return 0;
  93. 93: }
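  • Note: the Producer's fault-tolerance strategy for sending messages. It is off by default, i.e. sendLatencyFaultEnable=false.

  • Lines 30 to 62: queue selection with fault tolerance enabled. It first looks for an available queue, then picks a broker and takes one of its queues, and as a last resort returns a queue of any broker.

  • Line 64: queue selection with fault tolerance disabled.

  • Lines 74 to 79: update the latency fault-tolerance info. When the Producer takes too long to send a message, the Broker is treated as unavailable for the next N seconds. With the configured latencyMax and notAvailableDuration arrays, the mapping is: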

    | Time the Producer spent sending | Broker unavailable for |
    | --- | --- |
    | >= 15000 ms | 600 * 1000 ms |
    | >= 3000 ms | 180 * 1000 ms |
    | >= 2000 ms | 120 * 1000 ms |
    | >= 1000 ms | 60 * 1000 ms |
    | >= 550 ms | 30 * 1000 ms |
    | >= 100 ms | 0 ms |
    | >= 50 ms | 0 ms |
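
To make the mapping concrete, here is a small standalone sketch that reuses the two arrays from the listing and the same backwards scan as computeNotAvailableDuration. The class name `LatencyTable` and the combined `notAvailableDuration(latency, isolation)` helper are assumptions for illustration; in the listing the isolation substitution happens in updateFaultItem.

```java
/** Standalone sketch of the latency-to-unavailability mapping shown in the table. */
final class LatencyTable {
    // Same values as latencyMax / notAvailableDuration in the MQFaultStrategy listing.
    private static final long[] LATENCY_MAX = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
    private static final long[] NOT_AVAILABLE_DURATION = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

    /**
     * @param latencyMs measured send latency in milliseconds
     * @param isolation when true the latency is treated as 30000 ms, as in updateFaultItem
     * @return how long the broker is considered unavailable, in milliseconds
     */
    static long notAvailableDuration(long latencyMs, boolean isolation) {
        long effective = isolation ? 30000L : latencyMs;
        // Scan from the highest level down and take the first level the latency reaches.
        for (int i = LATENCY_MAX.length - 1; i >= 0; i--) {
            if (effective >= LATENCY_MAX[i]) {
                return NOT_AVAILABLE_DURATION[i];
            }
        }
        return 0L;
    }

    public static void main(String[] args) {
        System.out.println(notAvailableDuration(40L, false));
        System.out.println(notAvailableDuration(600L, false));
        System.out.println(notAvailableDuration(10L, true));
    }
}
```

One consequence worth noting: with isolation the effective latency of 30000 ms falls into the top level, so a send that fails with an exception marks the Broker unavailable for 600 * 1000 ms regardless of how quickly the failure occurred.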

LatencyFaultTolerance
  1. 1: public interface LatencyFaultTolerance<T> {
  2. 2:
  3. 3: /**
  4. 4: * Update the latency and unavailable duration of an object
  5. 5: *
  6. 6: * @param name object
  7. 7: * @param currentLatency latency
  8. 8: * @param notAvailableDuration unavailable duration
  9. 9: */
  10. 10: void updateFaultItem(final T name, final long currentLatency, final long notAvailableDuration);
  11. 11:
  12. 12: /**
  13. 13: * Whether the object is available
  14. 14: *
  15. 15: * @param name object
  16. 16: * @return whether it is available
  17. 17: */
  18. 18: boolean isAvailable(final T name);
  19. 19:
  20. 20: /**
  21. 21: * Remove an object
  22. 22: *
  23. 23: * @param name object
  24. 24: */
  25. 25: void remove(final T name);
  26. 26:
  27. 27: /**
  28. 28: * Get an object
  29. 29: *
  30. 30: * @return object
  31. 31: */
  32. 32: T pickOneAtLeast();
  33. 33: }
  • Note: the latency fault-tolerance interface.
LatencyFaultToleranceImpl
  1. 1: public class LatencyFaultToleranceImpl implements LatencyFaultTolerance<String> {
  2. 2:
  3. 3: /**
  4. 4: * Fault info table of the objects
  5. 5: */
  6. 6: private final ConcurrentHashMap<String, FaultItem> faultItemTable = new ConcurrentHashMap<>(16);
  7. 7: /**
  8. 8: * Index used to pick an object
  9. 9: * @see #pickOneAtLeast()
  10. 10: */
  11. 11: private final ThreadLocalIndex whichItemWorst = new ThreadLocalIndex();
  12. 12:
  13. 13: @Override
  14. 14: public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {
  15. 15: FaultItem old = this.faultItemTable.get(name);
  16. 16: if (null == old) {
  17. 17: // Create the item
  18. 18: final FaultItem faultItem = new FaultItem(name);
  19. 19: faultItem.setCurrentLatency(currentLatency);
  20. 20: faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
  21. 21: // Insert the item; if another one already exists, update that one instead
  22. 22: old = this.faultItemTable.putIfAbsent(name, faultItem);
  23. 23: if (old != null) {
  24. 24: old.setCurrentLatency(currentLatency);
  25. 25: old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
  26. 26: }
  27. 27: } else { // Update the item
  28. 28: old.setCurrentLatency(currentLatency);
  29. 29: old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
  30. 30: }
  31. 31: }
  32. 32:
  33. 33: @Override
  34. 34: public boolean isAvailable(final String name) {
  35. 35: final FaultItem faultItem = this.faultItemTable.get(name);
  36. 36: if (faultItem != null) {
  37. 37: return faultItem.isAvailable();
  38. 38: }
  39. 39: return true;
  40. 40: }
  41. 41:
  42. 42: @Override
  43. 43: public void remove(final String name) {
  44. 44: this.faultItemTable.remove(name);
  45. 45: }
  46. 46:
  47. 47: /**
  48. 48: * Pick a relatively good object
  49. 49: *
  50. 50: * @return object
  51. 51: */
  52. 52: @Override
  53. 53: public String pickOneAtLeast() {
  54. 54: // Build the list
  55. 55: final Enumeration<FaultItem> elements = this.faultItemTable.elements();
  56. 56: List<FaultItem> tmpList = new LinkedList<>();
  57. 57: while (elements.hasMoreElements()) {
  58. 58: final FaultItem faultItem = elements.nextElement();
  59. 59: tmpList.add(faultItem);
  60. 60: }
  61. 61: //
  62. 62: if (!tmpList.isEmpty()) {
  63. 63: // Shuffle + sort. TODO open question: only one of the two should be needed; presumably Collections.shuffle(tmpList) can be removed.
  64. 64: Collections.shuffle(tmpList);
  65. 65: Collections.sort(tmpList);
  66. 66: // Pick an object from the first half of the ordering
  67. 67: final int half = tmpList.size() / 2;
  68. 68: if (half <= 0) {
  69. 69: return tmpList.get(0).getName();
  70. 70: } else {
  71. 71: final int i = this.whichItemWorst.getAndIncrement() % half;
  72. 72: return tmpList.get(i).getName();
  73. 73: }
  74. 74: }
  75. 75: return null;
  76. 76: }
  77. 77: }
  • Note: the latency fault-tolerance implementation. It maintains the fault info of each object.
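
The bookkeeping behind updateFaultItem and isAvailable boils down to storing, per name, the timestamp from which the object counts as available again. The sketch below shows just that part with an injectable clock so the behaviour can be checked without waiting; `SimpleFaultTable` and its `LongSupplier` clock are assumptions for illustration and leave out the latency field and pickOneAtLeast.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongSupplier;

/** Hypothetical, reduced fault table: name -> time from which the name is available again. */
final class SimpleFaultTable {
    private final Map<String, Long> availableFrom = new ConcurrentHashMap<>();
    private final LongSupplier clock; // injectable for tests; System::currentTimeMillis in practice

    SimpleFaultTable(LongSupplier clock) {
        this.clock = clock;
    }

    /** Marks the broker unavailable for the given duration, counted from now. */
    void update(String brokerName, long notAvailableDurationMs) {
        availableFrom.put(brokerName, clock.getAsLong() + notAvailableDurationMs);
    }

    /** Unknown brokers are available; known ones once the clock reaches their timestamp. */
    boolean isAvailable(String brokerName) {
        Long from = availableFrom.get(brokerName);
        return from == null || clock.getAsLong() >= from;
    }

    public static void main(String[] args) {
        long[] now = {1_000L};
        SimpleFaultTable table = new SimpleFaultTable(() -> now[0]);
        table.update("broker-a", 30_000L);
        System.out.println(table.isAvailable("broker-a")); // still inside the penalty window
        now[0] += 30_000L;
        System.out.println(table.isAvailable("broker-a")); // penalty window has elapsed
    }
}
```
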
FaultItem
  1. 1: class FaultItem implements Comparable<FaultItem> {
  2. 2: /**
  3. 3: * Object name
  4. 4: */
  5. 5: private final String name;
  6. 6: /**
  7. 7: * Latency
  8. 8: */
  9. 9: private volatile long currentLatency;
  10. 10: /**
  11. 11: * Time from which the object is available again
  12. 12: */
  13. 13: private volatile long startTimestamp;
  14. 14:
  15. 15: public FaultItem(final String name) {
  16. 16: this.name = name;
  17. 17: }
  18. 18:
  19. 19: /**
  20. 20: * Compare items
  21. 21: * availability > latency > start-of-availability time
  22. 22: *
  23. 23: * @param other other
  24. 24: * @return ascending order
  25. 25: */
  26. 26: @Override
  27. 27: public int compareTo(final FaultItem other) {
  28. 28: if (this.isAvailable() != other.isAvailable()) {
  29. 29: if (this.isAvailable())
  30. 30: return -1;
  31. 31:
  32. 32: if (other.isAvailable())
  33. 33: return 1;
  34. 34: }
  35. 35:
  36. 36: if (this.currentLatency < other.currentLatency)
  37. 37: return -1;
  38. 38: else if (this.currentLatency > other.currentLatency) {
  39. 39: return 1;
  40. 40: }
  41. 41:
  42. 42: if (this.startTimestamp < other.startTimestamp)
  43. 43: return -1;
  44. 44: else if (this.startTimestamp > other.startTimestamp) {
  45. 45: return 1;
  46. 46: }
  47. 47:
  48. 48: return 0;
  49. 49: }
  50. 50:
  51. 51: /**
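  52. 52: * Whether the item is available: true once the current time has reached the start-of-availability time
  53. 53: *
  54. 54: * @return whether it is available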
  55. 55: */
  56. 56: public boolean isAvailable() {
  57. 57: return (System.currentTimeMillis() - startTimestamp) >= 0;
  58. 58: }
  59. 59:
  60. 60: @Override
  61. 61: public int hashCode() {
  62. 62: int result = getName() != null ? getName().hashCode() : 0;
  63. 63: result = 31 * result + (int) (getCurrentLatency() ^ (getCurrentLatency() >>> 32));
  64. 64: result = 31 * result + (int) (getStartTimestamp() ^ (getStartTimestamp() >>> 32));
  65. 65: return result;
  66. 66: }
  67. 67:
  68. 68: @Override
  69. 69: public boolean equals(final Object o) {
  70. 70: if (this == o)
  71. 71: return true;
  72. 72: if (!(o instanceof FaultItem))
  73. 73: return false;
  74. 74:
  75. 75: final FaultItem faultItem = (FaultItem) o;
  76. 76:
  77. 77: if (getCurrentLatency() != faultItem.getCurrentLatency())
  78. 78: return false;
  79. 79: if (getStartTimestamp() != faultItem.getStartTimestamp())
  80. 80: return false;
  81. 81: return getName() != null ? getName().equals(faultItem.getName()) : faultItem.getName() == null;
  82. 82:
  83. 83: }
  84. 84: }
  • Note: fault info of an object. It holds the object's name, its latency, and the time from which it is available again.
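
The ordering "availability > latency > start-of-availability time" and the pick from the better half can be tried out with a few fixed values. This is a sketch under assumptions: `BrokerRanking` and `Item` are hypothetical, availability is passed in as a boolean rather than derived from the clock, and the shuffle step of pickOneAtLeast is left out.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical ranking helper mirroring the comparison order of FaultItem. */
final class BrokerRanking {
    static final class Item {
        final String name;
        final boolean available;
        final long latency;
        final long startTimestamp;

        Item(String name, boolean available, long latency, long startTimestamp) {
            this.name = name;
            this.available = available;
            this.latency = latency;
            this.startTimestamp = startTimestamp;
        }
    }

    // Available items first (false sorts before true), then lower latency, then earlier start time.
    static final Comparator<Item> ORDER = Comparator
            .comparing((Item i) -> !i.available)
            .thenComparingLong(i -> i.latency)
            .thenComparingLong(i -> i.startTimestamp);

    /** Returns the names sorted best first. */
    static List<String> rank(List<Item> items) {
        List<Item> sorted = new ArrayList<>(items);
        sorted.sort(ORDER);
        List<String> names = new ArrayList<>();
        for (Item item : sorted) {
            names.add(item.name);
        }
        return names;
    }

    /** Picks a name from the better half, rotating with the given counter. */
    static String pick(List<Item> items, int counter) {
        List<String> ranked = rank(items);
        if (ranked.isEmpty()) {
            return null;
        }
        int half = ranked.size() / 2;
        return half <= 0 ? ranked.get(0) : ranked.get(Math.floorMod(counter, half));
    }

    public static void main(String[] args) {
        List<Item> items = new ArrayList<>();
        items.add(new Item("broker-a", true, 200L, 0L));
        items.add(new Item("broker-b", false, 10L, 5L));
        items.add(new Item("broker-c", true, 50L, 0L));
        System.out.println(rank(items));
        System.out.println(pick(items, 7));
    }
}
```

Note that an unavailable broker with very low latency still ranks behind every available one, because availability is compared first.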

DefaultMQProducerImpl#sendKernelImpl()

  1. 1: private SendResult sendKernelImpl(final Message msg, //
  2. 2: final MessageQueue mq, //
  3. 3: final CommunicationMode communicationMode, //
  4. 4: final SendCallback sendCallback, //
  5. 5: final TopicPublishInfo topicPublishInfo, //
  6. 6: final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
  7. 7: // Get the broker address
  8. 8: String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
  9. 9: if (null == brokerAddr) {
  10. 10: tryToFindTopicPublishInfo(mq.getTopic());
  11. 11: brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
  12. 12: }
  13. 13: //
  14. 14: SendMessageContext context = null;
  15. 15: if (brokerAddr != null) {
  16. 16: // Whether to use the broker VIP channel. The broker exposes two ports for external service.
  17. 17: brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);
  18. 18: byte[] prevBody = msg.getBody(); // Remember the message body. The logic below may change it, e.g. by compressing the message.
  19. 19: try {
  20. 20: // Set the unique ID
  21. 21: MessageClientIDSetter.setUniqID(msg);
  22. 22: // Message compression
  23. 23: int sysFlag = 0;
  24. 24: if (this.tryToCompressMessage(msg)) {
  25. 25: sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
  26. 26: }
  27. 27: // Transaction
  28. 28: final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
  29. 29: if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
  30. 30: sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
  31. 31: }
  32. 32: // hook: check whether sending this message is forbidden
  33. 33: if (hasCheckForbiddenHook()) {
  34. 34: CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
  35. 35: checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());
  36. 36: checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());
  37. 37: checkForbiddenContext.setCommunicationMode(communicationMode);
  38. 38: checkForbiddenContext.setBrokerAddr(brokerAddr);
  39. 39: checkForbiddenContext.setMessage(msg);
  40. 40: checkForbiddenContext.setMq(mq);
  41. 41: checkForbiddenContext.setUnitMode(this.isUnitMode());
  42. 42: this.executeCheckForbiddenHook(checkForbiddenContext);
  43. 43: }
  44. 44: // hook: logic before sending the message
  45. 45: if (this.hasSendMessageHook()) {
  46. 46: context = new SendMessageContext();
  47. 47: context.setProducer(this);
  48. 48: context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
  49. 49: context.setCommunicationMode(communicationMode);
  50. 50: context.setBornHost(this.defaultMQProducer.getClientIP());
  51. 51: context.setBrokerAddr(brokerAddr);
  52. 52: context.setMessage(msg);
  53. 53: context.setMq(mq);
  54. 54: String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
  55. 55: if (isTrans != null && isTrans.equals("true")) {
  56. 56: context.setMsgType(MessageType.Trans_Msg_Half);
  57. 57: }
  58. 58: if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
  59. 59: context.setMsgType(MessageType.Delay_Msg);
  60. 60: }
  61. 61: this.executeSendMessageHookBefore(context);
  62. 62: }
  63. 63: // Build the send-message request
  64. 64: SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
  65. 65: requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
  66. 66: requestHeader.setTopic(msg.getTopic());
  67. 67: requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
  68. 68: requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
  69. 69: requestHeader.setQueueId(mq.getQueueId());
  70. 70: requestHeader.setSysFlag(sysFlag);
  71. 71: requestHeader.setBornTimestamp(System.currentTimeMillis());
  72. 72: requestHeader.setFlag(msg.getFlag());
  73. 73: requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
  74. 74: requestHeader.setReconsumeTimes(0);
  75. 75: requestHeader.setUnitMode(this.isUnitMode());
  76. 76: if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { // Retry Topic (message re-send)
  77. 77: String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
  78. 78: if (reconsumeTimes != null) {
  79. 79: requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
  80. 80: MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
  81. 81: }
  82. 82: String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
  83. 83: if (maxReconsumeTimes != null) {
  84. 84: requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
  85. 85: MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
  86. 86: }
  87. 87: }
  88. 88: // Send the message
  89. 89: SendResult sendResult = null;
  90. 90: switch (communicationMode) {
  91. 91: case ASYNC:
  92. 92: sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(//
  93. 93: brokerAddr, // 1
  94. 94: mq.getBrokerName(), // 2
  95. 95: msg, // 3
  96. 96: requestHeader, // 4
  97. 97: timeout, // 5
  98. 98: communicationMode, // 6
  99. 99: sendCallback, // 7
  100. 100: topicPublishInfo, // 8
  101. 101: this.mQClientFactory, // 9
  102. 102: this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(), // 10
  103. 103: context, //
  104. 104: this);
  105. 105: break;
  106. 106: case ONEWAY:
  107. 107: case SYNC:
  108. 108: sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
  109. 109: brokerAddr,
  110. 110: mq.getBrokerName(),
  111. 111: msg,
  112. 112: requestHeader,
  113. 113: timeout,
  114. 114: communicationMode,
  115. 115: context,
  116. 116: this);
  117. 117: break;
  118. 118: default:
  119. 119: assert false;
  120. 120: break;
  121. 121: }
  122. 122: // hook: logic after sending the message
  123. 123: if (this.hasSendMessageHook()) {
  124. 124: context.setSendResult(sendResult);
  125. 125: this.executeSendMessageHookAfter(context);
  126. 126: }
  127. 127: // Return the send result
  128. 128: return sendResult;
  129. 129: } catch (RemotingException e) {
  130. 130: if (this.hasSendMessageHook()) {
  131. 131: context.setException(e);
  132. 132: this.executeSendMessageHookAfter(context);
  133. 133: }
  134. 134: throw e;
  135. 135: } catch (MQBrokerException e) {
  136. 136: if (this.hasSendMessageHook()) {
  137. 137: context.setException(e);
  138. 138: this.executeSendMessageHookAfter(context);
  139. 139: }
  140. 140: throw e;
  141. 141: } catch (InterruptedException e) {
  142. 142: if (this.hasSendMessageHook()) {
  143. 143: context.setException(e);
  144. 144: this.executeSendMessageHookAfter(context);
  145. 145: }
  146. 146: throw e;
  147. 147: } finally {
  148. 148: msg.setBody(prevBody);
  149. 149: }
  150. 150: }
  151. 151: // Broker address not found: throw an exception
  152. 152: throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
  153. 153: }
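  • Note: the core send method. This is where the network request is actually issued and the message is sent to the Broker.
  • Line 21: generates the message ID. See "RocketMQ Source Code Analysis: Message Basics" for details.
  • Lines 64 to 87: build the send-message request header SendMessageRequestHeader.
  • Lines 90 to 121: call MQClientAPIImpl#sendMessage(...) to issue the network request.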
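
Lines 23 to 31 of the listing build sysFlag by OR-ing flag bits together. The sketch below shows that bit-flag technique on its own. The class `SysFlagSketch` is hypothetical, and the concrete bit values are assumptions chosen for illustration; check MessageSysFlag in your RocketMQ version for the real constants.

```java
/** Hypothetical bit-flag helper illustrating how sysFlag is assembled and read. */
final class SysFlagSketch {
    // Assumed bit positions for illustration only; not taken from the listing.
    static final int COMPRESSED_FLAG = 0x1;
    static final int TRANSACTION_PREPARED_TYPE = 0x1 << 2;

    /** Combines independent properties of a message into one int. */
    static int buildSysFlag(boolean compressed, boolean transactionPrepared) {
        int sysFlag = 0;
        if (compressed) {
            sysFlag |= COMPRESSED_FLAG;
        }
        if (transactionPrepared) {
            sysFlag |= TRANSACTION_PREPARED_TYPE;
        }
        return sysFlag;
    }

    /** Tests a single bit without disturbing the others. */
    static boolean isCompressed(int sysFlag) {
        return (sysFlag & COMPRESSED_FLAG) == COMPRESSED_FLAG;
    }

    public static void main(String[] args) {
        int sysFlag = buildSysFlag(true, true);
        System.out.println(Integer.toBinaryString(sysFlag));
        System.out.println(isCompressed(sysFlag));
    }
}
```

Packing the properties into one integer keeps the request header compact, and the receiver can add further bits of its own with the same OR operation.
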

3. Broker receives the message

Figure: sequence diagram of the receive-message API

SendMessageProcessor#sendMessage

  1. 1: @Override
  2. 2: public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
  3. 3: SendMessageContext mqtraceContext;
  4. 4: switch (request.getCode()) {
  5. 5: case RequestCode.CONSUMER_SEND_MSG_BACK:
  6. 6: return this.consumerSendMsgBack(ctx, request);
  7. 7: default:
  8. 8: // Parse the request
  9. 9: SendMessageRequestHeader requestHeader = parseRequestHeader(request);
  10. 10: if (requestHeader == null) {
  11. 11: return null;
  12. 12: }
  13. 13: // Send-request Context. Used in hook scenarios
  14. 14: mqtraceContext = buildMsgContext(ctx, requestHeader);
  15. 15: // hook: logic before handling the sent message
  16. 16: this.executeSendMessageHookBefore(ctx, request, mqtraceContext);
  17. 17: // Handle the sent message
  18. 18: final RemotingCommand response = this.sendMessage(ctx, request, mqtraceContext, requestHeader);
  19. 19: // hook: logic after handling the sent message
  20. 20: this.executeSendMessageHookAfter(response, mqtraceContext);
  21. 21: return response;
  22. 22: }
  23. 23: }
  24. 24:
  25. 25: private RemotingCommand sendMessage(final ChannelHandlerContext ctx, //
  26. 26: final RemotingCommand request, //
  27. 27: final SendMessageContext sendMessageContext, //
  28. 28: final SendMessageRequestHeader requestHeader) throws RemotingCommandException {
  29. 29:
  30. 30: // Initialize the response
  31. 31: final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
  32. 32: final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader) response.readCustomHeader();
  33. 33: response.setOpaque(request.getOpaque());
  34. 34: response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId());
  35. 35: response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));
  36. 36:
  37. 37: if (log.isDebugEnabled()) {
  38. 38: log.debug("receive SendMessage request command, {}", request);
  39. 39: }
  40. 40:
  41. 41: // If the broker has not started accepting messages yet, respond with a system error
  42. 42: @SuppressWarnings("SpellCheckingInspection")
  43. 43: final long startTimstamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp();
  44. 44: if (this.brokerController.getMessageStore().now() < startTimstamp) {
  45. 45: response.setCode(ResponseCode.SYSTEM_ERROR);
  46. 46: response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimstamp)));
  47. 47: return response;
  48. 48: }
  49. 49:
  50. 50: // Validate the message configuration (Topic configuration)
  51. 51: response.setCode(-1);
  52. 52: super.msgCheck(ctx, requestHeader, response);
  53. 53: if (response.getCode() != -1) {
  54. 54: return response;
  55. 55: }
  56. 56:
  57. 57: final byte[] body = request.getBody();
  58. 58:
  59. 59: // If the queue ID is less than 0, pick a write queue at random
  60. 60: int queueIdInt = requestHeader.getQueueId();
  61. 61: TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
  62. 62: if (queueIdInt < 0) {
  63. 63: queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums();
  64. 64: }
  65. 65:
  66. 66: //
  67. 67: int sysFlag = requestHeader.getSysFlag();
  68. 68: if (TopicFilterType.MULTI_TAG == topicConfig.getTopicFilterType()) {
  69. 69: sysFlag |= MessageSysFlag.MULTI_TAGS_FLAG;
  70. 70: }
  71. 71:
  72. 72: // Handle RETRY messages. Once the maximum number of consume attempts is reached, the topic is changed to "%DLQ%" + group name, i.e. the message goes to the Dead Letter Queue
  73. 73: String newTopic = requestHeader.getTopic();
  74. 74: if (null != newTopic && newTopic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
  75. 75: // Get the subscription group config
  76. 76: String groupName = newTopic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length());
  77. 77: SubscriptionGroupConfig subscriptionGroupConfig =
  78. 78: this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(groupName);
  79. 79: if (null == subscriptionGroupConfig) {
  80. 80: response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
  81. 81: response.setRemark("subscription group not exist, " + groupName + " " + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
  82. 82: return response;
  83. 83: }
  84. 84: // Compute the maximum number of consume attempts
  85. 85: int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
  86. 86: if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
  87. 87: maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();
  88. 88: }
  89. 89: int reconsumeTimes = requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes();
  90. 90: if (reconsumeTimes >= maxReconsumeTimes) { // Maximum number of consume attempts reached
  91. 91: newTopic = MixAll.getDLQTopic(groupName);
  92. 92: queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;
  93. 93: topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic, //
  94. 94: DLQ_NUMS_PER_GROUP, //
  95. 95: PermName.PERM_WRITE, 0
  96. 96: );
  97. 97: if (null == topicConfig) {
  98. 98: response.setCode(ResponseCode.SYSTEM_ERROR);
  99. 99: response.setRemark("topic[" + newTopic + "] not exist");
  100. 100: return response;
  101. 101: }
  102. 102: }
  103. 103: }
  104. 104:
  105. 105: // Build the MessageExtBrokerInner
  106. 106: MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
  107. 107: msgInner.setTopic(newTopic);
  108. 108: msgInner.setBody(body);
  109. 109: msgInner.setFlag(requestHeader.getFlag());
  110. 110: MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties()));
  111. 111: msgInner.setPropertiesString(requestHeader.getProperties());
  112. 112: msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(topicConfig.getTopicFilterType(), msgInner.getTags()));
  113. 113: msgInner.setQueueId(queueIdInt);
  114. 114: msgInner.setSysFlag(sysFlag);
  115. 115: msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
  116. 116: msgInner.setBornHost(ctx.channel().remoteAddress());
  117. 117: msgInner.setStoreHost(this.getStoreHost());
  118. 118: msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
  119. 119:
  120. 120: // Reject transactional messages if the Broker is configured to refuse them
  121. 121: if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
  122. 122: String traFlag = msgInner.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
  123. 123: if (traFlag != null) {
  124. 124: response.setCode(ResponseCode.NO_PERMISSION);
  125. 125: response.setRemark(
  126. 126: "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending transaction message is forbidden");
  127. 127: return response;
  128. 128: }
  129. 129: }
  130. 130:
  131. 131: // Store the message
  132. 132: PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
  133. 133: if (putMessageResult != null) {
  134. 134: boolean sendOK = false;
  135. 135:
  136. 136: switch (putMessageResult.getPutMessageStatus()) {
  137. 137: // Success
  138. 138: case PUT_OK:
  139. 139: sendOK = true;
  140. 140: response.setCode(ResponseCode.SUCCESS);
  141. 141: break;
  142. 142: case FLUSH_DISK_TIMEOUT:
  143. 143: response.setCode(ResponseCode.FLUSH_DISK_TIMEOUT);
  144. 144: sendOK = true;
  145. 145: break;
  146. 146: case FLUSH_SLAVE_TIMEOUT:
  147. 147: response.setCode(ResponseCode.FLUSH_SLAVE_TIMEOUT);
  148. 148: sendOK = true;
  149. 149: break;
  150. 150: case SLAVE_NOT_AVAILABLE:
  151. 151: response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE);
  152. 152: sendOK = true;
  153. 153: break;
  154. 154:
  155. 155: // Failed
  156. 156: case CREATE_MAPEDFILE_FAILED:
  157. 157: response.setCode(ResponseCode.SYSTEM_ERROR);
  158. 158: response.setRemark("create mapped file failed, server is busy or broken.");
  159. 159: break;
  160. 160: case MESSAGE_ILLEGAL:
  161. 161: case PROPERTIES_SIZE_EXCEEDED:
  162. 162: response.setCode(ResponseCode.MESSAGE_ILLEGAL);
  163. 163: response.setRemark(
  164. 164: "the message is illegal, maybe msg body or properties length not matched. msg body length limit 128k, msg properties length limit 32k.");
  165. 165: break;
  166. 166: case SERVICE_NOT_AVAILABLE:
  167. 167: response.setCode(ResponseCode.SERVICE_NOT_AVAILABLE);
  168. 168: response.setRemark(
  169. 169: "service not available now, maybe disk full, " + diskUtil() + ", maybe your broker machine memory too small.");
  170. 170: break;
  171. 171: case OS_PAGECACHE_BUSY:
  172. 172: response.setCode(ResponseCode.SYSTEM_ERROR);
  173. 173: response.setRemark("[PC_SYNCHRONIZED]broker busy, start flow control for a while");
  174. 174: break;
  175. 175: case UNKNOWN_ERROR:
  176. 176: response.setCode(ResponseCode.SYSTEM_ERROR);
  177. 177: response.setRemark("UNKNOWN_ERROR");
  178. 178: break;
  179. 179: default:
  180. 180: response.setCode(ResponseCode.SYSTEM_ERROR);
  181. 181: response.setRemark("UNKNOWN_ERROR DEFAULT");
  182. 182: break;
  183. 183: }
  184. 184:
  185. 185: String owner = request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER);
  186. 186: if (sendOK) {
  187. 187: // Update statistics
  188. 188: this.brokerController.getBrokerStatsManager().incTopicPutNums(msgInner.getTopic());
  189. 189: this.brokerController.getBrokerStatsManager().incTopicPutSize(msgInner.getTopic(), putMessageResult.getAppendMessageResult().getWroteBytes());
  190. 190: this.brokerController.getBrokerStatsManager().incBrokerPutNums();
  191. 191:
  192. 192: // Send the response
  193. 193: response.setRemark(null);
  194. 194: responseHeader.setMsgId(putMessageResult.getAppendMessageResult().getMsgId());
  195. 195: responseHeader.setQueueId(queueIdInt);
  196. 196: responseHeader.setQueueOffset(putMessageResult.getAppendMessageResult().getLogicsOffset());
  197. 197: doResponse(ctx, request, response);
  198. 198:
  199. 199: // hook: record the successful send in the context
  200. 200: if (hasSendMessageHook()) {
  201. 201: sendMessageContext.setMsgId(responseHeader.getMsgId());
  202. 202: sendMessageContext.setQueueId(responseHeader.getQueueId());
  203. 203: sendMessageContext.setQueueOffset(responseHeader.getQueueOffset());
  204. 204:
  205. 205: int commercialBaseCount = brokerController.getBrokerConfig().getCommercialBaseCount();
  206. 206: int wroteSize = putMessageResult.getAppendMessageResult().getWroteBytes();
  207. 207: int incValue = (int) Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT) * commercialBaseCount;
  208. 208:
  209. 209: sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_SUCCESS);
  210. 210: sendMessageContext.setCommercialSendTimes(incValue);
  211. 211: sendMessageContext.setCommercialSendSize(wroteSize);
  212. 212: sendMessageContext.setCommercialOwner(owner);
  213. 213: }
  214. 214: return null;
  215. 215: } else {
  216. 216: // hook: record the failed send in the context
  217. 217: if (hasSendMessageHook()) {
  218. 218: int wroteSize = request.getBody().length;
  219. 219: int incValue = (int) Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT);
  220. 220:
  221. 221: sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_FAILURE);
  222. 222: sendMessageContext.setCommercialSendTimes(incValue);
  223. 223: sendMessageContext.setCommercialSendSize(wroteSize);
  224. 224: sendMessageContext.setCommercialOwner(owner);
  225. 225: }
  226. 226: }
  227. 227: } else {
  228. 228: response.setCode(ResponseCode.SYSTEM_ERROR);
  229. 229: response.setRemark("store putMessage return null");
  230. 230: }
  231. 231:
  232. 232: return response;
  233. 233: }
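  • #processRequest(): handles the send-message request.
  • #sendMessage(): stores the message and returns the send result.
  • Lines 51 to 55: validate the message against the Topic configuration. For details see AbstractSendMessageProcessor#msgCheck().
  • Lines 60 to 64: when the requested queue id is less than 0, the Broker picks one of the topic's write queues at random.
  • Lines 72 to 103: handle RETRY messages. Once the maximum number of consume attempts is reached, the topic is changed to "%DLQ%" + group name, that is, the message is moved to the Dead Letter Queue. For details see "RocketMQ Source Code Analysis: Topic".
  • Lines 105 to 118: build the MessageExtBrokerInner.
  • Line 132: store the message. For details see DefaultMessageStore#putMessage().
  • Lines 133 to 183: process the store result and set the response code and remark.
  • Lines 186 to 214: on success, respond. The response is sent here through doResponse(ctx, request, response), and the method then returns null. The reason: responding to the Producer may throw an exception, and #doResponse(ctx, request, response) catches that exception and logs it. This makes it much easier to check whether a response failed after the Broker had already stored the message successfully.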
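
The queue selection (lines 60 to 64) and the retry-to-DLQ switch (lines 72 to 103) can be isolated into a small sketch. This is a simplified illustration, not the Broker's code: the class `SendRoutingSketch` and both method names are hypothetical, and the `"%RETRY%"` prefix is assumed to be the value of `MixAll.RETRY_GROUP_TOPIC_PREFIX`. Topic creation and the subscription group lookup are left out.

```java
import java.util.Random;

/** Illustrative sketch only; the class and method names are hypothetical. */
final class SendRoutingSketch {
    // Assumed to match MixAll.RETRY_GROUP_TOPIC_PREFIX and the "%DLQ%" prefix quoted above.
    static final String RETRY_PREFIX = "%RETRY%";
    static final String DLQ_PREFIX = "%DLQ%";

    /** Keeps a non-negative queue id; otherwise picks one in [0, writeQueueNums). */
    static int pickQueueId(int requestedQueueId, int writeQueueNums, Random random) {
        if (requestedQueueId >= 0) {
            return requestedQueueId;
        }
        // The inner "% 99999999" keeps the value away from Integer.MIN_VALUE,
        // for which Math.abs would still return a negative number.
        return Math.abs(random.nextInt() % 99999999) % writeQueueNums;
    }

    /** Returns the topic the message would actually be stored under. */
    static String resolveTopic(String topic, int reconsumeTimes, int maxReconsumeTimes) {
        if (topic == null || !topic.startsWith(RETRY_PREFIX)) {
            return topic; // ordinary topic: unchanged
        }
        String groupName = topic.substring(RETRY_PREFIX.length());
        if (reconsumeTimes >= maxReconsumeTimes) {
            return DLQ_PREFIX + groupName; // retries exhausted: dead letter queue
        }
        return topic; // still within the retry budget
    }

    public static void main(String[] args) {
        Random random = new Random();
        System.out.println(pickQueueId(-1, 4, random));
        System.out.println(resolveTopic("%RETRY%groupA", 16, 16));
    }
}
```

Note that in the listing above, a message routed to the DLQ also gets a fresh random queue id, this time modulo DLQ_NUMS_PER_GROUP rather than the topic's write queue count.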

AbstractSendMessageProcessor#msgCheck

  1. 1: protected RemotingCommand msgCheck(final ChannelHandlerContext ctx,
  2. 2: final SendMessageRequestHeader requestHeader, final RemotingCommand response) {
  3. 3: // Reject when the Broker has no write permission and the topic is an order topic
  4. 4: if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission())
  5. 5: && this.brokerController.getTopicConfigManager().isOrderTopic(requestHeader.getTopic())) {
  6. 6: response.setCode(ResponseCode.NO_PERMISSION);
  7. 7: response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
  8. 8: + "] sending message is forbidden");
  9. 9: return response;
  10. 10: }
  11. 11: // Check whether the topic may be sent to. Currently only {@link MixAll.DEFAULT_TOPIC} is not allowed
  12. 12: if (!this.brokerController.getTopicConfigManager().isTopicCanSendMessage(requestHeader.getTopic())) {
  13. 13: String errorMsg = "the topic[" + requestHeader.getTopic() + "] is conflict with system reserved words.";
  14. 14: log.warn(errorMsg);
  15. 15: response.setCode(ResponseCode.SYSTEM_ERROR);
  16. 16: response.setRemark(errorMsg);
  17. 17: return response;
  18. 18: }
  19. 19: TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
  20. 20: if (null == topicConfig) { // No topicConfig exists, so create one
  21. 21: int topicSysFlag = 0;
  22. 22: if (requestHeader.isUnitMode()) {
  23. 23: if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
  24. 24: topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
  25. 25: } else {
  26. 26: topicSysFlag = TopicSysFlag.buildSysFlag(true, false);
  27. 27: }
  28. 28: }
  29. 29: // Create the topic config
  30. 30: log.warn("the topic {} not exist, producer: {}", requestHeader.getTopic(), ctx.channel().remoteAddress());
  31. 31: topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageMethod(//
  32. 32: requestHeader.getTopic(), //
  33. 33: requestHeader.getDefaultTopic(), //
  34. 34: RemotingHelper.parseChannelRemoteAddr(ctx.channel()), //
  35. 35: requestHeader.getDefaultTopicQueueNums(), topicSysFlag);
  36. 36: if (null == topicConfig) {
  37. 37: if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
  38. 38: topicConfig =
  39. 39: this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
  40. 40: requestHeader.getTopic(), 1, PermName.PERM_WRITE | PermName.PERM_READ,
  41. 41: topicSysFlag);
  42. 42: }
  43. 43: }
  44. 44: // Still no topic config: creation failed
  45. 45: if (null == topicConfig) {
  46. 46: response.setCode(ResponseCode.TOPIC_NOT_EXIST);
  47. 47: response.setRemark("topic[" + requestHeader.getTopic() + "] not exist, apply first please!"
  48. 48: + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
  49. 49: return response;
  50. 50: }
  51. 51: }
  52. 52: // Check that the queue id is valid
  53. 53: int queueIdInt = requestHeader.getQueueId();
  54. 54: int idValid = Math.max(topicConfig.getWriteQueueNums(), topicConfig.getReadQueueNums());
  55. 55: if (queueIdInt >= idValid) {
  56. 56: String errorInfo = String.format("request queueId[%d] is illegal, %s Producer: %s",
  57. 57: queueIdInt,
  58. 58: topicConfig.toString(),
  59. 59: RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
  60. 60: log.warn(errorInfo);
  61. 61: response.setCode(ResponseCode.SYSTEM_ERROR);
  62. 62: response.setRemark(errorInfo);
  63. 63: return response;
  64. 64: }
  65. 65: return response;
  66. 66: }
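  • Summary: validates the request, mainly against the Topic configuration, for example whether the Broker has write permission (the request is rejected only when the topic is also an order topic, see lines 4 to 5), whether the topic config exists, and whether the queue id is valid.
  • Lines 11 to 18: check whether the Topic may be sent to. Currently only {@link MixAll.DEFAULT_TOPIC} is not allowed.
  • Lines 20 to 51: when no Topic config is found, create one. Creation can fail, for example when the Topic config of defaultTopic does not exist, or exists but does not allow inheritance. For details see "RocketMQ Source Code Analysis: Topic".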
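
Two of the conditions in #msgCheck are easy to misread, so here is a minimal sketch of them as pure functions. The class `MsgCheckSketch` and its method names are hypothetical, and the `PERM_WRITE` bit value is an assumption about `PermName`, not something shown in the listing.

```java
/** Illustrative sketch only; the class and method names are hypothetical. */
final class MsgCheckSketch {
    // Assumed bit value of PermName.PERM_WRITE; treat it as an assumption.
    static final int PERM_WRITE = 0x1 << 1;

    static boolean isWriteable(int perm) {
        return (perm & PERM_WRITE) == PERM_WRITE;
    }

    /** Lines 4 to 5: rejected only if the Broker is not writable AND the topic is an order topic. */
    static boolean rejectedByPermission(int brokerPermission, boolean orderTopic) {
        return !isWriteable(brokerPermission) && orderTopic;
    }

    /**
     * Lines 53 to 55: a queue id is illegal once it reaches max(write, read).
     * Negative ids pass this check; they are replaced by a random queue later.
     */
    static boolean isQueueIdIllegal(int queueId, int writeQueueNums, int readQueueNums) {
        return queueId >= Math.max(writeQueueNums, readQueueNums);
    }

    public static void main(String[] args) {
        System.out.println(rejectedByPermission(0, true));
        System.out.println(isQueueIdIllegal(-1, 4, 4));
    }
}
```

The second method shows why a Producer may send a negative queue id: the check has only an upper bound, so the value reaches #sendMessage(), where it triggers the random selection.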

DefaultMessageStore#putMessage

  1. 1: public PutMessageResult putMessage(MessageExtBrokerInner msg) {
  2. 2: if (this.shutdown) {
  3. 3: log.warn("message store has shutdown, so putMessage is forbidden");
  4. 4: return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
  5. 5: }
  6. 6:
  7. 7: // A slave node does not accept writes
  8. 8: if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
  9. 9: long value = this.printTimes.getAndIncrement();
  10. 10: if ((value % 50000) == 0) {
  11. 11: log.warn("message store is slave mode, so putMessage is forbidden ");
  12. 12: }
  13. 13:
  14. 14: return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
  15. 15: }
  16. 16:
  17. 17: // Check whether the store is writable
  18. 18: if (!this.runningFlags.isWriteable()) {
  19. 19: long value = this.printTimes.getAndIncrement();
  20. 20: if ((value % 50000) == 0) {
  21. 21: log.warn("message store is not writeable, so putMessage is forbidden " + this.runningFlags.getFlagBits());
  22. 22: }
  23. 23:
  24. 24: return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
  25. 25: } else {
  26. 26: this.printTimes.set(0);
  27. 27: }
  28. 28:
  29. 29: // Topic name too long
  30. 30: if (msg.getTopic().length() > Byte.MAX_VALUE) {
  31. 31: log.warn("putMessage message topic length too long " + msg.getTopic().length());
  32. 32: return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
  33. 33: }
  34. 34:
  35. 35: // Message properties too long
  36. 36: if (msg.getPropertiesString() != null && msg.getPropertiesString().length() > Short.MAX_VALUE) {
  37. 37: log.warn("putMessage message properties length too long " + msg.getPropertiesString().length());
  38. 38: return new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null);
  39. 39: }
  40. 40:
  41. 41: if (this.isOSPageCacheBusy()) {
  42. 42: return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null);
  43. 43: }
  44. 44:
  45. 45: long beginTime = this.getSystemClock().now();
  46. 46: // Append the message to the commitLog
  47. 47: PutMessageResult result = this.commitLog.putMessage(msg);
  48. 48:
  49. 49: long eclipseTime = this.getSystemClock().now() - beginTime;
  50. 50: if (eclipseTime > 500) {
  51. 51: log.warn("putMessage not in lock eclipse time(ms)={}, bodyLength={}", eclipseTime, msg.getBody().length);
  52. 52: }
  53. 53: this.storeStatsService.setPutMessageEntireTimeMax(eclipseTime);
  54. 54:
  55. 55: if (null == result || !result.isOk()) {
  56. 56: this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
  57. 57: }
  58. 58:
  59. 59: return result;
  60. 60: }
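  • Summary: a wrapper around message storage; the actual storage is done by CommitLog.
  • Lines 7 to 27: check whether the Broker can be written to (not a slave, and the store is writable).
  • Lines 29 to 39: validate the message format and size, namely the topic length and the properties length.
  • Line 47: call CommitLog to store the message. For the detailed logic see "RocketMQ Source Code Analysis: Message Storage".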
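
The size checks on lines 29 to 39 and the throttled warning on lines 9 to 12 can be sketched on their own. This is a minimal illustration with hypothetical names (`StorePrecheckSketch`, `checkSize`, `shouldLogRejection`); the `Status` enum only imitates the relevant values of PutMessageStatus.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Illustrative sketch only; the class and method names are hypothetical. */
final class StorePrecheckSketch {
    enum Status { OK, MESSAGE_ILLEGAL, PROPERTIES_SIZE_EXCEEDED }

    /** Topic length is capped at Byte.MAX_VALUE (127), properties at Short.MAX_VALUE (32767). */
    static Status checkSize(String topic, String propertiesString) {
        if (topic.length() > Byte.MAX_VALUE) {
            return Status.MESSAGE_ILLEGAL;
        }
        if (propertiesString != null && propertiesString.length() > Short.MAX_VALUE) {
            return Status.PROPERTIES_SIZE_EXCEEDED;
        }
        return Status.OK;
    }

    private final AtomicLong printTimes = new AtomicLong(0);

    /** True for the first rejection and then for every 50000th one, so the log is not flooded. */
    boolean shouldLogRejection() {
        return (printTimes.getAndIncrement() % 50000) == 0;
    }

    /** Called once writes are accepted again, so the next rejection is logged immediately. */
    void resetLogCounter() {
        printTimes.set(0);
    }

    public static void main(String[] args) {
        System.out.println(checkSize("TopicA", null));
        System.out.println(new StorePrecheckSketch().shouldLogRejection());
    }
}
```

Both limits are compared against `String.length()`, so they count characters rather than encoded bytes, exactly as in the listing.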

4、Closing remarks

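Thanks to every engineer who read, bookmarked, or liked this article.

Reading source code is a real pleasure, while writing source code analysis costs countless brain cells. Painful, but enjoyable.

If anything here is wrong or unclear, please bear with me. You are welcome to add QQ: 7685413 so that we can discuss and improve together.

Thanks again to every engineer who read, bookmarked, or liked this article.

Reposted from: https://my.oschina.net/sword4j/blog/889433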
