当前位置:   article > 正文

Spring-kafka配置参数详解,消息批量发送与批量接收消费_kafkatemplate批量发送数据

kafkatemplate批量发送数据


在这里插入图片描述

配置文件

  • maven依赖
		<dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.8.7</version>
        </dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 这个是我正在使用的配置,基本上都加了注释
  • 有些我没用到的,就没写,以后有需要遇到了再补充
spring:
  kafka:
    bootstrap-servers: localhost:9092 # 用来初始化连接kafka(不用配置全部节点,会动态发现)
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      buffer-memory: 33554432 # 缓存容量。默认值32MB = 33554432
      batch-size: 163840 # 默认 single request 批处理大小(以字节为单位),默认16KB = 16384
      retries: 1 # 消息发送失败重试次数
      acks: 1
      properties:
        linger:
          ms: 500 # 不是立即发送一条记录,producer将会等待给定的延迟时间以允许其他消息记录发送。与batch-size配合使用,满足一个就发送
        max:
          request:
            size: 1048576 # 请求的最大字节数
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      group-id: radar # 默认消费者组
      max-poll-records: 2000 # 批量一次最大拉取数据量
      enable-auto-commit: false # 自动提交已消费offfset,false-禁用
      auto-commit-interval: 4000 # 自动提交时间间隔,单位ms
      auto-offset-reset: earliest
      heartbeat-interval: 10000 # ⼼跳与消费者协调员之间的预期时间(以毫秒为单位)
      fetch-max-wait: 500
    listener:
      ack-mode: manual_immediate # manual_immediate-手动ack后立即提交;batch-批量自动确认;RECORD-单条自动确认;
      type: batch # 批量消费
      missing-topics-fatal: false # 未发现topic时不报错: 自动创建topic需要设置为false
    template:
      default-topic: radar
      patitions: 7
      replications: 1
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34

消息批量发送

  • 消息批量发送,主要是设置batch-sizelinger.ms
  • batch-size参数是指一批消息发送的字节数,消息积累到这么多字节就会发送,默认16384(16KB)。根据自己业务需求,决定是要低延迟还是高吞吐量,可以改小或改大,也可以通过修改数值不断尝试,从而取得延迟与吞吐量的平衡。
  • linger.ms是指延迟毫秒数,默认是0立刻发送,设置数值后在到达指定毫秒数时才会一起发送
  • batch-sizelinger.ms这两个条件都设置时,只要满足其中一个条件,就会发送消息
  • 对于linger.ms,和batch-size不同,没法直接配置,需要使用properties进行配置,还有一些其他参数也是如此

消息批量消费

  • 主要是listener.type设置为batch,启用批量监听消费
  • max.poll.records,一次批量拉取的数量,默认500,可以根据需要设置大一点,但要注意,如果一次拉取太多,消费不了阻塞了,也会有问题
  • 我这里设置了禁用自动确认enable-auto-commit: false,消息消费后手动确认立刻生效listener.ack-mode: manual_immediate

配置类

  • Spring-kafka,维护配置文件即可,不需要手动创建bean
  • 此处配置类,是为了在项目启动时,自动创建指定分区数、副本数的topic
  • 如果你只有一个topic或者topic的分区和副本数都是一致的,也可以在kafka的配置文件server.properties里设置,这个配置类就不需要了,只需要设置spring.kafka.listener.missing-topics-fatalfalse即可,未发现topic时不会报错而是自动创建topic,具体可参考我的这篇博客
package com.newatc.collect.config;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @ClassName KafkaConfiguration
 * @Description kafka配置类,读取配置创建topic
 * @Date 2022-6-28 09:55:26
 * @Author yanyulin
 **/
@Configuration
public class KafkaConfiguration {

    @Value("${spring.kafka.template.default-topic}")
    private String topic;

    @Value("${spring.kafka.template.patitions}")
    private Integer patitions;

    @Value("${spring.kafka.template.replications}")
    private Short replications;

    public String getTopic() {
        return topic;
    }

    public void setTopic(String topic) {
        this.topic = topic;
    }

    public Integer getPatitions() {
        return patitions;
    }

    public void setPatitions(Integer patitions) {
        this.patitions = patitions;
    }

    public Short getReplications() {
        return replications;
    }

    public void setReplications(Short replications) {
        this.replications = replications;
    }

    /**
     * 项目启动时,自动创建topic,指定分区和副本数量
     * @return Topic
     */
    @Bean
    public NewTopic topic() {
        return new NewTopic(topic, patitions, replications);
    }
}


  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60

消息生产者

  • Spring kafka集成的很好,很多东西都不需要我们做了,直接使用KafkaTemplate即可
package com.newatc.collect.config;

import com.newatc.collect.util.PartitionEnum;
import javax.annotation.Resource;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

/**
 * @ClassName KafkaProducer
 * @Description kafka信息发送类
 * @Date 2022-6-27 11:20:12
 * @Author yanyulin
 **/
@Component
public class KafkaProducer {

    private static final Logger log = LoggerFactory.getLogger(KafkaConsumer.class);

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    @Value("${spring.kafka.template.default-topic}")
    private String topic;

    /**
     * 将雷达上报数据,发到kafka队列里
     * @param type
     * @param data
     */
    public void sendData(String type, String data) {
        ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, PartitionEnum.getMap().get(type), type, data);
        kafkaTemplate.send(producerRecord);
    }

    /**
     * 将雷达上报数据,发到kafka队列里
     * @param type
     * @param message
     */
    public void sendMessage(String type, String message) {
        ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, PartitionEnum.getMap().get(type), type, message);
        kafkaTemplate.send(producerRecord);
        log.debug("发送 {} 数据到kafka : {}", type, message);
    }
}


  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51

调用生产者发送消息

  • 使用也很简单,把KafkaProducer依赖注入即可
    @Autowired
    private KafkaProducer producer;
    
    producer.sendData(PartitionEnum.FLOW_STATS.getType(), JSONObject.toJSONString(flowStats));

  • 1
  • 2
  • 3
  • 4
  • 5

消息消费者

  • 消息消费者也很简单,使用注解@KafkaListener即可
  • 可以指定消费的topicpartition
@Component
public class KafkaConsumer {
    /**
     * 雷达实时轨迹数据加载呈现<1s,即时数据立刻消费
     * @param records
     * @param ack
     */
    @KafkaListener(
        containerGroup = "${spring.kafka.consumer.group-id}",
        topicPartitions = { @TopicPartition(topic = "${spring.kafka.template.default-topic}", partitions = { "0" }) }
    )
    public void receiverRealTimeDataRecord(List<ConsumerRecord<String, String>> records, Acknowledgment ack) {
        int size = records.size();
        log.debug("RealTimeData RECV MSG COUNT: {}\n", size);
        List<String> data = new ArrayList<>();
        for (ConsumerRecord<String, String> consumerRecord : records) {
            data.add(consumerRecord.value());
        }
        realTimeDataClickHouseService.saveAll(data);
        log.debug("\n[RealTimeData] {} 消费完成", size);
        //确认单当前消息(及之前的消息)offset均已被消费完成
        ack.acknowledge();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 如果对你有帮助,希望可以给一个免费的赞
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/2023面试高手/article/detail/591674
推荐阅读
相关标签
  

闽ICP备14008679号