当前位置:   article > 正文

flink postgresql cdc实时同步(含pg安装配置等)

flink postgresql cdc实时同步(含pg安装配置等)

1. 环境信息

类型版本/描述
docker20.10.9
Postgresql10.6
初始化账号密码:postgres/postgres
普通用户:test1/test123
数据库:test_db
flink1.13.6

2. 安装

step1: 拉取 PostgreSQL 10.6 版本的镜像:

docker pull postgres:10.6
  • 1

step2:创建并启动 PostgreSQL 容器,在这里,我们将把容器的端口 5432 映射到主机的端口 30028,账号密码设置为postgres,并将 pgoutput 插件加载到 PostgreSQL 实例中:

docker run -d -p 30028:5432 --name postgres-10.6 -e POSTGRES_PASSWORD=postgres postgres:10.6 -c 'shared_preload_libraries=pgoutput'
  • 1

step3: 查看容器是否创建成功:

docker ps | grep postgres-10.6
  • 1

3. 配置

step1docker进去Postgresql数据的容器:

docker exec -it postgres-10.6  bash
  • 1

step2:编辑postgresql.conf配置文件:

vi /var/lib/postgresql/data/postgresql.conf 
  • 1

配置内容如下:

# 更改wal日志方式为logical(方式有:minimal、replica 、logical  )
wal_level = logical  

# 更改solts最大数量(默认值为10),flink-cdc默认一张表占用一个slots
max_replication_slots = 20

# 更改wal发送最大进程数(默认值为10),这个值和上面的solts设置一样
max_wal_senders = 20     

# 中断那些停止活动超过指定毫秒数的复制连接,可以适当设置大一点(默认60s,0表示禁用)
wal_sender_timeout = 180s	          
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

step3:重启容器:

docker restart postgres-10.6
  • 1

连接数据库,如果查询一下语句,返回logical表示修改成功:

SHOW wal_level;
  • 1

4. 新建用户并赋权

使用创建容器时的账号密码(postgres/postgres)登录Postgresql数据库

先创建数据库和表:

-- 创建数据库 test_db
CREATE DATABASE test_db;

-- 连接到新创建的数据库 test_db
\c test_db

-- 创建 t_user 表
CREATE TABLE "public"."t_user" (
    "id" int8 NOT NULL,
    "name" varchar(255),
    "age" int2,
    PRIMARY KEY ("id")
);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

新建用户并且给用户权限:

-- pg新建用户
CREATE USER test1 WITH PASSWORD 'test123';

-- 给用户复制流权限
ALTER ROLE test1 replication;

-- 给用户登录数据库权限
GRANT CONNECT ON DATABASE test_db to test1;

-- 把当前库public下所有表查询权限赋给用户
GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA public TO test1;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

5. 发布表

-- 设置发布为true
update pg_publication set puballtables=true where pubname is not null;

-- 把所有表进行发布
CREATE PUBLICATION dbz_publication FOR ALL TABLES;

-- 查询哪些表已经发布
select * from pg_publication_tables;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

更改表的复制标识包含更新和删除的值:

-- 更改复制标识包含更新和删除之前值(目的是为了确保表 t_user 在实时同步过程中能够正确地捕获并同步更新和删除的数据变化。如果不执行这两条语句,那么 t_user 表的复制标识可能默认为 NOTHING,这可能导致实时同步时丢失更新和删除的数据行信息,从而影响同步的准确性)
ALTER TABLE t_user REPLICA IDENTITY FULL;

-- 查看复制标识(为f标识说明设置成功,f(表示 full),否则为 n(表示 nothing),即复制标识未设置)
select relreplident from pg_class where relname='t_user';
  • 1
  • 2
  • 3
  • 4
  • 5

6. flink sql

-- 源表定义
CREATE TABLE `table_source_pg` (
      id BIGINT,
      name STRING,
      age INT
      ) WITH (
      'connector' = 'postgres-cdc',
      'hostname' = '10.194.183.120',
      'port' = '30028',
      'username' = 'test1',
      'password' = 'test123',
      'database-name' = 'test_db',
      'schema-name' = 'public',
      'table-name' = 't_user',
      'decoding.plugin.name' = 'pgoutput'
)

-- 目标表表定义
CREATE TABLE `table_sink_mysql` (
      id BIGINT,
      name STRING,
      age INT,
      PRIMARY KEY (`id`) NOT ENFORCED
      ) WITH (
      'connector' = 'jdbc',
      'url' = 'jdbc:mysql://10.194.183.120:30306/test',
      'username' = 'root',
      'password' = 'root',
      'table-name' = 't_user_copy'
)

-- insert语句
INSERT INTO `table_sink_mysql` (`id`, `name`, `age`) (SELECT `id`, `name`, `age` FROM `table_source_pg`)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33

7. 命令汇总

-- pg新建用户
CREATE USER test1 WITH PASSWORD 'test123';

-- 给用户复制流权限
ALTER ROLE ODPS_ETL replication;

-- 给用户数据库权限
GRANT CONNECT ON DATABASE test_db to test1;

-- 设置发布开关
update pg_publication set puballtables=true where pubname is not null;

-- 把所有表进行发布
CREATE PUBLICATION dbz_publication FOR ALL TABLES;

-- 查询哪些表已经发布
select * from pg_publication_tables;

-- 给表查询权限
grant select on TABLE aa to ODPS_ETL;

-- 给用户读写权限
grant select,insert,update,delete ON  ALL TABLES IN SCHEMA public to bd_test;

-- 把当前库所有表查询权限赋给用户
GRANT SELECT ON ALL TABLES IN SCHEMA public TO ODPS_ETL;

-- 把当前库以后新建的表查询权限赋给用户
alter default privileges in schema public grant select on tables to ODPS_ETL;

-- 更改复制标识包含更新和删除之前值
ALTER TABLE test0425 REPLICA IDENTITY FULL;

-- 查看复制标识
select relreplident from pg_class where relname='test0425';

-- 查看solt使用情况
SELECT * FROM pg_replication_slots;

-- 删除solt
SELECT pg_drop_replication_slot('zd_org_goods_solt');

-- 查询用户当前连接数
select usename, count(*) from pg_stat_activity group by usename order by count(*) desc;

-- 设置用户最大连接数
alter role odps_etl connection limit 200;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47

附:

  • 参考文章:https://www.cnblogs.com/xiongmozhou/p/14817641.html
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/weixin_40725706/article/detail/381112
推荐阅读
相关标签
  

闽ICP备14008679号