赞
踩
对于一个消息中间件来讲,高可用功能是极其重要的,RocketMQ 当然也具有其对应的高可用方案。
在 RocketMQ 中,有主从架构和 Dledger 两种高可用方案:
第一种通过主 Broker 将消息发送到从 Broker 实现高可用,在主 Broker IO 压力大或宕机的时候,从 Broker 可以接管读请求,但这种方案不支持在主 Broker 宕机后自动进行故障转移,且从 Broker 不支持写请求,也就是说在主 Broker 宕机后我们只能手动处理。
第二种是在 RocketMQ 4.5.X 的时候才加入的新的方案,其为基于 Raft 算法实现的一个高可用方案,支持集群自动选主与故障转移,但 TPS 低于第一种方案。
本文主要介绍前者的实现
RocketMQ 的主从高可用的实现的代码量比较少,大概就一两千行,其主要在 HAService 类和 HAConnection 类。
HAService 有三个内部类:
AcceptSocketService
用来监听 HAClient 的连接请求,接收请求后将建立好的 channel 包装成 HAConnection 保存起来
GroupTransferService
用以监听与处理分发请求,当外部发送了异步的分发请求后,该类中的线程将同步的处理该请求,并将其执行结果交给 Future 以执行回调函数
HAClient
高可用客户端,在从 Broker 上启动,用以从主 Broker 拉取消息
HAService 主要是对上面三个类的包装,通过控制它们来对外提供服务。
首先创建了一个注册了 OP_ACCEPT
事件的 selector ,用以监听绑定在 HA 服务端口上的 ServerSocketChannel(也就是一个标准的 NIO 服务器)
- public void beginAccept() throws Exception {
- this.serverSocketChannel = ServerSocketChannel.open();
- this.selector = RemotingUtil.openSelector();
- this.serverSocketChannel.socket().setReuseAddress(true);
- this.serverSocketChannel.socket().bind(this.socketAddressListen);
- this.serverSocketChannel.configureBlocking(false);
- // 监听 accept 事件
- this.serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);
- }
不过需要注意 RemotingUtil.openSelector()
方法,这里如果在 Linux 平台上,会使用 Epoll 来做多路复用的 selector
- public static Selector openSelector() throws IOException {
- Selector result = null;
-
- // 如果在 linux 平台, 则使用 Epoll 作为多路复用的 selector
- if (isLinuxPlatform()) {
- try {
- final Class<?> providerClazz = Class.forName("sun.nio.ch.EPollSelectorProvider");
- if (providerClazz != null) {
- // pass: 这里通过反射调用 provider 方法获取 SelectorProvider 以创建 epoll 的 selector
- }
- } catch (final Exception e) {
- // ignore
- }
- }
-
- // 否则如果在其他平台上使用 nio 默认的实现
- if (result == null) {
- result = Selector.open();
- }
-
- return result;
- }
在创建完成后,就开始通过使用 selector 监听 accpet 事件发生,然后进行以下处理
- SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();
-
- if (sc != null) {
- try {
- // 在创建 socket 后, 将该创建好的 channel 包装成 HAConnection 类
- // 放入主类进行管理
- HAConnection conn = new HAConnection(HAService.this, sc);
- conn.start();
- HAService.this.addConnection(conn);
- } catch (Exception e) {
- log.error("new HAConnection exception", e);
- sc.close();
- }
- }
在创建了 HAConnection
并启动后,这个服务就能自动的从存储服务中拉取已经持久化的消息 (准确来讲,是否已经持久化取决于使用的刷盘方案) ,并发送给该 Channel 对应的从 Broker ,且响应从 Broker 发送过来的请求。
如同简介介绍的,这个服务主要用来处理上层发过来的分发请求
- public void putRequest(final CommitLog.GroupCommitRequest request) {
- lock.lock();
- tr
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。