Flink® CDC Connectors are a set of source connectors for Apache Flink® that use change data capture (CDC) to ingest changes from different databases. The CDC Connectors for Apache Flink® integrate Debezium as the engine for capturing data changes, so they can take full advantage of Debezium's capabilities. The supported connectors and database versions are listed in the table below:
| Connector | Database | Driver |
| --- | --- | --- |
| mongodb-cdc | MongoDB: 3.6, 4.x, 5.0 | MongoDB Driver: 4.3.4 |
| mysql-cdc | MySQL: 5.6, 5.7, 8.0.x; RDS MySQL: 5.6, 5.7, 8.0.x; PolarDB MySQL: 5.6, 5.7, 8.0.x; Aurora MySQL: 5.6, 5.7, 8.0.x; MariaDB: 10.x; PolarDB X: 2.0.1 | JDBC Driver: 8.0.28 |
| oceanbase-cdc | OceanBase CE: 3.1.x, 4.x; OceanBase EE: 2.x, 3.x, 4.x | OceanBase Driver: 2.4.x |
| oracle-cdc | Oracle: 11, 12, 19, 21 | Oracle Driver: 19.3.0.0 |
| postgres-cdc | PostgreSQL: 9.6, 10, 11, 12, 13, 14 | JDBC Driver: 42.5.1 |
| sqlserver-cdc | SQL Server: 2012, 2014, 2016, 2017, 2019 | JDBC Driver: 9.4.1.jre8 |
| tidb-cdc | TiDB: 5.1.x, 5.2.x, 5.3.x, 5.4.x, 6.0.0 | JDBC Driver: 8.0.27 |
| db2-cdc | Db2: 11.5 | Db2 Driver: 11.5.0.0 |
| vitess-cdc | Vitess: 8.0.x, 9.0.x | MySQL JDBC Driver: 8.0.26 |
The Flink® CDC version must match the Flink® version as follows:

| Flink® CDC Version | Flink® Version |
| --- | --- |
| 1.0.0 | 1.11.* |
| 1.1.0 | 1.11.* |
| 1.2.0 | 1.12.* |
| 1.3.0 | 1.12.* |
| 1.4.0 | 1.13.* |
| 2.0.* | 1.13.* |
| 2.1.* | 1.13.* |
| 2.2.* | 1.13.*, 1.14.* |
| 2.3.* | 1.13.*, 1.14.*, 1.15.*, 1.16.* |
| 2.4.* | 1.13.*, 1.14.*, 1.15.*, 1.16.*, 1.17.* |
| 2.5.* | 1.14.*, 1.15.*, 1.16.*, 1.17.* |
Flink's Table/SQL connectors support the following source and sink modes:

| Name | Version | Source | Sink |
| --- | --- | --- | --- |
| Filesystem | | Bounded and Unbounded Scan | Streaming Sink, Batch Sink |
| Elasticsearch | 6.x & 7.x | Not supported | Streaming Sink, Batch Sink |
| Opensearch | 1.x & 2.x | Not supported | Streaming Sink, Batch Sink |
| Apache Kafka | 0.10+ | Unbounded Scan | Streaming Sink, Batch Sink |
| Amazon DynamoDB | | Not supported | Streaming Sink, Batch Sink |
| Amazon Kinesis Data Streams | | Unbounded Scan | Streaming Sink |
| Amazon Kinesis Data Firehose | | Not supported | Streaming Sink |
| JDBC | | Bounded Scan, Lookup | Streaming Sink, Batch Sink |
| Apache HBase | 1.4.x & 2.2.x | Bounded Scan, Lookup | Streaming Sink, Batch Sink |
| Apache Hive | | Unbounded Scan, Bounded Scan, Lookup | Streaming Sink, Batch Sink |
| MongoDB | | Bounded Scan, Lookup | Streaming Sink, Batch Sink |
If a primary key is defined in the DDL, the JDBC sink operates in upsert mode, exchanging UPDATE/DELETE messages with the external system; otherwise, it operates in append mode and does not support consuming UPDATE/DELETE messages.
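For instance, a minimal sketch (hypothetical table names and the connection URL from this article's examples): the first definition declares a primary key and therefore runs in upsert mode, while the second declares none and accepts INSERT-only traffic in append mode.

-- upsert mode: UPDATE/DELETE messages are applied as upserts on the key
CREATE TABLE upsert_sink (
id BIGINT,
cnt BIGINT,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://dbhost:5432/postgresdb',
'table-name' = 'upsert_sink'
);

-- append mode: no primary key, only INSERT messages are supported
CREATE TABLE append_sink (
id BIGINT,
cnt BIGINT
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://dbhost:5432/postgresdb',
'table-name' = 'append_sink'
);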
To use the Table API, add the following dependency to your project:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc</artifactId>
<version>3.1.0-1.18</version>
</dependency>
Alternatively, use the corresponding JAR package. You also need the JDBC driver for your database, as listed in the table below:
| Driver | Group Id | Artifact Id |
| --- | --- | --- |
| MySQL | mysql | mysql-connector-java |
| Oracle | com.oracle.database.jdbc | ojdbc8 |
| PostgreSQL | org.postgresql | postgresql |
| Derby | org.apache.derby | derby |
| SQL Server | com.microsoft.sqlserver | mssql-jdbc |
For KADB, use the corresponding PostgreSQL driver, or the driver downloaded from the Kingbase official website.
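For example, a minimal Maven snippet for the PostgreSQL driver (a sketch; the version is taken from the driver table above, adjust as needed):

<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>42.5.1</version>
</dependency>

The following Flink SQL example creates a JDBC table and then writes to and reads from it: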
CREATE TABLE MyUserTable (
id BIGINT,
name STRING,
age INT,
status BOOLEAN,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://dbhost:5432/postgresdb',
'table-name' = 'users'
);
-- write data into the JDBC table
INSERT INTO MyUserTable
SELECT id, name, age, status FROM T;
-- scan data from the JDBC table
SELECT id, name, age, status FROM MyUserTable;
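Because the JDBC table also supports lookup (see the connector table above), it can serve as a dimension table in a lookup join. A minimal sketch, assuming a hypothetical Orders stream that has a processing-time attribute proc_time and a user_id column:

-- enrich the stream by looking up MyUserTable at processing time
SELECT o.order_id, o.user_id, u.name, u.age
FROM Orders AS o
JOIN MyUserTable FOR SYSTEM_TIME AS OF o.proc_time AS u
ON o.user_id = u.id;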
To use the DataStream API instead, add the following dependency to your project:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc</artifactId>
<version>3.1.0-1.18</version>
</dependency>
The following Java example implements batched inserts into KADB:
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class JdbcSinkExample {

    static class Book {
        public Book(Long id, String title, String authors, Integer year) {
            this.id = id;
            this.title = title;
            this.authors = authors;
            this.year = year;
        }

        final Long id;
        final String title;
        final String authors;
        final Integer year;
    }

    public static void main(String[] args) throws Exception {
        var env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements(
                new Book(101L, "Stream Processing with Apache Flink", "Fabian Hueske, Vasiliki Kalavri", 2019),
                new Book(102L, "Streaming Systems", "Tyler Akidau, Slava Chernyak, Reuven Lax", 2018),
                new Book(103L, "Designing Data-Intensive Applications", "Martin Kleppmann", 2017),
                new Book(104L, "Kafka: The Definitive Guide", "Gwen Shapira, Neha Narkhede, Todd Palino", 2017)
        ).addSink(
                JdbcSink.sink(
                        // SQL executed for every record in the stream
                        "insert into books (id, title, authors, year) values (?, ?, ?, ?)",
                        // bind each Book's fields to the statement placeholders
                        (statement, book) -> {
                            statement.setLong(1, book.id);
                            statement.setString(2, book.title);
                            statement.setString(3, book.authors);
                            statement.setInt(4, book.year);
                        },
                        // flush a batch every 1000 rows or every 200 ms; retry failed batches up to 5 times
                        JdbcExecutionOptions.builder()
                                .withBatchSize(1000)
                                .withBatchIntervalMs(200)
                                .withMaxRetries(5)
                                .build(),
                        // KADB connection information (PostgreSQL-compatible)
                        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                                .withUrl("jdbc:postgresql://dbhost:5432/postgresdb")
                                .withDriverName("org.postgresql.Driver")
                                .withUsername("someUser")
                                .withPassword("somePassword")
                                .build()
                ));
        env.execute();
    }
}
The KADB connection information in the example above consists of:
dbhost: the VIP address of the database cluster entry point, or the IP address of the node where the master instance runs
postgresdb: the database name
someUser/somePassword: the username and password for connecting to the database
Suppose we run an e-commerce business: product and order data are stored in MySQL, while the shipment information for each order is stored in Postgres. To make analysis easier, we want to enrich each order with its corresponding product and shipment information to form a wide table, and write that table to KADB in real time. The rest of this section shows how to implement this with the Flink MySQL/Postgres CDC connectors.
Flink installation package:
https://archive.apache.org/dist/flink/flink-1.17.0/flink-1.17.0-bin-scala_2.12.tgz
You also need the Flink CDC connector packages for MySQL and Postgres (the flink-sql-connector-mysql-cdc and flink-sql-connector-postgres-cdc JARs).
Prepare test data in MySQL:
-- MySQL
CREATE DATABASE mydb;
USE mydb;
CREATE TABLE products (
id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(255) NOT NULL,
description VARCHAR(512)
);
ALTER TABLE products AUTO_INCREMENT = 101;
INSERT INTO products
VALUES (default,"scooter","Small 2-wheel scooter"),
(default,"car battery","12V car battery"),
(default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),
(default,"hammer","12oz carpenter's hammer"),
(default,"hammer","14oz carpenter's hammer"),
(default,"hammer","16oz carpenter's hammer"),
(default,"rocks","box of assorted rocks"),
(default,"jacket","water resistent black wind breaker"),
(default,"spare tire","24 inch spare tire");
CREATE TABLE orders (
order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
order_date DATETIME NOT NULL,
customer_name VARCHAR(255) NOT NULL,
price DECIMAL(10, 5) NOT NULL,
product_id INTEGER NOT NULL,
order_status BOOLEAN NOT NULL -- Whether order has been placed
) AUTO_INCREMENT = 10001;
INSERT INTO orders
VALUES (default, '2020-07-30 10:08:22', 'Jark', 50.50, 102, false),
(default, '2020-07-30 10:11:09', 'Sally', 15.00, 105, false),
(default, '2020-07-30 12:00:30', 'Edward', 25.25, 106, false);
Prepare test data in Postgres:
-- PG
CREATE TABLE shipments (
shipment_id SERIAL NOT NULL PRIMARY KEY,
order_id SERIAL NOT NULL,
origin VARCHAR(255) NOT NULL,
destination VARCHAR(255) NOT NULL,
is_arrived BOOLEAN NOT NULL
);
ALTER SEQUENCE public.shipments_shipment_id_seq RESTART WITH 1001;
-- REPLICA IDENTITY FULL makes Postgres include the full previous row image in UPDATE/DELETE events, which the postgres-cdc connector requires
ALTER TABLE public.shipments REPLICA IDENTITY FULL;
INSERT INTO shipments
VALUES (default,10001,'Beijing','Shanghai',false),
(default,10002,'Hangzhou','Shanghai',false),
(default,10003,'Shanghai','Hangzhou',false);
Start the Flink SQL CLI with the following command:
./bin/sql-client.sh
First, enable checkpointing with an interval of 3 seconds; checkpoints are required for failure recovery, and sinks commit their results when checkpoints complete:
-- Flink SQL
Flink SQL> SET execution.checkpointing.interval = 3s;
Then, for the database tables products, orders, and shipments, create corresponding tables in the Flink SQL CLI to synchronize the data of these underlying tables:
-- Flink SQL
Flink SQL> CREATE TABLE products (
id INT,
name STRING,
description STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'database-name' = 'mydb',
'table-name' = 'products'
);
Flink SQL> CREATE TABLE orders (
order_id INT,
order_date TIMESTAMP(0),
customer_name STRING,
price DECIMAL(10, 5),
product_id INT,
order_status BOOLEAN,
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'database-name' = 'mydb',
'table-name' = 'orders'
);
Flink SQL> CREATE TABLE shipments (
shipment_id INT,
order_id INT,
origin STRING,
destination STRING,
is_arrived BOOLEAN,
PRIMARY KEY (shipment_id) NOT ENFORCED
) WITH (
'connector' = 'postgres-cdc',
'hostname' = 'localhost',
'port' = '5432',
'username' = 'postgres',
'password' = 'postgres',
'database-name' = 'postgres',
'schema-name' = 'public',
'table-name' = 'shipments'
);
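Before creating the sink, you can sanity-check each CDC source directly in the SQL CLI; the query keeps running and prints change events as they arrive, for example:

Flink SQL> SELECT * FROM products;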
Finally, create the enriched_orders table, which writes the joined order data into KADB:
-- Flink SQL
Flink SQL> CREATE TABLE enriched_orders (
order_id INT,
order_date TIMESTAMP(0),
customer_name STRING,
price DECIMAL(10, 5),
product_id INT,
order_status BOOLEAN,
product_name STRING,
product_description STRING,
shipment_id INT,
origin STRING,
destination STRING,
is_arrived BOOLEAN,
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://dbhost:5432/postgresdb',
'table-name' = 'enriched_orders'
);
-- Flink SQL
Flink SQL> INSERT INTO enriched_orders
SELECT o.*, p.name, p.description, s.shipment_id, s.origin, s.destination, s.is_arrived
FROM orders AS o
LEFT JOIN products AS p ON o.product_id = p.id
LEFT JOIN shipments AS s ON o.order_id = s.order_id;
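To inspect the joined result without writing to KADB, the same query can also be run as a plain SELECT in the CLI:

Flink SQL> SELECT o.*, p.name, p.description, s.shipment_id, s.origin, s.destination, s.is_arrived
FROM orders AS o
LEFT JOIN products AS p ON o.product_id = p.id
LEFT JOIN shipments AS s ON o.order_id = s.order_id;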
Insert a row into the orders table in MySQL:
--MySQL
INSERT INTO orders
VALUES (default, '2020-07-30 15:22:00', 'Jark', 29.71, 104, false);
Insert a row into the shipments table in Postgres:
--PG
INSERT INTO shipments
VALUES (default,10004,'Shanghai','Beijing',false);
Update the order status in the orders table in MySQL:
--MySQL
UPDATE orders SET order_status = true WHERE order_id = 10004;
Update the shipment status in the shipments table in Postgres:
--PG
UPDATE shipments SET is_arrived = true WHERE shipment_id = 1004;
Delete a row from the orders table in MySQL:
--MySQL
DELETE FROM orders WHERE order_id = 10004;
Observe the results of the data synchronization in KADB.
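For example, a quick check in KADB (any PostgreSQL-compatible client will do):

-- KADB
SELECT * FROM enriched_orders ORDER BY order_id;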