
Flink CDC PostgreSQL (Stream and SQL)

Reference: Postgres CDC Connector — CDC Connectors for Apache Flink® documentation

Capturing PostgreSQL data with Flink CDC

1) Modify the PostgreSQL configuration file

The following settings in postgresql.conf need to be changed:

linux>vi postgresql.conf

# Change the WAL level to logical

wal_level = logical # minimal, replica, or logical

# Increase the maximum number of replication slots (default 10); by default Flink CDC uses one slot per captured table

max_replication_slots = 20 # max number of replication slots

# Increase the maximum number of WAL sender processes (default 10); keep this in line with the slot setting above

max_wal_senders = 20 # max number of walsender processes

# Terminate replication connections that have been inactive for longer than this; a somewhat larger value is reasonable (default 60s)

wal_sender_timeout = 180s # in milliseconds; 0 disables
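
These settings only take effect after PostgreSQL has been restarted. As a quick check, the active values can be inspected from psql; a minimal sketch (the service name postgresql is an assumption and may differ on your system):

linux> systemctl restart postgresql      # restart so the new settings take effect; service name may differ
linux> psql -U postgres
postgres=# SHOW wal_level;               -- expect: logical
postgres=# SHOW max_replication_slots;   -- expect: 20
postgres=# SHOW max_wal_senders;         -- expect: 20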

2) Note

Note: with wal_level = logical, when rows in a source table are modified, the logical replication stream by default only carries the primary key of the old row. If the update and delete events should contain all fields of the previous row, the replica identity must be changed at the table level: ALTER TABLE tableName REPLICA IDENTITY FULL; only then can Flink CDC capture the full old and new values of every column.

3) Copy the connector jar into the Flink lib directory

Copy flink-sql-connector-postgres-cdc-2.2.0.jar into flink/lib.
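
The jar is only picked up after the Flink cluster is restarted. A minimal sketch for a standalone cluster, assuming FLINK_HOME points at the Flink installation and the jar is in the current directory:

linux> cp flink-sql-connector-postgres-cdc-2.2.0.jar $FLINK_HOME/lib/
linux> $FLINK_HOME/bin/stop-cluster.sh
linux> $FLINK_HOME/bin/start-cluster.sh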

4) Create a new user
-- Create a new PostgreSQL user

CREATE USER user WITH PASSWORD 'pwd';

5) Grant the user replication privileges

ALTER ROLE user REPLICATION;

6) Grant the user permission to connect to the database

GRANT CONNECT ON DATABASE test TO user;

7) Grant the user SELECT on all tables in the public schema of the current database

GRANT SELECT ON ALL TABLES IN SCHEMA public TO user;
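
Optionally (not part of the original steps), the SELECT grant can also be extended to tables created in public later, so that new tables do not need a separate grant:

-- apply the SELECT grant to future tables as well
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO user;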

8) Publish the tables

-- Mark the publication as covering all tables

update pg_publication set puballtables=true where pubname is not null;

-- Publish all tables

CREATE PUBLICATION dbz_publication FOR ALL TABLES;

-- Check which tables have been published

select * from pg_publication_tables;
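
If publishing every table is not desirable, the publication can instead be limited to the tables that will actually be captured; a sketch using the table public.pg_cdc_source referenced later in this article (run this instead of the FOR ALL TABLES statement above):

-- Publish only the tables to be captured
CREATE PUBLICATION dbz_publication FOR TABLE public.pg_cdc_source;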

9) Change the table's replica identity so that updates and deletes carry the old values

-- Change the replica identity to include the previous values for updates and deletes

ALTER TABLE test0425 REPLICA IDENTITY FULL;

-- Check the replica identity ('f' means FULL, i.e. the change took effect)

select relreplident from pg_class where relname='testname';

At this point the setup is complete; all of the steps above are required.

Creating a PostgreSQL connector from the Flink SQL client

linux>bin/sql-client.sh    # enter the Flink SQL client

CREATE TABLE flink_cdc_source (
  id INT,
  name STRING
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = '<PostgreSQL host IP>',
  'port' = '5432',
  'database-name' = 'postgres',
  'schema-name' = 'public',
  'username' = 'postgres',
  'password' = '123456',
  'table-name' = 'pg_cdc_source',
  'decoding.plugin.name' = 'pgoutput'
);
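
Once the table is registered, a continuous query in the SQL client first emits the snapshot rows and then the ongoing changes, for example:

select * from flink_cdc_source;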

Error: replication slot "flink" already exists

(Resolving the error: replication slot "flink" already exists)

1. Switch to the postgres OS user

# su - postgres

2. Log in with psql

-bash-4.2$ psql -U postgres

3. List the replication slots

postgres=# select * from pg_replication_slots;

4. Drop the replication slot

SELECT * FROM pg_drop_replication_slot('flink');

5. Verify

postgres=# select * from pg_replication_slots;
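
The conflict can also be avoided up front by giving each Flink source its own replication slot via the connector's 'slot.name' option; otherwise the default slot name "flink" is shared by every job, which is what triggers the error above. The slot name below is only an example:

-- add to the WITH clause of the CREATE TABLE statement above
'slot.name' = 'flink_cdc_source_slot'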

Flink CDC DataStream: capturing PostgreSQL changes (Java)

package pg;

import com.ververica.cdc.connectors.postgres.PostgreSQLSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.Properties;

public class FlinkCdcPg {
    public static void main(String[] args) throws Exception {
        Properties properties = new Properties();
        properties.setProperty("snapshot.mode", "initial");
        properties.setProperty("decimal.handling.mode", "double");
        properties.setProperty("database.serverTimezone", "GMT+8"); // set the time zone

        SourceFunction<String> sourceFunction = PostgreSQLSource.<String>builder()
                .hostname("<PostgreSQL host IP>")
                .port(5432)
                .database("postgres")            // database to monitor
                .schemaList("public")            // schema to monitor
                .tableList("public.sink2")       // table to monitor
                .username("postgres")
                .password("123456")
                .decodingPluginName("pgoutput")  // PostgreSQL logical decoding plugin
                .slotName("t_table_slot")        // replication slot name; must not be reused
                .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
                .debeziumProperties(properties)
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.addSource(sourceFunction)
           .print().setParallelism(1); // use parallelism 1 for the sink to keep message ordering
        env.execute();
    }
}
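
With JsonDebeziumDeserializationSchema, each printed record is a Debezium change event serialized as JSON; an INSERT into the monitored table produces output roughly like the following (fields abbreviated, values illustrative):

{"before":null,"after":{"id":1,"name":"test"},"source":{"db":"postgres","schema":"public","table":"sink2"},"op":"c","ts_ms":1650000000000}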

Flink CDC SQL / Table API: reading from PostgreSQL (Java)

package pg;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class FlinkCdcPgSqlExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.disableOperatorChaining();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        String sourceDDL = "CREATE TABLE pg_source (\n" +
                "  id INT, \n" +
                "  name STRING, \n" +
                "  PRIMARY KEY (id) NOT ENFORCED \n" +
                ") WITH (\n" +
                "  'connector' = 'postgres-cdc',\n" +
                "  'hostname' = '<PostgreSQL host IP>',\n" +
                "  'port' = '5432',\n" +
                "  'username' = 'postgres',\n" +
                "  'password' = '123456',\n" +
                "  'database-name' = 'postgres',\n" +
                "  'schema-name' = 'public',\n" +
                "  'table-name' = 'sink2',\n" +
                "  'decoding.plugin.name' = 'pgoutput'\n" +
                ")";

        // register the source table
        tableEnv.executeSql(sourceDDL);
        // run a continuous query over the CDC table and print the changelog
        TableResult tableResult = tableEnv.executeSql("select * from pg_source");
        tableResult.print();
        // executeSql already submits the job, so no env.execute() is needed here
    }
}
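
Instead of tableResult.print(), the changelog can also be written to a sink table; a minimal sketch using Flink's built-in print connector (the table name print_sink is just an example), which could replace the last two statements of main():

// register a print sink and continuously copy the CDC changelog into it
tableEnv.executeSql(
        "CREATE TABLE print_sink (id INT, name STRING) WITH ('connector' = 'print')");
tableEnv.executeSql("INSERT INTO print_sink SELECT * FROM pg_source").await();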
