
flink-cdc Basic Tutorial, Part 2 (Conclusion): with Error Fixes


Today's post is here again. ღ( ´・ᴗ・` ) Let's learn and improve together ღゝ◡╹)ノ♡

Abstract: this covers most of the scenarios in which you will use flink-cdc.

Full tutorial contents:

  1. Prerequisite knowledge for using flink-cdc: the MySQL binlog

  2. A look at part of the flink-cdc source code

  3. Hands-on: using flink-cdc via the DataStream API

  4. Hands-on: using flink-cdc via Flink SQL

  5. Comparing the DataStream and Flink SQL approaches: differences and applicable scenarios

  6. A custom deserializer, so the stream data we get is more intuitive and easier to use

  7. flink-cdc errors encountered along the way

Keep it up; study hard and improve every day~

Q:

1 Fault tolerance in flink-cdc


In the Flink web UI you can see that the job has been submitted.


You can see that the output should be printed on hadoop102.

After clicking the hadoop102 row, you can see the output.


After I insert a row into MySQL:

you can see that the change is successfully captured.


We want a restarted job to pick up from where it last consumed and catch up to the latest data after the job dies. To do that, we should trigger a savepoint; the savepoint records the position where consumption left off:

  1. [myself@hadoop102 ~]$ cd /opt/module/flink-standalone/
  2. [myself@hadoop102 flink-standalone]$ bin/flink savepoint 713d0721e83be877c97f0a846cf6af85 hdfs://hadoop102:8020/flink1109/savepoint    <- the command, the job ID, and the HDFS target path
  3. SLF4J: Class path contains multiple SLF4J bindings.
  4. SLF4J: Found binding in [jar:file:/opt/module/flink-standalone/lib/log4j-slf4j-impl-2.12.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
  5. SLF4J: Found binding in [jar:file:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
  6. SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
  7. SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
  8. Triggering savepoint for job 713d0721e83be877c97f0a846cf6af85.
  9. Waiting for response...
  10. Savepoint completed. Path: hdfs://hadoop102:8020/flink1109/savepoint/savepoint-713d07-9fd0e5ddd3f3
  11. You can resume your program from this savepoint with the run command.

To see the savepoint in action, I deliberately kill this Flink job.


Then I insert new rows into MySQL.


Get the path of the savepoint directory from HDFS:


Run the command:

  1. [myself@hadoop102 flink-standalone]$ bin/flink run -s hdfs://hadoop102:8020/flink1109/savepoint/savepoint-713d07-9fd0e5ddd3f3 -c com.atguigu.Flink01_DataStream flink-cdc-1.0-SNAPSHOT-jar-with-dependencies.jar
  2. SLF4J: Class path contains multiple SLF4J bindings.
  3. SLF4J: Found binding in [jar:file:/opt/module/flink-standalone/lib/log4j-slf4j-impl-2.12.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
  4. SLF4J: Found binding in [jar:file:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
  5. SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
  6. SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
  7. Job has been submitted with JobID f2efda4055ccd36731b2f64aef7e3c9c

If you restart without the savepoint, consumption starts from the beginning again.


The job successfully continues from where it last consumed.


So this achieves resumable consumption with the DataStream approach. The difference is that the consumption position is stored as Flink state and restored from that state, unlike Flume and Canal, which persist the consumption position to a file.
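
For reference, a minimal sketch of what the submitted job is assumed to look like (the class name comes from the "-c com.atguigu.Flink01_DataStream" argument used above; the body mirrors the final DataStream code later in this post). The key point is that checkpointing is enabled, so the binlog offset lives in Flink state and is exactly what the savepoint captures:

    package com.atguigu;

    import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
    import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
    import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
    import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
    import org.apache.flink.runtime.state.filesystem.FsStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class Flink01_DataStream {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            // Checkpointing must be on: the binlog offset is kept in state, and the savepoint
            // (or a retained checkpoint) is what snapshots that state for the restart.
            env.enableCheckpointing(5000L);
            env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/flink1109/ck"));

            DebeziumSourceFunction<String> sourceFunction = MySQLSource.<String>builder()
                    .hostname("hadoop102")
                    .port(3306)
                    .username("root")
                    .password("123456")
                    .databaseList("gmall0820flink")
                    .tableList("gmall0820flink.base_trademark")
                    .startupOptions(StartupOptions.initial())
                    .deserializer(new StringDebeziumDeserializationSchema())
                    .build();

            env.addSource(sourceFunction).print();
            env.execute();
        }
    }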

Q:

2 Using flink-cdc with Flink SQL

2.2.0 First, look at the example on the official site

https://github.com/ververica/flink-cdc-connectors/wiki/MySQL-CDC-Connector

  1. -- creates a mysql cdc table source
  2. CREATE TABLE mysql_binlog (
  3. id INT NOT NULL,
  4. name STRING,
  5. description STRING,
  6. weight DECIMAL(10,3)
  7. ) WITH ( -- the connection options go inside WITH
  8. 'connector' = 'mysql-cdc',
  9. 'hostname' = 'localhost',
  10. 'port' = '3306',
  11. 'username' = 'flinkuser',
  12. 'password' = 'flinkpw',
  13. 'database-name' = 'inventory',
  14. 'table-name' = 'products'
  15. );
  16. -- read snapshot and binlog data from mysql, and do some transformation, and show on the client
  17. SELECT id, UPPER(name), description, weight FROM mysql_binlog;

2.2.1 Add the dependency (the flink-connector-mysql-cdc dependency added earlier in this series is still required; it is visible in the classpath of the run output below)

  1. <dependency>
  2. <groupId>org.apache.flink</groupId>
  3. <artifactId>flink-table-planner-blink_2.12</artifactId>
  4. <version>1.12.0</version>
  5. </dependency>

2.2.2 Code

  1. package com.alibaba;
  2. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  3. import org.apache.flink.table.api.Table;
  4. import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
  5. import org.apache.flink.types.Row;
  6. /**
  7. * @author zhouyanjun
  8. * @create 2021-06-22 12:31
  9. */
  10. public class Flink02_SQL {
  11. public static void main(String[] args) throws Exception {
  12. //1. Get the execution environment
  13. //2. Read MySQL change data with SQL
  14. //3. Convert to a stream and print, so we can look at the data format
  15. //4. Start the job
  16. //1. Get the execution environment
  17. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  18. env.setParallelism(1);
  19. StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
  20. //2. Read MySQL change data with SQL
  21. tableEnv.executeSql("create table trademark(id string,tm_name string,logo_url string) " +
  22. "with(" +
  23. " 'connector' = 'mysql-cdc', " +
  24. " 'hostname' = 'hadoop102', " +
  25. " 'port' = '3306', " +
  26. " 'username' = 'root', " +
  27. " 'password' = '123456', " +
  28. " 'database-name' = 'gmall0820flink', " +
  29. " 'table-name' = 'base_trademark'" +
  30. ")"
  31. );
  32. //3. Convert to a stream and print, so we can look at the data format
  33. Table table = tableEnv.sqlQuery("select * from trademark");// returns a Table object
  34. tableEnv.toRetractStream(table, Row.class).print(); // convert the dynamic table to a stream. Which kind of stream is appropriate here? (see the note right after this code)
  35. // start with a retract stream
  36. //4. Start the job
  37. env.execute();
  38. }
  39. }
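
To answer the question left in the comment above: a CDC table produces updates and deletes as well as inserts, so it is not an append-only table. A minimal sketch of the choice, reusing the tableEnv and table from the code above:

    // toAppendStream only supports insert-only tables; with a CDC table it is
    // expected to fail at runtime once an update or delete change arrives:
    // tableEnv.toAppendStream(table, Row.class).print();

    // toRetractStream encodes every change as a Tuple2<Boolean, Row>:
    //   f0 == true  -> an added row
    //   f0 == false -> a retracted (withdrawn) row
    tableEnv.toRetractStream(table, Row.class).print();
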
  1. Connector Options
  2. scan.startup.mode
  3. optional | default: initial | type: String | Optional startup mode for the MySQL CDC consumer; valid enumerations are "initial" and "latest-offset". Please see the Startup Reading Position section for more detailed information.
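
As a sketch, this option simply goes into the WITH clause of the DDL from 2.2.2; 'latest-offset' below is only for illustration (the run shown under "Output" still uses the default initial mode):

    tableEnv.executeSql("create table trademark(id string,tm_name string,logo_url string) " +
            "with(" +
            " 'connector' = 'mysql-cdc', " +
            " 'hostname' = 'hadoop102', " +
            " 'port' = '3306', " +
            " 'username' = 'root', " +
            " 'password' = '123456', " +
            " 'database-name' = 'gmall0820flink', " +
            " 'table-name' = 'base_trademark', " +
            " 'scan.startup.mode' = 'latest-offset'" + // skip the initial snapshot, read only new binlog changes
            ")");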


Output:

  1. D:\Develop_software\java\jdk1.8.0_141\bin\java.exe "-javaagent:D:\Develop_software\IntelliJ_IDEA 2019.2.2_song\IntelliJ IDEA 2019.2.2\lib\idea_rt.jar=3755:D:\Develop_software\IntelliJ_IDEA 2019.2.2_song\IntelliJ IDEA 2019.2.2\bin" -Dfile.encoding=UTF-8 -classpath D:\Develop_software\java\jdk1.8.0_141\jre\lib\charsets.jar;D:\Develop_software\java\jdk1.8.0_141\jre\lib\deploy.jar;D:\Develop_software\java\jdk1.8.0_141\jre\lib\ext\access-bridge-64.jar;D:\Develop_software\java\jdk1.8.0_141\jre\lib\ext\cldrdata.jar;D:\Develop_software\java\jdk1.8.0_141\jre\lib\ext\dnsns.jar;D:\Develop_software\java\jdk1.8.0_141\jre\lib\ext\jaccess.jar;D:\Develop_software\java\jdk1.8.0_141\jre\lib\ext\jfxrt.jar;D:\Develop_software\java\jdk1.8.0_141\jre\lib\ext\localedata.jar;D:\Develop_software\java\jdk1.8.0_141\jre\lib\ext\nashorn.jar;D:\Develop_software\java\jdk1.8.0_141\jre\lib\ext\sunec.jar;D:\Develop_software\java\jdk1.8.0_141\jre\lib\ext\sunjce_provider.jar;D:\Develop_software\java\jdk1.8.0_141\jre\lib\ext\sunmscapi.jar;D:\Develop_software\java\jdk1.8.0_141\jre\lib\ext\sunpkcs11.jar;D:\Develop_software\java\jdk1.8.0_141\jre\lib\ext\zipfs.jar;D:\Develop_software\java\jdk1.8.0_141\jre\lib\javaws.jar;D:\Develop_software\java\jdk1.8.0_141\jre\lib\jce.jar;D:\Develop_software\java\jdk1.8.0_141\jre\lib\jfr.jar;D:\Develop_software\java\jdk1.8.0_141\jre\lib\jfxswt.jar;D:\Develop_software\java\jdk1.8.0_141\jre\lib\jsse.jar;D:\Develop_software\java\jdk1.8.0_141\jre\lib\management-agent.jar;D:\Develop_software\java\jdk1.8.0_141\jre\lib\plugin.jar;D:\Develop_software\java\jdk1.8.0_141\jre\lib\resources.jar;D:\Develop_software\java\jdk1.8.0_141\jre\lib\rt.jar;D:\workspace_idea1\BigData0621\gmall-flink-201109\flink-cdc\target\classes;D:\MyWork\Program\RepMaven\org\apache\flink\flink-java\1.12.0\flink-java-1.12.0.jar;D:\MyWork\Program\RepMaven\org\apache\flink\flink-core\1.12.0\flink-core-1.12.0.jar;D:\MyWork\Program\RepMaven\org\apache\flink\flink-annotations\1.12.0\flink-annotations-1.12.0.jar;D:\MyWork\Program\RepMaven\org\apache\flink\flink-metrics-core\1.12.0\flink-metrics-core-1.12.0.jar;D:\MyWork\Program\RepMaven\org\apache\flink\flink-shaded-asm-7\7.1-12.0\flink-shaded-asm-7-7.1-12.0.jar;D:\MyWork\Program\RepMaven\com\esotericsoftware\kryo\kryo\2.24.0\kryo-2.24.0.jar;D:\MyWork\Program\RepMaven\com\esotericsoftware\minlog\minlog\1.2\minlog-1.2.jar;D:\MyWork\Program\RepMaven\org\objenesis\objenesis\2.1\objenesis-2.1.jar;D:\MyWork\Program\RepMaven\commons-collections\commons-collections\3.2.2\commons-collections-3.2.2.jar;D:\MyWork\Program\RepMaven\org\apache\commons\commons-compress\1.20\commons-compress-1.20.jar;D:\MyWork\Program\RepMaven\org\apache\commons\commons-lang3\3.3.2\commons-lang3-3.3.2.jar;D:\MyWork\Program\RepMaven\org\apache\commons\commons-math3\3.5\commons-math3-3.5.jar;D:\MyWork\Program\RepMaven\org\slf4j\slf4j-api\1.7.15\slf4j-api-1.7.15.jar;D:\MyWork\Program\RepMaven\com\google\code\findbugs\jsr305\1.3.9\jsr305-1.3.9.jar;D:\MyWork\Program\RepMaven\org\apache\flink\force-shading\1.12.0\force-shading-1.12.0.jar;D:\MyWork\Program\RepMaven\org\apache\flink\flink-streaming-java_2.12\1.12.0\flink-streaming-java_2.12-1.12.0.jar;D:\MyWork\Program\RepMaven\org\apache\flink\flink-file-sink-common\1.12.0\flink-file-sink-common-1.12.0.jar;D:\MyWork\Program\RepMaven\org\apache\flink\flink-runtime_2.12\1.12.0\flink-runtime_2.12-1.12.0.jar;D:\MyWork\Program\RepMaven\org\apache\flink\flink-queryable-state-client-java\1.12.0\flink-queryable-state-client-java-1.12.0.jar;D:\MyWork\Program\RepMaven\org\apache\flink
\flink-hadoop-fs\1.12.0\flink-hadoop-fs-1.12.0.jar;D:\MyWork\Program\RepMaven\commons-io\commons-io\2.7\commons-io-2.7.jar;D:\MyWork\Program\RepMaven\org\apache\flink\flink-shaded-netty\4.1.49.Final-12.0\flink-shaded-netty-4.1.49.Final-12.0.jar;D:\MyWork\Program\RepMaven\org\apache\flink\flink-shaded-jackson\2.10.1-12.0\flink-shaded-jackson-2.10.1-12.0.jar;D:\MyWork\Program\RepMaven\org\apache\flink\flink-shaded-zookeeper-3\3.4.14-12.0\flink-shaded-zookeeper-3-3.4.14-12.0.jar;D:\MyWork\Program\RepMaven\org\javassist\javassist\3.24.0-GA\javassist-3.24.0-GA.jar;D:\MyWork\Program\RepMaven\org\scala-lang\scala-library\2.12.7\scala-library-2.12.7.jar;D:\MyWork\Program\RepMaven\com\typesafe\akka\akka-actor_2.12\2.5.21\akka-actor_2.12-2.5.21.jar;D:\MyWork\Program\RepMaven\com\typesafe\config\1.3.3\config-1.3.3.jar;D:\MyWork\Program\RepMaven\org\scala-lang\modules\scala-java8-compat_2.12\0.8.0\scala-java8-compat_2.12-0.8.0.jar;D:\MyWork\Program\RepMaven\com\typesafe\akka\akka-stream_2.12\2.5.21\akka-stream_2.12-2.5.21.jar;D:\MyWork\Program\RepMaven\org\reactivestreams\reactive-streams\1.0.2\reactive-streams-1.0.2.jar;D:\MyWork\Program\RepMaven\com\typesafe\ssl-config-core_2.12\0.3.7\ssl-config-core_2.12-0.3.7.jar;D:\MyWork\Program\RepMaven\org\scala-lang\modules\scala-parser-combinators_2.12\1.1.1\scala-parser-combinators_2.12-1.1.1.jar;D:\MyWork\Program\RepMaven\com\typesafe\akka\akka-protobuf_2.12\2.5.21\akka-protobuf_2.12-2.5.21.jar;D:\MyWork\Program\RepMaven\com\typesafe\akka\akka-slf4j_2.12\2.5.21\akka-slf4j_2.12-2.5.21.jar;D:\MyWork\Program\RepMaven\org\clapper\grizzled-slf4j_2.12\1.3.2\grizzled-slf4j_2.12-1.3.2.jar;D:\MyWork\Program\RepMaven\com\github\scopt\scopt_2.12\3.5.0\scopt_2.12-3.5.0.jar;D:\MyWork\Program\RepMaven\org\xerial\snappy\snappy-java\1.1.4\snappy-java-1.1.4.jar;D:\MyWork\Program\RepMaven\com\twitter\chill_2.12\0.7.6\chill_2.12-0.7.6.jar;D:\MyWork\Program\RepMaven\com\twitter\chill-java\0.7.6\chill-java-0.7.6.jar;D:\MyWork\Program\RepMaven\org\lz4\lz4-java\1.6.0\lz4-java-1.6.0.jar;D:\MyWork\Program\RepMaven\org\apache\flink\flink-shaded-guava\18.0-12.0\flink-shaded-guava-18.0-12.0.jar;D:\MyWork\Program\RepMaven\org\apache\flink\flink-clients_2.12\1.12.0\flink-clients_2.12-1.12.0.jar;D:\MyWork\Program\RepMaven\org\apache\flink\flink-optimizer_2.12\1.12.0\flink-optimizer_2.12-1.12.0.jar;D:\MyWork\Program\RepMaven\commons-cli\commons-cli\1.3.1\commons-cli-1.3.1.jar;D:\MyWork\Program\RepMaven\mysql\mysql-connector-java\5.1.49\mysql-connector-java-5.1.49.jar;D:\MyWork\Program\RepMaven\com\alibaba\ververica\flink-connector-mysql-cdc\1.2.0\flink-connector-mysql-cdc-1.2.0.jar;D:\MyWork\Program\RepMaven\com\alibaba\fastjson\1.2.75\fastjson-1.2.75.jar;D:\MyWork\Program\RepMaven\org\apache\flink\flink-table-planner-blink_2.12\1.12.0\flink-table-planner-blink_2.12-1.12.0.jar;D:\MyWork\Program\RepMaven\org\apache\flink\flink-table-common\1.12.0\flink-table-common-1.12.0.jar;D:\MyWork\Program\RepMaven\org\apache\flink\flink-connector-files\1.12.0\flink-connector-files-1.12.0.jar;D:\MyWork\Program\RepMaven\org\apache\flink\flink-connector-base\1.12.0\flink-connector-base-1.12.0.jar;D:\MyWork\Program\RepMaven\org\apache\flink\flink-table-api-java\1.12.0\flink-table-api-java-1.12.0.jar;D:\MyWork\Program\RepMaven\org\apache\flink\flink-table-api-scala_2.12\1.12.0\flink-table-api-scala_2.12-1.12.0.jar;D:\MyWork\Program\RepMaven\org\scala-lang\scala-reflect\2.12.7\scala-reflect-2.12.7.jar;D:\MyWork\Program\RepMaven\org\scala-lang\scala-compiler\2.12.7\scala-compiler-2.12.7.jar;D:\MyWork\Program
\RepMaven\org\scala-lang\modules\scala-xml_2.12\1.0.6\scala-xml_2.12-1.0.6.jar;D:\MyWork\Program\RepMaven\org\apache\flink\flink-table-api-java-bridge_2.12\1.12.0\flink-table-api-java-bridge_2.12-1.12.0.jar;D:\MyWork\Program\RepMaven\org\apache\flink\flink-table-api-scala-bridge_2.12\1.12.0\flink-table-api-scala-bridge_2.12-1.12.0.jar;D:\MyWork\Program\RepMaven\org\apache\flink\flink-scala_2.12\1.12.0\flink-scala_2.12-1.12.0.jar;D:\MyWork\Program\RepMaven\org\apache\flink\flink-streaming-scala_2.12\1.12.0\flink-streaming-scala_2.12-1.12.0.jar;D:\MyWork\Program\RepMaven\org\apache\flink\flink-table-runtime-blink_2.12\1.12.0\flink-table-runtime-blink_2.12-1.12.0.jar;D:\MyWork\Program\RepMaven\org\codehaus\janino\janino\3.0.11\janino-3.0.11.jar;D:\MyWork\Program\RepMaven\org\codehaus\janino\commons-compiler\3.0.11\commons-compiler-3.0.11.jar;D:\MyWork\Program\RepMaven\org\apache\calcite\avatica\avatica-core\1.17.0\avatica-core-1.17.0.jar;D:\MyWork\Program\RepMaven\org\reflections\reflections\0.9.10\reflections-0.9.10.jar com.atguigu.Flink02_SQL
  2. SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
  3. SLF4J: Defaulting to no-operation (NOP) logger implementation
  4. SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
  5. SLF4J: Failed to load class "org.slf4j.impl.StaticMDCBinder".
  6. SLF4J: Defaulting to no-operation MDCAdapter implementation.
  7. SLF4J: See http://www.slf4j.org/codes.html#no_static_mdc_binder for further details.
  8. (true,1,Redmi,null)
  9. (true,2,苹果,/static/default.jpg)
  10. (true,3,华为,/static/default.jpg)
  11. (true,4,TCL,/static/default.jpg)
  12. (true,5,小米,/static/default.jpg)
  13. (true,6,长粒香,/static/default.jpg)
  14. (true,7,金沙河,/static/default.jpg)
  15. (true,8,索芙特,/static/default.jpg)
  16. (true,9,CAREMiLLE,/static/default.jpg)
  17. (true,10,欧莱雅,/static/default.jpg)
  18. (true,11,香奈儿,/static/default.jpg)
  19. (true,12,周大帅哥,null)
  20. (true,13,天下第一帅,null)
  21. 六月 22, 2021 1:00:45 下午 com.github.shyiko.mysql.binlog.BinaryLogClient connect
  22. 信息: Connected to hadoop102:3306 at mysql-bin.000007/154 (sid:5811, cid:11)

As you can see, the output above is what the default startup mode (initial) produces.

It first reads in the existing data as a snapshot, then connects to the latest binlog position.


Output after we delete a row:

  1. (true,1,Redmi,null)
  2. (true,2,苹果,/static/default.jpg)
  3. (true,3,华为,/static/default.jpg)
  4. (true,4,TCL,/static/default.jpg)
  5. (true,5,小米,/static/default.jpg)
  6. (true,6,长粒香,/static/default.jpg)
  7. (true,7,金沙河,/static/default.jpg)
  8. (true,8,索芙特,/static/default.jpg)
  9. (true,9,CAREMiLLE,/static/default.jpg)
  10. (true,10,欧莱雅,/static/default.jpg)
  11. (true,11,香奈儿,/static/default.jpg)
  12. (true,12,周大帅哥,null)
  13. (true,13,天下第一帅,null)
  14. 六月 22, 2021 1:00:45 下午 com.github.shyiko.mysql.binlog.BinaryLogClient connect
  15. 信息: Connected to hadoop102:3306 at mysql-bin.000007/154 (sid:5811, cid:11)
  16. (false,13,天下第一帅,null)    <- the false flag marks a retract: the deleted row is withdrawn (see the short sketch below)
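
If a downstream step only needs the add messages, the retract flag can be filtered on; a minimal sketch (not part of the original code), again reusing tableEnv and table from 2.2.2:

    // keep only the accumulate (true) messages and drop the retract (false) ones
    tableEnv.toRetractStream(table, Row.class)
            .filter(change -> change.f0)
            .print();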

Now let's update a row:


  1. (true,1,Redmi,null)
  2. (true,2,苹果,/static/default.jpg)
  3. (true,3,华为,/static/default.jpg)
  4. (true,4,TCL,/static/default.jpg)
  5. (true,5,小米,/static/default.jpg)
  6. (true,6,长粒香,/static/default.jpg)
  7. (true,7,金沙河,/static/default.jpg)
  8. (true,8,索芙特,/static/default.jpg)
  9. (true,9,CAREMiLLE,/static/default.jpg)
  10. (true,10,欧莱雅,/static/default.jpg)
  11. (true,11,香奈儿,/static/default.jpg)
  12. (true,12,周大帅哥,null)
  13. (true,13,天下第一帅,null)
  14. 六月 22, 2021 1:00:45 下午 com.github.shyiko.mysql.binlog.BinaryLogClient connect
  15. 信息: Connected to hadoop102:3306 at mysql-bin.000007/154 (sid:5811, cid:11)
  16. (false,13,天下第一帅,null)
  17. (false,12,周大帅哥,null)    <- an update produces two messages: retract the old row, then add the new row
  18. (true,12,周大帅哥,更新一条信息)

Q:

3 Differences between the two approaches

The differences between the Flink DataStream approach and the Flink SQL approach:

DataStream can monitor multiple databases and multiple tables.

Flink SQL can only monitor a single table in a single database, because of how the CREATE TABLE statement is written in the code. Its advantage is that the output format is a bit friendlier.

You can see this by comparing the code:

  1. DataStream: a list of databases and a list of tables
  2. .databaseList("gmall0820flink")
  3. .tableList("gmall0820flink.base_trademark")
  4. FlinkSQL: one database name and one table name
  5. " 'database-name' = 'gmall0820flink', " +
  6. " 'table-name' = 'base_trademark'" +


This also works: monitoring two databases and three tables at the same time (the databases must first be added to and enabled in the MySQL binlog configuration); see the sketch below.
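
A minimal sketch of monitoring two databases and three tables at once with the DataStream builder (gmall0820flink.base_trademark is from this tutorial; the other database and table names are hypothetical placeholders):

    DebeziumSourceFunction<String> sourceFunction = MySQLSource.<String>builder()
            .hostname("hadoop102")
            .port(3306)
            .username("root")
            .password("123456")
            // both databases must already be enabled in the MySQL binlog configuration
            .databaseList("gmall0820flink", "another_db")
            // table names are qualified with their database name
            .tableList("gmall0820flink.base_trademark",
                       "gmall0820flink.another_table",
                       "another_db.some_table")
            .startupOptions(StartupOptions.initial())
            .deserializer(new StringDebeziumDeserializationSchema())
            .build();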

Q:

4 Custom deserializer

Customize the deserialization so that the stream data we get is more intuitive and easier to use.

.deserializer(new StringDebeziumDeserializationSchema())

4.1 Code

  1. public class Flink03_DataStreamWithMySchema {
  2. public static void main(String[] args) throws Exception {
  3. System.setProperty("HADOOP_USER_NAME", "atguigu");
  4. //1. Create the streaming execution environment
  5. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  6. env.setParallelism(1);
  7. // Enable checkpointing
  8. env.enableCheckpointing(5000L);
  9. env.getCheckpointConfig().setCheckpointTimeout(5000L); // checkpoint timeout; checkpoints exceeding it are discarded
  10. env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/flink1109/ck"));
  11. //2. Read MySQL change data via CDC
  12. DebeziumSourceFunction<String> sourceFunction = MySQLSource
  13. .<String>builder()
  14. .hostname("hadoop102")
  15. .port(3306)
  16. .username("root")
  17. .password("123456")
  18. .databaseList("gmall0820flink")
  19. .tableList("gmall0820flink.base_trademark")/// optional; if omitted, every table of the listed databases is read. Qualify the table name with the database name, because flink-cdc can monitor multiple databases
  20. .startupOptions(StartupOptions.initial())// startup option/mode; an enum with 5 values
  21. // .startupOptions(StartupOptions.earliest())
  22. // .startupOptions(StartupOptions.latest())
  23. // .startupOptions(StartupOptions.specificOffset())
  24. // .startupOptions(StartupOptions.timestamp())
  25. .deserializer(new MyDebeziumDeserializationSchema())// change this first: a custom deserializer that turns the serialized binary record into something usable
  26. .build();
  27. DataStreamSource<String> StreamSource = env.addSource(sourceFunction);// we now have a stream
  28. //3. Print
  29. StreamSource.print();
  30. //4. Start the job
  31. env.execute();
  32. }
  33. }

Looking at the source:

  1. .deserializer(new StringDebeziumDeserializationSchema())
  2. Ctrl-click into it:
  3. /**
  4. * The deserializer used to convert from consumed {@link org.apache.kafka.connect.source.SourceRecord}.
  5. */
  6. public Builder<T> deserializer(DebeziumDeserializationSchema<T> deserializer) {
  7. this.deserializer = deserializer;
  8. return this;
  9. }
  10. Clicking further in, we find it is an interface, so in our own code we have to implement this interface:
  11. @PublicEvolving
  12. public interface DebeziumDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
  13. void deserialize(SourceRecord var1, Collector<T> var2) throws Exception;
  14. The <T> of DebeziumDeserializationSchema<T> is passed to Collector<T>, and we already know Collector<T> is what emits data downstream.
  15. So T is the output data type.
  16. }

When we don't know how to write something, look at how it is done in the source code.

  1. .deserializer(new StringDebeziumDeserializationSchema())  Ctrl-click into it.
  2. You can see the following:
  3. public class StringDebeziumDeserializationSchema implements DebeziumDeserializationSchema<String> {
  4. private static final long serialVersionUID = -3168848963265670603L;
  5. public StringDebeziumDeserializationSchema() {
  6. }
  7. public void deserialize(SourceRecord record, Collector<String> out) throws Exception {
  8. out.collect(record.toString()); // because it just calls .toString(), the default output cannot be used directly and is not intuitive at all
  9. }
  10. public TypeInformation<String> getProducedType() {
  11. return BasicTypeInfo.STRING_TYPE_INFO; // we can reuse this same implementation in our own schema
  12. }
  13. }

The data produced by the default deserializer is an object dump, not JSON; otherwise converting JSON to JSON would be pointless:

  1. SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={file=mysql-bin.000004, pos=154, row=1, snapshot=true}}
  2. ConnectRecord{topic='mysql_binlog_source.gmall0820flink.base_trademark', kafkaPartition=null, key=Struct{id=1}, keySchema=Schema{mysql_binlog_source.gmall0820flink.base_trademark.Key:STRUCT}, value=Struct{after=Struct{id=1,tm_name=Redmi},source=Struct{version=1.4.1.Final,connector=mysql,name=mysql_binlog_source,ts_ms=0,snapshot=true,db=gmall0820flink,table=base_trademark,server_id=0,file=mysql-bin.000004,pos=154,row=0},op=c,ts_ms=1623943673564}, valueSchema=Schema{mysql_binlog_source.gmall0820flink.base_trademark.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
  1. public void deserialize(SourceRecord sourceRecord, Collector<String> collector)
  2. Clicking through to the final toString(), we find:
  3. public String toString() {
  4. return "SourceRecord{sourcePartition=" + this.sourcePartition + ", sourceOffset=" + this.sourceOffset + "} " + super.toString();
  5. }
  6. It is just a hard-coded string concatenation.


We write our code against the record captured by the default schema. To get database.table, we first try .keySchema():

Schema schema = sourceRecord.keySchema();

Then we look at what the Schema object can give us, and find that it is not easy to extract the data from it.




We choose Kafka Connect's Struct instead; the instructor settled on Struct only after trying the alternatives.

We want to get all field names in one go, so that each field's value can be looked up by name.

So we can iterate once over the fields of the schema() metadata.

// Get the operation type. The instructor got stuck here as well and only worked out how to write it after searching posts online: there is a dedicated parser class for it.

At work you will always run into something new.
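
That dedicated class is Debezium's Envelope, and it is what the final code below uses. A minimal sketch of pulling the operation type out of a SourceRecord:

    // io.debezium.data.Envelope resolves the operation type of a Debezium change record
    Envelope.Operation operation = Envelope.operationFor(sourceRecord);
    // the enum constants include READ, CREATE, UPDATE and DELETE
    String op = operation.toString();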

Final code:

  1. package com.alibaba;
  2. import com.alibaba.fastjson.JSONObject;
  3. import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
  4. import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
  5. import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
  6. import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
  7. import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
  8. import io.debezium.data.Envelope;
  9. import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
  10. import org.apache.flink.api.common.typeinfo.TypeInformation;
  11. import org.apache.flink.runtime.state.filesystem.FsStateBackend;
  12. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  13. import org.apache.flink.streaming.api.environment.CheckpointConfig;
  14. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  15. import org.apache.flink.util.Collector;
  16. import org.apache.kafka.connect.data.Field;
  17. import org.apache.kafka.connect.data.Schema;
  18. import org.apache.kafka.connect.data.Struct;
  19. import org.apache.kafka.connect.source.SourceRecord;
  20. /**
  21. * @author zhouyanjun
  22. * @create 2021-06-22 17:03
  23. */
  24. public class Flink03_DataStreamWithMySchema {
  25. public static void main(String[] args) throws Exception {
  26. // System.setProperty("HADOOP_USER_NAME", "alibaba" );
  27. //1. Create the streaming execution environment
  28. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  29. env.setParallelism(1);
  30. // Checkpointing: not configured here; run a local test first
  31. //2. Read MySQL change data via CDC
  32. DebeziumSourceFunction<String> sourceFunction = MySQLSource
  33. .<String>builder()
  34. .hostname("hadoop102")
  35. .port(3306)
  36. .username("root")
  37. .password("123456")
  38. .databaseList("gmall0820flink")
  39. .tableList("gmall0820flink.base_trademark")/// optional; if omitted, every table of the listed databases is read. Qualify the table name with the database name, because flink-cdc can monitor multiple databases
  40. .startupOptions(StartupOptions.initial())// startup option/mode; an enum with 5 values
  41. // .startupOptions(StartupOptions.earliest())
  42. // .startupOptions(StartupOptions.latest())
  43. // .startupOptions(StartupOptions.specificOffset())
  44. // .startupOptions(StartupOptions.timestamp())
  45. .deserializer(new MyDebeziumDeserializationSchema())// custom deserializer
  46. .build();
  47. DataStreamSource<String> StreamSource = env.addSource(sourceFunction);// we now have a stream
  48. //3. Print
  49. StreamSource.print();
  50. //4. Start the job
  51. env.execute();
  52. }
  53. public static class MyDebeziumDeserializationSchema implements DebeziumDeserializationSchema<String> {// we wrap the data as a JSON string and pass it downstream for processing, so the <T> type is String
  54. /**
  55. * The JSON format we want to produce; it needs at least these fields so that the record is easy to process downstream:
  56. * {
  57. * "data":"{"id":11,"tm_name":"sasa"}",  the data itself; it has several fields, so it is a nested JSON object
  58. * "db":"",  the database name, because we want to be able to capture multiple databases and tables
  59. * "tableName":"",  the table name; downstream the stream is split by database.table, since different tables have different fields and a single mixed stream could only be typed as Object
  60. * "op":"c u d",  the operation type: create, update (modify), delete
  61. * "ts":""  we also need a timestamp field
  62. * }
  63. */
  64. @Override
  65. public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception { // input record, output collector
  66. // Our job is to parse this SourceRecord into JSON.
  67. // To use the data later we must extract the fields correctly; in other words, we define what this JSON looks like ourselves, based on what downstream processing needs
  68. // Schema schema = sourceRecord.keySchema();
  69. // Get the topic and extract the database and table names
  70. String topic = sourceRecord.topic();
  71. String[] fields = topic.split("\\." );// split() takes a regex, so the literal dot has to be escaped
  72. String db = fields[1]; // now we have the database and table names we want
  73. String tableName = fields[2];
  74. // Get the value and extract the data itself
  75. // Object value = sourceRecord.value(); is declared as Object, but from the snapshot data we saw it is really a Struct
  76. Struct value = (Struct) sourceRecord.value();
  77. Struct after = value.getStruct("after"); // the Struct behind the "after" field
  78. // create a JSON object (fastjson) before iterating
  79. JSONObject jsonObject = new JSONObject();
  80. for (Field field : after.schema().fields()) { // we don't know how many fields there are, so iterate over the schema metadata
  81. Object o = after.get(field);
  82. jsonObject.put(field.name(),o);
  83. }
  84. // we also want op, which lives inside the record's value envelope alongside the source metadata
  85. // Get the operation type
  86. Envelope.Operation operation = Envelope.operationFor(sourceRecord);
  87. // finally wrap everything into one big JSON and send it out
  88. // build the result JSON
  89. JSONObject result = new JSONObject();
  90. result.put("database", db);
  91. result.put("tableName", tableName);
  92. result.put("data", jsonObject);
  93. result.put("op", operation);
  94. // emit the data
  95. collector.collect(result.toJSONString());
  96. }
  97. @Override
  98. public TypeInformation<String> getProducedType() {
  99. return BasicTypeInfo.STRING_TYPE_INFO;
  100. }
  101. }
  102. }

Output of the final code:

  1. D:\Develop_software\java\jdk1.8.0_141\bin\java.exe "-javaagent:D:\Develop_software\IntelliJ_IDEA 2019.2.2_song\IntelliJ IDEA 2019.2.2\lib\idea_rt.jar=11101:D:\Develop_software\IntelliJ_IDEA 2019.2.2_song\IntelliJ IDEA 2019.2.2\bin" -Dfile.encoding=UTF-8 -classpath D:\Develop_software\java\jdk1.8.0_141\jre\lib\charsets.jar;D:\Develop_software\java\jdk1.8.0_141\jre\lib\deploy.jar;D:\Develop_software\java\jdk1.8.0_141\jre\lib\ext\access-bridge-64.jar;D:\Develop_software\java\jdk1.8.0_141\jre\lib\ext\cldrdata.jar;D:\Develop_software\java\jdk1.8.0_141\jre\lib\ext\dnsns.jar;D:\Develop_software\java\jdk1.8.0_141\jre\lib\ext\jaccess.jar;D:\Develop_software\java\jdk1.8.0_141\jre\lib\ext\jfxrt.jar;D:\Develop_software\java\jdk1.8.0_141\jre\lib\ext\localedata.jar;D:\Develop_software\java\jdk1.8.0_141\jre\lib\ext\nashorn.jar;D:\Develop_software\java\jdk1.8.0_141\jre\lib\ext\sunec.jar;D:\Develop_software\java\jdk1.8.0_141\jre\lib\ext\sunjce_provider.jar;D:\Develop_software\java\jdk1.8.0_141\jre\lib\ext\sunmscapi.jar;D:\Develop_software\java\jdk1.8.0_141\jre\lib\ext\sunpkcs11.jar;D:\Develop_software\java\jdk1.8.0_141\jre\lib\ext\zipfs.jar;D:\Develop_software\java\jdk1.8.0_141\jre\lib\javaws.jar;D:\Develop_software\java\jdk1.8.0_141\jre\lib\jce.jar;D:\Develop_software\java\jdk1.8.0_141\jre\lib\jfr.jar;D:\Develop_software\java\jdk1.8.0_141\jre\lib\jfxswt.jar;D:\Develop_software\java\jdk1.8.0_141\jre\lib\jsse.jar;D:\Develop_software\java\jdk1.8.0_141\jre\lib\management-agent.jar;D:\Develop_software\java\jdk1.8.0_141\jre\lib\plugin.jar;D:\Develop_software\java\jdk1.8.0_141\jre\lib\resources.jar;D:\Develop_software\java\jdk1.8.0_141\jre\lib\rt.jar;D:\workspace_idea1\BigData0621\gmall-flink-201109\flink-cdc\target\classes;D:\MyWork\Program\RepMaven\org\apache\flink\flink-java\1.12.0\flink-java-1.12.0.jar;D:\MyWork\Program\RepMaven\org\apache\flink\flink-core\1.12.0\flink-core-1.12.0.jar;D:\MyWork\Program\RepMaven\org\apache\flink\flink-annotations\1.12.0\flink-annotations-1.12.0.jar;D:\MyWork\Program\RepMaven\org\apache\flink\flink-metrics-core\1.12.0\flink-metrics-core-1.12.0.jar;D:\MyWork\Program\RepMaven\org\apache\flink\flink-shaded-asm-7\7.1-12.0\flink-shaded-asm-7-7.1-12.0.jar;D:\MyWork\Program\RepMaven\com\esotericsoftware\kryo\kryo\2.24.0\kryo-2.24.0.jar;D:\MyWork\Program\RepMaven\com\esotericsoftware\minlog\minlog\1.2\minlog-1.2.jar;D:\MyWork\Program\RepMaven\org\objenesis\objenesis\2.1\objenesis-2.1.jar;D:\MyWork\Program\RepMaven\commons-collections\commons-collections\3.2.2\commons-collections-3.2.2.jar;D:\MyWork\Program\RepMaven\org\apache\commons\commons-compress\1.20\commons-compress-1.20.jar;D:\MyWork\Program\RepMaven\org\apache\commons\commons-lang3\3.3.2\commons-lang3-3.3.2.jar;D:\MyWork\Program\RepMaven\org\apache\commons\commons-math3\3.5\commons-math3-3.5.jar;D:\MyWork\Program\RepMaven\org\slf4j\slf4j-api\1.7.15\slf4j-api-1.7.15.jar;D:\MyWork\Program\RepMaven\com\google\code\findbugs\jsr305\1.3.9\jsr305-1.3.9.jar;D:\MyWork\Program\RepMaven\org\apache\flink\force-shading\1.12.0\force-shading-1.12.0.jar;D:\MyWork\Program\RepMaven\org\apache\flink\flink-streaming-java_2.12\1.12.0\flink-streaming-java_2.12-1.12.0.jar;D:\MyWork\Program\RepMaven\org\apache\flink\flink-file-sink-common\1.12.0\flink-file-sink-common-1.12.0.jar;D:\MyWork\Program\RepMaven\org\apache\flink\flink-runtime_2.12\1.12.0\flink-runtime_2.12-1.12.0.jar;D:\MyWork\Program\RepMaven\org\apache\flink\flink-queryable-state-client-java\1.12.0\flink-queryable-state-client-java-1.12.0.jar;D:\MyWork\Program\RepMaven\org\apache\flin
k\flink-hadoop-fs\1.12.0\flink-hadoop-fs-1.12.0.jar;D:\MyWork\Program\RepMaven\commons-io\commons-io\2.7\commons-io-2.7.jar;D:\MyWork\Program\RepMaven\org\apache\flink\flink-shaded-netty\4.1.49.Final-12.0\flink-shaded-netty-4.1.49.Final-12.0.jar;D:\MyWork\Program\RepMaven\org\apache\flink\flink-shaded-jackson\2.10.1-12.0\flink-shaded-jackson-2.10.1-12.0.jar;D:\MyWork\Program\RepMaven\org\apache\flink\flink-shaded-zookeeper-3\3.4.14-12.0\flink-shaded-zookeeper-3-3.4.14-12.0.jar;D:\MyWork\Program\RepMaven\org\javassist\javassist\3.24.0-GA\javassist-3.24.0-GA.jar;D:\MyWork\Program\RepMaven\org\scala-lang\scala-library\2.12.7\scala-library-2.12.7.jar;D:\MyWork\Program\RepMaven\com\typesafe\akka\akka-actor_2.12\2.5.21\akka-actor_2.12-2.5.21.jar;D:\MyWork\Program\RepMaven\com\typesafe\config\1.3.3\config-1.3.3.jar;D:\MyWork\Program\RepMaven\org\scala-lang\modules\scala-java8-compat_2.12\0.8.0\scala-java8-compat_2.12-0.8.0.jar;D:\MyWork\Program\RepMaven\com\typesafe\akka\akka-stream_2.12\2.5.21\akka-stream_2.12-2.5.21.jar;D:\MyWork\Program\RepMaven\org\reactivestreams\reactive-streams\1.0.2\reactive-streams-1.0.2.jar;D:\MyWork\Program\RepMaven\com\typesafe\ssl-config-core_2.12\0.3.7\ssl-config-core_2.12-0.3.7.jar;D:\MyWork\Program\RepMaven\org\scala-lang\modules\scala-parser-combinators_2.12\1.1.1\scala-parser-combinators_2.12-1.1.1.jar;D:\MyWork\Program\RepMaven\com\typesafe\akka\akka-protobuf_2.12\2.5.21\akka-protobuf_2.12-2.5.21.jar;D:\MyWork\Program\RepMaven\com\typesafe\akka\akka-slf4j_2.12\2.5.21\akka-slf4j_2.12-2.5.21.jar;D:\MyWork\Program\RepMaven\org\clapper\grizzled-slf4j_2.12\1.3.2\grizzled-slf4j_2.12-1.3.2.jar;D:\MyWork\Program\RepMaven\com\github\scopt\scopt_2.12\3.5.0\scopt_2.12-3.5.0.jar;D:\MyWork\Program\RepMaven\org\xerial\snappy\snappy-java\1.1.4\snappy-java-1.1.4.jar;D:\MyWork\Program\RepMaven\com\twitter\chill_2.12\0.7.6\chill_2.12-0.7.6.jar;D:\MyWork\Program\RepMaven\com\twitter\chill-java\0.7.6\chill-java-0.7.6.jar;D:\MyWork\Program\RepMaven\org\lz4\lz4-java\1.6.0\lz4-java-1.6.0.jar;D:\MyWork\Program\RepMaven\org\apache\flink\flink-shaded-guava\18.0-12.0\flink-shaded-guava-18.0-12.0.jar;D:\MyWork\Program\RepMaven\org\apache\flink\flink-clients_2.12\1.12.0\flink-clients_2.12-1.12.0.jar;D:\MyWork\Program\RepMaven\org\apache\flink\flink-optimizer_2.12\1.12.0\flink-optimizer_2.12-1.12.0.jar;D:\MyWork\Program\RepMaven\commons-cli\commons-cli\1.3.1\commons-cli-1.3.1.jar;D:\MyWork\Program\RepMaven\mysql\mysql-connector-java\5.1.49\mysql-connector-java-5.1.49.jar;D:\MyWork\Program\RepMaven\com\alibaba\ververica\flink-connector-mysql-cdc\1.2.0\flink-connector-mysql-cdc-1.2.0.jar;D:\MyWork\Program\RepMaven\com\alibaba\fastjson\1.2.75\fastjson-1.2.75.jar;D:\MyWork\Program\RepMaven\org\apache\flink\flink-table-planner-blink_2.12\1.12.0\flink-table-planner-blink_2.12-1.12.0.jar;D:\MyWork\Program\RepMaven\org\apache\flink\flink-table-common\1.12.0\flink-table-common-1.12.0.jar;D:\MyWork\Program\RepMaven\org\apache\flink\flink-connector-files\1.12.0\flink-connector-files-1.12.0.jar;D:\MyWork\Program\RepMaven\org\apache\flink\flink-connector-base\1.12.0\flink-connector-base-1.12.0.jar;D:\MyWork\Program\RepMaven\org\apache\flink\flink-table-api-java\1.12.0\flink-table-api-java-1.12.0.jar;D:\MyWork\Program\RepMaven\org\apache\flink\flink-table-api-scala_2.12\1.12.0\flink-table-api-scala_2.12-1.12.0.jar;D:\MyWork\Program\RepMaven\org\scala-lang\scala-reflect\2.12.7\scala-reflect-2.12.7.jar;D:\MyWork\Program\RepMaven\org\scala-lang\scala-compiler\2.12.7\scala-compiler-2.12.7.jar;D:\MyWork\Progra
m\RepMaven\org\scala-lang\modules\scala-xml_2.12\1.0.6\scala-xml_2.12-1.0.6.jar;D:\MyWork\Program\RepMaven\org\apache\flink\flink-table-api-java-bridge_2.12\1.12.0\flink-table-api-java-bridge_2.12-1.12.0.jar;D:\MyWork\Program\RepMaven\org\apache\flink\flink-table-api-scala-bridge_2.12\1.12.0\flink-table-api-scala-bridge_2.12-1.12.0.jar;D:\MyWork\Program\RepMaven\org\apache\flink\flink-scala_2.12\1.12.0\flink-scala_2.12-1.12.0.jar;D:\MyWork\Program\RepMaven\org\apache\flink\flink-streaming-scala_2.12\1.12.0\flink-streaming-scala_2.12-1.12.0.jar;D:\MyWork\Program\RepMaven\org\apache\flink\flink-table-runtime-blink_2.12\1.12.0\flink-table-runtime-blink_2.12-1.12.0.jar;D:\MyWork\Program\RepMaven\org\codehaus\janino\janino\3.0.11\janino-3.0.11.jar;D:\MyWork\Program\RepMaven\org\codehaus\janino\commons-compiler\3.0.11\commons-compiler-3.0.11.jar;D:\MyWork\Program\RepMaven\org\apache\calcite\avatica\avatica-core\1.17.0\avatica-core-1.17.0.jar;D:\MyWork\Program\RepMaven\org\reflections\reflections\0.9.10\reflections-0.9.10.jar com.alibaba.Flink03_DataStreamWithMySchema
  2. SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
  3. SLF4J: Defaulting to no-operation (NOP) logger implementation
  4. SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
  5. SLF4J: Failed to load class "org.slf4j.impl.StaticMDCBinder".
  6. SLF4J: Defaulting to no-operation MDCAdapter implementation.
  7. SLF4J: See http://www.slf4j.org/codes.html#no_static_mdc_binder for further details.
  8. {"op":"CREATE","database":"gmall0820flink","data":{"tm_name":"Redmi","id":1},"tableName":"base_trademark"}
  9. {"op":"CREATE","database":"gmall0820flink","data":{"tm_name":"苹果","logo_url":"/static/default.jpg","id":2},"tableName":"base_trademark"}
  10. {"op":"CREATE","database":"gmall0820flink","data":{"tm_name":"华为","logo_url":"/static/default.jpg","id":3},"tableName":"base_trademark"}
  11. {"op":"CREATE","database":"gmall0820flink","data":{"tm_name":"TCL","logo_url":"/static/default.jpg","id":4},"tableName":"base_trademark"}
  12. {"op":"CREATE","database":"gmall0820flink","data":{"tm_name":"小米","logo_url":"/static/default.jpg","id":5},"tableName":"base_trademark"}
  13. {"op":"CREATE","database":"gmall0820flink","data":{"tm_name":"长粒香","logo_url":"/static/default.jpg","id":6},"tableName":"base_trademark"}
  14. {"op":"CREATE","database":"gmall0820flink","data":{"tm_name":"金沙河","logo_url":"/static/default.jpg","id":7},"tableName":"base_trademark"}
  15. {"op":"CREATE","database":"gmall0820flink","data":{"tm_name":"索芙特","logo_url":"/static/default.jpg","id":8},"tableName":"base_trademark"}
  16. {"op":"CREATE","database":"gmall0820flink","data":{"tm_name":"CAREMiLLE","logo_url":"/static/default.jpg","id":9},"tableName":"base_trademark"}
  17. {"op":"CREATE","database":"gmall0820flink","data":{"tm_name":"欧莱雅","logo_url":"/static/default.jpg","id":10},"tableName":"base_trademark"}
  18. {"op":"CREATE","database":"gmall0820flink","data":{"tm_name":"香奈儿","logo_url":"/static/default.jpg","id":11},"tableName":"base_trademark"}
  19. {"op":"CREATE","database":"gmall0820flink","data":{"tm_name":"周大帅哥","logo_url":"更新一条信息","id":12},"tableName":"base_trademark"}
  20. 六月 22, 2021 9:55:39 下午 com.github.shyiko.mysql.binlog.BinaryLogClient connect
  21. 信息: Connected to hadoop102:3306 at mysql-bin.000009/154 (sid:5499, cid:13)


Pasting one of these lines into a JSON parsing website, we can see that it parses successfully, which makes downstream processing convenient. The output is now a proper JSON string.

With a custom deserializer we can both monitor multiple databases and tables and output a data format that is convenient for downstream processing.
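
To illustrate why this format is convenient, a minimal downstream sketch (not part of the original code; the routing condition is just an example) that parses each JSON string with fastjson and keeps only changes from one table:

    StreamSource
            .map(line -> JSONObject.parseObject(line))                              // String -> JSONObject
            .filter(json -> "base_trademark".equals(json.getString("tableName")))   // route by table name
            .print();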

Q:

5. During one run I hit another error:

  1. The error message:
  2. Caused by: java.lang.ArrayIndexOutOfBoundsException: 3
  3. at org.apache.kafka.connect.data.Struct.get(Struct.java:86)
  4. at com.alibaba.Flink03_DataStreamWithMySchema$MyDebeziumDeserializationSchema.deserialize(Flink03_DataStreamWithMySchema.java:93)
  5. at com.alibaba.ververica.cdc.debezium.internal.DebeziumChangeConsumer.handleBatch(DebeziumChangeConsumer.java:114)
  6. at io.debezium.embedded.ConvertingEngineBuilder.lambda$notifying$2(ConvertingEngineBuilder.java:82)
  7. at io.debezium.embedded.EmbeddedEngine.run(EmbeddedEngine.java:812)
  8. at io.debezium.embedded.ConvertingEngineBuilder$2.run(ConvertingEngineBuilder.java:171)
  9. at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  10. at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  11. at java.lang.Thread.run(Thread.java:748)
  12. Following this stack trace, I located the code:
  13. for (Field field : value.schema().fields()) { // we don't know how many fields there are, so iterate over the schema metadata
  14. Object o = after.get(field); // the trace points to this line, but the real mistake is in the line above
  15. jsonObject.put(field.name(),o);
  16. }
  17. The correct code is:
  18. for (Field field : after.schema().fields()) { // iterate over the schema of "after" itself; the envelope's schema has extra fields (before, source, op, ...) whose indexes don't exist in the small "after" Struct, hence the index out of bounds
  19. Object o = after.get(field);
  20. jsonObject.put(field.name(),o);
  21. }
  22. So a simple way to debug is to look at the lines just above and below the one the stack trace points to and check for logical slips.

After packaging and submitting to the cluster, I always resumed from a savepoint. Why not resume from a checkpoint?

Actually you can, because a savepoint and a checkpoint contain the same things.

  1. By default the checkpoint is NOT retained when the job is cancelled normally; if the job fails with an error, the checkpoint directory is still kept.
  2. env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION); // this is the default behavior: once the job is cancelled, the checkpoint is deleted
  3. If we cancel the job manually, for example for a normal code upgrade, the checkpoint directory is then gone.
  4. To retain the checkpoint when the job is cancelled normally (it is also kept if the job fails):
  5. env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); // retain
  6. Clicking into RETAIN_ON_CANCELLATION, we find:
  7. RETAIN_ON_CANCELLATION(false);

What happens if we do not add this setting?

After cancelling the job:


Compare the checkpoint directory in HDFS before and after the cancel:


after the cancel, the checkpoint directory is gone, so the data can no longer be recovered from it.

So the final code:

  1. package com.alibaba;
  2. import com.alibaba.fastjson.JSONObject;
  3. import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
  4. import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
  5. import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
  6. import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
  7. import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
  8. import io.debezium.data.Envelope;
  9. import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
  10. import org.apache.flink.api.common.typeinfo.TypeInformation;
  11. import org.apache.flink.runtime.state.filesystem.FsStateBackend;
  12. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  13. import org.apache.flink.streaming.api.environment.CheckpointConfig;
  14. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  15. import org.apache.flink.util.Collector;
  16. import org.apache.kafka.connect.data.Field;
  17. import org.apache.kafka.connect.data.Schema;
  18. import org.apache.kafka.connect.data.Struct;
  19. import org.apache.kafka.connect.source.SourceRecord;
  20. /**
  21. * @author zhouyanjun
  22. * @create 2021-06-22 17:03
  23. */
  24. public class Flink03_DataStreamWithMySchema {
  25. public static void main(String[] args) throws Exception {
  26. System.setProperty("HADOOP_USER_NAME", "alibaba" );
  27. //1. Create the streaming execution environment
  28. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  29. env.setParallelism(1);
  30. // Enable checkpointing
  31. env.enableCheckpointing(5000L);
  32. env.getCheckpointConfig().setCheckpointTimeout(5000L);
  33. env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/flink1109/checkpoint"));
  34. // Do NOT retain the checkpoint when the job is cancelled normally
  35. //env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
  36. // Retain the checkpoint when the job is cancelled normally
  37. env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
  38. //2. Read MySQL change data via CDC
  39. DebeziumSourceFunction<String> sourceFunction = MySQLSource
  40. .<String>builder()
  41. .hostname("hadoop102")
  42. .port(3306)
  43. .username("root")
  44. .password("123456")
  45. .databaseList("gmall0820flink")
  46. .tableList("gmall0820flink.base_trademark")/// optional; if omitted, every table of the listed databases is read. Qualify the table name with the database name, because flink-cdc can monitor multiple databases
  47. .startupOptions(StartupOptions.initial())// startup option/mode; an enum with 5 values
  48. // .startupOptions(StartupOptions.earliest())
  49. // .startupOptions(StartupOptions.latest())
  50. // .startupOptions(StartupOptions.specificOffset())
  51. // .startupOptions(StartupOptions.timestamp())
  52. .deserializer(new MyDebeziumDeserializationSchema())// custom deserializer that turns the serialized binary record into something usable
  53. .build();
  54. DataStreamSource<String> StreamSource = env.addSource(sourceFunction);// we now have a stream
  55. //3. Print
  56. StreamSource.print();
  57. //4. Start the job
  58. env.execute();
  59. }
  60. public static class MyDebeziumDeserializationSchema implements DebeziumDeserializationSchema<String> {// we wrap the data as a JSON string and pass it downstream for processing, so the <T> type is String
  61. /**
  62. * The JSON format we want to produce; it needs at least these fields so that the record is easy to process downstream:
  63. * {
  64. * "data":"{"id":11,"tm_name":"sasa"}",  the data itself; it has several fields, so it is a nested JSON object
  65. * "db":"",  the database name, because we want to be able to capture multiple databases and tables
  66. * "tableName":"",  the table name; downstream the stream is split by database.table, since different tables have different fields and a single mixed stream could only be typed as Object
  67. * "op":"c u d",  the operation type: create, update (modify), delete
  68. * "ts":""  we also need a timestamp field
  69. * }
  70. */
  71. @Override
  72. public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception { // input record, output collector
  73. // Our job is to parse this SourceRecord into JSON.
  74. // To use the data later we must extract the fields correctly; in other words, we define what this JSON looks like ourselves, based on what downstream processing needs
  75. // Schema schema = sourceRecord.keySchema();
  76. //1 Get the topic and extract the database and table names
  77. String topic = sourceRecord.topic();
  78. String[] fields = topic.split("\\." );// split() takes a regex, so the literal dot has to be escaped
  79. String db = fields[1]; // now we have the database and table names we want
  80. String tableName = fields[2];
  81. //2 Get the value and extract the data itself
  82. // Object value = sourceRecord.value(); is declared as Object, but from the snapshot data we saw it is really a Struct
  83. Struct value = (Struct) sourceRecord.value();
  84. Struct after = value.getStruct("after"); // the Struct behind the "after" field (note: for delete events "after" is null and the old row is under "before")
  85. // create a JSON object (fastjson) before iterating
  86. JSONObject jsonObject = new JSONObject();
  87. for (Field field : after.schema().fields()) { // we don't know how many fields there are, so iterate over the schema metadata
  88. Object o = after.get(field);
  89. jsonObject.put(field.name(),o);
  90. }
  91. // we also want op, which lives inside the record's value envelope alongside the source metadata
  92. //3 Get the operation type
  93. Envelope.Operation operation = Envelope.operationFor(sourceRecord);
  94. // finally wrap everything into one big JSON and send it out
  95. // build the result JSON
  96. JSONObject result = new JSONObject();
  97. result.put("database", db);
  98. result.put("tableName", tableName);
  99. result.put("data", jsonObject);
  100. result.put("op", operation);
  101. // emit the data
  102. collector.collect(result.toJSONString());
  103. }
  104. @Override
  105. public TypeInformation<String> getProducedType() {
  106. return BasicTypeInfo.STRING_TYPE_INFO;
  107. }
  108. }
  109. }

This concludes the flink-cdc series.

- END -

This is an original article.


Author: Eugene
A data-role newbie at a listed company, hoping not to stay a newbie for long, haha

❤: Learn about technology, careers, life, fundamentals, fitness, photography and more with me here!

❤: Feel free to follow along so we can learn, improve, and enrich our lives together.
