其中consumer、producer 、admin、streams、listener分别对应不同的配置信息,如consumer对应spring.kafka.consumer开头的配置信息。

  port: 9006
    context-path: /kafka
    # Kafka服务端监听地址端口,集群用逗号分隔
      # 消费者组ID,在消费者实例没有指定消费者组的时候生效
      group-id: test01
      # 如果为真,consumer所fetch的消息的offset将会自动的同步到zookeeper。
      enable-auto-commit: true
      # 每次自动提交offset的时间间隔,当enable-auto-commit设置为true时生效,默认值为5000,单位ms
      auto-commit-interval: 500
      # kafka服务(实际是zookeeper)中没有初始化的offset时,如果offset是以下值的回应:
      # earliest:自动复位offset为smallest的offset
      # latest:自动复位offset为largest的offset
      # anything  else:向consumer抛出异常
      # none:如果整个消费者组中没有以往的offset,则抛出异常
      auto-offset-reset: latest
      # message的key的解码类
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # message的value的解码类
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # 单次消费获取数据的最大条数
      max-poll-records: 500
      # 每次fetch请求时,server应该返回的最小字节数。
      # 如果没有足够的数据返回,请求会等待,直到足够的数据才会返回。默认值为1,单位bytes
      fetch-min-size: 1
      # 如果没有足够的数据能够满足fetch.min.bytes(fetch-min-size),
      # 则此项配置是指在应答fetch请求之前,server会阻塞的最大时间,默认值为100,单位ms
      fetch-max-wait: 100
      # 如果设置为read_committed,则consumer会缓存消息,直到收到消息对应的事务控制消息。
      # 若事务commit,则对外发布这些消息;若事务abort,则丢弃这些消息
      # 默认值为read_uncommitted
      isolation-level: read_uncommitted
      # producer需要server接收到数据之后发出的确认接收的信号
      # acks=0:设置为0表示producer不需要等待任何确认收到的信息。副本将立即加到socket  buffer并认为已经发送。没有任何保障可以保证此种情况下server已经成功接收数据,同时重试配置不会发生作用(因为客户端不知道是否失败)回馈的offset会总是设置为-1;
      # acks=1: 这意味着至少要等待leader已经成功将数据写入本地log,但是并没有等待所有follower是否成功写入。这种情况下,如果follower没有成功备份数据,而此时leader又挂掉,则消息会丢失。
      # acks=all: 这意味着leader需要等待所有备份都成功写入日志,这种策略会保证只要有一个备份存活就不会丢失数据。这是最强的保证。
      acks: 1
      # 设置大于0的值将使客户端重新发送任何数据。
      # 注意,这些重试与客户端接收到发送错误时的重试没有什么不同。允许重试将潜在的改变数据的顺序,如果这两个消息记录都是发送到同一个partition,则第一个消息失败第二个发送成功,则第二条消息会比第一条消息出现要早。
      retries: 4
      # producer将试图批处理消息记录,以减少请求次数,这项配置控制默认的批量处理消息字节数,默认值16384,单位bytes
      batch-size: 16384
        # producer发送消息的延时,与batch-size配合使用,默认值0,单位ms
          ms: 100
      # producer可以用来缓存数据的内存大小。如果数据产生速度大于向broker发送的速度,producer会阻塞或者抛出异常,
      # 默认值33554432,单位bytes
      buffer-memory: 33554432
      # key的序列化类
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # value的序列化类
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # 生产者生成的所有数据的压缩类型,此配置接受标准压缩编解码器('gzip','snappy','lz4','zstd')
      # 默认为none
      compression-type: none
  • 61
  • 62
@ConfigurationProperties(prefix = "spring.kafka")
public class KafkaProperties {

	 * Comma-delimited list of host:port pairs to use for establishing the initial
	 * connections to the Kafka cluster. Applies to all components unless overridden.
	private List<String> bootstrapServers = new ArrayList<>(Collections.singletonList("localhost:9092"));

	 * ID to pass to the server when making requests. Used for server-side logging.
	private String clientId;

	 * Additional properties, common to producers and consumers, used to configure the
	 * client.
	private final Map<String, String> properties = new HashMap<>();

	private final Consumer consumer = new Consumer();

	private final Producer producer = new Producer();

	private final Admin admin = new Admin();

	private final Streams streams = new Streams();

	private final Listener listener = new Listener();
	... ...

	 * Create an initial map of consumer properties from the state of this instance.
	 * <p>
	 * This allows you to add additional properties, if necessary, and override the
	 * default kafkaConsumerFactory bean.
	 * @return the consumer properties initialized with the customizations defined on this
	 * instance
	public Map<String, Object> buildConsumerProperties() {
		Map<String, Object> properties = buildCommonProperties();
		return properties;

	 * Create an initial map of producer properties from the state of this instance.
	 * <p>
	 * This allows you to add additional properties, if necessary, and override the
	 * default kafkaProducerFactory bean.
	 * @return the producer properties initialized with the customizations defined on this
	 * instance
	public Map<String, Object> buildProducerProperties() {
		Map<String, Object> properties = buildCommonProperties();
		return properties;

	 * Create an initial map of admin properties from the state of this instance.
	 * <p>
	 * This allows you to add additional properties, if necessary, and override the
	 * default kafkaAdmin bean.
	 * @return the admin properties initialized with the customizations defined on this
	 * instance
	public Map<String, Object> buildAdminProperties() {
		Map<String, Object> properties = buildCommonProperties();
		return properties;

	 * Create an initial map of streams properties from the state of this instance.
	 * <p>
	 * This allows you to add additional properties, if necessary.
	 * @return the streams properties initialized with the customizations defined on this
	 * instance
	public Map<String, Object> buildStreamsProperties() {
		Map<String, Object> properties = buildCommonProperties();
		return properties;
KafkaAutoConfiguration类,提供了Kafka常用Bean的默认实现,包括KafkaTemplate、ProducerListener、ConsumerFactory、ProducerFactory、KafkaAdmin 等,每个实现的Bean都使用了@ConditionalOnMissingBean注解,表示当开发人员没有自己单独实现的时候,使用默认实现,当开发人员单独实现的时候,默认实现不起作用,不会初始化默认的Bean实现。

@Configuration(proxyBeanMethods = false)
@Import({ KafkaAnnotationDrivenConfiguration.class, KafkaStreamsAnnotationDrivenConfiguration.class })
public class KafkaAutoConfiguration {

	private final KafkaProperties properties;

	public KafkaAutoConfiguration(KafkaProperties properties) {
		this.properties = properties;

	public KafkaTemplate<?, ?> kafkaTemplate(ProducerFactory<Object, Object> kafkaProducerFactory,
			ProducerListener<Object, Object> kafkaProducerListener,
			ObjectProvider<RecordMessageConverter> messageConverter) {
		KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactory);
		return kafkaTemplate;

	public ProducerListener<Object, Object> kafkaProducerListener() {
		return new LoggingProducerListener<>();

	public ConsumerFactory<?, ?> kafkaConsumerFactory() {
		return new DefaultKafkaConsumerFactory<>(this.properties.buildConsumerProperties());

	public ProducerFactory<?, ?> kafkaProducerFactory() {
		DefaultKafkaProducerFactory<?, ?> factory = new DefaultKafkaProducerFactory<>(
		String transactionIdPrefix = this.properties.getProducer().getTransactionIdPrefix();
		if (transactionIdPrefix != null) {
		return factory;

	@ConditionalOnProperty(name = "spring.kafka.producer.transaction-id-prefix")
	public KafkaTransactionManager<?, ?> kafkaTransactionManager(ProducerFactory<?, ?> producerFactory) {
		return new KafkaTransactionManager<>(producerFactory);

	@ConditionalOnProperty(name = "spring.kafka.jaas.enabled")
	public KafkaJaasLoginModuleInitializer kafkaJaasInitializer() throws IOException {
		KafkaJaasLoginModuleInitializer jaas = new KafkaJaasLoginModuleInitializer();
		Jaas jaasProperties = this.properties.getJaas();
		if (jaasProperties.getControlFlag() != null) {
		if (jaasProperties.getLoginModule() != null) {
		return jaas;

	public KafkaAdmin kafkaAdmin() {
		KafkaAdmin kafkaAdmin = new KafkaAdmin(this.properties.buildAdminProperties());
		return kafkaAdmin;

package com.example.kafka.producer.config;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.LoggingProducerListener;
import org.springframework.kafka.support.ProducerListener;
import org.springframework.kafka.support.converter.RecordMessageConverter;

import java.util.Map;

public class KafkaTemplateConfig {

    private KafkaProperties kafkaProperties;

    public KafkaTemplate<?, ?> kafkaTemplate(@Qualifier("defaultKafkaProducerFactory") ProducerFactory<Object, Object> kafkaProducerFactory,
                                             ProducerListener<Object, Object> kafkaProducerListener,
                                             ObjectProvider<RecordMessageConverter> messageConverter) {
        KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactory);
        return kafkaTemplate;

    public ProducerListener<Object, Object> kafkaProducerListener() {
        return new LoggingProducerListener<>();

    public ProducerFactory<Object, Object> kafkaProducerFactory() {
        DefaultKafkaProducerFactory<Object, Object> factory = new DefaultKafkaProducerFactory<>(
        String transactionIdPrefix = kafkaProperties.getProducer().getTransactionIdPrefix();
        if (transactionIdPrefix != null) {
        return factory;
     * 获取生产者工厂
    public ProducerFactory<Object, Object> newProducerFactory() {
        Map<String, Object> producerProperties = kafkaProperties.buildProducerProperties();
        // 修改参数名称
        DefaultKafkaProducerFactory<Object, Object> factory = new DefaultKafkaProducerFactory<>(
        String transactionIdPrefix = kafkaProperties.getProducer().getTransactionIdPrefix();
        if (transactionIdPrefix != null) {

        return factory;

    public KafkaTemplate<?, ?> newKafkaTemplate(@Qualifier("newKafkaProducerFactory") ProducerFactory<Object, Object> kafkaProducerFactory,
                                             ProducerListener<Object, Object> kafkaProducerListener,
                                             ObjectProvider<RecordMessageConverter> messageConverter) {
        KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactory);
        return kafkaTemplate;

import com.example.kafka.producer.entity.ProduceEntity;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

 * Kafka生产者服务
public class KafkaProducerController {

    private KafkaTemplate<String, String> kafkaTemplate;

    private KafkaTemplate<String, String> newKafkaTemplate;
    public void produce(@RequestBody ProduceEntity produceEntity) {
        for(int i = 0; i<12; i++) {
            kafkaTemplate.send(produceEntity.getTopic(), i+ " " + produceEntity.getMessage());
            newKafkaTemplate.send(produceEntity.getTopic(), "new "+ i +" "+produceEntity.getMessage());

