RocketMQ Source Code Walkthrough (4): Message Sending

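The previous two chapters covered how the Producer starts up and how a topic is created. This chapter focuses on how a message is sent to the broker.

I. Message delivery

Message delivery comes in two kinds: ordinary messages and transactional messages. DefaultMQProducer exposes the ordinary send methods and forwards each of them to DefaultMQProducerImpl; transactional sends must go through TransactionMQProducer instead.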

    public class DefaultMQProducer extends ClientConfig implements MQProducer {
        // All actual operations are delegated to this implementation
        protected final transient DefaultMQProducerImpl defaultMQProducerImpl;
        // Default number of queues per topic
        private volatile int defaultTopicQueueNums = 4;
        private int sendMsgTimeout = 3000;
        private int compressMsgBodyOverHowmuch = 1024 * 4;
        private int retryTimesWhenSendFailed = 2;
        private int retryTimesWhenSendAsyncFailed = 2;
        private boolean retryAnotherBrokerWhenNotStoreOK = false;
        private int maxMessageSize = 1024 * 1024 * 4;
        ...
        @Override
        public SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
            return this.defaultMQProducerImpl.send(msg);
        }

        @Override
        public SendResult send(Message msg, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
            return this.defaultMQProducerImpl.send(msg, timeout);
        }

        @Override
        public void send(Message msg, SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException {
            this.defaultMQProducerImpl.send(msg, sendCallback);
        }

        @Override
        public SendResult send(Message msg, MessageQueue mq, long timeout)
            throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
            return this.defaultMQProducerImpl.send(msg, mq, timeout);
        }

        @Override
        public void send(Message msg, MessageQueue mq, SendCallback sendCallback, long timeout)
            throws MQClientException, RemotingException, InterruptedException {
            this.defaultMQProducerImpl.send(msg, mq, sendCallback, timeout);
        }

        @Override
        public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback, long timeout)
            throws MQClientException, RemotingException, InterruptedException {
            this.defaultMQProducerImpl.send(msg, selector, arg, sendCallback, timeout);
        }

        @Override
        public void sendOneway(Message msg, MessageQueueSelector selector, Object arg)
            throws MQClientException, RemotingException, InterruptedException {
            this.defaultMQProducerImpl.sendOneway(msg, selector, arg);
        }

        @Override
        public TransactionSendResult sendMessageInTransaction(Message msg, LocalTransactionExecuter tranExecuter, final Object arg)
            throws MQClientException {
            throw new RuntimeException("sendMessageInTransaction not implement, please use TransactionMQProducer class");
        }
        ...
    }

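II. How message delivery is implemented

1. In the same way as before, the actual delivery is delegated to the DefaultMQProducerImpl held inside DefaultMQProducer. The entry point is sendDefaultImpl: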

    private SendResult sendDefaultImpl(Message msg, final CommunicationMode communicationMode,
        final SendCallback sendCallback, final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        // Make sure the Producer is in the running state
        this.makeSureStateOK();
        // Validate the message format
        Validators.checkMessage(msg, this.defaultMQProducer);
        final long invokeID = random.nextLong(); // Invocation ID; tags the log lines below as belonging to the same send
        long beginTimestampFirst = System.currentTimeMillis();
        long beginTimestampPrev = beginTimestampFirst;
        @SuppressWarnings("UnusedAssignment")
        long endTimestamp = beginTimestampFirst;
        // Look up the topic's routing information
        TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
        if (topicPublishInfo != null && topicPublishInfo.ok()) {
            MessageQueue mq = null; // The queue finally chosen for the message
            Exception exception = null;
            SendResult sendResult = null; // Result of the most recent send
            int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1; // Only SYNC sends get several attempts here
            int times = 0; // Attempt counter
            String[] brokersSent = new String[timesTotal]; // Broker name chosen for each attempt
            // Keep sending until an attempt succeeds or the attempts run out
            for (; times < timesTotal; times++) {
                String lastBrokerName = null == mq ? null : mq.getBrokerName();
                @SuppressWarnings("SpellCheckingInspection")
                MessageQueue tmpmq = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName); // Choose the queue to send to
                if (tmpmq != null) {
                    mq = tmpmq;
                    brokersSent[times] = mq.getBrokerName();
                    try {
                        beginTimestampPrev = System.currentTimeMillis();
                        // Call the core send method
                        sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout);
                        endTimestamp = System.currentTimeMillis();
                        // Update the broker availability information
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                        switch (communicationMode) {
                            case ASYNC:
                                return null;
                            case ONEWAY:
                                return null;
                            case SYNC:
                                if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                    // The send went through but the store status is not SEND_OK:
                                    // retry only if retryAnotherBrokerWhenNotStoreOK is enabled
                                    if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                        continue;
                                    }
                                }
                                return sendResult;
                            default:
                                break;
                        }
                    } catch (RemotingException e) { // Log, update the broker availability information, continue the loop
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                        log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                        log.warn(msg.toString());
                        exception = e;
                        continue;
                    } catch (MQClientException e) { // Log, update the broker availability information, continue the loop
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                        log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                        log.warn(msg.toString());
                        exception = e;
                        continue;
                    } catch (MQBrokerException e) { // Log, update the broker availability information; some response codes retry, the rest end the loop
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                        log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                        log.warn(msg.toString());
                        exception = e;
                        switch (e.getResponseCode()) {
                            // These response codes continue the loop and retry the send
                            case ResponseCode.TOPIC_NOT_EXIST:
                            case ResponseCode.SERVICE_NOT_AVAILABLE:
                            case ResponseCode.SYSTEM_ERROR:
                            case ResponseCode.NO_PERMISSION:
                            case ResponseCode.NO_BUYER_ID:
                            case ResponseCode.NOT_IN_CURRENT_UNIT:
                                continue;
                            // Otherwise return the send result if there is one, else rethrow
                            default:
                                if (sendResult != null) {
                                    return sendResult;
                                }
                                throw e;
                        }
                    } catch (InterruptedException e) {
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                        log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                        log.warn(msg.toString());
                        throw e;
                    }
                } else {
                    break;
                }
            }
            // Return the send result
            if (sendResult != null) {
                return sendResult;
            }
            // Throw an exception whose response code depends on what went wrong
            String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s", times, System.currentTimeMillis() - beginTimestampFirst,
                msg.getTopic(), Arrays.toString(brokersSent)) + FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);
            MQClientException mqClientException = new MQClientException(info, exception);
            if (exception instanceof MQBrokerException) {
                mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());
            } else if (exception instanceof RemotingConnectException) {
                mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);
            } else if (exception instanceof RemotingTimeoutException) {
                mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);
            } else if (exception instanceof MQClientException) {
                mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);
            }
            throw mqClientException;
        }
        // No NameServer address configured
        List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList();
        if (null == nsList || nsList.isEmpty()) {
            throw new MQClientException(
                "No name server address, please set it." + FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null).setResponseCode(ClientErrorCode.NO_NAME_SERVER_EXCEPTION);
        }
        // No route found for the topic
        throw new MQClientException("No route info of this topic, " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
            null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
    }
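
The shape of that retry loop is easier to see with the RocketMQ types stripped away. The following is only a sketch under stated assumptions: RetrySketch, pickBroker and brokersTried are hypothetical names, brokers are plain strings, and a "send" fails simply when the broker is in the failing list. It is not the library's code. It shows the two ideas that matter: only SYNC sends get 1 + retryTimesWhenSendFailed attempts, and each retry tries to move away from the broker that just failed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified, hypothetical model of the retry loop; not RocketMQ code.
final class RetrySketch {
    enum Mode { SYNC, ASYNC, ONEWAY }

    // Only SYNC sends are retried by this loop: one first attempt plus the configured retries.
    static int timesTotal(Mode mode, int retryTimesWhenSendFailed) {
        return mode == Mode.SYNC ? 1 + retryTimesWhenSendFailed : 1;
    }

    // Round-robin pick that skips the broker used by the previous attempt when another one exists.
    static String pickBroker(List<String> brokers, int index, String lastBroker) {
        for (int i = 0; i < brokers.size(); i++) {
            String candidate = brokers.get((index + i) % brokers.size());
            if (!candidate.equals(lastBroker)) {
                return candidate;
            }
        }
        return brokers.get(index % brokers.size()); // only one broker to choose from
    }

    // Runs up to 'attempts' sends; a send "fails" when its broker is listed in 'failing'.
    static List<String> brokersTried(List<String> brokers, List<String> failing, int attempts) {
        List<String> tried = new ArrayList<>();
        String last = null;
        for (int times = 0; times < attempts; times++) {
            String broker = pickBroker(brokers, times, last);
            tried.add(broker);
            last = broker;
            if (!failing.contains(broker)) {
                break; // the send succeeded, stop retrying
            }
        }
        return tried;
    }

    public static void main(String[] args) {
        int attempts = timesTotal(Mode.SYNC, 2);
        List<String> tried = brokersTried(
            Arrays.asList("broker-a", "broker-b"), Arrays.asList("broker-a"), attempts);
        System.out.println(attempts + " attempts allowed, brokers tried: " + tried);
    }
}
```

With the default retryTimesWhenSendFailed of 2, a synchronous send gets three attempts in total, which is where the number in the "Send [%d] times, still failed" message comes from.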

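2. The key function tryToFindTopicPublishInfo obtains the topic's routing information. When a topic is created, the topic and broker information is synchronized to the NameServer, so the routing information of the topic being sent to can be fetched from the NameServer here.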

    private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
        // Read the topic publish info from the cache; at start() only an empty TopicPublishInfo was put there
        TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
        // No usable publish info yet: fetch it from the NameServer once
        if (null == topicPublishInfo || !topicPublishInfo.ok()) {
            this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
        }
        // Return the publish info if it is usable
        if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
            return topicPublishInfo;
        } else {
            // Fall back to the publish info of {@link DefaultMQProducer#createTopicKey}.
            // Used when the topic has no publish info yet and the broker supports auto-creating topics.
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
            return topicPublishInfo;
        }
    }
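
The lookup order above (local cache, then the NameServer, then the route of the default topic key) can be sketched without any RocketMQ classes. Everything in this example is an assumption made for illustration: RouteLookupSketch is a hypothetical class, a route is reduced to a plain string, the "NameServer" is just a function that returns null for unknown topics, and "TBW102" stands in for the default topic key.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical cache-then-fetch-then-fallback lookup; not RocketMQ code.
final class RouteLookupSketch {
    private final Map<String, String> cache = new ConcurrentHashMap<>();
    private final Function<String, String> nameServer; // returns null when the topic is unknown
    private final String defaultTopicKey;
    int remoteCalls = 0; // counts simulated NameServer round trips

    RouteLookupSketch(Function<String, String> nameServer, String defaultTopicKey) {
        this.nameServer = nameServer;
        this.defaultTopicKey = defaultTopicKey;
    }

    String find(String topic) {
        String route = cache.get(topic);
        if (route != null) {
            return route; // served from the local cache
        }
        route = fetch(topic);
        if (route == null) {
            route = fetch(defaultTopicKey); // borrow the default topic's route for a new topic
        }
        if (route != null) {
            cache.put(topic, route);
        }
        return route;
    }

    private String fetch(String topic) {
        remoteCalls++;
        return nameServer.apply(topic);
    }

    public static void main(String[] args) {
        Map<String, String> known = new ConcurrentHashMap<>();
        known.put("TBW102", "broker-a:10911");
        RouteLookupSketch lookup = new RouteLookupSketch(known::get, "TBW102");
        System.out.println(lookup.find("brand-new-topic"));
        System.out.println("remote calls: " + lookup.remoteCalls);
    }
}
```

A second find for the same topic is answered from the cache, so the remote call counter stays where it was.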

Next, look at how the topic's routing information is updated:

    public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault, DefaultMQProducer defaultMQProducer) {
        try {
            if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
                try {
                    TopicRouteData topicRouteData;
                    if (isDefault && defaultMQProducer != null) {
                        // Fetch TopicRouteData using the default topic key.
                        // When the broker has auto-create-topic enabled, it registers MixAll.DEFAULT_TOPIC.
                        // When the producer's createTopicKey is MixAll.DEFAULT_TOPIC, its TopicRouteData can be obtained.
                        // Purpose: a new topic has no route yet when the first message is sent, so the createTopicKey
                        // route is borrowed first, and the broker creates the topic automatically on arrival.
                        topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(), 1000 * 3);
                        if (topicRouteData != null) {
                            for (QueueData data : topicRouteData.getQueueDatas()) {
                                int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
                                data.setReadQueueNums(queueNums);
                                data.setWriteQueueNums(queueNums);
                            }
                        }
                    } else {
                        topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
                    }
                    if (topicRouteData != null) {
                        TopicRouteData old = this.topicRouteTable.get(topic);
                        boolean changed = topicRouteDataIsChange(old, topicRouteData);
                        if (!changed) {
                            changed = this.isNeedUpdateTopicRouteInfo(topic);
                        } else {
                            log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);
                        }
                        if (changed) {
                            // Clone because topicRouteData itself is handed to publishInfo/subscribeInfo below
                            TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();
                            // Update the broker address table
                            for (BrokerData bd : topicRouteData.getBrokerDatas()) {
                                this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
                            }
                            // Update the publish info of the topic; this may create MessageQueue objects
                            {
                                TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
                                publishInfo.setHaveTopicRouterInfo(true);
                                for (Entry<String, MQProducerInner> entry : this.producerTable.entrySet()) {
                                    MQProducerInner impl = entry.getValue();
                                    if (impl != null) {
                                        impl.updateTopicPublishInfo(topic, publishInfo);
                                    }
                                }
                            }
                            // Update the consumers' subscribe info as well
                            {
                                Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
                                for (Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
                                    MQConsumerInner impl = entry.getValue();
                                    if (impl != null) {
                                        impl.updateTopicSubscribeInfo(topic, subscribeInfo);
                                    }
                                }
                            }
                            log.info("topicRouteTable.put TopicRouteData[{}]", cloneTopicRouteData);
                            this.topicRouteTable.put(topic, cloneTopicRouteData);
                            return true;
                        }
                    } else {
                        log.warn("updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return null, Topic: {}", topic);
                    }
                } catch (Exception e) {
                    if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) && !topic.equals(MixAll.DEFAULT_TOPIC)) {
                        log.warn("updateTopicRouteInfoFromNameServer Exception", e);
                    }
                } finally {
                    this.lockNamesrv.unlock();
                }
            } else {
                log.warn("updateTopicRouteInfoFromNameServer tryLock timeout {}ms", LOCK_TIMEOUT_MILLIS);
            }
        } catch (InterruptedException e) {
            log.warn("updateTopicRouteInfoFromNameServer Exception", e);
        }
        return false;
    }
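
Two small patterns in this method are worth isolating: the update runs under a lock acquired with a timeout and reports true only when something actually changed, and the queue count borrowed from the default topic is capped by the producer's defaultTopicQueueNums. A minimal sketch follows, assuming a route can be represented as a string; GuardedRouteUpdateSketch and its methods are hypothetical names, not part of RocketMQ.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical illustration of the tryLock-guarded, change-detecting update; not RocketMQ code.
final class GuardedRouteUpdateSketch {
    private final Lock lock = new ReentrantLock();
    private String currentRoute; // stands in for the cached TopicRouteData

    // Returns true only when the lock was obtained in time and the route really changed.
    boolean update(String fetchedRoute, long lockTimeoutMillis) {
        try {
            if (!lock.tryLock(lockTimeoutMillis, TimeUnit.MILLISECONDS)) {
                return false; // another thread is updating; give up rather than block forever
            }
            try {
                if (fetchedRoute == null || fetchedRoute.equals(currentRoute)) {
                    return false; // nothing fetched, or nothing changed
                }
                currentRoute = fetchedRoute;
                return true;
            } finally {
                lock.unlock();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // A topic borrowing the default route never gets more queues than the producer asks for.
    static int clampQueueNums(int producerDefaultQueueNums, int brokerReadQueueNums) {
        return Math.min(producerDefaultQueueNums, brokerReadQueueNums);
    }

    public static void main(String[] args) {
        GuardedRouteUpdateSketch table = new GuardedRouteUpdateSketch();
        System.out.println(table.update("broker-a", 3000));
        System.out.println(table.update("broker-a", 3000));
        System.out.println(clampQueueNums(4, 8));
    }
}
```

Unlike the original, which only logs the InterruptedException, the sketch also restores the thread's interrupt flag; that is a design choice of the example, not something the source does.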

3. A topic may have many message queues (MessageQueue), so how is one chosen for a send? A fault-tolerance mechanism picks the most suitable MessageQueue. The default implementation is MQFaultStrategy:

    // MQ fault strategy
    public class MQFaultStrategy {
        /**
         * Latency fault tolerance: tracks the send latency of every broker.
         * key: brokerName
         */
        private final LatencyFaultTolerance<String> latencyFaultTolerance = new LatencyFaultToleranceImpl();
        /**
         * Switch for latency fault tolerance when sending
         */
        private boolean sendLatencyFaultEnable = false;
        /**
         * Latency levels (ms)
         */
        private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
        /**
         * Unavailable durations (ms), one per latency level
         */
        private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

        public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
            if (this.sendLatencyFaultEnable) {
                try {
                    // Walk the MessageQueue list round-robin, starting from a shared counter
                    int index = tpInfo.getSendWhichQueue().getAndIncrement();
                    for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                        int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                        if (pos < 0)
                            pos = 0;
                        MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                        if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                            if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
                                return mq;
                        }
                    }
                    // Pick a comparatively good broker and one of its queues, ignoring the queue's availability
                    final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
                    int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
                    if (writeQueueNums > 0) {
                        final MessageQueue mq = tpInfo.selectOneMessageQueue();
                        if (notBestBroker != null) {
                            mq.setBrokerName(notBestBroker);
                            mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                        }
                        return mq;
                    } else {
                        latencyFaultTolerance.remove(notBestBroker);
                    }
                } catch (Exception e) {
                    log.error("Error occurred when selecting message queue", e);
                }
                // Pick any queue, ignoring availability
                return tpInfo.selectOneMessageQueue();
            }
            // Fault tolerance disabled: pick a queue round-robin, preferring a broker other than lastBrokerName
            return tpInfo.selectOneMessageQueue(lastBrokerName);
        }
    }
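
The two arrays latencyMax and notAvailableDuration only make sense as a pair: a measured latency is mapped to the highest level it reaches, and the broker is then avoided for the matching duration. The sketch below shows one way such a table lookup can work, using the values from the listing; LatencyTableSketch and its method names are hypothetical. It also shows why the listing guards against a negative position: Math.abs(Integer.MIN_VALUE) is still negative, so the modulo can be negative once the counter overflows.

```java
// Hypothetical illustration of the latency table and the round-robin position; not RocketMQ code.
final class LatencyTableSketch {
    static final long[] LATENCY_MAX = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
    static final long[] NOT_AVAILABLE_DURATION = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

    // Scan from the highest level down and return the duration of the first level the latency reaches.
    static long notAvailableDuration(long latencyMillis) {
        for (int i = LATENCY_MAX.length - 1; i >= 0; i--) {
            if (latencyMillis >= LATENCY_MAX[i]) {
                return NOT_AVAILABLE_DURATION[i];
            }
        }
        return 0L;
    }

    // Round-robin position in a list of queueCount queues, safe even when the counter has overflowed.
    static int position(int counter, int queueCount) {
        int pos = Math.abs(counter) % queueCount;
        return pos < 0 ? 0 : pos;
    }

    public static void main(String[] args) {
        System.out.println(notAvailableDuration(40L));
        System.out.println(notAvailableDuration(600L));
        System.out.println(position(Integer.MIN_VALUE, 3));
    }
}
```

Under these tables a broker that answers in under 550 ms is never sidelined, while one that takes 600 ms is skipped for 30 seconds.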

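4. The core of sending a message. One important point: the MessageQueues may sit on different brokers, and each brokerName may correspond to a group of brokers (a master and its slaves), so the message is delivered to the master broker of that group.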

    // The method that actually performs the send
    private SendResult sendKernelImpl(final Message msg,
        final MessageQueue mq,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final TopicPublishInfo topicPublishInfo,
        final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        long beginStartTime = System.currentTimeMillis();
        // Resolve the broker address to send to, i.e. the master of this brokerName
        String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
        if (null == brokerAddr) {
            // Pull the topic information again
            tryToFindTopicPublishInfo(mq.getTopic());
            brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
        }
        SendMessageContext context = null;
        if (brokerAddr != null) {
            // If the VIP channel is enabled, rebuild the address with the port decreased by 2
            brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);
            // Remember the original body so it can be restored after compression
            byte[] prevBody = msg.getBody();
            try {
                // for MessageBatch, ID has been set in the generating process
                // Only non-batch messages get a unique ID here
                if (!(msg instanceof MessageBatch)) {
                    MessageClientIDSetter.setUniqID(msg);
                }
                int sysFlag = 0;
                boolean msgBodyCompressed = false;
                // Compress the message body if it is large enough
                if (this.tryToCompressMessage(msg)) {
                    sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
                    msgBodyCompressed = true;
                }
                // If this is a transactional (prepared) message, set the transaction flag
                final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
                if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
                    sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
                }
                // Run the user-defined check-forbidden hooks
                if (hasCheckForbiddenHook()) {
                    CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
                    checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());
                    checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());
                    checkForbiddenContext.setCommunicationMode(communicationMode);
                    checkForbiddenContext.setBrokerAddr(brokerAddr);
                    checkForbiddenContext.setMessage(msg);
                    checkForbiddenContext.setMq(mq);
                    checkForbiddenContext.setUnitMode(this.isUnitMode());
                    this.executeCheckForbiddenHook(checkForbiddenContext);
                }
                // Build the context and run the before-send hooks
                if (this.hasSendMessageHook()) {
                    context = new SendMessageContext();
                    context.setProducer(this);
                    context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
                    context.setCommunicationMode(communicationMode);
                    context.setBornHost(this.defaultMQProducer.getClientIP());
                    context.setBrokerAddr(brokerAddr);
                    context.setMessage(msg);
                    context.setMq(mq);
                    String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
                    if (isTrans != null && isTrans.equals("true")) {
                        context.setMsgType(MessageType.Trans_Msg_Half);
                    }
                    if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
                        context.setMsgType(MessageType.Delay_Msg);
                    }
                    this.executeSendMessageHookBefore(context);
                }
                // Build the request header
                SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
                // Producer group
                requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
                // Topic of the message
                requestHeader.setTopic(msg.getTopic());
                // Default topic key (used for automatic topic creation)
                requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
                // Default number of queues for the topic
                requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
                // Queue ID the message is sent to
                requestHeader.setQueueId(mq.getQueueId());
                // System flags (compression, transaction)
                requestHeader.setSysFlag(sysFlag);
                // Timestamp at which the message was born
                requestHeader.setBornTimestamp(System.currentTimeMillis());
                // User flag of the message
                requestHeader.setFlag(msg.getFlag());
                // Extended properties of the message
                requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
                // Reconsume count, 0 for a fresh message
                requestHeader.setReconsumeTimes(0);
                // Unit mode
                requestHeader.setUnitMode(this.isUnitMode());
                // Whether this is a batch
                requestHeader.setBatch(msg instanceof MessageBatch);
                // Retry topics carry their reconsume counters in properties; move them into the header
                if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
                    if (reconsumeTimes != null) {
                        requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
                        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
                    }
                    String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
                    if (maxReconsumeTimes != null) {
                        requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
                        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
                    }
                }
                SendResult sendResult = null;
                // Dispatch on the communication mode
                switch (communicationMode) {
                    // Asynchronous send
                    case ASYNC:
                        Message tmpMessage = msg;
                        if (msgBodyCompressed) {
                            // If the body was compressed, msg's body must be reset using prevBody.
                            // Clone a new message with the compressed body and restore the original message.
                            // Fix bug: https://github.com/apache/rocketmq-externals/issues/66
                            tmpMessage = MessageAccessor.cloneMessage(msg);
                            msg.setBody(prevBody);
                        }
                        long costTimeAsync = System.currentTimeMillis() - beginStartTime;
                        if (timeout < costTimeAsync) {
                            throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                        }
                        // This overload differs mainly in the callback-related arguments
                        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                            brokerAddr,
                            mq.getBrokerName(),
                            tmpMessage,
                            requestHeader,
                            timeout - costTimeAsync,
                            communicationMode,
                            sendCallback,
                            topicPublishInfo,
                            this.mQClientFactory,
                            this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
                            context,
                            this);
                        break;
                    // Synchronous and oneway sends
                    case ONEWAY:
                    case SYNC:
                        long costTimeSync = System.currentTimeMillis() - beginStartTime;
                        if (timeout < costTimeSync) {
                            throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                        }
                        // The final send of the message
                        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                            brokerAddr,
                            mq.getBrokerName(),
                            msg,
                            requestHeader,
                            timeout - costTimeSync,
                            communicationMode,
                            context,
                            this);
                        break;
                    default:
                        assert false;
                        break;
                }
                // Run the after-send hooks, if any
                if (this.hasSendMessageHook()) {
                    context.setSendResult(sendResult);
                    this.executeSendMessageHookAfter(context);
                }
                return sendResult;
            } catch (RemotingException e) {
                if (this.hasSendMessageHook()) {
                    context.setException(e);
                    this.executeSendMessageHookAfter(context);
                }
                throw e;
            } catch (MQBrokerException e) {
                if (this.hasSendMessageHook()) {
                    context.setException(e);
                    this.executeSendMessageHookAfter(context);
                }
                throw e;
            } catch (InterruptedException e) {
                if (this.hasSendMessageHook()) {
                    context.setException(e);
                    this.executeSendMessageHookAfter(context);
                }
                throw e;
            } finally {
                // Always restore the original (uncompressed) body
                msg.setBody(prevBody);
            }
        }
        throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
    }
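
Three small calculations in sendKernelImpl carry most of the preparation logic: rewriting the address for the VIP channel (port minus 2), OR-ing bits into sysFlag, and shrinking the timeout by the time already spent. The sketch below restates them in isolation. SendPrepSketch is a hypothetical class, the two flag constants are illustrative values chosen for the example rather than values quoted from the library, and the timeout check throws a plain IllegalStateException instead of the remoting exception used in the source.

```java
// Hypothetical restatement of three steps of the send preparation; not RocketMQ code.
final class SendPrepSketch {
    // Illustrative bit values; treat them as assumptions, not as the library's constants.
    static final int COMPRESSED_FLAG = 0x1;
    static final int TRANSACTION_PREPARED_TYPE = 0x1 << 2;

    // Combine independent properties of the message into a single int of bit flags.
    static int sysFlag(boolean compressed, boolean transactionPrepared) {
        int flag = 0;
        if (compressed) {
            flag |= COMPRESSED_FLAG;
        }
        if (transactionPrepared) {
            flag |= TRANSACTION_PREPARED_TYPE;
        }
        return flag;
    }

    // With the VIP channel enabled, keep the host and use the port two below the normal one.
    static String vipChannel(boolean enabled, String brokerAddr) {
        if (!enabled) {
            return brokerAddr;
        }
        int split = brokerAddr.lastIndexOf(':');
        String host = brokerAddr.substring(0, split);
        int port = Integer.parseInt(brokerAddr.substring(split + 1));
        return host + ":" + (port - 2);
    }

    // The time budget left for the network call after local preparation has used some of it.
    static long remaining(long timeoutMillis, long elapsedMillis) {
        if (timeoutMillis < elapsedMillis) {
            throw new IllegalStateException("call timeout");
        }
        return timeoutMillis - elapsedMillis;
    }

    public static void main(String[] args) {
        System.out.println(vipChannel(true, "10.0.0.1:10911"));
        System.out.println(sysFlag(true, true));
        System.out.println(remaining(3000L, 120L));
    }
}
```

The timeout passed by the caller is therefore a budget for the whole call, not only for the network round trip.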

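6. Assemble the Netty request command and perform the final send. The listing below follows the call chain from MQClientAPIImpl down into the remoting layer: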

    // MQClientAPIImpl: core implementation of sending a message
    public SendResult sendMessage(
        final String addr, // broker address
        final String brokerName, // broker name
        final Message msg, // the message
        final SendMessageRequestHeader requestHeader, // request header
        final long timeoutMillis, // timeout
        final CommunicationMode communicationMode, // send mode
        final SendCallback sendCallback, // callback invoked after the send
        final TopicPublishInfo topicPublishInfo, // publish info of the topic
        final MQClientInstance instance, // client instance used for sending
        final int retryTimesWhenSendFailed, // retry count
        final SendMessageContext context, // context of this send
        final DefaultMQProducerImpl producer // the sending producer
    ) throws RemotingException, MQBrokerException, InterruptedException {
        long beginStartTime = System.currentTimeMillis();
        RemotingCommand request = null;
        // Choose the protocol version of the request
        if (sendSmartMsg || msg instanceof MessageBatch) {
            SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
            request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2, requestHeaderV2);
        } else {
            request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
        }
        request.setBody(msg.getBody());
        switch (communicationMode) {
            case ONEWAY:
                this.remotingClient.invokeOneway(addr, request, timeoutMillis);
                return null;
            case ASYNC:
                final AtomicInteger times = new AtomicInteger();
                long costTimeAsync = System.currentTimeMillis() - beginStartTime;
                if (timeoutMillis < costTimeAsync) {
                    throw new RemotingTooMuchRequestException("sendMessage call timeout");
                }
                this.sendMessageAsync(addr, brokerName, msg, timeoutMillis - costTimeAsync, request, sendCallback, topicPublishInfo, instance,
                    retryTimesWhenSendFailed, times, context, producer);
                return null;
            case SYNC:
                long costTimeSync = System.currentTimeMillis() - beginStartTime;
                if (timeoutMillis < costTimeSync) {
                    throw new RemotingTooMuchRequestException("sendMessage call timeout");
                }
                return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request);
            default:
                assert false;
                break;
        }
        return null;
    }

    // MQClientAPIImpl: synchronous send
    private SendResult sendMessageSync(
        final String addr,
        final String brokerName,
        final Message msg,
        final long timeoutMillis,
        final RemotingCommand request
    ) throws RemotingException, MQBrokerException, InterruptedException {
        // Send the request and block for the response
        RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
        assert response != null;
        // Turn the response into a SendResult
        return this.processSendResponse(brokerName, msg, response);
    }

    // NettyRemotingClient: core of the network call, synchronous variant
    @Override
    public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
        throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
        long beginStartTime = System.currentTimeMillis();
        // Get (or create) the channel for this address
        final Channel channel = this.getAndCreateChannel(addr);
        // Check that the channel is usable
        if (channel != null && channel.isActive()) {
            try {
                // Pre-processing hooks
                doBeforeRpcHooks(addr, request);
                long costTime = System.currentTimeMillis() - beginStartTime;
                if (timeoutMillis < costTime) {
                    throw new RemotingTimeoutException("invokeSync call timeout");
                }
                // Perform the call
                RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
                // Post-processing hooks
                doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
                return response;
            } catch (RemotingSendRequestException e) {
                log.warn("invokeSync: send request exception, so close the channel[{}]", addr);
                this.closeChannel(addr, channel);
                throw e;
            } catch (RemotingTimeoutException e) {
                if (nettyClientConfig.isClientCloseSocketIfTimeout()) {
                    this.closeChannel(addr, channel);
                    log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);
                }
                log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);
                throw e;
            }
        } else {
            // The channel is missing or inactive: close it and report a connect failure
            this.closeChannel(addr, channel);
            throw new RemotingConnectException(addr);
        }
    }

    // NettyRemotingAbstract: core of the synchronous call
    public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,
        final long timeoutMillis)
        throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
        final int opaque = request.getOpaque();
        try {
            // Wrap the pending call in a ResponseFuture; the Netty response is delivered into it later
            final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
            // Register it by opaque so the response (or a failure) can be matched to this request
            this.responseTable.put(opaque, responseFuture);
            final SocketAddress addr = channel.remoteAddress();
            // Netty only writes the request here; the listener reports whether the write succeeded
            channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture f) throws Exception {
                    // The write succeeded: mark it and return
                    if (f.isSuccess()) {
                        responseFuture.setSendRequestOK(true);
                        return;
                    } else {
                        responseFuture.setSendRequestOK(false);
                    }
                    // The write failed: drop the pending entry, record the cause and wake the waiter
                    responseTable.remove(opaque);
                    responseFuture.setCause(f.cause());
                    responseFuture.putResponse(null);
                    log.warn("send a request command to channel <" + addr + "> failed.");
                }
            });
            // Block for the response; NettyClientHandler fills it in, matched by opaque
            RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
            if (null == responseCommand) {
                if (responseFuture.isSendRequestOK()) {
                    throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
                        responseFuture.getCause());
                } else {
                    throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
                }
            }
            return responseCommand;
        } finally {
            this.responseTable.remove(opaque);
        }
    }
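
The trick in invokeSyncImpl is that Netty itself is asynchronous: the caller registers a pending entry under the request's opaque ID, writes the request, and then blocks until the I/O thread delivers the matching response or the timeout expires. A minimal sketch of that correlation pattern follows, using only the JDK. PendingCallsSketch and PendingReply are hypothetical names, replies are plain strings, and a timeout is reported as null rather than as an exception.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical request/response correlation by ID; not RocketMQ code.
final class PendingCallsSketch {
    // Plays the role of a response future: a slot for the reply plus a latch to wait on.
    static final class PendingReply {
        private final CountDownLatch latch = new CountDownLatch(1);
        private volatile String reply;

        void put(String value) {
            reply = value;
            latch.countDown();
        }

        String await(long timeoutMillis) throws InterruptedException {
            latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
            return reply; // still null if the timeout expired first
        }
    }

    private final Map<Integer, PendingReply> table = new ConcurrentHashMap<>();

    // Called by the I/O thread when a response with this ID arrives.
    boolean complete(int opaque, String reply) {
        PendingReply pending = table.remove(opaque);
        if (pending == null) {
            return false; // nobody is waiting any more, for example after a timeout
        }
        pending.put(reply);
        return true;
    }

    // Registers the pending entry, triggers the send, then blocks for the reply or the timeout.
    String call(int opaque, Runnable send, long timeoutMillis) {
        PendingReply pending = new PendingReply();
        table.put(opaque, pending);
        try {
            send.run();
            return pending.await(timeoutMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } finally {
            table.remove(opaque); // always clean up, as the finally block in the listing does
        }
    }

    public static void main(String[] args) {
        PendingCallsSketch calls = new PendingCallsSketch();
        String reply = calls.call(7, () -> new Thread(() -> calls.complete(7, "pong")).start(), 1000L);
        System.out.println(reply);
    }
}
```

Registering the entry before the write matters: if the order were reversed, a fast response could arrive before anyone was recorded as waiting for it.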

If the message is sent asynchronously, the outcome is delivered through the SendCallback instead of a return value.
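
To make the callback style concrete, here is a small JDK-only sketch: the send runs on another thread and the outcome is reported to either onSuccess or onException. AsyncSendSketch, its Callback interface and the transport function are all assumptions of the example; the real SendCallback receives a SendResult rather than a string, and the client has its own threading model.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical callback-style send built on CompletableFuture; not RocketMQ code.
final class AsyncSendSketch {
    interface Callback {
        void onSuccess(String result);

        void onException(Throwable error);
    }

    // Runs 'transport' on another thread and reports its outcome to the callback.
    // The returned future completes once the callback has been invoked.
    static CompletableFuture<Void> sendAsync(String body, Function<String, String> transport, Callback callback) {
        return CompletableFuture.supplyAsync(() -> transport.apply(body))
            .<Void>handle((result, error) -> {
                if (error != null) {
                    // Failures from the async stage may arrive wrapped; unwrap when a cause is present
                    Throwable cause = error.getCause() != null ? error.getCause() : error;
                    callback.onException(cause);
                } else {
                    callback.onSuccess(result);
                }
                return null;
            });
    }

    public static void main(String[] args) {
        Callback printing = new Callback() {
            @Override
            public void onSuccess(String result) {
                System.out.println("sent: " + result);
            }

            @Override
            public void onException(Throwable error) {
                System.out.println("failed: " + error.getMessage());
            }
        };
        sendAsync("hello", body -> "SEND_OK:" + body, printing).join();
    }
}
```

The caller's thread returns immediately after sendAsync; join is used in main only so the program waits long enough to print the result.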

7. After the broker receives the send command it persists the message. This is covered in detail later, in the chapters on the broker.
