
Kafka RESTful API: Features and Usage

Kafka RESTful interface
  • Preface

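When the Kafka RESTful service is implemented with the Confluent kafka-rest proxy (see the previous note for details), data travels over HTTP. Note that message keys and values are Base64-encoded in the binary format (this is an encoding, not encryption). If a message is not Base64-encoded before it is POSTed, the server may end up with garbled text or the request may fail with an error. The normal flow is therefore:
1. Encode the message to be POSTed as UTF-8.
2. Base64-encode the resulting bytes.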

import base64

s='Kafka,hi'
ad="hi,kafka,i'm xnchall"
aa=ad.encode()  # step 1: encode the text as UTF-8 bytes
print(aa)
b64=base64.b64encode(ad.encode())  # step 2: Base64-encode the UTF-8 bytes
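The two steps above can be wrapped in a pair of small helpers. This is a minimal sketch; the names `to_b64` and `from_b64` are hypothetical and are not part of kafka-rest.

```python
import base64


def to_b64(text: str) -> str:
    """UTF-8 encode the text, Base64-encode the bytes, return an ASCII str."""
    return base64.b64encode(text.encode("utf-8")).decode("ascii")


def from_b64(encoded: str) -> str:
    """Reverse of to_b64: Base64-decode, then decode the bytes as UTF-8."""
    return base64.b64decode(encoded).decode("utf-8")


print(to_b64("kafka"))           # a2Fma2E=
print(from_b64("Y29uZmx1ZW50"))  # confluent
```

Returning a `str` rather than `bytes` matters because the value has to be placed inside a JSON body, and JSON cannot carry raw bytes.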
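  • Produce messages with kafka-rest
POST /topics/(string:topic_name)

import requests

# Assumed URL: same proxy host and topic as the later examples; the original post does not show it.
url="http://192.168.160.101:8082/topics/test_kfk_lk"
# Keys and values are Base64 strings: "a2V5" is "key", "Y29uZmx1ZW50" is "confluent".
data={"records":[
{
"key":"a2V5",
"value":"Y29uZmx1ZW50"
},
{
"value":"a2Fma2E=",
"partition":1
},
{
"value":"bG9ncw=="
}
]}
# Base64 of the UTF-8 text "尊敬的客户您好，hi kafka, i'm xnchall"
data1={"records":[{"value":"5bCK5pWs55qE5a6i5oi35oKo5aW977yMaGkga2Fma2EsIGknbSB4bmNoYWxs"}]}
header={"Content-Type":"application/vnd.kafka.v1+json"}
r=requests.post(url=url,json=data,headers=header)
r=requests.post(url=url,json=data1,headers=header)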
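Writing Base64 strings by hand is error-prone, so the request body can also be generated from plain text. The sketch below assumes the same `{"records": [...]}` shape used above; `build_records` is a hypothetical helper name, not a kafka-rest function.

```python
import base64
import json


def build_records(values, partition=None):
    """Build a binary-format produce body from a list of plain-text values."""
    records = []
    for value in values:
        record = {"value": base64.b64encode(value.encode("utf-8")).decode("ascii")}
        if partition is not None:
            # Optional: pin every record in this body to one partition.
            record["partition"] = partition
        records.append(record)
    return {"records": records}


payload = build_records(["kafka", "logs"])
print(json.dumps(payload))
# {"records": [{"value": "a2Fma2E="}, {"value": "bG9ncw=="}]}
```

The resulting dictionary can be passed as the `json=` argument of `requests.post`, in place of the hand-written `data`.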
  • Produce messages to one partition of the topic
POST /topics/(string:topic_name)/partitions/(int:partition_id)
ad="hi kafka,i'm xnchall"
url11="http://192.168.160.101:8082/topics/test_kfk_lk/partitions/1"

# b64encode returns bytes; decode() turns them into a str so the value is JSON-serializable
data2={"records":[{"value":(base64.b64encode(ad.encode())).decode()}]}
print(data2)
r2=requests.post(url=url11,json=data2,headers=header)
print(r2)
print(r2.content)
  • Create a new consumer instance in the consumer group
POST /consumers/(string:group_name)

url3="http://192.168.160.101:8082/consumers/my_group"
data3={
"id":"my_consumer1",  # name of the consumer instance
"format":"binary",  # keys and values are returned Base64-encoded
"auto.offset.reset":"smallest",  # start from the earliest available offset
"auto.commit.enable":"false"  # offsets are committed explicitly
}

r3=requests.post(url=url3,json=data3,headers=header)
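The later examples hard-code the instance URLs. As a sketch, they can instead be derived from the reply to the create call, assuming that reply carries `instance_id` and `base_uri` fields as described in the REST Proxy documentation. The `sample` reply below is illustrative, not captured from a real proxy, and `instance_urls` is a hypothetical helper name.

```python
def instance_urls(created: dict, topic: str) -> dict:
    """Derive the per-instance endpoints from a create-consumer reply."""
    base = created["base_uri"].rstrip("/")
    return {
        "consume": f"{base}/topics/{topic}",  # GET: read messages
        "commit": f"{base}/offsets",          # POST: commit offsets
        "delete": base,                       # DELETE: destroy the instance
    }


# Illustrative reply; in practice this would be r3.json().
sample = {
    "instance_id": "my_consumer1",
    "base_uri": "http://192.168.160.101:8082/consumers/my_group/instances/my_consumer1",
}
urls = instance_urls(sample, "test_kfk_lk")
print(urls["consume"])
```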
  • Commit offsets for the consumer
POST /consumers/(string:group_name)/instances/(string:instance)/offsets

url4="http://192.168.160.101:8082/consumers/my_group/instances/my_consumer1/offsets"
r4=requests.post(url=url4,headers=header)
  • Consume messages
GET /consumers/(string:group_name)/instances/(string:instance)/topics/(string:topic_name)

url_get2="http://192.168.160.101:8082/consumers/my_group/instances/my_consumer1/topics/test_kfk_lk"
rr2=requests.get(url=url_get2,headers=header)#,params={"timeout":3000000}
print(rr2)
print(rr2.content)  # raw response bytes
print(rr2.text)  # response body as text; values are still Base64-encoded
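The consumed messages come back with Base64-encoded keys and values, so they need decoding before use. The sketch below assumes the reply is a JSON list of objects with `key`, `value`, `partition` and `offset` fields; the `sample` list is illustrative, and `decode_messages` is a hypothetical helper name.

```python
import base64


def _b64_to_text(field):
    """Decode a Base64 field to text; keep None (absent key or value) as None."""
    if field is None:
        return None
    return base64.b64decode(field).decode("utf-8")


def decode_messages(messages):
    """Return a copy of the messages with key and value decoded to text."""
    return [
        {
            "key": _b64_to_text(msg.get("key")),
            "value": _b64_to_text(msg.get("value")),
            "partition": msg["partition"],
            "offset": msg["offset"],
        }
        for msg in messages
    ]


# Illustrative reply; in practice this would be rr2.json().
sample = [
    {"key": "a2V5", "value": "Y29uZmx1ZW50", "partition": 0, "offset": 0},
    {"key": None, "value": "a2Fma2E=", "partition": 1, "offset": 0},
]
for msg in decode_messages(sample):
    print(msg["partition"], msg["offset"], msg["key"], msg["value"])
```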
  • Destroy the consumer instance
DELETE /consumers/(string:group_name)/instances/(string:instance)
#url_del="http://192.168.160.101:8082/consumers/test_kfk_lk/instances/my_consumer"
#d1=requests.delete(url_del)  # delete the consumer instance
#print(d1)
  • Fetch messages from a given partition and offset: Consume messages from one partition of the topic. (API v2)
GET /topics/(string:topic_name)/partitions/(int:partition_id)/messages?offset=(int)[&count=(int)]

Fetch Response v1 only contains message format v0.
Fetch Response v2 might either contain message format v0 or message format v1.
Possible Error Codes
* OFFSET_OUT_OF_RANGE (1)
* UNKNOWN_TOPIC_OR_PARTITION (3)
* NOT_LEADER_FOR_PARTITION (6)
* REPLICA_NOT_AVAILABLE (9)
* UNKNOWN (-1)
url_p="http://192.168.160.101:8082/topics/test_kfk/partitions/0/messages"
rst=requests.get(url_p,headers=header,params={"offset":3,"count":2})
print(rst)
print(len(rst.json()))
# Only iterate on success; an error reply is a JSON object, not a list of messages.
if rst.status_code==200:
    for itr in rst.json():
        print(base64.b64decode(itr['value']).decode())
print(rst.url)  # http://192.168.160.101:8082/topics/test_kfk/partitions/0/messages?offset=3&count=2
  • Get the current list of subscribed topics (API v2)
GET /consumers/(string:group_name)/instances/(string:instance)/subscription
  • Get the partitions manually assigned to the consumer (API v2)
GET /consumers/(string:group_name)/instances/(string:instance)/assignments
GET /consumers/testgroup/instances/my_consumer/assignments HTTP/1.1
Host: proxy-instance.kafkaproxy.example.com
Accept: application/vnd.kafka.v2+json

HTTP/1.1 200 OK
Content-Type: application/vnd.kafka.v2+json

{
  "partitions": [
    {
      "topic": "test",
      "partition": 0
    },
    {
      "topic": "test",
      "partition": 1
    }
  ]
}
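An assignments reply of this shape is often easier to use when grouped by topic. A minimal sketch, assuming the `{"partitions": [...]}` body shown in the example; `partitions_by_topic` is a hypothetical helper name.

```python
from collections import defaultdict


def partitions_by_topic(assignments: dict) -> dict:
    """Group an assignments reply into {topic: sorted list of partition ids}."""
    grouped = defaultdict(list)
    for item in assignments.get("partitions", []):
        grouped[item["topic"]].append(item["partition"])
    return {topic: sorted(parts) for topic, parts in grouped.items()}


# Body copied from the example reply.
reply = {
    "partitions": [
        {"topic": "test", "partition": 0},
        {"topic": "test", "partition": 1},
    ]
}
print(partitions_by_topic(reply))  # {'test': [0, 1]}
```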
  • Override the offsets the consumer will use for the next records it fetches (API v2)
POST /consumers/(string:group_name)/instances/(string:instance)/positions
POST /consumers/testgroup/instances/my_consumer/positions HTTP/1.1
Host: proxy-instance.kafkaproxy.example.com
Content-Type: application/vnd.kafka.v2+json

{
  "offsets": [
    {
      "topic": "test",
      "partition": 0,
      "offset": 20
    },
    {
      "topic": "test",
      "partition": 1,
      "offset": 30
    }
  ]
}
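The request body for this call can be generated from a simple partition-to-offset mapping. A minimal sketch that reproduces the example body; `build_offsets` is a hypothetical helper name.

```python
import json


def build_offsets(topic: str, offsets_by_partition: dict) -> dict:
    """Build a positions request body from {partition id: offset}."""
    return {
        "offsets": [
            {"topic": topic, "partition": partition, "offset": offset}
            for partition, offset in sorted(offsets_by_partition.items())
        ]
    }


body = build_offsets("test", {0: 20, 1: 30})
print(json.dumps(body, indent=2))
```

The dictionary can then be sent with `requests.post(..., json=body)` using the `application/vnd.kafka.v2+json` content type shown in the example request.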
  • Seek to the last offset of each of the given partitions
POST /consumers/(string:group_name)/instances/(string:instance)/positions/end
POST /consumers/testgroup/instances/my_consumer/positions/end HTTP/1.1
Host: proxy-instance.kafkaproxy.example.com
Content-Type: application/vnd.kafka.v2+json

{
  "partitions": [
    {
      "topic": "test",
      "partition": 0
    },
    {
      "topic": "test",
      "partition": 1
    }
  ]
}
  • Fetch data for the topics or partitions specified through the subscribe or assign API
GET /consumers/(string:group_name)/instances/(string:instance)/records
GET /consumers/testgroup/instances/my_consumer/records?timeout=3000&max_bytes=300000 HTTP/1.1
Host: proxy-instance.kafkaproxy.example.com
Accept: application/vnd.kafka.binary.v2+json

Example binary response:
HTTP/1.1 200 OK
Content-Type: application/vnd.kafka.binary.v2+json

[
  {
    "topic": "test",
    "key": "a2V5",
    "value": "Y29uZmx1ZW50",
    "partition": 1,
    "offset": 100
  },
  {
    "topic": "test",
    "key": "a2V5",
    "value": "a2Fma2E=",
    "partition": 2,
    "offset": 101
  }
]
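When polling this endpoint repeatedly, it helps to track how far each partition has been read. The sketch below parses a reply of the shape shown in the example and records the highest offset seen per topic and partition. `latest_offsets` is a hypothetical helper name, and the embedded reply text is the example body written as strict JSON.

```python
import json


def latest_offsets(records):
    """Return {(topic, partition): highest offset seen} for a records reply."""
    latest = {}
    for rec in records:
        tp = (rec["topic"], rec["partition"])
        if tp not in latest or rec["offset"] > latest[tp]:
            latest[tp] = rec["offset"]
    return latest


reply_text = """
[
  {"topic": "test", "key": "a2V5", "value": "Y29uZmx1ZW50", "partition": 1, "offset": 100},
  {"topic": "test", "key": "a2V5", "value": "a2Fma2E=", "partition": 2, "offset": 101}
]
"""
records = json.loads(reply_text)
print(latest_offsets(records))  # {('test', 1): 100, ('test', 2): 101}
```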

  

 

Reposted from: https://www.cnblogs.com/xnchll/p/9618432.html
