赞
踩
1.Flink CDC简介
(参考:基于 Flink SQL CDC的实时数据同步方案)
1.1 什么是CDC
CDC是Change Data Capture(变更数据获取)的简称。核心思想是监测并捕获数据库的变动(包括数据或数据表的插入、更新以及删除等),将这些变更按发生的顺序完整记录下来,写入到消息中间件中以供其他服务进行订阅及消费。常见的CDC组件有基于查询的Sqoop、Kafka JDBC Source,基于Binlog的Canal、Maxwell、Debezium等。
1.2 Flink CDC
Flink CDC是Flink社区开发的flink-cdc-connectors 组件,可以从 MySQL、Oracle、PostgreSQL等数据库直接读取全量数据和增量变更数据。
2.代码实现
2.1 开启MySQL Binlog
当前使用的是mysql 5.7
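vim /etc/my.cnf
在 [mysqld] 段下添加如下配置(binlog_format 必须为 row;binlog-do-db 为需要监控的库,本文为 test_db):
server-id=1
log-bin=mysql-bin
binlog_format=row
binlog-do-db=test_db
保存后重启MySQL使配置生效:
systemctl restart mysqld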
2.2 创建maven工程,添加依赖
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Maven build for the Flink CDC demo project.
  Produces a regular jar plus a "jar-with-dependencies" fat jar during the package phase.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.gao</groupId>
    <artifactId>flink_cdc</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <!--
          Pin the Java language level and the source encoding explicitly. Without these the
          build relies on the maven-compiler-plugin default level and on the platform charset,
          which makes it non-reproducible (the sources contain non-ASCII comments).
        -->
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <!-- Flink Java API -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.12.0</version>
        </dependency>

        <!-- Flink DataStream API (Scala 2.12 build) -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>1.12.0</version>
        </dependency>

        <!-- Flink client, required to run the job locally / submit it -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>1.12.0</version>
        </dependency>

        <!-- Hadoop client: the demo job stores its checkpoints on HDFS -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.1.3</version>
        </dependency>

        <!-- MySQL JDBC driver -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.48</version>
        </dependency>

        <!-- Flink CDC connector for MySQL (provides MySQLSource) -->
        <dependency>
            <groupId>com.alibaba.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>1.2.0</version>
        </dependency>

        <!-- fastjson: used by the custom deserializer to build the JSON output -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.75</version>
        </dependency>

        <!-- Blink table planner; not referenced by the DataStream example, presumably kept for Flink SQL usage -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.12</artifactId>
            <version>1.12.0</version>
        </dependency>

    </dependencies>
    <build>
        <plugins>
            <!-- Builds the jar-with-dependencies artifact as part of "mvn package" -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>
2.2 创建FlinkCDCTest测试类
- import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
- import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
- import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
- import org.apache.flink.api.common.restartstrategy.RestartStrategies;
- import org.apache.flink.runtime.state.filesystem.FsStateBackend;
- import org.apache.flink.streaming.api.CheckpointingMode;
- import org.apache.flink.streaming.api.datastream.DataStreamSource;
- import org.apache.flink.streaming.api.environment.CheckpointConfig;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.streaming.api.functions.source.SourceFunction;
-
- public class FlinkCDCTest {
-
- public static void main(String[] args) throws Exception {
- //TODO 1.基础环境
- //1.1流处理执行环境
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- //1.2设置并行度
- env.setParallelism(1);//设置并行度为1方便测试
-
- //TODO 2.检查点配置
- //2.1 开启检查点
- env.enableCheckpointing(5000L, CheckpointingMode.EXACTLY_ONCE);//5秒执行一次,模式:精准一次性
- //2.2 设置检查点超时时间
- env.getCheckpointConfig().setCheckpointTimeout(60*1000);
- //2.3 设置重启策略
- env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 2*1000));//两次,两秒执行一次
- //2.4 设置job取消后检查点是否保留
- env.getCheckpointConfig().enableExternalizedCheckpoints(
- CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);//保留
- //2.5 设置状态后端-->保存到hdfs
- env.setStateBackend(new FsStateBackend("hdfs://192.168.231.121:8020/ck"));
- //2.6 指定操作hdfs的用户
- System.setProperty("HADOOP_USER_NAME", "gaogc");
-
- //TODO 3.FlinkCDC
- //3.1 创建MySQLSource
- SourceFunction<String> sourceFunction = MySQLSource.<String>builder()
- .hostname("192.168.231.121")
- .port(3306)
- .databaseList("test_db")//库
- .tableList("test_db.user")//表
- .username("root")
- .password("123456")
- .startupOptions(StartupOptions.initial())//启动的时候从第一次开始读取
- .deserializer(new MyDeserializationSchemaFunction ())//这里使用自定义的反序列化器将数据封装成json格式
- .build();
-
- //3.2 从源端获取数据
- DataStreamSource<String> sourceDS = env.addSource(sourceFunction);
-
- //打印测试
- sourceDS.print();
-
- //执行
- env.execute();
- }
-
- }
2.3 自定义反序列化器
- import com.alibaba.fastjson.JSONObject;
- import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
- import io.debezium.data.Envelope;
- import org.apache.flink.api.common.typeinfo.TypeInformation;
- import org.apache.flink.util.Collector;
- import org.apache.kafka.connect.data.Field;
- import org.apache.kafka.connect.data.Struct;
- import org.apache.kafka.connect.source.SourceRecord;
-
- /**
- * 自定义反序列化器,将FlinkCDC读取到的数据转为json格式
- */
- public class MyDeserializationSchemaFunction implements DebeziumDeserializationSchema<String> {
- @Override
- public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
- Struct valueStruct = (Struct) sourceRecord.value();
- Struct sourceStrut = valueStruct.getStruct("source");
- //获取数据库的名称
- String database = sourceStrut.getString("db");
- //获取表名
- String table = sourceStrut.getString("table");
-
- //获取类型(c-->insert,u-->update)
- String type = Envelope.operationFor(sourceRecord).toString().toLowerCase();
- if(type.equals("create")){
- type="insert";
- }
- JSONObject jsonObj = new JSONObject();
- jsonObj.put("database",database);
- jsonObj.put("table",table);
- jsonObj.put("type",type);
-
- //获取数据data
- Struct afterStruct = valueStruct.getStruct("after");
- JSONObject dataJsonObj = new JSONObject();
- if(afterStruct!=null){
- for (Field field : afterStruct.schema().fields()) {
- String fieldName = field.name();
- Object fieldValue = afterStruct.get(field);
- dataJsonObj.put(fieldName,fieldValue);
- }
- }
- jsonObj.put("data",dataJsonObj);
-
- //向下游传递数据
- collector.collect(jsonObj.toJSONString());
-
- }
-
- @Override
- public TypeInformation<String> getProducedType() {
- return TypeInformation.of(String.class);
- }
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。