当前位置:   article > 正文

Apache doris使用问题记录_doris写入数据非常慢

doris写入数据非常慢

插入数据丢数

可能原因之一

某个字段的值超长了,doris会把这条数据丢弃。

坑:如果同时插入多条,默认情况下只要有一条插入成功了,doris就会返回成功。

可以再插入数据前设置临时变量:SET enable_insert_strict = true 来开启插入时严格校验,这样只要有一条插入失败doris就会返回失败。

见官网:Insert Into - Apache Doris

案例:从mysql 同步至doris时有两个varchar字段超长导致部分数据被丢弃。

原因:varchar(n) 再mysql中的n是字符数量,doris中的字节数量,所以如果在mysql中是varchar(n)在doris中varchar(3*N) 三倍,doris中是按照utf-8字节数计算的

插入数据慢

现象描述:项目框架springboot mybatisplus, 通过mysql协议链接doris,写入时刚开始快,随后马上就慢下来了。整体写入非常慢。

问题分析:

参考官网:使用 Insert 方式同步数据 - Apache Doris

对于 Doris 来说,一个 INSERT 命令就是一个完整的导入事务。因此不论是导入一条数据,还是多条数据,我们都不建议在生产环境使用这种方式进行数据导入。高频次的 INSERT 操作会导致在存储层产生大量的小文件,会严重影响系统性能。

解决方案:

我们建议一批次插入条数在尽量大,比如几千甚至一万条一次。或者可以通过下面的程序的方式,

1、使用 PreparedStatement 来进行批量插入

2、在mysql链接后面增加rewriteBatchedStatements=true,开启doris批量提交。(rewriteBatchedStatements=true 会确保 Driver 执行批处理

改造示例如下:

  1. package demo.doris;
  2. import java.sql.Connection;
  3. import java.sql.DriverManager;
  4. import java.sql.PreparedStatement;
  5. import java.sql.SQLException;
  6. public class DorisJDBCDemo {
  7. private static final String JDBC_DRIVER = "com.mysql.jdbc.Driver";
  8. private static final String DB_URL_PATTERN = "jdbc:mysql://%s:%d/%s?rewriteBatchedStatements=true";
  9. private static final String HOST = "127.0.0.1"; // Leader Node host
  10. private static final int PORT = 9030; // query_port of Leader Node
  11. private static final String DB = "demo";
  12. private static final String TBL = "test_1";
  13. private static final String USER = "admin";
  14. private static final String PASSWD = "my_pass";
  15. private static final int INSERT_BATCH_SIZE = 10000;
  16. public static void main(String[] args) {
  17. insert();
  18. }
  19. private static void insert() {
  20. // 注意末尾不要加 分号 ";"
  21. String query = "insert into " + TBL + " values(?, ?)";
  22. // 设置 Label 以做到幂等。
  23. // String query = "insert into " + TBL + " WITH LABEL my_label values(?, ?)";
  24. Connection conn = null;
  25. PreparedStatement stmt = null;
  26. String dbUrl = String.format(DB_URL_PATTERN, HOST, PORT, DB);
  27. try {
  28. Class.forName(JDBC_DRIVER);
  29. conn = DriverManager.getConnection(dbUrl, USER, PASSWD);
  30. stmt = conn.prepareStatement(query);
  31. for (int i =0; i < INSERT_BATCH_SIZE; i++) {
  32. stmt.setInt(1, i);
  33. stmt.setInt(2, i * 100);
  34. stmt.addBatch();
  35. }
  36. int[] res = stmt.executeBatch();
  37. System.out.println(res);
  38. } catch (Exception e) {
  39. e.printStackTrace();
  40. } finally {
  41. try {
  42. if (stmt != null) {
  43. stmt.close();
  44. }
  45. } catch (SQLException se2) {
  46. se2.printStackTrace();
  47. }
  48. try {
  49. if (conn != null) conn.close();
  50. } catch (SQLException se) {
  51. se.printStackTrace();
  52. }
  53. }
  54. }
  55. }

实际应用

简单改造示例:改造为可以被其他service调用的方法,这里异步插入doris,缓存超过5000提交一次

  1. import com.tank.monitoringdata.entity.Datas;
  2. import lombok.extern.slf4j.Slf4j;
  3. import java.sql.*;
  4. import java.util.ArrayList;
  5. import java.util.List;
  6. @Slf4j
  7. public class PreparedStatementTool {
  8. private static final String JDBC_DRIVER = "com.mysql.jdbc.Driver";
  9. private static final String DB_URL_PATTERN2 = "jdbc:mysql://10.0.0.210:9030/prod?rewriteBatchedStatements=true&useUnicode=true&characterEncoding=UTF-8&useJDBCCompliantTimezoneShift=true&useLegacyDatetimeCode=false&serverTimezone=UTC&useSSL=false";
  10. private static final String USER = "s****od";
  11. private static final String PASSWD = "do*****ja";
  12. private static Integer bacthInsertSize = 5000;
  13. private static final String query = "insert into datas_new ( dataDate, pointerID, cDataID, dataPackID, dataID, data, timestamp ) values(?, ?, ?, ?, ?, ?, ?)";
  14. ;
  15. private static Connection conn = null;
  16. private static List<Datas> cacheList = new ArrayList<>();
  17. public static synchronized void insert(List<Datas> datas) {
  18. log.info("insert:{},{}", datas.size(), cacheList.size());
  19. cacheList.addAll(datas);
  20. if (cacheList.size() >= bacthInsertSize) {
  21. PreparedStatement stmt = null;
  22. try {
  23. Class.forName(JDBC_DRIVER);
  24. if (conn == null) {
  25. conn = DriverManager.getConnection(DB_URL_PATTERN2, USER, PASSWD);
  26. }
  27. stmt = conn.prepareStatement(query);
  28. for (int i = 0; i < cacheList.size(); i++) {
  29. stmt.setString(1, cacheList.get(i).getDataDate().toString());
  30. stmt.setLong(2, cacheList.get(i).getPointerID());
  31. stmt.setInt(3, cacheList.get(i).getCDataID());
  32. stmt.setInt(4, cacheList.get(i).getDataPackID());
  33. stmt.setLong(5, cacheList.get(i).getDataID());
  34. stmt.setString(6, cacheList.get(i).getData());
  35. stmt.setString(7, cacheList.get(i).getTimestamp().toString());
  36. stmt.addBatch();
  37. }
  38. int[] res = stmt.executeBatch();
  39. log.info("stmt.executeBatch:{}", cacheList.size());
  40. cacheList.clear();
  41. } catch (Exception e) {
  42. e.printStackTrace();
  43. } finally {
  44. try {
  45. if (stmt != null) {
  46. stmt.close();
  47. }
  48. } catch (SQLException se2) {
  49. se2.printStackTrace();
  50. }
  51. // try {
  52. // if (conn != null) conn.close();
  53. // } catch (SQLException se) {
  54. // se.printStackTrace();
  55. // }
  56. }
  57. }
  58. log.info("end:{},{}", datas.size(), cacheList.size());
  59. }
  60. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/人工智能uu/article/detail/892265
推荐阅读
相关标签
  

闽ICP备14008679号