当前位置:   article > 正文

管理Kafka集群_kafka集群管理

kafka集群管理

管理Kafka集群

集群管理

启动broker

启动Kafka服务器需要首先启动ZooKeeper服务器,Kafka使用ZooKeeper作为集群的管理服务。在生产环境中启动Kafka建议使用-daemon参数启动

bin/kafka-server-start.sh -damon <path>/server.properties
  • 1

不要使用如下方式启动服务

bin/kafka-server-start.sh <path>/server.properties &
  • 1

虽然通过添加&符号使该命令在后台运行,但当用户会话登出时该进程会自动kill掉,导致broker关闭。我们也可以使用nohup … &的方式启动Kafka集群

nohup bin/kafka-server-start.sh <path>/server.properties &
  • 1

启动broker之后,建议用户查看启动日志以确保broker启动成功。服务器日志通常保存在kafka安装目录的logs子目录下,名字为server.log.

关闭broker

前台方式启动broker进程时

前台方式启动broker是指,在Linux终端不加nohup或-daemon参数的方式直接启动broker,这种方式多用于本地测试或DEBUG。关闭这种方式启动的broker 进程,只需要在该终端使用Ctrl + C组合键发送SIGINT信号终止进程即可。Kafka broker进程会捕获SIGINT信号并执行broker关闭逻辑

后台方式启动broker进程

当使用前面的-daemon或nohup方式启动broker后,正确关闭broker的方式就是使用Kafka自带的kafka-server-stop.sh脚本。该脚本位于Kafka安装目录 的bin子目录下

bin/kafka-server-stop.sh
  • 1

该脚本会搜索当前机器中所有的Kafka broker进程,然后关闭他们。如果用户机器上存在多个broker进程,该脚本会全部关闭它们。不建议直接使用kill -9方式杀死broker进程,因为这通常会造成一些broker状态的不一致。当broker在关闭过程中如果出现错误会进行重试。若重试次数用完了依然无法正常关闭
broker进程,Kafka会进行强制关闭。当用户发现关闭broker时不会立即停止,可以打开server.log实时监控关闭进度,一般过一段时间broker进程总能 结束

由于上述脚本依靠操作系统命令(如ps ax和grep)来获取Kafka broker的进程号,一旦用户的Kafka安装目录的路径过长,则可能令该脚本失效从而无法正确 获取broker的PID。对于这种情况,可以使用如下方式关闭broker

  • 如果机器上安装JDK,可运行jps命令直接获取Kafka的PID,否则转到下一步

  • 运行ps ax | grep -i ‘kafka.Kafka’ | grep java | grep -v grep | awk '{print $1}'命令自行寻找Kafka的PID

  • 运行Kill -s TERM $PID关闭broker

以上3步也是kafka-server-stop脚本的执行逻辑

设置JMX端口

Kafka提供了丰富的JMX指标用于实时监控集群运行的健康程度。不过若要使用它们,用户必须启动broker前首先设置JMX端口。设置方式分为两种。若以 前台方式运行broker,只需在执行启动命令前设置JMX_PORT环境变量,如果下

JMX_PORT=9997 bin/kafka-server-start.sh <path>/server.properties
  • 1

另一种以后台运行broker的设置方法类似,依然是在启动broker前设置JMX_PORT环境变量

export JMX_PORT=9997 bin/kafka-server-start.sh -daemon <path>/server.properties
  • 1

增加broker

由于Kafka集群的服务发现交由ZooKeeper来处理,因此向Kafka集群增加新的broker服务非常容易。用户只需要为新增broker设置一个唯一的broker.id,然后启动即可。Kafka集群能自动发现新启动的broker并同步所有的元数据信息,主要包括当前集群有哪些主题以及topic都有哪些分区等

但是新增的broker不会自动被分配任何已有的topic分区,用户必须手动执行分区重分配操作才能使他们为已有topic服务。在这些broker启动之后新创建的topic分区还是可能分配给这些broker。用户需要对各个broker上的负载进行规划,避免出现负载极度不均匀情况

升级broker版本

升级kafka集群的版本实际上非常简单,核心步骤只需要4步,但通常情况下用户都想要最大程度缩短服务的宕机时间,特别不要干扰集群上producer和consumer 的正常运转

  • 第一步:更新broker间通信版本和消息版本。向所有broker的server.properties中增加下面两行

        inter.broker.protocol.version=0.10.0
        log.message.format.version=0.10.0
    
    • 1
    • 2

    如果不是从0.10.0.0版本升级,则只需要填写用户环境下当前broker的版本

  • 第二步:依次更新代码,重启所有broker。下载需要升级到的版本的Kafka二进制包,覆盖已有的目录,然后依次重启所有broker,比如先重启broker1, 然后重启broker2。也可以不覆盖安装目录,单独放置在全新目录下

  • 第三步:再次更新broker间通信版本和消息版本。将通信版本和消息版本都调整为需要升级到的版本

  • 第四步:再次依次重启broker

到此Kafka集群的升级工作就完成了。在上述每一步执行过程中,producer都应该是可以正常工作的,提交失败的消息数应该始终是0;而consumer可能会出现提交失败的警告造成消息重复消费,这个问题通常都是由下游子系统负责去重。broker端,会出现短暂的org.apache.kafka.common.errors.NotLeaderForPartitionException异常,表明内部topic __consumer_offsets各分区的leader在轮流重启过程中出现短暂的不可用。这个异常通常是瞬时发生且通常可以自行恢复

topic管理

创建topic

Kafka创建topic的途径目前有4种

  • 执行kafka-topic.sh命令行工具创建

  • 显示发送CreateTopicsRequest请求创建topic

  • 发送MetadataRequest请求且broker端设置了auto.create.topics.enable为true

  • 向ZooKeeper的/brokers/topic路径下写入以topic名称命名的子节点

除上面4种方法外,还有其他方式可创建topic但并不推荐,包括上述的第4种方法也是不推荐的。官方推荐用户使用前两种方式来创建topic

参数名参数含义
–alter用于修改topic的信息,比如分区数、副本因子
–config <key=value>设置topic级别的参数,比如cleanup.policy等
–create创建topic
–delete删除topic
–delete-config 删除topic级别参数
–describe列出topic详情
–disable-rack-aware创建topic时不考虑机架信息
–force无效参数,当前未使用
–help打印帮助信息
–if-exists若设置,脚本只会对已存在的topic执行操作
–if-no-exists若设置,当创建已存在同名topic时不会抛出错误
–list列出集群当前所有topic
–partitions <分区数>创建或修改topic时指定分区数
–replication-assignment手动指定分区分配CSV方案,副本之间使用冒号分割。比如指定双分区方案为0:1:2,3:4:5,表示分区1的3个副本在broker 0、1、2上,分区2在broker 3、4、5上
–replication-factor指定副本因子
–topic指定topic名称
–topic-with-overrides展示topic详情时不显示具体的分区信息
–unavaliable-paritions只显示topic不可用的分区,即没有leader的分区
–under-replicated-partitions只显示副本数不足的分区信息
–zookeeper指定连接的ZooKeeper信息
自动分区分配创建topic

使用自动分区分配创建一个topic,名为test-topic, 6个分区,每个分区3个副本,同时指定该topic的日志留存时间为3天

bin/kafka-topics.sh --create --zookeeper localhost:2181 --partitions 6 --replication-factory 3 --topic test-topic 
--config delete.retention.ms=259200000
  • 1
  • 2
手动指定分区分配策略创建topic

假设有3台broker构成的Kafka集群,broker id分别为0、1、2。创建一个topic,名称为test-topic2,分区数为4,副本因子是2。手动分配方案如下

  • 分区1:0、1

  • 分区2:1、2

  • 分区3:0、2

创建topic命令如下

bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic test-topic2 --replica-assignment 0:1,1:2,0:2,1:2
  • 1

若指定–replica-assignment,用户不必指定–partitions和–replication-factor,因为脚本可以从手动分配方案中计算出topic的分区数和副本因子

删除topic

与创建topic类似,删除topic有如下3种方式

  • 使用kafka-topics脚本:这是最常见也是最正式的方法

  • 构造DeleteTopicsRequest请求:这种方式需要手动编写程序实现

  • 直接向ZooKeeper的/admin/delete_topics下写入子节点:不推荐,实际场景中谨慎使用

通过手动删除topic底层的日志文件及ZooKeeper中与topic相关的所有znode来删除topic数据是不推荐的方式,这种方式只能保证物理文件被删除,但集群 broker内存中依然保存有这些信息

最正式的删除方式是执行kafka-topic.sh --delete命令,该命令执行之后通常会立即返回,但真正的删除动作是在一个异步任务中完成。不管使用那种删除方式,首先需要确保broker端参数delete.topic.enable被设置为true,否则Kafka不会删除topic。在1.0.0之前的版本,默认设置为false,在1.0.0之后的版本默认为true。具体删除topic命令如下

bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic test-topic
  • 1

执行上述命令test-topic将被标记为"待删除"状态,如果delete.topic.enable为false,则不会实际完成删除操作。用户需要等一段时间后通过 kafka-topics.sh -list命令查询topic列表确定topic是否已经被删除

查询topic列表

使用kafka-topic脚本运行用户查询当前集群的topic列表

bin/kafka-topics.sh --zookeeper localhost:2181 --list
  • 1

查询topic详情

kafka-topic脚本可以查询单个或所有topic的详情,显示topic的分区数、副本数、以及leader副本所在broker的ID值、副本所在broker的ID、ISR副本 列表等信息

bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic test-topic2
  • 1

当不指定–topic test-topic2则表明查询集群所有topic的详情

bin/kafka-topics.sh --zookeeper localhost:2181 --describe
  • 1

修改topic

topic创建后,Kafka依然允许修改topic的某些参数及配置,如分区数、副本因子和topic级别参数等

以下命令展示如果新增topic分区。在新增分区时我们需要注意,当topic消息有key时,根据key确定目标分区的算法就会因为分区数的增加而发生变更。 虽然分区数可以增加但并不允许减少

bin/kafka-topic.sh --alter --zookeeper localhost:2181 --partitions 10 --topic test-topic2
  • 1

可以使用kafka-configs脚本为已有topic设置topic级别的参数,虽然kafka-topics.sh --alter也能设置参数,但并不推荐使用。下面表格说明kafka-configs 脚本命令行参数及其含义

参数名参数含义
–add-config k1=v1,k2=v2,…设置topic级别参数
–alter修改topic或其他实体类型
–delete-config删除指定的参数
–entity-default与配额设置有关,用于Kafka配置的默认配额值
–describe列出给定实体类型的参数详情
–entity-name指定实体名称,若是topic类型,则是topic名称
–entity-type指定实体类型,总共4类实体类型:user、topic、clients、broker
–help打印帮助信息
–zookeeper指定连接的ZooKeeper信息

当前Kafka定义4类实体,分别是topic、user、clients和broker,kafka-configs脚本用于为这4类实体进行参数设置,每类实体都能设置专属的配置 参数

我们为test-topic2设置cleanup.policy=compact参数,命令如下

bin/kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name test-topic2 --add-config 
cleanup.policy=compact
  • 1
  • 2

使用kafka-configs的–describe选项确认参数是否增加成功

bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type topics --entity-name test-topic2
  • 1

topic动态配置管理

增加topic配置

Kafka支持的topic参数可参见官网http://kafka.apache.org/documentation/#topicconfigs,以下列举topic常见且较为重要的参数

参数名参数含义
cleanup.policy指定topic留存策略,可以是compact、delete或同时指定两者
compression.type指定该topic消息的压缩类型
max.message.bytes指定broker端能接收该topic消息的最大长度
min.insync.replicas指定ISR中需要接收topic消息的最少broker数,与producer端参数acks=-1配合使用
preallocate是否为该topic的日志文件提前分配存储空间
retention.ms指定持有该topic单个分区消息的最长时间
segment.bytes指定该topic日志段文件的大小
unclean.leader.election.enable是否为topic启用unclean领导者选举

与broker端参数不同,上面这些topic级别参数可以动态修改而无须重启broker。我们创建一个topic,之后动态增加preallocate、segment.bytes两个 参数

bin/kafka-topics.sh --create --zookeeper localhost:2181 --partitions 1 --replication-factor 1 --topic test

bin/kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name test --add-config 
preallocate = true,segment.bytes=104857600

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

查看topic配置

查看topic配置,既可以使用kafka-topics脚本,又可以使用kafka-configs脚本

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test

bin/kafka-configs.sh --describe --zookeeper localhost:2181 --entity-type topics --entity-name test
  • 1
  • 2
  • 3

删除topic配置

通过kafka-configs脚本删除topic配置

bin/kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name test --delete-config 
preallocate

bin/kafka-configs.sh --describe --zookeeper localhost:2181 --entity-type topics --entity-name test
  • 1
  • 2
  • 3
  • 4

consumer相关管理

在生产环境对于消费者组的运行情况有很强的监控需求。可以通过各种第三方的监控框架来监控消费者组。Kafka默认也自带了工具脚本来监控并管理消费者组的 执行情况

Kafka提供的用于查询消费者组的脚本是kafka-consumer-groups.sh,位于Kafka路径的bin/子目录下,该脚本的主要参数及含义如下

参数名参数含义
–bootstrap-server指定Kafka集群的broker列表,CSV格式,查询新版本消费者组时使用
–list列出集群当前所有消费者组
–describe查询消费者组详情(包括消费滞后情况)
–group指定消费者组名称
–zookeeper指定ZooKeeper连接信息,查询老版本消费者组时使用
–reset-offsets重新设置消费者组位移

首先创建一个测试topic,名为test,单分区,副本因子为1,然后使用kafka-producer-perf-test.sh脚本向该topic生产500万条消息

bin/kafka-topic.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

bin/kafka-producer-perf-test.sh --topic test --throughput -1 --num-records 500000 --record-size 100 --producer-props 
bootstrap.servers=localhost:9092 acks=-1
  • 1
  • 2
  • 3
  • 4

之后分别打开两个终端,通过kafka-console-consumer.sh创建两个测试消费者组

bin/kafka-console-consumer.sh --bootstrap-server=localhost:9092 --topic test --from-beginning --consumer-property 
group.id=test-group1

bin/kafka-console-consumer.sh --bootstrap-server=localhost:9092 --topic test --consumer-property group.id=test-group2
  • 1
  • 2
  • 3
  • 4

为模拟不同消费进度,test-group1指定–from-beginning,表示从头消费topic;test-group2未指定–from-beginning,表示从topic的最新位移处开始消费。使用kafka-consumer-groups.sh来查询消费者组的情况

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
  • 1

通过以下命令可以查询每个消费组的详情信息

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test-group1

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test-group2
  • 1
  • 2
  • 3

运行以上命令将会展示如下内容

  • TOPIC:表示该消费者组消费了哪些topic

  • PARTITION:表示该消费者组消费的是哪个分区

  • CURRENT-OFFSET:表示消费者组最新消费的位移值

  • LOG-END-OFFSET:表示topic所有分区当前的日志终端位移值

  • LAG:表示消费滞后进度,该值等于LOG-END-OFFSET与CURRENT-OFFSET的差值,通常为大于或等于0的正数。若该值接近LOG-END-OFFSET则表明该消费者组消费滞后严重。特别的,若该值小于0,则表明存在消费数据丢失的情况,即有些消息为被消费就被consumer直接跳过了。若出现这种情况,需要确认broker端参数unclean.leader.election.enable的值是否被设置为true,并进一步研究是否有可能出现因为unclean领导者选择而造成的数据丢失

  • CONSUMER-ID:表示consumer的ID,通常是Kafka自动生成的。如果该列没有值,通常表明此consumer目前尚未处于运行中

  • HOST:表示consumer所在的broker主机信息。如果该列没有值,通常表明此consumer目前尚未处于运行中

  • CLIENT-ID:表示用户指定或系统自动生成的一个标识consumer所在客户端的ID。此ID通常用于定位或调试问题

重设消费者组位移

此功能0.11.0.0版本开始提供。对于之前的老版本,对已有的消费者组调整位移,必须手动编写Java程序调用KafkaConsumer#seek方法。在0.11.0.0版本开始kafka-consumer-groups脚本支持为已有consumer group重新设置位移,但consumer group不能处于运行状态

重设位移流程由3步组成
在这里插入图片描述

第一步确定消费者组下topic作用域,当前支持3种作用域

  • –all-topics:为消费者组下所有topic的所有分区调整位移

  • –topic t1, --topic t2:为指定的若干个topic的所有分区调整位移

  • –topic t1:0,1,2:为topic的指定分区调整位移

确定topic作用域后,第二步确定位移重设策略,当前支持8种设置规则

  • –to-earliest:把位移调整到分区当前最早位移处

  • –to-latest:把位移调整到分区当前最新位移处

  • –to-current:把位移调整到分区当前位移处

  • –to-offset :把位移调整到指定位移处

  • –shift-by N:把位移调整到当前位移+N处,N可为负值

  • –to-datetime :把当前位移调整到大于给定时间的最早位移处。datatime格式是yyyy-MM-ddTHH:mm:ss.xxx,比如2017-08-04T00:00:00.000

  • –by-duration :把位移调整到距离当前时间指定间隔的位移处。duration格式是PnDTnHnMnS,比如PT0H5M0S

  • –from-file :从CSV文件中读取位移调整策略

最后一步确定执行方案,当前支持如下3种方案

  • 不加任何参数:只是打印位移调整方案,不实际执行

  • –execute:执行真正的位移调整

  • –export:把位移调整方案保存成CSV格式并输出到控制台,方便用户保存为CSV文件,供后续结合–from-file参数使用

具体用法如下

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --reset-offsets --all-topics 
--to-earliest --execute

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --reset-offsets --all-topics 
--to-latest --excute

...
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

删除消费者组

Kafka自带的kafka-consumer-groups脚本提供了删除处于inactive状态的老版本消费者组信息。对于新版本不需要手动删除,通过offsets.retention.minutes的参数,默认1天,kafka会根据此参数控制移除inactive消费者组位移的信息的时间。默认情况下,对于一个新版本的消费者而言,即使所有的成员都已经推出组,它的位移信息不会马上删除,Kafka会在最后一个成员推出组1天之后删除该组的所有信息

topic分区管理

preferred leader选举

在Kafka集群中,broker服务器宕机或者崩溃是不可避免的,一旦发生这种情况,该broker上的那些leader副本将不可用,因此必然要求Kafka把这些分区的leader转移到其他的broker上,即使奔溃broker重启回来,其上的副本也能作为follower副本加入ISR中,不能再对外提供服务

随着集群不断运行,leader的不均衡现象开始出现,集群中的一小部分broker上承载了大量的leader分区副本。为应对这种情况,Kafka引入首选副本(preferred replica)概念.假设为一个分区分配了3个副本,分别为0、1、2。节点0就是该分区的preferred replica,并且通常情况下不会发生变更。选择节点0的原因仅仅是因为它是 第一个副本

Kafka提供两种方式让用户将指定分区的leader调整回其preferred replica。这个过程被称为preferred leader选举。第一种方式:使用Kafka自带的kafka-preferred-replica-election.sh脚本,其参数及含义如下

属性名属性含义
–zookeeper指定ZooKeeper连接信息
–path-to-json-file指定json文件路径,该文件包含为那些分区执行preferred leader选举。也可以不指定该参数,若不指定则表明为集群中所有分区都执行preferred leader选举

搭建一个3节点Kafka环境,创建test-topic测试topic,包含3个分区、3个副本

bin/kafka-topics.sh --create --zookeeper localhost:2181 --partitions 3 --replication-factory 3 --topic test-topic

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test-topic
  • 1
  • 2
  • 3

当前3个分区的leader副本已经是他们的preferred replica,一次关闭broker1和broker2模拟集群崩溃,此时所有分区的leader都在broker0上,造成集群leader分布不均匀的情况。执行describe确认leader副本所在分区

bin/kafka-topic.sh --describe --zookeeper localhost:2181 --topic test-topic
  • 1

我们看到3个分区的leader都变成了broker0。现在开始执行kafka-preferred-replica-election.sh脚本来调整leader。构造如下json文件

echo '{"partitons":[{"topic":"test-topic","partition":0},{"topic":"test-topic","partition":1}]}' > preferred-leader-plan.json
  • 1

执行kafka-preferred-replica-election.sh脚本

bin/kafka-perferred-replica-election.sh -zookeeper localhost:2181 --path-to-json-file <path>/preferred-leader-plan.json
  • 1

再次describe查看topic情况

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic-test-topic
  • 1

通过输出可以发现test-topic所有分区都已经调整成他们的preferred replica.当然也可以不指定–path-to-json-file参数直接运行kafka-preferred-replica-election.sh对所有topic执行preferred leader选举。但在实际使用场景中不推荐这种使用方式,对所有分区进行leader转移是一项成本很高的工作

除通过kafka-preferred-replica-election.sh脚本进行preferred leader选举之外,还可以通过broker端参数auto.leader.rebalance.enable自动执行此操作。该参数默认值为true,表明每台broker启动后都会在后台自定定期执行preferred leader选举,与此相关的还有两个broker端参数
leader.imbalance.check.interval.seconds和leader.imbalance.per.broker.precentage。前者控制阶段性操作的时间间隔,当前默认值300秒,即Kafka每5分钟会尝试在后台运行一个preferred leader选举;后者用于确定要执行preferred leader选举的目标分区,当前默认是10,表示若broker上leader不均衡程度超过10%,则Kafka需要为该broker上的分区执行preferred leader选举。Kafka计算不均衡程度的算法为:该broker上的leader不是 preferred replica的分区数/broker上总的分区数

分区重分配

新增broker不会自动分担已有topic的负载,只会对增加topic后新创建的topic生效,如果要让新增broker为已有topci服务,需要用户手动调整已有topic分区分布,将一部分分区搬移到新增broker上。这就是分区重分配操作。Kafka提供了分区重分配脚本kafka-reassign-partition.sh。用户使用该工具需要提供一组需要执行分区重分配的topic列表及对应的一组broker。该脚本接到用户这些消息后会尝试制作一个重分配方案,会力求保证均匀分配给定topic 的所有分区到目标broker上

搭建一个4节点的Kafka集群,然后创建两个测试topic,foo1和foo2,都是3个分区,副本因子都是2。之后把两个topic的分区都搬移到新的broker节点上, 例如broker5和broker6

首先创建一个json文件指定需要执行分区重分配方案的topic列表

echo '{"topics": [{"topic": "fool"}, {"topic": "foo2"}], "version":1}' > topics-to-move.json
  • 1

执行kafka-reassign-partitions.sh脚本产生一个分配方案

bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --topics-to-move-json-file topic-to-move.json --broker-list 
"5,6" --generate
  • 1
  • 2

上述命令执行后会给出当前的分区情况以及一个候选的重分配方案。此时分区重分配并没有真正执行,仅是产生一个可能的方案,用户需要把当前的情况保存下来以备后续的rollback,同时把新的候选方案保存为另一个新的json文件expand-cluster-reassignment.json,之后便可以开始执行真正的分区重分配

bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file expand-cluster-reassignment.
json --execute
  • 1
  • 2

命令执行成功,之后可以指定–verify参数来验证分区重分配执行成功

bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file expand-cluster-reassignment.
json --verify
  • 1
  • 2

也可以不使用–generate参数生成预分配方案,直接自己指定分配方案。假设我们要把foo1分区0的两个副本移到broker5和broker6上,同时把foo2分区1的两个副本移到broker2和broker3上,我们可以自己编写分配方案

echo '{"version":1,"partitions":[{"topic":"foo1","partition":0,"replicas":[5,6]},{"topic":"foo2","partition":1,"replicas":[2,3]}]}' > custom-reassignment.json
  • 1

使用该json文件和–execute参数来开启重分配操作

bin/kafka-reassign-partions.sh --zookeeper localhost:2181 --reassignment-json-file custom-reassignment.json --execute
  • 1

执行完上面命令后,可以使用–verify进行验证

bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file custom-reassignment.json --verify
  • 1

实际生产环境中,用户需要谨慎发起重分配操作,分区在不同broker间进行数据迁移会极大占用broker机器的带宽资源,显著影响clients端业务应用的性能。 尽量在非高峰时段执行重分配操作

增加副本因子

Kafka支持为已有topic分区增加副本因子,具体方法是使用kafka-reassign-partitions.sh脚本,为topic分区增加额外的副本。假设在一个3节点的Kafka集群中创建一个单分区、副本因子是1的topic名为test,运行如下命令

bin/kafka-topic.sh --create --topic test --zookeeper localhost:2181 --partitions 1 --replication-factor 1
  • 1

我们把副本数设置为3,首先创建json文件

echo '{"version":1,"partitions":[{"topic":"test","partition":0,"replicas":[0,1,2]}]}' > increase-replication-factor.json
  • 1

执行重分配命令

bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factory.
json --execute
  • 1
  • 2

之后同样可以使用–verify来验证重分配是否成功

bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factory.
json --verify
  • 1
  • 2

我们可以通过kafka-topics脚本来查看topic的信息

bin/kafka-topic.sh --describe --topic test --zookeeper localhost:2181
  • 1

Kafka常见脚本工具

kafka-console-producer脚本

kafka-console-producer脚本与kafka-console-consumer脚本允许用户在控制台上方便地对Kafka集群进行producer和consumer测试。kafka-console-producer脚本从控制台读取标准输入,将其发送到指定的Kafka topic上。该脚本位于Kafka路径的/bin子目录下。其主要参数及含义如下

参数名参数含义
–broker-list指定Kafka集群连接信息。如果多台broker需要以CSV格式指定,如k1:port1,k2:port,k3:port…
–topic指定producer将消息发送到哪个topic
–producer-property指定producer的其他定制属性,如acks、compression.type等
–producer.config将producer其他定制属性保存在文件中,指定给该producer
–compression-code指定producer消息压缩类型
–timeout指定producer的linger.ms值
–request-required-acks指定producer的acks值
–max-memory-bytes指定producer的buffer.memory值
–message-send-max-retries指定producer的retries值
–max-partition-memory-bytes指定producer的batch.size值

我们使用kafka-console-producer脚本向Topic为test的主题发送消息,指定acks为all,使用LZ4进行消息压缩,把失败重试次数设置为10次,linger.ms设置为3秒, 执行如下命令

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test --compression-code lz4 
--request-required-acks all --timeout 3000 --message-send-max-reties 10
  • 1
  • 2

kafka-console-consumer脚本

kafka-console-consumer脚本从Kafka topic读取消息并写入到标准输出。该脚本位于Kafka路径的/bin子目录下,其主要参数及含义如下

参数名参数含义
–bootstrap-server指定Kafka集群连接信息。如果多台broker需要以CSV格式指定,如k1:port1,k2:port,k3:port…
–topic指定consumer消费的topic
–from-beginning类似于设置consumer属性auto.offset.reset=earliest,即从当前最早位移处开始消费
–zookeeper指定使用老版本consumer,不可与–bootstrap-server同时使用
–consumer-property指定consumer端参数
–consumer.config以文件方式指定consumer端参数
–partition指定要消费的特定分区

运行脚本,从test topic获取消息

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
  • 1

kafka-run-class脚本

之前所述的各种Kafka脚本虽然实现了各自不同功能,但底层都是Kafka-run-class脚本实现。如kafka-console-consumer.sh是使用kafka-run-class.sh Kafka.tool.ConsoleConsumer <参数列表>实现,kafka-console-producer.sh是使用kafka-run-class.sh kafka.tools.ConsoleProducer<参数列表>

kafka-run-class.sh是一个通用的脚本,允许用户直接指定一个可执行的Java类和一组可选择的参数列表,从而调用该类的实现逻辑。上面提到的脚本都是Kafka社区开发人员帮助用户封装的。还有很多其他的有用的功能没有封装成单独的Shell脚本。下面将介绍几种比较实用的工具

查看消息元数据

通过kafka.tools.DumpLogSegments可以查看topic的消息元数据,消息元数据信息包括消息的位移、创建时间戳、压缩类型、字节数等

bin/kakfa-run-class.sh kafka.tools.DumpLogSegments --files ../datalogs/kafka_1/t1-0/000000000000000000.log
  • 1

上面命令中–files指定topic为kafka_1的日志段文件的完整路径。以上命令将解析topic kafka_1的0分区的第一个日志段的内容。每行表示一条消息,详细列出消息的位移信息、物理文件位置、消息长度、压缩类型、CRC码等元数据信息。如果topic开启消息压缩,则可能会有多条消息被压缩进一条外层消息(wrapper message)中, DumpLogSegments默认只显示wrapper message。我们查看baseOffset、与lastOffset即可知道一共包含几条消息,比如baseOffset为0,lastOffset为1, 则一共包含两条消息

DumpLogSegments工具为开启消息压缩的日志提供了–deep-iteration参数来深度遍历被压缩的消息

bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files ../datalogs/kafka_1/t2-0/000000000000000000.log 
--deep-iteration
  • 1
  • 2

使用–deep-iteration参数之后多条被压缩在一起的消息将被打开,并按顺序展示。对于非压缩的消息,使用该参数将输出相同的结果。DumpLogSegments 不只查询日志段文件,也可以查询索引文件

bin/kafka-run-class.sh kafka.tool.DumpLogSegments --files ../datalogs/kafka_1/t1-0/0000000000000000000.index
  • 1

将打印出索引文件内容,每一行格式为[位移 相对物理文件位置]

获取topic当前消息数

通过GetShellOffset类帮助用户实时计算特定topic总的消息数

bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic test --time -1

bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic test --time -2
  • 1
  • 2
  • 3

–time -1表示要获取指定topic所有分区当前的最大位移;–time -2表示获取当前最早位移。将两个命令结果相减可获取所有分区当前的消息总数。–time -1表示的是历史上该topic产生的最大消息数,但topic的数据可能会被移除一部分,因此只有–time -1的结果减去–time -2的结果才是当前topic 总的 消息数

查询__consumer_offsets

新版本consumer位移保存在Kafka内部topic __consumer_offsets中,可以通过kafka-simple-consumer-shell.sh脚本来查看该topic的内容。在查询之前我们需要确认消费组位移信息保存在__consumer__offsets的那个分区上。计算方法为计算消费topic的消费组即group ID的哈希值,然后使用此哈希值对__consumer_offset的分区数取模所得值即为消费者组消费位移在__consumer__offsets上的存储分区。用户可以通过broker端参数offsets.topic.num.partitions 设置__consumer__offsets的分区数,默认为50

在确定目标分区后,可以使用kafka-simple-consumer-shell脚本查询消费者组的位移信息

bin/kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition 12 --broker-list localhost:9092 
--fromatter "kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter"
  • 1
  • 2

API方式管理集群

服务端API管理topic

Kafka提供两个脚本来管理topic,包括topic的增删改查。kafka-topics.sh负责topic的创建与删除,kafka-configs.sh负责topic参数的修改和查询,如果需要使用API对topic进行操作,可以使用服务端API,在编写程序时添加如下Kafka客户端依赖

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.8.0</version>
</dependency> 
  • 1
  • 2
  • 3
  • 4
  • 5

添加客户端依赖后,便可以使用客户端提供的API。Admin接口定义了管理Kafka的众多方法,一下展示使用Admin实现topic的创建

//创建属性对象
Properties properties = new Properties();
//设置broker的地址,可以支持多个,这样当某些broker挂掉还可以使用其它broker。必须指定
properties.put("bootstrap.servers", "172.23.16.84:9092");
AdminClient adminClient = AdminClient.create(properties);
NewTopic myTopic = new NewTopic("my-topic", 3, (short) 3);
CreateTopicsResult createTopicsResult = adminClient.createTopics(Arrays.asList(myTopic));
adminClient.close();
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

在创建topic之前首先需要创建AdminClient,通过AdminClient提供的静态方法create创建AdminClient,在创建时指定通过Properties指定Kafka集群的地址。之后创建NewTopic指定topic名称、分区数、副本数信息,调用adminClient的createTopics方法完成topic创建。完成创建topic之后调用adminClient的close方法完成资源释放。如果需要复用此连接,可以不关闭

删除topic与创建类似

//创建属性对象
Properties properties = new Properties();
//设置broker的地址,可以支持多个,这样当某些broker挂掉还可以使用其它broker。必须指定
properties.put("bootstrap.servers", "172.23.16.84:9092");
AdminClient adminClient = AdminClient.create(properties);
DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(Arrays.asList("my-topic"));
adminClient.close();
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

使用API查询topic级别属性,其他类型如broker等属性查询类似

//创建属性对象
Properties properties = new Properties();
//设置broker的地址,可以支持多个,这样当某些broker挂掉还可以使用其它broker。必须指定
properties.put("bootstrap.servers", "172.23.16.84:9092");
AdminClient adminClient = AdminClient.create(properties);
DescribeConfigsResult describeConfigsResult =
        adminClient.describeConfigs(Arrays.asList(new ConfigResource(ConfigResource.Type.TOPIC, "my-topic")));
Map<ConfigResource, KafkaFuture<Config>> configMap = describeConfigsResult.values();
configMap.forEach(((configResource, configKafkaFuture) -> {
    try {
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("type: " + configResource.type() + " name: " + configResource.name());
        }
        Config config = configKafkaFuture.get();
        config.entries().forEach(configEntry -> {
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("name: " + configEntry.name() + " value: " + configEntry.value());
            }
        });
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}));
adminClient.close();
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

变更各种类型资源配置属性如下

//创建属性对象
Properties properties = new Properties();
//设置broker的地址,可以支持多个,这样当某些broker挂掉还可以使用其它broker。必须指定
properties.put("bootstrap.servers", "172.23.16.84:9092");
AdminClient adminClient = AdminClient.create(properties);

Map<ConfigResource, Collection<AlterConfigOp>> configMap = new HashMap<>();
// 创建需要修改参数的资源及名称
ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, "my-topic");

// 设置需要修改、删除、追加(集合)、清除(集合)参数
List<AlterConfigOp> alterConfigOps = new ArrayList<>();
AlterConfigOp alterConfigOp = new AlterConfigOp(new ConfigEntry("compression.type", CompressionType.LZ4.name)
        , AlterConfigOp.OpType.SET);
alterConfigOps.add(alterConfigOp);

configMap.put(configResource, alterConfigOps);

AlterConfigsResult alterConfigsResult = adminClient.incrementalAlterConfigs(configMap);
adminClient.close();
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

服务器端API管理位移

这里所说的位移指consumer的位移信息,下面将介绍通过API管理位移信息。查询当前集群下所有consumer group信息

try {
    //创建属性对象
    Properties properties = new Properties();
    //设置broker的地址,可以支持多个,这样当某些broker挂掉还可以使用其它broker。必须指定
    properties.put("bootstrap.servers", "172.23.16.84:9092");
    AdminClient adminClient = AdminClient.create(properties);

    ListConsumerGroupsResult listConsumerGroupsResult = adminClient.listConsumerGroups();
    KafkaFuture<Collection<ConsumerGroupListing>> kafkaFuture = listConsumerGroupsResult.all();
    Collection<ConsumerGroupListing> consumerGroupListings = kafkaFuture.get();

    consumerGroupListings.forEach(consumerGroupListing -> {
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("groupId: " + consumerGroupListing.groupId());
        }
    });
} catch (Exception e) {
    throw new RuntimeException(e);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

查找特定topic位移信息

public void listConsumerGroupOffsets() {
    try {
        //创建属性对象
        Properties properties = new Properties();
        //设置broker的地址,可以支持多个,这样当某些broker挂掉还可以使用其它broker。必须指定
        properties.put("bootstrap.servers", "172.23.16.84:9092");
        AdminClient adminClient = AdminClient.create(properties);

        ListConsumerGroupOffsetsResult listConsumerGroupOffsetsResult = adminClient.listConsumerGroupOffsets("groupId");
        KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> kafkaFuture =
                listConsumerGroupOffsetsResult.partitionsToOffsetAndMetadata();
        Map<TopicPartition, OffsetAndMetadata> topicPartitionOffsetAndMetadataMap = kafkaFuture.get();
        topicPartitionOffsetAndMetadataMap.forEach((topicPartition, offsetAndMetadata) -> {
            LOGGER.info("topic: " + topicPartition.topic() + " patition: " + topicPartition.partition());

            LOGGER.info(" offset: " + offsetAndMetadata.offset() + " metadata: " + offsetAndMetadata.metadata());
        });
        adminClient.close();
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

MirrorMaker

概要介绍

对于Kafka企业级用户来说,一个痛点就是跨机房或跨数据中心的数据传输。大型企业通常在多个数据中心部署Kafka集群,这里的数据中心可能是企业拥有的 自建机房,也可能是公有云厂商的不同机房,在多个机房部署Kafka集群有如下的优势

  • 实现灾备

  • 较近的地理位置可缩短延时及用户响应时间

  • 实现负载均衡,即每个数据中心上的集群可能只保存部分数据集合

  • 区别隔离不同优先级的数据处理

在这里插入图片描述
跨机房部署方案有一个共同的特点:数据需要能够从一个Kafka集群被拷贝到另一个集群,必须支持双向拷贝,某次传输的源集群可能是下次传输的目标集群。为此Kafka默认提供一个工具MirrorMaker,用来帮助用户实现数据在两个集群间拷贝。MirrorMaker仅仅是一个consumer+producer的混合体,对于源集群而言,它是一个consumer,对于目标集群而言,它是一个producer。MirrorMaker读取源集群指定topic的数据,然后写入目标集群中的同名topic下。用户可以运行多个MirrorMaker实例增加整体数据拷贝的吞吐量,同时还提升了容错性,当一个实例崩溃后,其他实例能够自动地承担起它的负载

实际生产环境中,源集群和目标集群完全独立的两套环境,它们上的topic可能设置了不同的分区数且有不同的位移值,因此MirrorMaker工具并不能完美实现容错性,因为consumer的位移值可能是不同的。不过MirrorMaker依然会保存并使用消息的key来执行分区任务

主要参数

MirrorMaker的脚本名为kafka-mirror-maker.sh,位于Kafka安装目录的/bin子目录下。MirrorMaker脚本参数及含义

参数名参数含义
–whitelist指定一个正则表达式,指定拷贝源集群的那些topic,比如a|b表示拷贝源集群上两个topic的数据a和b。使用新版本consumer必须指定该参数
–blacklist指定一个正则表达式,屏蔽指定topic的拷贝,该参数只适用于老版本consumer
–abort.on.send.failure若设置为true,当发送失败时则关闭MirrorMaker
–consumer.config指定MirrorMaker下consumer的属性文件,至少要在文件中指定bootstrap.servers
–producer.config指定MirrorMaker下producer的属性文件
–consumer.rebalance.listener指定MirrorMaker使用的consumer rebalance监听器类
–rebalance.listener.args指定MirrorMaker使用的consumer rebalance监听的参数,与consumer.rebalance.listener一同使用
–message.handler指定消息处理类。消息处理器在consumer获取消息与producer发送消息之间调用
–message.handler.args指定消息处理类的参数,与message.handler一同使用
–num.streams指定MirrorMaker线程数。默认是1,即启动一个线程执行数据拷贝
–offset.commit.interval.ms设置MirrorMaker位移提交间隔,默认1分种
–help打印帮助信息

实际使用中,被经常使用的参数是whitelist、consumer.config和producer.config.如果需要实现某些特定的拷贝逻辑,比如拷贝指定分区数据或选择性拷贝数据等,就需要实现特定的消息处理器并使用message.handler进行指定

典型的命令执行如下。该命令读取由consumer.properties文件指定的源Kafka集群,去读取名为topicA和topicB主题的消息,并写入到由producer.properties文件指定的目标集群

bin/kafka-mirror-maker.sh --consumer.config consumer.properties --producer.config producer.properties --whitelist 
topicA|topicB
  • 1
  • 2

使用实例

一个具体的实例,展示如何使用MirrorMaker工具在集群间进行数据拷贝。搭建3个Kafka集群,分别用k1、k2、k3表示。为简便起见都采用单节点部署,三个集群端口号分别为9092、9093和9094,在ZooKeeper上chroot分别是/k1、/k2和/k3

使用MirrorMaker将在k1上生产的消息,拷贝到k2和k3上,最后k3上运行consumer以验证消息是否被拷贝成功

我们在k1上创建一个测试topic

bin/kafka-topic.sh --create --zookeeper localhost:2181/k1 --topic test --partitions 1 --replication-factor 1
  • 1

创建两个consumer.config文件mm-consumer-1st.config和mm-consumer-2nd.config

bootstrap.servers=localhost:9092
client.id=mm1.k1Andk2
group.id=mm1.k1Andk2.consumer
partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor

bootstrap.servers=localhost:9093
client.id=mm1.k2Andk3
group.id=mm1.k2Andk3.consumer
partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

之后创建两个对象的producer.config文件mm-producer-1st.config和mm-producer-2nd.config

bootstrap.servers=localhost:9093
client.id=mm1.k1Andk2

bootstrap.servers=localhost:9094
client.id=mm1.k2Andk3
  • 1
  • 2
  • 3
  • 4
  • 5

分别启动两个MirrorMaker

bin/kafka-mirror-maker.sh --consumer.config mm-consumer-1st.config --producer.config mm-producer-1st.config 
--whitelist test

bin/kafka-mirror-maker.sh --consumer.config mm-consumer-2nd.config --producer.config mm-producer-2nd.config 
--whitelist test
  • 1
  • 2
  • 3
  • 4
  • 5

以上操作便完成了集群间消息的拷贝。用户可以启动一个生产端向k1发送消息,再在k3启动一个消费者完成消息消费。以验证上述操作是否生效

Kafka安全

目前Kafka包括的安全特性如下

  • 连接认证机制,包含服务端与客户端(生产者/消费者)连接、服务器间连接以及服务器与工具间连接。支持的认证机制包括SSL(TLS)或SASL

  • 服务器与ZooKeeper之间连接的认证机制

  • 基于SSL的连接通道数据传输加密

  • 客户端读/写授权

  • 支持可插拔的授权服务和与外部授权服务的集成

上面提到了认证和授权。认证是证明你是谁的过程,在访问Kafka服务时必须显式提供身份信息证明你的身份是合法的。授权则是证明你能访问哪些服务的过程。

SASL + ACL

Kafka安全主要包含三大功能:认证、信道加密、授权,其中的认证机制主要是配置SASL,而授权是通过ACL接口命令完成

在生产环境中,用户配置SASL,通常会配置Kerberos,但对于小体量的应用,并且运行于内网环境中,使用基于明文传输的SASL集群环境足以应付。下面介绍在不使用Kerberos情况下配置SASL + ACL构建Kafka集群

要开启SASL和ACL机制,需要在broker进行两个方面的设置。首先创建包含所有认证信息的JAAS文件。假设Kafka集群中有3个用户admin、reader、writer,其中admin是集群管理员,reader用户负责读取Kafka集群中的topic数据,writer用户负责向Kafka集群写入消息。则我们可以将JAAS文件设置如下

KafkaServer{
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin"
    user_admin="admin"
    user_reader="reader"
    user_writer="writer";
};
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

如果不写JAAS文件将被视为无效。现在保存文件为jaas.conf,之后我们需要把这个文件的完整路径作为一个JVM参数传递给Kafka的启动脚本。由于bin/kafka-server.sh只接收server.properties的位置,不接收任何其它参数,故需要修改Kafka启动脚本

cd kafka_2.12-1.0.0/
# 备份一份新的启动脚本,并命名为secured-kafka-servrer-start.sh
cp bin/kafka-server-start.sh bin/secured-kafka-server-start.sh
  • 1
  • 2
  • 3

修改新启动脚本

# 把文件中的这一行
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"

# 修改为下面这行,然后保存退出
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS -Djava.security.auth.login.config=<你的路径>/jaas.conf kafka.Kafka "$@"
  • 1
  • 2
  • 3
  • 4
  • 5

做完上面的步骤,我们在bin/目录下做好了一份新的Kafka启动脚本,下面修改broker启动所需要的server.properties文件,至少需要配置以下参数

# 配置ACL入口类
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
# 本例使用SASL_PLAINTEXT
listeners=SASL_PLAINTEXT://:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
# 设置本例中admin为超级用户
super.users=User:admin
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

启动broker服务器

bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
bin/secured-kafka-server-start.sh config/server.properties
  • 1
  • 2

如果发现unable to find LoginModule class:*****之类的报错,则需要检查jaas.conf是否存在非法字符

此时broker端的认证已经开启且授权ACL接口也建立起来。下面开始为客户端开启认证。在之前我们需要创建一个test topic

bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic test --partitions 1 --replication-factor 1
  • 1

以上创建topic命令能正常执行成功,因为Kafka-topic.sh脚本是直接连接ZooKeeper,完全绕过ACL审查机制,不受ACL限制。因此无论是否配置ACL,用户都能使用kafka-topics来管理topic,因此在实际使用过程中,最好能对连接ZooKeeper的用户也增加认证机制

关于ACL还有一个参数配置必须要提一下,在server.properties中设置allow.everyone.if.no.acl.found=true,整个ACL机制将变为黑名单机制,即只有在黑名单中的用户才无法访问资源。非黑名单用户可以畅通无阻地访问任何Kafka资源。默认此参数为false,ACL为白名单机制,只有在白名单中的用户才能访问设定的资源,其它任何用户都属于未授权用户

我们下面开始,配置使用write用户发送消息。首先我们需要创建一个属于用户write的JAAS文件,该文件中指定用户write的连接信息

KafkaClient{
   org.apache.kafka.common.security.plain.PlainLoginModule required
   username="writer"
   password="writer";
};
  • 1
  • 2
  • 3
  • 4
  • 5

我们把上述内容保存为writer_jaas.conf文件。我们需要拷贝一份新的bin/kafka-console-producer.sh脚本,并将该JAAS文件作为一个JVM参数

cp bin/kafka-console-producer.sh bin/writer-kafka-console-producer.sh

vi bin/writer-kafka-console-producer.sh

# 把该文件中的这一行
exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleProducer "$@"

# 修改为下面这一行
exec @(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=<你的路径>/writer_jaas.conf kafka.tools.
ConsoleProducer "$@"
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

以上配置使生产端可以通过认证,但并不能通过授权。我们还需要配置ACL来让用户writer有权限写入topic

在设计具体ACL规则之前,需要先了解一下Kafka ACL的格式。Kafka中一条ACL的格式为"Principal P is [Allowed/Denied] Operation O From Host H On Resource R",含义描述如下

  • principal:表示一个Kafka user

  • operation:表示一个具体操作类型,如WRITE、READ、DESCRIBE等。完整操作列表

  • Host:表示连向Kafka集群的client的IP地址,如果是"*"则表示所有IP

  • Resource:表示一种Kafka资源类型,当前共有4种类型TOPIC、CLUSTER、GROUP和TRANSACTIONID

我们需要为writer用户赋予对应topic的写入权限,执行如下命令完成创建对应的ACL规则

bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.
connect=localhost:2181 --add --allow-principal User:writer --operation Write --topic test
  • 1
  • 2

在配置好SASL和ACL后,我们用新的生产端发送消息

bin/wtiter-kafka-console-producer.sh --broker-list localhost:9092 --topic test --producer-property security.
protocol=SASL_PLAINTEXT --producer-property sasl.mechanism=PLAIN
  • 1
  • 2

与生产者类似,首先创建reader用户的JAAS文件reader_jaas.conf

KafkaClient{
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="reader"
    password="reader"
}
  • 1
  • 2
  • 3
  • 4
  • 5

拷贝新的console consumer脚本来指定上面的reader_jaas.conf

cp bin/kafka-console-consumer.sh bin/reader-kafka-console-consumer.sh

vi bin/reader-kafka-console-consumer.sh

# 把该文件中的这行
exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleConsumer "$@"

# 修改为下面行
exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=<你的路径>/reader_jaas.conf kafka.tools.
ConsoleConsumer "$@"
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

然后创建一个consumer.config为该console producer指定以下3个属性

security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
gorup.id=test-group
  • 1
  • 2
  • 3

为reader用户分配读取topic权限,创建如下ACL规则

bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.
connect=localhost:2181 --add --allow-principal User:reader --operation Read --topic testc
  • 1
  • 2

为reader用户分配消费者组的读取权限,创建如下ACL规则

bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.
connect=localhost:2181 --add --allow-principal User:reader --operation Read --group test-group
  • 1
  • 2

运行新的客户端实现消费消息

bin/reader-kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning --consumer.
config consumer.config
  • 1
  • 2

我们使用admin用户查询消费者组的消费状态。为admin用户创建对应的JAAS文件admin_jaas.conf

KafkaClient{
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin"
}
  • 1
  • 2
  • 3
  • 4
  • 5

拷贝一份新的kafka-consumer-groups.sh脚本

cp bin/kafka-consumer-groups.sh bin/admin-kafka-consumer-groups.sh
vi bin/admin-kafka-consumer-groups.sh

# 把该文件中的这一行
exec $(dirname $0)/kafka-run-class.sh kafka.admin.ConsumerGroupCommand "$@"

# 修改为下面这行,然后保存
exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=<你的路径>/admin_jaas.conf kafka.admin.
ConsumerGroupCommand "$@"
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

也需要设置以下两个参数,保存到admin_sasl.config文件中

security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
  • 1
  • 2

运行admin-kafka-consumer-group.sh脚本查询消费者组消费进度

bin/kafka-admin-consumer-group.sh --bootstrap-server localhost:9092 --group test-group --describe --command-config 
admin_jaas.config
  • 1
  • 2

SSL加密

SSL信道加密在很多生产环境是一个必选项,为避免在Kafka中明文传输敏感的公司业务数据,开启SSL显得至关重要,SSL配置流程如下

在这里插入图片描述

前7步可以编写一个setup_ssl_for_server.sh的脚本完成。实际配置步骤就变为了

  • 运行setup_ssl_for_servers.sh脚本

  • 配置broker的server.properties

  • 配置clients的特定属性

我们先看一下setup_ssl_for_servers.sh脚本实际内容

  1. 设置环境变量
#! /bin/bash
echo "1. 设置环境变量"
##########################设置环境变量########################
BASE_DIR=/mnt/disk/kafka_ssl # SSL各种生成文件的基础路径
CERT_OUTPUT_PATH="$BASE_DIR/certificates" # 证书文件生成路径
PASSWORD=kafka1234567 # 密码
KEY_STORE="$CERT_OUTPUT_PATH/kafka.keystore" # Kafka keystore文件路径
TRUST_STORE="$CERT_OUTPUT_PATH/kafka.truststore" # Kafka truststore文件路径
KEY_PASSWORD=$PASSWORD # keystore的key密码
STORE_PASSWORD=$PASSWORD # keystore的key密码
TRUST_KEY_PASSWORD=$PASSWORD # truststore的key密码
TRUST_STORE_PASSWORD=$PASSWORD # truststore的store密码
CLUSTER_NAME=test-cluster # 指定别名
CERT_AUTH_FILE="$CERT_OUTPUT_PATH/ca-cert" # CA证书文件路径
CLUSTER_CERT_FILE="$CERT_OUTPUT_PATH/${CLUSTER_NAME}-cert" # 集群证书文件路径
DAYS_VALID=365 # key有效期
D_NAME="CN=Xi Hu, OU=YourDept, O=YourCompany, L=Beiging, ST=Beijing, C=CN" # distingguished name
#############################################################
mkdir -p $CERT_OUTPUT_PATH
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  1. 创建集群证书到keystore
echo "2. 创建集群证书到keystore"
keytool -keystore $KEY_STORE -alias $CLUSTER_NAME -validity $DAYS_VALID -genkey -keyalg RSA -storepass 
$STORE_PASSWORD -keypass $KEY_PASSWORD -dname "$DNAME"
  • 1
  • 2
  • 3
  1. 创建CA
echo "3. 创建CA"
openssl req -new -x509 -keyout $CERT_OUTPUT_PATH/ca-key -out "$CERT_AUTH_FILE" -days "$DAYS_VALID" -passin 
pass:"$PASSWORD" -passout pass:"$PASSWORD" -subj "/C=CN/ST=Beijing/L=Beiging/O=YourCompany/CN=Xi Hu"
  • 1
  • 2
  • 3
  1. 导入CA文件到truststore
echo "4. 导入CA文件到truststore"
keytool -keystore "$TRUST_STORE" -alias CARoot -import -file "$CERT_AUTH_FILE" -storepass "$TRUST_STORE_PASSWORD" 
-keypass "$TRUST_KEY_PASS" -noprompt
  • 1
  • 2
  • 3
  1. 从keystore中导出集群证书
echo "5. 从key store中导入集群证书"
keytool -keystore "$KEY_STORE -alias "$CLUSTER_NAME" -certreq -file "$CLUSTER_CERT_FILE" -storepass 
"$STORE_PASSWORD" -keypass "$KEY_PASSWORD" -noprompt
  • 1
  • 2
  • 3
  1. 签发证书
echo "6. 签发证书"
openssl x509 -req -CA "$CERT_AUTH_FILE" -CAkey $CERT_OUTPUT_PATH/ca-key -in "$CLUSTER_CERT_FILE" -out "$
{CLUSTER_CERT_FILE}-signed" -days "$DAYS_VALID" -CAcreateserial -passin pass:"$PASSWORD"
  • 1
  • 2
  • 3
  1. 导出CA文件到keystore
echo "7. 导入CA文件到keystore"
keytool -keystore "$KEY_STORE" -alias CARoot -import -file "$CERT_AUTH_FILE" -storepass "$STORE_PASSWORD" -keypass 
"$KEY_PASSWORD" -noprompt
  • 1
  • 2
  • 3
  1. 导入已签发证书到keystore
echo "8. 导入已签发证书到keystore"
keytool -keystore "$KEY_STORE" -alias "${CLUSTER_NAME}" -import -file "${CLUSTER_CERT_FILE}-signed" -storepass 
"$STORE_PASSWORD" -keypass "$KEY_PASSWORD" -noprompt
  • 1
  • 2
  • 3

执行上述脚本后,到对应目录查看生成文件列表

  • ca-cert:CA文件,不要把该文件拷贝到别的broker机器上

  • test-cluster-cert-signed:CA已签发的Kafka证书文件,不要把该文件拷贝到别的broker机器上

  • test-cluster-cer:Kafka认证文件(包含公钥和私钥),不要把该文件拷贝到别的broker机器上

  • kafka.keystore:Kafka的keystore文件,所有clients端和broker机器上都需要

  • kafka.truststore:Kafka的truststore文件,所有clients端和broker机器上都需要。

脚本执行成功后,我们需要对相应的Kafka参数进行设置,与SSL相关的部分如下

  • listeners=PLAINTEXT://:9092,SSL://:9093 # 为Kafka broker配置两个listeners,一个是明文传输,另一个使用SSL加密进行数据传输。连接
    9093端口则会进行SSL加密

  • advertised.listeners=PLAINTEXT://公网IP:9092,SSL://公网IP:9093 # 如果是云上环境,即clients通过公网去连接broker,那么advertiesd.listeners
    就必须配置成所在机器的公网IP

  • ssl.keystore.location=/mnt/disk/aim/certificates/kafka.keystore # 提供SSL keystore的文件

  • ssl.keystore.password=kafka123456 # 提供keystore密码

  • ssl.truststore.location=/mnt/disk/aim/certificates/kafka.truststore # 提供SSL truststore的文件

  • ssl.truststore.password=kafka123456 # 提供truststore密码

  • ssl.key.password=kafka123456 # keystore中的私钥密码

  • ssl.client.auth=required # 设置clients也要开启认证

配置好broker后,就可以尝试启动broker,只有运行下面命令创建topic

bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test --partitions 1 --replication-factor 1
  • 1

至此broker端的SSL配置便完成了。为让客户端程序使用SSL信道给broker端发送请求,我们需要配置客户端程序

首先我们创建一个producer.config文件

bootstrap.servers=kafka1:9093 # 指定9093端口,即使用SSL监听器端口
security.protocol=SSL
ssl.truststore.location=<你的路径>/kafka.truststore # 指定truststore文件
ssl.truststore.password=kafka123456
ssl.keystore.password=kafka123456
ssl.keystore.location=<你的路径>/kafka.keystore # 指定keystore文件
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

然后运行脚本开始发送消息

bin/kafka-console-producer.sh --broker-list kafka1:9093 --topic test --producer.config producer.config
  • 1

此生产者连接端口是9093,同理消费者我们也需要创建一个consumer.config文件

security.protocol=SSL
group.id=test-group
ssl.truststore.location=<你的路径>/kafka.truststore # 指定truststore文件
ssl.truststore.password=kafka123456
ssl.keystore.password=kafka123456
ssl.keystore.location=<你的路径>/kafka.keystore # keystore文件
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

创建消费者验证

bin/kafka-console-consumer.sh --bootstrap-server kafka1:9093 --topic test --from-beginning --consumer.config 
consumer.config
  • 1
  • 2

这样客户端与服务端通信皆使用SSL进行加密。开启SSL后Kafka的吞吐量,特别是客户端通常有10%~40%下降,用户需要权衡SSL与新能之间的收益

常见问题

UnknownTopicOrPartitionException

org.apache.kafka.common.errors.UnknownTopicOrPartitionException: this server does not host this topic-partition
  • 1

该异常表示请求分区数据不在抛出该异常的broker上,kafka抛出该异常的原因通常有下面三种

  • follower副本所在的broker在另一个broker成为leader之前率先完成了成为follower的操作,使得follower从leader拉取数据时发现leader broker上还未准备好数据,抛出此错误,该问题在下一轮PRC中会恢复

  • producer向不存在的topic发送数据时,broker会封装该异常返回给producer。不过该异常属于可重试类异常,打开"自动创建topic开关"的前提下,producer端会自动创建topic,重试发送请求。如果用户持续在producer端看到该异常,可检查auto.create.topics.enable参数,也可适当增大retries值

  • 当启动ACL后,Kafka对于未授权操作中的topic一律返回该异常,主要是基于不对外暴露topic是否存在的考虑

此异常属于可重试异常,能自行修复,偶尔在日志中看到也不必过于在意,但如果持续观测到该异常则需要进行相应处理。若broker端抛出,则检查对应的topic分区部署情况;若clients端抛出,则检查cloents端是否向不存在topic发送请求

LEADER_NOT_AVAILABLE

LEADER_NOT_AVAILABLE: There is not leader for this topic-partition as we are in the middle of a leadership election
  • 1

由抛错信息可知对应分区当前没有leader。没有leader的原因很多,比如正在进行新leader的选举,或是该topic正在被删除,使得leader broker是-1,即无leader。实际环境中,该异常都是瞬时出现,比如当producer向一个不存在的topic发送消息时,若auto.create.topic.enable=true,broker会自动创建
该topic,然后向clients端返回LEADER_NOT_AVAILABLE异常,显示告知producer要重新请求最新的元数据,因此在producer端会瞬时抛出LEADER_NOT_AVAILABLE警告

若用户持续发现该异常,则需要使用kafka-topics脚本检查分区的leader信息。如果leader为-1,可考虑使用kafka-preferred-replica-election.sh手动调整leader选举。依然不好用,则检查controller broker的存活情况

NotLeaderForPartitionException

NotLeaderForPartitionException: This server is not the leader for that topic-partition
  • 1

该异常主要是指当前broker已不是对应分区的leader broker,通常发送在leader变更的情况下。当leader从一个broker切换到另一个broker时,原有的clients或follower依然可能向老的leader请求数据。在这种情况下Kafka会抛出异常

该异常应该是瞬时的,如果持续观测到则需要检查broker端、clients端的日志来进一步定位问题

TimeoutExcption

TimeoutException: ...
  • 1

此异常表示请求超时,请求超时可以是各种类型的请求,如请求元数据信息或生产消息请求。若遇到该异常需要定位该异常抛出的地方,比如producer端、broker端、还是consumer端。哪里抛出的异常就增加哪里的request.timeout.ms参数值。但若依然不管用,则需要考虑用户环境中broker或clients是否负载过重,导致任务堆积不被处理,这就可能需要从架构方面进行改造

RecordTooLargeException

实际生产环境该异常多发生在producer端,通常是因为producer应用的后台发送线程无法匹配用户主线程的消息创建速率。有两种解决方法

  • 在出现该异常后尽量避免共享相同的producer实例

  • 适当增加request.timeout.ms以及适当减少batch.size

当producer端无法从Kafka集群获取元数据时,也会抛出这个异常,特别对于那些未正确配置连接信息的producer而言。此时用户需要仔细检查bootstrap.server连接设置,查看是否存在无法连接的情况。如果Kafka broker搭建在云环境主机上,通常需要设置broker端参数advertised.listeners

当前若要让Kafka集群处理大消息,总共有3个参数需要调整

  • broker端参数message.max.bytes:设置broker端能处理的最大消息长度

  • producer端参数max.request.size:设置producer端能处理的最大消息长度

  • consumer端参数fetch.max.bytes:设置consumer端能处理的最大消息长度

  • broker端参数socket.request.max.bytes:设置broker端socker请求的最大字节数。通常不需要额外配置该参数,但如果向Kafka发送超过100MB超大消息,则需要调整该参数

除socket.request.max.bytes默认值是100MB,其它几个参数值大约在1MB左右,对于生产环境的大消息而言,用户需要同时调整上面这些参数

NetworkException

NetworkException:The server disconnected before a response was received
  • 1

此异常是producer端抛出,主要是因为producer在工作过程中断开了与某些broker的连接,从而使得发送到这些broker的PRODUCE请求失败。NetworkException是可重试异常,通常情况下是瞬时出现。但若用户在producer端持续观测到该异常,则需要检查producer与对应broker节点的连通性及broker节点的存活情况

ILLEGAL_GENERATION

ILLEGAL_GENERATION:Specified group generation id is not avlid
  • 1

这是consumer抛出的异常,表示当前consumer错过了consumer group正在进行的rebalance,原因是该consumer花费大量时间处理poll()返回的数据。用户需要适当减少max.poll.records值以及增加max.poll.interval.ms值。也可以优化消息处理的逻辑,比如将poll回来的消息放入单独的线程进行处理

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/不正经/article/detail/690450
推荐阅读
相关标签
  

闽ICP备14008679号