赞
踩
Postgres CDC Connector — CDC Connectors for Apache Flink® documentation
flink cdc捕获postgresql数据
1)更改配置文件
需要修改 postgresql.conf 中的以下参数(修改 wal_level 后需要重启 PostgreSQL 才能生效):
linux>vi postgresql.conf
# Logical decoding (required for CDC) needs wal_level = logical.
# wal_level can only be changed at server start: restart PostgreSQL afterwards.
wal_level = logical			# minimal, replica, or logical
# Maximum number of replication slots (default 10). Each flink-cdc postgres
# source occupies its own slot (typically one per captured table), so size this
# to the number of concurrent CDC sources.
max_replication_slots = 20		# max number of replication slots
# Maximum number of WAL sender processes (default 10); set it to the same value
# as max_replication_slots above.
max_wal_senders = 20		# max number of walsender processes
# Terminate replication connections that have been inactive for longer than
# this (default 60s); a somewhat larger value tolerates slow consumers.
wal_sender_timeout = 180s	# in milliseconds if no unit is given; 0 disables
2)注意
注意:wal_level = logical 时,在默认的复制标识(REPLICA IDENTITY DEFAULT)下,UPDATE/DELETE 事件中的旧值(变更前的记录)最多只包含主键列。如果需要输出变更前记录的所有字段,需要在表级别修改参数:ALTER TABLE tableName REPLICA IDENTITY FULL; 这样才能捕获到源表记录变更前所有字段的值(变更后的新值默认就包含所有字段)。
3) 将jar包导入flink lib目录
将 flink-sql-connector-postgres-cdc-2.2.0.jar 复制到 flink/lib 目录下,然后重启 Flink 集群和 SQL 客户端,使其加载该 jar 包
4)新建用户并且给用户复制流权限
-- Create the PostgreSQL role that Flink CDC logs in as. USER is a reserved
-- word in PostgreSQL, so it cannot be used as a bare role name; flink_user is
-- used instead (any non-reserved name works).
CREATE USER flink_user WITH PASSWORD 'pwd';
5) 给用户复制流权限
-- The REPLICATION attribute is required to create and read replication slots.
ALTER ROLE flink_user REPLICATION;
6) 给用户登录数据库权限
GRANT CONNECT ON DATABASE test TO flink_user;
7)把当前库public下所有表查询权限赋给用户
-- SELECT is needed to read table data (e.g. the initial snapshot).
-- This grant covers only the tables that exist right now.
GRANT SELECT ON ALL TABLES IN SCHEMA public TO flink_user;
-- Also grant SELECT on tables created in public later by the role that runs
-- this statement.
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO flink_user;
8) 发布表
-- Publish all tables. FOR ALL TABLES itself records puballtables = true in
-- pg_publication, so the system catalog must not be UPDATEd by hand: direct
-- catalog writes are unsupported, and such an UPDATE run before this statement
-- would only flip every other existing publication to all-tables.
-- dbz_publication is the publication name Debezium's pgoutput mode uses by
-- default. Creating a FOR ALL TABLES publication requires superuser.
CREATE PUBLICATION dbz_publication FOR ALL TABLES;
-- List the tables covered by the publication.
SELECT pubname, schemaname, tablename
FROM pg_publication_tables
WHERE pubname = 'dbz_publication';
9) 更改表的复制标识包含更新和删除的值
-- Make UPDATE/DELETE events carry the full previous row, not just the key.
ALTER TABLE test0425 REPLICA IDENTITY FULL;
-- Verify on the same table: relreplident = 'f' means FULL
-- ('d' = default, 'n' = nothing, 'i' = index).
SELECT relreplident FROM pg_class WHERE oid = 'test0425'::regclass;
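到这一步,PostgreSQL 端的配置就完成了。其中以下几步是必需的:
- 修改 postgresql.conf(需重启)
- 创建带 REPLICATION 权限的用户并授予查询权限
- 创建发布(使用 pgoutput 插件时需要;非超级用户无法让连接器自动创建)

REPLICA IDENTITY FULL 只在需要 UPDATE/DELETE 变更前的完整字段时才需要。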
flink sql 端 创建postgresql 连接器
- linux>bin/sql-client.sh //进入flink sql客户端
-
CREATE TABLE flink_cdc_source (
    id INT,
    name STRING
) WITH (
    'connector' = 'postgres-cdc',
    'hostname' = 'pg数据库IP地址',
    'port' = '5432',
    'database-name' = 'postgres',
    'schema-name' = 'public',
    'username' = 'postgres',
    'password' = '123456',
    'table-name' = 'pg_cdc_source',
    -- pgoutput is built into PostgreSQL 10+; no server extension is needed.
    'decoding.plugin.name' = 'pgoutput',
    -- Give every postgres-cdc source its own replication slot; reusing a slot
    -- name fails with: replication slot "..." already exists.
    'slot.name' = 'flink_cdc_source_slot'
);
错误: 复制槽名 "flink" 已经存在
(解决方法:没有配置 'slot.name' 时连接器使用默认槽名 flink,多个作业共用同一个槽名就会报这个错误。推荐为每个 CDC 源配置唯一的 'slot.name';若确认旧的复制槽已不再使用,也可以按以下步骤删除它)
1.切换用户
# su - postgres
2.登录数据库
-bash-4.2$ psql -U postgres
3. 查看复制槽
SELECT slot_name, plugin, slot_type, database, active, restart_lsn FROM pg_replication_slots;  -- list replication slots; active = true means a consumer is attached
4. 删除复制槽
-- Drop the stale 'flink' slot. An active slot cannot be dropped, so only an idle one is targeted (no error if it is missing); the stream position stored in the slot is lost with it.
SELECT pg_drop_replication_slot(slot_name) FROM pg_replication_slots WHERE slot_name = 'flink' AND NOT active;
5.验证
SELECT slot_name, active FROM pg_replication_slots WHERE slot_name = 'flink';  -- verify: expect 0 rows after the drop
Flink CDC Stream Postgres变更捕获 (java)
- package pg;
- import com.ververica.cdc.connectors.postgres.PostgreSQLSource;
- import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.streaming.api.functions.source.SourceFunction;
- import java.util.Properties;
-
-
- public class FlinkCdcPg {
- public static void main(String[]args) throws Exception {
- Properties properties = new Properties();
- properties.setProperty("snapshot.mode", "initial");
- properties.setProperty("decimal.handling.mode", "double");
- properties.setProperty("database.serverTimezone", "GMT+8"); //设置时区
-
-
- SourceFunction<String>sourceFunction = PostgreSQLSource.<String>builder()
- .hostname("Pg数据库IP地址")
- .port(5432)
- .database("postgres") // monitor postgresdatabase
- .schemaList("public") // monitor inventory schema
- .tableList("public.sink2") // monitor productstable
- .username("postgres")
- .password("123456")
- .decodingPluginName("pgoutput") // pg解码插件
- .slotName("t_table_slot") // 复制槽名称 不能重复
- .deserializer(new JsonDebeziumDeserializationSchema())// converts SourceRecord to JSON String
- .debeziumProperties(properties)
- .build();
-
- StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
-
- env
- .addSource(sourceFunction)
- .print().setParallelism(1); // use parallelism1 for sink to keep message ordering
-
-
- env.execute();
-
- }
- }
Flink CDC SQL TABLE pg读取(java)
- package pg;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.table.api.TableResult;
- import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
-
- public class FlinkCdcOracleExample {
- public static void main(String[]args) throws Exception {
-
- StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
- env.disableOperatorChaining();
- StreamTableEnvironment tableEnv =StreamTableEnvironment.create(env);
-
-
- String sourceDDL ="CREATE TABLEpg_source (\n" +
- " ID INT, \n" +
- " NAME STRING, \n" +
- " PRIMARY KEY (ID) NOT ENFORCED \n" +
- " ) WITH (\n" +
- " 'connector' = 'postgres-cdc',\n" +
- " 'hostname' = 'Pg数据库IP地址',\n" +
- " 'port' = '5432',\n" +
- " 'username' = 'postgres',\n" +
- " 'password' = '123456',\n" +
- " 'database-name' = 'postgres',\n" +
- " 'schema-name' = 'public',\n" + // 注意这里要大写
- " 'table-name' = 'sink2',\n" +
- " 'debezium.log.mining.strategy'='online_catalog'\n" +
- )";
- //执行source表ddl
- tableEnv.executeSql(sourceDDL);
- TableResult tableResult =tableEnv.executeSql("select * from pg_source");
- tableResult.print();
- env.execute();
- }
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。