Today's share is here again. ღ( ´・ᴗ・` ) Let's learn and improve together ღゝ◡╹)ノ♡
Abstract: this should cover most of the scenarios in which you will use flink-cdc.
Full tutorial contents:
Prerequisite knowledge for flink-cdc: the MySQL binlog
A look at part of the flink-cdc source code
Hands-on: using flink-cdc with the DataStream API
Hands-on: using flink-cdc with Flink SQL
Comparing the DataStream and Flink SQL approaches: differences and applicable scenarios
A custom deserializer, so that the resulting stream data is more intuitive and easier to use
flink-cdc related errors encountered while learning
Keep at it, study hard and improve every day~
Q:
1 Fault tolerance in flink-cdc
You can see that the job has been submitted.
You can see that the data should be printed on hadoop102.
After clicking the hadoop102 row, you can see the output:
After I insert a row into MySQL:
You can see the change is successfully captured:
We want the restarted job to pick up where it left off and keep consuming the latest data. To do that we trigger a savepoint, and later recover the consumption position from it:
- [myself@hadoop102 ~]$ cd /opt/module/flink-standalone/
- [myself@hadoop102 flink-standalone]$ bin/flink savepoint 713d0721e83be877c97f0a846cf6af85 hdfs://hadoop102:8020/flink1109/savepoint    # usage: bin/flink savepoint <jobId> <HDFS target directory>
-
-
- SLF4J: Class path contains multiple SLF4J bindings.
- SLF4J: Found binding in [jar:file:/opt/module/flink-standalone/lib/log4j-slf4j-impl-2.12.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
- SLF4J: Found binding in [jar:file:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
- SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
- SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
- Triggering savepoint for job 713d0721e83be877c97f0a846cf6af85.
- Waiting for response...
- Savepoint completed. Path: hdfs://hadoop102:8020/flink1109/savepoint/savepoint-713d07-9fd0e5ddd3f3
- You can resume your program from this savepoint with the run command.
To see the savepoint in action, I deliberately kill this Flink job.
Then I insert a new row into MySQL:
Get the savepoint directory path from HDFS:
Run the command:
- [myself@hadoop102 flink-standalone]$ bin/flink run -s hdfs://hadoop102:8020/flink1109/savepoint/savepoint-713d07-9fd0e5ddd3f3 -c com.atguigu.Flink01_DataStream flink-cdc-1.0-SNAPSHOT-jar-with-dependencies.jar
-
-
-
-
- SLF4J: Class path contains multiple SLF4J bindings.
- SLF4J: Found binding in [jar:file:/opt/module/flink-standalone/lib/log4j-slf4j-impl-2.12.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
- SLF4J: Found binding in [jar:file:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
- SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
- SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
- Job has been submitted with JobID f2efda4055ccd36731b2f64aef7e3c9c
If we do not restart from the savepoint, consumption starts over from the very beginning.
The job successfully resumes from where it last consumed:
So the DataStream approach gives us resumable consumption: the consumption position is simply stored as Flink state and restored from that state, unlike Flume and Canal, which persist the consumption position to a file.
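For reference, here is a minimal sketch of what a DataStream-style job such as Flink01_DataStream (the class referenced in the run command above) might look like. It is an assumption pieced together from the code shown later in this post; the connection settings and checkpoint path are the ones used elsewhere in the article.
- package com.atguigu;
-
- import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
- import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
- import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
- import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
- import org.apache.flink.runtime.state.filesystem.FsStateBackend;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
- public class Flink01_DataStream {
-     public static void main(String[] args) throws Exception {
-         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-         env.setParallelism(1);
-
-         // Checkpointing is what makes the binlog position recoverable: the source keeps its
-         // offset in Flink state, so a savepoint/checkpoint captures "where consumption stopped".
-         env.enableCheckpointing(5000L);
-         env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/flink1109/ck"));
-
-         // Read MySQL change data via CDC, using the default (string) deserializer for now.
-         DebeziumSourceFunction<String> sourceFunction = MySQLSource.<String>builder()
-                 .hostname("hadoop102")
-                 .port(3306)
-                 .username("root")
-                 .password("123456")
-                 .databaseList("gmall0820flink")
-                 .tableList("gmall0820flink.base_trademark")
-                 .startupOptions(StartupOptions.initial())
-                 .deserializer(new StringDebeziumDeserializationSchema())
-                 .build();
-
-         env.addSource(sourceFunction).print();
-         env.execute();
-     }
- }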
Q:
2 Using the Flink SQL approach
2.2.0 First, look at the example on the official site
https://github.com/ververica/flink-cdc-connectors/wiki/MySQL-CDC-Connector
- -- creates a mysql cdc table source
- CREATE TABLE mysql_binlog (
- id INT NOT NULL,
- name STRING,
- description STRING,
- weight DECIMAL(10,3)
- ) WITH ( -- the connector options go in the WITH clause
- 'connector' = 'mysql-cdc',
- 'hostname' = 'localhost',
- 'port' = '3306',
- 'username' = 'flinkuser',
- 'password' = 'flinkpw',
- 'database-name' = 'inventory',
- 'table-name' = 'products'
- );
-
-
- -- read snapshot and binlog data from mysql, and do some transformation, and show on the client
- SELECT id, UPPER(name), description, weight FROM mysql_binlog;
2.2.1 Add the dependency
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-table-planner-blink_2.12</artifactId>
- <version>1.12.0</version>
- </dependency>
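Besides the blink planner, the project also relies on the CDC connector itself (and, later on, fastjson and the MySQL driver). Judging from the classpath in the run output further below, the versions are roughly these; treat this as an inferred sketch rather than the post's official dependency list:
- <dependency>
-     <groupId>com.alibaba.ververica</groupId>
-     <artifactId>flink-connector-mysql-cdc</artifactId>
-     <version>1.2.0</version>
- </dependency>
- <dependency>
-     <groupId>mysql</groupId>
-     <artifactId>mysql-connector-java</artifactId>
-     <version>5.1.49</version>
- </dependency>
- <dependency>
-     <groupId>com.alibaba</groupId>
-     <artifactId>fastjson</artifactId>
-     <version>1.2.75</version>
- </dependency>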
2.2.2 Code implementation
- package com.alibaba;
-
-
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.table.api.Table;
- import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
- import org.apache.flink.types.Row;
-
-
- /**
- * @author zhouyanjun
- * @create 2021-06-22 12:31
- */
- public class Flink02_SQL {
- public static void main(String[] args) throws Exception {
- //1. Get the execution environment
- //2. Read the MySQL change data with SQL
- //3. Convert to a stream and print it, so we can see what the data looks like
- //4. Start the job
-
-
-
-
- //1. Get the execution environment
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
- StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
-
-
- //2. Read the MySQL change data with SQL
- tableEnv.executeSql("create table trademark(id string,tm_name string,logo_url string) " +
- "with(" +
- " 'connector' = 'mysql-cdc', " +
- " 'hostname' = 'hadoop102', " +
- " 'port' = '3306', " +
- " 'username' = 'root', " +
- " 'password' = '123456', " +
- " 'database-name' = 'gmall0820flink', " +
- " 'table-name' = 'base_trademark'" +
- ")"
- );
- //3. Convert to a stream and print it, so we can see what the data looks like
- Table table = tableEnv.sqlQuery("select * from trademark");//this returns a Table object
- tableEnv.toRetractStream(table, Row.class).print(); //convert the dynamic table to a stream; which kind of stream fits best here?
- //Start with a retract stream.
-
-
- //4. Start the job
- env.execute();
-
-
-
-
- }
- }
- Connector Options (excerpt)
-
- Option: scan.startup.mode | Required: optional | Default: initial | Type: String
- Description: Optional startup mode for the MySQL CDC consumer; valid enumerations are "initial" and "latest-offset". Please see the "Startup Reading Position" section for more detailed information.
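As a sketch (not from the original post), the same trademark table could be told to skip the initial snapshot and start from the latest binlog position by adding scan.startup.mode to the WITH clause; everything else stays as in Flink02_SQL above:
- // Same DDL as above, but starting from the latest offset instead of the
- // default 'initial' (snapshot + binlog) mode.
- tableEnv.executeSql("create table trademark(id string,tm_name string,logo_url string) " +
-         "with(" +
-         " 'connector' = 'mysql-cdc', " +
-         " 'hostname' = 'hadoop102', " +
-         " 'port' = '3306', " +
-         " 'username' = 'root', " +
-         " 'password' = '123456', " +
-         " 'database-name' = 'gmall0820flink', " +
-         " 'table-name' = 'base_trademark', " +
-         " 'scan.startup.mode' = 'latest-offset'" +
-         ")"
- );
The output below was produced by the original program, which uses the default 'initial' mode.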
Output:
- D:\Develop_software\java\jdk1.8.0_141\bin\java.exe ... [full IDEA launch command and classpath omitted] ... com.atguigu.Flink02_SQL
- SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
- SLF4J: Defaulting to no-operation (NOP) logger implementation
- SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
- SLF4J: Failed to load class "org.slf4j.impl.StaticMDCBinder".
- SLF4J: Defaulting to no-operation MDCAdapter implementation.
- SLF4J: See http://www.slf4j.org/codes.html#no_static_mdc_binder for further details.
- (true,1,Redmi,null)
- (true,2,苹果,/static/default.jpg)
- (true,3,华为,/static/default.jpg)
- (true,4,TCL,/static/default.jpg)
- (true,5,小米,/static/default.jpg)
- (true,6,长粒香,/static/default.jpg)
- (true,7,金沙河,/static/default.jpg)
- (true,8,索芙特,/static/default.jpg)
- (true,9,CAREMiLLE,/static/default.jpg)
- (true,10,欧莱雅,/static/default.jpg)
- (true,11,香奈儿,/static/default.jpg)
- (true,12,周大帅哥,null)
- (true,13,天下第一帅,null)
- 六月 22, 2021 1:00:45 下午 com.github.shyiko.mysql.binlog.BinaryLogClient connect
- 信息: Connected to hadoop102:3306 at mysql-bin.000007/154 (sid:5811, cid:11)
As you can see, this is the default ('initial') startup mode:
it first reads in the existing table data as a snapshot, and then connects to the latest binlog position.
Output after we delete a row:
- (true,1,Redmi,null)
- (true,2,苹果,/static/default.jpg)
- (true,3,华为,/static/default.jpg)
- (true,4,TCL,/static/default.jpg)
- (true,5,小米,/static/default.jpg)
- (true,6,长粒香,/static/default.jpg)
- (true,7,金沙河,/static/default.jpg)
- (true,8,索芙特,/static/default.jpg)
- (true,9,CAREMiLLE,/static/default.jpg)
- (true,10,欧莱雅,/static/default.jpg)
- (true,11,香奈儿,/static/default.jpg)
- (true,12,周大帅哥,null)
- (true,13,天下第一帅,null)
- 六月 22, 2021 1:00:45 下午 com.github.shyiko.mysql.binlog.BinaryLogClient connect
- 信息: Connected to hadoop102:3306 at mysql-bin.000007/154 (sid:5811, cid:11)
- (false,13,天下第一帅,null)    <- the false flag marks the retraction
Now let's update a row:
- (true,1,Redmi,null)
- (true,2,苹果,/static/default.jpg)
- (true,3,华为,/static/default.jpg)
- (true,4,TCL,/static/default.jpg)
- (true,5,小米,/static/default.jpg)
- (true,6,长粒香,/static/default.jpg)
- (true,7,金沙河,/static/default.jpg)
- (true,8,索芙特,/static/default.jpg)
- (true,9,CAREMiLLE,/static/default.jpg)
- (true,10,欧莱雅,/static/default.jpg)
- (true,11,香奈儿,/static/default.jpg)
- (true,12,周大帅哥,null)
- (true,13,天下第一帅,null)
- 六月 22, 2021 1:00:45 下午 com.github.shyiko.mysql.binlog.BinaryLogClient connect
- 信息: Connected to hadoop102:3306 at mysql-bin.000007/154 (sid:5811, cid:11)
- (false,13,天下第一帅,null)
- (false,12,周大帅哥,null)    <- an update produces two records: the old row is retracted and the new row is added
- (true,12,周大帅哥,更新一条信息)
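A small sketch (not from the original post) of how this retract stream could be consumed in code instead of just printed: toRetractStream yields a DataStream<Tuple2<Boolean, Row>>, where the Boolean flag distinguishes additions (true) from retractions (false).
- // Keep only the "add" records (flag == true) and drop retraction records (flag == false),
- // using the same tableEnv and table objects created in Flink02_SQL above.
- tableEnv.toRetractStream(table, Row.class)
-         .filter(record -> record.f0)
-         .print();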
Q:
The differences between the DataStream approach and the Flink SQL approach:
DataStream can monitor multiple databases and multiple tables.
Flink SQL can only monitor a single table in a single database, because of how the CREATE TABLE statement in the code is defined. Its advantage: the output format is friendlier.
You can see this by comparing the code:
- DataStream: a list of databases and a list of tables
- .databaseList("gmall0820flink")
- .tableList("gmall0820flink.base_trademark")
-
-
- FlinkSQL: a single database name and a single table name
- " 'database-name' = 'gmall0820flink', " +
- " 'table-name' = 'base_trademark'" +
With the DataStream approach you can also monitor, say, two databases and three tables at the same time (each database must first be enabled in MySQL's binlog configuration), as sketched below.
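A hedged sketch of that multi-database, multi-table configuration with the DataStream builder; the extra database and table names here (gmall0820flink_other, base_category1, order_info) are made up purely for illustration:
- DebeziumSourceFunction<String> sourceFunction = MySQLSource.<String>builder()
-         .hostname("hadoop102")
-         .port(3306)
-         .username("root")
-         .password("123456")
-         // two databases ...
-         .databaseList("gmall0820flink", "gmall0820flink_other")
-         // ... and three tables; table names are qualified with their database name
-         .tableList("gmall0820flink.base_trademark",
-                    "gmall0820flink.base_category1",
-                    "gmall0820flink_other.order_info")
-         .startupOptions(StartupOptions.initial())
-         .deserializer(new StringDebeziumDeserializationSchema())
-         .build();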
Q:
Customizing the deserializer, so that the stream data we get is more intuitive and easier to use. The default is:
.deserializer(new StringDebeziumDeserializationSchema())
4.1 Code implementation
- public class Flink03_DataStreamWithMySchema {
- public static void main(String[] args) throws Exception {
- System.setProperty("HADOOP_USER_NAME", "atguigu");
-
-
- //1. Create the streaming execution environment
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
- // Enable checkpointing
- env.enableCheckpointing(5000L);
- env.getCheckpointConfig().setCheckpointTimeout(5000L); //checkpoint timeout; checkpoints that take longer are discarded
- env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/flink1109/ck"));
-
-
- //2. Read MySQL change data via CDC
- DebeziumSourceFunction<String> sourceFunction = MySQLSource
- .<String>builder()
- .hostname("hadoop102")
- .port(3306)
- .username("root")
- .password("123456")
- .databaseList("gmall0820flink")
- .tableList("gmall0820flink.base_trademark")///Optional. If omitted, all tables in the configured databases are read. Note that table names must be qualified with the database name, because flink-cdc can monitor multiple databases
- .startupOptions(StartupOptions.initial())//startup option/mode; this is an enum with 5 values
- // .startupOptions(StartupOptions.earliest())
- // .startupOptions(StartupOptions.latest())
- // .startupOptions(StartupOptions.specificOffset())
- // .startupOptions(StartupOptions.timestamp())
- .deserializer(new MyDebeziumDeserializationSchema())//this is what we change first: a custom deserializer that turns the serialized binary records into usable data
- .build();
-
-
- DataStreamSource<String> StreamSource = env.addSource(sourceFunction);//we now have a stream
-
-
- //3. Print
- StreamSource.print();
- //4. Start the job
- env.execute();
- }
- }
Looking at the source code:
- .deserializer(new StringDebeziumDeserializationSchema())
- Step into it:
-
-
- /**
- * The deserializer used to convert from consumed {@link org.apache.kafka.connect.source.SourceRecord}.
- */
- public Builder<T> deserializer(DebeziumDeserializationSchema<T> deserializer) {
- this.deserializer = deserializer;
- return this;
- }
-
-
- Step in further and we find it is an interface, so in our own code we have to implement this interface:
- @PublicEvolving
- public interface DebeziumDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
- void deserialize(SourceRecord var1, Collector<T> var2) throws Exception;
-
-
- The <T> in DebeziumDeserializationSchema<T> is handed to Collector<T>, and we already know Collector<T> is what emits data downstream.
- So T is the type of the output data.
- }
When we don't know how to write something, we can look at how it is written in the source code.
- .deserializer(new StringDebeziumDeserializationSchema())    <- step into this
-
-
- You will see the following:
-
-
-
-
- public class StringDebeziumDeserializationSchema implements DebeziumDeserializationSchema<String> {
- private static final long serialVersionUID = -3168848963265670603L;
-
-
- public StringDebeziumDeserializationSchema() {
- }
-
-
- public void deserialize(SourceRecord record, Collector<String> out) throws Exception {
- out.collect(record.toString()); //because this just calls .toString(), the default output cannot be used directly downstream; it is very unintuitive
- }
-
-
- public TypeInformation<String> getProducedType() {
- return BasicTypeInfo.STRING_TYPE_INFO; //we can borrow this exact approach in our own implementation
- }
- }
The data produced by the default deserializer looks like this. It is an object, not JSON (otherwise converting JSON into JSON would be pointless):
- SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={file=mysql-bin.000004, pos=154, row=1, snapshot=true}}
- ConnectRecord{topic='mysql_binlog_source.gmall0820flink.base_trademark', kafkaPartition=null, key=Struct{id=1}, keySchema=Schema{mysql_binlog_source.gmall0820flink.base_trademark.Key:STRUCT}, value=Struct{after=Struct{id=1,tm_name=Redmi},source=Struct{version=1.4.1.Final,connector=mysql,name=mysql_binlog_source,ts_ms=0,snapshot=true,db=gmall0820flink,table=base_trademark,server_id=0,file=mysql-bin.000004,pos=154,row=0},op=c,ts_ms=1623943673564}, valueSchema=Schema{mysql_binlog_source.gmall0820flink.base_trademark.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
- public void deserialize(SourceRecord sourceRecord, Collector<String> collector)
-
-
- Stepping in and finding the final toString(), we can see:
-
-
- public String toString() {
- return "SourceRecord{sourcePartition=" + this.sourcePartition + ", sourceOffset=" + this.sourceOffset + "} " + super.toString();
- }
- It is a hard-coded string concatenation.
We write our code against the data captured by the default deserializer. To get the database name and table name, we first try .keySchema():
Schema schema = sourceRecord.keySchema();
Then we look at what the schema can give us, and it turns out the schema does not make the data easy to extract.
So we use the Kafka Connect Struct instead; the instructor settled on the Struct only after trying the alternatives.
We want all the field names in one go, so that we can conveniently look up each field's value by name.
So we can simply iterate over the field list of the schema() metadata.
//Get the operation type. The instructor also got stuck here and only worked out the call after searching forum posts: there is a dedicated parsing class for it.
At work there will always be something new you haven't seen before.
The final code:
- package com.alibaba;
-
-
- import com.alibaba.fastjson.JSONObject;
- import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
- import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
- import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
- import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
- import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
- import io.debezium.data.Envelope;
- import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
- import org.apache.flink.api.common.typeinfo.TypeInformation;
- import org.apache.flink.runtime.state.filesystem.FsStateBackend;
- import org.apache.flink.streaming.api.datastream.DataStreamSource;
- import org.apache.flink.streaming.api.environment.CheckpointConfig;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.util.Collector;
- import org.apache.kafka.connect.data.Field;
- import org.apache.kafka.connect.data.Schema;
- import org.apache.kafka.connect.data.Struct;
- import org.apache.kafka.connect.source.SourceRecord;
-
-
- /**
- * @author zhouyanjun
- * @create 2021-06-22 17:03
- */
- public class Flink03_DataStreamWithMySchema {
- public static void main(String[] args) throws Exception {
- // System.setProperty("HADOOP_USER_NAME", "alibaba" );
-
-
- //1. Create the streaming execution environment
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
- // Enable checkpointing -- skipped here for now, first run a local test
-
-
- //2. Read MySQL change data via CDC
- DebeziumSourceFunction<String> sourceFunction = MySQLSource
- .<String>builder()
- .hostname("hadoop102")
- .port(3306)
- .username("root")
- .password("123456")
- .databaseList("gmall0820flink")
- .tableList("gmall0820flink.base_trademark")///Optional. If omitted, all tables in the configured databases are read. Note that table names must be qualified with the database name, because flink-cdc can monitor multiple databases
- .startupOptions(StartupOptions.initial())//startup option/mode; this is an enum with 5 values
- // .startupOptions(StartupOptions.earliest())
- // .startupOptions(StartupOptions.latest())
- // .startupOptions(StartupOptions.specificOffset())
- // .startupOptions(StartupOptions.timestamp())
- .deserializer(new MyDebeziumDeserializationSchema())//custom deserializer
- .build();
-
-
- DataStreamSource<String> StreamSource = env.addSource(sourceFunction);//we now have a stream
-
-
- //3. Print
- StreamSource.print();
- //4. Start the job
- env.execute();
- }
-
-
-
-
- public static class MyDebeziumDeserializationSchema implements DebeziumDeserializationSchema<String> {//we wrap the data as JSON and pass it downstream; JSON is a string, so the <T> type is String
- /**
- * The JSON format we want to produce -- it needs at least these fields so the record is easy to process later:
- * {
- * "data":"{"id":11,"tm_name":"sasa"}", the row itself; it has several columns, so it is itself a JSON object
- * "db":"", database name, because we need to be able to capture data from multiple databases and tables
- * "tableName":"", table name to go with the database; downstream I split the stream by database.table, since different tables have different columns and a single mixed stream could only be typed as Object
- * "op":"c u d", operation type: create, update (modify), delete
- * "ts":"" we also need a timestamp field
- * }
- */
- @Override
- public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception { //input record, output collector
- //our job is to parse this SourceRecord into JSON
- //to use the data properly later we must extract the fields correctly; in other words, we define the JSON layout ourselves, driven by what downstream processing needs
- // Schema schema = sourceRecord.keySchema();
-
-
- //Get the topic and extract the database and table names
- String topic = sourceRecord.topic();
- String[] fields = topic.split("\\." );//'.' is a regex metacharacter in Java and has to be escaped
- String db = fields[1]; //now we have the database name and table name we want
- String tableName = fields[2];
-
-
- //Get the value and extract the row data itself
- // Object value = sourceRecord.value(); //declared as Object, but the snapshot data shows it is actually a Struct
- Struct value = (Struct) sourceRecord.value();
- Struct after = value.getStruct("after"); //the Struct behind the "after" field
-
-
- //create a JSONObject (from the fastjson package) before iterating
- JSONObject jsonObject = new JSONObject();
-
-
- for (Field field : after.schema().fields()) { //we do not know how many fields there are, so get the schema metadata and iterate over it
- Object o = after.get(field);
- jsonObject.put(field.name(),o);
- }
-
-
- //next we want op, which lives inside the record's envelope value
- //Get the operation type
- Envelope.Operation operation = Envelope.operationFor(sourceRecord);
- //finally wrap everything into one big JSON and emit it
-
-
- //Build the result JSON
- JSONObject result = new JSONObject();
- result.put("database", db);
- result.put("tableName", tableName);
- result.put("data", jsonObject);
- result.put("op", operation);
-
-
- //Emit the data
- collector.collect(result.toJSONString());
-
-
- }
-
-
- @Override
- public TypeInformation<String> getProducedType() {
- return BasicTypeInfo.STRING_TYPE_INFO;
- }
- }
- }
Output of the final code:
- D:\Develop_software\java\jdk1.8.0_141\bin\java.exe ... [full IDEA launch command and classpath omitted] ... com.alibaba.Flink03_DataStreamWithMySchema
- SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
- SLF4J: Defaulting to no-operation (NOP) logger implementation
- SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
- SLF4J: Failed to load class "org.slf4j.impl.StaticMDCBinder".
- SLF4J: Defaulting to no-operation MDCAdapter implementation.
- SLF4J: See http://www.slf4j.org/codes.html#no_static_mdc_binder for further details.
-
-
- {"op":"CREATE","database":"gmall0820flink","data":{"tm_name":"Redmi","id":1},"tableName":"base_trademark"}
- {"op":"CREATE","database":"gmall0820flink","data":{"tm_name":"苹果","logo_url":"/static/default.jpg","id":2},"tableName":"base_trademark"}
- {"op":"CREATE","database":"gmall0820flink","data":{"tm_name":"华为","logo_url":"/static/default.jpg","id":3},"tableName":"base_trademark"}
- {"op":"CREATE","database":"gmall0820flink","data":{"tm_name":"TCL","logo_url":"/static/default.jpg","id":4},"tableName":"base_trademark"}
- {"op":"CREATE","database":"gmall0820flink","data":{"tm_name":"小米","logo_url":"/static/default.jpg","id":5},"tableName":"base_trademark"}
- {"op":"CREATE","database":"gmall0820flink","data":{"tm_name":"长粒香","logo_url":"/static/default.jpg","id":6},"tableName":"base_trademark"}
- {"op":"CREATE","database":"gmall0820flink","data":{"tm_name":"金沙河","logo_url":"/static/default.jpg","id":7},"tableName":"base_trademark"}
- {"op":"CREATE","database":"gmall0820flink","data":{"tm_name":"索芙特","logo_url":"/static/default.jpg","id":8},"tableName":"base_trademark"}
- {"op":"CREATE","database":"gmall0820flink","data":{"tm_name":"CAREMiLLE","logo_url":"/static/default.jpg","id":9},"tableName":"base_trademark"}
- {"op":"CREATE","database":"gmall0820flink","data":{"tm_name":"欧莱雅","logo_url":"/static/default.jpg","id":10},"tableName":"base_trademark"}
- {"op":"CREATE","database":"gmall0820flink","data":{"tm_name":"香奈儿","logo_url":"/static/default.jpg","id":11},"tableName":"base_trademark"}
- {"op":"CREATE","database":"gmall0820flink","data":{"tm_name":"周大帅哥","logo_url":"更新一条信息","id":12},"tableName":"base_trademark"}
- 六月 22, 2021 9:55:39 下午 com.github.shyiko.mysql.binlog.BinaryLogClient connect
- 信息: Connected to hadoop102:3306 at mysql-bin.000009/154 (sid:5499, cid:13)
Paste one of these records into a JSON parsing website and you can see it parses cleanly, which makes our downstream processing much easier: the output is now a proper JSON string.
With the custom deserializer we can monitor multiple databases and tables and, at the same time, output a data format that is convenient for downstream processing.
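As a usage sketch (not in the original post), downstream operators can now parse each record with fastjson and, for example, filter by tableName -- which is exactly why the database and table names were put into the JSON. The returns(...) hints are there because Flink cannot always infer a lambda's output type:
- import com.alibaba.fastjson.JSON;
- import com.alibaba.fastjson.JSONObject;
- import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-
- // StreamSource is the DataStreamSource<String> produced above.
- SingleOutputStreamOperator<JSONObject> jsonStream = StreamSource
-         .map(JSON::parseObject)
-         .returns(JSONObject.class);
-
- // Keep only the base_trademark records and print just the row payload.
- jsonStream
-         .filter(json -> "base_trademark".equals(json.getString("tableName")))
-         .map(json -> json.getJSONObject("data"))
-         .returns(JSONObject.class)
-         .print();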
Q:
Here is the error message:
- Caused by: java.lang.ArrayIndexOutOfBoundsException: 3
-     at org.apache.kafka.connect.data.Struct.get(Struct.java:86)
-     at com.alibaba.Flink03_DataStreamWithMySchema$MyDebeziumDeserializationSchema.deserialize(Flink03_DataStreamWithMySchema.java:93)
-     at com.alibaba.ververica.cdc.debezium.internal.DebeziumChangeConsumer.handleBatch(DebeziumChangeConsumer.java:114)
-     at io.debezium.embedded.ConvertingEngineBuilder.lambda$notifying$2(ConvertingEngineBuilder.java:82)
-     at io.debezium.embedded.EmbeddedEngine.run(EmbeddedEngine.java:812)
-     at io.debezium.embedded.ConvertingEngineBuilder$2.run(ConvertingEngineBuilder.java:171)
-     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
-     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
-     at java.lang.Thread.run(Thread.java:748)
Following this I located the code:
- for (Field field : value.schema().fields()) { //we do not know how many fields there are, so get the schema metadata and iterate over it
-     Object o = after.get(field);   <- the stack trace points me here, but the real mistake is in the line above
-     jsonObject.put(field.name(),o);
- }
The correct code is:
- for (Field field : after.schema().fields()) {   //the correct version: iterate over after's schema, not value's
-     Object o = after.get(field);
-     jsonObject.put(field.name(),o);
- }
So a simple way to debug: look at the lines just above and below the one the error points at and check them for logical slips.
After packaging and uploading to the cluster, I always recovered from a savepoint. Why not recover from a checkpoint?
Actually you can, because a savepoint and a checkpoint contain the same things.
- Do NOT retain checkpoints when the job is cancelled normally. (If the job fails with an error, the checkpoint directory is kept either way.)
- env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION); //the default behaviour: after a cancel, the checkpoint is cleaned up
- If we upgrade the code the normal way, we manually cancel the job, and at that point the checkpoint directory is gone.
-
-
- Retain checkpoints when the job is cancelled normally. (If the job fails with an error, the checkpoint directory is kept either way.)
- env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); //retain
-
-
-
-
-
-
- Stepping into RETAIN_ON_CANCELLATION we find:
- RETAIN_ON_CANCELLATION(false);
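With RETAIN_ON_CANCELLATION the retained checkpoint directory can later be passed to the -s flag just like a savepoint. A hypothetical example that follows the earlier savepoint restore command (the actual <job-id> and chk-N directory have to be looked up in HDFS):
- [myself@hadoop102 flink-standalone]$ bin/flink run -s hdfs://hadoop102:8020/flink1109/checkpoint/<job-id>/chk-N -c com.alibaba.Flink03_DataStreamWithMySchema flink-cdc-1.0-SNAPSHOT-jar-with-dependencies.jar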
What happens if we leave this parameter out?
After cancelling the job,
compare the two screenshots above and below:
after the cancel the checkpoint directory is gone, so the data can no longer be recovered from it.
So the final code:
- package com.alibaba;
-
-
- import com.alibaba.fastjson.JSONObject;
- import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
- import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
- import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
- import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
- import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
- import io.debezium.data.Envelope;
- import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
- import org.apache.flink.api.common.typeinfo.TypeInformation;
- import org.apache.flink.runtime.state.filesystem.FsStateBackend;
- import org.apache.flink.streaming.api.datastream.DataStreamSource;
- import org.apache.flink.streaming.api.environment.CheckpointConfig;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.util.Collector;
- import org.apache.kafka.connect.data.Field;
- import org.apache.kafka.connect.data.Schema;
- import org.apache.kafka.connect.data.Struct;
- import org.apache.kafka.connect.source.SourceRecord;
-
-
- /**
- * @author zhouyanjun
- * @create 2021-06-22 17:03
- */
- public class Flink03_DataStreamWithMySchema {
- public static void main(String[] args) throws Exception {
- System.setProperty("HADOOP_USER_NAME", "alibaba" );
-
-
- //1. Create the streaming execution environment
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
-
-
- // Enable checkpointing
- env.enableCheckpointing(5000L);
- env.getCheckpointConfig().setCheckpointTimeout(5000L);
- env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/flink1109/checkpoint"));
-
-
- //Do NOT retain checkpoints when the job is cancelled normally
- //env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
- //Retain checkpoints when the job is cancelled normally
- env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
-
-
-
-
- //2. Read MySQL change data via CDC
- DebeziumSourceFunction<String> sourceFunction = MySQLSource
- .<String>builder()
- .hostname("hadoop102")
- .port(3306)
- .username("root")
- .password("123456")
- .databaseList("gmall0820flink")
- .tableList("gmall0820flink.base_trademark")///Optional. If omitted, all tables in the configured databases are read. Note that table names must be qualified with the database name, because flink-cdc can monitor multiple databases
- .startupOptions(StartupOptions.initial())//startup option/mode; this is an enum with 5 values
- // .startupOptions(StartupOptions.earliest())
- // .startupOptions(StartupOptions.latest())
- // .startupOptions(StartupOptions.specificOffset())
- // .startupOptions(StartupOptions.timestamp())
- .deserializer(new MyDebeziumDeserializationSchema())//custom deserializer that turns the serialized binary records into usable data
- .build();
-
-
- DataStreamSource<String> StreamSource = env.addSource(sourceFunction);//we now have a stream
-
-
- //3. Print
- StreamSource.print();
- //4. Start the job
- env.execute();
- }
-
-
-
-
- public static class MyDebeziumDeserializationSchema implements DebeziumDeserializationSchema<String> {//we wrap the data as JSON and pass it downstream; JSON is a string, so the <T> type is String
- /**
- * The JSON format we want to produce -- it needs at least these fields so the record is easy to process later:
- * {
- * "data":"{"id":11,"tm_name":"sasa"}", the row itself; it has several columns, so it is itself a JSON object
- * "db":"", database name, because we need to be able to capture data from multiple databases and tables
- * "tableName":"", table name to go with the database; downstream I split the stream by database.table, since different tables have different columns and a single mixed stream could only be typed as Object
- * "op":"c u d", operation type: create, update (modify), delete
- * "ts":"" we also need a timestamp field
- * }
- */
- @Override
- public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception { //input record, output collector
- //our job is to parse this SourceRecord into JSON
- //to use the data properly later we must extract the fields correctly; in other words, we define the JSON layout ourselves, driven by what downstream processing needs
- // Schema schema = sourceRecord.keySchema();
-
-
- //1 Get the topic and extract the database and table names
- String topic = sourceRecord.topic();
- String[] fields = topic.split("\\." );//'.' is a regex metacharacter in Java and has to be escaped
- String db = fields[1]; //now we have the database name and table name we want
- String tableName = fields[2];
-
-
- //2 Get the value and extract the row data itself
- // Object value = sourceRecord.value(); //declared as Object, but the snapshot data shows it is actually a Struct
- Struct value = (Struct) sourceRecord.value();
- Struct after = value.getStruct("after"); //the Struct behind the "after" field
-
-
- //create a JSONObject (from the fastjson package) before iterating
- JSONObject jsonObject = new JSONObject();
-
-
- for (Field field : after.schema().fields()) { //we do not know how many fields there are, so get the schema metadata and iterate over it
- Object o = after.get(field);
- jsonObject.put(field.name(),o);
- }
-
-
- //next we want op, which lives inside the record's envelope value
- //3 Get the operation type
- Envelope.Operation operation = Envelope.operationFor(sourceRecord);
- //finally wrap everything into one big JSON and emit it
-
-
- //Build the result JSON
- JSONObject result = new JSONObject();
- result.put("database", db);
- result.put("tableName", tableName);
- result.put("data", jsonObject);
- result.put("op", operation);
-
-
- //Emit the data
- collector.collect(result.toJSONString());
-
-
- }
-
-
- @Override
- public TypeInformation<String> getProducedType() {
- return BasicTypeInfo.STRING_TYPE_INFO;
- }
- }
- }
This concludes the flink-cdc series.
- END -
This is an original article.
Author: Eugene
A data-role newcomer at a listed company, hoping not to stay a newcomer for long, haha.
❤: Follow along and learn about tech, careers, life, fundamentals, fitness, photography and more with me!
❤: Feel free to follow; let's learn, improve and enrich our lives together.