赞
踩
数据同步用一个简单的模型可以描述为源端(Source)到目标端(Sink)的数据复制过程。源端通常是数据库比如Mysql、目标端通常是分布式存储系统如HDFS等,在源端和目标端有时需要进行一些数据变换,如下图:
这个过程如果按照同步方式One by One地从源库read记录,然后write目标库,因为是单条记录,很容易控制写入的原子性事务:要么写成功,要么写失败,而且源端read的位置也不会丢失。这种同步方式的最大问题是效率低,因为源端和目标端系统的性能不一致,整体效率取决于较慢的一端,为了提高读写效率和吞吐量,需要将Source和Sink解耦,如基于内存Channel或者基于消息系统Kafka(Kafka本身也是Sink)的生产者和消费者模式,于是上述过程变成如下方式:
基于内存Channel的方式,当同步过程中出现读写异常或者宕机,会导致应用程序出错和产生源端和目标端不一致的脏数据,而且通常无法恢复,因为没有持久化同步过程中的状态信息,比如Datax、Sqoop等采用类似方式。还有一种基于内存Channel的实现框架是Debezium,其定位为支持多种数据库系统的实时数据捕获:
相比现有的Datax和Sqoop等,它有两大点显著性优势:实时和故障可恢复,基于Debezium的数据同步模型为:
先说实时:Source如何实时读取binlog?
从上图可知,Mysql一次主从复制需要有三个线程来实现,其中一个线程(Binlog dump thread)在主服务器上,其它两个线程(Slave I/O thread , Slave SQL thread)在从服务器上。如果一台主服务器配两台从服务器,那主服务器上就会有两个Binlog dump 线程,而每个从服务器上各自有两个线程。
Binlog dump 线程是当有从服务器连接的时候由主服务器创建,用于向Slave发送binlog日志数据,Slave I/O 线程去连接主服务器的Binlog dump 线程并要求其发送binlog日志中记录的更新操作,然后它将Binlog dump 线程发送的数据拷贝到从服务器上的本地文件relay log中。Slave SQL线程是在从服务器上面创建的,主要负责读取由Slave I/O写的relay log文件并执行其中的事件。Slave跟Master创建连接并发送dump指令之后,两端的处理流程为:
Master端:
启动Dump Thread根据Slave发送过来需要dump的文件名和位置从读取binlog数据并发送给Slave的IO Thread。当没有更新日志,该线程将阻塞,直到有新日志事件到来将其唤醒。
Slave端:
IO Thread将接收到的日志Event写入本地relay文件,然后SQL Thread读取relay日志数据将数据反映到Slave数据库中。
从交互流程中可知,从库主动从主库请求 binlog 的副本,而不是主库主动将数据推送到从库。也就是说每个从库都是独立地与主库进行连接,每个从库只能通过向主库请求来接收 binlog 的副本,因此从库能够以自己的速度读取和更新数据库的副本,并且可以随意启动和停止该过程,而不会影响到主库或者其他从库的状态。以下代码(有删减)摘录自debezium项目中跟binlog交互过程:
- //debezium#BinaryLogClient.java
- public void connect() throws IOException, IllegalStateException {
- try{
- this.channel = this.openChannel();
- GreetingPacket greetingPacket = this.receiveGreeting();
- (new Authenticator(greetingPacket, this.channel, this.schema, this.username, this.password)).authenticate();
- this.tryUpgradeToSSL(greetingPacket);
- this.fetchBinlogFilenameAndPosition();
- this.enableHeartbeat();
- this.requestBinaryLogStream();
- for (LifecycleListener lifecycleListener : lifecycleListeners) {
- lifecycleListener.onConnect(this);
- }
- this.listenForEventPackets();
- }finally{
- for (LifecycleListener lifecycleListener : lifecycleListeners) {
- lifecycleListener.onDisconnect(this);
- }
- }
- }
-
-
- private void listenForEventPackets() throws IOException {
- ByteArrayInputStream inputStream = this.channel.getInputStream();
- while(inputStream.peek() != -1) {
- int packetLength = inputStream.readInteger(3);
- inputStream.skip(1L);
- int marker = inputStream.read();
- if (marker == 254 && !this.blocking) {
- completeShutdown = true;
- break;
- }
- Event event;
- try {
- event = this.eventDeserializer.nextEvent(packetLength == 16777215 ? new ByteArrayInputStream(this.readPacketSplitInChunks(inputStream, packetLength - 1)) : inputStream);
- this.notifyEventListeners(event);
- this.updateClientBinlogFilenameAndPosition(event);
- }finally {
- this.disconnect();
- }
- }
- }
- }
再看看Debezium如何做故障恢复,而解释怎么做故障恢复,需要先了解其怎么表示进度信息。Source是Debezium的主要特征,基于Kafka Connector API对接上游源系统,比如Mysql、Sql Server、Postgrel和Mongodb,下游对接Kafka。其中Kafka是可选项,Debezium允许通过Embedding方式集成到应用程序,使用示例为:
- // Define the configuration for the Debezium Engine with MySQL connector...
- final Properties props = config.asProperties();
- props.setProperty("name", "engine");
- props.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore");
- props.setProperty("offset.storage.file.filename", "/tmp/offsets.dat");
- props.setProperty("offset.flush.interval.ms", "60000");
- /* begin connector properties */
- props.setProperty("database.hostname", "localhost");
- props.setProperty("database.port", "3306");
- props.setProperty("database.user", "mysqluser");
- props.setProperty("database.password", "mysqlpw");
- props.setProperty("database.server.name", "my-app-connector");
- props.setProperty("database.history",
- "io.debezium.relational.history.FileDatabaseHistory");
- props.setProperty("database.history.file.filename",
- "/path/to/storage/dbhistory.dat");
-
-
- // Create the engine with this configuration ...
- try (DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(Json.class)
- .using(props)
- .notifying(record -> {
- System.out.println(record);
- }).build()) {
- // Run the engine asynchronously ...
- ExecutorService executor = Executors.newSingleThreadExecutor();
- executor.execute(engine);
- // Do something else or wait for a signal or an event
- }
- // Engine is stopped when the main code is finished
Debezium支持不同的Offset持久化方式:内存持久化
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。