赞
踩
某个字段的值超长了,doris会把这条数据丢弃。
坑:如果同时插入多条,默认情况下只要有一条插入成功了,doris就会返回成功。
可以再插入数据前设置临时变量:SET enable_insert_strict = true 来开启插入时严格校验,这样只要有一条插入失败doris就会返回失败。
案例:从mysql 同步至doris时有两个varchar字段超长导致部分数据被丢弃。
原因:varchar(n) 再mysql中的n是字符数量,doris中的字节数量,所以如果在mysql中是varchar(n)在doris中varchar(3*N) 三倍,doris中是按照utf-8字节数计算的
现象描述:项目框架springboot mybatisplus, 通过mysql协议链接doris,写入时刚开始快,随后马上就慢下来了。整体写入非常慢。
对于 Doris 来说,一个 INSERT 命令就是一个完整的导入事务。因此不论是导入一条数据,还是多条数据,我们都不建议在生产环境使用这种方式进行数据导入。高频次的 INSERT 操作会导致在存储层产生大量的小文件,会严重影响系统性能。
我们建议一批次插入条数在尽量大,比如几千甚至一万条一次。或者可以通过下面的程序的方式,
1、使用 PreparedStatement 来进行批量插入。
2、在mysql链接后面增加rewriteBatchedStatements=true,开启doris批量提交。(
rewriteBatchedStatements=true
会确保 Driver 执行批处理)
- package demo.doris;
-
- import java.sql.Connection;
- import java.sql.DriverManager;
- import java.sql.PreparedStatement;
- import java.sql.SQLException;
-
- public class DorisJDBCDemo {
-
- private static final String JDBC_DRIVER = "com.mysql.jdbc.Driver";
- private static final String DB_URL_PATTERN = "jdbc:mysql://%s:%d/%s?rewriteBatchedStatements=true";
- private static final String HOST = "127.0.0.1"; // Leader Node host
- private static final int PORT = 9030; // query_port of Leader Node
- private static final String DB = "demo";
- private static final String TBL = "test_1";
- private static final String USER = "admin";
- private static final String PASSWD = "my_pass";
-
- private static final int INSERT_BATCH_SIZE = 10000;
-
- public static void main(String[] args) {
- insert();
- }
-
- private static void insert() {
- // 注意末尾不要加 分号 ";"
- String query = "insert into " + TBL + " values(?, ?)";
- // 设置 Label 以做到幂等。
- // String query = "insert into " + TBL + " WITH LABEL my_label values(?, ?)";
-
- Connection conn = null;
- PreparedStatement stmt = null;
- String dbUrl = String.format(DB_URL_PATTERN, HOST, PORT, DB);
- try {
- Class.forName(JDBC_DRIVER);
- conn = DriverManager.getConnection(dbUrl, USER, PASSWD);
- stmt = conn.prepareStatement(query);
-
- for (int i =0; i < INSERT_BATCH_SIZE; i++) {
- stmt.setInt(1, i);
- stmt.setInt(2, i * 100);
- stmt.addBatch();
- }
-
- int[] res = stmt.executeBatch();
- System.out.println(res);
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- try {
- if (stmt != null) {
- stmt.close();
- }
- } catch (SQLException se2) {
- se2.printStackTrace();
- }
- try {
- if (conn != null) conn.close();
- } catch (SQLException se) {
- se.printStackTrace();
- }
- }
- }
- }

简单改造示例:改造为可以被其他service调用的方法,这里异步插入doris,缓存超过5000提交一次
- import com.tank.monitoringdata.entity.Datas;
- import lombok.extern.slf4j.Slf4j;
-
- import java.sql.*;
- import java.util.ArrayList;
- import java.util.List;
-
- @Slf4j
- public class PreparedStatementTool {
-
- private static final String JDBC_DRIVER = "com.mysql.jdbc.Driver";
- private static final String DB_URL_PATTERN2 = "jdbc:mysql://10.0.0.210:9030/prod?rewriteBatchedStatements=true&useUnicode=true&characterEncoding=UTF-8&useJDBCCompliantTimezoneShift=true&useLegacyDatetimeCode=false&serverTimezone=UTC&useSSL=false";
- private static final String USER = "s****od";
- private static final String PASSWD = "do*****ja";
- private static Integer bacthInsertSize = 5000;
-
- private static final String query = "insert into datas_new ( dataDate, pointerID, cDataID, dataPackID, dataID, data, timestamp ) values(?, ?, ?, ?, ?, ?, ?)";
- ;
- private static Connection conn = null;
- private static List<Datas> cacheList = new ArrayList<>();
-
-
- public static synchronized void insert(List<Datas> datas) {
- log.info("insert:{},{}", datas.size(), cacheList.size());
- cacheList.addAll(datas);
- if (cacheList.size() >= bacthInsertSize) {
- PreparedStatement stmt = null;
- try {
- Class.forName(JDBC_DRIVER);
- if (conn == null) {
- conn = DriverManager.getConnection(DB_URL_PATTERN2, USER, PASSWD);
- }
- stmt = conn.prepareStatement(query);
- for (int i = 0; i < cacheList.size(); i++) {
- stmt.setString(1, cacheList.get(i).getDataDate().toString());
- stmt.setLong(2, cacheList.get(i).getPointerID());
- stmt.setInt(3, cacheList.get(i).getCDataID());
- stmt.setInt(4, cacheList.get(i).getDataPackID());
- stmt.setLong(5, cacheList.get(i).getDataID());
- stmt.setString(6, cacheList.get(i).getData());
- stmt.setString(7, cacheList.get(i).getTimestamp().toString());
- stmt.addBatch();
- }
- int[] res = stmt.executeBatch();
- log.info("stmt.executeBatch:{}", cacheList.size());
- cacheList.clear();
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- try {
- if (stmt != null) {
- stmt.close();
- }
- } catch (SQLException se2) {
- se2.printStackTrace();
- }
- // try {
- // if (conn != null) conn.close();
- // } catch (SQLException se) {
- // se.printStackTrace();
- // }
- }
- }
- log.info("end:{},{}", datas.size(), cacheList.size());
- }
- }

Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。