当前位置:   article > 正文

Flink SQL CDC 配置文档

Flink SQL CDC 配置文档

说明

本文档适用于使用ApacheStream + Flink + Flink CDC Connectors来实现实时流式计算和同步

Flink Mysql CDC 配置

  1. -- 配置flink checkpoing间隔时间,显式启动checkpoing,不加此配置不能实现同时读取全量数据和增量(binlog)数据。
  2. SET 'execution.checkpointing.interval' = '3s';
  3. drop table if exists source;  -- 删除flink表(不是真实的表)
  4. create table source(  -- 创建字段
  5.   `ID` int primary key,
  6.   `name` varchar(32),
  7.   `CREATE_TIME` TIMESTAMP(3),
  8.   `UPDATE_TIME` TIMESTAMP(3)
  9.   )
  10. with(  -- 配置连接参数
  11.   'connector' = 'mysql-cdc',
  12.   'hostname' = '${hostname}',
  13.   'port' = '3306',
  14.   'username' = '${username}',
  15.   'password' = '${password}',
  16.   'database-name' = 'test_db',
  17.   'table-name' = 'test',
  18.   'server-time-zone' = 'Asia/Shanghai'  -- 配置时区,在mysql时间对象转flink对象时需要
  19. );
  20.  
  21. create table target(  -- 创建字段
  22.   `ID` int primary key,
  23.   `name` varchar(32),
  24.   `CREATE_TIME` TIMESTAMP(3),
  25.   `UPDATE_TIME` TIMESTAMP(3)
  26.   )
  27. with(
  28. 'connector.type' = 'jdbc', -- 使用 jdbc connector
  29. 'connector.driver' = 'com.mysql.jdbc.Driver',
  30. 'connector.url' = 'jdbc:mysql://${hostname}:3306/ms_data', -- jdbc url
  31. 'connector.table' = 'test', -- 表名
  32. 'connector.username' = '${username}', -- 用户名
  33. 'connector.password' = '${password}', -- 密码
  34. 'connector.write.flush.max-rows' = '1' -- 缓存到指定条数,再一起执行,默认 5000 条,改为 1 条
  35. );
  36. -- 把源表数据,插入目标表
  37. insert into merchant_store_cost_info_target select * from merchant_store_cost_info_source;
  38. 需要的依赖包
  39.   <dependency>
  40.     <groupId>org.apache.flink</groupId>
  41.     <artifactId>flink-connector-jdbc_2.12</artifactId>
  42.     <version>1.14.5</version>
  43.   </dependency>
  44.   <dependency>
  45.     <groupId>com.mysql</groupId>
  46.     <artifactId>mysql-connector-j</artifactId>
  47.     <version>8.0.33</version>
  48.   </dependency>
  49.   <dependency>
  50.     <groupId>com.ververica</groupId>
  51.     <artifactId>flink-sql-connector-mysql-cdc</artifactId>
  52.     <version>2.4.2</version>
  53.   </dependency>

Flink PostgreSQL CDC 配置

修改PG配置

  1. # 更改wal日志方式为logical(必须)
  2. wal_level = logical # minimal, replica, or logical
  3. # 更改solts最大数量(非必须)(默认值为10),flink-cdc默认一张表占用一个slots
  4. max_replication_slots = 20 # max number of replication slots
  5. # 更改wal发送最大进程数(非必须)(默认值为10),这个值和上面的solts设置一样
  6. max_wal_senders = 20 # max number of walsender processes
  7. # 中断那些停止活动超过指定毫秒数的复制连接,可以适当设置大一点(非必须)(默认60s)
  8. wal_sender_timeout = 180s # in milliseconds; 0 disable  

创建用户并授予权限

按实际修改下列参数,postgres是数据库名,public是分区名,test_cdc_user是用户名

  1. CREATE USER test_cdc_user WITH PASSWORD '999'; # 创建用户密码
  2. ALTER ROLE test_cdc_user replication; # 用户必须有逻辑复制权限
  3. GRANT CONNECT ON DATABASE postgres to test_cdc_user; # 远程连接权限
  4. GRANT SELECT ON ALL TABLES IN SCHEMA public TO test_cdc_user; # 获取数据权限
  5. GRANT CREATE ON database postgres to test_cdc_user  # 创建权限

发布表

发布所有表(也可以改为按需发布单个表),发布了之后,从库才可以订阅

  1. UPDATE pg_publication SET puballtables=true WHERE pubname is not null;
  2. CREATE PUBLICATION dbz_publication FOR ALL TABLES;

Flink Table API

  1. SET 'execution.checkpointing.interval' = '5s';  -- 设置flink checkpoint间隔时间
  2. SET 'execution.checkpointing.min-pause' = '2S';  -- 设置两个flink checkpoint之间最小间隔时间,防止checkpoint太频繁产生的错误
  3. CREATE TABLE table_source (
  4. id varchar primary key NOT ENFORCED,
  5.   name varchar,
  6.   doc varchar,
  7.   play int
  8. ) WITH (
  9. 'connector' = 'postgres-cdc',
  10. 'hostname' = '${huku-pg-hostname}',
  11. 'port' = '5432',
  12. 'username' = '${huku-pg-username}',
  13. 'password' = '${huku-pg-password}',
  14. 'database-name' = 'postgres',
  15. 'schema-name' = 'public',
  16. 'table-name' = 'table',
  17. 'slot.name' = 'table_slot',
  18. 'debezium.slot.drop.on.stop' = 'true',
  19. 'decoding.plugin.name' = 'pgoutput'
  20. );
  21. create table table_target(
  22.   id varchar primary key NOT ENFORCED,
  23.   name varchar,
  24.   doc varchar,
  25.   play int
  26. )
  27. with(
  28. 'connector.type' = 'jdbc', -- 使用 jdbc connector
  29. 'connector.driver' = 'com.mysql.jdbc.Driver',
  30. 'connector.url' = 'jdbc:mysql://${mysql-hostname}:3306/ms_data', -- jdbc url
  31. 'connector.table' = 'ods_hk_r_dim_organ_nst', -- 表名
  32. 'connector.username' = '${mysql-username}', -- 用户名
  33. 'connector.password' = '${mysqk-password}', -- 密码
  34. 'connector.write.flush.max-rows' = '1' -- 默认 5000 条,为了演示改为 1 条
  35. );
  36. insert into table_target select *,DATE_FORMAT(CURRENT_TIMESTAMP, 'yyyy-MM-dd HH:mm:ss') AS cdc_sync_time from table_source;  -- 插入数据库,并记录插入时间

常见问题

同步update和delete日志时报错

需要修改表的REPLICA IDENTITY 属性,该属性默认是主键,当没有主键的时候,需要指定唯一索引,没有唯一索引时,选full,full的性能最差。

  1. ALTER TABLE public.table REPLICA IDENTITY USING INDEX pk_index;
  2. ALTER TABLE public.table REPLICA IDENTITY full;
WAL日志无限增长

wal_keep_size(默认是100G)设置保留的wal日志大小,但真正保留的是wal_keep_size + 一个wal日志,所以真实保留的日志永远会大于wal_keep_size的大小。

并且当存在Replication Slots(复制槽)的时候,复制槽会保留未同步的wal日志,只会导致wal日志总大小突破wal_keep_size的限制。尤其是当创建了复制槽,但因为远程数据库没用的话,就会导致wal日志一直保留无限增大。挂起的复制槽,需要手动删除。

用max_slot_wal_keep_size就能限制复制槽保留的wal日志大小,设置要比wal_keep_size大。

关于wal_keep_size的官方文档:

PostgreSQL: Documentation: 15: 30.5. WAL Configuration

关于max_slot_wal_keep_size的官方文档:

PostgreSQL: Documentation: 15: 27.2. Log-Shipping Standby Servers

更多常见问题:

FAQ(ZH) · apache/flink-cdc Wiki · GitHub

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家小花儿/article/detail/252095
推荐阅读
相关标签
  

闽ICP备14008679号