当前位置:   article > 正文

JAVA 千万级数据多线程保存入库_postgresql java千万数据入库

postgresql java千万数据入库

创建一个抽象类,以便继承次接口

package com.mm.fa.data.process.bigt.utils;

import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.jdbc.core.JdbcTemplate;

import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Slf4j
@Data
public abstract class BigDataInsert<T> {

    @Resource
    JdbcTemplate jdbcTemplate;

    int groupCount = 2000;

    int threadPoolCount = 5;

    // 创建一个固定大小的线程池
    private ExecutorService service = null;

    public void insertBigData(List<T> list, String sql) {
        // 创建线程池对象
        service = Executors.newFixedThreadPool(threadPoolCount);
        //确保所有线程都执行结束后,再执行后面的逻辑
		CountDownLatch countDownLatch = new CountDownLatch(listList.size());
        // 将需保存集合分组
        List<List<T>> listList = new ArrayList<>();
        if (list.size() > groupCount) {
            listList = fixedGrouping(list, groupCount);
        } else {
            listList.add(list);
        }

        //循环10次,每次十万数据,一共100万
        for (int i = 0; i < listList.size(); i++) {
            int finalI = i;
            List<List<T>> finalListList = listList;
            // 多线程保存
            service.execute(() -> {

                StringBuilder sb = new StringBuilder();
                sb.append(sql);
                List<String> finalList = new LinkedList<>();
                StringBuilder spliceS = new StringBuilder();
                finalListList.get(finalI).stream().forEach(item -> {
                    spliceS.append("(");
                    List<String> resultList = pstmToSetValue(item);
                    String join = String.join(",", resultList);
                    spliceS.append(join).append(")");
                    finalList.add(spliceS.toString());
                    spliceS.setLength(0);
                });
                String valueSql = String.join(",", finalList);
                sb.append(valueSql);
                try {
                    this.jdbcTemplate.execute(sb.toString());
                    countDownLatch.countDown();
                    log.info("insert into data successfully...");
                } catch (Exception e) {
                    log.error("insert into data error...{}", e.getMessage());
                }
            });
        }
		try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        service.shutdown();
    }

    /**
     * 将一组数据固定分组,每组n个元素
     *
     * @param source 要分组的数据源
     * @param n      每组n个元素
     * @param <T>
     * @return
     */
    public static <T> List<List<T>> fixedGrouping(List<T> source, int n) {

        if (null == source || source.size() == 0 || n <= 0) {
            return null;
        }
        List<List<T>> result = new ArrayList<List<T>>();
        int remainder = source.size() % n;
        int size = (source.size() / n);
        for (int i = 0; i < size; i++) {
            List<T> subset = null;
            subset = source.subList(i * n, (i + 1) * n);
            result.add(subset);
        }
        if (remainder > 0) {
            List<T> subset = null;
            subset = source.subList(size * n, size * n + remainder);
            result.add(subset);
        }
        return result;
    }

    public abstract List<String> pstmToSetValue(T dto);

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112

Servcice接口 继承 抽象类

package com.mm.fa.data.api.service;

import cn.hutool.core.collection.CollectionUtil;
import com.mm.fa.common.Result;
import com.mm.fa.data.process.bigt.utils.BigDataInsert;
import com.mm.fa.ext.supplyChain.dto.ForecastSalesDTO;
import com.mm.fa.ext.supplyChain.mapper.Supplier360DataMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

@Service
@Slf4j
public class SupplierSalesDataService extends BigDataInsert<Map<String, Object>> {


    @Resource
    private Supplier360DataMapper mapper;

    String SQL = "INSERT INTO supply_chain.t_supplier_sales_data(`manufacturer` , `product_code` , `product_name`, `" +
            "quantity`, `unit_price`) VALUES ";

    @Override
    public List<String> pstmToSetValue(Map<String, Object> dto) {
        List<String> list = new LinkedList<>();
        list.add("'" + dto.get("manufacturer") + "'");
        list.add("'" + dto.get("productCode") + "'");
        list.add("'" + dto.get("productName") + "'");
        list.add("'" + dto.get("quantity") + "'");
        list.add("'" + dto.get("unitPrice") + "'");
        return list;
    }

    public Result saveSupplierSalesData() {

        List<Map<String, Object>> resultList = new ArrayList<>();
        List<String> strList = mapper.getSupplierNameList();
        if (CollectionUtil.isNotEmpty(strList)) {
            List<Callable<List<Map<String, Object>>>> tasks = new ArrayList<>();//添加任务
            for (String str : strList) {
                Callable<List<Map<String, Object>>> qfe = () -> {
                    try {
                        List<Map<String, Object>> newDataList = mapper.selectSupplierSalesData(str);
                        return newDataList;
                    } catch (Exception e) {
                        log.error("供应商销售情况中间表多线程查询调用失败:" + e);
                        throw new Exception("供应商销售情况中间表多线程查询调用异常!");
                    }
                };
                tasks.add(qfe);
            }

            try {
                //定义固定长度的线程池  防止线程过多
                ExecutorService execService = Executors.newFixedThreadPool(100);

                List<Future<List<Map<String, Object>>>> futures = execService.invokeAll(tasks);
                // 处理线程返回结果
                if (futures != null && futures.size() > 0) {
                    for (Future<List<Map<String, Object>>> future : futures) {
                        if (CollectionUtil.isNotEmpty(future.get())) {
                            resultList.addAll(future.get());
                        }
                    }
                }
                execService.shutdown();
            } catch (Exception e) {
                log.error("供应商销售情况中间表获取数据失败:" + e);
            }
        }

        if(CollectionUtil.isNotEmpty(resultList)){
            log.info("供应商销售情况中间表获取数据{}", resultList.size());

            //删除库里数据
            mapper.deleteAll("t_supplier_sales_data");
            log.info("数据删除成功!");

            //保存数据
            log.info(" 供应商销售情况中间表 开启多线程数据保存!");
            insertBigData(resultList, SQL);
        }

         return Result.create();
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/weixin_40725706/article/detail/560396
推荐阅读
相关标签
  

闽ICP备14008679号