This document covers using ApacheStream + Flink + Flink CDC Connectors to implement real-time stream computation and synchronization.
First, a MySQL-to-MySQL sync job:

```sql
-- Set the Flink checkpoint interval and enable checkpointing explicitly;
-- without this the job cannot read the full snapshot and the incremental
-- (binlog) data together.
SET 'execution.checkpointing.interval' = '3s';

DROP TABLE IF EXISTS source; -- drops the Flink table (not the underlying MySQL table)

CREATE TABLE source ( -- column definitions
  `ID` INT PRIMARY KEY NOT ENFORCED,
  `name` VARCHAR(32),
  `CREATE_TIME` TIMESTAMP(3),
  `UPDATE_TIME` TIMESTAMP(3)
) WITH ( -- connection options
  'connector' = 'mysql-cdc',
  'hostname' = '${hostname}',
  'port' = '3306',
  'username' = '${username}',
  'password' = '${password}',
  'database-name' = 'test_db',
  'table-name' = 'test',
  'server-time-zone' = 'Asia/Shanghai' -- time zone, needed when converting MySQL temporal values to Flink types
);

CREATE TABLE target ( -- column definitions
  `ID` INT PRIMARY KEY NOT ENFORCED,
  `name` VARCHAR(32),
  `CREATE_TIME` TIMESTAMP(3),
  `UPDATE_TIME` TIMESTAMP(3)
) WITH (
  'connector' = 'jdbc',                             -- JDBC connector (new-style options for flink-connector-jdbc)
  'driver' = 'com.mysql.cj.jdbc.Driver',            -- driver class shipped with mysql-connector-j 8.x
  'url' = 'jdbc:mysql://${hostname}:3306/ms_data',  -- JDBC URL
  'table-name' = 'test',                            -- table name
  'username' = '${username}',                       -- user name
  'password' = '${password}',                       -- password
  'sink.buffer-flush.max-rows' = '1'                -- rows buffered before flushing; default 5000, set to 1 here
);

-- Insert the source table's rows into the target table.
INSERT INTO target SELECT * FROM source;
```
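To sanity-check the pipeline, one option is to run a few statements directly against the source MySQL database and watch them propagate. This is only a sketch: the rows below are hypothetical test data, assuming `test_db.test` has the schema declared above.

```sql
-- Run against the source MySQL database (test_db). Hypothetical test rows.
INSERT INTO test (ID, name, CREATE_TIME, UPDATE_TIME)
VALUES (1, 'alice', NOW(3), NOW(3));
UPDATE test SET name = 'bob', UPDATE_TIME = NOW(3) WHERE ID = 1;
DELETE FROM test WHERE ID = 1;

-- Then, against the target database (ms_data), each change should be
-- mirrored shortly after the next checkpoint completes:
SELECT * FROM test;
```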
Required Maven dependencies:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_2.12</artifactId>
    <version>1.14.5</version>
</dependency>
<dependency>
    <groupId>com.mysql</groupId>
    <artifactId>mysql-connector-j</artifactId>
    <version>8.0.33</version>
</dependency>
<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-sql-connector-mysql-cdc</artifactId>
    <version>2.4.2</version>
</dependency>
```
For a PostgreSQL CDC source, first adjust the following parameters in postgresql.conf:

```properties
# Change the WAL level to logical (required).
wal_level = logical          # minimal, replica, or logical
# Raise the maximum number of replication slots (optional; default 10).
# flink-cdc uses one slot per captured table by default.
max_replication_slots = 20   # max number of replication slots
# Raise the maximum number of WAL sender processes (optional; default 10).
# Keep this in line with max_replication_slots above.
max_wal_senders = 20         # max number of walsender processes
# Terminate replication connections that have been idle longer than this;
# a somewhat larger value is safer (optional; default 60s).
wal_sender_timeout = 180s    # in milliseconds; 0 disables
```
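After restarting PostgreSQL, a quick way to confirm the settings took effect (run in psql):

```sql
-- Verify the replication-related settings.
SELECT name, setting
FROM pg_settings
WHERE name IN ('wal_level', 'max_replication_slots',
               'max_wal_senders', 'wal_sender_timeout');
```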
Adjust the parameters below to your environment: postgres is the database name, public is the schema name, and test_cdc_user is the user name.
```sql
CREATE USER test_cdc_user WITH PASSWORD '999';                 -- create the user with a password
ALTER ROLE test_cdc_user REPLICATION;                          -- the user must have the replication privilege for logical replication
GRANT CONNECT ON DATABASE postgres TO test_cdc_user;           -- remote connection privilege
GRANT SELECT ON ALL TABLES IN SCHEMA public TO test_cdc_user;  -- read privilege on the data
GRANT CREATE ON DATABASE postgres TO test_cdc_user;            -- create privilege
```
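To confirm the role was created with the right attributes, you can query the catalog:

```sql
-- rolreplication must be true for logical replication to work.
SELECT rolname, rolreplication, rolcanlogin
FROM pg_roles
WHERE rolname = 'test_cdc_user';
```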
Publish all tables (this can also be narrowed to publishing individual tables as needed); subscribers can only consume changes from published tables.
```sql
-- Either flip any existing publications to cover all tables...
UPDATE pg_publication SET puballtables = true WHERE pubname IS NOT NULL;
-- ...or create the publication Debezium expects, covering all tables.
CREATE PUBLICATION dbz_publication FOR ALL TABLES;
```
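A quick check that the publication exists and covers the expected tables:

```sql
SELECT pubname, puballtables FROM pg_publication;
-- Lists every table the publication covers.
SELECT * FROM pg_publication_tables WHERE pubname = 'dbz_publication';
```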
The Flink SQL job below syncs a PostgreSQL table to MySQL and stamps each row with the sync time:

```sql
SET 'execution.checkpointing.interval' = '5s';   -- Flink checkpoint interval
SET 'execution.checkpointing.min-pause' = '2s';  -- minimum pause between two checkpoints, to avoid errors from checkpointing too frequently

CREATE TABLE table_source (
  id VARCHAR PRIMARY KEY NOT ENFORCED,
  name VARCHAR,
  doc VARCHAR,
  play INT
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = '${huku-pg-hostname}',
  'port' = '5432',
  'username' = '${huku-pg-username}',
  'password' = '${huku-pg-password}',
  'database-name' = 'postgres',
  'schema-name' = 'public',
  'table-name' = 'table',
  'slot.name' = 'table_slot',
  'debezium.slot.drop.on.stop' = 'true',
  'decoding.plugin.name' = 'pgoutput'
);

CREATE TABLE table_target (
  id VARCHAR PRIMARY KEY NOT ENFORCED,
  name VARCHAR,
  doc VARCHAR,
  play INT,
  cdc_sync_time VARCHAR  -- holds the sync timestamp appended by the INSERT below
) WITH (
  'connector' = 'jdbc',                                    -- JDBC connector
  'driver' = 'com.mysql.cj.jdbc.Driver',
  'url' = 'jdbc:mysql://${mysql-hostname}:3306/ms_data',   -- JDBC URL
  'table-name' = 'ods_hk_r_dim_organ_nst',                 -- table name
  'username' = '${mysql-username}',                        -- user name
  'password' = '${mysql-password}',                        -- password
  'sink.buffer-flush.max-rows' = '1'                       -- default 5000; set to 1 for this demo
);

-- Insert into the target and record the sync time for each row.
INSERT INTO table_target
SELECT *, DATE_FORMAT(CURRENT_TIMESTAMP, 'yyyy-MM-dd HH:mm:ss') AS cdc_sync_time
FROM table_source;
```
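While the job is running, you can verify on the PostgreSQL side that the replication slot was created and is active (the slot name matches 'slot.name' in the source table above):

```sql
SELECT slot_name, plugin, active, restart_lsn
FROM pg_replication_slots
WHERE slot_name = 'table_slot';
```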
The table's REPLICA IDENTITY property must also be set. It defaults to the primary key; when the table has no primary key, point it at a unique index instead, and when there is no unique index either, use FULL, which has the worst performance.
```sql
-- Use a unique index as the replica identity...
ALTER TABLE public.table REPLICA IDENTITY USING INDEX pk_index;
-- ...or, lacking one, fall back to FULL (slowest).
ALTER TABLE public.table REPLICA IDENTITY FULL;
```
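To inspect a table's current replica identity:

```sql
-- relreplident: d = default (primary key), i = index, f = full, n = nothing.
SELECT relname, relreplident
FROM pg_class
WHERE relname = 'table' AND relkind = 'r';
```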
wal_keep_size sets how much past WAL to retain (0 by default in stock PostgreSQL), but what is actually kept is wal_keep_size plus the current WAL segment, so the real retention is always somewhat larger than wal_keep_size.
Moreover, when replication slots exist, each slot pins all WAL that its consumer has not yet received, which can push total WAL size well past the wal_keep_size limit. In particular, a slot whose remote consumer is gone keeps retaining WAL indefinitely, so the log grows without bound; such stale slots have to be dropped manually, as shown below.
max_slot_wal_keep_size caps how much WAL replication slots may retain; set it larger than wal_keep_size.
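A sketch for spotting and removing an offending slot (the slot name below reuses the example above):

```sql
-- WAL retained per slot; inactive slots with large retained_wal are the suspects.
SELECT slot_name, active,
       pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS retained_wal
FROM pg_replication_slots;

-- Manually drop a slot that no longer has a consumer.
SELECT pg_drop_replication_slot('table_slot');
```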
Official documentation on wal_keep_size:
PostgreSQL: Documentation: 15: 30.5. WAL Configuration
Official documentation on max_slot_wal_keep_size:
PostgreSQL: Documentation: 15: 27.2. Log-Shipping Standby Servers