当前位置:   article > 正文

基于Flink的实时数据同步原理

flink同步数据

数据同步用一个简单的模型可以描述为源端(Source)到目标端(Sink)的数据复制过程。源端通常是数据库比如Mysql、目标端通常是分布式存储系统如HDFS等,在源端和目标端有时需要进行一些数据变换,如下图:

这个过程如果按照同步方式One by One地从源库read记录,然后write目标库,因为是单条记录,很容易控制写入的原子性事务:要么写成功,要么写失败,而且源端read的位置也不会丢失。这种同步方式的最大问题是效率低,因为源端和目标端系统的性能不一致,整体效率取决于较慢的一端,为了提高读写效率和吞吐量,需要将Source和Sink解耦,如基于内存Channel或者基于消息系统Kafka(Kafka本身也是Sink)的生产者和消费者模式,于是上述过程变成如下方式:

基于内存Channel的方式,当同步过程中出现读写异常或者宕机,会导致应用程序出错和产生源端和目标端不一致的脏数据,而且通常无法恢复,因为没有持久化同步过程中的状态信息,比如Datax、Sqoop等采用类似方式。还有一种基于内存Channel的实现框架是Debezium,其定位为支持多种数据库系统的实时数据捕获:

相比现有的Datax和Sqoop等,它有两大点显著性优势:实时和故障可恢复,基于Debezium的数据同步模型为:

先说实时:Source如何实时读取binlog?

从上图可知,Mysql一次主从复制需要有三个线程来实现,其中一个线程(Binlog dump thread)在主服务器上,其它两个线程(Slave I/O thread , Slave SQL thread)在从服务器上。如果一台主服务器配两台从服务器,那主服务器上就会有两个Binlog dump 线程,而每个从服务器上各自有两个线程。

Binlog dump 线程是当有从服务器连接的时候由主服务器创建,用于向Slave发送binlog日志数据,Slave I/O 线程去连接主服务器的Binlog dump 线程并要求其发送binlog日志中记录的更新操作,然后它将Binlog dump 线程发送的数据拷贝到从服务器上的本地文件relay log中。Slave SQL线程是在从服务器上面创建的,主要负责读取由Slave I/O写的relay log文件并执行其中的事件。Slave跟Master创建连接并发送dump指令之后,两端的处理流程为:

  • Master端:

    启动Dump Thread根据Slave发送过来需要dump的文件名和位置从读取binlog数据并发送给Slave的IO Thread。当没有更新日志,该线程将阻塞,直到有新日志事件到来将其唤醒。

  • Slave端:

    IO Thread将接收到的日志Event写入本地relay文件,然后SQL Thread读取relay日志数据将数据反映到Slave数据库中。

从交互流程中可知,从库主动从主库请求 binlog 的副本,而不是主库主动将数据推送到从库。也就是说每个从库都是独立地与主库进行连接,每个从库只能通过向主库请求来接收 binlog 的副本,因此从库能够以自己的速度读取和更新数据库的副本,并且可以随意启动和停止该过程,而不会影响到主库或者其他从库的状态。以下代码(有删减)摘录自debezium项目中跟binlog交互过程:

  1. //debezium#BinaryLogClient.java
  2. public void connect() throws IOException, IllegalStateException {
  3. try{
  4. this.channel = this.openChannel();
  5. GreetingPacket greetingPacket = this.receiveGreeting();
  6. (new Authenticator(greetingPacket, this.channel, this.schema, this.username, this.password)).authenticate();
  7. this.tryUpgradeToSSL(greetingPacket);
  8. this.fetchBinlogFilenameAndPosition();
  9. this.enableHeartbeat();
  10. this.requestBinaryLogStream();
  11. for (LifecycleListener lifecycleListener : lifecycleListeners) {
  12. lifecycleListener.onConnect(this);
  13. }
  14. this.listenForEventPackets();
  15. }finally{
  16. for (LifecycleListener lifecycleListener : lifecycleListeners) {
  17. lifecycleListener.onDisconnect(this);
  18. }
  19. }
  20. }
  21. private void listenForEventPackets() throws IOException {
  22. ByteArrayInputStream inputStream = this.channel.getInputStream();
  23. while(inputStream.peek() != -1) {
  24. int packetLength = inputStream.readInteger(3);
  25. inputStream.skip(1L);
  26. int marker = inputStream.read();
  27. if (marker == 254 && !this.blocking) {
  28. completeShutdown = true;
  29. break;
  30. }
  31. Event event;
  32. try {
  33. event = this.eventDeserializer.nextEvent(packetLength == 16777215 ? new ByteArrayInputStream(this.readPacketSplitInChunks(inputStream, packetLength - 1)) : inputStream);
  34. this.notifyEventListeners(event);
  35. this.updateClientBinlogFilenameAndPosition(event);
  36. }finally {
  37. this.disconnect();
  38. }
  39. }
  40. }
  41. }

再看看Debezium如何做故障恢复,而解释怎么做故障恢复,需要先了解其怎么表示进度信息。Source是Debezium的主要特征,基于Kafka Connector API对接上游源系统,比如Mysql、Sql Server、Postgrel和Mongodb,下游对接Kafka。其中Kafka是可选项,Debezium允许通过Embedding方式集成到应用程序,使用示例为:

  1. // Define the configuration for the Debezium Engine with MySQL connector...
  2. final Properties props = config.asProperties();
  3. props.setProperty("name", "engine");
  4. props.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore");
  5. props.setProperty("offset.storage.file.filename", "/tmp/offsets.dat");
  6. props.setProperty("offset.flush.interval.ms", "60000");
  7. /* begin connector properties */
  8. props.setProperty("database.hostname", "localhost");
  9. props.setProperty("database.port", "3306");
  10. props.setProperty("database.user", "mysqluser");
  11. props.setProperty("database.password", "mysqlpw");
  12. props.setProperty("database.server.name", "my-app-connector");
  13. props.setProperty("database.history",
  14. "io.debezium.relational.history.FileDatabaseHistory");
  15. props.setProperty("database.history.file.filename",
  16. "/path/to/storage/dbhistory.dat");
  17. // Create the engine with this configuration ...
  18. try (DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(Json.class)
  19. .using(props)
  20. .notifying(record -> {
  21. System.out.println(record);
  22.         }).build()) {
  23. // Run the engine asynchronously ...
  24. ExecutorService executor = Executors.newSingleThreadExecutor();
  25. executor.execute(engine);
  26. // Do something else or wait for a signal or an event
  27. }
  28. // Engine is stopped when the main code is finished

Debezium支持不同的Offset持久化方式:内存持久化

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/黑客灵魂/article/detail/1013973
推荐阅读
相关标签
  

闽ICP备14008679号