
FlinkCDC Real-Time Monitoring of MySQL (flink-connector-mysql-cdc)

This post uses FlinkCDC to monitor real-time changes to a MySQL database and its tables. Here the change events are only printed out; a later post will show how to write them into another MySQL database.

1. Enable the MySQL binlog

Enable binlog in my.cnf (here I limit it to the test database), then restart MySQL:

server-id=1
log-bin=mysql-bin
binlog-do-db=test
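
After the restart, you can confirm that binlog is actually on with standard MySQL commands (the exact output will vary with your setup):

mysql> show variables like 'log_bin';
mysql> show master status;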

2. Create a test database and table in MySQL

mysql> create database test;
mysql> create table user_info(id int unsigned not null auto_increment primary key, username varchar(60), sex tinyint(1), nickname varchar(60), addr varchar(255)) ENGINE=InnoDB default charset=utf8mb4;
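
The job below connects as root. If you would rather use a dedicated account, Debezium's MySQL connector needs roughly the following privileges (a sketch; the user name, host, and password are placeholders to adjust):

mysql> create user 'flinkcdc'@'%' identified by '123456';
mysql> grant select, reload, show databases, replication slave, replication client on *.* to 'flinkcdc'@'%';
mysql> flush privileges;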

3. Flink code

Create a new project named flinkcdc in IDEA.

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.zsoft.flinkcdc</groupId>
    <artifactId>flinkcdc</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <flink.version>1.13.1</flink.version>
    </properties>

    <dependencies>
        <!-- FlinkCDC via the DataStream API -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.1.3</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.22</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>1.4.0</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.75</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

resources/log4j.properties

log4j.rootLogger=warn,stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n

Deserialization class:

com/zsoft/flinkcdc/MyDeserializationSchema.java

package com.zsoft.flinkcdc;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

public class MyDeserializationSchema implements DebeziumDeserializationSchema<String> {

    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
        Struct valueStruct = (Struct) sourceRecord.value();
        Struct sourceStruct = valueStruct.getStruct("source");
        // Database name
        String database = sourceStruct.getString("db");
        // Table name
        String table = sourceStruct.getString("table");
        // Operation type (Debezium reports c -> create for inserts, u -> update;
        // normalize "create" to "insert")
        String type = Envelope.operationFor(sourceRecord).toString().toLowerCase();
        if (type.equals("create")) {
            type = "insert";
        }

        JSONObject jsonObj = new JSONObject();
        jsonObj.put("database", database);
        jsonObj.put("table", table);
        jsonObj.put("type", type);

        // Row data: the "after" image (null for delete events)
        Struct afterStruct = valueStruct.getStruct("after");
        JSONObject dataJsonObj = new JSONObject();
        if (afterStruct != null) {
            for (Field field : afterStruct.schema().fields()) {
                String fieldName = field.name();
                Object fieldValue = afterStruct.get(field);
                dataJsonObj.put(fieldName, fieldValue);
            }
        }
        jsonObj.put("data", dataJsonObj);

        collector.collect(jsonObj.toJSONString());
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return TypeInformation.of(String.class);
    }
}
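
The class above only emits the "after" image, so a delete event would arrive with an empty data object. Debezium also populates a "before" struct for update and delete events; a minimal sketch of an extra block (placed just before the jsonObj.put("data", ...) line, reusing the same fastjson API) could look like this:

// Hypothetical extension: also emit the pre-change row ("before" image).
// Debezium fills "before" for update and delete events; it is null for inserts.
Struct beforeStruct = valueStruct.getStruct("before");
JSONObject beforeJsonObj = new JSONObject();
if (beforeStruct != null) {
    for (Field field : beforeStruct.schema().fields()) {
        beforeJsonObj.put(field.name(), beforeStruct.get(field));
    }
}
jsonObj.put("beforeData", beforeJsonObj);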

Main class:

com/zsoft/flinkcdc/FlinkCdcDataStream.java

package com.zsoft.flinkcdc;

import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.Properties;

public class FlinkCdcDataStream {
    public static void main(String[] args) throws Exception {
        // TODO 1. Set up the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // TODO 2. Enable checkpointing
        // 2.1 Checkpoint every 5 s with exactly-once semantics
        env.enableCheckpointing(5000L, CheckpointingMode.EXACTLY_ONCE);
        // 2.2 Checkpoint timeout
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        // 2.3 Restart strategy for automatic recovery from checkpoints:
        //     restart at most once, with a 6 s delay
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 6000L));
        // 2.4 Retain the last checkpoint when the job is cancelled
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // 2.5 State backend: checkpoints go to HDFS
        env.setStateBackend(new FsStateBackend("hdfs://s1:8020/flinkCDC_DS"));
        // 2.6 User name for HDFS access
        System.setProperty("HADOOP_USER_NAME", "hadoop");

        // TODO 3. Create the Flink-MySQL-CDC source
        // Debezium pass-through properties; note the actual startup position
        // is governed by .startupOptions() below
        Properties props = new Properties();
        props.setProperty("scan.startup.mode", "initial");
        SourceFunction<String> sourceFunction = MySQLSource.<String>builder()
                .hostname("s1")
                .port(3306)
                .username("root")
                .password("123456")
                .databaseList("test")
                .tableList("test.user_info") // table names must be qualified with the database name
                .startupOptions(StartupOptions.earliest())
                .debeziumProperties(props)
                .deserializer(new MyDeserializationSchema())
                .build();

        // TODO 4. Read change events from MySQL with the CDC source
        DataStreamSource<String> mysqlDS = env.addSource(sourceFunction).setParallelism(1);

        // TODO 5. Print the events
        mysqlDS.print();

        // TODO 6. Run the job
        env.execute();
    }
}
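
Note that the state backend points at HDFS (hdfs://s1:8020), so the job expects a reachable HDFS cluster. For a quick local run from the IDE you could swap in a heap-based state backend with checkpoints on the local filesystem instead (a sketch using the backends shipped with Flink 1.13; the /tmp path is arbitrary):

import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;

// In place of steps 2.5 and 2.6 above:
env.setStateBackend(new HashMapStateBackend());
env.getCheckpointConfig().setCheckpointStorage("file:///tmp/flinkCDC_DS");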

4. Package and run

Package the project in IDEA (Maven package goal).

Upload the generated flinkcdc-1.0-SNAPSHOT-jar-with-dependencies.jar through the Flink web UI: on the Submit New Job page, click the + Add New button.

Under the uploaded jar, fill in:

  • Entry Class: com.zsoft.flinkcdc.FlinkCdcDataStream
  • Parallelism: 1
  • Program Arguments: (leave empty)
  • Savepoint Path: (leave empty)

Click "Submit" to launch the job.
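
Equivalently, the job can be submitted from the command line (assuming the Flink CLI is available and the cluster is running):

$FLINK_HOME/bin/flink run -c com.zsoft.flinkcdc.FlinkCdcDataStream flinkcdc-1.0-SNAPSHOT-jar-with-dependencies.jar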

5. Test

Now insert the following rows into MySQL:

mysql> insert into user_info values(null, 'zhangsan', 1, 'zhs','beijing');

mysql> insert into user_info values(null, 'lisi', 1, 'ls','shanghai');

mysql> insert into user_info values(null, 'wangwu', 1, 'ww','wangwu');

In the Flink web UI, open the job's task under Task Managers; the Stdout tab shows the printed events:

  1. {"database":"test","data":{"sex":1,"nickname":"zhs","id":1,"addr":"beijing","username":"zhangsan"},"type":"insert","table":"user_info"}
  2. {"database":"test","data":{"sex":1,"nickname":"ls","id":2,"addr":"shanghai","username":"lisi"},"type":"insert","table":"user_info"}
  3. {"database":"test","data":{"sex":1,"nickname":"ww","id":3,"addr":"wangwu","username":"wangwu"},"type":"insert","table":"user_info"}
