"Without accumulating small steps, one cannot travel a thousand li."
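This article traces how, after the client reconnects to ZooKeeper, Curator ends up calling the org.apache.curator.framework.state.ConnectionStateListener#stateChanged method with the event type org.apache.curator.framework.state.ConnectionState#RECONNECTED.
First, build a CuratorFramework object. Use it to create a LeaderSelector, which implements leader election, and then start the selector: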
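import java.util.concurrent.TimeUnit;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.ThreadUtils;
// The class name is illustrative; the original snippet only shows the members.
public class ReconnectionTest {
    public static final String leaderSelectorPath = "/source-code-analyse/reconnect";
    public static void main(String[] args) throws InterruptedException {
        CuratorFramework curatorFramework = newCuratorFramework();
        LeaderSelectorListener leaderSelectorListener = new LeaderSelectorListener() {
            @Override
            public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
                // Called for every connection state change, including RECONNECTED
                System.out.println("Thread " + Thread.currentThread().getName() + " Connection state changed : " + connectionState);
            }
            @Override
            public void takeLeadership(CuratorFramework curatorFramework) throws Exception {
                // Hold leadership for 20 seconds; returning from this method gives it up
                System.out.println("Thread " + Thread.currentThread().getName() + " get the leader.");
                TimeUnit.SECONDS.sleep(20);
            }
        };
        LeaderSelector leaderSelector = new LeaderSelector(curatorFramework, leaderSelectorPath, leaderSelectorListener);
        leaderSelector.start();
        // Keep the process alive for 100 seconds so that connection state changes can be observed
        TimeUnit.SECONDS.sleep(100);
        leaderSelector.close();
        curatorFramework.close();
        System.out.println("Test completed.");
    }
    public static CuratorFramework newCuratorFramework() {
        CuratorFramework curatorFramework = CuratorFrameworkFactory.builder()
                .connectString("192.168.0.104:2181")
                .sessionTimeoutMs(30000)
                .connectionTimeoutMs(6000)
                .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                .threadFactory(ThreadUtils.newThreadFactory("ReconnectionTestThread")).build();
        curatorFramework.start();
        return curatorFramework;
    }
}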
LeaderSelector is built on top of CuratorFramework, so start with CuratorFramework's start method. Go straight to the implementation class, CuratorFrameworkImpl:
@Override
public void start()
{
log.info("Starting");
if ( !state.compareAndSet(CuratorFrameworkState.LATENT, CuratorFrameworkState.STARTED) )
{
throw new IllegalStateException("Cannot be started more than once");
}
try
{
connectionStateManager.start();
// ... (remaining code omitted)
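CuratorFrameworkImpl maintains an internal connection state manager, ConnectionStateManager, and its start method starts that manager.
ConnectionStateManager's own start method submits a task to a thread pool. The task calls processEvents: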
public void start()
{
    Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");
    service.submit
    (
        new Callable<Object>()
        {
            @Override
            public Object call() throws Exception
            {
                processEvents();
                return null;
            }
        }
    );
}
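The core of processEvents is a while loop. On each pass it calls poll, with a timeout, on a blocking queue named eventQueue to obtain ConnectionState objects. The loop keeps running for as long as the ConnectionStateManager itself is in the STARTED state, that is, until it is closed: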
private void processEvents()
{
    while ( state.get() == State.STARTED )
    {
        try
        {
            int useSessionTimeoutMs = getUseSessionTimeoutMs();
            long elapsedMs = startOfSuspendedEpoch == 0 ? useSessionTimeoutMs / 2 : System.currentTimeMillis() - startOfSuspendedEpoch;
            long pollMaxMs = useSessionTimeoutMs - elapsedMs;
            final ConnectionState newState = eventQueue.poll(pollMaxMs, TimeUnit.MILLISECONDS);
            if ( newState != null )
            {
                if ( listeners.isEmpty() )
                {
                    log.warn("There are no ConnectionStateListeners registered.");
                }
                listeners.forEach(listener -> listener.stateChanged(client, newState));
            }
            else if ( sessionExpirationPercent > 0 )
            {
                synchronized(this)
                {
                    checkSessionExpiration();
                }
            }
            synchronized(this)
            {
                if ( (currentConnectionState == ConnectionState.LOST) && client.getZookeeperClient().isConnected() )
                {
                    // CURATOR-525 - there is a race whereby LOST is sometimes set after the connection has been repaired
                    // this "hack" fixes it by forcing the state to RECONNECTED
                    log.warn("ConnectionState is LOST but isConnected() is true. Forcing RECONNECTED.");
                    addStateChange(ConnectionState.RECONNECTED);
                }
            }
        }
        catch ( InterruptedException e )
        {
            // swallow the interrupt as it's only possible from either a background
            // operation and, thus, doesn't apply to this loop or the instance
            // is being closed in which case the while test will get it
        }
    }
}
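The shape of this loop is easier to see in isolation. Below is a minimal sketch that imitates what ConnectionStateManager does. start() submits the loop to a single-thread executor. The loop polls a blocking queue with a timeout and passes each state to every registered listener. The class name MiniStateManager, the String states and the fixed 100 ms poll timeout are assumptions made for illustration. The real class uses the ConnectionState enum and derives the timeout from the session timeout.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Hypothetical, simplified model of ConnectionStateManager's event loop (not Curator code).
final class MiniStateManager {
    private final BlockingQueue<String> eventQueue = new LinkedBlockingQueue<>();
    private final List<Consumer<String>> listeners = new CopyOnWriteArrayList<>();
    private final AtomicBoolean started = new AtomicBoolean(false);
    private final ExecutorService service = Executors.newSingleThreadExecutor();

    void addListener(Consumer<String> listener) {
        listeners.add(listener);
    }

    // Producer side; in Curator this role is played by postState
    void post(String state) {
        eventQueue.offer(state);
    }

    void start() {
        if (!started.compareAndSet(false, true)) {
            throw new IllegalStateException("Cannot be started more than once");
        }
        // Like ConnectionStateManager#start: run the loop on a background thread
        service.submit(this::processEvents);
    }

    private void processEvents() {
        // Loop while the manager is started; close() clears the flag
        while (started.get()) {
            try {
                // A timed poll lets the loop wake up periodically and re-check the flag
                String newState = eventQueue.poll(100, TimeUnit.MILLISECONDS);
                if (newState != null) {
                    listeners.forEach(listener -> listener.accept(newState));
                }
            } catch (InterruptedException e) {
                // close() interrupts the worker; restore the flag and leave the loop
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    void close() {
        started.set(false);
        service.shutdownNow();
        try {
            service.awaitTermination(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

A single consumer thread and a FIFO queue mean listeners see the states in the order they were posted. This is why Curator can deliver SUSPENDED, then RECONNECTED, in sequence.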
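The loop then iterates over all registered ConnectionStateListeners and calls their stateChanged method. LeaderSelector has a static inner class, WrappedListener, that implements LeaderSelectorListener; LeaderSelectorListener itself extends ConnectionStateListener. LeaderSelector wraps our listener in a WrappedListener and registers the wrapper with the client's connection state listenable when it starts. So the method that gets called here is WrappedListener's stateChanged: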
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState)
{
    try
    {
        listener.stateChanged(client, newState);
    }
    catch ( CancelLeadershipException dummy )
    {
        // If we cancel only leadership but not whole election, then we could hand over
        // dated leadership to client with no further cancellation. Dated leadership is
        // possible due to separated steps in leadership acquire: server data(e.g. election sequence)
        // change and client flag(e.g. hasLeadership) set.
        leaderSelector.cancelElection();
    }
}
In listener.stateChanged(client, newState) above, listener is the third argument passed to the LeaderSelector constructor: the LeaderSelectorListener we implemented ourselves. The call therefore ends up in our custom LeaderSelectorListener#stateChanged() method.
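WrappedListener is a plain decorator. It forwards every callback to the user's listener and adds exactly one policy: if the user's listener throws CancelLeadershipException, it cancels the whole election. The sketch below strips that pattern down. StateListener, CancelSignal and ElectionHandle are hypothetical stand-ins for Curator's LeaderSelectorListener, CancelLeadershipException and LeaderSelector; they are not the real types.

```java
// Hypothetical stand-ins; not Curator types.
interface StateListener {
    void stateChanged(String newState);
}

// Plays the role of CancelLeadershipException (which is also unchecked)
final class CancelSignal extends RuntimeException {
    CancelSignal() {
        super("cancel leadership");
    }
}

final class ElectionHandle {
    boolean cancelled;

    void cancelElection() {
        cancelled = true;
    }
}

// Decorator mirroring the structure of LeaderSelector.WrappedListener#stateChanged
final class WrappingListener implements StateListener {
    private final ElectionHandle election;
    private final StateListener delegate;

    WrappingListener(ElectionHandle election, StateListener delegate) {
        this.election = election;
        this.delegate = delegate;
    }

    @Override
    public void stateChanged(String newState) {
        try {
            // Forward to the user's own listener unchanged
            delegate.stateChanged(newState);
        } catch (CancelSignal signal) {
            // Cancel the whole election, not just the current leadership
            election.cancelElection();
        }
    }
}
```

Because the wrapper is what gets registered, the user's stateChanged runs on the ConnectionStateManager thread. The user can also abort the election just by throwing, without holding a reference to the selector.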
Next we need to find out where elements are put into ConnectionStateManager's eventQueue.
Tracing the method calls shows that elements are offered into eventQueue in ConnectionStateManager#postState:
private void postState(ConnectionState state)
{
log.info("State change: " + state);
notifyAll();
    // If the queue is full, offer fails: drop the oldest event with poll, then offer again
while ( !eventQueue.offer(state) )
{
eventQueue.poll();
log.warn("ConnectionStateManager queue full - dropping events to make room");
}
}
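postState never blocks its caller. When the bounded queue is full, it discards the oldest pending state and retries. Here is a minimal stand-alone sketch of that drop-oldest policy. It uses an ArrayBlockingQueue whose capacity is chosen by the caller; the capacity of 2 in the usage is arbitrary and is not Curator's setting.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Stand-alone sketch of the drop-oldest policy in ConnectionStateManager#postState.
final class DropOldestQueue<T> {
    private final BlockingQueue<T> queue;

    DropOldestQueue(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    void post(T item) {
        // offer() returns false instead of blocking when the queue is full
        while (!queue.offer(item)) {
            // Discard the oldest element to make room, then retry
            queue.poll();
        }
    }

    // Current contents in FIFO order
    List<T> snapshot() {
        return new ArrayList<>(queue);
    }
}
```

With capacity 2, posting CONNECTED, SUSPENDED and RECONNECTED leaves only the two newest states queued. A slow listener therefore loses old transitions rather than stalling the thread that reports them.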
Tracing further leads to org.apache.curator.framework.state.ConnectionStateManager#addStateChange:
public synchronized boolean addStateChange(ConnectionState newConnectionState)
{
    // If this ConnectionStateManager is not in the STARTED state, return false immediately
    if ( state.get() != State.STARTED )
    {
        return false;
    }
    ConnectionState previousState = currentConnectionState;
    // Same state as before: nothing changed, so produce no event and return
    if ( previousState == newConnectionState )
    {
        return false;
    }
    currentConnectionState = newConnectionState;
    ConnectionState localState = newConnectionState;
    boolean isNegativeMessage = ((newConnectionState == ConnectionState.LOST) || (newConnectionState == ConnectionState.SUSPENDED) || (newConnectionState == ConnectionState.READ_ONLY));
    // On the first (non-negative) connection, report the state as CONNECTED
    if ( !isNegativeMessage && initialConnectMessageSent.compareAndSet(false, true) )
    {
        localState = ConnectionState.CONNECTED;
    }
    postState(localState);
    return true;
}
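Two rules in addStateChange explain why a client sees CONNECTED at startup but RECONNECTED afterwards. A repeated state is ignored, and the very first non-negative state is turned into CONNECTED. Below is a simplified model of just those two rules. MiniConnectionState is a local enum that only mirrors the constant names of Curator's ConnectionState; it is not the real enum. The started-check and the queue are omitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Local stand-in for org.apache.curator.framework.state.ConnectionState (constant names only).
enum MiniConnectionState { CONNECTED, SUSPENDED, RECONNECTED, LOST, READ_ONLY }

// Simplified model of ConnectionStateManager#addStateChange (no started-check, no queue).
final class StateChangeModel {
    private MiniConnectionState current;
    private final AtomicBoolean initialConnectMessageSent = new AtomicBoolean(false);
    // Records what would have been passed to postState
    final List<MiniConnectionState> posted = new ArrayList<>();

    synchronized boolean addStateChange(MiniConnectionState newState) {
        // Same state as before: nothing changed, so no event is produced
        if (current == newState) {
            return false;
        }
        current = newState;
        MiniConnectionState localState = newState;
        boolean isNegative = newState == MiniConnectionState.LOST
                || newState == MiniConnectionState.SUSPENDED
                || newState == MiniConnectionState.READ_ONLY;
        // The first non-negative state ever reported is published as CONNECTED
        if (!isNegative && initialConnectMessageSent.compareAndSet(false, true)) {
            localState = MiniConnectionState.CONNECTED;
        }
        posted.add(localState);
        return true;
    }
}
```

Feeding the model RECONNECTED, RECONNECTED, SUSPENDED, RECONNECTED publishes CONNECTED, SUSPENDED, RECONNECTED. The duplicate is swallowed, and only the first SyncConnected-driven call is rewritten.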
addStateChange, in turn, is called by org.apache.curator.framework.imps.CuratorFrameworkImpl#validateConnection:
void validateConnection(Watcher.Event.KeeperState state)
{
    // Disconnected produces a SUSPENDED event
    if ( state == Watcher.Event.KeeperState.Disconnected )
    {
        suspendConnection();
    }
    // Expired produces a LOST event
    else if ( state == Watcher.Event.KeeperState.Expired )
    {
        connectionStateManager.addStateChange(ConnectionState.LOST);
    }
    // SyncConnected produces a RECONNECTED event
    else if ( state == Watcher.Event.KeeperState.SyncConnected )
    {
        connectionStateManager.addStateChange(ConnectionState.RECONNECTED);
    }
    // ConnectedReadOnly produces a READ_ONLY event
    else if ( state == Watcher.Event.KeeperState.ConnectedReadOnly )
    {
        connectionStateManager.addStateChange(ConnectionState.READ_ONLY);
    }
}
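validateConnection is essentially a translation table from ZooKeeper's KeeperState to Curator's ConnectionState. The sketch below restates it as a switch over two local enums whose constants mirror a subset of the real ones. Both enums are stand-ins introduced here. In the real code, Disconnected goes through suspendConnection() rather than a direct mapping, and the method ignores any other state; the sketch represents that with null.

```java
// Local stand-ins mirroring constant names of Watcher.Event.KeeperState (subset) and Curator's ConnectionState.
enum SketchKeeperState { Disconnected, SyncConnected, ConnectedReadOnly, Expired, AuthFailed }

enum SketchCuratorState { CONNECTED, SUSPENDED, RECONNECTED, LOST, READ_ONLY }

final class ValidateConnectionSketch {
    // Returns the state validateConnection would report, or null when it ignores the event
    static SketchCuratorState map(SketchKeeperState state) {
        switch (state) {
            case Disconnected:
                return SketchCuratorState.SUSPENDED;   // via suspendConnection() in the real code
            case Expired:
                return SketchCuratorState.LOST;
            case SyncConnected:
                return SketchCuratorState.RECONNECTED; // addStateChange turns the first one into CONNECTED
            case ConnectedReadOnly:
                return SketchCuratorState.READ_ONLY;
            default:
                return null;                           // e.g. AuthFailed: no connection-state change here
        }
    }
}
```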
validateConnection() is called by org.apache.curator.framework.imps.CuratorFrameworkImpl#processEvent:
private void processEvent(final CuratorEvent curatorEvent)
{
    // validateConnection is only called for WATCHED events; connection state changes arrive as WATCHED events
if ( curatorEvent.getType() == CuratorEventType.WATCHED )
{
validateConnection(curatorEvent.getWatchedEvent().getState());
}
    // ... (remaining code omitted)
}
When the connection state changes, this processEvent method is invoked by an anonymous inner Watcher. CuratorFrameworkImpl passes that Watcher into the CuratorZookeeperClient it constructs:
public CuratorFrameworkImpl(CuratorFrameworkFactory.Builder builder)
{
    // ZookeeperFactory is the factory Curator uses to create ZooKeeper instances
    ZookeeperFactory localZookeeperFactory = makeZookeeperFactory(builder.getZookeeperFactory(), builder.getZkClientConfig());
    this.client = new CuratorZookeeperClient
    (
        localZookeeperFactory,
        builder.getEnsembleProvider(),
        builder.getSessionTimeoutMs(),
        builder.getConnectionTimeoutMs(),
        builder.getWaitForShutdownTimeoutMs(),
        new Watcher()
        {
            @Override
            public void process(WatchedEvent watchedEvent)
            {
                // Wrap the ZooKeeper event as a WATCHED CuratorEvent and hand it to processEvent
                CuratorEvent event = new CuratorEventImpl(CuratorFrameworkImpl.this, CuratorEventType.WATCHED, watchedEvent.getState().getIntValue(), unfixForNamespace(watchedEvent.getPath()), null, null, null, null, null, watchedEvent, null, null);
                processEvent(event);
            }
        },
        builder.getRetryPolicy(),
        builder.canBeReadOnly()
    );
    // ... (code omitted)
}
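The CuratorZookeeperClient constructor creates a ConnectionState object to manage the connection between the client and ZooKeeper. This is org.apache.curator.ConnectionState, a class in curator-client, not the framework's ConnectionState enum. The Watcher above is passed to it as a constructor argument, and ConnectionState stores it in a queue called parentWatchers: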
public CuratorZookeeperClient(ZookeeperFactory zookeeperFactory, EnsembleProvider ensembleProvider, int sessionTimeoutMs, int connectionTimeoutMs, int waitForShutdownTimeoutMs, Watcher watcher, RetryPolicy retryPolicy, boolean canBeReadOnly)
{
    if ( sessionTimeoutMs < connectionTimeoutMs )
    {
        log.warn(String.format("session timeout [%d] is less than connection timeout [%d]", sessionTimeoutMs, connectionTimeoutMs));
    }
    retryPolicy = Preconditions.checkNotNull(retryPolicy, "retryPolicy cannot be null");
    ensembleProvider = Preconditions.checkNotNull(ensembleProvider, "ensembleProvider cannot be null");
    this.connectionTimeoutMs = connectionTimeoutMs;
    this.waitForShutdownTimeoutMs = waitForShutdownTimeoutMs;
    // Create the ConnectionState object that manages the client's connection to ZooKeeper
    state = new ConnectionState(zookeeperFactory, ensembleProvider, sessionTimeoutMs, watcher, tracer, canBeReadOnly);
    setRetryPolicy(retryPolicy);
}
ConnectionState(ZookeeperFactory zookeeperFactory, EnsembleProvider ensembleProvider, int sessionTimeoutMs, Watcher parentWatcher, AtomicReference<TracerDriver> tracer, boolean canBeReadOnly)
{
this.ensembleProvider = ensembleProvider;
this.tracer = tracer;
if ( parentWatcher != null )
{
        // Store the anonymous Watcher passed in from CuratorFrameworkImpl in parentWatchers
parentWatchers.offer(parentWatcher);
}
handleHolder = new HandleHolder(zookeeperFactory, this, ensembleProvider, sessionTimeoutMs, canBeReadOnly);
}
Searching ConnectionState for uses of parentWatchers leads to its process() method:
@Override
public void process(WatchedEvent event)
{
    if ( LOG_EVENTS )
    {
        log.debug("ConnectState watcher: " + event);
    }
    if ( event.getType() == Watcher.Event.EventType.None )
    {
        boolean wasConnected = isConnected.get();
        boolean newIsConnected = checkState(event.getState(), wasConnected);
        if ( newIsConnected != wasConnected )
        {
            isConnected.set(newIsConnected);
            connectionStartMs = System.currentTimeMillis();
            if ( newIsConnected )
            {
                lastNegotiatedSessionTimeoutMs.set(handleHolder.getNegotiatedSessionTimeoutMs());
                log.debug("Negotiated session timeout: " + lastNegotiatedSessionTimeoutMs.get());
            }
        }
    }
    for ( Watcher parentWatcher : parentWatchers )
    {
        OperationTrace trace = new OperationTrace("connection-state-parent-process", tracer.get(), getSessionId());
        // Call process on each parent watcher. So far the only one is the anonymous Watcher created in the
        // CuratorFrameworkImpl constructor, which eventually leads back to our custom ConnectionStateListener
        parentWatcher.process(event);
        trace.commit();
    }
}
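Without the tracing and the isConnected bookkeeping, ConnectionState#process is a fan-out. One watcher is registered with ZooKeeper, and it forwards every event to all the parent watchers in its queue. Below is a minimal sketch of that shape. It assumes a hypothetical SimpleWatcher interface in place of org.apache.zookeeper.Watcher and a ConcurrentLinkedQueue for parentWatchers.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical stand-in for org.apache.zookeeper.Watcher.
interface SimpleWatcher {
    void process(String event);
}

// Sketch of the fan-out in Curator's ConnectionState#process (tracing and state tracking omitted).
final class FanOutWatcher implements SimpleWatcher {
    private final Queue<SimpleWatcher> parentWatchers = new ConcurrentLinkedQueue<>();

    void addParentWatcher(SimpleWatcher watcher) {
        parentWatchers.offer(watcher);
    }

    @Override
    public void process(String event) {
        // Every registered parent watcher sees every event, in registration order
        for (SimpleWatcher parent : parentWatchers) {
            parent.process(event);
        }
    }
}
```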
So where is ConnectionState#process called from? This one is buried fairly deep. Setting breakpoints shows that it is called from org.apache.zookeeper.ClientCnxn.EventThread#processEvent:
private void processEvent(Object event) {
try {
if (event instanceof WatcherSetEventPair) {
// each watcher will process the event
WatcherSetEventPair pair = (WatcherSetEventPair) event;
for (Watcher watcher : pair.watchers) {
try {
watcher.process(pair.event);
} catch (Throwable t) {
LOG.error("Error while calling watcher ", t);
}
}
        // ... (remaining code omitted)
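ClientCnxn is no longer Curator code. It belongs to the native ZooKeeper client API and is the lowest-level component managing the connection between the client and ZooKeeper. It is initialized when a ZooKeeper object is created. As mentioned earlier, Curator wraps that ZooKeeper object inside ConnectionState, through HandleHolder. Note that HandleHolder receives this, the ConnectionState itself. ConnectionState implements Watcher and is handed to the ZooKeeper constructor as its default watcher, which is how EventThread ends up calling ConnectionState#process for connection events: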
ConnectionState(ZookeeperFactory zookeeperFactory, EnsembleProvider ensembleProvider, int sessionTimeoutMs, Watcher parentWatcher, AtomicReference<TracerDriver> tracer, boolean canBeReadOnly)
{
this.ensembleProvider = ensembleProvider;
this.tracer = tracer;
if ( parentWatcher != null )
{
parentWatchers.offer(parentWatcher);
}
    // zookeeperFactory encapsulates how the ZooKeeper instance is created
handleHolder = new HandleHolder(zookeeperFactory, this, ensembleProvider, sessionTimeoutMs, canBeReadOnly);
}
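org.apache.zookeeper.ClientCnxn.EventThread#processEvent is in turn called from org.apache.zookeeper.ClientCnxn.EventThread#run. EventThread is an inner class that extends Thread (via ZooKeeperThread). It is started with start() when the ZooKeeper object is created, and SendThread is started at the same time: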
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
        boolean canBeReadOnly)
        throws IOException {
    // ... (code omitted) ...
    cnxn = createConnection(
        connectStringParser.getChrootPath(),
        hostProvider,
        sessionTimeout,
        this.clientConfig,
        watcher,
        getClientCnxnSocket(),
        canBeReadOnly);
    // Starts ClientCnxn's SendThread and EventThread
    cnxn.start();
}
public void start() {
sendThread.start();
eventThread.start();
}
Following the EventThread source shows that its run method also uses a while loop. It continuously takes events from a blocking queue called waitingEvents:
private final LinkedBlockingQueue<Object> waitingEvents = new LinkedBlockingQueue<Object>();
@Override
@SuppressFBWarnings("JLM_JSR166_UTILCONCURRENT_MONITORENTER")
public void run() {
    try {
        isRunning = true;
        while (true) {
            Object event = waitingEvents.take();
            // eventOfDeath is a sentinel (a plain new Object()); anything else goes to processEvent
            if (event == eventOfDeath) {
                wasKilled = true;
            } else {
                processEvent(event);
            }
            // ... (unrelated code omitted) ...
}
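This is the classic poison-pill consumer. A dedicated thread take()s from an unbounded queue and stops once it has seen a sentinel object (eventOfDeath in ClientCnxn). Below is a self-contained sketch of the same pattern. The class name and the Consumer-based handler are illustrative assumptions. As a simplified version of the wasKilled check in the real run method, the sketch keeps handling events queued behind the sentinel until the queue is empty. The real EventThread also handles async callbacks, which the sketch leaves out.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Sketch of the poison-pill loop used by ClientCnxn.EventThread#run (simplified).
final class PoisonPillEventThread extends Thread {
    private final LinkedBlockingQueue<Object> waitingEvents = new LinkedBlockingQueue<>();
    // Sentinel compared by identity, so no real event can ever be mistaken for it
    private final Object eventOfDeath = new Object();
    private final Consumer<Object> handler;

    PoisonPillEventThread(Consumer<Object> handler) {
        super("sketch-event-thread");
        this.handler = handler;
    }

    void queueEvent(Object event) {
        waitingEvents.add(event);
    }

    void queueEventOfDeath() {
        waitingEvents.add(eventOfDeath);
    }

    @Override
    public void run() {
        boolean wasKilled = false;
        try {
            while (true) {
                // Blocks until an event (or the sentinel) is available
                Object event = waitingEvents.take();
                if (event == eventOfDeath) {
                    wasKilled = true;
                } else {
                    handler.accept(event);
                }
                // After the sentinel, finish whatever is still queued, then exit
                if (wasKilled && waitingEvents.isEmpty()) {
                    break;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Using a sentinel instead of a boolean flag means a thread blocked in take() is always woken up. Shutdown is simply one more item in the queue, ordered after every real event.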
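So the key question is: where are elements added to waitingEvents?
Searching ClientCnxn for this variable turns up two places that add to it: queueEvent and queuePacket. Judging by the names, queuePacket adds the packets exchanged with ZooKeeper, and setting breakpoints confirmed this. queueEvent() is the one that adds event data: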
public void queueEvent(WatchedEvent event) {
    queueEvent(event, null);
}
private void queueEvent(WatchedEvent event, Set<Watcher> materializedWatchers) {
    if (event.getType() == EventType.None && sessionState == event.getState()) {
        return;
    }
    sessionState = event.getState();
    final Set<Watcher> watchers;
    if (materializedWatchers == null) {
        // materialize the watchers based on the event
        watchers = watchManager.materialize(event.getState(), event.getType(), event.getPath());
    } else {
        watchers = new HashSet<>(materializedWatchers);
    }
    WatcherSetEventPair pair = new WatcherSetEventPair(watchers, event);
    // queue the pair (watch set & event) for later processing
    waitingEvents.add(pair);
}
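Note the first check in queueEvent. A connection-level event (EventType.None) whose KeeperState equals the last one seen is dropped, so repeated notifications of the same state never reach the watchers. Below is a tiny model of just that filter. String stands in for KeeperState, and SessionStateFilter is a hypothetical name. The initial value "Disconnected" is an assumption made for the sketch.

```java
import java.util.ArrayList;
import java.util.List;

// Model of the duplicate-state filter at the top of ClientCnxn.EventThread#queueEvent.
final class SessionStateFilter {
    private String sessionState = "Disconnected"; // assumed initial value for the sketch
    final List<String> queued = new ArrayList<>();

    void queueEvent(boolean isConnectionEvent, String state) {
        // Connection events that repeat the current session state are ignored
        if (isConnectionEvent && state.equals(sessionState)) {
            return;
        }
        sessionState = state;
        queued.add(state);
    }
}
```

Together with the equality check in ConnectionStateManager#addStateChange, this gives two layers of de-duplication. A listener should therefore see at most one RECONNECTED per actual reconnect.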
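In other words, whenever the connection state between the client and the ZooKeeper server changes (for example, on reconnect), queueEvent must be called somewhere. That call puts the new state into the blocking queue, where it waits to be consumed.
This part of the code is fairly complex. If you are interested, read the source of org.apache.zookeeper.ClientCnxn.SendThread yourself.
In short, the flow works like this: creating a ZooKeeper object creates a ClientCnxn, which starts two threads, eventThread and sendThread.
SendThread is mainly responsible for communicating with ZooKeeper. It also implements a heartbeat, periodically pinging the server to confirm that the connection is still alive.
SendThread's run method contains a while loop. If the client is disconnected, the loop keeps trying to re-establish the connection through ClientCnxnSocket, and it repeats the attempt whenever it fails.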
// If not connected, keep trying to connect (see startConnect for details). A failure is caught by the
// outer catch block, after which the while loop comes around and tries again.
if (!clientCnxnSocket.isConnected()) {
    // don't re-establish connection if we are closing
    if (closing) {
        break;
    }
    if (rwServerAddress != null) {
        serverAddress = rwServerAddress;
        rwServerAddress = null;
    } else {
        serverAddress = hostProvider.next(1000);
    }
    onConnecting(serverAddress);
    // Ultimately connects to ZooKeeper through the clientCnxnSocket component: clientCnxnSocket.connect(addr)
    startConnect(serverAddress);
    // Update now to start the connection timer right after we make a connection attempt
    clientCnxnSocket.updateNow();
    clientCnxnSocket.updateLastSendAndHeard();
}
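Reduced to its control flow, SendThread keeps looping. While not connected, it picks the next server address and attempts a connect. A failed attempt is caught and simply leads to the next iteration. The sketch below models only that retry structure. The Connector interface, the address list and the maxAttempts cap are hypothetical. The real loop runs until the client is closed, and it also deals with timeouts, authentication and read-only servers.

```java
import java.io.IOException;
import java.util.List;

// Hypothetical connector; in ZooKeeper this role is played by ClientCnxnSocket.
interface Connector {
    void connect(String address) throws IOException;
}

// Models the "keep trying until connected" structure of ClientCnxn.SendThread#run.
final class ReconnectLoop {
    private final List<String> servers;
    private final Connector connector;
    private int next;

    ReconnectLoop(List<String> servers, Connector connector) {
        this.servers = servers;
        this.connector = connector;
    }

    // Returns the address that accepted the connection, or null once maxAttempts is used up
    String connectWithRetry(int maxAttempts) {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            // Rotate through the servers, loosely like hostProvider.next(...)
            String address = servers.get(next);
            next = (next + 1) % servers.size();
            try {
                connector.connect(address);
                return address;
            } catch (IOException e) {
                // Swallow the failure and try the next address on the next iteration
            }
        }
        return null;
    }
}
```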
Once the connection is re-established, org.apache.zookeeper.ClientCnxn.SendThread#run calls clientCnxnSocket.doTransport to start exchanging packets with ZooKeeper:
// pendingQueue holds packets that have been sent and are waiting for a response
clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this);
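doTransport contains the NIO code; dig into it yourself if you are interested.
Eventually, org.apache.zookeeper.ClientCnxnSocket#readConnectResult reads ZooKeeper's connect response and calls org.apache.zookeeper.ClientCnxn.SendThread#onConnected. That method puts an event into the waitingEvents blocking queue: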
void readConnectResult() throws IOException {
    // ... (unrelated code omitted)
sendThread.onConnected(conRsp.getTimeOut(), this.sessionId, conRsp.getPasswd(), isRO);
}
Our connection to ZooKeeper is not read-only, so the event's KeeperState is SyncConnected:
void onConnected(
int _negotiatedSessionTimeout,
long _sessionId,
byte[] _sessionPasswd,
boolean isRO) throws IOException {
    // ... (unrelated code omitted)
KeeperState eventState = (isRO) ? KeeperState.ConnectedReadOnly : KeeperState.SyncConnected;
eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None, eventState, null));
}
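As the earlier code showed, when validateConnection sees KeeperState.SyncConnected it raises a RECONNECTED event. addStateChange rewrites that event to CONNECTED only for the very first connection. The RECONNECTED event finally reaches our custom ConnectionStateListener#stateChanged method.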
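To tie the whole path together, here is a toy model of the chain traced above, in a single file and on a single thread. The chain runs from SendThread#onConnected to EventThread#queueEvent, ConnectionState#process, the anonymous Watcher, CuratorFrameworkImpl#validateConnection, ConnectionStateManager#addStateChange and finally the listener. Every class and method in the model is a hypothetical stand-in named after the real step it imitates. Threads, the Curator eventQueue, read-only handling and the LOST path are left out. Treat it as a map of the call order, not as Curator or ZooKeeper code.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

// Toy, single-threaded model of the reconnect notification chain; every name is a stand-in.
final class ReconnectChainModel {
    final List<String> delivered = new ArrayList<>();                // what the user's listener saw
    private final Queue<String> waitingEvents = new ArrayDeque<>();  // EventThread's queue
    private final List<Consumer<String>> parentWatchers = new ArrayList<>();
    private String sessionState = "Disconnected";                    // assumed initial value
    private String currentConnectionState;
    private boolean initialConnectMessageSent;

    ReconnectChainModel() {
        // CuratorFrameworkImpl's anonymous Watcher ends up calling validateConnection
        parentWatchers.add(this::validateConnection);
    }

    // SendThread#onConnected: a non-read-only connect result becomes SyncConnected
    void onConnected() {
        queueEvent("SyncConnected");
    }

    // SendThread noticing that the connection dropped
    void onDisconnected() {
        queueEvent("Disconnected");
    }

    // EventThread#queueEvent: drop repeats of the current session state
    private void queueEvent(String keeperState) {
        if (keeperState.equals(sessionState)) {
            return;
        }
        sessionState = keeperState;
        waitingEvents.add(keeperState);
    }

    // EventThread#run, drained synchronously here instead of on its own thread
    void runEventThreadOnce() {
        String event;
        while ((event = waitingEvents.poll()) != null) {
            process(event);
        }
    }

    // ConnectionState#process: fan out to the parent watchers
    private void process(String keeperState) {
        for (Consumer<String> watcher : parentWatchers) {
            watcher.accept(keeperState);
        }
    }

    // CuratorFrameworkImpl#validateConnection (subset)
    private void validateConnection(String keeperState) {
        if (keeperState.equals("Disconnected")) {
            addStateChange("SUSPENDED");
        } else if (keeperState.equals("SyncConnected")) {
            addStateChange("RECONNECTED");
        }
    }

    // ConnectionStateManager#addStateChange, calling the listener directly instead of via eventQueue
    private void addStateChange(String newState) {
        if (newState.equals(currentConnectionState)) {
            return;
        }
        currentConnectionState = newState;
        String localState = newState;
        if (!newState.equals("SUSPENDED") && !initialConnectMessageSent) {
            initialConnectMessageSent = true;
            localState = "CONNECTED";
        }
        delivered.add(localState);
    }
}
```

Driving the model through connect, disconnect and connect again delivers CONNECTED, SUSPENDED, RECONNECTED. This matches what the demo program at the top of the article prints when the network is cut and restored.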
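If you are interested, you can verify this path by stepping through it with a debugger. Some of the steps are asynchronous, though, so choose carefully when you set breakpoints.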
Copyright © 2003-2013 www.wpsshop.cn. All rights reserved.