赞
踩
<!-- spring支持的kafka版本是3.1.2 -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
可以看到自动添加了spring-tx依赖、kafka-clients依赖
查看spring-boot-autoconfigure-2.7.5.jar的KafkaAutoConfiguration,如下所示,可以看到:
package org.springframework.boot.autoconfigure.kafka; ......省略部分...... @AutoConfiguration @ConditionalOnClass({KafkaTemplate.class}) @EnableConfigurationProperties({KafkaProperties.class}) // 虽然导入了KafkaAnnotationDrivenConfiguration,但是要添加@EnableKafka注解生效 @Import({KafkaAnnotationDrivenConfiguration.class, KafkaStreamsAnnotationDrivenConfiguration.class}) public class KafkaAutoConfiguration { ......省略部分...... @Bean @ConditionalOnMissingBean({KafkaTemplate.class}) public KafkaTemplate<?, ?> kafkaTemplate(ProducerFactory<Object, Object> kafkaProducerFactory, ProducerListener<Object, Object> kafkaProducerListener, ObjectProvider<RecordMessageConverter> messageConverter) { ......省略部分...... } ......省略部分...... @Bean @ConditionalOnMissingBean({ConsumerFactory.class}) public DefaultKafkaConsumerFactory<?, ?> kafkaConsumerFactory(ObjectProvider<DefaultKafkaConsumerFactoryCustomizer> customizers) { ......省略部分...... } @Bean @ConditionalOnMissingBean({ProducerFactory.class}) public DefaultKafkaProducerFactory<?, ?> kafkaProducerFactory(ObjectProvider<DefaultKafkaProducerFactoryCustomizer> customizers) { ......省略部分...... } ......省略部分...... @Bean @ConditionalOnMissingBean public KafkaAdmin kafkaAdmin() { ......省略部分...... } ......省略部分...... }
查看KafkaProperties配置类,是由spring.kafka开头的配置进行配置的
......省略部分......
@ConfigurationProperties(
prefix = "spring.kafka"
)
public class KafkaProperties {
......省略部分......
}
配置文件如下。主要配置了bootstrap-server地址、producer的相关配置、consumer的相关配置
spring.application.name=springboot-kafka-test spring.kafka.bootstrap-servers=192.168.28.12:9092 # producer的配置 spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.IntegerSerializer spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer # 单位为字节B,这里设置的16KB spring.kafka.producer.batch-size=16384 # 所有分区缓冲区的大小,这里设置的32MB spring.kafka.producer.buffer-memory=33554432 # consumer的配置 spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.IntegerDeserializer spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.group-id=springboot-kafka-test # 如果在kafka中找不到当前消费者的offset,则直接将offset重置为最新的 spring.kafka.consumer.auto-offset-reset=latest # 自动提交consumer的offset spring.kafka.consumer.enable-auto-commit=true # 自动提交consumer的offset的时间间隔,单位为毫秒 spring.kafka.consumer.auto-commit-interval=1000 # kafka事务的配置 spring.kafka.producer.transaction-id-prefix=kafka_tx.
说明如下::
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
参数配置使用json序列化器package com.hh.springboottest.myController; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.producer.RecordMetadata; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.transaction.annotation.Transactional; import org.springframework.util.concurrent.ListenableFuture; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.util.concurrent.ExecutionException; @Slf4j @RestController public class KafkaSyncProducerController { private String kafkaTopic = "springboot-kafka-test"; @Autowired private KafkaTemplate<Integer, String> kafkaTemplate; @RequestMapping("/send/sync/{message}") public String syncSend(@PathVariable String message) { ListenableFuture<SendResult<Integer, String>> future = // 向partition = 0, key = 0发送消息 kafkaTemplate.send(kafkaTopic, 0, 0, message); try { // 同步方式获取发送结果 SendResult<Integer, String> sendResult = future.get(); RecordMetadata recordMetadata = sendResult.getRecordMetadata(); log.info("topic: {}, partition: {}, offset: {}", recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset()); } catch (InterruptedException e) { e.printStackTrace(); return "消息发送失败"; } catch (ExecutionException e) { e.printStackTrace(); return "消息发送失败"; } return "消息发送成功"; } // 通过代码的方式实现事务 @RequestMapping("/send/transaction1/{message}") public String transactionSend1(@PathVariable String message) { kafkaTemplate.executeInTransaction(template -> { template.send(kafkaTopic, 0, 0, message); if (message.equals("error")) { throw new RuntimeException("message is error"); } template.send(kafkaTopic, 0, 0, message + "-salve"); return true; }); return "消息发送成功"; } // 通过注解的方式实现事务, 主要不要用自定义的KafkaTemplate, 否则会报错 @GetMapping("/send/transaction2/{message}") @Transactional(rollbackFor = RuntimeException.class) public String transactionSend2(@PathVariable String message) { kafkaTemplate.send(kafkaTopic, 0, 0, message); if (message.equals("error")) { throw new RuntimeException("message is error"); } kafkaTemplate.send(kafkaTopic, 0, 0, message + "-slave"); return "消息发送成功"; } }
CompletableFuture.allOf(CompletableFutures).join()
阻塞等待所以子线程执行完成package com.hh.springboottest.myController; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.producer.RecordMetadata; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.util.concurrent.ListenableFuture; import org.springframework.util.concurrent.ListenableFutureCallback; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; @Slf4j @RestController public class KafkaAsyncProducerController { private String kafkaTopic = "springboot-kafka-test"; @Autowired private KafkaTemplate<Integer, String> kafkaTemplate; @RequestMapping("/send/async/{message}") public String asyncSend(@PathVariable String message) { ListenableFuture<SendResult<Integer, String>> future = // 向partition = 0, key = 0发送消息 kafkaTemplate.send(kafkaTopic, 0, 0, message); // 异步方式获取发送结果 future.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() { @Override public void onFailure(Throwable throwable) { log.error("消息发送失败: {}", throwable.getMessage()); } @Override public void onSuccess(SendResult<Integer, String> sendResult) { RecordMetadata recordMetadata = sendResult.getRecordMetadata(); log.info("消息发送成功, topic: {}, partition: {}, offset: {}", recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset()); } }); return "消息发送成功"; } }
启动SpringBoot后,会自动对Topic进行监听,有消息就会以log日志的形式打印出来
package com.hh.springboottest.component; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; @Slf4j @Component public class KafkaConsumerComponent { final private String kafkaTopic = "springboot-kafka-test"; // 添加了@KafkaListener注解,consumeKafkaMessage方法就可以获得很多参数 @KafkaListener(id = "kafkaListenerTest1", topics = kafkaTopic) public void consumeKafkaMessage(ConsumerRecord<Integer, String> consumerRecord) { log.info("topic: {}, partition: {}, offset: {}, key: {}, value: {}", consumerRecord.topic(), consumerRecord.partition(), consumerRecord.offset(), consumerRecord.key(), consumerRecord.value()); } }
也可以使用如下方式,指定每个topic消费的每个partition的offset
@KafkaListener(groupId = "kafkaListenerTest1",
topicPartitions = {
@TopicPartition(topic = kafkaTopic, partitionOffsets = {
@PartitionOffset(partition = "0", initialOffset = "0")
})
})
启动SpringBoot应用,就会在kafka中创建topic
package com.hh.springboottest.config; import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.producer.ProducerConfig; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.core.KafkaAdmin; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; import java.util.HashMap; import java.util.Map; @Configuration public class KafkaConfig { // 将NewTopic添加到IOC容器时,会自动创建kafka的topic @Bean public NewTopic createTopic1() { // 创建一个name为springboot-create-topic1, 1个partition, replication为1 return new NewTopic("springboot-create-topic1", 1, (short) 1); } @Bean public NewTopic createTopic2() { // 创建一个name为springboot-create-topic2, 1个partition, replication为1 return new NewTopic("springboot-create-topic2", 1, (short) 1); } // 实现自定义的KafkaAdmin,不建议这样做 @Bean public KafkaAdmin generateKafkaAdmin() { Map<String, Object> configs = new HashMap<>(); configs.put("bootstrap.servers", "192.168.28.12:9092"); KafkaAdmin kafkaAdmin = new KafkaAdmin(configs); return kafkaAdmin; } // 实现自定义的KafkaTemplate,不建议这样做 @Bean @Autowired // 从IOC容器自动获取ProducerFactory public KafkaTemplate<Integer, String> generateKafkaTemplate(ProducerFactory<Integer, String> producerFactory) { // 覆盖ProducerFactory的原有设置 Map<String, Object> configsOverride = new HashMap<>(); configsOverride.put(ProducerConfig.BATCH_SIZE_CONFIG, 200); KafkaTemplate<Integer, String> kafkaTemplate = new KafkaTemplate<Integer, String>(producerFactory, configsOverride); return kafkaTemplate; } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。