当前位置:   article > 正文

Springboot中Kafka的构建_springboot kafka 创建topic

springboot kafka 创建topic

SpringBoot中Kafka的构建

首先定义一个常量类Const

public interface Const {

    /**
     * kafka topic1.
     */
    String KAFKA_SINGLE_MAIN_TOPIC = "service_main_topic";
    /**
     * kafka topic2.
     */
    String KAFKA_SINGLE_DETAIL_TOPIC = "service_detail_topic";
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

这里定义了我们项目中需要的topic

创建一个Config类,把topic注入到spring中,spring去kafka中提前创建好topic

@Configuration
public class KafkaInitialConfig {

    /***
     * 创建TopicName为service_main_topic的Topic并设置分区数为8以及副本数为1
     * 通过bean创建(bean的名字为initialTopic1)
     * @return
     */
    @Bean
    public NewTopic initialTopic1() {
        return new NewTopic(Const.KAFKA_SINGLE_MAIN_TOPIC, 8, (short) 1);
    }

    /***
     * 创建TopicName为service_detail_topic的Topic并设置分区数为10以及副本数为1
     * 通过bean创建(bean的名字为initialTopic2)
     * @return
     */
    @Bean
    public NewTopic initialTopic2() {
        return new NewTopic(Const.KAFKA_SINGLE_DETAIL_TOPIC, 10, (short) 1);
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

创建一个通用发送消息的Config

import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

@Slf4j
@Component
public class KafkaConfig {

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * 发送消息到kafka
     */
    public void sendMessage(String topic, String message) {
        log.info("kafka发送消息,topic =[{}],message=[{}]", topic, message);
        kafkaTemplate.send(topic, message);
    }

    /**
     * 发送消息到kafka
     * 分区发送
     */
    public void sendMessage(String topic, String partionKey, String message) {
        log.info("kafka发送消息,topic =[{}],partionKey=[{}],message=[{}]", topic, partionKey, message);
        kafkaTemplate.send(topic, partionKey, message);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30

然后建立了一个测试的controller,测试通不通

@RestController
@RequestMapping("test")
public class TestController {

    @Autowired
    KafkaConfig kafkaConfig;

    @RequestMapping(value = "test1", method = RequestMethod.GET)
    public void test1() {
        kafkaConfig.sendMessage(Const.KAFKA_SINGLE_MAIN_TOPIC, "测试Kafka1");
    }

    @RequestMapping(value = "test2", method = RequestMethod.GET)
    public void test2() {
        kafkaConfig.sendMessage(Const.KAFKA_SINGLE_DETAIL_TOPIC, "123", "测试Kafka2");
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

下面是发送和接收结果:
在这里插入图片描述
在这里插入图片描述
正常!结束。
还有配置文件配置放在这里

spring:
  kafka:
    bootstrap-servers: 127.0.0.1:9092
    listener:
      concurrency: 4
      missing-topics-fatal: false
    template:
      default-topic: service-main-topic
    producer:
      # 消息重发的次数
      retries: 3
      # 一次处理消息消息的大小batch,一批消息最大多大时发送
      batch-size: 16384
      # 生产者最大可发送的消息大小,内有多个batch,一旦满了,只有发送到kafka后才能空出位置,否则阻塞接受新消息
      buffer-memory: 524288
      # 键的序列化方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # 值的序列化方式
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家自动化/article/detail/731344
推荐阅读
相关标签
  

闽ICP备14008679号