赞
踩
基于SpringBoot 2.2.7.RELEASE 依赖的 HikariCP 3.4.3。
源码包中源码和实际Class文件反编译代码有出入,以Class反编译代码为准。
Hikari连接池有两篇
Hikari连接池1–初始化连接池
Hikari连接池2–获取和归还连接
Hikari有两个连接池
private final HikariPool fastPathPool;
private volatile HikariPool pool;
public HikariDataSource() {
super();
//将 fastPathPool 置空
fastPathPool = null;
}
HikariDataSource 继承 HikariConfig,super 调的是 HikariConfig 的构造方法。
HikariDataSource 也实现了 DataSource 接口
public HikariConfig() { dataSourceProperties = new Properties(); healthCheckProperties = new Properties(); minIdle = -1; maxPoolSize = -1; maxLifetime = MAX_LIFETIME; connectionTimeout = CONNECTION_TIMEOUT; validationTimeout = VALIDATION_TIMEOUT; idleTimeout = IDLE_TIMEOUT; initializationFailTimeout = 1; isAutoCommit = true; String systemProp = System.getProperty("hikaricp.configurationFile"); if (systemProp != null) { loadProperties(systemProp); } }
使用 HikariDataSource 的默认构造方法创建数据源,多次创建,其父 HikariConfig 只有一个。
并且,这个构造器没有初始化连接池。
public HikariDataSource(HikariConfig configuration){
//校验
configuration.validate();
//拷贝属性
configuration.copyStateTo(this);
LOGGER.info("{} - Starting...", configuration.getPoolName());
//创建新的连接池
pool = fastPathPool = new HikariPool(this);
LOGGER.info("{} - Start completed.", configuration.getPoolName());
//设置标志
this.seal();
}
创建 HikariPool
public HikariPool(final HikariConfig config) { //调用父类构造器初始化数据源 super(config); //创建bag this.connectionBag = new ConcurrentBag<>(this); //创建Semaphore锁 this.suspendResumeLock = config.isAllowPoolSuspension() ? new SuspendResumeLock() : SuspendResumeLock.FAUX_LOCK; //创建线程池 this.houseKeepingExecutorService = initializeHouseKeepingExecutorService(); //初始化连接池,向连接池里放连接 checkFailFast(); if (config.getMetricsTrackerFactory() != null) { setMetricsTrackerFactory(config.getMetricsTrackerFactory()); } else { setMetricRegistry(config.getMetricRegistry()); } setHealthCheckRegistry(config.getHealthCheckRegistry()); handleMBeans(this, true); ThreadFactory threadFactory = config.getThreadFactory(); final int maxPoolSize = config.getMaximumPoolSize(); LinkedBlockingQueue<Runnable> addConnectionQueue = new LinkedBlockingQueue<>(maxPoolSize); this.addConnectionQueueReadOnlyView = unmodifiableCollection(addConnectionQueue); this.addConnectionExecutor = createThreadPoolExecutor(addConnectionQueue, poolName + " connection adder", threadFactory, new ThreadPoolExecutor.DiscardOldestPolicy()); this.closeConnectionExecutor = createThreadPoolExecutor(maxPoolSize, poolName + " connection closer", threadFactory, new ThreadPoolExecutor.CallerRunsPolicy()); this.leakTaskFactory = new ProxyLeakTaskFactory(config.getLeakDetectionThreshold(), houseKeepingExecutorService); this.houseKeeperTask = houseKeepingExecutorService.scheduleWithFixedDelay(new HouseKeeper(), 100L, housekeepingPeriodMs, MILLISECONDS); if (Boolean.getBoolean("com.zaxxer.hikari.blockUntilFilled") && config.getInitializationFailTimeout() > 1) { addConnectionExecutor.setCorePoolSize(Math.min(16, Runtime.getRuntime().availableProcessors())); addConnectionExecutor.setMaximumPoolSize(Math.min(16, Runtime.getRuntime().availableProcessors())); final long startTime = currentTime(); while (elapsedMillis(startTime) < config.getInitializationFailTimeout() && getTotalConnections() < config.getMinimumIdle()) { quietlySleep(MILLISECONDS.toMillis(100)); } addConnectionExecutor.setCorePoolSize(1); addConnectionExecutor.setMaximumPoolSize(1); } }
HikariPool的父类
PoolBase(final HikariConfig config) { this.config = config; this.networkTimeout = UNINITIALIZED; this.catalog = config.getCatalog(); this.schema = config.getSchema(); this.isReadOnly = config.isReadOnly(); this.isAutoCommit = config.isAutoCommit(); this.exceptionOverride = UtilityElf.createInstance(config.getExceptionOverrideClassName(), SQLExceptionOverride.class); this.transactionIsolation = UtilityElf.getTransactionIsolation(config.getTransactionIsolation()); this.isQueryTimeoutSupported = UNINITIALIZED; this.isNetworkTimeoutSupported = UNINITIALIZED; this.isUseJdbc4Validation = config.getConnectionTestQuery() == null; this.isIsolateInternalQueries = config.isIsolateInternalQueries(); this.poolName = config.getPoolName(); this.connectionTimeout = config.getConnectionTimeout(); this.validationTimeout = config.getValidationTimeout(); this.lastConnectionFailure = new AtomicReference<>(); //初始化数据源 initializeDataSource(); }
初始化DataSource
private void initializeDataSource() { final String jdbcUrl = config.getJdbcUrl(); final String username = config.getUsername(); final String password = config.getPassword(); final String dsClassName = config.getDataSourceClassName(); final String driverClassName = config.getDriverClassName(); final String dataSourceJNDI = config.getDataSourceJNDI(); final Properties dataSourceProperties = config.getDataSourceProperties(); //DataSource 为空,创建 DataSource ds = config.getDataSource(); if (dsClassName != null && ds == null) { ds = createInstance(dsClassName, DataSource.class); PropertyElf.setTargetFromProperties(ds, dataSourceProperties); } else if (jdbcUrl != null && ds == null) {//一般会执行这里 ds = new DriverDataSource(jdbcUrl, driverClassName, dataSourceProperties, username, password); } else if (dataSourceJNDI != null && ds == null) { try { InitialContext ic = new InitialContext(); ds = (DataSource) ic.lookup(dataSourceJNDI); } catch (NamingException e) { throw new PoolInitializationException(e); } } if (ds != null) { //设置登录超时时间 setLoginTimeout(ds); //创建网络超时执行器 createNetworkTimeoutExecutor(ds, dsClassName, jdbcUrl); } this.dataSource = ds; }
创建数据源
public DriverDataSource(String jdbcUrl, String driverClassName, Properties properties, String username, String password) { this.jdbcUrl = jdbcUrl; this.driverProperties = new Properties(); for (Entry<Object, Object> entry : properties.entrySet()) { driverProperties.setProperty(entry.getKey().toString(), entry.getValue().toString()); } if (username != null) { driverProperties.put(USER, driverProperties.getProperty("user", username)); } if (password != null) { driverProperties.put(PASSWORD, driverProperties.getProperty("password", password)); } if (driverClassName != null) { Enumeration<Driver> drivers = DriverManager.getDrivers(); while (drivers.hasMoreElements()) { Driver d = drivers.nextElement(); if (d.getClass().getName().equals(driverClassName)) { driver = d; break; } } if (driver == null) { LOGGER.warn("Registered driver with driverClassName={} was not found, trying direct instantiation.", driverClassName); Class<?> driverClass = null; ClassLoader threadContextClassLoader = Thread.currentThread().getContextClassLoader(); try { if (threadContextClassLoader != null) { try { driverClass = threadContextClassLoader.loadClass(driverClassName); LOGGER.debug("Driver class {} found in Thread context class loader {}", driverClassName, threadContextClassLoader); } catch (ClassNotFoundException e) { LOGGER.debug("Driver class {} not found in Thread context class loader {}, trying classloader {}", driverClassName, threadContextClassLoader, this.getClass().getClassLoader()); } } if (driverClass == null) { driverClass = this.getClass().getClassLoader().loadClass(driverClassName); LOGGER.debug("Driver class {} found in the HikariConfig class classloader {}", driverClassName, this.getClass().getClassLoader()); } } catch (ClassNotFoundException e) { LOGGER.debug("Failed to load driver class {} from HikariConfig class classloader {}", driverClassName, this.getClass().getClassLoader()); } if (driverClass != null) { try { //初始化驱动 driver = (Driver) driverClass.newInstance(); } catch (Exception e) { LOGGER.warn("Failed to create instance of driver class {}, trying jdbcUrl resolution", driverClassName, e); } } } } final String sanitizedUrl = jdbcUrl.replaceAll("([?&;]password=)[^&#;]*(.*)", "$1<masked>$2"); try { if (driver == null) { driver = DriverManager.getDriver(jdbcUrl); LOGGER.debug("Loaded driver with class name {} for jdbcUrl={}", driver.getClass().getName(), sanitizedUrl); } else if (!driver.acceptsURL(jdbcUrl)) { throw new RuntimeException("Driver " + driverClassName + " claims to not accept jdbcUrl, " + sanitizedUrl); } } catch (SQLException e) { throw new RuntimeException("Failed to get driver instance for jdbcUrl=" + sanitizedUrl, e); } }
private final ConcurrentBag<PoolEntry> connectionBag; //com.zaxxer.hikari.pool.HikariPool#checkFailFast private void checkFailFast() { final long initializationTimeout = config.getInitializationFailTimeout(); if (initializationTimeout < 0) { return; } final long startTime = currentTime(); do { //创建entry final PoolEntry poolEntry = createPoolEntry(); //entry不为空,将entry设置到bag里面,这是初始化时创建的一条连接 if (poolEntry != null) { if (config.getMinimumIdle() > 0) { //添加到连接池 connectionBag.add(poolEntry); logger.debug("{} - Added connection {}", poolName, poolEntry.connection); } else { quietlyCloseConnection(poolEntry.close(), "(initialization check complete and minimumIdle is zero)"); } return; } if (getLastConnectionFailure() instanceof ConnectionSetupException) { throwPoolInitializationException(getLastConnectionFailure().getCause()); } quietlySleep(SECONDS.toMillis(1)); } while (elapsedMillis(startTime) < initializationTimeout); if (initializationTimeout > 0) { throwPoolInitializationException(getLastConnectionFailure()); } }
//com.zaxxer.hikari.pool.HikariPool#createPoolEntry private PoolEntry createPoolEntry() { try { final PoolEntry poolEntry = newPoolEntry(); final long maxLifetime = config.getMaxLifetime(); if (maxLifetime > 0) { // variance up to 2.5% of the maxlifetime final long variance = maxLifetime > 10_000 ? ThreadLocalRandom.current().nextLong( maxLifetime / 40 ) : 0; final long lifetime = maxLifetime - variance; poolEntry.setFutureEol(houseKeepingExecutorService.schedule( () -> { if (softEvictConnection(poolEntry, "(connection has passed maxLifetime)", false /* not owner */)) { addBagItem(connectionBag.getWaitingThreadCount()); } }, lifetime, MILLISECONDS)); } return poolEntry; } catch (ConnectionSetupException e) { if (poolState == POOL_NORMAL) { // we check POOL_NORMAL to avoid a flood of messages if shutdown() is running concurrently logger.error("{} - Error thrown while acquiring connection from data source", poolName, e.getCause()); lastConnectionFailure.set(e); } } catch (Exception e) { if (poolState == POOL_NORMAL) { // we check POOL_NORMAL to avoid a flood of messages if shutdown() is running concurrently logger.debug("{} - Cannot acquire connection from data source", poolName, e); } } return null; }
创建连接
//com.zaxxer.hikari.pool.PoolBase#newPoolEntry
PoolEntry newPoolEntry() throws Exception {
return new PoolEntry(newConnection(), this, isReadOnly, isAutoCommit);
}
//com.zaxxer.hikari.pool.PoolBase#newConnection private Connection newConnection() throws Exception { final long start = currentTime(); Connection connection = null; try { String username = config.getUsername(); String password = config.getPassword(); //获取连接 connection = (username == null) ? dataSource.getConnection() : dataSource.getConnection(username, password); if (connection == null) { throw new SQLTransientConnectionException("DataSource returned null unexpectedly"); } //设置连接属性 setupConnection(connection); lastConnectionFailure.set(null); return connection; } catch (Exception e) { if (connection != null) { quietlyCloseConnection(connection, "(Failed to create/setup connection)"); } else if (getLastConnectionFailure() == null) { logger.debug("{} - Failed to create/setup connection: {}", poolName, e.getMessage()); } lastConnectionFailure.set(e); throw e; } finally { // tracker will be null during failFast check if (metricsTracker != null) { metricsTracker.recordConnectionCreated(elapsedMillis(start)); } } }
//com.zaxxer.hikari.util.DriverDataSource#getConnection public Connection getConnection(final String username, final String password) throws SQLException { final Properties cloned = (Properties) driverProperties.clone(); if (username != null) { cloned.put("user", username); if (cloned.containsKey("username")) { cloned.put("username", username); } } if (password != null) { cloned.put("password", password); } //从数据库驱动获取连接 return driver.connect(jdbcUrl, cloned); }
//com.zaxxer.hikari.pool.PoolBase#setupConnection private void setupConnection(final Connection connection) throws ConnectionSetupException { try { if (networkTimeout == UNINITIALIZED) { networkTimeout = getAndSetNetworkTimeout(connection, validationTimeout); } else { setNetworkTimeout(connection, validationTimeout); } if (connection.isReadOnly() != isReadOnly) { connection.setReadOnly(isReadOnly); } if (connection.getAutoCommit() != isAutoCommit) { connection.setAutoCommit(isAutoCommit); } //检查驱动 checkDriverSupport(connection); if (transactionIsolation != defaultTransactionIsolation) { connection.setTransactionIsolation(transactionIsolation); } if (catalog != null) { connection.setCatalog(catalog); } if (schema != null) { connection.setSchema(schema); } //执行sql executeSql(connection, config.getConnectionInitSql(), true); setNetworkTimeout(connection, networkTimeout); } catch (SQLException e) { throw new ConnectionSetupException(e); } }
执行初始化sql
//com.zaxxer.hikari.pool.PoolBase#executeSql private void executeSql(final Connection connection, final String sql, final boolean isCommit) throws SQLException { if (sql != null) { try (Statement statement = connection.createStatement()) { // connection was created a few milliseconds before, so set query timeout is omitted (we assume it will succeed) //执行sql statement.execute(sql); } if (isIsolateInternalQueries && !isAutoCommit) { if (isCommit) { connection.commit(); } else { connection.rollback(); } } } }
PoolEntry(final Connection connection, final PoolBase pool, final boolean isReadOnly, final boolean isAutoCommit) {
this.connection = connection;
this.hikariPool = (HikariPool) pool;
this.isReadOnly = isReadOnly;
this.isAutoCommit = isAutoCommit;
this.lastAccessed = currentTime();
this.openStatements = new FastList<>(Statement.class, 16);
}
private final CopyOnWriteArrayList<T> sharedList;
//com.zaxxer.hikari.util.ConcurrentBag#add
public void add(final T bagEntry) {
if (closed) {
LOGGER.info("ConcurrentBag has been closed, ignoring add()");
throw new IllegalStateException("ConcurrentBag has been closed, ignoring add()");
}
//添加到sharedList
sharedList.add(bagEntry);
// spin until a thread takes it or none are waiting
while (waiters.get() > 0 && bagEntry.getState() == STATE_NOT_IN_USE && !handoffQueue.offer(bagEntry)) {
Thread.yield();
}
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。