I. Preparation (PostgreSQL version 11.6)
1. Edit the configuration file postgresql.conf
```
[postgres@hostname data]$ vim postgresql.conf

# Change the WAL level to logical
wal_level = logical             # minimal, replica, or logical

# Raise the maximum number of WAL sender processes (default 10);
# keep it consistent with the max_replication_slots setting below
max_wal_senders = 20            # max number of walsender processes

# Terminate replication connections that stay inactive longer than this;
# consider raising it (default 60s)
wal_sender_timeout = 180s       # in milliseconds; 0 disables

# Raise the maximum number of replication slots (default 10);
# by default Flink CDC occupies one slot per table
max_replication_slots = 20      # max number of replication slots
```
Only wal_level has to be changed; the other parameters are optional. If more than 10 tables will be synchronized, raise them to suitable values.
Restart PostgreSQL for the changes to take effect; the result can be verified as shown below.
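A quick way to confirm the new values after the restart, from psql or any SQL client:

```sql
-- Confirm the settings took effect after the restart
SHOW wal_level;              -- expected: logical
SHOW max_wal_senders;        -- expected: 20
SHOW max_replication_slots;  -- expected: 20
```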
2. Create a user and grant it replication privileges (for example, via Navicat)
```sql
-- Create a user named flink with streaming replication privileges
CREATE USER flink login replication encrypted password '123456';
-- Allow flink to connect to the postgres database
GRANT CONNECT ON DATABASE postgres TO flink;
-- Grant flink SELECT on all existing tables in the current database's public schema
GRANT SELECT ON ALL TABLES IN SCHEMA public TO flink;
```
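Note that GRANT ... ON ALL TABLES only covers tables that already exist. If new tables will be created later, default privileges can cover those too; a small sketch, assuming the same public schema:

```sql
-- Let flink read tables created in public in the future as well
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO flink;
```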
3. Publish the tables
```sql
-- Set puballtables to true for any existing publications
-- (a direct update of the system catalog)
update pg_publication set puballtables = true where pubname is not null;
-- Publish all tables
CREATE PUBLICATION dbz_publication FOR ALL TABLES;
-- Check which tables have been published
select * from pg_publication_tables;
```
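If publishing every table is broader than needed, the publication can instead be limited to the tables the job actually captures; a sketch using the example table from the code in part II:

```sql
-- Publish only the captured table (dbz_publication is the default
-- publication name the pgoutput-based connector looks for)
CREATE PUBLICATION dbz_publication FOR TABLE jarvis_ticket.t_category;
```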
4. Change the tables' replica identity so that UPDATE and DELETE events include the old values
```sql
-- Include the previous values of all columns in UPDATE and DELETE events
ALTER TABLE xxxxxx REPLICA IDENTITY FULL;
-- Check the replica identity ('f' means FULL, i.e. the change took effect)
select relreplident from pg_class where relname = 'xxxxxx';
```
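When many tables are involved, the same change can be applied to a whole schema in one pass; a minimal PL/pgSQL sketch, assuming the published tables live in the public schema:

```sql
DO $$
DECLARE
    r record;
BEGIN
    -- Apply REPLICA IDENTITY FULL to every table in the public schema
    FOR r IN SELECT schemaname, tablename FROM pg_tables WHERE schemaname = 'public' LOOP
        EXECUTE format('ALTER TABLE %I.%I REPLICA IDENTITY FULL', r.schemaname, r.tablename);
    END LOOP;
END $$;
```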
II. Code example
```java
import com.alibaba.ververica.cdc.connectors.postgres.PostgreSQLSource;
import com.yogorobot.gmall.realtime.function.MyDebezium;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.time.Duration;
import java.util.Properties;

public class Flink_CDCWIthProduct {
    private static final long DEFAULT_HEARTBEAT_MS = Duration.ofMinutes(5).toMillis();

    // Purpose: read PostgreSQL data in real time
    public static void main(String[] args) throws Exception {

        // TODO Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties properties = new Properties();
        // Properties passed via debeziumProperties() go straight to Debezium,
        // so the keys must not carry the "debezium." prefix used in Flink SQL options
        properties.setProperty("snapshot.mode", "never");
        properties.setProperty("slot.name", "pg_cdc");
        properties.setProperty("slot.drop.on.stop", "true");
        properties.setProperty("include.schema.changes", "true");
        // Enable periodic heartbeat records via the connector configuration
        properties.setProperty("heartbeat.interval.ms", String.valueOf(DEFAULT_HEARTBEAT_MS));

        // TODO Create the Flink PostgreSQL CDC source that reads the production database
        SourceFunction<String> pgsqlSource = PostgreSQLSource.<String>builder()
                .hostname("pgr-***.pg.rds.aliyuncs.com")
                .port(1921)
                .database("jarvis_ticket")             // database to monitor
                .schemaList("jarvis_ticket")           // schema to monitor
                .tableList("jarvis_ticket.t_category") // table to monitor
                .username("***")
                .password("***")
                // Deserializer
                .deserializer(new MyDebezium())
                // Standard logical-decoding output plugin
                .decodingPluginName("pgoutput")
                // Debezium configuration
                .debeziumProperties(properties)
                .build();

        // TODO Read data from PostgreSQL with the CDC source
        DataStreamSource<String> pgsqlDS = env.addSource(pgsqlSource);

        // TODO Write the data to Kafka
        //pgsqlDS.addSink(MyKafkaUtil.getKafkaSink("***"));

        // TODO Print to the console
        pgsqlDS.print();

        // TODO Execute the job
        env.execute();
    }
}
```
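One thing the job above leaves out is checkpointing. The CDC source stores its Debezium offsets in Flink state, so without checkpoints a restarted job cannot resume from where it left off. A sketch of what enabling it might look like, placed right after the environment is created (the interval and timeout values are arbitrary examples):

```java
// Enable checkpointing so the CDC source's offsets survive restarts
// (60s interval and 120s timeout are example values, not recommendations)
env.enableCheckpointing(60_000L);
env.getCheckpointConfig().setCheckpointTimeout(120_000L);
```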
Notes on the properties:

snapshot.mode (debezium.snapshot.mode in Flink SQL options)
- initial: the default; takes a snapshot of the database on first startup, then continues reading from the recorded offset;
- never: never takes a snapshot; if there is no local offset, reads from the end of the log;
- always: takes a snapshot on every startup;
- exported: same as initial, except that it does not lock the tables; it uses SET TRANSACTION ISOLATION LEVEL REPEATABLE READ, the repeatable-read isolation level. Implementation class: io.debezium.connector.postgresql.snapshot.ExportedSnapshotter;
- custom: a user-defined snapshot, used together with debezium.snapshot.custom.class.

slot.name = 'pg_cdc' (debezium.slot.name in Flink SQL options)
- the name of the logical replication slot that Flink CDC creates.
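Because the slot is only dropped on a clean stop, a crashed job can leave a slot behind and prevent WAL from being recycled. Slots can be inspected and removed by hand; a sketch assuming the slot name pg_cdc used above:

```sql
-- List replication slots and whether each one is currently in use
SELECT slot_name, plugin, active FROM pg_replication_slots;
-- Drop a leftover slot manually (fails while the slot is still active)
SELECT pg_drop_replication_slot('pg_cdc');
```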
The custom deserializer MyDebezium referenced above:

```java
import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import java.util.List;

public class MyDebezium implements DebeziumDeserializationSchema<String> {
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
        // 1. Create a JSONObject to hold the final assembled record
        JSONObject result = new JSONObject();

        // 2. Get the schema and table name from the topic ("server.schema.table")
        String topic = sourceRecord.topic();
        String[] split = topic.split("\\.");

        // Schema name
        String schema = split[1];
        // Table name
        String tableName = split[2];

        // 3. Get the record value
        Struct value = (Struct) sourceRecord.value();

        // 4. Extract the "before" image
        Struct structBefore = value.getStruct("before");
        JSONObject beforeJson = new JSONObject();
        if (structBefore != null) {
            Schema schemas = structBefore.schema();
            List<Field> fields = schemas.fields();
            for (Field field : fields) {
                beforeJson.put(field.name(), structBefore.get(field));
            }
        }

        // 5. Extract the "after" image
        Struct structAfter = value.getStruct("after");
        JSONObject afterJson = new JSONObject();
        if (structAfter != null) {
            Schema schemas = structAfter.schema();
            List<Field> fields = schemas.fields();
            for (Field field : fields) {
                afterJson.put(field.name(), structAfter.get(field));
            }
        }

        // 6. Derive the operation type: no "before" image means insert,
        // no "after" image means delete, otherwise update
        String type = "update";
        if (structBefore == null) {
            type = "insert";
        }
        if (structAfter == null) {
            type = "delete";
        }

        // 7. Assemble the output JSON
        result.put("schema", schema);
        result.put("tableName", tableName);
        result.put("before", beforeJson);
        result.put("after", afterJson);
        result.put("type", type);

        // 8. Emit downstream
        collector.collect(result.toJSONString());
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}
```
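For reference, an UPDATE captured from t_category would come out of MyDebezium roughly like this (illustrative column values; the real fields depend on the table definition):

```json
{
  "schema": "jarvis_ticket",
  "tableName": "t_category",
  "before": {"id": 1, "name": "old name"},
  "after": {"id": 1, "name": "new name"},
  "type": "update"
}
```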