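I. Introduction to Kafka
What is Kafka?
Kafka is a distributed stream-processing platform, originally developed at LinkedIn and open-sourced in 2011. It is used to build real-time data pipelines and streaming applications that process and analyze data streams.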
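Core Kafka concepts
Producer: sends messages to Kafka topics.
Consumer: reads messages from Kafka topics.
Broker: a Kafka server that receives and stores messages.
Topic: a logical category to which messages are published.
Partition: a physical subdivision of a topic; spreading a topic across partitions allows it to be processed in parallel.
Offset: the unique, sequentially increasing identifier of each message within its partition.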
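To make the relationship between topics, partitions, and offsets concrete, here is a minimal in-memory sketch in plain Java. It is a toy model, not the Kafka client API: the class ToyTopic and its methods are hypothetical, and String.hashCode() stands in for the murmur2 hash that Kafka's default partitioner applies to the serialized key.

```java
import java.util.ArrayList;
import java.util.List;

// Toy in-memory model of a single topic; NOT the Kafka client API.
final class ToyTopic {
    // Each partition is an append-only list; the list index plays the role of the offset.
    private final List<List<String>> partitions = new ArrayList<>();

    ToyTopic(int partitionCount) {
        for (int i = 0; i < partitionCount; i++) {
            partitions.add(new ArrayList<>());
        }
    }

    // Chooses a partition from the key's hash (hashCode is a stand-in for Kafka's murmur2).
    int partitionFor(String key) {
        return Math.floorMod(key.hashCode(), partitions.size());
    }

    // Appends a record and returns its offset, i.e. its position in that partition's log.
    long append(String key, String value) {
        List<String> log = partitions.get(partitionFor(key));
        log.add(value);
        return log.size() - 1;
    }

    // Reads the record stored at the given offset of the given partition.
    String read(int partition, long offset) {
        return partitions.get(partition).get((int) offset);
    }

    public static void main(String[] args) {
        ToyTopic topic = new ToyTopic(3);
        long first = topic.append("user-1", "login");
        long second = topic.append("user-1", "logout");
        int partition = topic.partitionFor("user-1");
        System.out.println("offsets: " + first + ", " + second);
        System.out.println("record at offset " + second + ": " + topic.read(partition, second));
    }
}
```

Records with the same key land in the same partition, and offsets count up independently inside each partition, so the two appends above receive offsets 0 and 1.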
II. Integrating Kafka with Spring Boot 2.7.0
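<dependencies>
    <!-- spring-boot-starter-web (rather than the bare spring-boot-starter) supplies the @RestController used by the producer example -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
</dependencies>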
spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      group-id: default-group # default consumer group
    consumer-group-1:
      group-id: group1
      topic: topic1
    consumer-group-2:
      group-id: group2
      topic: topic2
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    template:
      default-topic: my-topic
Kafka configuration class
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import java.util.HashMap;
import java.util.Map;

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.consumer.key-deserializer}")
    private String keyDeserializer;

    @Value("${spring.kafka.consumer.value-deserializer}")
    private String valueDeserializer;

    // Settings shared by every consumer group; the group id is added per factory.
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return props;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    // Listener container factory for the first consumer group.
    @Bean(name = "kafkaListenerContainerFactoryGroup1")
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactoryGroup1(
            @Value("${spring.kafka.consumer-group-1.group-id}") String groupId) {
        return createFactory(groupId);
    }

    // Listener container factory for the second consumer group.
    @Bean(name = "kafkaListenerContainerFactoryGroup2")
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactoryGroup2(
            @Value("${spring.kafka.consumer-group-2.group-id}") String groupId) {
        return createFactory(groupId);
    }

    // Copies the shared settings and overrides only group.id.
    private ConcurrentKafkaListenerContainerFactory<String, String> createFactory(String groupId) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        Map<String, Object> props = new HashMap<>(consumerConfigs());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(props));
        return factory;
    }
}
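auto.offset.reset options
earliest: when the consumer group has no committed offset, or the current offset is out of range, the consumer starts from the earliest available record. This is typically used so that a new consumer group reads all retained history from the beginning.
latest: when the consumer group has no committed offset, or the current offset is out of range, the consumer starts from the end of the log and reads only records produced from then on. This is the Kafka default and suits a new consumer group that only cares about new data.
none: the consumer throws an exception if no committed offset is found for the group or the offset is out of range. The group must therefore already have a valid offset.
anything else: any other value is invalid and causes the consumer to throw an exception.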
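The decision described by these options can be sketched as a small plain-Java function. This is a toy model under stated assumptions, not Kafka client code: the class OffsetResetSketch, the method startPosition, and its parameters are hypothetical, and the exception types are stand-ins for the ones the real client raises.

```java
import java.util.OptionalLong;

// Toy model of how a consumer picks its starting position; NOT Kafka client code.
final class OffsetResetSketch {

    // committed: the group's committed offset for the partition, if there is one.
    // logStart:  the earliest offset still available in the partition.
    // logEnd:    the offset the next produced record will receive.
    static long startPosition(OptionalLong committed, long logStart, long logEnd, String policy) {
        if (committed.isPresent()
                && committed.getAsLong() >= logStart
                && committed.getAsLong() <= logEnd) {
            // A valid committed offset always wins; the policy is not consulted.
            return committed.getAsLong();
        }
        switch (policy) {
            case "earliest":
                return logStart;
            case "latest":
                return logEnd;
            case "none":
                throw new IllegalStateException("no valid offset and auto.offset.reset=none");
            default:
                throw new IllegalArgumentException("invalid auto.offset.reset value: " + policy);
        }
    }

    public static void main(String[] args) {
        System.out.println(startPosition(OptionalLong.empty(), 5, 20, "earliest")); // prints 5
        System.out.println(startPosition(OptionalLong.empty(), 5, 20, "latest"));   // prints 20
        System.out.println(startPosition(OptionalLong.of(12), 5, 20, "latest"));    // prints 12
    }
}
```

The third call shows the point that is easiest to miss: once a group has a valid committed offset, auto.offset.reset has no effect, so changing it to earliest will not replay history for an existing group.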
Kafka consumer service
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumerService {

    // Consumes the group-1 topic through the group-1 container factory.
    @KafkaListener(topics = "${spring.kafka.consumer-group-1.topic}",
            containerFactory = "kafkaListenerContainerFactoryGroup1")
    public void listenGroup1(String message) {
        System.out.println("Received message in group1: " + message);
    }

    // Consumes the group-2 topic through the group-2 container factory.
    @KafkaListener(topics = "${spring.kafka.consumer-group-2.topic}",
            containerFactory = "kafkaListenerContainerFactoryGroup2")
    public void listenGroup2(String message) {
        System.out.println("Received message in group2: " + message);
    }
}
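The two listeners differ mainly in their group id, so it helps to see what a group id changes. The plain-Java sketch below is a toy model of one partition, not the Kafka client API; the class GroupOffsetsSketch and its methods are hypothetical. It assumes both groups read the same partition (in the configuration above each group reads its own topic) and that a group without a stored offset starts at 0, which mirrors the earliest setting.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of per-group offsets on a single partition; NOT the Kafka client API.
final class GroupOffsetsSketch {
    private final List<String> log = new ArrayList<>();
    // Next offset to read, tracked separately for every consumer group.
    private final Map<String, Integer> nextOffsetByGroup = new HashMap<>();

    void append(String value) {
        log.add(value);
    }

    // Returns the records this group has not seen yet and advances only this group's offset.
    List<String> poll(String groupId) {
        int from = nextOffsetByGroup.getOrDefault(groupId, 0);
        List<String> batch = new ArrayList<>(log.subList(from, log.size()));
        nextOffsetByGroup.put(groupId, log.size());
        return batch;
    }

    public static void main(String[] args) {
        GroupOffsetsSketch partition = new GroupOffsetsSketch();
        partition.append("a");
        partition.append("b");
        System.out.println(partition.poll("group1")); // prints [a, b]
        System.out.println(partition.poll("group2")); // prints [a, b]
        partition.append("c");
        System.out.println(partition.poll("group1")); // prints [c]
    }
}
```

Each group keeps its own position, so every group receives every record once, while consumers inside the same group would share the records between them.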
Creating the Kafka producer
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class KafkaProducerController {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    // GET /send?message=... publishes the message to the "my-topic" topic.
    @GetMapping("/send")
    public String sendMessage(@RequestParam("message") String message) {
        kafkaTemplate.send("my-topic", message);
        return "Message sent to Kafka topic";
    }
}