当前位置:   article > 正文

【大数据平台】——基于Confluent的Kafka Rest API探索(五)_confluent rest api

confluent rest api
  • 介绍

在Confluent的schema注册中心Schema Registry注册,在生产或者消费时使用。本质作用是规定消息的Key和Value的结构和格式,就像为Topic指定一个表结构,随后的消息都和表结构一致。

在发送时并不会因为你指定Schema就可以省去结构中的字段名,比如指定Value结构 {{name:string},{count:int}},并不是在发送的records里可以直接发个Array让Confluent自己按序匹配,你还是要老老实实按照结构写,还要多写出Schema的信息。那么一个让发送的数据便繁琐的东西有什么用呢?

  • 规定结构

就像API一样,严格的规定了格式,避免了上游乱发数据或者下游读不了数据的尴尬情况。

  • 节约消息空间

只是发送的Json多了两个参数而已,虽然Json里没能省略结构中的字段名,但是发送到Kafka的消息却的确省略了结构中的字段名,AVRO实际上只得是一个序列化与反序列化的过程,根据Schema的规则进行序列化存储。所以动动手加两个参数,让你的消息在kafka中以更节约的方式存储。

  • 便于流计算

刚刚有说“就像为Topic指定一个表结构”,当你在流式计算比如Kafka Stream中,消费一个带有Schema的Topic时,你就可以直接像SQL对表操作那样操作消息,Groupby、Count等都不在话下。

  • Kafka Rest + Avro(Schema)

  • 新建Schema

CURL命令:

  1. curl {HOST}:{PORT}/subjects/{SCHEMA_NAME}/versions \
  2. -X POST \
  3. -d {DATA_JSON}

Data JSON:

schema – 符合 Avro schema 标准的字符串

Avro schema标准:

这个标准的官方文档地址是

https://avro.apache.org/docs/1.8.1/spec.html#schemas

schema的种类(type)有很多,例举几个常用的

  • Record:最常用的对应JSON的Object
  1. type(String) - 固定值"record",
  2. name(String) - 别名
  3. fields(Array) - Object每个字段的名称和类型
  4. fields[i].name - 字段名称
  5. fields[i].type - 字段类型

 

  • Enum:枚举型对应JSON的string
  1. type(String) - 固定值"enum",
  2. name(String) - 别名
  3. symbols(Array) - 枚举的元素列表
  • Array:对应JSON的array
  1. type(String) - 固定值"array",
  2. items(String) - 数组内元素类型

“符合 Avro schema 标准的字符串”指的就是上述格式的JSON字符串。

Response JSON:

id(int) – 该Schema的ID

注意,其实{SCHEMA_NAME}是有一个小技巧的,要想让Schema直接与Topic关联并被Control Center管理,那{SCHEMA_NAME}必须是{TOPIC}-key或{TOPIC}-value。

  • 查看Schema

  • 查看所有Schema名称

  1. curl 172.16.32.41:8081/subjects \
  2. -X GET
  • 查看指定Schema名称下的所有版本号

  1. curl {HOST}:{PORT}/subjects/{SCHEMA_NAME}/versions \
  2. -X GET
  • 查看指定Schema的指定版本的定义内容

  1. curl {HOST}:{PORT}/subjects/{SCHEMA_NAME}/versions/{VERSION} \
  2. -X GET
  • 查看指定Schema的指定版本的Avro schema 标准字符串

  1. curl {HOST}:{PORT}/subjects/{SCHEMA_NAME}/versions/{VERSION}/schema \
  2. -X GET
  • 删除Schema

  • 删除整个Schema(慎用可能导致Topic崩溃)

  1. curl {HOST}:{PORT}/subjects/{SCHEMA_NAME} \
  2. -X DELETE
  • 删除Schema的某个版本

  1. curl {HOST}:{PORT}/subjects/{SCHEMA_NAME}/versions/{VERSION} \
  2. -X DELETE
  • 更新Schema

更新不是覆盖,而是基于一个“可兼容规则”判断后,新增一个version作为latest,而之前的不会删除只是变成了history。

这里的“可兼容规则”有:

BACKWARD

如果新version可用于读取以上一个version写入的数据,则允许。

BACKWARD_TRANSITIVE

如果新version可用于读取之前所有version写入的数据,则允许。

FORWARD

如果上一个version可用于读取以新version写入的数据,则允许。

FORWARD_TRANSITIVE

如果之前所有version可用于读取新version写入的数据,则允许。

FULL

兼备BACKWARD和FORWARD

FULL_TRANSITIVE

兼备BACKWARD_TRANSITIVE和FORWARD_TRANSITIVE

NONE

只要是符合 Avro schema 标准的就可以

全局默认是“FULL”可以通过以下命令修改和查看:

  1. curl {HOST}:{PORT}/config/{SCHEMA_NAME} \
  2. -X PUT
  1. curl {HOST}:{PORT}/config/{SCHEMA_NAME} \
  2. -X GET
  • Avro形式数据的发送

  1. curl {HOST}:{PORT}/topics/{TOPIC} \
  2. -X POST \
  3. -H "Content-Type:application/vnd.kafka.avro.v2+json" \
  4. -d {DATA_JSON}

Data JSON:

  1. value_schema_id(int) – 对value的schema ID
  2. key_schema_id(int) – 对key的schema ID,如果有key一定要有这个参数,度过没有key一定不能有这个参数
  3. records(array) – 发送的一系列记录
  4. records[i].key (object) – 符合既定schema的消息的Key
  5. records[i].value (object) – 符合既定schema的消息内容
  6. records[i].partition (int) – 消息的目标分区(非必须)

例子:

key_schema【rest_test4-key,70】是

  1. {
  2. "type": "record",
  3. "name": "id",
  4. "fields": [
  5. {
  6. "name": "bh",
  7. "type": "string"
  8. }
  9. ]
  10. }

value_schema【rest_test4-value,72】是

  1. {
  2. "type": "record",
  3. "name": "data",
  4. "fields": [
  5. {
  6. "name": "val1",
  7. "type": "string"
  8. },
  9. {
  10. "name": "val2",
  11. "type": "int"
  12. }
  13. ]
  14. }

则请求为

  1. curl 172.16.32.41:8083/topics/rest_test4 \
  2. -X POST \
  3. -H "Content-Type:application/vnd.kafka.avro.v2+json" \
  4. -d ‘{"value_schema_id": 72,"key_schema_id": 70,"records": [{"value": {"val2": 2,"val1": "2222"},"key": {"bh": "asdasdas"}}]}’
  • 注册Avro的consumer

  1. curl {HOST}:{PORT}/consumers/{CONSUMER_GROUP_NAME} \
  2. -X POST \
  3. -H "Content-Type:application/vnd.kafka.avro.v2+json" \
  4. -d {DATA_JSON}

Data JSON:

  1. name (string) – Consumer名字
  2. format (string) – “avro”
  3. auto.offset.reset (string) – 设置consumer的auto.offset.reset
  4. auto.commit.enable (string) – 设置consumer的auto.commit.enable
  5. fetch.min.bytes (string) – 设置consumer的fetch.min.bytes
  6. consumer.request.timeout.ms (string) – 设置consumer的 consumer.request.timeout.ms
  • Avro形式数据的消费

  1. curl {HOST}:{PORT}/consumers/{CONSUMER_GROUP_NAME} \
  2. -X GET\
  3. -H "Content-Type:application/vnd.kafka.avro.v2+json;Accept: application/vnd.kafka.avro.v2+json"

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/凡人多烦事01/article/detail/79813
推荐阅读
相关标签
  

闽ICP备14008679号