1. Basic DataStream approach:
```java
package com.atguigu;

import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkCDC {

    public static void main(String[] args) throws Exception {

        //1. Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //1.1 Enable checkpointing and choose a state backend (memory / fs / rocksdb; fs is used here)
        env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/gmall-flink-210325/ck"));
        env.enableCheckpointing(5000L);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointTimeout(10000L);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000);

        //env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000L));

        //2. Build the SourceFunction with Flink CDC and read the data
        DebeziumSourceFunction<String> sourceFunction = MySQLSource.<String>builder()
                .hostname("hadoop102")
                .port(3306)
                .username("root")
                .password("000000")
                .databaseList("gmall-210325-flink")
                .tableList("gmall-210325-flink.z_user_info") //Optional: without it, every table of the listed databases is captured; when set, entries must use the db.table format
                .deserializer(new StringDebeziumDeserializationSchema())
                .startupOptions(StartupOptions.initial())
                .build();

        DataStreamSource<String> streamSource = env.addSource(sourceFunction);

        //3. Print the data
        streamSource.print();

        //4. Submit the job
        env.execute("FlinkCDC");

    }

}
```
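Besides StartupOptions.initial() used above, the flink-cdc 1.x connector also exposes other startup modes. A minimal sketch of the alternatives, under the assumption that the same com.alibaba.ververica API is on the classpath; the binlog file name, position, and timestamp below are placeholder values, not values from the original job:

```java
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;

public class StartupOptionsDemo {

    public static void main(String[] args) {
        //initial(): snapshot the existing data first, then switch to reading the binlog
        StartupOptions initial = StartupOptions.initial();

        //earliest(): read from the earliest offset still retained in the binlog
        StartupOptions earliest = StartupOptions.earliest();

        //latest(): skip history and capture only changes made after the job starts
        StartupOptions latest = StartupOptions.latest();

        //specificOffset(): resume from an explicit binlog file and position (placeholders)
        StartupOptions offset = StartupOptions.specificOffset("mysql-bin.000003", 4);

        //timestamp(): start from the first binlog event at or after the given epoch millis (placeholder)
        StartupOptions ts = StartupOptions.timestamp(1600000000000L);
    }

}
```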
2. Custom deserializer approach:
```java
package com.atguigu;

import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkCDCWithCustomerDeserialization {

    public static void main(String[] args) throws Exception {

        //1. Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //2. Build the SourceFunction with Flink CDC and read the data
        DebeziumSourceFunction<String> sourceFunction = MySQLSource.<String>builder()
                .hostname("hadoop102")
                .port(3306)
                .username("root")
                .password("123456")
                .databaseList("gmall-210325-flink")
                .tableList("gmall-210325-flink.base_trademark") //Optional: without it, every table of the listed databases is captured; when set, entries must use the db.table format
                .deserializer(new CustomerDeserialization()) //the custom deserializer defined below
                .startupOptions(StartupOptions.initial())
                .build();
        DataStreamSource<String> streamSource = env.addSource(sourceFunction);

        //3. Print the data
        streamSource.print();

        //4. Submit the job
        env.execute("FlinkCDCWithCustomerDeserialization");

    }

}
```
Supporting class:
```java
package com.atguigu;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import java.util.List;

public class CustomerDeserialization implements DebeziumDeserializationSchema<String> {

    /**
     * Format of the emitted JSON:
     * {
     *   "database":"",
     *   "tableName":"",
     *   "before":{"id":"","tm_name":""...},
     *   "after":{"id":"","tm_name":""...},
     *   "type":"insert / update / delete",
     *   //"ts":156456135615
     * }
     */
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {

        //1. Create the JSON object that holds the final result
        JSONObject result = new JSONObject();

        //2. Extract the database and table names; the topic has the form serverName.database.table
        String topic = sourceRecord.topic();
        String[] fields = topic.split("\\.");
        String database = fields[1];
        String tableName = fields[2];

        Struct value = (Struct) sourceRecord.value();
        //3. Extract the "before" data (null for inserts)
        Struct before = value.getStruct("before");
        JSONObject beforeJson = new JSONObject();
        if (before != null) {
            Schema beforeSchema = before.schema();
            List<Field> beforeFields = beforeSchema.fields();
            for (Field field : beforeFields) {
                Object beforeValue = before.get(field);
                beforeJson.put(field.name(), beforeValue);
            }
        }

        //4. Extract the "after" data (null for deletes)
        Struct after = value.getStruct("after");
        JSONObject afterJson = new JSONObject();
        if (after != null) {
            Schema afterSchema = after.schema();
            List<Field> afterFields = afterSchema.fields();
            for (Field field : afterFields) {
                Object afterValue = after.get(field);
                afterJson.put(field.name(), afterValue);
            }
        }

        //5. Get the operation type (CREATE / UPDATE / DELETE), renaming "create" to "insert"
        Envelope.Operation operation = Envelope.operationFor(sourceRecord);
        String type = operation.toString().toLowerCase();
        if ("create".equals(type)) {
            type = "insert";
        }

        //6. Write the fields into the JSON object
        result.put("database", database);
        result.put("tableName", tableName);
        result.put("before", beforeJson);
        result.put("after", afterJson);
        result.put("type", type);

        //7. Emit the result
        collector.collect(result.toJSONString());

    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}
```
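Because CustomerDeserialization emits plain JSON strings, downstream operators can parse them with fastjson and branch on the type field. A minimal illustrative sketch that could replace step 3 in FlinkCDCWithCustomerDeserialization; dropping delete events is just an example rule, not part of the original job:

```java
//Illustrative downstream step: parse the emitted JSON and keep only insert/update events.
//Requires: import com.alibaba.fastjson.JSONObject;
streamSource
        .map(JSONObject::parseObject)
        .filter(json -> !"delete".equals(json.getString("type")))
        .print();
```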
3. SQL approach:
```java
package com.atguigu;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class FlinkCDCWithSQL {

    public static void main(String[] args) throws Exception {

        //1. Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        //2. Create the table with DDL
        tableEnv.executeSql("CREATE TABLE mysql_binlog ( " +
                " id STRING NOT NULL, " +
                " tm_name STRING, " +
                " logo_url STRING " +
                ") WITH ( " +
                " 'connector' = 'mysql-cdc', " +
                " 'hostname' = 'hadoop102', " +
                " 'port' = '3306', " +
                " 'username' = 'root', " +
                " 'password' = '000000', " +
                " 'database-name' = 'gmall-210325-flink', " +
                " 'table-name' = 'base_trademark' " +
                ")");

        //3. Query the data
        Table table = tableEnv.sqlQuery("select * from mysql_binlog");

        //4. Convert the dynamic table into a retract stream
        DataStream<Tuple2<Boolean, Row>> retractStream = tableEnv.toRetractStream(table, Row.class);
        retractStream.print();

        //5. Submit the job
        env.execute("FlinkCDCWithSQL");

    }

}
```
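In the retract stream, the Boolean flag of each Tuple2 is true for an accumulate (insert) message and false for a retraction; an update arrives as a retraction of the old row followed by an accumulate of the new one. A minimal illustrative sketch that could replace the single print() above to make the two kinds of messages visible:

```java
//Illustrative: split accumulate and retract messages into separately labeled prints
retractStream
        .filter(t -> t.f0)  //true: row was added (or is the new version of an updated row)
        .print("accumulate");

retractStream
        .filter(t -> !t.f0) //false: row was retracted (deleted or the old version of an updated row)
        .print("retract");
```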