
Flink CDC data ingestion with DebeziumSourceFunction


1. Basic code approach:

package com.atguigu;

import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkCDC {

    public static void main(String[] args) throws Exception {

        //1. Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //1.1 Enable checkpointing and use the filesystem state backend (alternatives: memory, fs, rocksdb)
        env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/gmall-flink-210325/ck"));
        env.enableCheckpointing(5000L);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointTimeout(10000L);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000);
        //env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000L)); // e.g. 3 retries, 5 s apart

        //2. Build the SourceFunction with Flink CDC and read the data
        DebeziumSourceFunction<String> sourceFunction = MySQLSource.<String>builder()
                .hostname("hadoop102")
                .port(3306)
                .username("root")
                .password("000000")
                .databaseList("gmall-210325-flink")
                // If tableList is omitted, all tables in the listed databases are consumed; otherwise use the db.table format
                .tableList("gmall-210325-flink.z_user_info")
                .deserializer(new StringDebeziumDeserializationSchema())
                .startupOptions(StartupOptions.initial())
                .build();
        DataStreamSource<String> streamSource = env.addSource(sourceFunction);

        //3. Print the data
        streamSource.print();

        //4. Start the job
        env.execute("FlinkCDC");
    }
}
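The startupOptions setting decides where the source starts reading: StartupOptions.initial() first takes a full snapshot of the table and then continues from the binlog. If only new changes are needed, the source can start from the latest binlog position instead. A minimal sketch, assuming the same 1.x flink-connector-mysql-cdc API used above (connection details copied from the example):

    DebeziumSourceFunction<String> latestSource = MySQLSource.<String>builder()
            .hostname("hadoop102")
            .port(3306)
            .username("root")
            .password("000000")
            .databaseList("gmall-210325-flink")
            .tableList("gmall-210325-flink.z_user_info")
            .deserializer(new StringDebeziumDeserializationSchema())
            .startupOptions(StartupOptions.latest()) // skip the snapshot, read only new binlog events
            .build();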

2. Custom code approach:

package com.atguigu;

import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkCDCWithCustomerDeserialization {

    public static void main(String[] args) throws Exception {

        //1. Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //2. Build the SourceFunction with Flink CDC and read the data
        DebeziumSourceFunction<String> sourceFunction = MySQLSource.<String>builder()
                .hostname("hadoop102")
                .port(3306)
                .username("root")
                .password("123456")
                .databaseList("gmall-210325-flink")
                // If tableList is omitted, all tables in the listed databases are consumed; otherwise use the db.table format
                .tableList("gmall-210325-flink.base_trademark")
                .deserializer(new CustomerDeserialization())
                .startupOptions(StartupOptions.initial())
                .build();
        DataStreamSource<String> streamSource = env.addSource(sourceFunction);

        //3. Print the data
        streamSource.print();

        //4. Start the job
        env.execute("FlinkCDCWithCustomerDeserialization");
    }
}
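This version captures the base_trademark table and swaps StringDebeziumDeserializationSchema for the custom CustomerDeserialization shown below; the checkpoint configuration from the first program is not repeated here.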

Supporting class:

 

package com.atguigu;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import java.util.List;

public class CustomerDeserialization implements DebeziumDeserializationSchema<String> {

    /**
     * Output JSON format:
     * {
     *   "database":"",
     *   "tableName":"",
     *   "before":{"id":"","tm_name":"" ...},
     *   "after":{"id":"","tm_name":"" ...},
     *   "type":"insert/update/delete",
     *   //"ts":156456135615
     * }
     */
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {

        //1. Create a JSON object to hold the final result
        JSONObject result = new JSONObject();

        //2. Extract the database and table names (topic format: <server-name>.<database>.<table>)
        String topic = sourceRecord.topic();
        String[] fields = topic.split("\\.");
        String database = fields[1];
        String tableName = fields[2];

        Struct value = (Struct) sourceRecord.value();

        //3. Extract the "before" data
        Struct before = value.getStruct("before");
        JSONObject beforeJson = new JSONObject();
        if (before != null) {
            Schema beforeSchema = before.schema();
            List<Field> beforeFields = beforeSchema.fields();
            for (Field field : beforeFields) {
                Object beforeValue = before.get(field);
                beforeJson.put(field.name(), beforeValue);
            }
        }

        //4. Extract the "after" data
        Struct after = value.getStruct("after");
        JSONObject afterJson = new JSONObject();
        if (after != null) {
            Schema afterSchema = after.schema();
            List<Field> afterFields = afterSchema.fields();
            for (Field field : afterFields) {
                Object afterValue = after.get(field);
                afterJson.put(field.name(), afterValue);
            }
        }

        //5. Get the operation type (CREATE / UPDATE / DELETE)
        Envelope.Operation operation = Envelope.operationFor(sourceRecord);
        String type = operation.toString().toLowerCase();
        if ("create".equals(type)) {
            type = "insert";
        }

        //6. Write the fields into the JSON object
        result.put("database", database);
        result.put("tableName", tableName);
        result.put("before", beforeJson);
        result.put("after", afterJson);
        result.put("type", type);

        //7. Emit the result
        collector.collect(result.toJSONString());
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}
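For illustration, an insert into base_trademark would be emitted by this deserializer roughly as the following JSON string (the field values here are made up):

    {"database":"gmall-210325-flink","tableName":"base_trademark","before":{},"after":{"id":12,"tm_name":"atguigu","logo_url":"/static/default.jpg"},"type":"insert"}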

3. SQL approach:

package com.atguigu;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class FlinkCDCWithSQL {

    public static void main(String[] args) throws Exception {

        //1. Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        //2. Create the table via DDL
        tableEnv.executeSql("CREATE TABLE mysql_binlog ( " +
                " id STRING NOT NULL, " +
                " tm_name STRING, " +
                " logo_url STRING " +
                ") WITH ( " +
                " 'connector' = 'mysql-cdc', " +
                " 'hostname' = 'hadoop102', " +
                " 'port' = '3306', " +
                " 'username' = 'root', " +
                " 'password' = '000000', " +
                " 'database-name' = 'gmall-210325-flink', " +
                " 'table-name' = 'base_trademark' " +
                ")");

        //3. Query the data
        Table table = tableEnv.sqlQuery("select * from mysql_binlog");

        //4. Convert the dynamic table to a retract stream
        DataStream<Tuple2<Boolean, Row>> retractStream = tableEnv.toRetractStream(table, Row.class);
        retractStream.print();

        //5. Start the job
        env.execute("FlinkCDCWithSQL");
    }
}
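In the retract stream each element is a Tuple2<Boolean, Row>: true marks an add message (a snapshot row, an insert, or the new value of an update), while false marks a retract message (a delete or the old value of an update). If only the current values are of interest, the stream can be filtered on that flag; a minimal sketch, not part of the original job:

    retractStream
            .filter(t -> t.f0)               // keep only add messages
            .map(t -> t.f1.toString())       // Row -> String
            .print("current");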

 

 
