赞
踩
Spark Doris Connector 可以支持通过 Spark 读取 Doris 中存储的数据,也支持通过Spark写入数据到Doris。
特别注意:在测试过程中发现 Spark StructuredStreaming 实时写入 Doris 存在问题。
要想在Spark编程中使用Doris Connector,我们需要根据Doris官网提供的Spark Doris Connector的源码进行编译获取doris-spark-connector,源代码库地址:https://github.com/apache/incubator-doris-spark-connector,我们需要根据自己使用Spark的版本以及Scala的版本进行手动编译源码获取Spark Doris Connector 依赖包,然后导入到项目中使用。如果自己不想编译也可以通过Doris官方提供编译好的包,如下:
https://repo.maven.apache.org/maven2/org/apache/doris/
Connector | Spark | Doris | Java | Scala |
---|---|---|---|---|
2.3.4-2.11.xx | 2.x | 0.12+ | 8 | 2.11 |
3.1.2-2.12.xx | 3.x | 0.12+ | 8 | 2.12 |
3.2.0-2.12.xx | 3.2.x | 0.12+ | 8 | 2.12 |
经过以上测试,Spark3.3.x版本也与Doris0.12+版本之上兼容,测试Spark3.3.x与Spark3.2.x与Doris1.2.1编译得到Spark-Doris-Connecotr时,StructuredStreaming实时写入到Doris存在问题。
根据Doris官方提供的Spark-Doris-Connector源码进行编译获取spark-doris-connector时,需要用到Maven环境,后续在node1节点上进行源码编译,所以在Node1节点上安装Maven环境。
Maven下载地址:http://maven.apache.org/download.cgi这里下载Maven版本为:3.6.3,下载Maven完成后,按照以下步骤配置Maven。
[root@node1 software]# tar -zxvf ./apache-maven-3.6.3-bin.tar.gz
#打开 /etc/profile在最后追加以下内容
[root@node1 software]# vim /etc/profile
export MAVEN_HOME=/software/apache-maven-3.6.3/
export PATH=$PATH:$MAVEN_HOME/bin
#配置环境变量生效
[root@node1 software]# source /etc/profile
[root@node1 software]# mvn -v
Apache Maven 3.6.3 (cecedd343002696d0abb50b32b541b8a6ba2883f)
Maven home: /software/apache-maven-3.6.3
Java version: 1.8.0_181, vendor: Oracle Corporation, runtime: /usr/java/jdk1.8.0_181-amd64/jre
Default locale: en_US, platform encoding: UTF-8
OS name: "linux", version: "3.10.0-957.el7.x86_64", arch: "amd64", family: "unix"
为了编译源码下载包更快,可以修改MAVEN_HOME/conf/settings.xml文件,添加阿里镜像源,注意:将以下内容放在“…”之间。此外,maven下载包默认对应的仓库位置在{user.home}/.m2/repository 目录下,”{user.home}”为当前用户家目录。
<!-- 添加阿里云镜像-->
<mirror>
<id>nexus-aliyun</id>
<mirrorOf>central</mirrorOf>
<name>Nexus aliyun</name>
<url>http://maven.aliyun.com/nexus/content/groups/public</url>
</mirror>
注意:就算设置了阿里云镜像地址,由于源码编译过程中可能涉及从apache maven官网站点下载一些基础依赖,速度也比较慢,也可以将资料中"repository.zip"上传至node1 /root/.m2目录中,并解压,该压缩包是完整下载好依赖的maven 仓库,具体步骤如下:
#将 repository.zip 资料上传至/root/.m2目录中,并解压
[root@node1 ~]# cd /root/.m2
[root@node1 .m2]# unzip ./repository.zip
在源码编译过程中需要用到thrift环境,所以这里在node1上安装thrift,具体步骤如下。
#在node1节点安装依赖
[root@node1 ~]# yum install -y autoconf automake libtool cmake ncurses-devel openssl-devel lzo-devel zlib-devel gcc gcc-c++
#下载thrift源码包,也可以从资料中获取,名称为“thrift-0.13.0.tar.gz”
[root@node1 ~]# cd /software && wget https://archive.apache.org/dist/thrift/0.13.0/thrift-0.13.0.tar.gz
#解压编译
[root@node1 software]# tar -zxvf thrift-0.13.0.tar.gz
[root@node1 software]# cd thrift-0.13.0
[root@node1 thrift-0.13.0]# ./configure --without-tests
[root@node1 thrift-0.13.0]# make
[root@node1 thrift-0.13.0]# make install
#安装完成后,检查版本
[root@node1 ~]# thrift --version
Thrift version 0.13.0
按照如下步骤根据源码编译获取spark-doris-connector。
#创建目录
[root@node1 ~]# cd /software
[root@node1 software]# mkdir doris-spark-connector
#将资料中下载好的源码“doris-spark-connector-master.zip”上传至该目录并解压
[root@node1 doris-spark-connector]# unzip doris-spark-connector-master.zip
#修改custom_env.sh.tpl文件为custom_env.sh名字
[root@node1 ~]# cd /software/doris-spark-connector/doris-spark-connector-master
[root@node1 doris-spark-connector-master]# mv custom_env.sh.tpl custom_env.sh
#源码编译,编译时选择scala版本和spark版本,这里选择Scala2.12和Spark3.3.x版本
[root@node1 ~]# cd /software/doris-spark-connector/doris-spark-connector-master/spark-doris-connector
[root@node1 spark-doris-connector]# ./build.sh
Spark-Doris-Connector supports Scala 2.11 and 2.12. Which version do you need ?
1) 2.11
2) 2.12
#? 2
Spark-Doris-Connector supports multiple versions of spark. Which version do you need ?
1) 2.3.x
2) 3.1.x
3) 3.2.x
4) 3.3.x
5) other
#? 4
...
[INFO] Skipping javadoc generation
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 01:35 min
[INFO] Finished at: 2023-04-13T15:49:35+08:00
[INFO] ------------------------------------------------------------------------
[Doris] *****************************************************************
[Doris] Successfully build Spark-Doris-Connector
[Doris] dist: /software/doris-spark-connector/doris-spark-connector-master/dist/spark-doris-connector-3.3_2.12-1.0.0-SNAPSHOT.jar
[Doris] *****************************************************************
以上编译完成后,将目录"/software/doris-spark-connector/doris-spark-connector-master/spark-doris-connector/target"中的spark-doris-connector-3.3_2.12-1.0.0-SNAPSHOT.jar包放在window本地,然后加入到本地maven仓库中在代码中引入使用即可。
将spark-doris-connector-3.3_2.12-1.0.0-SNAPSHOT.jar放在D盘下,打开cmd输入如下命令将spark-doris-connector加入到maven本地仓库。
mvn install:install-file -Dfile=D:\spark-doris-connector-3.3_2.12-1.0.0-SNAPSHOT.jar -DgroupId=org.apache.doris -DartifactId=spark-doris-connector-3.3_2.12 -Dversion=1.0.0 -Dpackaging=jar
加载完成之后,可以在本地maven 仓库中看到该jar包:
然后,在IDEA中创建Maven项目,将jar包引入到Maven pom.xml文件中即可,这里还需要操作Spark,也同时需要引入Spark的其他依赖包,如下:
<!-- 配置以下可以解决 在jdk1.8环境下打包时报错 “-source 1.5 中不支持 lambda 表达式” -->
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencies>
<!-- Spark 整合Doris 依赖包 -->
<dependency>
<groupId>org.apache.doris</groupId>
<artifactId>spark-doris-connector-3.3_2.12</artifactId>
<version>1.0.0</version>
</dependency>
<!-- Spark-core -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.3.2</version>
</dependency>
<!-- SparkSQL -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.3.2</version>
</dependency>
<!--mysql依赖的jar包-->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.47</version>
</dependency>
<!--SparkStreaming-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.12</artifactId>
<version>3.3.2</version>
</dependency>
<!-- Kafka 0.10+ Source For Structured Streaming-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_2.12</artifactId>
<version>3.3.2</version>
</dependency>
<!-- Scala 包-->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.12.10</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-compiler</artifactId>
<version>2.12.10</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-reflect</artifactId>
<version>2.12.10</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.22</version>
<scope>compile</scope>
</dependency>
</dependencies>
Spark可以使用SparkCore RDD编程方式读取Doris表中的数据,不支持RDD写入到Doris中。Spark RDD方式操作Doris首先在Doris中创建一张表:
#创建表
CREATE TABLE IF NOT EXISTS example_db.spark_doris_tbl
(
`id` INT NOT NULL COMMENT "id",
`name` VARCHAR(255) NOT NULL COMMENT "姓名",
`age` INT COMMENT "年龄",
`score` BIGINT COMMENT "分数"
)
DUPLICATE KEY(`id`, `name`)
DISTRIBUTED BY HASH(`id`) BUCKETS 3
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);
#向表中插入数据
mysql> insert into spark_doris_tbl values (1,'zs',18,100),(2,'ls',19,200),(3,'ww',20,300);
#查询Doris表中数据
mysql> select * from spark_doris_tbl;
+------+------+------+-------+
| id | name | age | score |
+------+------+------+-------+
| 1 | zs | 18 | 100 |
| 3 | ww | 20 | 300 |
| 2 | ls | 19 | 200 |
+------+------+------+-------+
然后编写Spark RDD 读取Doris中的数据,代码如下:
/**
* Spark 读取Doris,生成RDD ,目前只有Scala API 支持,Java API 暂时不支持
* 只支持读取进行数据分析
*/
object SparkReadDoris {
def main(args: Array[String]): Unit = {
//创建SparkConf 及 SparkContext
val conf = new SparkConf
conf.setMaster("local")
conf.setAppName("SparkReadDoris")
val sc = new SparkContext(conf)
//导入转换
import org.apache.doris.spark._
val dorisSparkRDD = sc.dorisRDD(
tableIdentifier = Some("example_db.spark_doris_tbl"),
cfg = Some(Map(
"doris.fenodes" -> "node1:8030",
"doris.request.auth.user" -> "root",
"doris.request.auth.password" -> "123456"
))
)
//打印结果
dorisSparkRDD.collect().foreach(println)
}
}
此外需要注意的是 RDD 读取 Doris 表中的数据只支持 Scala API 实现, Java API 会报错。
我们也可以使用DataFrame方式操作Doris,可以向Doris中写入数据,也可以从Doris中读取数据。在编写代码之前,首先将Doris 表spark_doris_tbl中数据清空:
mysql> delete from spark_doris_tbl where id >0;
Spark DataFrame操作Doris代码如下:
/**
* SparkDataFrame 操作 Doris
* 注意:操作之前需要先删除Doris中的表 example_db.spark_doris_tbl :delete from example_db.spark_doris_tbl where id >0;
* 1.Spark 向Doris 表中写入数据
* 2.Spark 读取 Doris中数据
*
*/
public class SparkDorisBatchOperator {
public static void main(String[] args) {
//1.创建SparkSession对象
SparkSession sparkSession = SparkSession.builder().appName("SparkReadDoris").master("local").getOrCreate();
//2.准备数据集并转换成Dataset<Row>
ArrayList<String> jsonList = new ArrayList<>();
jsonList.add("{'id':1,'name':'zhangsan','age':18,'score':100}");
jsonList.add("{'id':2,'name':'lisi','age':19,'score':200}");
jsonList.add("{'id':3,'name':'wangwu','age':20,'score':300}");
jsonList.add("{'id':4,'name':'zhaoliu','age':21,'score':400}");
jsonList.add("{'id':5,'name':'tianqi','age':22,'score':500}");
Dataset<Row> dataset = sparkSession.read().json(sparkSession.createDataset(jsonList, Encoders.STRING()));
dataset.printSchema();
dataset.show();
//3.将Dataset<Row>写入Doris
dataset.write().format("doris")
.option("doris.table.identifier", "example_db.spark_doris_tbl")
.option("doris.fenodes", "node1:8030")
.option("user", "root")
.option("password", "123456")
//指定你要写入的字段,这里是指定对应到Doris表中的字段的顺序
.option("doris.write.fields","age,id,name,score")
.save();
//4.读取Doris中的数据
Dataset<Row> load = sparkSession.read().format("doris")
.option("doris.table.identifier", "example_db.spark_doris_tbl")
.option("doris.fenodes", "node1:8030")
.option("user", "root")
.option("password", "123456")
.load();
load.show();
}
}
除了DataFrame API方式操作Doris数据表之外,还可以使用SparkSQL编程方式来操作Doris表,在编写代码之前,首先将Doris 表spark_doris_tbl中数据清空:
mysql> delete from spark_doris_tbl where id >0;
Spark SQL操作Doris代码如下:
/**
* SparkSQL 操作 Doris
* 注意:需要先在Doris中清空表中数据
*
* 1.Spark 向Doris 表中写入数据
* 2.Spark 读取 Doris中数据
*/
public class SparkDorisOperator {
public static void main(String[] args) {
//1.创建SparkSession对象
SparkSession sparkSession = SparkSession.builder().appName("SparkReadDoris").master("local").getOrCreate();
//2.在Spark中创建Doris的临时视图
sparkSession.sql("CREATE TEMPORARY VIEW spark_doris USING doris " +
" OPTIONS( " +
" 'table.identifier'='example_db.spark_doris_tbl', " +
" 'fenodes'='node1:8030', " +
" 'user'='root', " +
" 'password'='123456' " +
" ) ");
//3.使用SparkSQL向Doris中写入数据
sparkSession.sql("insert into spark_doris values (1,'zs',18,100),(2,'ls',19,200),(3,'ww',20,300)");
//4.使用SparkSQL从Doris中读取数据
sparkSession.sql("select * from spark_doris").show();
}
}
在Doris官网中给出了StructuredStreaming实时向Doris表中写入数据的案例,但是按照官网写法实现代码运行时一直报错:
Caused by: org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();
该错误说明数据写出没有指定start方法,经测试目前在代码编写中就算给出该start启动方法,代码执行还是有问题,目前测试版本为Spark3.2.x 、Spark3.3.x与Doris1.2.1版本,其他版本没有测试。
下面给出Spark StructuredStreaming读取Kafka中数据写入到Doris的代码实现:
/**
* StructuredStreaming 实时读取Kafka数据写入到Doris
* 注意:操作之前需要先删除Doris中的表 example_db.spark_doris_tbl :delete from example_db.spark_doris_tbl where id >0;
*/
public class SparkDorisStreamOperator {
public static void main(String[] args) throws TimeoutException, StreamingQueryException {
//1.创建SparkSession对象
SparkSession sparkSession = SparkSession.builder().appName("SparkReadDoris").master("local").getOrCreate();
//2.读取Kafka数据
Dataset<Row> kafkaSource = sparkSession.readStream()
.option("kafka.bootstrap.servers", "node1:9092,node2:9092,nod3:9092")
.option("startingOffsets", "earliest")
.option("subscribe", "doris-topic")
.format("kafka")
.load();
Dataset<Row> result = kafkaSource.selectExpr("CAST(key AS STRING)", "CAST(value as STRING)")
.select(functions.split(functions.col("value"), ",").getItem(0).as("id"),
functions.split(functions.col("value"), ",").getItem(1).as("name"),
functions.split(functions.col("value"), ",").getItem(2).as("age"),
functions.split(functions.col("value"), ",").getItem(3).as("score"));
//3.将Kafka数据实时写入Doris
StreamingQuery query =result.writeStream().outputMode("append")
.format("doris")
.trigger(Trigger.ProcessingTime("5 seconds"))
.option("checkpointLocation", "./checkpoint/dir")
.option("doris.table.identifier", "example_db.spark_doris_tbl")
.option("doris.fenodes", "node1:8030")
.option("user", "root")
.option("password", "123456")
//指定你要写入的字段
.option("doris.write.fields", "id,name,age,score")
.start();
query.awaitTermination();
}
}
Spark操作Doris配置项参考Doris官网:https://doris.apache.org/zh-CN/docs/dev/ecosystem/spark-doris-connector#配置。
Key | Default Value | Comment |
---|---|---|
doris.fenodes | – | Doris FE http 地址,支持多个地址,使用逗号分隔 |
— | — | — |
doris.table.identifier | – | Doris 表名,如:db1.tbl1 |
doris.request.retries | 3 | 向Doris发送请求的重试次数 |
doris.request.connect.timeout.ms | 30000 | 向Doris发送请求的连接超时时间 |
doris.request.read.timeout.ms | 30000 | 向Doris发送请求的读取超时时间 |
doris.request.query.timeout.s | 3600 | 查询doris的超时时间,默认值为1小时,-1表示无超时限制 |
doris.request.tablet.size | Integer.MAX_VALUE | 一个RDD Partition对应的Doris Tablet个数。 |
此数值设置越小,则会生成越多的Partition。从而提升Spark侧的并行度,但同时会对Doris造成更大的压力。 | ||
doris.batch.size | 1024 | 一次从BE读取数据的最大行数。增大此数值可减少Spark与Doris之间建立连接的次数。 |
从而减轻网络延迟所带来的额外时间开销。 | ||
doris.exec.mem.limit | 2147483648 | 单个查询的内存限制。默认为 2GB,单位为字节 |
doris.deserialize.arrow.async | false | 是否支持异步转换Arrow格式到spark-doris-connector迭代所需的RowBatch |
doris.deserialize.queue.size | 64 | 异步转换Arrow格式的内部处理队列,当doris.deserialize.arrow.async为true时生效 |
doris.write.fields | – | 指定写入Doris表的字段或者字段顺序,多列之间使用逗号分隔。 |
默认写入时要按照Doris表字段顺序写入全部字段。 | ||
sink.batch.size | 10000 | 单次写BE的最大行数 |
sink.max-retries | 1 | 写BE失败之后的重试次数 |
sink.properties.* | – | Stream Load 的导入参数。 |
例如: ‘sink.properties.column_separator’ = ', ’ | ||
doris.sink.task.partition.size | – | Doris写入任务对应的 Partition 个数。Spark RDD 经过过滤等操作,最后写入的 Partition 数可能会比较大,但每个 Partition 对应的记录数比较少,导致写入频率增加和计算资源浪费。 |
此数值设置越小,可以降低 Doris 写入频率,减少 Doris 合并压力。该参数配合 doris.sink.task.use.repartition 使用。 | ||
doris.sink.task.use.repartition | false | 是否采用 repartition 方式控制 Doris写入 Partition数。默认值为 false,采用 coalesce 方式控制(注意: 如果在写入之前没有 Spark action 算子,可能会导致整个计算并行度降低)。 |
如果设置为 true,则采用 repartition 方式(注意: 可设置最后 Partition 数,但会额外增加 shuffle 开销)。 | ||
doris.sink.batch.interval.ms | 50 | 每个批次sink的间隔时间,单位 ms。 |
Key | Default Value | Comment |
---|---|---|
user | – | 访问Doris的用户名 |
— | — | — |
password | – | 访问Doris的密码 |
doris.filter.query.in.max.count | 100 | 谓词下推中,in表达式value列表元素最大数量。超过此数量,则in表达式条件过滤在Spark侧处理。 |
Key | Default Value | Comment |
---|---|---|
doris.request.auth.user | – | 访问Doris的用户名 |
— | — | — |
doris.request.auth.password | – | 访问Doris的密码 |
doris.read.field | – | 读取Doris表的列名列表,多列之间使用逗号分隔 |
doris.filter.query | – | 过滤读取数据的表达式,此表达式透传给Doris。Doris使用此表达式完成源端数据过滤。 |
Spark处理数据写入到Doris时需要注意对应的类型关系,如下:
Doris Type | Spark Type |
---|---|
NULL_TYPE | DataTypes.NullType |
BOOLEAN | DataTypes.BooleanType |
TINYINT | DataTypes.ByteType |
SMALLINT | DataTypes.ShortType |
INT | DataTypes.IntegerType |
BIGINT | DataTypes.LongType |
FLOAT | DataTypes.FloatType |
DOUBLE | DataTypes.DoubleType |
DATE | DataTypes.StringType1 |
DATETIME | DataTypes.StringType1 |
BINARY | DataTypes.BinaryType |
DECIMAL | DecimalType |
CHAR | DataTypes.StringType |
LARGEINT | DataTypes.StringType |
VARCHAR | DataTypes.StringType |
DECIMALV2 | DecimalType |
TIME | DataTypes.DoubleType |
HLL | Unsupported datatype |
注意:Connector中,将DATE和DATETIME映射为String。由于Doris底层存储引擎处理逻辑,直接使用时间类型时,覆盖的时间范围无法满足需求。所以使用 String 类型直接返回对应的时间可读文本。
Flink Doris Connector 可以支持通过 Flink 操作(读取、插入、修改、删除) Doris 中存储的数据。通过Flink Doris Connector 可以将Doris表映射为DataStream或者Table对象,批或者实时将数据写入到Doris中,目前从Doris中读取数据只支持批读取。
此外,修改和删除只支持在 Unique Key 模型上,目前的删除是支持 Flink CDC 的方式接入数据实现自动删除,如果是其他数据接入的方式删除需要自己实现。
想要在Flink中使用Doris Connector需要通过Doris官方提供的flink-doris-connector源码进行编译,源码代码库地址:https://github.com/apache/doris-flink-connector,如果自己不想编译也可以通过Doris官方提供编译好的包,如下:
https://repo.maven.apache.org/maven2/org/apache/doris/。
Connector Version | Flink Version | Doris Version | Java Version | Scala Version |
---|---|---|---|---|
1.0.3 | 1.11+ | 0.15+ | 8 | 2.11,2.12 |
1.1.0 | 1.14 | 1.0+ | 8 | 2.11,2.12 |
1.2.0 | 1.15 | 1.0+ | 8 | - |
1.3.0 | 1.16 | 1.0+ | 8 | - |
这里我们选择根据Doris官方提供的源码进行编译获取flink-doris-connector 依赖包,编译源码过程中需要使用Maven和Thrift环境,由于之前在Spark Doris Connector编译过程中已经在node1安装过Maven和Thrift,这里不再安装,也选择在node1节点进行Flink Doris Connector 的源码编译。
此外,由于源码编译过程中可能涉及从apache maven官网站点下载一些基础依赖,速度也比较慢,也可以将资料中"repository.zip"上传至node1 /root/.m2目录中,并解压,该压缩包是完整下载好依赖的maven 仓库,具体步骤如下:
#将 repository.zip 资料上传至/root/.m2目录中,并解压
[root@node1 ~]# cd /root/.m2
[root@node1 .m2]# unzip ./repository.zip
如果在Spark Doris Connector编译中解压过repository.zip,就不必再次解压。按照如下步骤根据源码编译获取flink-doris-connector。
#创建目录
[root@node1 ~]# cd /software
[root@node1 software]# mkdir doris-flink-connector
#将资料中下载好的源码“doris-flink-connector-master.zip”上传至该目录并解压
[root@node1 doris-flink-connector]# unzip doris-flink-connector-master.zip
#修改custom_env.sh.tpl文件为custom_env.sh名字
[root@node1 ~]# cd /software/doris-flink-connector/doris-flink-connector-master
[root@node1 doris-flink-connector-master]# mv custom_env.sh.tpl custom_env.sh
#源码编译
[root@node1 ~]# cd /software/doris-flink-connector/doris-flink-connector-master/flink-doris-connector
[root@node1 spark-doris-connector]# ./build.sh
Flink-Doris-Connector supports multiple versions of flink. Which version do you need ?
1) 1.15.x
2) 1.16.x
3) 1.17.x
#? 2
[Doris] flink version: 1.16.0
[Doris] build starting.
... ...
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 23.388 s
[INFO] Finished at: 2023-04-14T15:37:26+08:00
[INFO] ------------------------------------------------------------------------
[Doris] *****************************************************************
[Doris] Successfully build Flink-Doris-Connector
[Doris] dist: /software/doris-flink-connector/doris-flink-connector-master/dist/flink-doris-connector-1.4.0-SNAPSHOT.jar
[Doris] *****************************************************************
以上编译完成后,将目录"/software/doris-flink-connector/doris-flink-connector-master/flink-doris-connector/target"中的flink-doris-connector-1.4.0-SNAPSHOT.jar包放在window本地,然后加入到本地maven仓库中在代码中引入使用即可。
Flink操作Doris有两种方式,一个是DataStream方式,另外一种就是SQL方式操作,这两种方式官方建议使用SQL方式进行编程,相对比较简单。
通过Flink Doris Connector可以通过Flink读取Doris中的数据,但是目前仅支持批次从Doris中读取数据,不支持实时读取,向Doris中写入数据时,支持批量写入和实时写入。
将flink-doris-connector-1.4.0-SNAPSHOT.jar放在D盘下,打开cmd输入如下命令将flink-doris-connector加入到maven本地仓库。
mvn install:install-file -Dfile=D:\flink-doris-connector-1.4.0-SNAPSHOT.jar -DgroupId=org.apache.doris -DartifactId=flink-doris-connector-1.4.0 -Dversion=1.0.0 -Dpackaging=jar
加载完成之后,可以在本地maven 仓库中看到该jar包:
然后,在IDEA中创建Maven项目,将jar包引入到Maven pom.xml文件中即可,这里还需要操作Flink,也同时需要引入Flink的其他依赖包,如下:
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<flink.version>1.16.0</flink.version>
<hadoop.version>3.3.4</hadoop.version>
<slf4j.version>1.7.36</slf4j.version>
<log4j.version>2.17.2</log4j.version>
<mysql.version>5.1.47</mysql.version>
</properties>
<dependencies>
<!-- Flink 整合Doris 依赖包 -->
<dependency>
<groupId>org.apache.doris</groupId>
<artifactId>flink-doris-connector-1.4.0</artifactId>
<version>1.0.0</version>
</dependency>
<!-- Flink 依赖包-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Flink批和流开发依赖包 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- DataStream files connector -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-files</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- MySQL 依赖包 -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.12</artifactId>
<version>3.3.1</version>
</dependency>
<!-- slf4j&log4j 日志相关包 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-to-slf4j</artifactId>
<version>${log4j.version}</version>
</dependency>
</dependencies>
后续使用Flink操作Doris中的表时需要在Doris FE节点配置启动http v2,在Doris FE各个节点fe.cof中设置enable_http_server_v2=true即可,该参数默认值如下:从官方 0.14.0 release 版之后默认是 true,之前默认 false。
HTTP Server V2 由 SpringBoot 实现, 并采用前后端分离的架构。只有启用 httpv2,用户才能使用新的前端 UI 界面。这里我们使用的Doris版本为1.2.1,所以默认开启。
此外,Flink操作Doris时涉及到的一些表这里预先在Doris中创建出来。
# 创建 Doris 表 :flink_doris_tbl1、flink_doris_tbl2、flink_result_tbl
CREATE TABLE IF NOT EXISTS example_db.flink_doris_tbl1
(
`id` INT NOT NULL COMMENT "id",
`name` VARCHAR(255) NOT NULL COMMENT "姓名",
`age` INT COMMENT "年龄",
`score` BIGINT COMMENT "分数"
)
DUPLICATE KEY(`id`, `name`)
DISTRIBUTED BY HASH(`id`) BUCKETS 3
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);
CREATE TABLE IF NOT EXISTS example_db.flink_doris_tbl2
(
`id` INT NOT NULL COMMENT "id",
`name` VARCHAR(255) NOT NULL COMMENT "姓名",
`age` INT COMMENT "年龄",
`total_score` BIGINT COMMENT "总分数"
)
DUPLICATE KEY(`id`, `name`)
DISTRIBUTED BY HASH(`id`) BUCKETS 3
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);
CREATE TABLE IF NOT EXISTS example_db.flink_result_tbl
(
`window_start` DATETIME NOT NULL COMMENT "窗口开始",
`window_end` DATETIME NOT NULL COMMENT "窗口结束",
`name` VARCHAR(255) COMMENT "姓名",
`cnt` BIGINT COMMENT "个数"
)
DUPLICATE KEY(`window_start`, `window_end`)
DISTRIBUTED BY HASH(`name`) BUCKETS 3
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);
#向doris flink_doris_tbl1表中插入数据
insert into flink_doris_tbl1 values (1,'zs',18,100),(2,'ls',19,200),(3,'ww',20,300),(3,'ww',20,400);
#查询表flink_doris_tbl1中数据
mysql> select * from flink_doris_tbl1;
+------+------+------+-------+
| id | name | age | score |
+------+------+------+-------+
| 1 | zs | 18 | 100 |
| 3 | ww | 20 | 400 |
| 3 | ww | 20 | 300 |
| 2 | ls | 19 | 200 |
+------+------+------+-------+
下面编写Flink DataStream API 读取Doris表flink_doris_tbl1中的数据,目前Flink仅支持批次从Doris中读取数据,不支持实时读取。代码如下:
/**
* Flink读取Doris数据
* 注意:目前Doris Source是有界流,不支持CDC方式读取。
*/
public class FlinkBatchReadDoris {
public static void main(String[] args) throws Exception {
//1.准备Flink环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//2.准备Doris连接参数
DorisOptions.Builder builder = DorisOptions.builder()
.setFenodes("node1:8030")
.setTableIdentifier("example_db.flink_doris_tbl1")
.setUsername("root")
.setPassword("123456");
//3.准备Doris Source
DorisSource<List<?>> dorisSource = DorisSourceBuilder.<List<?>>builder()
.setDorisOptions(builder.build())
.setDorisReadOptions(DorisReadOptions.builder().build())
.setDeserializer(new SimpleListDeserializationSchema())
.build();
//4.读取Doris数据
env.fromSource(dorisSource, WatermarkStrategy.noWatermarks(), "doris source").print();
env.execute();
}
}
执行以上代码,可以在控制台看到读取后的结果。
Flink DataStream API 也可以向Doris中写入数据,写数据可以支持批写和流式写两种方式,这里编写代码实现Flink向Doris 表flink_doris_tbl1中批次写入指定数据,代码如下:
//1.准备Flink环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//2.数据写入到Doris中必须开启checkpoint
env.enableCheckpointing(10000);
//3.使用批方式写入到Doris 表中,默认是流的方式
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
//4.Doris Sink builder对象
DorisSink.Builder<RowData> builder = DorisSink.builder();
//4.1 准备Doris连接参数
DorisOptions.Builder dorisBuilder = DorisOptions.builder();
dorisBuilder.setFenodes("node1:8030")
.setTableIdentifier("example_db.flink_doris_tbl1")
.setUsername("root")
.setPassword("123456");
//4.2 准备 streamload 关于加载 读取json格式的参数
Properties properties = new Properties();
properties.setProperty("format", "json");
properties.setProperty("read_json_by_line", "true");
DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
executionBuilder.setLabelPrefix("label-doris"+ UUID.randomUUID()) //streamload label 前缀
.setStreamLoadProp(properties); //streamload 参数
//4.3 设置 flink rowdata 的 schema
String[] fields = {"id", "name", "age", "score"};
DataType[] types = {DataTypes.INT(), DataTypes.VARCHAR(255), DataTypes.INT(), DataTypes.BIGINT()};
//4.4 准备Doris Sink
builder.setDorisReadOptions(DorisReadOptions.builder().build())
.setDorisExecutionOptions(executionBuilder.build())
.setSerializer(RowDataSerializer.builder() // 根据rowdata进行序列化
.setFieldNames(fields)
.setType("json") //json 格式
.setFieldType(types).build())
.setDorisOptions(dorisBuilder.build());
//5.生成 rowdata 数据
DataStream<RowData> source = env.fromElements("1,zs,18,100", "2,ls,19,99")
.map(new MapFunction<String, RowData>() {
@Override
public RowData map(String value) throws Exception {
GenericRowData genericRowData = new GenericRowData(4);
genericRowData.setField(0, Integer.valueOf(value.split(",")[0]));
genericRowData.setField(1, StringData.fromString(value.split(",")[1]));
genericRowData.setField(2, Integer.valueOf(value.split(",")[2]));
genericRowData.setField(3, Long.valueOf(value.split(",")[3]));
return genericRowData;
}
});
//6.将数据写入到Doris中
source.sinkTo(builder.build());
env.execute();
注意:在执行代码之前,可以将Flink表flink_doris_tbl1中的数据删除: delete from flink_doris_tbl1 where id >0;然后再执行以上代码。代码中向Doris表flink_doris_tbl1中插入"1,zs,18,100", "2,ls,19,99"两条数据,代码执行完成后,查看Doris flink_doris_tbl1中的数据:
mysql> select * from flink_doris_tbl1;
+------+------+------+-------+
| id | name | age | score |
+------+------+------+-------+
| 2 | ls | 19 | 99 |
| 1 | zs | 18 | 100 |
+------+------+------+-------+
下面编写代码实现Flink DataStream API 实时读取Kafka中的数据,然后实时写入到Doris表中。代码如下:
/**
* 从Kafka中实时读取数据 将结果实时写入到Doris表中
*/
public class RealTimeReadKafkaSinkDoris {
public static void main(String[] args) throws Exception {
//1.准备Flink环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//2.数据写入到Doris中必须开启checkpoint
env.enableCheckpointing(10000);
//3.读取Kafka中的数据,并转换成RowData,Kafka中输入数据格式为:1,张三,18,100;2,李四,19,99;3,王五,20,98
KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
.setBootstrapServers("node1:9092,node2:9092,node3:9092") //设置Kafka 集群节点
.setTopics("doris-topic") //设置读取的topic
.setGroupId("test-group") //设置消费者组
.setStartingOffsets(OffsetsInitializer.latest()) //设置读取数据位置
.setDeserializer(new KafkaRecordDeserializationSchema<String>() {
//设置key ,value 数据获取后如何处理
@Override
public void deserialize(ConsumerRecord<byte[], byte[]> consumerRecord, Collector<String> collector) throws IOException {
String key = null;
String value = null;
if(consumerRecord.key() != null){
key = new String(consumerRecord.key(), "UTF-8");
}
if(consumerRecord.value() != null){
value = new String(consumerRecord.value(), "UTF-8");
}
collector.collect(value);
}
//设置置返回的二元组类型
@Override
public TypeInformation<String> getProducedType() {
return TypeInformation.of(String.class);
}
})
.build();
DataStreamSource<String> kafkaDS = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka-source");
SingleOutputStreamOperator<RowData> sinkData = kafkaDS.map(new MapFunction<String, RowData>() {
@Override
public RowData map(String value) throws Exception {
GenericRowData genericRowData = new GenericRowData(4);
genericRowData.setField(0, Integer.valueOf(value.split(",")[0]));
genericRowData.setField(1, StringData.fromString(value.split(",")[1]));
genericRowData.setField(2, Integer.valueOf(value.split(",")[2]));
genericRowData.setField(3, Long.valueOf(value.split(",")[3]));
return genericRowData;
}
});
//4.Doris Sink builder对象
DorisSink.Builder<RowData> builder = DorisSink.builder();
//4.1 准备Doris连接参数
DorisOptions.Builder dorisBuilder = DorisOptions.builder();
dorisBuilder.setFenodes("node1:8030")
.setTableIdentifier("example_db.flink_doris_tbl1")
.setUsername("root")
.setPassword("123456");
//4.2 准备 streamload 关于加载 读取json格式的参数
Properties properties = new Properties();
properties.setProperty("format", "json");
properties.setProperty("read_json_by_line", "true");
DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
executionBuilder.setLabelPrefix("label-doris"+ UUID.randomUUID()) //streamload label 前缀
.setStreamLoadProp(properties); //streamload 参数
//4.3 设置 flink rowdata 的 schema
String[] fields = {"id", "name", "age", "score"};
DataType[] types = {DataTypes.INT(), DataTypes.VARCHAR(255), DataTypes.INT(), DataTypes.BIGINT()};
//4.4 准备Doris Sink
builder.setDorisReadOptions(DorisReadOptions.builder().build())
.setDorisExecutionOptions(executionBuilder.build())
.setSerializer(RowDataSerializer.builder() // 根据rowdata进行序列化
.setFieldNames(fields)
.setType("json") //json 格式
.setFieldType(types).build())
.setDorisOptions(dorisBuilder.build());
//6.将数据写入到Doris中
sinkData.sinkTo(builder.build());
env.execute();
}
}
以上代码编写完成后,在执行代码之前,可以将Flink表flink_doris_tbl1中的数据删除: delete from flink_doris_tbl1 where id >0;另外,需要在Kafka中创建 doris-topic Topic,执行代码后,输入数据可以实时查看flink_doris_tbl1表中的数据。
#删除doris表flink_doris_tbl1数据
mysql> delete from flink_doris_tbl1 where id >0;
#启动kafka集群,并创建doris-topic
[root@node1 ~]# kafka-topics.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --create --topic doris-topic --partitions 3 --replication-factor 3
#运行以上编写好的代码,并向kafka doris-topic中输入如下数据
[root@node1 ~]# kafka-console-producer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --topic doris-topic
1,张三,18,100
2,李四,19,99
3,王五,20,98
#查询Doris表 flink_doris_tbl1中的数据
mysql> select * from flink_doris_tbl1;
+------+--------+------+-------+
| id | name | age | score |
+------+--------+------+-------+
| 2 | 李四 | 19 | 99 |
| 1 | 张三 | 18 | 100 |
| 3 | 王五 | 20 | 98 |
+------+--------+------+-------+
与DataStream API 一样,在读取Doris中数据时只支持批量读取,不支持实时读取。下面编写代码实现Flink SQL方式读取Doris中的数据,然后再写入到Doris表中。代码如下:
/**
* 从Doris中读取数据 再次将结果写入到Doris表中
* 1) 创建Doris 表 flink_doris_tbl1 并插入数据
* 2) 从该表中读取数据并写入到flink_doris_tbl2 表中
* 注意:目前Doris Source是有界流,不支持CDC方式读取,所以Doris Source在数据读取完成后,流为什么就结束
*/
public class FlinkReadDorisToDoris {
public static void main(String[] args) throws Exception {
//1.准备Flink环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
//2.通过Doris Connector创建表 读取Doris表 flink_doris_tbl1 中的数据
String sourceSQL = "CREATE TABLE flink_doris_source ( " +
" id INT," +
" name STRING," +
" age INT," +
" score BIGINT" +
" ) " +
" WITH (" +
" 'connector' = 'doris'," +
" 'fenodes' = 'node1:8030'," +
" 'table.identifier' = 'example_db.flink_doris_tbl1'," +
" 'username' = 'root'," +
" 'password' = '123456'" +
")";
tableEnv.executeSql(sourceSQL);
//3.打印读取到的数据
tableEnv.executeSql("select id,name,age,sum(score) as total_score from flink_doris_source group by id,name,age")
.print();
//4.将聚合结果写入到Doris表 flink_doris_tbl2 中
//4.1 创建Doris表 flink_doris_tbl2
String sinkSQL = "CREATE TABLE flink_doris_sink ( " +
" id INT," +
" name STRING," +
" age INT," +
" total_score BIGINT" +
" ) " +
" WITH (" +
" 'connector' = 'doris'," +
" 'fenodes' = 'node1:8030'," +
" 'sink.label-prefix' = '" + UUID.randomUUID() + "'," +
" 'table.identifier' = 'example_db.flink_doris_tbl2'," +
" 'username' = 'root'," +
" 'password' = '123456'" +
")";
tableEnv.executeSql(sinkSQL);
//4.2 将聚合结果写入到Doris表 flink_doris_tbl2 中
tableEnv.executeSql("insert into flink_doris_sink select id,name,age,sum(score) as total_score from flink_doris_source group by id,name,age");
}
}
以上代码执行前,首先确保表flink_doris_tbl1中有数据,并保证flink_doris_tbl2中没有数据,执行代码完成后,可以查询Doris表flink_doris_tbl2中的数据。
mysql> select * from flink_doris_tbl2;
+------+--------+------+-------------+
| id | name | age | total_score |
+------+--------+------+-------------+
| 2 | 李四 | 19 | 99 |
| 1 | 张三 | 18 | 100 |
| 3 | 王五 | 20 | 98 |
+------+--------+------+-------------+
下面使用Flink SQL编程方式编写代码实现从Kafka中实时读取数据,经过窗口分析后,实时写入到Doris 表中。代码如下:
/**
* Flink实时读取Kafak数据,实时将结果写入到Doris表中
* 1.在kafka中创建topic:doris-topic
* 2.在Doris MySQL客户端 创建表 :flink_result_tbl
*/
public class RealTimeReadKafkaSinkDoris {
public static void main(String[] args) throws Exception {
//1.创建Flink流处理环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//可以设置并行度为1,防止一些并行度中没有数据时,结果不写入Doris,也可以设置table.exec.source.idle-timeout参数
// env.setParallelism(1);
//2.必须开启checkpoint,否则无法写入Doris
env.enableCheckpointing(5000);
//3.创建Flink Table 环境
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
TableConfig config = tableEnv.getConfig();
//table.exec.source.idle-timeout=5s ,单位是ms,如果其他分区没有等待多少ms后没有数据来,则自动推进水位线。
config.getConfiguration().setString("table.exec.source.idle-timeout","5 s");
//4.创建Kafka Source表
String sourceSQL = "CREATE TABLE KafkaTable (" +
" `name` STRING," +
" `dt` BIGINT," +
" `ts` AS TO_TIMESTAMP(FROM_UNIXTIME(dt/1000, 'yyyy-MM-dd HH:mm:ss'))," +
" WATERMARK FOR ts AS ts - INTERVAL '0' SECOND" +
") WITH (" +
" 'connector' = 'kafka'," +
" 'topic' = 'doris-topic'," +
" 'properties.bootstrap.servers' = 'node1:9092,node2:9092,node3:9092'," +
" 'properties.group.id' = 'testGroup'," +
" 'scan.startup.mode' = 'latest-offset'," +
" 'format' = 'csv'" +
")";
tableEnv.executeSql(sourceSQL);
//5.执行SQL,实时统计每5秒的数据,每10秒输出一次统计一次人数 ,并将结果注册临时表tmp
String executeSql = "SELECT window_start, window_end, name, COUNT(name) as cnt" +
" FROM TABLE(" +
" HOP(TABLE KafkaTable, DESCRIPTOR(ts), INTERVAL '5' SECOND, INTERVAL '10' SECOND))" +
" GROUP BY window_start, window_end, name";
Table table = tableEnv.sqlQuery(executeSql);
tableEnv.createTemporaryView("tmp",table);
//6.将聚合结果写入到Doris表 flink_doris_tbl2 中
//6.1 创建Flink临时表映射 Doris 表 flink_result_tbl
String sinkSQL = "CREATE TABLE flink_doris_sink ( " +
" window_start TIMESTAMP," +
" window_end TIMESTAMP," +
" name STRING," +
" cnt BIGINT" +
" ) " +
" WITH (" +
" 'connector' = 'doris'," +
" 'fenodes' = 'node1:8030'," +
" 'sink.label-prefix' = '" + UUID.randomUUID() + "'," +
" 'table.identifier' = 'example_db.flink_result_tbl'," +
" 'username' = 'root'," +
" 'password' = '123456'" +
")";
tableEnv.executeSql(sinkSQL);
//6.2 将聚合结果写入到Doris表 flink_result_tbl 中
tableEnv.executeSql("insert into flink_doris_sink select window_start,window_end,name,cnt from tmp");
}
}
以上代码编写完成后,运行代码后,向Kafka doris-topic中写入数据,然后在Doris flink_result_tbl中查看结果。向Kafka doris-topic中写入的数据如下:
[root@node1 ~]# kafka-console-producer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --topic doris-topic
Alice,1681448400000
Alice,1681448401000
Bob,1681448405000
Bob,1681448406000
Bob,1681448407000
Mary,1681448410000
Mike,1681448415000
Lucy,1681448420000
Tom,1681448425000
Sara,1681448430000
#数据写入后,可以查询Doris表flink_result_tbl中的数据
mysql> select * from flink_result_tbl;
+---------------------+---------------------+-------+------+
| window_start | window_end | name | cnt |
+---------------------+---------------------+-------+------+
| 2023-04-14 13:00:05 | 2023-04-14 13:00:15 | Mary | 1 |
| 2023-04-14 13:00:10 | 2023-04-14 13:00:20 | Mary | 1 |
| 2023-04-14 13:00:00 | 2023-04-14 13:00:10 | Bob | 3 |
| 2023-04-14 13:00:05 | 2023-04-14 13:00:15 | Bob | 3 |
| 2023-04-14 13:00:10 | 2023-04-14 13:00:20 | Mike | 1 |
| 2023-04-14 13:00:15 | 2023-04-14 13:00:25 | Mike | 1 |
| 2023-04-14 13:00:20 | 2023-04-14 13:00:30 | Tom | 1 |
| 2023-04-14 12:59:55 | 2023-04-14 13:00:05 | Alice | 2 |
| 2023-04-14 13:00:00 | 2023-04-14 13:00:10 | Alice | 2 |
| 2023-04-14 13:00:15 | 2023-04-14 13:00:25 | Lucy | 1 |
| 2023-04-14 13:00:20 | 2023-04-14 13:00:30 | Lucy | 1 |
+---------------------+---------------------+-------+------+
#再次继续向Kafka topic 中写入数据
Alice,1681448435000
Bob,1681448440000
Mary,1681448445000
Mike,1681448450000
Lucy,1681448455000
Tom,1681448460000
Sara,1681448465000
11 rows in set (0.07 sec)
#再次查询Doris表flink_result_tbl中的数据
mysql> select * from flink_result_tbl;
+---------------------+---------------------+-------+------+
| window_start | window_end | name | cnt |
+---------------------+---------------------+-------+------+
| 2023-04-14 13:00:15 | 2023-04-14 13:00:25 | Lucy | 1 |
| 2023-04-14 13:00:20 | 2023-04-14 13:00:30 | Lucy | 1 |
| 2023-04-14 13:00:25 | 2023-04-14 13:00:35 | Sara | 1 |
| 2023-04-14 13:00:30 | 2023-04-14 13:00:40 | Sara | 1 |
| 2023-04-14 13:00:50 | 2023-04-14 13:01:00 | Lucy | 1 |
| 2023-04-14 13:00:55 | 2023-04-14 13:01:05 | Lucy | 1 |
| 2023-04-14 12:59:55 | 2023-04-14 13:00:05 | Alice | 2 |
| 2023-04-14 13:00:00 | 2023-04-14 13:00:10 | Alice | 2 |
| 2023-04-14 13:00:20 | 2023-04-14 13:00:30 | Tom | 1 |
| 2023-04-14 13:00:25 | 2023-04-14 13:00:35 | Tom | 1 |
| 2023-04-14 13:00:30 | 2023-04-14 13:00:40 | Alice | 1 |
| 2023-04-14 13:00:35 | 2023-04-14 13:00:45 | Alice | 1 |
| 2023-04-14 13:00:55 | 2023-04-14 13:01:05 | Tom | 1 |
| 2023-04-14 13:00:00 | 2023-04-14 13:00:10 | Bob | 3 |
| 2023-04-14 13:00:05 | 2023-04-14 13:00:15 | Bob | 3 |
| 2023-04-14 13:00:05 | 2023-04-14 13:00:15 | Mary | 1 |
| 2023-04-14 13:00:10 | 2023-04-14 13:00:20 | Mike | 1 |
| 2023-04-14 13:00:10 | 2023-04-14 13:00:20 | Mary | 1 |
| 2023-04-14 13:00:15 | 2023-04-14 13:00:25 | Mike | 1 |
| 2023-04-14 13:00:35 | 2023-04-14 13:00:45 | Bob | 1 |
| 2023-04-14 13:00:40 | 2023-04-14 13:00:50 | Mary | 1 |
| 2023-04-14 13:00:40 | 2023-04-14 13:00:50 | Bob | 1 |
| 2023-04-14 13:00:45 | 2023-04-14 13:00:55 | Mary | 1 |
| 2023-04-14 13:00:45 | 2023-04-14 13:00:55 | Mike | 1 |
| 2023-04-14 13:00:50 | 2023-04-14 13:01:00 | Mike | 1 |
+---------------------+---------------------+-------+------+
25 rows in set (0.03 sec)
Flink操作Doris时有一些配置项如下,可以参考官网配置,地址:https://doris.apache.org/zh-CN/docs/dev/ecosystem/flink-doris-connector#配置,如下:
Key | Default Value | Required | Comment |
---|---|---|---|
fenodes | – | Y | Doris FE http 地址 |
— | — | — | — |
table.identifier | – | Y | Doris 表名,如:db.tbl |
username | – | Y | 访问 Doris 的用户名 |
password | – | Y | 访问 Doris 的密码 |
doris.request.retries | 3 | N | 向 Doris 发送请求的重试次数 |
doris.request.connect.timeout.ms | 30000 | N | 向 Doris 发送请求的连接超时时间 |
doris.request.read.timeout.ms | 30000 | N | 向 Doris 发送请求的读取超时时间 |
doris.request.query.timeout.s | 3600 | N | 查询 Doris 的超时时间,默认值为1小时,-1表示无超时限制 |
doris.request.tablet.size | Integer. MAX_VALUE | N | 一个 Partition 对应的 Doris Tablet 个数。 此数值设置越小,则会生成越多的 Partition。从而提升 Flink 侧的并行度,但同时会对 Doris 造成更大的压力。 |
doris.batch.size | 1024 | N | 一次从 BE 读取数据的最大行数。增大此数值可减少 Flink 与 Doris 之间建立连接的次数。 从而减轻网络延迟所带来的额外时间开销。 |
doris.exec.mem.limit | 2147483648 | N | 单个查询的内存限制。默认为 2GB,单位为字节 |
doris.deserialize.arrow.async | FALSE | N | 是否支持异步转换 Arrow 格式到 flink-doris-connector 迭代所需的 RowBatch |
doris.deserialize.queue.size | 64 | N | 异步转换 Arrow 格式的内部处理队列,当 doris.deserialize.arrow.async 为 true 时生效 |
doris.read.field | – | N | 读取 Doris 表的列名列表,多列之间使用逗号分隔 |
doris.filter.query | – | N | 过滤读取数据的表达式,此表达式透传给 Doris。Doris 使用此表达式完成源端数据过滤。 |
sink.label-prefix | – | Y | Stream load导入使用的label前缀。2pc场景下要求全局唯一 ,用来保证Flink的EOS语义。 |
sink.properties.* | – | N | Stream Load 的导入参数。 例如: ‘sink.properties.column_separator’ = ‘, ’ 定义列分隔符, ‘sink.properties.escape_delimiters’ = ‘true’ 特殊字符作为分隔符,’\x01’会被转换为二进制的0x01 JSON格式导入 ‘sink.properties.format’ = ‘json’ ‘sink.properties.read_json_by_line’ = ‘true’ |
sink.enable-delete | TRUE | N | 是否启用删除。此选项需要 Doris 表开启批量删除功能(Doris0.15+版本默认开启),只支持 Unique 模型。 |
sink.enable-2pc | TRUE | N | 是否开启两阶段提交(2pc),默认为true,保证Exactly-Once语义。关于两阶段提交可参考这里。 |
Flink处理数据写入到Doris时需要注意对应的类型关系,如下:
Doris Type | Flink Type |
---|---|
NULL_TYPE | NULL |
— | — |
BOOLEAN | BOOLEAN |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INT | INT |
BIGINT | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DATE | DATE |
DATETIME | TIMESTAMP |
DECIMAL | DECIMAL |
CHAR | STRING |
LARGEINT | STRING |
VARCHAR | STRING |
DECIMALV2 | DECIMAL |
TIME | DOUBLE |
HLL | Unsupported datatype |
DataX 是阿里云 DataWorks数据集成 的开源版本,在阿里巴巴集团内被广泛使用的离线数据同步工具/平台。DataX 实现了包括 MySQL、Oracle、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、Hologres、DRDS 等各种异构数据源之间高效的数据同步功能。
DataX doriswriter 插件,用于通过 DataX 同步其他数据源的数据到 Doris 中。这个插件是利用Doris的Stream Load 功能进行数据导入的。需要配合 DataX 服务一起使用。
默认的DataX安装包中没有DorisWriter插件,我们想要同步其他数据库数据到Doris中就需要根据Doris官方提供的Doris Write源码结合DataX源码进行编译,获取到DataX的安装包。
doriswriter 插件依赖的 DataX 代码中的一些模块。而这些模块并没有在 Maven 官方仓库中。所以我们在开发 doriswriter 插件时,需要下载完整的 DataX 代码库,才能进行插件的编译和开发。DorisWriter 插件源码地址:https://github.com/apache/doris/tree/master/extension/DataX,该目录下的文件如下:
这个目录是 doriswriter 插件的代码目录。这个目录中的所有代码,都托管在 Apache Doris 的代码库中。
doriswriter 插件帮助文档在这里:https://github.com/apache/doris/blob/master/extension/DataX/doriswriter/doc/doriswriter.md
该脚本主要用于构建 DataX 开发环境,他主要进行了以下操作:
这个脚本执行后,开发者就可以进入 DataX/ 目录开始开发或编译了。因为做了软链,所以任何对 DataX/doriswriter 目录中文件的修改,都会反映到 doriswriter/ 目录中,方便开发者提交代码。
这里在node1节点进行DorisWriter插件集成DataX源码编译,步骤如下:
[root@node1 ~]# yum -y install git
在node1节点创建目录,并将https://github.com/apache/doris/tree/master/extension/DataX 中的doriswriter文件夹和init-env.sh文件上传到该目录(在资料中有DataX目录,目录中有这两个文件,也可以自己下载)。
#创建目录并上传文件
[root@node1 software]# mkdir doris-datax && cd doris-datax
#查看上传的文件
[root@node1 doris-datax]# ls
doriswriter init-env.sh
#执行init-env.sh脚本
[root@node1 doris-datax]# sh init-env.sh
#执行完成后,查看生成的文件
[root@node1 doris-datax]# ls
DataX doriswriter init-env.sh pom.xml
#进入该DataX该目录,执行如下命令进行DataX安装包编译
[root@node1 doris-datax]# cd DataX/
[root@node1 DataX]# mvn package assembly:assembly -Dmaven.test.skip=true
...
[INFO] adbmysqlwriter 0.0.1-SNAPSHOT ...................... SUCCESS [ 1.364 s]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 15:46 min
[INFO] Finished at: 2023-04-14T17:28:22+08:00
[INFO] ------------------------------------------------------------------------
以上代码编译完成后,可以在/software/doris-datax/DataX/target 下看到datax.tar.gz 安装包。
编译好的DataX安装包只能安装到Linux中,需要有JDK8、Python环境(推荐Python2.6,Centos7自带python为2版本),将编译好的DataX安装包datax.tar.gz直接解压到某一路径下完成安装,进入bin目录,即可运行同步作业。这里解压到/software目录下。
#将编译好的DataX安装包移动到/software目录下,解压DataX安装包
[root@node1 bin]# cd /software/
[root@node1 software]# tar -zxvf ./datax.tar.gz
关于DorisWriter插件的介绍可以参考:https://github.com/apache/doris/blob/master/extension/DataX/doriswriter/doc/doriswriter.md,DorisWriter支持将大批量数据写入Doris中。
DorisWriter 通过Doris原生支持Stream load方式导入数据, DorisWriter会将reader读取的数据进行缓存在内存中,拼接成Json文本,然后批量导入至Doris。下面结合样例json来介绍DorisWriter的参数。
这里是一份从Stream读取数据后导入至Doris的配置文件:
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": ["emp_no", "birth_date", "first_name","last_name","gender","hire_date"],
"connection": [
{
"jdbcUrl": ["jdbc:mysql://localhost:3306/demo"],
"table": ["employees_1"]
}
],
"username": "root",
"password": "xxxxx",
"where": ""
}
},
"writer": {
"name": "doriswriter",
"parameter": {
"loadUrl": ["172.16.0.13:8030"],
"loadProps": {
},
"column": ["emp_no", "birth_date", "first_name","last_name","gender","hire_date"],
"username": "root",
"password": "xxxxxx",
"postSql": ["select count(1) from all_employees_info"],
"preSql": [],
"flushInterval":30000,
"connection": [
{
"jdbcUrl": "jdbc:mysql://172.16.0.13:9030/demo",
"selectedDatabase": "demo",
"table": ["all_employees_info"]
}
],
"loadProps": {
"format": "json",
"strip_outer_array": true
}
}
}
}
],
"setting": {
"speed": {
"channel": "1"
}
}
}
}
以上参数解释如下:
描述:Doris 的 JDBC 连接串,用户执行 preSql 或 postSQL。
必选:是
默认值:无
描述:作为 Stream Load 的连接目标。格式为 “ip:port”。其中 IP 是 FE 节点 IP,port 是 FE 节点的 http_port。可以填写多个,多个之间使用英文状态的分号隔开:;,doriswriter 将以轮询的方式访问。
必选:是
默认值:无
描述:访问Doris数据库的用户名
必选:是
默认值:无
描述:访问Doris数据库的密码
必选:否
默认值:空
描述:需要写入的Doris数据库名称。
必选:是
默认值:无
描述:需要写入的Doris表名称。
必选:是
默认值:无
描述:目的表需要写入数据的字段,这些字段将作为生成的 Json 数据的字段名。字段之间用英文逗号分隔。例如: “column”: [“id”,“name”,“age”]。
必选:是
默认值:否
描述:写入数据到目的表前,会先执行这里的标准语句。
必选:否
默认值:无
描述:写入数据到目的表后,会执行这里的标准语句。
必选:否
默认值:无
描述:每批次导入数据的最大行数。和 maxBatchSize 共同控制每批次的导入数量。每批次数据达到两个阈值之一,即开始导入这一批次的数据。
必选:否
默认值:500000
描述:每批次导入数据的最大数据量。和 maxBatchRows 共同控制每批次的导入数量。每批次数据达到两个阈值之一,即开始导入这一批次的数据。
必选:否
默认值:104857600
描述:每批次导入数据失败后的重试次数。
必选:否
默认值:0
描述:每批次导入任务的 label 前缀。最终的 label 将有 labelPrefix + UUID 组成全局唯一的 label,确保数据不会重复导入
必选:否
默认值:datax_doris_writer_
描述:StreamLoad 的请求参数,详情参照StreamLoad介绍。这里包括导入的数据格式:format等,导入数据格式默认我们使用csv,支持JSON,具体可以参照下面类型转换部分,也可以参照上面Stream load 官方信息。
必选:否
默认值:无
默认传入的数据均会被转为字符串,并以\t作为列分隔符,\n作为行分隔符,组成csv文件进行StreamLoad导入操作。
默认是csv格式导入,如需更改列分隔符, 则正确配置 loadProps 即可:
"loadProps": {
"column_separator": "\\x01",
"line_delimiter": "\\x02"
}
如需更改导入格式为json, 则正确配置 loadProps 即可:
"loadProps": {
"format": "json",
"strip_outer_array": true
}
这里通过DataX将MySQL中表数据导入到Doris对应表中。按照如下步骤操作即可。
#在node2节点mysql中创建如下test表,并插入数据
mysql> use demo;
mysql> create table demo.test(id int,name varchar(255),age int ,score bigint);
mysql> insert into test values (1,'zs',18,100),(2,'ls',19,200),(3,'ww',20,300),(4,'ml',21,400),(5,'gb',22,500);
#查看数据
mysql> select * from test;
+------+------+------+-------+
| id | name | age | score |
+------+------+------+-------+
| 1 | zs | 18 | 100 |
| 2 | ls | 19 | 200 |
| 3 | ww | 20 | 300 |
| 4 | ml | 21 | 400 |
| 5 | gb | 22 | 500 |
+------+------+------+-------+
#在Doris example_db下创建表
CREATE TABLE example_db.`doris_datax_tbl` (
`id` int NOT NULL,
`name` varchar(30) DEFAULT NULL COMMENT '',
`age` int DEFAULT NULL COMMENT '',
`score` bigint DEFAULT NULL COMMENT ''
) ENGINE=OLAP
UNIQUE KEY(`id`, `name`)
DISTRIBUTED BY HASH(`id`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"in_memory" = "false",
"storage_format" = "V2"
);
my_import.json:
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": ["id","name","age","score"],
"connection": [
{
"jdbcUrl": ["jdbc:mysql://node2:3306/demo"],
"table": ["test"]
}
],
"username": "root",
"password": "123456",
"where": ""
}
},
"writer": {
"name": "doriswriter",
"parameter": {
"loadUrl": ["node1:8030"],
"loadProps": {
},
"column": ["id","name","age","score"],
"username": "root",
"password": "123456",
"postSql": ["select count(1) from doris_datax_tbl"],
"preSql": [],
"flushInterval":30000,
"connection": [
{
"jdbcUrl": "jdbc:mysql://node1:9030/example_db",
"selectedDatabase": "example_db",
"table": ["doris_datax_tbl"]
}
],
"loadProps": {
"format": "json",
"strip_outer_array":"true",
"line_delimiter": "\\x02"
}
}
}
}
],
"setting": {
"speed": {
"channel": "1"
}
}
}
}
将my_import.json文件放在node1节点的/root目录下。
#执行数据同步脚本
[root@node1 ~]# cd /software/datax/bin/
[root@node1 bin]# python datax.py /root/my_import.json
...
任务启动时刻 : 2023-04-14 19:52:40
任务结束时刻 : 2023-04-14 19:52:52
任务总计耗时 : 11s
任务平均流量 : 4B/s
记录写入速度 : 0rec/s
读出记录总数 : 5
读写失败总数 : 0
#在Doris中查询同步的数据
mysql> select * from doris_datax_tbl;
+------+------+------+-------+
| id | name | age | score |
+------+------+------+-------+
| 1 | zs | 18 | 100 |
| 2 | ls | 19 | 200 |
| 3 | ww | 20 | 300 |
| 4 | ml | 21 | 400 |
| 5 | gb | 22 | 500 |
+------+------+------+-------+
JDBC Catalog 通过标准 JDBC 协议,连接其他数据源。连接后,Doris 会自动同步数据源下的 Database 和 Table 的元数据,以便快速访问这些外部数据,数据不会同步到Doris中,Doris只是通过JDBC方式来访问外部数据库中的数据。目前Catalog支持MySQL、PostgreSQL、Oracle、SQLServer、Clickhouse、Doris。具体可以参考:https://doris.apache.org/zh-CN/docs/dev/lakehouse/multi-catalog/jdbc。
使用JDBC方式读取MySQL、PostgreSQL、Oracle、SQLServer、Clickhouse、Doris中的数据时,需要首先创建Catalog,下面以读取MySQL、Oracle、Clickhouse、Doris中的数据创建Catalog为例来演示Catalog的创建。
#mysql
CREATE CATALOG jdbc_mysql PROPERTIES (
"type"="jdbc",
"user"="root",
"password"="123456",
"jdbc_url" = "jdbc:mysql://127.0.0.1:3306/demo",
"driver_url" = "mysql-connector-java-5.1.47.jar",
"driver_class" = "com.mysql.jdbc.Driver"
)
#oracle
CREATE CATALOG jdbc_oracle PROPERTIES (
"type"="jdbc",
"user"="root",
"password"="123456",
"jdbc_url" = "jdbc:oracle:thin:@127.0.0.1:1521:helowin",
"driver_url" = "ojdbc6.jar",
"driver_class" = "oracle.jdbc.driver.OracleDriver"
);
#clickhouse
CREATE CATALOG jdbc_clickhouse PROPERTIES (
"type"="jdbc",
"user"="root",
"password"="123456",
"jdbc_url" = "jdbc:clickhouse://127.0.0.1:8123/demo",
"driver_url" = "clickhouse-jdbc-0.3.2-patch11-all.jar",
"driver_class" = "com.clickhouse.jdbc.ClickHouseDriver"
);
#doris,JDBC Catalog支持连接另一个Doris数据库
CREATE CATALOG doris_catalog PROPERTIES (
"type"="jdbc",
"user"="root",
"password"="123456",
"jdbc_url" = "jdbc:mysql://127.0.0.1:9030?useSSL=false",
"driver_url" = "mysql-connector-java-5.1.47.jar",
"driver_class" = "com.mysql.jdbc.Driver"
);
以上创建Catalog的参数说明如下:
参数 | 是否必须 | 默认值 | 说明 |
---|---|---|---|
user | 是 | 对应数据库的用户名 | |
password | 是 | 对应数据库的密码 | |
jdbc_url | 是 | JDBC连接串 | |
driver_url | 是 | JDBC Driver Jar包名称* | |
driver_class | 是 | JDBC Driver Class名称 | |
only_specified_database | 否 | “false” | 指定是否只同步指定的database |
lower_case_table_names | 否 | “false” | 是否以小写的形式同步jdbc外部数据源的表名 |
specified_database_list | 否 | “” | 当only_specified_database=true时,指定同步多个database,以’,'分隔。db名称是大小写敏感的。 |
注意 :
Doris | Oracle |
---|---|
Catalog | Database |
Database | User |
Table | Table |
当在Doris中创建好对应的Catalog后,可以直接编写SQL读取对应数据库中的数据,举例如下:
#数据读取
select * from mysql_catalog.mysql_database.mysql_table where k1 > 1000 and k3 ='term';
由于可能存在使用数据库内部的关键字作为字段名,为解决这种状况下仍能正确查询,所以在SQL语句中,会根据各个数据库的标准自动在字段名与表名上加上转义符。例如 MYSQL(``)、PostgreSQL(“”)、SQLServer([])、ORACLE(“”),所以此时可能会造成字段名的大小写敏感,具体可以通过explain sql,查看转义后下发到各个数据库的查询语句。
Doirs JDBC方式读取外部存储库中的数据需要注意Doris列类型与其他数据库中列类型的映射对应关系,下面以MySQL中列类型对应Doris列类型,其他数据库列类型与Doris 列类型映射关系参考:https://doris.apache.org/zh-CN/docs/dev/lakehouse/multi-catalog/jdbc#%E5%88%97%E7%B1%BB%E5%9E%8B%E6%98%A0%E5%B0%84。
下面以Doris读取MySQL中的数据为例,来演示通过JDBC Catalog方式读取MySQL中的数据。
在Doris各个FE和BE节点上"/software/doris-1.2.1/apache-doris-fe/jdbc_drivers"目录中上传mysql-connector-java-5.1.47.jar,如果没有jdbc_drivers需要手动创建。
上传的该jar包需要在创建JDBC Catalog时指定在jdbc.driver_url配置下,这里上传至node1~node5各个节点的/software/doris-1.2.1/apache-doris-fe/jdbc_drivers路径中。
CREATE CATALOG jdbc_mysql PROPERTIES (
"type"="jdbc",
"jdbc.user"="root",
"jdbc.password"="123456",
"jdbc.jdbc_url" = "jdbc:mysql://192.168.179.5:3306/demo",
"jdbc.driver_url" = "file:///software/doris-1.2.1/apache-doris-fe/jdbc_drivers/mysql-connector-java-5.1.47.jar",
"jdbc.driver_class" = "com.mysql.jdbc.Driver"
);
#在node2 MySQL中创建表,并插入数据
create table demo.tbl1 (id int,name varchar(255),age int);
create table demo.tbl2 (id int,name varchar(255),age int);
create table demo.tbl3 (id int,name varchar(255),age int);
insert into tbl1 values (1,'a',18),(2,'b',19);
insert into tbl2 values (3,'c',20),(4,'d',21);
insert into tbl3 values (5,'e',22),(6,'f',23);
注意:如果在Doris中创建了对应的Catalog,再向mysql中创建的表,Doris中不认识,可以删除Doris中的对应catalog并重建该Catalog同步mysql新的数据表元数据来解决。
#Doris JDBC 方式查询MySQL中的数据
mysql> select * from jdbc_mysql.demo.tbl1;
+------+------+------+
| id | name | age |
+------+------+------+
| 1 | a | 18 |
| 2 | b | 19 |
+------+------+------+
mysql> select * from jdbc_mysql.demo.tbl2;
+------+------+------+
| id | name | age |
+------+------+------+
| 3 | c | 20 |
| 4 | d | 21 |
+------+------+------+
mysql> select * from jdbc_mysql.demo.tbl3;
+------+------+------+
| id | name | age |
+------+------+------+
| 5 | e | 22 |
| 6 | f | 23 |
+------+------+------+
注意: 目前JDBC方式官方声称可以向外部数据库表中插入数据,但是经过测试目前Doris JDBC 读取外部数据库表只支持查询,不支持插入、更新、删除操作。
Doris中的优化涉及到的方面非常多,例如在分区、分桶、join、查询、参数等各个方面都有优化对应策略,具体可以参考前面每个部分内容,这里大体总结下Doris中的优化相关内容:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。