
Flink (58): Flink CDC (Part 1)


Contents

0. Related article links

1. Introduction to CDC

1.1. What is CDC

1.2. Types of CDC

1.3. Flink CDC

2. Flink CDC with the DataStream API

2.1. Maven dependencies

2.2. Writing the code

2.2.1. Main class: read data from the business database and write it to Kafka

2.2.2. Custom deserializer

2.2.3. Builder methods and parameters explained

3. Flink CDC with Flink SQL


0. Related article links

Flink article index

1. Introduction to CDC

1.1. What is CDC

        CDC is short for Change Data Capture. The core idea is to monitor and capture changes in a database (inserts, updates, and deletes of rows or tables), record those changes completely and in the order they occur, and write them to message middleware so that other services can subscribe to and consume them.

1.2. Types of CDC

CDC implementations mainly fall into two categories: query-based and binlog-based. The key differences between them: query-based tools (for example Sqoop or the Kafka JDBC source) poll tables in batches, which adds query load to the database, has relatively high latency, and can miss intermediate changes between two polls; binlog-based tools (for example Canal, Maxwell, or Debezium) read the database's change log as a stream, capture every change with low latency, and put little extra pressure on the database.

1.3. Flink CDC

The Flink community has developed the flink-cdc-connectors component, a source connector that can read both the full snapshot and the incremental changes directly from databases such as MySQL and PostgreSQL.

Flink CDC source code: https://github.com/ververica/flink-cdc-connectors

2. Flink CDC with the DataStream API

2.1. Maven dependencies

<properties>
    <!-- Basic Maven settings -->
    <encoding>UTF-8</encoding>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <!-- Flink versions -->
    <flink.binary.version>1.12</flink.binary.version>
    <flink.version>1.12.0</flink.version>
    <flink.cdc.version>1.2.0</flink.cdc.version>
    <scala.binary.version>2.12</scala.binary.version>
    <!-- Other big-data component versions -->
    <hadoop.version>3.1.3</hadoop.version>
    <!-- Logging versions -->
    <log4j.version>1.2.17</log4j.version>
    <slf4j.version>1.7.21</slf4j.version>
    <!-- Other utility versions -->
    <mysql.version>5.1.49</mysql.version>
    <fastjson.version>1.2.75</fastjson.version>
    <commons.beanutils.version>1.9.4</commons.beanutils.version>
    <guava.version>29.0-jre</guava.version>
    <okhttp.version>3.6.0</okhttp.version>
    <springboot.version>2.4.1</springboot.version>
    <HikariCP.version>2.6.1</HikariCP.version>
    <commons.lang3.version>3.10</commons.lang3.version>
</properties>

<dependencyManagement>
    <dependencies>
        <!-- ################ Flink ################ -->
        <!-- Flink core streaming artifacts -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Flink Table API artifacts -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Flink RocksDB state backend -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Flink connectors, including Kafka -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>${flink.cdc.version}</version>
        </dependency>
        <!-- ################ Other big-data dependencies ################ -->
        <!-- Hadoop-related artifacts -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-hadoop-compatibility_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <!-- ################ Logging ################ -->
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>${log4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>jcl-over-slf4j</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <!-- ################ Other utilities ################ -->
        <!-- commons-beanutils -->
        <dependency>
            <groupId>commons-beanutils</groupId>
            <artifactId>commons-beanutils</artifactId>
            <version>${commons.beanutils.version}</version>
        </dependency>
        <!-- Google Guava -->
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>${guava.version}</version>
        </dependency>
        <!-- MySQL JDBC driver (Connector/J 5.1.x, matching mysql.version above) -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>${mysql.version}</version>
        </dependency>
        <!-- JSON parsing: fastjson -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${fastjson.version}</version>
        </dependency>
        <!-- HTTP client: OkHttp -->
        <dependency>
            <groupId>com.squareup.okhttp3</groupId>
            <artifactId>okhttp</artifactId>
            <version>${okhttp.version}</version>
        </dependency>
        <!-- Spring Boot JDBC starter -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jdbc</artifactId>
            <version>${springboot.version}</version>
        </dependency>
        <!-- HikariCP connection pool -->
        <dependency>
            <groupId>com.zaxxer</groupId>
            <artifactId>HikariCP</artifactId>
            <version>${HikariCP.version}</version>
        </dependency>
        <!-- commons-lang3 -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>${commons.lang3.version}</version>
        </dependency>
    </dependencies>
</dependencyManagement>
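
Note that everything above sits inside <dependencyManagement>, which only pins versions for child modules; by itself it does not put any jar on the classpath. Each module still has to declare what it actually uses in a plain <dependencies> block. A minimal sketch for the CDC module follows (the artifact selection here is my assumption, adjust it to your own modules):

<dependencies>
    <!-- Versions are inherited from the dependencyManagement section above -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
    </dependency>
    <dependency>
        <groupId>com.alibaba.ververica</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
    </dependency>
</dependencies>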

2.2. Writing the code

2.2.1. Main class: read data from the business database and write it to Kafka

package com.ouyang.gmall.realtime.app.ods;

import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.ouyang.gmall.realtime.app.function.CustomerDeserialization;
import com.ouyang.gmall.realtime.utils.FlinkUtil;
import com.ouyang.gmall.realtime.utils.ModelUtil;
import com.ouyang.gmall.realtime.utils.MyKafkaUtil;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * @date: 2022/1/14
 * @author: yangshibiao
 * @desc: CDC - pull binlog data from the business database
 */
public class GmallCDC {

    public static Logger logger = LoggerFactory.getLogger(GmallCDC.class);
    public static final String ODS_BASE_DB_TOPIC_NAME = ModelUtil.getConfigValue("kafka.topic.ods.base.db");

    public static void main(String[] args) throws Exception {

        String applicationName = "gmall-cdc";
        long interval = 5000L;

        // 1. Get the execution environment and configure checkpointing
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        FlinkUtil.deployRocksdbCheckpoint(env, applicationName, interval, true);

        // 2. Build the SourceFunction with Flink CDC and read the data
        DebeziumSourceFunction<String> sourceFunction = MySQLSource.<String>builder()
                .hostname(ModelUtil.getConfigValue("mysql.hostname"))
                .port(Integer.parseInt(ModelUtil.getConfigValue("mysql.port")))
                .username(ModelUtil.getConfigValue("mysql.username"))
                .password(ModelUtil.getConfigValue("mysql.password"))
                .databaseList(ModelUtil.getConfigValue("mysql.database.gmall"))
                .deserializer(new CustomerDeserialization())
                .startupOptions(StartupOptions.initial())
                .build();
        DataStreamSource<String> streamSource = env.addSource(sourceFunction);

        // 3. Log each record, then write it to Kafka
        streamSource
                .map(new MapFunction<String, String>() {
                    @Override
                    public String map(String value) throws Exception {
                        logger.warn(value);
                        return value;
                    }
                })
                .addSink(MyKafkaUtil.getKafkaProducerExactlyOnce(ODS_BASE_DB_TOPIC_NAME));

        // 4. Start the job
        env.execute(applicationName);
    }
}
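
FlinkUtil, ModelUtil, and MyKafkaUtil are helper classes from the author's project and are not shown in this post. As a rough idea of what MyKafkaUtil.getKafkaProducerExactlyOnce could look like on Flink 1.12, here is a minimal sketch (not the author's actual code; the broker address and transaction timeout are assumed values):

// Sketch only: an exactly-once Kafka producer helper for Flink 1.12.
import java.nio.charset.StandardCharsets;
import java.util.Properties;

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

public class MyKafkaUtilSketch {

    public static FlinkKafkaProducer<String> getKafkaProducerExactlyOnce(String topic) {
        Properties props = new Properties();
        // Assumed broker list; read it from configuration in a real project
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "bigdata1:9092");
        // EXACTLY_ONCE uses Kafka transactions; the producer transaction timeout must not
        // exceed the broker's transaction.max.timeout.ms (15 minutes by default)
        props.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, String.valueOf(15 * 60 * 1000));
        return new FlinkKafkaProducer<>(
                topic,
                (KafkaSerializationSchema<String>) (element, timestamp) ->
                        new ProducerRecord<>(topic, element.getBytes(StandardCharsets.UTF_8)),
                props,
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
    }
}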

2.2.2. Custom deserializer

package com.ouyang.gmall.realtime.app.function;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

/**
 * @date: 2022/1/14
 * @author: yangshibiao
 * @desc: Custom Flink CDC deserializer
 */
public class CustomerDeserialization implements DebeziumDeserializationSchema<String> {

    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {

        /*
        Target output format:
        {
            "database":"",
            "tableName":"",
            "before":{"id":"","tm_name":""....},
            "after":{"id":"","tm_name":""....},
            "type":"c u d",
            "ts":156456135615
        }
        */

        /*
        Example of an incoming record:
        SourceRecord{
            sourcePartition={server=mysql_binlog_source},
            sourceOffset={ts_sec=1642091776, file=mysql-bin.000001, pos=4008355, row=1, server_id=1, event=2}
        }
        ConnectRecord{
            topic='mysql_binlog_source.gmall.base_trademark', kafkaPartition=null, key=Struct{id=15},
            keySchema=Schema{mysql_binlog_source.gmall.base_trademark.Key:STRUCT},
            value=Struct{
                before=Struct{id=15,tm_name=111},
                after=Struct{id=15,tm_name=111,logo_url=11111111111},
                source=Struct{
                    version=1.4.1.Final,
                    connector=mysql,
                    name=mysql_binlog_source,
                    ts_ms=1642091776000,
                    db=gmall,
                    table=base_trademark,
                    server_id=1,
                    file=mysql-bin.000001,
                    pos=4008492,
                    row=0,
                    thread=22
                },
                op=u,
                ts_ms=1642091776679
            },
            valueSchema=Schema{
                mysql_binlog_source.gmall.base_trademark.Envelope:STRUCT
            },
            timestamp=null,
            headers=ConnectHeaders(headers=)
        }
        */

        // 1. Create the JSON object that will hold the final result
        JSONObject result = new JSONObject();

        // 2. Extract the database name and table name from the topic
        String topic = sourceRecord.topic();
        String[] fields = topic.split("\\.");
        String database = fields[1];
        String tableName = fields[2];

        // 3. Extract the "before" and "after" data
        Struct value = (Struct) sourceRecord.value();

        // 3.1. "before" data
        Struct before = value.getStruct("before");
        JSONObject beforeJson = new JSONObject();
        if (before != null) {
            Schema beforeSchema = before.schema();
            for (Field field : beforeSchema.fields()) {
                Object beforeValue = before.get(field);
                beforeJson.put(field.name(), beforeValue);
            }
        }

        // 3.2. "after" data
        Struct after = value.getStruct("after");
        JSONObject afterJson = new JSONObject();
        if (after != null) {
            Schema afterSchema = after.schema();
            for (Field field : afterSchema.fields()) {
                Object afterValue = after.get(field);
                afterJson.put(field.name(), afterValue);
            }
        }

        // 4. Extract the timestamp
        long ts = (long) value.get("ts_ms");

        // 5. Extract the operation type (CREATE / UPDATE / DELETE), lower-case it,
        //    and map "create" to "insert" to simplify downstream writes
        Envelope.Operation operation = Envelope.operationFor(sourceRecord);
        String type = operation.toString().toLowerCase();
        if ("create".equals(type)) {
            type = "insert";
        }

        // 6. Put all fields into the JSON object
        result.put("database", database);
        result.put("tableName", tableName);
        result.put("before", beforeJson);
        result.put("after", afterJson);
        result.put("type", type);
        result.put("ts", ts);

        // 7. Emit the result
        collector.collect(result.toJSONString());
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}
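
For the base_trademark update shown in the ConnectRecord dump above, this deserializer emits roughly the following JSON (field order may differ, since fastjson's JSONObject does not guarantee ordering, and the exact value types depend on the table schema):

    {"database":"gmall","tableName":"base_trademark","before":{"id":15,"tm_name":"111"},"after":{"id":15,"tm_name":"111","logo_url":"11111111111"},"type":"update","ts":1642091776679}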

2.2.3. Builder methods and parameters explained

package com.ouyang.gmall.realtime.app.ods;

import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @date: 2022/2/3
 * @author: yangshibiao
 * @desc: Demo of the MySQLSource builder parameters
 */
public class Test {

    public static void main(String[] args) throws Exception {

        // 1. Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 2. Flink CDC stores the binlog position in checkpointed state, so to resume from
        //    where it left off the job must be restarted from a checkpoint or savepoint
        // 2.1 Enable checkpointing every 5 seconds
        env.enableCheckpointing(5000L);
        // 2.2 Set the checkpointing consistency semantics
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // 2.3 Retain the last checkpoint when the job is cancelled
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // 2.4 Set the restart strategy
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2000L));
        // 2.5 Set the state backend
        env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/flinkCDC"));
        // 2.6 Set the user name used to access HDFS
        System.setProperty("HADOOP_USER_NAME", "atguigu");

        // 3. Create the Flink MySQL CDC source
        // initial (default): Performs an initial snapshot of the monitored database tables on
        //   first startup, then continues reading the latest binlog.
        // latest-offset: Never performs a snapshot of the monitored tables on first startup;
        //   reads from the end of the binlog, i.e. only changes made after the connector started.
        // timestamp: Never performs a snapshot of the monitored tables on first startup; reads
        //   the binlog from the specified timestamp. The consumer traverses the binlog from the
        //   beginning and ignores change events whose timestamp is smaller than the specified one.
        // specific-offset: Never performs a snapshot of the monitored tables on first startup;
        //   reads the binlog directly from the specified offset.
        DebeziumSourceFunction<String> mysqlSource = MySQLSource.<String>builder()
                .hostname("hadoop102")
                .port(3306)
                .username("root")
                .password("000000")
                .databaseList("gmall-flink")
                // Optional. If not set, all tables in the databases above are captured.
                // When set, use the "db.table" form.
                .tableList("gmall-flink.z_user_info")
                .startupOptions(StartupOptions.initial())
                .deserializer(new StringDebeziumDeserializationSchema())
                .build();

        // 4. Read data from MySQL with the CDC source
        DataStreamSource<String> mysqlDS = env.addSource(mysqlSource);

        // 5. Print the data
        mysqlDS.print();

        // 6. Execute the job
        env.execute();
    }
}
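
The four startup modes described in the comments correspond to static factory methods on StartupOptions. The sketch below shows where each alternative would go in the builder; the method names and signatures are taken from flink-connector-mysql-cdc 1.x and should be checked against the exact version you use:

// Same builder as above; pick exactly one startupOptions(...) call.
DebeziumSourceFunction<String> source = MySQLSource.<String>builder()
        .hostname("hadoop102")
        .port(3306)
        .username("root")
        .password("000000")
        .databaseList("gmall-flink")
        .tableList("gmall-flink.z_user_info")
        .deserializer(new StringDebeziumDeserializationSchema())
        .startupOptions(StartupOptions.initial())                                       // snapshot first, then binlog (default)
        // .startupOptions(StartupOptions.latest())                                     // no snapshot, read from the binlog tail
        // .startupOptions(StartupOptions.timestamp(1642091776000L))                    // no snapshot, binlog from a timestamp (ms)
        // .startupOptions(StartupOptions.specificOffset("mysql-bin.000001", 4008355))  // no snapshot, binlog from file/position
        .build();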

3. Flink CDC with Flink SQL

Besides the DataStream API, the MySQL CDC connector can also be used as a Flink SQL table source declared with DDL:

package com.ouyang.gmall.cdc.app;

import com.ouyang.gmall.common.utils.FlinkUtil;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

/**
 * @date: 2022/1/14
 * @author: yangshibiao
 * @desc: SQL-based CDC
 */
public class GmallCDCWithSQL {

    public static void main(String[] args) throws Exception {

        String applicationName = "gmall-cdc-sql";
        long interval = 5000L;

        // 1. Get the execution environment and configure checkpointing
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        FlinkUtil.deployRocksdbCheckpoint(env, applicationName, interval, true);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 2. Create the table with DDL
        tableEnv.executeSql("CREATE TABLE mysql_binlog ( " +
                " id STRING NOT NULL, " +
                " tm_name STRING, " +
                " logo_url STRING " +
                ") WITH ( " +
                " 'connector' = 'mysql-cdc', " +
                " 'hostname' = 'bigdata1', " +
                " 'port' = '3306', " +
                " 'username' = 'root', " +
                " 'password' = '123456', " +
                " 'database-name' = 'gmall', " +
                " 'table-name' = 'base_trademark' " +
                ")");

        // 3. Query the data
        Table table = tableEnv.sqlQuery("select * from mysql_binlog");

        // 4. Convert the dynamic table to a retract stream
        DataStream<Tuple2<Boolean, Row>> retractStream = tableEnv.toRetractStream(table, Row.class);
        retractStream.print();

        // 5. Start the job
        env.execute(applicationName);
    }
}
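
In the retract stream, each element is a Tuple2<Boolean, Row>: the Boolean flag is true for an add message and false for a retract message (an update arrives as a retraction of the old row followed by an add of the new row). As a small follow-up sketch that is not part of the original post, keeping only the add messages looks roughly like this:

retractStream
        // keep only add messages (flag == true); drop retractions of old rows
        .filter(changeRecord -> changeRecord.f0)
        // keep only the Row payload
        .map(changeRecord -> changeRecord.f1.toString())
        .print();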

Note: this post is adapted from a training provider's (某马) 2020 New Year video course -> Bilibili link

Note: links to other related articles can be found here -> Flink article index

Note: this post covers the detailed usage of Flink CDC 1.x; for the new features of Flink CDC 2.x, see the post Flink (59): Flink CDC (Part 2).

