赞
踩
spring boot 集成kafka是比较简单的 直接引入spring-kafka的包 然后稍作配置即可
添加 Kafka 依赖
在 pom.xml 文件中添加 Kafka 依赖:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
配置 Kafka 属性
在 application.properties 或 application.yml 文件中添加 Kafka 配置属性:
# Kafka 服务器地址
spring.kafka.bootstrap-servers=localhost:9092
# 消费者配置
spring.kafka.consumer.group-id=my-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# 生产者配置
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
创建测试消费者
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class KafkaConsumer {
@KafkaListener(topics = "my-topic", groupId = "my-group")
public void consume(String message) {
System.out.println("Received message: " + message);
}
}
这里创建了一个 KafkaConsumer 类,使用 @KafkaListener 注解来监听名为 “my-topic” 的主题。当收到消息时,consume 方法会被调用,并打印出收到的消息。
创建 Kafka 生产者
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
@Component
public class KafkaProducer {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void send(String topic, String message) {
kafkaTemplate.send(topic, message);
}
}
这里创建了一个 KafkaProducer 类,通过注入 KafkaTemplate 来发送消息。send 方法接受主题和消息作为参数,并将消息发送到指定的主题。
测试代码
import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; @SpringBootTest public class KafkaTests { @Autowired private KafkaProducer producer; @Test public void testProduceAndConsume() throws InterruptedException { String topic = "my-topic"; String message = "Hello, Kafka!"; producer.send(topic, message); // 等待消息被消费 Thread.sleep(5000); } }
官方地址: https://docs.spring.io/spring-kafka/docs/2.8.11/reference/html/#getting-started
PLAINTEXT
这是 Kafka 默认的协议,不提供任何认证机制。所有客户端都可以不经认证就连接到 Kafka 集群。
在生产环境中,强烈不建议使用这种协议,因为它不提供任何安全保障。
SSL
SSL 提供了一种加密通信的方式,可以保护客户端和服务器之间的数据传输。
SSL 需要在 Kafka 代理(Broker)和客户端之间建立信任关系,通常是通过 SSL 证书来实现的。
SSL 可以单独使用,也可以与其他认证协议(如 SASL)结合使用,提供更高级别的安全性。
SASL/PLAIN
SASL/PLAIN 是一种基于用户名和密码的简单认证机制。
客户端需要提供有效的用户名和密码才能连接到 Kafka 集群。
SASL/PLAIN 通常与 SSL 协议结合使用,以防止明文传输用户名和密码。
SASL/SCRAM
SASL/SCRAM 是一种更安全的基于挑战-响应的认证机制。
SASL/SCRAM 支持两种变体:SCRAM-SHA-256 和 SCRAM-SHA-512。
SCRAM 使用 HMAC 和随机数来对密码进行哈希计算,而不会在网络上传输明文密码。
SCRAM 通常也与 SSL 结合使用,以提供更高的安全性。
SASL/GSSAPI (Kerberos)
SASL/GSSAPI 是基于 Kerberos 的认证协议。
Kerberos 是一种广泛使用的网络认证协议,提供了集中式的认证和授权服务。
Kerberos 使用票据(Ticket)系统来验证客户端身份,而不是直接传输密码。
Kerberos 认证通常被认为是企业级环境中最安全的认证方式之一。
SASL/OAUTHBEARER
SASL/OAUTHBEARER 是一种使用 OAuth 2.0 访问令牌(Access Token)进行认证的协议。
OAuth 2.0 是一种广泛使用的授权框架,常用于授权第三方应用访问用户数据。
SASL/OAUTHBEARER 需要客户端获取有效的 OAuth 2.0 访问令牌,并在连接 Kafka 时提供该令牌进行认证。
kafka的server端开放了 SASL_PLAINTEXT 认证 同时使用了 SCRAM-SHA-256
yaml配置
spring: application: name: xxxx kafka: bootstrap-servers: 192.168.xxxx:9092,192.168.xxx:9092,192.168.xxx:9092 security: #指定安全协议 protocol: SASL_PLAINTEXT properties: # SCRAM 加密方法 256还是512 sasl.mechanism: SCRAM-SHA-256 # username password 为在server端新建的用户 ## 注意 最后边有个; sasl.jaas.config: org.apache.kafka.common.security.scram.ScramLoginModule required username="xxx" password="xxx"; producer: value-serializer: org.apache.kafka.common.serialization.StringSerializer
配置好后 在使用 测试类进行验证
会出现如下log 就是配置成功了
2024-03-21 15:59:20.306 INFO [recommend,,] 84510 --- [ main] o.a.k.c.t.i.KafkaMetricsCollector : initializing Kafka metrics collector
2024-03-21 15:59:20.338 INFO [recommend,,] 84510 --- [ main] o.a.k.clients.producer.KafkaProducer : [Producer clientId=producer-1] Instantiated an idempotent producer.
2024-03-21 15:59:20.372 INFO [recommend,,] 84510 --- [ main] o.a.k.c.s.authenticator.AbstractLogin : Successfully logged in.
2024-03-21 15:59:20.400 INFO [recommend,,] 84510 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka version: 3.7.0
2024-03-21 15:59:20.400 INFO [recommend,,] 84510 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: xxxxx524ed625438c5
2024-03-21 15:59:20.401 INFO [recommend,,] 84510 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1711007960400
the end !!!
good day !!!
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。