当前位置:   article > 正文

Apache Doris 生态扩展及优化:Spark Doris Connector;Flink Doris Connector;DataX DorisWriter数据同步;JDBC Catalog_doris.request.tablet.size

doris.request.tablet.size

8第八章 Apache Doris 生态扩展及优化

8.1Spark Doris Connector

Spark Doris Connector 可以支持通过 Spark 读取 Doris 中存储的数据,也支持通过Spark写入数据到Doris。

  • 支持从Doris中读取数据
  • 支持Spark DataFrame批量/流式 写入Doris
  • 可以将Doris表映射为DataFrame或者RDD,推荐使用DataFrame。
  • 支持在Doris端完成数据过滤,减少数据传输量。

特别注意:在测试过程中发现 Spark StructuredStreaming 实时写入 Doris 存在问题。

要想在Spark编程中使用Doris Connector,我们需要根据Doris官网提供的Spark Doris Connector的源码进行编译获取doris-spark-connector,源代码库地址:https://github.com/apache/incubator-doris-spark-connector,我们需要根据自己使用Spark的版本以及Scala的版本进行手动编译源码获取Spark Doris Connector 依赖包,然后导入到项目中使用。如果自己不想编译也可以通过Doris官方提供编译好的包,如下:

https://repo.maven.apache.org/maven2/org/apache/doris/

8.1.1Spark与Doris版本兼容

ConnectorSparkDorisJavaScala
2.3.4-2.11.xx2.x0.12+82.11
3.1.2-2.12.xx3.x0.12+82.12
3.2.0-2.12.xx3.2.x0.12+82.12

经过以上测试,Spark3.3.x版本也与Doris0.12+版本之上兼容,测试Spark3.3.x与Spark3.2.x与Doris1.2.1编译得到Spark-Doris-Connecotr时,StructuredStreaming实时写入到Doris存在问题。

8.1.2Spark Doris Connector 源码编译

根据Doris官方提供的Spark-Doris-Connector源码进行编译获取spark-doris-connector时,需要用到Maven环境,后续在node1节点上进行源码编译,所以在Node1节点上安装Maven环境。

8.1.2.1安装Maven 3.6.3

Maven下载地址:http://maven.apache.org/download.cgi这里下载Maven版本为:3.6.3,下载Maven完成后,按照以下步骤配置Maven。

  1. 上传Maven 安装包到 node3 节点,并解压
[root@node1 software]# tar -zxvf ./apache-maven-3.6.3-bin.tar.gz
  • 1
  1. 配置 Maven 环境变量
#打开 /etc/profile在最后追加以下内容
[root@node1 software]# vim /etc/profile
export MAVEN_HOME=/software/apache-maven-3.6.3/
export PATH=$PATH:$MAVEN_HOME/bin

#配置环境变量生效
[root@node1 software]# source /etc/profile
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  1. 检查是否安装 Maven 成功
[root@node1 software]# mvn -v
Apache Maven 3.6.3 (cecedd343002696d0abb50b32b541b8a6ba2883f)
Maven home: /software/apache-maven-3.6.3
Java version: 1.8.0_181, vendor: Oracle Corporation, runtime: /usr/java/jdk1.8.0_181-amd64/jre
Default locale: en_US, platform encoding: UTF-8
OS name: "linux", version: "3.10.0-957.el7.x86_64", arch: "amd64", family: "unix"
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  1. 修改默认下载源为阿里镜像源

为了编译源码下载包更快,可以修改MAVEN_HOME/conf/settings.xml文件,添加阿里镜像源,注意:将以下内容放在“…”之间。此外,maven下载包默认对应的仓库位置在{user.home}/.m2/repository 目录下,”{user.home}”为当前用户家目录。

<!-- 添加阿里云镜像-->
<mirror>
    <id>nexus-aliyun</id>
    <mirrorOf>central</mirrorOf>
    <name>Nexus aliyun</name>
<url>http://maven.aliyun.com/nexus/content/groups/public</url>
</mirror>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

注意:就算设置了阿里云镜像地址,由于源码编译过程中可能涉及从apache maven官网站点下载一些基础依赖,速度也比较慢,也可以将资料中"repository.zip"上传至node1 /root/.m2目录中,并解压,该压缩包是完整下载好依赖的maven 仓库,具体步骤如下:

#将 repository.zip 资料上传至/root/.m2目录中,并解压
[root@node1 ~]# cd /root/.m2
[root@node1 .m2]# unzip ./repository.zip 
  • 1
  • 2
  • 3
8.1.2.2安装Thrift

在源码编译过程中需要用到thrift环境,所以这里在node1上安装thrift,具体步骤如下。

#在node1节点安装依赖
[root@node1 ~]# yum install -y autoconf automake libtool cmake ncurses-devel openssl-devel lzo-devel zlib-devel gcc gcc-c++

#下载thrift源码包,也可以从资料中获取,名称为“thrift-0.13.0.tar.gz”
[root@node1 ~]# cd /software && wget https://archive.apache.org/dist/thrift/0.13.0/thrift-0.13.0.tar.gz

#解压编译
[root@node1 software]# tar -zxvf thrift-0.13.0.tar.gz
[root@node1 software]# cd thrift-0.13.0
[root@node1 thrift-0.13.0]# ./configure --without-tests
[root@node1 thrift-0.13.0]# make
[root@node1 thrift-0.13.0]#  make install

#安装完成后,检查版本
[root@node1 ~]# thrift --version
Thrift version 0.13.0
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
8.1.2.3源码编译

按照如下步骤根据源码编译获取spark-doris-connector。

#创建目录
[root@node1 ~]# cd /software
[root@node1 software]# mkdir doris-spark-connector

#将资料中下载好的源码“doris-spark-connector-master.zip”上传至该目录并解压
[root@node1 doris-spark-connector]# unzip doris-spark-connector-master.zip 

#修改custom_env.sh.tpl文件为custom_env.sh名字
[root@node1 ~]# cd /software/doris-spark-connector/doris-spark-connector-master
[root@node1 doris-spark-connector-master]# mv custom_env.sh.tpl custom_env.sh

#源码编译,编译时选择scala版本和spark版本,这里选择Scala2.12和Spark3.3.x版本
[root@node1 ~]# cd /software/doris-spark-connector/doris-spark-connector-master/spark-doris-connector
[root@node1 spark-doris-connector]# ./build.sh 
Spark-Doris-Connector supports Scala 2.11 and 2.12. Which version do you need ?
1) 2.11
2) 2.12
#? 2
Spark-Doris-Connector supports multiple versions of spark. Which version do you need ?
1) 2.3.x
2) 3.1.x
3) 3.2.x
4) 3.3.x
5) other
#? 4
...
[INFO] Skipping javadoc generation
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  01:35 min
[INFO] Finished at: 2023-04-13T15:49:35+08:00
[INFO] ------------------------------------------------------------------------
[Doris] *****************************************************************
[Doris] Successfully build Spark-Doris-Connector
[Doris] dist: /software/doris-spark-connector/doris-spark-connector-master/dist/spark-doris-connector-3.3_2.12-1.0.0-SNAPSHOT.jar 
[Doris] *****************************************************************
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37

以上编译完成后,将目录"/software/doris-spark-connector/doris-spark-connector-master/spark-doris-connector/target"中的spark-doris-connector-3.3_2.12-1.0.0-SNAPSHOT.jar包放在window本地,然后加入到本地maven仓库中在代码中引入使用即可。

8.1.3Spark Doris Connector 使用

8.1.3.1将编译jar包加入本地Maven仓库

将spark-doris-connector-3.3_2.12-1.0.0-SNAPSHOT.jar放在D盘下,打开cmd输入如下命令将spark-doris-connector加入到maven本地仓库。

mvn install:install-file -Dfile=D:\spark-doris-connector-3.3_2.12-1.0.0-SNAPSHOT.jar -DgroupId=org.apache.doris -DartifactId=spark-doris-connector-3.3_2.12 -Dversion=1.0.0 -Dpackaging=jar
  • 1

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传image.png

加载完成之后,可以在本地maven 仓库中看到该jar包:

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传image.png

然后,在IDEA中创建Maven项目,将jar包引入到Maven pom.xml文件中即可,这里还需要操作Spark,也同时需要引入Spark的其他依赖包,如下:

<!-- 配置以下可以解决 在jdk1.8环境下打包时报错 “-source 1.5 中不支持 lambda 表达式” -->
<properties>
  <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  <maven.compiler.source>1.8</maven.compiler.source>
  <maven.compiler.target>1.8</maven.compiler.target>
</properties>

<dependencies>
  <!-- Spark 整合Doris 依赖包 -->
  <dependency>
    <groupId>org.apache.doris</groupId>
    <artifactId>spark-doris-connector-3.3_2.12</artifactId>
    <version>1.0.0</version>
  </dependency>

  <!-- Spark-core -->
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.12</artifactId>
    <version>3.3.2</version>
  </dependency>
  <!-- SparkSQL -->
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.12</artifactId>
    <version>3.3.2</version>
  </dependency>

  <!--mysql依赖的jar包-->
  <dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.47</version>
  </dependency>

  <!--SparkStreaming-->
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.12</artifactId>
    <version>3.3.2</version>
  </dependency>

  <!-- Kafka 0.10+ Source For Structured Streaming-->
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
    <version>3.3.2</version>
  </dependency>

  <!-- Scala 包-->
  <dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>2.12.10</version>
  </dependency>
  <dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-compiler</artifactId>
    <version>2.12.10</version>
  </dependency>
  <dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-reflect</artifactId>
    <version>2.12.10</version>
  </dependency>
  <dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.22</version>
    <scope>compile</scope>
  </dependency>
</dependencies>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
8.1.3.2Spark Core操作Doris

Spark可以使用SparkCore RDD编程方式读取Doris表中的数据,不支持RDD写入到Doris中。Spark RDD方式操作Doris首先在Doris中创建一张表:

#创建表
CREATE TABLE IF NOT EXISTS example_db.spark_doris_tbl
(
`id` INT NOT NULL COMMENT "id",
`name` VARCHAR(255) NOT NULL COMMENT "姓名",
`age` INT COMMENT "年龄",
`score` BIGINT COMMENT "分数"
)
DUPLICATE KEY(`id`, `name`)
DISTRIBUTED BY HASH(`id`) BUCKETS 3
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);

#向表中插入数据
mysql> insert into spark_doris_tbl values (1,'zs',18,100),(2,'ls',19,200),(3,'ww',20,300);

#查询Doris表中数据
mysql> select * from spark_doris_tbl;
+------+------+------+-------+
| id   | name | age  | score |
+------+------+------+-------+
|    1 | zs   |   18 |   100 |
|    3 | ww   |   20 |   300 |
|    2 | ls   |   19 |   200 |
+------+------+------+-------+
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

然后编写Spark RDD 读取Doris中的数据,代码如下:

/**
 * Spark 读取Doris,生成RDD ,目前只有Scala API 支持,Java API 暂时不支持
 * 只支持读取进行数据分析
 */
object SparkReadDoris {
  def main(args: Array[String]): Unit = {
    //创建SparkConf 及 SparkContext
    val conf = new SparkConf
    conf.setMaster("local")
    conf.setAppName("SparkReadDoris")
    val sc = new SparkContext(conf)

    //导入转换
    import org.apache.doris.spark._

    val dorisSparkRDD = sc.dorisRDD(
      tableIdentifier = Some("example_db.spark_doris_tbl"),
      cfg = Some(Map(
        "doris.fenodes" -> "node1:8030",
        "doris.request.auth.user" -> "root",
        "doris.request.auth.password" -> "123456"
      ))
    )

    //打印结果
    dorisSparkRDD.collect().foreach(println)

  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29

此外需要注意的是 RDD 读取 Doris 表中的数据只支持 Scala API 实现, Java API 会报错。

8.1.3.3Spark DataFrame 操作Doris

我们也可以使用DataFrame方式操作Doris,可以向Doris中写入数据,也可以从Doris中读取数据。在编写代码之前,首先将Doris 表spark_doris_tbl中数据清空:

mysql> delete from spark_doris_tbl where id >0;
  • 1

Spark DataFrame操作Doris代码如下:

/**
 * SparkDataFrame 操作 Doris
 * 注意:操作之前需要先删除Doris中的表 example_db.spark_doris_tbl :delete from example_db.spark_doris_tbl where id >0;
 * 1.Spark 向Doris 表中写入数据
 * 2.Spark 读取 Doris中数据
 *
 */
public class SparkDorisBatchOperator {
    public static void main(String[] args) {
        //1.创建SparkSession对象
        SparkSession sparkSession = SparkSession.builder().appName("SparkReadDoris").master("local").getOrCreate();

        //2.准备数据集并转换成Dataset<Row>
        ArrayList<String> jsonList = new ArrayList<>();
        jsonList.add("{'id':1,'name':'zhangsan','age':18,'score':100}");
        jsonList.add("{'id':2,'name':'lisi','age':19,'score':200}");
        jsonList.add("{'id':3,'name':'wangwu','age':20,'score':300}");
        jsonList.add("{'id':4,'name':'zhaoliu','age':21,'score':400}");
        jsonList.add("{'id':5,'name':'tianqi','age':22,'score':500}");

        Dataset<Row> dataset = sparkSession.read().json(sparkSession.createDataset(jsonList, Encoders.STRING()));

        dataset.printSchema();
        dataset.show();

        //3.将Dataset<Row>写入Doris
        dataset.write().format("doris")
                .option("doris.table.identifier", "example_db.spark_doris_tbl")
                .option("doris.fenodes", "node1:8030")
                .option("user", "root")
                .option("password", "123456")
                //指定你要写入的字段,这里是指定对应到Doris表中的字段的顺序
                .option("doris.write.fields","age,id,name,score")
                .save();

        //4.读取Doris中的数据
        Dataset<Row> load = sparkSession.read().format("doris")
                .option("doris.table.identifier", "example_db.spark_doris_tbl")
                .option("doris.fenodes", "node1:8030")
                .option("user", "root")
                .option("password", "123456")
                .load();
        load.show();

    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
8.1.3.4Spark SQL操作Doris

除了DataFrame API方式操作Doris数据表之外,还可以使用SparkSQL编程方式来操作Doris表,在编写代码之前,首先将Doris 表spark_doris_tbl中数据清空:

mysql> delete from spark_doris_tbl where id >0;
  • 1

Spark SQL操作Doris代码如下:

/**
 *  SparkSQL 操作 Doris
 *  注意:需要先在Doris中清空表中数据
 *
 *  1.Spark 向Doris 表中写入数据
 *  2.Spark 读取 Doris中数据
 */
public class SparkDorisOperator {
    public static void main(String[] args) {

        //1.创建SparkSession对象
        SparkSession sparkSession = SparkSession.builder().appName("SparkReadDoris").master("local").getOrCreate();

        //2.在Spark中创建Doris的临时视图
        sparkSession.sql("CREATE TEMPORARY VIEW spark_doris USING doris " +
                " OPTIONS( " +
                " 'table.identifier'='example_db.spark_doris_tbl', " +
                " 'fenodes'='node1:8030', " +
                " 'user'='root', " +
                " 'password'='123456' " +
                " ) ");

        //3.使用SparkSQL向Doris中写入数据
        sparkSession.sql("insert into spark_doris values (1,'zs',18,100),(2,'ls',19,200),(3,'ww',20,300)");

        //4.使用SparkSQL从Doris中读取数据
        sparkSession.sql("select * from spark_doris").show();

    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
8.1.3.5StructuredStreaming 操作Doris

在Doris官网中给出了StructuredStreaming实时向Doris表中写入数据的案例,但是按照官网写法实现代码运行时一直报错:

Caused by: org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();
  • 1

该错误说明数据写出没有指定start方法,经测试目前在代码编写中就算给出该start启动方法,代码执行还是有问题,目前测试版本为Spark3.2.x 、Spark3.3.x与Doris1.2.1版本,其他版本没有测试。

下面给出Spark StructuredStreaming读取Kafka中数据写入到Doris的代码实现:

/**
 * StructuredStreaming 实时读取Kafka数据写入到Doris
 * 注意:操作之前需要先删除Doris中的表 example_db.spark_doris_tbl :delete from example_db.spark_doris_tbl where id >0;
 */
public class SparkDorisStreamOperator {
    public static void main(String[] args) throws TimeoutException, StreamingQueryException {
        //1.创建SparkSession对象
        SparkSession sparkSession = SparkSession.builder().appName("SparkReadDoris").master("local").getOrCreate();

        //2.读取Kafka数据
        Dataset<Row> kafkaSource = sparkSession.readStream()
                .option("kafka.bootstrap.servers", "node1:9092,node2:9092,nod3:9092")
                .option("startingOffsets", "earliest")
                .option("subscribe", "doris-topic")
                .format("kafka")
                .load();

        Dataset<Row> result = kafkaSource.selectExpr("CAST(key AS STRING)", "CAST(value as STRING)")
                .select(functions.split(functions.col("value"), ",").getItem(0).as("id"),
                        functions.split(functions.col("value"), ",").getItem(1).as("name"),
                        functions.split(functions.col("value"), ",").getItem(2).as("age"),
                        functions.split(functions.col("value"), ",").getItem(3).as("score"));

        //3.将Kafka数据实时写入Doris
        StreamingQuery query =result.writeStream().outputMode("append")
                .format("doris")
                .trigger(Trigger.ProcessingTime("5 seconds"))
                .option("checkpointLocation", "./checkpoint/dir")
                .option("doris.table.identifier", "example_db.spark_doris_tbl")
                .option("doris.fenodes", "node1:8030")
                .option("user", "root")
                .option("password", "123456")
                //指定你要写入的字段
                .option("doris.write.fields", "id,name,age,score")
                .start();

        query.awaitTermination();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39

8.1.4Spark 操作Doris配置项

Spark操作Doris配置项参考Doris官网:https://doris.apache.org/zh-CN/docs/dev/ecosystem/spark-doris-connector#配置。

8.1.4.1通用配置项
KeyDefault ValueComment
doris.fenodesDoris FE http 地址,支持多个地址,使用逗号分隔
doris.table.identifierDoris 表名,如:db1.tbl1
doris.request.retries3向Doris发送请求的重试次数
doris.request.connect.timeout.ms30000向Doris发送请求的连接超时时间
doris.request.read.timeout.ms30000向Doris发送请求的读取超时时间
doris.request.query.timeout.s3600查询doris的超时时间,默认值为1小时,-1表示无超时限制
doris.request.tablet.sizeInteger.MAX_VALUE一个RDD Partition对应的Doris Tablet个数。
此数值设置越小,则会生成越多的Partition。从而提升Spark侧的并行度,但同时会对Doris造成更大的压力。
doris.batch.size1024一次从BE读取数据的最大行数。增大此数值可减少Spark与Doris之间建立连接的次数。
从而减轻网络延迟所带来的额外时间开销。
doris.exec.mem.limit2147483648单个查询的内存限制。默认为 2GB,单位为字节
doris.deserialize.arrow.asyncfalse是否支持异步转换Arrow格式到spark-doris-connector迭代所需的RowBatch
doris.deserialize.queue.size64异步转换Arrow格式的内部处理队列,当doris.deserialize.arrow.async为true时生效
doris.write.fields指定写入Doris表的字段或者字段顺序,多列之间使用逗号分隔。
默认写入时要按照Doris表字段顺序写入全部字段。
sink.batch.size10000单次写BE的最大行数
sink.max-retries1写BE失败之后的重试次数
sink.properties.*Stream Load 的导入参数。
例如: ‘sink.properties.column_separator’ = ', ’
doris.sink.task.partition.sizeDoris写入任务对应的 Partition 个数。Spark RDD 经过过滤等操作,最后写入的 Partition 数可能会比较大,但每个 Partition 对应的记录数比较少,导致写入频率增加和计算资源浪费。
此数值设置越小,可以降低 Doris 写入频率,减少 Doris 合并压力。该参数配合 doris.sink.task.use.repartition 使用。
doris.sink.task.use.repartitionfalse是否采用 repartition 方式控制 Doris写入 Partition数。默认值为 false,采用 coalesce 方式控制(注意: 如果在写入之前没有 Spark action 算子,可能会导致整个计算并行度降低)。
如果设置为 true,则采用 repartition 方式(注意: 可设置最后 Partition 数,但会额外增加 shuffle 开销)。
doris.sink.batch.interval.ms50每个批次sink的间隔时间,单位 ms。
8.1.4.2SQL 和 Dataframe 专有配置
KeyDefault ValueComment
user访问Doris的用户名
password访问Doris的密码
doris.filter.query.in.max.count100谓词下推中,in表达式value列表元素最大数量。超过此数量,则in表达式条件过滤在Spark侧处理。
8.1.4.3RDD 专有配置
KeyDefault ValueComment
doris.request.auth.user访问Doris的用户名
doris.request.auth.password访问Doris的密码
doris.read.field读取Doris表的列名列表,多列之间使用逗号分隔
doris.filter.query过滤读取数据的表达式,此表达式透传给Doris。Doris使用此表达式完成源端数据过滤。

8.1.5Spark和Doris列类型映射关系

Spark处理数据写入到Doris时需要注意对应的类型关系,如下:

Doris TypeSpark Type
NULL_TYPEDataTypes.NullType
BOOLEANDataTypes.BooleanType
TINYINTDataTypes.ByteType
SMALLINTDataTypes.ShortType
INTDataTypes.IntegerType
BIGINTDataTypes.LongType
FLOATDataTypes.FloatType
DOUBLEDataTypes.DoubleType
DATEDataTypes.StringType1
DATETIMEDataTypes.StringType1
BINARYDataTypes.BinaryType
DECIMALDecimalType
CHARDataTypes.StringType
LARGEINTDataTypes.StringType
VARCHARDataTypes.StringType
DECIMALV2DecimalType
TIMEDataTypes.DoubleType
HLLUnsupported datatype

注意:Connector中,将DATE和DATETIME映射为String。由于Doris底层存储引擎处理逻辑,直接使用时间类型时,覆盖的时间范围无法满足需求。所以使用 String 类型直接返回对应的时间可读文本。

8.2Flink Doris Connector

Flink Doris Connector 可以支持通过 Flink 操作(读取、插入、修改、删除) Doris 中存储的数据。通过Flink Doris Connector 可以将Doris表映射为DataStream或者Table对象,批或者实时将数据写入到Doris中,目前从Doris中读取数据只支持批读取。

此外,修改和删除只支持在 Unique Key 模型上,目前的删除是支持 Flink CDC 的方式接入数据实现自动删除,如果是其他数据接入的方式删除需要自己实现。

想要在Flink中使用Doris Connector需要通过Doris官方提供的flink-doris-connector源码进行编译,源码代码库地址:https://github.com/apache/doris-flink-connector,如果自己不想编译也可以通过Doris官方提供编译好的包,如下:

https://repo.maven.apache.org/maven2/org/apache/doris/。

8.2.1Flink与Doris版本兼容

Connector VersionFlink VersionDoris VersionJava VersionScala Version
1.0.31.11+0.15+82.11,2.12
1.1.01.141.0+82.11,2.12
1.2.01.151.0+8-
1.3.01.161.0+8-

8.2.2Flink Doris Connector 源码编译

这里我们选择根据Doris官方提供的源码进行编译获取flink-doris-connector 依赖包,编译源码过程中需要使用Maven和Thrift环境,由于之前在Spark Doris Connector编译过程中已经在node1安装过Maven和Thrift,这里不再安装,也选择在node1节点进行Flink Doris Connector 的源码编译。

此外,由于源码编译过程中可能涉及从apache maven官网站点下载一些基础依赖,速度也比较慢,也可以将资料中"repository.zip"上传至node1 /root/.m2目录中,并解压,该压缩包是完整下载好依赖的maven 仓库,具体步骤如下:

#将 repository.zip 资料上传至/root/.m2目录中,并解压
[root@node1 ~]# cd /root/.m2
[root@node1 .m2]# unzip ./repository.zip 
  • 1
  • 2
  • 3

如果在Spark Doris Connector编译中解压过repository.zip,就不必再次解压。按照如下步骤根据源码编译获取flink-doris-connector。

#创建目录
[root@node1 ~]# cd /software
[root@node1 software]# mkdir doris-flink-connector

#将资料中下载好的源码“doris-flink-connector-master.zip”上传至该目录并解压
[root@node1 doris-flink-connector]# unzip doris-flink-connector-master.zip 

#修改custom_env.sh.tpl文件为custom_env.sh名字
[root@node1 ~]# cd /software/doris-flink-connector/doris-flink-connector-master
[root@node1 doris-flink-connector-master]# mv custom_env.sh.tpl custom_env.sh

#源码编译
[root@node1 ~]# cd /software/doris-flink-connector/doris-flink-connector-master/flink-doris-connector
[root@node1 spark-doris-connector]# ./build.sh 
Flink-Doris-Connector supports multiple versions of flink. Which version do you need ?
1) 1.15.x
2) 1.16.x
3) 1.17.x
#? 2
[Doris]  flink version: 1.16.0
[Doris]  build starting.
... ...
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  23.388 s
[INFO] Finished at: 2023-04-14T15:37:26+08:00
[INFO] ------------------------------------------------------------------------
[Doris] *****************************************************************
[Doris] Successfully build Flink-Doris-Connector
[Doris] dist: /software/doris-flink-connector/doris-flink-connector-master/dist/flink-doris-connector-1.4.0-SNAPSHOT.jar 
[Doris] *****************************************************************
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32

以上编译完成后,将目录"/software/doris-flink-connector/doris-flink-connector-master/flink-doris-connector/target"中的flink-doris-connector-1.4.0-SNAPSHOT.jar包放在window本地,然后加入到本地maven仓库中在代码中引入使用即可。

8.2.3Flink Doris Connector 使用

Flink操作Doris有两种方式,一个是DataStream方式,另外一种就是SQL方式操作,这两种方式官方建议使用SQL方式进行编程,相对比较简单。

通过Flink Doris Connector可以通过Flink读取Doris中的数据,但是目前仅支持批次从Doris中读取数据,不支持实时读取,向Doris中写入数据时,支持批量写入和实时写入。

8.2.3.1将编译jar包加入本地Maven仓库

将flink-doris-connector-1.4.0-SNAPSHOT.jar放在D盘下,打开cmd输入如下命令将flink-doris-connector加入到maven本地仓库。

mvn install:install-file -Dfile=D:\flink-doris-connector-1.4.0-SNAPSHOT.jar -DgroupId=org.apache.doris -DartifactId=flink-doris-connector-1.4.0 -Dversion=1.0.0 -Dpackaging=jar
  • 1

加载完成之后,可以在本地maven 仓库中看到该jar包:

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传image.png

然后,在IDEA中创建Maven项目,将jar包引入到Maven pom.xml文件中即可,这里还需要操作Flink,也同时需要引入Flink的其他依赖包,如下:

<properties>
  <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  <maven.compiler.source>1.8</maven.compiler.source>
  <maven.compiler.target>1.8</maven.compiler.target>
  <flink.version>1.16.0</flink.version>
  <hadoop.version>3.3.4</hadoop.version>
  <slf4j.version>1.7.36</slf4j.version>
  <log4j.version>2.17.2</log4j.version>
  <mysql.version>5.1.47</mysql.version>
</properties>

<dependencies>
  <!-- Flink 整合Doris 依赖包 -->
  <dependency>
    <groupId>org.apache.doris</groupId>
    <artifactId>flink-doris-connector-1.4.0</artifactId>
    <version>1.0.0</version>
  </dependency>

  <!-- Flink 依赖包-->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-common</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_2.12</artifactId>
    <version>${flink.version}</version>
  </dependency>

  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-csv</artifactId>
    <version>${flink.version}</version>
  </dependency>

  <!-- Flink批和流开发依赖包 -->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>${flink.version}</version>
  </dependency>

  <!-- DataStream files connector -->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-files</artifactId>
    <version>${flink.version}</version>
  </dependency>

  <!-- MySQL 依赖包 -->
  <dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>${mysql.version}</version>
  </dependency>

  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.12</artifactId>
    <version>3.3.1</version>
  </dependency>

  <!-- slf4j&log4j 日志相关包 -->
  <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>${slf4j.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-to-slf4j</artifactId>
    <version>${log4j.version}</version>
  </dependency>
</dependencies>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
8.2.3.2Doris配置及表准备

后续使用Flink操作Doris中的表时需要在Doris FE节点配置启动http v2,在Doris FE各个节点fe.cof中设置enable_http_server_v2=true即可,该参数默认值如下:从官方 0.14.0 release 版之后默认是 true,之前默认 false。

HTTP Server V2 由 SpringBoot 实现, 并采用前后端分离的架构。只有启用 httpv2,用户才能使用新的前端 UI 界面。这里我们使用的Doris版本为1.2.1,所以默认开启。

此外,Flink操作Doris时涉及到的一些表这里预先在Doris中创建出来。

# 创建 Doris 表 :flink_doris_tbl1、flink_doris_tbl2、flink_result_tbl
CREATE TABLE IF NOT EXISTS example_db.flink_doris_tbl1
(
`id` INT NOT NULL COMMENT "id",
`name` VARCHAR(255) NOT NULL COMMENT "姓名",
`age` INT COMMENT "年龄",
`score` BIGINT COMMENT "分数"
)
DUPLICATE KEY(`id`, `name`)
DISTRIBUTED BY HASH(`id`) BUCKETS 3
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);


CREATE TABLE IF NOT EXISTS example_db.flink_doris_tbl2
(
`id` INT NOT NULL COMMENT "id",
`name` VARCHAR(255) NOT NULL COMMENT "姓名",
`age` INT COMMENT "年龄",
`total_score` BIGINT COMMENT "总分数"
)
DUPLICATE KEY(`id`, `name`)
DISTRIBUTED BY HASH(`id`) BUCKETS 3
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);

CREATE TABLE IF NOT EXISTS example_db.flink_result_tbl
(
`window_start` DATETIME NOT NULL COMMENT "窗口开始",
`window_end`  DATETIME NOT NULL COMMENT "窗口结束",
`name` VARCHAR(255) COMMENT "姓名",
`cnt` BIGINT COMMENT "个数"
)
DUPLICATE KEY(`window_start`, `window_end`)
DISTRIBUTED BY HASH(`name`) BUCKETS 3
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);

#向doris flink_doris_tbl1表中插入数据
insert into flink_doris_tbl1 values (1,'zs',18,100),(2,'ls',19,200),(3,'ww',20,300),(3,'ww',20,400);

#查询表flink_doris_tbl1中数据
mysql> select * from flink_doris_tbl1;
+------+------+------+-------+
| id   | name | age  | score |
+------+------+------+-------+
|    1 | zs   |   18 |   100 |
|    3 | ww   |   20 |   400 |
|    3 | ww   |   20 |   300 |
|    2 | ls   |   19 |   200 |
+------+------+------+-------+
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
8.2.3.3Flink DataStream操作Doris
  • Flink 读取 Doris 中数据 ( 仅支持批读取 )

下面编写Flink DataStream API 读取Doris表flink_doris_tbl1中的数据,目前Flink仅支持批次从Doris中读取数据,不支持实时读取。代码如下:

/**
 * Flink读取Doris数据
 * 注意:目前Doris Source是有界流,不支持CDC方式读取。
 */
public class FlinkBatchReadDoris {
    public static void main(String[] args) throws Exception {
        //1.准备Flink环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //2.准备Doris连接参数
        DorisOptions.Builder builder = DorisOptions.builder()
                .setFenodes("node1:8030")
                .setTableIdentifier("example_db.flink_doris_tbl1")
                .setUsername("root")
                .setPassword("123456");

        //3.准备Doris Source
        DorisSource<List<?>> dorisSource = DorisSourceBuilder.<List<?>>builder()
                .setDorisOptions(builder.build())
                .setDorisReadOptions(DorisReadOptions.builder().build())
                .setDeserializer(new SimpleListDeserializationSchema())
                .build();

        //4.读取Doris数据
        env.fromSource(dorisSource, WatermarkStrategy.noWatermarks(), "doris source").print();

        env.execute();
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30

执行以上代码,可以在控制台看到读取后的结果。

  • Flink Doris 中写入数据

Flink DataStream API 也可以向Doris中写入数据,写数据可以支持批写和流式写两种方式,这里编写代码实现Flink向Doris 表flink_doris_tbl1中批次写入指定数据,代码如下:

//1.准备Flink环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

//2.数据写入到Doris中必须开启checkpoint
env.enableCheckpointing(10000);

//3.使用批方式写入到Doris 表中,默认是流的方式
env.setRuntimeMode(RuntimeExecutionMode.BATCH);

//4.Doris Sink builder对象
DorisSink.Builder<RowData> builder = DorisSink.builder();

//4.1 准备Doris连接参数
DorisOptions.Builder dorisBuilder = DorisOptions.builder();
dorisBuilder.setFenodes("node1:8030")
        .setTableIdentifier("example_db.flink_doris_tbl1")
        .setUsername("root")
        .setPassword("123456");

//4.2 准备 streamload 关于加载 读取json格式的参数
Properties properties = new Properties();
properties.setProperty("format", "json");
properties.setProperty("read_json_by_line", "true");

DorisExecutionOptions.Builder  executionBuilder = DorisExecutionOptions.builder();
executionBuilder.setLabelPrefix("label-doris"+ UUID.randomUUID()) //streamload label 前缀
        .setStreamLoadProp(properties); //streamload 参数

//4.3 设置 flink rowdata 的 schema
String[] fields = {"id", "name", "age", "score"};
DataType[] types = {DataTypes.INT(), DataTypes.VARCHAR(255), DataTypes.INT(), DataTypes.BIGINT()};

//4.4 准备Doris Sink
builder.setDorisReadOptions(DorisReadOptions.builder().build())
        .setDorisExecutionOptions(executionBuilder.build())
        .setSerializer(RowDataSerializer.builder()    // 根据rowdata进行序列化
                .setFieldNames(fields)
                .setType("json")           //json 格式
                .setFieldType(types).build())
        .setDorisOptions(dorisBuilder.build());

//5.生成 rowdata 数据
DataStream<RowData> source = env.fromElements("1,zs,18,100", "2,ls,19,99")
        .map(new MapFunction<String, RowData>() {
            @Override
            public RowData map(String value) throws Exception {
                GenericRowData genericRowData = new GenericRowData(4);
                genericRowData.setField(0, Integer.valueOf(value.split(",")[0]));
                genericRowData.setField(1, StringData.fromString(value.split(",")[1]));
                genericRowData.setField(2, Integer.valueOf(value.split(",")[2]));
                genericRowData.setField(3, Long.valueOf(value.split(",")[3]));
                return genericRowData;
            }
        });

//6.将数据写入到Doris中
source.sinkTo(builder.build());

env.execute();

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60

注意:在执行代码之前,可以将Flink表flink_doris_tbl1中的数据删除: delete from flink_doris_tbl1 where id >0;然后再执行以上代码。代码中向Doris表flink_doris_tbl1中插入"1,zs,18,100", "2,ls,19,99"两条数据,代码执行完成后,查看Doris flink_doris_tbl1中的数据:

mysql> select * from flink_doris_tbl1;
+------+------+------+-------+
| id   | name | age  | score |
+------+------+------+-------+
|    2 | ls   |   19 |    99 |
|    1 | zs   |   18 |   100 |
+------+------+------+-------+
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • Flink 读取 Kafka 数据实时写入 Doris 表中

下面编写代码实现Flink DataStream API 实时读取Kafka中的数据,然后实时写入到Doris表中。代码如下:

/**
 * 从Kafka中实时读取数据 将结果实时写入到Doris表中
 */
public class RealTimeReadKafkaSinkDoris {
    public static void main(String[] args) throws Exception {
        //1.准备Flink环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //2.数据写入到Doris中必须开启checkpoint
        env.enableCheckpointing(10000);

        //3.读取Kafka中的数据,并转换成RowData,Kafka中输入数据格式为:1,张三,18,100;2,李四,19,99;3,王五,20,98
        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                .setBootstrapServers("node1:9092,node2:9092,node3:9092") //设置Kafka 集群节点
                .setTopics("doris-topic") //设置读取的topic
                .setGroupId("test-group") //设置消费者组
                .setStartingOffsets(OffsetsInitializer.latest()) //设置读取数据位置
                .setDeserializer(new KafkaRecordDeserializationSchema<String>() {
                    //设置key ,value 数据获取后如何处理
                    @Override
                    public void deserialize(ConsumerRecord<byte[], byte[]> consumerRecord, Collector<String> collector) throws IOException {
                        String key = null;
                        String value = null;
                        if(consumerRecord.key() != null){
                            key = new String(consumerRecord.key(), "UTF-8");
                        }
                        if(consumerRecord.value() != null){
                            value = new String(consumerRecord.value(), "UTF-8");
                        }
                        collector.collect(value);
                    }

                    //设置置返回的二元组类型
                    @Override
                    public TypeInformation<String> getProducedType() {
                        return TypeInformation.of(String.class);
                    }
                })
                .build();

        DataStreamSource<String> kafkaDS = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka-source");

        SingleOutputStreamOperator<RowData> sinkData = kafkaDS.map(new MapFunction<String, RowData>() {
            @Override
            public RowData map(String value) throws Exception {
                GenericRowData genericRowData = new GenericRowData(4);
                genericRowData.setField(0, Integer.valueOf(value.split(",")[0]));
                genericRowData.setField(1, StringData.fromString(value.split(",")[1]));
                genericRowData.setField(2, Integer.valueOf(value.split(",")[2]));
                genericRowData.setField(3, Long.valueOf(value.split(",")[3]));
                return genericRowData;
            }
        });

        //4.Doris Sink builder对象
        DorisSink.Builder<RowData> builder = DorisSink.builder();

        //4.1 准备Doris连接参数
        DorisOptions.Builder dorisBuilder = DorisOptions.builder();
        dorisBuilder.setFenodes("node1:8030")
                .setTableIdentifier("example_db.flink_doris_tbl1")
                .setUsername("root")
                .setPassword("123456");

        //4.2 准备 streamload 关于加载 读取json格式的参数
        Properties properties = new Properties();
        properties.setProperty("format", "json");
        properties.setProperty("read_json_by_line", "true");

        DorisExecutionOptions.Builder  executionBuilder = DorisExecutionOptions.builder();
        executionBuilder.setLabelPrefix("label-doris"+ UUID.randomUUID()) //streamload label 前缀
                .setStreamLoadProp(properties); //streamload 参数

        //4.3 设置 flink rowdata 的 schema
        String[] fields = {"id", "name", "age", "score"};
        DataType[] types = {DataTypes.INT(), DataTypes.VARCHAR(255), DataTypes.INT(), DataTypes.BIGINT()};

        //4.4 准备Doris Sink
        builder.setDorisReadOptions(DorisReadOptions.builder().build())
                .setDorisExecutionOptions(executionBuilder.build())
                .setSerializer(RowDataSerializer.builder()    // 根据rowdata进行序列化
                        .setFieldNames(fields)
                        .setType("json")           //json 格式
                        .setFieldType(types).build())
                .setDorisOptions(dorisBuilder.build());


        //6.将数据写入到Doris中
        sinkData.sinkTo(builder.build());

        env.execute();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93

以上代码编写完成后,在执行代码之前,可以将Flink表flink_doris_tbl1中的数据删除: delete from flink_doris_tbl1 where id >0;另外,需要在Kafka中创建 doris-topic Topic,执行代码后,输入数据可以实时查看flink_doris_tbl1表中的数据。

#删除doris表flink_doris_tbl1数据
mysql> delete from flink_doris_tbl1 where id >0;

#启动kafka集群,并创建doris-topic 
[root@node1 ~]# kafka-topics.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --create --topic doris-topic --partitions 3 --replication-factor 3

#运行以上编写好的代码,并向kafka doris-topic中输入如下数据
[root@node1 ~]# kafka-console-producer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --topic doris-topic
1,张三,18,100
2,李四,19,99
3,王五,20,98

#查询Doris表 flink_doris_tbl1中的数据
mysql> select * from flink_doris_tbl1;
+------+--------+------+-------+
| id   | name   | age  | score |
+------+--------+------+-------+
|    2 | 李四   |   19 |    99 |
|    1 | 张三   |   18 |   100 |
|    3 | 王五   |   20 |    98 |
+------+--------+------+-------+
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
8.2.3.4Flink SQL 操作Doris
  • FlinkSQL 读取 Doris 中数据(只支持批读取)

与DataStream API 一样,在读取Doris中数据时只支持批量读取,不支持实时读取。下面编写代码实现Flink SQL方式读取Doris中的数据,然后再写入到Doris表中。代码如下:

/**
 * 从Doris中读取数据 再次将结果写入到Doris表中
 * 1) 创建Doris 表 flink_doris_tbl1 并插入数据
 * 2) 从该表中读取数据并写入到flink_doris_tbl2 表中
 *  注意:目前Doris Source是有界流,不支持CDC方式读取,所以Doris Source在数据读取完成后,流为什么就结束
 */
public class FlinkReadDorisToDoris {
    public static void main(String[] args) throws Exception {
        //1.准备Flink环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        //2.通过Doris Connector创建表 读取Doris表 flink_doris_tbl1 中的数据
        String  sourceSQL = "CREATE TABLE flink_doris_source ( " +
                "    id INT," +
                "    name STRING," +
                "    age INT," +
                "    score BIGINT" +
                "    ) " +
                "    WITH (" +
                "      'connector' = 'doris'," +
                "      'fenodes' = 'node1:8030'," +
                "      'table.identifier' = 'example_db.flink_doris_tbl1'," +
                "      'username' = 'root'," +
                "      'password' = '123456'" +
                ")";

        tableEnv.executeSql(sourceSQL);

        //3.打印读取到的数据
        tableEnv.executeSql("select id,name,age,sum(score) as total_score from flink_doris_source group by id,name,age")
                .print();

        //4.将聚合结果写入到Doris表 flink_doris_tbl2 中
        //4.1 创建Doris表 flink_doris_tbl2
        String  sinkSQL = "CREATE TABLE flink_doris_sink ( " +
                "    id INT," +
                "    name STRING," +
                "    age INT," +
                "    total_score BIGINT" +
                "    ) " +
                "    WITH (" +
                "      'connector' = 'doris'," +
                "      'fenodes' = 'node1:8030'," +
                "      'sink.label-prefix' = '" + UUID.randomUUID() + "'," +
                "      'table.identifier' = 'example_db.flink_doris_tbl2'," +
                "      'username' = 'root'," +
                "      'password' = '123456'" +
                ")";

        tableEnv.executeSql(sinkSQL);

        //4.2 将聚合结果写入到Doris表 flink_doris_tbl2 中
        tableEnv.executeSql("insert into flink_doris_sink select id,name,age,sum(score) as total_score from flink_doris_source group by id,name,age");

    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59

以上代码执行前,首先确保表flink_doris_tbl1中有数据,并保证flink_doris_tbl2中没有数据,执行代码完成后,可以查询Doris表flink_doris_tbl2中的数据。

mysql> select * from flink_doris_tbl2;
+------+--------+------+-------------+
| id   | name   | age  | total_score |
+------+--------+------+-------------+
|    2 | 李四   |   19 |          99 |
|    1 | 张三   |   18 |         100 |
|    3 | 王五   |   20 |          98 |
+------+--------+------+-------------+
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • FlinkSQL 实时读取 Kafka 数据写入到 Doris

下面使用Flink SQL编程方式编写代码实现从Kafka中实时读取数据,经过窗口分析后,实时写入到Doris 表中。代码如下:

/**
 * Flink实时读取Kafak数据,实时将结果写入到Doris表中
 * 1.在kafka中创建topic:doris-topic
 * 2.在Doris MySQL客户端 创建表 :flink_result_tbl
 */
public class RealTimeReadKafkaSinkDoris {
    public static void main(String[] args) throws Exception {
        //1.创建Flink流处理环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //可以设置并行度为1,防止一些并行度中没有数据时,结果不写入Doris,也可以设置table.exec.source.idle-timeout参数
        // env.setParallelism(1);

        //2.必须开启checkpoint,否则无法写入Doris
        env.enableCheckpointing(5000);

        //3.创建Flink Table 环境
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        TableConfig config = tableEnv.getConfig();
        //table.exec.source.idle-timeout=5s ,单位是ms,如果其他分区没有等待多少ms后没有数据来,则自动推进水位线。
        config.getConfiguration().setString("table.exec.source.idle-timeout","5 s");

        //4.创建Kafka Source表
        String  sourceSQL = "CREATE TABLE KafkaTable (" +
                "  `name` STRING," +
                "  `dt` BIGINT," +
                "  `ts` AS TO_TIMESTAMP(FROM_UNIXTIME(dt/1000, 'yyyy-MM-dd HH:mm:ss'))," +
                "   WATERMARK FOR ts AS ts - INTERVAL '0' SECOND" +
                ") WITH (" +
                "  'connector' = 'kafka'," +
                "  'topic' = 'doris-topic'," +
                "  'properties.bootstrap.servers' = 'node1:9092,node2:9092,node3:9092'," +
                "  'properties.group.id' = 'testGroup'," +
                "  'scan.startup.mode' = 'latest-offset'," +
                "  'format' = 'csv'" +
                ")";
        tableEnv.executeSql(sourceSQL);

        //5.执行SQL,实时统计每5秒的数据,每10秒输出一次统计一次人数 ,并将结果注册临时表tmp
        String executeSql = "SELECT window_start, window_end, name, COUNT(name) as cnt" +
                "  FROM TABLE(" +
                "    HOP(TABLE KafkaTable, DESCRIPTOR(ts), INTERVAL '5' SECOND, INTERVAL '10' SECOND))" +
                "  GROUP BY window_start, window_end, name";
        Table table = tableEnv.sqlQuery(executeSql);
        tableEnv.createTemporaryView("tmp",table);

        //6.将聚合结果写入到Doris表 flink_doris_tbl2 中
        //6.1 创建Flink临时表映射 Doris 表 flink_result_tbl
        String  sinkSQL = "CREATE TABLE flink_doris_sink ( " +
                "    window_start TIMESTAMP," +
                "    window_end TIMESTAMP," +
                "    name STRING," +
                "    cnt BIGINT" +
                "    ) " +
                "    WITH (" +
                "      'connector' = 'doris'," +
                "      'fenodes' = 'node1:8030'," +
                "      'sink.label-prefix' = '" + UUID.randomUUID() + "'," +
                "      'table.identifier' = 'example_db.flink_result_tbl'," +
                "      'username' = 'root'," +
                "      'password' = '123456'" +
                ")";
        tableEnv.executeSql(sinkSQL);

        //6.2 将聚合结果写入到Doris表 flink_result_tbl 中
        tableEnv.executeSql("insert into flink_doris_sink select window_start,window_end,name,cnt from tmp");

    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69

以上代码编写完成后,运行代码后,向Kafka doris-topic中写入数据,然后在Doris flink_result_tbl中查看结果。向Kafka doris-topic中写入的数据如下:

[root@node1 ~]# kafka-console-producer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --topic doris-topic
Alice,1681448400000
Alice,1681448401000
Bob,1681448405000
Bob,1681448406000
Bob,1681448407000
Mary,1681448410000
Mike,1681448415000
Lucy,1681448420000
Tom,1681448425000
Sara,1681448430000

#数据写入后,可以查询Doris表flink_result_tbl中的数据
mysql> select * from flink_result_tbl;
+---------------------+---------------------+-------+------+
| window_start        | window_end          | name  | cnt  |
+---------------------+---------------------+-------+------+
| 2023-04-14 13:00:05 | 2023-04-14 13:00:15 | Mary  |    1 |
| 2023-04-14 13:00:10 | 2023-04-14 13:00:20 | Mary  |    1 |
| 2023-04-14 13:00:00 | 2023-04-14 13:00:10 | Bob   |    3 |
| 2023-04-14 13:00:05 | 2023-04-14 13:00:15 | Bob   |    3 |
| 2023-04-14 13:00:10 | 2023-04-14 13:00:20 | Mike  |    1 |
| 2023-04-14 13:00:15 | 2023-04-14 13:00:25 | Mike  |    1 |
| 2023-04-14 13:00:20 | 2023-04-14 13:00:30 | Tom   |    1 |
| 2023-04-14 12:59:55 | 2023-04-14 13:00:05 | Alice |    2 |
| 2023-04-14 13:00:00 | 2023-04-14 13:00:10 | Alice |    2 |
| 2023-04-14 13:00:15 | 2023-04-14 13:00:25 | Lucy  |    1 |
| 2023-04-14 13:00:20 | 2023-04-14 13:00:30 | Lucy  |    1 |
+---------------------+---------------------+-------+------+


#再次继续向Kafka topic 中写入数据
Alice,1681448435000
Bob,1681448440000
Mary,1681448445000
Mike,1681448450000
Lucy,1681448455000
Tom,1681448460000
Sara,1681448465000
11 rows in set (0.07 sec)

#再次查询Doris表flink_result_tbl中的数据
mysql> select * from flink_result_tbl;
+---------------------+---------------------+-------+------+
| window_start        | window_end          | name  | cnt  |
+---------------------+---------------------+-------+------+
| 2023-04-14 13:00:15 | 2023-04-14 13:00:25 | Lucy  |    1 |
| 2023-04-14 13:00:20 | 2023-04-14 13:00:30 | Lucy  |    1 |
| 2023-04-14 13:00:25 | 2023-04-14 13:00:35 | Sara  |    1 |
| 2023-04-14 13:00:30 | 2023-04-14 13:00:40 | Sara  |    1 |
| 2023-04-14 13:00:50 | 2023-04-14 13:01:00 | Lucy  |    1 |
| 2023-04-14 13:00:55 | 2023-04-14 13:01:05 | Lucy  |    1 |
| 2023-04-14 12:59:55 | 2023-04-14 13:00:05 | Alice |    2 |
| 2023-04-14 13:00:00 | 2023-04-14 13:00:10 | Alice |    2 |
| 2023-04-14 13:00:20 | 2023-04-14 13:00:30 | Tom   |    1 |
| 2023-04-14 13:00:25 | 2023-04-14 13:00:35 | Tom   |    1 |
| 2023-04-14 13:00:30 | 2023-04-14 13:00:40 | Alice |    1 |
| 2023-04-14 13:00:35 | 2023-04-14 13:00:45 | Alice |    1 |
| 2023-04-14 13:00:55 | 2023-04-14 13:01:05 | Tom   |    1 |
| 2023-04-14 13:00:00 | 2023-04-14 13:00:10 | Bob   |    3 |
| 2023-04-14 13:00:05 | 2023-04-14 13:00:15 | Bob   |    3 |
| 2023-04-14 13:00:05 | 2023-04-14 13:00:15 | Mary  |    1 |
| 2023-04-14 13:00:10 | 2023-04-14 13:00:20 | Mike  |    1 |
| 2023-04-14 13:00:10 | 2023-04-14 13:00:20 | Mary  |    1 |
| 2023-04-14 13:00:15 | 2023-04-14 13:00:25 | Mike  |    1 |
| 2023-04-14 13:00:35 | 2023-04-14 13:00:45 | Bob   |    1 |
| 2023-04-14 13:00:40 | 2023-04-14 13:00:50 | Mary  |    1 |
| 2023-04-14 13:00:40 | 2023-04-14 13:00:50 | Bob   |    1 |
| 2023-04-14 13:00:45 | 2023-04-14 13:00:55 | Mary  |    1 |
| 2023-04-14 13:00:45 | 2023-04-14 13:00:55 | Mike  |    1 |
| 2023-04-14 13:00:50 | 2023-04-14 13:01:00 | Mike  |    1 |
+---------------------+---------------------+-------+------+
25 rows in set (0.03 sec)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
8.2.3.5Flink 操作Doris总结
  1. 无论是DataStream API 还是SQL API 读取Doris中数据时,目前仅支持批读取。
  2. Flink向Doris中写入数据时底层会转换成Stream Load方式向Doris加载数据,这种速度快,每次执行代码都会生成一个Stream Load对应的Label ,在代码中或者SQL Connector中可以指定,建议加上UUID随机生成。否则每次执行代码前还需执行clean label from db 来清除对应Doris 库中的Label。
  3. Flink向Doris中写入数据不建议使用JDBC的方式,建议使用Stream Load的方式,即以上案例演示方式。
  4. 实时向Doris中写入数据时,一样要开启Checkpoint,否则数据不能正常写入到Doris表中。
  5. Flink Doris Connector主要是依赖Checkpoint进行流式写入,所以Checkpoint的间隔即为数据的可见延迟时间。
  6. 为了保证Flink的Exactly Once语义,Flink Doris Connector 默认开启两阶段提交,Doris在1.1版本后默认开启两阶段提交。
  7. Flink在数据导入时,如果有脏数据,比如字段格式、长度等问题,会导致StreamLoad报错,此时Flink会不断的重试。如果需要跳过,可以通过禁用StreamLoad的严格模式(strict_mode=false,max_filter_ratio=1)或者在Sink算子之前对数据做过滤。

8.2.4Flink操作Doris配置项

Flink操作Doris时有一些配置项如下,可以参考官网配置,地址:https://doris.apache.org/zh-CN/docs/dev/ecosystem/flink-doris-connector#配置,如下:

KeyDefault ValueRequiredComment
fenodesYDoris FE http 地址
table.identifierYDoris 表名,如:db.tbl
usernameY访问 Doris 的用户名
passwordY访问 Doris 的密码
doris.request.retries3N向 Doris 发送请求的重试次数
doris.request.connect.timeout.ms30000N向 Doris 发送请求的连接超时时间
doris.request.read.timeout.ms30000N向 Doris 发送请求的读取超时时间
doris.request.query.timeout.s3600N查询 Doris 的超时时间,默认值为1小时,-1表示无超时限制
doris.request.tablet.sizeInteger. MAX_VALUEN一个 Partition 对应的 Doris Tablet 个数。 此数值设置越小,则会生成越多的 Partition。从而提升 Flink 侧的并行度,但同时会对 Doris 造成更大的压力。
doris.batch.size1024N一次从 BE 读取数据的最大行数。增大此数值可减少 Flink 与 Doris 之间建立连接的次数。 从而减轻网络延迟所带来的额外时间开销。
doris.exec.mem.limit2147483648N单个查询的内存限制。默认为 2GB,单位为字节
doris.deserialize.arrow.asyncFALSEN是否支持异步转换 Arrow 格式到 flink-doris-connector 迭代所需的 RowBatch
doris.deserialize.queue.size64N异步转换 Arrow 格式的内部处理队列,当 doris.deserialize.arrow.async 为 true 时生效
doris.read.fieldN读取 Doris 表的列名列表,多列之间使用逗号分隔
doris.filter.queryN过滤读取数据的表达式,此表达式透传给 Doris。Doris 使用此表达式完成源端数据过滤。
sink.label-prefixYStream load导入使用的label前缀。2pc场景下要求全局唯一 ,用来保证Flink的EOS语义。
sink.properties.*NStream Load 的导入参数。
例如: ‘sink.properties.column_separator’ = ‘, ’ 定义列分隔符, ‘sink.properties.escape_delimiters’ = ‘true’ 特殊字符作为分隔符,’\x01’会被转换为二进制的0x01
JSON格式导入
‘sink.properties.format’ = ‘json’ ‘sink.properties.read_json_by_line’ = ‘true’
sink.enable-deleteTRUEN是否启用删除。此选项需要 Doris 表开启批量删除功能(Doris0.15+版本默认开启),只支持 Unique 模型。
sink.enable-2pcTRUEN是否开启两阶段提交(2pc),默认为true,保证Exactly-Once语义。关于两阶段提交可参考这里

8.2.5Doris 和 Flink 列类型映射关系

Flink处理数据写入到Doris时需要注意对应的类型关系,如下:

Doris TypeFlink Type
NULL_TYPENULL
BOOLEANBOOLEAN
TINYINTTINYINT
SMALLINTSMALLINT
INTINT
BIGINTBIGINT
FLOATFLOAT
DOUBLEDOUBLE
DATEDATE
DATETIMETIMESTAMP
DECIMALDECIMAL
CHARSTRING
LARGEINTSTRING
VARCHARSTRING
DECIMALV2DECIMAL
TIMEDOUBLE
HLLUnsupported datatype

8.3DataX DorisWriter

DataX 是阿里云 DataWorks数据集成 的开源版本,在阿里巴巴集团内被广泛使用的离线数据同步工具/平台。DataX 实现了包括 MySQL、Oracle、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、Hologres、DRDS 等各种异构数据源之间高效的数据同步功能。

DataX doriswriter 插件,用于通过 DataX 同步其他数据源的数据到 Doris 中。这个插件是利用Doris的Stream Load 功能进行数据导入的。需要配合 DataX 服务一起使用。

默认的DataX安装包中没有DorisWriter插件,我们想要同步其他数据库数据到Doris中就需要根据Doris官方提供的Doris Write源码结合DataX源码进行编译,获取到DataX的安装包。

8.3.1DorisWriter插件集成DataX编译

doriswriter 插件依赖的 DataX 代码中的一些模块。而这些模块并没有在 Maven 官方仓库中。所以我们在开发 doriswriter 插件时,需要下载完整的 DataX 代码库,才能进行插件的编译和开发。DorisWriter 插件源码地址:https://github.com/apache/doris/tree/master/extension/DataX,该目录下的文件如下:

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传image.png

  • doriswriter

这个目录是 doriswriter 插件的代码目录。这个目录中的所有代码,都托管在 Apache Doris 的代码库中。

doriswriter 插件帮助文档在这里:https://github.com/apache/doris/blob/master/extension/DataX/doriswriter/doc/doriswriter.md

  • init-env.sh

该脚本主要用于构建 DataX 开发环境,他主要进行了以下操作:

  1. 将 DataX 代码库 clone 到本地。
  2. 将 doriswriter/ 目录软链到 DataX/doriswriter 目录。
  3. 在 DataX/pom.xml 文件中添加 <module>doriswriter</module> 模块。
  4. 将 DataX/core/pom.xml 文件中的 httpclient 版本从 4.5 改为 4.5.13.(httpclient v4.5 在处理 307 转发时有bug)

这个脚本执行后,开发者就可以进入 DataX/ 目录开始开发或编译了。因为做了软链,所以任何对 DataX/doriswriter 目录中文件的修改,都会反映到 doriswriter/ 目录中,方便开发者提交代码。

这里在node1节点进行DorisWriter插件集成DataX源码编译,步骤如下:

  1. 安装 git
[root@node1 ~]# yum -y install git
  • 1
  1. 准备 doriswriter 源码与 init-env.sh 文件

在node1节点创建目录,并将https://github.com/apache/doris/tree/master/extension/DataX 中的doriswriter文件夹和init-env.sh文件上传到该目录(在资料中有DataX目录,目录中有这两个文件,也可以自己下载)。

#创建目录并上传文件
[root@node1 software]# mkdir doris-datax && cd doris-datax

#查看上传的文件
[root@node1 doris-datax]# ls
doriswriter  init-env.sh
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  1. 编译源码
#执行init-env.sh脚本
[root@node1 doris-datax]# sh init-env.sh

#执行完成后,查看生成的文件
[root@node1 doris-datax]# ls
DataX  doriswriter  init-env.sh  pom.xml

#进入该DataX该目录,执行如下命令进行DataX安装包编译
[root@node1 doris-datax]# cd DataX/
[root@node1 DataX]# mvn package assembly:assembly -Dmaven.test.skip=true
...
[INFO] adbmysqlwriter 0.0.1-SNAPSHOT ...................... SUCCESS [  1.364 s]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  15:46 min
[INFO] Finished at: 2023-04-14T17:28:22+08:00
[INFO] ------------------------------------------------------------------------
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

以上代码编译完成后,可以在/software/doris-datax/DataX/target 下看到datax.tar.gz 安装包。

8.3.2DataX安装

编译好的DataX安装包只能安装到Linux中,需要有JDK8、Python环境(推荐Python2.6,Centos7自带python为2版本),将编译好的DataX安装包datax.tar.gz直接解压到某一路径下完成安装,进入bin目录,即可运行同步作业。这里解压到/software目录下。

#将编译好的DataX安装包移动到/software目录下,解压DataX安装包
[root@node1 bin]# cd /software/
[root@node1 software]# tar -zxvf ./datax.tar.gz
  • 1
  • 2
  • 3

8.3.3DorisWriter参数介绍

关于DorisWriter插件的介绍可以参考:https://github.com/apache/doris/blob/master/extension/DataX/doriswriter/doc/doriswriter.md,DorisWriter支持将大批量数据写入Doris中。

DorisWriter 通过Doris原生支持Stream load方式导入数据, DorisWriter会将reader读取的数据进行缓存在内存中,拼接成Json文本,然后批量导入至Doris。下面结合样例json来介绍DorisWriter的参数。

这里是一份从Stream读取数据后导入至Doris的配置文件:

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "column": ["emp_no", "birth_date", "first_name","last_name","gender","hire_date"],
                        "connection": [
                            {
                                "jdbcUrl": ["jdbc:mysql://localhost:3306/demo"],
                                "table": ["employees_1"]
                            }
                        ],
                        "username": "root",
                        "password": "xxxxx",
                        "where": ""
                    }
                },
                "writer": {
                    "name": "doriswriter",
                    "parameter": {
                        "loadUrl": ["172.16.0.13:8030"],
                        "loadProps": {
                        },
                        "column": ["emp_no", "birth_date", "first_name","last_name","gender","hire_date"],
                        "username": "root",
                        "password": "xxxxxx",
                        "postSql": ["select count(1) from all_employees_info"],
                        "preSql": [],
                        "flushInterval":30000,
                        "connection": [
                          {
                            "jdbcUrl": "jdbc:mysql://172.16.0.13:9030/demo",
                            "selectedDatabase": "demo",
                            "table": ["all_employees_info"]
                          }
                        ],
                        "loadProps": {
                            "format": "json",
                            "strip_outer_array": true
                        }
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": "1"
            }
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53

以上参数解释如下:

  • jdbcUrl

描述:Doris 的 JDBC 连接串,用户执行 preSql 或 postSQL。

必选:是

默认值:无

  • loadUrl

描述:作为 Stream Load 的连接目标。格式为 “ip:port”。其中 IP 是 FE 节点 IP,port 是 FE 节点的 http_port。可以填写多个,多个之间使用英文状态的分号隔开:;,doriswriter 将以轮询的方式访问。

必选:是

默认值:无

  • username

描述:访问Doris数据库的用户名

必选:是

默认值:无

  • password

描述:访问Doris数据库的密码

必选:否

默认值:空

  • connection.selectedDatabase

描述:需要写入的Doris数据库名称。

必选:是

默认值:无

  • connection.table

描述:需要写入的Doris表名称。

必选:是

默认值:无

  • column

描述:目的表需要写入数据的字段,这些字段将作为生成的 Json 数据的字段名。字段之间用英文逗号分隔。例如: “column”: [“id”,“name”,“age”]。

必选:是

默认值:否

  • preSql

描述:写入数据到目的表前,会先执行这里的标准语句。

必选:否

默认值:无

  • postSql

描述:写入数据到目的表后,会执行这里的标准语句。

必选:否

默认值:无

  • maxBatchRows

描述:每批次导入数据的最大行数。和 maxBatchSize 共同控制每批次的导入数量。每批次数据达到两个阈值之一,即开始导入这一批次的数据。

必选:否

默认值:500000

  • batchSize

描述:每批次导入数据的最大数据量。和 maxBatchRows 共同控制每批次的导入数量。每批次数据达到两个阈值之一,即开始导入这一批次的数据。

必选:否

默认值:104857600

  • maxRetries

描述:每批次导入数据失败后的重试次数。

必选:否

默认值:0

  • labelPrefix

描述:每批次导入任务的 label 前缀。最终的 label 将有 labelPrefix + UUID 组成全局唯一的 label,确保数据不会重复导入

必选:否

默认值:datax_doris_writer_

  • loadProps

描述:StreamLoad 的请求参数,详情参照StreamLoad介绍。这里包括导入的数据格式:format等,导入数据格式默认我们使用csv,支持JSON,具体可以参照下面类型转换部分,也可以参照上面Stream load 官方信息。

必选:否

默认值:无

默认传入的数据均会被转为字符串,并以\t作为列分隔符,\n作为行分隔符,组成csv文件进行StreamLoad导入操作。

默认是csv格式导入,如需更改列分隔符, 则正确配置 loadProps 即可:

"loadProps": {
    "column_separator": "\\x01",
    "line_delimiter": "\\x02"
}
  • 1
  • 2
  • 3
  • 4

如需更改导入格式为json, 则正确配置 loadProps 即可:

"loadProps": {
    "format": "json",
    "strip_outer_array": true
}
  • 1
  • 2
  • 3
  • 4

8.3.4DataX同步MySQL数据到Doris

这里通过DataX将MySQL中表数据导入到Doris对应表中。按照如下步骤操作即可。

  1. 创建 MySQL 数据表
#在node2节点mysql中创建如下test表,并插入数据
mysql> use demo;
mysql> create table demo.test(id int,name varchar(255),age int ,score bigint);
mysql> insert into test values (1,'zs',18,100),(2,'ls',19,200),(3,'ww',20,300),(4,'ml',21,400),(5,'gb',22,500);

#查看数据
mysql> select * from test;
+------+------+------+-------+
| id   | name | age  | score |
+------+------+------+-------+
|    1 | zs   |   18 |   100 |
|    2 | ls   |   19 |   200 |
|    3 | ww   |   20 |   300 |
|    4 | ml   |   21 |   400 |
|    5 | gb   |   22 |   500 |
+------+------+------+-------+
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  1. 创建 Doris
#在Doris example_db下创建表
CREATE TABLE example_db.`doris_datax_tbl` (
 `id` int NOT NULL,
 `name` varchar(30) DEFAULT NULL COMMENT '',
 `age` int DEFAULT NULL COMMENT '',
 `score` bigint DEFAULT NULL COMMENT ''
) ENGINE=OLAP
UNIQUE KEY(`id`, `name`)
DISTRIBUTED BY HASH(`id`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"in_memory" = "false",
"storage_format" = "V2"
);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  1. 准备 DataX 数据同步需要的 json 文件

my_import.json:

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "column": ["id","name","age","score"],
                        "connection": [
                            {
                                "jdbcUrl": ["jdbc:mysql://node2:3306/demo"],
                                "table": ["test"]
                            }
                        ],
                        "username": "root",
                        "password": "123456",
                        "where": ""
                    }
                },
                "writer": {
                    "name": "doriswriter",
                    "parameter": {
                        "loadUrl": ["node1:8030"],
                        "loadProps": {
                        },
                        "column": ["id","name","age","score"],
                        "username": "root",
                        "password": "123456",
                        "postSql": ["select count(1) from doris_datax_tbl"],
                        "preSql": [],
                        "flushInterval":30000,
                        "connection": [
                          {
                            "jdbcUrl": "jdbc:mysql://node1:9030/example_db",
                            "selectedDatabase": "example_db",
                            "table": ["doris_datax_tbl"]
                          }
                        ],
                        "loadProps": {
                            "format": "json",
                            "strip_outer_array":"true",
                            "line_delimiter": "\\x02"
                        }
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": "1"
            }
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54

将my_import.json文件放在node1节点的/root目录下。

  1. DataX 中执行数据同步脚本
#执行数据同步脚本
[root@node1 ~]# cd /software/datax/bin/
[root@node1 bin]# python datax.py /root/my_import.json 
...
任务启动时刻                    : 2023-04-14 19:52:40
任务结束时刻                    : 2023-04-14 19:52:52
任务总计耗时                    :                 11s
任务平均流量                    :                4B/s
记录写入速度                    :              0rec/s
读出记录总数                    :                   5
读写失败总数                    :                   0
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  1. Doris 中查看同步数据
#在Doris中查询同步的数据
mysql> select * from doris_datax_tbl;
+------+------+------+-------+
| id   | name | age  | score |
+------+------+------+-------+
|    1 | zs   |   18 |   100 |
|    2 | ls   |   19 |   200 |
|    3 | ww   |   20 |   300 |
|    4 | ml   |   21 |   400 |
|    5 | gb   |   22 |   500 |
+------+------+------+-------+
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

8.4JDBC Catalog

JDBC Catalog 通过标准 JDBC 协议,连接其他数据源。连接后,Doris 会自动同步数据源下的 Database 和 Table 的元数据,以便快速访问这些外部数据,数据不会同步到Doris中,Doris只是通过JDBC方式来访问外部数据库中的数据。目前Catalog支持MySQL、PostgreSQL、Oracle、SQLServer、Clickhouse、Doris。具体可以参考:https://doris.apache.org/zh-CN/docs/dev/lakehouse/multi-catalog/jdbc

8.4.1创建Catalog

使用JDBC方式读取MySQL、PostgreSQL、Oracle、SQLServer、Clickhouse、Doris中的数据时,需要首先创建Catalog,下面以读取MySQL、Oracle、Clickhouse、Doris中的数据创建Catalog为例来演示Catalog的创建。

#mysql
CREATE CATALOG jdbc_mysql PROPERTIES (
"type"="jdbc",
"user"="root",
"password"="123456",
"jdbc_url" = "jdbc:mysql://127.0.0.1:3306/demo",
"driver_url" = "mysql-connector-java-5.1.47.jar",
"driver_class" = "com.mysql.jdbc.Driver"
)

#oracle
CREATE CATALOG jdbc_oracle PROPERTIES (
"type"="jdbc",
"user"="root",
"password"="123456",
"jdbc_url" = "jdbc:oracle:thin:@127.0.0.1:1521:helowin",
"driver_url" = "ojdbc6.jar",
"driver_class" = "oracle.jdbc.driver.OracleDriver"
);

#clickhouse
CREATE CATALOG jdbc_clickhouse PROPERTIES (
"type"="jdbc",
"user"="root",
"password"="123456",
"jdbc_url" = "jdbc:clickhouse://127.0.0.1:8123/demo",
"driver_url" = "clickhouse-jdbc-0.3.2-patch11-all.jar",
"driver_class" = "com.clickhouse.jdbc.ClickHouseDriver"
);

#doris,JDBC Catalog支持连接另一个Doris数据库
CREATE CATALOG doris_catalog PROPERTIES (
"type"="jdbc",
"user"="root",
"password"="123456",
"jdbc_url" = "jdbc:mysql://127.0.0.1:9030?useSSL=false",
"driver_url" = "mysql-connector-java-5.1.47.jar",
"driver_class" = "com.mysql.jdbc.Driver"
);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39

以上创建Catalog的参数说明如下:

参数是否必须默认值说明
user对应数据库的用户名
password对应数据库的密码
jdbc_urlJDBC连接串
driver_urlJDBC Driver Jar包名称*
driver_classJDBC Driver Class名称
only_specified_database“false”指定是否只同步指定的database
lower_case_table_names“false”是否以小写的形式同步jdbc外部数据源的表名
specified_database_list“”当only_specified_database=true时,指定同步多个database,以’,'分隔。db名称是大小写敏感的。

注意 :

  1. driver_url :需将 Jar 包预先存放在所有 FE及BE 节点指定的路径下。如 file:///path/to/mysql-connector-java-5.1.47.jar
  2. only_specified_database:在jdbc连接时可以指定链接到哪个database/schema, 如:mysql中jdbc_url中可以指定database, pg的jdbc_url中可以指定currentSchema。only_specified_database=true 且specified_database_list为空时,可以只同步指定的 database。当only_specified_database=true且specified_database_list指定了database列表时,则会同步指定的多个database。
  3. 映射 Oracle 时,Doris 的一个 Database 对应于 Oracle 中的一个 User。而 Doris 的 Database 下的 Table 则对应于 Oracle 中,该 User 下的有权限访问的 Table。
DorisOracle
CatalogDatabase
DatabaseUser
TableTable
  1. 目前Jdbc Catalog连接一个Doris数据库只支持用5.x版本的jdbc jar包。如果使用8.x jdbc jar包,可能会出现列类型无法匹配问题。

8.4.2数据查询

当在Doris中创建好对应的Catalog后,可以直接编写SQL读取对应数据库中的数据,举例如下:

#数据读取
select * from mysql_catalog.mysql_database.mysql_table where k1 > 1000 and k3 ='term';
  • 1
  • 2

由于可能存在使用数据库内部的关键字作为字段名,为解决这种状况下仍能正确查询,所以在SQL语句中,会根据各个数据库的标准自动在字段名与表名上加上转义符。例如 MYSQL(``)、PostgreSQL(“”)、SQLServer([])、ORACLE(“”),所以此时可能会造成字段名的大小写敏感,具体可以通过explain sql,查看转义后下发到各个数据库的查询语句。

8.4.3列类型映射

Doirs JDBC方式读取外部存储库中的数据需要注意Doris列类型与其他数据库中列类型的映射对应关系,下面以MySQL中列类型对应Doris列类型,其他数据库列类型与Doris 列类型映射关系参考:https://doris.apache.org/zh-CN/docs/dev/lakehouse/multi-catalog/jdbc#%E5%88%97%E7%B1%BB%E5%9E%8B%E6%98%A0%E5%B0%84

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传image.png

8.4.4Doris JDBC方式操作MySQL数据

下面以Doris读取MySQL中的数据为例,来演示通过JDBC Catalog方式读取MySQL中的数据。

  1. 准备 mysql-connector-java-5.1.47.jar

在Doris各个FE和BE节点上"/software/doris-1.2.1/apache-doris-fe/jdbc_drivers"目录中上传mysql-connector-java-5.1.47.jar,如果没有jdbc_drivers需要手动创建。

上传的该jar包需要在创建JDBC Catalog时指定在jdbc.driver_url配置下,这里上传至node1~node5各个节点的/software/doris-1.2.1/apache-doris-fe/jdbc_drivers路径中。

  1. 创建MySQL JDBC Catalog
CREATE CATALOG jdbc_mysql PROPERTIES (
    "type"="jdbc",
    "jdbc.user"="root",
    "jdbc.password"="123456",
    "jdbc.jdbc_url" = "jdbc:mysql://192.168.179.5:3306/demo",
    "jdbc.driver_url" = "file:///software/doris-1.2.1/apache-doris-fe/jdbc_drivers/mysql-connector-java-5.1.47.jar",
    "jdbc.driver_class" = "com.mysql.jdbc.Driver"
);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  1. MySQL demo 下准备数据表并插入数据
#在node2 MySQL中创建表,并插入数据
create table demo.tbl1 (id int,name varchar(255),age int);
create table demo.tbl2 (id int,name varchar(255),age int);
create table demo.tbl3 (id int,name varchar(255),age int);

insert into tbl1 values (1,'a',18),(2,'b',19);
insert into tbl2 values (3,'c',20),(4,'d',21);
insert into tbl3 values (5,'e',22),(6,'f',23);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

注意:如果在Doris中创建了对应的Catalog,再向mysql中创建的表,Doris中不认识,可以删除Doris中的对应catalog并重建该Catalog同步mysql新的数据表元数据来解决。

  1. Doris JDBC 方式操作 MySQL
#Doris JDBC 方式查询MySQL中的数据
mysql> select * from jdbc_mysql.demo.tbl1;
+------+------+------+
| id   | name | age  |
+------+------+------+
|    1 | a    |   18 |
|    2 | b    |   19 |
+------+------+------+

mysql> select * from jdbc_mysql.demo.tbl2;
+------+------+------+
| id   | name | age  |
+------+------+------+
|    3 | c    |   20 |
|    4 | d    |   21 |
+------+------+------+

mysql> select * from jdbc_mysql.demo.tbl3;
+------+------+------+
| id   | name | age  |
+------+------+------+
|    5 | e    |   22 |
|    6 | f    |   23 |
+------+------+------+
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

注意: 目前JDBC方式官方声称可以向外部数据库表中插入数据,但是经过测试目前Doris JDBC 读取外部数据库表只支持查询,不支持插入、更新、删除操作。

8.5Doris优化

Doris中的优化涉及到的方面非常多,例如在分区、分桶、join、查询、参数等各个方面都有优化对应策略,具体可以参考前面每个部分内容,这里大体总结下Doris中的优化相关内容:

  1. 执行SQL慢可以通过Doris提供的QueryProfile查看SQL执行的统计信息帮助我们了解Doris的执行情况,并有针对性的进行相应Debug与调优工作。具体可以参考https://doris.apache.org/zh-CN/docs/dev/admin-manual/query-profile。
  2. 在Doris中选择合适的数据存储模型来存储数据,针对不同场景选择AGGREGATE KEY, UNIQUE KEY, DUPLICATE KEY 三种数据存储模型
  3. 创建Doris表时设置合理的分区和分桶个数,方便对数据进行管理,在企业生产环境中也可以设置Doris分级存储(SSD + SATA)提高数据管理。
  4. Doris建表时,建议采用区分度大的列做分桶, 避免出现数据倾斜,为方便数据恢复, 建议单个 bucket 的 size 不要太大, 保持在 10GB 以内, 所以建表或增加 partition 时请合理考虑 bucket 数目, 其中不同 partition 可指定不同的 buckets 数。
  5. Doris建表时可以根据表数据使用情况自定义Bitmap索引和BloomFilter索引加快数据查询速度,例如对于表某列未来有in查询操作可以对该列构建布隆过滤器加快查询速度。此外如果默认的前缀索引不能满足查询需求,还可以基于原表构建rollup物化索引来调整列顺序或预先聚合数据来提高数据查询效率。
  6. 业务方建表时, 为了和前端业务适配, 往往不对维度信息和指标信息加以区分, 而将 Schema 定义成大宽表,这种操作对于数据库其实不是那么友好,我们更建议用户采用星型模型。使用过程中,建议用户尽量使用 Star Schema 区分维度表和指标表。频繁更新的维度表也可以放在 MySQL 外部表中。而如果只有少量更新, 可以直接放在 Doris 中。在 Doris 中存储维度表时,可对维度表设置更多的副本,提升 Join 的性能。
  7. 向Doris中导入、导出数据时,根据情况选择不同的导入、导出方式,每种导入、导出方式由各自优化参数,具体参考数据导入、导出部分。
  8. 针对json的优化可以选择Broadcast、Shuffle Join、Bucket Join、Colocation Join ,根据自己数据的情况来选择。关于json优化参考7.6小结。
  9. 查询Doris数据时,如果数据表数据量大且逻辑复杂,可以给BE节点增加内存并设置查询超时时间。具体设置参考:https://doris.apache.org/zh-CN/docs/dev/data-table/basic-usage#数据表的查询
  10. 不建议对Doris中的数据直接delete进行删除,建议尽量使用Doris中Unique存储模型+Sequence列实现数据替换,以增代删的方式解决。
  11. 建议创建doris表时如果要有多个分区,使用动态分区方式比较灵活。
  12. 对一次写入多次查询的数据表建议使用SQLCache或者PartitoinCache对数据进行缓存,可以大大提高数据查询分析效率
  13. 海量数据有去重场景情况下可以使用Doris BitMap精准去重,效率会大大增加。
  14. 对一张Doris表经常会有固定维度分析时,可以基于该表构建物化视图预先计算好根据固定维度分析的结果,效率会有质的提升。不建议对一张表构建非常多物化视图,归根结底,物化视图是真实计算存储数据的,一张表的大量物化视图会降低数据导入效率和使用更多的存储空间。
  15. 建议使用JDBC的方式来查询外部数据库的数据,不用ODBC的方式,ODBC方式已丢弃。
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家自动化/article/detail/956530
推荐阅读
相关标签
  

闽ICP备14008679号