当前位置:   article > 正文

JAVA读取TXT文件,将数据存到Mysql数据库,大量数据并使用 JPA 批量插入优化1000秒->190秒,有则更新,没有则新增 批量saveAll。并实时获取入库进度_jpa批量insert优化

jpa批量insert优化

        这边的测试数据为大约20W条数据97个字段。

        直接使用JPA的save方法,一条一条的插入使用时间大约是:1000秒

方法优化

        将所有对象存到一个LIst集合中,再将save方法改为saveAll 方法,全部插入时间:600多秒

        时间还是比较长,再继续进行优化。

配置优化

        修改文件配置,saveAll分批插入,

        在application.properties中增加配置 

  1. # 统计生成SQL执行情况
  2. # spring.jpa.properties.hibernate.generate_statistics=true
  3. # 开启批量插入
  4. spring.jpa.properties.hibernate.jdbc.batch_size=1000
  5. spring.jpa.properties.hibernate.order_inserts=true
  6. spring.jpa.properties.hibernate.order_updates=true

        一般都是1000左右效率相对比较高,也可以适当调整

        优化后全部插入时间大约是:430秒。

        优化到这里是目前能优化的最快,如果时间感觉还是比较长,那就只能从逻辑优化业务优化这两个方面下手!

        业务方面有另外的新的需求,因入库时间过长,要求客户端能实时了解当前的入库进度,就是当前保存的时候存了多少条数据了,前端的使用进度条进行显示。

        saveAll方法并没有实时显示当前进度的方法,而且因为事务的原因,只有方法执行完成才知道入库了多少条数据,逻辑方面下手,另辟蹊径

        在储存的时候会创建任务,写个文件管理实体类,主要是 当前进度  与 总条数 两个字段

  1. /**
  2. * @author: Yz
  3. * @Description:
  4. * @create: 2023-06-12
  5. **/
  6. @Data
  7. @Entity
  8. @Table(name = "medical_insurance_file")
  9. @ApiModel(value = "MedicalInsuranceFile", description = "文件管理")
  10. public class MedicalInsuranceFile extends BaseModel {
  11. @ApiModelProperty(value = "文件id")
  12. @Column(name = "resource_file_id", columnDefinition = "bigint(20) NOT NULL COMMENT '文件ID'")
  13. private Long resourceFileId;
  14. @ApiModelProperty(name = "当前进度")
  15. @Column(name = "current_Progress", columnDefinition = "int(11) DEFAULT '0' COMMENT '当前进度'")
  16. private Integer currentProgress;
  17. @ApiModelProperty(name = "总条数")
  18. @Column(name = "rows", columnDefinition = "int(11) DEFAULT '0' COMMENT '总条数'")
  19. private Integer rows;
  20. }

        主要是其中两个字段,rows 条数  和 当前进度 currentProgress 

        条数是这个文件的总共有多少行数据,我的是20W行数据。当前进度就是存了多少行数据了。

        解压ZIP文件为TXT,并存到LIst集合中。

  1. /**
  2. * Zip转txt并储存数据库。
  3. *outputFolderPath 解压的地址
  4. *zipFilePath zip文件地址
  5. *resourceFileId 资源文件ID
  6. */
  7. private void fileStoreTable(String outputFolderPath, Long resourceFileId, String zipFilePath) throws IOException, ParseException {
  8. // 创建解压目标文件夹
  9. File outputFolder = new File(outputFolderPath);
  10. if (!outputFolder.exists()) {
  11. outputFolder.mkdirs();
  12. }
  13. // 创建ZipInputStream对象
  14. ZipInputStream zipInputStream = new ZipInputStream(new FileInputStream(zipFilePath));
  15. ZipEntry entry = zipInputStream.getNextEntry();
  16. BufferedReader buffReader = null;
  17. if (!entry.isDirectory() && entry.getName().endsWith(".txt")) {
  18. String entryName = entry.getName();
  19. String outputPath = outputFolderPath + File.separator + entryName;
  20. FileOutputStream outputStream = new FileOutputStream(outputPath);
  21. /**从ZipInputStream中读取数据并写入到输出流中*/
  22. byte[] buffer = new byte[1024];
  23. int length;
  24. while ((length = zipInputStream.read(buffer)) > 0) {
  25. outputStream.write(buffer, 0, length);
  26. }
  27. outputStream.close(); // 关闭输出流
  28. buffReader = new BufferedReader(new InputStreamReader(new FileInputStream(outputPath), "UTF-8")); // 读取解压后的txt文件内容
  29. }
  30. List<MedicalInsuranceDirectory> medicalInsuranceDirectoryList = new ArrayList<>();
  31. /**对象*/
  32. String strTmp = "";
  33. while ((strTmp = buffReader.readLine()) != null) {
  34. String[] parts = strTmp.split("\t");
  35. MedicalInsuranceDirectory medicalInsuranceDirectory = new MedicalInsuranceDirectory();
  36. medicalInsuranceDirectory.setMedInsCode(parts[0]);
  37. medicalInsuranceDirectory.setMedInsName(parts[3].equals("null") ? null : parts[3]);
  38. .......................
  39. medicalInsuranceDirectory.setDosageForm(parts[90].equals("null") ? null : parts[90]);
  40. medicalInsuranceDirectory.setClassABMark(parts[91].equals("null") ? null : parts[91]);
  41. medicalInsuranceDirectory.setResourceFileCode(fileResource.getResourceFileCode());
  42. medicalInsuranceDirectory.setManMnemonicCode((HanyupinyinUtils.getFirstLettersUp(parts[53])).equals("null") ? null : (HanyupinyinUtils.getFirstLettersUp(parts[53])));
  43. medicalInsuranceDirectory.setMedMnemonicCode((HanyupinyinUtils.getFirstLettersUp(parts[3])).equals("null") ? null : (HanyupinyinUtils.getFirstLettersUp(parts[3])));
  44. medicalInsuranceDirectory.setParams(medicalInsuranceDirectory.assignment());
  45. medicalInsuranceDirectoryList.add(medicalInsuranceDirectory);
  46. }
  47. /**获取数据总条数*/
  48. ......
  49. /**更新或新增*/
  50. saveDataInBatches(medicalInsuranceDirectoryList, resourceFileId);
  51. buffReader.close();
  52. }
  53. zipInputStream.close();
  54. }

        获取文件中总共多少条数据。

  1. /**获取数据总条数*/
  2. MedicalInsuranceFile MIF = medicalInsureFileRepository.findMedicalInsuranceFileByResourceFileIdAndFileStatus(resourceFileId, MedicalInsuranceFileEnum.Downloading.code());
  3. if (MIF == null) {
  4. throw new BusinessException(resourceFileId + "无正在下载的任务");
  5. }
  6. MIF.setRows(medicalInsuranceDirectoryList.size());
  7. medicalInsureFileRepository.save(MIF);

          拿到的这个list集合里存了TXT文件的全部的数据。

  1. /**
  2. * 入库时有则更新,没有则新增
  3. */
  4. private void saveDataInBatches(List<MedicalInsuranceDirectory> westernChinesePatentMedicineCatalogs, Long resourceFileId) {
  5. List<MedicalInsuranceDirectory> existingEntities = new ArrayList<>(); //更新数据集合
  6. List<MedicalInsuranceDirectory> newEntities = new ArrayList<>(); //新增数据集合
  7. // 批量查询已存在的记录
  8. List<String> codeList = westernChinesePatentMedicineCatalogs.stream() //获取所有商品编码
  9. .map(MedicalInsuranceDirectory::getMedInsCode)
  10. .collect(Collectors.toList());
  11. int pageSize = MedicalInsuranceFileEnum.STOCKPILE_COUNT.code(); // 每页查询的记录数量
  12. for (int i = 0; i < codeList.size(); i += pageSize) {
  13. List<String> subCodeList = codeList.subList(i, Math.min(i + pageSize, codeList.size()));
  14. List<MedicalInsuranceDirectory> result = medicalInsuranceDirectoryRepository.findMedicalInsuranceDirectoriesByMedInsCodeIn(subCodeList);//获取数据库中重复的对象。
  15. existingEntities.addAll(result); //将数据库中重复的对象添加至 更新数据集合
  16. }
  17. // 根据查询结果进行更新或新增操作
  18. for (MedicalInsuranceDirectory entity : westernChinesePatentMedicineCatalogs) {
  19. if (existingEntities.stream().anyMatch(e -> e.getMedInsCode().equals(entity.getMedInsCode()))) {
  20. // 处理更新数据集合
  21. for (MedicalInsuranceDirectory MID : existingEntities) {
  22. //更加编码判断,编码相同的对象属性赋值.
  23. if (MID.getMedInsCode().equals(entity.getMedInsCode())) {
  24. //对无效商品进行保留。
  25. MID.setResourceFileCode(entity.getResourceFileCode());
  26. ...........
  27. MID.setDosageForm(entity.getDosageForm());
  28. MID.setClassABMark(entity.getClassABMark());
  29. MID.setAprvNo(entity.getAprvNo());
  30. MID.setParams(entity.getParams());
  31. if (MID.getValidMark().equals(MedicalInsuranceDirectoryEnum.VALID.code())) { //对无效商品进行保留。
  32. MID.setValidMark(MedicalInsuranceDirectoryEnum.VALID.code());
  33. } else {
  34. MID.setValidMark(MedicalInsuranceDirectoryEnum.INVALID.code());
  35. }
  36. if (entity.getManufacturers().equals(null)) {
  37. MID.setManMnemonicCode(null);
  38. } else {
  39. MID.setManMnemonicCode(HanyupinyinUtils.getFirstLettersUp(entity.getManufacturers()));
  40. }
  41. }
  42. }
  43. } else {
  44. // 新增集合
  45. newEntities.add(entity);
  46. }
  47. }
  48. // 批量更新
  49. if (!existingEntities.isEmpty()) {
  50. saveData(existingEntities, resourceFileId, MedicalInsuranceFileEnum.DICT_UPDATE.code());
  51. }
  52. // 批量新增
  53. if (!newEntities.isEmpty()) {
  54. saveData(newEntities, resourceFileId, MedicalInsuranceFileEnum.DICT_SAVE.code());
  55. }
  56. }

        resourceFileId 是多文件入库时,文件的id,方便一个方法通用。

  1. /**
  2. * 每1000个更新或新增一次
  3. */
  4. private void saveData(List<MedicalInsuranceDirectory> westernChinesePatentMedicineCatalogs, Long resourceFileId, Integer a) {
  5. int totalCount = westernChinesePatentMedicineCatalogs.size();
  6. int savedCount = 0;
  7. int batchSize = MedicalInsuranceFileEnum.STOCKPILE_COUNT.code();
  8. while (savedCount < totalCount) {
  9. int endIndex = Math.min(savedCount + batchSize, totalCount);
  10. List<MedicalInsuranceDirectory> batchEntities = westernChinesePatentMedicineCatalogs.subList(savedCount, endIndex);
  11. medicalInsuranceDirectoryRepository.saveAll(batchEntities);
  12. medicalInsuranceDirectoryRepository.flush();
  13. savedCount += batchEntities.size();
  14. //每1000个更新一次当前进度
  15. updateProgress(resourceFileId, savedCount);
  16. }
  17. }

        在每一次更新都调用方法   updateProgress(resourceFileId, savedCount); 来修改当前的进度。

更新当前的进度

        每次saveAll之后调用 flush()方法 再更新当前的进度

  1. /**
  2. * 修改当前入库进度
  3. */
  4. private void updateProgress(Long resourceFileId, Integer a) {
  5. MedicalInsuranceFile MIF = medicalInsureFileRepository.findMedicalInsuranceFileByResourceFileId(resourceFileId);
  6. MIF.setCurrentProgress(a);
  7. medicalInsureFileRepository.save(MIF);
  8. medicalInsureFileRepository.flush();
  9. }

        每入库1000条数据,当前进度字段就变化一次,客户端可以间隔3秒获取一次信息,就可以实时获取当前的入库进度了。 

        有两点要注意,

        1、方法一定不能加事务,不能事务,加了事务就会导致全部入库完成后数据库才会发生变化,就无法获取实时入库进度了。

        2、save或者saveAll之后一定要调用一次 flush() 。别问为什么,要实时获取就要加这个东西。

        3、对于异常一定要 try cache 然后结束当前任务。

        给前端提供一个查询接口,前端间隔3秒查询就可以获取这个文件最大条数,当前入库的条数进度等信息。

        这样之后逻辑变了,但20W行数据入库的时间为440多秒。

        如果时间还是太长,那只能从业务上入手,从97个字段中删减字段。留下必要的字段,其他字段不要了,优化到25个字段后,20W行数据只要了190 秒。

        入库优化完成后,批量删除优化,也进行优化

        1、直接使用deleteAll() 20W数据大约需要80秒

        建议使用sql语句删除

        

  1. //根据resourceFileCode进行删除
  2. @Transactional
  3. @Modifying
  4. @Query(value = "delete from medical_insurance_directory where resource_file_code=?1 ", nativeQuery = true)
  5. void deleteAllByResourceFileCode(@Param("resourceFileCode") String resourceFileCode);

             优化后 20W条数据从 83秒 -> 18秒

  链接   JPA批量删除优化

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Monodyee/article/detail/450396
推荐阅读
相关标签
  

闽ICP备14008679号