赞
踩
还记得在 01 开篇说的吗? namesrv
提供一个通过 topic
获取路由信息的接口(RouteInfoManager#pickupTopicRouteData
). producer
就是根据该接口返回的 TopicRouteData
, 知道要将 topic
发送至哪个 broker
.
TopicRouteData
private String orderTopicConf;
private List<QueueData> queueDatas;
private List<BrokerData> brokerDatas;
private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
从返回的结构来看,知道了 broker 地址,还有 topic
下有哪些队列,接下来就可以发消息了。
producer
获取了 topic
的信息,会将这些信息封装为另外一个数据结构
class TopicPublishInfo {
...
private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();
private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
...
}
class MessageQueue {
private String topic;
private String brokerName;
private int queueId;
}
messageQueueList
大概长这样子
[ { "brokerName":"broker-a", "queueId":0, "topic":"TBW102%zouLeTopic" }, { "brokerName":"broker-a", "queueId":1, "topic":"TBW102%zouLeTopic" }, { "brokerName":"broker-a", "queueId":2, "topic":"TBW102%zouLeTopic" }, { "brokerName":"broker-a", "queueId":3, "topic":"TBW102%zouLeTopic" }, { "brokerName":"broker-b", "queueId":0, "topic":"TBW102%zouLeTopic" }, { "brokerName":"broker-b", "queueId":1, "topic":"TBW102%zouLeTopic" }, { "brokerName":"broker-b", "queueId":2, "topic":"TBW102%zouLeTopic" }, { "brokerName":"broker-b", "queueId":3, "topic":"TBW102%zouLeTopic" } ]
可以看到,上面的 topic TBW102%zouLeTopic
在 broker-b, broker-a
都存在
答案是 从 messageQueueList
中轮询选择一个队列进行发送(每个线程,都有自己的计数器)。
TopicPublishInfo#selectOneMessageQueue(final String lastBrokerName)
public MessageQueue selectOneMessageQueue(final String lastBrokerName) { if (lastBrokerName == null) { // 大部分情况走这个函数 return selectOneMessageQueue(); } else { // 如果 lastBrokerName 不为 null,那么会选择一个与 lastBrokerName 名称不一样的 broker。 int index = this.sendWhichQueue.getAndIncrement(); for (int i = 0; i < this.messageQueueList.size(); i++) { int pos = Math.abs(index++) % this.messageQueueList.size(); if (pos < 0) pos = 0; MessageQueue mq = this.messageQueueList.get(pos); if (!mq.getBrokerName().equals(lastBrokerName)) { return mq; } } // 如果没有找到,兜底返回 return selectOneMessageQueue(); } } public MessageQueue selectOneMessageQueue() { int index = this.sendWhichQueue.getAndIncrement(); int pos = Math.abs(index) % this.messageQueueList.size(); if (pos < 0) pos = 0; return this.messageQueueList.get(pos); }
从 TopicPublishInfo#selectOneMessageQueue(final String lastBrokerName)
方法的代码。可以看到,如果 lastBrokerName 不为 null 的时候,会选择一个与 lastBrokerName 名称不一样的 broker 返回。
那么 lastBrokerName 什么时候会不为 null 呢? 在消息发送失败,进行重试的时候,就会不为null.(默认至多重试2次)
实际上,在 producer 端,还有一个参数 sendLatencyFaultEnable
控制着,是否开启 发送延迟故障转移。但是,该参数默认为 false。
当该参数开启时,就不走上述的逻辑。
逻辑实现在该类下:MQFaultStrategy
该类主要做的事情就是:根据发送消息到 broker
花费的时间,判断 producer
应该在多久之内不选择此 broker
进行消息的发送。
producer 端提供了三种方式方式:同步、异步、单向
同步:
producer
会等待 broker
可以触发 producer
的重试机制
异步:
producer
会在线程池中,执行消息发送的逻辑
可以触发 producer
的重试机制
实际上,异步发送时,只能对 broker 端出现的错误进行重试。如果因为网络问题的话,是无法进行重试的。因为重试是在回调里面进行的
单向:
单向发送的意思时,只管发送,不管消息是否发送成功与否。
无法触发 producer
的重试机制
异步发送重试逻辑
MQClientAPIImpl#sendMessageAsync
private void sendMessageAsync( final String addr, final String brokerName, final Message msg, final long timeoutMillis, final RemotingCommand request, final SendCallback sendCallback, final TopicPublishInfo topicPublishInfo, final MQClientInstance instance, final int retryTimesWhenSendFailed, final AtomicInteger times, final SendMessageContext context, final DefaultMQProducerImpl producer ) throws InterruptedException, RemotingException { this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() { @Override public void operationComplete(ResponseFuture responseFuture) { RemotingCommand response = responseFuture.getResponseCommand(); // todo 没有设置回调 if (null == sendCallback && response != null) { try { SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response); if (context != null && sendResult != null) { context.setSendResult(sendResult); context.getProducer().executeSendMessageHookAfter(context); } } catch (Throwable e) { } producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false); return; } if (response != null) { try { ... 业务错里 } catch (Exception e) { producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true); // todo 出现异常,该方法会再次调用消息发送 onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance, retryTimesWhenSendFailed, times, e, context, false, producer); } } else { //... if (!responseFuture.isSendRequestOK()) { // .. // todo 出现异常,该方法会再次调用消息发送 onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance, retryTimesWhenSendFailed, times, ex, context, true, producer); } else if (responseFuture.isTimeout()) { // ... // todo 出现异常,该方法会再次调用消息发送 onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance, retryTimesWhenSendFailed, times, ex, context, true, producer); } else { // ... // todo 出现异常,该方法会再次调用消息发送 onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance, retryTimesWhenSendFailed, times, ex, context, true, producer); } } } }); }
producer
,最后会在 onExceptionImpl
方法里面,进行重试(默认至多重试2次)
sendLatencyFaultEnable
可开启 发送延迟故障转移机制producer
消息发送失败时,默认的策略时,会避免上一次发送失败的 broker(如果是双主结构)聊完了,producer 的发送流程。那么 producer 与 broker 在通信时,肯定要遵循一定的协议,在 rocketMQ 中,消息的协议是如何设计的呢?
一个协议,正常会包含以下信息
NettyEncoder#encode()
RemotingCommand
rocketMQ 消息协议类
public class RemotingCommand { // 指令类型 private int code; private LanguageCode language = LanguageCode.JAVA; // 版本号 private int version = 0; private int opaque = requestId.getAndIncrement(); private int flag = 0; private String remark; private HashMap<String, String> extFields; private transient CommandCustomHeader customHeader; private SerializeType serializeTypeCurrentRPC = serializeTypeConfigInThisServer; // 消息正文 private transient byte[] body; // netty 会调用该方法,对消息进行编码 public ByteBuffer encodeHeader(final int bodyLength) { // 1> header length size int length = 4; // 2> header data length byte[] headerData; headerData = this.headerEncode(); length += headerData.length; // 3> body data length length += bodyLength; ByteBuffer result = ByteBuffer.allocate(4 + length - bodyLength); // length result.putInt(length); // header length result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC)); // header data result.put(headerData); result.flip(); return result; } }
NettyRemotingClient
Netty Bootstrap 构建
this.bootstrap
.group(this.eventLoopGroupWorker)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// ...
pipeline.addLast(
defaultEventExecutorGroup,
new NettyEncoder(),
new NettyDecoder(),
// ...
}
});
work EventLoopGroup线程数
// 写死的 1 ,不可变
this.eventLoopGroupWorker = new NioEventLoopGroup(1, new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyClientSelector_%d", this.threadIndex.incrementAndGet()));
}
});
IO 线程数
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
// 写死的 4 ,可配置
nettyClientConfig.getClientWorkerThreads(),
new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyClientWorkerThread_" + this.threadIndex.incrementAndGet());
}
});
public class ThreadLocalIndex { private final ThreadLocal<Integer> threadLocalIndex = new ThreadLocal<Integer>(); private final Random random = new Random(); public int getAndIncrement() { Integer index = this.threadLocalIndex.get(); if (null == index) { index = Math.abs(random.nextInt()); if (index < 0) index = 0; this.threadLocalIndex.set(index); } // todo 细节 index = Math.abs(index + 1); if (index < 0) index = 0; this.threadLocalIndex.set(index); return index; } @Override public String toString() { return "ThreadLocalIndex{" + "threadLocalIndex=" + threadLocalIndex.get() + '}'; } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。