Flink CDC: first, a quick look at the concept of Change Data Capture (CDC):
CDC monitors and captures changes in a database, records those changes in the order they occur, and writes them to message middleware for other services to subscribe to and consume.
CDC comes in two flavors: query-based and binlog-based. The main difference is that query-based CDC polls the tables in batches, which adds load to the database, has higher latency, and can miss intermediate changes, whereas binlog-based CDC reads the database's change log as a stream and captures every change with low latency and little extra load.
Flink is no exception here: Flink CDC came about for exactly this purpose. Through the flink-cdc-connectors component it provides a source that reads both the full snapshot and the incremental change data directly from databases such as MySQL.
Let's learn how to use Flink CDC through a simple demo.
Create a Maven project and add the required dependencies to the pom file:
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>1.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>1.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.1.3</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.49</version>
</dependency>
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>1.2.0</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.75</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
With the dependencies in place we can start coding (happily open IDEA):
public class FlinkCDC {
public static void main(String[] args) throws Exception {
//1. Create the execution environment
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
/*2. Flink CDC stores the binlog position it has read as state in checkpoints.
To resume from where it left off, the job must be restarted from a checkpoint or savepoint.
*/
//2.1 Enable checkpointing, one checkpoint every 5 seconds
env.enableCheckpointing(5000L);
//2.2 Set the checkpoint consistency semantics
env.getCheckpointConfig().setCheckpointingMode(
CheckpointingMode.EXACTLY_ONCE);
//2.3 Retain the last checkpoint when the job is cancelled
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
//2.4 Set the restart strategy used when recovering from a checkpoint
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2000L));
//2.5 Set the state backend
env.setStateBackend(new FsStateBackend("hdfs://master:8020/flinkCDC"));
//2.6 Set the user name used to access HDFS
System.setProperty("HADOOP_USER_NAME", "root");
//3. Create the Flink MySQL CDC source
DebeziumSourceFunction<String> mysqlSource = MySQLSource.<String>builder()
.hostname("master")
.port(3306)
.username("root")
.password("000000")
.databaseList("mall-flink")
.tableList("mall-flink.z_user_info")
//tableList is optional; if it is omitted, all tables of the databases configured above are read. Note: entries must use the "db.table" format
.startupOptions(StartupOptions.initial())
.deserializer(new StringDebeziumDeserializationSchema())
.build();
//4. Read from MySQL using the CDC source
DataStreamSource<String> mysqlDS = env.addSource(mysqlSource);
//5. Print the data
mysqlDS.print();
//6. Execute the job
env.execute();
}
}
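The demo above starts the source with StartupOptions.initial(). The StartupOptions class in flink-connector-mysql-cdc exposes a few other startup modes as well; the sketch below only shows how they are constructed, the binlog offset and timestamp values are placeholders, and not every mode is necessarily supported by the MySQL connector in the 1.2.x release:
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;

public class StartupOptionsDemo {
    public static void main(String[] args) {
        //initial: snapshot the existing data first, then keep reading the binlog (used in the demo above)
        StartupOptions initial = StartupOptions.initial();
        //latest: skip the snapshot and only read changes that arrive after the job starts
        StartupOptions latest = StartupOptions.latest();
        //earliest: start from the earliest binlog position still kept by the server
        StartupOptions earliest = StartupOptions.earliest();
        //specificOffset: start from a given binlog file and position (placeholder values)
        StartupOptions fromOffset = StartupOptions.specificOffset("mysql-bin.000003", 4);
        //timestamp: start from changes committed after the given epoch-millisecond timestamp (placeholder value)
        StartupOptions fromTimestamp = StartupOptions.timestamp(1609459200000L);
    }
}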
OK, the code is done; now let's test it.
Package the code and upload it to the server: mvn clean package
(Make sure the MySQL binlog is enabled; if it has only just been enabled, restart MySQL.)
Start Flink and the HDFS cluster, then start the program (java -jar FlinkCDC.jar).
The same job can also be written with Flink SQL. Again, add the required dependency first:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.12</artifactId>
<version>1.12.0</version>
</dependency>
public class FlinkSQL_CDC {
public static void main(String[] args) throws Exception {
//1. Create the execution environment
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
//2. Create the Flink MySQL CDC source table
tableEnv.executeSql("CREATE TABLE user_info (" +
" id INT," +
" name STRING," +
" phone_num STRING" +
") WITH (" +
" 'connector' = 'mysql-cdc'," +
" 'hostname' = 'master'," +
" 'port' = '3306'," +
" 'username' = 'root'," +
" 'password' = '000000'," +
" 'database-name' = 'mall-flink'," +
" 'table-name' = 'z_user_info'" +
")");
tableEnv.executeSql("select * from user_info").print();
env.execute();
}
}
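Printing with executeSql() is handy for a quick check, but in practice you usually keep processing the change stream. The dynamic table can be converted back into a DataStream as a retract stream; below is a minimal sketch using the same table definition as above (the class name FlinkSQL_CDC_ToStream is just an illustration):
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class FlinkSQL_CDC_ToStream {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        //Register the same mysql-cdc table as in FlinkSQL_CDC above
        tableEnv.executeSql("CREATE TABLE user_info (" +
            " id INT," +
            " name STRING," +
            " phone_num STRING" +
            ") WITH (" +
            " 'connector' = 'mysql-cdc'," +
            " 'hostname' = 'master'," +
            " 'port' = '3306'," +
            " 'username' = 'root'," +
            " 'password' = '000000'," +
            " 'database-name' = 'mall-flink'," +
            " 'table-name' = 'z_user_info'" +
            ")");
        //Convert the dynamic table to a retract stream: the Boolean flag is true for insertions and false for retractions
        Table table = tableEnv.sqlQuery("select * from user_info");
        DataStream<Tuple2<Boolean, Row>> retractStream = tableEnv.toRetractStream(table, Row.class);
        retractStream.print();
        env.execute("FlinkSQL_CDC_ToStream");
    }
}
If you need full control over the format of each change record instead, a custom deserialization schema can be plugged into the DataStream source, as the next example shows.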
public class Flink_CDCWithCustomerSchema {
public static void main(String[] args) throws Exception {
//1. Create the execution environment
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//2. Create the Flink MySQL CDC source
Properties properties = new Properties();
DebeziumSourceFunction<String> mysqlSource = MySQLSource.<String>builder()
.hostname("master")
.port(3306)
.username("root")
.password("000000")
.databaseList("mall-flink")
.tableList("mall-flink.z_user_info")
.startupOptions(StartupOptions.initial())
.deserializer(new DebeziumDeserializationSchema<String>() { //custom deserialization schema
@Override
public void deserialize(SourceRecord sourceRecord, Collector<String>
collector) throws Exception {
//Get the topic, which contains the database and table name,
//e.g. mysql_binlog_source.mall-flink.z_user_info
String topic = sourceRecord.topic();
String[] arr = topic.split("\\.");
String db = arr[1];
String tableName = arr[2];
//Get the operation type: READ, DELETE, UPDATE or CREATE
Envelope.Operation operation =
Envelope.operationFor(sourceRecord);
//Get the value and cast it to a Struct
Struct value = (Struct) sourceRecord.value();
//Get the row data after the change ("after" is null for DELETE events)
Struct after = value.getStruct("after");
//Create a JSON object to hold the row data
JSONObject data = new JSONObject();
if (after != null) {
for (Field field : after.schema().fields()) {
Object o = after.get(field);
data.put(field.name(), o);
}
}
//Create a JSON object that wraps the final result
JSONObject result = new JSONObject();
result.put("operation", operation.toString().toLowerCase());
result.put("data", data);
result.put("database", db);
result.put("table", tableName);
//Emit the record downstream
collector.collect(result.toJSONString());
}
@Override
public TypeInformation<String> getProducedType() {
return TypeInformation.of(String.class);
}
})
.build();
//3. Read from MySQL using the CDC source
DataStreamSource<String> mysqlDS = env.addSource(mysqlSource);
//4. Print the data
mysqlDS.print();
//5. Execute the job
env.execute();
}
}
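As described at the beginning, a common next step is to write the change records to message middleware so that other services can subscribe to them. Below is a minimal sketch of a Kafka sink for the JSON stream produced above; it assumes the flink-connector-kafka_2.12 dependency (version 1.12.0) has been added to the pom, and the broker address master:9092 and topic name mall_cdc are placeholders:
import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

public class CdcKafkaSink {
    //Attach a Kafka sink to the JSON change stream produced by the custom deserializer
    public static void sinkToKafka(DataStream<String> cdcJsonStream) {
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", "master:9092"); //placeholder broker address
        cdcJsonStream.addSink(new FlinkKafkaProducer<String>(
            "mall_cdc",                  //placeholder topic name
            new SimpleStringSchema(),    //the records are already JSON strings
            kafkaProps));
    }
}
In Flink_CDCWithCustomerSchema you would call CdcKafkaSink.sinkToKafka(mysqlDS) before env.execute(), either alongside or instead of mysqlDS.print().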