赞
踩
这边的测试数据为大约20W条数据,97个字段。
直接使用JPA的save方法,一条一条的插入使用时间大约是:1000秒。
将所有对象存到一个LIst集合中,再将save方法改为saveAll 方法,全部插入时间:600多秒。
时间还是比较长,再继续进行优化。
修改文件配置,saveAll分批插入,
在application.properties中增加配置
- # 统计生成SQL执行情况
- # spring.jpa.properties.hibernate.generate_statistics=true
- # 开启批量插入
- spring.jpa.properties.hibernate.jdbc.batch_size=1000
- spring.jpa.properties.hibernate.order_inserts=true
- spring.jpa.properties.hibernate.order_updates=true
一般都是1000左右效率相对比较高,也可以适当调整
优化后全部插入时间大约是:430秒。
优化到这里是目前能优化的最快,如果时间感觉还是比较长,那就只能从逻辑优化与业务优化这两个方面下手!
业务方面有另外的新的需求,因入库时间过长,要求客户端能实时了解当前的入库进度,就是当前保存的时候存了多少条数据了,前端的使用进度条进行显示。
saveAll方法并没有实时显示当前进度的方法,而且因为事务的原因,只有方法执行完成才知道入库了多少条数据,逻辑方面下手,另辟蹊径。
在储存的时候会创建任务,写个文件管理实体类,主要是 当前进度 与 总条数 两个字段
- /**
- * @author: Yz
- * @Description:
- * @create: 2023-06-12
- **/
- @Data
- @Entity
- @Table(name = "medical_insurance_file")
- @ApiModel(value = "MedicalInsuranceFile", description = "文件管理")
- public class MedicalInsuranceFile extends BaseModel {
-
- @ApiModelProperty(value = "文件id")
- @Column(name = "resource_file_id", columnDefinition = "bigint(20) NOT NULL COMMENT '文件ID'")
- private Long resourceFileId;
-
- @ApiModelProperty(name = "当前进度")
- @Column(name = "current_Progress", columnDefinition = "int(11) DEFAULT '0' COMMENT '当前进度'")
- private Integer currentProgress;
-
- @ApiModelProperty(name = "总条数")
- @Column(name = "rows", columnDefinition = "int(11) DEFAULT '0' COMMENT '总条数'")
- private Integer rows;
-
-
- }
主要是其中两个字段,rows 条数 和 当前进度 currentProgress
条数是这个文件的总共有多少行数据,我的是20W行数据。当前进度就是存了多少行数据了。
- /**
- * Zip转txt并储存数据库。
- *outputFolderPath 解压的地址
- *zipFilePath zip文件地址
- *resourceFileId 资源文件ID
- */
- private void fileStoreTable(String outputFolderPath, Long resourceFileId, String zipFilePath) throws IOException, ParseException {
- // 创建解压目标文件夹
- File outputFolder = new File(outputFolderPath);
- if (!outputFolder.exists()) {
- outputFolder.mkdirs();
- }
- // 创建ZipInputStream对象
- ZipInputStream zipInputStream = new ZipInputStream(new FileInputStream(zipFilePath));
- ZipEntry entry = zipInputStream.getNextEntry();
- BufferedReader buffReader = null;
- if (!entry.isDirectory() && entry.getName().endsWith(".txt")) {
- String entryName = entry.getName();
- String outputPath = outputFolderPath + File.separator + entryName;
- FileOutputStream outputStream = new FileOutputStream(outputPath);
- /**从ZipInputStream中读取数据并写入到输出流中*/
- byte[] buffer = new byte[1024];
- int length;
- while ((length = zipInputStream.read(buffer)) > 0) {
- outputStream.write(buffer, 0, length);
- }
- outputStream.close(); // 关闭输出流
- buffReader = new BufferedReader(new InputStreamReader(new FileInputStream(outputPath), "UTF-8")); // 读取解压后的txt文件内容
- }
- List<MedicalInsuranceDirectory> medicalInsuranceDirectoryList = new ArrayList<>();
-
- /**对象*/
- String strTmp = "";
- while ((strTmp = buffReader.readLine()) != null) {
- String[] parts = strTmp.split("\t");
- MedicalInsuranceDirectory medicalInsuranceDirectory = new MedicalInsuranceDirectory();
- medicalInsuranceDirectory.setMedInsCode(parts[0]);
- medicalInsuranceDirectory.setMedInsName(parts[3].equals("null") ? null : parts[3]);
-
- .......................
- medicalInsuranceDirectory.setDosageForm(parts[90].equals("null") ? null : parts[90]);
- medicalInsuranceDirectory.setClassABMark(parts[91].equals("null") ? null : parts[91]);
- medicalInsuranceDirectory.setResourceFileCode(fileResource.getResourceFileCode());
- medicalInsuranceDirectory.setManMnemonicCode((HanyupinyinUtils.getFirstLettersUp(parts[53])).equals("null") ? null : (HanyupinyinUtils.getFirstLettersUp(parts[53])));
- medicalInsuranceDirectory.setMedMnemonicCode((HanyupinyinUtils.getFirstLettersUp(parts[3])).equals("null") ? null : (HanyupinyinUtils.getFirstLettersUp(parts[3])));
- medicalInsuranceDirectory.setParams(medicalInsuranceDirectory.assignment());
-
- medicalInsuranceDirectoryList.add(medicalInsuranceDirectory);
- }
-
- /**获取数据总条数*/
- ......
- /**更新或新增*/
- saveDataInBatches(medicalInsuranceDirectoryList, resourceFileId);
-
- buffReader.close();
- }
- zipInputStream.close();
- }
- /**获取数据总条数*/
- MedicalInsuranceFile MIF = medicalInsureFileRepository.findMedicalInsuranceFileByResourceFileIdAndFileStatus(resourceFileId, MedicalInsuranceFileEnum.Downloading.code());
- if (MIF == null) {
- throw new BusinessException(resourceFileId + "无正在下载的任务");
- }
- MIF.setRows(medicalInsuranceDirectoryList.size());
- medicalInsureFileRepository.save(MIF);
拿到的这个list集合里存了TXT文件的全部的数据。
- /**
- * 入库时有则更新,没有则新增
- */
- private void saveDataInBatches(List<MedicalInsuranceDirectory> westernChinesePatentMedicineCatalogs, Long resourceFileId) {
- List<MedicalInsuranceDirectory> existingEntities = new ArrayList<>(); //更新数据集合
- List<MedicalInsuranceDirectory> newEntities = new ArrayList<>(); //新增数据集合
- // 批量查询已存在的记录
- List<String> codeList = westernChinesePatentMedicineCatalogs.stream() //获取所有商品编码
- .map(MedicalInsuranceDirectory::getMedInsCode)
- .collect(Collectors.toList());
- int pageSize = MedicalInsuranceFileEnum.STOCKPILE_COUNT.code(); // 每页查询的记录数量
- for (int i = 0; i < codeList.size(); i += pageSize) {
- List<String> subCodeList = codeList.subList(i, Math.min(i + pageSize, codeList.size()));
- List<MedicalInsuranceDirectory> result = medicalInsuranceDirectoryRepository.findMedicalInsuranceDirectoriesByMedInsCodeIn(subCodeList);//获取数据库中重复的对象。
- existingEntities.addAll(result); //将数据库中重复的对象添加至 更新数据集合
- }
- // 根据查询结果进行更新或新增操作
- for (MedicalInsuranceDirectory entity : westernChinesePatentMedicineCatalogs) {
- if (existingEntities.stream().anyMatch(e -> e.getMedInsCode().equals(entity.getMedInsCode()))) {
- // 处理更新数据集合
- for (MedicalInsuranceDirectory MID : existingEntities) {
- //更加编码判断,编码相同的对象属性赋值.
- if (MID.getMedInsCode().equals(entity.getMedInsCode())) {
- //对无效商品进行保留。
-
- MID.setResourceFileCode(entity.getResourceFileCode());
- ...........
- MID.setDosageForm(entity.getDosageForm());
- MID.setClassABMark(entity.getClassABMark());
- MID.setAprvNo(entity.getAprvNo());
- MID.setParams(entity.getParams());
- if (MID.getValidMark().equals(MedicalInsuranceDirectoryEnum.VALID.code())) { //对无效商品进行保留。
- MID.setValidMark(MedicalInsuranceDirectoryEnum.VALID.code());
- } else {
- MID.setValidMark(MedicalInsuranceDirectoryEnum.INVALID.code());
- }
- if (entity.getManufacturers().equals(null)) {
- MID.setManMnemonicCode(null);
- } else {
- MID.setManMnemonicCode(HanyupinyinUtils.getFirstLettersUp(entity.getManufacturers()));
- }
- }
- }
- } else {
- // 新增集合
- newEntities.add(entity);
- }
- }
- // 批量更新
- if (!existingEntities.isEmpty()) {
- saveData(existingEntities, resourceFileId, MedicalInsuranceFileEnum.DICT_UPDATE.code());
- }
- // 批量新增
- if (!newEntities.isEmpty()) {
- saveData(newEntities, resourceFileId, MedicalInsuranceFileEnum.DICT_SAVE.code());
- }
- }
resourceFileId 是多文件入库时,文件的id,方便一个方法通用。
- /**
- * 每1000个更新或新增一次
- */
- private void saveData(List<MedicalInsuranceDirectory> westernChinesePatentMedicineCatalogs, Long resourceFileId, Integer a) {
- int totalCount = westernChinesePatentMedicineCatalogs.size();
- int savedCount = 0;
- int batchSize = MedicalInsuranceFileEnum.STOCKPILE_COUNT.code();
- while (savedCount < totalCount) {
- int endIndex = Math.min(savedCount + batchSize, totalCount);
- List<MedicalInsuranceDirectory> batchEntities = westernChinesePatentMedicineCatalogs.subList(savedCount, endIndex);
- medicalInsuranceDirectoryRepository.saveAll(batchEntities);
- medicalInsuranceDirectoryRepository.flush();
- savedCount += batchEntities.size();
- //每1000个更新一次当前进度
- updateProgress(resourceFileId, savedCount);
- }
- }
在每一次更新都调用方法 updateProgress(resourceFileId, savedCount); 来修改当前的进度。
每次saveAll之后调用 flush()方法 再更新当前的进度
- /**
- * 修改当前入库进度
- */
- private void updateProgress(Long resourceFileId, Integer a) {
- MedicalInsuranceFile MIF = medicalInsureFileRepository.findMedicalInsuranceFileByResourceFileId(resourceFileId);
- MIF.setCurrentProgress(a);
- medicalInsureFileRepository.save(MIF);
- medicalInsureFileRepository.flush();
- }
每入库1000条数据,当前进度字段就变化一次,客户端可以间隔3秒获取一次信息,就可以实时获取当前的入库进度了。
有两点要注意,
1、方法一定不能加事务,不能事务,加了事务就会导致全部入库完成后数据库才会发生变化,就无法获取实时入库进度了。
2、save或者saveAll之后一定要调用一次 flush() 。别问为什么,要实时获取就要加这个东西。
3、对于异常一定要 try cache 然后结束当前任务。
给前端提供一个查询接口,前端间隔3秒查询就可以获取这个文件最大条数,当前入库的条数进度等信息。
这样之后逻辑变了,但20W行数据入库的时间为440多秒。
如果时间还是太长,那只能从业务上入手,从97个字段中删减字段。留下必要的字段,其他字段不要了,优化到25个字段后,20W行数据只要了190 秒。
1、直接使用deleteAll() 20W数据大约需要80秒
建议使用sql语句删除
- //根据resourceFileCode进行删除
- @Transactional
- @Modifying
- @Query(value = "delete from medical_insurance_directory where resource_file_code=?1 ", nativeQuery = true)
- void deleteAllByResourceFileCode(@Param("resourceFileCode") String resourceFileCode);
优化后 20W条数据从 83秒 -> 18秒
链接 JPA批量删除优化
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。