赞
踩
可以使用对相应URL的GET请求读取有关群集的大多数Metadata 信息,如:brokers, topics, partitions, configs等。
区别于Java Client,Kafka-rest不会对外公开Producer对象,而是通过一个Producers Pool来处理每一个生产消息的请求。
如此,Producer实例便是共享的了,也就是说无法通过每一个请求来配置Producer属性。但是可以通过修改Kafka Rest Proxy的配置文件来制定一个全局的Producer属性。默认的Producer全局属性如下:
producer.threads = 5
其余默认属性继承Kafka-Producer的默认属性,如果想配置的话使用前缀“producer.”+在kafka的producer.propertities里的key就可以了。
Kafka Rest Prox使用High Level Consumer来实现消费topic的数据。每一个Consumer实例都是有状态(state)的并绑定给一个Topic。Offset提交可以是自动的,也可以由用户在请求中显式指定或提交。默认限制为每个 Consumer对应一个线程,可以使用Multiple Consumers来处理高吞吐量。
尽管不同于Producer实例,Consumer实例是不共享的,但仍然是底层服务才能操控的资源。通过API可以有限的修改一些配置,也可以通过修改Kafka Rest Proxy的配置文件来制定一个全局的Consumer属性。默认的Consumer全局属性如下:
- consumer.request.max.bytes = 67108864
- consumer.request.timeout.ms = 1000
- consumer.threads = 1
- simpleconsumer.pool.size.max = 25
- consumer.instance.timeout.ms = 300000
- consumer.iterator.backoff.ms = 50
- consumer.iterator.timeout.ms = 1
其余默认属性继承Kafka-Consumer的默认属性,如果想配置的话使用前缀“consumer.”+在kafka的consumer.propertities里的key就可以了。
具体参数作用可参考
https://docs.confluent.io/2.0.0/kafka-rest/docs/config.html
通过HTTP的GET、POST请求实现,请求URL格式为
http:/ /<HOST> : <PORT> / [ option / option_value]
HOST是服务器IP,PORT是Kafka-rest的端口,本文配置的是8083。之后便是成对的 [ option / option_value],其中可用的option有:
Option | 请求类型 | 说明 |
---|---|---|
topics | GET | option_value指定Topic。 option_value为空时用来获得所有Topic列表。 option_value不为空时用来获得指定Topic的metadata 信息 |
topics | POST | option_value指定Topic,向其发送消息。被发送的消息以特定的Json放在请求中。 |
partitions | GET | option_value指定Partition。 option_value为空时用来(结合topics/)获得指定Topic的分区表及分区信息。 option_value为不空时用来(结合topics/)获得指定Topic的指定分区的信息。 |
partitions | POST | option_value指定Partition,向指定分区发送消息 |
consumers | POST | option_value指定consumer_group |
instances | POST | option_value指定instances_id |
brokers | GET | option_value为空用来获得broker列表。 |
CURL命令:
- curl {HOST}:{PORT}/topics \
- -X GET
Response JSON:
topics (array) – Topic名称列表
CURL命令:
- curl {HOST}:{PORT}/topics/{TOPIC_NAME} \
- -X GET
Response JSON:
- name (string) – 名称
- configs (map) – 一系列的Topic配置参数
- partitions (array) – 这个Topic的Partition列表
- partitions[i].partition (int) – partition ID
- partitions[i].leader (int) – partition的leader的 broker ID
- partitions[i].replicas (array) – 备份列表
- partitions[i].replicas[j].broker (array) – 备份所在broker ID
- partitions[i].replicas[j].leader (boolean) – 这个备份是否是leader
- partitions[i].replicas[j].in_sync (boolean) – 这个备份是否正在与leader同步
CURL命令:
- curl {HOST}:{PORT}/topics/{TOPIC_NAME} \
- -X POST \
- -H "Content-Type:application/vnd.kafka.binary.v2+json" \
- -d {DATA_JSON}
Data JSON:
- key_schema (string) – 完整的schema字符串(非必须)
- key_schema_id (int) – 注册schema时返回的ID(非必须)
- value_schema (string) – 完整的schema字符串(非必须)
- value_schema_id (int) – 注册schema时返回的ID(非必须)
- records(array) – 发送的一系列记录
- records[i].key (object) – 消息的Key用来分区的(非必须)
- records[i].value (object) – 消息内容
- records[i].partition (int) – 消息的目标分区(非必须)
- 注:*schema 和对应的*schema_id二者有一即可。
Response JSON:
- offsets (object) – 目标分区的offset信息
- offsets[i].partition (int) – 目标分区ID
- offsets[i].offset (long) – 目标分区的offset
- offsets[i].error_code (long) –此次发送的错误代码
- offsets[i].error (string) – 此次发送的错误信息
CURL命令:
- curl {HOST}:{PORT}/topics/{TOPIC_NAME}/partitions \
- -X GET
Response JSON:
- partition (int) – 分区ID
- leader (int) – 分区 leader 的Broker ID
- replicas (array) – 分区备份的Broker列表
- replicas[i].broker (int) – Broker ID
- replicas[i].leader (boolean) – 是否是 leader
- replicas[i].in_sync (boolean) – 是否正在同步
CURL命令:
- curl {HOST}:{PORT}/topics/{TOPIC_NAME}/partitions/{PARTITION_ID} \
- -X GET
Response JSON:
- partition (int) – 分区ID
- leader (int) – 分区 leader 的Broker ID
- replicas (array) – 分区备份的Broker列表
- replicas[i].broker (int) – Broker ID
- replicas[i].leader (boolean) – 是否是 leader
- replicas[i].in_sync (boolean) – 是否正在同步
CURL命令:
- curl {HOST}:{PORT}/topics/{TOPIC_NAME}/partitions/{PARTITION_ID} \
- -X POST \
- -H "Content-Type:application/vnd.kafka.binary.v2+json" \
- -d {DATA_JSON}
Data JSON:
- key_schema (string) – 完整的schema字符串(非必须)
- key_schema_id (int) – 注册schema时返回的ID(非必须)
- value_schema (string) – 完整的schema字符串(非必须)
- value_schema_id (int) – 注册schema时返回的ID(非必须)
- records(array) – 发送的一系列记录
- records[i].key (object) – 消息的Key用来分区的(非必须)
- records[i].value (object) – 消息内容
- records[i].partition (int) – 消息的目标分区(非必须)
- 注:*schema 和对应的*schema_id二者有一即可。
Response JSON:
- offsets (object) – 目标分区的offset信息
- offsets[i].partition (int) – 目标分区ID
- offsets[i].offset (long) – 目标分区的offset
- offsets[i].error_code (long) –此次发送的错误代码
- offsets[i].error (string) – 此次发送的错误信息
CURL命令:
- curl {HOST}:{PORT}/topics/{TOPIC_NAME}/partitions/{PARTITION_ID}/messages?offset=(int)[&count=(int)]\
- -X GET
查询参数:
- offset (int) – 消费的起始Offset
- count (int) – 消费数据条数. 默认是 1.
Response JSON:
- key (string) – 消息的Key
- value (string) – 消息的内容
- partition (int) – 所属分区
- offset (long) – 所在Offset
CURL命令:
- curl {HOST}:{PORT}/consumers/{CONSUMER_GROUP_NAME} \
- -X POST
Data JSON:
- name (string) – Consumer名字
- format (string) – 数据反序列化格式,有“binary”, “avro”, “json”. 默认“binary”.
- auto.offset.reset (string) – 设置consumer的auto.offset.reset
- auto.commit.enable (string) – 设置consumer的auto.commit.enable
- fetch.min.bytes (string) – 设置consumer的fetch.min.bytes
- consumer.request.timeout.ms (string) – 设置consumer的 consumer.request.timeout.ms
Response JSON:
- instance_id (string) – 消费时使用的唯一ID
- base_uri (string) – 用于构造针对此使用者实例的后续请求的URI
CURL命令:
- curl {HOST}:{PORT}/consumers/{CONSUMER_GROUP_NAME}/instances/{CONSUMER_ID}/offsets \
- -X POST
Response JSON:
- topic (string) – Topic名称
- partition (int) – 提交的分区ID
- consumed (long) – 最近消费的消息的offset
- committed (long) – 刚刚提交的offset
CURL命令:
- curl {HOST}:{PORT}/consumers/{CONSUMER_GROUP_NAME}/instances/{CONSUMER_ID} \
- -X DELETE
CURL命令
- curl {HOST}:{PORT}/consumers/{CONSUMER_GROUP_NAME}/instances/{CONSUMER_ID}/topics/{TOPIC} \
- -X GET
Response JSON:
- key (string) – 消息的Key
- value (string) – 消息的内容
- partition (int) – 所属分区
- offset (long) – 所在Offset
CURL命令:
- curl {HOST}:{PORT}/consumers/{CONSUMER_GROUP_NAME}/instances/{CONSUMER_ID}/subscription \
- -X POST
- -H "Content-Type:application/vnd.kafka.json.v2+json" \
- -d {DATA_JSON}
Data JSON:
topics(array) – 要订阅的Topic列表
CURL命令:
- curl {HOST}:{PORT}/consumers/{CONSUMER_GROUP_NAME}\
- /instances/{CONSUMER_ID}/records \
- -X GET
Response JSON:
- topic(string) - 所在Topic
- key (string) – 消息的Key
- value (string) – 消息的内容
- partition (int) – 所属分区
- offset (long) – 所在Offset
https://docs.confluent.io/2.0.0/kafka-rest/docs/api.html
目前实现了简单的逐条发送、成批发送、创建消费者、消费 4个方法
https://github.com/zjw271208550/learn/tree/master/console/src/main/java/KafkaREST
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。