赞
踩
我们平时使用jedispool来连接Redis的集群、sentinel或者主从服务器,经常会遇到testOnBorrow、testOnReturn和testWhileIdle这些参数的设置问题,我们知道连接Redis服务器的连接是维护在通用对象池中的,如果想要正确的、符合自己业务场景的设置这些参数,需要了解其底层原理。
我们翻到jedispool的源码,发现其构造函数很多是可以通过GenericObjectPool来实现的:
public JedisPool(final GenericObjectPoolConfig poolConfig, final String host, int port, int timeout, final String password) { this(poolConfig, host, port, timeout, password, Protocol.DEFAULT_DATABASE, null); } public JedisPool(final GenericObjectPoolConfig poolConfig, final String host, int port, int timeout, final String password, final boolean ssl) { this(poolConfig, host, port, timeout, password, Protocol.DEFAULT_DATABASE, null, ssl); } public JedisPool(final GenericObjectPoolConfig poolConfig, final String host, int port, int timeout, final String password, final boolean ssl, final SSLSocketFactory sslSocketFactory, final SSLParameters sslParameters, final HostnameVerifier hostnameVerifier) { this(poolConfig, host, port, timeout, password, Protocol.DEFAULT_DATABASE, null, ssl, sslSocketFactory, sslParameters, hostnameVerifier); } public JedisPool(final GenericObjectPoolConfig poolConfig, final String host, final int port) { this(poolConfig, host, port, Protocol.DEFAULT_TIMEOUT, null, Protocol.DEFAULT_DATABASE, null); }
分析原因主要在于JedisPool继承于Pool抽象类,
而Pool的基本功能的实现,依赖于GenericObjectPool,源码如下:
public abstract class Pool<T> implements Closeable {
protected GenericObjectPool<T> internalPool;
/**
* Using this constructor means you have to set and initialize the internalPool yourself.
*/
public Pool() {
}
public Pool(final GenericObjectPoolConfig poolConfig, PooledObjectFactory<T> factory) {
initPool(poolConfig, factory);
}
...
}
而且JedisPool中配置的testOnBorrow参数、testOnReturn参数、testWhileIdle参数分别是通过GenericObjectPool中的borrowObject()、returnObject()和ensureIdle()、evict()、EvictionTimer实现的。
下面我们分别分析一下其实现原理:
如果池中存在空闲可用的连接实例,则需要连接时依据后进先出策略挑选一个连接实例,并激活返回给客户端。如果激活失败或者testOnBorrow被设置成true并且校验失败,则连接实例被废弃,继续检查下一个可用的连接实例,直到找到一个合法可用的连接实例或者连接池再没有可用的连接实例了。从中我们可以看出,像SpringBoot自动装配时默认设置这个参数为false是有道理的,testOnBorrow对性能的损耗是比较高的。
我们继续,如果没有可用的空闲可用的连接实例,则需要看maxTotal、连接池是否达到exhausted和borrowMaxWaitMillis参数的设置,如果从池中查询到的连接实例的数量少于maxTotal,则会创建一个新的实例,激活、校验并返回给调用者。如果校验失败,则会抛出NoSuchElementException异常。
在连接池没有可用的连接实例并且也不能新建一个实例的时候,如果getBlockWhenExhausted()参数是true,则阻塞,直到超过borrowMaxWaitMillis参数设置的超时时间,如果是false,则直接抛出NoSuchElementException异常。
在并发量较大的时候,可能同时有多个线程被阻塞在了连接池的等待过程,这是会采用公平的算法保证线程获取连接实例的顺序。
public T borrowObject(final long borrowMaxWaitMillis) throws Exception { assertOpen(); final AbandonedConfig ac = this.abandonedConfig; if (ac != null && ac.getRemoveAbandonedOnBorrow() && (getNumIdle() < 2) && (getNumActive() > getMaxTotal() - 3) ) { removeAbandoned(ac); } PooledObject<T> p = null; // Get local copy of current config so it is consistent for entire // method execution final boolean blockWhenExhausted = getBlockWhenExhausted(); boolean create; final long waitTime = System.currentTimeMillis(); while (p == null) { create = false; p = idleObjects.pollFirst(); if (p == null) { p = create(); if (p != null) { create = true; } } if (blockWhenExhausted) { if (p == null) { if (borrowMaxWaitMillis < 0) { p = idleObjects.takeFirst(); } else { p = idleObjects.pollFirst(borrowMaxWaitMillis, TimeUnit.MILLISECONDS); } } if (p == null) { throw new NoSuchElementException( "Timeout waiting for idle object"); } } else { if (p == null) { throw new NoSuchElementException("Pool exhausted"); } } if (!p.allocate()) { p = null; } if (p != null) { try { factory.activateObject(p); } catch (final Exception e) { try { destroy(p); } catch (final Exception e1) { // Ignore - activation failure is more important } p = null; if (create) { final NoSuchElementException nsee = new NoSuchElementException( "Unable to activate object"); nsee.initCause(e); throw nsee; } } if (p != null && (getTestOnBorrow() || create && getTestOnCreate())) { boolean validate = false; Throwable validationThrowable = null; try { validate = factory.validateObject(p); } catch (final Throwable t) { PoolUtils.checkRethrow(t); validationThrowable = t; } if (!validate) { try { destroy(p); destroyedByBorrowValidationCount.incrementAndGet(); } catch (final Exception e) { // Ignore - validation failure is more important } p = null; if (create) { final NoSuchElementException nsee = new NoSuchElementException( "Unable to validate object"); nsee.initCause(validationThrowable); throw nsee; } } } } } updateStatsBorrow(p, System.currentTimeMillis() - waitTime); return p.getObject(); }
我们可以看到源码中,连接池是通过一个阻塞队列来维护的,可以实现让线程等待获取连接对象的场景,需要说明的是激活和校验,对于jedispool来讲,分别实现了PooledObjectFactory接口的activateObject(PooledObject p)和validateObject(PooledObject p)方法,激活是简单的一个选库的操作,校验是验证host和port的正确性并做一次ping的动作。
如果在业务层设置了maxIdle参数,并且池中的空闲实例数达到这个数值,返回的连接实例将被废弃。如果testOnReturn设置为true,连接实例在返回给连接池前需要做校验,如果校验失败,则实例废弃。否则,连接将被根据先进先出或者后进先出策略归还给线程池。
我们从源码中可以看出,jedispool的maxIdle参数,主要是通过这个方法来实现的,每次从连接池中使用连接归还后,都会验证是否超过了最大个数,如果超过了就会废弃实例,保证不超过maxIdle。
public void returnObject(final T obj) { final PooledObject<T> p = allObjects.get(new IdentityWrapper<>(obj)); if (p == null) { if (!isAbandonedConfig()) { throw new IllegalStateException( "Returned object not currently part of this pool"); } return; // Object was abandoned and removed } markReturningState(p); final long activeTime = p.getActiveTimeMillis(); if (getTestOnReturn() && !factory.validateObject(p)) { try { destroy(p); } catch (final Exception e) { swallowException(e); } try { ensureIdle(1, false); } catch (final Exception e) { swallowException(e); } updateStatsReturn(activeTime); return; } try { factory.passivateObject(p); } catch (final Exception e1) { swallowException(e1); try { destroy(p); } catch (final Exception e) { swallowException(e); } try { ensureIdle(1, false); } catch (final Exception e) { swallowException(e); } updateStatsReturn(activeTime); return; } if (!p.deallocate()) { throw new IllegalStateException( "Object has already been returned to this pool or is invalid"); } final int maxIdleSave = getMaxIdle(); if (isClosed() || maxIdleSave > -1 && maxIdleSave <= idleObjects.size()) { try { destroy(p); } catch (final Exception e) { swallowException(e); } } else { if (getLifo()) { idleObjects.addFirst(p); } else { idleObjects.addLast(p); } if (isClosed()) { // Pool closed while object was being added to idle objects. // Make sure the returned object is destroyed rather than left // in the idle object pool (which would effectively be a leak) clear(); } } updateStatsReturn(activeTime); }
在生产环境中,这两个参数往往是配合使用的,大概意思是说每隔多长时间验证一下空闲的连接实例是否有效。试想如果让我们自己去实现,势必要围绕一个定时线程池做设计,事实上源码中也是这样实现的,源码中EvictionTimer空闲连接回收器正是通过ScheduledThreadPoolExecutor来定时做校验的,我们看下源码的调用逻辑。
首先,要注意到GenericObjectPool继承自BaseGenericObjectPool,我们想到既然这两个参数是配置的,那自然在代码中也需要将参数配置上,然后再启动空闲连接回收器定时检测。看下BaseGenericObjectPool的代码块:
protected void setConfig(final BaseObjectPoolConfig<T> conf) { setLifo(conf.getLifo()); setMaxWaitMillis(conf.getMaxWaitMillis()); setBlockWhenExhausted(conf.getBlockWhenExhausted()); setTestOnCreate(conf.getTestOnCreate()); setTestOnBorrow(conf.getTestOnBorrow()); setTestOnReturn(conf.getTestOnReturn()); setTestWhileIdle(conf.getTestWhileIdle()); setNumTestsPerEvictionRun(conf.getNumTestsPerEvictionRun()); setMinEvictableIdleTimeMillis(conf.getMinEvictableIdleTimeMillis()); setTimeBetweenEvictionRunsMillis(conf.getTimeBetweenEvictionRunsMillis()); setSoftMinEvictableIdleTimeMillis(conf.getSoftMinEvictableIdleTimeMillis()); final EvictionPolicy<T> policy = conf.getEvictionPolicy(); if (policy == null) { // Use the class name (pre-2.6.0 compatible) setEvictionPolicyClassName(conf.getEvictionPolicyClassName()); } else { // Otherwise, use the class (2.6.0 feature) setEvictionPolicy(policy); } setEvictorShutdownTimeoutMillis(conf.getEvictorShutdownTimeoutMillis()); }
重点看这段代码:
public final void setTimeBetweenEvictionRunsMillis(
final long timeBetweenEvictionRunsMillis) {
this.timeBetweenEvictionRunsMillis = timeBetweenEvictionRunsMillis;
startEvictor(timeBetweenEvictionRunsMillis);
}
可见,配置结束后就直接开启了回收器:
final void startEvictor(final long delay) { synchronized (evictionLock) { EvictionTimer.cancel(evictor, evictorShutdownTimeoutMillis, TimeUnit.MILLISECONDS); evictor = null; evictionIterator = null; if (delay > 0) { evictor = new Evictor(); EvictionTimer.schedule(evictor, delay, delay); } } } static synchronized void schedule( final BaseGenericObjectPool<?>.Evictor task, final long delay, final long period) { if (null == executor) { executor = new ScheduledThreadPoolExecutor(1, new EvictorThreadFactory()); executor.setRemoveOnCancelPolicy(true); } final ScheduledFuture<?> scheduledFuture = executor.scheduleWithFixedDelay(task, delay, period, TimeUnit.MILLISECONDS); task.setScheduledFuture(scheduledFuture); }
很容易判断,回收器的逻辑就是一个线程啦,也可以找到源码核实:
class Evictor implements Runnable { private ScheduledFuture<?> scheduledFuture; /** * Run pool maintenance. Evict objects qualifying for eviction and then * ensure that the minimum number of idle instances are available. * Since the Timer that invokes Evictors is shared for all Pools but * pools may exist in different class loaders, the Evictor ensures that * any actions taken are under the class loader of the factory * associated with the pool. */ @Override public void run() { final ClassLoader savedClassLoader = Thread.currentThread().getContextClassLoader(); try { if (factoryClassLoader != null) { // Set the class loader for the factory final ClassLoader cl = factoryClassLoader.get(); if (cl == null) { // The pool has been dereferenced and the class loader // GC'd. Cancel this timer so the pool can be GC'd as // well. cancel(); return; } Thread.currentThread().setContextClassLoader(cl); } // Evict from the pool try { evict(); } catch(final Exception e) { swallowException(e); } catch(final OutOfMemoryError oome) { // Log problem but give evictor thread a chance to continue // in case error is recoverable oome.printStackTrace(System.err); } // Re-create idle instances. try { ensureMinIdle(); } catch (final Exception e) { swallowException(e); } } finally { // Restore the previous CCL Thread.currentThread().setContextClassLoader(savedClassLoader); } } void setScheduledFuture(final ScheduledFuture<?> scheduledFuture) { this.scheduledFuture = scheduledFuture; } void cancel() { scheduledFuture.cancel(false); } }
而回收器中,有两个方法需要注意,一个是:evict(),即真正的回收逻辑,一个是:ensureMinIdle(),保证最小的minIdle数量。
evict()处理的回收策略是,当连接池中的空闲对象已经空闲了一段时间,并且大于参数IdleEvictTime,或者池中空闲的连接对象的空闲时间大于参数IdleEvictTime并且这样的空闲连接个数大于参数MinIdle的设置,就启用回收策略,需要注意的是在开启回收前,需要判定该连接对象没有被其他的线程占用,正在使用的连接当然不能被回收。如果不符合回收策略的话,就走激活、校验逻辑,都成功了就归还给连接池了。
只有回收逻辑肯定不行,还需要保证连接池中的可用连接要达到用户设置的MinIdle的要求,这个逻辑通过方法ensureMinIdle()来实现,主要就是在连接数不够的时候,创建新的对象,放入池中。一起看下这两个方法的源码:
public void evict() throws Exception { assertOpen(); if (idleObjects.size() > 0) { PooledObject<T> underTest = null; final EvictionPolicy<T> evictionPolicy = getEvictionPolicy(); synchronized (evictionLock) { final EvictionConfig evictionConfig = new EvictionConfig( getMinEvictableIdleTimeMillis(), getSoftMinEvictableIdleTimeMillis(), getMinIdle()); final boolean testWhileIdle = getTestWhileIdle(); for (int i = 0, m = getNumTests(); i < m; i++) { if (evictionIterator == null || !evictionIterator.hasNext()) { evictionIterator = new EvictionIterator(idleObjects); } if (!evictionIterator.hasNext()) { // Pool exhausted, nothing to do here return; } try { underTest = evictionIterator.next(); } catch (final NoSuchElementException nsee) { // Object was borrowed in another thread // Don't count this as an eviction test so reduce i; i--; evictionIterator = null; continue; } if (!underTest.startEvictionTest()) { // Object was borrowed in another thread // Don't count this as an eviction test so reduce i; i--; continue; } // User provided eviction policy could throw all sorts of // crazy exceptions. Protect against such an exception // killing the eviction thread. boolean evict; try { evict = evictionPolicy.evict(evictionConfig, underTest, idleObjects.size()); } catch (final Throwable t) { // Slightly convoluted as SwallowedExceptionListener // uses Exception rather than Throwable PoolUtils.checkRethrow(t); swallowException(new Exception(t)); // Don't evict on error conditions evict = false; } if (evict) { destroy(underTest); destroyedByEvictorCount.incrementAndGet(); } else { if (testWhileIdle) { boolean active = false; try { factory.activateObject(underTest); active = true; } catch (final Exception e) { destroy(underTest); destroyedByEvictorCount.incrementAndGet(); } if (active) { if (!factory.validateObject(underTest)) { destroy(underTest); destroyedByEvictorCount.incrementAndGet(); } else { try { factory.passivateObject(underTest); } catch (final Exception e) { destroy(underTest); destroyedByEvictorCount.incrementAndGet(); } } } } if (!underTest.endEvictionTest(idleObjects)) { // TODO - May need to add code here once additional // states are used } } } } } final AbandonedConfig ac = this.abandonedConfig; if (ac != null && ac.getRemoveAbandonedOnMaintenance()) { removeAbandoned(ac); } }
private void ensureIdle(final int idleCount, final boolean always) throws Exception { if (idleCount < 1 || isClosed() || (!always && !idleObjects.hasTakeWaiters())) { return; } while (idleObjects.size() < idleCount) { final PooledObject<T> p = create(); if (p == null) { // Can't create objects, no reason to think another call to // create will work. Give up. break; } if (getLifo()) { idleObjects.addFirst(p); } else { idleObjects.addLast(p); } } if (isClosed()) { // Pool closed while object was being added to idle objects. // Make sure the returned object is destroyed rather than left // in the idle object pool (which would effectively be a leak) clear(); } }
赞
踩
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。