赞
踩
在深入讲解消息发送之前,我们可先简单概括消息的发送的主要步骤可分为:消息验证、路由查询、选择消息队列、消息组装、消息发送、消息结果处理、异常处理;(单向发送并不处理消息发送结果);同步、异步、单向发送消息的入口API有一些区别,本文将以下面接口实现类为入口分析消息发送的流程:
DefaultMQProducerImpl#sendDefaultImpl
(由于消息发送细节非常多,本文将分析核心步骤,如漏掉还请各位查漏补缺,自行分析哈)
同步发送总结流程图如下:
DefaultMQProducerImpl#sendDefaultImpl
- /**
- * 发送信息
- * @param msg 消息内容
- * @param communicationMode 发送模式
- * @param sendCallback 回掉
- * @param timeout 超时时间
- */
- private SendResult sendDefaultImpl(
- Message msg,
- final CommunicationMode communicationMode,
- final SendCallback sendCallback,
- final long timeout
- ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
- this.makeSureStateOK(); //验证 serviceState == Running 运行中
- Validators.checkMessage(msg, this.defaultMQProducer); //1> 验证消息
-
- final long invokeID = random.nextLong();//随机的-invokeId
- long beginTimestampFirst = System.currentTimeMillis();//开始时间
- long beginTimestampPrev = beginTimestampFirst;
- long endTimestamp = beginTimestampFirst;
- TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic()); // 2> 获取路由信息
- if (topicPublishInfo != null && topicPublishInfo.ok()) {
- boolean callTimeout = false;
- MessageQueue mq = null;
- Exception exception = null;
- SendResult sendResult = null;
- int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;//重试次数,同步默认3,其他1次
- int times = 0;
- String[] brokersSent = new String[timesTotal];//发送的brokerName集合
- for (; times < timesTotal; times++) {
- String lastBrokerName = null == mq ? null : mq.getBrokerName();
- MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName); // 3>选择消息队列
- if (mqSelected != null) {
- mq = mqSelected;
- brokersSent[times] = mq.getBrokerName();
- try {
- beginTimestampPrev = System.currentTimeMillis();//本次开始时间
- long costTime = beginTimestampPrev - beginTimestampFirst;//计算发送消耗时间
- if (timeout < costTime) {//如果消耗时间 大于 超时时间,直接break
- callTimeout = true;
- break;
- }
- //发送消息
- sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime); // 4>消息发送
- //发送完成时间
- endTimestamp = System.currentTimeMillis();
- //更新失败条目信息
- this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
- switch (communicationMode) {
- case ASYNC:
- return null;
- case ONEWAY:
- return null;
- case SYNC:
- if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
- if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
- continue;
- }
- }
-
- return sendResult;
- default:
- break;
- }
- } catch (RemotingException e) {
- endTimestamp = System.currentTimeMillis();
- this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true); //5>更新失败条目
- log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
- log.warn(msg.toString());
- exception = e;
- continue;
- ...省略...
- } else { //没有找到消息队列,直接break
- break;
- }
- }
- if (sendResult != null) {
- return sendResult;
- }
- String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",
- times,
- System.currentTimeMillis() - beginTimestampFirst,
- msg.getTopic(),
- Arrays.toString(brokersSent));
-
- info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);
-
- MQClientException mqClientException = new MQClientException(info, exception);
- ...省略...
- throw mqClientException;
- }
- ...省略...
- }
Validators.checkMessage
- //Validate message 验证消息
- public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer)
- throws MQClientException {
- if (null == msg) {
- throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message is null");
- }
- Validators.checkTopic(msg.getTopic()); // 验证topic, 此处代码大家可自行查看,灰常简单
- if (null == msg.getBody()) { // body 消息体不能为空
- throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body is null");
- }
- if (0 == msg.getBody().length) {
- throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body length is zero");
- }
- //消息最大长度 不能大于 4M
- if (msg.getBody().length > defaultMQProducer.getMaxMessageSize()) {
- throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL,
- "the message body size over max value, MAX: " + defaultMQProducer.getMaxMessageSize());
- }
- }
备注:
主要验证消息分为两部分
tryToFindTopicPublishInfo
在'路由动态更新'我们以及分析过了,代码大家可以再回顾下,简单逻辑总结如下:
将在'系列5'着重分析此段代码功能消息
sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
由于代码篇幅太长,下面讲解只摘取sendKernelImpl方法的核心代码解析,但强烈建议仔细去撸一遍代码消息。
- String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
- if (null == brokerAddr) {
- tryToFindTopicPublishInfo(mq.getTopic());
- brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
- }
- if(brokerAddr != null) {
- ... 省略 ...
- } else{
- 抛异常
- }
逻辑:
从brokerAddrTable获取主MasterId,获取不到则查询路由,如果继续获取不到则跑异常消息
- //MQClientInstance#findBrokerAddressInPublish(获取broker的网络地址(主-master的地址)
- public String findBrokerAddressInPublish(final String brokerName) {
- HashMap<Long/* brokerId */, String/* address */> map = this.brokerAddrTable.get(brokerName);
- if (map != null && !map.isEmpty()) {
- return map.get(MixAll.MASTER_ID);
- }
- return null;
- }
备注:
brokerAddrTable 是路由更新维护的broker地址信息。
- int sysFlag = 0;
- boolean msgBodyCompressed = false;//压缩标记
- if (this.tryToCompressMessage(msg)) {//尝试压缩
- sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
- msgBodyCompressed = true;
- }
- // 压缩
- private boolean tryToCompressMessage(final Message msg) {
- if (msg instanceof MessageBatch) {
- //batch dose not support compressing right now
- return false;
- }
- byte[] body = msg.getBody();
- if (body != null) {
- if (body.length >= this.defaultMQProducer.getCompressMsgBodyOverHowmuch()) {
- try {
- byte[] data = UtilAll.compress(body, zipCompressLevel);
- if (data != null) {
- msg.setBody(data);
- return true;
- }
- } catch (IOException e) {
- log.error("tryToCompressMessage exception", e);
- log.warn(msg.toString());
- }
- }
- }
- return false;
- }
备注:
- /** 构建消息发送 请求包 。主要包含如下重要信息:生产者组、主题名称、默认创建主题Key、该主题在单个Broker默认队列数 、队列ID (队列序号)、消息系统标记 ( MessageSysFlag)、
- 消息发送时间、消息标记(RocketMQ对消息中的 flag不做任何处理, 供应用程序使用)、 消息扩展属性、消息重试次数、是否是批量消息等。
- */
- SendMessageRequestHeader requestHeader = newSendMessageRequestHeader();
- requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());//生产者组
- requestHeader.setTopic(msg.getTopic());//主题名称
- requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());//默认创建主题Key
- requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());//该主题在单个Broker默认队列数
- requestHeader.setQueueId(mq.getQueueId());//队列ID (队列序号)
- requestHeader.setSysFlag(sysFlag);//消息系统标记 ( MessageSysFlag)
- requestHeader.setBornTimestamp(System.currentTimeMillis());//消息发送时间
- requestHeader.setFlag(msg.getFlag());//消息标记(RocketMQ对消息中的 flag不做任何处理, 供应用程序使用)
- requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));//【重要】消息扩展属性
- requestHeader.setReconsumeTimes(0);//消息重试次数
- requestHeader.setUnitMode(this.isUnitMode());
- requestHeader.setBatch(msg instanceofMessageBatch);//是否是批量消息等
- if(requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {//主题 topic 包含:RETRY
- String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
- if (reconsumeTimes != null) {
- requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
- MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
- }
- String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
- if (maxReconsumeTimes != null) {
- requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
- MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
- }
- }
- public SendResult sendMessage(
- final String addr,
- final String brokerName,
- final Message msg,
- final SendMessageRequestHeader requestHeader,
- final long timeoutMillis,
- final CommunicationMode communicationMode,
- final SendCallback sendCallback,
- final TopicPublishInfo topicPublishInfo,
- final MQClientInstance instance,
- final int retryTimesWhenSendFailed,
- final SendMessageContext context,
- final DefaultMQProducerImpl producer
- ) throws RemotingException, MQBrokerException, InterruptedException {
- long beginStartTime = System.currentTimeMillis();
- RemotingCommand request = null;
- if (sendSmartMsg || msg instanceof MessageBatch) {
- //默认smartMsg(智能) 或者 批量消息
- SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
- request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2, requestHeaderV2);
- } else {
- request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
- }
- request.setBody(msg.getBody());//设置消息内容
-
- switch (communicationMode) {
- case ONEWAY://单向
- this.remotingClient.invokeOneway(addr, request, timeoutMillis);
- return null;
- case ASYNC://异步
- final AtomicInteger times = new AtomicInteger();
- long costTimeAsync = System.currentTimeMillis() - beginStartTime;
- if (timeoutMillis < costTimeAsync) {
- throw new RemotingTooMuchRequestException("sendMessage call timeout");
- }
- this.sendMessageAsync(addr, brokerName, msg, timeoutMillis - costTimeAsync, request, sendCallback, topicPublishInfo, instance,
- retryTimesWhenSendFailed, times, context, producer);
- return null;
- case SYNC://同步
- long costTimeSync = System.currentTimeMillis() - beginStartTime;
- if (timeoutMillis < costTimeSync) { //超时判断
- throw new RemotingTooMuchRequestException("sendMessage call timeout");
- }
- return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request);
- default:
- assert false;
- break;
- }
- return null;
- }
分析:
从此处可知道,单向/异步/同步发送的实际差别了。单向发送直接返回null,同步需要等待返回结果,异步返回null但sendCallback会异步处理发送结果。牛逼的你一定会去研究 invokeOneway、sendMessageAsync、sendMessageSync 三个方法的的源码,其实很简单。
其实发送流程涉及代码很多,这边没有一一分析,比如落下的一些可扩展的钩子函数,netty网络处理,最关键的是异常处理等,建议仔细研究哈。
程序员的核心竞争力其实还是技术,因此对技术还是要不断的学习,关注 “IT巅峰技术” 公众号 ,该公众号内容定位:中高级开发、架构师、中层管理人员等中高端岗位服务的,除了技术交流外还有很多架构思想和实战案例。
作者是 《 消息中间件 RocketMQ 技术内幕》 一书作者,同时也是 “RocketMQ上海社区”联合创始人,曾就职于拼多多、德邦等公司,现任上市快递公司架构负责人,主要负责开发框架的搭建、中间件相关技术的二次开发和运维管理、混合云及基础服务平台的建设。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。