当前位置:   article > 正文

JAVA代码中使用kafka生产者配置MD5密码的配置方法_java 读取 kafka 用户和密码

java 读取 kafka 用户和密码

最近有一个需求,是将一些信息发送给指定的kafka(也就是咱们作为生产者)。在发送的时候,需要提供用户名和密码接收方才能成功接收。如果密码是明文,我们通常设置配置如下:

  1. properties.put("security.protocol", "PLAINTEXT");
  2. properties.put("sasl.mechanism", "PLAIN");
  3. properties.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"username\" password=\"123456789\";");

但是这次我们使用的密码是为MD5加密的,所以配置改为:

  1. properties.put("security.protocol", "SASL_PLAINTEXT");
  2. properties.put("sasl.mechanism", "SCRAM-SHA-512");
  3. properties.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"username\" password=\"7ed7845e45765fb75b045273e044cd21\";");

这种配置方法亲测有效,完整的kafka生产者代码如下:

  1. public void sendKafka(String str) {
  2. String BROKERS = "134.108.84.210:8423,134.108.84.94:8423,134.108.85.65:8423";//公共接入点地址
  3. Properties properties = new Properties();
  4. properties.put("bootstrap.servers", BROKERS);
  5. properties.put("acks", "1");// -1 all ack;1 leader ack;0 none ack
  6. properties.put("retries", 3);
  7. properties.put("batch.size", 262144);
  8. properties.put("linger.ms", 10);
  9. properties.put("buffer.memory", 67108864);
  10. properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  11. properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  12. properties.put("client.id", "producer-olt_" + System.currentTimeMillis());
  13. properties.put("group.id","zj_olt_alarm");
  14. properties.put("security.protocol", "SASL_PLAINTEXT");
  15. properties.put("sasl.mechanism", "SCRAM-SHA-512");
  16. properties.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"username\" password=\"7ed7845e45765fb75b045273e044cd21\";");
  17. //构造Producer对象,注意,该对象是线程安全的
  18. //一般来说,一个进程内一个Producer对象即可,如果想提高性能,可构造多个对象,但最好不要超过5个
  19. KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
  20. try {
  21. ProducerRecord<String, String> record = new ProducerRecord<String, String>("TOPIC",str);
  22. producer.send(record, new Callback() {
  23. @Override
  24. public void onCompletion(RecordMetadata recordMetadata, Exception exception) {
  25. if (exception != null) {
  26. //todo log and retry
  27. System.out.println("================================================");
  28. System.out.printf("send exception. %s%n", exception.getMessage());
  29. return;
  30. }
  31. //todo business process...
  32. System.out.println("================================================");
  33. System.out.printf("send message success. topic=%s,offset=%s,partition=%d %n",
  34. recordMetadata.topic(), recordMetadata.offset(), recordMetadata.partition());
  35. }
  36. });
  37. // System.out.println("推送成功");
  38. } catch (Exception e) {
  39. e.printStackTrace();
  40. logger.error("{kafka发送失败}",e);
  41. }
  42. }

以上方法提供参考,如有错误还请指出,共同进步

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Guff_9hys/article/detail/920374
推荐阅读
相关标签
  

闽ICP备14008679号