当前位置:   article > 正文

kafka 配置部署及SASL安全认证_kafka sasl

kafka sasl

一、下载及配置说明

1. 下载安装

kafka下载地址:https://kafka.apache.org/downloads

# 下载文件
wget https://archive.apache.org/dist/kafka/2.2.1/kafka_2.11-2.2.1.tgz

# 文件解压缩
tar -zxvf kafka_2.11-2.2.1.tgz

# 创建软连接
ln -s kafka_2.12-2.2.0 kafka
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

2. 配置说明

kafka服务的配置文件为 kafka/config/server.properties ,内容如下:

############################# Server Basics #############################
broker.id=0

######################### Socket Server Settings ########################
listeners=PLAINTEXT://192.168.1.128:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600

########################## Log Basics ####################################
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1

#################### Internal Topic Settings  ############################
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

######################## Log Flush Policy ################################
log.retention.hours=168
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000

################### Group Coordinator Settings ############################
group.initial.rebalance.delay.ms=0
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29

配置说明:

  • broker.id:每个broker在集群中的唯一标识。
  • listeners:kafka的监听地址与端口,需要提供外网服务的话,要设置为本地的IP地址。
  • num.network.threads:kafka用于处理网络请求的线程数
  • num.io.threads:kafka用于处理磁盘io的线程数
  • socket.send.buffer.bytes:发送数据的缓冲区
  • socket.receive.buffer.bytes:接收数据的缓冲区
  • socket.request.max.bytes:允许接收的最大数据包的大小(防止数据包过大导致OOM)
  • log.dirs:kakfa用于保存数据的目录,所有的消息都会存储在该目录当中。可以通过逗号来指定多个路径,kafka会根据最少被使用的原则选择目录分配新的partition。需要说明的是,kafka在分配partition的时候选择的原则不是按照磁盘空间大小来定的,而是根据分配的partition的个数多少而定
  • num.partitions:设置新创建的topic的默认分区数
  • number.recovery.threads.per.data.dir:用于恢复每个数据目录时启动的线程数
  • log.retention.hours:配置kafka中消息保存的时间,还支持log.retention.minutes和log.retention.ms。如果多个同时设置会选择时间最短的配置,默认为7天。
  • log.retention.check.interval.ms:用于检测数据过期的周期
  • log.segment.bytes:配置partition中每个segment数据文件的大小。默认为1GB。超出该大小后,会自动创建一个新的segment文件。
  • zookeeper.connect:指定连接的zk的地址,zk中存储了broker的元数据信息。可以通过逗号来设置多个值。格式为:hostname:port/path。hostname为zk的主机名或ip,port为zk监听的端口。/path表示kafka的元数据存储到zk上的目录,如果不设置,默认为根目录
  • zookeeper.connection.timeout:kafka连接zk的超时时间
  • group.initial.rebalance.delay.ms:在实际环境当中,当将多个consumer加入到一个空的consumer group中时,每加入一个consumer就会触发一次对partition消费的重平衡,如果加入100个,就得重平衡100次,这个过程就会变得非常耗时。通过设置该参数,可以延迟重平衡的时间,比如有100个consumer会在10s内全部加入到一个consumer group中,就可以将该值设置为10s,10s之后,只需要做一次重平衡即可。默认为0则代表不开启该特性。
  • auto.create.topics.enable:当有producer向一个不存在的topic中写入消息时,是否自动创建该topic
  • delete.topics.enable:kafka提供了删除topic的功能,但默认并不会直接将topic数据物理删除。如果要从物理上删除(删除topic后,数据文件也一并删除),则需要将此项设置为true

二、单机部署

1. 服务配置

单机模式需要修改 kafka/config/server.properties配置文件:

listener 配置项默认为注释状态,找到配置行,修改为本机IP及需要开启的端口

# 本机IP可省略
# listeners=PLAINTEXT://:9092
listeners=PLAINTEXT://192.168.1.128:9092
  • 1
  • 2
  • 3

log.dir 为kafka数据存储地址,修改配置项:

log.dirs=/usr/local/kafka/data
  • 1

启动自带的zookeeper服务

# 后台启动
./bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
  • 1
  • 2

启动kafka服务

# 后台启动
./bin/kafka-server-start.sh -daemon  config/server.properties
  • 1
  • 2

2. kafka命令

# 创建topic,多个服务用逗号分隔
./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test

# 查看topic列表
./bin/kafka-topics.sh --list --bootstrap-server localhost:9092

# 启动一个控制台消费者
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test

# 启动一个控制台生产者
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

三、KAFKA 集群部署

1. zookeeper 集群部署

下载地址:https://zookeeper.apache.org/releases.html

配置文件说明:

# 心跳时间间隔
tickTime=2000
# 初始连接心跳书限制
initLimit=10
# 每次请求与应答间的心跳数限制
syncLimit=5
# 存储快照的目录, 不要使用/tmp用于存储, 这里只是一个示例
dataDir=/tmp/zookeeper
# zookeeper服务端口
clientPort=2181
# 客户端连接的最大数目, 修改此选项以增加更多客户端
#maxClientCnxns=60
# 保存在dataDir中的快照数
#autopurge.snapRetainCount=3
# 清除任务间隔(小时), 设置为 “0” 以禁用自动清除功能
#autopurge.purgeInterval=1
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

准备三台服务器,并分别下载并解压zookeeper-3.4.12,三台服务器的IP如下:

  • 192.168.1.101
  • 192.168.1.102
  • 192.168.1.103

拷贝并重命名配置文件:

cp zoo_sample.cfg zoo.cfg
  • 1

在三台服务器的 zoo.cfg 配置文件末尾添加如下配置:

# 配置格式 server.id = ip:port:port
# 配置的id唯一,不可重复
# 2888端口为集群内机器通信使用
# 3888端口为leader选举使用
server.1=192.168.1.101:2888:3888
server.2=192.168.1.102:2888:3888
server.3=192.168.1.103:2888:3888
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

在 dataDir 配置目录下新建myid文件,写入id的值:

# 此处仅为示例,dataDir不要使用/tmp目录
vim /tmp/zookeeper/myid
  • 1
  • 2

文件内容如下:
在这里插入图片描述

启动zookeeper时注意开放端口,centos 7.0 开放端口命令如下:

# --zone             作用域
# --add-port         添加端口,格式:端口/通讯协议
# --permanent        永久生效,无此参数时服务器重启后配置失效
firewall-cmd --zone=public --add-port=2181/tcp --permanent
firewall-cmd --zone=public --add-port=2888/tcp --permanent
firewall-cmd --zone=public --add-port=3888/tcp --permanent

# 配置完成后,重启防火墙
firewall-cmd --reload

# 查看防火墙已开放的端口
firewall-cmd --list-ports
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

使用命令 ./zkServer.sh start 分别启动三台服务器的zookeeper,集群启动完成后,三台服务器状态如下:

  • 服务器IP 192.168.1.101:
    在这里插入图片描述
  • 服务器IP 192.168.1.102:
    在这里插入图片描述
  • 服务器IP 192.168.1.103:
    在这里插入图片描述

2. kafka集群部署

在三台服务器上,下载安装kafka,修改配置文件:

# broker.id,每个kafka配置不同的ID,唯一,不可重复
broker.id=0

# listeners 配置
listeners=PLAINTEXT://:9092

# zookeeper配置,多个地址用逗号分隔
zookeeper.connect=192.168.1.101:2181,192.168.1.103:2181,192.168.1.103:2181
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

配置完成后启动kafka

./bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
  • 1

3. zookeeper 命令简介

zookeeper 命令用于在 zookeeper 服务上执行操作,首先执行命令,打开新的 session 会话,进入终端。

$ ./zkCli.sh
  • 1

进入终端后如下显示:
在这里插入图片描述
查看命令帮助:
在这里插入图片描述

命令说明:

命令格式说明
lsls path [watch]查看路径下的目录列表
ls2ls2 path [watch]查看路径下的目录列表,比 ls 更详细
statstat path [watch]查看节点状态信息
setset path data [version]修改节点存储的数据
getget path [watch]获取节点数据和状态信息
createcreate [-s] [-e] path data acl创建节点
deletedelete path [version]删除节点
rmrrmr path递归删除文件夹
historyhistory查看执行的历史命令
quitquit退出终端

四、使用SASL认证

1. 修改启动配置

将kafka/config目录下server.properties复制并重命名

cp server.properties server-sasl.properties
  • 1

在配置文件末尾添加如下配置:

# 修改listeners
listeners=SASL_PLAINTEXT://:9092
# 权限配置
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN

super.users=User:admin
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

2. 配置认证文件

在 kafka/config目录下创建kafka-server-jaas.conf文件,内容如下:

# 配置文件的分号千万不能忘
KafkaServer {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="admin"
  password="admin"
  user_admin="admin";
};
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

3. 修改启动脚本

复制 kafka/bin 目录下的 kafka-server-start.sh 并重命名

cp kafka-server-start.sh kafka-server-start-sasl.sh
  • 1

打开文件,将最后一行:

exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"
  • 1

修改为(路径改为服务器的kafka路径):

exec $base_dir/kafka-run-class.sh $EXTRA_ARGS -Djava.security.auth.login.config=/home/user/kafka/config/kafka-server-jaas.conf kafka.Kafka "$@"
  • 1

4. zookeeper 配置

使用 zkCli.sh 命令进入zookeeper控制台,执行如下命令

# 创建用户
create /digest user

# 添加权限
addauth digest admin:admin

# 设置acl加密
setAcl /digest auth:admin:admin:crwda
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

5. 启动命令

./bin/kafka-server-start-sasl.sh config/server-sasl.properties
  • 1
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/寸_铁/article/detail/825365
推荐阅读
相关标签
  

闽ICP备14008679号