赞
踩
企业正在经历其数据资产的爆炸式增长,这些数据包括批式或流式传输的结构化、半结构化以及非结构化数据,随着海量数据批量导入的场景的增多,企业对于 Data Pipeline 的需求也愈加复杂。新一代云原生实时数仓 SelectDB Cloud 作为一款运行于多云之上的云原生实时数据仓库,致力于通过开箱即用的能力为客户带来简单快速的数仓体验。在生态方面,SelectDB Cloud 提供了丰富的数据连接器插件(Connector)来连接各种来自周边大数据工具的数据源,内置 Kafka、Flink、Spark、DataX 等常见的 Connector。基于此,企业开发者能够更加便捷的将数据移动到 SelectDB Cloud 上,并利用 SelectDB Cloud 从数据资产中获取更高的价值。
SelectDB Cloud 基于 Apache Doris 研发的新一代云原生实时数仓 SelectDB,运行于多家云上,为客户提供极简运维和极致性价比的数仓服务
DataX 是一个异构数据源离线同步工具,致力于实现包括关系型数据库(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP,等各种异构数据源之间稳定高效的数据同步功能。
Selectdb 提供了datax-selectdb-writer 插件,可以将datax 支持的数据源同步到selectdb中
框架设计
主要分为三个部分:Reader,FrameWork,Writer
执行流程
Selectdb writer介绍
1. 快速介绍
该插件主要将 MySQL、Oracle 等数据库中的数据导入至 SELECTDB。该插件将数据转化为 CSV 或 JSON 格式并将其通过 copy into 方式批量导入至 SELECTDB。
SelectdbWriter 通过调用selectdb upload api,返回一个重定向的S3地址,使用Http向S3地址发送字节流,设置参数达到要求时执行copy into
3. 编译
1. 下载
git clone https://github.com/selectdb/datax-selectdb.git
2. 运行 init-env.sh
这个脚本主要用于构建 DataX 开发环境,他主要进行了以下操作: 1 .将 DataX 代码库 clone 到本地。 2. 将 doriswriter/ 目录软链到 DataX/selectdbwriter 目录。 3. 在 DataX/pom.xml 文件中添加 <module>selectdbwriter</module> 模块。 4. 这个脚本执行后,开发者就可以进入 DataX/ 目录开始开发或编译了。因为做了软链, 所以任何对 DataX/doriswriter 目录中文件的修改,都会反映到 doriswriter/ 目录中, 方便开发者提交代码。
3. 编译 selectdbwriter
mvn package assembly:assembly -Dmaven.test.skip=true 产出在 target/datax/datax/. hdfsreader, hdfswriter and oscarwriter 这三个插件需要额外的jar包。如果你并不需要这些插件,可以在 DataX/pom.xml 中删除这些插件的模块。
mvn clean install -pl plugin-rdbms-util,doriswriter -DskipTests
如遇到如下编译错误:
Could not find artifact com.alibaba.datax:datax-all:pom:0.0.1-SNAPSHOT ...
可尝试以下方式解决:
alibaba/datax/
目录,拷贝到所使用的 maven 对应的 .m2/repository/com/alibaba/
下。这里是一份从Stream读取数据后导入至selectdb的配置文件。
- {
- "job":{
- "content":[
- {
- "reader":{
- "name":"streamreader",
- "parameter":{
- "column":[
- {
- "type":"string",
- "random":"0,31"
- },
- {
- "type":"string",
- "random":"0,31"
- },
- {
- "type":"string",
- "random":"0,31"
- },
- {
- "type":"string",
- "random":"0,31"
- },
- {
- "type":"long",
- "random":"0,5"
- },
- {
- "type":"string",
- "random":"0,10"
- },
- {
- "type":"string",
- "random":"0,5"
- },
- {
- "type":"string",
- "random":"0,31"
- },
- {
- "type":"string",
- "random":"0,31"
- },
- {
- "type":"string",
- "random":"0,21"
- },
- {
- "type":"string",
- "random":"0,31"
- },
- {
- "type":"long",
- "random":"0,10"
- },
- {
- "type":"long",
- "random":"0,20"
- },
- {
- "type":"date",
- "random":"2022-01-01 12:00:00,2023-01-01 12:00:00"
- },
- {
- "type":"long",
- "random":"0,10"
- },
- {
- "type":"date",
- "random":"2022-01-01 12:00:00,2023-01-01 12:00:00"
- },
- {
- "type":"string",
- "random":"0,10"
- },
- {
- "type":"long",
- "random":"0,10"
- },
- {
- "type":"date",
- "random":"2022-01-01 12:00:00,2023-01-01 12:00:00"
- },
- {
- "type":"long",
- "random":"0,10"
- },
- {
- "type":"date",
- "random":"2022-01-01 12:00:00,2023-01-01 12:00:00"
- },
- {
- "type":"long",
- "random":"0,10"
- },
- {
- "type":"date",
- "random":"2022-01-01 12:00:00,2023-01-01 12:00:00"
- },
- {
- "type":"long",
- "random":"0,10"
- },
- {
- "type":"date",
- "random":"2022-01-01 12:00:00,2023-01-01 12:00:00"
- },
- {
- "type":"string",
- "random":"0,100"
- },
- {
- "type":"string",
- "random":"0,1"
- },
- {
- "type":"long",
- "random":"0,1"
- },
- {
- "type":"string",
- "random":"0,64"
- },
- {
- "type":"string",
- "random":"0,20"
- },
- {
- "type":"string",
- "random":"0,31"
- },
- {
- "type":"long",
- "random":"0,3"
- },
- {
- "type":"long",
- "random":"0,3"
- },
- {
- "type":"long",
- "random":"0,19"
- },
- {
- "type":"date",
- "random":"2022-01-01 12:00:00,2023-01-01 12:00:00"
- },
- {
- "type":"string",
- "random":"0,1"
- }
- ],
- "sliceRecordCount":10
- }
- },
- "writer":{
- "name":"selectdbwriter",
- "parameter":{
- "loadUrl":[
- "xxx:47150"
- ],
- "loadProps":{
- "file.type":"json",
- "file.strip_outer_array":"true"
- },
- "column":[
- "id",
- "table_id",
- "table_no",
- "table_name",
- "table_status",
- "no_disturb",
- "dinner_type",
- "member_id",
- "reserve_bill_no",
- "pre_order_no",
- "queue_num",
- "person_num",
- "open_time",
- "open_time_format",
- "order_time",
- "order_time_format",
- "table_bill_id",
- "offer_time",
- "offer_time_format",
- "confirm_bill_time",
- "confirm_bill_time_format",
- "bill_time",
- "bill_time_format",
- "clear_time",
- "clear_time_format",
- "table_message",
- "bill_close",
- "table_type",
- "pad_mac",
- "company_id",
- "shop_id",
- "is_sync",
- "table_split_no",
- "ts",
- "ts_format",
- "dr"
- ],
- "username":"admin",
- "password":"",
- "postSql":[
-
- ],
- "preSql":[
-
- ],
- "connection":[
- {
- "jdbcUrl":"jdbc:mysql://xxx:34142/cl_test",
- "table":[
- "ods_pos_pro_table_dynamic_delta_v4"
- ],
- "selectedDatabase":"cl_test"
- }
- ],
- "maxBatchRows":1000000,
- "maxBatchByteSize":536870912000
- }
- }
- }
- ],
- "setting":{
- "errorLimit":{
- "percentage":0.02,
- "record":0
- },
- "speed":{
- "channel":5
- }
- }
- }
- }
python $DATAX_HOME/datax.py ../xx.json
- **jdbcUrl**
- - 描述:selectdb 的 JDBC 连接串,用户执行 preSql 或 postSQL。
- - 必选:是
- - 默认值:无
-
- * **loadUrl**
-
- - 描述:作为 selecdb 的连接目标。格式为 "ip:port"。其中 IP 是 selectdb的private-link,port 是selectdb 集群的 http_port
- - 必选:是
- - 默认值:无
-
- * **username**
-
- - 描述:访问selectdb数据库的用户名
- - 必选:是
- - 默认值:无
-
- * **password**
-
- - 描述:访问selectdb数据库的密码
- - 必选:否
- - 默认值:空
-
- * **connection.selectedDatabase**
- - 描述:需要写入的selectdb数据库名称。
- - 必选:是
- - 默认值:无
-
- * **connection.table**
- - 描述:需要写入的selectdb表名称。
- - 必选:是
- - 默认值:无
-
- * **column**
-
- - 描述:目的表**需要写入数据**的字段,这些字段将作为生成的 Json 数据的字段名。字段之间用英文逗号分隔。例如: "column": ["id","name","age"]。
- - 必选:是
- - 默认值:否
-
- * **preSql**
-
- - 描述:写入数据到目的表前,会先执行这里的标准语句。
- - 必选:否
- - 默认值:无
-
- * **postSql**
-
- - 描述:写入数据到目的表后,会执行这里的标准语句。
- - 必选:否
- - 默认值:无
-
-
- * **maxBatchRows**
-
- - 描述:每批次导入数据的最大行数。和 **batchSize** 共同控制每批次的导入数量。每批次数据达到两个阈值之一,即开始导入这一批次的数据。
- - 必选:否
- - 默认值:500000
-
- * **batchSize**
-
- - 描述:每批次导入数据的最大数据量。和 **maxBatchRows** 共同控制每批次的导入数量。每批次数据达到两个阈值之一,即开始导入这一批次的数据。
- - 必选:否
- - 默认值:90M
-
- * **maxRetries**
-
- - 描述:每批次导入数据失败后的重试次数。
- - 必选:否
- - 默认值:3
-
- * **labelPrefix**
-
- - 描述:每批次上传文件的 label 前缀。最终的 label 将有 `labelPrefix + UUID` 组成全局唯一的 label,确保数据不会重复导入
- - 必选:否
- - 默认值:`datax_selectdb_writer_`
-
- * **loadProps**
-
- - 描述:COPY INOT 的请求参数
-
- 这里包括导入的数据格式:file.type等,导入数据格式默认我们使用csv,支持JSON,具体可以参照下面类型转换部分
-
- - 必选:否
-
- - 默认值:无
-
- * **clusterName**
-
- - 描述:selectdb could 集群名称
-
- - 必选:否
-
- - 默认值:无
-
- * **flushQueueLength**
-
- - 描述:队列长度
-
- - 必选:否
-
- - 默认值:1
-
- * **flushInterval**
-
- - 描述:数据写入批次的时间间隔,如果maxBatchRows 和 batchSize 参数设置的有很大,那么很可能达不到你这设置的数据量大小,会执行导入。
-
- - 必选:否
-
- - 默认值:30000ms
默认传入的数据均会被转为字符串,并以`\t`作为列分隔符,`\n`作为行分隔符,组成`csv`文件进行Selectdb导入操作。
默认是csv格式导入,如需更改列分隔符, 则正确配置 `loadProps` 即可:
"loadProps": { "file.column_separator": "\\x01", "file.line_delimiter": "\\x02" }
如需更改导入格式为`json`, 则正确配置 `loadProps` 即可:
"loadProps": { "file.type": "json", "file.strip_outer_array": true }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。