赞
踩
上一篇博客RocketMQ原理学习-- Name Server中我们介绍了Name Server提供的相关功能,这篇博客我们来介绍一下生产者消息发送相关的内容。
消息发送示例:
- public class Producer {
-
- public static void main(String[] args) throws MQClientException, InterruptedException {
- DefaultMQProducer producer = new DefaultMQProducer("rmq-group");
- producer.setNamesrvAddr("localhost:9876");
- producer.start();
- try {
- for (int i = 0; i < 3; i++) {
- Message msg = new Message("TopicA-test",// topic
- "TagA",// tag
- (new Date() + "Hello RocketMQ ,QuickStart 11" + i)
- .getBytes()// body
- );
- SendResult sendResult = producer.send(msg);
- System.out.println(sendResult);
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- producer.shutdown();
- }
-
- }

在DefaultMQProducer中调用send方法发送消息
- public SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
- return this.defaultMQProducerImpl.send(msg);
- }
最终是调用DefaultMQProducerImpl的send方法来发送消息
- public SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
- return this.send(msg, (long)this.defaultMQProducer.getSendMsgTimeout());
- }
最终会调用sendDefaultImpl方法来发送消息:
(1)在发送消息时会根据消息的Topic名称从Name Server中根据Topic名称来获取Broker相关的地址信息TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
(2)由于一个Broker中对于每个Topic会有多个MessageQueue,生产者默认选择MessageQueue策略为轮询MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, info);
(3)选择了MessageQueue后,会根据MessageQueue中Broker信息调用sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout);发送消息。
- private SendResult sendDefaultImpl(Message msg, CommunicationMode communicationMode, SendCallback sendCallback, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
- this.makeSureStateOK();
- Validators.checkMessage(msg, this.defaultMQProducer);
- long invokeID = this.random.nextLong();
- long beginTimestampFirst = System.currentTimeMillis();
- long beginTimestampPrev = beginTimestampFirst;
- TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
- if (topicPublishInfo != null && topicPublishInfo.ok()) {
- MessageQueue mq = null;
- Exception exception = null;
- SendResult sendResult = null;
- int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
- int times = 0;
- String[] brokersSent = new String[timesTotal];
-
- while(true) {
- String info;
- if (times < timesTotal) {
- info = null == mq ? null : mq.getBrokerName();
- MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, info);
- if (mqSelected != null) {
- mq = mqSelected;
- brokersSent[times] = mqSelected.getBrokerName();
-
- long endTimestamp;
- try {
- beginTimestampPrev = System.currentTimeMillis();
- sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout);
- endTimestamp = System.currentTimeMillis();
- this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
- switch(communicationMode) {
- case ASYNC:
- return null;
- case ONEWAY:
- return null;
- case SYNC:
- if (sendResult.getSendStatus() == SendStatus.SEND_OK || !this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
- return sendResult;
- }
- }
- } catch (RemotingException var24) {
- endTimestamp = System.currentTimeMillis();
- this.updateFaultItem(mqSelected.getBrokerName(), endTimestamp - beginTimestampPrev, true);
- this.log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mqSelected), var24);
- this.log.warn(msg.toString());
- exception = var24;
- } catch (MQClientException var25) {
- endTimestamp = System.currentTimeMillis();
- this.updateFaultItem(mqSelected.getBrokerName(), endTimestamp - beginTimestampPrev, true);
- this.log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mqSelected), var25);
- this.log.warn(msg.toString());
- exception = var25;
- } catch (MQBrokerException var26) {
- endTimestamp = System.currentTimeMillis();
- this.updateFaultItem(mqSelected.getBrokerName(), endTimestamp - beginTimestampPrev, true);
- this.log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mqSelected), var26);
- this.log.warn(msg.toString());
- exception = var26;
- switch(var26.getResponseCode()) {
- case 1:
- case 14:
- case 16:
- case 17:
- case 204:
- case 205:
- break;
- default:
- if (sendResult != null) {
- return sendResult;
- }
-
- throw var26;
- }
- } catch (InterruptedException var27) {
- endTimestamp = System.currentTimeMillis();
- this.updateFaultItem(mqSelected.getBrokerName(), endTimestamp - beginTimestampPrev, false);
- this.log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mqSelected), var27);
- this.log.warn(msg.toString());
- this.log.warn("sendKernelImpl exception", var27);
- this.log.warn(msg.toString());
- throw var27;
- }
-
- ++times;
- continue;
- }
- }
-
- if (sendResult != null) {
- return sendResult;
- }
-
- info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s", times, System.currentTimeMillis() - beginTimestampFirst, msg.getTopic(), Arrays.toString(brokersSent));
- info = info + FAQUrl.suggestTodo("http://rocketmq.apache.org/docs/faq/");
- MQClientException mqClientException = new MQClientException(info, (Throwable)exception);
- if (exception instanceof MQBrokerException) {
- mqClientException.setResponseCode(((MQBrokerException)exception).getResponseCode());
- } else if (exception instanceof RemotingConnectException) {
- mqClientException.setResponseCode(10001);
- } else if (exception instanceof RemotingTimeoutException) {
- mqClientException.setResponseCode(10002);
- } else if (exception instanceof MQClientException) {
- mqClientException.setResponseCode(10003);
- }
-
- throw mqClientException;
- }
- } else {
- List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList();
- if (null != nsList && !nsList.isEmpty()) {
- throw (new MQClientException("No route info of this topic, " + msg.getTopic() + FAQUrl.suggestTodo("http://rocketmq.apache.org/docs/faq/"), (Throwable)null)).setResponseCode(10005);
- } else {
- throw (new MQClientException("No name server address, please set it." + FAQUrl.suggestTodo("http://rocketmq.apache.org/docs/faq/"), (Throwable)null)).setResponseCode(10004);
- }
- }
- }

在sendKernelImpl中会获取Broker的地址信息,然后组装消息信息,调用MQClientAPIImpl的sendMessage方法将消息发送到Broker中。
- private SendResult sendKernelImpl(Message msg, MessageQueue mq, CommunicationMode communicationMode, SendCallback sendCallback, TopicPublishInfo topicPublishInfo, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
- //获取Broker的地址信息
- String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
- if (null == brokerAddr) {
- this.tryToFindTopicPublishInfo(mq.getTopic());
- brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
- }
-
- SendMessageContext context = null;
- if (brokerAddr != null) {
- brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);
- byte[] prevBody = msg.getBody();
-
- SendResult var28;
- try {
- if (!(msg instanceof MessageBatch)) {
- MessageClientIDSetter.setUniqID(msg);
- }
-
- int sysFlag = 0;
- if (this.tryToCompressMessage(msg)) {
- sysFlag |= 1;
- }
-
- String tranMsg = msg.getProperty("TRAN_MSG");
- if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
- sysFlag |= 4;
- }
-
- if (this.hasCheckForbiddenHook()) {
- CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
- checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());
- checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());
- checkForbiddenContext.setCommunicationMode(communicationMode);
- checkForbiddenContext.setBrokerAddr(brokerAddr);
- checkForbiddenContext.setMessage(msg);
- checkForbiddenContext.setMq(mq);
- checkForbiddenContext.setUnitMode(this.isUnitMode());
- this.executeCheckForbiddenHook(checkForbiddenContext);
- }
-
- if (this.hasSendMessageHook()) {
- context = new SendMessageContext();
- context.setProducer(this);
- context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
- context.setCommunicationMode(communicationMode);
- context.setBornHost(this.defaultMQProducer.getClientIP());
- context.setBrokerAddr(brokerAddr);
- context.setMessage(msg);
- context.setMq(mq);
- String isTrans = msg.getProperty("TRAN_MSG");
- if (isTrans != null && isTrans.equals("true")) {
- context.setMsgType(MessageType.Trans_Msg_Half);
- }
-
- if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty("DELAY") != null) {
- context.setMsgType(MessageType.Delay_Msg);
- }
-
- this.executeSendMessageHookBefore(context);
- }
- //组装消息请求头信息
- SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
- requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
- requestHeader.setTopic(msg.getTopic());
- requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
- requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
- requestHeader.setQueueId(mq.getQueueId());
- requestHeader.setSysFlag(sysFlag);
- requestHeader.setBornTimestamp(System.currentTimeMillis());
- requestHeader.setFlag(msg.getFlag());
- requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
- requestHeader.setReconsumeTimes(0);
- requestHeader.setUnitMode(this.isUnitMode());
- requestHeader.setBatch(msg instanceof MessageBatch);
- if (requestHeader.getTopic().startsWith("%RETRY%")) {
- String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
- if (reconsumeTimes != null) {
- requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
- MessageAccessor.clearProperty(msg, "RECONSUME_TIME");
- }
-
- String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
- if (maxReconsumeTimes != null) {
- requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
- MessageAccessor.clearProperty(msg, "MAX_RECONSUME_TIMES");
- }
- }
-
- SendResult sendResult = null;
- switch(communicationMode) {
- //异步消息
- case ASYNC:
- sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(brokerAddr, mq.getBrokerName(), msg, requestHeader, timeout, communicationMode, sendCallback, topicPublishInfo, this.mQClientFactory, this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(), context, this);
- break;
- case ONEWAY:
- //同步消息
- case SYNC:
- sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(brokerAddr, mq.getBrokerName(), msg, requestHeader, timeout, communicationMode, context, this);
- break;
- default:
- assert false;
- }
-
- if (this.hasSendMessageHook()) {
- context.setSendResult(sendResult);
- this.executeSendMessageHookAfter(context);
- }
-
- var28 = sendResult;
- } catch (RemotingException var21) {
- if (this.hasSendMessageHook()) {
- context.setException(var21);
- this.executeSendMessageHookAfter(context);
- }
-
- throw var21;
- } catch (MQBrokerException var22) {
- if (this.hasSendMessageHook()) {
- context.setException(var22);
- this.executeSendMessageHookAfter(context);
- }
-
- throw var22;
- } catch (InterruptedException var23) {
- if (this.hasSendMessageHook()) {
- context.setException(var23);
- this.executeSendMessageHookAfter(context);
- }
-
- throw var23;
- } finally {
- msg.setBody(prevBody);
- }
-
- return var28;
- } else {
- throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", (Throwable)null);
- }
- }

在MQClientAPIImpl中会调用底层通信模块NettyRemotingClient的invokeSync方法将消息信息发送到Broker中。
- public SendResult sendMessage(String addr, String brokerName, Message msg, SendMessageRequestHeader requestHeader, long timeoutMillis, CommunicationMode communicationMode, SendCallback sendCallback, TopicPublishInfo topicPublishInfo, MQClientInstance instance, int retryTimesWhenSendFailed, SendMessageContext context, DefaultMQProducerImpl producer) throws RemotingException, MQBrokerException, InterruptedException {
- RemotingCommand request = null;
- //创建消息
- if (!sendSmartMsg && !(msg instanceof MessageBatch)) {
- request = RemotingCommand.createRequestCommand(10, requestHeader);
- } else {
- SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
- request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? 320 : 310, requestHeaderV2);
- }
- //添加消息请求体
- request.setBody(msg.getBody());
- switch(communicationMode) {
- case ONEWAY:
- this.remotingClient.invokeOneway(addr, request, timeoutMillis);
- return null;
- case ASYNC:
- AtomicInteger times = new AtomicInteger();
- this.sendMessageAsync(addr, brokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, retryTimesWhenSendFailed, times, context, producer);
- return null;
- case SYNC:
- return this.sendMessageSync(addr, brokerName, msg, timeoutMillis, request);
- default:
- assert false;
-
- return null;
- }
- }

在NettyRemotingClient中调用invokeSync方法根据Broker地址信息创建Channel,然后根据request请求信息发起消息,并获取返回值RemotingCommand。
- public RemotingCommand invokeSync(String addr, RemotingCommand request, long timeoutMillis) throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
- Channel channel = this.getAndCreateChannel(addr);
- if (channel != null && channel.isActive()) {
- try {
- if (this.rpcHook != null) {
- this.rpcHook.doBeforeRequest(addr, request);
- }
-
- RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis);
- if (this.rpcHook != null) {
- this.rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
- }
-
- return response;
- } catch (RemotingSendRequestException var7) {
- log.warn("invokeSync: send request exception, so close the channel[{}]", addr);
- this.closeChannel(addr, channel);
- throw var7;
- } catch (RemotingTimeoutException var8) {
- if (this.nettyClientConfig.isClientCloseSocketIfTimeout()) {
- this.closeChannel(addr, channel);
- log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);
- }
-
- log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);
- throw var8;
- }
- } else {
- this.closeChannel(addr, channel);
- throw new RemotingConnectException(addr);
- }
- }

总结:
(1)生产者根据Topic名称从Name Server中获取相关Broker信息,根据Broker的地址信息将消息发送到对应Broker地址中
(2)对于同一个Topic,一个Broker对应多个MessageQueue(Topic可以在多个Broker中),生产者默认通过取模轮询方式将消息发送到对应的MessageQueue中。
(3)生产者提供了接口send(Message msg, MessageQueueSelector selector, Object arg),我们可以通过实现MessageQueueSelector接口可以实现选取MessageQueue的方法,例如实现有序消息就可以将需要有序的消息发送到同一个MessageQueue即可。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。