赞
踩
添加依赖:
<!-- Spring for Apache Kafka integration (presumably pulls in kafka-clients
     transitively — verify against the chosen Spring Boot version's BOM). -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.6.3</version>
</dependency>
添加spring-kafka相关配置:
#=============== 集群通用配置 ================
spring.kafka.bootstrap-servers=kafka-dyskevxt-headless.kafka-uat.svc.xke.test.xdf.cn:29092
spring.kafka.security.protocol=SASL_PLAINTEXT
spring.kafka.properties.sasl.mechanism=PLAIN
spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="zhurunhua" password="pwd";
#=============== producer =================
spring.kafka.producer.retries=5
# 每次批量发送消息的数量
spring.kafka.producer.batch-size=1000
spring.kafka.producer.buffer-memory=1000000
# 指定消息key和消息体的编解码方式
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
#=============== consumer ==================
spring.kafka.consumer.group-id=zhurunhua-test-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
消费者:
/**
 * Spring-managed Kafka listener example: prints the value of every record
 * received on the hard-coded topic {@code zhurunhua-test-topic}.
 */
@Component
public class TestPlainConsumer {

    /**
     * Invoked by the listener container for each incoming record.
     *
     * @param consumerRecord the received key/value record
     */
    @KafkaListener(topics = {"zhurunhua-test-topic"})
    public void consumer(ConsumerRecord<String, String> consumerRecord) {
        System.out.println(consumerRecord.value());
    }
}
topic可以从配置文件读取,代码中通过${}的方式获取配置的topic:
/**
 * Spring-managed Kafka listener example: same as the hard-coded variant, but
 * the topic name is resolved from the {@code kafka.subscribe.topic} property
 * via {@code ${...}} placeholder resolution.
 */
@Component
public class TestPlainConsumer {

    /**
     * Invoked by the listener container for each incoming record.
     *
     * @param consumerRecord the received key/value record
     */
    @KafkaListener(topics = {"${kafka.subscribe.topic}"})
    public void consumer(ConsumerRecord<String, String> consumerRecord) {
        System.out.println(consumerRecord.value());
    }
}
生产者:
@Component public class TestPlainProducer { @Autowired private KafkaTemplate<String, String> kafkaTemplate; @Value("${kafka.subscribe.topic}") private String topic; public void send(String message) { ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message); future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() { @Override public void onFailure(Throwable throwable) { System.err.println(throwable); } @Override public void onSuccess(SendResult<String, String> sendResult) { System.out.println("发送结果:" + sendResult); } }); } }
引入依赖
<!-- Plain Apache Kafka client library, used by the non-Spring example below.
     NOTE(review): `compile` is already Maven's default scope, so the explicit
     <scope> element is redundant but harmless. -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.6.0</version>
<scope>compile</scope>
</dependency>
客户端代码
public class TestSaslClient { private final static String TOPIC = "zhurunhua-test-topic"; private final static String BROKERS = "kafka-dyskevxt-headless.kafka-uat.svc.xke.test.xdf.cn:29092"; private static KafkaConsumer<String, String> consumer; private static KafkaProducer<String, String> producer; static { Properties c = initConfig(); consumer = new KafkaConsumer<>(c); producer = new KafkaProducer<>(c); } /** * 初始化配置 * * @return java.util.Properties * @date 2023/04/17 5:51 下午 */ public static Properties initConfig() { Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERS); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); props.put(ConsumerConfig.GROUP_ID_CONFIG, "zhurunhua-test-1"); // SASL/PLAIN 认证配置 props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_PLAINTEXT); props.put(SaslConfigs.SASL_MECHANISM, PlainSaslServer.PLAIN_MECHANISM); props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"zhurunhua\" password=\"pwd\";"); // 可选设置属性 props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); // 自动提交offset,每1s提交一次 props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000); return props; } /** * 消费者示例 * * @date 2023/04/17 5:51 下午 */ public void subscribe() { consumer.subscribe(Collections.singleton(TOPIC)); try { while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1)); for (ConsumerRecord<String, String> record : records) { System.out.println(record); } } } catch (Exception e) { System.out.println("consumer error : " + e); } finally { consumer.close(); } } /** * 生产者示例 * * @date 2023/04/17 5:52 下午 */ public void send() { ProducerRecord<String, String> 
record = new ProducerRecord<>(TOPIC, 0, "zhurunhua", "测试消息"); producer.send(record, (recordMetadata, e) -> { if (Objects.nonNull(e)) { System.out.println("send error: " + e); } else { System.out.println(recordMetadata); } }); } }
更多安全和认证相关资料,参考:https://kafka.apache.org/documentation/#security_overview
若使用ssl,相关配置示例:
spring.kafka.ssl.trust-store-type=JKS
spring.kafka.ssl.key-store-type=JKS
spring.kafka.ssl.protocol=SSL
spring.kafka.ssl.key-store-password=rxu4G5kPyAqTkERk
spring.kafka.ssl.trust-store-password=rxu4G5kPyAqTkERk
spring.kafka.ssl.key-store-location=/kafka-jks/keystore.jks
spring.kafka.ssl.trust-store-location=/kafka-jks/keystore.jks
使用不同的sasl.mechanism时,需要注意sasl.jaas.config的配置中,LoginModule不同:例如 PLAIN 对应 org.apache.kafka.common.security.plain.PlainLoginModule,SCRAM-SHA-256/SCRAM-SHA-512 对应 org.apache.kafka.common.security.scram.ScramLoginModule。
kafka认证的参数相对来说比较复杂,需要理解每个参数的含义,错一个就会失败,启动日志会打印客户端配置,可用于协助排查问题
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。