This post uses Flink CDC to capture MySQL data into Kafka. After roughly two months of debugging, this is the version that finally works: it performs full (initial snapshot) plus incremental (binlog) capture, and a window reports every 10 minutes how many records each table has synced so far. Everything is implemented directly with the Flink DataStream API.
Component versions:
Flink: flink-1.13.6-bin-scala_2.12
Flink CDC: 2.2.1
MySQL: 5.7
Kafka: kafka_2.12-3.0.0
pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.example</groupId>
    <artifactId>FlinkcdcAPI</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <java.version>1.8</java.version>
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
        <flink.version>1.13.6</flink.version>
        <scala.version>2.12</scala.version>
        <hadoop.version>3.1.3</hadoop.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-shaded-guava</artifactId>
            <version>30.1.1-jre-15.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-shaded-guava</artifactId>
            <version>18.0-13.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-statebackend-rocksdb_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.68</version>
        </dependency>
        <!-- Needed if checkpoints are stored on HDFS -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.20</version>
            <scope>provided</scope><!-- annotation processor, only needed at javac compile time -->
        </dependency>
        <!-- Flink logs through slf4j (a logging facade); log4j is used here as the concrete implementation -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.25</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.25</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-to-slf4j</artifactId>
            <version>2.14.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_${scala.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <!-- The dependency is available only for stable releases, SNAPSHOT dependency need build by yourself. -->
            <version>2.2.1</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>log4j:*</exclude>
                                    <exclude>org.apache.hadoop:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                         Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers combine.children="append">
                                <!-- The ServicesResourceTransformer merges META-INF/services files,
                                     otherwise the connector and format factory files overwrite each other -->
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
CustomerDeserialization.java:

package flink;

import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import java.util.*;

public class CustomerDeserialization implements DebeziumDeserializationSchema<String> {

    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
        //1. Create the JSON object that holds the final record
        JSONObject result = new JSONObject();

        //2. Extract the database and table name from the topic ("serverName.database.table")
        String topic = sourceRecord.topic();
        String[] fields = topic.split("\\.");
        String database = fields[1];
        String tableName = fields[2];

        Struct value = (Struct) sourceRecord.value();

        //3. Extract the "before" row image
        Struct before = value.getStruct("before");
        // binlog event timestamp (in seconds) taken from the source offset
        HashMap<String, Object> sourceOffset = (HashMap<String, Object>) sourceRecord.sourceOffset();
        Long ts_sec = ((Number) sourceOffset.get("ts_sec")).longValue();
        JSONObject beforeJson = new JSONObject();
        if (before != null) {
            Schema beforeSchema = before.schema();
            List<Field> beforeFields = beforeSchema.fields();
            for (Field field : beforeFields) {
                Object beforeValue = before.get(field);
                beforeJson.put(field.name(), beforeValue);
            }
        }

        //4. Extract the "after" row image
        Struct after = value.getStruct("after");
        JSONObject afterJson = new JSONObject();
        if (after != null) {
            Schema afterSchema = after.schema();
            List<Field> afterFields = afterSchema.fields();
            for (Field field : afterFields) {
                Object afterValue = after.get(field);
                afterJson.put(field.name(), afterValue);
                // System.out.println(field.name() + ":" + afterValue.getClass());
            }
        }

        //5. Map the operation type (CREATE/UPDATE/DELETE) to Debezium-style op letters
        Envelope.Operation operation = Envelope.operationFor(sourceRecord);
        String type = operation.toString().toLowerCase();
        if ("insert".equals(type)) {
            type = "c";
        }
        if ("update".equals(type)) {
            type = "u";
        }
        if ("delete".equals(type)) {
            type = "d";
        }
        if ("create".equals(type)) {
            type = "c";
        }

        //6. Assemble the output JSON
        // result.put("source", source);
        result.put("database", database);
        result.put("table", tableName);
        result.put("before", beforeJson);
        result.put("after", afterJson);
        result.put("op", type);
        result.put("ts", ts_sec);

        //7. Emit the record
        collector.collect(result.toJSONString());
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}
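Each record emitted by this deserializer is a flat JSON string with database, table, before, after, op and ts fields. The sketch below is only an illustration of how a downstream consumer might read such a record; the table name and column values are hypothetical, and it relies only on fastjson, which is already declared in the pom.

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;

public class ChangeRecordParseExample {
    public static void main(String[] args) {
        // A hypothetical record, as CustomerDeserialization might emit it for an UPDATE on sap_gateway.VBAK
        String record = "{\"database\":\"sap_gateway\",\"table\":\"VBAK\","
                + "\"before\":{\"VBELN\":\"0000000001\",\"NETWR\":100},"
                + "\"after\":{\"VBELN\":\"0000000001\",\"NETWR\":120},"
                + "\"op\":\"u\",\"ts\":1665465600}";

        JSONObject json = JSON.parseObject(record);
        String op = json.getString("op");               // "c", "u" or "d"
        JSONObject after = json.getJSONObject("after"); // new row image (empty for deletes)
        System.out.println(json.getString("table") + " " + op + " -> " + after);
    }
}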
MySqlDateTimeConverter.java:

package flink;

import io.debezium.spi.converter.CustomConverter;
import io.debezium.spi.converter.RelationalColumn;
import org.apache.kafka.connect.data.SchemaBuilder;

import java.time.*;
import java.time.format.DateTimeFormatter;
import java.util.Properties;

/**
 * @Description: implements the CustomConverter interface and overrides its methods
 *               to convert MySQL temporal types into standard string representations
 * @author: WuBo
 * @date: 2022/10/11 11:50
 */
public class MySqlDateTimeConverter implements CustomConverter<SchemaBuilder, RelationalColumn> {

    private DateTimeFormatter dateFormatter = DateTimeFormatter.ISO_DATE;
    private DateTimeFormatter timeFormatter = DateTimeFormatter.ISO_TIME;
    private DateTimeFormatter datetimeFormatter = DateTimeFormatter.ISO_DATE_TIME;
    private DateTimeFormatter timestampFormatter = DateTimeFormatter.ISO_DATE_TIME;
    private ZoneId timestampZoneId = ZoneId.systemDefault();

    @Override
    public void configure(Properties props) {
    }

    @Override
    public void converterFor(RelationalColumn column, ConverterRegistration<SchemaBuilder> registration) {
        String sqlType = column.typeName().toUpperCase();
        SchemaBuilder schemaBuilder = null;
        Converter converter = null;
        if ("DATE".equals(sqlType)) {
            schemaBuilder = SchemaBuilder.string().optional().name("com.darcytech.debezium.date.string");
            converter = this::convertDate;
        }
        if ("TIME".equals(sqlType)) {
            schemaBuilder = SchemaBuilder.string().optional().name("com.darcytech.debezium.time.string");
            converter = this::convertTime;
        }
        if ("DATETIME".equals(sqlType)) {
            schemaBuilder = SchemaBuilder.string().optional().name("com.darcytech.debezium.datetime.string");
            converter = this::convertDateTime;
        }
        if ("TIMESTAMP".equals(sqlType)) {
            schemaBuilder = SchemaBuilder.string().optional().name("com.darcytech.debezium.timestamp.string");
            converter = this::convertTimestamp;
        }
        if (schemaBuilder != null) {
            registration.register(schemaBuilder, converter);
        }
    }

    private String convertDate(Object input) {
        if (input == null) return null;
        if (input instanceof LocalDate) {
            return dateFormatter.format((LocalDate) input);
        }
        if (input instanceof Integer) {
            // Debezium delivers MySQL DATE as the number of days since the epoch
            LocalDate date = LocalDate.ofEpochDay((Integer) input);
            return dateFormatter.format(date);
        }
        return String.valueOf(input);
    }

    private String convertTime(Object input) {
        if (input == null) return null;
        if (input instanceof Duration) {
            Duration duration = (Duration) input;
            long seconds = duration.getSeconds();
            int nano = duration.getNano();
            LocalTime time = LocalTime.ofSecondOfDay(seconds).withNano(nano);
            return timeFormatter.format(time);
        }
        return String.valueOf(input);
    }

    private String convertDateTime(Object input) {
        if (input == null) return null;
        if (input instanceof LocalDateTime) {
            return datetimeFormatter.format((LocalDateTime) input).replaceAll("T", " ");
        }
        return String.valueOf(input);
    }

    private String convertTimestamp(Object input) {
        if (input == null) return null;
        if (input instanceof ZonedDateTime) {
            // MySQL stores TIMESTAMP in UTC, so the incoming ZonedDateTime is in UTC;
            // convert it to the local zone before formatting
            ZonedDateTime zonedDateTime = (ZonedDateTime) input;
            LocalDateTime localDateTime = zonedDateTime.withZoneSameInstant(timestampZoneId).toLocalDateTime();
            return timestampFormatter.format(localDateTime).replaceAll("T", " ");
        }
        return String.valueOf(input);
    }
}
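Why this converter matters: by default Debezium represents a MySQL DATE as an epoch-day integer, which is awkward to read downstream. Below is a minimal standalone sketch (the epoch-day value is just an example I picked) of exactly the conversion the DATE branch above performs. The converter itself is wired into the job through the Debezium properties in FlinkCDC below ("converters" = "dateConverters").

import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

public class DateConversionExample {
    public static void main(String[] args) {
        // Debezium would deliver the MySQL DATE 2022-10-11 as an epoch-day integer
        int epochDays = 19276;                          // days since 1970-01-01 (example value)
        LocalDate date = LocalDate.ofEpochDay(epochDays);
        // Same formatting used in MySqlDateTimeConverter.convertDate
        System.out.println(DateTimeFormatter.ISO_DATE.format(date)); // prints 2022-10-11
    }
}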
FlinkCDC.java:

package flink;

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.util.Collector;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;

public class FlinkCDC {

    public static void main(String[] args) throws Exception {
        String topic = "sapgateway";
        String brokers = "hadoop102:9092,hadoop103:9092,hadoop104:9092";

        //1. Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);

        //2. Flink CDC stores the binlog position in checkpointed state; to resume from where it
        //   stopped, the job must be restarted from a checkpoint or savepoint.
        //2.1 Enable checkpointing every 10 minutes (start-to-start interval)
        env.enableCheckpointing(100 * 6000L, CheckpointingMode.EXACTLY_ONCE);
        // env.enableCheckpointing(1000L, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointTimeout(10 * 60000L);
        //2.3 Retain the last checkpoint when the job is cancelled
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(100 * 6000L); // end-to-start pause
        // env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000L);
        //2.4 Restart strategy used when recovering from a checkpoint
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(20, 5000));
        //2.5 State backend
        // env.setStateBackend(new HashMapStateBackend());
        // RocksDB state backend with incremental checkpoints enabled
        env.setStateBackend(new EmbeddedRocksDBStateBackend(true));
        env.getCheckpointConfig().enableUnalignedCheckpoints(false);
        //2.6 Checkpoint storage path
        // env.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop102:8020/ck/" + topic);
        env.getCheckpointConfig().setCheckpointStorage("hdfs://mycluster:8020/ck/" + topic);
        //2.7 User for HDFS access
        System.setProperty("HADOOP_USER_NAME", "sarah");

        //3. Build the Flink MySQL CDC source
        // Custom Debezium converters for MySQL temporal types
        Properties properties = new Properties();
        properties.setProperty("converters", "dateConverters");
        properties.setProperty("dateConverters.type", "flink.MySqlDateTimeConverter");

        //4. Alternative source configuration, kept for reference (credentials redacted)
        // MySqlSource<String> mysqlCdcSource = MySqlSource.<String>builder()
        //         .hostname("xx.xx.xxx.xx")
        //         .port(3306)
        //         .username("xxxxxx")
        //         .password("xxxxxx")
        //         .serverTimeZone("America/Los_Angeles")
        //         .databaseList("db_sjfood")
        //         .tableList("db_sjfood.tb_010_sjfood_zsd01")
        //         .debeziumProperties(properties)
        //         .scanNewlyAddedTableEnabled(true)
        //         .deserializer(new CustomerDeserialization()) // converts SourceRecord to JSON String
        //         .build();

        // Build the MySqlSource
        MySqlSource<String> mysqlCdcSource = MySqlSource.<String>builder()
                .hostname("xx.xxx.xxx.xx")
                .port(3306)
                .username("sapgateway")
                .password("xxxxxx")
                .databaseList("sap_gateway")
                .tableList(
                        "sap_gateway.VBAK",
                        "sap_gateway.VBAP",
                        "sap_gateway.VBFA",
                        "sap_gateway.LIPS",
                        "sap_gateway.ZTSD039"
                )
                .serverId("5100-6200")
                .fetchSize(8192)
                .splitSize(10240)
                .debeziumProperties(properties)
                // .scanNewlyAddedTableEnabled(true)
                .deserializer(new CustomerDeserialization()) // converts SourceRecord to JSON String
                .build();

        // Read from MySQL with the CDC source
        DataStreamSource<String> mysqlSourceDS =
                env.fromSource(mysqlCdcSource, WatermarkStrategy.noWatermarks(), "MysqlSource");

        // Write the change records to Kafka
        mysqlSourceDS.addSink(getKafkaProducer(brokers, topic)).name(topic).uid(topic + "uid3");

        // Window that records how many rows each table has synced
        mysqlSourceDS
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                        // Extract the "table":"..." fragment of the JSON string to use as the key
                        String[] data = value.split("\"table\"");
                        data[1] = "table" + data[1];
                        String[] data2 = data[1].split(",");
                        out.collect(Tuple2.of(data2[0], 1));
                    }
                })
                .keyBy(value -> value.f0) // key by table name
                .window(SlidingProcessingTimeWindows.of(Time.minutes(10), Time.minutes(10))) // 10-minute window, sliding every 10 minutes
                .apply(new CustomWindowFunction())
                .print().setParallelism(1);

        env.execute("FlinkCDC");
    }

    // Custom window function: counts the records per table in each window
    private static class CustomWindowFunction
            implements WindowFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>, String, TimeWindow> {
        @Override
        public void apply(String key, TimeWindow window, Iterable<Tuple2<String, Integer>> input,
                          Collector<Tuple3<String, String, Integer>> out) throws Exception {
            int count = 0;
            for (Tuple2<String, Integer> element : input) {
                count += element.f1;
            }
            out.collect(Tuple3.of(
                    new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date(window.getStart())),
                    key,
                    count));
        }
    }

    // Kafka producer sink
    public static FlinkKafkaProducer<String> getKafkaProducer(String brokers, String topic) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", brokers);
        props.put("buffer.memory", 53554432);
        // props.put("batch.size", 131072);
        props.put("linger.ms", 10);
        props.put("max.request.size", 10485760);
        props.put("acks", "1");
        props.put("retries", 10);
        props.put("retry.backoff.ms", 500);
        return new FlinkKafkaProducer<>(topic, new SimpleStringSchema(), props);
    }
}
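To check what actually lands in Kafka, a minimal consumer sketch can be run against the same brokers and topic as above. This is only an assumption-laden verification helper, not part of the job: the consumer group "cdc-check" is made up, and kafka-clients is assumed to be available (it comes in transitively through flink-connector-kafka).

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaTopicCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop102:9092,hadoop103:9092,hadoop104:9092");
        props.put("group.id", "cdc-check");   // hypothetical consumer group, used only for this check
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("sapgateway"));
            // Pull one batch of change records and print them
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.value());
            }
        }
    }
}

Each printed value should be one JSON change record in the format produced by CustomerDeserialization above.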