赞
踩
接着 上期FlinkCDC基础篇章1-安装使用
下载 Flink 1.17.0 并将其解压至目录 flink-1.17.0
下载下面列出的依赖包,并将它们放到目录 flink-1.17.0/lib/
下:
下载链接只对已发布的版本有效, SNAPSHOT 版本需要本地编译
- 首先,开启 checkpoint,每隔3秒做一次 checkpoint
-
- -- Flink SQL
- Flink SQL> SET execution.checkpointing.interval = 3s
- -- 创建源表t_source_sqlserver,使用SQL Server Change Data Capture (CDC)连接器从SQL Server数据库读取数据
- CREATE TABLE t_source_sqlserver (
- id INT,
- order_date DATE,
- purchaser INT,
- quantity INT,
- product_id INT,
- PRIMARY KEY (id) NOT ENFORCED -- 主键定义(可选)
- ) WITH (
- 'connector' = 'sqlserver-cdc', -- 使用SQL Server CDC连接器
- 'hostname' = '10.194.183.120', -- SQL Server主机名
- 'port' = '30027', -- SQL Server端口
- 'username' = 'sa', -- SQL Server用户名
- 'password' = 'abc@123456', -- SQL Server密码
- 'database-name' = 'cdc_test', -- 数据库名称
- 'schema-name' = 'dbo', -- 模式名称
- 'table-name' = 'orders' -- 要捕获更改的表名
- );
-
- -- 创建目标表table_sink_mysql,使用JDBC连接器将数据写入MySQL数据库
- CREATE TABLE table_sink_mysql (
- id INT,
- order_date DATE,
- purchaser INT,
- quantity INT,
- product_id INT,
- PRIMARY KEY (id) NOT ENFORCED -- 主键定义(可选)
- )
- WITH (
- 'connector' = 'jdbc', -- 使用JDBC连接器
- 'url' = 'jdbc:mysql://10.194.183.120:30025/test', -- MySQL的JDBC URL
- 'username' = 'root', -- MySQL用户名
- 'password' = 'root', -- MySQL密码
- 'table-name' = 'orders' -- 要写入的MySQL表名
- );
-
- -- 从t_source_sqlserver表中选择数据,并将其插入到table_sink_mysql表中
- INSERT INTO table_sink_mysql SELECT * FROM t_source_sqlserver;
- CREATE TABLE income_distribution (
-
- serviceCode STRING,
-
- accountPeriod STRING,
-
- subjectCode STRING,
-
- subjectName STRING,
-
- amt DECIMAL(13,2),
-
- PRIMARY KEY (serviceCode, accountPeriod, subjectCode) NOT ENFORCED
-
- ) WITH (
-
- 'connector' = 'elasticsearch-7',
-
- 'hosts' = 'http://xxxx:9200',
-
- 'index' = 'income_distribution',
-
- 'sink.bulk-flush.backoff.strategy' = 'EXPONENTIAL'
-
- );
可以在 http://localhost:8081/ 访问到 Flink Web UI,如下所示:
参考文献:
使用 Flink CDC 构建 Streaming ETL | Apache Flink CDC
flink sqlserver cdc实时同步(含sqlserver安装配置等)_flink cdc sqlserver-CSDN博客
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。