当前位置:   article > 正文

Mac 系统Kafka + Zookeeper SASL 配置_mac kafka edit jaas.conf first

mac kafka edit jaas.conf first

背景

     最近听说Spark很火,于是开始学习Spark的相关知识,在学到Spark Streaming的时候无意中发现Kafka这个流处理平台挺不错的,特别是关于它的日志压缩(log compaction) 和安全认证两个特性可以很好的用在公司的项目里。

     SASL的配置网上有很多,遇到的坑也不少,也要感谢提供正确指导的博主们,现整理一下自己的经验。大伙也可以根据官方文档来折腾Kafka 中文文档 - ApacheCN

环境

  • 系统 MacOs High Sierra 10.13.6
  • Kafka版本 kafka_2.12-2.0.0
  • Zookeeper版本 zookeeper-3.4.13

步骤

  1. 配置Zookeeper
  2. 配置Kafka 服务端
  3. 权限操作
  4. 配置Kafka 的Consumer 和 Producer 客户端
  5. 编写Java Consumer客户端用例

配置Zookeeper

1. 复制示例配置文件为zoo.cfg

cp $ZOOKEEPER_HOME/conf/zoo_sample.cfg $ZOOKEEPER_HOMEconf/zoo.cfg

 2.  为zoo.cfg 添加SASL支持

  1. authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
  2. requireClientAuthScheme=sasl
  3. jaasLoginRenew=3600000

3. 新建$ZOOKEEPER_HOME/conf/zk_server_jaas.conf文件,为Zookeeper添加账号认证信息

  1. Server {
  2. org.apache.kafka.common.security.plain.PlainLoginModule required
  3. username="kafka"
  4. password="kafka#321"
  5. user_kafka="kafka#321";
  6. };

  注:

  • org.apache.kafka.common.security.plain.PlainLoginModule 是认证的类
  • username 是Kafka服务做为客户端连接到zookeeper的用户名
  • password 是Kafka服务做为客户端连接到zookeeper的密码
  • user_kafka 中的user_是固定的,kafka是对应username里的值,这个必须设置,要不然会认证失败   

 4.  因为认证方式使用的是Kafka的认证类  org.apache.kafka.common.security.plain.PlainLoginModule ,所以要用到Kafka相关的jar。

      新建一个$ZOOKEEPER_HOME/sasl_jars目录,从$KAFKA_HOME/libs将相关jar拷到sasl_jars目录。以下是相关的jar包,这些版本会因为你的Kafka版本不同而不同

  1. kafka-clients-2.0.0.jar
  2. lz4-java-1.4.1.jar
  3. slf4j-api-1.7.25.jar
  4. slf4j-log4j12-1.7.25.jar
  5. snappy-java-1.1.7.1.jar

5. 修改$ZOOKEEPER_HOME/bin/zkEnv.sh 脚本,让Zookeeper启动的时候可以加载相关jar包和认证信息

  1. for i in "$ZOOBINDIR"/../sasl_jars/*.jar;
  2. do
  3. CLASSPATH="$i:$CLASSPATH"
  4. done
  5. SERVER_JVMFLAGS=" -Djava.security.auth.login.config=$ZOOCFGDIR/zk_server_jaas.conf "

配置Kafka 服务端

1. 新建$KAFKA_HOME/kafka_server_jaas.conf 文件,为Kafka 添加客户端账号认证信息和自己认证到Zookeeper的客户端账号信息

  1. KafkaServer {
  2. org.apache.kafka.common.security.plain.PlainLoginModule required
  3. username="kafka"
  4. password="kafka#321"
  5. user_kafka="kafka#321"
  6. user_producer="pro#321"
  7. user_consumer="con#321"
  8. user_both="both#321";
  9. };
  10. Client{
  11. org.apache.kafka.common.security.plain.PlainLoginModule required
  12. username="kafka"
  13. password="kafka#321";
  14. };

注:

  • KafkaServer 是Kafka服务端配置哪些账号可以认证连接进来
  • Client 是Kafka服务端配置要什么哪些方式及账号进行登录

2. 复制示例配置文件为kafka.properties

cp $KAFKA_HOME/config/server.properties $KAFKA_HOME/config/kafka.properties

3.  在kafka.properties中添加SASL认证支持

  1. #开启SASL 客户端认证
  2. listeners=SASL_PLAINTEXT://localhost:9092
  3. security.inter.broker.protocol=SASL_PLAINTEXT
  4. sasl.enabled.mechanisms=PLAIN
  5. sasl.mechanism.inter.broker.protocol=PLAIN
  6. authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
  7. zookeeper.set.acl=true
  8. super.users=User:kafka

4. 添加启动环境变量,在$KAFKA_HOME/bin/kafka-run-class.sh 添加以下内容

  1. KAFKA_SASL_OPTS='-Djava.security.auth.login.config=$KAFKA_HOME/kafka_server_jaas.conf'
  2. # Launch mode
  3. if [ "x$DAEMON_MODE" = "xtrue" ]; then
  4. nohup $JAVA $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS $KAFKA_SASL_OPTS -cp $CLASSPATH $KAFKA_OPTS "$@" > "$CONSOLE_OUTPUT_FILE" 2>&1 < /dev/null &
  5. else
  6. exec $JAVA $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS $KAFKA_SASL_OPTS -cp $CLASSPATH $KAFKA_OPTS "$@"
  7. fi

5. 指定配置文件启动服务

nohup bin/kafka-server-start.sh config/kafka.properties

权限操作

  • 为用户consumer 添加消费者权限,并只允许192.168.1.30这个ip访问,只限于消费wuye1主题和yc-test这个组
kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:consumer --allow-host 192.168.1.30 --consumer --topic wuye1 --group yc-test
  • 为用户producer 添加生产者权限,只能对wuye1这个主题生产信息
kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:producer --producer --topic wuye1
  • 查看wuye1这个主题配置的权限。查看全部权限可以把--topic wuye1 去掉
kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --list --topic wuye1
  • 删除consumer用户针对wuye1主题的读权限,所有的权限删除后这个用户就会在权限列表中消失
kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --remove --allow-principal User:consumer --operation Read --topic wuye1

配置Kafka 的Consumer 客户端

1. 新建$KAFKA_HOME/kafka_consumer_jaas.conf填入以下内容

  1. KafkaClient {
  2. org.apache.kafka.common.security.plain.PlainLoginModule required
  3. username="consumer"
  4. password="con#321";
  5. };

2. 分别修改$KAFKA_HOME/config/consumer.properties 新增以下内容

  1. security.protocol=SASL_PLAINTEXT
  2. sasl.mechanism=PLAIN

3. 修改$KAFKA_HOME/bin/kafka-console-consumer.sh 新增以下内容

  1. if [ "x$KAFKA_OPTS" ]; then
  2. export KAFKA_OPTS="-Djava.security.auth.login.config=$KAFKA_HOME/kafka_consumer_jaas.conf"
  3. fi
  4. #下面这行是原始内容主要是为了体现上面的内容要加在哪里
  5. exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleConsumer "$@"

4. 启动消费者客户端,一定要注意--consumer.config config/consumer.properties 这个参数,之前因为没加这个折腾了挺久的

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic wuye1 --group my-group-1 --partition 0 --consumer.config config/consumer.properties

配置Kafka 的 Producer 客户端

1. 新建$KAFKA_HOME/kafka_producer_jaas.conf填入以下内容

  1. KafkaClient {
  2. org.apache.kafka.common.security.plain.PlainLoginModule required
  3. username="producer"
  4. password="pro#321";
  5. };

2. 分别修改$KAFKA_HOME/config/producer.properties 新增以下内容

  1. security.protocol=SASL_PLAINTEXT
  2. sasl.mechanism=PLAIN

3. 修改$KAFKA_HOME/bin/kafka-console-producer.sh 新增以下内容

  1. if [ "x$KAFKA_OPTS" ]; then
  2. export KAFKA_OPTS="-Djava.security.auth.login.config=$KAFKA_HOME/kafka_producer_jaas.conf"
  3. fi
  4. #下面这行是原始内容主要是为了体现上面的内容要加在哪里
  5. exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleProducer "$@"

4. 启动生产者客户端

kafka-console-producer.sh --broker-list localhost:9092 --topic wuye1 --producer.config config/producer.properties

编写Java Consumer客户端用例

  • 要导入的包
  1. <dependency>
  2. <groupId>org.apache.kafka</groupId>
  3. <artifactId>kafka-clients</artifactId>
  4. <version>2.0.0</version>
  5. </dependency>
  • java代码
  1. package com.steely.kafka;
  2. import java.time.Duration;
  3. import java.util.Arrays;
  4. import java.util.Date;
  5. import java.util.Properties;
  6. import java.util.concurrent.atomic.AtomicBoolean;
  7. import org.apache.kafka.clients.CommonClientConfigs;
  8. import org.apache.kafka.clients.consumer.ConsumerConfig;
  9. import org.apache.kafka.clients.consumer.ConsumerRecords;
  10. import org.apache.kafka.clients.consumer.KafkaConsumer;
  11. import org.apache.kafka.common.config.SaslConfigs;
  12. import org.apache.kafka.common.errors.WakeupException;
  13. /**
  14. * SASL Kafka消费者客户端
  15. * @ate 2018年10月9日
  16. * @author chenyongchao
  17. */
  18. public class SASLConsumer {
  19. public static void main(String[] args) throws InterruptedException {
  20. ConsumerThread consumerThread = new ConsumerThread(getConsumer("localhost:9092"), "wuye1");
  21. new Thread(consumerThread).start();
  22. Thread.sleep(1000 * 60 * 60); //60分钟后把消费者停掉
  23. consumerThread.shutdown();
  24. }
  25. private static class ConsumerThread implements Runnable {
  26. private final KafkaConsumer<String, String> consumer;
  27. private final AtomicBoolean closed = new AtomicBoolean(false);
  28. public ConsumerThread(KafkaConsumer<String, String> consumer, String... topics) {
  29. this.consumer = consumer;
  30. consumer.subscribe(Arrays.asList(topics));
  31. }
  32. @Override
  33. public void run() {
  34. try {
  35. int count = 0;
  36. while (!closed.get()) {
  37. ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));
  38. count +=records.count();
  39. records.forEach(record -> System.out.println("收到数据 partion : " + record.partition() + " key : " + record.key() + " value : " + record.value()+" offset ;"+record.offset()));
  40. System.out.println(count + " 获取了一次数据!!!" + new Date(System.currentTimeMillis()));
  41. }
  42. } catch (WakeupException e) {
  43. if (!closed.get()) throw e;
  44. } finally {
  45. consumer.close();
  46. }
  47. }
  48. public void shutdown() {
  49. closed.set(true);
  50. consumer.wakeup();
  51. }
  52. }
  53. /**
  54. * 获取消费者实例
  55. * @date 2018年10月9日
  56. * @author chenyongchao
  57. * @param servers
  58. * @return
  59. */
  60. private static KafkaConsumer<String, String> getConsumer(String servers) {
  61. System.setProperty("java.security.auth.login.config","/opt/kafka_2.12-2.0.0/kafka_consumer_jaas.conf");
  62. Properties props = new Properties();
  63. props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
  64. props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
  65. props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
  66. props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group-1");
  67. props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
  68. props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
  69. props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
  70. props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
  71. props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
  72. return new KafkaConsumer<>(props);
  73. }
  74. }

声明:本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号