
Hands-On with Confluent Kafka REST Proxy


What is the Confluent Platform?

    Members of the LinkedIn team that developed Apache Kafka founded a new company, Confluent, based on this technology; Confluent's products are likewise built around Kafka. The basic architecture is shown below:

    [Figure: Confluent Platform architecture]

Components that can be used for free:

    Confluent Kafka Brokers (open source)
    Confluent Kafka Connectors (open source)
    Confluent Kafka Clients (open source)
    Confluent Kafka REST Proxy (open source)
    Confluent Schema Registry (open source)

Our focus:

    In this post we focus on the REST Proxy; the underlying broker is also Confluent's Kafka component.

    Test platform: CentOS release 6.7 (Final)

    Kafka version: confluent-kafka-2.11-0.10.1.0-1

    REST Proxy version: confluent-kafka-rest-3.1.1-1

Add the Yum repository:

    Adding Confluent's repo definition locally is all that's needed:

  [Confluent.dist]
  name=Confluent repository (dist)
  baseurl=http://packages.confluent.io/rpm/3.1/6
  gpgcheck=1
  gpgkey=http://packages.confluent.io/rpm/3.1/archive.key
  enabled=1
  [Confluent]
  name=Confluent repository
  baseurl=http://packages.confluent.io/rpm/3.1
  gpgcheck=1
  gpgkey=http://packages.confluent.io/rpm/3.1/archive.key
  enabled=1
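
    The definition above goes in a file under /etc/yum.repos.d/. As a sketch (the file name confluent.repo is our own choice, not mandated by Confluent), you could write it in one step:

sudo tee /etc/yum.repos.d/confluent.repo > /dev/null <<'EOF'
[Confluent.dist]
name=Confluent repository (dist)
baseurl=http://packages.confluent.io/rpm/3.1/6
gpgcheck=1
gpgkey=http://packages.confluent.io/rpm/3.1/archive.key
enabled=1

[Confluent]
name=Confluent repository
baseurl=http://packages.confluent.io/rpm/3.1
gpgcheck=1
gpgkey=http://packages.confluent.io/rpm/3.1/archive.key
enabled=1
EOF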

Install:

  yum clean all
  yum makecache
  yum install confluent-kafka confluent-kafka-rest -y
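
    To verify the packages installed (versions should match the ones listed at the top):

rpm -q confluent-kafka confluent-kafka-rest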

Configure:

    ZooKeeper: /etc/kafka/zookeeper.properties

  dataDir=/var/lib/zookeeper
  clientPort=2181
  maxClientCnxns=0

    Kafka broker: /etc/kafka/server.properties

  broker.id=50
  delete.topic.enable=true
  listeners=PLAINTEXT://10.205.51.50:9092
  num.network.threads=3
  num.io.threads=8
  socket.send.buffer.bytes=102400
  socket.receive.buffer.bytes=102400
  socket.request.max.bytes=104857600
  log.dirs=/var/lib/kafka
  num.partitions=1
  num.recovery.threads.per.data.dir=1
  log.retention.hours=168
  log.segment.bytes=1073741824
  log.retention.check.interval.ms=300000
  zookeeper.connect=10.205.51.50:2181
  zookeeper.connection.timeout.ms=6000
  confluent.support.metrics.enable=true
  confluent.support.customer.id=anonymous

    REST proxy: /etc/kafka-rest/kafka-rest.properties

  id=kafka-rest-server
  zookeeper.connect=10.205.51.50:2181

    Schema registry: /etc/schema-registry/schema-registry.properties

  listeners=http://0.0.0.0:8081
  kafkastore.connection.url=10.205.51.50:2181
  kafkastore.topic=_schemas
  debug=false
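
    The schema registry is what makes the Avro examples below work: the REST proxy registers each value schema with it. Once the first Avro message has been produced, you can list the registered subjects (port 8081 comes from the listeners setting above):

curl http://10.205.51.50:8081/subjects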

Start the services:

    Start ZooKeeper:

zookeeper-server-start -daemon /etc/kafka/zookeeper.properties

   

    Start the Kafka broker:

kafka-server-start -daemon /etc/kafka/server.properties

   

    Start the REST proxy:

kafka-rest-start -daemon /etc/kafka-rest/kafka-rest.properties

    Start the schema registry:

schema-registry-start -daemon /etc/schema-registry/schema-registry.properties
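
    A quick sanity check that all four services are up, using the ports from the configs above (2181, 9092, 8082, 8081):

netstat -lntp | egrep '2181|9092|8082|8081'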

Walkthrough:

    The REST proxy supports Avro, JSON, and binary data formats; this post walks through the Avro and JSON formats.

    List the current topics:

curl http://10.205.51.50:8082/topics
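
    The proxy answers with a JSON array of topic names. What you see depends on what exists at this point; once the schema registry has started and the topics below are created, it should look something like:

["_schemas","test2","test3"]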

   

    List the cluster's brokers:

curl http://10.205.51.50:8082/brokers
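
    With the single broker configured above (broker.id=50), the response should simply be:

{"brokers":[50]}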

    

    Create topic test2, which will hold Avro-format data:

kafka-topics --create --zookeeper 10.205.51.50:2181 --partitions 1 --replication-factor 1 --topic test2

    Push data to test2 through the REST interface:

curl -i -X POST -H "Content-Type: application/vnd.kafka.avro.v1+json" --data '{"value_schema": "{\"type\": \"record\", \"name\": \"User\", \"fields\": [{\"name\": \"username\", \"type\": \"string\"}]}","records": [{"value": {"username": "testUser"}},{"value": {"username": "testUser2"}}]}' http://10.205.51.50:8082/topics/test2
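
    On success the proxy registers the embedded schema and reports per-record offsets. The exact schema id depends on registration order, but the body looks roughly like:

{"key_schema_id":null,"value_schema_id":1,"offsets":[{"partition":0,"offset":0,"error_code":null,"error":null},{"partition":0,"offset":1,"error_code":null,"error":null}]}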

    Register a consumer instance in the group my_avro_consumer:

curl -i -X POST -H "Content-Type: application/vnd.kafka.v1+json" --data '{"format": "avro", "auto.offset.reset": "smallest"}' http://10.205.51.50:8082/consumers/my_avro_consumer
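
    The response contains the generated instance_id and a base_uri to consume from; the long instance id in the next call was taken from this response, and yours will differ:

{"instance_id":"rest-consumer-kafka-rest-server-25354850-1a4e-4503-bce2-75b9d9a6fd1a","base_uri":"http://10.205.51.50:8082/consumers/my_avro_consumer/instances/rest-consumer-kafka-rest-server-25354850-1a4e-4503-bce2-75b9d9a6fd1a"}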

    Consume the data through the REST interface:

curl -i -X GET -H "Accept: application/vnd.kafka.avro.v1+json" http://10.205.51.50:8082/consumers/my_avro_consumer/instances/rest-consumer-kafka-rest-server-25354850-1a4e-4503-bce2-75b9d9a6fd1a/topics/test2
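
    The records come back as a JSON array (partitions and offsets vary by run), roughly:

[{"key":null,"value":{"username":"testUser"},"partition":0,"offset":0},{"key":null,"value":{"username":"testUser2"},"partition":0,"offset":1}]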

    Delete the registered consumer instance:

curl -i -X DELETE http://10.205.51.50:8082/consumers/my_avro_consumer/instances/rest-consumer-kafka-rest-server-25354850-1a4e-4503-bce2-75b9d9a6fd1a
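
    A successful delete returns an empty response:

HTTP/1.1 204 No Content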

    Create topic test3, which will hold JSON-format data:

kafka-topics --create --zookeeper 10.205.51.50:2181 --topic test3 --replication-factor 1 --partitions 1

    Push data to test3 through the REST interface:

curl -i -X POST -H "Content-Type: application/vnd.kafka.json.v1+json" --data '{"records": [{"key": "somekey","value": {"foo": "bar"}},{"value": [ "foo", "bar" ],"partition": 0}]}' http://10.205.51.50:8082/topics/test3
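
    JSON messages carry no schema, so both schema ids come back null; with a single-partition topic both records land on partition 0:

{"key_schema_id":null,"value_schema_id":null,"offsets":[{"partition":0,"offset":0,"error_code":null,"error":null},{"partition":0,"offset":1,"error_code":null,"error":null}]}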

    Register a consumer instance in the group my_json_consumer, this time with an explicit instance name:

curl -i -X POST -H "Content-Type: application/vnd.kafka.v1+json" --data '{"name": "test3","format": "json", "auto.offset.reset": "smallest"}' http://10.205.51.50:8082/consumers/my_json_consumer
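
    Because the request names the instance ("name": "test3"), the proxy uses that as the instance id instead of generating one, which is why the consume and delete URLs below are predictable:

{"instance_id":"test3","base_uri":"http://10.205.51.50:8082/consumers/my_json_consumer/instances/test3"}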

    Consume the data through the REST interface:

curl -i -X GET -H "Accept: application/vnd.kafka.json.v1+json" http://10.205.51.50:8082/consumers/my_json_consumer/instances/test3/topics/test3
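
    The response echoes the keys, values, partitions, and offsets, roughly:

[{"key":"somekey","value":{"foo":"bar"},"partition":0,"offset":0},{"key":null,"value":["foo","bar"],"partition":0,"offset":1}]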

    Delete the consumer instance:

curl -i -X DELETE http://10.205.51.50:8082/consumers/my_json_consumer/instances/test3

 

    As you can see, the whole process is fairly involved, relying on several cooperating services (ZooKeeper, the Kafka broker, the REST proxy, and the schema registry).

 

 

Reprinted from: https://my.oschina.net/guol/blog/822980
