当前位置:   article > 正文

Flink CDC 最佳实践(以 MySQL 为例)_finkcdc保存创建表

finkcdc保存创建表

1. 准备工作

1.1 确认 MySQL binlog 模式

确认 MySQL 数据库的 binlog 模式是否为 ROW。可以在 MySQL 命令行中执行以下语句确认:

SHOW GLOBAL VARIABLES LIKE 'binlog_format';

如果返回结果中的 Value 字段为 ROW,则说明 binlog 模式为 ROW

1.2 下载并安装 Flink

下载并安装 Flink,可以参考官方文档进行安装。

2. 配置 Flink CDC

2.1 配置 MySQL 数据库连接信息

在 Flink 的配置文件 flink-conf.yaml 中添加 MySQL 数据库连接信息,例如:

  1. # MySQL connection configuration
  2. mysql.server-id: 12345
  3. mysql.hostname: localhost
  4. mysql.port: 3306
  5. mysql.username: root
  6. mysql.password: 123456
  7. mysql.database-name: test

2.2 配置 CDC Job

在 Flink 的 CDC Job 配置文件 mysql-cdc.properties 中添加以下配置:

  1. # Flink CDC Job Configuration
  2. name: mysql-cdc-job
  3. flink.parallelism: 1
  4. flink.checkpoint.interval: 60000
  5. flink.checkpoint.mode: EXACTLY_ONCE
  6. # MySQL CDC Source Configuration
  7. debezium.transforms: unwrap
  8. debezium.transforms.unwrap.type: io.debezium.transforms.ExtractNewRecordState
  9. database.hostname: localhost
  10. database.port: 3306
  11. database.user: root
  12. database.password: 123456
  13. database.history.kafka.bootstrap.servers: localhost:9092
  14. database.history.kafka.topic: mysql-cdc-history
  15. database.server.id: 12345
  16. database.server.name: test
  17. database.whitelist: test.user

其中,name 为 CDC Job 的名称,flink.parallelism 为 Flink 的并行度,flink.checkpoint.interval 为 Flink 的 Checkpoint 时间间隔,flink.checkpoint.mode 为 Checkpoint 模式,此处设置为 EXACTLY_ONCE

debezium.transforms 为 Debezium 转换器的名称,此处设置为 unwrapdatabase.hostnamedatabase.portdatabase.userdatabase.password 分别为 MySQL 数据库的连接信息。database.history.kafka.bootstrap.serversKafka 的地址信息,database.history.kafka.topic 为 CDC 历史数据记录的 Kafka Topic。database.server.id 为 MySQL 的 Server ID,database.server.name 为 CDC Source 的名称,database.whitelist 为需要进行同步的 MySQL 表的名称。

步骤一:创建 MySQL 数据库

首先,需要在本地或云端创建 MySQL 数据库,并添加一个具有读写权限的用户。下面是一个创建名为 test_db 的数据库以及名为 flink_cdc_user 的用户的示例 SQL 代码:

  1. CREATE DATABASE test_db;
  2. CREATE USER 'flink_cdc_user'@'%' IDENTIFIED BY 'password';
  3. GRANT ALL PRIVILEGES ON test_db.* TO 'flink_cdc_user'@'%';

步骤二:启动 Flink 集群

启动一个 Flink 集群以便运行 CDC 应用程序。可以使用 Flink 自带的 bin/start-cluster.sh 脚本启动 Flink 集群。确保 Flink 集群在运行时已经包含了 Kafka 和 MySQL 的依赖项。

步骤三:创建 MySQL 表和 CDC 表

在 MySQL 中,首先需要创建需要进行 CDC 的表和 CDC 表。CDC 表是一个系统表,它存储了需要捕获的更改数据。可以通过以下代码创建一个名为 test_table 的表以及与之关联的 CDC 表

  1. CREATE TABLE test_db.test_table (
  2. id INT PRIMARY KEY,
  3. name VARCHAR(30),
  4. age INT,
  5. email VARCHAR(50)
  6. ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
  7. CREATE TABLE test_db.test_table_cdc (
  8. `database` VARCHAR(100),
  9. `table` VARCHAR(100),
  10. `type` VARCHAR(10),
  11. `ts` TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3) ON UPDATE CURRENT_TIMESTAMP(3),
  12. `before` JSON,
  13. `after` JSON
  14. ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

步骤四:编写 Flink CDC 应用程序

接下来,需要编写一个 Flink CDC 应用程序,以将 MySQL 表更改推送到 Kafka 主题中。可以使用 Flink 的 flink-connector-jdbc 库和 flink-connector-kafka 库来实现此目的。

以下是一个基本的 Flink CDC 应用程序的代码示例:

  1. public static void main(String[] args) throws Exception {
  2. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  3. env.setParallelism(1);
  4. Properties properties = new Properties();
  5. properties.setProperty("bootstrap.servers", "localhost:9092");
  6. properties.setProperty("group.id", "test-group");
  7. JdbcSource<RowData> source = JdbcSource.<RowData>builder()
  8. .setDrivername("com.mysql.jdbc.Driver")
  9. .setDBUrl("jdbc:mysql://localhost:3306/test_db")
  10. .setUsername("flink_cdc_user")
  11. .setPassword("password")
  12. .setQuery("SELECT id, name, age, email FROM test_table")
  13. .setRowTypeInfo(Types.ROW(Types.INT, Types.STRING, Types.INT, Types.STRING))
  14. .setFetchSize(1000)
  15. .build();
  16. DataStream<RowData> stream = env.addSource(source);

以下是一个简单的示例运行及结果:

$ bin/flink run -c com.example.MyCDCJob ./my-cdc-job.jar --database.server=mysql.example.com --database.port=3306 --database.name=mydb --database.username=myuser --database.password=mypassword --table.name=mytable --debezium.plugin.name=mysql --debezium.plugin.property.version=1.3.1.Final
  1. [INFO] Starting CDC process for table: mytable.
  2. [INFO] Initializing CDC source...
  3. [INFO] CDC source successfully initialized.
  4. [INFO] Starting CDC source...
  5. [INFO] CDC source successfully started.
  6. [INFO] Adding CDC source to Flink job topology...
  7. [INFO] CDC source successfully added to Flink job topology.
  8. [INFO] Starting Flink job...
  9. [INFO] Flink job started successfully.
  10. [INFO] Change data for table: mytable.
  11. [INFO] Record key: {"id": 1}, record value: {"id": 1, "name": "Alice", "age": 25}.
  12. [INFO] Record key: {"id": 2}, record value: {"id": 2, "name": "Bob", "age": 30}.
  13. [INFO] Record key: {"id": 3}, record value: {"id": 3, "name": "Charlie", "age": 35}.
  14. [INFO] Change data for table: mytable.
  15. [INFO] Record key: {"id": 1}, record value: {"id": 1, "name": "Alice", "age": 27}.

可以看到,当有数据变更时,Flink CDC Job 会输出变更的表名、记录的主键以及变更的数据。例如,在这个示例中,有一行记录的年龄字段从25变成了27。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小小林熬夜学编程/article/detail/473101
推荐阅读
相关标签
  

闽ICP备14008679号