当前位置:   article > 正文

大数据流处理框架之Flink-CDC_flink cdc 达梦

flink cdc 达梦

1. CDC简介

1.1 CDC种类

FlinkCDC,简单了解下Change Data Capture(变更数据获取)的概念:
监控并捕获数据库的变更,将这些变更按照发生的顺序进行记录,写入消息中间件供其他服务订阅及消费。
CDC的种类:主要分为基于查询和基于Binlog两种方式,区别:
在这里插入图片描述

1.2 FlinkCDC

Flink自然也不甘示弱,FlinkCDC应运而生,通过flink-cdc-connectors 组件,可以直接从MySQL等数据库直接读取全量数据和增量变更数据的source组件

2. 实战Coding

通过一个简单的Demo学会使用FlinkCDC

2.1 DataStream方式

通过创建maven项目,通过pom文件注入相关依赖:

 <dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-java</artifactId>
 <version>1.12.0</version>
 </dependency>
 <dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-streaming-java_2.12</artifactId>
 <version>1.12.0</version>
 </dependency>
 <dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-clients_2.12</artifactId>
 <version>1.12.0</version>
 </dependency>
 <dependency>
 <groupId>org.apache.hadoop</groupId>
 <artifactId>hadoop-client</artifactId>
 <version>3.1.3</version>
 </dependency>
 <dependency>
 <groupId>mysql</groupId>
 <artifactId>mysql-connector-java</artifactId>
 <version>5.1.49</version>
 </dependency>
  <dependency>
 <groupId>com.alibaba.ververica</groupId>
 <artifactId>flink-connector-mysql-cdc</artifactId>
 <version>1.2.0</version>
 </dependency>
 <dependency>
 <groupId>com.alibaba</groupId>
 <artifactId>fastjson</artifactId>
 <version>1.2.75</version>
 </dependency>
</dependencies>
<build>
 <plugins>
 <plugin>
 <groupId>org.apache.maven.plugins</groupId>
 <artifactId>maven-assembly-plugin</artifactId>
 <version>3.0.0</version>
 <configuration>
 <descriptorRefs>
 <descriptorRef>jar-with-dependencies</descriptorRef>
 </descriptorRefs>
 </configuration>
 <executions>
 <execution>
 <id>make-assembly</id>
 <phase>package</phase>
 <goals>
 <goal>single</goal>
 </goals>
 </execution>
 </executions>
 </plugin>
 </plugins>
</build>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59

依赖注入后就可以开始Coding…(愉快的打开IDEA)

public class FlinkCDC {
 public static void main(String[] args) throws Exception {
 //1.创建执行环境
 StreamExecutionEnvironment env =
 StreamExecutionEnvironment.getExecutionEnvironment();
 env.setParallelism(1);
 /*2.Flink-CDC 将读取 binlog 的位置信息以状态的方式保存在 CK,
 如果想要做到断点续传,需要从 Checkpoint 或者 Savepoint 启动程序
 */
 //2.1开启CheckPoint,每五秒做一次CheckPoint
env.enableCheckpointing(5);
 //2.2 指定 CK 的一致性语义
env.getCheckpointConfig().setCheckpointingMode(
CheckpointingMode.EXACTLY_ONCE);
 //2.3 设置任务关闭的时候保留最后一次 CK 数据
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckp
ointCleanup.RETAIN_ON_CANCELLATION);
 //2.4 指定从 CK 自动重启策略
 env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2000L));
 //2.5 设置状态后端
 env.setStateBackend(new FsStateBackend("hdfs://master:8020/flinkCDC"));
 //2.6 设置访问 HDFS 的用户名
 System.setProperty("HADOOP_USER_NAME", "root");
//3.创建 Flink-MySQL-CDC 的 Source
 DebeziumSourceFunction<String> mysqlSource = MySQLSource.<String>builder()
 .hostname("master")
 .port(3306)
 .username("root")
 .password("000000")
 .databaseList("mall-flink")
 .tableList("mall-flink.z_user_info") 
 //可选配置项,如果不指定该参数,则会读取上一个配置下的所有表的数据,注意:指定的时候需要使用"db.table"的方式
 .startupOptions(StartupOptions.initial())
 .deserializer(new StringDebeziumDeserializationSchema())
 .build();
 //4.使用 CDC Source 从 MySQL 读取数据
 DataStreamSource<String> mysqlDS = env.addSource(mysqlSource);
 //5.打印数据
 mysqlDS.print();
 //6.执行任务
 env.execute();
 }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43

ok,到这里代码部分已经完成,接下来开始测试
将代码打包上传至服务器 mvn clean package
(确保MySQL Binlog开启状态,若是首次开始,则需重启MySQL)
启动Flink,HDFS集群,最后启动程序(java -jar FlinkCDC.jar)

2.2 FlinkSQL方式

同样首先注入依赖

<dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-table-planner-blink_2.12</artifactId>
 <version>1.12.0</version>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
public class FlinkSQL_CDC {
 public static void main(String[] args) throws Exception {
 //1.创建执行环境
 StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
 env.setParallelism(1);
 StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
 //2.创建 Flink-MySQL-CDC 的 Source
 tableEnv.executeSql("CREATE TABLE user_info (" +
  " id INT," +
 " name STRING," +
 " phone_num STRING" +
 ") WITH (" +
 " 'connector' = 'mysql-cdc'," +
 " 'hostname' = 'master'," +
 " 'port' = '3306'," +
 " 'username' = 'root'," +
 " 'password' = '000000'," +
 " 'database-name' = 'mall-flink'," +
 " 'table-name' = 'z_user_info'" +
 ")");
 tableEnv.executeSql("select * from user_info").print();
 env.execute();
 }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

2.3 自定义反序列化器

public class Flink_CDCWithCustomerSchema {
 public static void main(String[] args) throws Exception {
 //1.创建执行环境
 StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
 env.setParallelism(1);
 //2.创建 Flink-MySQL-CDC 的 Source
 Properties properties = new Properties();
 DebeziumSourceFunction<String> mysqlSource = MySQLSource.<String>builder()
 .hostname("master")
 .port(3306)
 .username("root")
 .password("000000")
 .databaseList("mall-flink")
 .tableList("mall-flink.z_user_info") 
.startupOptions(StartupOptions.initial())
 .deserializer(new DebeziumDeserializationSchema<String>() { //自定义数据解析器
 @Override
 public void deserialize(SourceRecord sourceRecord, Collector<String> 
collector) throws Exception {
 //获取主题信息,包含着数据库和表名 
mysql_binlog_source.gmall-flink.z_user_info
 String topic = sourceRecord.topic();
String[] arr = topic.split("\\.");
 String db = arr[1];
String tableName = arr[2];
 //获取操作类型 READ DELETE UPDATE CREATE
 Envelope.Operation operation = 
Envelope.operationFor(sourceRecord);
 //获取值信息并转换为 Struct 类型
 Struct value = (Struct) sourceRecord.value();
 //获取变化后的数据
Struct after = value.getStruct("after");
 //创建 JSON 对象用于存储数据信息
 JSONObject data = new JSONObject();
for (Field field : after.schema().fields()) {
 Object o = after.get(field);
data.put(field.name(), o);
 }
 //创建 JSON 对象用于封装最终返回值数据信息
 JSONObject result = new JSONObject();
result.put("operation", operation.toString().toLowerCase());
result.put("data", data);
result.put("database", db);
result.put("table", tableName);
 //发送数据至下游
collector.collect(result.toJSONString());
 }
 @Override
 public TypeInformation<String> getProducedType() {
 return TypeInformation.of(String.class);
 }
 })
 .build();
 //3.使用 CDC Source 从 MySQL 读取数据
 DataStreamSource<String> mysqlDS = env.addSource(mysqlSource);
 //4.打印数据
 mysqlDS.print();
 //5.执行任务
 env.execute();
 	}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/知新_RL/article/detail/639121
推荐阅读
相关标签
  

闽ICP备14008679号