当前位置:   article > 正文

基于Mysql的Flink-cdc_flink mysql

flink mysql

flink-cdc解析

要想深入学习,先去哥的GitHub上去下载源码:https://github.com/BaronND/flink-cdc-connectors

  • 起源背景

数据库的更改对于客户端来说是没有感知的,你需要开启线程去查询,才知道数据有没有更新,但是就算是查询,如果是直接select * from ....,这样获取的结果还要和上次获取的结果对比,才知道数据有没有发生变化,耗时大。要想实时监控mysql数据,要用到mysql binlog日志处理流程,binlog里保存了mysql的DDL和DML,而且是追加模式的,很适合流失数据的处理。我们知道每个taskmanager都有两个网关:输入和输出。他们之间通过netty进行通讯,有了一个buffer数据就可以下发。Source机制其实就是类似,是append模式,是一条一条的追加,不是批处理那样一次全部加载后交给下游,所以他能支持的数据源就不多,而mysql的binglog刚好合适这样的一个模式。

例如canal监听binlog把日志写入到kafka中。Apache Flink实时消费Kakfa的数据实现mysql数据的同步或其他内容等。整体的处理链路较长,需要用到的组件也比较多。虽然kafka是能够用来解耦了,但是也会造成磁盘资源和时间的消耗。Apache Flink CDC可以直接从数据库获取到binlog供下游进行业务计算分析。简单来说链路会变成这样。也就是说数据不再通过canal与kafka进行同步,而flink直接进行处理mysql的数据。节省了canal与kafka的过程。

Flink 1.11中实现了mysql-cdc与postgre-CDC,也就是说在Flink 1.11中我们可以直接通过Flink来直接消费mysql,postgresql的数据进行业务的处理。替代了之前的canal+kafka节点.直接通过sql的方式来实现对mysql数据的同步。

  • CDC介绍
  1. CDC简介

CDC(Change Data Capture)变更数据获取的简称,使用CDC我们可以从数据库中获取已提交的更改并将这些更改发送到下游,供下游使用。它是一个比较广义的概念,只要能捕获变更的数据,我们都可以称为 CDC 。业界主要有基于查询的 CDC 和基于日志的 CDC ,目前flink支持两种内置的connector,PostgreSQL和mysql。应用场景有如下。

1.使用flink sql进行数据同步,可以将数据从一个数据同步到其他的地方,比如mysql、elasticsearch等。

2.可以在源数据库上实时的物化一个聚合视图

3.因为只是增量同步,所以可以实时的低延迟的同步数据

4.维表join

  1. 解决方案

业务系统经常会遇到需要更新数据到多个存储的需求。例如:一个订单系统刚刚开始只需要写入数据库即可完成业务使用。某天 BI 团队期望对数据库做全文索引,于是我们同时要写多一份数据到 ES 中,改造后一段时间,又有需求需要写入到 Redis 缓存中。

 

很明显这种模式是不可持续发展的,这种双写到各个数据存储系统中可能导致不可维护和扩展,数据一致性问题等,需要引入分布式事务,成本和复杂度也随之增加。我们可以通过 CDC(Change Data Capture)工具进行解除耦合,同步到下游需要同步的存储系统。通过这种方式提高系统的稳健性,也方便后续的维护。

 

  • Mysql案例分析

为了设置MySQL CDC连接器,下表提供了使用构建自动化工具(例如Maven或SBT)和带有SQL JAR捆绑包的SQL Client的两个项目的依赖项信息。

1、Maven依赖

<dependency>

<groupId>com.alibaba.ververica</groupId>

<artifactId>flink-connector-mysql-cdc</artifactId>

<version>1.1.0</version>

</dependency>

2、SQL客户端JAR

 下载flink-sql-connector-mysql-cdc-1.1.0.jar并将其放在下 <FLINK_HOME> /lib/。

  1. 创建MySQL用户

必须定义一个对Debezium MySQL连接器监视的所有数据库具有适当权限的MySQL用户。

mysql> CREATE USER 'user'@'localhost' IDENTIFIED BY 'password';

2、向用户授予所需的权限

mysql> GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'user' IDENTIFIED BY 'password';

3、最终确定用户的权限

mysql> FLUSH PRIVILEGES;

  • 源码解析

其实最主要的就是用MySQLSource类build一个DebeziumSourceFunction出来,也就是MysqlSource就是一个Builder,包括table模块中的MysqlTableSource也会调用MysqlSource来build一个DebeziumSourceFunction,所以今天的主角就是DebeziumSourceFunction。因为上一篇博客flink connector源码分析分析过sourcefunction,所以我们就直接看open和run的逻辑,就不细节讲open和run是如何以及何时调用的,总结上一篇博客来说就是先回调用open初始化sourcefunction一些属性,run就是把数据拉取过来然后emit出去。

1、步骤分析

  1. DebeziumSourceFunction中的open:可以看到open这里就是创建一个线程池,非常简单。
  2. 接下来看看run:这些属性当然是为了给debenium使用。在他的run方法里创建了一个DebeziumChangeConsumer,以及用properties和DebeziumChangeConsumer创建了DebeziumEngine,最后用线程池来执行DebeziumEngine,但是看看DebeziumChangeConsumer和DebeziumEngine两个类,就知道这个sourcefunction很简单。
  3. DebeziumChangeConsumer类的实现接口只有一个方法handleBatch,可以看到这个逻辑非常简单,就是先把debenium获取到的cdc数据先反序列化一波,直接emit到下游了,那问题来了,handleBatch中的参数数据是如何获取的呢,既然是debenium获取的,而且只有一个DebeziumEngine(这个是个runnable)。那咱们就先看看DebeziumEngine,因为DebeziumEngine是debezium的组件跟flink没关系,刚才咱们知道DebeziumEngine是个Runnable(其实也是个接口,默认实现为EmbeddedEngine)。既然是runnable那就主要看看他的run方法。
  4. DebeziumEngine的run方法里首先会创建一个task,然后启动他,明显是启动task去获取任务。接下启动任务之后就会在循环里面poll数据,说明task里面肯定有一个组赛队列,接着handler会处理获取的数据,还记得刚才咱们说的DebeziumChangeConsumer吗,他就是咱们的handler呀,正好刚才咱们还愁着handleBatch中的参数从哪里来,现在看到了吧,就是从这里来。现在咱们知道原来数据是从task的阻塞队列里面的,那么,task启动之后肯定是把数据方法阻塞队列中了,基于这样的猜想咱们来看看task。这里咱们主要看看task的start做了啥
  5. task.start里第一个start不用看,第二个start是MySqlConnectorTask实现的,看类名明显知道这是处理mysql的,其实在start里面会创建好多Reader(BinlogReader用于增量获取,SnapshotReader用于第一次全量拉取),然后放到ChainedReader中。

2、核心逻辑总结

  1. MySqlConnectorTask的ChainedReader包含多个Reader,这些reader就是用来获取全量数据和增量数据,这些数据会放进抽象类AbstractReader中的BlockingQueue中
  2. 在Debezium的run方法中会从task中poll数据
  3. task会从Reader中的blockingQueue拿数据
  4. 数据拿到之后会交给DebezinumConsumer,DebezinumConsumer会先反序列化数据,然后emit给下游
  • MySQL CDC表的创建

1、Sql的方式:

  1. -- register a MySQL table 'orders' in Flink SQL
  2. CREATE TABLE orders (
  3.   order_id INT,
  4.   order_date TIMESTAMP(0),
  5.   customer_name STRING,
  6.   price DECIMAL(10, 5),
  7.   product_id INT,
  8.   order_status BOOLEAN
  9. ) WITH (
  10.   'connector' = 'mysql-cdc',
  11.   'hostname' = 'localhost',
  12.   'port' = '3306',
  13.   'username' = 'root',
  14.   'password' = '123456',
  15.   'database-name' = 'mydb',
  16.   'table-name' = 'orders'
  17. );
  18. -- read snapshot and binlogs from orders table
  19. SELECT * FROM orders;


2、Stream API:

 

  1. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  2. import org.apache.flink.streaming.api.functions.source.SourceFunction;
  3. import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
  4. import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
  5. public class MySqlBinlogSourceExample {
  6.   public static void main(String[] args) throws Exception {
  7.     SourceFunction<String> sourceFunction = MySQLSource.<String>builder()
  8.       .hostname("localhost")
  9.       .port(3306)
  10.       .databaseList("inventory") // monitor all tables under inventory database
  11.       .username("flinkuser")
  12.       .password("flinkpw")
  13.       .deserializer(new StringDebeziumDeserializationSchema()) // SourceRecord to String
  14.       .build();
  15. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  16.     env.addSource(sourceFunction) .print().setParallelism(1);
  17.     env.execute();
  18.   }
  19. }

 

 

  • 特征和常见问题

1.特征

1、Exactly-Once Processing 一次处理 MySQL CDC连接器是Flink Source连接器,它将首先读取数据库快照,然后即使发生故障,也将以完全一次的处理继续读取二进制日志。请阅读连接器如何执行数据库快照。

2、Single Thread Reading 单线程阅读 MySQL CDC源无法并行读取,因为只有一个任务可以接收Binlog事件。

2.常见问题

1、如何跳过快照并仅从binlog中读取?可以通过选项进行控制debezium.snapshot.mode,您可以将其设置为:

never:指定连接永远不要使用快照,并且在第一次使用逻辑服务器名称启动时,连接器应该从binlog的开头读取;请谨慎使用,因为只有在binlog保证包含数据库的整个历史记录时才有效。

schema_only:如果自连接器启动以来不需要数据的连续快照,而只需要它们进行更改,则可以使用该schema_only选项,其中连接器仅对模式(而不是数据)进行快照。

2、如何读取包含多个表(例如user_00,user_01,...,user99)的共享数据库?该table-name选项支持正则表达式以监视多个与正则表达式匹配的表。因此,您可以设置table-name为user.*监视所有user_前缀表。database-name选项相同。请注意,共享表应该在相同的架构中。

3、ConnectException:收到用于处理的DML'...',binlog可能包含使用语句或基于混合的复制格式生成的事件 如果有上述异常,请检查是否binlog_format为ROW,您可以通过show variables like '%binlog_format%'在MySQL客户端中运行来进行检查。请注意,即使binlog_format您的数据库配置为ROW,也可以通过其他会话更改此配置,例如SET SESSION binlog_format='MIXED'; SET SESSION tx_isolation='REPEATABLE-READ'; COMMIT;。还请确保没有其他会话正在更改此配置

代码demo

  1. package com.flink.java.cdc.mysql;
  2. import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
  3. import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
  4. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  5. import org.apache.flink.streaming.api.functions.source.SourceFunction;
  6. public class MySqlBinlogDemo {
  7. public static void main(String[] args) {
  8. SourceFunction<String> sourceFunction = MySQLSource.<String>builder()
  9. .hostname("192.168.100.1")
  10. .port(3306)
  11. // 监视库存数据库下的所有表
  12. // .databaseList("test")
  13. .tableList("test.user_test")
  14. .username("root")
  15. .password("123456")
  16. // 将SourceRecord转换为String
  17. .deserializer(new StringDebeziumDeserializationSchema())
  18. .build();
  19. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  20. // StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
  21. // 对接收器使用并行性1以保持消息顺序
  22. env.addSource(sourceFunction).print().setParallelism(1);
  23. try {
  24. env.execute();
  25. } catch (Exception e) {
  26. e.printStackTrace();
  27. }
  28. }
  29. }

2.mysql

  1. package com.flink.java.cdc.mysql;
  2. import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
  3. import org.apache.flink.connector.jdbc.JdbcOutputFormat;
  4. import org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction;
  5. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  6. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  7. import org.apache.flink.streaming.api.functions.sink.SinkFunction;
  8. import org.apache.flink.streaming.api.functions.source.SourceFunction;
  9. import org.apache.flink.types.Row;
  10. import java.sql.Types;
  11. public class Mysql2Mysql {
  12. public static void main(String[] args) {
  13. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  14. env.setParallelism(1);
  15. SourceFunction<Row> sourceFunction = MySQLSource.<Row>builder()
  16. .hostname("192.168.100.1")
  17. .port(3306)
  18. .tableList("test.user_test2")
  19. .username("root")
  20. .password("123456")
  21. // 将SourceRecord转换为Row
  22. .deserializer(new DebeziumDeserialization())
  23. .build();
  24. DataStreamSource<Row> streamSource = env.addSource(sourceFunction);
  25. // String query = "INSERT INTO test.user_test(id,name) VALUES (?,?) on duplicate key update id=VALUES(id);";
  26. String query = "INSERT INTO test.user_test(id,name) VALUES (?,?) on duplicate key update id=VALUES(id);";
  27. JdbcOutputFormat jdbcOutputFormat = JdbcOutputFormat.buildJdbcOutputFormat()
  28. .setDrivername("com.mysql.cj.jdbc.Driver")
  29. .setDBUrl("jdbc:mysql://192.168.100.1:3306/test?useUnicode=true&characterEncoding=utf8&useSSL=false")
  30. .setUsername("root")
  31. .setPassword("123456")
  32. .setQuery(query)
  33. .setSqlTypes(new int[]{Types.VARCHAR, Types.VARCHAR})
  34. .setBatchSize(1)
  35. .finish();
  36. streamSource.print();
  37. SinkFunction<Row> jdbcSinkFunction = new GenericJdbcSinkFunction<>(jdbcOutputFormat);
  38. streamSource.addSink(jdbcSinkFunction);
  39. try {
  40. env.execute();
  41. } catch (Exception e) {
  42. e.printStackTrace();
  43. }
  44. }
  45. }
  1. package com.flink.java.cdc.mysql;
  2. import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
  3. import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
  4. import org.apache.flink.api.common.typeinfo.TypeInformation;
  5. import org.apache.flink.types.Row;
  6. import org.apache.flink.util.Collector;
  7. import org.apache.kafka.connect.data.Schema;
  8. import org.apache.kafka.connect.data.Struct;
  9. import org.apache.kafka.connect.header.Headers;
  10. import org.apache.kafka.connect.source.SourceRecord;
  11. import java.util.Map;
  12. public class DebeziumDeserialization implements DebeziumDeserializationSchema<Row> {
  13. private static final long serialVersionUID = -3168848963265670603L;
  14. @Override
  15. public void deserialize(SourceRecord record, Collector<Row> out) throws Exception {
  16. //SourceRecord
  17. Map<String, ?> sourcePartition = record.sourcePartition();
  18. Map<String, ?> sourceOffset = record.sourceOffset();
  19. //ConnectRecord
  20. Object value = record.value();
  21. Row row = new Row(2);
  22. Struct struct = (Struct) value;
  23. struct = struct.getStruct("after");
  24. //id,name
  25. String id = struct.getString("id");
  26. String name = struct.getString("name");
  27. row.setField(0, id);
  28. row.setField(1, name);
  29. out.collect(row);
  30. }
  31. @Override
  32. public TypeInformation<Row> getProducedType() {
  33. return BasicTypeInfo.of(Row.class);
  34. }
  35. }

需要额外加入的依赖

  1. <!--flink cdc-->
  2. <dependency>
  3. <groupId>mysql</groupId>
  4. <artifactId>mysql-connector-java</artifactId>
  5. <version>8.0.22</version>
  6. </dependency>
  7. <!-- debezium format-->
  8. <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-avro-confluent-registry -->
  9. <dependency>
  10. <groupId>org.apache.flink</groupId>
  11. <artifactId>flink-avro-confluent-registry</artifactId>
  12. <version>1.11.1</version>
  13. </dependency>
  14. <!-- flink mysql cdc connector -->
  15. <dependency>
  16. <groupId>com.alibaba.ververica</groupId>
  17. <!-- add the dependency matching your database -->
  18. <artifactId>flink-connector-mysql-cdc</artifactId>
  19. <version>1.1.0</version>
  20. </dependency>

 

 

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小小林熬夜学编程/article/detail/598331
推荐阅读
相关标签
  

闽ICP备14008679号