赞
踩
目录
以下概述Kafka内的几个核心概念,可参考官方文档,有兴趣可读:kafka.apache.org
Topic与日志
Topic 就是数据主题,是数据记录发布的地方,可以用来区分业务系统。Kafka 中的 Topics 总是多订阅者模式,一个topic可以拥有一个或者多个消费者来订阅它的数据。对于每一个topic, Kafka 集群都会维持一个分区日志,如下所示:
每个分区都是有序且顺序不可变的记录集,并且不断地追加到结构化的commit log文件。分区中的每一个记录都会分配一个id号来表示顺序,我们称之为offset,offset用来唯一的标识分区中每一条记录。
Kafka 集群保留所有发布的记录—无论他们是否已被消费—并通过一个可配置的参数——保留期限来控制。Kafka的性能和数据大小无关,因此能够长时间存储数据。
在每一个消费者中唯一保存的元数据是offset(偏移量)即消费在log中的位置。偏移量由消费者所控制:通常在读取记录后,消费者会以线性的方式增加偏移量,但实际上,由于这个位置由消费者控制,所以消费者可以采用任何顺序来消费记录。例如,一个消费者可以重置到一个旧的偏移量,从而重新处理过去的数据;也可以跳过最近的记录,从"现在"开始消费。
生产者
生产者可以将数据发布到所选择的topic(主题)中。生产者负责将记录分配到topic的哪一个 partition(分区)中。可以使用循环的方式来简单地实现负载均衡,也可以根据某些语义分区函数(例如:记录中的key)来完成。
消费者
消费者使用一个 消费组 名称来进行标识,发布到topic中的每条记录被分配给订阅消费组中的一个消费者实例.消费者实例可以分布在多个进程中或者多个机器上。如果所有的消费者实例在同一消费组中,消息记录会负载平衡到每一个消费者实例。如果所有的消费者实例在不同的消费组中,每条消息记录会广播到所有的消费者进程。
如图,这个 Kafka 集群有两台 server 的,四个分区(p0-p3)和两个消费者组。消费组A有两个消费者,消费组B有四个消费者。通常情况下,每个 topic 都会有一些消费组,一个消费组对应一个"逻辑订阅者"。一个消费组由许多消费者实例组成,便于扩展和容错。这就是发布和订阅的概念,只不过订阅者是一组消费者而不是单个的进程。
在Kafka中实现消费的方式是将日志中的分区划分到每一个消费者实例上,以便在任何时间,每个实例都是分区唯一的消费者。维护消费组中的消费关系由Kafka协议动态处理。如果新的实例加入组,他们将从组中其他成员处接管一些 partition 分区;如果一个实例消失,拥有的分区将被分发到剩余的实例。
Kafka 只保证分区内的记录是有序的,而不保证主题中不同分区的顺序。每个 partition 分区按照key值排序足以满足大多数应用程序的需求。但如果你需要总记录在所有记录的上面,可使用仅有一个分区的主题来实现,这意味着每个消费者组只有一个消费者进程。
分布式
日志的分区partition (分布)在Kafka集群的服务器上。每个服务器在处理数据和请求时,共享这些分区。每一个分区都会在已配置的服务器上进行备份,确保容错性。每个分区都有一台 server 作为 “leader”,零台或者多台server作为 follwers 。leader server 处理一切对 partition (分区)的读写请求,而follwers只需被动的同步leader上的数据。当leader宕机了,followers 中的一台服务器会自动成为新的 leader。每台 server 都会成为某些分区的 leader 和某些分区的 follower,因此集群的负载是平衡的。
以上内容摘自:www.wenjiangs.com/doc/kafka-intro
本文部署kafka集群至3个服务器节点。
服务器(或虚拟机) | 角色 |
192.168.**.131 | zookeeper节点1、kafka节点1 |
192.168.**.132 | zookeeper节点2、kafka节点2 |
192.168.**.133 | zookeeper节点3、kafka节点3 |
软件 | 角色 |
docker | 环境部署(提前Pull镜像) sudo docker pull zookeeper sudo docker pull wurstmeister/kafka sudo docker pull sheepkiller/kafka-manager |
- sudo mkdir /data/kafka_cluster
- cd /data/kafka_cluster
- sudo mkdir -p zookeeper/config
- sudo mkdir -p zookeeper/data
- sudo vim zookeeper/config/zoo.cfg
- # 粘贴下方代码保存退出
- clientPort=2181
- dataDir=/data
- dataLogDir=/data/log
- tickTime=2000
- initLimit=5
- syncLimit=2
- autopurge.snapRetainCount=3
- autopurge.purgeInterval=0
- maxClientCnxns=60
- 4lw.commands.whitelist=*
- server.1=192.168.**.131:2888:3888
- server.2=192.168.**.132:2888:3888
- server.3=192.168.**.133:2888:3888
- sudo vim zookeeper/data/myid
- # 分别向131,132,133节点写入1,2,3保存退出
- # 节点1服务器:131
- sudo docker run --network host -v /data/kafka_cluster/zookeeper/data:/data -v /data/kafka_cluster/zookeeper/conf/zoo.cfg:/conf/zoo.cfg --name zookeeper-1 -itd zookeeper
- # 节点2服务器:132
- sudo docker run --network host -v /data/kafka_cluster/zookeeper/data:/data -v /data/kafka_cluster/zookeeper/conf/zoo.cfg:/conf/zoo.cfg --name zookeeper-2 -itd zookeeper
- # 节点3服务器:133
- sudo docker run --network host -v /data/kafka_cluster/zookeeper/data:/data -v /data/kafka_cluster/zookeeper/conf/zoo.cfg:/conf/zoo.cfg --name zookeeper-3 -itd zookeeper
sudo mkdir -p kafka/log
- # 该处的启动方式加入了部分环境变量,等同于修改配置文件,更加方便
- zookeepers_con="192.168.**.131:2181,192.168.**.132:2181,192.168.**.133:2181"
- # 节点1服务器:131
- sudo docker run -p 9092:9092 --name kafka-1 -itd \
- -e KAFKA_BROKER_ID=1 -e KAFKA_ZOOKEEPER_CONNECT=${zookeepers_con} \
- -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.**.131:9092 \
- -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -v /data/kafka/log:/kafka wurstmeister/kafka
- # 节点2服务器:132
- sudo docker run -p 9092:9092 --name kafka-2 -itd \
- -e KAFKA_BROKER_ID=2 -e KAFKA_ZOOKEEPER_CONNECT=${zookeepers_con} \
- -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.**.132:9092 \
- -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -v /data/kafka/log:/kafka wurstmeister/kafka
- # 节点3服务器:133
- sudo docker run -p 9092:9092 --name kafka-3 -itd \
- -e KAFKA_BROKER_ID=3 -e KAFKA_ZOOKEEPER_CONNECT=${zookeepers_con} \
- -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.**.133:9092 \
- -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -v /data/kafka/log:/kafka wurstmeister/kafka
- sudo docker run -p 9000:9000 --name kafka-manager -itd \
- -e ZK_HOSTS=${zookeepers_con} sheepkiller/kafka-manager
网页端打开主节点的9000端口,如下图所示进行集群的添加,其他设置可暂时不设置;
选择创建的集群test-cluster,查看集群信息;
在主节点创建消息主题test-topic:
在网页管理端查看集群topic概况已能够查询到消息主题test-topic:
创建生产者并发布消息:
创建消费者并订阅消息,消息消费显示正常:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。