PoolEntry 是 HikariCP 中对数据库物理连接的封装。 那我们现在探索问题的关键点就是:
我们先看下 HikariCP 中数据源、连接、连接池之间的关系。
连接池的初始化过程中 HikariCP 做了很多工作,如校验配置等。在此,我们只讨论连接的创建过程。在连接池的初始化过程中一共有 3 种创建连接的过程:
/** * Creating new poolEntry. If maxLifetime is configured, create a future End-of-life task with 2.5% variance from * the maxLifetime time to ensure there is no massive die-off of Connections in the pool. */ private PoolEntry createPoolEntry() { try { final PoolEntry poolEntry = newPoolEntry(); final long maxLifetime = config.getMaxLifetime(); if (maxLifetime > 0) { // variance up to 2.5% of the maxlifetime // 对每个连接的 maxLifetime 设置一些偏差,避免大量连接同时失效 final long variance = maxLifetime > 10_000 ? ThreadLocalRandom.current().nextLong( maxLifetime / 40 ) : 0; final long lifetime = maxLifetime - variance; poolEntry.setFutureEol(houseKeepingExecutorService.schedule(new MaxLifetimeTask(poolEntry), lifetime, MILLISECONDS)); } final long keepaliveTime = config.getKeepaliveTime(); if (keepaliveTime > 0) { // variance up to 10% of the heartbeat time final long variance = ThreadLocalRandom.current().nextLong(keepaliveTime / 10); final long heartbeatTime = keepaliveTime - variance; poolEntry.setKeepalive(houseKeepingExecutorService.scheduleWithFixedDelay(new KeepaliveTask(poolEntry), heartbeatTime, heartbeatTime, MILLISECONDS)); } return poolEntry; } catch (ConnectionSetupException e) { if (poolState == POOL_NORMAL) { // we check POOL_NORMAL to avoid a flood of messages if shutdown() is running concurrently logger.error("{} - Error thrown while acquiring connection from data source", poolName, e.getCause()); lastConnectionFailure.set(e); } } catch (Exception e) { if (poolState == POOL_NORMAL) { // we check POOL_NORMAL to avoid a flood of messages if shutdown() is running concurrently logger.debug("{} - Cannot acquire connection from data source", poolName, e); } } return null; }
我们可以看到,创建连接 PoolEntry 时,会注册两个异步延时任务:
evit = true
,如果连接不是使用中状态的话则关闭连接,调用 addBagItem(final int waiting) 方法;evit = true
同时在注册这两个异步延时任务时,注意到对两个异步任务的延迟时间都做了特殊处理,分别增加了一定范围的时间变化(MaxLifetimeTask, 2.5%;KeepaliveTask:10%)。主要是避免同时连接过期,连接同时进行心跳检测。
数据库物理连接的创建都是由 PoolBase#newConnection 完成的。代码具体内容如下:
/** * Obtain connection from data source. * * @return a Connection connection */ private Connection newConnection() throws Exception { final long start = currentTime(); Connection connection = null; try { String username = config.getUsername(); String password = config.getPassword(); connection = (username == null) ? dataSource.getConnection() : dataSource.getConnection(username, password); if (connection == null) { throw new SQLTransientConnectionException("DataSource returned null unexpectedly"); } setupConnection(connection); lastConnectionFailure.set(null); return connection; } catch (Exception e) { if (connection != null) { quietlyCloseConnection(connection, "(Failed to create/setup connection)"); } else if (getLastConnectionFailure() == null) { logger.debug("{} - Failed to create/setup connection: {}", poolName, e.getMessage()); } lastConnectionFailure.set(e); throw e; } finally { // tracker will be null during failFast check if (metricsTracker != null) { metricsTracker.recordConnectionCreated(elapsedMillis(start)); } } }
当 HikariConfig 没有配置 dataSource 时,DataSource#getConnection 是由 hikari 中的实现类 DriverDataSource#getConnection 完成的,其代码如下:
@Override public Connection getConnection(final String username, final String password) throws SQLException { final Properties cloned = (Properties) driverProperties.clone(); if (username != null) { cloned.put(USER, username); if (cloned.containsKey("username")) { cloned.put("username", username); } } if (password != null) { cloned.put(PASSWORD, password); } return driver.connect(jdbcUrl, cloned); }
创建的连接 PoolEntry 通过 ConcurrentBag#add 加入到了连接池中:
/** * Add a new object to the bag for others to borrow. * * @param bagEntry an object to add to the bag */ public void add(final T bagEntry) { if (closed) { LOGGER.info("ConcurrentBag has been closed, ignoring add()"); throw new IllegalStateException("ConcurrentBag has been closed, ignoring add()"); } sharedList.add(bagEntry); // spin until a thread takes it or none are waiting while (waiters.get() > 0 && bagEntry.getState() == STATE_NOT_IN_USE && !handoffQueue.offer(bagEntry)) { Thread.yield(); } }
综上,HikariCP 创建连接的时序图如下:
连接的创建开始于连接池的初始化。无论我们以 HikariConfig 还是 no-args 的方式配置,连接池的初始化都是一样的,这个阶段会在快速失败阶段和启动管家线程的方式进行连接的创建:
/** * Construct a HikariPool with the specified configuration. * * @param config a HikariConfig instance */ public HikariPool(final HikariConfig config) { super(config); this.connectionBag = new ConcurrentBag<>(this); this.suspendResumeLock = config.isAllowPoolSuspension() ? new SuspendResumeLock() : SuspendResumeLock.FAUX_LOCK; // 初始化管家定时执行服务 this.houseKeepingExecutorService = initializeHouseKeepingExecutorService(); // 快速失败,这个阶段只创建了一个数据库连接 checkFailFast(); if (config.getMetricsTrackerFactory() != null) { setMetricsTrackerFactory(config.getMetricsTrackerFactory()); } else { setMetricRegistry(config.getMetricRegistry()); } // 设置健康检查 setHealthCheckRegistry(config.getHealthCheckRegistry()); // 注册 MBean handleMBeans(this, true); ThreadFactory threadFactory = config.getThreadFactory(); final int maxPoolSize = config.getMaximumPoolSize(); LinkedBlockingQueue<Runnable> addConnectionQueue = new LinkedBlockingQueue<>(maxPoolSize); this.addConnectionQueueReadOnlyView = unmodifiableCollection(addConnectionQueue); this.addConnectionExecutor = createThreadPoolExecutor(addConnectionQueue, poolName + " connection adder", threadFactory, new ThreadPoolExecutor.DiscardOldestPolicy()); this.closeConnectionExecutor = createThreadPoolExecutor(maxPoolSize, poolName + " connection closer", threadFactory, new ThreadPoolExecutor.CallerRunsPolicy()); this.leakTaskFactory = new ProxyLeakTaskFactory(config.getLeakDetectionThreshold(), houseKeepingExecutorService); // 开始执行管家任务,开始管理数据库连接 // 在初始化阶段主要是添加数据库连接 // 在后期也负责关闭空闲连接 this.houseKeeperTask = houseKeepingExecutorService.scheduleWithFixedDelay(new HouseKeeper(), 100L, housekeepingPeriodMs, MILLISECONDS); if (Boolean.getBoolean("com.zaxxer.hikari.blockUntilFilled") && config.getInitializationFailTimeout() > 1) { addConnectionExecutor.setMaximumPoolSize(Math.min(16, Runtime.getRuntime().availableProcessors())); addConnectionExecutor.setCorePoolSize(Math.min(16, Runtime.getRuntime().availableProcessors())); final long startTime = currentTime(); while (elapsedMillis(startTime) < config.getInitializationFailTimeout() && getTotalConnections() < config.getMinimumIdle()) { quietlySleep(MILLISECONDS.toMillis(100)); } addConnectionExecutor.setCorePoolSize(1); addConnectionExecutor.setMaximumPoolSize(1); } }
checkFailFast 具体内容如下:
/** * If initializationFailFast is configured, check that we have DB connectivity. * * @throws PoolInitializationException if fails to create or validate connection * @see HikariConfig#setInitializationFailTimeout(long) */ private void checkFailFast() { final long initializationTimeout = config.getInitializationFailTimeout(); if (initializationTimeout < 0) { return; } final long startTime = currentTime(); do { final PoolEntry poolEntry = createPoolEntry(); if (poolEntry != null) { if (config.getMinimumIdle() > 0) { connectionBag.add(poolEntry); logger.debug("{} - Added connection {}", poolName, poolEntry.connection); } else { quietlyCloseConnection(poolEntry.close(), "(initialization check complete and minimumIdle is zero)"); } return; } if (getLastConnectionFailure() instanceof ConnectionSetupException) { throwPoolInitializationException(getLastConnectionFailure().getCause()); } quietlySleep(SECONDS.toMillis(1)); } while (elapsedMillis(startTime) < initializationTimeout); if (initializationTimeout > 0) { throwPoolInitializationException(getLastConnectionFailure()); } }
this.addConnectionExecutor = createThreadPoolExecutor(addConnectionQueue, poolName + " connection adder", threadFactory, new ThreadPoolExecutor.DiscardOldestPolicy());
addConnectionExecutor 采用的是 DiscardPolicy ,在任务满了的情况下丢弃被拒绝的任务,不会产生异常。任务队列的长度就是 maxPoolSize .
连接池的构造最后阶段即是开启了管家线程 HouseKeeper . 管家线程的主要功能就是管理线程池,创建、关闭连接。
this.houseKeeperTask = houseKeepingExecutorService.scheduleWithFixedDelay(new HouseKeeper(), 100L, housekeepingPeriodMs, MILLISECONDS);
private final class HouseKeeper implements Runnable { private volatile long previous = plusMillis(currentTime(), -housekeepingPeriodMs); @Override public void run() { try { // refresh values in case they changed via MBean connectionTimeout = config.getConnectionTimeout(); validationTimeout = config.getValidationTimeout(); leakTaskFactory.updateLeakDetectionThreshold(config.getLeakDetectionThreshold()); catalog = (config.getCatalog() != null && !config.getCatalog().equals(catalog)) ? config.getCatalog() : catalog; final long idleTimeout = config.getIdleTimeout(); final long now = currentTime(); // Detect retrograde time, allowing +128ms as per NTP spec. if (plusMillis(now, 128) < plusMillis(previous, housekeepingPeriodMs)) { logger.warn("{} - Retrograde clock change detected (housekeeper delta={}), soft-evicting connections from pool.", poolName, elapsedDisplayString(previous, now)); previous = now; softEvictConnections(); return; } else if (now > plusMillis(previous, (3 * housekeepingPeriodMs) / 2)) { // No point evicting for forward clock motion, this merely accelerates connection retirement anyway logger.warn("{} - Thread starvation or clock leap detected (housekeeper delta={}).", poolName, elapsedDisplayString(previous, now)); } previous = now; // 关闭连接,维持最小连接数 String afterPrefix = "Pool "; if (idleTimeout > 0L && config.getMinimumIdle() < config.getMaximumPoolSize()) { logPoolState("Before cleanup "); afterPrefix = "After cleanup "; final List<PoolEntry> notInUse = connectionBag.values(STATE_NOT_IN_USE); int toRemove = notInUse.size() - config.getMinimumIdle(); for (PoolEntry entry : notInUse) { if (toRemove > 0 && elapsedMillis(entry.lastAccessed, now) > idleTimeout && connectionBag.reserve(entry)) { closeConnection(entry, "(connection has passed idleTimeout)"); toRemove--; } } } // logPoolState(afterPrefix); // 增加连接,维持最小连接数 fillPool(); // Try to maintain minimum connections } catch (Exception e) { logger.error("Unexpected exception in housekeeping task", e); } } }
其中 fillPool() 会通过 addConnectionExecutor 调用 PoolEntryCreator 进行连接的创建。代码如下:
* Fill pool up from current idle connections (as they are perceived at the point of execution) to minimumIdle connections.
private synchronized void fillPool()
final int connectionsToAdd = Math.min(config.getMaximumPoolSize() - getTotalConnections(), config.getMinimumIdle() - getIdleConnections())
- addConnectionQueueReadOnlyView.size();
if (connectionsToAdd <= 0) logger.debug("{} - Fill pool skipped, pool is at sufficient level.", poolName);
for (int i = 0; i < connectionsToAdd; i++) {
addConnectionExecutor.submit((i < connectionsToAdd - 1) ? poolEntryCreator : postFillPoolEntryCreator);
从 fillPool() 代码可看出其主要职责就是维持连接池最小的连接数。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。