当前位置:   article > 正文

Flink-cdc 同步mysql数据_flink-cdc mysql数据库迁移

flink-cdc mysql数据库迁移

下载地址:https://github.com/ververica/flink-cdc-connectors/releases

这里下载2.2.0版本:https://github.com/ververica/flink-cdc-connectors/archive/refs/tags/release-2.2.0.zip

下载完成后,在 pom.xml 中找到这一项:flink.version ,修改 flink 版本号为:

<flink.version>1.13.6</flink.version>
  • 1

自行打包编译

通过flink-cdc 同步mysql数据

1、flink集群准备

wget http://mirrors.cloud.tencent.com/apache/flink/flink-1.13.6/flink-1.13.6-bin-scala_2.11.tgz
tar zxvf flink-1.13.6-bin-scala_2.11.tgz
  • 1
  • 2

将打包好的 flink-sql-connector-mysql-cdc-2.2-SNAPSHOT.jar 包放入到flink的lib目录下
启动集群

cd flink-1.13.6
bin/start-cluster.sh
  • 1
  • 2

2、mysql环境准备

CREATE DATABASE mydb;

USE mydb;

CREATE TABLE products (
       id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
       name VARCHAR(255) NOT NULL,
       description VARCHAR(512)
     );

ALTER TABLE products AUTO_INCREMENT = 101;

INSERT INTO products
     VALUES (default,"scooter","Small 2-wheel scooter"),
            (default,"car battery","12V car battery"),
            (default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),
            (default,"hammer","12oz carpenter's hammer"),
            (default,"hammer","14oz carpenter's hammer"),
            (default,"hammer","16oz carpenter's hammer"),
            (default,"rocks","box of assorted rocks"),
            (default,"jacket","water resistent black wind breaker"),
            (default,"spare tire","24 inch spare tire");

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

3、启动flinksql client

cd /opt/flink-1.13.6
bin/sql-client.sh
  • 1
  • 2

4、在flinksql client中执行命令

Flink SQL> SET execution.checkpointing.interval = 3s
Flink SQL> CREATE TABLE products (
     id INT,
     name STRING,
     description STRING,
     PRIMARY KEY (id) NOT ENFORCED
   ) WITH (
     'connector' = 'mysql-cdc',
     'hostname' = '自己的ip地址',
     'port' = '3306',
     'username' = 'root',
     'password' = '密码',
     'database-name' = 'mydb',
     'table-name' = 'products'
   );

Flink SQL> select * from products;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

5、在 MySQL 客户端继续插入数据

INSERT INTO products VALUES (default,"scooter1","Small 2-wheel scooter");
INSERT INTO products VALUES (default,"scooter2","Small 2-wheel scooter");
INSERT INTO products VALUES (default,"scooter3","Small 2-wheel scooter");
INSERT INTO products VALUES (default,"scooter4","Small 2-wheel scooter");
  • 1
  • 2
  • 3
  • 4

4、在flinksql client中查看数据

select * from products;
  • 1

可以查看到数据变化

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/凡人多烦事01/article/detail/692565
推荐阅读
相关标签
  

闽ICP备14008679号