赞
踩
在Confluent的schema注册中心Schema Registry注册,在生产或者消费时使用。本质作用是规定消息的Key和Value的结构和格式,就像为Topic指定一个表结构,随后的消息都和表结构一致。
在发送时并不会因为你指定Schema就可以省去结构中的字段名,比如指定Value结构 {{name:string},{count:int}},并不是在发送的records里可以直接发个Array让Confluent自己按序匹配,你还是要老老实实按照结构写,还要多写出Schema的信息。那么一个让发送的数据便繁琐的东西有什么用呢?
规定结构
就像API一样,严格的规定了格式,避免了上游乱发数据或者下游读不了数据的尴尬情况。
节约消息空间
只是发送的Json多了两个参数而已,虽然Json里没能省略结构中的字段名,但是发送到Kafka的消息却的确省略了结构中的字段名,AVRO实际上只得是一个序列化与反序列化的过程,根据Schema的规则进行序列化存储。所以动动手加两个参数,让你的消息在kafka中以更节约的方式存储。
便于流计算
刚刚有说“就像为Topic指定一个表结构”,当你在流式计算比如Kafka Stream中,消费一个带有Schema的Topic时,你就可以直接像SQL对表操作那样操作消息,Groupby、Count等都不在话下。
CURL命令:
- curl {HOST}:{PORT}/subjects/{SCHEMA_NAME}/versions \
- -X POST \
- -d {DATA_JSON}
Data JSON:
schema – 符合 Avro schema 标准的字符串
Avro schema标准:
这个标准的官方文档地址是
https://avro.apache.org/docs/1.8.1/spec.html#schemas
schema的种类(type)有很多,例举几个常用的
- type(String) - 固定值"record",
- name(String) - 别名
- fields(Array) - Object每个字段的名称和类型
- fields[i].name - 字段名称
- fields[i].type - 字段类型
- type(String) - 固定值"enum",
- name(String) - 别名
- symbols(Array) - 枚举的元素列表
- type(String) - 固定值"array",
- items(String) - 数组内元素类型
“符合 Avro schema 标准的字符串”指的就是上述格式的JSON字符串。
Response JSON:
id(int) – 该Schema的ID
注意,其实{SCHEMA_NAME}是有一个小技巧的,要想让Schema直接与Topic关联并被Control Center管理,那{SCHEMA_NAME}必须是{TOPIC}-key或{TOPIC}-value。
- curl 172.16.32.41:8081/subjects \
- -X GET
查看指定Schema名称下的所有版本号
- curl {HOST}:{PORT}/subjects/{SCHEMA_NAME}/versions \
- -X GET
查看指定Schema的指定版本的定义内容
- curl {HOST}:{PORT}/subjects/{SCHEMA_NAME}/versions/{VERSION} \
- -X GET
查看指定Schema的指定版本的Avro schema 标准字符串
- curl {HOST}:{PORT}/subjects/{SCHEMA_NAME}/versions/{VERSION}/schema \
- -X GET
- curl {HOST}:{PORT}/subjects/{SCHEMA_NAME} \
- -X DELETE
删除Schema的某个版本
- curl {HOST}:{PORT}/subjects/{SCHEMA_NAME}/versions/{VERSION} \
- -X DELETE
更新不是覆盖,而是基于一个“可兼容规则”判断后,新增一个version作为latest,而之前的不会删除只是变成了history。
这里的“可兼容规则”有:
BACKWARD | 如果新version可用于读取以上一个version写入的数据,则允许。 |
BACKWARD_TRANSITIVE | 如果新version可用于读取之前所有version写入的数据,则允许。 |
FORWARD | 如果上一个version可用于读取以新version写入的数据,则允许。 |
FORWARD_TRANSITIVE | 如果之前所有version可用于读取新version写入的数据,则允许。 |
FULL | 兼备BACKWARD和FORWARD |
FULL_TRANSITIVE | 兼备BACKWARD_TRANSITIVE和FORWARD_TRANSITIVE |
NONE | 只要是符合 Avro schema 标准的就可以 |
全局默认是“FULL”可以通过以下命令修改和查看:
- curl {HOST}:{PORT}/config/{SCHEMA_NAME} \
- -X PUT
- curl {HOST}:{PORT}/config/{SCHEMA_NAME} \
- -X GET
- curl {HOST}:{PORT}/topics/{TOPIC} \
- -X POST \
- -H "Content-Type:application/vnd.kafka.avro.v2+json" \
- -d {DATA_JSON}
Data JSON:
- value_schema_id(int) – 对value的schema ID
- key_schema_id(int) – 对key的schema ID,如果有key一定要有这个参数,度过没有key一定不能有这个参数
- records(array) – 发送的一系列记录
- records[i].key (object) – 符合既定schema的消息的Key
- records[i].value (object) – 符合既定schema的消息内容
- records[i].partition (int) – 消息的目标分区(非必须)
例子:
key_schema【rest_test4-key,70】是
- {
- "type": "record",
- "name": "id",
- "fields": [
- {
- "name": "bh",
- "type": "string"
- }
- ]
- }
value_schema【rest_test4-value,72】是
- {
- "type": "record",
- "name": "data",
- "fields": [
- {
- "name": "val1",
- "type": "string"
- },
- {
- "name": "val2",
- "type": "int"
- }
- ]
- }
则请求为
- curl 172.16.32.41:8083/topics/rest_test4 \
- -X POST \
- -H "Content-Type:application/vnd.kafka.avro.v2+json" \
- -d ‘{"value_schema_id": 72,"key_schema_id": 70,"records": [{"value": {"val2": 2,"val1": "2222"},"key": {"bh": "asdasdas"}}]}’
- curl {HOST}:{PORT}/consumers/{CONSUMER_GROUP_NAME} \
- -X POST \
- -H "Content-Type:application/vnd.kafka.avro.v2+json" \
- -d {DATA_JSON}
Data JSON:
- name (string) – Consumer名字
- format (string) – “avro”
- auto.offset.reset (string) – 设置consumer的auto.offset.reset
- auto.commit.enable (string) – 设置consumer的auto.commit.enable
- fetch.min.bytes (string) – 设置consumer的fetch.min.bytes
- consumer.request.timeout.ms (string) – 设置consumer的 consumer.request.timeout.ms
- curl {HOST}:{PORT}/consumers/{CONSUMER_GROUP_NAME} \
- -X GET\
- -H "Content-Type:application/vnd.kafka.avro.v2+json;Accept: application/vnd.kafka.avro.v2+json"
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。