当前位置:   article > 正文

【大数据平台】——基于Confluent的Kafka Rest API探索(二)_confluent key schema

confluent key schema
  • Kafka Rest Proxy特点与使用

  • Metadata

可以使用对相应URL的GET请求读取有关群集的大多数Metadata 信息,如:brokers, topics, partitions, configs等。

 

  • Producers

区别于Java Client,Kafka-rest不会对外公开Producer对象,而是通过一个Producers Pool来处理每一个生产消息的请求。

如此,Producer实例便是共享的了,也就是说无法通过每一个请求来配置Producer属性。但是可以通过修改Kafka Rest Proxy的配置文件来制定一个全局的Producer属性。默认的Producer全局属性如下:

producer.threads = 5

其余默认属性继承Kafka-Producer的默认属性,如果想配置的话使用前缀“producer.”+在kafka的producer.propertities里的key就可以了。

  • Consumers

Kafka Rest Prox使用High Level Consumer来实现消费topic的数据。每一个Consumer实例都是有状态(state)的并绑定给一个Topic。Offset提交可以是自动的,也可以由用户在请求中显式指定或提交。默认限制为每个 Consumer对应一个线程,可以使用Multiple Consumers来处理高吞吐量。

尽管不同于Producer实例,Consumer实例是不共享的,但仍然是底层服务才能操控的资源。通过API可以有限的修改一些配置,也可以通过修改Kafka Rest Proxy的配置文件来制定一个全局的Consumer属性。默认的Consumer全局属性如下:

  1. consumer.request.max.bytes = 67108864
  2. consumer.request.timeout.ms = 1000
  3. consumer.threads = 1
  4. simpleconsumer.pool.size.max = 25
  5. consumer.instance.timeout.ms = 300000
  6. consumer.iterator.backoff.ms = 50
  7. consumer.iterator.timeout.ms = 1

其余默认属性继承Kafka-Consumer的默认属性,如果想配置的话使用前缀“consumer.”+在kafka的consumer.propertities里的key就可以了。

具体参数作用可参考

https://docs.confluent.io/2.0.0/kafka-rest/docs/config.html 

  • Multi-topic Produce Requests(研究中)

  • Multi-threaded Consumers(研究中)

  • API使用

通过HTTP的GET、POST请求实现,请求URL格式为

http:/ /<HOST> : <PORT> / [ option / option_value]

HOST是服务器IP,PORT是Kafka-rest的端口,本文配置的是8083。之后便是成对的 [ option / option_value],其中可用的option有:

Option

请求类型   

说明

topics

GET

option_value指定Topic。

option_value为空时用来获得所有Topic列表。

option_value不为空时用来获得指定Topic的metadata 信息

topics

POST

option_value指定Topic,向其发送消息。被发送的消息以特定的Json放在请求中。

partitions

GET

option_value指定Partition。

option_value为空时用来(结合topics/)获得指定Topic的分区表及分区信息。

option_value为不空时用来(结合topics/)获得指定Topic的指定分区的信息。

partitions

POST

option_value指定Partition,向指定分区发送消息

consumers

POST

option_value指定consumer_group

instances

POST

option_value指定instances_id

brokers

GET

option_value为空用来获得broker列表。

  • 基于CURL命令的API介绍

  • 获取Topic列表

CURL命令:

  1. curl {HOST}:{PORT}/topics \
  2. -X GET

Response JSON:

topics (array) – Topic名称列表
  • 获取Topic的metadata

CURL命令:

  1. curl {HOST}:{PORT}/topics/{TOPIC_NAME} \
  2. -X GET

 

Response JSON:

  1. name (string) – 名称
  2. configs (map) – 一系列的Topic配置参数
  3. partitions (array) – 这个Topic的Partition列表
  4. partitions[i].partition (int) – partition ID
  5. partitions[i].leader (int) – partition的leader的 broker ID
  6. partitions[i].replicas (array) – 备份列表
  7. partitions[i].replicas[j].broker (array) – 备份所在broker ID
  8. partitions[i].replicas[j].leader (boolean) – 这个备份是否是leader
  9. partitions[i].replicas[j].in_sync (boolean) – 这个备份是否正在与leader同步
  • 向Topic发送消息

CURL命令:

  1. curl {HOST}:{PORT}/topics/{TOPIC_NAME} \
  2. -X POST \
  3. -H "Content-Type:application/vnd.kafka.binary.v2+json" \
  4. -d {DATA_JSON}

Data JSON:

  1. key_schema (string) – 完整的schema字符串(非必须)
  2. key_schema_id (int) – 注册schema时返回的ID(非必须)
  3. value_schema (string) – 完整的schema字符串(非必须)
  4. value_schema_id (int) – 注册schema时返回的ID(非必须)
  5. records(array) – 发送的一系列记录
  6. records[i].key (object) – 消息的Key用来分区的(非必须)
  7. records[i].value (object) – 消息内容
  8. records[i].partition (int) – 消息的目标分区(非必须)
  9. 注:*schema 和对应的*schema_id二者有一即可。

Response JSON:

  1. offsets (object) – 目标分区的offset信息
  2. offsets[i].partition (int) – 目标分区ID
  3. offsets[i].offset (long) – 目标分区的offset
  4. offsets[i].error_code (long) –此次发送的错误代码
  5. offsets[i].error (string) – 此次发送的错误信息
  • 获取Toppic所有分区信息

CURL命令:

  1. curl {HOST}:{PORT}/topics/{TOPIC_NAME}/partitions \
  2. -X GET

Response JSON:

  1. partition (int) – 分区ID
  2. leader (int) – 分区 leader 的Broker ID
  3. replicas (array) – 分区备份的Broker列表
  4. replicas[i].broker (int) – Broker ID
  5. replicas[i].leader (boolean) – 是否是 leader
  6. replicas[i].in_sync (boolean) – 是否正在同步
  • 获取Topic的指定分区信息

CURL命令:

  1. curl {HOST}:{PORT}/topics/{TOPIC_NAME}/partitions/{PARTITION_ID} \
  2. -X GET

Response JSON:

  1. partition (int) – 分区ID
  2. leader (int) – 分区 leader 的Broker ID
  3. replicas (array) – 分区备份的Broker列表
  4. replicas[i].broker (int) – Broker ID
  5. replicas[i].leader (boolean) – 是否是 leader
  6. replicas[i].in_sync (boolean) – 是否正在同步
  • 向Topic指定分区发送消息

CURL命令:

  1. curl {HOST}:{PORT}/topics/{TOPIC_NAME}/partitions/{PARTITION_ID} \
  2. -X POST \
  3. -H "Content-Type:application/vnd.kafka.binary.v2+json" \
  4. -d {DATA_JSON}

Data JSON:

  1. key_schema (string) – 完整的schema字符串(非必须)
  2. key_schema_id (int) – 注册schema时返回的ID(非必须)
  3. value_schema (string) – 完整的schema字符串(非必须)
  4. value_schema_id (int) – 注册schema时返回的ID(非必须)
  5. records(array) – 发送的一系列记录
  6. records[i].key (object) – 消息的Key用来分区的(非必须)
  7. records[i].value (object) – 消息内容
  8. records[i].partition (int) – 消息的目标分区(非必须)
  9. 注:*schema 和对应的*schema_id二者有一即可。

Response JSON:

  1. offsets (object) – 目标分区的offset信息
  2. offsets[i].partition (int) – 目标分区ID
  3. offsets[i].offset (long) – 目标分区的offset
  4. offsets[i].error_code (long) –此次发送的错误代码
  5. offsets[i].error (string) – 此次发送的错误信息
  • 从Topic指定分区消费消息

CURL命令:

  1. curl {HOST}:{PORT}/topics/{TOPIC_NAME}/partitions/{PARTITION_ID}/messages?offset=(int)[&count=(int)]\
  2. -X GET

查询参数:

  1. offset (int) – 消费的起始Offset
  2. count (int) – 消费数据条数. 默认是 1.

Response JSON:

  1. key (string) – 消息的Key
  2. value (string) – 消息的内容
  3. partition (int) – 所属分区
  4. offset (long) – 所在Offset
  • 注册一个Consumer

CURL命令:

  1. curl {HOST}:{PORT}/consumers/{CONSUMER_GROUP_NAME} \
  2. -X POST

Data JSON:

  1. name (string) – Consumer名字
  2. format (string) – 数据反序列化格式,有“binary”, “avro”, “json”. 默认“binary”.
  3. auto.offset.reset (string) – 设置consumer的auto.offset.reset
  4. auto.commit.enable (string) – 设置consumer的auto.commit.enable
  5. fetch.min.bytes (string) – 设置consumer的fetch.min.bytes
  6. consumer.request.timeout.ms (string) – 设置consumer的 consumer.request.timeout.ms

Response JSON:

  1. instance_id (string) – 消费时使用的唯一ID
  2. base_uri (string) – 用于构造针对此使用者实例的后续请求的URI
  •    手动提交Consumer的offsets

CURL命令:

  1. curl {HOST}:{PORT}/consumers/{CONSUMER_GROUP_NAME}/instances/{CONSUMER_ID}/offsets \
  2. -X POST

Response JSON:

  1. topic (string) – Topic名称
  2. partition (int) – 提交的分区ID
  3. consumed (long) – 最近消费的消息的offset
  4. committed (long) – 刚刚提交的offset
  • 删除一个Consumer

CURL命令:

  1. curl {HOST}:{PORT}/consumers/{CONSUMER_GROUP_NAME}/instances/{CONSUMER_ID} \
  2. -X DELETE
  • 使用一个Consumer消费数据

CURL命令

  1. curl {HOST}:{PORT}/consumers/{CONSUMER_GROUP_NAME}/instances/{CONSUMER_ID}/topics/{TOPIC} \
  2. -X GET

Response JSON:

  1. key (string) – 消息的Key
  2. value (string) – 消息的内容
  3. partition (int) – 所属分区
  4. offset (long) – 所在Offset
  • Consumer订阅

CURL命令:

  1. curl {HOST}:{PORT}/consumers/{CONSUMER_GROUP_NAME}/instances/{CONSUMER_ID}/subscription \
  2. -X POST
  3. -H "Content-Type:application/vnd.kafka.json.v2+json" \
  4. -d {DATA_JSON}

 Data JSON:

topics(array) – 要订阅的Topic列表
  • Records接口消费

CURL命令:

  1. curl {HOST}:{PORT}/consumers/{CONSUMER_GROUP_NAME}\
  2. /instances/{CONSUMER_ID}/records \
  3. -X GET

 Response JSON:

  1. topic(string) - 所在Topic
  2. key (string) – 消息的Key
  3. value (string) – 消息的内容
  4. partition (int) – 所属分区
  5. offset (long) – 所在Offset
  • 详细使用参考:

https://docs.confluent.io/2.0.0/kafka-rest/docs/api.html

  • Java 工具类(DEMO)

目前实现了简单的逐条发送、成批发送、创建消费者、消费 4个方法

https://github.com/zjw271208550/learn/tree/master/console/src/main/java/KafkaREST 

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小蓝xlanll/article/detail/79816
推荐阅读
相关标签
  

闽ICP备14008679号