当前位置:   article > 正文

SpringBoot整合Kafka,以及无框架下的Kafka使用(简单易懂)_springboot kafka配置

springboot kafka配置

前言:本章分为KafKa重点概述、Java下的KafKa、SpringBoot整合KafKa三大块。

           后期还会加内容。

一、KafKa重点概述

1.Kafka分区--将主题topic划分为多个小块

轮询策略:就是按照顺序轮流分配到每个分区中

随机策略:每次随机进行分配

按key保存策略:指定key,计算这个key的hash值,按照hash值对不同信息进行分配

2.备份机制--保障数据的高可用、高一致性

其中在追随者中副本中又分为ISR和普通副本

小结:因为ISR为同步,普通为异步。

           所以在数据高一致性上选择ISR,在高可用上选择普通。

3.生产者详情

在配置中有几个重要常用的配置。

--发送消息时可指定同步和异步  推荐异步

--重传机制 

--ack确认机制

--压缩发送

4.消费者详情

--消息有序性

--提交偏移量

5.Maven坐标

<!-- kafkfa -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <exclusions>
        <exclusion>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
</dependency>

二、无框架下使用KafKa

在配置KafKa中,在ProducerConfig中有很多静态常量,根据自己的要求,可以对Properties进行设置。

1.生产者类

这里介绍最简单的发送消息设置,总共四个步骤

1.配置Kafka信息

2.创建生产者对象并封装要发送的数据

3.发送消息

4.关闭生产通道

  1. // 1.kafka的配置信息
  2. Properties properties = new Properties();
  3. //kafka的连接地址
  4. properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"替换成你kafka的ip地址:9092");
  5. //消息key的序列化器
  6. properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
  7. //消息value的序列化器
  8. properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
  9. // 2.创建kafka生产者对象
  10. KafkaProducer<String,String> kafkaProducer = new KafkaProducer(properties);
  11. // 封装发送的消息
  12. ProducerRecord<String, String> record = new ProducerRecord<>("dalang-topic", "10001", "hello kangan");
  13. // 3. 发送消息
  14. kafkaProducer.send(record);
  15. // 4.关闭消息通道
  16. kafkaProducer.close();

一些比较常用的生产者配置

-同步和异步发送消息         在send加个参数new rollback(){}

-重传机制                           常量为:ProducerConfig.RETRIES_CONFIG

-ack确认机制                     常量为:ProducerConfig.ACKS_CONFIG

-压缩发送                           常量为:ProducerConfig.COMPRESSION_TYPE_CONFIG

2.消费者类

这里介绍最简单的消费消息设置,总共四个步骤

1.配置KafKa

2.创建生产者对象

3.订阅要拉取的主题

4.设置监听并消费

  1. // 1. 配置kafka
  2. Properties properties = new Properties();
  3. properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "替换成你Kafka所在的ip地址:9092");
  4. // 2.生产者对象
  5. KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);
  6. // 3.订阅主题
  7. kafkaConsumer.subscribe(Collections.singletonList("dalang-topic"));
  8. //当前线程一直处于监听状态
  9. while (true) {
  10. //4.获取消息
  11. ConsumerRecords<String, String> consumerRecords=consumer.poll(Duration.ofMillis(1000));
  12. for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
  13. System.out.println(consumerRecord.key());
  14. System.out.println(consumerRecord.value());
  15. }
  16. }

-提交偏移量 指定常量ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG设置为false

同步提交             消费者对象.commitSync.();    

异步提交             消费者对象.commitAsync.(new OffsetCommitCallback() {});    

同步异步提交        结合同步和异步

三、SpringBoot整合Kafka

1.某个微服务模块生产者:

配置文件加入以下内容:

spring:

       kafka:

                bootstrap-servers: 填写你的KafKa的Ip地址:9092

                producer:

                        retries: 10

                        key-serializer: org.apache.kafka.common.serialization.StringSerializer

                        value-serializer: org.apache.kafka.common.serialization.StringSerializer

1.1自动注入 KafkaTemplate

1.2发送消息 send();

这里也是举简单例子,测试实体类为User

  1. @Data
  2. @ToString
  3. public class User {
  4. private String userName;
  5. private Integer age;
  6. }
  1. @RestController
  2. public class HelloKangAnController {
  3. @Autowired
  4. private KafkaTemplate<String,String> kafkaTemplate;
  5. /**
  6. * 发送消息
  7. * @return
  8. */
  9. @GetMapping("/hello")
  10. public String hello(){
  11. // kafkaTemplate.send("kangan-topic","早知道信息差新闻test1");
  12. User user = new User();
  13. user.setUserName("kangan");
  14. user.setAge(23);
  15. //设置了字符串反序列化 需要将对象转换成字符串
  16. kafkaTemplate.send("user-topic", JSON.toJSONString(user));
  17. return "ok";
  18. }
  19. }

2.某个微服务模块消费者

配置文件:

spring:

       kafka:

                bootstrap-servers: 填写你的KafKa的Ip地址:9092

                consumer:

                        group-id: ${spring.application.name}

                        key-deserializer: org.apache.kafka.common.serialization.StringDeserializer

                        value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

2.1自动注入 KafkaTemplate

2.2在需要监听拉取的方法上加入注解 @KafkaListener 指定你要拉取的主题

  1. @Component
  2. public class HelloListener {
  3. /**
  4. * 监听消息
  5. * @param message
  6. */
  7. @KafkaListener(topics = "user-topic")
  8. public void onMessage(String message){
  9. User user = JSON.parseObject(message, User.class);
  10. if(!StringUtils.isEmpty(message)){
  11. System.out.println(user);
  12. }
  13. }
  14. }

声明:本文内容由网友自发贡献,转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号