当前位置:   article > 正文

[实战系列]SelectDB Cloud Datax 数据写入最佳实践

selectdb

前言


企业正在经历其数据资产的爆炸式增长,这些数据包括批式或流式传输的结构化、半结构化以及非结构化数据,随着海量数据批量导入的场景的增多,企业对于 Data Pipeline 的需求也愈加复杂。新一代云原生实时数仓 SelectDB Cloud 作为一款运行于多云之上的云原生实时数据仓库,致力于通过开箱即用的能力为客户带来简单快速的数仓体验。在生态方面,SelectDB Cloud 提供了丰富的数据连接器插件(Connector)来连接各种来自周边大数据工具的数据源,内置 Kafka、Flink、Spark、DataX 等常见的 Connector。基于此,企业开发者能够更加便捷的将数据移动到 SelectDB Cloud 上,并利用 SelectDB Cloud 从数据资产中获取更高的价值。

SelectDB Cloud 基于 Apache Doris 研发的新一代云原生实时数仓 SelectDB,运行于多家云上,为客户提供极简运维和极致性价比的数仓服务


介绍


DataX 是一个异构数据源离线同步工具,致力于实现包括关系型数据库(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP,等各种异构数据源之间稳定高效的数据同步功能。


Selectdb 提供了datax-selectdb-writer 插件,可以将datax 支持的数据源同步到selectdb中
框架设计
主要分为三个部分:Reader,FrameWork,Writer

  • Reader:Reader为数据采集模块,负责采集数据源的数据,将数据发送给Framework。
  • Writer: Writer为数据写入模块,负责不断向Framework取数据,并将数据写入到目的端。
  • Framework:Framework用于连接reader和writer,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题。

执行流程

  1. DataX完成单个数据同步的作业,我们称之为Job,DataX接受到一个Job之后,将启动一个进程来完成整个作业同步过程。DataX Job模块是单个作业的中枢管理节点,承担了数据清理、子任务切分(将单一作业计算转化为多个子Task)、TaskGroup管理等功能。
  2. DataXJob启动后,会根据不同的源端切分策略,将Job切分成多个小的Task(子任务),以便于并发执行。Task便是DataX作业的最小单元,每一个Task都会负责一部分数据的同步工作。
  3. 切分多个Task之后,DataX Job会调用Scheduler模块,根据配置的并发数据量,将拆分成的Task重新组合,组装成TaskGroup(任务组)。每一个TaskGroup负责以一定的并发运行完毕分配好的所有Task,默认单个任务组的并发数量为5。
  4. 每一个Task都由TaskGroup负责启动,Task启动后,会固定启动Reader—>Channel—>Writer的线程来完成任务同步工作。
  5. DataX作业运行起来之后, Job监控并等待多个TaskGroup模块任务完成,等待所有TaskGroup任务完成后Job成功退出。否则,异常退出,进程退出值非0

Selectdb writer介绍
1. 快速介绍
该插件主要将 MySQL、Oracle 等数据库中的数据导入至 SELECTDB。该插件将数据转化为 CSV 或 JSON 格式并将其通过 copy into 方式批量导入至 SELECTDB。

  1. 实现原理

SelectdbWriter 通过调用selectdb upload api,返回一个重定向的S3地址,使用Http向S3地址发送字节流,设置参数达到要求时执行copy into


3. 编译
1. 下载
git clone https://github.com/selectdb/datax-selectdb.git
2. 运行 init-env.sh
这个脚本主要用于构建 DataX 开发环境,他主要进行了以下操作: 1 .将 DataX 代码库 clone 到本地。 2. 将 doriswriter/ 目录软链到 DataX/selectdbwriter 目录。 3. 在 DataX/pom.xml 文件中添加 <module>selectdbwriter</module> 模块。 4. 这个脚本执行后,开发者就可以进入 DataX/ 目录开始开发或编译了。因为做了软链, 所以任何对 DataX/doriswriter 目录中文件的修改,都会反映到 doriswriter/ 目录中, 方便开发者提交代码。

3. 编译 selectdbwriter

  1. 编译整个 DataX 项目

mvn package assembly:assembly -Dmaven.test.skip=true 产出在 target/datax/datax/. hdfsreader, hdfswriter and oscarwriter 这三个插件需要额外的jar包。如果你并不需要这些插件,可以在 DataX/pom.xml 中删除这些插件的模块。

  1. 单独编译 doriswriter 插件

mvn clean install -pl plugin-rdbms-util,doriswriter -DskipTests

  1. 编译错误

如遇到如下编译错误:
Could not find artifact com.alibaba.datax:datax-all:pom:0.0.1-SNAPSHOT ...
可尝试以下方式解决:

  • 下载 alibaba-datax-maven-m2-20210928.tar.gz
  • 解压后,将得到的 alibaba/datax/ 目录,拷贝到所使用的 maven 对应的 .m2/repository/com/alibaba/ 下。
  • 再次尝试编译。
  1. 示例
  • 配置样例

这里是一份从Stream读取数据后导入至selectdb的配置文件。

  1. {
  2. "job":{
  3. "content":[
  4. {
  5. "reader":{
  6. "name":"streamreader",
  7. "parameter":{
  8. "column":[
  9. {
  10. "type":"string",
  11. "random":"0,31"
  12. },
  13. {
  14. "type":"string",
  15. "random":"0,31"
  16. },
  17. {
  18. "type":"string",
  19. "random":"0,31"
  20. },
  21. {
  22. "type":"string",
  23. "random":"0,31"
  24. },
  25. {
  26. "type":"long",
  27. "random":"0,5"
  28. },
  29. {
  30. "type":"string",
  31. "random":"0,10"
  32. },
  33. {
  34. "type":"string",
  35. "random":"0,5"
  36. },
  37. {
  38. "type":"string",
  39. "random":"0,31"
  40. },
  41. {
  42. "type":"string",
  43. "random":"0,31"
  44. },
  45. {
  46. "type":"string",
  47. "random":"0,21"
  48. },
  49. {
  50. "type":"string",
  51. "random":"0,31"
  52. },
  53. {
  54. "type":"long",
  55. "random":"0,10"
  56. },
  57. {
  58. "type":"long",
  59. "random":"0,20"
  60. },
  61. {
  62. "type":"date",
  63. "random":"2022-01-01 12:00:00,2023-01-01 12:00:00"
  64. },
  65. {
  66. "type":"long",
  67. "random":"0,10"
  68. },
  69. {
  70. "type":"date",
  71. "random":"2022-01-01 12:00:00,2023-01-01 12:00:00"
  72. },
  73. {
  74. "type":"string",
  75. "random":"0,10"
  76. },
  77. {
  78. "type":"long",
  79. "random":"0,10"
  80. },
  81. {
  82. "type":"date",
  83. "random":"2022-01-01 12:00:00,2023-01-01 12:00:00"
  84. },
  85. {
  86. "type":"long",
  87. "random":"0,10"
  88. },
  89. {
  90. "type":"date",
  91. "random":"2022-01-01 12:00:00,2023-01-01 12:00:00"
  92. },
  93. {
  94. "type":"long",
  95. "random":"0,10"
  96. },
  97. {
  98. "type":"date",
  99. "random":"2022-01-01 12:00:00,2023-01-01 12:00:00"
  100. },
  101. {
  102. "type":"long",
  103. "random":"0,10"
  104. },
  105. {
  106. "type":"date",
  107. "random":"2022-01-01 12:00:00,2023-01-01 12:00:00"
  108. },
  109. {
  110. "type":"string",
  111. "random":"0,100"
  112. },
  113. {
  114. "type":"string",
  115. "random":"0,1"
  116. },
  117. {
  118. "type":"long",
  119. "random":"0,1"
  120. },
  121. {
  122. "type":"string",
  123. "random":"0,64"
  124. },
  125. {
  126. "type":"string",
  127. "random":"0,20"
  128. },
  129. {
  130. "type":"string",
  131. "random":"0,31"
  132. },
  133. {
  134. "type":"long",
  135. "random":"0,3"
  136. },
  137. {
  138. "type":"long",
  139. "random":"0,3"
  140. },
  141. {
  142. "type":"long",
  143. "random":"0,19"
  144. },
  145. {
  146. "type":"date",
  147. "random":"2022-01-01 12:00:00,2023-01-01 12:00:00"
  148. },
  149. {
  150. "type":"string",
  151. "random":"0,1"
  152. }
  153. ],
  154. "sliceRecordCount":10
  155. }
  156. },
  157. "writer":{
  158. "name":"selectdbwriter",
  159. "parameter":{
  160. "loadUrl":[
  161. "xxx:47150"
  162. ],
  163. "loadProps":{
  164. "file.type":"json",
  165. "file.strip_outer_array":"true"
  166. },
  167. "column":[
  168. "id",
  169. "table_id",
  170. "table_no",
  171. "table_name",
  172. "table_status",
  173. "no_disturb",
  174. "dinner_type",
  175. "member_id",
  176. "reserve_bill_no",
  177. "pre_order_no",
  178. "queue_num",
  179. "person_num",
  180. "open_time",
  181. "open_time_format",
  182. "order_time",
  183. "order_time_format",
  184. "table_bill_id",
  185. "offer_time",
  186. "offer_time_format",
  187. "confirm_bill_time",
  188. "confirm_bill_time_format",
  189. "bill_time",
  190. "bill_time_format",
  191. "clear_time",
  192. "clear_time_format",
  193. "table_message",
  194. "bill_close",
  195. "table_type",
  196. "pad_mac",
  197. "company_id",
  198. "shop_id",
  199. "is_sync",
  200. "table_split_no",
  201. "ts",
  202. "ts_format",
  203. "dr"
  204. ],
  205. "username":"admin",
  206. "password":"",
  207. "postSql":[
  208. ],
  209. "preSql":[
  210. ],
  211. "connection":[
  212. {
  213. "jdbcUrl":"jdbc:mysql://xxx:34142/cl_test",
  214. "table":[
  215. "ods_pos_pro_table_dynamic_delta_v4"
  216. ],
  217. "selectedDatabase":"cl_test"
  218. }
  219. ],
  220. "maxBatchRows":1000000,
  221. "maxBatchByteSize":536870912000
  222. }
  223. }
  224. }
  225. ],
  226. "setting":{
  227. "errorLimit":{
  228. "percentage":0.02,
  229. "record":0
  230. },
  231. "speed":{
  232. "channel":5
  233. }
  234. }
  235. }
  236. }

  • 执行任务

python $DATAX_HOME/datax.py ../xx.json

  • 参数说明
  1. **jdbcUrl**
  2. - 描述:selectdb 的 JDBC 连接串,用户执行 preSql 或 postSQL。
  3. - 必选:是
  4. - 默认值:无
  5. * **loadUrl**
  6. - 描述:作为 selecdb 的连接目标。格式为 "ip:port"。其中 IP 是 selectdb的private-link,port 是selectdb 集群的 http_port
  7. - 必选:是
  8. - 默认值:无
  9. * **username**
  10. - 描述:访问selectdb数据库的用户名
  11. - 必选:是
  12. - 默认值:无
  13. * **password**
  14. - 描述:访问selectdb数据库的密码
  15. - 必选:否
  16. - 默认值:空
  17. * **connection.selectedDatabase**
  18. - 描述:需要写入的selectdb数据库名称。
  19. - 必选:是
  20. - 默认值:无
  21. * **connection.table**
  22. - 描述:需要写入的selectdb表名称。
  23. - 必选:是
  24. - 默认值:无
  25. * **column**
  26. - 描述:目的表**需要写入数据**的字段,这些字段将作为生成的 Json 数据的字段名。字段之间用英文逗号分隔。例如: "column": ["id","name","age"]。
  27. - 必选:是
  28. - 默认值:否
  29. * **preSql**
  30. - 描述:写入数据到目的表前,会先执行这里的标准语句。
  31. - 必选:否
  32. - 默认值:无
  33. * **postSql**
  34. - 描述:写入数据到目的表后,会执行这里的标准语句。
  35. - 必选:否
  36. - 默认值:无
  37. * **maxBatchRows**
  38. - 描述:每批次导入数据的最大行数。和 **batchSize** 共同控制每批次的导入数量。每批次数据达到两个阈值之一,即开始导入这一批次的数据。
  39. - 必选:否
  40. - 默认值:500000
  41. * **batchSize**
  42. - 描述:每批次导入数据的最大数据量。和 **maxBatchRows** 共同控制每批次的导入数量。每批次数据达到两个阈值之一,即开始导入这一批次的数据。
  43. - 必选:否
  44. - 默认值:90M
  45. * **maxRetries**
  46. - 描述:每批次导入数据失败后的重试次数。
  47. - 必选:否
  48. - 默认值:3
  49. * **labelPrefix**
  50. - 描述:每批次上传文件的 label 前缀。最终的 label 将有 `labelPrefix + UUID` 组成全局唯一的 label,确保数据不会重复导入
  51. - 必选:否
  52. - 默认值:`datax_selectdb_writer_`
  53. * **loadProps**
  54. - 描述:COPY INOT 的请求参数
  55. 这里包括导入的数据格式:file.type等,导入数据格式默认我们使用csv,支持JSON,具体可以参照下面类型转换部分
  56. - 必选:否
  57. - 默认值:无
  58. * **clusterName**
  59. - 描述:selectdb could 集群名称
  60. - 必选:否
  61. - 默认值:无
  62. * **flushQueueLength**
  63. - 描述:队列长度
  64. - 必选:否
  65. - 默认值:1
  66. * **flushInterval**
  67. - 描述:数据写入批次的时间间隔,如果maxBatchRows 和 batchSize 参数设置的有很大,那么很可能达不到你这设置的数据量大小,会执行导入。
  68. - 必选:否
  69. - 默认值:30000ms

  1. 类型转化

默认传入的数据均会被转为字符串,并以`\t`作为列分隔符,`\n`作为行分隔符,组成`csv`文件进行Selectdb导入操作。
默认是csv格式导入,如需更改列分隔符, 则正确配置 `loadProps` 即可:

"loadProps": {     "file.column_separator": "\\x01",     "file.line_delimiter": "\\x02" } 


如需更改导入格式为`json`, 则正确配置 `loadProps` 即可:

 "loadProps": {     "file.type": "json",     "file.strip_outer_array": true } 
 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/爱喝兽奶帝天荒/article/detail/782382
推荐阅读
相关标签
  

闽ICP备14008679号