当前位置:   article > 正文

zookeeper和kafka的安全认证机制SASL(静态)_kafka sasl.mechanism

kafka sasl.mechanism

zookeeper在生产环境中,如果不是只在内网开放的话,就需要设置安全认证,可以选择SASL的安全认证。以下是和kafka的联合配置,如果不需要kafka可以去掉kafka相关的权限即可,以下基于zk3.5.5和kafka2.12进行操作。 下面就是详细的部署步骤:

zookeeper的安全认证配置

zookeeper所有节点都是对等的,只是各个节点角色可能不相同。以下步骤所有的节点配置相同。

  1. 导入kafka的相关jar
    从kafka/lib目录下复制以下几个jar包到zookeeper的lib目录下:

  1. kafka-clients-2.3.0.jar
  2. lz4-java-1.6.0.jar
  3. slf4j-api-1.7.25.jar
  4. slf4j-log4j12-1.7.25.jar
  5. snappy-java-1.1.7.3.jar
  1. zoo.cfg文件配置 添加如下配置:

  1. authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
  2. requireClientAuthScheme=sasl
  3. jaasLoginRenew=3600000
  1. 编写JAAS文件,zk_server_jaas.conf,放置在conf目录下
    这个文件定义需要链接到Zookeeper服务器的用户名和密码。JAAS配置节默认为Server:

  1. Server {
  2. org.apache.kafka.common.security.plain.PlainLoginModule required
  3. username="admin"password="admin_2019"user_kafka="kafka-2019"user_producer="prod-2019";};

这个文件中定义了两个用户,一个是kafka,一个是producer,这些用user_配置出来的用户都可以提供给生产者程序和消费者程序认证使用。还有两个属性,username和password,其中username是配置Zookeeper节点之间内部认证的用户名,password是对应的密码。

  1. 修改zkEnv.sh
    在zkEnv.sh添加以下内容,路径按你直接的实际路径来填写:

exportSERVER_JVMFLAGS=" -Djava.security.auth.login.config=/mnt/tools/zookeeper/apache-zookeeper-3.5.5/conf/zk_server_jaas.conf "
  1. 在各个节点分别执行bin/zkServer.sh start启动zk。如果启动异常查看日志排查问题。

kafka的安全认证配置

zookeeper启动之后,就配置kafka,下面步骤的配置在所有节点上都相同。

  1. 在kafka的config目录下,新建kafka_server_jaas.conf文件,内容如下:

  1. KafkaServer {
  2. org.apache.kafka.common.security.plain.PlainLoginModule required
  3. username="admin"password="admin-2019"user_admin="admin-2019"user_producer="prod-2019"user_consumer="cons-2019";};
  4. Client {
  5. org.apache.kafka.common.security.plain.PlainLoginModule required
  6. username="kafka"password="kafka-2019";};

KafkaServer配置的是kafka的账号和密码,Client配置节主要配置了broker到Zookeeper的链接用户名密码,这里要和前面zookeeper配置中的zk_server_jaas.conf中user_kafka的账号和密码相同。

  1. 配置server.properties,同样的在config目录下

  1. listeners=SASL_PLAINTEXT://0.0.0.0:9092
  2. advertised.listeners=SASL_PLAINTEXT://node1:9092
  3. security.inter.broker.protocol=SASL_PLAINTEXT
  4. sasl.enabled.mechanisms=PLAIN
  5. sasl.mechanism.inter.broker.protocol=PLAIN
  6. authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
  7. allow.everyone.if.no.acl.found=true
kafka3.0之后版本弃用了SimpleAclAuthorizer验证,改为kafka.security.authorizer.AclAuthorizer

这里注意listeners配置项,将主机名部分(本例主机名是node1)替换成当前节点的主机名。其他在各个节点的配置一致。注意,allow.everyone.if.no.acl.found这个配置项默认是false,若不配置成true,后续生产者、消费者无法正常使用Kafka。

  1. 在server启动脚本JVM参数,在bin目录下的kafka-server-start.sh中,将

exportKAFKA_HEAP_OPTS="-Xmx1G -Xms1G"

这一行修改为

exportKAFKA_HEAP_OPTS="-Xmx1G -Xms1G -Djava.security.auth.login.config=/mnt/tools/kafka/kafka_2.12/config/kafka_server_jaas.conf"
  1. 配置其他节点 配置剩余的kafka broker节点,注意server.properties的listeners配置项

  1. 启动各个节点的kafka服务端,在bin目录下执行

./kafka-server-start.sh ../config/server.properties

springboot中使用kafka


在springboot项目中,注意:如果在springboot1.x的版本中报错,请升级为speingboot2.x的版本,本例使用的是2.1.1.RELEASE

  1. 新建sprinboot工程,添加pom依赖,设置parent为2.1.1.RELEASE,这样就不需要在dependency中添加版本号了

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-web</artifactId>
  4. </dependency>
  5. <dependency>
  6. <groupId>org.springframework.kafka</groupId>
  7. <artifactId>spring-kafka</artifactId>
  8. </dependency>
  1. 在resource下新建文件kafka_client_jaas.conf,内容如下:

  1. KafkaClient {
  2. org.apache.kafka.common.security.plain.PlainLoginModule required
  3. username="admin"password="admin-2019";
  4. };

这个用户名和密码要和前面kafka中使用的账号密码相同,这样才能有访问权限

  1. 在application.yml文件配置,注意这里将node1,2,3替换为你真实的服务器地址

  1. spring:
  2. kafka:
  3. listener:
  4. batch-listener: true #是否开启批量消费,true表示批量消费
  5. concurrency: 10 #设置消费的线程数
  6. poll-timeout: 1500 #自动提交设置,如果消息队列中没有消息,等待timeout毫秒后,调用poll()方法。如果队列中有消息,立即消费消息,每次消费的消息的多少可以通过max.poll.records配置。
  7. template:
  8. default-topic: probe2
  9. producer:
  10. bootstrap-servers: node1:9092,node2,node3:9092
  11. key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  12. value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  13. properties:
  14. sasl.mechanism: PLAIN
  15. security.protocol: SASL_PLAINTEXT
  16. consumer:
  17. bootstrap-servers: node1:9092,node2:9092,node3:9092
  18. group-id: group-2
  19. key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  20. value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  21. properties:
  22. sasl.mechanism: PLAIN
  23. security.protocol: SASL_PLAINTEXT
  24. auto:
  25. offset:
  26. reset: latest
  27. enable:
  28. auto:
  29. commit: true

在这里开启了消费者10个线程,这里可以根据你实际业务来调整消费者的数量

  1. 程序启动类:

  1. @SpringBootApplication
  2. public class ProbeApplication {
  3. //初始化系统属性
  4. static {
  5. ClassLoader loader = Thread.currentThread().getContextClassLoader();
  6. System.setProperty("java.security.auth.login.config",
  7. loader.getResource("").getPath()+File.separator+"kafka_client_jaas.conf");
  8. }
  9. public static void main(String[] args) {
  10. SpringApplication.run(ProbeApplication.class, args);
  11. }
  12. }
  1. 生产者:

  1. @Component
  2. public class Sender {
  3. private static final Logger LOG = LoggerFactory.getLogger(Sender.class);
  4. @Autowired
  5. private KafkaTemplate<String, String> template;
  6. public void send(String message) {
  7. ListenableFuture<SendResult<String, String>> future = this.template.sendDefault(message);
  8. future.addCallback(success -> LOG.info("KafkaMessageProducer 发送消息成功!消息内容是:"+message),
  9. fail -> LOG.error("KafkaMessageProducer 发送消息失败!消息内容是:"+message));
  10. }
  11. }
  1. 消费者,将生产者发过来的消息进行处理

  1. @Component
  2. public class Receiver {
  3. @KafkaListener(topics = "probe2")
  4. public void receiveMessage(ConsumerRecord<String, String> record) {
  5. System.out.println("【*** 消费者开始接收消息 ***】key = " + record.key() + "、value = " + record.value());
  6. //TODO,在这里进行自己的业务操作,例如入库
  7. }
  8. }
  1. controller

  1. @RestController
  2. public class KafkaController {
  3. @Autowired
  4. private Sender sender;
  5. @PostMapping("/send/{msg}")
  6. public String send(@PathVariable("msg") String msg) {
  7. sender.send(msg);
  8. return msg;
  9. }
  10. }
  1. 启动springboot工程,然后访问相应的地址即可得到想要的结果

如果生产者的速度大于消费者的速度,可以适当调整生产者和消费者的数量来处理,同时不要在消费者进行太过于耗时的操作。

总结


本文主要分享了zookeeper的应用场景和节点特性、注册原理、zookeeper集群搭建和kafka集群搭建、zookeeper和kafka的SASL认证机制、在springboot中实操基于SASL认证的kafka。

参考:https://ohmycat.me/2019/05/08/kafka-with-zookeeper-authentication.html

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/从前慢现在也慢/article/detail/983209
推荐阅读
相关标签
  

闽ICP备14008679号