赞
踩
目录
业务数据是数据仓库的重要数据来源,我们需要每日定时从业务数据库中抽取数据,传输到数据仓库中,之后再对数据进行分析统计。为保证统计结果的正确性,需要保证数据仓库中的数据与业务数据库是同步的,离线数仓的计算周期通常为天,所以数据同步周期也通常为天,即每天同步一次即可。
在同步业务数据时有两种同步策略:全量同步和增量同步
1)解释:每日全量,就是每天都将业务数据库中的全部数据同步一份到数据仓库,是保证两侧数据同步的最简单的方式
2)适用:表数据量不大,且每天即会有新数据插入,也会有旧的数据修改
3)例如:编码字典表、品牌表、商品三级分类表、商品二级分类表、商品一级分类表、优惠规则表、活动表、活动参与商品表、加购表、商品收藏表、优惠券表、SKU商品表、SPU商品表。
1. 解释:每日增量,就是每天只将业务数据中的新增及变化的数据同步到数据仓库中,
2. 适用:表数据量大,且每天只会有新的数据插入的场景,
3. 特点:采用每日增量的表,通常会在首日先进行一个全量同步。
4. 例如:退单表、订单状态表、支付流水表、订单详情表、活动与订单关联表、商品评论表。
两种策略都能保证数据仓库和业务数据库的数据同步,那应该选择哪个呢?
同步策略 | 优点 | 缺点 |
全量同步 | 逻辑简单 | 某些情况下效率低下 |
增量同步 | 效率高,无需同步和存储重复的数据 | 逻辑复杂,需要将每日的新增及变化数据同原来的数据进行整合,才能使用。 |
结论:
若业务表数据量比较大,且每天的数据变化比例还比较低,这时应采用增量同步,否则采用全量同步。
1. 介绍:DataX是阿里巴巴开源的一个异构数据源离线同步工具,致力于实现包括关系型数据库(Mysql、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各种异构数据源之间稳定高效的数据同步功能。
2. 特点:
① DataX侧重于同步数据库中的数据
② DataX没有所谓版本号,这有这一个开源的版本
③ 开源的,有些功能受限,商业版名为DataWorks。
1.异构数据源同步问题,就是不同框架之间同步数据时,相同的数据在不同框架中具有不同的数据结构。
2. DataX的设计理念:
DataX将复杂的网状的同步链路变成了星型数据链路,DataX作为中间传输载体负责连接数据各种数据源。
当需要接入一个新的数据源的时候,只需要将此数据源对接到DataX,便能跟已有的数据源做到无缝数据同步。
3. 当前使用现状:
DataX在阿里巴巴集团内被广泛使用,承担了所有大数据的离线同步业务,并已持续稳定运行了6年之久。目前每天完成同步8W多道作业,每日传输数据量超过300TB。
DataX本身作为离线数据同步框架,采用Framework+plugin架构构建。将数据源读取和写入抽象成为Reader/Writer插件,纳入到整个同步框架中。
其中:
① Reader:数据采集模块,负责采集数据源的数据,将数据发送给Framework。
② Writer:数据写入模块,负责不断从Framework取数据,并将数据写出到目的端。
③ Framework:主题框架,用于连接Reader和Writer,作为两者的数据传输通道,并处理缓冲、流控、并发、数据转换等核心技术问题。
DataX目前已经有了比较全面的插件体系,主流的RDBMS数据库、NoSQL、大数据计算系统都已经接入,支持如下:
类型 | 数据源 | Reader(读) | Writer(写) |
RDBMS 关系型数据库 | MySQL | √ | √ |
Oracle | √ | √ | |
OceanBase | √ | √ | |
SQLServer | √ | √ | |
PostgreSQL | √ | √ | |
DRDS | √ | √ | |
通用RDBMS | √ | √ | |
阿里云数仓数据存储 | ODPS | √ | √ |
ADS | √ | ||
OSS | √ | √ | |
OCS | √ | √ | |
NoSQL数据存储 | OTS | √ | √ |
Hbase0.94 | √ | √ | |
Hbase1.1 | √ | √ | |
Phoenix4.x | √ | √ | |
Phoenix5.x | √ | √ | |
MongoDB | √ | √ | |
Hive | √ | √ | |
Cassandra | √ | √ | |
无结构化数据存储 | TxtFile | √ | √ |
FTP | √ | √ | |
HDFS | √ | √ | |
Elasticsearch | √ | ||
时间序列数据库 | OpenTSDB | √ | |
TSDB | √ | √ |
DataX支持单机多线程模式完成同步作业,下面用一个DataX作业生命周期的时序图,用以说明DataX的运行流程、核心概念以及每个概念的关系。
1.核心模块介绍:
①Job:DataX完成单个数据同步的作业称之为Job,Job模块是单个作业的中枢管理节点,承担着数据清理、子任务切分、TaskGroup管理等功能。
② Task:由Job切分出来的子任务。Task是DataX作业中的最小单元,每个一个Task都会负责一部分的同步工作
③ Scheduler:Scheduler模块根据并发量重组Task
④ TaskGroup:重组后的多个Task的集合,即任务组。负责以一定的并发量运行分配好的所有Task,默认并发量5
2.流程介绍:
1)DataX接受到一个Job作业后,将启动一个进程来完成整个作业同步过程。
2)Job启动后,会根据不同数据源端的切分策略,将Job切分成多个小的Task(子任务),以便并发执行。
3)切分多个Task之后,Job会调用Scheduler模块根据配置的并发数量,将Task重新组合,成为TaskGroup(任务组)。
4)每个Task都是由TaskGroup负责启动,Task启动后,会固定启动Reder->Channel->Writer的线程来完成任务的同步工作。
5)DataX作业运行起来之后,Job监控并等待多个TaskGroup模块任务完成,等待所有的TaskGroup任务完成Job成功退出。否则,异常退出。
1. 案例需求:
如果用户提交了一个DataX作业,并配置了总的并发度为20,目睹是对有100张表的Mysql数据源进行同步。
DataX的调度策略思路是:
1)Job根据数据源的切分策略,将作业切分成100个Task
2)根据配置的并发度是20、每个TaskGroup的并发度是5,计算出需要TaskGroup的个数是4个
3)由4个TaskGroup平分100个Task,每个TaskGroup负责运行25个Task。
功能 | DataX | Sqoop |
运行模式 | 单进程多线程 | MR |
分布式 | 不支持,可以通过调度系统规避 | 支持 |
流控 | 有流控功能 | 需要定制 |
统计信息 | 已有一些统计,上报需定制 | 没有,分布式的数据收集不方便 |
数据校验 | 在core部分有校验功能 | 没有,分布式的数据收集不方便 |
监控 | 需要定制 | 需要定制 |
- 1. 上传DataX的安装包到/opt/software下
- [hadoop102 software]$ ll
-
- 总用量 1406064
-
- -rw-rw-r--. 1 atguigu atguigu 829372407 2月 27 12:00 datax.tar.gz
-
- 2. 解压Jar包到/opt/module/下
-
- [hadoop102 datax]$ tar -zxvf datax.tar.gz -C /opt/module
- 1. 执行如下命令进行自检,出现如下内容表明安装成功。
- [atguigu@hadoop102 datax]$ python bin/datax.py job/job.json
- ……
- 2022-02-27 12:52:15.568 [job-0] INFO JobContainer -
-
- 任务启动时刻 : 2022-02-27 12:52:05
- 任务结束时刻 : 2022-02-27 12:52:15
- 任务总计耗时 : 10s
- 任务平均流量 : 253.91KB/s
- 记录写入速度 : 10000rec/s
- 读出记录总数 : 100000
- 读写失败总数 : 0
DataX的使用非常简单,用户仅需要根据自己同步数据的数据源和目的地的类型来选择相应的Reader和Writer插件即可,并将Reader和Writer插件的信息配置在一个json文件中,然后,在执行命令时,指定配置文件提交数据同步任务即可。
任务提交实例:
[hadoop102 datax]$ python bin/datax.py path/to/your/job.json
4.1.2 DataX配置文件格式
1. 可以执行如下命令,查看DataX配置文件模板
[hadoop102 datax]$ python bin/datax.py -r mysqlreader -w hdfswriter
Reader和Writer的具体参数可参考官方文档:
DataX/README.md at master · alibaba/DataX · GitHub
① MysqlReader插件介绍:实现了从Mysql读取数据。在底层实现上,MysqlReader通过JDBC连接远程Mysql数据库,并执行相应的sql语句将数据从mysql库中select出来(和idea通过jdbc连接mysql写sql原理一样)。
② MysqlReader插件原理:MysqlReader通过JDBC连接器连接到远程的Mysql数据库,并根据用户配置的信息生成查询语句,然后发送到远程Mysql数据库,并将该SQL执行返回结果使用DataX自定义的数据类型拼装为抽象的数据集,并传递给下游Writer处理。TableMode模式中根据用户配置的Table、column、Where等信息生成SQl语句发送到Mysql数据库;QuerysqlMode模式则是直接将用户配置的querysql信息发送到Mysql数据库中。
③ HdfsWriter插件介绍:提供向HDFS文件系统指定路径中写入TextTile和OrcFile类型的文件,文件内容可与Hive表相关联。
④ HdfsWriter插件实现过程:首先根据用户指定的path,创建一个hdfs文件系统上的不存在的临时目录,创建规则是:path_随机;然后将读取的文件写入到这个临时目录中;待到全部写入后,再将这个临时目录下的文件移动到用户所指定的目录下,(在创建文件时保证文件名不重复);最后删除临时目录。如果在中间过程中发生网络中断等情况,造成无法与hdfs建立连接,需要用户手动删除已经写入的文件和临时目录。
1. 速度控制介绍
DataX中提供了包括通道(并发)、记录流、字节流三种流控模式,可以根据需要控制你的作业速度,让你的作业在数据库可以承受的范围内达到最佳的同步速度。
2. 优化参数
参数 | 说明 | 注意事项 |
job.setting.speed.channel | 并发数 | |
job.setting.speed.record | 总record限速 | 配置此参数,则必须配置单个channel的record限速参数 |
job.setting.speed.byte | 总byte限速 | 配置此参数,则必须配置单个channel的byte限速参数 |
core.transport.channel.speed.record | 单个channel的record限速,默认10000条/s | |
core.transport.channel.speed.byte | 单个channel的byte限速,默认1M/s |
注意:如果配置了总record限速和总byte限速,channel并发数就会失效。因为配置了这两个参数后,实际的channel并发数是通过计算得到的。
①计算公式:
1. 内存调整介绍
当提升DataX Job内的Channel并发数时,内存的占用会明显增加,因为DataX作为数据交换通道,在内存中会缓存较多的数据,例如:channel中会有一个Buffer,作为临时的数据交换缓冲区,而在Reader和Write中,也会有一些buffer,为了防止OOM等错误,需要适当调大JVM堆内存。
2. 建议措施
建议根据实际情况来适当调整内存,如:将内存设置为4G或者8G,
方式:
① 修改datax.py脚本
② 启动时使用参数:python bin/datax.py --jvm = "-Xms8G -Xmx8G” /path/to/your/job.json
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。