- public void send(Message msg, SendCallback sendCallback) throws MQClientException, RemotingException,
- InterruptedException {
- send(msg, sendCallback, this.defaultMQProducer.getSendMsgTimeout());
- }
- public void send(Message msg, SendCallback sendCallback, long timeout) throws MQClientException,
- RemotingException, InterruptedException {
- try {
- this.sendDefaultImpl(msg, CommunicationMode.ASYNC, sendCallback, timeout);
- }
- catch (MQBrokerException e) {
- throw new MQClientException("unknown exception", e);
- }
- }
- private SendResult sendDefaultImpl(//
- Message msg,//
- final CommunicationMode communicationMode,//
- final SendCallback sendCallback, final long timeout//
- ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
- this.makeSureStateOK();
- Validators.checkMessage(msg, this.defaultMQProducer);
- final long maxTimeout = this.defaultMQProducer.getSendMsgTimeout() + 1000;
- final long beginTimestamp = System.currentTimeMillis();
- long endTimestamp = beginTimestamp;
- TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
- if (topicPublishInfo != null && topicPublishInfo.ok()) {
- MessageQueue mq = null;
- Exception exception = null;
- SendResult sendResult = null;
- int timesTotal = 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed();
- int times = 0;
- String[] brokersSent = new String[timesTotal];
- for (; times < timesTotal && (endTimestamp - beginTimestamp) < maxTimeout; times++) {
- String lastBrokerName = null == mq ? null : mq.getBrokerName();
- MessageQueue tmpmq = topicPublishInfo.selectOneMessageQueue(lastBrokerName);
- if (tmpmq != null) {
- mq = tmpmq;
- brokersSent[times] = mq.getBrokerName();
- try {
- sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, timeout);
- endTimestamp = System.currentTimeMillis();
- switch (communicationMode) {
- case ASYNC:
- return null;
- case ONEWAY:
- return null;
- case SYNC:
- if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
- if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
- continue;
- }
- }
- return sendResult;
- default:
- break;
- }
- }
- catch (RemotingException e) {
- log.warn("sendKernelImpl exception", e);
- log.warn(msg.toString());
- exception = e;
- endTimestamp = System.currentTimeMillis();
- continue;
- }
- catch (MQClientException e) {
- log.warn("sendKernelImpl exception", e);
- log.warn(msg.toString());
- exception = e;
- endTimestamp = System.currentTimeMillis();
- continue;
- }
- catch (MQBrokerException e) {
- log.warn("sendKernelImpl exception", e);
- log.warn(msg.toString());
- exception = e;
- endTimestamp = System.currentTimeMillis();
- switch (e.getResponseCode()) {
- case ResponseCode.TOPIC_NOT_EXIST:
- case ResponseCode.SERVICE_NOT_AVAILABLE:
- case ResponseCode.SYSTEM_ERROR:
- case ResponseCode.NO_PERMISSION:
- case ResponseCode.NO_BUYER_ID:
- case ResponseCode.NOT_IN_CURRENT_UNIT:
- continue;
- default:
- if (sendResult != null) {
- return sendResult;
- }
- throw e;
- }
- }
- catch (InterruptedException e) {
- log.warn("sendKernelImpl exception", e);
- log.warn(msg.toString());
- throw e;
- }
- }
- else {
- break;
- }
- } // end of for
- if (sendResult != null) {
- return sendResult;
- }
- String info =
- String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s", //
- times, //
- (System.currentTimeMillis() - beginTimestamp), //
- msg.getTopic(),//
- Arrays.toString(brokersSent));
- info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);
- throw new MQClientException(info, exception);
- }
- List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList();
- if (null == nsList || nsList.isEmpty()) {
- throw new MQClientException("No name server address, please set it."
- + FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null);
- }
- throw new MQClientException("No route info of this topic, " + msg.getTopic()
- + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO), null);
- }
- public enum CommunicationMode {
- }
- private void makeSureStateOK() throws MQClientException {
- if (this.serviceState != ServiceState.RUNNING) {
- throw new MQClientException("The producer service state not OK, "//
- + this.serviceState//
- + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), null);
- }
- }
- public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer)
- throws MQClientException {
- if (null == msg) {
- throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message is null");
- }
- // topic
- Validators.checkTopic(msg.getTopic());
- // body
- if (null == msg.getBody()) {
- throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body is null");
- }
- if (0 == msg.getBody().length) {
- throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body length is zero");
- }
- if (msg.getBody().length > defaultMQProducer.getMaxMessageSize()) {
- throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL,
- "the message body size over max value, MAX: " + defaultMQProducer.getMaxMessageSize());
- }
- }
- private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
- TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
- if (null == topicPublishInfo || !topicPublishInfo.ok()) {
- this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
- this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
- topicPublishInfo = this.topicPublishInfoTable.get(topic);
- }
- if (topicPublishInfo.isHaveTopicRouterInfo() || (topicPublishInfo != null && topicPublishInfo.ok())) {
- return topicPublishInfo;
- }
- else {
- this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
- topicPublishInfo = this.topicPublishInfoTable.get(topic);
- return topicPublishInfo;
- }
- }
- public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
- if (lastBrokerName != null) {
- int index = this.sendWhichQueue.getAndIncrement();
- for (int i = 0; i < this.messageQueueList.size(); i++) {
- int pos = Math.abs(index++) % this.messageQueueList.size();
- MessageQueue mq = this.messageQueueList.get(pos);
- if (!mq.getBrokerName().equals(lastBrokerName)) {
- return mq;
- }
- }
- return null;
- }
- else {
- int index = this.sendWhichQueue.getAndIncrement();
- int pos = Math.abs(index) % this.messageQueueList.size();
- return this.messageQueueList.get(pos);
- }
- }
- private SendResult sendKernelImpl(final Message msg,//
- final MessageQueue mq,//
- final CommunicationMode communicationMode,//
- final SendCallback sendCallback,//
- final long timeout) throws MQClientException, RemotingException, MQBrokerException,
- InterruptedException {
- String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
- if (null == brokerAddr) {
- tryToFindTopicPublishInfo(mq.getTopic());
- brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
- }
- SendMessageContext context = null;
- if (brokerAddr != null) {
- if(this.defaultMQProducer.isSendMessageWithVIPChannel()) {
- brokerAddr = MixAll.brokerVIPChannel(brokerAddr);
- }
- byte[] prevBody = msg.getBody();
- try {
- int sysFlag = 0;
- if (this.tryToCompressMessage(msg)) {
- sysFlag |= MessageSysFlag.CompressedFlag;
- }
- final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
- if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
- sysFlag |= MessageSysFlag.TransactionPreparedType;
- }
- if (hasCheckForbiddenHook()) {
- CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
- checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());
- checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());
- checkForbiddenContext.setCommunicationMode(communicationMode);
- checkForbiddenContext.setBrokerAddr(brokerAddr);
- checkForbiddenContext.setMessage(msg);
- checkForbiddenContext.setMq(mq);
- checkForbiddenContext.setUnitMode(this.isUnitMode());
- this.executeCheckForbiddenHook(checkForbiddenContext);
- }
- if (this.hasSendMessageHook()) {
- context = new SendMessageContext();
- context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
- context.setCommunicationMode(communicationMode);
- context.setBornHost(this.defaultMQProducer.getClientIP());
- context.setBrokerAddr(brokerAddr);
- context.setMessage(msg);
- context.setMq(mq);
- this.executeSendMessageHookBefore(context);
- }
- SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
- requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
- requestHeader.setTopic(msg.getTopic());
- requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
- requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
- requestHeader.setQueueId(mq.getQueueId());
- requestHeader.setSysFlag(sysFlag);
- requestHeader.setBornTimestamp(System.currentTimeMillis());
- requestHeader.setFlag(msg.getFlag());
- requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
- requestHeader.setReconsumeTimes(0);
- requestHeader.setUnitMode(this.isUnitMode());
- if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
- String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
- if (reconsumeTimes != null) {
- requestHeader.setReconsumeTimes(new Integer(reconsumeTimes));
- MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
- }
- }
- SendResult sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(//
- brokerAddr,// 1
- mq.getBrokerName(),// 2
- msg,// 3
- requestHeader,// 4
- timeout,// 5
- communicationMode,// 6
- sendCallback// 7
- );
- if (this.hasSendMessageHook()) {
- context.setSendResult(sendResult);
- this.executeSendMessageHookAfter(context);
- }
- return sendResult;
- }
- catch (RemotingException e) {
- if (this.hasSendMessageHook()) {
- context.setException(e);
- this.executeSendMessageHookAfter(context);
- }
- throw e;
- }
- catch (MQBrokerException e) {
- if (this.hasSendMessageHook()) {
- context.setException(e);
- this.executeSendMessageHookAfter(context);
- }
- throw e;
- }
- catch (InterruptedException e) {
- if (this.hasSendMessageHook()) {
- context.setException(e);
- this.executeSendMessageHookAfter(context);
- }
- throw e;
- }
- finally {
- msg.setBody(prevBody);
- }
- }
- throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
- }
- private boolean tryToCompressMessage(final Message msg) {
- byte[] body = msg.getBody();
- if (body != null) {
- if (body.length >= this.defaultMQProducer.getCompressMsgBodyOverHowmuch()) {
- try {
- byte[] data = UtilAll.compress(body, zipCompressLevel);
- if (data != null) {
- msg.setBody(data);
- return true;
- }
- }
- catch (IOException e) {
- log.error("tryToCompressMessage exception", e);
- log.warn(msg.toString());
- }
- }
- }
- return false;
- }
public final static int CompressedFlag = (0x1 << 0);
类似的,在接下里如果跟之前的钩子一样的方式配置注册了sendMessageHook消息发送钩子,则会在这里遍历调用所有钩子的executesendMessageHookBefore()方法,相应的,在消息发送完毕之后也会 执行executeSendMessageHookAfter()方法。
- SendResult sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(//
- brokerAddr,// 1
- mq.getBrokerName(),// 2
- msg,// 3
- requestHeader,// 4
- timeout,// 5
- communicationMode,// 6
- sendCallback// 7
- );
- public SendResult sendMessage(//
- final String addr,// 1
- final String brokerName,// 2
- final Message msg,// 3
- final SendMessageRequestHeader requestHeader,// 4
- final long timeoutMillis,// 5
- final CommunicationMode communicationMode,// 6
- final SendCallback sendCallback// 7
- ) throws RemotingException, MQBrokerException, InterruptedException {
- RemotingCommand request = null;
- if (sendSmartMsg) {
- SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
- request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE_V2, requestHeaderV2);
- }
- else {
- request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
- }
- request.setBody(msg.getBody());
- switch (communicationMode) {
- case ONEWAY:
- this.remotingClient.invokeOneway(addr, request, timeoutMillis);
- return null;
- case ASYNC:
- this.sendMessageAsync(addr, brokerName, msg, timeoutMillis, request, sendCallback);
- return null;
- case SYNC:
- return this.sendMessageSync(addr, brokerName, msg, timeoutMillis, request);
- default:
- assert false;
- break;
- }
- return null;
- }
- private void sendMessageAsync(//
- final String addr,//
- final String brokerName,//
- final Message msg,//
- final long timeoutMillis,//
- final RemotingCommand request,//
- final SendCallback sendCallback//
- ) throws RemotingException, InterruptedException {
- this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
- @Override
- public void operationComplete(ResponseFuture responseFuture) {
- if (null == sendCallback)
- return;
- RemotingCommand response = responseFuture.getResponseCommand();
- if (response != null) {
- try {
- SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response);
- assert sendResult != null;
- sendCallback.onSuccess(sendResult);
- }
- catch (Exception e) {
- sendCallback.onException(e);
- }
- }
- else {
- if (!responseFuture.isSendRequestOK()) {
- sendCallback.onException(new MQClientException("send request failed", responseFuture.getCause()));
- }
- else if (responseFuture.isTimeout()) {
- sendCallback.onException(new MQClientException("wait response timeout " + responseFuture.getTimeoutMillis() + "ms",
- responseFuture.getCause()));
- }
- else {
- sendCallback.onException(new MQClientException("unknow reseaon", responseFuture.getCause()));
- }
- }
- }
- });
- }
- public void invokeAsyncImpl(final Channel channel, final RemotingCommand request,
- final long timeoutMillis, final InvokeCallback invokeCallback) throws InterruptedException,
- RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
- boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
- if (acquired) {
- final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync);
- final ResponseFuture responseFuture =
- new ResponseFuture(request.getOpaque(), timeoutMillis, invokeCallback, once);
- this.responseTable.put(request.getOpaque(), responseFuture);
- try {
- channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture f) throws Exception {
- if (f.isSuccess()) {
- responseFuture.setSendRequestOK(true);
- return;
- }
- else {
- responseFuture.setSendRequestOK(false);
- }
- responseFuture.putResponse(null);
- responseTable.remove(request.getOpaque());
- try {
- responseFuture.executeInvokeCallback();
- }
- catch (Throwable e) {
- plog.warn("excute callback in writeAndFlush addListener, and callback throw", e);
- }
- finally {
- responseFuture.release();
- }
- plog.warn("send a request command to channel <{}> failed.",
- RemotingHelper.parseChannelRemoteAddr(channel));
- plog.warn(request.toString());
- }
- });
- }
- catch (Exception e) {
- responseFuture.release();
- plog.warn(
- "send a request command to channel <" + RemotingHelper.parseChannelRemoteAddr(channel)
- + "> Exception", e);
- throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
- }
- }
- else {
- if (timeoutMillis <= 0) {
- throw new RemotingTooMuchRequestException("invokeAsyncImpl invoke too fast");
- }
- else {
- String info =
- String
- .format(
- "invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d", //
- timeoutMillis,//
- this.semaphoreAsync.getQueueLength(),//
- this.semaphoreAsync.availablePermits()//
- );
- plog.warn(info);
- plog.warn(request.toString());
- throw new RemotingTimeoutException(info);
- }
- }
- }
