赞
踩
public abstract class BaseExecutor implements Executor { private static final Log log = LogFactory.getLog(BaseExecutor.class); protected Transaction transaction; protected Executor wrapper; protected ConcurrentLinkedQueue<DeferredLoad> deferredLoads; // 一级缓存 protected PerpetualCache localCache; protected PerpetualCache localOutputParameterCache; protected Configuration configuration; protected int queryStack; private boolean closed; protected BaseExecutor(Configuration configuration, Transaction transaction) { this.transaction = transaction; this.deferredLoads = new ConcurrentLinkedQueue<DeferredLoad>(); this.localCache = new PerpetualCache("LocalCache"); this.localOutputParameterCache = new PerpetualCache("LocalOutputParameterCache"); this.closed = false; this.configuration = configuration; this.wrapper = this; } @Override public Transaction getTransaction() { if (closed) { throw new ExecutorException("Executor was closed."); } return transaction; } @Override public void close(boolean forceRollback) { try { try { rollback(forceRollback); } finally { if (transaction != null) { transaction.close(); } } } catch (SQLException e) { // Ignore. There's nothing that can be done at this point. log.warn("Unexpected exception on closing transaction. Cause: " + e); } finally { transaction = null; deferredLoads = null; localCache = null; localOutputParameterCache = null; closed = true; } } @Override public boolean isClosed() { return closed; } @Override public int update(MappedStatement ms, Object parameter) throws SQLException { ErrorContext.instance().resource(ms.getResource()).activity("executing an update").object(ms.getId()); if (closed) { throw new ExecutorException("Executor was closed."); } // 清除一级缓存 clearLocalCache(); return doUpdate(ms, parameter); } @Override public List<BatchResult> flushStatements() throws SQLException { return flushStatements(false); } public List<BatchResult> flushStatements(boolean isRollBack) throws SQLException { if (closed) { throw new ExecutorException("Executor was closed."); } return doFlushStatements(isRollBack); } @Override public <E> List<E> query(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler) throws SQLException { // 维护了一条真正的SQL BoundSql boundSql = ms.getBoundSql(parameter); // 建缓存条件CacheKey CacheKey key = createCacheKey(ms, parameter, rowBounds, boundSql); return query(ms, parameter, rowBounds, resultHandler, key, boundSql); } @SuppressWarnings("unchecked") @Override public <E> List<E> query(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql) throws SQLException { ErrorContext.instance().resource(ms.getResource()).activity("executing a query").object(ms.getId()); if (closed) { throw new ExecutorException("Executor was closed."); } if (queryStack == 0 && ms.isFlushCacheRequired()) { // 清除一级缓存 clearLocalCache(); } List<E> list; try { queryStack++; // resultHandler为null时,从一级缓存中取 list = resultHandler == null ? (List<E>) localCache.getObject(key) : null; if (list != null) { // 从一级缓存中取结果 handleLocallyCachedOutputParameters(ms, key, parameter, boundSql); } else { // 从数据库中查询 list = queryFromDatabase(ms, parameter, rowBounds, resultHandler, key, boundSql); } } finally { queryStack--; } if (queryStack == 0) { for (DeferredLoad deferredLoad : deferredLoads) { deferredLoad.load(); } // issue #601 deferredLoads.clear(); if (configuration.getLocalCacheScope() == LocalCacheScope.STATEMENT) { // issue #482 clearLocalCache(); } } return list; } @Override public <E> Cursor<E> queryCursor(MappedStatement ms, Object parameter, RowBounds rowBounds) throws SQLException { BoundSql boundSql = ms.getBoundSql(parameter); return doQueryCursor(ms, parameter, rowBounds, boundSql); } @Override public void deferLoad(MappedStatement ms, MetaObject resultObject, String property, CacheKey key, Class<?> targetType) { if (closed) { throw new ExecutorException("Executor was closed."); } DeferredLoad deferredLoad = new DeferredLoad(resultObject, property, key, localCache, configuration, targetType); if (deferredLoad.canLoad()) { deferredLoad.load(); } else { deferredLoads.add(new DeferredLoad(resultObject, property, key, localCache, configuration, targetType)); } } @Override public CacheKey createCacheKey(MappedStatement ms, Object parameterObject, RowBounds rowBounds, BoundSql boundSql) { if (closed) { throw new ExecutorException("Executor was closed."); } CacheKey cacheKey = new CacheKey(); cacheKey.update(ms.getId()); cacheKey.update(rowBounds.getOffset()); cacheKey.update(rowBounds.getLimit()); cacheKey.update(boundSql.getSql()); List<ParameterMapping> parameterMappings = boundSql.getParameterMappings(); TypeHandlerRegistry typeHandlerRegistry = ms.getConfiguration().getTypeHandlerRegistry(); // mimic DefaultParameterHandler logic for (ParameterMapping parameterMapping : parameterMappings) { if (parameterMapping.getMode() != ParameterMode.OUT) { Object value; String propertyName = parameterMapping.getProperty(); if (boundSql.hasAdditionalParameter(propertyName)) { value = boundSql.getAdditionalParameter(propertyName); } else if (parameterObject == null) { value = null; } else if (typeHandlerRegistry.hasTypeHandler(parameterObject.getClass())) { value = parameterObject; } else { MetaObject metaObject = configuration.newMetaObject(parameterObject); value = metaObject.getValue(propertyName); } cacheKey.update(value); } } if (configuration.getEnvironment() != null) { // issue #176 cacheKey.update(configuration.getEnvironment().getId()); } return cacheKey; } @Override public boolean isCached(MappedStatement ms, CacheKey key) { return localCache.getObject(key) != null; } @Override public void commit(boolean required) throws SQLException { if (closed) { throw new ExecutorException("Cannot commit, transaction is already closed"); } clearLocalCache(); flushStatements(); if (required) { transaction.commit(); } } @Override public void rollback(boolean required) throws SQLException { if (!closed) { try { clearLocalCache(); flushStatements(true); } finally { if (required) { transaction.rollback(); } } } } @Override public void clearLocalCache() { if (!closed) { localCache.clear(); localOutputParameterCache.clear(); } } protected abstract int doUpdate(MappedStatement ms, Object parameter) throws SQLException; protected abstract List<BatchResult> doFlushStatements(boolean isRollback) throws SQLException; protected abstract <E> List<E> doQuery(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, BoundSql boundSql) throws SQLException; protected abstract <E> Cursor<E> doQueryCursor(MappedStatement ms, Object parameter, RowBounds rowBounds, BoundSql boundSql) throws SQLException; protected void closeStatement(Statement statement) { if (statement != null) { try { statement.close(); } catch (SQLException e) { // ignore } } } /** * Apply a transaction timeout. * @param statement a current statement * @throws SQLException if a database access error occurs, this method is called on a closed <code>Statement</code> * @since 3.4.0 * @see StatementUtil#applyTransactionTimeout(Statement, Integer, Integer) */ protected void applyTransactionTimeout(Statement statement) throws SQLException { StatementUtil.applyTransactionTimeout(statement, statement.getQueryTimeout(), transaction.getTimeout()); } private void handleLocallyCachedOutputParameters(MappedStatement ms, CacheKey key, Object parameter, BoundSql boundSql) { if (ms.getStatementType() == StatementType.CALLABLE) { // 按key取一级缓存 final Object cachedParameter = localOutputParameterCache.getObject(key); if (cachedParameter != null && parameter != null) { final MetaObject metaCachedParameter = configuration.newMetaObject(cachedParameter); final MetaObject metaParameter = configuration.newMetaObject(parameter); for (ParameterMapping parameterMapping : boundSql.getParameterMappings()) { if (parameterMapping.getMode() != ParameterMode.IN) { final String parameterName = parameterMapping.getProperty(); final Object cachedValue = metaCachedParameter.getValue(parameterName); metaParameter.setValue(parameterName, cachedValue); } } } } } private <E> List<E> queryFromDatabase(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql) throws SQLException { List<E> list; localCache.putObject(key, EXECUTION_PLACEHOLDER); try { // 查询结果集 list = doQuery(ms, parameter, rowBounds, resultHandler, boundSql); } finally { localCache.removeObject(key); } // 把最新查询的结果集放入一级缓存中 localCache.putObject(key, list); if (ms.getStatementType() == StatementType.CALLABLE) { localOutputParameterCache.putObject(key, parameter); } return list; } protected Connection getConnection(Log statementLog) throws SQLException { Connection connection = transaction.getConnection(); if (statementLog.isDebugEnabled()) { return ConnectionLogger.newInstance(connection, statementLog, queryStack); } else { return connection; } } @Override public void setExecutorWrapper(Executor wrapper) { this.wrapper = wrapper; } private static class DeferredLoad { private final MetaObject resultObject; private final String property; private final Class<?> targetType; private final CacheKey key; private final PerpetualCache localCache; private final ObjectFactory objectFactory; private final ResultExtractor resultExtractor; // issue #781 public DeferredLoad(MetaObject resultObject, String property, CacheKey key, PerpetualCache localCache, Configuration configuration, Class<?> targetType) { this.resultObject = resultObject; this.property = property; this.key = key; this.localCache = localCache; this.objectFactory = configuration.getObjectFactory(); this.resultExtractor = new ResultExtractor(configuration, objectFactory); this.targetType = targetType; } public boolean canLoad() { return localCache.getObject(key) != null && localCache.getObject(key) != EXECUTION_PLACEHOLDER; } public void load() { @SuppressWarnings( "unchecked" ) // we suppose we get back a List List<Object> list = (List<Object>) localCache.getObject(key); Object value = resultExtractor.extractObjectFromList(list, targetType); resultObject.setValue(property, value); } } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。