当前位置:   article > 正文

flink 1.13.x集成 CDC 2.3.0_flink cdc2.3.0支持flink 1.13 吗

flink cdc2.3.0支持flink 1.13 吗

使用 flink 1.13.0 和 CDC 2.3.0 的 demo

public class TMySqlCDC {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        env.setParallelism(1);
        Properties dbProps = new Properties();
        dbProps.put("database.serverTimezone", "UTC");
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("192.168.18.126")
                .port(3306)
                .databaseList("xn_test") // set captured database, If you need to synchronize the whole database, Please set tableList to ".*".
                .tableList("xn_test.hl_t")// set captured table
                .includeSchemaChanges(true)
                .username("root")
                .password("123456")
                .debeziumProperties(dbProps)
                .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
                .build();

        env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "mysql_cdc")
                .print();

        env.execute();
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

踩坑一

Caused by: org.apache.flink.table.api.ValidationException: The MySQL server has a timezone offset (0 seconds ahead of UTC) which does not match the configured timezone Asia/Shanghai. Specify the right server-time-zone to avoid inconsistencies for time-related fields.
  • 1

原因

https://github.com/ververica/flink-cdc-connectors/pull/1407

简单讲,Flink 运行机器时区和Mysql Server 时区不匹配,database.serverTimezone 配置配置影响
具体代码可以查看CDC com.ververica.cdc.connectors.mysql.MySqlValidator#checkTimeZone

解决办法

手动指定下Flink 运行的时区,和连接的数据库时区信息保持一致

dbProps.put("database.serverTimezone", "UTC");
  • 1

踩坑二

Caused by: java.lang.ClassNotFoundException: org.apache.flink.shaded.guava30.com.google.common.util.concurrent.ThreadFactoryBuilder
  • 1

原因

flink 为了解决包冲突,对一些通用的工具包做了shaded,传送门flink-shaded

flink-cdc-connectors 2.3.0 版本引用了 flink 1.16.0,这个版本的flink使用了 flink-shaded-guava:30.1.1-jre-15.0版本。
而 flink 1.13.0 使用的是 flink-shaded-guava:18.0-13.0 版本,两个版本的 shaded package 不一样引起的

解决

那既然是 shaded 引用,在 cdc 中再次 shaded 一下,让 cdc 里面引用到的 guava30 变为 guava18
clone cdc,基于tag release-2.3.0 创建分支,修改 flink-cdc-connectors 的 pom.xml 文件,引入cdc 后排除 guava 依赖。

# 编译基于已经 release 的tag
git branch supos/release-2.3.0 release-2.3.0
git checkout supos/release-2.3.0
  • 1
  • 2
  • 3

修改shaded配置

<!-- 在 maven-shade-plugin 插件中添加configuration -->
<relocation>
    <pattern>org.apache.flink.shaded.guava30</pattern>
    <shadedPattern>org.apache.flink.shaded.guava18</shadedPattern>
</relocation>
  • 1
  • 2
  • 3
  • 4
  • 5

使用 mvn version 修改版本

# 要发布到公司内部仓库,修改为 snapshot 版本
 mvn versions:set -DnewVersion=supos-2.3.0-SNAPSHOT -DgenerateBackupPoms=false
  • 1
  • 2

执行编译

# -Drat.skip=true 发布审计插件,文件需要有 license 头,可选  [-T 8 # 多线程编译]
mvn clean install -Drat.skip=true -DskipTests -T 8
# 将源码打包到jar 包,发布到内部私有仓库上
mvn clean source:jar install deploy -Drat.skip=true -DskipTests -T 8

[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  01:48 min
[INFO] Finished at: 2023-03-03T14:14:05+08:00
[INFO] ------------------------------------------------------------------------
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

引入cdc

<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <version>${flink.cdc.version}</version>
    <exclusions>
        <exclusion>
            <artifactId>flink-shaded-guava</artifactId>
            <groupId>org.apache.flink</groupId>
        </exclusion>
    </exclusions>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

run ~

参考
maven-shaded-plugin
maven-version-plugin
maven-source-plugin
maven snapshot

maven 插件生命周期

flink-cdc 2.4.0 发布了,要适配 flink1.13,研究了一下 maven 生命周期

cdc 2.4.0 开始,有多个模块使用了 guava30,如果要挨个模块中配置就不太方便了,通过maven 父子模块管理,在父模块中统一处理

<!-- flink-cdc-connectors pom -->
<configuration>
      <relocations>
          <relocation>
              <pattern>org.apache.flink.shaded.guava30</pattern>
              <shadedPattern>org.apache.flink.shaded.guava18</shadedPattern>
          </relocation>
      </relocations>
</configuration>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

注意,在子模块中,要引用一下父工程

    <parent>
        <artifactId>flink-cdc-connectors</artifactId>
        <groupId>com.ververica</groupId>
        <version>{version}</version>
    </parent>
  • 1
  • 2
  • 3
  • 4
  • 5

构建

mvn clean install -Drat.skip=true -DskipTests -T 8
  • 1

通过观察,插件是有生命周期的,如果在父模块中配置过 shade,把 guava30 -> guava18,子模块中配置 guava30 - guava11 是不生效的,需要配置成 guava18 - > guava11 才能生效。

父模块的配置不会影响到子模块,有些统一处理的方案可以在父模块中直接配置为全局

给自己定个flag,没事了写点东西,记录下工作。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小丑西瓜9/article/detail/473114
推荐阅读
相关标签
  

闽ICP备14008679号