赞
踩
1. 500 万条数据的批处理;
2. 多线程的使用;
3. 线程池;
4. StopWatch 计时;
5. CountDownLatch 计数器。
(此处只是举例多线程、线程池的使用,这个场景完全可以直接 UPDATE。)
package com.acgkaka.test.utils;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.*;

/**
 * Thread pool helper: builds fixed-size pools with named threads and shuts them down.
 *
 * @author ACGkaka
 * @date 2021/1/19 11:22
 */
@Slf4j
public class ThreadPoolUtils {

    /** Pool size: number of processors available to the JVM, plus 2. */
    public static final int POOL_SIZE = Runtime.getRuntime().availableProcessors() + 2;

    /** How long (in milliseconds) to wait for running tasks after an orderly shutdown. */
    private static final int AWAIT_TIME = 5_000;

    /**
     * Creates a pool whose threads are named {@code pool-%d}.
     *
     * @return a new thread pool
     */
    public static ExecutorService initPool() {
        return initPool("pool-%d");
    }

    /**
     * Creates a pool with a custom thread-name format.
     *
     * <p>The pool has {@link #POOL_SIZE} core and maximum threads, a bounded queue of 1024
     * tasks, and rejects further submissions with
     * {@link java.util.concurrent.RejectedExecutionException} (abort policy).
     *
     * @param nameFormat thread name format, for example {@code CustHis96Lc-pool-%d}
     * @return a new thread pool
     */
    public static ExecutorService initPool(String nameFormat) {
        ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat(nameFormat).build();
        return new ThreadPoolExecutor(
                POOL_SIZE,
                POOL_SIZE,
                0L,
                TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(1024),
                namedThreadFactory,
                new ThreadPoolExecutor.AbortPolicy());
    }

    /**
     * Shuts the pool down, forcing termination if tasks do not finish in time.
     *
     * <p>{@code shutdown()} only stops new submissions; this method then waits up to
     * {@link #AWAIT_TIME} ms and calls {@code shutdownNow()} if the pool has still not terminated.
     *
     * @param pool the pool to shut down
     */
    public static void shutdownPool(ExecutorService pool) {
        pool.shutdown();
        try {
            // awaitTermination returns false on timeout: only then is a forced shutdown needed.
            if (!pool.awaitTermination(AWAIT_TIME, TimeUnit.MILLISECONDS)) {
                pool.shutdownNow();
            }
        } catch (InterruptedException e) {
            // Interrupted while waiting: cancel the remaining tasks as well.
            log.info("awaitTermination: {}", e.getMessage());
            pool.shutdownNow();
            // Restore the interrupt flag so callers can observe the interruption.
            Thread.currentThread().interrupt();
        }
    }
}
package com.acgkaka.test.controller;

import com.acgkaka.test.service.UserService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.util.StopWatch;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.util.HtmlUtils;

import java.math.BigDecimal;
import java.math.RoundingMode;
import java.time.format.DateTimeParseException;

/**
 * HTTP entry point for the user data update.
 *
 * @author ACGkaka
 */
@Slf4j
@Controller
@RequestMapping("user")
public class UserController {

    @Autowired
    UserService service;

    /**
     * Runs the user data update for a date range and reports the elapsed time.
     *
     * @param startYmd start date, e.g. {@code 20200101}
     * @param endYmd   end date, e.g. {@code 20200101}
     * @return an HTML fragment describing success (with elapsed time) or failure
     */
    @RequestMapping("update")
    @ResponseBody
    public String update(@RequestParam(value = "startYmd") String startYmd,
                         @RequestParam(value = "endYmd") String endYmd) {
        try {
            StopWatch sw = new StopWatch();
            sw.start();
            // Business logic.
            service.update(startYmd, endYmd);
            sw.stop();

            // Round the total once, then split it, so the seconds part can never show "60".
            long totalSeconds = BigDecimal.valueOf(sw.getTotalTimeSeconds())
                    .setScale(0, RoundingMode.HALF_UP)
                    .longValue();
            long minutes = totalSeconds / 60;
            long seconds = totalSeconds % 60;
            return String.format("<h1>更新成功</h1><h1>耗时: %sm%ss</h1>", minutes, seconds);
        } catch (DateTimeParseException e) {
            log.error("Invalid date parameter, startYmd={}, endYmd={}", startYmd, endYmd, e);
            return "<h1>更新失败,日期参数格式异常,示例:20200101</h1>";
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the container thread can observe the interruption.
            Thread.currentThread().interrupt();
            log.error("User update interrupted", e);
            return failure(e);
        } catch (Exception e) {
            log.error("User update failed", e);
            return failure(e);
        }
    }

    /** Builds the failure fragment; the message is HTML-escaped because it may echo request input. */
    private static String failure(Exception e) {
        return "<h1>更新失败," + HtmlUtils.htmlEscape(String.valueOf(e.getMessage())) + "</h1>";
    }
}
package com.acgkaka.test.entity;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * User record.
 *
 * <p>Accessors, the builder and both constructors are generated by Lombok.
 *
 * @author ACGkaka
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class User {

    /** Login name. */
    private String username;

    /** Password. */
    private String password;

    /** Validity flag: 1 = valid, 0 = invalid. */
    private int valid;
}
package com.acgkaka.test.handle;

import com.acgkaka.test.entity.User;
import com.acgkaka.test.mapper.UserMapper;
import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

/**
 * Worker task that updates one batch of users and then counts down a latch.
 *
 * @author ACGkaka
 */
@Slf4j
public class UserThreadHandle implements Runnable {

    /** Batch of users to process. */
    private final List<User> list;

    /** Latch counted down exactly once when this task finishes. */
    private final CountDownLatch count;

    /** Mapper used to persist the batch. */
    private final UserMapper userMapper;

    /**
     * Captures everything the task needs at run time.
     *
     * @param list       batch of users to process
     * @param userMapper mapper used to persist the batch
     * @param count      latch to count down when the task finishes
     */
    public UserThreadHandle(List<User> list, UserMapper userMapper, CountDownLatch count) {
        this.list = list;
        this.userMapper = userMapper;
        this.count = count;
    }

    /**
     * Sets every password in the batch to {@code "123456"} and saves the batch.
     *
     * <p>Failures are logged rather than rethrown; the latch is counted down in all cases
     * so that the thread waiting on it is always released.
     */
    @Override
    public void run() {
        log.info(">>>>>>>>>> 正在执行: Thread ID: {}, Thread Name: {}",
                Thread.currentThread().getId(), Thread.currentThread().getName());
        try {
            if (list != null && !list.isEmpty()) {
                // Demo only: reset the password to 123456 (a plain SQL UPDATE would do the same).
                list.forEach(item -> item.setPassword("123456"));
                // Note from the original author: this batch save is supported by MySQL only, not Oracle.
                userMapper.saveOrUpdate(list);
            }
        } catch (Exception e) {
            log.error("Failed to update user batch", e);
        } finally {
            // Must always run, otherwise the coordinating thread would wait forever.
            count.countDown();
        }
    }
}
package com.acgkaka.test.mapper;

import com.acgkaka.test.entity.User;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;

import java.util.List;

/**
 * Data-access interface for the user table.
 *
 * @author ACGkaka
 */
@Mapper
public interface UserMapper {

    /**
     * Counts the users for a given date.
     *
     * @param ymd date to filter on
     * @return number of matching rows
     */
    int queryCount(@Param("ymd") String ymd);

    /**
     * Fetches one page of users for a given date.
     *
     * @param ymd         date to filter on
     * @param rowNumStart first row number of the page
     * @param rowNumEnd   row number at which the page ends
     * @return the users on that page
     */
    List<User> queryAll(
            @Param("ymd") String ymd,
            @Param("rowNumStart") int rowNumStart,
            @Param("rowNumEnd") int rowNumEnd);

    /**
     * Saves a batch of users, inserting new rows and updating existing ones.
     *
     * <p>Requires a primary key on the table: the key decides whether a row is
     * inserted or updated.
     *
     * @param list users to save
     */
    void saveOrUpdate(@Param("list") List<User> list);
}
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.acgkaka.test.mapper.UserMapper">

    <!-- Column-to-property mapping for User. -->
    <resultMap type="com.acgkaka.test.entity.User" id="dataResult">
        <result column="username" property="username"/>
        <result column="password" property="password"/>
        <result column="valid" property="valid"/>
    </resultMap>

    <sql id="columns">USERNAME, PASSWORD, VALID</sql>

    <!-- Counts rows, optionally restricted to rows created at midnight of the given day. -->
    <select id="queryCount" resultType="java.lang.Integer">
        SELECT count(1)
        FROM T_SYS_USER U
        <if test="ymd != null">
            WHERE U.CREATED_TIME = to_date(#{ymd}||' 00:00:00','yyyymmdd hh24:mi:ss')
        </if>
    </select>

    <!--
        Pages through rows by row number: rowNumStart is inclusive, rowNumEnd is exclusive.
        The comparison operators are written as XML entities because a raw '<' inside
        element text makes the document ill-formed.
        NOTE(review): there is no ORDER BY, so page contents are not guaranteed to be stable
        between calls; confirm whether a deterministic order is required.
    -->
    <select id="queryAll" resultMap="dataResult">
        SELECT T.*
        FROM (SELECT ROWNUM ROWNO, <include refid="columns"/>
              FROM T_SYS_USER
              <if test="ymd != null">
                  WHERE CREATED_TIME = to_date(#{ymd}||' 00:00:00','yyyymmdd hh24:mi:ss')
              </if>
        ) T
        WHERE T.ROWNO &gt;= #{rowNumStart}
          AND T.ROWNO &lt; #{rowNumEnd}
    </select>

    <!--
        Batch upsert: inserts each row, or updates PASSWORD and VALID when the key already exists.
        NOTE(review): ON DUPLICATE KEY UPDATE is MySQL syntax while the selects above use
        Oracle-style ROWNUM / to_date; confirm which database this mapper targets.
    -->
    <insert id="saveOrUpdate" parameterType="java.util.List">
        INSERT INTO T_SYS_USER (<include refid="columns"/>)
        VALUES
        <foreach collection="list" item="item" index="index" separator=",">
            (#{item.username}, #{item.password}, #{item.valid})
        </foreach>
        ON DUPLICATE KEY UPDATE PASSWORD=VALUES(PASSWORD), VALID=VALUES(VALID)
    </insert>
</mapper>
package com.acgkaka.test.service;

/**
 * Service interface for the user table.
 *
 * @author ACGkaka
 */
public interface UserService {

    /**
     * Processes user data for the given date range.
     *
     * @param startYmd start date
     * @param endYmd   end date
     * @throws InterruptedException if the calling thread is interrupted while waiting for workers
     */
    void update(String startYmd, String endYmd) throws InterruptedException;
}
package com.acgkaka.test.service.impl;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.acgkaka.test.entity.User;
import com.acgkaka.test.handle.UserThreadHandle;
import com.acgkaka.test.mapper.UserMapper;
import com.acgkaka.test.service.UserService;
import com.acgkaka.test.utils.ThreadPoolUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.concurrent.*;

/**
 * Service implementation for the T_SYS_USER table.
 *
 * @author ACGkaka
 */
@Slf4j
@Service
public class UserServiceImpl implements UserService {

    @Autowired
    private UserMapper userMapper;

    /** Page size; according to the original author's testing, 3000 rows per page was fastest. */
    private static final int DATA_LENGTH = 3000;

    /**
     * Updates users day by day over the inclusive range {@code [startYmd, endYmd]}.
     *
     * <p>Each day is read in pages of {@link #DATA_LENGTH} rows and every page is handed to a
     * worker thread. The pool is always shut down, even when processing fails.
     *
     * @param startYmd start date in {@code yyyyMMdd} form
     * @param endYmd   end date in {@code yyyyMMdd} form
     * @throws InterruptedException if interrupted while waiting for a batch of workers
     * @throws java.time.format.DateTimeParseException if either date cannot be parsed
     */
    @Override
    public void update(String startYmd, String endYmd) throws InterruptedException {
        log.info("查询参数,开始时间:{} 结束时间:{} ", startYmd, endYmd);

        // Parse first so that bad input fails before any pool is created.
        LocalDate date = LocalDate.parse(startYmd, DateTimeFormatter.BASIC_ISO_DATE);
        LocalDate endDate = LocalDate.parse(endYmd, DateTimeFormatter.BASIC_ISO_DATE);

        ExecutorService pool = ThreadPoolUtils.initPool("User-pool-%d");
        try {
            while (!date.isAfter(endDate)) {
                updateDay(pool, DateTimeFormatter.BASIC_ISO_DATE.format(date));
                date = date.plusDays(1);
            }
            log.info("更新完毕。");
        } finally {
            ThreadPoolUtils.shutdownPool(pool);
        }
    }

    /** Pages through one day's users and processes the pages in batches of at most POOL_SIZE tasks. */
    private void updateDay(ExecutorService pool, String ymd) throws InterruptedException {
        int rownum = 0;
        // Row count for the day, used to estimate how many pages remain.
        int size = userMapper.queryCount(ymd);
        List<User> list = userMapper.queryAll(ymd, rownum, DATA_LENGTH);

        while (CollectionUtils.isNotEmpty(list)) {
            log.info("开始新的循环....");
            // Ceiling division done in integers. At least one page is pending because the
            // current list is non-empty, so never let the latch count drop to zero
            // (a zero count would submit nothing and loop forever on the same page).
            int remainingPages = Math.max(1, (size - rownum + DATA_LENGTH - 1) / DATA_LENGTH);
            // Latch count = MIN(remaining pages, pool size).
            int countNum = Math.min(remainingPages, ThreadPoolUtils.POOL_SIZE);
            CountDownLatch count = new CountDownLatch(countNum);

            for (int i = 0; i < countNum; i++) {
                if (CollectionUtils.isEmpty(list)) {
                    // Fewer pages than estimated: release the latch slot reserved for this page.
                    count.countDown();
                    continue;
                }
                rownum += DATA_LENGTH;
                pool.execute(new UserThreadHandle(list, userMapper, count));
                list = userMapper.queryAll(ymd, rownum, rownum + DATA_LENGTH);
            }

            log.info("正在等待...");
            count.await();
        }
    }
}
import org.springframework.util.StopWatch; import java.math.BigDecimal; import java.math.RoundingMode; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicInteger; /** * <p> @Title Test * <p> @Description 并发测试 * * @author ACGkaka * @date 2021/4/1 16:10 */ public class Test { public static void main(String[] args) throws InterruptedException { // size-请求次数;ThreadPoolUtils.POOL_SIZE - 并发量(操作系统最大线程数+2) int size = 1_000_00; StopWatch sw = new StopWatch(); sw.start(); ExecutorService pool = ThreadPoolUtils.initPool(); AtomicInteger success = new AtomicInteger(); // 统计成功、失败次数 AtomicInteger fail = new AtomicInteger(); AtomicInteger index = new AtomicInteger(); while (index.get() < size) { // 剩余需要的线程数 int lastNum = size - index.get(); // 计数器数量 = MIN(剩余线程数, 线程池数) int countNum = lastNum > ThreadPoolUtils.POOL_SIZE ? ThreadPoolUtils.POOL_SIZE : lastNum; // 计数器,调用await方法分阶段等待线程执行 CountDownLatch countDownLatch = new CountDownLatch(countNum); for (int i = 0; i < countNum; i++) { int iCounter = index.get(); pool.execute(new Thread(() -> { try { // TODO 处理业务 String result = ""; System.out.println(String.format(">>>>>>>>>> %s. result: %s", iCounter, result)); success.incrementAndGet(); } catch (Exception e) { e.printStackTrace(); fail.incrementAndGet(); } finally { countDownLatch.countDown(); } })); index.incrementAndGet(); } countDownLatch.await(); } ThreadPoolUtils.shutdownPool(pool); sw.stop(); String minutes = new BigDecimal(String.valueOf(sw.getTotalTimeSeconds()/60)).setScale(0, RoundingMode.DOWN).toString(); String seconds = new BigDecimal(String.valueOf(sw.getTotalTimeSeconds()%60)).setScale(0, RoundingMode.HALF_UP).toString(); System.out.println(String.format("成功次数:%s;失败次数:%s", success, fail)); System.out.println(String.format("耗时: %sm%ss", minutes, seconds)); } }
文章为本人开发总结分享,如果有更快的方式,欢迎大家分享~
分享文章:
这里分享一篇大佬写得比较详细的《CountDownLatch用法详解》https://blog.csdn.net/qq812908087/article/details/81112188
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。