当前位置:   article > 正文

kafka mysql cdc_【Flink小试】Flink CDC DataStream API监听MySQL动态发往Kafka Topic

flinkcdc 监控mysql多张表,同步到kafka的一个主题

[toc]

一、背景

业务背景: MySQL增量数据实时更新同步到Kafka中供下游使用

查看了一下Flink CDC的官方文档,其中Features的描述中提到了SQL和DataStream API不同的支持程度。

Features

1. Supports reading database snapshot and continues to read binlogs with exactly-once processing even failures happen.

2. CDC connectors for DataStream API, users can consume changes on multiple databases and tables in a single job without Debezium and Kafka deployed.

3. CDC connectors for Table/SQL API, users can use SQL DDL to create a CDC source to monitor changes on a single table.

虽然SQL API使用很丝滑,也很简单。但是由于业务表较多,若是使用一个表的监听就开启一个Flink Job,会对资源消耗和运维操作带来很大的麻烦,所以笔者决定使用DataStream API实现单任务监听库级的MySQL CDC并根据表名将数据发往不同的Kafka Topic中。

二、代码实现

1. 关键maven依赖

com.alibaba.ververica

flink-connector-mysql-cdc

1.1.1

org.apache.flink

flink-connector-kafka_2.11

org.apache.kafka

kafka-clients

org.apache.kafka

kafka-clients

2.4.0

2. 自定义CDC数据反序列化器

Flink CDC定义了com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema接口用以对CDC数据进行反序列化。默认实现类com.alibaba.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema和com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema,由于我们需要自定义Schema,所以我们不采用这两周默认的实现类,自己实现该接口定义我们需要的Schema.

定义JsonDebeziumDeserializeSchema实现DebeziumDeserializationSchema接口方法

class JsonDebeziumDeserializeSchema extends DebeziumDeserializationSchema[String] {

private final val log: Logger = LoggerFactory.getLogger(classOf[JsonDebeziumDeserializeSchema])

override def

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Guff_9hys/article/detail/836406
推荐阅读
相关标签
  

闽ICP备14008679号