1. Download and extract ZooKeeper
# extract under /data1
cd /data1
# extract the archive
tar -zxf apache-zookeeper-3.6.2-bin.tar.gz
2. Edit zoo.cfg in the conf directory
# rename the directory
mv apache-zookeeper-3.6.2-bin apache-zookeeper-3.6.2
# go into the conf directory and copy zoo_sample.cfg to zoo.cfg
cd apache-zookeeper-3.6.2/conf
cp zoo_sample.cfg zoo.cfg
Modify the following settings in zoo.cfg; add any that are missing:
# data directory
dataDir=/data1/apache-zookeeper-3.6.2/data
# log directory
dataLogDir=/data1/apache-zookeeper-3.6.2/logs
# enable SASL authentication
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000
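The directory setup plus the settings above can be sketched as a short script. This is purely illustrative: it writes into a scratch directory so it can run anywhere (on the real host the target is /data1/apache-zookeeper-3.6.2), and the non-auth lines are the stock zoo_sample.cfg defaults.

```shell
# Illustrative sketch: create the data/log directories and assemble zoo.cfg.
# ZK_HOME is a scratch directory here; substitute the real install path.
ZK_HOME=$(mktemp -d)
mkdir -p "$ZK_HOME/data" "$ZK_HOME/logs" "$ZK_HOME/conf"
cat > "$ZK_HOME/conf/zoo.cfg" <<EOF
tickTime=2000
initLimit=10
syncLimit=5
clientPort=2181
dataDir=$ZK_HOME/data
dataLogDir=$ZK_HOME/logs
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000
EOF
cat "$ZK_HOME/conf/zoo.cfg"
```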
3. Create zk_server_jaas.conf in the conf directory with the ZooKeeper username and password
vi zk_server_jaas.conf
The content is as follows:
Server {
org.apache.zookeeper.server.auth.DigestLoginModule required
user_kafka="pwd";
};
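In the DigestLoginModule, each `user_<name>="<password>"` option defines one client account, so the file above creates a single user `kafka` with password `pwd`. Additional accounts follow the same pattern (the `user_other` name below is purely illustrative):

```
Server {
    org.apache.zookeeper.server.auth.DigestLoginModule required
    user_kafka="pwd"
    user_other="pwd2";
};
```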
4. Add JVM parameters
Edit the zkServer.sh startup script and add a JVMFLAGS export near the top (after the shebang) so it loads zk_server_jaas.conf:
vi ../bin/zkServer.sh
Add the following line:
export JVMFLAGS="-Djava.security.auth.login.config=/data1/apache-zookeeper-3.6.2/conf/zk_server_jaas.conf -Dzookeeper.allowSaslFailedClients=false"
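The edit can also be scripted; a sketch using sed to insert the export on the line after the shebang. It is demonstrated here on a scratch copy of the script so it can run anywhere (the JAAS path is the one assumed above):

```shell
# Demonstrate the edit on a scratch stand-in for bin/zkServer.sh
BIN=$(mktemp -d)
printf '#!/usr/bin/env bash\n# ...original script body...\n' > "$BIN/zkServer.sh"
# Insert the JVMFLAGS export right after the shebang line (GNU sed)
sed -i '1a export JVMFLAGS="-Djava.security.auth.login.config=/data1/apache-zookeeper-3.6.2/conf/zk_server_jaas.conf -Dzookeeper.allowSaslFailedClients=false"' "$BIN/zkServer.sh"
head -n 2 "$BIN/zkServer.sh"
```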
5. Start ZooKeeper
cd ../bin
# start
./zkServer.sh start
# stop
./zkServer.sh stop
1. Download and extract Kafka
# extract under /data1
cd /data1
# extract the archive
tar -zxf kafka_2.13-2.7.0.tgz
2. Edit server.properties in the config directory
cd kafka_2.13-2.7.0/config
vi server.properties
Modify the following settings:
# service ip and port; if the ip is omitted or a hostname is used, a host-to-ip mapping must be configured
listeners=SASL_PLAINTEXT://192.168.25.202:9092
# log path
log.dirs=/data1/kafka_2.13-2.7.0/logs
# zookeeper address
zookeeper.connect=localhost:2181
Add the following settings to enable Kafka authentication:
# kafka authentication settings
advertised.listeners=SASL_PLAINTEXT://192.168.25.202:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
allow.everyone.if.no.acl.found=true
zookeeper.set.acl=true
3. Create kafka_server_jaas.conf in the config directory
vi kafka_server_jaas.conf
Add the following content:
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="pwd"
user_admin="pwd";
};
Client {
org.apache.zookeeper.server.auth.DigestLoginModule required
username="kafka"
password="pwd";
};
Configuration notes: in the KafkaServer section, username/password are the credentials the broker itself uses for inter-broker SASL, and each user_<name>="<password>" entry defines a client account (here admin/pwd). The Client section holds the credentials the broker uses to log in to ZooKeeper, and must match the kafka account defined in zk_server_jaas.conf.
4. Add JVM parameters
Edit the kafka-server-start.sh startup script so it loads kafka_server_jaas.conf.
Find the KAFKA_HEAP_OPTS setting and append:
-Djava.security.auth.login.config=/data1/kafka_2.13-2.7.0/config/kafka_server_jaas.conf
The result looks like this:
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G -Djava.security.auth.login.config=/data1/kafka_2.13-2.7.0/config/kafka_server_jaas.conf"
fi
5. Start Kafka
# enter the kafka install directory
cd /data1/kafka_2.13-2.7.0
# start
bin/kafka-server-start.sh -daemon config/server.properties
# stop
bin/kafka-server-stop.sh
1. Create the client authentication file
This file is used later by the console producer and consumer; it configures the Kafka username admin with password pwd.
vi kafka_client_jaas.conf
Add the following content:
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="pwd";
};
2. Add the authentication file path to kafka-console-producer.sh and kafka-console-consumer.sh
In each script, append the following after KAFKA_HEAP_OPTS:
-Djava.security.auth.login.config=/data1/kafka_2.13-2.7.0/config/kafka_client_jaas.conf
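An alternative to editing each script: the Kafka launcher scripts go through kafka-run-class.sh, which appends the KAFKA_OPTS environment variable to the JVM command line, so the same flag can be exported once in the shell (a sketch, using the client JAAS path above):

```shell
# Export once; kafka-console-producer.sh / kafka-console-consumer.sh
# pick this up via kafka-run-class.sh without modifying the scripts.
export KAFKA_OPTS="-Djava.security.auth.login.config=/data1/kafka_2.13-2.7.0/config/kafka_client_jaas.conf"
echo "$KAFKA_OPTS"
```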
3. Edit config/producer.properties and config/consumer.properties
Add the following two lines:
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
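Step 3 can be scripted as well; a sketch that appends the two lines to both files (scratch copies here so the snippet runs anywhere; on a real install the targets are config/producer.properties and config/consumer.properties):

```shell
# Append the SASL client settings to scratch copies of the two files
CONF=$(mktemp -d)
touch "$CONF/producer.properties" "$CONF/consumer.properties"
for f in "$CONF/producer.properties" "$CONF/consumer.properties"; do
  cat >> "$f" <<'EOF'
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
EOF
done
cat "$CONF/producer.properties"
```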
4. Test the console producer and consumer
Start the consumer first so it waits for messages:
bin/kafka-console-consumer.sh --bootstrap-server 192.168.25.202:9092 --topic test --from-beginning --consumer-property security.protocol=SASL_PLAINTEXT --consumer-property sasl.mechanism=PLAIN
Then start the producer. After running the command below, the console waits for input; type the message value directly, one message per line (lines are separated by newlines):
bin/kafka-console-producer.sh --broker-list 192.168.25.202:9092 --topic test --producer-property security.protocol=SASL_PLAINTEXT --producer-property sasl.mechanism=PLAIN
1. Add the Maven dependency
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.7.0</version>
</dependency>
2. Connection examples
Producer:
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, url);
props.put(ProducerConfig.ACKS_CONFIG, "1");
props.put(ProducerConfig.RETRIES_CONFIG, 0);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 10000);
// SASL/PLAIN authentication
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"pwd\";");
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);
Consumer:
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, url);
properties.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP);
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "540000");
properties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "600000");
properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// SASL/PLAIN authentication
properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
properties.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
properties.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"pwd\";");
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
1. Set environment variables
These are set in the ke.sh startup script under bin; they can also be added as system environment variables.
export JAVA_HOME=/data1/jdk1.8.0_271
export KE_HOME=/data1/kafka-eagle-web-2.0.3
2. Edit system-config.properties under conf
Modify the following settings:
# multiple cluster aliases, comma separated
kafka.eagle.zk.cluster.alias=cluster1
cluster1.zk.list=127.0.0.1:2181
######################################
# kafka sasl authenticate
######################################
cluster1.kafka.eagle.sasl.enable=true
cluster1.kafka.eagle.sasl.protocol=SASL_PLAINTEXT
cluster1.kafka.eagle.sasl.mechanism=PLAIN
cluster1.kafka.eagle.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="pwd";
######################################
# zookeeper enable acl
######################################
cluster1.zk.acl.enable=true
cluster1.zk.acl.schema=digest
cluster1.zk.acl.username=kafka
cluster1.zk.acl.password=pwd
kafka.eagle.driver=org.sqlite.JDBC
# change this path (it must already exist); sqlite is used instead of mysql
kafka.eagle.url=jdbc:sqlite:/data1/kafka-eagle-web-2.0.3/db/dbke.db
kafka.eagle.username=root
kafka.eagle.password=root
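One pitfall flagged in the config comment: the directory holding the sqlite file must exist before ke.sh starts. A sketch of creating it (KE_HOME defaults to a scratch directory here so the snippet runs anywhere; on the real host it is /data1/kafka-eagle-web-2.0.3):

```shell
# Create the db directory the sqlite url points at; -p makes it idempotent.
KE_HOME=${KE_HOME:-$(mktemp -d)}
mkdir -p "$KE_HOME/db"
ls -d "$KE_HOME/db"
```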
3. Start
cd /data1/kafka-eagle-web-2.0.3/bin
./ke.sh start
Visit: http://192.168.25.202:8048/ke/
Default username and password:
admin 123456
Change the password after logging in:
Click Reset in the top-right corner.