当前位置:   article > 正文

Flink MySQL CDC_flink-sql-connector-mysql-cdc

flink-sql-connector-mysql-cdc

Flink Cdc开源的地址为:https://github.com/ververica/flink-cdc-connectors

Flink Cdc接受binlog日志用的:dbzuim

dbzuim接受mysql binlog使用的是: https://github.com/shyiko/mysql-binlog-connector-java

MySQL Cdc Connector

Flink sqlclient使用方式

下载flink-sql-connector-mysql-cdc-2.3-SNAPSHOT.jar放到<FLINK_HOME>/lib/.

注意: flink-sql-connector-mysql-cdc-XXX-SNAPSHOT 版本为开发分支对应的代码。用户需要下载源代码并编译相应的jar。用户应该使用发布版本,例如flink-sql-connector-mysql-cdc-XXX.jar,发布版本将在 Maven 中央仓库中可用。

为每个reader设置设置不同的Server_id

每一个读取 binlog 的 MySQL 数据库客户端都应该有一个唯一的 id,称为 server id。MySQL 服务器将使用此 id 来维护网络连接和 binlog 位置。因此,如果不同的作业共享相同的服务器 id,可能会导致从错误的 binlog 位置读取。因此,建议通过SQL Hints为每个阅读器设置不同的服务器 ID ,例如假设源并行度为 4,那么我们可以使用SELECT * FROM source_table /*+ OPTIONS('server-id'='5401-5404') */ ;为 4 个源阅读器中的每一个分配唯一的服务器 ID。

设置 MySQL 会话超时

当为大型数据库制作初始一致快照时,您建立的连接可能会在读取表时超时。您可以通过在 MySQL 配置文件中配置 interactive_timeout 和 wait_timeout 来防止这种行为。

  • interactive_timeout:服务器在关闭交互式连接之前等待其活动的秒数。请参阅MySQL 文档

  • wait_timeout:服务器在关闭非交互式连接之前等待其活动的秒数。请参阅MySQL 文档

CREATE TABLE orders (
     order_id INT,
     order_date TIMESTAMP(0),
     customer_name STRING,
     price DECIMAL(10, 5),
     product_id INT,
     order_status BOOLEAN,
     PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'localhost',
    'port' = '3306',
    'username' = 'root',
    'password' = '123456',
    'database-name' = 'mydb',
    'table-name' = 'orders',
    -- 在 MySQL 集群中所有当前运行的数据库进程中,每个 ID 都必须是唯一的。此连接器作为另一台服务器(具有此唯一 ID)加入 MySQL 集群,因此它可以读取 binlog。默认情况下,会生成一个介于 5400 和 6400 之间的随机数,但我们建议设置一个显式值。
    'server-id' = '5400', 
    -- 增量快照是一种读取表快照的新机制。 与旧的快照机制相比,增量快照有很多优点,包括:(1)在快照读取时source可以并行,(2)在快照读取时source可以在chunk粒度上执行检查点,(3)source不需要 在快照读取之前获取全局读锁(FLUSH TABLES WITH READ LOCK)。 如果您希望源代码并行运行,每个并行阅读器都应该有一个唯一的服务器 ID,因此“服务器 ID”必须是一个类似于“5400-6400”的范围,并且该范围必须大于并行度。
    'scan.incremental.snapshot.enabled' = 'true', 
    -- 表快照的块大小(行数),读取表的快照时,捕获的表被分成多个块。
    'scan.incremental.snapshot.chunk.size' = '8096', 
    -- 读取表快照时每次轮询的最大获取大小。
    'scan.snapshot.fetch.size' = '1024', 
    -- initial:首次启动时对被监控的数据库表进行初始快照,并继续读取最新的binlog。
    -- latest-offset: 不会读取历史快照数据,只会从binlog的末尾读取,这意味着只有连接器启动后的更改数据。
    'scan.startup.mode' = 'initial',
    -- 在快照操作期间,连接器将查询每个包含的表,以便为该表中的所有行生成读取事件。 此参数确定 MySQL 连接是否会将表的所有结果拉入内存(速度很快但需要大量内存),或者是否将结果改为流式传输(可能较慢,但适用于非常大的表)。 该值指定在连接器流式传输结果之前表必须包含的最小行数,默认为 1,000。 将此参数设置为“0”以跳过所有表大小检查并始终在快照期间流式传输所有结果。
    'debezium.min.row.count.to.stream.result' = '1000', 
    -- 连接器在尝试连接到 MySQL 数据库服务器后在超时之前应等待的最长时间。
    'connect.timeout' = '30s', 
    -- 连接池大小
    'connection.pool.size' = '20', 
    -- 发送心跳事件的时间间隔,用于跟踪最新可用的 binlog 偏移量。
    'heartbeat.interval' = '30s'
);

增量快照读取

增量快照读取是一种读取表快照的新机制。与旧的快照机制相比,增量快照有很多优点,包括:

  • (1) MySQL CDC Source 在快照读取时可以并行

  • (2) MySQL CDC Source在快照读取时可以在chunk粒度上进行检查点

  • (3) MySQL CDC Source 在快照读取前不需要获取全局读锁(FLUSH TABLES WITH READ LOCK)

如果您希望源代码并行运行,每个并行阅读器都应该有一个唯一的服务器 ID,因此“服务器 ID”必须是一个类似于“5400-6400”的范围,并且该范围必须大于并行度。

在增量快照读取过程中,MySQL CDC Source首先通过表的主键拆分快照块(splits),然后MySQL CDC Source将这些块分配给多个读取器读取快照块的数据。

原理:

MySQL CDC源启动时,并行读取表的快照,然后单并行读取表的binlog。

在快照阶段,快照根据表的主键和表行的大小被切割成多个快照块。快照块被分配给多个快照阅读器。每个快照阅读器使用块读取算法读取其接收到的块,并将读取的数据发送到下游。源管理块的进程状态(已完成或未完成),因此快照阶段的源可以支持块级别的检查点。如果发生故障,可以恢复源并继续从最后完成的块中读取块。

在所有快照块完成后,源将继续在单个任务中读取 binlog。为了保证快照记录和binlog记录的全局数据顺序,binlog reader会在snapshot chunks完成后开始读取数据,直到有一个完整的checkpoint,以确保所有的snapshot数据都被下游消费了。binlog reader 在 state 中跟踪消费的 binlog 位置,因此 binlog 阶段的源可以支持行级别的检查点。

Flink 会定期对源进行检查点,在发生故障转移的情况下,作业将重新启动并从上次成功的检查点状态恢复,并保证恰好一次的语义。

快照块拆分:

在执行增量快照读取时,MySQL CDC 源需要一个用于拆分表的标准。MySQL CDC Source 使用拆分列将表拆分为多个拆分(块)。默认情况下,MySQL CDC 源会识别表的主键列,并使用主键中的第一列作为拆分列。如果表中没有主键,增量快照读取将失败,您可以禁用scan.incremental.snapshot.enabled回退到旧的快照读取机制。

对于数字和自动增量拆分列,MySQL CDC Source 按固定步长有效地拆分块。例如,如果您有一个主键列id是自动增量 BIGINT 类型的表,最小值为0,最大值为100,并且表选项scan.incremental.snapshot.chunk.size值为25,则该表将被拆分为以下块:

 (-∞, 25),
 [25, 50),
 [50, 75),
 [75, 100),
 [100, +∞)

对于其他主键列类型,MySQL CDC Source 以 分页的形式执行语句SELECT MAX(STR_ID) AS chunk_high FROM (SELECT * FROM TestTable WHERE STR_ID > 'uuid-001' limit 25)以获取每个块的低值和高值,拆分块集如下:

(-∞, 'uuid-001'),
['uuid-001', 'uuid-009'),
['uuid-009', 'uuid-abc'),
['uuid-abc', 'uuid-def'),
[uuid-def, +∞).

块读取算法

对于上面的例子MyTable,如果 MySQL CDC Source 并行度设置为 4,MySQL CDC Source 将运行 4 个 reader,每个 reader 执行Offset Signal Algorithm以获得快照块的最终一致输出。偏移信号算法简单描述如下:

  • (1) 记录当前binlog位置为LOW offset

  • (2) 通过执行语句读取并缓存快照块记录SELECT * FROM MyTable WHERE id > chunk_low AND id <= chunk_high

  • (3) 记录当前binlog位置为HIGH offset。

  • (4) 从LOW offset到HIGH offset读取属于snapshot chunk的binlog记录。

  • (5) 将读取到的binlog记录Upsert到缓存的chunk记录中,并将buffer中的所有记录作为snapshot chunk的最终输出(都作为INSERT记录)发出。

  • (6) 在单个binlog reader中继续读取并发出属于HIGH offset之后的chunk的binlog记录。

该算法受到DBLog Paper的启发。

注意:如果主键的实际值在其范围内不是均匀分布的,这可能会导致增量快照读取时任务不平衡。

注意事项

1.binlog的存活时长, 通过show variables like "%expire_logs_days%";查看。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/笔触狂放9/article/detail/594796
推荐阅读
相关标签
  

闽ICP备14008679号