CDC is short for Change Data Capture. The core idea is to monitor and capture changes to a database (inserts, updates, and deletes of rows or tables), record those changes in full in the order they occurred, and write them to message middleware for other services to subscribe to and consume.
| | Query-based CDC | Binlog-based CDC |
| --- | --- | --- |
| Open-source products | Sqoop, Kafka JDBC Source | Canal, Maxwell, Debezium |
| Execution mode | Batch | Streaming |
| Captures every data change | No | Yes |
| Latency | High | Low |
| Adds load to the database | Yes | No |
Flink CDC is a binlog-based source component with Debezium built in; it can read both the full snapshot and incremental changes directly from databases such as MySQL and PostgreSQL. Source repository: https://github.com/ververica/flink-cdc-connectors

Add the following dependencies and build configuration to the project's pom.xml:
```xml
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.12.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.12.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>1.12.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.1.3</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.49</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_2.12</artifactId>
        <version>1.12.0</version>
    </dependency>
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>2.0.0</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.75</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>3.0.0</version>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
```
```java
import com.ververica.cdc.connectors.mysql.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import com.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkCDC {
    public static void main(String[] args) throws Exception {
        //1. Create the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //Flink CDC saves the binlog read position as state in checkpoints. To resume from
        //where it left off, the job must be started from a checkpoint or savepoint.
        //1.1 Enable checkpointing every 5 seconds
        env.enableCheckpointing(5000L);
        //1.2 Set the checkpoint consistency semantics
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        //1.3 Retain the last checkpoint when the job is cancelled
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        //1.4 Restart strategy used when recovering from a checkpoint
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2000L));
        //1.5 Set the state backend
        env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/flinkCDC"));
        //1.6 Set the user name for accessing HDFS
        System.setProperty("HADOOP_USER_NAME", "lgb");

        //2. Create the Flink CDC source
        /*
        StartupOptions has 5 modes:
        1. initial: the default; first reads all existing rows with a query, then switches
           to reading incremental changes from the latest binlog position
        2. earliest: reads from the very beginning of the binlog; requires that the binlog
           was already enabled before the database was created
        3. latest: reads incremental changes from the latest binlog position onward
        4. specificOffset: reads from a specified binlog position
        5. timestamp: reads from a specified binlog timestamp
        */
        DebeziumSourceFunction<String> mysqlSource = MySqlSource.<String>builder()
                .hostname("hadoop102")                                   //host where MySQL runs
                .port(3306)                                              //MySQL port
                .username("root")                                        //MySQL user name
                .password("123456")                                      //MySQL password
                .databaseList("cdc_test")                                //databases to monitor (varargs)
                .tableList("cdc_test.user_info")                         //tables to monitor; omit to monitor all tables in the listed databases
                .deserializer(new StringDebeziumDeserializationSchema()) //deserializer
                .startupOptions(StartupOptions.initial())                //startup mode
                .build();

        //3. Create a DataStream from the Flink CDC source
        DataStream<String> dataStream = env.addSource(mysqlSource);

        //4. Print the stream
        dataStream.print();

        //5. Start the job
        env.execute("FlinkCDC");
    }
}
```
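For reference, this is how the non-default startup modes listed above would be specified on the same builder. A minimal sketch; the binlog file name, position, and timestamp are placeholder values for illustration, not taken from a real server:

```java
//Same source as above with a different startup mode (placeholder values)
DebeziumSourceFunction<String> mysqlSourceFromOffset = MySqlSource.<String>builder()
        .hostname("hadoop102")
        .port(3306)
        .username("root")
        .password("123456")
        .databaseList("cdc_test")
        .tableList("cdc_test.user_info")
        .deserializer(new StringDebeziumDeserializationSchema())
        //resume from an explicit binlog file and position (placeholder values):
        .startupOptions(StartupOptions.specificOffset("mysql-bin.000003", 4))
        //or start from a point in time, given as epoch milliseconds:
        //.startupOptions(StartupOptions.timestamp(1609459200000L))
        .build();
```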
Package the FlinkCDC program and upload the jar to the cluster.
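With the maven-assembly-plugin configured above, the fat jar can be built with Maven, run from the project root:

```bash
# Produces target/flink-1.0-SNAPSHOT-jar-with-dependencies.jar
mvn clean package
```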
Start the Hadoop, ZooKeeper, and Flink clusters.
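The exact start-up commands depend on how the clusters are installed; on a typical setup where each component's scripts are on the PATH, they would look something like this:

```bash
start-dfs.sh            # HDFS (NameNode and DataNodes)
zkServer.sh start       # ZooKeeper (run on each ZooKeeper node)
bin/start-cluster.sh    # Flink standalone cluster, from the Flink home directory
```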
Run the FlinkCDC program:
```bash
bin/flink run -c com.atguigu.FlinkCDC flink-1.0-SNAPSHOT-jar-with-dependencies.jar
```
Create a savepoint for the running Flink job:
```bash
bin/flink savepoint [JobId] hdfs://hadoop102:8020/flink/save
```
Stop the FlinkCDC job.
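Assuming the standard Flink CLI, the job ID can be looked up and the job cancelled like this:

```bash
# List running jobs to find the JobId, then cancel the CDC job
bin/flink list
bin/flink cancel [JobId]
```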
Perform some inserts, updates, and deletes on the MySQL table.
Restart the program from the savepoint and check its output:
```bash
bin/flink run -s hdfs://hadoop102:8020/flink/save/[JobId] -c com.atguigu.FlinkCDC flink-1.0-SNAPSHOT-jar-with-dependencies.jar
```
Implementing CDC through Flink SQL with Flink CDC 2.0.0 requires Flink 1.13 or later.
```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class FlinkSQLCDC {
    public static void main(String[] args) throws Exception {
        //1. Create the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //2. Create the Flink SQL table environment
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        //3. Configure the Flink SQL CDC source. It can only monitor a single table, needs no
        //   deserializer, and supports only the 'initial' and 'latest-offset' startup modes.
        tableEnv.executeSql(
                "CREATE TABLE user_info (" +
                "  id STRING PRIMARY KEY NOT ENFORCED," +
                "  name STRING," +
                "  sex STRING" +
                ") WITH (" +
                "  'connector' = 'mysql-cdc'," +
                "  'scan.startup.mode' = 'initial'," +
                "  'hostname' = 'hadoop102'," +
                "  'port' = '3306'," +
                "  'username' = 'root'," +
                "  'password' = '123456'," +
                "  'database-name' = 'cdc_test'," +
                "  'table-name' = 'user_info'" +
                ")"
        );

        //4. Query the table and print the result
        Table table = tableEnv.sqlQuery("select * from user_info");
        DataStream<Tuple2<Boolean, Row>> dataStream = tableEnv.toRetractStream(table, Row.class);
        dataStream.print();

        //5. Start the job
        env.execute("FlinkSqlCDC");
    }
}
```
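Two small notes on this job: switching `'scan.startup.mode'` to `'latest-offset'` skips the initial snapshot and reads only new changes, and on Flink 1.13 the changelog can also be printed without converting to a DataStream. A minimal sketch of the latter, assuming the same table environment as above:

```java
//Alternative to steps 4 and 5: submit the query and stream its changelog to stdout.
//TableResult.print() blocks and prints the +I/-U/+U/-D change rows as they arrive.
tableEnv.executeSql("select * from user_info").print();
```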
To give the output a normalized format that is easier to parse downstream, implement a custom deserializer:
```java
import java.util.List;

import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

/**
 * Custom deserializer: implement the DebeziumDeserializationSchema<T> interface
 * and its deserialize and getProducedType methods.
 */
public class MyDeserializationSchema implements DebeziumDeserializationSchema<String> {

    /*
    Desired output format:
    {
        "dbName":"",
        "tableName":"",
        "before":{"field1":"value1",...},
        "after":{"field1":"value1",...},
        "op":""
    }
    */
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
        JSONObject result = new JSONObject();

        //1. Extract the database and table names from the record topic ("server.db.table")
        String topic = sourceRecord.topic();
        String[] topicFields = topic.split("\\.");

        //2. Extract the "before" image (null for inserts)
        Struct value = (Struct) sourceRecord.value();
        Struct before = value.getStruct("before");
        JSONObject beforeJSON = new JSONObject();
        if (before != null) {
            Schema schema = before.schema();
            List<Field> fields = schema.fields();
            for (Field field : fields) {
                beforeJSON.put(field.name(), before.get(field));
            }
        }

        //3. Extract the "after" image (null for deletes)
        Struct after = value.getStruct("after");
        JSONObject afterJSON = new JSONObject();
        if (after != null) {
            Schema schema = after.schema();
            List<Field> fields = schema.fields();
            for (Field field : fields) {
                afterJSON.put(field.name(), after.get(field));
            }
        }

        //4. Extract the operation type: READ, DELETE, UPDATE, or CREATE
        Envelope.Operation operation = Envelope.operationFor(sourceRecord);

        result.put("dbName", topicFields[1]);
        result.put("tableName", topicFields[2]);
        result.put("before", beforeJSON);
        result.put("after", afterJSON);
        result.put("op", operation);

        collector.collect(result.toJSONString());
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return TypeInformation.of(String.class);
    }
}
```
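Wiring the custom deserializer into the earlier DataStream job is a one-line change to the source builder:

```java
DebeziumSourceFunction<String> mysqlSource = MySqlSource.<String>builder()
        .hostname("hadoop102")
        .port(3306)
        .username("root")
        .password("123456")
        .databaseList("cdc_test")
        .tableList("cdc_test.user_info")
        .deserializer(new MyDeserializationSchema()) //custom deserializer instead of StringDebeziumDeserializationSchema
        .startupOptions(StartupOptions.initial())
        .build();
```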