
flink-cdc environment setup, version 1.14.3

Flink 1.14.3 + CDC + MySQL

flink-cdc-connectors is maintained independently of the Flink project, so when integrating it, mind the version compatibility. Check the version, check the version, check the version.

1. Environment

  • java: JDK 8+ (the streaming demo below compiles for Java 11; see its pom)
  • scala: 2.11 or 2.12, whichever your Flink and CDC dependencies are built against
  • flink: 1.14.3
  • mysql: 8.0
  • flink-cdc: 2.2-SNAPSHOT

1.1 flink-sql environment

The Flink 1.14 dependency is flink-sql-connector-mysql-cdc-2.2-SNAPSHOT.jar, and it has to be placed under flink_home/lib/.
You need to build this jar yourself: the official releases only go up to 2.1.1 (nothing newer as of 2022-03-11 17:05). Two ways to get it, per the official flink-cdc README:

  1. Download the prebuilt jar attached to the original post
  2. Build it yourself:
git clone https://github.com/ververica/flink-cdc-connectors.git
cd flink-cdc-connectors
mvn clean install -DskipTests

Once the build finishes, take whichever jar you need (the SQL connector should land under flink-sql-connector-mysql-cdc/target/). The build downloads a mountain of plugins, so it is slow.
Next, go into Flink's bin/ directory and start the cluster:

 start-cluster.sh

To check that the cluster is up, open the Flink web UI (by default at http://localhost:8081).

Demo: flink-sql CDC on MySQL data
MySQL's binlog must be enabled, and the table you capture must have a primary key; a quick sanity check is sketched below.
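
A minimal check, assuming you can log in as a sufficiently privileged user. In MySQL 8.0 binary logging is on by default, and the CDC connector needs row-based binlog events plus replication privileges for the user it connects as:

-- mysql
show variables like 'log_bin';        -- expect ON
show variables like 'binlog_format';  -- expect ROW
-- the user the connector logs in with needs roughly:
-- GRANT SELECT, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'user'@'%';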
  3. Create the MySQL table:

-- mysql
show databases;
use test;
create table if not exists  test (
	id int primary key auto_increment,
	name varchar(32)
);
  4. Start the flink-sql client and create the Flink source table:
 sql-client.sh 
-- flink sql
CREATE TABLE test (
   id INT,
   name STRING,
   PRIMARY KEY (id) NOT ENFORCED
 ) WITH (
   'connector' = 'mysql-cdc',
   'hostname' = 'localhost',
   'port' = '3306',
   'username' = '<username>',
   'password' = '<password>',
   'database-name' = '<database>',
   'table-name' = '<table>'
 );
  5. Insert rows on the MySQL side:
-- mysql
insert into test values (0, 'pjs');
insert into test values (0, 'jyl');
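
Updates and deletes stream through as well. For example, an update like the one below is what produces the op = "u" record shown in the stream demo's output further down (the row values there differ because that run used the author's own data):

-- mysql
update test set name = 'LJY' where id = 1;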

  6. Check the flink-sql output:

-- flink sql
select * from test;

(screenshot: the flink-sql result view, showing the inserted rows)

1.2 flink-stream

pom

<properties>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
        <flink.version>1.14.3</flink.version>
        <scala.version>2.11.12</scala.version>
        <scala.binary.version>2.11</scala.binary.version>
        <flink.cdc.version>2.2-SNAPSHOT</flink.cdc.version>

    </properties>

    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
<!--            <scope>provided</scope>-->
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-avro</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>${flink.cdc.version}</version>
        </dependency>
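
        <!-- note: flink-sql-connector-mysql-cdc below is the shaded "fat" jar meant for
             the SQL client's lib/ directory; for a pure DataStream job the plain
             flink-connector-mysql-cdc above is usually enough on its own -->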

        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-sql-connector-mysql-cdc</artifactId>
            <version>${flink.cdc.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

    </dependencies>

demo (full project on gitee):

import java.util.Map;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;

// wrapper class (name arbitrary) added so the snippet compiles as-is
public class MySqlCdcDemo {

    public static void main(String[] args) throws Exception {
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("localhost")
                .port(3306)
                .databaseList("test") // set captured database
                .tableList("test.test") // set captured table
                .username("kuro")
                .password("pwdsdfsa;_=sfds")
                .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
                .build();

        // serve the local web UI on port 10010 instead of the default
        Configuration configuration = Configuration.fromMap(Map.of("rest.port", "10010"));

        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);

        // enable checkpointing; the MySQL CDC source relies on it to commit progress
        env.enableCheckpointing(3000);

        env
                .fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
                // set 4 parallel source tasks
                .setParallelism(4)
                .print().setParallelism(1); // use parallelism 1 for the sink to keep message ordering

        env.execute("Print MySQL Snapshot + Binlog");
    }
}

Output (with rest.port set to 10010 above, this local job's web UI is at http://localhost:10010):

{"before":null,"after":{"id":2,"name":"ljy"},"source":{"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":0,"snapshot":"false","db":"test","sequence":null,"table":"test","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1646993221523,"transaction":null}
{"before":null,"after":{"id":1,"name":"kuro"},"source":{"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":0,"snapshot":"false","db":"test","sequence":null,"table":"test","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1646993221522,"transaction":null}
{"before":null,"after":{"id":3,"name":"liyouqiang"},"source":{"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":0,"snapshot":"false","db":"test","sequence":null,"table":"test","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1646993221524,"transaction":null}
Mar 11, 2022 6:07:05 PM com.github.shyiko.mysql.binlog.BinaryLogClient connect
INFO: Connected to localhost:3306 at mysql-bin.000001/3592 (sid:5536, cid:33)
{"before":{"id":1,"name":"kuro"},"after":{"id":1,"name":"LJY"},"source":{"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1646993370000,"snapshot":"false","db":"test","sequence":null,"table":"test","server_id":1,"gtid":null,"file":"mysql-bin.000001","pos":3813,"row":0,"thread":null,"query":null},"op":"u","ts_ms":1646993370264,"transaction":null}


On its first run, flink-cdc takes a full snapshot of the existing data (the op = "r" records above), then switches to incremental sync from the binlog (the op = "u" record).
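
This startup behavior is configurable. In the SQL connector, per the flink-cdc documentation, the scan.startup.mode option selects between the default snapshot-then-binlog behavior ('initial') and binlog-only ('latest-offset'). A sketch, reusing the table definition from section 1.1:

-- flink sql
CREATE TABLE test_latest (
   id INT,
   name STRING,
   PRIMARY KEY (id) NOT ENFORCED
 ) WITH (
   'connector' = 'mysql-cdc',
   'hostname' = 'localhost',
   'port' = '3306',
   'username' = '<username>',
   'password' = '<password>',
   'database-name' = '<database>',
   'table-name' = '<table>',
   -- skip the snapshot phase and read only new changes
   'scan.startup.mode' = 'latest-offset'
 );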
