赞
踩
kafka 是一个分布式数据流平台,可以运行在单台服务器上,也可以在多台服务器上部署形成集群。他提供了发布和订阅功能,使用者可以发送数据到kafka中,也可以从kafka中读取数据,以便进行后续的处理,kafka具有高吞吐,低延迟,高容错等特点
kafka cluster: kafka集群,一台或多台服务器组成
Consumer:消费者,即消息的消费方,是消息的出口
Consumer group:我们可以将多个消费组组成一个消费者组,在kafka的设计中同一个分区的数据只能本消费族中的某一个消费者消费,同一个消费者组的消费者可以消费同一个topic的不同分区的数据,这也是为了提高kafka的吞吐量
在kafka中,如果某个topic有多个partition,producer有怎么知道该将数据发往哪个partition呢?kafka中有几个原则:
producer在向kafka写入消息的时候,可以设置参数来确定是否确认kafka接收到数据,这个参数可设置的值为0,1,all
最后要注意的是,如果往不存在的topic写数据,kafka会自动创建topic,partition和replication的数量默认配置都是1
topic是同一类别的消息记录(record)的集合,在kafaka中,一个主题通常有多个订阅者,对于每个主题,kafka集群维护了一个分区数据日志文件结构,如下
每个partition都是一个有序并且不可变的消息记录集合,当新的数据写入是,就被追加到partition的末尾,在每个partition中,每条消息都会被分配一个顺序的唯一标识,这个标识被成为offset,即偏移量
注意,在kafka只保证在同一个partition内部消息是有序的,在不同partion之间,并不能保证消息有序
kafka可以配置一个保留期限,用来标识日志会在kafka集群内保留多长时间,kafka集群会保留在保留期限内所有被发布的消息,不管这些消息是否被消费过。比如保留期限设置为两天,那么数据就被发布到kafka集群的两天以内,所有的这些数据都可以被消费,当超过两天,这些数据就会被晴空,以便为后续的数据腾出空间,由于kafka会将数据进行持久化存储(即写入到硬盘),所以保留的数据大小可以设置为一个比较大的值
Partition在服务器上的表现形式就是一个一个的文件夹,每个partition的文件夹下面会有多组segment文件,每组segment文佳又包含.index文件,.log文件,.timeindex文件三个文件,其中.log文件就是实际存储message的地方,而.index文件和.timeindex文件为索引文件,用于检索消息
多个消费者实例可以组成一个消费者组,并用一个标签来标识这个消费者组,一个消费者组中的不同消费者实例可以运行在不同的进程甚至是不用的服务器上
如果所有的消费者实例都在同一个消费者组中,那么消费记录会被很好的均衡的发送到每个消费者实例
如果所有的消费者实例都在不用的消费者组,那么每一条消费记录会被广播到每一个消费者实例
举个例子,如上图所示一个两个节点的kafka集群上拥有一个四个partition(p0-p3)的topic。有两个消费者组都在消费这个topic中的数据,消费者组Ay欧两个消费者实例,消费者组B有四个消费者实例从图中我们可以看到,在同一个消费者组中,每个消费者实例可以消费多个分区,但是每个分区最多只能被消费者组中的一个实例消费,也就是说,如果有一个4个分区的主题,那么消费者组中最多只能有4个消费者实例取消费,多出来的都不会被分配到分区,其实这也很好理解,如果允许两个消费者实例同时消费同一个分区,那么就无法记录这个分区被这个消费组消费的offset来,如果在消费者组中动态的上线或下线消费者,那么kafka集群会自动调整分区和消费者实例间的对应关系
1、kafka的运行需要依赖zookeeper,所以需要zookeeper的镜像
docker pull wurstmeister/zookeeper
docker pull wurstmeister/kafka
2、启动kafka
(1)首先启动zookeeper
docker run -it --rm --name zookeeper -p 2181:2181 wurstmeister/zookeeper
# 84f9e7dfc0c5
(2)再启动kafka
docker run -it --rm --name kafka -p9092:9092 --link zookeeper --env KAFKA_BROKER_ID=1 --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 --env KAFKA_ADVERTISED_HOST_NAME=192.168.1.5 --env KAFKA_ADVERTISED_PORT=9092 --volume /etc/localtime:/etc/localtime -v /Users/bobwang/Downloads/go_logagent/kafkalog:/kafka wurstmeister/kafka:latest
# 注意:加了参数: --env KAFKA_BROKER_ID=1
# 会报错“kafka server: In the middle of a leadership election, there is currently no leader for this partition and hence it is unavailable for writes.意味着,如果对kafka的Broker做标记,则意味着进行集群部署
测试时的启动命令如下:
docker run -it --rm --name kafka -p9092:9092 --link zookeeper --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 --env KAFKA_ADVERTISED_HOST_NAME=192.168.1.5 --env KAFKA_ADVERTISED_PORT=9092 --volume /etc/localtime:/etc/localtime -v /Users/bobwang/Downloads/go_logagent/kafkalog:/kafka wurstmeister/kafka:latest
3、测试发送消息
(1)在终端输入container ID或容器名,进入容器,如下:
docker exec -it kafka /bin/bash
> bash-4.4#
(2)进入kafka默认目录
> bash-4.4# cd /opt/kafka_2.13-2.7.0
> bash-4.4# ls
LICENSE NOTICE bin config libs logs site-docs
(3)在下面就和kafka的功能一样,首先创建主题(mykafka)
> bash-4.4# bin/kafka-console-producer.sh --broker-list 192.168.44.158:9092 --topic mykafka
bin/kafka-console-producer.sh --broker-list 192.168.1.5:9092 --topic mykafka
Created topic mykafka.
ctrl +c 推出
(4)查看我创建主题
>bash-4.4# bin/kafka-topics.sh --list --zookeeper 192.168.1.5:2181
mykafka
(5) 发送消息
首先在第一个终端输入下面命令 ,等待输入
> bash-4.4#bin/kafka-console-producer.sh --broker-list 192.168.1.5:9092 --topic mykafka
>
其次,再开启一个kafka容器终端,在第二个终端同样进入kafka的后台,等待接受消息
docker exec -it 91b75ed05740 /bin/bash
bash-4.4# cd /opt/kafka_2.13-2.7.0
bash-4.4# bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.5:9092 --topic mykafka
在第一终端输入 this is a message
> bash-4.4# bin/kafka-console-producer.sh --broker-list 192.168.1.5:9092 --topic mykafka
>this is a message
第二终端展示
> bash-4.4# bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.5:9092 --topic mykafka
> this is a message
参考:https://blog.csdn.net/oschina_40730821/article/details/108511476
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。