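What is the Confluent Platform?
The Confluent Platform comes from Confluent, a company founded by members of the LinkedIn team that developed Apache Kafka. They built the company on that technology, and all of Confluent's products are built around Kafka. The basic architecture is as follows: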
Components that can be used free of charge:
Confluent Kafka Brokers (open source)
Confluent Kafka Connectors (open source)
Confluent Kafka Clients (open source)
Confluent Kafka REST Proxy (open source)
Confluent Schema Registry (open source)
What we focus on:
This time we mainly use the REST Proxy. The underlying brokers are, of course, Confluent's Kafka packages as well.
Test platform: CentOS release 6.7 (Final)
Kafka version: confluent-kafka-2.11-0.10.1.0-1 (Kafka 0.10.1.0 built for Scala 2.11)
REST Proxy version: confluent-kafka-rest-3.1.1-1
Add the Yum repository:
Simply add Confluent's repo definitions locally, as a .repo file under /etc/yum.repos.d/:
[Confluent.dist]
name=Confluent repository (dist)
baseurl=http://packages.confluent.io/rpm/3.1/6
gpgcheck=1
gpgkey=http://packages.confluent.io/rpm/3.1/archive.key
enabled=1

[Confluent]
name=Confluent repository
baseurl=http://packages.confluent.io/rpm/3.1
gpgcheck=1
gpgkey=http://packages.confluent.io/rpm/3.1/archive.key
enabled=1
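If you prefer to script this step, the sketch below writes the same two sections into a repo file. The function name write_confluent_repo and the default file name confluent.repo are assumptions for illustration; yum only needs a *.repo file under /etc/yum.repos.d/.

```shell
#!/usr/bin/env bash
# Sketch: write the two Confluent 3.1 repo sections into a .repo file.
# Assumption: default target /etc/yum.repos.d/confluent.repo (any *.repo name works).
write_confluent_repo() {
  local repo_file="${1:-/etc/yum.repos.d/confluent.repo}"
  # Quoted 'EOF' keeps the heredoc literal (no variable expansion).
  cat > "$repo_file" <<'EOF'
[Confluent.dist]
name=Confluent repository (dist)
baseurl=http://packages.confluent.io/rpm/3.1/6
gpgcheck=1
gpgkey=http://packages.confluent.io/rpm/3.1/archive.key
enabled=1

[Confluent]
name=Confluent repository
baseurl=http://packages.confluent.io/rpm/3.1
gpgcheck=1
gpgkey=http://packages.confluent.io/rpm/3.1/archive.key
enabled=1
EOF
}
```

Run `write_confluent_repo` as root. Importing the signing key beforehand with `rpm --import http://packages.confluent.io/rpm/3.1/archive.key` is optional; with gpgcheck=1 and gpgkey set, yum can fetch and import it during installation.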
Install:
yum clean all
yum makecache
yum install confluent-kafka confluent-kafka-rest -y
Configuration:
ZooKeeper: /etc/kafka/zookeeper.properties
dataDir=/var/lib/zookeeper
clientPort=2181
maxClientCnxns=0
Kafka broker: /etc/kafka/server.properties
broker.id=50
delete.topic.enable=true
listeners=PLAINTEXT://10.205.51.50:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/var/lib/kafka
num.partitions=1
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=10.205.51.50:2181
zookeeper.connection.timeout.ms=6000
confluent.support.metrics.enable=true
confluent.support.customer.id=anonymous
REST Proxy: /etc/kafka-rest/kafka-rest.properties
id=kafka-rest-server
zookeeper.connect=10.205.51.50:2181
Schema Registry: /etc/schema-registry/schema-registry.properties
listeners=http://0.0.0.0:8081
kafkastore.connection.url=10.205.51.50:2181
kafkastore.topic=_schemas
debug=false
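Three of these files name ZooKeeper independently (zookeeper.connect in the broker and REST Proxy configs, kafkastore.connection.url in the Schema Registry config), so a mismatch is easy to introduce. The sketch below checks that they agree. prop_get and check_zk_consistency are hypothetical helpers, and prop_get only understands simple key=value lines (no ':' separators or line continuations).

```shell
#!/usr/bin/env bash
# Print the last value assigned to KEY in properties FILE (empty if unset).
# Only plain "key=value" lines are handled; comment lines never match a key.
prop_get() {
  awk -v k="$2" '
    index($0, "=") {
      key = substr($0, 1, index($0, "=") - 1)
      gsub(/^[ \t]+|[ \t]+$/, "", key)
      if (key == k) {
        val = substr($0, index($0, "=") + 1)
        gsub(/^[ \t]+|[ \t]+$/, "", val)
      }
    }
    END { print val }
  ' "$1"
}

# Succeed only if broker, REST Proxy and Schema Registry use the same ZooKeeper.
check_zk_consistency() {
  local server="${1:-/etc/kafka/server.properties}"
  local rest="${2:-/etc/kafka-rest/kafka-rest.properties}"
  local registry="${3:-/etc/schema-registry/schema-registry.properties}"
  local a b c
  a=$(prop_get "$server" zookeeper.connect)
  b=$(prop_get "$rest" zookeeper.connect)
  c=$(prop_get "$registry" kafkastore.connection.url)
  if [ -n "$a" ] && [ "$a" = "$b" ] && [ "$a" = "$c" ]; then
    echo "OK: all three use ZooKeeper at $a"
  else
    echo "MISMATCH: broker=$a rest=$b registry=$c" >&2
    return 1
  fi
}
```

With no arguments, `check_zk_consistency` reads the three default paths used in this article.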
Start the services:
Start ZooKeeper:
zookeeper-server-start -daemon /etc/kafka/zookeeper.properties
Start the Kafka broker:
kafka-server-start -daemon /etc/kafka/server.properties
Start the REST Proxy:
kafka-rest-start -daemon /etc/kafka-rest/kafka-rest.properties
Start the Schema Registry:
schema-registry-start -daemon /etc/schema-registry/schema-registry.properties
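Each service depends on the ones before it, so launching them back to back can race. The sketch below waits for each port to accept TCP connections before starting the next service. It assumes bash (for /dev/tcp), the ports from the configs above plus the REST Proxy's default 8082, and a hypothetical HOST variable. It deliberately starts the Schema Registry before the REST Proxy, because the REST Proxy's Avro support talks to the registry. An open port only shows that the process is listening, not that it is fully ready.

```shell
#!/usr/bin/env bash
HOST="${HOST:-10.205.51.50}"  # assumption: all services on this one host

# Return 0 once HOST:PORT accepts a TCP connection, 1 after TIMEOUT attempts (~1s apart).
# Relies on bash's /dev/tcp redirection; under other shells it always fails.
wait_for_port() {
  local host="$1" port="$2" timeout="${3:-30}" i=0
  while [ "$i" -lt "$timeout" ]; do
    if (exec 3<>"/dev/tcp/$host/$port") 2>/dev/null; then
      return 0
    fi
    sleep 1
    i=$((i + 1))
  done
  return 1
}

# Start ZooKeeper, broker, Schema Registry and REST Proxy in dependency order.
start_all() {
  zookeeper-server-start -daemon /etc/kafka/zookeeper.properties
  wait_for_port "$HOST" 2181 || { echo "ZooKeeper not up" >&2; return 1; }
  kafka-server-start -daemon /etc/kafka/server.properties
  wait_for_port "$HOST" 9092 || { echo "Kafka broker not up" >&2; return 1; }
  schema-registry-start -daemon /etc/schema-registry/schema-registry.properties
  wait_for_port "$HOST" 8081 || { echo "Schema Registry not up" >&2; return 1; }
  kafka-rest-start -daemon /etc/kafka-rest/kafka-rest.properties
  wait_for_port "$HOST" 8082 || { echo "REST Proxy not up" >&2; return 1; }
}
```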
Walkthrough:
The REST Proxy supports the Avro, JSON, and binary data formats. This article works through the Avro and JSON formats.
List the current topics:
curl http://10.205.51.50:8082/topics
List the brokers in the cluster:
curl http://10.205.51.50:8082/brokers
Create the topic test2 to hold Avro data:
kafka-topics --create --zookeeper 10.205.51.50:2181 --partitions 1 --replication-factor 1 --topic test2
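Produce data to test2 through the REST API. The Avro schema is passed in value_schema as a JSON-encoded string, which is why its inner quotes are escaped: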
curl -i -X POST -H "Content-Type: application/vnd.kafka.avro.v1+json" --data '{"value_schema": "{\"type\": \"record\", \"name\": \"User\", \"fields\": [{\"name\": \"username\", \"type\": \"string\"}]}","records": [{"value": {"username": "testUser"}},{"value": {"username": "testUser2"}}]}' http://10.205.51.50:8082/topics/test2
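Because value_schema is itself a JSON string, the schema has to be escaped before it is embedded in the request body. The sketch below builds exactly the body used above from a one-line schema and a list of record values. json_escape and avro_produce_body are hypothetical names; json_escape only escapes backslashes and double quotes, which is enough for single-line schemas like this one but is not a general JSON encoder.

```shell
#!/usr/bin/env bash
# Escape backslashes and double quotes so a one-line JSON text can be
# embedded as a JSON string value (newlines/tabs are NOT handled).
json_escape() {
  printf '%s' "$1" | sed -e 's/\\/\\\\/g' -e 's/"/\\"/g'
}

# Build a v1 Avro produce body: $1 is the schema, remaining args are record values.
avro_produce_body() {
  local schema="$1" records="" r
  shift
  for r in "$@"; do
    # Join records with "," (no separator before the first one).
    records="${records:+$records,}{\"value\": $r}"
  done
  printf '{"value_schema": "%s","records": [%s]}' "$(json_escape "$schema")" "$records"
}
```

For example: `curl -i -X POST -H "Content-Type: application/vnd.kafka.avro.v1+json" --data "$(avro_produce_body "$schema" '{"username": "testUser"}' '{"username": "testUser2"}')" http://10.205.51.50:8082/topics/test2`, where `$schema` holds the unescaped schema.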
Create a consumer instance in the consumer group my_avro_consumer:
curl -i -X POST -H "Content-Type: application/vnd.kafka.v1+json" --data '{"format": "avro", "auto.offset.reset": "smallest"}' http://10.205.51.50:8082/consumers/my_avro_consumer
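Consume data through the REST API. The instance ID in the URL is the instance_id returned by the previous request; since no name was specified, the proxy generated it: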
curl -i -X GET -H "Accept: application/vnd.kafka.avro.v1+json" http://10.205.51.50:8082/consumers/my_avro_consumer/instances/rest-consumer-kafka-rest-server-25354850-1a4e-4503-bce2-75b9d9a6fd1a/topics/test2
Delete the consumer instance:
curl -i -X DELETE http://10.205.51.50:8082/consumers/my_avro_consumer/instances/rest-consumer-kafka-rest-server-25354850-1a4e-4503-bce2-75b9d9a6fd1a
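Hard-coding the generated instance ID only works once. The sketch below creates the instance, reads instance_id from the response, consumes once, and always deletes the instance afterwards. REST_BASE, json_string_field and avro_consume_once are hypothetical names; the sed-based extraction only handles flat responses such as {"instance_id":"...","base_uri":"..."} and is not a JSON parser (jq would be more robust if installed).

```shell
#!/usr/bin/env bash
REST_BASE="${REST_BASE:-http://10.205.51.50:8082}"  # assumption: REST Proxy address

# Print the string value of top-level field $1 in flat JSON text $2 (empty if absent).
json_string_field() {
  printf '%s' "$2" | sed -n "s/.*\"$1\"[[:space:]]*:[[:space:]]*\"\([^\"]*\)\".*/\1/p"
}

# Create an Avro consumer in GROUP ($1), read TOPIC ($2) once, then delete the instance.
avro_consume_once() {
  local group="$1" topic="$2" resp id out rc
  resp=$(curl -sf -X POST -H "Content-Type: application/vnd.kafka.v1+json" \
    --data '{"format": "avro", "auto.offset.reset": "smallest"}' \
    "$REST_BASE/consumers/$group") || return 1
  id=$(json_string_field instance_id "$resp")
  if [ -z "$id" ]; then
    echo "no instance_id in response: $resp" >&2
    return 1
  fi
  out=$(curl -sf -H "Accept: application/vnd.kafka.avro.v1+json" \
    "$REST_BASE/consumers/$group/instances/$id/topics/$topic")
  rc=$?
  # Clean up the instance even if the read failed.
  curl -s -X DELETE "$REST_BASE/consumers/$group/instances/$id" >/dev/null
  printf '%s\n' "$out"
  return "$rc"
}
```

For example: `avro_consume_once my_avro_consumer test2`.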
Create the topic test3 to hold JSON data:
kafka-topics --create --zookeeper 10.205.51.50:2181 --topic test3 --replication-factor 1 --partitions 1
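Produce data to test3 through the REST API. With the JSON format, keys and values can be arbitrary JSON, and the second record is sent explicitly to partition 0: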
curl -i -X POST -H "Content-Type: application/vnd.kafka.json.v1+json" --data '{"records": [{"key": "somekey","value": {"foo": "bar"}},{"value": [ "foo", "bar" ],"partition": 0}]}' http://10.205.51.50:8082/topics/test3
Create a consumer instance named test3 in the consumer group my_json_consumer:
curl -i -X POST -H "Content-Type: application/vnd.kafka.v1+json" --data '{"name": "test3","format": "json", "auto.offset.reset": "smallest"}' http://10.205.51.50:8082/consumers/my_json_consumer
Consume data through the REST API:
curl -i -X GET -H "Accept: application/vnd.kafka.json.v1+json" http://10.205.51.50:8082/consumers/my_json_consumer/instances/test3/topics/test3
Delete the consumer instance:
curl -i -X DELETE http://10.205.51.50:8082/consumers/my_json_consumer/instances/test3
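Because the JSON consumer was created with "name": "test3", its URL is known in advance and no response parsing is needed. The sketch below wraps that round trip and deletes the instance even if the read fails; otherwise an idle instance stays on the proxy until consumer.instance.timeout.ms expires. REST_BASE, instance_url and json_roundtrip are hypothetical names, and the instance name is assumed to need no JSON escaping.

```shell
#!/usr/bin/env bash
REST_BASE="${REST_BASE:-http://10.205.51.50:8082}"  # assumption: REST Proxy address

# Print a consumer-instance URL; with a third argument, append /topics/<topic>.
instance_url() {
  local group="$1" name="$2" topic="${3:-}"
  printf '%s/consumers/%s/instances/%s%s\n' "$REST_BASE" "$group" "$name" "${topic:+/topics/$topic}"
}

# Create a named JSON consumer in GROUP, read TOPIC once, then delete the instance.
json_roundtrip() {
  local group="$1" name="$2" topic="$3" out rc
  curl -sf -X POST -H "Content-Type: application/vnd.kafka.v1+json" \
    --data "{\"name\": \"$name\", \"format\": \"json\", \"auto.offset.reset\": \"smallest\"}" \
    "$REST_BASE/consumers/$group" >/dev/null || return 1
  out=$(curl -sf -H "Accept: application/vnd.kafka.json.v1+json" "$(instance_url "$group" "$name" "$topic")")
  rc=$?
  # Clean up regardless of whether the read succeeded.
  curl -s -X DELETE "$(instance_url "$group" "$name")" >/dev/null
  printf '%s\n' "$out"
  return "$rc"
}
```

For example: `json_roundtrip my_json_consumer test3 test3`.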
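As you can see, the whole process is rather cumbersome and depends on several services: ZooKeeper, the Kafka broker, the Schema Registry, and the REST Proxy.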