
An Introduction to Flink CDC

Contents

1. Introduction to Flink CDC

2. Flink CDC in practice

2.1 MySQL configuration

2.2 pom file

2.3 Java code

2.4 Test results


1. Introduction to Flink CDC

CDC comes in two main flavors, query-based and Binlog-based. The essential differences: query-based CDC polls tables in batches, which adds query load to the database, incurs high latency, and can miss intermediate changes between two polls; Binlog-based CDC tails the database's change log, capturing every change with low latency and without issuing extra queries.

Flink CDC is essentially the same idea as canal; the difference is that it is a component developed within the Flink community, and it is more convenient to use.

Flink 1.11 added the CDC feature; CDC is short for Change Data Capture. The name alone doesn't say much, so let's look at CDC starting from the earlier data architecture.

The traditional MySQL binlog pipeline works like this: canal listens to the binlog and writes the change events into Kafka, and Apache Flink consumes the Kafka data in real time to synchronize the MySQL data or perform other processing. Broken down, the pipeline consists of the following stages (a sketch of the Flink side follows the list):

  1. MySQL enables binlog.
  2. canal reads the binlog and writes the change data into Kafka.
  3. Flink reads the binlog data from Kafka and applies the business logic.
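
To make stage 3 concrete, here is a minimal sketch of the Flink side of this legacy pipeline. Flink 1.12 ships a canal-json format for the Kafka SQL connector that interprets canal's binlog JSON as a changelog stream. The topic name, broker address, and group id below are placeholders, and the sketch assumes flink-connector-kafka_2.12, flink-json, and the blink table planner are on the classpath (they are not in the pom shown later):

package com.zqs.study.flink.cdc;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class CanalKafkaPipelineSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Stage 3 of the legacy pipeline: read canal's binlog JSON from Kafka.
        // The canal-json format maps each binlog event to insert/update/delete changelog rows.
        tEnv.executeSql(
            "CREATE TABLE test1 (" +
            "  id INT," +
            "  name STRING," +
            "  create_datetime TIMESTAMP(0)" +
            ") WITH (" +
            "  'connector' = 'kafka'," +
            "  'topic' = 'canal_binlog'," +                           // placeholder topic
            "  'properties.bootstrap.servers' = 'kafka-host:9092'," + // placeholder broker
            "  'properties.group.id' = 'flink-canal-demo'," +
            "  'scan.startup.mode' = 'earliest-offset'," +
            "  'format' = 'canal-json'" +
            ")");

        // Continuously print the reconstructed changelog.
        tEnv.executeSql("SELECT * FROM test1").print();
    }
}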

That chain is long and involves quite a few components. Apache Flink CDC instead fetches the binlog directly from the database for downstream computation and analysis. Put simply, the chain shrinks to MySQL → Flink → downstream: the data is no longer relayed through canal and Kafka, and Flink processes the MySQL changes directly, removing the canal and Kafka hops.

Flink 1.11 ships mysql-cdc and postgres-cdc, which means that from Flink 1.11 on we can consume MySQL and PostgreSQL changes directly in Flink for business processing, as the sketch below shows.
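
To show how short the direct route is, here is a minimal SQL-based sketch that registers the test1 table used later in this article as a mysql-cdc source. It reuses the connection details from the demo below (password masked) and assumes the blink table planner is available at compile scope, which the pom in section 2.2 only provides as a test-jar:

package com.zqs.study.flink.cdc;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class MysqlCdcSqlSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // One DDL replaces the whole canal + Kafka leg: Flink connects to MySQL directly,
        // takes an initial snapshot, then tails the binlog for subsequent changes.
        tEnv.executeSql(
            "CREATE TABLE test1 (" +
            "  id INT," +
            "  name STRING," +
            "  create_datetime TIMESTAMP(0)," +
            "  PRIMARY KEY (id) NOT ENFORCED" +
            ") WITH (" +
            "  'connector' = 'mysql-cdc'," +
            "  'hostname' = '10.31.1.122'," +
            "  'port' = '3306'," +
            "  'username' = 'root'," +
            "  'password' = '***'," +          // masked
            "  'database-name' = 'cdc_test'," +
            "  'table-name' = 'test1'" +
            ")");

        tEnv.executeSql("SELECT * FROM test1").print();
    }
}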

Typical use cases:

  1. Incremental synchronization of database data
  2. Materialized views on top of database tables (see the sketch after this list)
  3. Dimension-table joins
  4. Other business processing
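
For use case 2, the following sketch shows what a materialized view means here. Because a CDC source is a changelog, a plain GROUP BY query over it is maintained incrementally: every insert, update, and delete in MySQL updates (or retracts) the aggregate, just like a continuously refreshed materialized view. The connection settings are the same assumptions as in the previous sketch:

package com.zqs.study.flink.cdc;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class CdcMaterializedViewSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Same mysql-cdc registration as before (password masked).
        tEnv.executeSql(
            "CREATE TABLE test1 (" +
            "  id INT, name STRING, create_datetime TIMESTAMP(0)," +
            "  PRIMARY KEY (id) NOT ENFORCED" +
            ") WITH (" +
            "  'connector' = 'mysql-cdc', 'hostname' = '10.31.1.122', 'port' = '3306'," +
            "  'username' = 'root', 'password' = '***'," +
            "  'database-name' = 'cdc_test', 'table-name' = 'test1')");

        // The aggregate below is updated on every row-level change in MySQL:
        // an incrementally maintained, materialized-view-style result.
        tEnv.executeSql("SELECT name, COUNT(*) AS cnt FROM test1 GROUP BY name").print();
    }
}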

2. Flink CDC in practice

2.1 MySQL configuration

MySQL must have binlog enabled, with binlog_format=ROW (required by the underlying Debezium engine), and every captured MySQL table must have a primary key. Verify the binlog settings:

mysql> show variables like '%log_bin%';
+---------------------------------+---------------------------------------------+
| Variable_name                   | Value                                       |
+---------------------------------+---------------------------------------------+
| log_bin                         | ON                                          |
| log_bin_basename                | /home/mysql/data/3306/10-31-1-122-bin       |
| log_bin_index                   | /home/mysql/data/3306/10-31-1-122-bin.index |
| log_bin_trust_function_creators | OFF                                         |
| log_bin_use_v1_row_events       | OFF                                         |
| sql_log_bin                     | ON                                          |
+---------------------------------+---------------------------------------------+
6 rows in set (0.01 sec)

Test statements to run against MySQL:

create database cdc_test;
use cdc_test;
create table test1(id int primary key, name varchar(50), create_datetime timestamp(0));
insert into test1(id, name, create_datetime) values (1, 'abc', current_timestamp());
insert into test1(id, name, create_datetime) values (2, 'def', current_timestamp());
insert into test1(id, name, create_datetime) values (3, 'ghi', current_timestamp());
update test1 set name = 'aaa' where id = 1;
delete from test1 where id = 1;
create table test2(id int primary key, name varchar(50), create_datetime timestamp(0));
insert into test2(id, name, create_datetime) values (1, 'abc', current_timestamp());
drop table test2;

2.2 pom file

The pom dependencies are as follows:

<dependencies>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.11</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>com.alibaba.ververica</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>1.1.1</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.75</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.12.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>1.12.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.12.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_2.12</artifactId>
        <version>1.12.0</version>
        <type>test-jar</type>
    </dependency>
</dependencies>

2.3 Java code

CdcDwdDeserializationSchema, a custom deserializer that flattens each Debezium change record into a JSONObject:

package com.zqs.study.flink.cdc;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import java.util.List;

public class CdcDwdDeserializationSchema implements DebeziumDeserializationSchema<JSONObject> {
    private static final long serialVersionUID = -3168848963265670603L;

    public CdcDwdDeserializationSchema() {
    }

    @Override
    public void deserialize(SourceRecord record, Collector<JSONObject> out) {
        Struct dataRecord = (Struct) record.value();
        Struct afterStruct = dataRecord.getStruct("after");
        Struct beforeStruct = dataRecord.getStruct("before");
        /*
         * 1. Both beforeStruct and afterStruct present -> update
         * 2. Only beforeStruct present                 -> delete
         * 3. Only afterStruct present                  -> insert
         */
        JSONObject logJson = new JSONObject();
        String canal_type = "";
        List<Field> fieldsList = null;
        if (afterStruct != null && beforeStruct != null) {
            System.out.println("this is an update");
            canal_type = "update";
            fieldsList = afterStruct.schema().fields();
            // copy field names and values from the "after" image
            for (Field field : fieldsList) {
                String fieldName = field.name();
                Object fieldValue = afterStruct.get(fieldName);
                logJson.put(fieldName, fieldValue);
            }
        } else if (afterStruct != null) {
            System.out.println("this is an insert");
            canal_type = "insert";
            fieldsList = afterStruct.schema().fields();
            for (Field field : fieldsList) {
                String fieldName = field.name();
                Object fieldValue = afterStruct.get(fieldName);
                logJson.put(fieldName, fieldValue);
            }
        } else if (beforeStruct != null) {
            System.out.println("this is a delete");
            canal_type = "delete";
            fieldsList = beforeStruct.schema().fields();
            // for deletes only the "before" image is available
            for (Field field : fieldsList) {
                String fieldName = field.name();
                Object fieldValue = beforeStruct.get(fieldName);
                logJson.put(fieldName, fieldValue);
            }
        } else {
            System.out.println("unexpected record: neither before nor after image present");
        }

        // database / table / timestamp metadata
        Struct source = dataRecord.getStruct("source");
        Object db = source.get("db");
        Object table = source.get("table");
        Object ts_ms = source.get("ts_ms");
        logJson.put("canal_database", db);
        logJson.put("canal_table", table);
        logJson.put("canal_ts", ts_ms);
        logJson.put("canal_type", canal_type);

        // topic name
        String topic = record.topic();
        System.out.println("topic = " + topic);

        // hash the primary-key fields into one of 3 buckets, e.g. for partitioning
        Struct pk = (Struct) record.key();
        List<Field> pkFieldList = pk.schema().fields();
        int partitionerNum = 0;
        for (Field field : pkFieldList) {
            Object pkValue = pk.get(field.name());
            partitionerNum += pkValue.hashCode();
        }
        int hash = Math.abs(partitionerNum) % 3;
        logJson.put("pk_hashcode", hash);
        out.collect(logJson);
    }

    @Override
    public TypeInformation<JSONObject> getProducedType() {
        return BasicTypeInfo.of(JSONObject.class);
    }
}
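
If all you need is the raw Debezium record (for example while debugging what the source emits), a custom deserializer like the one above is optional: the connector ships a StringDebeziumDeserializationSchema that simply calls toString() on each SourceRecord. A minimal sketch, reusing the connection settings from the test class below with the password masked:

package com.zqs.study.flink.cdc;

import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class FlinkCdcRawStringTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Built-in deserializer: each change arrives as the SourceRecord's toString().
        SourceFunction<String> source = MySQLSource.<String>builder()
                .hostname("10.31.1.122")
                .port(3306)
                .databaseList("cdc_test")
                .username("root")
                .password("***")   // masked
                .deserializer(new StringDebeziumDeserializationSchema())
                .build();

        env.addSource(source).print();
        env.execute("mysql-cdc raw string");
    }
}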

FlinkCDCSQLTest, the driver program:

package com.zqs.study.flink.cdc;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

/**
 * @author 只是甲
 * @date   2021-09-30
 * @remark Flink CDC test
 */
public class FlinkCDCSQLTest {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SourceFunction<JSONObject> sourceFunction = MySQLSource.<JSONObject>builder()
                .hostname("10.31.1.122")
                .port(3306)
                .databaseList("cdc_test")                        // monitor all tables under the cdc_test database
                .username("root")
                .password("Abc123456!")
                .deserializer(new CdcDwdDeserializationSchema()) // converts each SourceRecord to a JSONObject
                .build();

        DataStreamSource<JSONObject> stringDataStreamSource = env.addSource(sourceFunction);
        stringDataStreamSource.print("===>");

        try {
            env.execute("mysql-cdc test");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

2.4 Test results

Running the statements from section 2.1 against the live job shows that DML statements (insert, update, delete) are captured, but DDL statements such as create table and drop table are not; only row-level change records reach the deserializer.
