赞
踩
在 Kafka 生态中,目前最主流也是最成熟的 Schema Registry 实现应该是 Confluent 提供的 Confluent Schema Registry,有人也直接称其为 Kafka Schema Registry,我们看到围绕 Kafka 的周边生态组件在需要集成一个 Schema Registry 时基本上都会优先选择 Confluent / Kafka Schema Registry,典型案例就是 Apache Hudi 的 HoodieStreamer
/ HoodieDeltaStreamer
在从 Kafka 接入 Debezium CDC 消息写入 Hudi 表时,仅支持 Confluent Schema Registry 这一种 Schema Registry(截止目前)。
那在 Flink 生态中,对 Confluent Schema Registry 的支持度如何呢?本文,我们来详细地梳理和总结一下。在 Flink 生态中,与 Schema Registry 有关的组件主要是 Flink CDC 和 Confluent Avro Format ( format=debezium-avro-confluent
),下面展开介绍一下具体情况。
作为实时链路的起始点,如果要引入 Schema Registry 的话,那么在 CDC 数据在进入 Kafka 时就是一个非常重要的节点,因为这是链路的起始点,Debezium 消息的 Schema 应该在此时向 Schema Registry 进行注册,只有这样, Schema Registry 才能在数据进入 Kafka 前对于进行管控(根据 Schema Evolution 策略决定有可能发生了 Schema 变更的消息能不能进入到 Kafka 中),同时下游在解析这个消息时也需要从 Schema Registry 中拿到对应的 Schema 数据。
应该说,目前在摄取 CDC 数据进入 Kafka 的各类组件中,对 Confluent / Kafka Schema Registry 支持最好的无疑是 Kafka Connect,搭配上它的两个插件: Debezium MySQL Connector 和 Confluent Avro Converter,Kafka Connect 可以很好地将 MySQL CDC 数据实时摄取到 Kafka 的同时完成在 Confluent Schema Registry 上的消息格式注册。这也不难理解,因为它们背后都是由 Confluent 公司在支持和推动。关于 Kafka Connect + Confluent Schema Registry 摄取 Debezium CDC 数据的方案,请参考 《CDC 数据入湖方案:Kafka Connect > Kafka + Schema Registry > Hudi ( Flink Connector ) 》和《CDC 数据入湖方案:Kafka Connect > Kafka + Schema Registry > Hudi ( HoodieMultiTableStreamer ) 》两套方案。
那 Flink CDC 呢?很遗憾,目前 Flink CDC 还没有集成 Confluent Schema Registry,不能做到在摄取 CDC 数据的同时自动向 Confluent Schema Registry 中注册消息格式!不过,对于 Flink 来说,还有一些回旋余地,就是在下游利用 connector=kafka, format=debezium-avro-confluent
动态表可以实现 Schema 的自动注册,这算是一种“补救”措施吧,不过,这种实现方式还是有一些不一样的,特别是在消息格式上,它注册的消息是不含 source
、ts_ms
、transaction
等字段的,具体请参考《Hudi HoodieStreamer 报错 A column or function parameter with name ts_ms cannot be resolved 解决方法》
目前,Flink 生态中唯一支持与 Confluent Schema Registry 对接的组件是 Confluent Avro Format, 一张配置为 connector=kafka, format=debezium-avro-confluent
的动态表可以自动将消息格式注册的 Confluent Schema Registry 上。如果想看具体示例,可以参考《CDC 数据入湖方案:Flink CDC > Kafka + Schema Registry > Hudi ( Flink Connector ) 》和《CDC 数据入湖方案:Flink CDC > Kafka + Schema Registry > Hudi ( HoodieMultiTableStreamer ) 》两套方案。但是,我们要注意 Confluent Avro Format 有不少局限性:
1. Confluent Avro Format 只能应用于 Apache Kafka SQL connector 和 Upsert Kafka SQL Connector,这意味这:我们只能通过 Flink SQL 一张表一张表地去配置,不能通过 Flink API 批量地完成数据表的定义,也就无法在单一 Flink 作业整库 / 多表接入 Kafka 的方案中引入 Confluent Schema Registry。
2. 配置为connector=kafka, format=debezium-avro-confluent
的动态表物化到 Kafka 里的数据是 Flink 的 changelog 数据,虽然它与标准 debezium 消息高度类似,但不是标准的 debezium 数据,它只含有after
, op
, before
三个字段,无 source
、ts_ms
、transaction
等字段,注册到 Confluent Schema Registry 中注册的 schema 也只包含after
, op
, before
三个字段,如果下游依旧是 Flink 生态(例如使用 Flink Hudi Connector 写入 Hudi 表),一般是不会有问题的,但如果下游是其他生态,例如,如果使用 HoodieStreamer
/ HoodieDeltaStreamer
处理的话,就会报错,因为数据格式匹配不上,HoodieStreamer
/ HoodieDeltaStreamer
处理的是标准的 debezium 格式的数据,这个问题具体可以参考《Hudi HoodieStreamer 报错 A column or function parameter with name ts_ms cannot be resolved 解决方法》
如果要在 Flink 中使用 Confluent Schema Registry,最好整个链路都是 Flink 的生态,如果链路中混合了 Kafka Connect 或 HoodieStreamer
/ HoodieDeltaStreamer
,会因链路上下游 debezium 数据格式的差异导致作业失败。目前在 CDC 数据入湖方向上,只有《CDC 数据入湖方案:Flink CDC > Kafka + Schema Registry > Hudi ( Flink Connector ) 》这一种完整链路是验证通过过,如果下游使用其他组件处理, 例如:HoodieStreamer
/ HoodieDeltaStreamer
,方案是跑不通的。
关联阅读:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。