赞
踩
创建一个抽象类,以便业务 Service 继承此类
package com.mm.fa.data.process.bigt.utils; import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.springframework.jdbc.core.JdbcTemplate; import javax.annotation.Resource; import java.util.ArrayList; import java.util.LinkedList; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @Slf4j @Data public abstract class BigDataInsert<T> { @Resource JdbcTemplate jdbcTemplate; int groupCount = 2000; int threadPoolCount = 5; // 创建一个固定大小的线程池 private ExecutorService service = null; public void insertBigData(List<T> list, String sql) { // 创建线程池对象 service = Executors.newFixedThreadPool(threadPoolCount); //确保所有线程都执行结束后,再执行后面的逻辑 CountDownLatch countDownLatch = new CountDownLatch(listList.size()); // 将需保存集合分组 List<List<T>> listList = new ArrayList<>(); if (list.size() > groupCount) { listList = fixedGrouping(list, groupCount); } else { listList.add(list); } //循环10次,每次十万数据,一共100万 for (int i = 0; i < listList.size(); i++) { int finalI = i; List<List<T>> finalListList = listList; // 多线程保存 service.execute(() -> { StringBuilder sb = new StringBuilder(); sb.append(sql); List<String> finalList = new LinkedList<>(); StringBuilder spliceS = new StringBuilder(); finalListList.get(finalI).stream().forEach(item -> { spliceS.append("("); List<String> resultList = pstmToSetValue(item); String join = String.join(",", resultList); spliceS.append(join).append(")"); finalList.add(spliceS.toString()); spliceS.setLength(0); }); String valueSql = String.join(",", finalList); sb.append(valueSql); try { this.jdbcTemplate.execute(sb.toString()); countDownLatch.countDown(); log.info("insert into data successfully..."); } catch (Exception e) { log.error("insert into data error...{}", e.getMessage()); } }); } try { countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } service.shutdown(); } /** * 将一组数据固定分组,每组n个元素 * * @param source 要分组的数据源 * @param n 每组n个元素 * @param <T> * 
@return */ public static <T> List<List<T>> fixedGrouping(List<T> source, int n) { if (null == source || source.size() == 0 || n <= 0) { return null; } List<List<T>> result = new ArrayList<List<T>>(); int remainder = source.size() % n; int size = (source.size() / n); for (int i = 0; i < size; i++) { List<T> subset = null; subset = source.subList(i * n, (i + 1) * n); result.add(subset); } if (remainder > 0) { List<T> subset = null; subset = source.subList(size * n, size * n + remainder); result.add(subset); } return result; } public abstract List<String> pstmToSetValue(T dto); }
Service 类继承上面的抽象类
package com.mm.fa.data.api.service; import cn.hutool.core.collection.CollectionUtil; import com.mm.fa.common.Result; import com.mm.fa.data.process.bigt.utils.BigDataInsert; import com.mm.fa.ext.supplyChain.dto.ForecastSalesDTO; import com.mm.fa.ext.supplyChain.mapper.Supplier360DataMapper; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.stereotype.Service; import javax.annotation.Resource; import java.util.ArrayList; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @Service @Slf4j public class SupplierSalesDataService extends BigDataInsert<Map<String, Object>> { @Resource private Supplier360DataMapper mapper; String SQL = "INSERT INTO supply_chain.t_supplier_sales_data(`manufacturer` , `product_code` , `product_name`, `" + "quantity`, `unit_price`) VALUES "; @Override public List<String> pstmToSetValue(Map<String, Object> dto) { List<String> list = new LinkedList<>(); list.add("'" + dto.get("manufacturer") + "'"); list.add("'" + dto.get("productCode") + "'"); list.add("'" + dto.get("productName") + "'"); list.add("'" + dto.get("quantity") + "'"); list.add("'" + dto.get("unitPrice") + "'"); return list; } public Result saveSupplierSalesData() { List<Map<String, Object>> resultList = new ArrayList<>(); List<String> strList = mapper.getSupplierNameList(); if (CollectionUtil.isNotEmpty(strList)) { List<Callable<List<Map<String, Object>>>> tasks = new ArrayList<>();//添加任务 for (String str : strList) { Callable<List<Map<String, Object>>> qfe = () -> { try { List<Map<String, Object>> newDataList = mapper.selectSupplierSalesData(str); return newDataList; } catch (Exception e) { log.error("供应商销售情况中间表多线程查询调用失败:" + e); throw new Exception("供应商销售情况中间表多线程查询调用异常!"); } }; tasks.add(qfe); } try { //定义固定长度的线程池 防止线程过多 ExecutorService execService = 
Executors.newFixedThreadPool(100); List<Future<List<Map<String, Object>>>> futures = execService.invokeAll(tasks); // 处理线程返回结果 if (futures != null && futures.size() > 0) { for (Future<List<Map<String, Object>>> future : futures) { if (CollectionUtil.isNotEmpty(future.get())) { resultList.addAll(future.get()); } } } execService.shutdown(); } catch (Exception e) { log.error("供应商销售情况中间表获取数据失败:" + e); } } if(CollectionUtil.isNotEmpty(resultList)){ log.info("供应商销售情况中间表获取数据{}", resultList.size()); //删除库里数据 mapper.deleteAll("t_supplier_sales_data"); log.info("数据删除成功!"); //保存数据 log.info(" 供应商销售情况中间表 开启多线程数据保存!"); insertBigData(resultList, SQL); } return Result.create(); } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。