赞
踩
启动Kafka服务器需要首先启动ZooKeeper服务器,Kafka使用ZooKeeper作为集群的管理服务。在生产环境中启动Kafka建议使用-daemon参数启动
bin/kafka-server-start.sh -damon <path>/server.properties
不要使用如下方式启动服务
bin/kafka-server-start.sh <path>/server.properties &
虽然通过添加&符号使该命令在后台运行,但当用户会话登出时该进程会自动kill掉,导致broker关闭。我们也可以使用nohup … &的方式启动Kafka集群
nohup bin/kafka-server-start.sh <path>/server.properties &
启动broker之后,建议用户查看启动日志以确保broker启动成功。服务器日志通常保存在kafka安装目录的logs子目录下,名字为server.log.
前台方式启动broker是指,在Linux终端不加nohup或-daemon参数的方式直接启动broker,这种方式多用于本地测试或DEBUG。关闭这种方式启动的broker 进程,只需要在该终端使用Ctrl + C组合键发送SIGINT信号终止进程即可。Kafka broker进程会捕获SIGINT信号并执行broker关闭逻辑
当使用前面的-daemon或nohup方式启动broker后,正确关闭broker的方式就是使用Kafka自带的kafka-server-stop.sh脚本。该脚本位于Kafka安装目录 的bin子目录下
bin/kafka-server-stop.sh
该脚本会搜索当前机器中所有的Kafka broker进程,然后关闭他们。如果用户机器上存在多个broker进程,该脚本会全部关闭它们。不建议直接使用kill -9方式杀死broker进程,因为这通常会造成一些broker状态的不一致。当broker在关闭过程中如果出现错误会进行重试。若重试次数用完了依然无法正常关闭
broker进程,Kafka会进行强制关闭。当用户发现关闭broker时不会立即停止,可以打开server.log实时监控关闭进度,一般过一段时间broker进程总能 结束
由于上述脚本依靠操作系统命令(如ps ax和grep)来获取Kafka broker的进程号,一旦用户的Kafka安装目录的路径过长,则可能令该脚本失效从而无法正确 获取broker的PID。对于这种情况,可以使用如下方式关闭broker
如果机器上安装JDK,可运行jps命令直接获取Kafka的PID,否则转到下一步
运行ps ax | grep -i ‘kafka.Kafka’ | grep java | grep -v grep | awk '{print $1}'命令自行寻找Kafka的PID
运行Kill -s TERM $PID关闭broker
以上3步也是kafka-server-stop脚本的执行逻辑
Kafka提供了丰富的JMX指标用于实时监控集群运行的健康程度。不过若要使用它们,用户必须启动broker前首先设置JMX端口。设置方式分为两种。若以 前台方式运行broker,只需在执行启动命令前设置JMX_PORT环境变量,如果下
JMX_PORT=9997 bin/kafka-server-start.sh <path>/server.properties
另一种以后台运行broker的设置方法类似,依然是在启动broker前设置JMX_PORT环境变量
export JMX_PORT=9997 bin/kafka-server-start.sh -daemon <path>/server.properties
由于Kafka集群的服务发现交由ZooKeeper来处理,因此向Kafka集群增加新的broker服务非常容易。用户只需要为新增broker设置一个唯一的broker.id,然后启动即可。Kafka集群能自动发现新启动的broker并同步所有的元数据信息,主要包括当前集群有哪些主题以及topic都有哪些分区等
但是新增的broker不会自动被分配任何已有的topic分区,用户必须手动执行分区重分配操作才能使他们为已有topic服务。在这些broker启动之后新创建的topic分区还是可能分配给这些broker。用户需要对各个broker上的负载进行规划,避免出现负载极度不均匀情况
升级kafka集群的版本实际上非常简单,核心步骤只需要4步,但通常情况下用户都想要最大程度缩短服务的宕机时间,特别不要干扰集群上producer和consumer 的正常运转
第一步:更新broker间通信版本和消息版本。向所有broker的server.properties中增加下面两行
inter.broker.protocol.version=0.10.0
log.message.format.version=0.10.0
如果不是从0.10.0.0版本升级,则只需要填写用户环境下当前broker的版本
第二步:依次更新代码,重启所有broker。下载需要升级到的版本的Kafka二进制包,覆盖已有的目录,然后依次重启所有broker,比如先重启broker1, 然后重启broker2。也可以不覆盖安装目录,单独放置在全新目录下
第三步:再次更新broker间通信版本和消息版本。将通信版本和消息版本都调整为需要升级到的版本
第四步:再次依次重启broker
到此Kafka集群的升级工作就完成了。在上述每一步执行过程中,producer都应该是可以正常工作的,提交失败的消息数应该始终是0;而consumer可能会出现提交失败的警告造成消息重复消费,这个问题通常都是由下游子系统负责去重。broker端,会出现短暂的org.apache.kafka.common.errors.NotLeaderForPartitionException异常,表明内部topic __consumer_offsets各分区的leader在轮流重启过程中出现短暂的不可用。这个异常通常是瞬时发生且通常可以自行恢复
Kafka创建topic的途径目前有4种
执行kafka-topic.sh命令行工具创建
显示发送CreateTopicsRequest请求创建topic
发送MetadataRequest请求且broker端设置了auto.create.topics.enable为true
向ZooKeeper的/brokers/topic路径下写入以topic名称命名的子节点
除上面4种方法外,还有其他方式可创建topic但并不推荐,包括上述的第4种方法也是不推荐的。官方推荐用户使用前两种方式来创建topic
参数名 | 参数含义 |
---|---|
–alter | 用于修改topic的信息,比如分区数、副本因子 |
–config <key=value> | 设置topic级别的参数,比如cleanup.policy等 |
–create | 创建topic |
–delete | 删除topic |
–delete-config | 删除topic级别参数 |
–describe | 列出topic详情 |
–disable-rack-aware | 创建topic时不考虑机架信息 |
–force | 无效参数,当前未使用 |
–help | 打印帮助信息 |
–if-exists | 若设置,脚本只会对已存在的topic执行操作 |
–if-no-exists | 若设置,当创建已存在同名topic时不会抛出错误 |
–list | 列出集群当前所有topic |
–partitions <分区数> | 创建或修改topic时指定分区数 |
–replication-assignment | 手动指定分区分配CSV方案,副本之间使用冒号分割。比如指定双分区方案为0:1:2,3:4:5,表示分区1的3个副本在broker 0、1、2上,分区2在broker 3、4、5上 |
–replication-factor | 指定副本因子 |
–topic | 指定topic名称 |
–topic-with-overrides | 展示topic详情时不显示具体的分区信息 |
–unavaliable-paritions | 只显示topic不可用的分区,即没有leader的分区 |
–under-replicated-partitions | 只显示副本数不足的分区信息 |
–zookeeper | 指定连接的ZooKeeper信息 |
使用自动分区分配创建一个topic,名为test-topic, 6个分区,每个分区3个副本,同时指定该topic的日志留存时间为3天
bin/kafka-topics.sh --create --zookeeper localhost:2181 --partitions 6 --replication-factory 3 --topic test-topic
--config delete.retention.ms=259200000
假设有3台broker构成的Kafka集群,broker id分别为0、1、2。创建一个topic,名称为test-topic2,分区数为4,副本因子是2。手动分配方案如下
分区1:0、1
分区2:1、2
分区3:0、2
创建topic命令如下
bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic test-topic2 --replica-assignment 0:1,1:2,0:2,1:2
若指定–replica-assignment,用户不必指定–partitions和–replication-factor,因为脚本可以从手动分配方案中计算出topic的分区数和副本因子
与创建topic类似,删除topic有如下3种方式
使用kafka-topics脚本:这是最常见也是最正式的方法
构造DeleteTopicsRequest请求:这种方式需要手动编写程序实现
直接向ZooKeeper的/admin/delete_topics下写入子节点:不推荐,实际场景中谨慎使用
通过手动删除topic底层的日志文件及ZooKeeper中与topic相关的所有znode来删除topic数据是不推荐的方式,这种方式只能保证物理文件被删除,但集群 broker内存中依然保存有这些信息
最正式的删除方式是执行kafka-topic.sh --delete命令,该命令执行之后通常会立即返回,但真正的删除动作是在一个异步任务中完成。不管使用那种删除方式,首先需要确保broker端参数delete.topic.enable被设置为true,否则Kafka不会删除topic。在1.0.0之前的版本,默认设置为false,在1.0.0之后的版本默认为true。具体删除topic命令如下
bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic test-topic
执行上述命令test-topic将被标记为"待删除"状态,如果delete.topic.enable为false,则不会实际完成删除操作。用户需要等一段时间后通过 kafka-topics.sh -list命令查询topic列表确定topic是否已经被删除
使用kafka-topic脚本运行用户查询当前集群的topic列表
bin/kafka-topics.sh --zookeeper localhost:2181 --list
kafka-topic脚本可以查询单个或所有topic的详情,显示topic的分区数、副本数、以及leader副本所在broker的ID值、副本所在broker的ID、ISR副本 列表等信息
bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic test-topic2
当不指定–topic test-topic2则表明查询集群所有topic的详情
bin/kafka-topics.sh --zookeeper localhost:2181 --describe
topic创建后,Kafka依然允许修改topic的某些参数及配置,如分区数、副本因子和topic级别参数等
以下命令展示如果新增topic分区。在新增分区时我们需要注意,当topic消息有key时,根据key确定目标分区的算法就会因为分区数的增加而发生变更。 虽然分区数可以增加但并不允许减少
bin/kafka-topic.sh --alter --zookeeper localhost:2181 --partitions 10 --topic test-topic2
可以使用kafka-configs脚本为已有topic设置topic级别的参数,虽然kafka-topics.sh --alter也能设置参数,但并不推荐使用。下面表格说明kafka-configs 脚本命令行参数及其含义
参数名 | 参数含义 |
---|---|
–add-config k1=v1,k2=v2,… | 设置topic级别参数 |
–alter | 修改topic或其他实体类型 |
–delete-config | 删除指定的参数 |
–entity-default | 与配额设置有关,用于Kafka配置的默认配额值 |
–describe | 列出给定实体类型的参数详情 |
–entity-name | 指定实体名称,若是topic类型,则是topic名称 |
–entity-type | 指定实体类型,总共4类实体类型:user、topic、clients、broker |
–help | 打印帮助信息 |
–zookeeper | 指定连接的ZooKeeper信息 |
当前Kafka定义4类实体,分别是topic、user、clients和broker,kafka-configs脚本用于为这4类实体进行参数设置,每类实体都能设置专属的配置 参数
我们为test-topic2设置cleanup.policy=compact参数,命令如下
bin/kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name test-topic2 --add-config
cleanup.policy=compact
使用kafka-configs的–describe选项确认参数是否增加成功
bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type topics --entity-name test-topic2
Kafka支持的topic参数可参见官网http://kafka.apache.org/documentation/#topicconfigs,以下列举topic常见且较为重要的参数
参数名 | 参数含义 |
---|---|
cleanup.policy | 指定topic留存策略,可以是compact、delete或同时指定两者 |
compression.type | 指定该topic消息的压缩类型 |
max.message.bytes | 指定broker端能接收该topic消息的最大长度 |
min.insync.replicas | 指定ISR中需要接收topic消息的最少broker数,与producer端参数acks=-1配合使用 |
preallocate | 是否为该topic的日志文件提前分配存储空间 |
retention.ms | 指定持有该topic单个分区消息的最长时间 |
segment.bytes | 指定该topic日志段文件的大小 |
unclean.leader.election.enable | 是否为topic启用unclean领导者选举 |
与broker端参数不同,上面这些topic级别参数可以动态修改而无须重启broker。我们创建一个topic,之后动态增加preallocate、segment.bytes两个 参数
bin/kafka-topics.sh --create --zookeeper localhost:2181 --partitions 1 --replication-factor 1 --topic test
bin/kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name test --add-config
preallocate = true,segment.bytes=104857600
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
查看topic配置,既可以使用kafka-topics脚本,又可以使用kafka-configs脚本
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
bin/kafka-configs.sh --describe --zookeeper localhost:2181 --entity-type topics --entity-name test
通过kafka-configs脚本删除topic配置
bin/kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name test --delete-config
preallocate
bin/kafka-configs.sh --describe --zookeeper localhost:2181 --entity-type topics --entity-name test
在生产环境对于消费者组的运行情况有很强的监控需求。可以通过各种第三方的监控框架来监控消费者组。Kafka默认也自带了工具脚本来监控并管理消费者组的 执行情况
Kafka提供的用于查询消费者组的脚本是kafka-consumer-groups.sh,位于Kafka路径的bin/子目录下,该脚本的主要参数及含义如下
参数名 | 参数含义 |
---|---|
–bootstrap-server | 指定Kafka集群的broker列表,CSV格式,查询新版本消费者组时使用 |
–list | 列出集群当前所有消费者组 |
–describe | 查询消费者组详情(包括消费滞后情况) |
–group | 指定消费者组名称 |
–zookeeper | 指定ZooKeeper连接信息,查询老版本消费者组时使用 |
–reset-offsets | 重新设置消费者组位移 |
首先创建一个测试topic,名为test,单分区,副本因子为1,然后使用kafka-producer-perf-test.sh脚本向该topic生产500万条消息
bin/kafka-topic.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
bin/kafka-producer-perf-test.sh --topic test --throughput -1 --num-records 500000 --record-size 100 --producer-props
bootstrap.servers=localhost:9092 acks=-1
之后分别打开两个终端,通过kafka-console-consumer.sh创建两个测试消费者组
bin/kafka-console-consumer.sh --bootstrap-server=localhost:9092 --topic test --from-beginning --consumer-property
group.id=test-group1
bin/kafka-console-consumer.sh --bootstrap-server=localhost:9092 --topic test --consumer-property group.id=test-group2
为模拟不同消费进度,test-group1指定–from-beginning,表示从头消费topic;test-group2未指定–from-beginning,表示从topic的最新位移处开始消费。使用kafka-consumer-groups.sh来查询消费者组的情况
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
通过以下命令可以查询每个消费组的详情信息
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test-group1
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test-group2
运行以上命令将会展示如下内容
TOPIC:表示该消费者组消费了哪些topic
PARTITION:表示该消费者组消费的是哪个分区
CURRENT-OFFSET:表示消费者组最新消费的位移值
LOG-END-OFFSET:表示topic所有分区当前的日志终端位移值
LAG:表示消费滞后进度,该值等于LOG-END-OFFSET与CURRENT-OFFSET的差值,通常为大于或等于0的正数。若该值接近LOG-END-OFFSET则表明该消费者组消费滞后严重。特别的,若该值小于0,则表明存在消费数据丢失的情况,即有些消息为被消费就被consumer直接跳过了。若出现这种情况,需要确认broker端参数unclean.leader.election.enable的值是否被设置为true,并进一步研究是否有可能出现因为unclean领导者选择而造成的数据丢失
CONSUMER-ID:表示consumer的ID,通常是Kafka自动生成的。如果该列没有值,通常表明此consumer目前尚未处于运行中
HOST:表示consumer所在的broker主机信息。如果该列没有值,通常表明此consumer目前尚未处于运行中
CLIENT-ID:表示用户指定或系统自动生成的一个标识consumer所在客户端的ID。此ID通常用于定位或调试问题
此功能0.11.0.0版本开始提供。对于之前的老版本,对已有的消费者组调整位移,必须手动编写Java程序调用KafkaConsumer#seek方法。在0.11.0.0版本开始kafka-consumer-groups脚本支持为已有consumer group重新设置位移,但consumer group不能处于运行状态
重设位移流程由3步组成
第一步确定消费者组下topic作用域,当前支持3种作用域
–all-topics:为消费者组下所有topic的所有分区调整位移
–topic t1, --topic t2:为指定的若干个topic的所有分区调整位移
–topic t1:0,1,2:为topic的指定分区调整位移
确定topic作用域后,第二步确定位移重设策略,当前支持8种设置规则
–to-earliest:把位移调整到分区当前最早位移处
–to-latest:把位移调整到分区当前最新位移处
–to-current:把位移调整到分区当前位移处
–to-offset :把位移调整到指定位移处
–shift-by N:把位移调整到当前位移+N处,N可为负值
–to-datetime :把当前位移调整到大于给定时间的最早位移处。datatime格式是yyyy-MM-ddTHH:mm:ss.xxx,比如2017-08-04T00:00:00.000
–by-duration :把位移调整到距离当前时间指定间隔的位移处。duration格式是PnDTnHnMnS,比如PT0H5M0S
–from-file :从CSV文件中读取位移调整策略
最后一步确定执行方案,当前支持如下3种方案
不加任何参数:只是打印位移调整方案,不实际执行
–execute:执行真正的位移调整
–export:把位移调整方案保存成CSV格式并输出到控制台,方便用户保存为CSV文件,供后续结合–from-file参数使用
具体用法如下
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --reset-offsets --all-topics
--to-earliest --execute
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --reset-offsets --all-topics
--to-latest --excute
...
Kafka自带的kafka-consumer-groups脚本提供了删除处于inactive状态的老版本消费者组信息。对于新版本不需要手动删除,通过offsets.retention.minutes的参数,默认1天,kafka会根据此参数控制移除inactive消费者组位移的信息的时间。默认情况下,对于一个新版本的消费者而言,即使所有的成员都已经推出组,它的位移信息不会马上删除,Kafka会在最后一个成员推出组1天之后删除该组的所有信息
在Kafka集群中,broker服务器宕机或者崩溃是不可避免的,一旦发生这种情况,该broker上的那些leader副本将不可用,因此必然要求Kafka把这些分区的leader转移到其他的broker上,即使奔溃broker重启回来,其上的副本也能作为follower副本加入ISR中,不能再对外提供服务
随着集群不断运行,leader的不均衡现象开始出现,集群中的一小部分broker上承载了大量的leader分区副本。为应对这种情况,Kafka引入首选副本(preferred replica)概念.假设为一个分区分配了3个副本,分别为0、1、2。节点0就是该分区的preferred replica,并且通常情况下不会发生变更。选择节点0的原因仅仅是因为它是 第一个副本
Kafka提供两种方式让用户将指定分区的leader调整回其preferred replica。这个过程被称为preferred leader选举。第一种方式:使用Kafka自带的kafka-preferred-replica-election.sh脚本,其参数及含义如下
属性名 | 属性含义 |
---|---|
–zookeeper | 指定ZooKeeper连接信息 |
–path-to-json-file | 指定json文件路径,该文件包含为那些分区执行preferred leader选举。也可以不指定该参数,若不指定则表明为集群中所有分区都执行preferred leader选举 |
搭建一个3节点Kafka环境,创建test-topic测试topic,包含3个分区、3个副本
bin/kafka-topics.sh --create --zookeeper localhost:2181 --partitions 3 --replication-factory 3 --topic test-topic
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test-topic
当前3个分区的leader副本已经是他们的preferred replica,一次关闭broker1和broker2模拟集群崩溃,此时所有分区的leader都在broker0上,造成集群leader分布不均匀的情况。执行describe确认leader副本所在分区
bin/kafka-topic.sh --describe --zookeeper localhost:2181 --topic test-topic
我们看到3个分区的leader都变成了broker0。现在开始执行kafka-preferred-replica-election.sh脚本来调整leader。构造如下json文件
echo '{"partitons":[{"topic":"test-topic","partition":0},{"topic":"test-topic","partition":1}]}' > preferred-leader-plan.json
执行kafka-preferred-replica-election.sh脚本
bin/kafka-perferred-replica-election.sh -zookeeper localhost:2181 --path-to-json-file <path>/preferred-leader-plan.json
再次describe查看topic情况
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic-test-topic
通过输出可以发现test-topic所有分区都已经调整成他们的preferred replica.当然也可以不指定–path-to-json-file参数直接运行kafka-preferred-replica-election.sh对所有topic执行preferred leader选举。但在实际使用场景中不推荐这种使用方式,对所有分区进行leader转移是一项成本很高的工作
除通过kafka-preferred-replica-election.sh脚本进行preferred leader选举之外,还可以通过broker端参数auto.leader.rebalance.enable自动执行此操作。该参数默认值为true,表明每台broker启动后都会在后台自定定期执行preferred leader选举,与此相关的还有两个broker端参数
leader.imbalance.check.interval.seconds和leader.imbalance.per.broker.precentage。前者控制阶段性操作的时间间隔,当前默认值300秒,即Kafka每5分钟会尝试在后台运行一个preferred leader选举;后者用于确定要执行preferred leader选举的目标分区,当前默认是10,表示若broker上leader不均衡程度超过10%,则Kafka需要为该broker上的分区执行preferred leader选举。Kafka计算不均衡程度的算法为:该broker上的leader不是 preferred replica的分区数/broker上总的分区数
新增broker不会自动分担已有topic的负载,只会对增加topic后新创建的topic生效,如果要让新增broker为已有topci服务,需要用户手动调整已有topic分区分布,将一部分分区搬移到新增broker上。这就是分区重分配操作。Kafka提供了分区重分配脚本kafka-reassign-partition.sh。用户使用该工具需要提供一组需要执行分区重分配的topic列表及对应的一组broker。该脚本接到用户这些消息后会尝试制作一个重分配方案,会力求保证均匀分配给定topic 的所有分区到目标broker上
搭建一个4节点的Kafka集群,然后创建两个测试topic,foo1和foo2,都是3个分区,副本因子都是2。之后把两个topic的分区都搬移到新的broker节点上, 例如broker5和broker6
首先创建一个json文件指定需要执行分区重分配方案的topic列表
echo '{"topics": [{"topic": "fool"}, {"topic": "foo2"}], "version":1}' > topics-to-move.json
执行kafka-reassign-partitions.sh脚本产生一个分配方案
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --topics-to-move-json-file topic-to-move.json --broker-list
"5,6" --generate
上述命令执行后会给出当前的分区情况以及一个候选的重分配方案。此时分区重分配并没有真正执行,仅是产生一个可能的方案,用户需要把当前的情况保存下来以备后续的rollback,同时把新的候选方案保存为另一个新的json文件expand-cluster-reassignment.json,之后便可以开始执行真正的分区重分配
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file expand-cluster-reassignment.
json --execute
命令执行成功,之后可以指定–verify参数来验证分区重分配执行成功
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file expand-cluster-reassignment.
json --verify
也可以不使用–generate参数生成预分配方案,直接自己指定分配方案。假设我们要把foo1分区0的两个副本移到broker5和broker6上,同时把foo2分区1的两个副本移到broker2和broker3上,我们可以自己编写分配方案
echo '{"version":1,"partitions":[{"topic":"foo1","partition":0,"replicas":[5,6]},{"topic":"foo2","partition":1,"replicas":[2,3]}]}' > custom-reassignment.json
使用该json文件和–execute参数来开启重分配操作
bin/kafka-reassign-partions.sh --zookeeper localhost:2181 --reassignment-json-file custom-reassignment.json --execute
执行完上面命令后,可以使用–verify进行验证
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file custom-reassignment.json --verify
实际生产环境中,用户需要谨慎发起重分配操作,分区在不同broker间进行数据迁移会极大占用broker机器的带宽资源,显著影响clients端业务应用的性能。 尽量在非高峰时段执行重分配操作
Kafka支持为已有topic分区增加副本因子,具体方法是使用kafka-reassign-partitions.sh脚本,为topic分区增加额外的副本。假设在一个3节点的Kafka集群中创建一个单分区、副本因子是1的topic名为test,运行如下命令
bin/kafka-topic.sh --create --topic test --zookeeper localhost:2181 --partitions 1 --replication-factor 1
我们把副本数设置为3,首先创建json文件
echo '{"version":1,"partitions":[{"topic":"test","partition":0,"replicas":[0,1,2]}]}' > increase-replication-factor.json
执行重分配命令
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factory.
json --execute
之后同样可以使用–verify来验证重分配是否成功
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factory.
json --verify
我们可以通过kafka-topics脚本来查看topic的信息
bin/kafka-topic.sh --describe --topic test --zookeeper localhost:2181
kafka-console-producer脚本与kafka-console-consumer脚本允许用户在控制台上方便地对Kafka集群进行producer和consumer测试。kafka-console-producer脚本从控制台读取标准输入,将其发送到指定的Kafka topic上。该脚本位于Kafka路径的/bin子目录下。其主要参数及含义如下
参数名 | 参数含义 |
---|---|
–broker-list | 指定Kafka集群连接信息。如果多台broker需要以CSV格式指定,如k1:port1,k2:port,k3:port… |
–topic | 指定producer将消息发送到哪个topic |
–producer-property | 指定producer的其他定制属性,如acks、compression.type等 |
–producer.config | 将producer其他定制属性保存在文件中,指定给该producer |
–compression-code | 指定producer消息压缩类型 |
–timeout | 指定producer的linger.ms值 |
–request-required-acks | 指定producer的acks值 |
–max-memory-bytes | 指定producer的buffer.memory值 |
–message-send-max-retries | 指定producer的retries值 |
–max-partition-memory-bytes | 指定producer的batch.size值 |
我们使用kafka-console-producer脚本向Topic为test的主题发送消息,指定acks为all,使用LZ4进行消息压缩,把失败重试次数设置为10次,linger.ms设置为3秒, 执行如下命令
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test --compression-code lz4
--request-required-acks all --timeout 3000 --message-send-max-reties 10
kafka-console-consumer脚本从Kafka topic读取消息并写入到标准输出。该脚本位于Kafka路径的/bin子目录下,其主要参数及含义如下
参数名 | 参数含义 |
---|---|
–bootstrap-server | 指定Kafka集群连接信息。如果多台broker需要以CSV格式指定,如k1:port1,k2:port,k3:port… |
–topic | 指定consumer消费的topic |
–from-beginning | 类似于设置consumer属性auto.offset.reset=earliest,即从当前最早位移处开始消费 |
–zookeeper | 指定使用老版本consumer,不可与–bootstrap-server同时使用 |
–consumer-property | 指定consumer端参数 |
–consumer.config | 以文件方式指定consumer端参数 |
–partition | 指定要消费的特定分区 |
运行脚本,从test topic获取消息
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
之前所述的各种Kafka脚本虽然实现了各自不同功能,但底层都是Kafka-run-class脚本实现。如kafka-console-consumer.sh是使用kafka-run-class.sh Kafka.tool.ConsoleConsumer <参数列表>实现,kafka-console-producer.sh是使用kafka-run-class.sh kafka.tools.ConsoleProducer<参数列表>
kafka-run-class.sh是一个通用的脚本,允许用户直接指定一个可执行的Java类和一组可选择的参数列表,从而调用该类的实现逻辑。上面提到的脚本都是Kafka社区开发人员帮助用户封装的。还有很多其他的有用的功能没有封装成单独的Shell脚本。下面将介绍几种比较实用的工具
通过kafka.tools.DumpLogSegments可以查看topic的消息元数据,消息元数据信息包括消息的位移、创建时间戳、压缩类型、字节数等
bin/kakfa-run-class.sh kafka.tools.DumpLogSegments --files ../datalogs/kafka_1/t1-0/000000000000000000.log
上面命令中–files指定topic为kafka_1的日志段文件的完整路径。以上命令将解析topic kafka_1的0分区的第一个日志段的内容。每行表示一条消息,详细列出消息的位移信息、物理文件位置、消息长度、压缩类型、CRC码等元数据信息。如果topic开启消息压缩,则可能会有多条消息被压缩进一条外层消息(wrapper message)中, DumpLogSegments默认只显示wrapper message。我们查看baseOffset、与lastOffset即可知道一共包含几条消息,比如baseOffset为0,lastOffset为1, 则一共包含两条消息
DumpLogSegments工具为开启消息压缩的日志提供了–deep-iteration参数来深度遍历被压缩的消息
bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files ../datalogs/kafka_1/t2-0/000000000000000000.log
--deep-iteration
使用–deep-iteration参数之后多条被压缩在一起的消息将被打开,并按顺序展示。对于非压缩的消息,使用该参数将输出相同的结果。DumpLogSegments 不只查询日志段文件,也可以查询索引文件
bin/kafka-run-class.sh kafka.tool.DumpLogSegments --files ../datalogs/kafka_1/t1-0/0000000000000000000.index
将打印出索引文件内容,每一行格式为[位移 相对物理文件位置]
通过GetShellOffset类帮助用户实时计算特定topic总的消息数
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic test --time -1
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic test --time -2
–time -1表示要获取指定topic所有分区当前的最大位移;–time -2表示获取当前最早位移。将两个命令结果相减可获取所有分区当前的消息总数。–time -1表示的是历史上该topic产生的最大消息数,但topic的数据可能会被移除一部分,因此只有–time -1的结果减去–time -2的结果才是当前topic 总的 消息数
新版本consumer位移保存在Kafka内部topic __consumer_offsets中,可以通过kafka-simple-consumer-shell.sh脚本来查看该topic的内容。在查询之前我们需要确认消费组位移信息保存在__consumer__offsets的那个分区上。计算方法为计算消费topic的消费组即group ID的哈希值,然后使用此哈希值对__consumer_offset的分区数取模所得值即为消费者组消费位移在__consumer__offsets上的存储分区。用户可以通过broker端参数offsets.topic.num.partitions 设置__consumer__offsets的分区数,默认为50
在确定目标分区后,可以使用kafka-simple-consumer-shell脚本查询消费者组的位移信息
bin/kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition 12 --broker-list localhost:9092
--fromatter "kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter"
Kafka提供两个脚本来管理topic,包括topic的增删改查。kafka-topics.sh负责topic的创建与删除,kafka-configs.sh负责topic参数的修改和查询,如果需要使用API对topic进行操作,可以使用服务端API,在编写程序时添加如下Kafka客户端依赖
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.0</version>
</dependency>
添加客户端依赖后,便可以使用客户端提供的API。Admin接口定义了管理Kafka的众多方法,一下展示使用Admin实现topic的创建
//创建属性对象
Properties properties = new Properties();
//设置broker的地址,可以支持多个,这样当某些broker挂掉还可以使用其它broker。必须指定
properties.put("bootstrap.servers", "172.23.16.84:9092");
AdminClient adminClient = AdminClient.create(properties);
NewTopic myTopic = new NewTopic("my-topic", 3, (short) 3);
CreateTopicsResult createTopicsResult = adminClient.createTopics(Arrays.asList(myTopic));
adminClient.close();
在创建topic之前首先需要创建AdminClient,通过AdminClient提供的静态方法create创建AdminClient,在创建时指定通过Properties指定Kafka集群的地址。之后创建NewTopic指定topic名称、分区数、副本数信息,调用adminClient的createTopics方法完成topic创建。完成创建topic之后调用adminClient的close方法完成资源释放。如果需要复用此连接,可以不关闭
删除topic与创建类似
//创建属性对象
Properties properties = new Properties();
//设置broker的地址,可以支持多个,这样当某些broker挂掉还可以使用其它broker。必须指定
properties.put("bootstrap.servers", "172.23.16.84:9092");
AdminClient adminClient = AdminClient.create(properties);
DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(Arrays.asList("my-topic"));
adminClient.close();
使用API查询topic级别属性,其他类型如broker等属性查询类似
//创建属性对象 Properties properties = new Properties(); //设置broker的地址,可以支持多个,这样当某些broker挂掉还可以使用其它broker。必须指定 properties.put("bootstrap.servers", "172.23.16.84:9092"); AdminClient adminClient = AdminClient.create(properties); DescribeConfigsResult describeConfigsResult = adminClient.describeConfigs(Arrays.asList(new ConfigResource(ConfigResource.Type.TOPIC, "my-topic"))); Map<ConfigResource, KafkaFuture<Config>> configMap = describeConfigsResult.values(); configMap.forEach(((configResource, configKafkaFuture) -> { try { if (LOGGER.isInfoEnabled()) { LOGGER.info("type: " + configResource.type() + " name: " + configResource.name()); } Config config = configKafkaFuture.get(); config.entries().forEach(configEntry -> { if (LOGGER.isInfoEnabled()) { LOGGER.info("name: " + configEntry.name() + " value: " + configEntry.value()); } }); } catch (Exception e) { throw new RuntimeException(e); } })); adminClient.close();
变更各种类型资源配置属性如下
//创建属性对象 Properties properties = new Properties(); //设置broker的地址,可以支持多个,这样当某些broker挂掉还可以使用其它broker。必须指定 properties.put("bootstrap.servers", "172.23.16.84:9092"); AdminClient adminClient = AdminClient.create(properties); Map<ConfigResource, Collection<AlterConfigOp>> configMap = new HashMap<>(); // 创建需要修改参数的资源及名称 ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, "my-topic"); // 设置需要修改、删除、追加(集合)、清除(集合)参数 List<AlterConfigOp> alterConfigOps = new ArrayList<>(); AlterConfigOp alterConfigOp = new AlterConfigOp(new ConfigEntry("compression.type", CompressionType.LZ4.name) , AlterConfigOp.OpType.SET); alterConfigOps.add(alterConfigOp); configMap.put(configResource, alterConfigOps); AlterConfigsResult alterConfigsResult = adminClient.incrementalAlterConfigs(configMap); adminClient.close();
这里所说的位移指consumer的位移信息,下面将介绍通过API管理位移信息。查询当前集群下所有consumer group信息
try { //创建属性对象 Properties properties = new Properties(); //设置broker的地址,可以支持多个,这样当某些broker挂掉还可以使用其它broker。必须指定 properties.put("bootstrap.servers", "172.23.16.84:9092"); AdminClient adminClient = AdminClient.create(properties); ListConsumerGroupsResult listConsumerGroupsResult = adminClient.listConsumerGroups(); KafkaFuture<Collection<ConsumerGroupListing>> kafkaFuture = listConsumerGroupsResult.all(); Collection<ConsumerGroupListing> consumerGroupListings = kafkaFuture.get(); consumerGroupListings.forEach(consumerGroupListing -> { if (LOGGER.isInfoEnabled()) { LOGGER.info("groupId: " + consumerGroupListing.groupId()); } }); } catch (Exception e) { throw new RuntimeException(e); }
查找特定topic位移信息
public void listConsumerGroupOffsets() { try { //创建属性对象 Properties properties = new Properties(); //设置broker的地址,可以支持多个,这样当某些broker挂掉还可以使用其它broker。必须指定 properties.put("bootstrap.servers", "172.23.16.84:9092"); AdminClient adminClient = AdminClient.create(properties); ListConsumerGroupOffsetsResult listConsumerGroupOffsetsResult = adminClient.listConsumerGroupOffsets("groupId"); KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> kafkaFuture = listConsumerGroupOffsetsResult.partitionsToOffsetAndMetadata(); Map<TopicPartition, OffsetAndMetadata> topicPartitionOffsetAndMetadataMap = kafkaFuture.get(); topicPartitionOffsetAndMetadataMap.forEach((topicPartition, offsetAndMetadata) -> { LOGGER.info("topic: " + topicPartition.topic() + " patition: " + topicPartition.partition()); LOGGER.info(" offset: " + offsetAndMetadata.offset() + " metadata: " + offsetAndMetadata.metadata()); }); adminClient.close(); } catch (Exception e) { throw new RuntimeException(e); } }
对于Kafka企业级用户来说,一个痛点就是跨机房或跨数据中心的数据传输。大型企业通常在多个数据中心部署Kafka集群,这里的数据中心可能是企业拥有的 自建机房,也可能是公有云厂商的不同机房,在多个机房部署Kafka集群有如下的优势
实现灾备
较近的地理位置可缩短延时及用户响应时间
实现负载均衡,即每个数据中心上的集群可能只保存部分数据集合
区别隔离不同优先级的数据处理
跨机房部署方案有一个共同的特点:数据需要能够从一个Kafka集群被拷贝到另一个集群,必须支持双向拷贝,某次传输的源集群可能是下次传输的目标集群。为此Kafka默认提供一个工具MirrorMaker,用来帮助用户实现数据在两个集群间拷贝。MirrorMaker仅仅是一个consumer+producer的混合体,对于源集群而言,它是一个consumer,对于目标集群而言,它是一个producer。MirrorMaker读取源集群指定topic的数据,然后写入目标集群中的同名topic下。用户可以运行多个MirrorMaker实例增加整体数据拷贝的吞吐量,同时还提升了容错性,当一个实例崩溃后,其他实例能够自动地承担起它的负载
实际生产环境中,源集群和目标集群完全独立的两套环境,它们上的topic可能设置了不同的分区数且有不同的位移值,因此MirrorMaker工具并不能完美实现容错性,因为consumer的位移值可能是不同的。不过MirrorMaker依然会保存并使用消息的key来执行分区任务
MirrorMaker的脚本名为kafka-mirror-maker.sh,位于Kafka安装目录的/bin子目录下。MirrorMaker脚本参数及含义
参数名 | 参数含义 |
---|---|
–whitelist | 指定一个正则表达式,指定拷贝源集群的那些topic,比如a|b表示拷贝源集群上两个topic的数据a和b。使用新版本consumer必须指定该参数 |
–blacklist | 指定一个正则表达式,屏蔽指定topic的拷贝,该参数只适用于老版本consumer |
–abort.on.send.failure | 若设置为true,当发送失败时则关闭MirrorMaker |
–consumer.config | 指定MirrorMaker下consumer的属性文件,至少要在文件中指定bootstrap.servers |
–producer.config | 指定MirrorMaker下producer的属性文件 |
–consumer.rebalance.listener | 指定MirrorMaker使用的consumer rebalance监听器类 |
–rebalance.listener.args | 指定MirrorMaker使用的consumer rebalance监听的参数,与consumer.rebalance.listener一同使用 |
–message.handler | 指定消息处理类。消息处理器在consumer获取消息与producer发送消息之间调用 |
–message.handler.args | 指定消息处理类的参数,与message.handler一同使用 |
–num.streams | 指定MirrorMaker线程数。默认是1,即启动一个线程执行数据拷贝 |
–offset.commit.interval.ms | 设置MirrorMaker位移提交间隔,默认1分种 |
–help | 打印帮助信息 |
实际使用中,被经常使用的参数是whitelist、consumer.config和producer.config.如果需要实现某些特定的拷贝逻辑,比如拷贝指定分区数据或选择性拷贝数据等,就需要实现特定的消息处理器并使用message.handler进行指定
典型的命令执行如下。该命令读取由consumer.properties文件指定的源Kafka集群,去读取名为topicA和topicB主题的消息,并写入到由producer.properties文件指定的目标集群
bin/kafka-mirror-maker.sh --consumer.config consumer.properties --producer.config producer.properties --whitelist
topicA|topicB
一个具体的实例,展示如何使用MirrorMaker工具在集群间进行数据拷贝。搭建3个Kafka集群,分别用k1、k2、k3表示。为简便起见都采用单节点部署,三个集群端口号分别为9092、9093和9094,在ZooKeeper上chroot分别是/k1、/k2和/k3
使用MirrorMaker将在k1上生产的消息,拷贝到k2和k3上,最后k3上运行consumer以验证消息是否被拷贝成功
我们在k1上创建一个测试topic
bin/kafka-topic.sh --create --zookeeper localhost:2181/k1 --topic test --partitions 1 --replication-factor 1
创建两个consumer.config文件mm-consumer-1st.config和mm-consumer-2nd.config
bootstrap.servers=localhost:9092
client.id=mm1.k1Andk2
group.id=mm1.k1Andk2.consumer
partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
bootstrap.servers=localhost:9093
client.id=mm1.k2Andk3
group.id=mm1.k2Andk3.consumer
partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
之后创建两个对象的producer.config文件mm-producer-1st.config和mm-producer-2nd.config
bootstrap.servers=localhost:9093
client.id=mm1.k1Andk2
bootstrap.servers=localhost:9094
client.id=mm1.k2Andk3
分别启动两个MirrorMaker
bin/kafka-mirror-maker.sh --consumer.config mm-consumer-1st.config --producer.config mm-producer-1st.config
--whitelist test
bin/kafka-mirror-maker.sh --consumer.config mm-consumer-2nd.config --producer.config mm-producer-2nd.config
--whitelist test
以上操作便完成了集群间消息的拷贝。用户可以启动一个生产端向k1发送消息,再在k3启动一个消费者完成消息消费。以验证上述操作是否生效
目前Kafka包括的安全特性如下
连接认证机制,包含服务端与客户端(生产者/消费者)连接、服务器间连接以及服务器与工具间连接。支持的认证机制包括SSL(TLS)或SASL
服务器与ZooKeeper之间连接的认证机制
基于SSL的连接通道数据传输加密
客户端读/写授权
支持可插拔的授权服务和与外部授权服务的集成
上面提到了认证和授权。认证是证明你是谁的过程,在访问Kafka服务时必须显式提供身份信息证明你的身份是合法的。授权则是证明你能访问哪些服务的过程。
Kafka安全主要包含三大功能:认证、信道加密、授权,其中的认证机制主要是配置SASL,而授权是通过ACL接口命令完成
在生产环境中,用户配置SASL,通常会配置Kerberos,但对于小体量的应用,并且运行于内网环境中,使用基于明文传输的SASL集群环境足以应付。下面介绍在不使用Kerberos情况下配置SASL + ACL构建Kafka集群
要开启SASL和ACL机制,需要在broker进行两个方面的设置。首先创建包含所有认证信息的JAAS文件。假设Kafka集群中有3个用户admin、reader、writer,其中admin是集群管理员,reader用户负责读取Kafka集群中的topic数据,writer用户负责向Kafka集群写入消息。则我们可以将JAAS文件设置如下
KafkaServer{
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin"
user_admin="admin"
user_reader="reader"
user_writer="writer";
};
如果不写JAAS文件将被视为无效。现在保存文件为jaas.conf,之后我们需要把这个文件的完整路径作为一个JVM参数传递给Kafka的启动脚本。由于bin/kafka-server.sh只接收server.properties的位置,不接收任何其它参数,故需要修改Kafka启动脚本
cd kafka_2.12-1.0.0/
# 备份一份新的启动脚本,并命名为secured-kafka-servrer-start.sh
cp bin/kafka-server-start.sh bin/secured-kafka-server-start.sh
修改新启动脚本
# 把文件中的这一行
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"
# 修改为下面这行,然后保存退出
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS -Djava.security.auth.login.config=<你的路径>/jaas.conf kafka.Kafka "$@"
做完上面的步骤,我们在bin/目录下做好了一份新的Kafka启动脚本,下面修改broker启动所需要的server.properties文件,至少需要配置以下参数
# 配置ACL入口类
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
# 本例使用SASL_PLAINTEXT
listeners=SASL_PLAINTEXT://:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
# 设置本例中admin为超级用户
super.users=User:admin
启动broker服务器
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
bin/secured-kafka-server-start.sh config/server.properties
如果发现unable to find LoginModule class:*****之类的报错,则需要检查jaas.conf是否存在非法字符
此时broker端的认证已经开启且授权ACL接口也建立起来。下面开始为客户端开启认证。在之前我们需要创建一个test topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic test --partitions 1 --replication-factor 1
以上创建topic命令能正常执行成功,因为Kafka-topic.sh脚本是直接连接ZooKeeper,完全绕过ACL审查机制,不受ACL限制。因此无论是否配置ACL,用户都能使用kafka-topics来管理topic,因此在实际使用过程中,最好能对连接ZooKeeper的用户也增加认证机制
关于ACL还有一个参数配置必须要提一下,在server.properties中设置allow.everyone.if.no.acl.found=true,整个ACL机制将变为黑名单机制,即只有在黑名单中的用户才无法访问资源。非黑名单用户可以畅通无阻地访问任何Kafka资源。默认此参数为false,ACL为白名单机制,只有在白名单中的用户才能访问设定的资源,其它任何用户都属于未授权用户
我们下面开始,配置使用write用户发送消息。首先我们需要创建一个属于用户write的JAAS文件,该文件中指定用户write的连接信息
KafkaClient{
org.apache.kafka.common.security.plain.PlainLoginModule required
username="writer"
password="writer";
};
我们把上述内容保存为writer_jaas.conf文件。我们需要拷贝一份新的bin/kafka-console-producer.sh脚本,并将该JAAS文件作为一个JVM参数
cp bin/kafka-console-producer.sh bin/writer-kafka-console-producer.sh
vi bin/writer-kafka-console-producer.sh
# 把该文件中的这一行
exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleProducer "$@"
# 修改为下面这一行
exec @(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=<你的路径>/writer_jaas.conf kafka.tools.
ConsoleProducer "$@"
以上配置使生产端可以通过认证,但并不能通过授权。我们还需要配置ACL来让用户writer有权限写入topic
在设计具体ACL规则之前,需要先了解一下Kafka ACL的格式。Kafka中一条ACL的格式为"Principal P is [Allowed/Denied] Operation O From Host H On Resource R",含义描述如下
principal:表示一个Kafka user
operation:表示一个具体操作类型,如WRITE、READ、DESCRIBE等。完整操作列表
Host:表示连向Kafka集群的client的IP地址,如果是"*"则表示所有IP
Resource:表示一种Kafka资源类型,当前共有4种类型TOPIC、CLUSTER、GROUP和TRANSACTIONID
我们需要为writer用户赋予对应topic的写入权限,执行如下命令完成创建对应的ACL规则
bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.
connect=localhost:2181 --add --allow-principal User:writer --operation Write --topic test
在配置好SASL和ACL后,我们用新的生产端发送消息
bin/wtiter-kafka-console-producer.sh --broker-list localhost:9092 --topic test --producer-property security.
protocol=SASL_PLAINTEXT --producer-property sasl.mechanism=PLAIN
与生产者类似,首先创建reader用户的JAAS文件reader_jaas.conf
KafkaClient{
org.apache.kafka.common.security.plain.PlainLoginModule required
username="reader"
password="reader"
}
拷贝新的console consumer脚本来指定上面的reader_jaas.conf
cp bin/kafka-console-consumer.sh bin/reader-kafka-console-consumer.sh
vi bin/reader-kafka-console-consumer.sh
# 把该文件中的这行
exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleConsumer "$@"
# 修改为下面行
exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=<你的路径>/reader_jaas.conf kafka.tools.
ConsoleConsumer "$@"
然后创建一个consumer.config为该console producer指定以下3个属性
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
gorup.id=test-group
为reader用户分配读取topic权限,创建如下ACL规则
bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.
connect=localhost:2181 --add --allow-principal User:reader --operation Read --topic testc
为reader用户分配消费者组的读取权限,创建如下ACL规则
bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.
connect=localhost:2181 --add --allow-principal User:reader --operation Read --group test-group
运行新的客户端实现消费消息
bin/reader-kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning --consumer.
config consumer.config
我们使用admin用户查询消费者组的消费状态。为admin用户创建对应的JAAS文件admin_jaas.conf
KafkaClient{
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin"
}
拷贝一份新的kafka-consumer-groups.sh脚本
cp bin/kafka-consumer-groups.sh bin/admin-kafka-consumer-groups.sh
vi bin/admin-kafka-consumer-groups.sh
# 把该文件中的这一行
exec $(dirname $0)/kafka-run-class.sh kafka.admin.ConsumerGroupCommand "$@"
# 修改为下面这行,然后保存
exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=<你的路径>/admin_jaas.conf kafka.admin.
ConsumerGroupCommand "$@"
也需要设置以下两个参数,保存到admin_sasl.config文件中
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
运行admin-kafka-consumer-group.sh脚本查询消费者组消费进度
bin/kafka-admin-consumer-group.sh --bootstrap-server localhost:9092 --group test-group --describe --command-config
admin_jaas.config
SSL信道加密在很多生产环境是一个必选项,为避免在Kafka中明文传输敏感的公司业务数据,开启SSL显得至关重要,SSL配置流程如下
前7步可以编写一个setup_ssl_for_server.sh的脚本完成。实际配置步骤就变为了
运行setup_ssl_for_servers.sh脚本
配置broker的server.properties
配置clients的特定属性
我们先看一下setup_ssl_for_servers.sh脚本实际内容
#! /bin/bash echo "1. 设置环境变量" ##########################设置环境变量######################## BASE_DIR=/mnt/disk/kafka_ssl # SSL各种生成文件的基础路径 CERT_OUTPUT_PATH="$BASE_DIR/certificates" # 证书文件生成路径 PASSWORD=kafka1234567 # 密码 KEY_STORE="$CERT_OUTPUT_PATH/kafka.keystore" # Kafka keystore文件路径 TRUST_STORE="$CERT_OUTPUT_PATH/kafka.truststore" # Kafka truststore文件路径 KEY_PASSWORD=$PASSWORD # keystore的key密码 STORE_PASSWORD=$PASSWORD # keystore的key密码 TRUST_KEY_PASSWORD=$PASSWORD # truststore的key密码 TRUST_STORE_PASSWORD=$PASSWORD # truststore的store密码 CLUSTER_NAME=test-cluster # 指定别名 CERT_AUTH_FILE="$CERT_OUTPUT_PATH/ca-cert" # CA证书文件路径 CLUSTER_CERT_FILE="$CERT_OUTPUT_PATH/${CLUSTER_NAME}-cert" # 集群证书文件路径 DAYS_VALID=365 # key有效期 D_NAME="CN=Xi Hu, OU=YourDept, O=YourCompany, L=Beiging, ST=Beijing, C=CN" # distingguished name ############################################################# mkdir -p $CERT_OUTPUT_PATH
echo "2. 创建集群证书到keystore"
keytool -keystore $KEY_STORE -alias $CLUSTER_NAME -validity $DAYS_VALID -genkey -keyalg RSA -storepass
$STORE_PASSWORD -keypass $KEY_PASSWORD -dname "$DNAME"
echo "3. 创建CA"
openssl req -new -x509 -keyout $CERT_OUTPUT_PATH/ca-key -out "$CERT_AUTH_FILE" -days "$DAYS_VALID" -passin
pass:"$PASSWORD" -passout pass:"$PASSWORD" -subj "/C=CN/ST=Beijing/L=Beiging/O=YourCompany/CN=Xi Hu"
echo "4. 导入CA文件到truststore"
keytool -keystore "$TRUST_STORE" -alias CARoot -import -file "$CERT_AUTH_FILE" -storepass "$TRUST_STORE_PASSWORD"
-keypass "$TRUST_KEY_PASS" -noprompt
echo "5. 从key store中导入集群证书"
keytool -keystore "$KEY_STORE -alias "$CLUSTER_NAME" -certreq -file "$CLUSTER_CERT_FILE" -storepass
"$STORE_PASSWORD" -keypass "$KEY_PASSWORD" -noprompt
echo "6. 签发证书"
openssl x509 -req -CA "$CERT_AUTH_FILE" -CAkey $CERT_OUTPUT_PATH/ca-key -in "$CLUSTER_CERT_FILE" -out "$
{CLUSTER_CERT_FILE}-signed" -days "$DAYS_VALID" -CAcreateserial -passin pass:"$PASSWORD"
echo "7. 导入CA文件到keystore"
keytool -keystore "$KEY_STORE" -alias CARoot -import -file "$CERT_AUTH_FILE" -storepass "$STORE_PASSWORD" -keypass
"$KEY_PASSWORD" -noprompt
echo "8. 导入已签发证书到keystore"
keytool -keystore "$KEY_STORE" -alias "${CLUSTER_NAME}" -import -file "${CLUSTER_CERT_FILE}-signed" -storepass
"$STORE_PASSWORD" -keypass "$KEY_PASSWORD" -noprompt
执行上述脚本后,到对应目录查看生成文件列表
ca-cert:CA文件,不要把该文件拷贝到别的broker机器上
test-cluster-cert-signed:CA已签发的Kafka证书文件,不要把该文件拷贝到别的broker机器上
test-cluster-cer:Kafka认证文件(包含公钥和私钥),不要把该文件拷贝到别的broker机器上
kafka.keystore:Kafka的keystore文件,所有clients端和broker机器上都需要
kafka.truststore:Kafka的truststore文件,所有clients端和broker机器上都需要。
脚本执行成功后,我们需要对相应的Kafka参数进行设置,与SSL相关的部分如下
listeners=PLAINTEXT://:9092,SSL://:9093 # 为Kafka broker配置两个listeners,一个是明文传输,另一个使用SSL加密进行数据传输。连接
9093端口则会进行SSL加密
advertised.listeners=PLAINTEXT://公网IP:9092,SSL://公网IP:9093 # 如果是云上环境,即clients通过公网去连接broker,那么advertiesd.listeners
就必须配置成所在机器的公网IP
ssl.keystore.location=/mnt/disk/aim/certificates/kafka.keystore # 提供SSL keystore的文件
ssl.keystore.password=kafka123456 # 提供keystore密码
ssl.truststore.location=/mnt/disk/aim/certificates/kafka.truststore # 提供SSL truststore的文件
ssl.truststore.password=kafka123456 # 提供truststore密码
ssl.key.password=kafka123456 # keystore中的私钥密码
ssl.client.auth=required # 设置clients也要开启认证
配置好broker后,就可以尝试启动broker,只有运行下面命令创建topic
bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test --partitions 1 --replication-factor 1
至此broker端的SSL配置便完成了。为让客户端程序使用SSL信道给broker端发送请求,我们需要配置客户端程序
首先我们创建一个producer.config文件
bootstrap.servers=kafka1:9093 # 指定9093端口,即使用SSL监听器端口
security.protocol=SSL
ssl.truststore.location=<你的路径>/kafka.truststore # 指定truststore文件
ssl.truststore.password=kafka123456
ssl.keystore.password=kafka123456
ssl.keystore.location=<你的路径>/kafka.keystore # 指定keystore文件
然后运行脚本开始发送消息
bin/kafka-console-producer.sh --broker-list kafka1:9093 --topic test --producer.config producer.config
此生产者连接端口是9093,同理消费者我们也需要创建一个consumer.config文件
security.protocol=SSL
group.id=test-group
ssl.truststore.location=<你的路径>/kafka.truststore # 指定truststore文件
ssl.truststore.password=kafka123456
ssl.keystore.password=kafka123456
ssl.keystore.location=<你的路径>/kafka.keystore # keystore文件
创建消费者验证
bin/kafka-console-consumer.sh --bootstrap-server kafka1:9093 --topic test --from-beginning --consumer.config
consumer.config
这样客户端与服务端通信皆使用SSL进行加密。开启SSL后Kafka的吞吐量,特别是客户端通常有10%~40%下降,用户需要权衡SSL与新能之间的收益
org.apache.kafka.common.errors.UnknownTopicOrPartitionException: this server does not host this topic-partition
该异常表示请求分区数据不在抛出该异常的broker上,kafka抛出该异常的原因通常有下面三种
follower副本所在的broker在另一个broker成为leader之前率先完成了成为follower的操作,使得follower从leader拉取数据时发现leader broker上还未准备好数据,抛出此错误,该问题在下一轮PRC中会恢复
producer向不存在的topic发送数据时,broker会封装该异常返回给producer。不过该异常属于可重试类异常,打开"自动创建topic开关"的前提下,producer端会自动创建topic,重试发送请求。如果用户持续在producer端看到该异常,可检查auto.create.topics.enable参数,也可适当增大retries值
当启动ACL后,Kafka对于未授权操作中的topic一律返回该异常,主要是基于不对外暴露topic是否存在的考虑
此异常属于可重试异常,能自行修复,偶尔在日志中看到也不必过于在意,但如果持续观测到该异常则需要进行相应处理。若broker端抛出,则检查对应的topic分区部署情况;若clients端抛出,则检查cloents端是否向不存在topic发送请求
LEADER_NOT_AVAILABLE: There is not leader for this topic-partition as we are in the middle of a leadership election
由抛错信息可知对应分区当前没有leader。没有leader的原因很多,比如正在进行新leader的选举,或是该topic正在被删除,使得leader broker是-1,即无leader。实际环境中,该异常都是瞬时出现,比如当producer向一个不存在的topic发送消息时,若auto.create.topic.enable=true,broker会自动创建
该topic,然后向clients端返回LEADER_NOT_AVAILABLE异常,显示告知producer要重新请求最新的元数据,因此在producer端会瞬时抛出LEADER_NOT_AVAILABLE警告
若用户持续发现该异常,则需要使用kafka-topics脚本检查分区的leader信息。如果leader为-1,可考虑使用kafka-preferred-replica-election.sh手动调整leader选举。依然不好用,则检查controller broker的存活情况
NotLeaderForPartitionException: This server is not the leader for that topic-partition
该异常主要是指当前broker已不是对应分区的leader broker,通常发送在leader变更的情况下。当leader从一个broker切换到另一个broker时,原有的clients或follower依然可能向老的leader请求数据。在这种情况下Kafka会抛出异常
该异常应该是瞬时的,如果持续观测到则需要检查broker端、clients端的日志来进一步定位问题
TimeoutException: ...
此异常表示请求超时,请求超时可以是各种类型的请求,如请求元数据信息或生产消息请求。若遇到该异常需要定位该异常抛出的地方,比如producer端、broker端、还是consumer端。哪里抛出的异常就增加哪里的request.timeout.ms参数值。但若依然不管用,则需要考虑用户环境中broker或clients是否负载过重,导致任务堆积不被处理,这就可能需要从架构方面进行改造
实际生产环境该异常多发生在producer端,通常是因为producer应用的后台发送线程无法匹配用户主线程的消息创建速率。有两种解决方法
在出现该异常后尽量避免共享相同的producer实例
适当增加request.timeout.ms以及适当减少batch.size
当producer端无法从Kafka集群获取元数据时,也会抛出这个异常,特别对于那些未正确配置连接信息的producer而言。此时用户需要仔细检查bootstrap.server连接设置,查看是否存在无法连接的情况。如果Kafka broker搭建在云环境主机上,通常需要设置broker端参数advertised.listeners
当前若要让Kafka集群处理大消息,总共有3个参数需要调整
broker端参数message.max.bytes:设置broker端能处理的最大消息长度
producer端参数max.request.size:设置producer端能处理的最大消息长度
consumer端参数fetch.max.bytes:设置consumer端能处理的最大消息长度
broker端参数socket.request.max.bytes:设置broker端socker请求的最大字节数。通常不需要额外配置该参数,但如果向Kafka发送超过100MB超大消息,则需要调整该参数
除socket.request.max.bytes默认值是100MB,其它几个参数值大约在1MB左右,对于生产环境的大消息而言,用户需要同时调整上面这些参数
NetworkException:The server disconnected before a response was received
此异常是producer端抛出,主要是因为producer在工作过程中断开了与某些broker的连接,从而使得发送到这些broker的PRODUCE请求失败。NetworkException是可重试异常,通常情况下是瞬时出现。但若用户在producer端持续观测到该异常,则需要检查producer与对应broker节点的连通性及broker节点的存活情况
ILLEGAL_GENERATION:Specified group generation id is not avlid
这是consumer抛出的异常,表示当前consumer错过了consumer group正在进行的rebalance,原因是该consumer花费大量时间处理poll()返回的数据。用户需要适当减少max.poll.records值以及增加max.poll.interval.ms值。也可以优化消息处理的逻辑,比如将poll回来的消息放入单独的线程进行处理
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。