赞
踩
在Spring Boot 中使用Kafka :
首先,在 pom.xml
文件中添加 Kafka 相关的依赖。通常使用 spring-kafka
提供的依赖来集成 Kafka。
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>${spring-kafka.version}</version>
</dependency>
请确保 ${spring-kafka.version}
是你所需的 Spring Kafka 版本。
在 application.properties
或 application.yml
中配置 Kafka 的连接信息:
spring.kafka.bootstrap-servers=your-kafka-server:9092
spring.kafka.consumer.group-id=my-group-id
可以根据你的需要配置更多的 Kafka 相关属性,如序列化器、反序列化器等。
使用 Spring Boot 创建 Kafka 生产者的步骤如下:
import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer; import org.springframework.kafka.support.serializer.JsonSerializer; import java.util.HashMap; import java.util.Map; @Configuration public class KafkaProducerConfig { @Bean public ProducerFactory<String, Object> producerFactory() { Map<String, Object> configProps = new HashMap<>(); configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "your-kafka-server:9092"); configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); // 如果发送 JSON 对象 return new DefaultKafkaProducerFactory<>(configProps); } @Bean public KafkaTemplate<String, Object> kafkaTemplate() { return new KafkaTemplate<>(producerFactory()); } }
在上述配置中,producerFactory
方法设置了 Kafka 生产者的配置,包括序列化器。kafkaTemplate
方法创建了一个 KafkaTemplate 用于发送消息。
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Service; @Service public class KafkaProducerService { private static final String TOPIC = "your-topic-name"; @Autowired private KafkaTemplate<String, Object> kafkaTemplate; public void sendMessage(Object message) { kafkaTemplate.send(TOPIC, message); } }
在 KafkaProducerService
中,通过 KafkaTemplate
发送消息到指定的 TOPIC
。
使用 Spring Boot 创建 Kafka 消费者的步骤如下:
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; import org.springframework.kafka.listener.config.ContainerProperties; import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer; import org.springframework.kafka.support.serializer.JsonDeserializer; import java.util.HashMap; import java.util.Map; @Configuration public class KafkaConsumerConfig { @Bean public ConsumerFactory<String, Object> consumerFactory() { Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "your-kafka-server:9092"); props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group-id"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); // 如果消费 JSON 对象 return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new ErrorHandlingDeserializer<>(new JsonDeserializer<>(Object.class))); } @Bean public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); return factory; } }
在上述配置中,consumerFactory
方法设置了 Kafka 消费者的配置,包括反序列化器。kafkaListenerContainerFactory
方法创建了一个 ConcurrentKafkaListenerContainerFactory
用于监听 Kafka 消息。
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Service
public class KafkaConsumerService {
@KafkaListener(topics = "your-topic-name", groupId = "my-group-id")
public void listen(Object message) {
System.out.println("Received Message: " + message.toString());
// 在这里处理接收到的消息逻辑
}
}
在 KafkaConsumerService
中,通过 @KafkaListener
注解监听指定的 TOPIC
,并处理接收到的消息。
现在,你可以在你的 Spring Boot 应用程序中通过 KafkaProducerService
发送消息,通过 KafkaConsumerService
接收和处理消息了。确保在启动应用程序时 Kafka 已经正常运行,并且配置了正确的连接信息和主题名称。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。