当前位置:   article > 正文

Confluent 安装 Kafka REST Proxy服务使用_confluent kafka rest proxy

confluent kafka rest proxy

背景:

公司由于业务需求想要通过http的方式往kafka发送消息,所以需要安装一个kafka的插件Confluent 实现。

1.下载安装

官网下载链接:https://www.confluent.io/download.

  • 安装的前提是kafka和zookeeper有安装成功,能正常运行(网上很多教程,自行查找吧)
  • 下载好tar包之后解压放在自己指定的目录下然后开始修改配置文件
  • 解压后进入到confluent的目录下修改以下文件
vim /etc/kafka-rest/kafka-rest.properties
#在此文件中去掉注释或者修改以下内容(自行修改自己的IP,这个不多说)
id=kafka-rest-test-server
schema.registry.url=http://Commit-Master:8081
zookeeper.connect=Commit-Master:2181,Commit-Slave-1:2182
bootstrap.servers=PLAINTEXT://Commit-Master:9092

# 把这个文件也改一下
vim /etc/schema-registry/schema-registry.properties
# 去掉注释或者修改以下内容,监听的这个端口默认的是多少我忘记了,是与我的spark的某个端口冲突的,所以我改成了这个,这个就自行判断了
listeners=http://0.0.0.0:8089
kafkastore.connection.url=Commit-Master:2181,Commit-Slave-1:2181
kafkastore.topic=_schemas
debug=True
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 修改后就可以开启,进入到bin目录下执行分别执行以下命令
./kafka-rest-start ../etc/kafka-rest/kafka-rest.properties
./schema-registry-start ../etc/schema-registry/schema-registry.properties
  • 1
  • 2

这里提醒一下,如果不开启这个注册功能,你会发现,只能查询,但是不能往kafka发送消息,所以都要开启

  • 开启之后可以看到以下的日志
    在这里插入图片描述
    在这里插入图片描述

二.使用

  • 直接访问查询当前存在的topic(ip自行修改)
    http://192.168.0.221:8082/topics
  • 查询指定topic详情
    http://192.168.0.221:8082/topics/test0210
#通过rest接口向topic push数据:
curl -i -X POST -H "Content-Type: application/vnd.kafka.avro.v1+json" --data '{"value_schema": "{\"type\": \"record\", \"name\": \"User\", \"fields\": [{\"name\": \"username\", \"type\": \"string\"}]}","records": [{"value": {"username": "testUser"}},{"value": {"username": "testUser2"}}]}' http://192.168.0.221:8082/topics/test0210

#注册consumer group
curl -i -X POST -H "Content-Type: application/vnd.kafka.v1+json" --data '{"format": "avro", "auto.offset.reset": "smallest"}' http://192.168.0.221:8082/consumers/my_consumer_group

#手动消费消息
curl -i -X GET -H "Accept: application/vnd.kafka.avro.v1+json" http://10.205.51.50:8082/consumers/my_avro_consumer/instances/rest-consumer-kafka-rest-server-25354850-1a4e-4503-bce2-75b9d9a6fd1a/topics/test0210
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 以推送消息为例,执行正常后是返回以下结果的
    在这里插入图片描述
    -为了方便,这里我使用python的代码简单开启一个消费者来接收消息
from kafka import KafkaConsumer

conf = {
    'bootstrap_servers': '192.168.0.221:9092',
    'topic_name': 'test0210',
    'consumer_id': 'consumer-id'
}

# print (start consumer)
consumer = KafkaConsumer(conf['topic_name'],
                        bootstrap_servers=conf['bootstrap_servers'],
                        group_id=conf['consumer_id'])

for message in consumer:
    print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,message.offset, message.key,message.value))
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 收是收到了,可是这个东西前面有一串不知道什么鬼东西,编了码,不知道用什么解码。。
    在这里插入图片描述
    我参考了网上教程和官方文档看到了有好几种格式,然后每一个我都试一试,使用postman进行发送数据,我试了一下下面这种是以utf-8编码的,下面有成功后的响应
header: "Content-Type: application/vnd.kafka.v1+json"
body/raw: {"records":[{"value":"5bCK5pWs55qE5a6i5oi35oKo5aW977yMaGkga2Fma2EsIGknbSB4bmNoYWxs"}]}
  • 1
  • 2

在这里插入图片描述
用utf-8解一下码
在这里插入图片描述

header: "Content-Type:application/vnd.kafka.json.v1+json"
body/raw:
{
  "records": [
    {
      "key": "somekey",
      "value": {"foo": "bar"}
    },
    {
      "key": "somekey",
      "value": {"foo": "11111111"}
    },
    {
      "value": 123456
    }
  ]
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 消费者收到的是这样的
    在这里插入图片描述
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Gausst松鼠会/article/detail/79806
推荐阅读
相关标签
  

闽ICP备14008679号