
flink-cdc Basics Tutorial, with Error Fixes (Part 1)

Today's share is here again. ღ( ´・ᴗ・` ) Let's learn and improve together ღゝ◡╹)ノ♡

Follow the official account and reply "资料全集" for occasional big-data industry news.

❤: Join me here to learn about technology, careers, life, fundamentals, fitness, photography, and more!

❤: Feel free to follow along so we can learn together and keep improving.

Abstract: this covers most of the scenarios in which you will use flink-cdc.

Full tutorial contents:

  1. Prerequisite knowledge for flink-cdc: the MySQL binlog

  2. A look at parts of the flink-cdc source code

  3. Hands-on: using flink-cdc with the DataStream API

  4. Hands-on: using flink-cdc with Flink SQL

  5. Writing a custom deserializer so the stream data is more intuitive and easier to use

  6. flink-cdc errors I ran into while learning

Keep at it; study hard and improve every day~


1 The MySQL binlog

1 What is the binlog

The binary log (binlog) is arguably MySQL's most important log. It records every DDL and DML statement (data query statements excluded) as events, along with how long each statement took to execute, and it is transaction-safe.

Enabling the binary log generally costs about 1% in performance. It has two main use cases:

  • Replication: the master enables the binlog and ships it to its slaves, which replay it so that master and slaves stay consistent.

  • Data recovery: data can be restored using the mysqlbinlog tool.

The binary log consists of two kinds of files:

① The binlog index file (filename suffix .index), which lists all the binary log files.

② The binlog files themselves (filename suffix .00000*), which record all DDL and DML statement events (data query statements excluded).


2 Enabling the binlog

  • Locate the MySQL configuration file

  • Linux: /etc/my.cnf

    If it is not under /etc, find it with locate my.cnf

  • Windows: \my.ini

  • Edit the MySQL configuration file

    In the [mysqld] section, set/add  log-bin=mysql-bin

    This makes mysql-bin the prefix of the binlog files, so the generated files are named like mysql-bin.123456, with the numeric suffix increasing sequentially; every time MySQL restarts, or a single file reaches its size threshold, a new file with the next number is created. (A quick way to confirm the settings took effect is shown in the sketch below.)
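After restarting MySQL, you can confirm that the settings took effect by querying the server variables. The following is a minimal sketch of my own (not from the original tutorial), using plain JDBC with the mysql-connector-java driver that is declared in the pom later on; the host, user, and password are the placeholder values used throughout this article.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class BinlogCheck {
    public static void main(String[] args) throws Exception {
        // Assumed connection settings, matching the examples later in this article.
        String url = "jdbc:mysql://hadoop102:3306/?useSSL=false";
        try (Connection conn = DriverManager.getConnection(url, "root", "123456");
             Statement stmt = conn.createStatement();
             // log_bin should be ON, and binlog_format should be ROW for CDC.
             ResultSet rs = stmt.executeQuery(
                     "SHOW GLOBAL VARIABLES WHERE Variable_name IN ('log_bin', 'binlog_format')")) {
            while (rs.next()) {
                System.out.println(rs.getString("Variable_name") + " = " + rs.getString("Value"));
            }
        }
    }
}

With the configuration described in the next sections in place, this should print log_bin = ON and binlog_format = ROW.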

3 binlog format settings

The MySQL binlog comes in three formats: STATEMENT, MIXED, and ROW.

You pick one in the configuration file:

binlog_format= statement|mixed|row

The differences between the three formats:

1 statement

Statement level: the binlog records the SQL statement of every write operation.

It saves space compared with row mode, but it can produce inconsistencies, for example

update  tt set create_date=now()

If you restore from the binlog, the resulting data can differ because the statement is re-executed at a different time.

Pros: saves space.

Cons: can lead to data inconsistency.

2 row

Row level: the binlog records how every affected row changed after each operation.

Pros: keeps the data absolutely consistent; no matter what the SQL is or which functions it calls, only the resulting row changes are recorded.

Cons: takes considerably more space.

3 mixed

An upgraded statement mode that, to a degree, fixes the inconsistencies statement mode causes in certain situations.

It still defaults to statement, but in some cases, for example:

when a statement uses the UUID() function;

when a table with an AUTO_INCREMENT column is updated;

when an INSERT DELAYED statement is executed;

when a UDF is used;

it falls back to ROW-style logging.

Pros: saves space while providing a reasonable degree of consistency.

Cons: a few rare cases can still end up inconsistent, and both statement and mixed are inconvenient whenever the binlog needs to be monitored.

Weighing the above, the row format is the right choice for real-time monitoring and analysis.


2 MySQL preparation

1 Create the real-time business database

2 Import the table schema and data

If you need the SQL script, message me and I will share it.

3 Modify /etc/my.cnf

[myself@hadoop202 module]sudo vim /etc/my.cnf
server-id= 1
log-bin=mysql-bin
binlog_format=row
binlog-do-db=gmall_flink_DW
Note: change binlog-do-db to the name of the database you actually want to capture.
[myself@hadoop102 ~]$ vim /etc/my.cnf
# For advice on how to change settings please see
# http://dev.mysql.com/doc/refman/5.7/en/server-configuration-defaults.html
[mysqld]
#
# Remove leading # and set to the amount of RAM for the most important data
# cache in MySQL. Start at 70% of total RAM for dedicated server, else 10%.
# innodb_buffer_pool_size = 128M
#
# Remove leading # to turn on a very important data integrity option: logging
# changes to the binary log between backups.
# log_bin
#
# Remove leading # to set options mainly useful for reporting servers.
# The server defaults are faster for transactions and fast SELECTs.
# Adjust sizes as needed, experiment to find the optimal values.
# join_buffer_size = 128M
# sort_buffer_size = 2M
# read_rnd_buffer_size = 2M
datadir=/var/lib/mysql
socket=/var/lib/mysql/mysql.sock
# Disabling symbolic-links is recommended to prevent assorted security risks
symbolic-links=0
log-error=/var/log/mysqld.log
pid-file=/var/run/mysqld/mysqld.pid
server-id= 1
log-bin=mysql-bin
binlog_format=row
binlog-do-db=gmall0820flink

4 Restart MySQL so the configuration takes effect

sudo systemctl restart mysqld

Go to /var/lib/mysql and note the initial binlog file size of 154 bytes.

5 Generate mock data

Go back to /var/lib/mysql and check the binlog file size again.

You can see that the file size has changed.


3 Getting started with Flink-CDC

1 Introduction to CDC

1.1 What is CDC

CDC is short for Change Data Capture. The core idea is to monitor and capture changes in a database (inserts, updates, and deletes of data or tables), record those changes completely and in the order they occur, and write them to a message broker for other services to subscribe to and consume.

1.2 Kinds of CDC

CDC mainly comes in two flavors, query-based and binlog-based. Query-based CDC polls the tables with batch SQL, so it has higher latency, puts extra load on the database, and cannot capture deletes or intermediate states; binlog-based CDC reads the change log itself, so it captures every insert, update, and delete with low latency and little impact on the source.

1.3 Flink-CDC

The Flink community developed the flink-cdc-connectors component, a source connector that can read full snapshot data and incremental change data directly from databases such as MySQL and PostgreSQL. It is open source: https://github.com/ververica/flink-cdc-connectors

2 FlinkCDC hands-on

2.1 Using the DataStream API

2.1.1 Add the dependencies

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.12.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.12.0</version>
    </dependency>
    <!-- Flink client -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>1.12.0</version>
    </dependency>
    <!-- Hadoop client, needed later for checkpoints on HDFS -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.1.3</version>
    </dependency>
    <!-- MySQL driver -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.49</version>
    </dependency>
    <!-- Found on the flink-cdc project page on GitHub -->
    <dependency>
        <groupId>com.alibaba.ververica</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>1.2.0</version>
    </dependency>
    <!-- We wrap the records as JSON because they are passed downstream and processed further;
         JSON is easier to work with -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.75</version>
    </dependency>
</dependencies>
<build>
    <plugins>
        <!-- Customize the build by configuring an existing plugin: the assembly (packaging) plugin -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>3.0.0</version>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <!-- Bind the "single" goal to the package phase so it runs as part of the build -->
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

So when we run package, two jars are produced: the regular jar is built first, and because of the binding the assembly plugin's single goal runs as well. The jar produced by the assembly plugin carries the jar-with-dependencies suffix, which comes from the descriptorRef configured above.

/**
 * Performs an initial snapshot on the monitored database tables upon first startup,
 * and continue to read the latest binlog.
 */
public static StartupOptions initial() {
    return new StartupOptions(StartupMode.INITIAL, null, null, null);
}

As the Javadoc says, this combines two steps: ① first take a snapshot of the monitored table, ② then immediately switch to the latest binlog, i.e. the newest data. This is an advantage over Canal, which can only watch the newest data.
/**
 * Never to perform snapshot on the monitored database tables upon first startup,
 * just read from the beginning of the binlog.
 * This should be used with care, as it is only valid when the binlog is guaranteed to contain
 * the entire history of the database.
 */
public static StartupOptions earliest() {
    return new StartupOptions(StartupMode.EARLIEST_OFFSET, null, null, null);
}

Reads from the very beginning of the binlog, which can be a huge amount of data. Use case: if the MySQL database is ever dropped, this can be used to recover the data.

/**
 * Never to perform snapshot on the monitored database tables upon first startup,
 * just read from the end of the binlog which means only have the changes since the connector
 * was started.
 */
public static StartupOptions latest() {
    return new StartupOptions(StartupMode.LATEST_OFFSET, null, null, null);
}

Reads only the newest changes.
/**
 * Never to perform snapshot on the monitored database tables upon first startup,
 * and directly read binlog from the specified offset.
 */
public static StartupOptions specificOffset(String specificOffsetFile, int specificOffsetPos) {
    return new StartupOptions(StartupMode.SPECIFIC_OFFSETS, specificOffsetFile, specificOffsetPos, null);
}

You supply a binlog file name and pick a position yourself, which is why this method is rarely used. By contrast, Canal supports resuming from where it left off.

If you open Canal's installation directory, you can find the file that records the current read position:

cat meta.dat

That file is what lets Canal resume after a failure: if Canal crashes, after a restart it continues reading from position:17635014.

/**
 * Never to perform snapshot on the monitored database tables upon first startup,
 * and directly read binlog from the specified timestamp.
 *
 * <p>The consumer will traverse the binlog from the beginning and ignore change events whose
 * timestamp is smaller than the specified timestamp.
 *
 * @param startupTimestampMillis timestamp for the startup offsets, as milliseconds from epoch.
 */
public static StartupOptions timestamp(long startupTimestampMillis) {
    return new StartupOptions(StartupMode.TIMESTAMP, null, null, startupTimestampMillis);
}

The binlog itself is written in timestamp order, so this option lets you specify a timestamp and read the data from that point in time onward.
/**
 * The deserializer used to convert from consumed {@link org.apache.kafka.connect.source.SourceRecord}.
 */
public Builder<T> deserializer(DebeziumDeserializationSchema<T> deserializer) {
    this.deserializer = deserializer;
    return this;
}

Ctrl-click into DebeziumDeserializationSchema:

@PublicEvolving
public interface DebeziumDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
    void deserialize(SourceRecord var1, Collector<T> var2) throws Exception;
}

It is an interface; Ctrl+H shows that it has three implementing classes.

package com.alibaba;

import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Flink01_DataStream {
    public static void main(String[] args) throws Exception {
        // 1. Create the streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 2. Read MySQL change data via CDC
        DebeziumSourceFunction<String> sourceFunction = MySQLSource
                .<String>builder()
                .hostname("hadoop102")
                .port(3306)
                .username("root")
                .password("123456")
                .databaseList("gmall0820flink")
                // Optional; if omitted, all tables of the databases above are read.
                // Table names must be qualified with the database name, because flink-cdc can monitor several databases.
                .tableList("gmall0820flink.base_trademark")
                // Startup option/mode; an enum with 5 values.
                .startupOptions(StartupOptions.initial())
                // .startupOptions(StartupOptions.earliest())
                // .startupOptions(StartupOptions.latest())
                // .startupOptions(StartupOptions.specificOffset())
                // .startupOptions(StartupOptions.timestamp())
                // Deserializer: turns the serialized binary records back into a readable form.
                .deserializer(new StringDebeziumDeserializationSchema())
                .build();

        DataStreamSource<String> streamSource = env.addSource(sourceFunction); // we now have a stream

        // 3. Print
        streamSource.print();

        // 4. Start
        env.execute();
    }
}

The code above is written so that it can run inside IDEA.

To run it on the Hadoop cluster later, the code is revised as follows:

package com.alibaba;

import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Flink01_DataStream {
    public static void main(String[] args) throws Exception {
        // Run as this user so HDFS permissions are satisfied.
        System.setProperty("HADOOP_USER_NAME", "myself");

        // 1. Create the streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Enable checkpointing; the interval controls how often a checkpoint is taken.
        env.enableCheckpointing(5000L);
        env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/flink1109/ck"));

        // 2. Read MySQL change data via CDC
        DebeziumSourceFunction<String> sourceFunction = MySQLSource
                .<String>builder()
                .hostname("hadoop102")
                .port(3306)
                .username("root")
                .password("123456")
                .databaseList("gmall0820flink")
                // Optional; if omitted, all tables of the databases above are read.
                // Table names must be qualified with the database name, because flink-cdc can monitor several databases.
                .tableList("gmall0820flink.base_trademark")
                // Startup option/mode; an enum with 5 values.
                .startupOptions(StartupOptions.initial())
                // .startupOptions(StartupOptions.earliest())
                // .startupOptions(StartupOptions.latest())
                // .startupOptions(StartupOptions.specificOffset())
                // .startupOptions(StartupOptions.timestamp())
                // Deserializer: turns the serialized binary records back into a readable form.
                .deserializer(new StringDebeziumDeserializationSchema())
                .build();

        DataStreamSource<String> streamSource = env.addSource(sourceFunction); // we now have a stream

        // 3. Print
        streamSource.print();

        // 4. Start
        env.execute();
    }
}

We start with the base_trademark (brand) table in MySQL, which has only a few columns.

Output:

  1. SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
  2. SLF4J: Defaulting to no-operation (NOP) logger implementation
  3. SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
  4. log4j:WARN No appenders could be found for logger (org.apache.flink.shaded.netty4.io.netty.util.internal.logging.InternalLoggerFactory).
  5. log4j:WARN Please initialize the log4j system properly.
  6. log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
  7. SLF4J: Failed to load class "org.slf4j.impl.StaticMDCBinder".
  8. SLF4J: Defaulting to no-operation MDCAdapter implementation.
  9. SLF4J: See http://www.slf4j.org/codes.html#no_static_mdc_binder for further details. You can see there are 11 matching rows below.
  10. SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={file=mysql-bin.000004, pos=154, row=1, snapshot=true}} ConnectRecord{topic='mysql_binlog_source.gmall0820flink.base_trademark', kafkaPartition=null, key=Struct{id=1}, keySchema=Schema{mysql_binlog_source.gmall0820flink.base_trademark.Key:STRUCT}, value=Struct{after=Struct{id=1,tm_name=Redmi},source=Struct{version=1.4.1.Final,connector=mysql,name=mysql_binlog_source,ts_ms=0,snapshot=true,db=gmall0820flink,table=base_trademark,server_id=0,file=mysql-bin.000004,pos=154,row=0},op=c,ts_ms=1623943673564}, valueSchema=Schema{mysql_binlog_source.gmall0820flink.base_trademark.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
  11. SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={file=mysql-bin.000004, pos=154, row=1, snapshot=true}} ConnectRecord{topic='mysql_binlog_source.gmall0820flink.base_trademark', kafkaPartition=null, key=Struct{id=2}, keySchema=Schema{mysql_binlog_source.gmall0820flink.base_trademark.Key:STRUCT}, value=Struct{after=Struct{id=2,tm_name=苹果,logo_url=/static/default.jpg},source=Struct{version=1.4.1.Final,connector=mysql,name=mysql_binlog_source,ts_ms=0,snapshot=true,db=gmall0820flink,table=base_trademark,server_id=0,file=mysql-bin.000004,pos=154,row=0},op=c,ts_ms=1623943673567}, valueSchema=Schema{mysql_binlog_source.gmall0820flink.base_trademark.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
  12. SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={file=mysql-bin.000004, pos=154, row=1, snapshot=true}} ConnectRecord{topic='mysql_binlog_source.gmall0820flink.base_trademark', kafkaPartition=null, key=Struct{id=3}, keySchema=Schema{mysql_binlog_source.gmall0820flink.base_trademark.Key:STRUCT}, value=Struct{after=Struct{id=3,tm_name=华为,logo_url=/static/default.jpg},source=Struct{version=1.4.1.Final,connector=mysql,name=mysql_binlog_source,ts_ms=0,snapshot=true,db=gmall0820flink,table=base_trademark,server_id=0,file=mysql-bin.000004,pos=154,row=0},op=c,ts_ms=1623943673568}, valueSchema=Schema{mysql_binlog_source.gmall0820flink.base_trademark.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
  13. SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={file=mysql-bin.000004, pos=154, row=1, snapshot=true}} ConnectRecord{topic='mysql_binlog_source.gmall0820flink.base_trademark', kafkaPartition=null, key=Struct{id=4}, keySchema=Schema{mysql_binlog_source.gmall0820flink.base_trademark.Key:STRUCT}, value=Struct{after=Struct{id=4,tm_name=TCL,logo_url=/static/default.jpg},source=Struct{version=1.4.1.Final,connector=mysql,name=mysql_binlog_source,ts_ms=0,snapshot=true,db=gmall0820flink,table=base_trademark,server_id=0,file=mysql-bin.000004,pos=154,row=0},op=c,ts_ms=1623943673568}, valueSchema=Schema{mysql_binlog_source.gmall0820flink.base_trademark.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
  14. SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={file=mysql-bin.000004, pos=154, row=1, snapshot=true}} ConnectRecord{topic='mysql_binlog_source.gmall0820flink.base_trademark', kafkaPartition=null, key=Struct{id=5}, keySchema=Schema{mysql_binlog_source.gmall0820flink.base_trademark.Key:STRUCT}, value=Struct{after=Struct{id=5,tm_name=小米,logo_url=/static/default.jpg},source=Struct{version=1.4.1.Final,connector=mysql,name=mysql_binlog_source,ts_ms=0,snapshot=true,db=gmall0820flink,table=base_trademark,server_id=0,file=mysql-bin.000004,pos=154,row=0},op=c,ts_ms=1623943673568}, valueSchema=Schema{mysql_binlog_source.gmall0820flink.base_trademark.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
  15. SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={file=mysql-bin.000004, pos=154, row=1, snapshot=true}} ConnectRecord{topic='mysql_binlog_source.gmall0820flink.base_trademark', kafkaPartition=null, key=Struct{id=6}, keySchema=Schema{mysql_binlog_source.gmall0820flink.base_trademark.Key:STRUCT}, value=Struct{after=Struct{id=6,tm_name=长粒香,logo_url=/static/default.jpg},source=Struct{version=1.4.1.Final,connector=mysql,name=mysql_binlog_source,ts_ms=0,snapshot=true,db=gmall0820flink,table=base_trademark,server_id=0,file=mysql-bin.000004,pos=154,row=0},op=c,ts_ms=1623943673568}, valueSchema=Schema{mysql_binlog_source.gmall0820flink.base_trademark.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
  16. SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={file=mysql-bin.000004, pos=154, row=1, snapshot=true}} ConnectRecord{topic='mysql_binlog_source.gmall0820flink.base_trademark', kafkaPartition=null, key=Struct{id=7}, keySchema=Schema{mysql_binlog_source.gmall0820flink.base_trademark.Key:STRUCT}, value=Struct{after=Struct{id=7,tm_name=金沙河,logo_url=/static/default.jpg},source=Struct{version=1.4.1.Final,connector=mysql,name=mysql_binlog_source,ts_ms=0,snapshot=true,db=gmall0820flink,table=base_trademark,server_id=0,file=mysql-bin.000004,pos=154,row=0},op=c,ts_ms=1623943673568}, valueSchema=Schema{mysql_binlog_source.gmall0820flink.base_trademark.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
  17. SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={file=mysql-bin.000004, pos=154, row=1, snapshot=true}} ConnectRecord{topic='mysql_binlog_source.gmall0820flink.base_trademark', kafkaPartition=null, key=Struct{id=8}, keySchema=Schema{mysql_binlog_source.gmall0820flink.base_trademark.Key:STRUCT}, value=Struct{after=Struct{id=8,tm_name=索芙特,logo_url=/static/default.jpg},source=Struct{version=1.4.1.Final,connector=mysql,name=mysql_binlog_source,ts_ms=0,snapshot=true,db=gmall0820flink,table=base_trademark,server_id=0,file=mysql-bin.000004,pos=154,row=0},op=c,ts_ms=1623943673568}, valueSchema=Schema{mysql_binlog_source.gmall0820flink.base_trademark.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
  18. SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={file=mysql-bin.000004, pos=154, row=1, snapshot=true}} ConnectRecord{topic='mysql_binlog_source.gmall0820flink.base_trademark', kafkaPartition=null, key=Struct{id=9}, keySchema=Schema{mysql_binlog_source.gmall0820flink.base_trademark.Key:STRUCT}, value=Struct{after=Struct{id=9,tm_name=CAREMiLLE,logo_url=/static/default.jpg},source=Struct{version=1.4.1.Final,connector=mysql,name=mysql_binlog_source,ts_ms=0,snapshot=true,db=gmall0820flink,table=base_trademark,server_id=0,file=mysql-bin.000004,pos=154,row=0},op=c,ts_ms=1623943673568}, valueSchema=Schema{mysql_binlog_source.gmall0820flink.base_trademark.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
  19. SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={file=mysql-bin.000004, pos=154, row=1, snapshot=true}} ConnectRecord{topic='mysql_binlog_source.gmall0820flink.base_trademark', kafkaPartition=null, key=Struct{id=10}, keySchema=Schema{mysql_binlog_source.gmall0820flink.base_trademark.Key:STRUCT}, value=Struct{after=Struct{id=10,tm_name=欧莱雅,logo_url=/static/default.jpg},source=Struct{version=1.4.1.Final,connector=mysql,name=mysql_binlog_source,ts_ms=0,snapshot=true,db=gmall0820flink,table=base_trademark,server_id=0,file=mysql-bin.000004,pos=154,row=0},op=c,ts_ms=1623943673569}, valueSchema=Schema{mysql_binlog_source.gmall0820flink.base_trademark.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
  20. SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={file=mysql-bin.000004, pos=154}} ConnectRecord{topic='mysql_binlog_source.gmall0820flink.base_trademark', kafkaPartition=null, key=Struct{id=11}, keySchema=Schema{mysql_binlog_source.gmall0820flink.base_trademark.Key:STRUCT}, value=Struct{after=Struct{id=11,tm_name=香奈儿,logo_url=/static/default.jpg},source=Struct{version=1.4.1.Final,connector=mysql,name=mysql_binlog_source,ts_ms=0,snapshot=last,db=gmall0820flink,table=base_trademark,server_id=0,file=mysql-bin.000004,pos=154,row=0},op=c,ts_ms=1623943673569}, valueSchema=Schema{mysql_binlog_source.gmall0820flink.base_trademark.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
  21. June 17, 2021 11:27:53 PM com.github.shyiko.mysql.binlog.BinaryLogClient connect
  22. INFO: Connected to hadoop102:3306 at mysql-bin.000004/154 (sid:6368, cid:7)  -- connected to the last binlog file, mysql-bin.000004, and switched to the latest position, 154

Now I insert one row into this MySQL table.

The data shown in IDEA:

  1. log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
  2. SLF4J: Failed to load class "org.slf4j.impl.StaticMDCBinder".
  3. SLF4J: Defaulting to no-operation MDCAdapter implementation.
  4. SLF4J: See http://www.slf4j.org/codes.html#no_static_mdc_binder for further details.
  5. SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={file=mysql-bin.000004, pos=154, row=1, snapshot=true}} ConnectRecord{topic='mysql_binlog_source.gmall0820flink.base_trademark', kafkaPartition=null, key=Struct{id=1}, keySchema=Schema{mysql_binlog_source.gmall0820flink.base_trademark.Key:STRUCT}, value=Struct{after=Struct{id=1,tm_name=Redmi},source=Struct{version=1.4.1.Final,connector=mysql,name=mysql_binlog_source,ts_ms=0,snapshot=true,db=gmall0820flink,table=base_trademark,server_id=0,file=mysql-bin.000004,pos=154,row=0},op=c,ts_ms=1623944122261}, valueSchema=Schema{mysql_binlog_source.gmall0820flink.base_trademark.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
  6. SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={file=mysql-bin.000004, pos=154, row=1, snapshot=true}} ConnectRecord{topic='mysql_binlog_source.gmall0820flink.base_trademark', kafkaPartition=null, key=Struct{id=2}, keySchema=Schema{mysql_binlog_source.gmall0820flink.base_trademark.Key:STRUCT}, value=Struct{after=Struct{id=2,tm_name=苹果,logo_url=/static/default.jpg},source=Struct{version=1.4.1.Final,connector=mysql,name=mysql_binlog_source,ts_ms=0,snapshot=true,db=gmall0820flink,table=base_trademark,server_id=0,file=mysql-bin.000004,pos=154,row=0},op=c,ts_ms=1623944122265}, valueSchema=Schema{mysql_binlog_source.gmall0820flink.base_trademark.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
  7. SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={file=mysql-bin.000004, pos=154, row=1, snapshot=true}} ConnectRecord{topic='mysql_binlog_source.gmall0820flink.base_trademark', kafkaPartition=null, key=Struct{id=3}, keySchema=Schema{mysql_binlog_source.gmall0820flink.base_trademark.Key:STRUCT}, value=Struct{after=Struct{id=3,tm_name=华为,logo_url=/static/default.jpg},source=Struct{version=1.4.1.Final,connector=mysql,name=mysql_binlog_source,ts_ms=0,snapshot=true,db=gmall0820flink,table=base_trademark,server_id=0,file=mysql-bin.000004,pos=154,row=0},op=c,ts_ms=1623944122265}, valueSchema=Schema{mysql_binlog_source.gmall0820flink.base_trademark.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
  8. SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={file=mysql-bin.000004, pos=154, row=1, snapshot=true}} ConnectRecord{topic='mysql_binlog_source.gmall0820flink.base_trademark', kafkaPartition=null, key=Struct{id=4}, keySchema=Schema{mysql_binlog_source.gmall0820flink.base_trademark.Key:STRUCT}, value=Struct{after=Struct{id=4,tm_name=TCL,logo_url=/static/default.jpg},source=Struct{version=1.4.1.Final,connector=mysql,name=mysql_binlog_source,ts_ms=0,snapshot=true,db=gmall0820flink,table=base_trademark,server_id=0,file=mysql-bin.000004,pos=154,row=0},op=c,ts_ms=1623944122265}, valueSchema=Schema{mysql_binlog_source.gmall0820flink.base_trademark.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
  9. SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={file=mysql-bin.000004, pos=154, row=1, snapshot=true}} ConnectRecord{topic='mysql_binlog_source.gmall0820flink.base_trademark', kafkaPartition=null, key=Struct{id=5}, keySchema=Schema{mysql_binlog_source.gmall0820flink.base_trademark.Key:STRUCT}, value=Struct{after=Struct{id=5,tm_name=小米,logo_url=/static/default.jpg},source=Struct{version=1.4.1.Final,connector=mysql,name=mysql_binlog_source,ts_ms=0,snapshot=true,db=gmall0820flink,table=base_trademark,server_id=0,file=mysql-bin.000004,pos=154,row=0},op=c,ts_ms=1623944122265}, valueSchema=Schema{mysql_binlog_source.gmall0820flink.base_trademark.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
  10. SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={file=mysql-bin.000004, pos=154, row=1, snapshot=true}} ConnectRecord{topic='mysql_binlog_source.gmall0820flink.base_trademark', kafkaPartition=null, key=Struct{id=6}, keySchema=Schema{mysql_binlog_source.gmall0820flink.base_trademark.Key:STRUCT}, value=Struct{after=Struct{id=6,tm_name=长粒香,logo_url=/static/default.jpg},source=Struct{version=1.4.1.Final,connector=mysql,name=mysql_binlog_source,ts_ms=0,snapshot=true,db=gmall0820flink,table=base_trademark,server_id=0,file=mysql-bin.000004,pos=154,row=0},op=c,ts_ms=1623944122265}, valueSchema=Schema{mysql_binlog_source.gmall0820flink.base_trademark.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
  11. SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={file=mysql-bin.000004, pos=154, row=1, snapshot=true}} ConnectRecord{topic='mysql_binlog_source.gmall0820flink.base_trademark', kafkaPartition=null, key=Struct{id=7}, keySchema=Schema{mysql_binlog_source.gmall0820flink.base_trademark.Key:STRUCT}, value=Struct{after=Struct{id=7,tm_name=金沙河,logo_url=/static/default.jpg},source=Struct{version=1.4.1.Final,connector=mysql,name=mysql_binlog_source,ts_ms=0,snapshot=true,db=gmall0820flink,table=base_trademark,server_id=0,file=mysql-bin.000004,pos=154,row=0},op=c,ts_ms=1623944122265}, valueSchema=Schema{mysql_binlog_source.gmall0820flink.base_trademark.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
  12. SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={file=mysql-bin.000004, pos=154, row=1, snapshot=true}} ConnectRecord{topic='mysql_binlog_source.gmall0820flink.base_trademark', kafkaPartition=null, key=Struct{id=8}, keySchema=Schema{mysql_binlog_source.gmall0820flink.base_trademark.Key:STRUCT}, value=Struct{after=Struct{id=8,tm_name=索芙特,logo_url=/static/default.jpg},source=Struct{version=1.4.1.Final,connector=mysql,name=mysql_binlog_source,ts_ms=0,snapshot=true,db=gmall0820flink,table=base_trademark,server_id=0,file=mysql-bin.000004,pos=154,row=0},op=c,ts_ms=1623944122265}, valueSchema=Schema{mysql_binlog_source.gmall0820flink.base_trademark.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
  13. SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={file=mysql-bin.000004, pos=154, row=1, snapshot=true}} ConnectRecord{topic='mysql_binlog_source.gmall0820flink.base_trademark', kafkaPartition=null, key=Struct{id=9}, keySchema=Schema{mysql_binlog_source.gmall0820flink.base_trademark.Key:STRUCT}, value=Struct{after=Struct{id=9,tm_name=CAREMiLLE,logo_url=/static/default.jpg},source=Struct{version=1.4.1.Final,connector=mysql,name=mysql_binlog_source,ts_ms=0,snapshot=true,db=gmall0820flink,table=base_trademark,server_id=0,file=mysql-bin.000004,pos=154,row=0},op=c,ts_ms=1623944122265}, valueSchema=Schema{mysql_binlog_source.gmall0820flink.base_trademark.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
  14. SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={file=mysql-bin.000004, pos=154, row=1, snapshot=true}} ConnectRecord{topic='mysql_binlog_source.gmall0820flink.base_trademark', kafkaPartition=null, key=Struct{id=10}, keySchema=Schema{mysql_binlog_source.gmall0820flink.base_trademark.Key:STRUCT}, value=Struct{after=Struct{id=10,tm_name=欧莱雅,logo_url=/static/default.jpg},source=Struct{version=1.4.1.Final,connector=mysql,name=mysql_binlog_source,ts_ms=0,snapshot=true,db=gmall0820flink,table=base_trademark,server_id=0,file=mysql-bin.000004,pos=154,row=0},op=c,ts_ms=1623944122265}, valueSchema=Schema{mysql_binlog_source.gmall0820flink.base_trademark.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
  15. SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={file=mysql-bin.000004, pos=154}} ConnectRecord{topic='mysql_binlog_source.gmall0820flink.base_trademark', kafkaPartition=null, key=Struct{id=11}, keySchema=Schema{mysql_binlog_source.gmall0820flink.base_trademark.Key:STRUCT}, value=Struct{after=Struct{id=11,tm_name=香奈儿,logo_url=/static/default.jpg},source=Struct{version=1.4.1.Final,connector=mysql,name=mysql_binlog_source,ts_ms=0,snapshot=last,db=gmall0820flink,table=base_trademark,server_id=0,file=mysql-bin.000004,pos=154,row=0},op=c,ts_ms=1623944122265}, valueSchema=Schema{mysql_binlog_source.gmall0820flink.base_trademark.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
  16. June 17, 2021 11:35:22 PM com.github.shyiko.mysql.binlog.BinaryLogClient connect
  17. INFO: Connected to hadoop102:3306 at mysql-bin.000004/154 (sid:6336, cid:10)  -- the line below is the newly captured change; scroll to its end to see how the data is presented. op=c means the operation type is create (insert).
  18. SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={ts_sec=1623944241, file=mysql-bin.000004, pos=219, row=1, server_id=1, event=2}} ConnectRecord{topic='mysql_binlog_source.gmall0820flink.base_trademark', kafkaPartition=null, key=Struct{id=12}, keySchema=Schema{mysql_binlog_source.gmall0820flink.base_trademark.Key:STRUCT}, value=Struct{after=Struct{id=12,tm_name=阿里巴巴},source=Struct{version=1.4.1.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1623944241000,db=gmall0820flink,table=base_trademark,server_id=1,file=mysql-bin.000004,pos=374,row=0,thread=2},op=c,ts_ms=1623944241800}, valueSchema=Schema{mysql_binlog_source.gmall0820flink.base_trademark.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}

When I update the row, the output has op=u: the operation type is update.

SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={ts_sec=1623944614, file=mysql-bin.000004, pos=528, row=1, server_id=1, event=2}} ConnectRecord{topic='mysql_binlog_source.gmall0820flink.base_trademark', kafkaPartition=null, key=Struct{id=12}, keySchema=Schema{mysql_binlog_source.gmall0820flink.base_trademark.Key:STRUCT}, value=Struct{before=Struct{id=12,tm_name=阿里巴巴},after=Struct{id=12,tm_name=阿里巴巴,logo_url=aaaaaaaaa},source=Struct{version=1.4.1.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1623944614000,db=gmall0820flink,table=base_trademark,server_id=1,file=mysql-bin.000004,pos=683,row=0,thread=2},op=u,ts_ms=1623944614090}, valueSchema=Schema{mysql_binlog_source.gmall0820flink.base_trademark.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}

When I delete the row, it has op=d: the operation type is delete.

SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={ts_sec=1623944868, file=mysql-bin.000004, pos=872, row=1, server_id=1, event=2}} ConnectRecord{topic='mysql_binlog_source.gmall0820flink.base_trademark', kafkaPartition=null, key=Struct{id=12}, keySchema=Schema{mysql_binlog_source.gmall0820flink.base_trademark.Key:STRUCT}, value=Struct{before=Struct{id=12,tm_name=阿里巴巴,logo_url=aaaaaaaaa},source=Struct{version=1.4.1.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1623944868000,db=gmall0820flink,table=base_trademark,server_id=1,file=mysql-bin.000004,pos=1027,row=0,thread=2},op=d,ts_ms=1623944868978}, valueSchema=Schema{mysql_binlog_source.gmall0820flink.base_trademark.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}

flink-cdc stores its binlog position in the checkpoint, i.e. it is kept as Flink state.

Consider this scenario: the reading job dies, I restart it, and I want it to pick up where it left off. That is where savepoints and checkpoints come in: checkpointing has to be enabled, and savepoint backups have to be taken.

When restarting, use the following commands (a consolidated code sketch follows the list):

Restoring a job from a savepoint or a checkpoint
# Start the job
bin/flink run -c com.myself.WordCount xxx.jar
# Take a savepoint (manual only)
bin/flink savepoint -m hadoop102:8081 JobId hdfs://hadoop102:8020/flink/save
# Cancel the job, then restore it from the savepoint via -s plus the savepoint directory
bin/flink run -s hdfs://hadoop102:8020/flink/save/... -m hadoop102:8081 -c com.myself.WordCount xxx.jar
# Restore from a checkpoint (the code must retain externalized checkpoints):
# env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
bin/flink run -s hdfs://hadoop102:8020/flink/ck/Jobid/chk-960 -m hadoop102:8081 -c com.myself.WordCount xxx.jar
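For completeness, here is a minimal sketch of the checkpoint configuration a job needs so that the CDC offset survives a cancel/restart and the chk-xxx directory can be passed to flink run -s. It is my own consolidation of settings already used in this article (interval, state backend path) plus the externalized-checkpoint line from the list above, not code from the original tutorial.

import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointSetup {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Take a checkpoint every 5 seconds; the CDC source stores its binlog position in it.
        env.enableCheckpointing(5000L);

        // Keep checkpoints on HDFS (same path as used earlier in this article).
        env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/flink1109/ck"));

        // Retain the externalized checkpoint when the job is cancelled,
        // so it can later be restored with: bin/flink run -s .../chk-xxx ...
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // ... add the MySQLSource from the program above, then call env.execute()
    }
}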

Next optimization:

As you can see, the output is very unfriendly; there is no concise way to pull the actual data out of it, so the code needs further changes.

It is obviously the result of printing an object's toString(), and it actually contains two objects.

As an example, here is one captured record:

The two objects are simply joined with a space. This format is not friendly, so we need to customize the deserialization.

  1. SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={file=mysql-bin.000004, pos=154, row=1, snapshot=true}}
  2. ConnectRecord{topic='mysql_binlog_source.gmall0820flink.base_trademark', kafkaPartition=null, key=Struct{id=1}, keySchema=Schema{mysql_binlog_source.gmall0820flink.base_trademark.Key:STRUCT}, value=Struct{after=Struct{id=1,tm_name=Redmi},source=Struct{version=1.4.1.Final,connector=mysql,name=mysql_binlog_source,ts_ms=0,snapshot=true,db=gmall0820flink,table=base_trademark,server_id=0,file=mysql-bin.000004,pos=154,row=0},op=c,ts_ms=1623943673564}, valueSchema=Schema{mysql_binlog_source.gmall0820flink.base_trademark.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
.deserializer(new StringDebeziumDeserializationSchema()) // the deserializer used above
.build();

Ctrl-click into StringDebeziumDeserializationSchema:

public class StringDebeziumDeserializationSchema implements DebeziumDeserializationSchema<String> {
    private static final long serialVersionUID = -3168848963265670603L;

    public StringDebeziumDeserializationSchema() {
    }

    public void deserialize(SourceRecord record, Collector<String> out) throws Exception {
        out.collect(record.toString());
    }

    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}

Click into toString():

public String toString() {
    return "SourceRecord{sourcePartition=" + this.sourcePartition + ", sourceOffset=" + this.sourceOffset + "} " + super.toString();
}

And into super.toString():

public String toString() {
    return "ConnectRecord{topic='" + this.topic + '\'' + ", kafkaPartition=" + this.kafkaPartition + ", key=" + this.key + ", keySchema=" + this.keySchema + ", value=" + this.value + ", valueSchema=" + this.valueSchema + ", timestamp=" + this.timestamp + ", headers=" + this.headers + '}';
}

So the printed output is just these two toString() results concatenated. To get friendlier output, this is the method we need to override:

public void deserialize(SourceRecord record, Collector<String> out) throws Exception {
    out.collect(record.toString());
}
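As a preview of the custom deserializer promised in the outline (item 5), here is a minimal sketch of my own. The class name MyDeserializationSchema and the exact JSON layout are illustrative choices, not the article's final version; it flattens each SourceRecord into a small JSON string using the fastjson dependency added earlier.

import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

// Illustrative custom deserializer (class name is my own choice).
public class MyDeserializationSchema implements DebeziumDeserializationSchema<String> {

    @Override
    public void deserialize(SourceRecord record, Collector<String> out) throws Exception {
        JSONObject result = new JSONObject();

        // The topic has the form mysql_binlog_source.<database>.<table>.
        String[] topicParts = record.topic().split("\\.");
        result.put("database", topicParts[1]);
        result.put("table", topicParts[2]);

        // The value is a Struct; its "after" field holds the row image after the change (null for deletes).
        Struct value = (Struct) record.value();
        Struct after = value.getStruct("after");
        JSONObject data = new JSONObject();
        if (after != null) {
            for (Field field : after.schema().fields()) {
                data.put(field.name(), after.get(field));
            }
        }
        result.put("data", data);

        // Operation type: CREATE, UPDATE, DELETE, or READ for snapshot rows.
        Envelope.Operation operation = Envelope.operationFor(record);
        result.put("type", operation.toString().toLowerCase());

        out.collect(result.toJSONString());
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}

Plugging it in is just a matter of passing .deserializer(new MyDeserializationSchema()) to the builder instead of new StringDebeziumDeserializationSchema(); each record then prints as a compact JSON object with the database, table, operation type, and changed columns.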

[myself@hadoop102 flink-standalone]$ ll
total 110964
drwxr-xr-x.  2 myself myself      4096 Jun 20 17:15 bin
drwxr-xr-x.  2 myself myself      4096 Jun 20 17:15 conf
drwxr-xr-x.  7 myself myself      4096 Jun 20 17:14 examples
-rw-r--r--.  1 myself myself 113012854 Jun 20 16:54 flink-cdc-1.0-SNAPSHOT-jar-with-dependencies.jar   <- upload this jar to the cluster
drwxr-xr-x.  2 myself myself      4096 Jun 20 17:15 lib
-rw-r--r--.  1 myself myself     11357 Jun 20 17:15 LICENSE
drwxr-xr-x.  2 myself myself      4096 Jun 20 17:14 licenses
drwxr-xr-x.  2 myself myself      4096 Jun 20 17:51 log
-rw-r--r--.  1 myself myself    559112 Jun 20 17:14 NOTICE
drwxr-xr-x.  3 myself myself      4096 Jun 20 17:15 opt
drwxr-xr-x. 10 myself myself      4096 Jun 20 17:14 plugins
-rw-r--r--.  1 myself myself      1309 Jun 20 17:14 README.txt

You can also upload a jar without bundled dependencies and run it on the cluster.

You just have to download the jars that Flink is missing and drop them into the lib folder shown below.

[myself@hadoop102 flink-standalone]$ ll lib
total 195856
-rw-r--r--. 1 myself myself     91553 Jun 20 17:15 flink-csv-1.12.0.jar
-rw-r--r--. 1 myself myself 114120165 Jun 20 17:14 flink-dist_2.11-1.12.0.jar
-rw-r--r--. 1 myself myself    136663 Jun 20 17:15 flink-json-1.12.0.jar
-rw-r--r--. 1 myself myself   7709741 Jun 20 17:14 flink-shaded-zookeeper-3.4.14.jar
-rw-r--r--. 1 myself myself  36147819 Jun 20 17:14 flink-table_2.11-1.12.0.jar
-rw-r--r--. 1 myself myself  40286358 Jun 20 17:15 flink-table-blink_2.11-1.12.0.jar
-rw-r--r--. 1 myself myself     67114 Jun 20 17:14 log4j-1.2-api-2.12.1.jar
-rw-r--r--. 1 myself myself    276771 Jun 20 17:14 log4j-api-2.12.1.jar
-rw-r--r--. 1 myself myself   1674433 Jun 20 17:14 log4j-core-2.12.1.jar
-rw-r--r--. 1 myself myself     23518 Jun 20 17:14 log4j-slf4j-impl-2.12.1.jar

Run the jar on the cluster

[myself@hadoop102 flink-standalone]$ bin/flink run -c com.myself.Flink01_DataStream flink-cdc-1.0-SNAPSHOT-jar-with-dependencies.jar   (command-line options, fully qualified main class, jar file name)


Errors and fixes

After the command above failed, I clicked through to the job logs.

The log shown below turned out to be more precise than the error printed in the Xshell window. Googling the messages below pinpointed the problem, which we now fix.

  1. Job failed during initialization of JobManager
  2. org.apache.flink.runtime.client.JobInitializationException: Could not instantiate JobManager.
  3. at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:463)
  4. at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
  5. at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  6. at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  7. at java.lang.Thread.run(Thread.java:748)
  8. Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to create checkpoint storage at checkpoint coordinator side.
  9. at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:316)
  10. at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:231)
  11. at org.apache.flink.runtime.executiongraph.ExecutionGraph.enableCheckpointing(ExecutionGraph.java:495)
  12. at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:347)
  13. at org.apache.flink.runtime.scheduler.SchedulerBase.createExecutionGraph(SchedulerBase.java:291)
  14. at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:256)
  15. at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:238)
  16. at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:134)
  17. at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:108)
  18. at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:323)
  19. at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:310)
  20. at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:96)
  21. at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:41)
  22. at org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:141)
  23. at org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:80)
  24. at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:450)
  25. ... 4 more
  26. Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 'hdfs'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded. For a full list of supported file systems, please see https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/.  <- this means 'hdfs' is not recognized
  27. at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:491)
  28. at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:389)
  29. at org.apache.flink.core.fs.Path.getFileSystem(Path.java:292)
  30. at org.apache.flink.runtime.state.filesystem.FsCheckpointStorageAccess.<init>(FsCheckpointStorageAccess.java:64)
  31. at org.apache.flink.runtime.state.filesystem.FsStateBackend.createCheckpointStorage(FsStateBackend.java:501)
  32. at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:313)
  33. ... 19 more
  34. Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Hadoop is not in the classpath/dependencies.
  35. at org.apache.flink.core.fs.UnsupportedSchemeFactory.create(UnsupportedSchemeFactory.java:58)
  36. at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:487)
  37. ... 24 more

Error 1:

When the code does not set:

System.setProperty("HADOOP_USER_NAME", "myself");

a permission-related error appears:

Caused by: org.apache.hadoop.security.AccessControlException: Permission denied: user=ÖÜÑå¾ (my Windows user name is Chinese, so it shows up garbled), access=WRITE, inode="/flink/ck":myself:supergroup:drwxr-xr-x

So that line has to be added to the code.

Error 2:

When I run the code from IDEA, the state metadata does appear under the checkpoint path on the cluster; but after packaging the jar and submitting it to the cluster it fails, with the error shown above.

How to fix it:

  1. Hadoop is not in the classpath/dependencies.
  2. For Flink to work with Hadoop components such as HDFS, it needs the whole set of Hadoop jars and dependencies. They do not need to be placed in Flink's lib folder;
  3. it is enough to set an environment variable in /etc/profile.d/my_env.sh (a file I created myself; putting it straight into /etc/profile also works):
  4. export HADOOP_CLASSPATH=`hadoop classpath`
  5. This points at Hadoop's classpath; running the command shows what it expands to:
  6. [myself@hadoop102 ~]$ hadoop classpath
  7. /opt/module/hadoop-3.1.3/etc/hadoop:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/*:/opt/module/hadoop-3.1.3/share/hadoop/common/*:/opt/module/hadoop-3.1.3/share/hadoop/hdfs:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/lib/*:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/*:/opt/module/hadoop-3.1.3/share/hadoop/mapreduce/lib/*:/opt/module/hadoop-3.1.3/share/hadoop/mapreduce/*:/opt/module/hadoop-3.1.3/share/hadoop/yarn:/opt/module/hadoop-3.1.3/share/hadoop/yarn/lib/*:/opt/module/hadoop-3.1.3/share/hadoop/yarn/*:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/accessors-smart-1.2.jar:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/animal-sniffer-annotations-1.17.jar:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/asm-5.0.4.jar:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/audience-annotations-0.5.0.jar:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/avro-1.7.7.jar:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/checker-qual-2.5.2.jar:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/commons-beanutils-1.9.3.jar:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/commons-cli-1.2.jar:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/commons-codec-1.11.jar:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/commons-collections-3.2.2.jar:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/commons-compress-1.18.jar:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/commons-configuration2-2.1.1.jar:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/commons-io-2.5.jar:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/commons-lang-2.6.jar:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/commons-lang3-3.4.jar:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/commons-logging-1.1.3.jar:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/commons-math3-3.1.1.jar:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/commons-net-3.6.jar:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/curator-client-2.13.0.jar:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/curator-framework-2.13.0.jar:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/curator-recipes-2.13.0.jar:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/error_prone_annotations-2.2.0.jar:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/failureaccess-1.0.jar:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/gson-2.2.4.jar:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/guava-27.0-jre.jar:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/hadoop-annotations-3.1.3.jar:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/hadoop-auth-3.1.3.jar:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/htrace-core4-4.1.0-incubating.jar:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/httpclient-4.5.2.jar:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/httpcore-4.4.4.jar:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/j2objc-annotations-1.1.jar:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/jackson-annotations-2.7.8.jar:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/jackson-core-2.7.8.jar:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/jackson-core-asl-1.9.13.jar:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/jackson-databind-2.7.8.jar:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/jackson-jaxrs-1.9.13.jar:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/jackson-mapper-asl-1.9.13.jar:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/jackson-xc-1.9.13.jar:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/javax.servlet-api-3.1.0.jar:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/jaxb-api-2.2.11.jar:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/jaxb-impl-2.2.3-1.jar:/opt/module/had
oop-3.1.3/share/hadoop/common/lib/jcip-annotations-1.0-1.jar:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/jersey-core-1.19.jar:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/jersey-json-1.19.jar:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/jersey-server-1.19.jar:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/jersey-servlet-1.19.jar:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/jettison-1.1.jar:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/jetty-http-9.3.24.v20180605.jar:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/jetty-io-9.3.24.v20180605.jar:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/jetty-security-9.3.24.v20180605.jar:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/jetty-server-9.3.24.v20180605.jar:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/jetty-servlet-9.3.24.v20180605.jar:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/jetty-util-9.3.24.v20180605.jar:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/jetty-webapp-9.3.24.v20180605.jar:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/jetty-xml-9.3.24.v20180605.jar:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/jsch-0.1.54.jar:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/json-smart-2.3.jar:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/jsp-api-2.1.jar:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/jsr305-3.0.0.jar:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/jsr311-api-1.1.1.jar:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/jul-to-slf4j-1.7.25.jar:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/kerb-admin-1.0.1.jar:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/kerb-client-1.0.1.jar:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/kerb-common-1.0.1.jar:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/kerb-core-1.0.1.jar:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/kerb-crypto-1.0.1.jar:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/kerb-identity-1.0.1.jar:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/kerb-server-1.0.1.jar:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/kerb-simplekdc-1.0.1.jar:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/kerb-util-1.0.1.jar:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/kerby-asn1-1.0.1.jar:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/kerby-config-1.0.1.jar:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/kerby-pkix-1.0.1.jar:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/kerby-util-1.0.1.jar:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/kerby-xdr-1.0.1.jar:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/listenablefuture-9999.0-empty-to-avoid-conflict-with-guava.jar:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/log4j-1.2.17.jar:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/metrics-core-3.2.4.jar:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/netty-3.10.5.Final.jar:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/nimbus-jose-jwt-4.41.1.jar:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/paranamer-2.3.jar:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/protobuf-java-2.5.0.jar:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/re2j-1.1.jar:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/slf4j-api-1.7.25.jar:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/snappy-java-1.0.5.jar:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/stax2-api-3.1.4.jar:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/token-provider-1.0.1.jar:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/woodstox-core-5.0.3.jar:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/zookeeper-
3.4.13.jar:/opt/module/hadoop-3.1.3/share/hadoop/common/hadoop-common-3.1.3.jar:/opt/module/hadoop-3.1.3/share/hadoop/common/hadoop-common-3.1.3-tests.jar:/opt/module/hadoop-3.1.3/share/hadoop/common/hadoop-kms-3.1.3.jar:/opt/module/hadoop-3.1.3/share/hadoop/common/hadoop-lzo-0.4.20.jar:/opt/module/hadoop-3.1.3/share/hadoop/common/hadoop-nfs-3.1.3.jar:/opt/module/hadoop-3.1.3/share/hadoop/common/jdiff:/opt/module/hadoop-3.1.3/share/hadoop/common/lib:/opt/module/hadoop-3.1.3/share/hadoop/common/sources:/opt/module/hadoop-3.1.3/share/hadoop/common/webapps:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/lib/accessors-smart-1.2.jar:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/lib/animal-sniffer-annotations-1.17.jar:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/lib/asm-5.0.4.jar:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/lib/audience-annotations-0.5.0.jar:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/lib/avro-1.7.7.jar:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/lib/checker-qual-2.5.2.jar:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/lib/commons-beanutils-1.9.3.jar:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/lib/commons-cli-1.2.jar:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/lib/commons-codec-1.11.jar:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/lib/commons-collections-3.2.2.jar:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/lib/commons-compress-1.18.jar:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/lib/commons-configuration2-2.1.1.jar:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/lib/commons-daemon-1.0.13.jar:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/lib/commons-io-2.5.jar:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/lib/commons-lang-2.6.jar:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/lib/commons-lang3-3.4.jar:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/lib/commons-logging-1.1.3.jar:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/lib/commons-math3-3.1.1.jar:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/lib/commons-net-3.6.jar:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/lib/curator-client-2.13.0.jar:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/lib/curator-framework-2.13.0.jar:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/lib/curator-recipes-2.13.0.jar:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/lib/error_prone_annotations-2.2.0.jar:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/lib/failureaccess-1.0.jar:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/lib/gson-2.2.4.jar:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/lib/guava-27.0-jre.jar:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/lib/hadoop-annotations-3.1.3.jar:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/lib/hadoop-auth-3.1.3.jar:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/lib/htrace-core4-4.1.0-incubating.jar:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/lib/httpclient-4.5.2.jar:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/lib/httpcore-4.4.4.jar:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/lib/j2objc-annotations-1.1.jar:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/lib/jackson-annotations-2.7.8.jar:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/lib/jackson-core-2.7.8.jar:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/lib/jackson-core-asl-1.9.13.jar:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/lib/jackson-databind-2.7.8.jar:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/lib/jackson-jaxrs-1.9.13.jar:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/lib/jackson-mapper-asl-1.9.13.jar:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/lib/jackson-xc-1.9.13.jar:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/lib/javax.servlet-api-3.1.0.jar:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/lib/jaxb-api-2.2.11.jar:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/lib/jax
b-impl-2.2.3-1.jar:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/lib/jcip-annotations-1.0-1.jar:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/lib/jersey-core-1.19.jar:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/lib/jersey-json-1.19.jar:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/lib/jersey-server-1.19.jar:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/lib/jersey-servlet-1.19.jar:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/lib/jettison-1.1.jar:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/lib/jetty-http-9.3.24.v20180605.jar:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/lib/jetty-io-9.3.24.v20180605.jar:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/lib/jetty-security-9.3.24.v20180605.jar:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/lib/jetty-server-9.3.24.v20180605.jar:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/lib/jetty-servlet-9.3.24.v20180605.jar:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/lib/jetty-util-9.3.24.v20180605.jar:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/lib/jetty-util-ajax-9.3.24.v20180605.jar:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/lib/jetty-webapp-9.3.24.v20180605.jar:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/lib/jetty-xml-9.3.24.v20180605.jar:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/lib/jsch-0.1.54.jar:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/lib/json-simple-1.1.1.jar:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/lib/json-smart-2.3.jar:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/lib/jsr305-3.0.0.jar:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/lib/jsr311-api-1.1.1.jar:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/lib/kerb-admin-1.0.1.jar:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/lib/kerb-client-1.0.1.jar:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/lib/kerb-common-1.0.1.jar:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/lib/kerb-core-1.0.1.jar:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/lib/kerb-crypto-1.0.1.jar:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/lib/kerb-identity-1.0.1.jar:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/lib/kerb-server-1.0.1.jar:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/lib/kerb-simplekdc-1.0.1.jar:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/lib/kerb-util-1.0.1.jar:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/lib/kerby-asn1-1.0.1.jar:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/lib/kerby-config-1.0.1.jar:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/lib/kerby-pkix-1.0.1.jar:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/lib/kerby-util-1.0.1.jar:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/lib/kerby-xdr-1.0.1.jar:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/lib/leveldbjni-all-1.8.jar:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/lib/listenablefuture-9999.0-empty-to-avoid-conflict-with-guava.jar:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/lib/log4j-1.2.17.jar:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/lib/netty-3.10.5.Final.jar:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/lib/netty-all-4.0.52.Final.jar:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/lib/nimbus-jose-jwt-4.41.1.jar:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/lib/okhttp-2.7.5.jar:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/lib/okio-1.6.0.jar:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/lib/paranamer-2.3.jar:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/lib/protobuf-java-2.5.0.jar:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/lib/re2j-1.1.jar:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/lib/snappy-java-1.0.5.jar:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/lib/stax2-api-3.1.4.jar:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/lib/token-provider-1.0.1.jar:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/lib/woodstox-core-5.0.3.jar:/opt/module/hadoop-3.1.3/share/hadoop/hdfs
/lib/zookeeper-3.4.13.jar:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/hadoop-hdfs-3.1.3.jar:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/hadoop-hdfs-3.1.3-tests.jar:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/hadoop-hdfs-client-3.1.3.jar:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/hadoop-hdfs-client-3.1.3-tests.jar:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/hadoop-hdfs-httpfs-3.1.3.jar:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/hadoop-hdfs-native-client-3.1.3.jar:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/hadoop-hdfs-native-client-3.1.3-tests.jar:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/hadoop-hdfs-nfs-3.1.3.jar:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/hadoop-hdfs-rbf-3.1.3.jar:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/hadoop-hdfs-rbf-3.1.3-tests.jar:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/jdiff:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/lib:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/sources:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/webapps:/opt/module/hadoop-3.1.3/share/hadoop/mapreduce/lib/hamcrest-core-1.3.jar:/opt/module/hadoop-3.1.3/share/hadoop/mapreduce/lib/junit-4.11.jar:/opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-client-app-3.1.3.jar:/opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-client-common-3.1.3.jar:/opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-client-core-3.1.3.jar:/opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-client-hs-3.1.3.jar:/opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-client-hs-plugins-3.1.3.jar:/opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-3.1.3.jar:/opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-3.1.3-tests.jar:/opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-client-nativetask-3.1.3.jar:/opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-client-shuffle-3.1.3.jar:/opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-client-uploader-3.1.3.jar:/opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.3.jar:/opt/module/hadoop-3.1.3/share/hadoop/mapreduce/jdiff:/opt/module/hadoop-3.1.3/share/hadoop/mapreduce/lib:/opt/module/hadoop-3.1.3/share/hadoop/mapreduce/lib-examples:/opt/module/hadoop-3.1.3/share/hadoop/mapreduce/sources:/opt/module/hadoop-3.1.3/share/hadoop/yarn/lib/aopalliance-1.0.jar:/opt/module/hadoop-3.1.3/share/hadoop/yarn/lib/dnsjava-2.1.7.jar:/opt/module/hadoop-3.1.3/share/hadoop/yarn/lib/ehcache-3.3.1.jar:/opt/module/hadoop-3.1.3/share/hadoop/yarn/lib/fst-2.50.jar:/opt/module/hadoop-3.1.3/share/hadoop/yarn/lib/geronimo-jcache_1.0_spec-1.0-alpha-1.jar:/opt/module/hadoop-3.1.3/share/hadoop/yarn/lib/guice-4.0.jar:/opt/module/hadoop-3.1.3/share/hadoop/yarn/lib/guice-servlet-4.0.jar:/opt/module/hadoop-3.1.3/share/hadoop/yarn/lib/HikariCP-java7-2.4.12.jar:/opt/module/hadoop-3.1.3/share/hadoop/yarn/lib/jackson-jaxrs-base-2.7.8.jar:/opt/module/hadoop-3.1.3/share/hadoop/yarn/lib/jackson-jaxrs-json-provider-2.7.8.jar:/opt/module/hadoop-3.1.3/share/hadoop/yarn/lib/jackson-module-jaxb-annotations-2.7.8.jar:/opt/module/hadoop-3.1.3/share/hadoop/yarn/lib/java-util-1.9.0.jar:/opt/module/hadoop-3.1.3/share/hadoop/yarn/lib/javax.inject-1.jar:/opt/module/hadoop-3.1.3/share/hadoop/yarn/lib/jersey-client-1.19.jar:/opt/module/hadoop-3.1.3/share/hadoop/yarn/lib/jersey-guice-1.19.jar:/opt/module/hadoop-3.1.3/share/hadoop/yarn/lib/json-io-2.5.1.jar:/opt/module/hadoop-3.1.3/share/hadoop/yarn/lib/metrics-core-3.2.4.jar:/opt/module/hadoop-3.1.3/share/hadoop/y
arn/lib/mssql-jdbc-6.2.1.jre7.jar:/opt/module/hadoop-3.1.3/share/hadoop/yarn/lib/objenesis-1.0.jar:/opt/module/hadoop-3.1.3/share/hadoop/yarn/lib/snakeyaml-1.16.jar:/opt/module/hadoop-3.1.3/share/hadoop/yarn/lib/swagger-annotations-1.5.4.jar:/opt/module/hadoop-3.1.3/share/hadoop/yarn/hadoop-yarn-api-3.1.3.jar:/opt/module/hadoop-3.1.3/share/hadoop/yarn/hadoop-yarn-applications-distributedshell-3.1.3.jar:/opt/module/hadoop-3.1.3/share/hadoop/yarn/hadoop-yarn-applications-unmanaged-am-launcher-3.1.3.jar:/opt/module/hadoop-3.1.3/share/hadoop/yarn/hadoop-yarn-client-3.1.3.jar:/opt/module/hadoop-3.1.3/share/hadoop/yarn/hadoop-yarn-common-3.1.3.jar:/opt/module/hadoop-3.1.3/share/hadoop/yarn/hadoop-yarn-registry-3.1.3.jar:/opt/module/hadoop-3.1.3/share/hadoop/yarn/hadoop-yarn-server-applicationhistoryservice-3.1.3.jar:/opt/module/hadoop-3.1.3/share/hadoop/yarn/hadoop-yarn-server-common-3.1.3.jar:/opt/module/hadoop-3.1.3/share/hadoop/yarn/hadoop-yarn-server-nodemanager-3.1.3.jar:/opt/module/hadoop-3.1.3/share/hadoop/yarn/hadoop-yarn-server-resourcemanager-3.1.3.jar:/opt/module/hadoop-3.1.3/share/hadoop/yarn/hadoop-yarn-server-router-3.1.3.jar:/opt/module/hadoop-3.1.3/share/hadoop/yarn/hadoop-yarn-server-sharedcachemanager-3.1.3.jar:/opt/module/hadoop-3.1.3/share/hadoop/yarn/hadoop-yarn-server-tests-3.1.3.jar:/opt/module/hadoop-3.1.3/share/hadoop/yarn/hadoop-yarn-server-timeline-pluginstorage-3.1.3.jar:/opt/module/hadoop-3.1.3/share/hadoop/yarn/hadoop-yarn-server-web-proxy-3.1.3.jar:/opt/module/hadoop-3.1.3/share/hadoop/yarn/hadoop-yarn-services-api-3.1.3.jar:/opt/module/hadoop-3.1.3/share/hadoop/yarn/hadoop-yarn-services-core-3.1.3.jar:/opt/module/hadoop-3.1.3/share/hadoop/yarn/lib:/opt/module/hadoop-3.1.3/share/hadoop/yarn/sources:/opt/module/hadoop-3.1.3/share/hadoop/yarn/test:/opt/module/hadoop-3.1.3/share/hadoop/yarn/timelineservice:/opt/module/hadoop-3.1.3/share/hadoop/yarn/webapps:/opt/module/hadoop-3.1.3/share/hadoop/yarn/yarn-service-examples
  8. As you can see, it is just a long list of jar paths.
  9. Then source /etc/profile.d/my_env.sh,
  10. distribute it with xsync /etc/profile.d/my_env.sh, and source it on hadoop103 and hadoop104 as well.
  11. I then ran the submit command again, but it still failed.
  12. After restarting the Hadoop cluster, the ZooKeeper cluster, and the Flink standalone cluster and running the command once more, it worked fine.
  13. Note:
  14. Make sure the HADOOP_HOME environment variable has been configured beforehand
  15. and distributed to the other nodes.

Running normally now:

  1. [myself@hadoop102 flink-standalone]$ bin/flink run -c com.myself.Flink01_DataStream flink-cdc-1.0-SNAPSHOT-jar-with-dependencies.jar
  2. SLF4J: Class path contains multiple SLF4J bindings.
  3. SLF4J: Found binding in [jar:file:/opt/module/flink-standalone/lib/log4j-slf4j-impl-2.12.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
  4. SLF4J: Found binding in [jar:file:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
  5. SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
  6. SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
  7. Job has been submitted with JobID 713d0721e83be877c97f0a846cf6af85

To be continued.

- END -

This is an original article.

Author: Eugene
A data-role newbie at a listed company, hoping not to stay a newbie for long, haha.

❤: Join me here to learn about technology, careers, life, fundamentals, fitness, photography, and more!

❤: Feel free to follow along so we can learn together and keep improving.
