赞
踩
[toc]
一、背景
业务背景: MySQL增量数据实时更新同步到Kafka中供下游使用
查看了一下Flink CDC的官方文档,其中Features的描述中提到了SQL和DataStream API不同的支持程度。
Features
1. Supports reading database snapshot and continues to read binlogs with exactly-once processing even failures happen.
2. CDC connectors for DataStream API, users can consume changes on multiple databases and tables in a single job without Debezium and Kafka deployed.
3. CDC connectors for Table/SQL API, users can use SQL DDL to create a CDC source to monitor changes on a single table.
虽然SQL API使用很丝滑,也很简单。但是由于业务表较多,若是使用一个表的监听就开启一个Flink Job,会对资源消耗和运维操作带来很大的麻烦,所以笔者决定使用DataStream API实现单任务监听库级的MySQL CDC并根据表名将数据发往不同的Kafka Topic中。
二、代码实现
1. 关键maven依赖
com.alibaba.ververica
flink-connector-mysql-cdc
1.1.1
org.apache.flink
flink-connector-kafka_2.11
org.apache.kafka
kafka-clients
org.apache.kafka
kafka-clients
2.4.0
2. 自定义CDC数据反序列化器
Flink CDC定义了com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema接口用以对CDC数据进行反序列化。默认实现类com.alibaba.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema和com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema,由于我们需要自定义Schema,所以我们不采用这两周默认的实现类,自己实现该接口定义我们需要的Schema.
定义JsonDebeziumDeserializeSchema实现DebeziumDeserializationSchema接口方法
class JsonDebeziumDeserializeSchema extends DebeziumDeserializationSchema[String] {
private final val log: Logger = LoggerFactory.getLogger(classOf[JsonDebeziumDeserializeSchema])
override def
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。