Call chain for a synchronous send:
1. org.apache.rocketmq.client.producer.DefaultMQProducer#send(org.apache.rocketmq.common.message.Message)
2. org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#send(org.apache.rocketmq.common.message.Message)
3. org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#send(org.apache.rocketmq.common.message.Message, long)
4. org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendDefaultImpl
java //发送消息 public SendResult send(Message msg) { //this.defaultMQProducer.getSendMsgTimeout(),发送超时时间默认3秒 return send(msg, this.defaultMQProducer.getSendMsgTimeout()); }
java //发送消息,默认超时时间为3s public SendResult send(Message msg,long timeout){ //发送消息的模式是CommunicationMode.SYNC,即同步 return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout); }
```java private SendResult sendDefaultImpl( Message msg, final CommunicationMode communicationMode, final SendCallback sendCallback, final long timeout ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { this.makeSureStateOK(); //1.校验消息 Validators.checkMessage(msg, this.defaultMQProducer);
- final long invokeID = random.nextLong();
- //第一次发送消息的时间
- long beginTimestampFirst = System.currentTimeMillis();
- //上一次发送消息的时间
- //第一次发送消息的时间-上一次发送消息的时间等于发送消息的花费时间
- long beginTimestampPrev = beginTimestampFirst;
- long endTimestamp = beginTimestampFirst;
- //2.根据topic查找路由
- //从本地的topicPublishInfoTable查找TopicPublishInfo。
- //(TopicPublishInfo为空)或者(messageQueueList为empty或者null)
- //那么从nameServer拉取。
- //TopicPublishInfo主要包括messageQueueList,全局的sendWhichQueue。
- //判断是否发生改变 若发生改变需要更新原来的路由信息
- TopicPublishInfo topicPublishInfo=this.tryToFindTopicPublishInfo(msg.getTopic());
- //如果路由信息不为空 && 路由信息ok指的是topic对应的队列不为空
- if (topicPublishInfo != null && topicPublishInfo.ok()) {
- boolean callTimeout = false;
- MessageQueue mq = null;
- Exception exception = null;
- SendResult sendResult = null;
- //private int retryTimesWhenSendFailed = 2;
- //如果是同步模式 那么timesTotal是 1+2 =3,即会重试2次
- //如果是异步发送 那么timesTotal是 1,既不会重试
- //发送超时即花费时间超过超时时间不会再去重试,而是直接抛出异常。
- //重试是在一个for循环里立刻去重试
- //总次数
- int timesTotal =
- communicationMode == CommunicationMode.SYNC ?
- 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
- //已使用次数
- int times = 0;
- String[] brokersSent = new String[timesTotal];
- //for循环开始
- for (; times < timesTotal; times++) {
- //lastBrokerName是故障的brokerNmae,初始值是null
- //只有发送失败了才会再次进入循环
- //等第二次进入时lastBrokerName是上一次发送失败的brokerNmae
- String lastBrokerName = null == mq ? null : mq.getBrokerName();
- // 3.选择队列
- // 这里的lastBrokerName第一次是null
- // 第二次lastBrokerName是上一次发送失败的brokerNmae
- MessageQueue mqSelected =
- this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
- if (mqSelected != null) {
- mq = mqSelected;
- //保存每次发送时的BrokerName到对应的数组下标
- brokersSent[times] = mq.getBrokerName();
- try {
- beginTimestampPrev = System.currentTimeMillis();
- if (times > 0) {
- //Reset topic with namespace during resend.
- msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
- }
- //第一次发送消息的时间-上一次发送消息的时间等于发送消息的花费时间
- long costTime = beginTimestampPrev - beginTimestampFirst;
- //如果总的花费时间超过3秒那么认为超时 超时不在重试跳出循环然后直接报错
- //private int sendMsgTimeout = 3000;
- if (timeout < costTime) {
- callTimeout = true;
- break;
- }
- //核心代码:发送消息
- sendResult = this.sendKernelImpl(
- msg,
- mq,
- communicationMode,
- sendCallback,
- topicPublishInfo,
- timeout - costTime);
- endTimestamp = System.currentTimeMillis();
- //更新不可用的broker
- this.updateFaultItem
- (mq.getBrokerName(),
- endTimestamp - beginTimestampPrev, false);
- switch (communicationMode) {
- case ASYNC:
- return null;
- case ONEWAY:
- return null;
- case SYNC:
- //同步模式返回
- if (sendResult.getSendStatus()!= SendStatus.SEND_OK) {
- //isRetryAnotherBrokerWhenNotStoreOK默认是true
- //代表当消息发送失败尝试发送到其他broker
- if (this.defaultMQProducer
- .isRetryAnotherBrokerWhenNotStoreOK()) {
- continue;
- }
- }
- return sendResult;
- default:
- break;
- }
- } catch (RemotingException e) {
- endTimestamp = System.currentTimeMillis();
- this.updateFaultItem
- (mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
- exception = e;
- continue;
- } catch (MQClientException e) {
- endTimestamp = System.currentTimeMillis();
- this.updateFaultItem
- (mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
- exception = e;
- continue;
- } catch (MQBrokerException e) {
- endTimestamp = System.currentTimeMillis();
- this.updateFaultItem
- (mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
- exception = e;
- switch (e.getResponseCode()) {
- case ResponseCode.TOPIC_NOT_EXIST:
- case ResponseCode.SERVICE_NOT_AVAILABLE:
- case ResponseCode.SYSTEM_ERROR:
- case ResponseCode.NO_PERMISSION:
- case ResponseCode.NO_BUYER_ID:
- case ResponseCode.NOT_IN_CURRENT_UNIT:
- continue;
- default:
- if (sendResult != null) {
- return sendResult;
- }
- throw e;
- }
- } catch (InterruptedException e) {
- endTimestamp = System.currentTimeMillis();
- this.updateFaultItem
- (mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
- throw e;
- }
- } else {
- break;
- }
- }
- //for循环结束
- //返回发送结果
- if (sendResult != null) {
- return sendResult;
- }
- //如果发送结果为空
- String info = String.format
- ("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",
- times,
- System.currentTimeMillis() - beginTimestampFirst,
- msg.getTopic(),
- Arrays.toString(brokersSent));
- info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);
- MQClientException mqClientException = new MQClientException(info, exception);
- //如果是发送超时直接报错
- if (callTimeout) {
- throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");
- }
- if (exception instanceof MQBrokerException) {
- mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());
- } else if (exception instanceof RemotingConnectException) {
- mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);
- } else if (exception instanceof RemotingTimeoutException) {
- mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);
- } else if (exception instanceof MQClientException) {
- mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);
- }
- throw mqClientException;
- //if (topicPublishInfo != null && topicPublishInfo.ok()) 判断结束
- }
- //路由信息为空 或者 路由信息不ok即topic对应的队列为空
- //获取nameserverList
- List<String> nsList=this.getmQClientFactory()
- .getMQClientAPIImpl()
- .getNameServerAddressList();
- //如果nameserverList为空 抛出异常 没有nameserver的地址,请配置
- if (null == nsList || nsList.isEmpty()) {
- throw new MQClientException(
- "No name server address, please set it.");
- }
- //直接抛出没有topic对应的路由信息的异常
- throw new MQClientException("No route info of this topic, ");
- }
```java public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer) throws MQClientException { //判断是否为空 if (null == msg) { throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message is null"); } // 校验Topic Validators.checkTopic(msg.getTopic());
- // 校验消息体
- if (null == msg.getBody()) {
- throw new
- MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message is null");
- }
- if (0 == msg.getBody().length) {
- throw new
- MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the messagelength is zero");
- }
- //默认是4M 4194304=1024 * 1024 * 4
- if (msg.getBody().length > defaultMQProducer.getMaxMessageSize()) {
- throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL,
- "the message body size over max value, MAX: ");
- }
- }
- public static void checkTopic(String topic) throws MQClientException {
- if (UtilAll.isBlank(topic)) {
- throw new MQClientException("The specified topic is blank", null);
- }
- if (!regularExpressionMatcher(topic, PATTERN)) {
- throw new MQClientException
- ("The specified topic[%s] contains illegal characters, allowing only");
- }
- if (topic.length() > CHARACTER_MAX_LENGTH) {
- throw new MQClientException
- ("The specified topic is longer than topic max length 255.");
- }
- //whether the same with system reserved keyword
- if (topic.equals(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC)) {
- throw new MQClientException
- ("The topic[%s] is conflict with AUTO_CREATE_TOPIC_KEY_TOPIC.");
- }
- }
```java private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) { //从缓存中获得主题的路由信息 TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic); //缓存的路由信息为空,则从NameServer获取路由 if (null == topicPublishInfo || !topicPublishInfo.ok()) { this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo()); this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic); topicPublishInfo = this.topicPublishInfoTable.get(topic); }
- if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
- return topicPublishInfo;
- } else {
- //MQClientInstance#startScheduledTask
- //调用updateTopicRouteInfoFromNameServer起来一个定时器 30秒执行一次
- //如果未找到当前主题的路由信息,则用默认主题继续查找
- //#进入下一个方法
- this.mQClientFactory.updateTopicRouteInfoFromNameServer
- (topic, true, this.defaultMQProducer);
- topicPublishInfo = this.topicPublishInfoTable.get(topic);
- return topicPublishInfo;
- }
- }
java public class TopicPublishInfo { private boolean orderTopic = false; //是否是顺序消息 private boolean haveTopicRouterInfo = false; //该主题对应的所有的消息队列 private List
messageQueueList = new ArrayList
//每选择一次消息队列,该值+1 默认值是一个随机数 private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex(); //关联Topic路由元信息 private TopicRouteData topicRouteData; }
java public class TopicRouteData extends RemotingSerializable { private String orderTopicConf; private List
queueDatas; private List
brokerDatas; private HashMap
/* Filter Server */> filterServerTable;
- public TopicRouteData cloneTopicRouteData() {
- TopicRouteData topicRouteData = new TopicRouteData();
- topicRouteData.setQueueDatas(new ArrayList<QueueData>());
- topicRouteData.setBrokerDatas(new ArrayList<BrokerData>());
- topicRouteData.setFilterServerTable(new HashMap<String, List<String>>());
- topicRouteData.setOrderTopicConf(this.orderTopicConf);
- if (this.queueDatas != null) {
- topicRouteData.getQueueDatas().addAll(this.queueDatas);
- }
- if (this.brokerDatas != null) {
- topicRouteData.getBrokerDatas().addAll(this.brokerDatas);
- }
- if (this.filterServerTable != null) {
- topicRouteData.getFilterServerTable().putAll(this.filterServerTable);
- }
- return topicRouteData;
- }
```java //使用默认主题从NameServer获取路由信息 返回的是TopicRouteData if (isDefault && defaultMQProducer != null) { topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer (defaultMQProducer.getCreateTopicKey(),1000 * 3);
- if (topicRouteData != null) {
- for (QueueData data : topicRouteData.getQueueDatas()) {
- //defaultMQProducer.getDefaultTopicQueueNums() = 4
- int queueNums
- = Math.min(defaultMQProducer.getDefaultTopicQueueNums(),
- data.getReadQueueNums());
- data.setReadQueueNums(queueNums);
- data.setWriteQueueNums(queueNums);
- }
- }
- } else {
- //使用指定主题从NameServer获取路由信息
- topicRouteData =
- this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
- }
- //判断路由是否发生变化 如果发生变化 那么需要更改路由信息
- TopicRouteData old = this.topicRouteTable.get(topic);
- boolean changed = topicRouteDataIsChange(old, topicRouteData);
- if (!changed) {
- changed = this.isNeedUpdateTopicRouteInfo(topic);
- } else {
- log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);
- }
- if (changed) {
- //将topicRouteData转换为发布队列
- TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
- publishInfo.setHaveTopicRouterInfo(true);
- //遍历生产
- Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
- while (it.hasNext()) {
- Entry<String, MQProducerInner> entry = it.next();
- MQProducerInner impl = entry.getValue();
- if (impl != null) {
- //生产者不为空时,更新publishInfo信息
- impl.updateTopicPublishInfo(topic, publishInfo);
- }
- }
- }
```java public static TopicPublishInfo topicRouteData2TopicPublishInfo(final String topic, final TopicRouteData route) { //创建TopicPublishInfo对象 TopicPublishInfo info = new TopicPublishInfo(); //关联topicRoute info.setTopicRouteData(route); //顺序消息,更新TopicPublishInfo if (route.getOrderTopicConf() != null && route.getOrderTopicConf().length() > 0) { String[] brokers = route.getOrderTopicConf().split(";"); for (String broker : brokers) { String[] item = broker.split(":"); int nums = Integer.parseInt(item[1]); for (int i = 0; i < nums; i++) { MessageQueue mq = new MessageQueue(topic, item[0], i); info.getMessageQueueList().add(mq); } }
- info.setOrderTopic(true);
- } else {
- //非顺序消息更新TopicPublishInfo
- List<QueueData> qds = route.getQueueDatas();
- Collections.sort(qds);
- //遍历topic队列信息
- for (QueueData qd : qds) {
- //是否是写队列
- if (PermName.isWriteable(qd.getPerm())) {
- BrokerData brokerData = null;
- //遍历写队列Broker
- for (BrokerData bd : route.getBrokerDatas()) {
- //根据名称获得读队列对应的Broker
- if (bd.getBrokerName().equals(qd.getBrokerName())) {
- brokerData = bd;
- break;
- }
- }
- if (null == brokerData) {
- continue;
- }
- if (!brokerData.getBrokerAddrs().containsKey(MixAll.MASTER_ID)) {
- continue;
- }
- //封装TopicPublishInfo写队列
- for (int i = 0; i < qd.getWriteQueueNums(); i++) {
- MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
- info.getMessageQueueList().add(mq);
- }
- }
- }
- info.setOrderTopic(false);
- }
- //返回TopicPublishInfo对象
- return info;
- }
java public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) { return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName); }
MQFaultStrategy#selectOneMessageQueue ```java public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) { //Broker故障延迟机制 //默认不启用Broker故障延迟机制 if (this.sendLatencyFaultEnable) { try { //累加 int index = tpInfo.getSendWhichQueue().getAndIncrement(); //循环遍历规避不可用broker for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) { int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size(); if (pos < 0) pos = 0; MessageQueue mq = tpInfo.getMessageQueueList().get(pos); //从不可用brokerMap中根据brokerName获取value //如果isAvailable是true 说明是可用的broker
if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) { //如果lastBrokerName 说明是第一次进入该方法 if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName)) return mq; } }
- //如果遍历完所有的broker发现都是出过故障的
- //只能尝试从规避的Broker中选择一个可用的Broker
- //具体策略是从存放不可用的broker的Map中的元素放入List
- //shuffle以后再sort 再对size 取半 记为half
- // 如果 half 小于0 取0
- // 否则 tmpList.get(whichItemWorst.getAndIncrement() % half).getName()
- final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
- int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
- if (writeQueueNums > 0) {
- final MessageQueue mq = tpInfo.selectOneMessageQueue();
- if (notBestBroker != null) {
- //此处设置了name selectOneMessageQueue没设置name
- //个人猜测设置了name 是根据name找到对应的queueList 再根据下标获取
- //没设置name 直接从全局的messageQueueList根据下标获取
- mq.setBrokerName(notBestBroker);
- //对该broker上的queueSize 取模选择队列
- mq.setQueueId
- (tpInfo.getSendWhichQueue().getAndIncrement()%writeQueueNums);
- }
- return mq;
- } else {
- //写队列小于0 意味着没必要参与发送消息 所以从故障的brokerMap中移除notBestBroker
- latencyFaultTolerance.remove(notBestBroker);
- }
- } catch (Exception e) {
- log.error("Error occurred when selecting message queue", e);
- }
- return tpInfo.selectOneMessageQueue();
- }
- //进入TopicPublishInfo#selectOneMessageQueue(lastBrokerName)
- return tpInfo.selectOneMessageQueue(lastBrokerName);
- }
- // 验证该broker是否可用 ,维护了一个存放不可用的broker的Map faultItemTable
- //如果map里有这个broker 那么就是不可用 返回false 否则返回true
- @Override
- public boolean isAvailable(final String name) {
- final FaultItem faultItem = this.faultItemTable.get(name);
- if (faultItem != null) {
- return faultItem.isAvailable();
- }
- return true;
- }
- //存放不可用的broker的Map 根据brokerName获取
- public boolean isAvailable(final String name) {
- final FaultItem faultItem = this.faultItemTable.get(name);
- if (faultItem != null) {
- return faultItem.isAvailable();
- }
- return true;
- }
- //old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
- //因为startTimestamp 等于 update时的当前系统时间 + 规避时长
- //所以这里直接用当前的系统时间和startTimestamp 比较
- public boolean isAvailable() {
- return (System.currentTimeMillis() - startTimestamp) >= 0;
- }
java public MessageQueue selectOneMessageQueue(final String lastBrokerName) { //第一次选择队列 if (lastBrokerName == null) { return selectOneMessageQueue(); } else { //sendWhichQueue int index = this.sendWhichQueue.getAndIncrement(); //遍历消息队列集合 for (int i = 0; i < this.messageQueueList.size(); i++) { //sendWhichQueue自增后 对所有的queueSize取模 int pos = Math.abs(index++) % this.messageQueueList.size(); if (pos < 0) pos = 0; //尽量规避上次已经发送过的Broker队列 MessageQueue mq = this.messageQueueList.get(pos); if (!mq.getBrokerName().equals(lastBrokerName)) { return mq; } } //如果以上情况都不满足,返回sendWhichQueue取模后的队列 return selectOneMessageQueue(); } }
代码:TopicPublishInfo#selectOneMessageQueue() java //第一次选择队列 public MessageQueue selectOneMessageQueue() { //sendWhichQueue自增 int index = this.sendWhichQueue.getAndIncrement(); //对队列大小取模 int pos = Math.abs(index) % this.messageQueueList.size(); if (pos < 0) pos = 0; //返回对应的队列 return this.messageQueueList.get(pos); }
/**
 * Contract for tracking faulty targets (here: brokers) and steering sends around them.
 *
 * @param <T> type of the key identifying a target
 */
public interface LatencyFaultTolerance<T> {
    /**
     * Creates or updates the fault entry for a target.
     *
     * @param name                 key of the target
     * @param currentLatency       latency observed for the latest send
     * @param notAvailableDuration how long the target should be avoided
     */
    void updateFaultItem(final T name, final long currentLatency, final long notAvailableDuration);

    /**
     * Reports whether the target is currently usable.
     *
     * @param name key of the target
     * @return {@code true} when the target may be used
     */
    boolean isAvailable(final T name);

    /**
     * Removes the fault entry of a target.
     *
     * @param name key of the target
     */
    void remove(final T name);

    /**
     * Tries to pick a usable target from among the avoided ones.
     *
     * @return the key of the chosen target
     */
    T pickOneAtLeast();
}
* FaultItem:失败条目
java class FaultItem implements Comparable<FaultItem> { //条目唯一键,这里为brokerName private final String name; //本次消息发送延迟 private volatile long currentLatency; //故障规避开始时间 private volatile long startTimestamp; }
* 消息失败策略
/**
 * Message-send fault strategy (field excerpt): maps an observed send latency to the time a
 * broker should be avoided.
 */
public class MQFaultStrategy {
    // Latency thresholds. The table is scanned from the end towards the front for the first
    // threshold that the current latency reaches; index 0 results when none is found.
    private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
    // Avoidance durations, indexed in parallel with latencyMax: the broker is treated as
    // unavailable for the duration found at the matching index.
    private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};
}
// Fragment of DefaultMQProducerImpl#sendDefaultImpl: after each kernel send, the elapsed
// time of the attempt is reported to the fault strategy (isolation = false).
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
java public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) { //参数一:broker名称 //参数二:本次消息发送延迟时间 //参数三:是否隔离 this.mqFaultStrategy.updateFaultItem(brokerName, currentLatency, isolation); }
代码:MQFaultStrategy#updateFaultItem ```java public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) { if (this.sendLatencyFaultEnable) { //计算broker规避的时长 long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency); //更新该FaultItem规避时长 this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration); } }
- private long computeNotAvailableDuration(final long currentLatency) {
- //遍历latencyMax
- for (int i = latencyMax.length - 1; i >= 0; i--) {
- //找到第一个比currentLatency的latencyMax值
- if (currentLatency >= latencyMax[i])
- return this.notAvailableDuration[i];
- }
- //没有找到则返回0
- return 0;
- }
java public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) { //获得原FaultItem FaultItem old = this.faultItemTable.get(name); //为空新建faultItem对象,设置规避时长和开始时间 if (null == old) { final FaultItem faultItem = new FaultItem(name); faultItem.setCurrentLatency(currentLatency); faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
- old = this.faultItemTable.putIfAbsent(name, faultItem);
- if (old != null) {
- old.setCurrentLatency(currentLatency);
- old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
- }
- } else {
- //更新规避时长和开始时间
- old.setCurrentLatency(currentLatency);
- old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
- }
- }
Call chain for a synchronous send:
1. org.apache.rocketmq.client.producer.DefaultMQProducer#send(org.apache.rocketmq.common.message.Message)
2. org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#send(org.apache.rocketmq.common.message.Message)
3. org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#send(org.apache.rocketmq.common.message.Message, long)
4. org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendDefaultImpl
```java private SendResult sendKernelImpl( final Message msg, //待发送消息 final MessageQueue mq, //消息发送队列 final CommunicationMode communicationMode, //消息发送模式 final SendCallback sendCallback, //异步消息回调函数 final TopicPublishInfo topicPublishInfo, //主题路由信息 final long timeout //超时时间 ){ throws MQClientException, RemotingException, MQBrokerException, InterruptedException { long beginStartTime = System.currentTimeMillis(); //获得broker网络地址信息 //首先从缓存的map中找 String brokerAddr = this.mQClientFactory .findBrokerAddressInPublish(mq.getBrokerName()); //没有找到从NameServer更新broker网络地址信息放入缓存map if (null == brokerAddr) { tryToFindTopicPublishInfo(mq.getTopic()); //从缓存的map中找 brokerAddr = this.mQClientFactory .findBrokerAddressInPublish(mq.getBrokerName()); }
- SendMessageContext context = null;
- if (brokerAddr != null) {
- brokerAddr = MixAll
- .brokerVIPChannel(this.defaultMQProducer
- .isSendMessageWithVIPChannel(), brokerAddr);
- byte[] prevBody = msg.getBody();
- try {
- //判断不是批量发送消息 为消息分配唯一ID
- if (!(msg instanceof MessageBatch)) {
- MessageClientIDSetter.setUniqID(msg);
- }
- boolean topicWithNamespace = false;
- if (null != this.mQClientFactory.getClientConfig().getNamespace()) {
- msg.setInstanceId(this.mQClientFactory.getClientConfig()
- .getNamespace());
- topicWithNamespace = true;
- }
- //消息大小超过4K,启用消息压缩
- int sysFlag = 0;
- boolean msgBodyCompressed = false;
- if (this.tryToCompressMessage(msg)) {
- sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
- msgBodyCompressed = true;
- }
- //如果是事务消息,设置消息标记MessageSysFlag.TRANSACTION_PREPARED_TYPE
- final String tranMsg = msg
- if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
- sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
- }
- //如果注册了消息发送钩子函数,在执行消息发送前的增强逻辑
- if (hasCheckForbiddenHook()) {
- CheckForbiddenContext
- checkForbiddenContext = new CheckForbiddenContext();
- checkForbiddenContext
- .setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());
- checkForbiddenContext
- .setGroup(this.defaultMQProducer.getProducerGroup());
- checkForbiddenContext.
- setCommunicationMode(communicationMode);
- checkForbiddenContext.setBrokerAddr(brokerAddr);
- checkForbiddenContext.setMessage(msg);
- checkForbiddenContext.setMq(mq);
- checkForbiddenContext.setUnitMode(this.isUnitMode());
- //可以通过 registerCheckForbiddenHook 注册 CheckForbiddenHook
- this.executeCheckForbiddenHook(checkForbiddenContext);
- }
- if (this.hasSendMessageHook()) {
- context = new SendMessageContext();
- context.setProducer(this);
- context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
- context.setCommunicationMode(communicationMode);
- context.setBornHost(this.defaultMQProducer.getClientIP());
- context.setBrokerAddr(brokerAddr);
- context.setMessage(msg);
- context.setMq(mq);
- context.setNamespace(this.defaultMQProducer.getNamespace());
- String isTrans = msg.
- if (isTrans != null && isTrans.equals("true")) {
- context.setMsgType(MessageType.Trans_Msg_Half);
- }
- if (msg.getProperty("__STARTDELIVERTIME") != null
- || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL)
- != null) {
- context.setMsgType(MessageType.Delay_Msg);
- }
- //可以通过 registerSendMessageHook(final SendMessageHook hook) 注册hook
- //执行钩子前置函数
- this.executeSendMessageHookBefore(context);
- }
- //构建消息发送请求包
- SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
- //生产者组
- requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
- //主题
- requestHeader.setTopic(msg.getTopic());
- //默认创建主题Key
- requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
- //该主题在单个Broker默认队列树
- requestHeader.setDefaultTopicQueueNums
- (this.defaultMQProducer.getDefaultTopicQueueNums());
- //队列ID
- requestHeader.setQueueId(mq.getQueueId());
- //消息系统标记
- requestHeader.setSysFlag(sysFlag);
- //消息发送时间
- requestHeader.setBornTimestamp(System.currentTimeMillis());
- //消息标记
- requestHeader.setFlag(msg.getFlag());
- //消息扩展信息
- requestHeader.setProperties
- (MessageDecoder.messageProperties2String(msg.getProperties()));
- //消息重试次数
- requestHeader.setReconsumeTimes(0);
- requestHeader.setUnitMode(this.isUnitMode());
- //是否是批量消息等
- requestHeader.setBatch(msg instanceof MessageBatch);
- //如果topic以%RETRY% 开头
- if (requestHeader.getTopic()
- .startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)){
- String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
- if (reconsumeTimes != null) {
- requestHeader.
- setReconsumeTimes(Integer.valueOf(reconsumeTimes));
- MessageAccessor.
- clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
- }
- String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
- if (maxReconsumeTimes != null) {
- requestHeader
- .setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
- MessageAccessor.clearProperty
- }
- }
- SendResult sendResult = null;
- switch (communicationMode) {
- //异步发送消息
- case ASYNC:
- Message tmpMessage = msg;
- boolean messageCloned = false;
- //如果消息被压缩了
- if (msgBodyCompressed) {
- tmpMessage = MessageAccessor.cloneMessage(msg);
- messageCloned = true;
- msg.setBody(prevBody);
- }
- if (topicWithNamespace) {
- if (!messageCloned) {
- tmpMessage = MessageAccessor.cloneMessage(msg);
- messageCloned = true;
- }
- msg.setTopic(
- NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
- }
- long costTimeAsync = System.currentTimeMillis() - beginStartTime;
- if (timeout < costTimeAsync) {
- throw new RemotingTooMuchRequestException ("sendKernelImpl call timeout");
- }
- //发送消息
- sendResult = this.mQClientFactory.getMQClientAPIImpl()
- .sendMessage(
- brokerAddr,
- mq.getBrokerName(),
- tmpMessage,
- requestHeader,
- timeout - costTimeAsync,
- communicationMode,
- sendCallback,
- topicPublishInfo,
- this.mQClientFactory,
- this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
- context,
- this);
- break;
- case ONEWAY:
- case SYNC:
- //同步发送消息
- long costTimeSync = System.currentTimeMillis() - beginStartTime;
- if (timeout < costTimeSync) {
- throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
- }
- sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
- brokerAddr,
- mq.getBrokerName(),
- msg,
- requestHeader,
- timeout - costTimeSync,
- communicationMode,
- context,
- this);
- break;
- default:
- assert false;
- break;
- }
- //可以通过 registerSendMessageHook(final SendMessageHook hook) 注册hook
- //执行后置hook函数
- if (this.hasSendMessageHook()) {
- context.setSendResult(sendResult);
- this.executeSendMessageHookAfter(context);
- }
- return sendResult;
- } catch (RemotingException e) {
- if (this.hasSendMessageHook()) {
- context.setException(e);
- this.executeSendMessageHookAfter(context);
- }
- throw e;
- } catch (MQBrokerException e) {
- if (this.hasSendMessageHook()) {
- context.setException(e);
- this.executeSendMessageHookAfter(context);
- }
- throw e;
- } catch (InterruptedException e) {
- if (this.hasSendMessageHook()) {
- context.setException(e);
- this.executeSendMessageHookAfter(context);
- }
- throw e;
- } finally {
- msg.setBody(prevBody);
- msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(),
- this.defaultMQProducer.getNamespace()));
- }
- }
- throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
- }
- public MQClientInstance getmQClientFactory() {
- return mQClientFactory;
- }
- //单条消息大于4K需要压缩
- //private int compressMsgBodyOverHowmuch = 1024 * 4;
- private boolean tryToCompressMessage(final Message msg) {
- if (msg instanceof MessageBatch) {
- //batch dose not support compressing right now
- return false;
- }
- byte[] body = msg.getBody();
- if (body != null) {
- //
- if (body.length >= this.defaultMQProducer.getCompressMsgBodyOverHowmuch()) {
- try {
- byte[] data = UtilAll.compress(body, zipCompressLevel);
- if (data != null) {
- msg.setBody(data);
- return true;
- }
- } catch (IOException e) {
- log.error("tryToCompressMessage exception", e);
- log.warn(msg.toString());
- }
- }
- }
- return false;
- }
java //在producer中可以通过 registerSendMessageHook(final SendMessageHook hook) 注册hook public interface SendMessageHook { String hookName(); void sendMessageBefore(final SendMessageContext context); void sendMessageAfter(final SendMessageContext context); }
java public SendResult send(Collection<Message> msgs) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { //压缩消息集合成一条消息,然后发送出去 return this.defaultMQProducerImpl.send(batch(msgs)); }
java private MessageBatch batch(Collection<Message> msgs) throws MQClientException { MessageBatch msgBatch; try { //将集合消息封装到MessageBatch msgBatch = MessageBatch.generateFromList(msgs); //遍历消息集合,检查消息合法性,设置消息ID,设置Topic for (Message message : msgBatch) { Validators.checkMessage(message, this); MessageClientIDSetter.setUniqID(message); message.setTopic(withNamespace(message.getTopic())); } //压缩消息,设置消息body msgBatch.setBody(msgBatch.encode()); } catch (Exception e) { throw new MQClientException("Failed to initiate the MessageBatch", e); } //设置msgBatch的topic msgBatch.setTopic(withNamespace(msgBatch.getTopic())); return msgBatch; }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。