
Flink CDC: Capturing MySQL Data in Real Time

1. Introduction to Flink CDC

(Reference: "Real-Time Data Synchronization Based on Flink SQL CDC")

1.1 What is CDC

CDC is short for Change Data Capture. The core idea is to monitor and capture changes in a database (inserts, updates, and deletes of rows or tables), record those changes completely in the order they occur, and write them to a message broker so that other services can subscribe to and consume them. Common CDC tools fall into two groups: query-based ones such as Sqoop and the Kafka JDBC Source, and binlog-based ones such as Canal, Maxwell, and Debezium.

1.2 Flink CDC

Flink CDC is the flink-cdc-connectors component developed by the Flink community. It can read both the full snapshot and the incremental change data directly from databases such as MySQL, Oracle, and PostgreSQL.
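The walkthrough below uses the DataStream API, but the same connector also ships a Flink SQL connector named mysql-cdc. The following is a minimal sketch of that SQL usage for context; the column definitions and connection parameters are placeholders matching the test_db.user table used later and would need to fit your environment.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class FlinkSqlCdcSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Declare a dynamic table backed by the mysql-cdc connector
        // (columns and connection parameters are placeholders).
        tableEnv.executeSql(
                "CREATE TABLE user_source (" +
                "  id INT," +
                "  name STRING," +
                "  PRIMARY KEY (id) NOT ENFORCED" +
                ") WITH (" +
                "  'connector' = 'mysql-cdc'," +
                "  'hostname' = '192.168.231.121'," +
                "  'port' = '3306'," +
                "  'username' = 'root'," +
                "  'password' = '123456'," +
                "  'database-name' = 'test_db'," +
                "  'table-name' = 'user'" +
                ")");

        // Continuously print the changelog of the table.
        tableEnv.executeSql("SELECT * FROM user_source").print();
    }
}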

2. Implementation

2.1 Enable the MySQL Binlog

MySQL 5.7 is used here. Edit the MySQL configuration file:

vim /etc/my.cnf
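The configuration itself did not survive in the extracted post; below is a minimal sketch of the binlog settings to add under the [mysqld] section. The server-id value and the captured database test_db are assumptions for this setup, and MySQL must be restarted afterwards for the change to take effect.

[mysqld]
# unique id for this MySQL instance
server-id=1
# enable the binary log; files will be named mysql-bin.*
log-bin=mysql-bin
# Flink CDC (Debezium) requires row-based binlog
binlog_format=ROW
# optionally limit the binlog to the database being captured
binlog-do-db=test_db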

2.2 Create a Maven project and add the dependencies

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.gao</groupId>
    <artifactId>flink_cdc</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.12.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>1.12.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>1.12.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.1.3</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.48</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>1.2.0</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.75</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.12</artifactId>
            <version>1.12.0</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
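With the maven-assembly-plugin configured as above, running mvn clean package should also produce a flink_cdc-1.0-SNAPSHOT-jar-with-dependencies.jar suitable for submitting to a Flink cluster; for running the examples locally from the IDE, the assembly step is not required.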

2.3 Create the FlinkCDCTest class

import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class FlinkCDCTest {
    public static void main(String[] args) throws Exception {
        //TODO 1. Basic environment
        //1.1 Stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //1.2 Set parallelism to 1 to make testing easier
        env.setParallelism(1);

        //TODO 2. Checkpoint configuration
        //2.1 Enable checkpointing: every 5 seconds, exactly-once mode
        env.enableCheckpointing(5000L, CheckpointingMode.EXACTLY_ONCE);
        //2.2 Checkpoint timeout
        env.getCheckpointConfig().setCheckpointTimeout(60 * 1000);
        //2.3 Restart strategy: at most 2 restarts, 2 seconds apart
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 2 * 1000));
        //2.4 Retain externalized checkpoints after the job is cancelled
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        //2.5 State backend: store checkpoints on HDFS
        env.setStateBackend(new FsStateBackend("hdfs://192.168.231.121:8020/ck"));
        //2.6 User that accesses HDFS
        System.setProperty("HADOOP_USER_NAME", "gaogc");

        //TODO 3. Flink CDC
        //3.1 Create the MySQL source
        SourceFunction<String> sourceFunction = MySQLSource.<String>builder()
                .hostname("192.168.231.121")
                .port(3306)
                .databaseList("test_db")                             // database to capture
                .tableList("test_db.user")                           // table to capture
                .username("root")
                .password("123456")
                .startupOptions(StartupOptions.initial())            // take a full snapshot first, then read the binlog
                .deserializer(new MyDeserializationSchemaFunction()) // custom deserializer that emits JSON strings
                .build();
        //3.2 Read the change stream from the source
        DataStreamSource<String> sourceDS = env.addSource(sourceFunction);
        //Print for testing
        sourceDS.print();
        //Execute the job
        env.execute();
    }
}

2.4 Custom deserialization schema

import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

/**
 * Custom deserialization schema that converts the records read by Flink CDC into JSON strings.
 */
public class MyDeserializationSchemaFunction implements DebeziumDeserializationSchema<String> {
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
        Struct valueStruct = (Struct) sourceRecord.value();
        Struct sourceStruct = valueStruct.getStruct("source");
        //Database name
        String database = sourceStruct.getString("db");
        //Table name
        String table = sourceStruct.getString("table");
        //Operation type; "create" is mapped to "insert", updates arrive as "update"
        String type = Envelope.operationFor(sourceRecord).toString().toLowerCase();
        if (type.equals("create")) {
            type = "insert";
        }

        JSONObject jsonObj = new JSONObject();
        jsonObj.put("database", database);
        jsonObj.put("table", table);
        jsonObj.put("type", type);

        //Row data: the "after" struct holds the row image after the change (null for deletes)
        Struct afterStruct = valueStruct.getStruct("after");
        JSONObject dataJsonObj = new JSONObject();
        if (afterStruct != null) {
            for (Field field : afterStruct.schema().fields()) {
                String fieldName = field.name();
                Object fieldValue = afterStruct.get(field);
                dataJsonObj.put(fieldName, fieldValue);
            }
        }
        jsonObj.put("data", dataJsonObj);

        //Emit the JSON string downstream
        collector.collect(jsonObj.toJSONString());
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return TypeInformation.of(String.class);
    }
}
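For reference, after inserting a row into test_db.user the running job should print a JSON string shaped roughly like the following; the id and name fields are illustrative and depend on the actual table schema:

{"database":"test_db","table":"user","type":"insert","data":{"id":1,"name":"tom"}}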
