赞
踩
1、 早期关系型数据库之间的数据同步
1)、全量同步
比如从oracle数据库中同步一张表的数据到Mysql中,通常的做法就是 分页查询源端的表,然后通过 jdbc的batch 方式插入到目标表,这个地方需要注意的是,分页查询时,一定要按照主键id来排序分页,避免重复插入。
2)、基于数据文件导出和导入的全量同步,这种同步方式一般只适用于同种数据库之间的同步,如果是不同的数据库,这种方式可能会存在问题。
3)、基于触发器的增量同步
增量同步一般是做实时的同步,早期很多数据同步都是基于关系型数据库的触发器trigger来做的。
使用触发器实时同步数据的步骤:
A、 基于原表创触发器,触发器包含insert,modify,delete 三种类型的操作,数据库的触发器分Before和After两种情况,一种是在insert,modify,delete 三种类型的操作发生之前触发(比如记录日志操作,一般是Before),一种是在insert,modify,delete 三种类型的操作之后触发。
B、 创建增量表,增量表中的字段和原表中的字段完全一样,但是需要多一个操作类型字段(分表代表insert,modify,delete 三种类型的操作),并且需要一个唯一自增ID,代表数据原表中数据操作的顺序,这个自增id非常重要,不然数据同步就会错乱。
C、 原表中出现insert,modify,delete 三种类型的操作时,通过触发器自动产生增量数据,插入增量表中。
D、处理增量表中的数据,处理时,一定是按照自增id的顺序来处理,这种效率会非常低,没办法做批量操作,不然数据会错乱。 有人可能会说,是不是可以把insert操作合并在一起,modify合并在一起,delete操作合并在一起,然后批量处理,我给的答案是不行,因为数据的增删改是有顺序的,合并后,就没有顺序了,同一条数据的增删改顺序一旦错了,那数据同步就肯定错了。
市面上很多数据etl数据交换产品都是基于这种思想来做的。
E、 这种思想使用kettle 很容易就可以实现,笔者曾经在自己的博客中写过 kettle的文章,https://www.cnblogs.com/laoqing/p/7360673.html
4)、基于时间戳的增量同步
A、首先我们需要一张临时temp表,用来存取每次读取的待同步的数据,也就是把每次从原表中根据时间戳读取到数据先插入到临时表中,每次在插入前,先清空临时表的数据
B、我们还需要创建一个时间戳配置表,用于存放每次读取的处理完的数据的最后的时间戳。
C、每次从原表中读取数据时,先查询时间戳配置表,然后就知道了查询原表时的开始时间戳。
D、根据时间戳读取到原表的数据,插入到临时表中,然后再将临时表中的数据插入到目标表中。
E、从缓存表中读取出数据的最大时间戳,并且更新到时间戳配置表中。缓存表的作用就是使用sql获取每次读取到的数据的最大的时间戳,当然这些都是完全基于sql语句在kettle中来配置,才需要这样的一张临时表。
2、 大数据时代下的数据同步
1)、基于数据库日志(比如mysql的binlog)的同步
我们都知道很多数据库都支持了主从自动同步,尤其是mysql,可以支持多主多从的模式。那么我们是不是可以利用这种思想呢,答案当然是肯定的,mysql的主从同步的过程是这样的。
A、master将改变记录到二进制日志(binary log)中(这些记录叫做二进制日志事件,binary log events,可以通过show binlog events进行查看);
B、slave将master的binary log events拷贝到它的中继日志(relay log);
C、slave重做中继日志中的事件,将改变反映它自己的数据。
阿里巴巴开源的canal就完美的使用这种方式,canal 伪装了一个Slave 去喝Master进行同步。
A、 canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议
B、 mysql master收到dump请求,开始推送binary log给slave(也就是canal)
C、 canal解析binary log对象(原始为byte流)
另外canal 在设计时,特别设计了 client-server 模式,交互协议使用 protobuf 3.0 , client 端可采用不同语言实现不同的消费逻辑。
canal java 客户端: https://github.com/alibaba/canal/wiki/ClientExample
canal c# 客户端: https://github.com/dotnetcore/CanalSharp
canal go客户端: https://github.com/CanalClient/canal-go
canal php客户端: https://github.com/xingwenge/canal-php、
github的地址:https://github.com/alibaba/canal/
另外canal 1.1.1版本之后, 默认支持将canal server接收到的binlog数据直接投递到MQ https://github.com/alibaba/canal/wiki/Canal-Kafka-RocketMQ-QuickStart
D、在使用canal时,mysql需要开启binlog,并且binlog-format必须为row,可以在mysql的my.cnf文件中增加如下配置
- log-bin=E:/mysql5.5/bin_log/mysql-bin.log
- binlog-format=ROW
- server-id=123、
E、 部署canal的服务端,配置canal.properties文件,然后 启动 bin/startup.sh 或bin/startup.bat
- #设置要监听的mysql服务器的地址和端口
- canal.instance.master.address = 127.0.0.1:3306
- #设置一个可访问mysql的用户名和密码并具有相应的权限,本示例用户名、密码都为canal
- canal.instance.dbUsername = canal
- canal.instance.dbPassword = canal
- #连接的数据库
- canal.instance.defaultDatabaseName =test
- #订阅实例中所有的数据库和表
- canal.instance.filter.regex = .*\\..*
- #连接canal的端口
- canal.port= 11111
- #监听到的数据变更发送的队列
- canal.destinations= example
F、 客户端开发,在maven中引入canal的依赖
- <dependency>
- <groupId>com.alibaba.otter</groupId>
- <artifactId>canal.client</artifactId>
- <version>1.0.21</version>
- </dependency>
代码示例:
- package com.example;
-
- import com.alibaba.otter.canal.client.CanalConnector;
- import com.alibaba.otter.canal.client.CanalConnectors;
- import com.alibaba.otter.canal.common.utils.AddressUtils;
- import com.alibaba.otter.canal.protocol.CanalEntry;
- import com.alibaba.otter.canal.protocol.Message;
- import com.google.protobuf.InvalidProtocolBufferException;
-
- import java.net.InetSocketAddress;
- import java.util.HashMap;
- import java.util.List;
- import java.util.Map;
-
-
- public class CanalClientExample {
-
- public static void main(String[] args) {
- while (true) {
- //连接canal
- CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(), 11111), "example", "canal", "canal");
- connector.connect();
- //订阅 监控的 数据库.表
- connector.subscribe("demo_db.user_tab");
- //一次取10条
- Message msg = connector.getWithoutAck(10);
-
- long batchId = msg.getId();
- int size = msg.getEntries().size();
- if (batchId < 0 || size == 0) {
- System.out.println("没有消息,休眠5秒");
- try {
- Thread.sleep(5000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- } else {
- //
- CanalEntry.RowChange row = null;
- for (CanalEntry.Entry entry : msg.getEntries()) {
- try {
- row = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
- List<CanalEntry.RowData> rowDatasList = row.getRowDatasList();
- for (CanalEntry.RowData rowdata : rowDatasList) {
- List<CanalEntry.Column> afterColumnsList = rowdata.getAfterColumnsList();
- Map<String, Object> dataMap = transforListToMap(afterColumnsList);
- if (row.getEventType() == CanalEntry.EventType.INSERT) {
- //具体业务操作
- System.out.println(dataMap);
- } else if (row.getEventType() == CanalEntry.EventType.UPDATE) {
- //具体业务操作
- System.out.println(dataMap);
- } else if (row.getEventType() == CanalEntry.EventType.DELETE) {
- List<CanalEntry.Column> beforeColumnsList = rowdata.getBeforeColumnsList();
- for (CanalEntry.Column column : beforeColumnsList) {
- if ("id".equals(column.getName())) {
- //具体业务操作
- System.out.println("删除的id:" + column.getValue());
- }
- }
- } else {
- System.out.println("其他操作类型不做处理");
- }
-
- }
-
- } catch (InvalidProtocolBufferException e) {
- e.printStackTrace();
- }
- }
- //确认消息
- connector.ack(batchId);
- }
-
-
- }
- }
-
- public static Map<String, Object> transforListToMap(List<CanalEntry.Column> afterColumnsList) {
- Map map = new HashMap();
- if (afterColumnsList != null && afterColumnsList.size() > 0) {
- for (CanalEntry.Column column : afterColumnsList) {
- map.put(column.getName(), column.getValue());
- }
- }
- return map;
- }
-
- }
2)、基于BulkLoad的数据同步,比如从hive同步数据到hbase
我们有两种方式可以实现,
A、 使用spark任务,通过HQl读取数据,然后再通过hbase的Api插入到hbase中
但是这种做法,效率很低,而且大批量的数据同时插入Hbase,对Hbase的性能影响很大。
在大数据量的情况下,使用BulkLoad可以快速导入,BulkLoad主要是借用了hbase的存储设计思想,因为hbase本质是存储在hdfs上的一个文件夹,然后底层是以一个个的Hfile存在的。HFile的形式存在。Hfile的路径格式一般是这样的:
/hbase/data/default(默认是这个,如果hbase的表没有指定命名空间的话,如果指定了,这个就是命名空间的名字)/<tbl_name>/<region_id>/<cf>/<hfile_id>
B、 BulkLoad实现的原理就是按照HFile格式存储数据到HDFS上,生成Hfile可以使用hadoop的MapReduce来实现。如果不是hive中的数据,比如外部的数据,那么我们可以将外部的数据生成文件,然后上传到hdfs中,组装RowKey,然后将封装后的数据在回写到HDFS上,以HFile的形式存储到HDFS指定的目录中。
当然我们也可以不事先生成hfile,可以使用spark任务直接从hive中读取数据转换成RDD,然后使用HbaseContext的自动生成Hfile文件,部分关键代码如下:
- //将DataFrame转换bulkload需要的RDD格式
- val rddnew = datahiveDF.rdd.map(row => {
- val rowKey = row.getAs[String](rowKeyField)
-
- fields.map(field => {
- val fieldValue = row.getAs[String](field)
- (Bytes.toBytes(rowKey), Array((Bytes.toBytes("info"), Bytes.toBytes(field), Bytes.toBytes(fieldValue))))
- })
- }).flatMap(array => {
- (array)
- })
- …
- //使用HBaseContext的bulkload生成HFile文件
- hbaseContext.bulkLoad[Put](rddnew.map(record => {
- val put = new Put(record._1)
- record._2.foreach((putValue) => put.addColumn(putValue._1, putValue._2, putValue._3))
- put
- }), TableName.valueOf(hBaseTempTable), (t : Put) => putForLoad(t), "/tmp/bulkload")
-
- val conn = ConnectionFactory.createConnection(hBaseConf)
- val hbTableName = TableName.valueOf(hBaseTempTable.getBytes())
- val regionLocator = new HRegionLocator(hbTableName, classOf[ClusterConnection].cast(conn))
- val realTable = conn.getTable(hbTableName)
- HFileOutputFormat2.configureIncrementalLoad(Job.getInstance(), realTable, regionLocator)
-
- // bulk load start
- val loader = new LoadIncrementalHFiles(hBaseConf)
- val admin = conn.getAdmin()
- loader.doBulkLoad(new Path("/tmp/bulkload"),admin,realTable,regionLocator)
-
- sc.stop()
- }
- …
- def putForLoad(put: Put): Iterator[(KeyFamilyQualifier, Array[Byte])] = {
- val ret: mutable.MutableList[(KeyFamilyQualifier, Array[Byte])] = mutable.MutableList()
- import scala.collection.JavaConversions._
- for (cells <- put.getFamilyCellMap.entrySet().iterator()) {
- val family = cells.getKey
- for (value <- cells.getValue) {
- val kfq = new KeyFamilyQualifier(CellUtil.cloneRow(value), family, CellUtil.cloneQualifier(value))
- ret.+=((kfq, CellUtil.cloneValue(value)))
- }
- }
- ret.iterator
- }
- }
C、pg_bulkload的使用
这是一个支持pg库(PostgreSQL)批量导入的插件工具,它的思想也是通过外部文件加载的方式,这个工具笔者没有亲自去用过,详细的介绍可以参考:https://my.oschina.net/u/3317105/blog/852785 pg_bulkload项目的地址:http://pgfoundry.org/projects/pgbulkload/
3)、基于sqoop的全量导入
Sqoop 是hadoop生态中的一个工具,专门用于外部数据导入进入到hdfs中,外部数据导出时,支持很多常见的关系型数据库,也是在大数据中常用的一个数据导出导入的交换工具。
Sqoop从外部导入数据的流程图如下:
Sqoop将hdfs中的数据导出的流程如下:
本质都是用了大数据的数据分布式处理来快速的导入和导出数据。
4)、HBase中建表,然后Hive中建一个外部表,这样当Hive中写入数据后,HBase中也会同时更新,但是需要注意
A、hbase中的空cell在hive中会补null
B、hive和hbase中不匹配的字段会补null
我们可以在hbase的shell 交互模式下,创建一张hbse表
create 'bokeyuan','zhangyongqing'
使用这个命令,我们可以创建一张叫bokeyuan的表,并且里面有一个列族zhangyongqing,hbase创建表时,可以不用指定字段,但是需要指定表名以及列族
我们可以使用的hbase的put命令插入一些数据
- put 'bokeyuan','001','zhangyongqing:name','robot'
- put 'bokeyuan','001','zhangyongqing:age','20'
- put 'bokeyuan','002','zhangyongqing:name','spring'
- put 'bokeyuan','002','zhangyongqing:age','18'
可以通过hbase的scan 全表扫描的方式查看我们插入的数据
scan ' bokeyuan'
我们继续创建一张hive外部表
- create external table bokeyuan (id int, name string, age int)
- STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
- WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,user:name,user:age")
- TBLPROPERTIES("hbase.table.name" = " bokeyuan");
外部表创建好了后,我们可以使用HQL语句来查询hive中的数据了
- select * from classes;
- OK
- 1 robot 20
- 2 spring 18
5)、Debezium+bireme:Debezium for PostgreSQL to Kafka Debezium也是一个通过监控数据库的日志变化,通过对行级日志的处理来达到数据同步,而且Debezium 可以通过把数据放入到kafka,这样就可以通过消费kafka的数据来达到数据同步的目的。而且还可以给多个地方进行消费使用。
Debezium是一个开源项目,为捕获数据更改(change data capture,CDC)提供了一个低延迟的流式处理平台。你可以安装并且配置Debezium去监控你的数据库,然后你的应用就可以消费对数据库的每一个行级别(row-level)的更改。只有已提交的更改才是可见的,所以你的应用不用担心事务(transaction)或者更改被回滚(roll back)。Debezium为所有的数据库更改事件提供了一个统一的模型,所以你的应用不用担心每一种数据库管理系统的错综复杂性。另外,由于Debezium用持久化的、有副本备份的日志来记录数据库数据变化的历史,因此,你的应用可以随时停止再重启,而不会错过它停止运行时发生的事件,保证了所有的事件都能被正确地、完全地处理掉。
该项目的GitHub地址为:https://github.com/debezium/debezium 这是一个开源的项目。
本来监控数据库,并且在数据变动的时候获得通知其实一直是一件很复杂的事情。关系型数据库的触发器可以做到,但是只对特定的数据库有效,而且通常只能更新数据库内的状态(无法和外部的进程通信)。一些数据库提供了监控数据变动的API或者框架,但是没有一个标准,每种数据库的实现方式都是不同的,并且需要大量特定的知识和理解特定的代码才能运用。确保以相同的顺序查看和处理所有更改,同时最小化影响数据库仍然非常具有挑战性。
Debezium正好提供了模块为你做这些复杂的工作。一些模块是通用的,并且能够适用多种数据库管理系统,但在功能和性能方面仍有一些限制。另一些模块是为特定的数据库管理系统定制的,所以他们通常可以更多地利用数据库系统本身的特性来提供更多功能,Debezium提供了对MongoDB,mysql,pg,sqlserver的支持。
Debezium是一个捕获数据更改(CDC)平台,并且利用Kafka和Kafka Connect实现了自己的持久性、可靠性和容错性。每一个部署在Kafka Connect分布式的、可扩展的、容错性的服务中的connector监控一个上游数据库服务器,捕获所有的数据库更改,然后记录到一个或者多个Kafka topic(通常一个数据库表对应一个kafka topic)。Kafka确保所有这些数据更改事件都能够多副本并且总体上有序(Kafka只能保证一个topic的单个分区内有序),这样,更多的客户端可以独立消费同样的数据更改事件而对上游数据库系统造成的影响降到很小(如果N个应用都直接去监控数据库更改,对数据库的压力为N,而用debezium汇报数据库更改事件到kafka,所有的应用都去消费kafka中的消息,可以把对数据库的压力降到1)。另外,客户端可以随时停止消费,然后重启,从上次停止消费的地方接着消费。每个客户端可以自行决定他们是否需要exactly-once或者at-least-once消息交付语义保证,并且所有的数据库或者表的更改事件是按照上游数据库发生的顺序被交付的。
对于不需要或者不想要这种容错级别、性能、可扩展性、可靠性的应用,他们可以使用内嵌的Debezium connector引擎来直接在应用内部运行connector。这种应用仍需要消费数据库更改事件,但更希望connector直接传递给它,而不是持久化到Kafka里。
更详细的介绍可以参考:https://www.jianshu.com/p/f86219b1ab98
bireme 的github 地址 https://github.com/HashDataInc/bireme
bireme 的介绍:https://github.com/HashDataInc/bireme/blob/master/README_zh-cn.md
另外Maxwell也是可以实现MySQL到Kafka的消息中间件,消息格式采用Json:
Download:
https://github.com/zendesk/maxwell/releases/download/v1.22.5/maxwell-1.22.5.tar.gz
Source:
https://github.com/zendesk/maxwell
6)、datax
datax 是阿里开源的etl 工具,实现包括 MySQL、Oracle、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、DRDS 等各种异构数据源之间高效的数据同步功能,采用java+python进行开发,核心是java语言实现。
github地址:https://github.com/alibaba/DataX
A、设计架构:
数据交换通过DataX进行中转,任何数据源只要和DataX连接上即可以和已实现的任意数据源同步
B、框架
核心模块介绍:
DataX调度流程:
举例来说,用户提交了一个DataX作业,并且配置了20个并发,目的是将一个100张分表的mysql数据同步到odps里面。 DataX的调度决策思路是:
优势:
从插件视角看框架
总之,Job拆分为Task,分别在框架提供的容器中执行,插件只需要实现Job和Task两部分逻辑。
物理执行有三种运行模式:
总体来说,当JobContainer和TaskGroupContainer运行在同一个进程内的时候就是单机模式,在不同进程执行就是分布式模式。
如果需要开发插件,可以看zhege这个插件开发指南: https://github.com/alibaba/DataX/blob/master/dataxPluginDev.md
数据源支持情况:
类型 | 数据源 | Reader(读) | Writer(写) | 文档 |
---|---|---|---|---|
RDBMS 关系型数据库 | MySQL | √ | √ | 读 、写 |
Oracle | √ | √ | 读 、写 | |
SQLServer | √ | √ | 读 、写 | |
PostgreSQL | √ | √ | 读 、写 | |
DRDS | √ | √ | 读 、写 | |
通用RDBMS(支持所有关系型数据库) | √ | √ | 读 、写 | |
阿里云数仓数据存储 | ODPS | √ | √ | 读 、写 |
ADS | √ | 写 | ||
OSS | √ | √ | 读 、写 | |
OCS | √ | √ | 读 、写 | |
NoSQL数据存储 | OTS | √ | √ | 读 、写 |
Hbase0.94 | √ | √ | 读 、写 | |
Hbase1.1 | √ | √ | 读 、写 | |
Phoenix4.x | √ | √ | 读 、写 | |
Phoenix5.x | √ | √ | 读 、写 | |
MongoDB | √ | √ | 读 、写 | |
Hive | √ | √ | 读 、写 | |
无结构化数据存储 | TxtFile | √ | √ | 读 、写 |
FTP | √ | √ | 读 、写 | |
HDFS | √ | √ | 读 、写 | |
Elasticsearch | √ | 写 | ||
时间序列数据库 | OpenTSDB | √ | 读 | |
TSDB | √ | 写 |
7)、OGG
OGG 一般主要用于Oracle数据库。即Oracle GoldenGate是Oracle的同步工具 ,可以实现两个Oracle数据库之间的数据的同步,也可以实现Oracle数据同步到Kafka,相关的配置操作可以参考如下:
https://blog.csdn.net/dkl12/article/details/80447154
https://www.jianshu.com/p/446ed2f267fa
http://blog.itpub.net/15412087/viewspace-2154644/
8)、databus
Databus是一个实时的、可靠的、支持事务的、保持一致性的数据变更抓取系统。 2011年在LinkedIn正式进入生产系统,2013年开源。
Databus通过挖掘数据库日志的方式,将数据库变更实时、可靠的从数据库拉取出来,业务可以通过定制化client实时获取变更。
Databus的传输层端到端延迟是微秒级的,每台服务器每秒可以处理数千次数据吞吐变更事件,同时还支持无限回溯能力和丰富的变更订阅功能。
github:https://github.com/linkedin/databus
databus架构设计:
Databus Relay中继的功能主要包括:
Databus客户端的功能主要包括:
Databus Bootstrap Producer的功能有:
Databus Bootstrap Server的主要功能,监听来自Databus客户端的请求,并返回长期回溯数据变更事件。
更多可以参考 databus社区wiki主页:https://github.com/linkedin/Databus/wiki
Databus和canal的功能对比:
对比项 |
| Databus | canal | 结论 |
---|---|---|---|---|
支持的数据库 |
| mysql, oracle | mysql(据说内部版本支持oracle) | Databus目前支持的数据源更多 |
业务开发 |
| 业务只需要实现事件处理接口 | 事件处理外,需要处理ack/rollback, 反序列化异常等 | Databus开发接口用户友好度更高 |
服务模型 | relay | relay可以同时服务多个client | 一个server instance只能服务一个client (受限于server端保存拉取位点) | Databus服务模式更灵活 |
| client | client可以拉取多个relay的变更, 访问的relay可以指定拉取某些表某些分片的变更 | client只能从一个server拉取变更, 而且只能是拉取全量的变更 | |
可扩展性 |
| client可以线性扩展,处理能力也能线性扩展 (Databus可识别pk,自动做数据分片) | client无法扩展 | Databus扩展性更好 |
可用性 | client ha | client支持cluster模式,每个client处理一部分数据, 某个client挂掉,其他client自动接管对应分片数据 | 主备client模式,主client消费, 如果主client挂掉,备client可自动接管 | Databus实时热备方案更成熟 |
| relay/server ha | 多个relay可连接到同一个数据库, client可以配置多个relay,relay故障启动切换 | 主备relay模式,relay通过zk进行failover | canal主备模式对数据库影响更小 |
| 故障对上游 数据库的影响 | client故障,bootstrap会继续拉取变更, client恢复后直接从bootstrap拉取历史变更 | client故障会阻塞server拉取变更, client恢复会导致server瞬时从数据库拉取大量变更 | Databus本身的故障对数据库影响几乎为0 |
系统状态监控 |
| 程序通过http接口将运行状态暴露给外部 | 暂无 | Databus程序可监控性更好 |
开发语言 |
| java,核心代码16w,测试代码6w | java,4.2w核心代码,6k测试代码 | Databus项目更成熟,当然学习成本也更大 |
9)、gobblin
Gobblin是用来整合各种数据源的通用型ETL框架,在某种意义上,各种数据都可以在这里“一站式”的解决ETL整个过程,专为大数据采集而生,易于操作和监控,提供流式抽取支持。主要用于Kafka的数据同步到HDFS。
该框架来源于kafka的东家LinkedIn。大体的架构如下:
Gobblin的功能真的是非常的全。底层支持三种部署方式,分别是standalone,mapreduce,mapreduce on yarn。可以方便快捷的与Hadoop进行集成,上层有运行时任务调度和状态管理层,可以与Oozie,Azkaban进行整合,同时也支持使用Quartz来调度(standalone模式默认使用Quartz进行调度)。对于失败的任务还拥有多种级别的重试机制,可以充分满足我们的需求。再上层呢就是由6大组件组成的执行单元了。这6大组件的设计也正是Gobblin高度可扩展的原因。
Gobblin组件
Gobblin提供了6个不同的组件接口,因此易于扩展并进行定制化开发。分别是:
Source主要负责将源数据整合到一系列workunits中,并指出对应的extractor是什么。这有点类似于Hadoop的InputFormat。
Extractor则通过workunit指定数据源的信息,例如kafka,指出topic中每个partition的起始offset,用于本次抽取使用。Gobblin使用了watermark的概念,记录每次抽取的数据的起始位置信息。
Converter顾名思义是转换器的意思,即对抽取的数据进行一些过滤、转换操作,例如将byte arrays 或者JSON格式的数据转换为需要输出的格式。转换操作也可以将一条数据映射成0条或多条数据(类似于flatmap操作)。
Quality Checker即质量检测器,有2中类型的checker:record-level和task-level的策略。通过手动策略或可选的策略,将被check的数据输出到外部文件或者给出warning。
Writer就是把导出的数据写出,但是这里并不是直接写出到output file,而是写到一个缓冲路径( staging directory)中。当所有的数据被写完后,才写到输出路径以便被publisher发布。Sink的路径可以包括HDFS或者kafka或者S3中,而格式可以是Avro,Parquet,或者CSV格式。同时Writer也可是根据时间戳,将输出的文件输出到按照“小时”或者“天”命名的目录中。
Publisher就是根据writer写出的路径,将数据输出到最终的路径。同时其提供2种提交机制:完全提交和部分提交;如果是完全提交,则需要等到task成功后才pub,如果是部分提交模式,则当task失败时,有部分在staging directory的数据已经被pub到输出路径了。
Gobblin执行流程
Job被创建后,Runtime就根据Job的部署方式进行执行。Runtime负责job/task的定时执行,状态管理,错误处理以及失败重试,监控和报告等工作。Gobblin存在分支的概念,从数据源获取的数据由不同的分支进行处理。每个分支都可以有自己的Converter,Quality Checker,Writer和Publisher。因此各个分支可以按不同的结构发布到不同的目标地址。单个分支任务失败不会影响其他分支。 同时每一次Job的执行都会将结果持久化到文件( SequenceFiles)中,以便下一次执行时可以读到上次执行的位置信息(例如offset),本次执行可以从上次offset开始执行本次Job。状态的存储会被定期清理,以免出现存储无限增长的情况。
Gobblin详情参考:http://www.imooc.com/article/78811
github源码:https://github.com/apache/incubator-gobblin
10)、MongoShake
MongoShake是阿里巴巴Nosql团队开源出来的一个项目,主要用于mongdb的数据同步到kafka或者其他的mongdb数据库中,MongoShake是一个以golang语言进行编写的通用的平台型服务,通过读取MongoDB集群的Oplog操作日志,对MongoDB的数据进行复制,后续通过操作日志实现特定需求。日志可以提供很多场景化的应用,为此,我们在设计时就考虑了把MongoShake做成通用的平台型服务。通过操作日志,我们提供日志数据订阅消费PUB/SUB功能,可通过SDK、Kafka、MetaQ等方式灵活对接以适应不同场景(如日志订阅、数据中心同步、Cache异步淘汰等)。集群数据同步是其中核心应用场景,通过抓取oplog后进行回放达到同步目的,实现灾备和多活的业务场景。
整体的架构图如下:
应用场景举例
1. MongoDB集群间数据的异步复制,免去业务双写开销。
2. MongoDB集群间数据的镜像备份(当前1.0开源版本支持受限)
3. 日志离线分析
4. 日志订阅
5. 数据路由。根据业务需求,结合日志订阅和过滤机制,可以获取关注的数据,达到数据路由的功能。
6. Cache同步。日志分析的结果,知道哪些Cache可以被淘汰,哪些Cache可以进行预加载,反向推动Cache的更新。
7. 基于日志的集群监控
功能介绍
MongoShake从源库抓取oplog数据,然后发送到各个不同的tunnel通道。源库支持:ReplicaSet,Sharding,Mongod,目的库支持:Mongos,Mongod。现有通道类型有:
1. Direct:直接写入目的MongoDB
2. RPC:通过net/rpc方式连接
3. TCP:通过tcp方式连接
4. File:通过文件方式对接
5. Kafka:通过Kafka方式对接
6. Mock:用于测试,不写入tunnel,抛弃所有数据
数据同步的架构如下图所示
更多详细介绍可以参考官方提供的中文介绍文档:https://yq.aliyun.com/articles/603329
总结:
1、databus活跃度不高,datax和canal 相对比较活跃。
2、datax 一般比较适合于全量数据同步,对全量数据同步效率很高(任务可以拆分,并发同步,所以效率高),对于增量数据同步支持的不太好(可以依靠时间戳+定时调度来实现,但是不能做到实时,延迟较大)。
3、canal 、databus 等由于是通过日志抓取的方式进行同步,所以对增量同步支持的比较好。
4、以上这些工具都缺少一个监控和任务配置调度管理的平台来进行支撑。
作者的原创文章,转载须注明出处。原创文章归作者所有,欢迎转载,但是保留版权。对于转载了博主的原创文章,不标注出处的,作者将依法追究版权,请尊重作者的成果。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。