I recently heard that Spark was popular and started learning it. While studying Spark Streaming I stumbled on Kafka, a stream-processing platform whose log compaction and security (authentication) features turn out to fit our company's projects well.
There are plenty of SASL configuration guides online, and just as many pitfalls; thanks to the bloggers who pointed me in the right direction. Below is a write-up of my own experience. You can also follow the official documentation: Kafka 中文文档 - ApacheCN
1. Copy the sample configuration file to zoo.cfg
cp $ZOOKEEPER_HOME/conf/zoo_sample.cfg $ZOOKEEPER_HOME/conf/zoo.cfg
2. Add SASL support to zoo.cfg
- authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
- requireClientAuthScheme=sasl
- jaasLoginRenew=3600000
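The three settings above can be appended in one go. A minimal sketch, using a temp file as a stand-in so it runs anywhere; in practice point ZOO_CFG at $ZOOKEEPER_HOME/conf/zoo.cfg:

```shell
# Append the SASL settings to zoo.cfg (ZOO_CFG is a demo stand-in path).
ZOO_CFG=$(mktemp)
cat >> "$ZOO_CFG" <<'EOF'
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000
EOF
grep -q 'requireClientAuthScheme=sasl' "$ZOO_CFG" && echo "SASL settings in place"
```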
3. Create a $ZOOKEEPER_HOME/conf/zk_server_jaas.conf file holding the account credentials Zookeeper will accept
- Server {
- org.apache.kafka.common.security.plain.PlainLoginModule required
- username="kafka"
- password="kafka#321"
- user_kafka="kafka#321";
- };
Note: with the PLAIN module, `username`/`password` are the credentials a JAAS section uses when acting as a client, while each `user_<name>="<password>"` entry declares an account the server accepts; here Zookeeper accepts a single account, kafka / kafka#321, which the Kafka broker will use.
4. Because authentication uses Kafka's login class org.apache.kafka.common.security.plain.PlainLoginModule, the Kafka jars must be on Zookeeper's classpath.
Create a $ZOOKEEPER_HOME/sasl_jars directory and copy the relevant jars over from $KAFKA_HOME/libs. The jars below are what I needed; the exact versions will differ with your Kafka version
- kafka-clients-2.0.0.jar
- lz4-java-1.4.1.jar
- slf4j-api-1.7.25.jar
- slf4j-log4j12-1.7.25.jar
- snappy-java-1.1.7.1.jar
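Copying those jars can be scripted by artifact name so the versions don't matter. The sketch below creates temp dirs with stand-in jar files so it runs anywhere; in practice KAFKA_HOME and ZOOKEEPER_HOME point at your real installations:

```shell
# Demo dirs; use your real $KAFKA_HOME and $ZOOKEEPER_HOME in practice.
KAFKA_HOME=$(mktemp -d)
ZOOKEEPER_HOME=$(mktemp -d)
mkdir -p "$KAFKA_HOME/libs" "$ZOOKEEPER_HOME/sasl_jars"
# Stand-in jars for the demo (the real files ship with your Kafka distribution).
for j in kafka-clients-2.0.0 lz4-java-1.4.1 slf4j-api-1.7.25 slf4j-log4j12-1.7.25 snappy-java-1.1.7.1; do
  touch "$KAFKA_HOME/libs/$j.jar"
done
# The actual copy step: glob on the artifact name, version-independent.
for name in kafka-clients lz4-java slf4j-api slf4j-log4j12 snappy-java; do
  cp "$KAFKA_HOME"/libs/"$name"-*.jar "$ZOOKEEPER_HOME/sasl_jars/"
done
ls "$ZOOKEEPER_HOME/sasl_jars" | wc -l   # expect 5
```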
5. Modify the $ZOOKEEPER_HOME/bin/zkEnv.sh script so that Zookeeper loads the jars and the JAAS login configuration at startup
- for i in "$ZOOBINDIR"/../sasl_jars/*.jar;
- do
- CLASSPATH="$i:$CLASSPATH"
- done
- SERVER_JVMFLAGS=" -Djava.security.auth.login.config=$ZOOCFGDIR/zk_server_jaas.conf "
1. Create $KAFKA_HOME/kafka_server_jaas.conf, defining the client accounts the broker accepts (KafkaServer section) and the account the broker itself uses to authenticate to Zookeeper (Client section)
- KafkaServer {
- org.apache.kafka.common.security.plain.PlainLoginModule required
- username="kafka"
- password="kafka#321"
- user_kafka="kafka#321"
- user_producer="pro#321"
- user_consumer="con#321"
- user_both="both#321";
- };
- Client{
- org.apache.kafka.common.security.plain.PlainLoginModule required
- username="kafka"
- password="kafka#321";
- };
Note: in the KafkaServer section, `username`/`password` are the broker's own credentials for inter-broker connections, and each `user_<name>="<password>"` line declares a client account — here kafka, producer, consumer and both. The Client section holds the kafka account the broker presents to Zookeeper, matching zk_server_jaas.conf above.
2. Copy the sample configuration file to kafka.properties
cp $KAFKA_HOME/config/server.properties $KAFKA_HOME/config/kafka.properties
3. Add SASL support in kafka.properties
- # Enable SASL client authentication
- listeners=SASL_PLAINTEXT://localhost:9092
- security.inter.broker.protocol=SASL_PLAINTEXT
- sasl.enabled.mechanisms=PLAIN
- sasl.mechanism.inter.broker.protocol=PLAIN
- authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
-
- zookeeper.set.acl=true
- super.users=User:kafka
4. Add the JAAS login configuration to the startup environment by adding the following to $KAFKA_HOME/bin/kafka-run-class.sh
- # Double quotes are required so that $KAFKA_HOME is expanded
- KAFKA_SASL_OPTS="-Djava.security.auth.login.config=$KAFKA_HOME/kafka_server_jaas.conf"
-
- # Launch mode
- if [ "x$DAEMON_MODE" = "xtrue" ]; then
- nohup $JAVA $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS $KAFKA_SASL_OPTS -cp $CLASSPATH $KAFKA_OPTS "$@" > "$CONSOLE_OUTPUT_FILE" 2>&1 < /dev/null &
- else
- exec $JAVA $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS $KAFKA_SASL_OPTS -cp $CLASSPATH $KAFKA_OPTS "$@"
- fi
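The quoting of KAFKA_SASL_OPTS matters: with single quotes, $KAFKA_HOME is not expanded and the JVM receives a literal, non-existent path. A quick demonstration (the install path is just an example):

```shell
KAFKA_HOME=/opt/kafka_2.12-2.0.0   # example install path
SINGLE='-Djava.security.auth.login.config=$KAFKA_HOME/kafka_server_jaas.conf'
DOUBLE="-Djava.security.auth.login.config=$KAFKA_HOME/kafka_server_jaas.conf"
echo "$SINGLE"   # $KAFKA_HOME stays literal -> the JVM would get a bogus path
echo "$DOUBLE"   # expands to /opt/kafka_2.12-2.0.0/kafka_server_jaas.conf
```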
5. Start the broker with the new configuration file
nohup bin/kafka-server-start.sh config/kafka.properties &
With the broker up, manage ACLs with kafka-acls.sh: grant the consumer account read access to topic wuye1 (from host 192.168.1.30, group yc-test), grant the producer account write access, then list or remove rules as needed:
kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:consumer --allow-host 192.168.1.30 --consumer --topic wuye1 --group yc-test
kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:producer --producer --topic wuye1
kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --list --topic wuye1
kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --remove --allow-principal User:consumer --operation Read --topic wuye1
1. Create $KAFKA_HOME/kafka_consumer_jaas.conf with the following content
- KafkaClient {
- org.apache.kafka.common.security.plain.PlainLoginModule required
- username="consumer"
- password="con#321";
- };
2. Add the following to $KAFKA_HOME/config/consumer.properties
- security.protocol=SASL_PLAINTEXT
- sasl.mechanism=PLAIN
3. Edit $KAFKA_HOME/bin/kafka-console-consumer.sh and add the following
- if [ "x$KAFKA_OPTS" = "x" ]; then
- export KAFKA_OPTS="-Djava.security.auth.login.config=$KAFKA_HOME/kafka_consumer_jaas.conf"
- fi
- # The line below is the script's original content, shown only to indicate where the lines above go
- exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleConsumer "$@"
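The guard should only install the default JAAS setting when the caller exported no KAFKA_OPTS — that is what the `[ "x$KAFKA_OPTS" = "x" ]` test checks. A runnable sketch of that behavior (the JAAS path is a placeholder):

```shell
set_default_jaas() {
  # Mirrors the guard in the console scripts: set KAFKA_OPTS only when empty.
  if [ "x$KAFKA_OPTS" = "x" ]; then
    KAFKA_OPTS="-Djava.security.auth.login.config=/opt/kafka/kafka_consumer_jaas.conf"
  fi
}
KAFKA_OPTS=""
set_default_jaas
echo "$KAFKA_OPTS"   # the default JAAS path is installed
KAFKA_OPTS="-Xmx512m"
set_default_jaas
echo "$KAFKA_OPTS"   # a caller-supplied value is preserved
```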
4. Start the console consumer. Pay attention to the --consumer.config config/consumer.properties argument; I spent quite a while troubleshooting before realizing it was missing.
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic wuye1 --group my-group-1 --partition 0 --consumer.config config/consumer.properties
1. Create $KAFKA_HOME/kafka_producer_jaas.conf with the following content
- KafkaClient {
- org.apache.kafka.common.security.plain.PlainLoginModule required
- username="producer"
- password="pro#321";
- };
2. Add the following to $KAFKA_HOME/config/producer.properties
- security.protocol=SASL_PLAINTEXT
- sasl.mechanism=PLAIN
3. Edit $KAFKA_HOME/bin/kafka-console-producer.sh and add the following
- if [ "x$KAFKA_OPTS" = "x" ]; then
- export KAFKA_OPTS="-Djava.security.auth.login.config=$KAFKA_HOME/kafka_producer_jaas.conf"
- fi
- # The line below is the script's original content, shown only to indicate where the lines above go
- exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleProducer "$@"
4. Start the console producer
kafka-console-producer.sh --broker-list localhost:9092 --topic wuye1 --producer.config config/producer.properties
To consume the same topic from Java, add the kafka-clients dependency:
- <dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka-clients</artifactId>
- <version>2.0.0</version>
- </dependency>
- package com.steely.kafka;
-
- import java.time.Duration;
- import java.util.Arrays;
- import java.util.Date;
- import java.util.Properties;
- import java.util.concurrent.atomic.AtomicBoolean;
-
- import org.apache.kafka.clients.CommonClientConfigs;
- import org.apache.kafka.clients.consumer.ConsumerConfig;
- import org.apache.kafka.clients.consumer.ConsumerRecords;
- import org.apache.kafka.clients.consumer.KafkaConsumer;
- import org.apache.kafka.common.config.SaslConfigs;
- import org.apache.kafka.common.errors.WakeupException;
- /**
- * SASL Kafka consumer client
- * @date 2018-10-09
- * @author chenyongchao
- */
- public class SASLConsumer {
-
- public static void main(String[] args) throws InterruptedException {
- ConsumerThread consumerThread = new ConsumerThread(getConsumer("localhost:9092"), "wuye1");
- new Thread(consumerThread).start();
- Thread.sleep(1000 * 60 * 60); // shut the consumer down after 60 minutes
- consumerThread.shutdown();
- }
-
- private static class ConsumerThread implements Runnable {
- private final KafkaConsumer<String, String> consumer;
- private final AtomicBoolean closed = new AtomicBoolean(false);
-
- public ConsumerThread(KafkaConsumer<String, String> consumer, String... topics) {
- this.consumer = consumer;
- consumer.subscribe(Arrays.asList(topics));
- }
-
- @Override
- public void run() {
- try {
- int count = 0;
- while (!closed.get()) {
- ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));
- count += records.count();
- records.forEach(record -> System.out.println("received record partition: " + record.partition() + " key: " + record.key() + " value: " + record.value() + " offset: " + record.offset()));
- System.out.println(count + " records consumed so far, last poll at " + new Date(System.currentTimeMillis()));
- }
- } catch (WakeupException e) {
- if (!closed.get()) throw e;
- } finally {
- consumer.close();
- }
- }
-
- public void shutdown() {
- closed.set(true);
- consumer.wakeup();
- }
- }
-
- /**
- * Build a SASL-configured consumer instance
- * @date 2018-10-09
- * @author chenyongchao
- * @param servers bootstrap servers
- * @return the configured consumer
- */
- private static KafkaConsumer<String, String> getConsumer(String servers) {
- System.setProperty("java.security.auth.login.config","/opt/kafka_2.12-2.0.0/kafka_consumer_jaas.conf");
- Properties props = new Properties();
- props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
- props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
- props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
- props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group-1");
- props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
- props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
- props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
-
- props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
- props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
- return new KafkaConsumer<>(props);
- }
-
- }