Integrating Kafka with Spring Boot (auto-configuration analysis, synchronous and asynchronous producers, consumer, custom KafkaAdmin and KafkaTemplate)

1. Add the dependency to pom.xml

        <!-- the Kafka version supported by Spring here is 3.1.2 -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

You can see that the spring-tx and kafka-clients dependencies are pulled in automatically, and that the version is managed by Spring Boot, so no explicit version tag is needed.

2. Auto-configuration analysis

Looking at KafkaAutoConfiguration in spring-boot-autoconfigure-2.7.5.jar, shown below, you can see that it:

  • binds the KafkaProperties configuration class
  • registers KafkaTemplate, DefaultKafkaConsumerFactory, DefaultKafkaProducerFactory, and KafkaAdmin beans in the IoC container
package org.springframework.boot.autoconfigure.kafka;
// ... parts omitted ...
@AutoConfiguration
@ConditionalOnClass({KafkaTemplate.class})
@EnableConfigurationProperties({KafkaProperties.class})
// KafkaAnnotationDrivenConfiguration is imported here; it contains a nested @EnableKafka
// configuration, so @KafkaListener works without adding @EnableKafka yourself
@Import({KafkaAnnotationDrivenConfiguration.class, KafkaStreamsAnnotationDrivenConfiguration.class})
public class KafkaAutoConfiguration {
// ... parts omitted ...
    @Bean
    @ConditionalOnMissingBean({KafkaTemplate.class})
    public KafkaTemplate<?, ?> kafkaTemplate(ProducerFactory<Object, Object> kafkaProducerFactory, ProducerListener<Object, Object> kafkaProducerListener, ObjectProvider<RecordMessageConverter> messageConverter) {
// ... parts omitted ...
    }
// ... parts omitted ...
    @Bean
    @ConditionalOnMissingBean({ConsumerFactory.class})
    public DefaultKafkaConsumerFactory<?, ?> kafkaConsumerFactory(ObjectProvider<DefaultKafkaConsumerFactoryCustomizer> customizers) {
// ... parts omitted ...
    }

    @Bean
    @ConditionalOnMissingBean({ProducerFactory.class})
    public DefaultKafkaProducerFactory<?, ?> kafkaProducerFactory(ObjectProvider<DefaultKafkaProducerFactoryCustomizer> customizers) {
// ... parts omitted ...
    }
// ... parts omitted ...
    @Bean
    @ConditionalOnMissingBean
    public KafkaAdmin kafkaAdmin() {
// ... parts omitted ...
    }
// ... parts omitted ...
}

Looking at the KafkaProperties class, you can see it is bound to configuration properties prefixed with spring.kafka:

// ... parts omitted ...
@ConfigurationProperties(
    prefix = "spring.kafka"
)
public class KafkaProperties {
// ... parts omitted ...
}
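If you want to see what these properties actually resolve to at runtime, KafkaProperties can be injected like any other bean. Here is a minimal sketch (the class name is mine, not from the original project):

package com.hh.springboottest.config;

import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.stereotype.Component;

@Component
public class KafkaPropertiesInspector {

    private final KafkaProperties kafkaProperties;

    public KafkaPropertiesInspector(KafkaProperties kafkaProperties) {
        this.kafkaProperties = kafkaProperties;
    }

    public void printResolvedConfig() {
        // buildProducerProperties()/buildConsumerProperties() merge the spring.kafka.*
        // settings into the raw kafka-clients config maps
        System.out.println(kafkaProperties.buildProducerProperties());
        System.out.println(kafkaProperties.buildConsumerProperties());
    }
}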

3. application.properties configuration

The configuration file is shown below. It mainly configures the bootstrap-servers address, the producer settings, and the consumer settings. Note that once spring.kafka.producer.transaction-id-prefix is set, the producer factory becomes transactional, which is what the transaction endpoints in section 4.1 rely on.

spring.application.name=springboot-kafka-test

spring.kafka.bootstrap-servers=192.168.28.12:9092

# producer settings
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.IntegerSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# in bytes; 16 KB here
spring.kafka.producer.batch-size=16384
# total buffer size across all partitions; 32 MB here
spring.kafka.producer.buffer-memory=33554432


# consumer settings
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.IntegerDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.group-id=springboot-kafka-test
# if no committed offset is found in Kafka for this consumer group, reset the offset to the latest
spring.kafka.consumer.auto-offset-reset=latest
# automatically commit the consumer's offsets
spring.kafka.consumer.enable-auto-commit=true
# interval between automatic offset commits, in milliseconds
spring.kafka.consumer.auto-commit-interval=1000

# Kafka transaction configuration
spring.kafka.producer.transaction-id-prefix=kafka_tx.
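For reference, each spring.kafka.producer.* entry above maps to a kafka-clients ProducerConfig key. The sketch below (class and method names are mine, for illustration only) shows roughly the raw client config Spring Boot builds from this file:

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProducerConfigMapping {

    // roughly the raw client config that spring.kafka.producer.* above resolves to
    public static Map<String, Object> rawProducerConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.28.12:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);        // batch-size: 16 KB
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432L); // buffer-memory: 32 MB
        return props;
    }
}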

4. Synchronous and asynchronous producer implementations

4.1 Synchronous producer

Notes:

  • Transactions can be implemented either in code or with the @Transactional annotation. If an exception is thrown, the whole transaction is rolled back
  • If the value being sent is a class object, configure spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer to use the JSON serializer (see the sketch after the controller code below)
package com.hh.springboottest.myController;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.concurrent.ExecutionException;

@Slf4j
@RestController
public class KafkaSyncProducerController {

    private String kafkaTopic = "springboot-kafka-test";

    @Autowired
    private KafkaTemplate<Integer, String> kafkaTemplate;

    @RequestMapping("/send/sync/{message}")
    public String syncSend(@PathVariable String message) {
        ListenableFuture<SendResult<Integer, String>> future =
                // send the message to partition 0 with key 0
                kafkaTemplate.send(kafkaTopic, 0, 0, message);

        try {
            // get the send result synchronously
            SendResult<Integer, String> sendResult = future.get();
            RecordMetadata recordMetadata = sendResult.getRecordMetadata();
            log.info("topic: {}, partition: {}, offset: {}",
                    recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset());

        } catch (InterruptedException e) {
            e.printStackTrace();
            return "message sending failed";
        } catch (ExecutionException e) {
            e.printStackTrace();
            return "message sending failed";
        }

        return "message sent successfully";
    }

    // implement the transaction in code
    @RequestMapping("/send/transaction1/{message}")
    public String transactionSend1(@PathVariable String message) {
        kafkaTemplate.executeInTransaction(template -> {
            template.send(kafkaTopic, 0, 0, message);
            if (message.equals("error")) {
                throw new RuntimeException("message is error");
            }
            template.send(kafkaTopic, 0, 0, message + "-slave");

            return true;
        });

        return "message sent successfully";
    }


    // implement the transaction with the @Transactional annotation; note: do not
    // use a custom KafkaTemplate here, otherwise an error will be thrown
    @GetMapping("/send/transaction2/{message}")
    @Transactional(rollbackFor = RuntimeException.class)
    public String transactionSend2(@PathVariable String message) {
        kafkaTemplate.send(kafkaTopic, 0, 0, message);
        if (message.equals("error")) {
            throw new RuntimeException("message is error");
        }
        kafkaTemplate.send(kafkaTopic, 0, 0, message + "-slave");

        return "message sent successfully";
    }

}
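For the JSON bullet above, here is a minimal sketch. It assumes the value serializer has been switched to JsonSerializer in application.properties; MyEvent and the service class are illustrative names, not part of the original project.

package com.hh.springboottest.myController;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

// assumes: spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
@Service
public class JsonProducerExample {

    // any POJO that Jackson can serialize works as the value type
    public static class MyEvent {
        public int id;
        public String name;
    }

    @Autowired
    private KafkaTemplate<Integer, MyEvent> kafkaTemplate;

    public void sendEvent() {
        MyEvent event = new MyEvent();
        event.id = 1;
        event.name = "demo";
        // the JsonSerializer writes the POJO as JSON bytes on the wire
        kafkaTemplate.send("springboot-kafka-test", 0, 0, event);
    }
}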

4.2 Asynchronous producer

  • If you send many messages asynchronously, the main thread can use CompletableFuture.allOf(futures).join() to block until all of the sends have completed (a sketch follows the controller code below)
package com.hh.springboottest.myController;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@Slf4j
@RestController
public class KafkaAsyncProducerController {

    private String kafkaTopic = "springboot-kafka-test";

    @Autowired
    private KafkaTemplate<Integer, String> kafkaTemplate;

    @RequestMapping("/send/async/{message}")
    public String asyncSend(@PathVariable String message) {
        ListenableFuture<SendResult<Integer, String>> future =
                // send the message to partition 0 with key 0
                kafkaTemplate.send(kafkaTopic, 0, 0, message);

        // get the send result asynchronously
        future.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {
            @Override
            public void onFailure(Throwable throwable) {
                log.error("message sending failed: {}", throwable.getMessage());
            }

            @Override
            public void onSuccess(SendResult<Integer, String> sendResult) {
                RecordMetadata recordMetadata = sendResult.getRecordMetadata();
                log.info("message sent successfully, topic: {}, partition: {}, offset: {}",
                        recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset());
            }
        });

        return "message sent successfully";

    }

}
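And here is a minimal sketch of the CompletableFuture.allOf idea from the bullet above. It is meant to live inside the controller, reusing its kafkaTemplate and kafkaTopic fields; the method itself is illustrative, not from the original project.

    // send a batch asynchronously, then block once until every send completes;
    // ListenableFuture.completable() adapts each send future to a CompletableFuture
    public void sendBatch(java.util.List<String> messages) {
        java.util.concurrent.CompletableFuture<?>[] futures = messages.stream()
                .map(m -> kafkaTemplate.send(kafkaTopic, 0, 0, m).completable())
                .toArray(java.util.concurrent.CompletableFuture[]::new);
        // join() rethrows any send failure as an unchecked CompletionException
        java.util.concurrent.CompletableFuture.allOf(futures).join();
    }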

5. Consumer implementation

After the Spring Boot application starts, it automatically listens on the topic and logs each message it receives

package com.hh.springboottest.component;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class KafkaConsumerComponent {
    final private String kafkaTopic = "springboot-kafka-test";

    // with the @KafkaListener annotation, the consumeKafkaMessage method can accept many kinds of parameters
    @KafkaListener(id = "kafkaListenerTest1", topics = kafkaTopic)
    public void consumeKafkaMessage(ConsumerRecord<Integer, String> consumerRecord) {
        log.info("topic: {}, partition: {}, offset: {}, key: {}, value: {}",
                consumerRecord.topic(), consumerRecord.partition(), consumerRecord.offset(),
                consumerRecord.key(), consumerRecord.value());
    }

}
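As the comment above notes, a @KafkaListener method is not limited to a raw ConsumerRecord. A minimal sketch (a second, illustrative listener inside the same component) that picks the payload and individual headers apart, using the constant names from spring-kafka 2.8:

// additional imports needed:
// import org.springframework.kafka.support.KafkaHeaders;
// import org.springframework.messaging.handler.annotation.Header;
// import org.springframework.messaging.handler.annotation.Payload;

    @KafkaListener(id = "kafkaListenerTest2", topics = kafkaTopic)
    public void consumeWithHeaders(@Payload String value,
                                   @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) Integer key,
                                   @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                                   @Header(KafkaHeaders.OFFSET) long offset) {
        log.info("partition: {}, offset: {}, key: {}, value: {}", partition, offset, key, value);
    }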

You can also use the following form to specify, for each topic, the starting offset of each consumed partition:

    // @TopicPartition and @PartitionOffset come from org.springframework.kafka.annotation
    @KafkaListener(groupId = "kafkaListenerTest1",
            topicPartitions = {
                    @TopicPartition(topic = kafkaTopic, partitionOffsets = {
                            @PartitionOffset(partition = "0", initialOffset = "0")
                    })
            })
    public void consumeFromOffset(ConsumerRecord<Integer, String> consumerRecord) {
        // handle the record as in the listener above
    }

6. Creating Kafka topics from Spring Boot, and custom KafkaAdmin and KafkaTemplate beans

When the Spring Boot application starts, the topics below are created in Kafka

package com.hh.springboottest.config;

import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaConfig {

    // adding a NewTopic bean to the IoC container makes KafkaAdmin create the topic automatically
    @Bean
    public NewTopic createTopic1() {
        // a topic named springboot-create-topic1 with 1 partition and a replication factor of 1
        return new NewTopic("springboot-create-topic1", 1, (short) 1);
    }

    @Bean
    public NewTopic createTopic2() {
        // a topic named springboot-create-topic2 with 1 partition and a replication factor of 1
        return new NewTopic("springboot-create-topic2", 1, (short) 1);
    }

    // a custom KafkaAdmin; doing this is not recommended
    @Bean
    public KafkaAdmin generateKafkaAdmin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers", "192.168.28.12:9092");

        KafkaAdmin kafkaAdmin = new KafkaAdmin(configs);
        return kafkaAdmin;
    }

    // a custom KafkaTemplate; doing this is not recommended (and it breaks the
    // @Transactional example in section 4.1)
    @Bean
    @Autowired  // fetch the ProducerFactory from the IoC container
    public KafkaTemplate<Integer, String> generateKafkaTemplate(ProducerFactory<Integer, String> producerFactory) {

        // override some of the ProducerFactory's original settings
        Map<String, Object> configsOverride = new HashMap<>();
        configsOverride.put(ProducerConfig.BATCH_SIZE_CONFIG, 200);

        KafkaTemplate<Integer, String> kafkaTemplate =
                new KafkaTemplate<Integer, String>(producerFactory, configsOverride);
        return kafkaTemplate;
    }
}
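As an aside, spring-kafka also provides a fluent builder, org.springframework.kafka.config.TopicBuilder, for defining NewTopic beans. A minimal sketch of an equivalent bean (the method and topic names here are mine, for illustration):

    // equivalent to the constructor form above, using the fluent builder
    // (import org.springframework.kafka.config.TopicBuilder)
    @Bean
    public NewTopic createTopic3() {
        return TopicBuilder.name("springboot-create-topic3")
                .partitions(1)
                .replicas(1)
                .build();
    }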