
RocketMQ Internals: How a Producer Sends Normal Messages


The previous post, "RocketMQ Internals: Name Server", covered the features that the Name Server provides. This post walks through how a producer sends a message.

Message sending example:

    import java.util.Date;

    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;

    public class Producer {
        public static void main(String[] args) throws MQClientException, InterruptedException {
            // Create a producer in group "rmq-group" and point it at the Name Server
            DefaultMQProducer producer = new DefaultMQProducer("rmq-group");
            producer.setNamesrvAddr("localhost:9876");
            producer.start();
            try {
                for (int i = 0; i < 3; i++) {
                    Message msg = new Message("TopicA-test", // topic
                            "TagA", // tag
                            (new Date() + "Hello RocketMQ ,QuickStart 11" + i)
                                    .getBytes() // body
                    );
                    // Synchronous send: blocks until the Broker replies
                    SendResult sendResult = producer.send(msg);
                    System.out.println(sendResult);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
            producer.shutdown();
        }
    }

Sending starts with the send method of DefaultMQProducer:

    public SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        return this.defaultMQProducerImpl.send(msg);
    }

The call is delegated to the send method of DefaultMQProducerImpl, which adds the configured send timeout:

    public SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        return this.send(msg, (long)this.defaultMQProducer.getSendMsgTimeout());
    }

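That overload eventually calls sendDefaultImpl, which sends the message in three steps:

(1) Using the message's Topic name, it obtains the Broker address information for that Topic from the Name Server: TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());

(2) A Broker holds several MessageQueues for each Topic, so the producer has to pick one. The default strategy is round-robin, and the broker name used by the previous attempt (null on the first attempt) is passed in as the second argument: MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, info);

(3) Once a MessageQueue is chosen, the Broker information it carries is used to send the message: sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout);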

    private SendResult sendDefaultImpl(Message msg, CommunicationMode communicationMode, SendCallback sendCallback, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        this.makeSureStateOK();
        Validators.checkMessage(msg, this.defaultMQProducer);
        long invokeID = this.random.nextLong();
        long beginTimestampFirst = System.currentTimeMillis();
        long beginTimestampPrev = beginTimestampFirst;
        TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
        if (topicPublishInfo != null && topicPublishInfo.ok()) {
            MessageQueue mq = null;
            Exception exception = null;
            SendResult sendResult = null;
            int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
            int times = 0;
            String[] brokersSent = new String[timesTotal];
            while(true) {
                String info;
                if (times < timesTotal) {
                    info = null == mq ? null : mq.getBrokerName();
                    MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, info);
                    if (mqSelected != null) {
                        mq = mqSelected;
                        brokersSent[times] = mqSelected.getBrokerName();
                        long endTimestamp;
                        try {
                            beginTimestampPrev = System.currentTimeMillis();
                            sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout);
                            endTimestamp = System.currentTimeMillis();
                            this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                            switch(communicationMode) {
                            case ASYNC:
                                return null;
                            case ONEWAY:
                                return null;
                            case SYNC:
                                if (sendResult.getSendStatus() == SendStatus.SEND_OK || !this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                    return sendResult;
                                }
                            }
                        } catch (RemotingException var24) {
                            endTimestamp = System.currentTimeMillis();
                            this.updateFaultItem(mqSelected.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                            this.log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mqSelected), var24);
                            this.log.warn(msg.toString());
                            exception = var24;
                        } catch (MQClientException var25) {
                            endTimestamp = System.currentTimeMillis();
                            this.updateFaultItem(mqSelected.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                            this.log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mqSelected), var25);
                            this.log.warn(msg.toString());
                            exception = var25;
                        } catch (MQBrokerException var26) {
                            endTimestamp = System.currentTimeMillis();
                            this.updateFaultItem(mqSelected.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                            this.log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mqSelected), var26);
                            this.log.warn(msg.toString());
                            exception = var26;
                            switch(var26.getResponseCode()) {
                            case 1:
                            case 14:
                            case 16:
                            case 17:
                            case 204:
                            case 205:
                                break;
                            default:
                                if (sendResult != null) {
                                    return sendResult;
                                }
                                throw var26;
                            }
                        } catch (InterruptedException var27) {
                            endTimestamp = System.currentTimeMillis();
                            this.updateFaultItem(mqSelected.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                            this.log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mqSelected), var27);
                            this.log.warn(msg.toString());
                            this.log.warn("sendKernelImpl exception", var27);
                            this.log.warn(msg.toString());
                            throw var27;
                        }
                        ++times;
                        continue;
                    }
                }
                if (sendResult != null) {
                    return sendResult;
                }
                info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s", times, System.currentTimeMillis() - beginTimestampFirst, msg.getTopic(), Arrays.toString(brokersSent));
                info = info + FAQUrl.suggestTodo("http://rocketmq.apache.org/docs/faq/");
                MQClientException mqClientException = new MQClientException(info, (Throwable)exception);
                if (exception instanceof MQBrokerException) {
                    mqClientException.setResponseCode(((MQBrokerException)exception).getResponseCode());
                } else if (exception instanceof RemotingConnectException) {
                    mqClientException.setResponseCode(10001);
                } else if (exception instanceof RemotingTimeoutException) {
                    mqClientException.setResponseCode(10002);
                } else if (exception instanceof MQClientException) {
                    mqClientException.setResponseCode(10003);
                }
                throw mqClientException;
            }
        } else {
            List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList();
            if (null != nsList && !nsList.isEmpty()) {
                throw (new MQClientException("No route info of this topic, " + msg.getTopic() + FAQUrl.suggestTodo("http://rocketmq.apache.org/docs/faq/"), (Throwable)null)).setResponseCode(10005);
            } else {
                throw (new MQClientException("No name server address, please set it." + FAQUrl.suggestTodo("http://rocketmq.apache.org/docs/faq/"), (Throwable)null)).setResponseCode(10004);
            }
        }
    }
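The retry behaviour in the listing above is easier to see in isolation. The following is a minimal, self-contained sketch, not RocketMQ's own code: SendRetrySketch, QueueRef, selectOne and the downBrokers set are hypothetical names introduced for illustration. It assumes a plain round-robin counter that skips the broker used by the previous attempt, and it reproduces the attempt count from the listing (1 + retryTimesWhenSendFailed for SYNC, otherwise 1).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

/** Simplified model of the send loop; a sketch, not RocketMQ's actual classes. */
final class SendRetrySketch {
    /** Hypothetical stand-in for MessageQueue: a broker name plus a queue id. */
    static final class QueueRef {
        final String brokerName;
        final int queueId;

        QueueRef(String brokerName, int queueId) {
            this.brokerName = brokerName;
            this.queueId = queueId;
        }
    }

    private final List<QueueRef> queues;
    private final AtomicInteger counter = new AtomicInteger();

    SendRetrySketch(List<QueueRef> queues) {
        this.queues = queues;
    }

    /** Round-robin over the queues, skipping the broker of the previous attempt when possible. */
    QueueRef selectOne(String lastBrokerName) {
        for (int i = 0; i < queues.size(); i++) {
            QueueRef candidate = next();
            if (lastBrokerName == null || !candidate.brokerName.equals(lastBrokerName)) {
                return candidate;
            }
        }
        // Every queue lives on the broker that was just tried: fall back to plain round-robin.
        return next();
    }

    private QueueRef next() {
        return queues.get(Math.floorMod(counter.getAndIncrement(), queues.size()));
    }

    /** Same arithmetic as timesTotal in the listing. */
    static int totalAttempts(boolean sync, int retryTimesWhenSendFailed) {
        return sync ? 1 + retryTimesWhenSendFailed : 1;
    }

    /** Returns the brokers tried, in order; brokers in downBrokers are assumed to fail every time. */
    List<String> send(boolean sync, int retries, Set<String> downBrokers) {
        List<String> tried = new ArrayList<>();
        String lastBroker = null;
        int timesTotal = totalAttempts(sync, retries);
        for (int times = 0; times < timesTotal; times++) {
            QueueRef mq = selectOne(lastBroker);
            lastBroker = mq.brokerName;
            tried.add(mq.brokerName);
            if (!downBrokers.contains(mq.brokerName)) {
                break; // the send succeeded, stop retrying
            }
        }
        return tried;
    }

    public static void main(String[] args) {
        SendRetrySketch sketch = new SendRetrySketch(Arrays.asList(
                new QueueRef("broker-a", 0), new QueueRef("broker-a", 1),
                new QueueRef("broker-b", 0), new QueueRef("broker-b", 1)));
        System.out.println(sketch.send(true, 2, Collections.singleton("broker-a")));
    }
}
```

With broker-a down, the first attempt lands on broker-a and the retry skips its remaining queue and moves to broker-b. A non-SYNC send in the same situation makes a single attempt, which matches timesTotal being 1 for ASYNC and ONEWAY in the listing.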

sendKernelImpl looks up the Broker address, assembles the request for the message, and calls the sendMessage method of MQClientAPIImpl to send it to the Broker.

    private SendResult sendKernelImpl(Message msg, MessageQueue mq, CommunicationMode communicationMode, SendCallback sendCallback, TopicPublishInfo topicPublishInfo, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        // Look up the Broker address
        String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
        if (null == brokerAddr) {
            this.tryToFindTopicPublishInfo(mq.getTopic());
            brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
        }
        SendMessageContext context = null;
        if (brokerAddr != null) {
            brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);
            byte[] prevBody = msg.getBody();
            SendResult var28;
            try {
                if (!(msg instanceof MessageBatch)) {
                    MessageClientIDSetter.setUniqID(msg);
                }
                int sysFlag = 0;
                if (this.tryToCompressMessage(msg)) {
                    sysFlag |= 1;
                }
                String tranMsg = msg.getProperty("TRAN_MSG");
                if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
                    sysFlag |= 4;
                }
                if (this.hasCheckForbiddenHook()) {
                    CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
                    checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());
                    checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());
                    checkForbiddenContext.setCommunicationMode(communicationMode);
                    checkForbiddenContext.setBrokerAddr(brokerAddr);
                    checkForbiddenContext.setMessage(msg);
                    checkForbiddenContext.setMq(mq);
                    checkForbiddenContext.setUnitMode(this.isUnitMode());
                    this.executeCheckForbiddenHook(checkForbiddenContext);
                }
                if (this.hasSendMessageHook()) {
                    context = new SendMessageContext();
                    context.setProducer(this);
                    context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
                    context.setCommunicationMode(communicationMode);
                    context.setBornHost(this.defaultMQProducer.getClientIP());
                    context.setBrokerAddr(brokerAddr);
                    context.setMessage(msg);
                    context.setMq(mq);
                    String isTrans = msg.getProperty("TRAN_MSG");
                    if (isTrans != null && isTrans.equals("true")) {
                        context.setMsgType(MessageType.Trans_Msg_Half);
                    }
                    if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty("DELAY") != null) {
                        context.setMsgType(MessageType.Delay_Msg);
                    }
                    this.executeSendMessageHookBefore(context);
                }
                // Build the request header
                SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
                requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
                requestHeader.setTopic(msg.getTopic());
                requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
                requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
                requestHeader.setQueueId(mq.getQueueId());
                requestHeader.setSysFlag(sysFlag);
                requestHeader.setBornTimestamp(System.currentTimeMillis());
                requestHeader.setFlag(msg.getFlag());
                requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
                requestHeader.setReconsumeTimes(0);
                requestHeader.setUnitMode(this.isUnitMode());
                requestHeader.setBatch(msg instanceof MessageBatch);
                if (requestHeader.getTopic().startsWith("%RETRY%")) {
                    String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
                    if (reconsumeTimes != null) {
                        requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
                        MessageAccessor.clearProperty(msg, "RECONSUME_TIME");
                    }
                    String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
                    if (maxReconsumeTimes != null) {
                        requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
                        MessageAccessor.clearProperty(msg, "MAX_RECONSUME_TIMES");
                    }
                }
                SendResult sendResult = null;
                switch(communicationMode) {
                // Asynchronous send
                case ASYNC:
                    sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(brokerAddr, mq.getBrokerName(), msg, requestHeader, timeout, communicationMode, sendCallback, topicPublishInfo, this.mQClientFactory, this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(), context, this);
                    break;
                case ONEWAY:
                // Synchronous send (ONEWAY falls through to the same call)
                case SYNC:
                    sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(brokerAddr, mq.getBrokerName(), msg, requestHeader, timeout, communicationMode, context, this);
                    break;
                default:
                    assert false;
                }
                if (this.hasSendMessageHook()) {
                    context.setSendResult(sendResult);
                    this.executeSendMessageHookAfter(context);
                }
                var28 = sendResult;
            } catch (RemotingException var21) {
                if (this.hasSendMessageHook()) {
                    context.setException(var21);
                    this.executeSendMessageHookAfter(context);
                }
                throw var21;
            } catch (MQBrokerException var22) {
                if (this.hasSendMessageHook()) {
                    context.setException(var22);
                    this.executeSendMessageHookAfter(context);
                }
                throw var22;
            } catch (InterruptedException var23) {
                if (this.hasSendMessageHook()) {
                    context.setException(var23);
                    this.executeSendMessageHookAfter(context);
                }
                throw var23;
            } finally {
                msg.setBody(prevBody);
            }
            return var28;
        } else {
            throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", (Throwable)null);
        }
    }
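The two sysFlag |= lines in the listing above pack independent booleans into one integer. As a small sketch of that bit-flag technique: the values 1 and 4 are taken from the listing, while the class SysFlagSketch, its constant names and its helper methods are illustrative names chosen here, not something the listing defines.

```java
/** Bit-flag sketch based on the literals 1 and 4 in sendKernelImpl; names are illustrative. */
final class SysFlagSketch {
    static final int COMPRESSED_FLAG = 0x1;           // set when the body was compressed
    static final int TRANSACTION_PREPARED_TYPE = 0x4; // set when TRAN_MSG is "true"

    /** Builds the flag the same way the listing does, one OR per condition. */
    static int build(boolean compressed, boolean transactionHalf) {
        int sysFlag = 0;
        if (compressed) {
            sysFlag |= COMPRESSED_FLAG;
        }
        if (transactionHalf) {
            sysFlag |= TRANSACTION_PREPARED_TYPE;
        }
        return sysFlag;
    }

    /** A receiver can test each bit independently with a mask. */
    static boolean isCompressed(int sysFlag) {
        return (sysFlag & COMPRESSED_FLAG) != 0;
    }

    static boolean isTransactionHalf(int sysFlag) {
        return (sysFlag & TRANSACTION_PREPARED_TYPE) != 0;
    }

    public static void main(String[] args) {
        int flag = build(true, true);
        System.out.println(flag + " compressed=" + isCompressed(flag)
                + " transactionHalf=" + isTransactionHalf(flag));
    }
}
```

Because each condition owns its own bit, a compressed transactional message carries both markers in a single header field (value 5), and either one can be checked without disturbing the other.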

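MQClientAPIImpl builds the request and hands it to the underlying remoting module, NettyRemotingClient. For a synchronous send this ends in the invokeSync method, which sends the message to the Broker; one-way sends use invokeOneway, and asynchronous sends go through sendMessageAsync.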

    public SendResult sendMessage(String addr, String brokerName, Message msg, SendMessageRequestHeader requestHeader, long timeoutMillis, CommunicationMode communicationMode, SendCallback sendCallback, TopicPublishInfo topicPublishInfo, MQClientInstance instance, int retryTimesWhenSendFailed, SendMessageContext context, DefaultMQProducerImpl producer) throws RemotingException, MQBrokerException, InterruptedException {
        RemotingCommand request = null;
        // Create the request command
        if (!sendSmartMsg && !(msg instanceof MessageBatch)) {
            request = RemotingCommand.createRequestCommand(10, requestHeader);
        } else {
            SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
            request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? 320 : 310, requestHeaderV2);
        }
        // Attach the message body
        request.setBody(msg.getBody());
        switch(communicationMode) {
        case ONEWAY:
            this.remotingClient.invokeOneway(addr, request, timeoutMillis);
            return null;
        case ASYNC:
            AtomicInteger times = new AtomicInteger();
            this.sendMessageAsync(addr, brokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, retryTimesWhenSendFailed, times, context, producer);
            return null;
        case SYNC:
            return this.sendMessageSync(addr, brokerName, msg, timeoutMillis, request);
        default:
            assert false;
            return null;
        }
    }
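The first branch of sendMessage decides which request code goes on the wire. The sketch below isolates that decision. The numeric codes 10, 310 and 320 come straight from the listing; the constant names are labels assumed here for readability (the decompiled listing shows only the numbers), and RequestCodeSketch and choose are hypothetical.

```java
/** Mirrors the request-code branch of sendMessage; constant names are assumed labels. */
final class RequestCodeSketch {
    static final int SEND_MESSAGE = 10;         // plain header, single message
    static final int SEND_MESSAGE_V2 = 310;     // V2 header, single message
    static final int SEND_BATCH_MESSAGE = 320;  // V2 header, batch message

    /**
     * @param sendSmartMsg whether the compact V2 header is enabled
     * @param isBatch      whether the message is a MessageBatch
     */
    static int choose(boolean sendSmartMsg, boolean isBatch) {
        if (!sendSmartMsg && !isBatch) {
            return SEND_MESSAGE;
        }
        return isBatch ? SEND_BATCH_MESSAGE : SEND_MESSAGE_V2;
    }

    public static void main(String[] args) {
        System.out.println(choose(false, false));
        System.out.println(choose(true, false));
        System.out.println(choose(false, true));
    }
}
```

Note that a batch message always takes the V2 header path, whether or not sendSmartMsg is set.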

In NettyRemotingClient, invokeSync obtains (creating it if necessary) a Channel for the Broker address, sends the request over it, and returns the RemotingCommand response.

    public RemotingCommand invokeSync(String addr, RemotingCommand request, long timeoutMillis) throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
        Channel channel = this.getAndCreateChannel(addr);
        if (channel != null && channel.isActive()) {
            try {
                if (this.rpcHook != null) {
                    this.rpcHook.doBeforeRequest(addr, request);
                }
                RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis);
                if (this.rpcHook != null) {
                    this.rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
                }
                return response;
            } catch (RemotingSendRequestException var7) {
                log.warn("invokeSync: send request exception, so close the channel[{}]", addr);
                this.closeChannel(addr, channel);
                throw var7;
            } catch (RemotingTimeoutException var8) {
                if (this.nettyClientConfig.isClientCloseSocketIfTimeout()) {
                    this.closeChannel(addr, channel);
                    log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);
                }
                log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);
                throw var8;
            }
        } else {
            this.closeChannel(addr, channel);
            throw new RemotingConnectException(addr);
        }
    }
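The listing above delegates the actual wait to invokeSyncImpl, which this article does not show. As an illustration only, here is one common way to make a blocking call with a timeout on top of an asynchronous channel: register a pending future under a request id, let the I/O thread complete it when the response arrives, and have the caller wait with a deadline. Everything in this sketch (SyncInvokeSketch, register, onResponse, await, ResponseTimeout) is hypothetical and is not claimed to be how RocketMQ implements it.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

/** Generic request/response correlation with a timeout; an assumed design, not RocketMQ code. */
final class SyncInvokeSketch {
    /** Unchecked stand-in for a "wait response timeout" error. */
    static final class ResponseTimeout extends RuntimeException {
        ResponseTimeout(String message) {
            super(message);
        }
    }

    private final Map<Integer, CompletableFuture<String>> pending = new ConcurrentHashMap<>();
    private final AtomicInteger nextId = new AtomicInteger();

    /** Registers a pending request; the caller would then write the request to the channel. */
    int register() {
        int id = nextId.incrementAndGet();
        pending.put(id, new CompletableFuture<>());
        return id;
    }

    /** Called from the I/O thread when a response carrying this id arrives. */
    void onResponse(int id, String body) {
        CompletableFuture<String> future = pending.get(id);
        if (future != null) {
            future.complete(body);
        }
    }

    /** Blocks until the response for id arrives or timeoutMillis elapses. */
    String await(int id, long timeoutMillis) {
        CompletableFuture<String> future = pending.get(id);
        if (future == null) {
            throw new IllegalArgumentException("unknown request id " + id);
        }
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            throw new ResponseTimeout("no response within " + timeoutMillis + "ms");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pending.remove(id); // always forget the request, answered or not
        }
    }

    public static void main(String[] args) {
        SyncInvokeSketch client = new SyncInvokeSketch();
        int id = client.register();
        client.onResponse(id, "SEND_OK");
        System.out.println(client.await(id, 100));
    }
}
```

The caller sees an ordinary blocking method, while a timed-out request surfaces as an exception that the layer above can react to, much as invokeSync reacts to RemotingTimeoutException by optionally closing the channel.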

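Summary:

(1) The producer uses the Topic name to obtain the relevant Broker information from the Name Server, then sends the message to the corresponding Broker address.

(2) For a given Topic, each Broker holds several MessageQueues (and a Topic can span several Brokers). By default the producer spreads messages across these MessageQueues round-robin, using a counter taken modulo the number of queues.

(3) The producer also provides send(Message msg, MessageQueueSelector selector, Object arg). By implementing the MessageQueueSelector interface we can control which MessageQueue is chosen; for example, ordered messaging can be implemented by sending every message that must stay in order to the same MessageQueue.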
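Point (3) can be sketched without the RocketMQ client on the classpath. The QueueSelector interface below is a hypothetical stand-in that only mirrors the idea of MessageQueueSelector (it takes the queue list and the arg, and leaves out the message parameter); the byKey helper assumes that arg is a business key such as an order id and maps it to a queue by hash modulo the queue count.

```java
import java.util.Arrays;
import java.util.List;

/** Key-based queue selection for ordered messages; a sketch with a stand-in interface. */
final class OrderedSelectorSketch {
    /** Hypothetical stand-in for the selector callback: pick one queue given the caller's arg. */
    interface QueueSelector<Q> {
        Q select(List<Q> queues, Object arg);
    }

    /** The same key always maps to the same queue, so messages for that key keep their order. */
    static <Q> QueueSelector<Q> byKey() {
        return (queues, arg) -> queues.get(Math.floorMod(arg.hashCode(), queues.size()));
    }

    public static void main(String[] args) {
        List<String> queues = Arrays.asList("q0", "q1", "q2", "q3");
        QueueSelector<String> selector = byKey();
        // All messages of order 7 go to one queue; order 8 may go to another.
        System.out.println(selector.select(queues, Integer.valueOf(7)));
        System.out.println(selector.select(queues, Integer.valueOf(8)));
    }
}
```

Math.floorMod keeps the index non-negative even when hashCode() is negative. The mapping is only stable while the number of queues stays the same, so this is a starting point rather than a complete ordering guarantee.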
