赞
踩
我当前的jdbc驱动版本如下
mysql-connector-java-8.0.30.jar
在实际业务中一般用普通查询就可以覆盖90%的场景了。但是在一些环境需要去处理大量数据,而这些数据可能会超过jvm的堆内存上限。所以此时需要改变一下查询方式;改为一批一批查询数据然后进行业务处理,有点分治法的味道。
那怎么看当前查询是哪种查询呢,其实只要看获取数据时是实现类就很清晰了。
com.mysql.cj.protocol.a.result.ResultsetRowsStatic
:普通查询,将结果集一次性全部拉取到内存com.mysql.cj.protocol.a.result.ResultsetRowsCursor
:游标查询,将结果集分批拉取到内存,会占用当前连接直到连接关闭。在mysql
那边会建立一个临时表写入磁盘(查询结束后由mysql回收处理),会导致mysql server
磁盘io
飙升。com.mysql.cj.protocol.a.result.ResultsetRowsStreaming
:流式查询,将结果集一条一条的拉取进内存,占用当前mysql
连接。这个没啥好说的就是普通的sql查询;
我当前使用的mysql是8.0的需要在jdbc链接参数里声明开启游标查询
useCursorFetch=true
原生查询如下
DataSource dataSource = null; Connection connection = null; Statement statement = null; ResultSet resultSet = null; AtomicLong count = new AtomicLong(0); try { dataSource = CycleDependenciesUtil.getBean(DataSource.class); connection = dataSource.getConnection(); statement = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); String sql = "select * from xxx"; statement.setFetchSize(1000); resultSet = statement.executeQuery(sql); while (resultSet.next()){ long countResult = count.incrementAndGet(); //处理业务 } System.out.println("总共获取行数:" + count.get()); } catch (SQLException e) { throw new RuntimeException(e); }finally { if(resultSet!=null){ try {resultSet.close();} catch (SQLException e) { e.printStackTrace();} } if(statement!=null){ try {statement.close();} catch (SQLException e) {e.printStackTrace();} } if(connection!=null){ try {connection.close();} catch (SQLException e) {e.printStackTrace();} } }
mybatis游标查询
mysql8.0
必需在jdbc
驱动链接处加上useCursorFetch=true
开启游标查询
//mapper接口
Cursor<Map<String, Object>> cursorQuery();
xml语句
<select id="cursorQuery" resultType="java.util.Map" resultMap="streamQuery" resultSetType="FORWARD_ONLY" fetchSize="1000">
select * from `suppliers-system`.as_fact_as
</select>
//使用demo代码 AtomicLong count = new AtomicLong(0); Cursor<Map<String, Object>> cursor = null; SqlSession sqlSession = null; try { SqlSessionFactory sqlSessionFactory = CycleDependenciesUtil.getBean(SqlSessionFactory.class); sqlSession = sqlSessionFactory.openSession(); ProductAccessoriesSkuMapper mapper = sqlSession.getMapper(ProductAccessoriesSkuMapper.class); cursor = mapper.cursorQuery(); Iterator<Map<String, Object>> iterator = cursor.iterator(); while (iterator.hasNext()){ count.incrementAndGet(); //mybatis相对于原生jdbc操作映射了结果集,方便操作 Map<String, Object> rowData = iterator.next(); //todo 业务操作 } System.out.println("总共获取行数:" + count.get()); } finally { if(Objects.nonNull(cursor)){ try { cursor.close(); } catch (IOException e) { throw new RuntimeException(e); } } if(Objects.nonNull(sqlSession)){ sqlSession.close(); } }
原生statement查询
DataSource dataSource = null; Connection connection = null; Statement statement = null; ResultSet resultSet = null; AtomicLong count = new AtomicLong(0); String sql = "select * from `suppliers-system`.as_fact_as"; try { dataSource = CycleDependenciesUtil.getBean(DataSource.class); //流式查询会一直占用这个tcp连接,直到close connection = dataSource.getConnection(); //ResultSet.TYPE_FORWARD_ONLY 设置游标只能向下 //ResultSet.CONCUR_READ_ONLY 设置结果集只读不可修改 statement = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); //这里必需设置为Integer.MIN_VALUE,源码里做了强制校验 statement.setFetchSize(Integer.MIN_VALUE); resultSet = statement.executeQuery(sql); while (resultSet.next()){ long countResult = count.incrementAndGet(); //处理业务 //String code = resultSet.getString("serv_info_id"); } System.out.println("总共获取行数:" + count.get()); } catch (SQLException e) { throw new RuntimeException(e); }finally { if(resultSet!=null){ try {resultSet.close();} catch (SQLException e) { e.printStackTrace();} } if(statement!=null){ try {statement.close();} catch (SQLException e) {e.printStackTrace();} } if(connection!=null){ try {connection.close();} catch (SQLException e) {e.printStackTrace();} } }
mybatis流式查询
//mapper接口
void streamQuery(ResultHandler<Map<String,Object>> resultHandler);
<!-- xml -->
<resultMap id="streamQuery" type="java.util.HashMap">
<result column="reg_date" property="reg_date"/>
</resultMap>
<!-- 这里必需声明一个resultMap,原因见源码 com.baomidou.mybatisplus.core.override.MybatisMapperMethod.executeWithResultHandler-->
<select id="streamQuery" resultSetType="FORWARD_ONLY" fetchSize="-2147483648" resultMap="streamQuery">
select * from `suppliers-system`.as_fact_as
</select>
//使用demo代码 AtomicLong count = new AtomicLong(0); SqlSession sqlSession = null; try { SqlSessionFactory sqlSessionFactory = CycleDependenciesUtil.getBean(SqlSessionFactory.class); sqlSession = sqlSessionFactory.openSession(); ProductAccessoriesSkuMapper mapper = sqlSession.getMapper(ProductAccessoriesSkuMapper.class); //流式查询每次只获取一条 //当前sqlSession只能处理当前流式查询,不能再查询其他 mapper.streamQuery(new ResultHandler<Map<String, Object>>() { @Override public void handleResult(ResultContext<? extends Map<String, Object>> resultContext) { long countResult = count.incrementAndGet(); Map<String, Object> rowData = resultContext.getResultObject(); //处理业务 } }); System.out.println("总共获取行数:" + count.get()); } finally { if(sqlSession!=null){ sqlSession.close(); } }
最后概括 流式查询、游标查询可以避免 OOM,数据量大可以考虑此方案。
但是这两种方式会占用数据库连接,使用中不会释放
,所以线上针对大数据量业务用到游标和流式操作,一定要进行并发控制 另外针对 JDBC
原生流式查询,Mybatis 中也进行了封装,虽然会慢一些,但是 功能以及代码的整洁程度会好上不少
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。