当前位置:   article > 正文

详解 Flink CDC 的介绍和入门案例_flinkcdc续传代码

flinkcdc续传代码

一、Flink CDC 简介

1. CDC 介绍

​ CDC 是 Change Data Capture(变更数据获取)的简称。核心思想是监测并捕获数据库的变动(包括数据或数据表的插入、更新以及删除等),将这些变更按发生的顺序完整记录下来,写入到消息中间件中以供其他服务进行订阅及消费。

2. CDC 种类

基于查询的 CDC基于 Binlog 的 CDC
开源产品Sqoop、Kafka JDBC SourceCanal、Maxwell、Debezium
执行模式BatchStreaming
是否可以捕获所有数据变化
延迟性高延迟低延迟
是否增加数据库压力

3. Flink CDC 介绍

​ Flink CDC 是一个内置了 Debezium 的基于 Binlog 的可以直接从 MySQL、PostgreSQL 等数据库直接读取全量数据和增量变更数据的 source 组件。开源地址:https://github.com/ververica/flink-cdc-connectors

二、Flink CDC 案例实操

1. DataStream 实现

1.1 导入依赖
<dependencies>
	 <dependency>
		 <groupId>org.apache.flink</groupId>
		 <artifactId>flink-java</artifactId>
		 <version>1.12.0</version>
	 </dependency>
	 <dependency>
		 <groupId>org.apache.flink</groupId>
		 <artifactId>flink-streaming-java_2.12</artifactId>
		 <version>1.12.0</version>
	 </dependency>
	 <dependency>
		 <groupId>org.apache.flink</groupId>
		 <artifactId>flink-clients_2.12</artifactId>
		 <version>1.12.0</version>
	 </dependency>
	 <dependency>
		 <groupId>org.apache.hadoop</groupId>
		 <artifactId>hadoop-client</artifactId>
		 <version>3.1.3</version>
	 </dependency>
	 <dependency>
		 <groupId>mysql</groupId>
		 <artifactId>mysql-connector-java</artifactId>
		 <version>5.1.49</version>
	 </dependency>
	 <dependency>
		 <groupId>org.apache.flink</groupId>
		 <artifactId>flink-table-planner-blink_2.12</artifactId>
		 <version>1.12.0</version>
	 </dependency>
	 <dependency>
		 <groupId>com.ververica</groupId>
		 <artifactId>flink-connector-mysql-cdc</artifactId>
		 <version>2.0.0</version>
	 </dependency>
	 <dependency>
		 <groupId>com.alibaba</groupId>
		 <artifactId>fastjson</artifactId>
		 <version>1.2.75</version>
	 </dependency>
</dependencies>
<build>
	 <plugins>
		 <plugin>
			 <groupId>org.apache.maven.plugins</groupId>
			 <artifactId>maven-assembly-plugin</artifactId>
			 <version>3.0.0</version>
			 <configuration>
				 <descriptorRefs>
				 	<descriptorRef>jar-with-dependencies</descriptorRef>
				 </descriptorRefs>
			 </configuration>
			 <executions>
				 <execution>
					 <id>make-assembly</id>
					 <phase>package</phase>
					 <goals>
					 	<goal>single</goal>
					 </goals>
				 </execution>
			 </executions>
		 </plugin>
	 </plugins>
</build>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
1.2 编写程序代码
public class FlinkCDC {
    public static void main(String[] args) throws Exception {
        //1. 创建 Flink 执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        
        //Flink-CDC 将读取 binlog 的位置信息以状态的方式保存在 CK,如果想要做到断点续传,需要从 Checkpoint 或者 Savepoint 启动程序
         //1.1 开启 Checkpoint,每隔 5 秒钟做一次 CK
         env.enableCheckpointing(5000L);
         //1.2 指定 CK 的一致性语义
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
         //1.3 设置任务关闭的时候保留最后一次 CK 数据
 env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
         //1.4 指定从 CK 自动重启策略
         env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2000L));
         //1.5 设置状态后端
         env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/flinkCDC"));
         //1.6 设置访问 HDFS 的用户名
         System.setProperty("HADOOP_USER_NAME", "lgb");
        
        //2. 创建 FlinkCDC Source
        /*
        	StartupOptions 有 5 种类型:
        	1. initial:默认,先使用查询的方式读取表中所有的数据,然后再从 binlog 的最近位置监控读取
        	2. earliest:从 binlog 最开始的位置读取,要求在数据库创建之前就开启了 binlog
        	3. latest:从 binlog 的最近位置监控读取
			4. specificOffset:从 binlog 的指定位置读取
			5. timestamp:从 binlog 的指定时间戳读取
        */
        DebeziumSourceFunction<String> mysqlSource = MysqlSource.<String>builder()
            .hostname("hadoop102") //Mysql所在主机名
            .port(3306) //mysql端口号
            .username("root") //登录mysql用户名
            .password("123456") //登录mysql密码
            .databaseList("cdc_test") //监控的数据库列表,可变参数
            .tableList("cdc_test.user_info") //监控的数据表,不指定则监控数据库下所有表
            .deserializer(new StringDebeziumDeserializationSchema()) //反序列化器
            .startupOptions(StartupOptions.initial()) //指定读取策略
            .build();
        
        //3. 通过 FlinkCDC Source 创建 DataStream
        DataStream<String> dataStream = env.addSource(mysqlSource);
        
        //4. 打印输出流
        dataStream.print();
        
        //5. 启动任务
        env.execute("FlinkCDC");
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
1.3 测试
1.3.1 本地测试
  • 开启 MySQL Binlog 并重启 MySQL
  • 在 Mysql 中创建对应的数据库和数据表并插入一条数据
  • 启动 FlinkCDC 程序,查看控制台结果,可以看到通过查询的方式获取到了数据表里的所有数据
  • 在数据表中进行增删改操作,查看程序控制台输出结果
1.3.2 集群测试
  • 将 FlinkCDC 程序进行打包并上传到集群

  • 启动 Hadoop、zookeeper 和 Flink 集群

  • 运行 FlinkCDC 程序

    bin/flink run -c com.atguigu.FlinkCDC flink-1.0-SNAPSHOT-jar-with-dependencies.jar
    
    • 1
  • 给当前的 Flink 程序创建 Savepoint

    bin/flink savepoint [JobId] hdfs://hadoop102:8020/flink/save
    
    
    • 1
    • 2
  • 停止 FlinkCDC 程序

  • 在Mysql数据表中进行增删改操作

  • 从 Savepoint 重启程序查看程序输出结果

    bin/flink run -s hdfs://hadoop102:8020/flink/save/[JobId] -c com.atguigu.FlinkCDC flink-1.0-SNAPSHOT-jar-with-dependencies.jar
    
    • 1

2. Flink SQL 实现

2.0.0 版本的 FlinkCDC 通过 FlinkSQL 实现需要 1.13+ 版本的 Flink 支持

public class FlinkSQLCDC {
    public static void main(String[] args) throws Exception {
        //1. 创建 Flink 执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        
        //2. 创建 FlinkSQL 表环境
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        
        //3. 配置 FlinkSQLCDC 监控单表(只能监控单表),不需要指定反序列化器,读取模式只有 initial 和 latest-offset
        tableEnv.executeSql(
        	"create table user_info (" +
            "id String primary key, name String, sex String) with (" +
            " 'connector' = 'mysql-cdc'," +
            " 'scan.startup.mode' = 'initial'," +
            " 'hostname' = 'hadoop102'," +
            " 'port' = '3306'," +
            " 'username' = 'root'," +
            " 'password' = '123456'," +
            " 'database-name' = 'cdc_test'," +
            " 'table-name' = 'user_info'" +
            ")"
        );
        
        //4. 查询输出表中数据
        Table table = tableEnv.sqlQuery("select * from user_info");
        DataStream<Tuple2<Boolean, Row>> dataStream = tableEnv.toRetractStream(table, Row.class);
        dataStream.print();
        
        //5. 启动任务
        env.execute("FlinkSqlCDC");
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33

3. 自定义反序列化器

规范化数据输出格式,方便后续解析

/**
	自定义反序列化器:实现 DebeziumDeserializationSchema<T> 接口并实现 deserialize 和 getProducedType 方法 
*/
public class MyDeserializationSchema implements DebeziumDeserializationSchema<String> {
    /*
    	想要展示的数据格式:
    	{
    		"dbName":"",
    		"tableName":"",
    		"before":{"field1":"value1",...},
    		"after":{"field1":"value1",...},
    		"op":""
    	}
    */
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
        JSONObject result = new JSONObject();
        
        //1.获取库名和表名
        String topic = sourceRecord.topic();
        String[] fields = topic.split("\\.");
        
        //2. 获取 before 数据
        Struct value = (Struct) sourceRecord.value();
        Struct before = value.getStruct("before");
        JSONObject beforeJSON = new JSONObject();
        if(before != null) {
            Schema schema = before.schema();
            List<Field> fields = schema.fields();
            
            for(Field field : fields) {
                beforeJSON.put(field.name(), before.get(field));
            }
        }
        
        //3. 获取 after 数据
        Struct after = value.getStruct("after");
        JSONObject afterJSON = new JSONObject();
        if(after != null) {
            Schema schema = after.schema();
            List<Field> fields = schema.fields();
            
            for(Field field : fields) {
                afterJSON.put(field.name(), after.get(field));
            }
        }
        
        //4. 获取操作类型 READ DELETE UPDATE CREATE
 		Envelope.Operation operation = Envelope.operationFor(sourceRecord);
        
        result.put("dbName", fields[1]);
        result.put("tableName", fields[2]);
        result.put("before", beforeJSON);
        result.put("after", afterJSON);
        result.put("op", operation);
        
        collcetor.collect(result.toJSONString());
    }
    
    
    @Override
    public TypeInformation<String> getProducedType() {
        return TypeInformation.of(String.class);
    }	
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/正经夜光杯/article/detail/934416
推荐阅读
相关标签
  

闽ICP备14008679号