当前位置:   article > 正文

kafka开启sasl认证_sasl.server.callback.handler.class

sasl.server.callback.handler.class

SASL/PLAIN认证机制开启方法:

  1. 修改Kafka配置文件 server.properties 或者其它名字(CDH和Ambari的发行版可能文件名有所差异)
  1. sasl.enabled.mechanisms = PLAIN
  2. sasl.mechanism.inter.broker.protocol = PLAIN
  3. security.inter.broker.protocol = SASL_PLAINTEXT
  4. listeners = SASL_PLAINTEXT://localhost:9092
  1. 编辑 kafka_jaas.conf
  1. KafkaServer {
  2. org.apache.kafka.common.security.plain.PlainLoginModule required
  3. # broker之间互相认证的凭据
  4. username="admin"
  5. password="admin-secret"
  6. # 以下是用户信息
  7. user_admin="admin-secret" # 用户admin的密码是admin-secret
  8. user_alice="alice-secret"; # 用户alice的密码是alice-secret
  9. };

启动kafka的时候把 kafka_jaas.conf 的路径当作命令行参数传入:

-Djava.security.auth.login.config=/path/to/kafka_jaas.conf

ACL授权机制开启方法:

编辑Kafka配置文件,添加如下内容:

authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer

以上的配置方案除了没有使用SSL加密之外,还存在一个严重的缺陷:用户信息是通过静态配置文件的方式存储的,当对用户信息进行添加、删除和修改的时候都需要重启Kafka集群,而我们知道,作为消息中间件,Kafka的上下游与众多组件相连,重启可能造成数据丢失或重复,Kafka应当尽量避免重启。

解决方案

还好,Kafka允许用户为SASL/PLAIN认证机制提供了自定义的回调函数,根据KIP-86,如果不希望采用静态配置文件存储用户认证信息的话,只需要编写一个实现了 AuthenticateCallbackHandler 接口的类,然后在配置文件中指明这个类即可,指明的方法为在Kafka配置文件中添加如下内容:

listener.name.sasl_plaintext.plain.sasl.server.callback.handler.class=com.example.MyCallbackHandler

先看下 AuthenticateCallbackHandler 这个接口的默认实现,可以在github上看到:

 

  1. public class PlainServerCallbackHandler implements AuthenticateCallbackHandler {
  2. private static final String JAAS_USER_PREFIX = "user_";
  3. private List<AppConfigurationEntry> jaasConfigEntries;
  4. @Override
  5. public void configure(Map<String, ?> configs, String mechanism, List<AppConfigurationEntry> jaasConfigEntries) {
  6. this.jaasConfigEntries = jaasConfigEntries;
  7. }
  8. @Override
  9. public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
  10. String username = null;
  11. for (Callback callback: callbacks) {
  12. if (callback instanceof NameCallback)
  13. username = ((NameCallback) callback).getDefaultName();
  14. else if (callback instanceof PlainAuthenticateCallback) {
  15. PlainAuthenticateCallback plainCallback = (PlainAuthenticateCallback) callback;
  16. boolean authenticated = authenticate(username, plainCallback.password());
  17. plainCallback.authenticated(authenticated);
  18. } else
  19. throw new UnsupportedCallbackException(callback);
  20. }
  21. }
  22. protected boolean authenticate(String username, char[] password) throws IOException {
  23. if (username == null)
  24. return false;
  25. else {
  26. String expectedPassword = JaasContext.configEntryOption(jaasConfigEntries,
  27. JAAS_USER_PREFIX + username,
  28. PlainLoginModule.class.getName());
  29. return expectedPassword != null && Arrays.equals(password, expectedPassword.toCharArray());
  30. }
  31. }
  32. @Override
  33. public void close() throws KafkaException {
  34. }
  35. }

这个接口有三个方法需要实现,分别是configure()、handle()和close(),configure() 进行初始化配置,close() 关闭打开的相关资源,handle() 处理用户的认证请求。我采用MySQL来存储用户信息,因此可以满足动态增删用户的需求,大概的实现为:

  1. public class MySQLAuthCallback implements AuthenticateCallbackHandler {
  2. private static final Logger log = LoggerFactory.getLogger("plugin");
  3. private DruidDataSource dataSource = null;
  4. public MySQLAuthCallback() {
  5. this.dataSource = new DruidDataSource();
  6. this.dataSource.setDriverClassName("com.mysql.jdbc.Driver");
  7. this.dataSource.setUrl("jdbc:mysql://localhost:3306/kafka_user_info");
  8. this.dataSource.setUsername("kafka");
  9. this.dataSource.setPassword("kafka");
  10. this.dataSource.setInitialSize(5);
  11. this.dataSource.setMinIdle(1);
  12. this.dataSource.setMaxActive(10);
  13. this.dataSource.setPoolPreparedStatements(false);
  14. }
  15. public void configure(Map<String, ?> configs, String mechanism, List<AppConfigurationEntry> jaasConfigEntries) {
  16. }
  17. public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
  18. String username = null;
  19. for (Callback callback: callbacks) {
  20. if (callback instanceof NameCallback) {
  21. username = ((NameCallback) callback).getDefaultName();
  22. }
  23. else if (callback instanceof PlainAuthenticateCallback) {
  24. PlainAuthenticateCallback plainCallback = (PlainAuthenticateCallback) callback;
  25. boolean authenticated = authenticate(username, plainCallback.password());
  26. plainCallback.authenticated(authenticated);
  27. } else
  28. throw new UnsupportedCallbackException(callback);
  29. }
  30. }
  31. protected boolean authenticate(String username, char[] password) {
  32. return userExists(username, new String(password));
  33. }
  34. private boolean userExists(String username, String password) {
  35. Connection conn = null;
  36. PreparedStatement statement = null;
  37. ResultSet resultSet = null;
  38. try {
  39. conn = getConnection();
  40. statement = conn.prepareStatement("select * from users where username=? and password=?");
  41. statement.setString(1, username);
  42. statement.setString(2, password);
  43. resultSet = statement.executeQuery();
  44. while (resultSet.next()) {
  45. boolean b = username.equals(resultSet.getString("username")) &&
  46. password.equals(resultSet.getString("password"));
  47. log.info("user {} authentication status: {}.", username, b);
  48. return b;
  49. }
  50. } catch (SQLException e) {
  51. log.info("sql exception occurred: {}", e.getMessage());
  52. } finally {
  53. // 按照打开顺序逆序关闭打开的资源
  54. }
  55. }
  56. return false;
  57. }
  58. private Connection getConnection() throws SQLException {
  59. // 通过datasource获取connection,省略了try/catch
  60. synchronized(dataSource) {
  61. return dataSource.getConnection();
  62. }
  63. }
  64. public void close() throws KafkaException {
  65. }
  66. }

自定义类编写完成后后,将jar包拷贝到每个broker的CLASSPATH下,比如kafka的libs目录下。在MySQL中插入几条用户信息,然后尝试以这些用户的身份来连接Kafka(为方便起见,测试阶段可以先不要开启ACL),我们会发现,MySQL中的用户可以连接,而写在配置文件中的用户无法连接,说明这个插件达到了预期的效果,增删用户不需要重启了

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/正经夜光杯/article/detail/981607
推荐阅读
相关标签
  

闽ICP备14008679号