赞
踩
目录
5.6 Canal + Kafka + ClientAdapter
构建实时数据仓库最大的挑战在于从操作型数据源实时抽取数据,即ETL过程中的Extract部分。我们要以全量加增量的方式,实时捕获源系统中所需的所有数据及其变化,而这一切都要在不影响对业务数据库正常操作的前提下进行,目标是要满足高负载、低延迟,难点正在于此,所以需要完全不同于批处理的技术加以实现。当操作型数据进入数据仓库过渡区或ODS以后,就可以利用数据仓库系统软件提供的功能特性进行后续处理,不论是Greenplum、Hive或是其他软件,这些处理往往只需要使用其中一种,相对来说简单一些。
Greenplum作为数据仓库的计算引擎,其数据来源多是业务数据,其中以MySQL为主。本篇将介绍两种主要的从MySQL实时同步数据到Greenplum的解决方案,一是maxwell + Kafka + bireme、二是Canal + Kafka + ClientAdapter,这两个方案的共同点是都使用开源组件,不需要编写代码,只要进行适当配置便可运行。总体来说,两种方案都是基于MySQL binlog捕获数据变化,然后将binlog以数据流的形式传入Kafka消息队列,再以消费的方式将数据变化应用到Greenplum。但是,两者在实现上区别很大,尤其是消费端的不同实现方式使数据载入Greenplum的性能差别巨大。由于主要的MySQL变化数据捕获技术都是基于其复制协议,并以消息系统作为中间组件,所以先会介绍作为基础的MySQL数据复制和Kafka。
抽取数据是ETL处理过程的第一个步骤,也是数据仓库中最重要和最具有挑战性的部分,适当的数据抽取是成功建立数据仓库的关键。
从源抽取数据导入数据仓库或过渡区有两种方式,可以从源把数据抓取出来(拉),也可以请求源把数据发送(推)到数据仓库。影响选择数据抽取方式的一个重要因素是操作型系统的可用性和数据量,这是抽取整个数据集还是仅仅抽取自最后一次抽取以来的变化数据的基础。我们考虑以下两个问题:
对于第二个问题来说,通常要改变或增加操作型业务系统的功能是非常困难的,这种困难不仅体现在技术上,还有来自于业务系统用户及其开发者的阻力。理论上讲,数据仓库不应该要求对源系统做任何改造,实际上也很少由源系统推数据给数据仓库。因此对这个问题的答案比较明确,大都采用拉数据模式。下面我们着重讨论第一个问题。
如果数据量很小并且易处理,一般来说采取完全源数据抽取,就是将所有的文件记录或所有的数据库表数据抽取至数据仓库。这种方式适合基础编码类型的源数据,比如邮政编码、学历、民族等。基础编码型源数据通常是维度表的数据来源。如果源数据量很大,抽取全部数据是不可行的,那么只能抽取变化的源数据,即最后一次抽取以来发生了变化的数据。这种数据抽取模式称为变化数据捕获,简称CDC(Change Data Capture),常被用于抽取操作型系统的事务数据,比如销售订单、用户注册,或各种类型的应用日志记录等。
CDC大体可以分为两种,一种是侵入式的,另一种是非侵入式的。所谓侵入式的是指CDC操作会给源系统带来性能的影响。只要CDC操作以任何一种方式对源库执行了SQL语句,就可以认为是侵入式的CDC。常用的四种CDC方法是:基于时间戳的CDC、基于触发器的CDC、基于快照的CDC、基于日志的CDC,其中前三种是侵入性的。表5-1总结了四种CDC方案的特点。
时间戳 | 触发器 | 快照 | 日志 | |
能区分插入/更新 | 否 | 是 | 是 | 是 |
周期内,检测到多次更新 | 否 | 是 | 否 | 是 |
能检测到删除 | 否 | 是 | 是 | 是 |
不具有侵入性 | 否 | 否 | 否 | 是 |
支持实时 | 否 | 是 | 否 | 是 |
不依赖数据库 | 是 | 否 | 是 | 否 |
表5-1 四种CDC方案比较
基于源数据的CDC要求源数据里有相关的属性列,抽取过程可以利用这些属性列来判断哪些数据是增量数据。最常见的属性列有以下两种。
这种方法的实现较为简单,假设表t1中有一个时间戳字段last_inserted,t2表中有一个自增序列字段id,则下面SQL语句的查询结果就是新增的数据,其中{last_load_time}和{last_load_id}分别表示ETL系统中记录的最后一次数据装载时间和最大自增序列号。
- select * from t1 where last_inserted > {last_load_time};
- select * from t2 where id > {last_load_id};
通常需要建立一个额外的数据库表存储上一次更新时间或上一次抽取的最后一个序列号。在实践中,一般是在一个独立的模式下或在数据过渡区里创建这个参数表。基于时间戳和自增序列的方法是CDC最简单的实现方式,也是最常用的方法,但它的缺点也很明显:
这种方法是具有侵入性的,如果操作型系统中没有时间戳或时间戳信息是不可用的,那么不得不通过修改源系统把时间戳包含进去,首先要求修改操作型系统的表包含一个新的时间戳列,然后建立一个触发器,在修改一行时更新时间戳列的值。在实施这些操作前必须被源系统的拥有者所接受,并且要仔细评估对源系统产生的影响。
有些方案通过高频率扫描递增列的方式实现准实时数据抽取。例如Flume的flume-ng-sql-source插件,缺省每5秒查询一次源表的主键以捕获新增数据,“利用Flume将MySQL表数据准实时抽取到HDFS”展示了一个具体示例。
当执行INSERT、UPDATE、DELETE这些SQL语句时,可以激活数据库里的触发器,并执行一些动作,就是说触发器可以用来捕获变更的数据并把数据保存到中间临时表里。然后这些变更的数据再从临时表中取出,被抽取到数据仓库的过渡区里。但在大多数场合下,不允许向操作型数据库里添加触发器(业务数据库的变动通常都异常慎重),而且这种方法会降低系统的性能,所以此方法用的并不是很多。
作为直接在源数据库上建立触发器的替代方案,可以使用源数据库的复制功能,把源数据库上的数据复制到从库上,在从库上建立触发器以提供CDC功能。尽管这种方法看上去过程冗余,且需要额外的存储空间,但实际上这种方法非常有效,而且没有侵入性。复制是大部分数据库系统的标准功能,如MySQL、Oracle和SQL Server等都有各自的数据复制方案。
如果没有时间戳,也不允许使用触发器,就要使用快照表了。可以通过比较源表和快照表来获得数据变化。快照就是一次性抽取源系统中的全部数据,把这些数据装载到数据仓库的过渡区中。下次需要同步时,再从源系统中抽取全部数据,并把全部数据也放到数据仓库的过渡区中,作为这个表的第二个版本,然后再比较这两个版本的数据,从而找到变化。
有多个方法可以获得这两个版本数据的差异。假设表有两个列id和name,id是主键列。该表的第一、二个版本的快照表名为snapshot_1、snapshot_2。下面的SQL语句在主键id列上做全外链接,并根据主键比较的结果增加一个标志字段,I表示新增,U表示更新,D代表删除,N代表没有变化。外层查询过滤掉没有变化的记录。
- select * from
- (select case when t2.id is null then 'D'
- when t1.id is null then 'I'
- when t1.name <> t2.name then 'U'
- else 'N'
- end as flag,
- case when t2.id is null then t1.id else t2.id end as id,
- t2.name
- from snapshot_1 t1 full outer join snapshot_2 t2 on t1.id = t2.id) a
- where flag <> 'N';
当然,这样的SQL语句需要数据库支持全外链接,对于MySQL这样不支持全外链接的数据库,可以使用类似下面的SQL语句:
- select 'U' as flag, t2.id as id, t2.name as name
- from snapshot_1 t1 inner join snapshot_2 t2 on t1.id = t2.id
- where t1.name != t2.name
- union all
- select 'D' as flag, t1.id as id, t1.name as name
- from snapshot_1 t1 left join snapshot_2 t2 on t1.id = t2.id
- where t2.id is null
- union all
- select 'I' as flag, t2.id as id, t2.name as name
- from snapshot_2 t2 left join snapshot_1 t1 on t2.id = t1.id
- where t1.id is null;
基于快照的CDC可以检测到插入、更新和删除的数据,这是相对于基于时间戳的CDC方案的优点。它的缺点是需要大量的存储空间来保存快照。另外,当表很大时,这种查询会有比较严重的性能问题。
最复杂的和最没有侵入性的CDC方法是基于日志的方式。数据库会把每个插入、更新、删除操作记录到日志里。如使用MySQL数据库,只要在数据库服务器中启用二进制日志binlog(设置log_bin服务器系统变量),之后就可以实时从数据库日志中读取到所有数据库写操作,并使用这些操作来更新数据仓库中的数据。这种方式需要把二进制日志转为可以理解的格式,然后再把里面的操作按照顺序读取出来。
MySQL提供了一个叫做mysqlbinlog的日志读取工具。这个工具可以把二进制的日志格式转换为可读的格式,然后就可以把这种格式的输出保存到文本文件里,或者直接把这种格式的日志应用到MySQL客户端用于数据还原操作。mysqlbinlog工具有很多命令行参数,其中最重要的一组参数可以设置开始/截止时间戳,这样能够只从日志里截取一段时间的日志。另外,日志里的每个日志项都有一个序列号,也可以用来做偏移操作。MySQL的日志提供了上述两种方式来防止CDC过程发生重复或丢失数据的情况。下面是使用mysqlbinlog的两个例子。第一条命令将jbms_binlog.000002文件中从120偏移量以后的操作应用到一个MySQL数据库中。第二条命令将jbms_binlog.000002文件中一段时间的操作格式化输出到一个文本文件中。
- mysqlbinlog --start-position=120 jbms_binlog.000002 | mysql -u root -p123456
- mysqlbinlog --start-date="2011-02-27 13:10:12" --stop-date="2011-02-27 13:47:21" jbms_binlog.000002 > temp/002.txt
使用基于数据库的日志工具也有缺陷,即只能用来处理一种特定的数据库,如果要在异构的数据库环境下使用基于日志的CDC方法,就要使用GoldenGate之类的软件。本篇介绍的两种实时数据同步方案都是使用开源组件完成类似功能。
Maxwell、Canal都可以实时读取MySQL二进制日志,本质上都是将自身伪装成一个从库,利用MySQL原生的主从复制协议获取并处理二进制日志。了解MySQL复制的基本原理有助于理解和使用这些组件。
简单说,复制就是将来自一个MySQL数据库服务器(主库)的数据复制到一个或多个MySQL数据库服务器(从库)。传统的MySQL复制提供了一种简单的Primary-Secondary复制方法,默认情况下,复制是单向异步的。MySQL支持两种复制方式:基于行的复制和基于语句的复制。这两种方式都是通过在主库上记录二进制日志(binlog)、在从库重放中继日志(relylog)的方式来实现异步的数据复制。二进制日志或中继日志中的记录被称为事件。所谓异步包含两层含义,一是主库的二进制日志写入与将其发送到从库是异步进行的,二是从库获取与重放日志事件是异步进行的。这意味着,在同一时间点从库上的数据更新可能落后于主库,并且无法保证主从之间的延迟间隔。
复制给主库增加的开销主要体现在启用二进制日志带来的I/O,但是增加并不大,MySQL官方文档中称开启二进制日志会产生1%的性能损耗。出于对历史事务备份以及从介质失败中恢复的目的,这点开销是非常必要的。除此之外,每个从库也会对主库产生一些负载,例如网络和I/O。当从库读取主库的二进制日志时,也会造成一定的I/O开销。如果从一个主库复制到多个从库,唤醒多个复制线程发送二进制日志内容的开销将会累加。但所有这些复制带来的额外开销相对于应用对MySQL服务器造成的高负载来说都微不足道。
复制的用途主要体现在以下五个方面:
1. 横向扩展
通过复制可以将读操作指向从库来获得更好的读扩展。所有写入和更新都在主库上进行,但读取可能发生在一个或多个从库上。在这种读写分离模型中,主库专用于更新,显然比同时进行读写操作会有更好的写性能。需要注意的是,对于写操作并不适合通过复制来扩展。在一主多从架构中,写操作会被执行多次,正如“木桶效应”,这时整个系统的写性能取决于写入最慢的那部分。
2. 负载均衡
通过MySQL复制可以将读操作分布到多个服务器上,实现对读密集型应用的优化。对于小规模的应用,可以简单地对机器名做硬编码或者使用DNS轮询(将一个机器名指向多个IP地址)。当然也可以使用复杂的方法,例如使用LVS网络负载均衡器等,能够很好地将负载分配到不同的MySQL服务器上。
3. 提高数据安全性
提高数据安全性可以从两方面来理解。其一,因为数据被复制到从库,并且从库可以暂停复制过程,所以可以在从库上执行备份操作而不会影响对应的主库。其二,当主库出现问题时,还有从库的数据可以被访问。但是,对备份来说,复制仅是一项有意义的技术补充,它既不是备份也不能够取代备份。例如,当用户误删除一个表,而且此操作已经在从库上被复制执行,这种情况下只能用备份来恢复。
4. 提高高可用性
复制可以帮助应用程序避免MySQL单点失败,一个包含复制的设计良好的故障切换系统能够显著缩短宕机时间。
5. 滚动升级
比较普遍的做法是,使用一个高版本MySQL作为从库,保证在升级全部实例前,查询能够在从库上按照预期执行。测试没有问题后,将高版本的MySQL切换为主库,并将应用连接至该主库,然后重新搭建高版本的从库。
后面介绍Maxwell和Canal方案时会看到,其架构正是利用了横向扩展中的级联主从拓扑结构,以及从库可以安全暂停复制的特点才得以实现。
MySQL复制依赖二进制日志(binlog),所以要理解复制如何工作,先要了解MySQL的二进制日志。
二进制日志包含描述数据库更改的事件,如建表操作或对表数据的更改等。开启二进制日志有两个重要目的:
不难看出,MySQL二进制日志所起的作用与Oracle的归档日志类似。二进制日志只记录更新数据的事件,不记录SELECT或SHOW等语句。通过设置log-bin系统变量开启二进制日志,不同版本MySQL的缺省配置可能不同,如MySQL 5.6的缺省为不开启,MySQL 8中缺省是开启的。
二进制日志有STATEMENT、ROW、MIXED三种格式,通过binlog-format系统变量设置:
不同版本MySQL的binlog-format参数的缺省值可能不同,如MySQL 5.6的缺省值为STATEMENT,MySQL 8缺省使用ROW格式。二进制日志的存放位置最好设置到与MySQL数据目录不同的磁盘分区,以降低磁盘I/O的竞争,提升性能,并且在数据磁盘故障的时候还可以利用备份和二进制日志恢复数据。
总的来说,MySQL复制有五个步骤:
图5-1更详细地描述了复制的细节。
图5-1 复制如何工作
第一步是在主库上记录二进制日志。每次准备提交事务完成数据更新前,主库将数据更新的事件记录到二进制日志中。MySQL会按事务提交的顺序而非每条语句的执行顺序来记录二进制日志。在记录二进制日志后,主库会告诉存储引擎可以提交事务了。
下一步,从库将主库的二进制日志复制到其本地的中继日志中。首先,从库会启动一个工作线程,称为I/O线程。I/O线程跟主库建立一个普通的客户端连接,然后在主库上启动一个特殊的二进制日志转储(binlog dump)线程,它会读取主库上二进制日志中的事件,但不会对事件进行轮询。如果该线程追赶上了主库,它将进入睡眠状态,直到主库发送信号通知其有新的事件时才会被唤醒,从库I/O线程会将接收到的事件记录到中继日志中。
从库的SQL线程执行最后一步,该线程从中继日志中读取事件并在从库上执行,从而实现从库数据的更新。当SQL线程追赶I/O线程时,中继日志通常已经在系统缓存中,所以读取中继日志的开销很低。SQL线程执行的事件也可以通过log_slave_updates系统变量来决定是否写入其自己的二进制日志中,这可以用于级联复制的场景。
这种复制架构实现了获取事件和重放事件的解耦,允许这两个过程异步进行。也就是说I/O线程能够独立于SQL线程之外工作。但这种架构也限制了复制的过程,其中最重要的一点是在主库上并发更新的查询在从库上通常只能串行化执行,因为缺省只有一个SQL线程来重放中继日志中的事件。在MySQL 5.6以后已经可以通过配置slave_parallel_workers等系统变量进行并行复制,相关细节参见“组提交与多线程复制”。
从MySQL复制中从库的角度看,实际上是实现了一个消息队列的功能。消息就是二进制日志中的事件,持久化存储在中继日志文件里。I/O线程是消息的生产者,向中继日志写数据,SQL线程是消息的消费者,从中继日志读取数据并在目标库上重放。队列是一种先进先出的数据结构,这个简单定义就决定了队列中的数据一定是有序的。在数据复制场景中这种有序性极为重要,如果不能保证事件重放与产生同序,主从库的数据将会不一致,也就失去了数据复制的意义。
中继日志、I/O线程、SQL线程是MySQL内部的实现。在本专题讨论的异构环境中,源是MySQL,目标是Greenplum。作为一种不严格的类比,Maxwell、Canal实现的是类似I/O线程的功能,bireme、Canal的ClientAdapter组件实现的是类似SQL线程的功能。那中继日志呢,是Kafka登场的时候了,当然Kafka比中继日志或消息队列要复杂得多,它是一个完整的消息系统。严格来说,在本实时数据同步场景中,Kafka并不是必须的。比如Canal的TCP服务器模式,就是直接将网络数据包直接发送给消费者进行消费,消息数据只存在于内存中,并不做持久化。这种实现方式用于生产环境很不合适,既有丢失数据的风险,也缺乏必要的管理和监控,引入Kafka正好可以物尽其用。
Kafka是一款基于发布与订阅的分布式消息系统,其数据按照一定顺序持久化保存,并可以按需读取。此外,Kafka的数据分布在整个集群中,具备数据故障保护和性能伸缩能力。
Kafka的数据单元被称为消息,它可以被看作是数据库里的一条记录。消息由字节数组组成,所以对于Kafka来说,消息里的数据没有特别的格式或含义。消息可以有一个可选的元数据,也就是键。与消息一样,键也是一个字节数组,对于Kafka来说也没有特殊的含义。当消息以一种可控的方式写入不同分区时会用到键。最简单的例子就是为键生成一个一致性哈希值,然后使用哈希值对主题分区进行取模,为消息选取分区。这样可以保证具有相同键的消息总是被写到相同的分区上。对数据库来说,通常将表的主键作为消息的键,这是Kafka保证消费顺序的关键所在,后面将详细说明。
为了提高效率,消息被分批次写入Kafka。批次就是一组消息,这些消息属于同一个主题和分区。把消息分批次传输可以减少网络开销。不过,这要在延迟时间和吞吐量之间做出权衡:批次越大,单位时间处理的消息就越多,单个消息的传输时间就越长。批次数据会被压缩,这样可以提升数据的传输和存储能力,但要做更多的计算处理。
Kafka的消息通过主题(topic)进行分类。主题就好比数据库的表,或者文件系统的目录。主题可以被分为若干个分区(partition),一个分区就是一个提交日志。消息以追加的方式写入分区,然后以先进先出的顺序读取。注意,由于一个主题一般包含几个分区,因此无法在整个主题范围内保证消息的顺序,但可以保证消息在单个分区内的顺序。如果需要所有消息都是有序的,那么最好只用一个分区。图5-2所示的主题有4个分区,消息被追加写入每个分区的尾部。Kafka通过分区来实现数据冗余和伸缩性。分区可以分布在不同的服务器上,也就是说,一个主题可以横跨多个服务器,以此来提供更强大的性能。
图5-2 包含多个分区的主题
Kafka的客户端就是Kafka系统的用户,它们被分为两种基本类型:生产者和消费者。除此之外,还有其他两个客户端API——用于数据集成的Kafka Connect API和用于流式处理的Kafka Streams。这些客户端API使用生产者和消费者作为内部组件,提供了高级的功能。
生产者(producer)创建消息。一般情况下,一个消息会被发布到一个特定的主题上。生产者在默认情况下把消息均匀分布到主题的所有分区上,而并不关心特定消息会被写到哪个分区。在某些情况下,生产者会把消息直接写到指定的分区。这通常是通过消息键和分区器来实现的,分区器为键生成一个哈希值,并将其映射到指定分区。这样可以保证包含同一个键的消息会被写到同一个分区上。生产者也可以使用自定义的分区器,根据不同业务规则将消息映射到分区。
消费者(consumer)读取数据。消费者订阅一个或多个主题,并按消息生成的顺序读取它们。消费者通过检查消息的偏移量来区分已经读取的消息。偏移量(offset)是另一种元数据,它是一个不断递增的整数值,在消息创建时,Kafka会把它添加到消息里。在给定分区里,每个消息的偏移量都是唯一的。消费者把每个分区最后读取的消息偏移量保存在Zookeeper或Kafka中,如果消费者关闭或重启,它的读取状态不会丢失。
消费者是消费者组(consumer group)的一部分,也就是说,会有一个或多个消费者共同读取一个主题。组保证每个分区只能被同组中的一个消费者使用。图5-3所示的组中,有3个消费者同时读取一个主题。其中两个消费者各自读取一个分区,另一个消费者读取其他两个分区。消费者和分区之间的映射通常被称为消费者对分区的所有权关系(ownership)。
通过这种方式,消费者可以消费包含大量消息的主题。而且如果一个消费者失效,组里的其他消费者可以接管失效消费者的工作。
图5-3 消费者组从主题读取消息
一个独立的Kafka服务器被称为broker。broker接收来自生产者的消息,为消息设置偏移量,并提交消息到磁盘保存。broker为消费者提供服务,对读取分区的请求作出响应,返回已经提交到磁盘上的消息。根据特定的硬件机器性能特征,单个broker可以轻松处理数千个分区以及每秒百万级的消息量。
broker是集群的组成部分,每个集群都有一个broker同时充当了集群控制器(controller)的角色,它被自动从集群的活跃成员中选举出来,负责将分区分配给broker和监控broker等管理工作。在集群中,一个分区从属于一个broker,该broker被称为分区的首领(leader)。一个分区可以分配给多个broker,这个时候发生分区复制,如图5-4所示。这种复制机制为分区提供了消息冗余,如果有一个broker失效,其他broker可以接管领导权。不过,相关的消费者和生产者都要重新连接到新的首领。
图5-4 集群里的分区复制
消息保存期限(retention)是Kafka的一个重要特性。Kafka broker默认的消息保留策略是这样的:要么保留一段时间(比如7天),要么保留到消息到达一定大小的字节数(比如1GB)。当消息数量达到这些上限时,旧消息就会过期并被删除,所以在任何时刻,可用消息的总量都不会超过配置参数所指定的大小。主题可以配置自己的保留策略,能将消息保留到不再使用它们为止。例如,用于跟踪用户活动的数据可能需要保留几天,而应用程序的度量指标可能只需要保留几小时。可以通过配置把主题当做紧凑型日志(log compacted),只有最后一个带有特定键的消息会被保留下来。这种情况对于变更日志类型的数据比较适用,因为人们只关心最后时刻发生的那个变更。
通常消息的生成速度比消费速度快,显然此时有必要对消费者进行横向扩展。就像多个生产者可以向相同的主题写入消息一样,我们也可以使用多个消费者从同一主题读取消息,对消息进行分流。
Kafka消费者从属于消费者组,一个组里的消费者订阅的是同一个主题,每个消费者接收主题一部分分区的消息。假设主题T1有4个分区,我们创建了消费者C1,它是组G1里唯一的消费者,我们用它订阅主题T1。消费者C1将收到主题T1全部4个分区的消息,如图5-5所示。
图5-5 1个消费者接收4个分区的消息
如果在组G1里新增一个消费者C2,那么每个消费者将分别从两个分区接收消息。我们假设消费者C1接收分区0和分区2的消息,消费者C2接收分区1和分区3的消息,如图5-6所示。
图5-6 2个消费者接收4个分区的消息
如果组G1有4个消费者,那么每个消费者可以分配到一个分区,如图5-7所示。
图5-7 4个消费者接收4个分区的消息
如果我们往组里添加更多的消费者,超过主题的分区数量,那么有一部分消费者就会被闲置,不会接收任何消息,如图5-8所示。
图5-8 5个消费者接收4个分区的消息
往群组里增加消费者是横向扩展消费能力的主要方式。Kafka消费者经常会做一些高延迟的操作,比如把数据写到数据库或HDFS,或者使用数据进行比较耗时的计算。在这些情况下,单个消费者无法跟上数据生成的速度,所以可以增加更多的消费者,让它们分担负载,每个消费者只处理部分分区的消息,这就是横向扩展的主要手段。我们有必要为主题创建大量的分区,在负载增长时可以加入更多的消费者。不过要注意,不要让消费者数量超过主题分区的数量,多余的消费者只会被闲置。
除了通过增加消费者来横向扩展单个应用程序外,还经常出现多个应用程序从同一个主题读取数据的情况。实际上,Kafka设计的主要目标之一,就是要让主题里的数据能够满足企业各种应用场景的需求。在这些场景里,每个应用程序可以获取到所有的消息,而不只是其中的一部分。只要保证每个应用程序有自己的消费者组,就可以让它们获取到主题的所有消息。横向扩展Kafka消费者或消费者组并不会对性能造成负面影响。
在上面的例子里,如果新增一个只包含一个消费者的组G2,那么这个消费者将从主题T1上接收所有消息,与组G1之间互不影响。组G2可以增加更多的消费者,每个消费者可以消费若干个分区,就像组G1那样,如图5-9所示。总的来说,组G2还是会接收所有消息,不管有没有其他组存在。
简而言之,为每一个需要获取一个或多个主题全部消息的应用程序创建一个消费者组,然后往组里添加消费者来扩展读取能力和扩展能力,组里的每个消费者只处理一部分消息。
图5-9 两个消费者组对应一个主题
上一节提到,Kafka只能保证单个分区中消息的顺序,因此如果要求与数据库保持强一致性,最好只使用一个分区。那么,单分区的吞吐量能否满足负载需求呢?下面就在现有环境上做一个测试,以得出有根据的量化的结论。
1. 测量MySQL binlog日志量
测试方法为使用tpcc-mysql工具,执行一段时间的压测,然后查看这段时间产生的binlog文件大小,得出binlog吞吐量。TPC-C是专门针对联机交易处理系统(OLTP系统)的规范,而tpcc-mysql则是percona公司基于TPC-C衍生出来的产品,专用于MySQL基准测试,下载地址为https://github.com/Percona-Lab/tpcc-mysql。关于tpcc-mysql的安装和使用,参见“测试规划”。
(1)从库重置binlog
- reset master;
- show master status;
初始binlog文件名和偏移量分别是mysql-bin.000001和120。
(2)主库执行tpcc测试
- # 10仓库,32并发线程,预热10秒,执行300秒
- ~/tpcc-mysql-master/tpcc_start -h172.16.1.126 -d tpcc_test -u root -p "123456" -w 10 -c 32 -r 10 -l 300
得到的每分钟事务数为:5543.600 TpmC
(3)压测执行结束后,在从库查询binlog日志量
show binary logs;
此时binlog文件名和偏移量分别是mysql-bin.000001和406396209。预热10秒,执行300秒,binlog产生速度为:(406396209-120)/1024/1024/310 ≈ 1.25MB/S。
2. 测量kafka单分区生产者吞吐量
(1)创建topic
- # 创建topic
- kafka-topics.sh --create --topic test --bootstrap-server 172.16.1.124:9092 --partitions 1 --replication-factor 3
- # 查看topic
- kafka-topics.sh --describe --topic test --bootstrap-server 172.16.1.124:9092
创建了一个单分区三副本的topic:
Topic: test Partition: 0 Leader: 339 Replicas: 339,330,340 Isr: 339,330,340
(2)执行测试
kafka-producer-perf-test.sh --topic test --num-records 500000 --record-size 2048 --throughput -1 --producer-props bootstrap.servers=172.16.1.124:9092 acks=1
kafka-producer-perf-test.sh是Kafka提供的生产者性能测试命令行工具,这里所使用的选项说明:
测试结果为:
500000 records sent, 10989.010989 records/sec (21.46 MB/sec), 1267.54 ms avg latency, 1714.00 ms max latency, 1388 ms 50th, 1475 ms 95th, 1496 ms 99th, 1693 ms 99.9th.
可以看到单分区平均吞吐量约21.46 MB/S,平均每秒发送10989条2KB的消息。两相比较,Kafka单分区生产者的消息吞吐量大约是压测binlog吞吐量的17倍。实际生产环境的硬件配置会比本实验环境高得多,单分区吞吐量通常可达100 MB/S。通过这个粗略测试得出的结论是单分区可以承载一般的生产数据库负载。
3. 测量kafka单分区消费者吞吐量
单分区只能有一个消费者(一个消费组中),但可以利用多个线程提高消费性能。
kafka-consumer-perf-test.sh --broker-list 172.16.1.124:9092 --topic test --messages 500000 --threads 1
--threads指定消费线程数,1、3、6、12时的测试结果如下:
- start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
- # 1线程
- 2021-12-09 10:57:19:198, 2021-12-09 10:57:28:921, 976.6543, 100.4478, 500047, 51429.2914, 3034, 6689, 146.0090, 74756.6153
- # 3线程
- 2021-12-09 10:57:52:134, 2021-12-09 10:58:00:280, 976.6543, 119.8937, 500047, 61385.5880, 3039, 5107, 191.2384, 97914.0396
- # 6线程
- 2021-12-09 10:58:58:345, 2021-12-09 10:59:06:495, 976.6543, 119.8349, 500047, 61355.4601, 3031, 5119, 190.7901, 97684.5087
- # 12线程
- 2021-12-09 10:59:16:028, 2021-12-09 10:59:24:093, 976.6543, 121.0979, 500047, 62002.1079, 3031, 5034, 194.0116, 99333.9293
严格说只要涉及多分区,一定会有消费顺序问题。在非强一致性场景中,可以通过选择表的主键作为分区键,以适当避免消费乱序带来的数据一致性问题,同时利用多分区保持Kafka的扩展性。在选择分区数量时,需要考虑如下几个因素。
如果估算出主题的吞吐量和消费者吞吐量,可以用主题吞吐量除以消费者吞吐量算出分区个数。如果不知道这些信息,根据经验,把分区大小限制在25GB以内可以得到比较理想的效果。
本节介绍的方法是采用 maxwell + Kafka + bireme,将MySQL数据实时同步至Greenplum。maxwell实时解析MySQL的binlog,并将输出的JSON格式数据发送到Kafka,Kafka在此方案中主要用于消息中转,bireme负责读取Kafka的消息,并应用于Greenplum数据库以增量同步数据。方法实施的主要流程为如下三步:
本方案的总体架构如图5-10所示。
图5-10 maxwell + Kafka + bireme 架构
图中的maswell从MySQL复制的从库中级联获取binlog,这样做的原因将在5.5.4小节“实时CDC”中详细说明。maxwell是一个能实时读取MySQL二进制日志binlog,并生成JSON 格式的消息,作为生产者发送给Kafka、Kinesis、RabbitMQ、Redis、Google Cloud Pub/Sub、文件或其他平台的应用程序,其中Kafka是maxwell支持最完善的一个消息系统。它的常见应用场景有ETL、维护缓存、收集表级别的dml指标、增量到搜索引擎、数据分区迁移、切库binlog回滚方案等。maswell在GitHub上具有较高的活跃度,官网地址为地址为https://github.com/zendesk/maxwell。
maxwell主要提供了下列功能:
bireme是一个Greenplum数据仓库的增量同步工具,目前支持MySQL、PostgreSQL和MongoDB数据源,maxwell + Kafka 是一种支持的数据源类型。bireme作为Kafka的消费者,采用 DELETE + COPY 的方式,将数据源的修改记录同步到Greenplum,相较于INSERT、UPDATE、DELETE方式,COPY方式速度更快,性能更优。bireme的主要特性是采用小批量加载方式(默认加载延迟时间为10秒钟)提升数据同步的性能,但要求所有同步表在源和目标数据库中都必须有主键。bireme官网地址为https://github.com/HashDataInc/bireme/。
Kafka在本架构中作为消息中间件将maxwell和bireme桥接在一起,上下游组件的实现都依赖于它。正如本节开头所述,搭建Kafka服务是实施本方案的第一步。为了简便,在此实验环境中使用CDH中的Kafka服务,基本信息如下:
ps -ef|grep '/libs/kafka.\{2,40\}.jar'
default.replication.factor参数指定broker级别的复制系数,CDH中的Kafka缺省值为1。这里将该设置改为3,也就是说每个分区总共会被3个不同的broker复制3次。默认情况下,Kafka会确保分区的每个副本被放在不同的broker上。
如果复制系数为N,那么在N-1个broker失效的情况下,仍然能够从主题读取数据或向主题写入数据。所以,更高的复制系数会带来更高的可用性。另一方面,复制系数N需要至少N个broker,而且N个数据副本会占用N倍的磁盘空间。通常要在可用性和存储硬件之间做出权衡。
如果因broker重启导致的主题不可用是可接受的(这在集群里属正常行为),那么把复制系数设为1即可。复制系数为2意味着可以容忍1个broker失效。但是要知道,有时候1个broker发生失效会导致集群不稳定,迫使重启另一个broker——集群控制器,也就是说,如果将复制系数设置为2,就有可能因为重启等问题导致集群暂时不可用。基于以上原因,建议在要求高可用的场景里把复制系数设置为3,大多数情况下这已经足够安全。
下面在Kafka中创建一个topic,在后面配置maxwell时将使用该topic:
- # 设置Kafka可执行文件路径
- export PATH=$PATH:/opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567/lib/kafka/bin/;
- # 创建一个三分区三副本的topic,主要用于演示数据在分区间的均匀分布
- kafka-topics.sh --create --topic mytopic --bootstrap-server 172.16.1.124:9092 --partitions 3 --replication-factor 3
- # 查看topic
- kafka-topics.sh --list --bootstrap-server 172.16.1.124:9092
- # 查看partition
- kafka-topics.sh --describe --topic mytopic --bootstrap-server 172.16.1.124:9092
- # 查看每个分区的大小
- kafka-log-dirs.sh --describe --topic-list mytopic --bootstrap-server 172.16.1.124:9092
mytopic的分区如下:
- Topic: mytopic Partition: 0 Leader: 340 Replicas: 340,339,330 Isr: 340,339,330
- Topic: mytopic Partition: 1 Leader: 330 Replicas: 330,340,339 Isr: 330,340,339
- Topic: mytopic Partition: 2 Leader: 339 Replicas: 339,330,340 Isr: 339,330,340
ISR(In Sync Replica)是Kafka的副本同步机制,leader会维持一个与其保持同步的副本集合,该集合就是ISR,每个分区都有一个ISR,由leader动态维护。我们要保证Kafka不丢消息,就要保证ISR这组集合中至少有一个存活,并且消息成功提交。
本实验环境部署的其他角色如下:
Greenplum集群主机的操作系统版本为CentOS Linux release 7.9.2009 (Core),其他所有主机操作系统版本为CentOS Linux release 7.2.1511 (Core)。MySQL已经开启主从复制,相关配置如下:
- log-bin=mysql-bin # 开启 binlog
- binlog-format=ROW # 选择 ROW 格式
- server_id=126 # 主库server_id,从库为127
- log_slave_updates # 开启级联binlog
我们还事先在MySQL中创建了maxwell用于连接数据库的用户,并授予了相关权限。
- -- 在126主库执行
- create user 'maxwell'@'%' identified by '123456';
- grant all on maxwell.* to 'maxwell'@'%';
- grant select, replication client, replication slave on *.* to 'maxwell'@'%';
MySQL主从复制相关配置参见“配置异步复制”,Greenplum安装部署参见本专题上一篇“Greenplum 实时数据仓库实践(4)——Greenplum安装部署”。
我们在172.16.1.126上搭建maxwell服务。
1. 下载并解压
- wget https://github.com/zendesk/maxwell/releases/download/v1.34.1/maxwell-1.34.1.tar.gz
- tar -zxvf maxwell-1.34.1.tar.gz
2. 修改配置文件
(1)备份示例配置文件
- cd ~/maxwell-1.34.1
- cp config.properties.example config.properties
(2)编辑config.properties文件内容如下
- log_level=info
- metrics_type=http
- http_port=9090
-
- producer=kafka
-
- # Kafka配置
- kafka_topic=mytopic
- kafka.bootstrap.servers=172.16.1.124:9092,172.16.1.125:9092,172.16.1.126:9092
- kafka.compression.type=snappy
- kafka.retries=0
- kafka.acks=1
- kafka.batch.size=16384
- kafka_partition_hash=murmur3
- producer_partition_by=primary_key
-
- # MySQL配置
- host=172.16.1.127
- port=3306
- user=maxwell
- password=123456
-
- filter=exclude: *.*, include: tpcc_test.*, include: source.*, include: test.*
配置项说明:
maxwell完整的配置参数说明参见Reference - Maxwell's Daemon。
3. 启动maxwell
maxwell用Java语言开发,启动maxwell 1.34.1需要JDK 11运行环境,JDK 8报错:
- Error: A JNI error has occurred, please check your installation and try again
- Exception in thread "main" java.lang.UnsupportedClassVersionError: com/zendesk/maxwell/Maxwell has been compiled by a more recent version of the Java Runtime (class file version 55.0), this version of the Java Runtime only recognizes class file versions up to 52.0
因此先安装JDK 11:
- yum -y install jsvc
- rpm -ivh jdk-11.0.12_linux-x64_bin.rpm
然后启动maxwell:
- export JAVA_HOME=/usr/java/jdk-11.0.12
- cd ~/maxwell-1.34.1
- bin/maxwell --config config.properties --daemon
--config选项指定配置文件,--daemon选项指定maxwell实例作为守护进程到后台运行。maxwell启动时,会在它所连接的MySQL实例中创建一个maxwell数据库,其中包含如下7个表,保存maxwell的元数据。
maxwell成功启动后,将在日志文件中看到类似下面的信息:
- [mysql@node2~/maxwell-1.34.1]$tail /home/mysql/maxwell-1.34.1/bin/../logs/MaxwellDaemon.out
- 15:22:54,297 INFO BinlogConnectorReplicator - Setting initial binlog pos to: mysql-bin.000001:406400351
- 15:22:54,302 INFO MaxwellHTTPServer - Maxwell http server starting
- 15:22:54,306 INFO MaxwellHTTPServer - Maxwell http server started on port 9090
- 15:22:54,326 INFO BinaryLogClient - Connected to 172.16.1.127:3306 at mysql-bin.000001/406400351 (sid:6379, cid:5179862)
- 15:22:54,326 INFO BinlogConnectorReplicator - Binlog connected.
- 15:22:54,339 INFO log - Logging initialized @4344ms to org.eclipse.jetty.util.log.Slf4jLog
- 15:22:54,595 INFO Server - jetty-9.4.41.v20210516; built: 2021-05-16T23:56:28.993Z; git: 98607f93c7833e7dc59489b13f3cb0a114fb9f4c; jvm 11.0.12+8-LTS-237
- 15:22:54,710 INFO ContextHandler - Started o.e.j.s.ServletContextHandler@4ceb3f9a{/,null,AVAILABLE}
- 15:22:54,746 INFO AbstractConnector - Started ServerConnector@202c0dbd{HTTP/1.1, (http/1.1)}{0.0.0.0:9090}
- 15:22:54,747 INFO Server - Started @4755ms
- [mysql@node2~/maxwell-1.34.1]$
从http://172.16.1.126:9090/可以获取所有监控指标,指标说明参见Reference - Maxwell's Daemon。
我们在172.16.1.126上搭建bireme服务。
1. 下载并解压
- wget https://github.com/HashDataInc/bireme/releases/download/v2.0.0-alpha-1/bireme-2.0.0-alpha-1.tar.gz
- tar -zxvf bireme-2.0.0-alpha-1.tar.gz
2. 修改配置文件
(1)备份示例配置文件
- cd ~/bireme-2.0.0-alpha-1/etc/
- cp config.properties config.properties.bak
(2)编辑config.properties文件内容如下
- target.url = jdbc:postgresql://114.112.77.198:5432/dw
- target.user = dwtest
- target.passwd = 123456
-
- data_source = mysql
-
- mysql.type = maxwell
- mysql.kafka.server = 172.16.1.124:9092,172.16.1.125:9092,172.16.1.126:9092
- mysql.kafka.topic = mytopic
-
- pipeline.thread_pool.size = 3
-
- state.server.addr = 172.16.1.126
- state.server.port = 9091
配置项说明:
bireme完整的配置参数说明参见https://github.com/HashDataInc/bireme/blob/master/README_zh-cn.md。
(3)编辑mysql.properties文件内容如下
test.t1 = public.t1
在config.properties文件中指定数据源为mysql,因此需要新建文件~/bireme-2.0.0-alpha-1/etc/mysql.properties,在其中加入源表到目标表的映射。这里是将MySQL中test.t1表的数据同步到Greenplum的public.t1表中。(4)在源和目标库创建表,注意都要有主键
- -- MySQL,126主库执行
- use test;
- create table t1 (a int primary key);
-
- -- Greenplum
- set search_path=public;
- create table t1 (a int primary key);
在MySQL主库上建表的DDL语句会写到binlog中,并在从库上重放。同样,因为我们建表前已经启动了maxwell,该建表语句也会随binlog传递到maxwell。maxwell可以通过启用output_ddl支持DDL事件捕获,该参数是boolean类型,默认为false。默认配置时不会将DLL的binlog事件发送到Kafka,只会记录到日志文件和maxwell.schemas表中。
如果output_ddl设置为true,除了日志文件和maxwell.schemas表,DDL事件还会被写到由ddl_kafka_topic参数指定的Kafka topic中,默认为kafka_topic。Kafka中的DDL消息需要由消费者实现消费逻辑,bireme不处理DDL。我们使用默认配置,DDL只记录信息,不写入消息,因此只要在目标Greenplum库手工执行同构的DDL语句,使源和目标保持相同的表结构,MySQL中执行的DDL语句就不会影响后面的数据同步。
3. 启动bireme
bireme用Java语言开发,启动bireme 2.0.0需要JDK 8运行环境,用JDK 11报错:
- Bireme JMX enabled by default
- Starting the bireme service...
- Cannot find any VM in Java Home /usr/java/jdk-11.0.12
- Failed to start bireme service
因此先安装JDK 8:
yum -y install java-1.8.0-openjdk.x86_64
然后启动bireme,我这里使用的是安装CDH时自带的JDK 8:
- export JAVA_HOME=/usr/java/jdk1.8.0_181-cloudera
- cd ~/bireme-2.0.0-alpha-1
- bin/bireme start
bireme成功启动后,将在控制台看到类似下面的信息:
- Bireme JMX enabled by default
- Starting the bireme service...
- The bireme service has started.
从http://172.16.1.126:9091/?pretty可以获取bireme状态:
- {
- "source_name": "mysql",
- "type": "MAXWELL",
- "pipelines": [
- {
- "name": "mytopic-1",
- "latest": "1970-01-01T08:00:00.000Z",
- "delay": 0.0,
- "state": "NORMAL"
- },
- {
- "name": "mytopic-2",
- "latest": "1970-01-01T08:00:00.000Z",
- "delay": 0.0,
- "state": "NORMAL"
- },
- {
- "name": "mytopic-3",
- "latest": "1970-01-01T08:00:00.000Z",
- "delay": 0.0,
- "state": "NORMAL"
- }
- ]
- }
现在所有服务都已正常,可以进行一些简单的测试:
- -- 在MySQL主库执行一些数据修改
- use test;
- insert into t1 values (1);
- insert into t1 values (2);
- insert into t1 values (3);
- update t1 set a=10 where a=1;
- delete from t1 where a=2;
- commit;
-
- -- 查询Greenplum
- dw=> select * from public.t1;
- a
- ----
- 3
- 10
- (2 rows)
MySQL中的数据变化被实时同步到Greenplum中。
4. 如何保证数据的顺序消费
bireme实现中的一个pipeline就是Kafka中的一个消费者。我们建的topic有三个分区,maxwell在写的时候指定一个主键作为hash key,那么同一主键的相关数据,一定会被分发到同一个分区中去,而单个分区中的数据一定是有序的。消费者从分区中取出数据的时候,也一定是有序的,到这里顺序没有错乱。接着,我们在消费者里可能设置多个线程来并发处理消息(如配置pipeline.thread_pool.size=3),因为如果消费者是单线程,而处理又比较耗时的话,吞吐量太低。一个消费者多线程并发处理就可能出现乱序问题,如图5-11所示。
图5-11 多线程消费造成乱序
解决该问题的方式是写N个内存阻塞队列,具有相同主键的数据都到同一个队列,然后对于N个线程,每个线程分别消费一个队列即可,这样就能保证顺序性,如图5-12所示。bireme就是使用这个模型实现的。
图5-12 用内存阻塞队列解决多线程消费乱序问题
大多数情况下,数据同步被要求在不影响线上业务的情况下联机执行,而且还要求对线上库的影响越小越好。例如,同步过程中对主库加锁会影响对主库的访问,因此通常是不被允许的。本节演示如何在保持对线上库正常读写的前提下,通过全量加增量的方式,完成MySQL到Greenplum的实时数据同步。
为展示完整过程,先做一些清理工作,然后对主库执行tpcc-mysql压测,模拟正在使用的线上业务数据库,在压测执行期间做全部九个测试用表的全量和增量数据同步。
- # 停止maxwell进程
- ps -ef | grep maxwell | grep -v grep | awk '{print $2}' | xargs kill
-
- # 在127从库删除maxwell库
- drop database maxwell;
-
- # 停止bireme服务
- export JAVA_HOME=/usr/java/jdk1.8.0_181-cloudera
- cd ~/bireme-2.0.0-alpha-1
- bin/bireme stop
-
- # 清除bireme日志
- cat /dev/null > ~/bireme-2.0.0-alpha-1/logs/bireme.err
- cat /dev/null > ~/bireme-2.0.0-alpha-1/logs/bireme.gc
- cat /dev/null > ~/bireme-2.0.0-alpha-1/logs/bireme.out
alter table history add primary key (h_c_id,h_c_d_id,h_c_w_id,h_d_id,h_w_id,h_date,h_amount,h_data);
- # 10仓库,32并发线程,预热5分钟,执行半小时
- ~/tpcc-mysql-master/tpcc_start -h172.16.1.126 -d tpcc_test -u root -p "123456" -w 10 -c 32 -r 300 -l 1800
(1)在Greenplum中创建模式
模式(schema)是一个有趣的概念,不同数据库系统中的模式代表完全不同的东西。如Oracle中,默认在创建用户的时候,就建立了一个和用户同名的模式,并且互相绑定,因此很多情况下Oracle的用户和模式可以通用。MySQL中的schema是database的同义词。而Greenplum中的模式是从PostgreSQL继承来的,其概念与SQL Server的模式更为类似,是数据库中的逻辑对象。
Greenplum的模式是数据库中对象和数据的逻辑组织。模式允许在一个数据库中存在多个同名的对象,如果对象属于不同的模式,同名对象之间不会冲突。使用schema有如下好处:
比如要设计一个复杂系统,由众多模块构成,有时候模块间又需要具有独立性。各模块存放单独的数据库显然是不合适的。此时就可使用schema来划分各模块间的对象,再对用户进行适当的权限控制,这样逻辑也非常清晰。执行以下操作在Greenplum中创建schema。
- # 连接master
- psql -d dw -U dwtest -h 127.0.0.1
- # 创建schema
- create schema tpcc_test;
- # 修改用户搜索路径
- alter database dw set search_path to public,pg_catalog,tpcc_test;
(2)在tpcc_test模式中创建tpcc-mysql测试用表
tpcc-mysql安装目录下的create_table.sql文件中包含MySQL里的建表脚本。将该SQL脚本改为Greenplum版:
Greenplum是分布式数据库,一般为提高查询性能需要在建表时通过distributed by子句指定分布键。如果表有主键,同时没有指定分布键,则Greenplum自动使用主键作为表的分布键,我们出于简便使用这种方式。关于选择分布键的最佳实践,将在下一篇的建立示例数据仓库环境中加以说明。执行以下操作在tpcc_test模式中建表。
- -- 设置当前模式
- set search_path to tpcc_test;
-
- -- 创建tpcc-mysql测试用的9个表
- ...
-
- create table history (
- h_c_id int,
- h_c_d_id smallint,
- h_c_w_id smallint,
- h_d_id smallint,
- h_w_id smallint,
- h_date timestamp,
- h_amount decimal(6,2),
- h_data varchar(24),
- primary key (h_c_id,h_c_d_id,h_c_w_id,h_d_id,h_w_id,h_date,h_amount,h_data) );
-
- ...
(3)修改bireme表映射配置
编辑~/bireme-2.0.0-alpha-1/etc/mysql.properties内容如下:
- tpcc_test.customer = tpcc_test.customer
- tpcc_test.district = tpcc_test.district
- tpcc_test.history = tpcc_test.history
- tpcc_test.item = tpcc_test.item
- tpcc_test.new_orders = tpcc_test.new_orders
- tpcc_test.order_line = tpcc_test.order_line
- tpcc_test.orders = tpcc_test.orders
- tpcc_test.stock = tpcc_test.stock
- tpcc_test.warehouse = tpcc_test.warehouse
等号左边是MySQL库表名,右边为对应的Greenplum模式及表名。bireme虽然提供了表映射配置文件,但实际只支持Greenplum中的public模式,如果映射其他模式,bireme启动时会报错:
Greenplum table and MySQL table size are inconsistent
通过查看GetPrimaryKeys.java的源代码,发现它在查询Greenplum表的元数据时,使用的是硬编码:
- String tableList = sb.toString().substring(0, sb.toString().length() - 1) + ")";
- String tableSql = "select tablename from pg_tables where schemaname='public' and tablename in "
- + tableList + "";
- String prSql = "SELECT NULL AS TABLE_CAT, "
- + "n.nspname AS TABLE_SCHEM, "
- + "ct.relname AS TABLE_NAME, "
- + "a.attname AS COLUMN_NAME, "
- + "(i.keys).n AS KEY_SEQ, "
- + "ci.relname AS PK_NAME "
- + "FROM pg_catalog.pg_class ct JOIN pg_catalog.pg_attribute a ON (ct.oid = a.attrelid) "
- + "JOIN pg_catalog.pg_namespace n ON (ct.relnamespace = n.oid) "
- + "JOIN ( SELECT i.indexrelid, i.indrelid, i.indisprimary, information_schema._pg_expandarray(i.indkey) AS KEYS FROM pg_catalog.pg_index i) i ON (a.attnum = (i.keys).x AND a.attrelid = i.indrelid) "
- + "JOIN pg_catalog.pg_class ci ON (ci.oid = i.indexrelid) WHERE TRUE AND n.nspname = 'public' AND ct.relname in "
- + tableList + " AND i.indisprimary ORDER BY TABLE_NAME, pk_name, key_seq";
可以简单修改源码解决此问题,但还有另一个表映射问题。在表映射配置文件中,目标端Greenplum只能使用一个模式而不能自由配置。该问题与bireme的整体实现架构有关,它使用二维数组存储配置项,代码倒是简化了,可逻辑完全不对。这个问题可不像public那么好改,估计得重构才行。
(4)停止127从库复制
- -- 在127从库执行
- stop slave;
这么简单的一句SQL却是实现全量数据同步的关键所在。从库停止复制,不影响主库的正常使用,也就不会影响业务。此时从库的数据处于静止状态,不会产生变化,这使得获取全量数据变得轻而易举。
(5)执行全量数据同步
maxwell提供了一个命令工具 maxwell-bootstrap 帮助我们完成数据初始化,它基于 SELECT * FROM table 的方式进行全量数据读取,不会产生多余的binlog。启动maxwell时,如果使用--bootstrapper=sync,则初始化引导和binlog接收使用同一线程,这意味着所有binlog事件都将被阻止,直到引导完成。如果使用--bootstrapper=async(默认配置),maxwell将产生一个用于引导的单独线程。在这种异步模式下,非引导表将由主线程正常复制,而引导表的binlog事件将排队,并在引导过程结束时发送到复制流。
如果maxwell在下次引导时崩溃,它将完全重新引导全量数据,不管之前的进度如何。如果不需要此行为,则需要手动更新bootstrap表。具体来说,是将未完成的引导程序行标记为“完成”(is_complete=1)或删除该行。
虽然maxwell考虑到了全量数据初始化问题,但bireme却处理不了全量数据消费,会报类似下面的错误:
cn.hashdata.bireme.BiremeException: Not found. Record does not have a field named "w_id"
设置producer=stdout,从控制台可以看到bootstrap和正常insert会生产不同的JSON输出。执行maxwell-bootstrap:
- export JAVA_HOME=/usr/java/jdk-11.0.12
- cd ~/maxwell-1.34.1
- bin/maxwell-bootstrap --config config.properties --database test --table t1 --client_id maxwell
控制台输出:
- {"database":"maxwell","table":"bootstrap","type":"insert","ts":1638778749,"xid":39429616,"commit":true,"data":{"id":1,"database_name":"tpcc_test","table_name":"t1","where_clause":null,"is_complete":0,"inserted_rows":0,"total_rows":1,"created_at":null,"started_at":null,"completed_at":null,"binlog_file":null,"binlog_position":0,"client_id":"maxwell","comment":null}}
- {"database":"tpcc_test","table":"t1","type":"bootstrap-start","ts":1638778770,"data":{}}
- {"database":"tpcc_test","table":"t1","type":"bootstrap-insert","ts":1638778770,"data":{"a":1}}
- {"database":"tpcc_test","table":"t1","type":"bootstrap-complete","ts":1638778770,"data":{}}
普通insert控制台输出:
{"database":"tpcc_test","table":"t1","type":"insert","ts":1638778816,"xid":39429828,"commit":true,"data":{"a":2}}
bireme不处理bootstrap相关类型,因此这里无法使用maxwell-bootstrap进行全量数据同步。我们执行以下操作,手工将源表的全量数据复制到目标表。
- mkdir tpcc_test_bak
- mysqldump -u root -p123456 -S /data/mysql.sock -t -T ~/tpcc_test_bak tpcc_test customer district history item new_orders order_line orders stock warehouse --fields-terminated-by='|' --single-transaction
scp ~/tpcc_test_bak/*.txt gpadmin@114.112.77.198:/data/tpcc_test_bak/
- # 用gpadmin用户连接数据库
- psql -d dw
-
- -- 用copy命令执行导入
- copy customer from '/data/tpcc_test_bak/customer.txt' with delimiter '|';
- copy district from '/data/tpcc_test_bak/district.txt' with delimiter '|';
- copy history from '/data/tpcc_test_bak/history.txt' with delimiter '|';
- copy item from '/data/tpcc_test_bak/item.txt' with delimiter '|';
- copy new_orders from '/data/tpcc_test_bak/new_orders.txt' with delimiter '|';
- copy order_line from '/data/tpcc_test_bak/order_line.txt' with delimiter '|';
- copy orders from '/data/tpcc_test_bak/orders.txt' with delimiter '|';
- copy stock from '/data/tpcc_test_bak/stock.txt' with delimiter '|';
- copy warehouse from '/data/tpcc_test_bak/warehouse.txt' with delimiter '|';
-
- -- 分析表
- analyze customer;
- analyze district;
- analyze history;
- analyze item;
- analyze new_orders;
- analyze order_line;
- analyze orders;
- analyze stock;
- analyze warehouse;
maxwell是从从库接收binlog,停止复制使得从库的binlog不再发生变化,从而给maxwell提供了一个增量数据同步的初始binlog位点。只要此时启动maxwell与bireme服务,然后开启从库的复制,增量数据就会自动执行同步。
(1)启动maxwell
- export JAVA_HOME=/usr/java/jdk-11.0.12
- cd ~/maxwell-1.34.1
- bin/maxwell --config config.properties --daemon
(2)启动bireme
- export JAVA_HOME=/usr/java/jdk1.8.0_181-cloudera
- cd ~/bireme-2.0.0-alpha-1
- bin/bireme start
(3)启动MySQL从库的数据复制
- -- 在127从库执行
- start slave;
(4)查看Kafka消费情况
- export PATH=$PATH:/opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567/lib/kafka/bin/;
- kafka-consumer-groups.sh --bootstrap-server 172.16.1.124:9092 --describe --group bireme
压测结束时,三个消费者的LAG在几秒后都变为0,说明此时源和目标已经实时同步,而且bireme方式的消费延迟很小,几乎是同时完成数据同步。可以对比mysql主库、从库和Greenplum的表数据以确认三者的数据一致性。
聪明如你也许已经想到,如果数据量太大,导致全量同步执行时间过长,以至于MySQL从库的复制停滞太久,在重新启动复制后会不会延迟越拉越久,而永远不能追上主库呢?理论上可能发生这种情况,但实际上不太可能出现。首先,业务库不可能永远满载工作,总有波峰波谷。其次,数据仓库通常只需要同步部分业务数据,而不会应用全部binlog。最后,我们还能采取各种手段加快MySQL主从复制,使从库在一个可接受的时间范围内追上主库,包括:
maxwell + Kafka + bireme 方案的有点主要体现在两点:一是容易上手,只需配置无需编程即可使用;二是消费速度快,这得益于bireme采用的 DELETE + COPY 方法,通过小批次准实时进行数据装载的方式。这种方案的缺点也很明显,bireme的实现比较糟糕。前面已经看到了几处,如不支持DDL,不支持maxwell-bootstrap,不支持源表和目标表的自由映射等。而且,当pipeline.thread_pool.size值设置小于Kafka 分区数时,极易出现Consumer group 'xxx' is rebalancing问题,该问题还不能自行恢复,我极度怀疑这是由实现代码的瑕疵所造成。也难怪,bireme这个个人作品在github上已经四年没更新了。下面介绍更为流行,也是我们所采用的基于Canal的解决方案。
本节介绍的方法和上节类似,只是将Kafka的生产者与消费者换成了Canal Server和Canal Adapter。
本方案的总体架构如图5-13所示。
图5-13 Canal + Kafka + ClientAdapter 架构
Canal是阿里开源的一个的组件,无论功能还是实现上都与maxwell类似。其主要用途是基于MySQL数据库增量日志解析,提供增量数据订阅和消费,工作原理相对比较简单:
图5-14显示了Canal服务器的构成模块。Server代表一个Canal运行实例,对应于一个jvm。Instance对应于一个数据队列,1个Server对应1..n个Instance。Instance模块中,EventParser完成数据源接入,模拟slave与master进行交互并解析协议。EventSink是Parser和Store的连接器,进行数据过滤、加工与分发。EventStore负责存储数据。MetaManager是增量订阅与消费信息管理器。
图5-14 Canal服务器构成模块
Canal 1.1.1版本之后默认支持将Canal Server接收到的binlog数据直接投递到消息队列,目前默认支持的消息系统有Kafka和RocketMQ。早期的Canal仅提供Client API,需要用户自己编写客户端程序实现消费逻辑。Canal 1.1.1版本之后增加了client-adapter,提供客户端数据落地的适配及启动功能。
下面演示安装配置Canal Server和Canal Adapter实现MySQL到Greenplum的实时数据同步。这里使用的环境与5.5.1的相同,MySQL已经配置好主从复制,CDH的Kafka服务正常。我们还事先在MySQL中创建了Canal用于连接数据库的用户,并授予了相关权限。
- -- 在126主库执行
- create user canal identified by 'canal';
- grant select, replication slave, replication client on *.* to 'canal'@'%';
下面在Kafka中创建一个topic,在后面配置Canal时将使用该topic:
kafka-topics.sh --create --topic example --bootstrap-server 172.16.1.124:9092 --partitions 3 --replication-factor 3
example的分区如下:
- Topic: example Partition: 0 Leader: 340 Replicas: 340,339,330 Isr: 340,339,330
- Topic: example Partition: 1 Leader: 339 Replicas: 339,330,340 Isr: 339,330,340
- Topic: example Partition: 2 Leader: 330 Replicas: 330,340,339 Isr: 330,340,339
Client-Adapter在1.14版本为了解决对MySQL关键字的兼容问题引入了一个BUG,使它只能兼容MySQL,在向Greenplum插入数据时会报错:
- canal 1.1.5 bug
- 2021-10-08 16:51:09.347 [pool-2-thread-1] ERROR com.alibaba.otter.canal.client.adapter.support.Util - ERROR: syntax error at or near "`"
- Position: 15
- org.postgresql.util.PSQLException: ERROR: syntax error at or near "`"
- Position: 15
该问题说明详见https://github.com/alibaba/canal/pull/3020,直到Adapter 1.1.5依然没有解决。1.1.3版本还没有出现此问题,因此我们选择使用Canal 1.1.3,要求JDK 1.8以上。
我们在172.16.1.126上运行Canal Server。
1. 下载并解压
- wget https://github.com/alibaba/canal/releases/download/canal-1.1.3/canal.deployer-1.1.3.tar.gz
- tar -zxvf canal.deployer-1.1.3.tar.gz -C ~/canal_113/deployer/
2. 修改配置文件
(1)编辑Canal配置文件/home/mysql/canal_113/deployer/conf/canal.properties,修改以下配置项。
- # Canal服务器模式
- canal.serverMode = kafka
- # Kafka服务器地址
- canal.mq.servers = 172.16.1.124:9092,172.16.1.125:9092:172.16.1.126:9092
canal.properties是Canal服务器配置文件,其中包括三部分定义:
(2)编辑instance配置文件/home/mysql/canal_113/deployer/conf/example/instance.properties,修改以下配置项。
- # Canal实例对应的MySQL master
- canal.instance.master.address=172.16.1.127:3306
- # 注释canal.mq.partition配置项
- # canal.mq.partition=0
- # Kafka topic分区数
- canal.mq.partitionsNum=3
- # 哈希分区规则,指定所有正则匹配的表对应的哈希字段为表主键
- canal.mq.partitionHash=.*\\..*:$pk$
instance.properties是Canal实例配置文件,在canal.properties定义了canal.destinations后,需要在canal.conf.dir对应的目录下建立同名目录。例如缺省配置:
- canal.destinations = example
- canal.conf.dir = ../conf
这时需要在canal.properties所在目录中存在(或创建)example目录,example目录里有一个instance.properties文件。
与上节介绍的bireme类似,Canal同样存在消息队列的顺序性问题。Canal目前选择支持的Kafka/rocketmq,本质上都是基于本地文件的方式来支持分区级的顺序消息能力,也就是binlog写入消息队列可以有一些顺序性保障,这取决于用户的参数选择。
Canal支持消息队列数据的几种路由方式:单topic单分区,单topic多分区、多topic单分区、多topic多分区。canal.mq.dynamicTopic参数主要控制是单topic还是多topic,针对命中条件的表可以发到表名对应的topic、库名对应的topic,或默认topic。canal.mq.partitionsNum、canal.mq.partitionHash,主要控制是否多分区以及分区的partition的路由计算,针对命中条件的可以做到按表级做分区、主键级做分区等。
Canal的消费顺序性,主要取决于路由选择,例如:
如果事先创建好topic,canal.mq.partitionsNum参数值不能大于该topic的分区数。如果选择在开始向Kafka发送消息时自动创建topic,则canal.mq.partitionsNum值不能大于Kafka的 num.partitions参数值。否则在Canal Server启动时报错:
ERROR com.alibaba.otter.canal.kafka.CanalKafkaProducer - Invalid partition given with record: 1 is not in the range [0...1).
3. 启动Canal Server
执行下面的命令启动Canal Server:
~/canal_113/deployer/bin/startup.sh
Canal Server成功启动后,将在日志文件/home/mysql/canal_113/deployer/logs/canal/canal.log中看到类似下面的信息:
2021-12-14 16:29:42.724 [destination = example , address = /172.16.1.127:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=mysql-bin.000001,position=333,serverId=127,gtid=,timestamp=1639466655000] cost : 688ms , the next step is binlog dump
从MySQL可以看到canal用户创建的dump线程:
- *************************** 4. row ***************************
- Id: 5739453
- User: canal
- Host: 172.16.1.126:22423
- db: NULL
- Command: Binlog Dump
- Time: 123
- State: Master has sent all binlog to slave; waiting for binlog to be updated
- Info: NULL
我们在172.16.1.126上运行Canal Adapter。
1. 下载并解压
- wget https://github.com/alibaba/canal/releases/download/canal-1.1.3/canal.adapter-1.1.3.tar.gz
- tar -zxvf canal.adapter-1.1.3.tar.gz -C ~/canal_113/adapter/
2. 修改配置文件
(1)编辑启动器配置文件/home/mysql/canal_113/adapter/conf/application.yml,内容如下。
- server:
- port: 8081 # REST 端口号
- spring:
- jackson:
- date-format: yyyy-MM-dd HH:mm:ss
- time-zone: GMT+8
- default-property-inclusion: non_null
-
- canal.conf:
- mode: kafka # canal client的模式: tcp kafka rocketMQ
- canalServerHost: 127.0.0.1:11111 # 对应单机模式下的canal server的ip:port
- mqServers: 172.16.1.124:9092,172.16.1.125:9092:172.16.1.126:9092 # kafka或rocketMQ地址
- batchSize: 5000 # 每次获取数据的批大小, 单位为K
- syncBatchSize: 10000 # 每次同步的批数量
- retries: 0 # 重试次数, -1为无限重试
- timeout: # 同步超时时间, 单位毫秒
- accessKey:
- secretKey:
- canalAdapters: # 适配器列表
- - instance: example # canal 实例名或者 MQ topic 名
- groups: # 消费分组列表
- - groupId: g1 # 分组id,如果是MQ模式将用到该值
- outerAdapters: # 分组内适配器列表
- - name: logger # 日志适配器
- - name: rdb # 指定为rdb类型同步
- key: Greenplum # 适配器唯一标识,与表映射配置中outerAdapterKey对应
- properties: # 目标库jdbc相关参数
- jdbc.driverClassName: org.postgresql.Driver # jdbc驱动名,部分jdbc的jar包需要自行放致lib目录下
- jdbc.url: jdbc:postgresql://114.112.77.198:5432/dw # jdbc url
- jdbc.username: dwtest # jdbc username
- jdbc.password: 123456 # jdbc password
- threads: 10 # 并行执行的线程数, 默认为1
- commitSize: 30000 # 批次提交的最大行数
1.1.3版本的ClientAdapter支持如下功能:
适配器将会自动加载 conf/rdb 下的所有.yml结尾的表映射配置文件。
(2)编辑RDB表映射文件/home/mysql/canal_113/adapter/conf/rdb/t1.yml,内容如下。
- dataSourceKey: defaultDS # 源数据源的key
- destination: example # cannal的instance或者MQ的topic
- groupId: g1 # 对应MQ模式下的groupId,只会同步对应groupId的数据
- outerAdapterKey: Greenplum # adapter key, 对应上面配置outAdapters中的key
- concurrent: true # 是否按主键hash并行同步,并行同步的表必须保证主键不会更改,及不存在依赖该主键的其他同步表上的外键约束。
- dbMapping:
- database: test # 源数据源的database/shcema
- table: t1 # 源数据源表名
- targetTable: public.t1 # 目标数据源的模式名.表名
- targetPk: # 主键映射
- a: a # 如果是复合主键可以换行映射多个
- # mapAll: true # 是否整表映射,要求源表和目标表字段名一模一样。如果targetColumns也配置了映射,则以targetColumns配置为准。
- targetColumns: # 字段映射,格式: 目标表字段: 源表字段,如果字段名一样源表字段名可不填。
- a: a
- commitBatch: 30000 # 批量提交的大小
RDB adapter 用于适配MySQL到关系型数据库(需支持jdbc)的数据同步及导入。
3. 启动Canal Adapter
执行下面的命令启动Canal Adapter:
~/canal_113/adapter/bin/startup.sh
Canal Adapter成功启动后,将在日志文件/home/mysql/canal_113/adapter/logs/adapter/adapter.log中看到类似下面的信息:
- 2021-12-14 17:40:37.206 [main] INFO c.a.o.canal.adapter.launcher.loader.CanalAdapterLoader - Start adapter for canal-client mq topic: example-g1 succeed
- ...
- 2021-12-14 17:40:37.384 [Thread-5] INFO c.a.o.c.adapter.launcher.loader.CanalAdapterKafkaWorker - =============> Start to subscribe topic: example <=============
- 2021-12-14 17:40:37.385 [Thread-5] INFO c.a.o.c.adapter.launcher.loader.CanalAdapterKafkaWorker - =============> Subscribe topic: example succeed <=============
- ...
现在所有服务都已正常,可以进行一些简单的测试:
- -- 在MySQL主库执行一些数据修改
- use test;
- insert into t1 values (4),(5),(6);
- update t1 set a=30 where a=3;
- delete from t1 where a=10;
- commit;
-
- -- 查询Greenplum
- dw=> select * from public.t1;
- a
- ----
- 30
- 6
- 5
- 4
- (4 rows)
MySQL中的数据变化被实时同步到Greenplum中。
Canal的高可用不是服务器级别,而是基于实例的,一个实例对应一个MySQL实例。Canal通过将增量订阅&消费的关系信息持久化存储在Zookeeper中,保证数据集群共享,以支持HA模式。我们在172.16.1.127上再安装一个Canal Server,然后对126、127上的Canal Server进行HA模式配置。配置中所使用的Zookeeper是Kafka同一CDH集群中的Zookeeper服务。
(1)在127上安装Canal Server
- wget https://github.com/alibaba/canal/releases/download/canal-1.1.3/canal.deployer-1.1.3.tar.gz
- tar -zxvf canal.deployer-1.1.3.tar.gz -C ~/canal_113/deployer/
(2)修改canal.properties文件,加上zookeeper配置。
- # 两个Canal Server的配置相同
- canal.zkServers = 172.16.1.125:2181,172.16.1.126:2181,172.16.1.127:2181
- canal.instance.global.spring.xml = classpath:spring/default-instance.xml
- # 注释下面行
- # canal.instance.global.spring.xml = classpath:spring/file-instance.xml
(3)修改example/instance.properties文件
canal.instance.mysql.slaveId=1126 # 另一台机器改成1127,保证slaveId不重复即可
注意,两台机器上的instance目录的名字需要保证完全一致,HA模式是依赖于instance name进行管理,同时必须都选择default-instance.xml配置。
(4)启动两台机器的Canal Server
~/canal_113/deployer/bin/startup.sh
启动后可以查看logs/example/example.log,只会看到一台机器上出现了启动成功的日志,如这里启动成功的是126:
2021-12-15 11:23:46.593 [main] INFO c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....
zookeeper中记录了集群信息:
- [zk: localhost:2181(CONNECTED) 6] ls /otter/canal/destinations/example/cluster
- [172.16.1.127:11111, 172.16.1.126:11111]
(1)修改Adapter启动器配置文件application.yml,加上zookeeper配置。
- # 注释下面一行
- # canalServerHost: 127.0.0.1:11111
- zookeeperHosts: 172.16.1.125:2181,172.16.1.126:2181,172.16.1.127:2181
(2)重启Adapter
- ~/canal_113/adapter/bin/stop.sh
- ~/canal_113/adapter/bin/startup.sh
Adapter会自动从zookeeper中的running节点获取当前服务的工作节点,然后与其建立连接。连接成功后,Canal Server会记录当前正在工作的服务器信息:
- [zk: localhost:2181(CONNECTED) 9] get /otter/canal/destinations/example/running
- {"active":true,"address":"172.16.1.126:11111","cid":1}
现在进行一些简单的测试验证功能是否正常:
- -- 在MySQL主库执行一些数据修改
- use test;
- insert into t1 values (7),(8),(9);
- update t1 set a=40 where a=4;
- delete from t1 where a in (5,6,7);
- commit;
-
- -- 查询Greenplum
- dw=> select * from public.t1;
- a
- ----
- 30
- 8
- 9
- 40
- (4 rows)
MySQL中的数据变化被实时同步到Greenplum中,所有组件工作正常。
数据消费成功后,Canal Server会在zookeeper中记录下当前最后一次消费成功的binlog位点,下次重启客户端时时,会从这最后一个位点继续进行消费。
- [zk: localhost:2181(CONNECTED) 19] get /otter/canal/destinations/example/1001/cursor
- {"@type":"com.alibaba.otter.canal.protocol.position.LogPosition","identity":{"slaveId":-1,"sourceAddress":{"address":"node3","port":3306}},"postion":{"gtid":"","included":false,"journalName":"mysql-bin.000001","position":4270,"serverId":126,"timestamp":1639541811000}}
停止正在工作的126的Canal Server:
~/canal_113/deployer/bin/stop.sh
这时127会立马启动example instance,提供新的数据服务:
- [zk: localhost:2181(CONNECTED) 20] get /otter/canal/destinations/example/running
- {"active":true,"address":"172.16.1.127:11111","cid":1}
验证功能是否正常:
- -- 在MySQL主库执行一些数据修改
- use test;
- insert into t1 values (1),(2);
- update t1 set a=3 where a=30;
- delete from t1 where a in (8,9,40);
- commit;
-
- -- 查询Greenplum
- dw=> select * from public.t1;
- a
- ---
- 1
- 3
- 2
- (3 rows)
启动126的Canal Server,它将再次被添加到集群中:
- [zk: localhost:2181(CONNECTED) 22] ls /otter/canal/destinations/example/cluster
- [172.16.1.127:11111, 172.16.1.126:11111]
我们依然可以使用上节介绍的方法,进行全量加增量的实时数据同步。工作原理和操作步骤别无二致,只是实现的组件变了,maxwell替换为Canal Server,bireme替换为Canal Adapter,而这对于数据仓库用户来说完全透明。
也许在初始化数据同步之前需要进行必要的清理工作:
- ~/canal_113/deployer/bin/stop.sh
- ~/canal_113/adapter/bin/stop.sh
- rm ~/canal_113/deployer/conf/example/meta.dat
- rm ~/canal_113/deployer/conf/example/h2.mv.db
- cat /dev/null > ~/canal_113/adapter/logs/adapter/adapter.log
- cat /dev/null > ~/canal_113/deployer/logs/canal/canal.log
- cat /dev/null > ~/canal_113/deployer/logs/example/example.log
meta.dat文件的内容是个json串,存储实例最后获取的binlog位点信息,例如:
{"clientDatas":[{"clientIdentity":{"clientId":1001,"destination":"example","filter":""},"cursor":{"identity":{"slaveId":-1,"sourceAddress":{"address":"node3","port":3306}},"postion":{"gtid":"","included":false,"journalName":"mysql-bin.000001","position":3707,"serverId":126,"timestamp":1639531036000}}}],"destination":"example"}
Canal重启时,从meta.dat文件中journalName的起始位置开始同步,如果此时获取到的binlog信息在MySQL中已被清除,启动将会失败。通常在MySQL执行了reset master、purge binary logs等操作,或修改完Canal的instance.properties配置后,重启Canal Server前需要删除meta.dat文件。
h2.mv.db是Canal的表元数据存储数据库。当MySQL修改了表结构,根据binlog的DDL语句,将该时刻表结构元数据信息在h2.mv.db的meta_snapshot、meta_history等表中。改设计主要为解决MySQL某一时刻发生DDL变更,如果回溯时间跨越DDL变更的时刻,产生解析字段不一致的问题。目前Canal Adapter不支持DDL。
清理后的Canal处于一个全新的初始状态,此时可以在不影响业务数据库正常访问的前提下进行实时数据同步,主要操作步骤归纳如下:
有别于bireme的DELETE + COPY,Canal Adapter在Greenplum上逐条执行INSERT、UPDATE、DELETE语句。从日志中清晰可见:
- 2021-12-15 12:45:37.597 [pool-24-thread-1] INFO c.a.o.canal.client.adapter.logger.LoggerAdapterExample - DML: {"data":[{"a":3}],"database":"test","destination":"example","es":1639543537000,"groupId":"g1","isDdl":false,"old":[{"a":30}],"pkNames":null,"sql":"","table":"t1","ts":1639543512399,"type":"UPDATE"}
- 2021-12-15 12:45:38.308 [pool-24-thread-1] INFO c.a.o.canal.client.adapter.logger.LoggerAdapterExample - DML: {"data":[{"a":1},{"a":2}],"database":"test","destination":"example","es":1639543537000,"groupId":"g1","isDdl":false,"old":null,"pkNames":null,"sql":"","table":"t1","ts":1639543512393,"type":"INSERT"}
- 2021-12-15 12:46:02.934 [pool-24-thread-1] INFO c.a.o.canal.client.adapter.logger.LoggerAdapterExample - DML: {"data":[{"a":8},{"a":40}],"database":"test","destination":"example","es":1639543562000,"groupId":"g1","isDdl":false,"old":null,"pkNames":null,"sql":"","table":"t1","ts":1639543537814,"type":"DELETE"}
正如上节所见,压测中bireme的批次方式与MySQL的执行速度大差不差,而Canal Adapter的这种方式在Greenplum中的执行速度会比MySQL中慢得多。下面我们还是使用tpcc-mysql压测制造MySQL负载,然后执行脚本监控消费延迟。Greenplum中已经创建好tpcc-mysql测试所用的9张表,需要配置这些表的映射关系,为每张表生成一个yml文件:
- [mysql@node2~]$ls -l /home/mysql/canal_113/adapter/conf/rdb
- total 40
- -rw-r--r-- 1 mysql mysql 825 Oct 11 11:13 customer.yml
- -rw-r--r-- 1 mysql mysql 538 Oct 11 11:18 district.yml
- -rw-r--r-- 1 mysql mysql 603 Oct 22 15:38 history.yml
- -rw-r--r-- 1 mysql mysql 379 Oct 11 11:23 item.yml
- -rw-r--r-- 1 mysql mysql 407 Oct 11 11:26 new_orders.yml
- -rw-r--r-- 1 mysql mysql 631 Oct 11 11:30 order_line.yml
- -rw-r--r-- 1 mysql mysql 506 Oct 11 11:33 orders.yml
- -rw-r--r-- 1 mysql mysql 720 Oct 11 11:44 stock.yml
- -rw-r--r-- 1 mysql mysql 275 Dec 14 17:37 t1.yml
- -rw-r--r-- 1 mysql mysql 473 Oct 11 11:50 warehouse.yml
- [mysql@node2~]$
history.yml文件内容如下,其他表映射配置文件类似:
- dataSourceKey: defaultDS
- destination: example
- groupId: g1
- outerAdapterKey: Greenplum
- concurrent: true
- dbMapping:
- database: tpcc_test
- table: history
- targetTable: tpcc_test.history
- targetPk:
- h_c_id: h_c_id
- h_c_d_id: h_c_d_id
- h_c_w_id: h_c_w_id
- h_d_id: h_d_id
- h_w_id: h_w_id
- h_date: h_date
- h_amount: h_amount
- h_data: h_data
- # mapAll: true
- targetColumns:
- h_c_id: h_c_id
- h_c_d_id: h_c_d_id
- h_c_w_id: h_c_w_id
- h_d_id: h_d_id
- h_w_id: h_w_id
- h_date: h_date
- h_amount: h_amount
- h_data: h_data
- commitBatch: 30000
消费延迟监控脚本lag.sh内容如下:
- #!/bin/bash
- export PATH=$PATH:/opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567/lib/kafka/bin/;
-
- ~/tpcc-mysql-master/tpcc_start -h172.16.1.126 -d tpcc_test -u root -p "123456" -w 10 -c 32 -r 60 -l 540 > tpcc_test.log 2>&1
-
- rate1=0
- for ((i=1; i<=10; i++))
- do
- startTime=`date +%Y%m%d-%H:%M:%S`
- startTime_s=`date +%s`
-
- lag=$(kafka-consumer-groups.sh --bootstrap-server 172.16.1.124:9092 --describe --group $1 |sed -n "3, 5p" | awk '{lag+=$5}END{print lag}')
- lag1=$lag
-
- sleep 60
-
- lag=$(kafka-consumer-groups.sh --bootstrap-server 172.16.1.124:9092 --describe --group $1 |sed -n "3, 5p" | awk '{lag+=$5}END{print lag}')
-
- endTime=`date +%Y%m%d-%H:%M:%S`
- endTime_s=`date +%s`
-
- lag2=$lag
-
- lag=$(($lag1 - $lag2))
- sumTime=$[ $endTime_s - $startTime_s ]
- rate=`expr $lag / $sumTime`
-
- rate1=$(($rate1 + $rate))
- echo "$startTime ---> $endTime" "Total:$sumTime seconds, consume $rate messages per second."
- done
-
- echo;
- avg_rate=`expr $rate1 / 10`;
- left=`expr $lag2 / $avg_rate`
- echo "It will take about $left seconds to complete the consumption."
说明:
在数据同步使用的所有组件工作正常的情况下执行脚本,命令行参数是消费组名称(groupId参数指定):
./lag.sh g1
输出结果如下:
- 20211216-11:33:05 ---> 20211216-11:34:11 Total:66 seconds, consume 1509 messages per second.
- 20211216-11:34:11 ---> 20211216-11:35:17 Total:66 seconds, consume 1488 messages per second.
- 20211216-11:35:17 ---> 20211216-11:36:23 Total:66 seconds, consume 1508 messages per second.
- 20211216-11:36:23 ---> 20211216-11:37:29 Total:66 seconds, consume 1511 messages per second.
- 20211216-11:37:29 ---> 20211216-11:38:35 Total:66 seconds, consume 1465 messages per second.
- 20211216-11:38:35 ---> 20211216-11:39:41 Total:66 seconds, consume 1510 messages per second.
- 20211216-11:39:41 ---> 20211216-11:40:48 Total:67 seconds, consume 1444 messages per second.
- 20211216-11:40:48 ---> 20211216-11:41:54 Total:66 seconds, consume 1511 messages per second.
- 20211216-11:41:54 ---> 20211216-11:43:00 Total:66 seconds, consume 1465 messages per second.
- 20211216-11:43:00 ---> 20211216-11:44:06 Total:66 seconds, consume 1465 messages per second.
-
- It will take about 387 seconds to complete the consumption.
在本实验环境中,MySQL执行10分钟的压测负载,Greenplum大约需要执行约27分半(kafka-consumer-groups.sh命令本身需要约6秒的执行时间),两者的QPS相差2.75倍。由此也可以看出,Greenplum作为分布式数据库,专为分析型数据仓库场景所设计,单条DML的执行效率远没有MySQL这种主机型数据库高,并不适合高并发小事务的OLTP型应用。
虽然性能上比bireme的微批处理慢不少,但Canal Adapter的INSERT、UPDATE、DELETE处理方式,使得用类似于数据库触发器的功能实现自动实时ETL成为可能,这也是下一篇所要讨论的主题。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。