赞
踩
现场业务由于多厂商集成,共享数据需要,需对接当前kafka集群,为做到类似租户隔离的功能,需要开启kafka的权限控制和动态用户管理功能,实现不同厂商访问被授权的合法资源,消费者账号只能消费数据,生产者账号只能生产数据。
文献参考:Kafka中文文档;stackoverflow.。
☛ kafka权限分类:
❥ 身份认证(Authentication):是对client 与服务器的连接进行身份认证,brokers和zookeeper之间的连接进行Authentication(producer 和 consumer)、其他 brokers、tools与 brokers 之间连接的认证。
❥ 权限控制(Authorization):实现对于消息级别的权限控制,clients的读写操作进行Authorization:(生产/消费/资源group/Topic)数据访问权限。
自0.9.0.0版本开始,Kafka社区添加了许多功能用于提高Kafka群集的安全性,Kafka提供SSL或者SASL两种安全策略。SSL方式主要是通过CA令牌实现Kafka ,Kafka有**三种认证模式**:Kerberos(需要独立部署验证服务)、Plain【简单帐号密码模式】、SSL 模式,官方参考点击跳转;这里我们主要了解下Plain模式,Plain模式,即 简单帐号密码模式。它又分为 SASL/SCRAM 和 SASL/PLAIN 两种方式。区别是:PLAIN认证不能动态新增用户,每次添加用户后,需要重启正在运行的Kafka集群才能生效。这明显在生产环境,这种认证方式时不符合实际业务场景的。而SCRAM不一样,使用SCRAM认证,可以动态新增用户,添加用户后,可以不用重启正在运行的Kafka集群即可进行鉴权。SASL/SCRAM 可以在 Kafka 服务启动之后,动态的新增用户分并配权限,在业务变动频繁,开发人员多的情况下比 SASL/PLAIN 方法更加灵活。SASL全称:Simple Authentication Security Layer, 用于安全认证,又分为: GSSAPI 、Kerberos、NTLM。
另外还涉及到一个认证文件,即JAAS,全称:Java Authentication and Authorization Service,Java 提供的安全认证服务,Kafka 使用 SASL 协议时,通过 JAAS 实现认证,需要配置 jaas 认证文件。
Salted Challenge Response Authentication Mechanism (SCRAM) is a family of SASL mechanisms that addresses the security concerns with traditional mechanisms that perform username/password authentication like PLAIN and DIGEST-MD5. The mechanism is defined in RFC 5802. Kafka supports SCRAM-SHA-256 and SCRAM-SHA-512 which can be used with TLS to perform secure authentication. Salted质询-响应身份验证机制(SCRAM)是一个萨斯尔机制家族,它解决了执行用户名/密码身份验证的传统机制(如平原和消化-MD5)的安全问题。该机制在射频5802中进行了定义。卡夫卡支持紧急停堆-SHA-256和紧急停堆-SHA-512它们可以与TLS公司一起使用来执行安全身份验证。
SASL三种实现机制:
Kerberos(SASL/GSSAPI - starting at version 0.9.0.0)
PLAIN(SASL/PLAIN - starting at version 0.10.0.0)
SCRAM(SASL/SCRAM-SHA-256 and SASL/SCRAM-SHA-512 - starting at version 0.10.2.0)
最后修改kafka配置文件 server.properties ,在尾部添加认证协议配置:
listeners=SASL_PLAINTEXT://:9092 # 指定采用sasl协议,即采用认证访问kafka,默认无安全认证
allow.everyone.if.no.acl.found=true #false的话就只能超级用户才能访问资源,true的话其他也可以,如果用户读写一个 Topic,但是没有配置 ACL 权限,客户端会报认证失败错误。
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer #
security.inter.broker.protocol=SASL_PLAINTEXT #kafka间通信所使用的安全协议
sasl.mechanism.inter.broker.protocol=PLAIN #Kafka 服务器的 SASL 安全认证协议类型
sasl.enabled.mechanisms=PLAIN
super.users=User:admin #超级管理月用户名
另外还需在 config 目录下创建一个 jaas 配置文件 kafka-server-jaas-plain.conf ,使用简单帐号密码方式,配置内容如下:
KafkaServer { ##指定 KafkaServer 端的配置 org.apache.kafka.common.security.plain.PlainLoginModule required #安全认证类名为 PlainLoginModule,需与 Kafka 属性文件中的协议类型一致 username="admin" # kafka之间内部通信的用户名和密码 password="admin" user_admin="admin" #不受鉴权控制,管理员 user_test1="test1" # 预设普通帐号认证信息,user_真正的用户名=''密码",用于 Client 连接所使用的用户名和密码 user_test2="test2" user_test3="test3" user_test4="test4"; #必须以分号 ; 结尾 }; #也能使用org.apache.kafka.common.security.scram.ScramLoginModule KafkaClient { org.apache.kafka.common.security.plain.PlainLoginModule required username="XXXX" #用于Zookeeper 中注册的账号密码,用于 broker 与 zk 的通信(若 zk 没有配置 SASL 可以忽略、若 zookeeper.sasl.client 为 false 也可忽略 password="XXXX"; };
Kafka 版本为 kafka_2.13-2.8.0 ,只对 Kafka 服务做安全认证,认证信息存储在zk集群里,访问zk集群来鉴权,zookeeper有一个内存数据库+定期刷盘永久存储保障元数据可靠;Broker与Zookeeper 之间访问使用默认的,即不做安全认证。关键配置列表:
Kafka 服务配置文件 server.propertis,配置认证协议及认证实现类;
Kafka 的 jaas.config 认证配置文件,登录类,超管密码和管理的帐号密码列表;
Kafka 服务启动脚本 kafka-start-server.sh,设置安全认证环境变量;
Kafka 客户端连接配置认证属性。
传统的消息传递模型分为两类:
1)共享消息队列模式(Shared Message Queue)
上图中的共享消息队列模式允许来自producer的消息流只能到达单个消费者。推送到队列的每条消息只能读取一次,并且只能由一个consumer读取。consumer从他们之间共享的队列的末尾拉取消息。然后共享队列就从队列中删除被成功拉取的消息,即一旦一条消息被一个消费者拉取,这条信息就会从队列中删除。这种消息队列更适合命令式编程,虽然多个消费者可以连接到共享队列,但他们必须都属于同一个逻辑域并执行相同的功能。因此,共享消息队列中处理的可扩展性受限于单个消费域。
2)发布订阅模式(Publish-Subscribe)
发布订阅模型允许多个生产者将消息发布到代理托管的topic,这些topic可以被多个消费者订阅。因此,一条消息实际上是被广播给一个主题的所有订阅者。发布者与订阅者的逻辑分离允许松耦合的架构,但规模有限。可扩展性是有限的,因为每个消费者者必须订阅每个分区才能访问来自所有分区的消息。因此,这种 pub-sub 模型适用于小型网络,但不稳定性随着节点的增长而增加。
另解耦的副作用还体现在消息传递的不可靠性上。由于每条消息都广播给所有订阅者,因此很难缩放流的处理,因为消费者彼此不同步。
kafka可结合共享消息队列模式和发布-订阅模型的特点。它通过使用消费者组(consumer grup)+broker保留信息实现;当消费者加入一个组并订阅一个主题时,该组中只有一个消费者实际消费了该主题的每条消息。但与传统消息队列不同的是,消息将由broker保留其在所属主题分区中。
消费者组使 Kafka 可以灵活地同时拥有消息队列和发布订阅模型的优势。属于同一个消费者组的 Kafka 消费者共享一个组 ID。然后,组中的消费者通过确定每个分区仅由组中的单个消费者使用来尽可能公平地划分主题分区。如果所有消费者都来自同一组,那么 Kafka 这时就像传统的消息队列一样,会对所有记录和处理进行负载平衡,每条消息将仅由该组的一个使用者使用。每个分区最多连接到一个组中的一个消费者。当存在多个消费组时,这时的数据消费模型的流程就与传统的发布订阅模型基本一致。消息将被广播到所有消费者组。对于只有一个消费者的消费群体,称为独占消费者,这样的消费者必须连接到它需要的所有分区。另再回顾下,一个消费组中可包含多个消费者,消费组与消费者是个一对多的关系。一个消费者只能属于一个消费组,每一个消费者,只能消费所分到的消息,每一个分区只能被一个消费组中的一个消费者所消费。
理想情况下,分区数等于消费者数。如果消费者数量更多,多余的消费者就会闲置,浪费客户端资源。如果分区数量更多,一些消费者将从多个分区读取,这应该不是问题,由于分区只能与消费者组中的消费者具有一对一或多对一的关系,因此避免了消费者组内的消息复制,因为给定的消息一次仅到达组中的一个消费者。
在kafka启动时,Broker被标记为消费者组子集的协调器,这些组从消费者接收RegisterConsumer 请求并返回包含他们应该拥有的分区列表的RegisterConsumer 响应。协调器还启动故障检测以检查消费者是活着还是死了。当消费者未能在会话超时之前向协调器代理发送心跳时,协调器将消费者标记为死亡,并设置重新平衡以发生。可以使用Kafka 服务的session.timeout.ms属性设置此会话时间段。该heartbeat.interval.ms属性使健康的消费者认识到再平衡的发生,以便重新发送RegisterConsumer向协调器请求。如下图所示,假设 消费A 组的使用者 C2 发生故障,C1 和 C3 将暂时暂停对来自其分区的消息的消费,并且这些分区将在它们之间重新分配。当消费者 C2 丢失时,会触发重新平衡过程,并将分区重新分配给组中的其他消费者。B 组消费者不受 A 组事件的影响。
其它概念回顾:同一 topic 下的不同partition中包含的消息是不同的。每个消息在被添加到分区时,都会被分配一个 offset(称之为偏移量),它是消息在此分区中的唯一编号,kafka 通过 offset保证消息在分区内的顺序,offset 的顺序不跨分区,即 kafka只保证在同一个分区内的消息是有序的。设置多个partition后,这些partition会接近均匀的分布在kafka各个节点之上。每一条消息发送到 broker 时,会根据 partition 的规则选择存储到哪一个 partition。如果 partition 规则设置合理,那么所有的消息会均匀的分布在不同的partition中。Consumer Group就是逻辑上的订阅者,每个Consumer都从属于一个特定的Consumer Group**,消息的单播和多播都是基于消费组** 来实现的,多个 “消费者组” 读可取同一个主题的消息,同一个 “消费者组” 内,每个 “消费者” 仅接收主题的一部分分区的消息,“消费者组” 增加 “消费者” 数量,能够实现消费能力的水平扩展,但 消费组中的消费者不是越多越好,消费者数量超过分区数量时,会导致消费者分配不到资源,造成资源浪费,“消费者” 数量上限是分区的主题数量。每个partition都由一系列有序的、不可变的消息组成,这些消息被连续的追加到partition中。partition中的每个消息都有一个连续的序列号叫做offset,用于partition唯一标识一条消息,partition命名规则:<topic_name>-<partition_id>。Message是Kafka中最基本的数据单元,主要由key和value构成;真正有效的消息是value数据,key只作为消息路由分区使用,kafka根据key决定将当前消息存储在哪个分区。
同一时刻,一条消息只能被group中的一个消费者实例消费,一个topic下的每个partition只从属于group中的一个消费者,不可能出现group中的两个消费者消费同一个分区。为了提高消费端的消费能力,一般会通过多个consumer 去消费同一个 topic,消费者的数量不要大于分区,否则会造成资源浪费。消费者的数量最好控制为分区的数量,这样能为消费者合理分配分区。消费者消费消息时是按照分区内消息顺序进行消费,有序消费保证了消息是由消费者主动拉取的(pull),保证一个分区只能由一个消费者负责,kafka消费者自己可以控制读取消息的offset,消费者的offset是保存在zookeeper中的,通过kafka监控工具可以看到。
kafka使用分区将topic的消息打散存储到多个分区分布保存在不同的broker上,实现了producer和consumer消息处理的高吞吐量。Kafka的producer和consumer都可以多线程地并行操作,而每个线程处理的是一个分区的数据。因此分区实际上是调优Kafka并行度的最小单元。对于producer而言,它实际上是用多个线程并发地向不同分区所在的broker发起Socket连接同时给这些分区发送消息;而consumer,同一个消费组内的所有consumer线程都被指定topic的某一个分区进行消费。另每个 patition 物理上对应一个文件夹(该文件夹存储该 patition 的所有消息和索引文件),但partition 并不是最终的存储粒度,partition 还可以细分为 segment,一个 partition 物理上由多个 segment 组成。Kafka 在 ZooKeeper 中为每个 Partition 维护一个AR(Assigned Replicas)列表,由 ISR(In-Sync Replicas,与 Leader 数据同步的 Replica)和 OSR(Outof-Sync Replicas,与 Leader 数据不同步的 Replica)组成。初始状态下,所有的 Replica 都在 ISR 中,但在 Kafka 工作的过程中,由于各种问题(网络、磁盘、内存)可能导致部分 Replica 的同步速度慢于参数replica.lag.time.max.ms
指定的阈值,一旦出现这种情况,这部分 Replica 会被移出 ISR,降级至 OSR 中。leader 将负责维护和跟踪一个 ISR(In-Sync Replicas)列表,即同步副本队列,这个列表里面的副本与 leader 保持同步,状态一致,只有 ISR 里的成员才能有被选为 leader 的可能(通过参数配置:unclean.leader.election.enable=false)。如果 leader 所在的 broker 发生故障或宕机,对应 partition 将因无 leader 而不能处理客户端请求,这时副本的作用就体现出来了:一个新 leader 将从 follower 中被选举出来并继续处理客户端的请求。实际对于任意一条消息,只有它被 ISR 中的所有 follower 都从 leader 复制过去才会被认为已提交,并返回信息给 producer,从而保证可靠性。注意的是,Kafka ISR 列表中副本的数量不需要超过副本总数的一半,即不需要满足 “多数派” 原则,通常,ISR 列表副本数大于等于 2 即可。
我们可以遵循一定的步骤来尝试确定分区数:创建一个只有1个分区的topic,然后测试这个topic的producer吞吐量和consumer吞吐量。假设它们的值分别是Tp和Tc,单位可以是MB/s。然后假设总的目标吞吐量是Tt,那么分区数 = Tt / max(Tp, Tc)
说明:Tp表示producer的吞吐量。测试producer通常是很容易的,因为它的逻辑非常简单,就是直接发送消息到Kafka就好了。Tc表示consumer的吞吐量。测试Tc通常与应用的关系更大, 因为Tc的值取决于你拿到消息之后执行什么操作,因此Tc的测试通常也要麻烦一些。
关于Consumer消费Partition的分配策略,Kafka提供了两种分配策略: range和roundrobin,由参数partition.assignment.strategy
指定,默认是range策略。当以下事件发生时,Kafka 将会进行一次分区分配:
一个 Consumer Group 内新增消费者
消费者离开当前所属的Consumer Group,包括shuts down 或 crashes
订阅的主题新增分区
将分区的所有权从一个消费者移到另一个消费者称为重新平衡(rebalance),kafka consuemr 的 rebalance 机制规定了一个 consumer group 下的所有 consumer 如何达成一致来分配订阅 topic的每个分区。当出现以下几种情况时,kafka 会进行一次rebalance分区分配操作:
同一个 consumer group 内新增了消费者
消费者离开当前所属的 consumer group,比如主动停机或者宕机
topic 新增了分区(也就是分区数量发生了变化)
consumer 消费消息时不关注 offset,最后一个 offset 由 ZooKeeper 保存(下次消费时,该group 中的consumer将从offset记录的位置开始消费)。如果消费线程大于 patition 数量,则有些线程将收不到消息;如果 patition 数量大于消费线程数,则有些线程多收到多个 patition 的消息;如果一个线程消费多个 patition,则无法保证你收到的消息的顺序,而一个 patition 内的消息是有序的。
producer 不断发送消息,会引起 partition 文件的无限扩张,因此 kafka 通过分段的方式将 Log 分为多个 LogSegment,LogSegment 是逻辑上的概念,一个 LogSegment 对应磁盘上的日志文件和一个索引文件,其中日志文件用来记录消息。索引文件是用来保存消息的索引。segment file 由两部分组成,index file 和 data file,.index 和 .timeindex 文件为索引文件,.data 文件为数据文件。partion 全局的第一个 segment 从 0 开始,后续每个 segment 文件名为上一个 segment 文件最后一条消息的 offset 值进行递增。为了提高查找消息的性能,为每一个日志文件添加 2 个索引文件:OffsetIndex 和 TimeIndex,分别对应 .index 以及 .timeindex 文件, TimeIndex 索引文件格式是映射时间戳和相对 offset,index 中存储了索引以及物理偏移量。根据消息的保留时间,当消息在 kafka 中保存的时间超过了指定的时间,就会触发清理过程根据 topic 存储的数据大小,当 topic 所占的日志文件大小大于一定的阀值,则可以开始删除最旧的消息。kafka 会启动一个后台线程,定期检查是否存在可以删除的消息,通过 log.retention.bytes 和 log.retention.hours 这两个参数来设置,当其中任意一个达到要求,都会执行删除。
producer 采用 push 模式将消息发布到 broker,每条消息都被 append 到 patition 中,属于顺序写磁盘(顺序写磁盘效率比随机写内存要高,保障 kafka 吞吐率)。producer 发送消息到 broker 时,会根据分区算法选择将其存储到哪一个 partition。其路由机制为:
指定了 patition,则直接使用;
未指定 patition 但指定 key,通过对 key 进行 hash 选出一个 patition;
patition 和 key 都未指定,使用轮询选出一个 patition。
消息写入流程:
producer 先从 ZooKeeper 的 “/brokers/…/state” 节点找到该 partition 的leader;
producer 将消息发送给该 leader;
leader 将消息写入本地 log;
followers 从 leader pull 消息,写入本地 log 后 leader 发送 ACK;
leader 收到所有 ISR 中的 replica 的 ACK 后,增加 HW(high watermark,最后 commit 的 offset) 并向 producer 发送 ACK;
1)部署忽略
https://archive.apache.org/dist/kafka/2.8.0/kafka_2.13-2.8.0.tgz
tar -xzf kafka_2.13-2.8.0.tgz
# 配置环境变量
vim /etc/profile
# 添加下面的内容
export KAFKA_HOME=/opt/app/kafka
# 刷新环境变量
source /etc/profile
直接编译好的压缩包下载下来解压运行即可。
2)认证配置
Kafka 安全认证可以直接通过环境变量 -Djava.security.auth.login.config 设置,修改 Kafka 启动脚本 kafka-start-server.sh 文件最后一行,增加一个参数指向第二步的 jaas 配置文件的绝对路径:
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS -Djava.security.auth.login.config=/data1/kafka_2.11-2.3.1/config/kafka-server-jaas-plain.conf kafka.Kafka "$@"
#或如下修改加到KAFKA_HEAP_OPTS后
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G -Djava.security.auth.login.config=/data1/kafka_2.11-2.3.1/config/kafka-server-jaas-plain.conf"
fi
#或加到KAFKA_OPTS里
KAFKA_OPTS="-Djava.security.auth.login.config=/data1/kafka_2.11-2.3.1/config/kafka-server-jaas-plain.conf"
jaas文件配置:
KafkaServer {
org.apache.kafka.common.security.scram.ScramLoginModule required #或者plain,动态的我们这里选scram
username="admin"
password="admin"
user_admin="admin-secret"
user_dialup="test-secret"; #说明,参照上文
};
注:也可在broker配置文件server.properties里使用sasl.jaas.config参数,格式必须采用如下前缀方式:listener.name.{listenerName}.{saslMechanism}.sasl.jaas.config=”org.apache.kafka.common.security.scram.ScramLoginModule required“,具体示例参考下文介绍。优先级方面如下:listener.name.{listenerName}.{saslMechanism}.sasl.jaas.config优先级最高,企次未JAAS静态文件中的{listenerName}.KafkaServer配置段,最后是JAAS数据文件纯KafkaServer配置段。
Kafka的SCRAM实现机制会把用户认证信息保存在Zookeeper中,我们借助kafka-configs.sh根据来创建3个测试用户:
#创建broker端的消息生产用户,并配置topic写权限;其中,SCRAM-SHA-256/SCRAM-SHA-512是对密码加密的算法,二者有其一即可。 $ bin/kafka-configs.sh --zookeeper 172.18.1.112:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=producer@123],SCRAM-SHA-512=[password=producer@123]' --entity-type users --entity-name producer #返回如下 …… Completed Updating config for entity: user-principal 'writer'. #创建broker端的admin,用于broker间通信,须在kafka启动前创建 bin/kafka-configs.sh --zookeeper 172.18.1.112:2181 --alter --add-config 'SCRAM-SHA-256=[password=admin],SCRAM-SHA-512=[password=admin]' --entity-type users --entity-name admin #创建client端的消费者用户,并配置topic读权限 bin/kafka-configs.sh --zookeeper 172.18.1.112:2181 --alter --add-config 'SCRAM-SHA-256=[password=reader#pwd],SCRAM-SHA-512=[password=reader#pwd]' --entity-type users --entity-name reader #验证,查看SCRAM证书 bin/kafka-configs.sh --zookeeper 172.18.1.112:2181 --describe --entity-type users --entity-name producer #删除SCRAM证书 bin/kafka-configs.sh --zookeeper 172.18.1.112:2181 --alter --delete-config 'SCRAM-SHA-512' --delete-config 'SCRAM-SHA-256' --entity-type users --entity-name producer
注:上面这些凭证(credential)存储在Zookeeper中,我们使用kafka-configs.sh在Zookeeper中创建凭据。对于每个SCRAM机制,必须添加具有机制名称的配置来创建凭证,在启动Kafka broker之
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。