赞
踩
Kafka是最初由Linkedin公司开发,是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于hadoop的批处理系统、低延迟的实时系统、storm/Spark流式处理引擎,web/nginx日志、访问日志,消息服务等等,用scala语言编写,Linkedin于2010年贡献给了Apache基金会并成为顶级开源 项目。
kafka是一个分布式的,分区的消息(官方称之为commit log)服务。它提供一个消息系统应该具备的功能,但是确有着独特的设计。可以这样来说,Kafka借鉴了JMS规范的思想,但是确并没有完全遵循JMS规范。
首先,让我们来看一下基础的消息(Message)相关术语:
从一个较高的层面上来看,producer通过网络发送消息到Kafka集群,然后consumer来进行消费,如下图:
主题Topic和消息日志Log
让我们首先深入理解Kafka提出一个高层次的抽象概念-Topic。
可以理解Topic是一个类别的名称,同类消息发送到同一个Topic下面。对于每一个Topic,下面可以有多个分区
(Partition)日志文件:
Partition是一个有序的message序列,这些message按顺序添加到一个叫做commit log的文件中。每个partition中的消息都有一个唯一的编号,称之为offset,用来唯一标示某个分区中的message。
注意:每个partition,都对应一个commit log文件。一个partition中的message的offset都是唯一的,但是不同的partition中的message的offset可能是相同的。
可以这么来理解Topic,Partition和Broker
一个topic,代表逻辑上的一个业务数据集,比如按数据库里不同表的数据操作消息区分放入不同topic,订单相关操作消息放入订单topic,用户相关操作消息放入用户topic,对于大型网站来说,后端数据都是海量的,订单消息很可能是非常巨量的,比如有几百个G甚至达到TB级别,如果把这么多数据都放在一台机器上可定会有容量限制问题,那么就可以在topic内部划分多个partition来分片存储数据,不同的partition可以位于不同的机器上,每台机器上都运行一个Kafka的进程Broker。
kafka集群,在配置的时间范围内,维护所有的由producer生成的消息,而不管这些消息有没有被消费。例如日志保留(log retention )时间被设置为2天。kafka会维护最近2天生产的所有消息,而2天前的消息会被丢弃。kafka的性能与保留的数据量的大小没有关系,因此保存大量的数据(日志信息)不会有什么影响。
每个consumer是基于自己在commit log中的消费进度(offset)来进行工作的。在kafka中,消费offset由consumer自己来维护;一般情况下我们按照顺序逐条消费commit log中的消息,当然我可以通过指定offset来重复消费某些消息,或者跳过某些消息。
这意味kafka中的consumer对集群的影响是非常小的,添加一个或者减少一个consumer,对于集群或者其他consumer来说,都是没有影响的,因为每个consumer维护各自的offset。所以说kafka集群是无状态的,性能不会因为consumer数量受太多影响。kafka还将很多关键信息记录在zookeeper里,保证自己的无状态,从而在水平扩容时非常方便。
为什么要对Topic下数据进行分区存储?
1、commit log文件会受到所在机器的文件系统大小的限制,分区之后,理论上一个topic可以处理任意数量的数据。
2、为了提高并行度。
log的partitions分布在kafka集群中不同的broker上,每个broker可以请求备份其他broker上partition上的数据。kafka集群支持配置一个partition备份的数量。
针对每个partition,都有一个broker起到“leader”的作用,0个或多个其他的broker作为“follwers”的作用。leader处理所有的针对这个partition的读写请求,而followers被动复制leader的结果。如果这个leader失效了,其中的一个follower将会自动的变成新的leader。
Producers
生产者将消息发送到topic中去,同时负责选择将message发送到topic的哪一个partition中。通过roundrobin做简单的
负载均衡。也可以根据消息中的某一个关键字来进行区分。通常第二种方式使用的更多。
Consumers
传统的消息传递模式有2种:队列( queue) 和(publish-subscribe)
Kafka基于这2种模式提供了一种consumer的抽象概念:consumer group。
上图说明:由2个broker组成的kafka集群,总共有4个partition(P0-P3)。这个集群由2个Consumer Group, A有2个consumer instances ,B有四个。通常一个topic会有几个consumer group,每个consumer group都是一个逻辑上的订阅者( logicalsubscriber )。每个consumer group由多个consumer instance组成,从而达到可扩展和容灾的功能。
消费顺序
Kafka比传统的消息系统有着更强的顺序保证。
一个partition同一个时刻在一个consumer group中只有一个consumer instance在消费,从而保证顺序。consumer group中的consumer instance的数量不能比一个Topic中的partition的数量多,否则,多出来的consumer消费不到消息。Kafka只在partition的范围内保证消息消费的局部顺序性,不能在同一个topic中的多个partition中保证总的消费顺序性。如果有在总体上保证消费顺序的需求,那么我们可以通过将topic的partition数量设置为1,将consumer group中的consumer instance数量也设置为1。
从较高的层面上来说的话,Kafka提供了以下的保证:
发送到一个Topic中的message会按照发送的顺序添加到commit log中。意思是,如果消息 M1,M2由同一个producer发送,M1比M2发送的早的话,那么在commit log中,M1的offset就会比commit 2的offset小。一个consumer在commit log中可以按照发送顺序来消费message。如果一个topic的备份因子设置为N,那么Kafka可以容忍N-1个服务器的失败,而存储在commit log中的消息不会丢失。
安装前的环境准备
由于Kafka是用Scala语言开发的,运行在JVM上,因此在安装Kafka之前需要先安装JDK。
yum install java‐1.8.0‐openjdk* ‐y
kafka依赖zookeeper,所以需要先安装zookeeper
wget http://mirror.bit.edu.cn/apache/zookeeper/stable/zookeeper‐3.4.12.tar.gztar ‐zxvf zookeeper‐3.4.12.tar.gzcd zookeeper‐3.4.12cp conf/zoo_sample.cfg conf/zoo.cfg# 启动zookeeperbin/zkServer.sh startbin/zkCli.sh#查看zk的根目录相关节点ls /
第一步:下载安装包
下载2.2.0 release版本,并解压:
wget https://archive.apache.org/dist/kafka/1.1.0/kafka_2.11‐1.1.0.tgztar ‐xzf kafka_2.11‐1.1.0.tgzcd kafka_2.11‐1.1.0
第二步:启动服务
现在来启动kafka服务:
启动脚本语法:kafkaserverstart.sh [daemon] server.properties
可以看到,server.properties的配置路径是一个强制的参数,daemon表示以后台进程运行,否则ssh客户端退出后,就会停止服务。(注意,在启动kafka时会使用linux主机名关联的ip地址,所以需要把主机名和linux的ip映射配置到本地host里,用vim /etc/hosts)
bin/kafka‐server‐start.sh ‐daemon config/server.properties# 我们进入zookeeper目录通过zookeeper客户端查看下zookeeper的目录树bin/zkCli.sh#查看zk的根目录kafka相关节点ls / #查看kafka节点ls /brokers/ids
server.properties核心配置详解:(详细请参考官方文档)
第三步:创建主题
现在我们来创建一个名字为“test”的Topic,这个topic只有一个partition,并且备份因子也设置为1:
bin/kafka‐topics.sh ‐‐create ‐‐zookeeper 192.168.1.1:2181 ‐‐replication‐factor 1 ‐‐partitions 1 ‐‐topic test
现在我们可以通过以下命令来查看kafka中目前存在的topic
bin/kafka‐topics.sh ‐‐list ‐‐zookeeper 192.168..0.60:2181
除了我们通过手工的方式创建Topic,当producer发布一个消息某个指定的Topic,但是这个Topic并不存在时,就自动创建。
删除主题
bin/kafka‐topics.sh ‐‐delete ‐‐topic test ‐‐zookeeper 192.168.00:2181
第四步:发送消息
kafka自带了一个producer命令客户端,可以从本地文件中读取内容,或者我们也可以以命令行中直接输入内容,并将这些内容以消息的形式发送到kafka集群中。在默认情况下,每一个行会被当做成一个独立的消息。首先我们要运行发布消息的脚本,然后在命令中输入要发送的消息的内容:
bin/kafka‐console‐producer.sh ‐‐broker‐list 192.168.1.1:9092 ‐‐topic test>this is a msg>this is a another msg
第五步:消费消息
对于consumer,kafka同样也携带了一个命令行客户端,会将获取到内容在命令中进行输出,默认是消费最新的消息:
bin/kafka‐console‐consumer.sh ‐‐bootstrap‐server 192.168.1.1:9092 ‐‐consumer‐property group.id=testGroup ‐‐topic test
如果想要消费之前的消息可以通过--from-beginning参数指定,如下命令:
bin/kafka‐console‐consumer.sh ‐‐bootstrap‐server 192.168.1.1:9092 ‐‐from‐beginning ‐‐topic test
如果你是通过不同的终端窗口来运行以上的命令,你将会看到在producer终端输入的内容,很快就会在consumer的终端窗口上显示出来。
以上所有的命令都有一些附加的选项;当我们不携带任何参数运行命令的时候,将会显示出这个命令的详细用法。还有一些其他命令如下:
查看组名
bin/kafka‐consumer‐groups.sh ‐‐bootstrap‐server 192.168.1.1:9092 ‐‐list
查看消费者的消费偏移量
bin/kafka‐consumer‐groups.sh ‐‐bootstrap‐server 192.168.1.1:9092 ‐‐describe ‐‐group testGroup
注意看current-offset 和 log-end-offset还有 lag ,分别为当前消费偏移量,结束的偏移量(HW),落后消费的偏移量
消费多主题
bin/kafka‐console‐consumer.sh ‐‐bootstrap‐server 192.168.1.1:9092 ‐‐whitelist "test|test‐2"
单播消费
一条消息只能被某一个消费者消费的模式,类似queue模式,只需让所有消费者在同一个消费组里即可分别在两个客户端执行如下消费命令,然后往主题里发送消息,结果只有一个客户端能收到消息
bin/kafka‐console‐consumer.sh ‐‐bootstrap‐server 192.16810.60:9092 ‐‐consumer‐property group.id=testGroup ‐‐topic test
多播消费
一条消息能被多个消费者消费的模式,类似publish-subscribe模式费,针对Kafka同一条消息只能被同一个消费组下的某一个消费者消费的特性,要实现多播只要保证这些消费者属于不同的消费组即可。我们再增加一个消费者,该消费者属于testGroup-2消费组,结果两个客户端都能收到消息
bin/kafka‐console‐consumer.sh ‐‐bootstrap‐server 192.168.1.1:9092 ‐‐consumer‐property group.id=testGroup‐2 ‐‐topic test
第六步:kafka集群配置
到目前为止,我们都是在一个单节点上运行broker,这并没有什么意思。对于kafka来说,一个单独的broker意味着kafka集群中只有一个节点。要想增加kafka集群中的节点数量,只需要多启动几个broker实例即可。为了有更好的理解,现在我们在一台机器上同时启动三个broker实例。首先,我们需要建立好其他2个broker的配置文件:
cp config/server.properties config/server‐1.propertiescp config/server.properties config/server‐2.properties
配置文件的内容分别如下:
config/server-1.properties:
#broker.id属性在kafka集群中必须要是唯一broker.id=1#kafka部署的机器ip和提供服务的端口号listeners=PLAINTEXT://192.168.1.1:9093log.dir=/usr/local/data/kafka‐logs‐1
config/server-2.properties:
broker.id=2listeners=PLAINTEXT://192.168.1.1:9094log.dir=/usr/local/data/kafka‐logs‐2
目前我们已经有一个zookeeper实例和一个broker实例在运行了,现在我们只需要在启动2个broker实例即可:
bin/kafka‐server‐start.sh ‐daemon config/server‐1.propertiesbin/kafka‐server‐start.sh ‐daemon config/server‐2.properties
现在我们创建一个新的topic,副本数设置为3,分区数设置为2:
bin/kafka‐topics.sh ‐‐create ‐‐zookeeper 192.168.1.1:2181 ‐‐replication‐factor 3 ‐‐partitions ‐‐topic my‐replicated‐topic
查看下topic的情况
bin/kafka‐topics.sh ‐‐describe ‐‐zookeeper 192.168.1.1:2181 ‐‐topic my‐replicated‐topic
以下是输出内容的解释,第一行是所有分区的概要信息,之后的每一行表示每一个partition的信息。
我们可以运行相同的命令查看之前创建的名称为”test“的topic
bin/kafka‐topics.sh ‐‐describe ‐‐zookeeper 192.168.1.1:2181 ‐‐topic test
之前设置了topic的partition数量为1,备份因子为1,因此显示就如上所示了。当然我们也可以通过如下命令增加topic的分区数量(目前kafka不支持减少分区):
bin/kafka‐topics.sh ‐alter ‐‐partitions 3 ‐‐zookeeper 127.0.0.1:2181 ‐‐topic test
现在我们向新建的 my-replicated-topic 中发送一些message,kafka集群可以加上所有kafka节点:
bin/kafka‐console‐producer.sh ‐‐broker‐list 192.168.1.1:9092,192.168.1.1:9093,192.168.1.1:9094 ‐‐topic my‐replicated‐topic>my test msg 1>my test msg 2
现在开始消费:
bin/kafka‐console‐consumer.sh ‐‐bootstrap‐server 192.168.1.1:9092 ‐‐from‐beginning ‐‐topic my‐replicated‐topicmy test msg 1my test msg 2
现在我们来测试我们容错性,因为broker1目前是my-replicated-topic的分区0的leader,所以我们要将其kill
ps ‐ef | grep server.propertieskill ‐9 132323
现在再执行命令:
bin/kafka‐topics.sh ‐‐describe ‐‐zookeeper 192.168.1.1:9092 ‐‐topic my‐replicated‐topic
我们可以看到,分区0的leader节点已经变成了broker 0。要注意的是,在Isr中,已经没有了1号节点。leader的选举也是从ISR(in-sync replica)中进行的。此时,我们依然可以 消费新消息:
bin/kafka‐console‐consumer.sh ‐‐bootstrap‐server 192.168.1.1:9092 ‐‐from‐beginning ‐‐topic my‐replicated‐topicmy test msg 1my test msg 2
查看主题分区对应的leader信息:
引入maven依赖
org.apache.kafkakafka‐clients1.1.0
消息发送端代码
消息接收端代码
引入spring boot kafka依赖,详见项目实例:spring-boot-kafka
org.springframework.kafkaspring‐kafka
application.yml配置如下:
server:
port: 8080
spring:
kafka:
bootstrap‐servers: 192.168.0.60:9092,192.168.0.60:9093
# 生产者
producer:
# 设置大于0的值,则客户端会将发送失败的记录重新发送
retries: 3
batch‐size: 16384
buffer‐memory: 33554432
# 指定消息key和消息体的编解码方式
key‐serializer: org.apache.kafka.common.serialization.StringSerializer
value‐serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
group‐id: mygroup
enable‐auto‐commit: true
发送者代码:
@RestController public class KafkaController { @Autowired private KafkaTemplate kafkaTemplate; @RequestMapping("/send") public void send() { kafkaTemplate.send("mytopic", 0, "key", "this is a msg"); } }
消费者代码:
@Component public class MyConsumer { @KafkaListener(topics = "mytopic",groupId = "testGroup") public void listen(ConsumerRecord record) { String value = record.value(); System.out.println(value); System.out.println(record); } }
本文为原创内容,未经允许不得转载。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。