赞
踩
公司由于业务需求想要通过http的方式往kafka发送消息,所以需要安装一个kafka的插件Confluent 实现。
官网下载链接:https://www.confluent.io/download.
vim /etc/kafka-rest/kafka-rest.properties
#在此文件中去掉注释或者修改以下内容(自行修改自己的IP,这个不多说)
id=kafka-rest-test-server
schema.registry.url=http://Commit-Master:8081
zookeeper.connect=Commit-Master:2181,Commit-Slave-1:2182
bootstrap.servers=PLAINTEXT://Commit-Master:9092
# 把这个文件也改一下
vim /etc/schema-registry/schema-registry.properties
# 去掉注释或者修改以下内容,监听的这个端口默认的是多少我忘记了,是与我的spark的某个端口冲突的,所以我改成了这个,这个就自行判断了
listeners=http://0.0.0.0:8089
kafkastore.connection.url=Commit-Master:2181,Commit-Slave-1:2181
kafkastore.topic=_schemas
debug=True
bin
目录下执行分别执行以下命令./kafka-rest-start ../etc/kafka-rest/kafka-rest.properties
./schema-registry-start ../etc/schema-registry/schema-registry.properties
这里提醒一下,如果不开启这个注册功能,你会发现,只能查询,但是不能往kafka发送消息,所以都要开启
#通过rest接口向topic push数据:
curl -i -X POST -H "Content-Type: application/vnd.kafka.avro.v1+json" --data '{"value_schema": "{\"type\": \"record\", \"name\": \"User\", \"fields\": [{\"name\": \"username\", \"type\": \"string\"}]}","records": [{"value": {"username": "testUser"}},{"value": {"username": "testUser2"}}]}' http://192.168.0.221:8082/topics/test0210
#注册consumer group
curl -i -X POST -H "Content-Type: application/vnd.kafka.v1+json" --data '{"format": "avro", "auto.offset.reset": "smallest"}' http://192.168.0.221:8082/consumers/my_consumer_group
#手动消费消息
curl -i -X GET -H "Accept: application/vnd.kafka.avro.v1+json" http://10.205.51.50:8082/consumers/my_avro_consumer/instances/rest-consumer-kafka-rest-server-25354850-1a4e-4503-bce2-75b9d9a6fd1a/topics/test0210
from kafka import KafkaConsumer
conf = {
'bootstrap_servers': '192.168.0.221:9092',
'topic_name': 'test0210',
'consumer_id': 'consumer-id'
}
# print (start consumer)
consumer = KafkaConsumer(conf['topic_name'],
bootstrap_servers=conf['bootstrap_servers'],
group_id=conf['consumer_id'])
for message in consumer:
print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,message.offset, message.key,message.value))
header: "Content-Type: application/vnd.kafka.v1+json"
body/raw: {"records":[{"value":"5bCK5pWs55qE5a6i5oi35oKo5aW977yMaGkga2Fma2EsIGknbSB4bmNoYWxs"}]}
header: "Content-Type:application/vnd.kafka.json.v1+json" body/raw: { "records": [ { "key": "somekey", "value": {"foo": "bar"} }, { "key": "somekey", "value": {"foo": "11111111"} }, { "value": 123456 } ] }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。