
Flink CDC Overview and KADB Support

  • Introduction

The Flink® CDC connectors are a set of source connectors for Apache Flink® that ingest changes from different databases using change data capture (CDC). The CDC connectors for Apache Flink® integrate Debezium as the engine that captures data changes, so they can take full advantage of Debezium's capabilities.
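A minimal sketch of what using a CDC connector looks like in Flink SQL (the hostname, credentials, and table names below are placeholders, not values from this article; a complete end-to-end walk-through appears in the last section):

-- Declare a MySQL table as a CDC source; every insert/update/delete on the
-- underlying table is emitted as a changelog row.
CREATE TABLE products_src (
  id INT,
  name STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'mysql-host',
  'port' = '3306',
  'username' = 'flink_user',
  'password' = 'flink_pwd',
  'database-name' = 'mydb',
  'table-name' = 'products'
);

-- Continuously reads the change stream of the source table.
SELECT * FROM products_src;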

  1.  Supported source databases

| Connector | Database | Driver |
|-----------|----------|--------|
| mongodb-cdc | MongoDB: 3.6, 4.x, 5.0 | MongoDB Driver: 4.3.4 |
| mysql-cdc | MySQL: 5.6, 5.7, 8.0.x; RDS MySQL: 5.6, 5.7, 8.0.x; PolarDB MySQL: 5.6, 5.7, 8.0.x; Aurora MySQL: 5.6, 5.7, 8.0.x; MariaDB: 10.x; PolarDB X: 2.0.1 | JDBC Driver: 8.0.28 |
| oceanbase-cdc | OceanBase CE: 3.1.x, 4.x; OceanBase EE: 2.x, 3.x, 4.x | OceanBase Driver: 2.4.x |
| oracle-cdc | Oracle: 11, 12, 19, 21 | Oracle Driver: 19.3.0.0 |
| postgres-cdc | PostgreSQL: 9.6, 10, 11, 12, 13, 14 | JDBC Driver: 42.5.1 |
| sqlserver-cdc | SQL Server: 2012, 2014, 2016, 2017, 2019 | JDBC Driver: 9.4.1.jre8 |
| tidb-cdc | TiDB: 5.1.x, 5.2.x, 5.3.x, 5.4.x, 6.0.0 | JDBC Driver: 8.0.27 |
| db2-cdc | Db2: 11.5 | Db2 Driver: 11.5.0.0 |
| vitess-cdc | Vitess: 8.0.x, 9.0.x | MySQL JDBC Driver: 8.0.26 |

  2.  Flink version compatibility

| Flink® CDC Version | Flink® Version |
|--------------------|----------------|
| 1.0.0 | 1.11.* |
| 1.1.0 | 1.11.* |
| 1.2.0 | 1.12.* |
| 1.3.0 | 1.12.* |
| 1.4.0 | 1.13.* |
| 2.0.* | 1.13.* |
| 2.1.* | 1.13.* |
| 2.2.* | 1.13.*, 1.14.* |
| 2.3.* | 1.13.*, 1.14.*, 1.15.*, 1.16.* |
| 2.4.* | 1.13.*, 1.14.*, 1.15.*, 1.16.*, 1.17.* |
| 2.5.* | 1.14.*, 1.15.*, 1.16.*, 1.17.* |

  3.  Flink connector support
    1. DataStream connectors, supported as source/sink:
      1. Apache Kafka (source/sink)
      2. Apache Cassandra (source/sink)
      3. Amazon DynamoDB (sink)
      4. Amazon Kinesis Data Streams (source/sink)
      5. Amazon Kinesis Data Firehose (sink)
      6. DataGen (source)
      7. Elasticsearch (sink)
      8. Opensearch (sink)
      9. FileSystem (source/sink)
      10. RabbitMQ (source/sink)
      11. Google PubSub (source/sink)
      12. Hybrid Source (source)
      13. Apache Pulsar (source)
      14. JDBC (sink)
      15. MongoDB (source/sink)
    2. Table API connectors, supported as source/sink:

| Name | Version | Source | Sink |
|------|---------|--------|------|
| Filesystem | | Bounded and Unbounded Scan | Streaming Sink, Batch Sink |
| Elasticsearch | 6.x & 7.x | Not supported | Streaming Sink, Batch Sink |
| Opensearch | 1.x & 2.x | Not supported | Streaming Sink, Batch Sink |
| Apache Kafka | 0.10+ | Unbounded Scan | Streaming Sink, Batch Sink |
| Amazon DynamoDB | | Not supported | Streaming Sink, Batch Sink |
| Amazon Kinesis Data Streams | | Unbounded Scan | Streaming Sink |
| Amazon Kinesis Data Firehose | | Not supported | Streaming Sink |
| JDBC | | Bounded Scan, Lookup | Streaming Sink, Batch Sink |
| Apache HBase | 1.4.x & 2.2.x | Bounded Scan, Lookup | Streaming Sink, Batch Sink |
| Apache Hive | Supported Versions | Unbounded Scan, Bounded Scan, Lookup | Streaming Sink, Batch Sink |
| MongoDB | | Bounded Scan, Lookup | Streaming Sink, Batch Sink |

  • KADB as a sink (JDBC connector)
  1. Using the JDBC connector with the Table API

If a primary key is defined in the DDL, the JDBC sink operates in upsert mode and exchanges UPDATE/DELETE messages with the external system; otherwise it operates in append mode and cannot consume UPDATE/DELETE messages.
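For example, the same sink table declared without a primary key runs in append-only mode and only accepts INSERT rows (a sketch; the connection values are the same placeholders used below):

-- Append-only variant: no PRIMARY KEY, so UPDATE/DELETE changelog rows are rejected.
CREATE TABLE MyUserTable_append (
  id BIGINT,
  name STRING
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:postgresql://dbhost:5432/postgresdb',
  'table-name' = 'users'
);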

To use the Table API, add the following dependency to the project:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc</artifactId>
    <version>3.1.0-1.18</version>
</dependency>

Alternatively, use the corresponding connector jar directly.

The JDBC driver for each database is listed in the following table:

| Driver | Group Id | Artifact Id |
|--------|----------|-------------|
| MySQL | mysql | mysql-connector-java |
| Oracle | com.oracle.database.jdbc | ojdbc8 |
| PostgreSQL | org.postgresql | postgresql |
| Derby | org.apache.derby | derby |
| SQL Server | com.microsoft.sqlserver | mssql-jdbc |

KADB uses the corresponding PostgreSQL driver, or alternatively the driver downloaded from the Kingbase (金仓) official website.

  2. Table API steps
    1. Create the table from the Flink SQL client (connection credentials are omitted here; see the note after these steps)

CREATE TABLE MyUserTable (
  id BIGINT,
  name STRING,
  age INT,
  status BOOLEAN,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:postgresql://dbhost:5432/postgresdb',
   'table-name' = 'users'
);

    2. Insert test data into the table

INSERT INTO MyUserTable
SELECT id, name, age, status FROM T;

-- scan data from the JDBC table
SELECT id, name, age, status FROM MyUserTable;
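The DDL above omits credentials. If KADB requires password authentication, the JDBC connector's 'username' and 'password' options can be added to the WITH clause; a sketch (the values are placeholders):

CREATE TABLE MyUserTable (
  id BIGINT,
  name STRING,
  age INT,
  status BOOLEAN,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:postgresql://dbhost:5432/postgresdb',
   'table-name' = 'users',
   'username' = 'someUser',
   'password' = 'somePassword'
);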

  3. Using the JDBC connector with the DataStream API

When coding with the DataStream API, add the following dependency to the project:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc</artifactId>
    <version>3.1.0-1.18</version>
</dependency>

A Java example that performs batched inserts into KADB:

// Requires the flink-connector-jdbc dependency above plus the PostgreSQL JDBC driver.
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class JdbcSinkExample {

    static class Book {
        public Book(Long id, String title, String authors, Integer year) {
            this.id = id;
            this.title = title;
            this.authors = authors;
            this.year = year;
        }

        final Long id;
        final String title;
        final String authors;
        final Integer year;
    }

    public static void main(String[] args) throws Exception {
        var env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements(
                new Book(101L, "Stream Processing with Apache Flink", "Fabian Hueske, Vasiliki Kalavri", 2019),
                new Book(102L, "Streaming Systems", "Tyler Akidau, Slava Chernyak, Reuven Lax", 2018),
                new Book(103L, "Designing Data-Intensive Applications", "Martin Kleppmann", 2017),
                new Book(104L, "Kafka: The Definitive Guide", "Gwen Shapira, Neha Narkhede, Todd Palino", 2017)
        ).addSink(
                JdbcSink.sink(
                        // Parameterized insert into the target KADB table.
                        "insert into books (id, title, authors, year) values (?, ?, ?, ?)",
                        (statement, book) -> {
                            statement.setLong(1, book.id);
                            statement.setString(2, book.title);
                            statement.setString(3, book.authors);
                            statement.setInt(4, book.year);
                        },
                        // Flush every 1000 rows or every 200 ms; retry failed batches up to 5 times.
                        JdbcExecutionOptions.builder()
                                .withBatchSize(1000)
                                .withBatchIntervalMs(200)
                                .withMaxRetries(5)
                                .build(),
                        // KADB connection information (see the notes below).
                        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                                .withUrl("jdbc:postgresql://dbhost:5432/postgresdb")
                                .withDriverName("org.postgresql.Driver")
                                .withUsername("someUser")
                                .withPassword("somePassword")
                                .build()
                ));

        env.execute();
    }
}

The connection options in the example describe the KADB connection, where:

dbhost: the VIP address of the database cluster entry point, or the IP address of the node hosting the master instance

postgresdb: the database name

someUser/somePassword: the username/password used to connect to the database
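Note that JdbcSink does not create the target table, so the example assumes a books table already exists in KADB. A minimal DDL sketch (the column types are assumptions mapped from the Java fields):

-- Run in KADB before starting the Flink job; adjust types and distribution as needed.
CREATE TABLE books (
  id      BIGINT PRIMARY KEY,
  title   VARCHAR(255),
  authors VARCHAR(255),
  year    INT
);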

  • Using Flink CDC to synchronize MySQL and PostgreSQL data into KADB

Suppose we are running an e-commerce business: product and order data are stored in MySQL, and the shipment information for each order is stored in Postgres. To make the orders easier to analyze, we want to join each order with its product and shipment information into a single wide table and write it to KADB in real time. The rest of this section shows how to implement this requirement with the Flink MySQL/Postgres CDC connectors.

  1. Prepare the environment

Flink distribution:

https://archive.apache.org/dist/flink/flink-1.17.0/flink-1.17.0-bin-scala_2.12.tgz

Flink CDC connector jars for MySQL and Postgres (see the loading note after the list):

  • flink-sql-connector-mysql-cdc-2.5-SNAPSHOT.jar
  • flink-sql-connector-postgres-cdc-2.5-SNAPSHOT.jar
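The connector jars can be copied into Flink's lib/ directory before starting the cluster. On Flink 1.16 and later they can also be loaded from the SQL client session, for example (the paths are placeholders):

-- Flink SQL
ADD JAR '/path/to/flink-sql-connector-mysql-cdc-2.5-SNAPSHOT.jar';
ADD JAR '/path/to/flink-sql-connector-postgres-cdc-2.5-SNAPSHOT.jar';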
  2. Prepare the data

MySQL:

-- MySQL
CREATE DATABASE mydb;
USE mydb;
CREATE TABLE products (
  id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  name VARCHAR(255) NOT NULL,
  description VARCHAR(512)
);
ALTER TABLE products AUTO_INCREMENT = 101;

INSERT INTO products
VALUES (default,"scooter","Small 2-wheel scooter"),
       (default,"car battery","12V car battery"),
       (default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),
       (default,"hammer","12oz carpenter's hammer"),
       (default,"hammer","14oz carpenter's hammer"),
       (default,"hammer","16oz carpenter's hammer"),
       (default,"rocks","box of assorted rocks"),
       (default,"jacket","water resistent black wind breaker"),
       (default,"spare tire","24 inch spare tire");

CREATE TABLE orders (
  order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  order_date DATETIME NOT NULL,
  customer_name VARCHAR(255) NOT NULL,
  price DECIMAL(10, 5) NOT NULL,
  product_id INTEGER NOT NULL,
  order_status BOOLEAN NOT NULL -- Whether order has been placed
) AUTO_INCREMENT = 10001;

INSERT INTO orders
VALUES (default, '2020-07-30 10:08:22', 'Jark', 50.50, 102, false),
       (default, '2020-07-30 10:11:09', 'Sally', 15.00, 105, false),
       (default, '2020-07-30 12:00:30', 'Edward', 25.25, 106, false);

PG

-- PG
CREATE TABLE shipments (
  shipment_id SERIAL NOT NULL PRIMARY KEY,
  order_id SERIAL NOT NULL,
  origin VARCHAR(255) NOT NULL,
  destination VARCHAR(255) NOT NULL,
  is_arrived BOOLEAN NOT NULL
);
ALTER SEQUENCE public.shipments_shipment_id_seq RESTART WITH 1001;
ALTER TABLE public.shipments REPLICA IDENTITY FULL;

INSERT INTO shipments
VALUES (default,10001,'Beijing','Shanghai',false),
       (default,10002,'Hangzhou','Shanghai',false),
       (default,10003,'Shanghai','Hangzhou',false);

  3. Create the Flink tables

Start the Flink SQL CLI with the following command:

./bin/sql-client.sh

First, enable checkpointing, with a checkpoint every 3 seconds:

-- Flink SQL                  

Flink SQL> SET execution.checkpointing.interval = 3s;

Then, for the tables products, orders, and shipments in the source databases, create corresponding tables in the Flink SQL CLI to synchronize the data of those underlying database tables:

-- Flink SQL
Flink SQL> CREATE TABLE products (
    id INT,
    name STRING,
    description STRING,
    PRIMARY KEY (id) NOT ENFORCED
  ) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'localhost',
    'port' = '3306',
    'username' = 'root',
    'password' = '123456',
    'database-name' = 'mydb',
    'table-name' = 'products'
  );
 
 
Flink SQL> CREATE TABLE orders (
   order_id INT,
   order_date TIMESTAMP(0),
   customer_name STRING,
   price DECIMAL(10, 5),
   product_id INT,
   order_status BOOLEAN,
   PRIMARY KEY (order_id) NOT ENFORCED
 ) WITH (
   'connector' = 'mysql-cdc',
   'hostname' = 'localhost',
   'port' = '3306',
   'username' = 'root',
   'password' = '123456',
   'database-name' = 'mydb',
   'table-name' = 'orders'
 );
 
 
Flink SQL> CREATE TABLE shipments (
   shipment_id INT,
   order_id INT,
   origin STRING,
   destination STRING,
   is_arrived BOOLEAN,
   PRIMARY KEY (shipment_id) NOT ENFORCED
 ) WITH (
   'connector' = 'postgres-cdc',
   'hostname' = 'localhost',
   'port' = '5432',
   'username' = 'postgres',
   'password' = 'postgres',
   'database-name' = 'postgres',
   'schema-name' = 'public',
   'table-name' = 'shipments'
 );

Finally, create the enriched_orders table, which writes the joined order data into KADB (the corresponding table on the KADB side is sketched after this DDL):

-- Flink SQL
Flink SQL> CREATE TABLE enriched_orders (
   order_id INT,
   order_date TIMESTAMP(0),
   customer_name STRING,
   price DECIMAL(10, 5),
   product_id INT,
   order_status BOOLEAN,
   product_name STRING,
   product_description STRING,
   shipment_id INT,
   origin STRING,
   destination STRING,
   is_arrived BOOLEAN,
   PRIMARY KEY (order_id) NOT ENFORCED
 ) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:postgresql://dbhost:5432/postgresdb',
    'table-name' = 'enriched_orders'
 );
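The Flink JDBC sink does not create the target table, so a matching enriched_orders table needs to exist in KADB before the job is started. A sketch of the KADB-side DDL (the column types are assumptions mapped by hand from the Flink schema; adjust as needed):

-- Run in KADB
CREATE TABLE enriched_orders (
  order_id            INT PRIMARY KEY,
  order_date          TIMESTAMP,
  customer_name       VARCHAR(255),
  price               DECIMAL(10, 5),
  product_id          INT,
  order_status        BOOLEAN,
  product_name        VARCHAR(255),
  product_description VARCHAR(512),
  shipment_id         INT,
  origin              VARCHAR(255),
  destination         VARCHAR(255),
  is_arrived          BOOLEAN
);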
  4. Start the data synchronization
-- Flink SQL
Flink SQL> INSERT INTO enriched_orders
 SELECT o.*, p.name, p.description, s.shipment_id, s.origin, s.destination, s.is_arrived
 FROM orders AS o
 LEFT JOIN products AS p ON o.product_id = p.id
 LEFT JOIN shipments AS s ON o.order_id = s.order_id;
  5. Test synchronization of incremental changes

Insert a row into the MySQL orders table:

--MySQL

INSERT INTO orders

VALUES (default, '2020-07-30 15:22:00', 'Jark', 29.71, 104, false);

Insert a row into the Postgres shipments table:

--PG

INSERT INTO shipments

VALUES (default,10004,'Shanghai','Beijing',false);

Update the order status in the MySQL orders table:

--MySQL

UPDATE orders SET order_status = true WHERE order_id = 10004;

Update the shipment status in the Postgres shipments table:

--PG

UPDATE shipments SET is_arrived = true WHERE shipment_id = 1004;

Delete a row from the MySQL orders table:

--MySQL

DELETE FROM orders WHERE order_id = 10004;

Observe the result of the data synchronization in KADB.
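For example, a query such as the following can be run in KADB after each change to confirm that the wide table stays in sync (a sketch):

-- KADB
SELECT * FROM enriched_orders ORDER BY order_id;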
