赞
踩
前两章内容中,介绍了Producer的启动过程,然后是如何创建topic,本章内容将主要集中在如何发送消息到broker
消息的投递分为普通消息的投递和事务消息的投递
- // Producer facade (abridged excerpt): the send(...) overloads shown here forward to DefaultMQProducerImpl.
- public class DefaultMQProducer extends ClientConfig implements MQProducer{
- // All actual operations are delegated to this implementation
- protected final transient DefaultMQProducerImpl defaultMQProducerImpl;
- // Default number of queues per topic
- private volatile int defaultTopicQueueNums = 4;
- // Send timeout; presumably milliseconds — TODO confirm
- private int sendMsgTimeout = 3000;
- // NOTE(review): by its name, the body size above which messages get compressed — not shown in this excerpt
- private int compressMsgBodyOverHowmuch = 1024 * 4;
- // sendDefaultImpl adds this to 1 to get the total number of attempts for SYNC sends
- private int retryTimesWhenSendFailed = 2;
- // Passed by sendKernelImpl to the ASYNC sendMessage call
- private int retryTimesWhenSendAsyncFailed = 2;
- // Checked by sendDefaultImpl when a SYNC send returns a status other than SEND_OK; if true the send is retried
- private boolean retryAnotherBrokerWhenNotStoreOK = false;
- // NOTE(review): presumably the maximum message size in bytes — confirm
- private int maxMessageSize = 1024 * 1024 * 4;
- ...
- // Each send overload below passes its arguments straight through to defaultMQProducerImpl.
- @Override
- public SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
- return this.defaultMQProducerImpl.send(msg);
- }
- @Override
- public SendResult send(Message msg, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
- return this.defaultMQProducerImpl.send(msg, timeout);
- }
- @Override
- public void send(Message msg, SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException {
- this.defaultMQProducerImpl.send(msg, sendCallback);
- }
- @Override
- public SendResult send(Message msg, MessageQueue mq, long timeout)
- throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
- return this.defaultMQProducerImpl.send(msg, mq, timeout);
- }
- @Override
- public void send(Message msg, MessageQueue mq, SendCallback sendCallback, long timeout)
- throws MQClientException, RemotingException, InterruptedException {
- this.defaultMQProducerImpl.send(msg, mq, sendCallback, timeout);
- }
- @Override
- public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback, long timeout)
- throws MQClientException, RemotingException, InterruptedException {
- this.defaultMQProducerImpl.send(msg, selector, arg, sendCallback, timeout);
- }
-
- @Override
- public void sendOneway(Message msg, MessageQueueSelector selector, Object arg)
- throws MQClientException, RemotingException, InterruptedException {
- this.defaultMQProducerImpl.sendOneway(msg, selector, arg);
- }
- // Not supported on this class: always throws and points the caller to TransactionMQProducer.
- @Override
- public TransactionSendResult sendMessageInTransaction(Message msg, LocalTransactionExecuter tranExecuter, final Object arg)
- throws MQClientException {
- throw new RuntimeException("sendMessageInTransaction not implement, please use TransactionMQProducer class");
- }
- ...
- }

1、与前面的操作一样,消息的具体投递由DefaultMQProducer委托给其内部的DefaultMQProducerImpl完成
- /**
- * Default send path: validates the producer and the message, resolves the topic's publish info,
- * then tries to send through sendKernelImpl, choosing a queue for every attempt.
- *
- * SYNC sends get 1 + retryTimesWhenSendFailed attempts; every other mode gets a single attempt.
- * ASYNC and ONEWAY return null as soon as sendKernelImpl returns.
- *
- * @param msg message to send
- * @param communicationMode SYNC, ASYNC or ONEWAY
- * @param sendCallback callback handed through to sendKernelImpl
- * @param timeout timeout handed through to sendKernelImpl
- * @return the send result for SYNC, null for ASYNC and ONEWAY
- * @throws MQClientException when no attempt produced a result, or when no name server address / route info exists
- * @throws MQBrokerException rethrown for broker response codes that are not retried and left no result
- * @throws InterruptedException rethrown immediately, without further attempts
- */
- private SendResult sendDefaultImpl( Message msg, final CommunicationMode communicationMode,
- final SendCallback sendCallback, final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
- // Ensure the producer is in the running state
- this.makeSureStateOK();
- // Validate the message
- Validators.checkMessage(msg, this.defaultMQProducer);
- final long invokeID = random.nextLong(); // invocation id; only used in the log lines below to correlate attempts of one send
- long beginTimestampFirst = System.currentTimeMillis();
- long beginTimestampPrev = beginTimestampFirst;
- @SuppressWarnings("UnusedAssignment")
- long endTimestamp = beginTimestampFirst;
- // Look up the topic's publish (route) info
- TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
- if (topicPublishInfo != null && topicPublishInfo.ok()) {
- MessageQueue mq = null; // queue chosen for the most recent attempt
- Exception exception = null;
- SendResult sendResult = null; // result of the most recent send
- int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1; // only SYNC gets retries
- int times = 0; // attempt counter
- String[] brokersSent = new String[timesTotal]; // broker name chosen on each attempt
- // Attempt the send until it succeeds or the attempts are exhausted
- for (; times < timesTotal; times++) {
- String lastBrokerName = null == mq ? null : mq.getBrokerName();
- @SuppressWarnings("SpellCheckingInspection")
- MessageQueue tmpmq = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName); // pick the queue for this attempt
- if (tmpmq != null) {
- mq = tmpmq;
- brokersSent[times] = mq.getBrokerName();
- try {
- beginTimestampPrev = System.currentTimeMillis();
- // Core send call
- sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout);
- endTimestamp = System.currentTimeMillis();
- // Report the elapsed time of this attempt for the broker (last argument is false on this success path)
- this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
- switch (communicationMode) {
- case ASYNC:
- return null;
- case ONEWAY:
- return null;
- case SYNC:
- if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
- if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) { // status is not SEND_OK and retry-on-not-stored is enabled: try again
- continue;
- }
- }
- return sendResult;
- default:
- break;
- }
- } catch (RemotingException e) { // log, report the failure for this broker, then continue with the next attempt
- endTimestamp = System.currentTimeMillis();
- this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
- log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
- log.warn(msg.toString());
- exception = e;
- continue;
- } catch (MQClientException e) { // log, report the failure for this broker, then continue with the next attempt
- endTimestamp = System.currentTimeMillis();
- this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
- log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
- log.warn(msg.toString());
- exception = e;
- continue;
- } catch (MQBrokerException e) { // log and report the failure; only some response codes are retried
- endTimestamp = System.currentTimeMillis();
- this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
- log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
- log.warn(msg.toString());
- exception = e;
- switch (e.getResponseCode()) {
- // These response codes continue the loop, i.e. the send is retried
- case ResponseCode.TOPIC_NOT_EXIST:
- case ResponseCode.SERVICE_NOT_AVAILABLE:
- case ResponseCode.SYSTEM_ERROR:
- case ResponseCode.NO_PERMISSION:
- case ResponseCode.NO_BUYER_ID:
- case ResponseCode.NOT_IN_CURRENT_UNIT:
- continue;
- // Any other code: return the result if there is one, otherwise rethrow
- default:
- if (sendResult != null) {
- return sendResult;
- }
- throw e;
- }
- } catch (InterruptedException e) {
- endTimestamp = System.currentTimeMillis();
- this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
- log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
- log.warn(msg.toString());
- throw e;
- }
- } else {
- break;
- }
- }
- // Return the send result if any attempt produced one
- if (sendResult != null) {
- return sendResult;
- }
- // Otherwise throw, with a response code derived from the type of the last exception
- String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s", times, System.currentTimeMillis() - beginTimestampFirst,
- msg.getTopic(), Arrays.toString(brokersSent)) + FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);
- MQClientException mqClientException = new MQClientException(info, exception);
- if (exception instanceof MQBrokerException) {
- mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());
- } else if (exception instanceof RemotingConnectException) {
- mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);
- } else if (exception instanceof RemotingTimeoutException) {
- mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);
- } else if (exception instanceof MQClientException) {
- mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);
- }
- throw mqClientException;
- }
- // No usable publish info: first check whether any name server address is configured at all
- List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList();
- if (null == nsList || nsList.isEmpty()) {
- throw new MQClientException(
- "No name server address, please set it." + FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null).setResponseCode(ClientErrorCode.NO_NAME_SERVER_EXCEPTION);
- }
- // Name servers exist but no route info was found for the topic
- throw new MQClientException("No route info of this topic, " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
- null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
- }

2、关键函数tryToFindTopicPublishInfo用于获取topic的路由信息。因为在创建topic时,会将topic和broker的信息同步到NameSrv,所以这里可以从NameSrv获取待发送消息所属topic的路由信息。
- private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
- // 缓存中获取 Topic发布信息,在start的时候,只是放入了一个空的topicPublicInfo
- TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
- // 当无可用的 Topic发布信息时,从Namesrv获取一次
- if (null == topicPublishInfo || !topicPublishInfo.ok()) {
- this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
- this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
- topicPublishInfo = this.topicPublishInfoTable.get(topic);
- }
- // 若获取的 Topic发布信息时候可用,则返回
- if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
- return topicPublishInfo;
- } else { // 使用 {@link DefaultMQProducer#createTopicKey} 对应的 Topic发布信息。用于 Topic发布信息不存在 && Broker支持自动创建Topic
- this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
- topicPublishInfo = this.topicPublishInfoTable.get(topic);
- return topicPublishInfo;
- }
- }

接着看更新topic的路由信息
- // Fetches route data for a topic from the name server and, when it changed (or an update is needed),
- // pushes the derived publish/subscribe info to every registered producer and consumer.
- // Returns true only when the route table was updated; every other path (no data, no change,
- // exception, lock timeout, interruption) returns false.
- public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault, DefaultMQProducer defaultMQProducer) {
- try {
- // Bounded wait for the name server lock; on timeout only a warning is logged
- if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
- try {
- TopicRouteData topicRouteData;
- if (isDefault && defaultMQProducer != null) {
- // Fetch the TopicRouteData using the producer's createTopicKey instead of the topic itself.
- // Per the original author's note: a broker with auto-create-topic enabled creates topics from
- // MixAll.DEFAULT_TOPIC, so when createTopicKey is MixAll.DEFAULT_TOPIC route data can be obtained here.
- // Purpose per that note: a new topic has no route info yet, so the createTopicKey route is used
- // first and the topic is auto-created once the message reaches the broker — not visible in this excerpt.
-
- topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),1000 * 3);
- if (topicRouteData != null) {
- // Cap read and write queue counts at the producer's defaultTopicQueueNums
- for (QueueData data : topicRouteData.getQueueDatas()) {
- int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
- data.setReadQueueNums(queueNums);
- data.setWriteQueueNums(queueNums);
- }
- }
- } else {
- topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
- }
- if (topicRouteData != null) {
- TopicRouteData old = this.topicRouteTable.get(topic);
- boolean changed = topicRouteDataIsChange(old, topicRouteData);
- if (!changed) {
- // Data is unchanged, but an update may still be required
- changed = this.isNeedUpdateTopicRouteInfo(topic);
- } else {
- log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);
- }
- if (changed) {
- TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData(); // cloned because topicRouteData itself is handed to publishInfo/subscribeInfo below
-
- // Update the broker address table
- for (BrokerData bd : topicRouteData.getBrokerDatas()) {
- this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
- }
- // Update the topic's publish info on every producer (per the original note this may create MessageQueues)
- {
- TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
- publishInfo.setHaveTopicRouterInfo(true);
- for (Entry<String, MQProducerInner> entry : this.producerTable.entrySet()) {
- MQProducerInner impl = entry.getValue();
- if (impl != null) {
- impl.updateTopicPublishInfo(topic, publishInfo);
- }
- }
- }
- // Update the topic's subscribe info on every consumer
- {
- Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
- for (Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
- MQConsumerInner impl = entry.getValue();
- if (impl != null) {
- impl.updateTopicSubscribeInfo(topic, subscribeInfo);
- }
- }
- }
- log.info("topicRouteTable.put TopicRouteData[{}]", cloneTopicRouteData);
- this.topicRouteTable.put(topic, cloneTopicRouteData);
- return true;
- }
- } else {
- log.warn("updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return null, Topic: {}", topic);
- }
- } catch (Exception e) {
- // Exceptions are swallowed; they are not even logged for retry-group topics and MixAll.DEFAULT_TOPIC
- if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) && !topic.equals(MixAll.DEFAULT_TOPIC)) {
- log.warn("updateTopicRouteInfoFromNameServer Exception", e);
- }
- } finally {
- this.lockNamesrv.unlock();
- }
- } else {
- log.warn("updateTopicRouteInfoFromNameServer tryLock timeout {}ms", LOCK_TIMEOUT_MILLIS);
- }
- } catch (InterruptedException e) {
- log.warn("updateTopicRouteInfoFromNameServer Exception", e);
- }
-
- return false;
- }

3、因为一个topic可能有很多的消息队列MessageQueue,如何选取MessageQueue路由呢?这里会通过一种容错机制选取出最优的MessageQueue。默认的MQFaultStrategy实现
- // MQ fault strategy: chooses the message queue for a send, optionally taking broker availability into account
- public class MQFaultStrategy {
- /**
- * Latency fault tolerance, tracking the send latency of each broker.
- * key: brokerName
- */
- private final LatencyFaultTolerance<String> latencyFaultTolerance = new LatencyFaultToleranceImpl();
- /**
- * Switch for latency fault tolerance when sending messages
- */
- private boolean sendLatencyFaultEnable = false;
- /**
- * Latency level array (presumably milliseconds — TODO confirm)
- */
- private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
- /**
- * Not-available duration array; NOTE(review): looks index-aligned with latencyMax — the code using it is not in this excerpt
- */
- private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};
- /**
- * Selects the queue to send to.
- *
- * @param tpInfo publish info of the topic
- * @param lastBrokerName broker name used by the previous attempt, may be null
- * @return the selected queue
- */
- public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
- if (this.sendLatencyFaultEnable) {
- try {
- // Walk the queue list starting from the sendWhichQueue counter (the original author reads this as round-robin)
- int index = tpInfo.getSendWhichQueue().getAndIncrement();
- for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
- int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
- // Math.abs(Integer.MIN_VALUE) stays negative, hence the clamp
- if (pos < 0)
- pos = 0;
- MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
- if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
- // Accept an available broker when there was no previous broker, or when it is the previous broker
- if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
- return mq;
- }
- }
- // No queue accepted above: ask the fault tolerance for a broker via pickOneAtLeast and build a queue on it
- // (per the original note: a relatively good broker, regardless of that queue's availability)
- final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
- int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
- if (writeQueueNums > 0) {
- final MessageQueue mq = tpInfo.selectOneMessageQueue();
- if (notBestBroker != null) {
- // Re-point the selected queue at the picked broker with a queue id within its range
- mq.setBrokerName(notBestBroker);
- mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
- }
- return mq;
- } else {
- latencyFaultTolerance.remove(notBestBroker);
- }
- } catch (Exception e) {
- // Any failure above is only logged; selection falls through to the plain choice below
- log.error("Error occurred when selecting message queue", e);
- }
- // Fallback: pick a queue without considering availability
- return tpInfo.selectOneMessageQueue();
- }
- // Latency fault tolerance disabled: delegate to TopicPublishInfo, passing the previous attempt's broker name
- return tpInfo.selectOneMessageQueue(lastBrokerName);
- }
- }

4、发送消息的核心:非常重要的一点是,MessageQueue可能分布在不同的broker上,而每一个brokerName可能对应一组broker,所以这个时候会向这组broker中的主broker(master)投递消息。
- // Performs the actual send of one message to the broker that owns the given queue.
- // Resolves the broker address, prepares the message (unique id, compression, system flags), runs the
- // registered hooks, builds the request header and calls MQClientAPIImpl.sendMessage according to the
- // communication mode. Throws MQClientException when no address can be found for the broker.
- // The message body is restored to its pre-call value in the finally block.
- private SendResult sendKernelImpl(final Message msg,
- final MessageQueue mq,
- final CommunicationMode communicationMode,
- final SendCallback sendCallback,
- final TopicPublishInfo topicPublishInfo,
- final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
- long beginStartTime = System.currentTimeMillis();
- // Look up the publish address for the queue's broker (per the original note, this is the master)
- String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
- if (null == brokerAddr) {
- // Address unknown: refresh the topic info and look it up once more
- tryToFindTopicPublishInfo(mq.getTopic());
- brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
- }
- SendMessageContext context = null;
- if (brokerAddr != null) {
- // Map to the VIP channel address when enabled (per the original note: port minus 2, address re-assembled — not visible here)
- brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);
- // Keep the original body so it can be restored after the send
- byte[] prevBody = msg.getBody();
- try {
- // For a MessageBatch the id has already been set while the batch was generated,
- // so only non-batch messages get a unique id here
- if (!(msg instanceof MessageBatch)) {
- MessageClientIDSetter.setUniqID(msg);
- }
-
- int sysFlag = 0;
- boolean msgBodyCompressed = false;
- // Try to compress the body; flag the message when compression happened
- if (this.tryToCompressMessage(msg)) {
- sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
- msgBodyCompressed = true;
- }
-
- // Transactional (prepared) message: set the matching system flag
- final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
- if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
- sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
- }
-
- // Run the check-forbidden hook if one is registered
- if (hasCheckForbiddenHook()) {
- CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
- checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());
- checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());
- checkForbiddenContext.setCommunicationMode(communicationMode);
- checkForbiddenContext.setBrokerAddr(brokerAddr);
- checkForbiddenContext.setMessage(msg);
- checkForbiddenContext.setMq(mq);
- checkForbiddenContext.setUnitMode(this.isUnitMode());
- this.executeCheckForbiddenHook(checkForbiddenContext);
- }
-
- // Build the send-hook context and run the "before" hook if a send hook is registered
- if (this.hasSendMessageHook()) {
- context = new SendMessageContext();
- context.setProducer(this);
- context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
- context.setCommunicationMode(communicationMode);
- context.setBornHost(this.defaultMQProducer.getClientIP());
- context.setBrokerAddr(brokerAddr);
- context.setMessage(msg);
- context.setMq(mq);
- String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
- if (isTrans != null && isTrans.equals("true")) {
- context.setMsgType(MessageType.Trans_Msg_Half);
- }
-
- // A delay property, when present, overrides the message type set above
- if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
- context.setMsgType(MessageType.Delay_Msg);
- }
- this.executeSendMessageHookBefore(context);
- }
-
- // Build the request header
- SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
- // Producer group
- requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
- // Topic of the message
- requestHeader.setTopic(msg.getTopic());
- // Default topic (the producer's createTopicKey)
- requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
- // Default queue count for the topic
- requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
- // Target queue id
- requestHeader.setQueueId(mq.getQueueId());
- // System flags computed above
- requestHeader.setSysFlag(sysFlag);
- // Creation timestamp of the message
- requestHeader.setBornTimestamp(System.currentTimeMillis());
- // Message flag
- requestHeader.setFlag(msg.getFlag());
- // Extended properties, serialized to a string
- requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
- // Re-consume count starts at 0
- requestHeader.setReconsumeTimes(0);
- // Unit mode flag
- requestHeader.setUnitMode(this.isUnitMode());
- // Batch flag
- requestHeader.setBatch(msg instanceof MessageBatch);
- // Retry-group topics: move the re-consume properties from the message into the header
- if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
- String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
- if (reconsumeTimes != null) {
- requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
- MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
- }
-
- String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
- if (maxReconsumeTimes != null) {
- requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
- MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
- }
- }
-
- SendResult sendResult = null;
- // Dispatch on the communication mode
- switch (communicationMode) {
- // Asynchronous send
- case ASYNC:
- Message tmpMessage = msg;
- if (msgBodyCompressed) {
- // If the body was compressed, send a clone that carries the compressed body
- // and restore the original body on msg right away.
- // Fix bug:https://github.com/apache/rocketmq-externals/issues/66
- tmpMessage = MessageAccessor.cloneMessage(msg);
- msg.setBody(prevBody);
- }
- // Time already spent in this method counts against the timeout
- long costTimeAsync = System.currentTimeMillis() - beginStartTime;
- if (timeout < costTimeAsync) {
- throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
- }
- // Compared with the sync call below, this overload additionally takes the callback-related arguments
- sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
- brokerAddr,
- mq.getBrokerName(),
- tmpMessage,
- requestHeader,
- timeout - costTimeAsync,
- communicationMode,
- sendCallback,
- topicPublishInfo,
- this.mQClientFactory,
- this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
- context,
- this);
- break;
- // One-way and synchronous sends share one path
- case ONEWAY:
- case SYNC:
- long costTimeSync = System.currentTimeMillis() - beginStartTime;
- if (timeout < costTimeSync) {
- throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
- }
- // The actual send, with the remaining timeout budget
- sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
- brokerAddr,
- mq.getBrokerName(),
- msg,
- requestHeader,
- timeout - costTimeSync,
- communicationMode,
- context,
- this);
- break;
- default:
- assert false;
- break;
- }
-
- // Run the "after" hook with the result if a send hook is registered
- if (this.hasSendMessageHook()) {
- context.setSendResult(sendResult);
- this.executeSendMessageHookAfter(context);
- }
-
- return sendResult;
- } catch (RemotingException e) {
- // On failure the "after" hook still runs, with the exception attached, before rethrowing
- if (this.hasSendMessageHook()) {
- context.setException(e);
- this.executeSendMessageHookAfter(context);
- }
- throw e;
- } catch (MQBrokerException e) {
- if (this.hasSendMessageHook()) {
- context.setException(e);
- this.executeSendMessageHookAfter(context);
- }
- throw e;
- } catch (InterruptedException e) {
- if (this.hasSendMessageHook()) {
- context.setException(e);
- this.executeSendMessageHookAfter(context);
- }
- throw e;
- } finally {
- // Always restore the body captured before the send
- msg.setBody(prevBody);
- }
- }
-
- // No address could be resolved for the broker, even after the refresh
- throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
- }

6、组装netty的发送指令,开始消息的最终发送:
- // Builds the remoting request for a message and dispatches it according to the communication mode.
- // Returns the send result for SYNC; ONEWAY and ASYNC return null.
- public SendResult sendMessage(
- final String addr, // broker address
- final String brokerName, // broker name
- final Message msg, // message to send
- final SendMessageRequestHeader requestHeader, // request header
- final long timeoutMillis, // timeout budget
- final CommunicationMode communicationMode, // ONEWAY / ASYNC / SYNC
- final SendCallback sendCallback, // callback handed to the async path
- final TopicPublishInfo topicPublishInfo, // publish info of the topic
- final MQClientInstance instance, // client instance handed to the async path
- final int retryTimesWhenSendFailed, // retry count handed to the async path
- final SendMessageContext context, // send-hook context
- final DefaultMQProducerImpl producer // sending producer
- ) throws RemotingException, MQBrokerException, InterruptedException {
- final long startedAt = System.currentTimeMillis();
- final boolean batch = msg instanceof MessageBatch;
- final RemotingCommand request;
- if (!sendSmartMsg && !batch) {
- // Plain single message: send the header as given
- request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
- } else {
- // Smart-message mode or batch: convert the header to its V2 form first
- final SendMessageRequestHeaderV2 headerV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
- request = RemotingCommand.createRequestCommand(batch ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2, headerV2);
- }
-
- request.setBody(msg.getBody());
-
- switch (communicationMode) {
- case ONEWAY: {
- this.remotingClient.invokeOneway(addr, request, timeoutMillis);
- return null;
- }
- case ASYNC: {
- final AtomicInteger attempts = new AtomicInteger();
- // Time already spent here counts against the timeout
- final long elapsed = System.currentTimeMillis() - startedAt;
- if (timeoutMillis < elapsed) {
- throw new RemotingTooMuchRequestException("sendMessage call timeout");
- }
- this.sendMessageAsync(addr, brokerName, msg, timeoutMillis - elapsed, request, sendCallback, topicPublishInfo, instance,
- retryTimesWhenSendFailed, attempts, context, producer);
- return null;
- }
- case SYNC: {
- final long elapsed = System.currentTimeMillis() - startedAt;
- if (timeoutMillis < elapsed) {
- throw new RemotingTooMuchRequestException("sendMessage call timeout");
- }
- return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - elapsed, request);
- }
- default:
- assert false;
- return null;
- }
- }
-
- // 同步调用发送方法
- private SendResult sendMessageSync(
- final String addr,
- final String brokerName,
- final Message msg,
- final long timeoutMillis,
- final RemotingCommand request
- ) throws RemotingException, MQBrokerException, InterruptedException {
- //同步发送消息
- RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
- assert response != null;
-
- //处理请求的相应结果
- return this.processSendResponse(brokerName, msg, response);
- }
-
- // Synchronous remoting call: obtains a channel for the address, runs the RPC hooks around
- // invokeSyncImpl and closes the channel on send failures (and on timeouts when configured).
- @Override
- public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
- throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
- final long startedAt = System.currentTimeMillis();
-
- final Channel channel = this.getAndCreateChannel(addr);
-
- // No usable channel: close whatever was obtained and report a connect failure
- if (channel == null || !channel.isActive()) {
- this.closeChannel(addr, channel);
- throw new RemotingConnectException(addr);
- }
-
- try {
- doBeforeRpcHooks(addr, request);
-
- // Time spent so far (channel lookup, hooks) counts against the timeout
- final long elapsed = System.currentTimeMillis() - startedAt;
- if (timeoutMillis < elapsed) {
- throw new RemotingTimeoutException("invokeSync call timeout");
- }
-
- final RemotingCommand reply = this.invokeSyncImpl(channel, request, timeoutMillis - elapsed);
-
- doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(channel), request, reply);
- return reply;
- } catch (RemotingSendRequestException e) {
- log.warn("invokeSync: send request exception, so close the channel[{}]", addr);
- this.closeChannel(addr, channel);
- throw e;
- } catch (RemotingTimeoutException e) {
- // The socket is closed on timeout only when the client config asks for it
- if (nettyClientConfig.isClientCloseSocketIfTimeout()) {
- this.closeChannel(addr, channel);
- log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);
- }
- log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);
- throw e;
- }
- }
-
- // Core of the synchronous call: registers a ResponseFuture under the request's opaque id,
- // writes the request to the channel and blocks until a response arrives or the timeout expires.
- public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,
- final long timeoutMillis)
- throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
- final int opaque = request.getOpaque();
-
- try {
- final ResponseFuture pending = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
-
- // Register the future under the opaque id before writing the request
- this.responseTable.put(opaque, pending);
- final SocketAddress remote = channel.remoteAddress();
- channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture f) throws Exception {
- final boolean written = f.isSuccess();
- pending.setSendRequestOK(written);
- if (written) {
- // Write succeeded: nothing more to do in the listener
- return;
- }
- // Write failed: drop the pending entry and release the waiter with a null response
- responseTable.remove(opaque);
- pending.setCause(f.cause());
- pending.putResponse(null);
- log.warn("send a request command to channel <" + remote + "> failed.");
- }
- });
-
- // Block for the response (per the original note it is delivered by NettyClientHandler, matched by opaque)
- final RemotingCommand reply = pending.waitResponse(timeoutMillis);
- if (reply != null) {
- return reply;
- }
-
- // No response: a successful write means a timeout, otherwise the send itself failed
- if (pending.isSendRequestOK()) {
- throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(remote), timeoutMillis,
- pending.getCause());
- }
- throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(remote), pending.getCause());
- } finally {
- this.responseTable.remove(opaque);
- }
- }

如果消息采用异步发送方式,发送结果会通过SendCallback进行回调处理。
7、broker接收到消息发送的指令后,会做消息的持久化处理,后面在broker环节将重点介绍。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。