Implement subscription control for topics and dynamically assign read/write permissions to clients.
Create the writer user with the password writer-pwd:
./kafka-configs.sh --zookeeper 192.168.248.100:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=writer-pwd],SCRAM-SHA-512=[password=writer-pwd]' --entity-type users --entity-name writer
Create the reader user with the password reader-pwd:
./kafka-configs.sh --zookeeper 192.168.248.100:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=reader-pwd],SCRAM-SHA-512=[password=reader-pwd]' --entity-type users --entity-name reader
Create the admin user with the password admin:
./kafka-configs.sh --zookeeper 192.168.248.100:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=admin],SCRAM-SHA-512=[password=admin]' --entity-type users --entity-name admin
Check the writer user's information:
./kafka-configs.sh --zookeeper 192.168.248.100:2181 --describe --entity-type users --entity-name writer
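The describe command typically lists a salt, stored_key, server_key and iteration count per mechanism rather than the password itself. As a rough illustration of where those values come from, here is a minimal sketch of the SCRAM-SHA-256 key derivation from RFC 5802 using only the JDK. The class name ScramSketch and the fixed salt are assumptions made for the example; a real broker generates a random salt, and this sketch skips the SASLprep normalization step. It is not Kafka's own code.

```java
import javax.crypto.Mac;
import javax.crypto.SecretKeyFactory;
import javax.crypto.spec.PBEKeySpec;
import javax.crypto.spec.SecretKeySpec;
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.MessageDigest;
import java.util.Base64;

// Hypothetical helper class, for illustration only.
final class ScramSketch {

    /** SaltedPassword = Hi(password, salt, iterations), which is PBKDF2 with HMAC-SHA-256. */
    static byte[] saltedPassword(String password, byte[] salt, int iterations)
            throws GeneralSecurityException {
        PBEKeySpec spec = new PBEKeySpec(password.toCharArray(), salt, iterations, 256);
        return SecretKeyFactory.getInstance("PBKDF2WithHmacSHA256")
                .generateSecret(spec)
                .getEncoded();
    }

    /** HMAC-SHA-256 of a UTF-8 message under the given key. */
    static byte[] hmac(byte[] key, String message) throws GeneralSecurityException {
        Mac mac = Mac.getInstance("HmacSHA256");
        mac.init(new SecretKeySpec(key, "HmacSHA256"));
        return mac.doFinal(message.getBytes(StandardCharsets.UTF_8));
    }

    /** StoredKey = H(ClientKey), where ClientKey = HMAC(SaltedPassword, "Client Key"). */
    static byte[] storedKey(byte[] saltedPassword) throws GeneralSecurityException {
        byte[] clientKey = hmac(saltedPassword, "Client Key");
        return MessageDigest.getInstance("SHA-256").digest(clientKey);
    }

    /** ServerKey = HMAC(SaltedPassword, "Server Key"). */
    static byte[] serverKey(byte[] saltedPassword) throws GeneralSecurityException {
        return hmac(saltedPassword, "Server Key");
    }

    public static void main(String[] args) throws GeneralSecurityException {
        // Assumed fixed salt so the run is repeatable; real salts are random.
        byte[] salt = "example-salt".getBytes(StandardCharsets.UTF_8);
        byte[] salted = saltedPassword("writer-pwd", salt, 8192);
        Base64.Encoder b64 = Base64.getEncoder();
        System.out.println("stored_key=" + b64.encodeToString(storedKey(salted)));
        System.out.println("server_key=" + b64.encodeToString(serverKey(salted)));
    }
}
```

A higher iterations value makes each derivation slower, which is the point of the iterations=8192 setting in the commands above.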
Under the config directory, add the configuration file kafka-broker-jaas.conf (the admin user configured here is used for inter-broker communication):
KafkaServer {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="admin"
password="admin";
};
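Configure server.properties on the broker side:
# Enable ACLs
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
# Make admin the super user in this example
super.users=User:admin
# Enable the SCRAM mechanism, using the SCRAM-SHA-512 algorithm
sasl.enabled.mechanisms=SCRAM-SHA-512
# Enable SCRAM for inter-broker communication, using the SCRAM-SHA-512 algorithm
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512
# Inter-broker communication uses SASL_PLAINTEXT; SSL configuration is not demonstrated in this example
security.inter.broker.protocol=SASL_PLAINTEXT
# Configure listeners to use SASL_PLAINTEXT
listeners=SASL_PLAINTEXT://192.168.248.100:9092
# Configure advertised.listeners
advertised.listeners=SASL_PLAINTEXT://192.168.248.100:9092
Change the last line of the kafka-server-start.sh script so that it passes the JAAS file as a JVM system property: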
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS -Djava.security.auth.login.config=/opt/kafka/kafka1/config/kafka-broker-jaas.conf kafka.Kafka "$@"
Start Kafka.
Create a topic:
./kafka-topics.sh --create --zookeeper 192.168.248.100:2181 --topic test --partitions 1 --replication-factor 1
Under config, create the producer configuration file producer.conf for the writer user:
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="writer" password="writer-pwd";
Grant the writer user Write permission on the test topic:
./kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=192.168.248.100:2181 --add --allow-principal User:writer --operation Write --topic test
Send a message:
./kafka-console-producer.sh --broker-list 192.168.248.100:9092 --topic test --producer.config /opt/kafka/kafka/config/producer.conf
The test message is sent successfully.
Under config, create the consumer configuration file consumer.conf for the reader user:
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="reader" password="reader-pwd";
Grant the reader user Read permission on the test topic:
./kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=192.168.248.100:2181 --add --allow-principal User:reader --operation Read --topic test
Grant the reader user Read permission on the test-group consumer group:
./kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=192.168.248.100:2181 --add --allow-principal User:reader --operation Read --group test-group
Consume messages:
./kafka-console-consumer.sh --bootstrap-server 192.168.248.100:9092 --topic test --from-beginning --consumer.config /opt/kafka/kafka1/config/consumer.conf --group test-group
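Consuming needs two separate grants: Read on the topic and Read on the consumer group. The super user bypasses the checks, and anything not explicitly allowed is denied. The toy model below is only meant to make that rule concrete. AclModelSketch and its methods are hypothetical names, and Kafka's real authorizer is considerably richer (deny rules, hosts, prefixed patterns, implied operations).

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical, heavily simplified model of allow-only ACL checks.
final class AclModelSketch {
    private final Set<String> superUsers = new HashSet<>();
    private final Set<String> allowed = new HashSet<>();

    AclModelSketch(String... superUsers) {
        this.superUsers.addAll(Arrays.asList(superUsers));
    }

    private static String key(String principal, String operation, String type, String name) {
        return principal + "|" + operation + "|" + type + "|" + name;
    }

    /** Record an ALLOW entry for one principal, operation and literal resource. */
    void allow(String principal, String operation, String resourceType, String name) {
        allowed.add(key(principal, operation, resourceType, name));
    }

    /** Super users always pass; everyone else needs a matching ALLOW entry. */
    boolean isAllowed(String principal, String operation, String resourceType, String name) {
        return superUsers.contains(principal)
                || allowed.contains(key(principal, operation, resourceType, name));
    }

    /** A consumer needs Read on both the topic and its consumer group. */
    boolean canConsume(String principal, String topic, String group) {
        return isAllowed(principal, "Read", "Topic", topic)
                && isAllowed(principal, "Read", "Group", group);
    }

    public static void main(String[] args) {
        AclModelSketch acls = new AclModelSketch("admin");
        acls.allow("reader", "Read", "Topic", "test");
        System.out.println(acls.canConsume("reader", "test", "test-group")); // false
        acls.allow("reader", "Read", "Group", "test-group");
        System.out.println(acls.canConsume("reader", "test", "test-group")); // true
    }
}
```

This is why the consumer above fails with an authorization error if only the topic grant is in place.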
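Add the following configuration to application.yml and the application can consume normally (provided the reader user has already been granted Read permission); the sender is configured the same way: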
spring:
  kafka:
    properties:
      sasl.mechanism: SCRAM-SHA-512
      security.protocol: SASL_PLAINTEXT
      sasl.jaas.config: org.apache.kafka.common.security.scram.ScramLoginModule required username="reader" password="reader-pwd";
package cn.BigData;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

/**
 * Test sending and receiving messages
 *
 * @author 天真热
 * @create 2022-03-31 10:03
 * @desc
 **/
@RestController
public class Send {
    @Resource
    private KafkaTemplate<String, Object> kafkaTemplate;

    @KafkaListener(topics = "test", groupId = "test-group")
    public void listenGroup(ConsumerRecord<String, String> record, Acknowledgment ack) {
        System.out.println("test:" + record.value());
        // Commit the offset manually
        ack.acknowledge();
    }

    @RequestMapping("/sendTest")
    public void sendTest() {
        kafkaTemplate.send("test", "testtesttesttest");
    }
}
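Because ACLs are now configured on Kafka, every operation against Kafka requires logging in first.
Create the kafka.conf configuration file. Since admin is configured as a super user, it has the highest privileges.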
KafkaClient{
org.apache.kafka.common.security.scram.ScramLoginModule required
username="admin"
password="admin";
};
P.S.: the account can also be declared directly in props. I found this later; it is more convenient and replaces the previous approach:
props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"admin\" password=\"admin\";");
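To show how the inline sasl.jaas.config approach fits together with the other client settings, here is a minimal sketch that assembles the properties with plain java.util.Properties. ScramClientProps, jaasConfig and build are hypothetical helper names, and rejecting values that contain a double quote is a simplification chosen for this sketch rather than something Kafka requires.

```java
import java.util.Properties;

// Hypothetical helper, for illustration only.
final class ScramClientProps {

    /** Build the one-line JAAS entry used by the sasl.jaas.config property. */
    static String jaasConfig(String username, String password) {
        // Simplification for this sketch: refuse values that would need escaping.
        if (username.contains("\"") || password.contains("\"")) {
            throw new IllegalArgumentException("double quotes are not supported in this sketch");
        }
        return "org.apache.kafka.common.security.scram.ScramLoginModule required"
                + " username=\"" + username + "\""
                + " password=\"" + password + "\";";
    }

    /** Collect the SASL/SCRAM client settings used throughout this article. */
    static Properties build(String bootstrapServers, String username, String password) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "SCRAM-SHA-512");
        props.put("sasl.jaas.config", jaasConfig(username, password));
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("192.168.248.100:9092", "admin", "admin");
        System.out.println(props.getProperty("sasl.jaas.config"));
    }
}
```

With the credentials in the properties, there is no need to set the java.security.auth.login.config system property, so different clients in the same JVM can log in as different users.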
Since every operation needs a connection to Kafka, I wrote a singleton that keeps one long-lived connection to Kafka, which avoids reconnecting on every call.
private static volatile AdminClient client;

/**
 * Get the shared AdminClient connection
 *
 * @param bootstrapServers
 * @return
 */
public static AdminClient getAdminClient(String bootstrapServers) throws IOException {
    String resource = new ClassPathResource("/kafka.conf").getURL().getPath();
    // Double-checked locking; the field is volatile so the published client is fully initialized
    if (client == null) {
        synchronized (AdminClient.class) {
            if (client == null) {
                client = initAdminClient(bootstrapServers, resource);
            }
        }
    }
    return client;
}

/**
 * Initialize the AdminClient
 *
 * @param bootstrapServers
 * @param resource
 * @return
 */
public static AdminClient initAdminClient(String bootstrapServers, String resource) {
    System.setProperty("java.security.auth.login.config", resource);
    // Create the connection
    Properties props = new Properties();
    props.put("bootstrap.servers", bootstrapServers);
    props.put("acks", "all");
    props.put("retries", 0);
    props.put("batch.size", 16384);
    props.put("linger.ms", 1);
    props.put("buffer.memory", 33554432);
    props.put("max.request.size", 8000000);
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("security.protocol", "SASL_PLAINTEXT");
    props.put("sasl.mechanism", "SCRAM-SHA-512");
    return AdminClient.create(props);
}
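The lazy singleton above relies on double-checked locking, which in Java is only safe when the shared field is volatile. The following generic sketch isolates that pattern with JDK classes only. LazySingleton is a hypothetical name, and the Supplier stands in for the AdminClient factory call.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical generic holder illustrating double-checked locking.
final class LazySingleton<T> {
    private final Supplier<T> factory;
    // volatile guarantees other threads never observe a half-constructed instance
    private volatile T instance;

    LazySingleton(Supplier<T> factory) {
        this.factory = factory;
    }

    T get() {
        T local = instance;              // first check, without locking
        if (local == null) {
            synchronized (this) {
                local = instance;        // second check, under the lock
                if (local == null) {
                    local = factory.get();
                    instance = local;
                }
            }
        }
        return local;
    }

    public static void main(String[] args) {
        AtomicInteger created = new AtomicInteger();
        LazySingleton<Object> holder = new LazySingleton<>(() -> {
            created.incrementAndGet();
            return new Object();
        });
        Object first = holder.get();
        Object second = holder.get();
        System.out.println(first == second);  // true
        System.out.println(created.get());    // 1
    }
}
```

One consequence of sharing a single client this way is that no caller should close it while others may still be using it.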
Create a Kafka user
/**
 * Create a Kafka user
 *
 * @param bootstrapServers
 * @param account
 * @param password
 */
public static void addUser(String bootstrapServers, String account, String password) throws ExecutionException, InterruptedException, IOException {
    // Get the AdminClient connection
    AdminClient adminClient = getAdminClient(bootstrapServers);
    // Build the list of credential changes
    List<UserScramCredentialAlteration> alterations = new ArrayList<>();
    // Describe the SCRAM mechanism and iteration count
    ScramCredentialInfo info = new ScramCredentialInfo(org.apache.kafka.clients.admin.ScramMechanism.SCRAM_SHA_512, 8192);
    // User credentials to insert or update
    UserScramCredentialAlteration userScramCredentialAdd = new UserScramCredentialUpsertion(account, info, password);
    // Add the user to the list
    alterations.add(userScramCredentialAdd);
    // Create the user and block until the result is available
    AlterUserScramCredentialsResult result = adminClient.alterUserScramCredentials(alterations);
    result.all().get();
}
Delete a Kafka user
/**
 * Delete a Kafka user
 *
 * @param bootstrapServers
 * @param account
 */
public static void deleteUser(String bootstrapServers, String account) throws ExecutionException, InterruptedException, IOException {
    AdminClient adminClient = getAdminClient(bootstrapServers);
    // Build the list of deletions
    List<UserScramCredentialAlteration> alterations = new ArrayList<>();
    // Build the UserScramCredentialAlteration used for deletion
    UserScramCredentialAlteration userScramCredentialDel = new UserScramCredentialDeletion(account, org.apache.kafka.clients.admin.ScramMechanism.SCRAM_SHA_512);
    // Add the deletion to the list
    alterations.add(userScramCredentialDel);
    // Execute the request
    AlterUserScramCredentialsResult result = adminClient.alterUserScramCredentials(alterations);
    // Block until the result is available
    result.all().get();
}
Get information about all Kafka users
/**
 * Get information about all users
 *
 * @throws IOException
 * @throws ExecutionException
 * @throws InterruptedException
 */
public static void describeAccount(String bootstrapServers) throws IOException, ExecutionException, InterruptedException {
    AdminClient adminClient = getAdminClient(bootstrapServers);
    // Query all accounts, which is the default when no user list is passed
    DescribeUserScramCredentialsResult result = adminClient.describeUserScramCredentials();
    // Execute the request and wait for the result
    java.util.Map<String, UserScramCredentialsDescription> users = result.all().get();
    // Print each user
    users.forEach((name, info) -> System.out.println("[ScramUserName:" + name + "]:[ScramUserInfo:" + info.toString() + "]"));
}
Grant a user Read permission on a consumer group
/**
 * Grant a user Read permission on a consumer group
 *
 * @param bootstrapServers
 * @param account
 * @param consumerGroup
 */
public static void addGroupReadAcl(String bootstrapServers, String account, String consumerGroup) throws IOException, ExecutionException, InterruptedException {
    // Get the AdminClient connection
    AdminClient adminClient = getAdminClient(bootstrapServers);
    // Resource: the consumer group
    ResourcePattern resourcePatternGroup = new ResourcePattern(ResourceType.GROUP, consumerGroup, PatternType.LITERAL);
    // Entry: the user and the permission
    AccessControlEntry accessControlEntryRead = new AccessControlEntry("User:" + account, "*", AclOperation.READ, AclPermissionType.ALLOW);
    // Binding: user, permission and consumer group together
    AclBinding aclBindingGroup = new AclBinding(resourcePatternGroup, accessControlEntryRead);
    // Add the binding to the collection
    Collection<AclBinding> aclBindingCollection = new ArrayList<>();
    aclBindingCollection.add(aclBindingGroup);
    // Execute and wait for completion
    CreateAclsResult aclResult = adminClient.createAcls(aclBindingCollection);
    KafkaFuture<Void> result = aclResult.all();
    result.get();
}
Remove a user's Read permission on a consumer group
/**
 * Remove Read permission on a consumer group
 *
 * @param bootstrapServers
 * @param account
 * @param consumerGroup
 * @throws IOException
 * @throws ExecutionException
 * @throws InterruptedException
 */
public static void deleteGroupReadAcl(String bootstrapServers, String account, String consumerGroup) throws IOException, ExecutionException, InterruptedException {
    // Get the AdminClient connection
    AdminClient adminClient = getAdminClient(bootstrapServers);
    // Filter: the consumer group
    ResourcePatternFilter resourcePatternFilterGroup = new ResourcePatternFilter(ResourceType.GROUP, consumerGroup, PatternType.LITERAL);
    // Filter: the user and the permission
    AccessControlEntryFilter accessControlEntryFilter = new AccessControlEntryFilter("User:" + account, "*", AclOperation.READ, AclPermissionType.ALLOW);
    // Filter: user, permission and consumer group together
    AclBindingFilter aclBindingGroup = new AclBindingFilter(resourcePatternFilterGroup, accessControlEntryFilter);
    // Add the filter to the collection
    Collection<AclBindingFilter> aclBindingCollection = new ArrayList<>();
    aclBindingCollection.add(aclBindingGroup);
    // Execute and wait for completion
    DeleteAclsResult aclResult = adminClient.deleteAcls(aclBindingCollection);
    KafkaFuture<Collection<AclBinding>> result = aclResult.all();
    result.get();
}
Grant a user Read/Write permission on a topic
/**
 * Grant a user Read or Write permission on a topic
 *
 * @param bootstrapServers
 * @param topic
 * @param account
 * @param acl
 * @throws IOException
 * @throws ExecutionException
 * @throws InterruptedException
 */
public static void addTopicReadOrWriterAcl(String bootstrapServers, String topic, String account, AclOperation acl) throws IOException, ExecutionException, InterruptedException {
    // Get the AdminClient connection
    AdminClient adminClient = getAdminClient(bootstrapServers);
    // Resource: the topic
    ResourcePattern resourcePattern = new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL);
    // Entry: the user and the permission
    AccessControlEntry accessControlEntry = new AccessControlEntry("User:" + account, "*", acl, AclPermissionType.ALLOW);
    // Binding: user, topic and permission together
    AclBinding aclBinding = new AclBinding(resourcePattern, accessControlEntry);
    // Add the binding to the collection
    Collection<AclBinding> aclBindingCollection = new ArrayList<>();
    aclBindingCollection.add(aclBinding);
    // Grant the permission and wait for completion
    CreateAclsResult aclResult = adminClient.createAcls(aclBindingCollection);
    KafkaFuture<Void> result = aclResult.all();
    result.get();
}
Remove a user's Read/Write permission on a topic
/**
 * Remove a user's Read or Write permission on a topic
 *
 * @param bootstrapServers
 * @param topic
 * @param account
 * @param acl AclOperation.READ or AclOperation.WRITE
 */
public static void deleteTopicReadOrWriterAcl(String bootstrapServers, String topic, String account, AclOperation acl) throws IOException, ExecutionException, InterruptedException {
    // Get the AdminClient connection
    AdminClient adminClient = getAdminClient(bootstrapServers);
    // Filter: the topic
    ResourcePatternFilter resourcePatternFilter = new ResourcePatternFilter(ResourceType.TOPIC, topic, PatternType.LITERAL);
    // Filter: the user and the permission
    AccessControlEntryFilter accessControlEntryFilter = new AccessControlEntryFilter("User:" + account, "*", acl, AclPermissionType.ALLOW);
    // Filter: user, topic and permission together
    AclBindingFilter aclBinding = new AclBindingFilter(resourcePatternFilter, accessControlEntryFilter);
    // Remove the permission and wait for completion
    Collection<AclBindingFilter> aclBindingCollection = new ArrayList<>();
    aclBindingCollection.add(aclBinding);
    DeleteAclsResult aclResult = adminClient.deleteAcls(aclBindingCollection);
    KafkaFuture<Collection<AclBinding>> result = aclResult.all();
    result.get();
}
Remove topic Read/Write permissions in bulk
/**
 * Remove topic Read/Write permissions in bulk
 *
 * @param bootstrapServers
 * @param kafkaAuthoritys
 */
public static void batchDeleteTopicReadOrWriterAcl(String bootstrapServers, List<KafkaAuthority> kafkaAuthoritys) throws IOException, ExecutionException, InterruptedException {
    // Get the shared AdminClient connection (do not close it here, other calls reuse it)
    AdminClient adminClient = getAdminClient(bootstrapServers);
    // Collection of filters to remove
    Collection<AclBindingFilter> aclBindingCollection = new ArrayList<>();
    for (KafkaAuthority kafkaAuthority : kafkaAuthoritys) {
        // Filter: the topic
        ResourcePatternFilter resourcePatternFilter = new ResourcePatternFilter(ResourceType.TOPIC, kafkaAuthority.getTopic(), PatternType.LITERAL);
        // Filter: the user and the permission. The operation (WRITE or READ) can be stored in a table
        // and read from kafkaAuthority; getOperation() is a hypothetical accessor for that value.
        AccessControlEntryFilter accessControlEntryFilter = new AccessControlEntryFilter("User:" + kafkaAuthority.getAccount(), "*", kafkaAuthority.getOperation(), AclPermissionType.ALLOW);
        // Filter: user, topic and permission together
        AclBindingFilter aclBinding = new AclBindingFilter(resourcePatternFilter, accessControlEntryFilter);
        // Add to the collection
        aclBindingCollection.add(aclBinding);
    }
    // Remove all matching permissions and wait for completion
    adminClient.deleteAcls(aclBindingCollection).all().get();
}
View all permissions of all users
/**
 * List account permissions
 *
 * @throws IOException
 * @throws ExecutionException
 * @throws InterruptedException
 */
public static void describeAllAcl(String bootstrapServers) throws IOException, ExecutionException, InterruptedException {
    AdminClient adminClient = getAdminClient(bootstrapServers);
    DescribeAclsResult result = adminClient.describeAcls(AclBindingFilter.ANY);
    Collection<AclBinding> bindings = result.values().get();
    for (AclBinding binding : bindings) {
        System.out.println(binding.pattern().name());
        System.out.println(binding.pattern().patternType());
        System.out.println(binding.pattern().resourceType());
        System.out.println(binding.entry().principal());
        System.out.println(binding.entry().permissionType());
        System.out.println(binding.entry().operation());
        System.out.println("-------------------------");
    }
}