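Kafka needs authentication with the ability to add users dynamically; SASL/SCRAM authentication supports both.
This article reorganizes and re-records the content of https://blog.csdn.net/qq_38616503/article/details/117529690
Step 1: start Kafka and Zookeeper without any security configuration. If you need to install Kafka from scratch, see the Kafka single-node and cluster installation guides.
(1) Create the inter-broker user: admin (it must exist before SASL is enabled, otherwise the brokers fail to start)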
bin/kafka-configs.sh --zookeeper 192.168.56.101:2181,192.168.56.102:2181,192.168.56.103:2181 --alter --add-config 'SCRAM-SHA-256=[password=admin-sec],SCRAM-SHA-512=[password=admin-sec]' --entity-type users --entity-name admin
(2) Create the producer user: producer
bin/kafka-configs.sh --zookeeper 192.168.56.101:2181,192.168.56.102:2181,192.168.56.103:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=prod-sec],SCRAM-SHA-512=[password=prod-sec]' --entity-type users --entity-name producer
(3) Create the consumer user: consumer
bin/kafka-configs.sh --zookeeper 192.168.56.101:2181,192.168.56.102:2181,192.168.56.103:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=cons-sec],SCRAM-SHA-512=[password=cons-sec]' --entity-type users --entity-name consumer
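SCRAM-SHA-256 and SCRAM-SHA-512 are the SCRAM mechanisms (named after the hash function each uses) under which the password is stored and verified; the password is salted and hashed, not encrypted. Configuring either one is sufficient.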
View the SCRAM credentials:
bin/kafka-configs.sh --zookeeper 192.168.56.101:2181,192.168.56.102:2181,192.168.56.103:2181 --describe --entity-type users --entity-name consumer
bin/kafka-configs.sh --zookeeper 192.168.56.101:2181,192.168.56.102:2181,192.168.56.103:2181 --describe --entity-type users --entity-name producer
Delete SCRAM credentials (here for the producer user):
bin/kafka-configs.sh --zookeeper localhost:2181 --alter --delete-config 'SCRAM-SHA-512' --delete-config 'SCRAM-SHA-256' --entity-type users --entity-name producer
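To make the role of the two mechanisms more concrete, here is a minimal sketch of the key derivation that SCRAM defines in RFC 5802, written with the JDK only. The class name ScramSha256Sketch and the fixed salt are illustrative assumptions, not Kafka's own code, and the sketch skips the SASLprep normalization step (which makes no difference for plain ASCII passwords such as the ones above). The iteration count mirrors the iterations=8192 option used in the commands above.

```java
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.MessageDigest;
import java.util.Base64;
import javax.crypto.Mac;
import javax.crypto.SecretKeyFactory;
import javax.crypto.spec.PBEKeySpec;
import javax.crypto.spec.SecretKeySpec;

class ScramSha256Sketch {

    // SaltedPassword := Hi(password, salt, iterations), which is PBKDF2 with HMAC-SHA-256.
    static byte[] saltedPassword(String password, byte[] salt, int iterations) {
        try {
            PBEKeySpec spec = new PBEKeySpec(password.toCharArray(), salt, iterations, 256);
            return SecretKeyFactory.getInstance("PBKDF2WithHmacSHA256")
                    .generateSecret(spec).getEncoded();
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    // HMAC-SHA-256 of an ASCII label under the given key.
    static byte[] hmac(byte[] key, String label) {
        try {
            Mac mac = Mac.getInstance("HmacSHA256");
            mac.init(new SecretKeySpec(key, "HmacSHA256"));
            return mac.doFinal(label.getBytes(StandardCharsets.UTF_8));
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    // StoredKey := SHA-256(HMAC(SaltedPassword, "Client Key"))
    static byte[] storedKey(byte[] salted) {
        try {
            return MessageDigest.getInstance("SHA-256").digest(hmac(salted, "Client Key"));
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    // ServerKey := HMAC(SaltedPassword, "Server Key")
    static byte[] serverKey(byte[] salted) {
        return hmac(salted, "Server Key");
    }

    public static void main(String[] args) {
        // Illustrative fixed salt; a real salt is random and generated per credential.
        byte[] salt = "example-salt".getBytes(StandardCharsets.UTF_8);
        byte[] salted = saltedPassword("prod-sec", salt, 8192);
        Base64.Encoder b64 = Base64.getEncoder();
        System.out.println("stored_key=" + b64.encodeToString(storedKey(salted)));
        System.out.println("server_key=" + b64.encodeToString(serverKey(salted)));
    }
}
```

The point of the sketch is that only derived keys, the salt and the iteration count need to be kept on the server side, never the clear-text password; the --describe output above should show fields of this kind.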
Once the user credentials have been created, configure the Kafka brokers.
(1) Create the JAAS file:
cat > kafka_server_jaas.conf << EOF
KafkaServer {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="admin"
password="admin-sec";
};
EOF
(2) Pass the location of the JAAS file to every Kafka broker as a JVM parameter. In bin/kafka-server-start.sh, add -Djava.security.auth.login.config=/opt/kafka_2.12-2.4.1/config/kafka_server_jaas.conf to the final exec line:
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS -Djava.security.auth.login.config=/opt/kafka_2.12-2.4.1/config/kafka_server_jaas.conf kafka.Kafka "$@"
(3) Configure server.properties [config/server.properties]
# Authentication settings
listeners=SASL_PLAINTEXT://IP:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
sasl.enabled.mechanisms=SCRAM-SHA-256
advertised.listeners=SASL_PLAINTEXT://IP:9092
# ACL settings
allow.everyone.if.no.acl.found=false
super.users=User:admin
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
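Choose SASL_SSL or SASL_PLAINTEXT according to your needs. SASL_PLAINTEXT sends traffic unencrypted, which performs slightly better. After the configuration is in place, restart Kafka and Zookeeper.
(1) Create three JAAS files, one for each of the three users: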
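Two of the settings above have to agree with each other: the mechanism in sasl.mechanism.inter.broker.protocol must appear in sasl.enabled.mechanisms, and at least one listener must use the protocol named in security.inter.broker.protocol. The following is a small sketch of such a consistency check using only java.util.Properties. The class BrokerSaslConfigCheck and its check method are hypothetical helpers for illustration, not part of Kafka.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

class BrokerSaslConfigCheck {

    // Returns a list of human-readable problems; an empty list means the two checks passed.
    static List<String> check(String propertiesText) {
        Properties p = new Properties();
        try {
            p.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        List<String> problems = new ArrayList<>();

        // Check 1: the inter-broker mechanism must be one of the enabled mechanisms.
        String interMechanism = p.getProperty("sasl.mechanism.inter.broker.protocol", "").trim();
        boolean mechanismEnabled = false;
        for (String m : p.getProperty("sasl.enabled.mechanisms", "").split(",")) {
            if (!interMechanism.isEmpty() && m.trim().equals(interMechanism)) {
                mechanismEnabled = true;
            }
        }
        if (!mechanismEnabled) {
            problems.add("sasl.mechanism.inter.broker.protocol is not listed in sasl.enabled.mechanisms");
        }

        // Check 2: some listener must use the inter-broker security protocol.
        String interProtocol = p.getProperty("security.inter.broker.protocol", "").trim();
        boolean listenerFound = false;
        for (String l : p.getProperty("listeners", "").split(",")) {
            if (!interProtocol.isEmpty() && l.trim().startsWith(interProtocol + "://")) {
                listenerFound = true;
            }
        }
        if (!listenerFound) {
            problems.add("no listener uses security.inter.broker.protocol");
        }
        return problems;
    }

    public static void main(String[] args) {
        String config = "listeners=SASL_PLAINTEXT://IP:9092\n"
                + "security.inter.broker.protocol=SASL_PLAINTEXT\n"
                + "sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256\n"
                + "sasl.enabled.mechanisms=SCRAM-SHA-256\n";
        System.out.println("problems: " + check(config));
    }
}
```

Running such a check before the restart can catch a mismatch (for example SCRAM-SHA-512 between brokers but only SCRAM-SHA-256 enabled) that would otherwise surface only as a broker startup failure.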
kafka_client_scram_admin_jaas.conf
kafka_client_scram_producer_jaas.conf
kafka_client_scram_consumer_jaas.conf
cat > kafka_client_scram_admin_jaas.conf << EOF
KafkaClient {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="admin"
password="admin-sec";
};
EOF
cat > kafka_client_scram_producer_jaas.conf << EOF
KafkaClient {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="producer"
password="prod-sec";
};
EOF
cat > kafka_client_scram_consumer_jaas.conf << EOF
KafkaClient {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="consumer"
password="cons-sec";
};
EOF
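The three client JAAS files differ only in the user name and password, so they can also be generated. Below is a small sketch of that idea; JaasFileSketch, render and fileName are hypothetical names introduced for illustration, and the file naming simply follows the pattern used above.

```java
class JaasFileSketch {

    // Builds the text of a JAAS entry for the SCRAM login module.
    // context is "KafkaClient" for clients and "KafkaServer" for brokers.
    static String render(String context, String username, String password) {
        return context + " {\n"
                + "org.apache.kafka.common.security.scram.ScramLoginModule required\n"
                + "username=\"" + username + "\"\n"
                + "password=\"" + password + "\";\n"
                + "};\n";
    }

    // File name following the kafka_client_scram_<user>_jaas.conf pattern used in this article.
    static String fileName(String username) {
        return "kafka_client_scram_" + username + "_jaas.conf";
    }

    public static void main(String[] args) {
        String[][] users = {
            {"admin", "admin-sec"}, {"producer", "prod-sec"}, {"consumer", "cons-sec"}
        };
        for (String[] u : users) {
            System.out.println("# " + fileName(u[0]));
            System.out.print(render("KafkaClient", u[0], u[1]));
        }
    }
}
```

Note the two semicolons in the output: one ends the login module entry after the last option, the other closes the KafkaClient block. Leaving either out is a common reason for the file failing to parse.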
(2) Modify the startup scripts so that they load the JAAS files
### Producer: bin/kafka-console-producer.sh
exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=/opt/kafka_2.12-2.4.1/config/kafka_client_scram_producer_jaas.conf kafka.tools.ConsoleProducer "$@"
### Consumer: bin/kafka-console-consumer.sh
exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=/opt/kafka_2.12-2.4.1/config/kafka_client_scram_consumer_jaas.conf kafka.tools.ConsoleConsumer "$@"
(3) Configure consumer.properties and producer.properties, adding the following settings to both
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
bootstrap.servers=192.168.56.101:9092,192.168.56.102:9092,192.168.56.103:9092
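As an alternative to a JVM-wide JAAS file, Kafka clients from 0.10.2 onward also accept the login module entry inline through the sasl.jaas.config client property, which lets each client in the same JVM carry its own credentials. The sketch below only builds the java.util.Properties object; ScramClientProps and build are hypothetical helper names, and the resulting object would be passed to a KafkaProducer or KafkaConsumer together with the serializer or deserializer settings.

```java
import java.util.Properties;

class ScramClientProps {

    // Builds the security-related client settings for one SCRAM user.
    static Properties build(String bootstrapServers, String username, String password) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "SCRAM-SHA-256");
        // Same content as the body of a KafkaClient JAAS entry, on a single line.
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.scram.ScramLoginModule required "
                + "username=\"" + username + "\" password=\"" + password + "\";");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build(
                "192.168.56.101:9092,192.168.56.102:9092,192.168.56.103:9092",
                "producer", "prod-sec");
        props.list(System.out);
    }
}
```

With this approach neither the -Djava.security.auth.login.config flag nor a separate JAAS file is needed on the client side, at the cost of having the password in the client configuration itself.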
(4) Create the topic
bin/kafka-topics.sh --zookeeper 192.168.56.101:2181,192.168.56.102:2181,192.168.56.103:2181 --create --topic topictest --partitions 3 --replication-factor 1
(5) Start the producer
bin/kafka-console-producer.sh --broker-list 192.168.56.101:9092,192.168.56.102:9092,192.168.56.103:9092 --topic topictest --producer.config config/producer.properties
(6) Grant the producer write permission on the topic
bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=192.168.56.101:2181,192.168.56.102:2181,192.168.56.103:2181 --add --allow-principal User:producer --operation Write --topic topictest
(7) List the permissions
bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=192.168.56.101:2181,192.168.56.102:2181,192.168.56.103:2181 --list
(8) Grant the consumer read permission on the topic
bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=192.168.56.101:2181,192.168.56.102:2181,192.168.56.103:2181 --add --allow-principal User:consumer --operation Read --topic topictest
(9) Grant the consumer read permission on the consumer group
bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=192.168.56.101:2181,192.168.56.102:2181,192.168.56.103:2181 --add --allow-principal User:consumer --operation Read --group test-consumer-group
(10) Start the consumer
bin/kafka-console-consumer.sh --bootstrap-server 192.168.56.101:9092,192.168.56.102:9092,192.168.56.103:9092 --topic topictest --from-beginning --consumer.config config/consumer.properties
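The effect of the ACL commands above, combined with allow.everyone.if.no.acl.found=false and super.users=User:admin, can be pictured with a deliberately simplified in-memory model. This is only a sketch of the decision order (super user first, then resources without ACLs, then explicit allow rules); it is not Kafka's authorizer and it ignores Deny rules, wildcards, hosts and implied operations. The class AclModelSketch and the "Topic:" and "Group:" resource prefixes are assumptions made for the example.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

class AclModelSketch {
    private final boolean allowEveryoneIfNoAclFound;
    private final Set<String> superUsers;
    private final Set<String> allowRules = new HashSet<>();
    private final Set<String> resourcesWithAcls = new HashSet<>();

    AclModelSketch(boolean allowEveryoneIfNoAclFound, String... superUsers) {
        this.allowEveryoneIfNoAclFound = allowEveryoneIfNoAclFound;
        this.superUsers = new HashSet<>(Arrays.asList(superUsers));
    }

    // Records an allow rule, like kafka-acls.sh --add --allow-principal ... --operation ...
    void allow(String principal, String operation, String resource) {
        allowRules.add(principal + "|" + operation + "|" + resource);
        resourcesWithAcls.add(resource);
    }

    boolean authorize(String principal, String operation, String resource) {
        if (superUsers.contains(principal)) {
            return true; // super users bypass ACL checks
        }
        if (!resourcesWithAcls.contains(resource)) {
            return allowEveryoneIfNoAclFound; // no ACL at all on this resource
        }
        return allowRules.contains(principal + "|" + operation + "|" + resource);
    }

    // The three grants from steps (6), (8) and (9).
    static AclModelSketch fromWalkthrough() {
        AclModelSketch acl = new AclModelSketch(false, "User:admin");
        acl.allow("User:producer", "Write", "Topic:topictest");
        acl.allow("User:consumer", "Read", "Topic:topictest");
        acl.allow("User:consumer", "Read", "Group:test-consumer-group");
        return acl;
    }

    public static void main(String[] args) {
        AclModelSketch acl = fromWalkthrough();
        System.out.println(acl.authorize("User:producer", "Write", "Topic:topictest"));
        System.out.println(acl.authorize("User:consumer", "Read", "Group:liutest"));
    }
}
```

One practical consequence: a consumer that uses a group other than test-consumer-group (for example the liutest group in the Java consumer later in this article) is rejected until a Read ACL is added for that group as well.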
Maven pom.xml
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.3.0</version>
<!-- <version>0.10.2.0</version> -->
</dependency>
kafka_client_scram_producer_jaas.conf
KafkaClient {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="producer"
password="prod-sec";
};
Code:
import java.util.Properties;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class MySaslScramProducer {

    // volatile is needed for the double-checked locking in getIns() to be safe
    public static volatile MySaslScramProducer ins;
    private Producer<String, String> producer;

    private MySaslScramProducer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.56.101:9092,192.168.56.102:9092,192.168.56.103:9092");
        props.put("acks", "1");
        props.put("retries", 3);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        //props.put("compression.type","gzip");
        //props.put("max.request.size","5242880");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // The JAAS file is resolved relative to the classpath root; the settings below enable SASL authentication
        String jaasPath = MySaslScramProducer.class.getResource("/").getPath() + "kafka_client_scram_producer_jaas.conf";
        System.out.println(jaasPath);
        System.setProperty("java.security.auth.login.config", jaasPath);
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "SCRAM-SHA-256");

        producer = new KafkaProducer<>(props);
    }

    // Lazily creates the singleton instance
    public static MySaslScramProducer getIns() {
        if (ins == null) {
            synchronized (MySaslScramProducer.class) {
                if (ins == null) {
                    ins = new MySaslScramProducer();
                }
            }
        }
        return ins;
    }

    public Future<RecordMetadata> send(String topic, String valueStr) {
        // Send asynchronously; on failure, print the message so it can be checked later
        Future<RecordMetadata> meta = producer.send(new ProducerRecord<String, String>(topic, valueStr), new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception != null) {
                    // Failed sends are printed here (redirect to error.log as needed)
                    System.out.println("send failed--->>> " + valueStr);
                } else {
                    System.out.println("topic:" + metadata.topic() + " ,partition:" + metadata.partition() + " , offset:" + metadata.offset() + " -> " + valueStr);
                }
            }
        });
        return meta;
    }

    public void close() {
        if (producer != null) producer.close();
    }

    public static void main(String[] args) throws InterruptedException {
        String valueStr = "{\"metric\":\"host.mem.pused\",\"value\":\"97.781098\",\"tags\":{\"resCi\":\"TA_RES_PHYSICALHOST\",\"dataType\":0,\"ip\":\"132.121.93.69\",\"cmd\":\"\",\"resId\":\"auto217A77657DDC70403B949090D3EA5543\",\"itemKey\":\"vm.memory.size[pavailable]\"},\"timestamp\":\"1617673320000\"}";
        MySaslScramProducer.getIns().send("topictest", valueStr);
        // close() waits for pending sends to complete before returning
        MySaslScramProducer.getIns().close();
    }
}
kafka_client_scram_consumer_jaas.conf
KafkaClient {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="consumer"
password="cons-sec";
};
Code:
package cn.gzsendi;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SaslScramTopicTest {

    // volatile so that a change made by another thread is seen by the poll loop
    public static volatile boolean stop = false;
    private static Logger logger = LoggerFactory.getLogger(SaslScramTopicTest.class);

    public static void main(String[] args) {
        KafkaConsumer<String, String> consumer = null;
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.56.101:9092,192.168.56.102:9092,192.168.56.103:9092");
        // The consumer user needs a Read ACL on this group; step (9) above only granted test-consumer-group
        props.put("group.id", "liutest");
        props.put("enable.auto.commit", "true"); // commit offsets automatically
        props.put("auto.offset.reset", "latest");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "300000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        System.setProperty("java.security.auth.login.config", SaslScramTopicTest.class.getResource("/").getPath() + "kafka_client_scram_consumer_jaas.conf");
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "SCRAM-SHA-256");

        consumer = new KafkaConsumer<>(props);
        String topicName = "topictest";
        consumer.subscribe(Arrays.asList(topicName));

        while (!stop) {
            try {
                // poll(long) is deprecated since Kafka 2.0 in favor of poll(Duration), but still available in 2.3.0
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    String valueStr = record.value();
                    try {
                        logger.info(valueStr);
                        logger.info("topic:" + record.topic() + " ,partition:" + record.partition() + " ,offset:" + record.offset() + " -> " + record.value());
                    } catch (Exception e) {
                        System.out.println("error------->>> " + valueStr);
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        if (consumer != null) consumer.close();
    }

    /**
     * Skips historical data and starts consuming from the latest offsets.
     *
     * @param consumer the consumer whose assigned partitions are moved to the end
     */
    public static void assignOffset(KafkaConsumer<String, String> consumer) {
        if (consumer == null) {
            return;
        }

        Map<TopicPartition, OffsetAndMetadata> offsetMap = new HashMap<TopicPartition, OffsetAndMetadata>();
        // Poll once so that partitions get assigned before seeking
        consumer.poll(100);
        Set<TopicPartition> assignment = consumer.assignment();
        consumer.seekToEnd(assignment);
        //consumer.seekToBeginning(assignment);
        for (TopicPartition topicPartition : assignment) {
            long position = consumer.position(topicPartition);
            offsetMap.put(topicPartition, new OffsetAndMetadata(position));
        }
        // Commit all collected offsets once, after the loop
        consumer.commitSync(offsetMap);
    }
}