
Flink CDC: Reading PostgreSQL in Real Time


I. Preparation (PostgreSQL version 11.6)

1. Modify the configuration file postgresql.conf

  [postgres@hostname data]$ vim postgresql.conf

  # Change the WAL level to logical
  wal_level = logical              # minimal, replica, or logical
  # Raise the maximum number of WAL sender processes (default 10); keep it in line with max_replication_slots below
  max_wal_senders = 20             # max number of walsender processes
  # Terminate replication connections that stay inactive longer than this; a somewhat larger value is safer (default 60s)
  wal_sender_timeout = 180s        # in milliseconds; 0 disables
  # Raise the maximum number of replication slots (default 10); by default flink-cdc uses one slot per table
  max_replication_slots = 20       # max number of replication slots

wal_level must be changed; the other parameters are optional. If more than 10 tables are synchronized, raise them to suitable values.

Restart PostgreSQL for the changes to take effect.
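After the restart, the new values can be checked from any SQL client. The JDBC sketch below is one way to do it from Java; the class name, connection URL, user, and password are placeholders, and the PostgreSQL JDBC driver is assumed to be on the classpath:

  import java.sql.Connection;
  import java.sql.DriverManager;
  import java.sql.ResultSet;
  import java.sql.Statement;

  // Hedged sketch: verify the WAL-related settings after restarting PostgreSQL.
  public class CheckWalSettings {
      public static void main(String[] args) throws Exception {
          try (Connection conn = DriverManager.getConnection(
                       "jdbc:postgresql://hostname:5432/postgres", "postgres", "***");
               Statement st = conn.createStatement();
               ResultSet rs = st.executeQuery(
                       "SELECT name, setting FROM pg_settings WHERE name IN "
                               + "('wal_level','max_wal_senders','max_replication_slots','wal_sender_timeout')")) {
              while (rs.next()) {
                  // wal_level should now report "logical"
                  System.out.println(rs.getString("name") + " = " + rs.getString("setting"));
              }
          }
      }
  }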

2. Create a user and grant it streaming replication privileges (for example, via Navicat)

  -- Create a user named flink with streaming replication privileges
  CREATE USER flink login replication encrypted password '123456';
  -- Allow the flink user to connect to the database
  GRANT CONNECT ON DATABASE postgres TO flink;
  -- Grant the flink user SELECT on all tables in the public schema
  GRANT SELECT ON ALL TABLES IN SCHEMA public TO flink;

3. Publish the tables

  -- Set puballtables to true for any existing publications
  update pg_publication set puballtables=true where pubname is not null;
  -- Publish all tables
  CREATE PUBLICATION dbz_publication FOR ALL TABLES;
  -- Check which tables have been published
  select * from pg_publication_tables;

4. Change the tables' replica identity so that updates and deletes carry the previous values

  -- Make the replica identity include the previous values on update and delete
  ALTER TABLE xxxxxx REPLICA IDENTITY FULL;
  -- Check the replica identity (a value of f means the change took effect)
  select relreplident from pg_class where relname='xxxxxx';

II. Code example

  import com.alibaba.ververica.cdc.connectors.postgres.PostgreSQLSource;
  import com.yogorobot.gmall.realtime.function.MyDebezium;
  import org.apache.flink.streaming.api.datastream.DataStreamSource;
  import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  import org.apache.flink.streaming.api.functions.source.SourceFunction;

  import java.time.Duration;
  import java.util.Properties;

  public class Flink_CDCWIthProduct {

      private static final long DEFAULT_HEARTBEAT_MS = Duration.ofMinutes(5).toMillis();

      // Purpose: read PostgreSQL data in real time
      public static void main(String[] args) throws Exception {
          // TODO Create the execution environment
          StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

          Properties properties = new Properties();
          properties.setProperty("snapshot.mode", "never");
          properties.setProperty("debezium.slot.name", "pg_cdc");
          properties.setProperty("debezium.slot.drop.on.stop", "true");
          properties.setProperty("include.schema.changes", "true");
          // Enable periodic heartbeat records via the connector configuration
          properties.setProperty("heartbeat.interval.ms", String.valueOf(DEFAULT_HEARTBEAT_MS));

          // TODO Create the Flink PostgreSQL CDC source that reads from the production database
          SourceFunction<String> pgsqlSource = PostgreSQLSource.<String>builder()
                  .hostname("pgr-***.pg.rds.aliyuncs.com")
                  .port(1921)
                  .database("jarvis_ticket")             // monitored database
                  .schemaList("jarvis_ticket")           // monitored schema
                  .tableList("jarvis_ticket.t_category") // monitored table
                  .username("***")
                  .password("***")
                  // Deserializer
                  .deserializer(new MyDebezium())
                  // Standard logical decoding output plugin
                  .decodingPluginName("pgoutput")
                  // Debezium configuration
                  .debeziumProperties(properties)
                  .build();

          // TODO Read data from PostgreSQL with the CDC source
          DataStreamSource<String> pgsqlDS = env.addSource(pgsqlSource);

          // TODO Write the data to Kafka
          //pgsqlDS.addSink(MyKafkaUtil.getKafkaSink("***"));

          // TODO Print to the console
          pgsqlDS.print();

          // TODO Execute the job
          env.execute();
      }
  }
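One detail not shown above: the CDC source keeps its Debezium reading position in Flink state, so checkpointing generally needs to be enabled for the job to resume from the recorded offset after a failure or restart. A minimal sketch (the 10-second interval is an arbitrary example), placed right after the execution environment is created:

  // Hedged sketch: enable checkpointing so the CDC source can persist and resume its offsets.
  env.enableCheckpointing(10000L);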

Notes on the properties used above:

snapshot.mode (debezium.snapshot.mode)

initial: the default; takes a snapshot of the database on first startup and afterwards continues reading from the recorded offset;
never: never takes a snapshot; if no local offset exists, reading starts from the end of the log;
always: takes a snapshot on every startup;
exporter: same as initial except that it does not lock the tables; it uses SET TRANSACTION ISOLATION LEVEL REPEATABLE READ (repeatable-read isolation). Implementation class: io.debezium.connector.postgresql.snapshot.ExportedSnapshotter;
custom: a user-defined snapshot, used together with debezium.snapshot.custom.class (see the fragment after this list).
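Switching modes only requires changing the value passed in via debeziumProperties. A hedged fragment based on the job above; the snapshotter class name is hypothetical, and the property key for the custom class follows the Debezium PostgreSQL connector naming:

  // The job above uses "never"; the other modes are configured the same way.
  properties.setProperty("snapshot.mode", "initial");
  // For "custom", a snapshotter implementation must also be supplied (hypothetical class name):
  // properties.setProperty("snapshot.mode", "custom");
  // properties.setProperty("snapshot.custom.class", "com.example.MySnapshotter");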

debezium.slot.name = 'pg_cdc'

The name of the logical replication slot that Flink CDC creates in PostgreSQL. The MyDebezium deserializer referenced in the source builder above is implemented as follows:

  import com.alibaba.fastjson.JSONObject;
  import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
  import io.debezium.data.Envelope;
  import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
  import org.apache.flink.api.common.typeinfo.TypeInformation;
  import org.apache.flink.util.Collector;
  import org.apache.kafka.connect.data.Field;
  import org.apache.kafka.connect.data.Schema;
  import org.apache.kafka.connect.data.Struct;
  import org.apache.kafka.connect.source.SourceRecord;

  import java.util.List;

  public class MyDebezium implements DebeziumDeserializationSchema<String> {

      @Override
      public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
          // 1. Create a JSONObject to hold the final record
          JSONObject result = new JSONObject();

          // 2. Get the schema and table name from the record topic
          String topic = sourceRecord.topic();
          String[] split = topic.split("\\.");
          // Schema name
          String schema = split[1];
          // Table name
          String tableName = split[2];

          // 3. Get the record value
          Struct value = (Struct) sourceRecord.value();

          // 4. Extract the "before" data
          Struct structBefore = value.getStruct("before");
          JSONObject beforeJson = new JSONObject();
          if (structBefore != null) {
              Schema schemas = structBefore.schema();
              List<Field> fields = schemas.fields();
              for (Field field : fields) {
                  beforeJson.put(field.name(), structBefore.get(field));
              }
          }

          // 5. Extract the "after" data
          Struct structAfter = value.getStruct("after");
          JSONObject afterJson = new JSONObject();
          if (structAfter != null) {
              Schema schemas = structAfter.schema();
              List<Field> fields = schemas.fields();
              for (Field field : fields) {
                  afterJson.put(field.name(), structAfter.get(field));
              }
          }

          // 6. Derive the operation type: no "before" -> insert, no "after" -> delete, otherwise update
          String type = "update";
          if (structBefore == null) {
              type = "insert";
          }
          if (structAfter == null) {
              type = "delete";
          }

          // 7. Assemble the output record
          result.put("schema", schema);
          result.put("tableName", tableName);
          result.put("before", beforeJson);
          result.put("after", afterJson);
          result.put("type", type);

          // 8. Emit the record downstream
          collector.collect(result.toJSONString());
      }

      @Override
      public TypeInformation<String> getProducedType() {
          return BasicTypeInfo.STRING_TYPE_INFO;
      }
  }
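For reference, each element of pgsqlDS in the job above is the flat JSON string assembled here (schema, tableName, before, after, type). A hedged sketch of consuming it downstream with fastjson, for example to drop delete events before printing:

  // Hedged sketch: parse the JSON emitted by MyDebezium and keep only insert and update events.
  pgsqlDS
          .filter(json -> !"delete".equals(com.alibaba.fastjson.JSON.parseObject(json).getString("type")))
          .print();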

Reference article: Integrating flink-cdc for real-time reads from PostgreSQL
