当前位置:   article > 正文

Kafka学习(一):消费者实现对分区的并发消费_kafka消费分区

kafka消费分区

说明

在上篇博文《springboot学习(十七):了解spring - kafka配置工作原理》中,我们简单了解了spring-kafka的配置工作原理。通过源码可以看到spring在实现并发消费时,采用的是线程封闭的策略,也就是一个groupid中,根据配置的concurrency来创建多个消费者线程,每个消费者消费一个或多个分区,来实现整个topic消息的消费处理。本篇博文将对上篇博文中最后提出的问题 ----- 如何对单个分区实现并发消费? 通过代码演示,解答该问题。

正文

spring的线程封闭策略

在spring中实现消息的并发消费采用的是线程封闭的策略,具体实现是在创建监听器容器时,会根据配置的concurrency来创建多个KafkaMessageListenerContainer,在该类中又有内部类ListenerConsumer,在该内部类中封闭创建了consumer对象。以此来实现主题消息的并发消费。
在这里插入图片描述
注意,以这种方式进行并发消费时,实际的并发度受到了主题分区数的限制,当消费线程数大于分区数时,会使多出来的消费者线程一直处于空闲状态。对此,spring在创建KafkaMessageListenerContainer前,对用户配置的concurrency值进行了校验,当该值超出主题分区数时,将值设置为实际的分区数。

TopicPartitionInitialOffset[] topicPartitions = containerProperties.getTopicPartitions();
if (topicPartitions != null && this.concurrency > topicPartitions.length) {
    this.logger.warn("When specific partitions are provided, the concurrency must be less than or equal to the number of partitions; reduced from " + this.concurrency + " to " + topicPartitions.length);
    this.concurrency = topicPartitions.length;
}
  • 1
  • 2
  • 3
  • 4
  • 5

同时,以线程封闭的方式实现并发消费,每个消费者线程都需要保持一个TCP连接,如果分区数很大,则会带来很大的系统开销。但是,以该方式实现并发消费,可以保证每个分区消息的顺序消费。

通过KafkaConsumer的消费模式我们可以看到,消费者需要不断的从服务器拉取(poll)消息进行处理,如果消息处理的速度越快,则拉取的频次越高,整体的消费能力越强。所以整体的消费速度在于消息处理模块的速度。我们可以将这个模块改为多线程的处理方式,来提高整体的消费能力。


多线程处理

通过将消息拉取动作和处理动作分开,将处理模块改为多线程的处理方式来提升消息的处理速度,进而提升整体的消费能力。
在这里插入图片描述
以上图示,将主题对应的消费者组进行池化,每个group对应一个consumer线程池,池中线程数为主题的分区数。每个消费者是个线程,提交到consumer线程池后,不断从服务器拉取消息,同时在消费者线程中,又有一个用来实际处理消息的MessageHandler线程池,在获取的消息后,根据每批次的消息创建MessagedoHandle线程,提交到handler线程池进行消息的实际处理。

KafaManager

依据以上原理创建KafaManager类,通过该类可以进行消息的发送和消费者的订阅。KafkaProducer是线程安全的,所以在该类中依据配置创建一个单例的KafkaProducer对象。对于消费者的订阅,通过该类提供的subscribe方法,用户可以自定义消息的处理方式,要订阅的主题,groupid,clientid,消息处理线程池的线程数等参数。

KafkaProperties

在创建KafkaManager时,需要读取用户的配置,这里通过KafkaProperties来进行配置,在该类中有全局的boostrap-servers配置变量,还有两个内部类对象Producer和Consumer。可以对生产者和消费者进行不同的配置。对于boostrap-servers的配置,生产者和消费者的自定义配置优先级大于全局配置。

public class KafkaProperties {

    private static String bootstrapServers;
    private final KafkaProperties.Producer producer = new KafkaProperties.Producer();
    private final KafkaProperties.Consumer consumer = new KafkaProperties.Consumer();
    ....
	public class Consumer {
        private String bootstrapServers;
        private String autoCommitInterval;
        private String enableAutoCommit;
        private String keyDeserializer;
        private String valueDeserializer;
        .....
   	}

	private class Producer {
        private String bootstrapServers;
        private String acks;
        private String batchSize;
        private String bufferMemory;
        private String keySerializer;
        private String valueSerializer;
        private String retries;
        private String clientId;
        .....
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

该类的生产者、消费者的配置参数,我只列了部分的必要参数,更详细的配置可以再添加。

在创建KafkaManager对象时,根据KafkaProperties对象的值进行实例化KafkaProducer对象。

private KafkaManager(){}

public KafkaManager(KafkaProperties properties) {
    this.properties = properties;
    initProducer(properties);
}

private void initProducer(KafkaProperties properties) {
    Properties ps = properties.buildProducerProperties();
    this.producer = new KafkaProducer(ps);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
sendMessage

在发送消息时,可以进行批量发送,消息可以指定要发往的分区号,消息对应的key等值。

int sendMessage(String topic, String message);

int sendMessage(String topic, List<String> messages);

int sendMessage(String topic, String key, String message);

int sendMessage(String topic, Map<String, String> messages);

int sendMessage(String topic, Integer partition, List<String> messages);

int sendMessage(String topic, Integer partition, Map<String, String> messages);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
subscribe 消费者订阅

通过KafaManager的subscribe方法,来创建消费者实现对指定分区的并发消费。通过发法参数来配置消费者的处理消息的动作,订阅的主题topic,所属的消息组groupid,消息者consumer的clientid,处理者handler线程的数量等值。

<T> int subscribe(MessageHandler<T> messageHandler, String groupId, String topic, String clientId, Integer concurrency, Integer partitionCount);
  • 1

在该方法中,首先会对参数partitionCount和concurrency的值进行校验。partitionCount的值限定了消费者线程池最大的线程数,其值应该为要订阅主题的分区数,如果没有指定则默认值为3。concurrency的值表示消费者对应处理者线程池的线程数,该值必须是正整数,没有指定时默认值为1。在对clientid校验时,如果没有设置该值,则会根据topic-groupid获取对应的计数器,以固定的前缀+编号为值进行设置。

接下来,根据topic获取对应groupid的消费者线程池,根据clientid获取对应的handler线程池。
最后创建KafkaConsumerContainer对象,提交到消费者线程池中。

@Override
public <T> int subscribe(MessageHandler<T> messageHandler, String groupId, String topic, String clientId, Integer concurrency, Integer partitionCount) {
    if (partitionCount == null || partitionCount <= 0) {
        partitionCount = 3;
    }
    if (concurrency == null || concurrency <= 0) {
        concurrency = 1;
    }
    Map<String, ThreadPoolExecutor> topicConsumerPool = topicConsumerMap.get(topic);
    if (topicConsumerPool == null) {
        topicConsumerPool = new ConcurrentHashMap<>();
        topicConsumerMap.put(topic, topicConsumerPool);
    }
    ThreadPoolExecutor consumerPool = topicConsumerPool.get(groupId);
    if (consumerPool == null) {
        consumerPool = new ThreadPoolExecutor(partitionCount, partitionCount, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue<>(), new ThreadPoolExecutor.AbortPolicy());
        topicConsumerPool.put(groupId, consumerPool);
    }
    Map<String, KafkaConsumerContainer> groupContainer = groupContainerMap.get(groupId);
    if (groupContainer == null) {
        groupContainer = new ConcurrentHashMap<>();
        groupContainerMap.put(groupId, groupContainer);
    }

    if (StringUtils.isEmpty(clientId)) {
        AtomicInteger counter = getCounter(groupId, topic);
        StringJoiner joiner = new StringJoiner("-", "", "-").add(groupId).add(topic).add("consumer");
        clientId = joiner.toString() + counter.getAndIncrement();
    }
    if (groupContainer.get(clientId) != null) {
        throw new RuntimeException("The clientId has been used");
    }
    ThreadPoolExecutor handlePool = new ThreadPoolExecutor(concurrency, concurrency, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(100), new ThreadPoolExecutor.CallerRunsPolicy());
    KafkaConsumerContainer consumerContainer = new KafkaConsumerContainer(this.properties, handlePool, topic, groupId, clientId, messageHandler);
    groupContainer.put(clientId, consumerContainer);
    consumerPool.submit(consumerContainer);
    return 1;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38

KafkaConsumerContainer

该类继承了Thread类,并且类中封闭了KafkaConsumer对象,在创建该类时,根据KafkaProperties配置类和subscribe方法参数实例化KafkaConsumer对象。

在run方法中,通过调用pollAndInvoke()方法,从服务器拉取消息,再根据拉取的消息和subscribe参数指定的消息处理方式messageHandler来创建MessagedoHandler对象,提交到handlerThreadPool进行真正的消息处理。

注意,while循环的条件polled值,表示是否继续拉取,当需要取消订阅时,可以调用close方法将该值设置为false退出循环,继续进行run方法的执行,执行完毕后完成取消订阅。

public KafkaConsumerContainer(KafkaProperties kafkaProperties, ThreadPoolExecutor executor, String topic, String groupId, String clientId, MessageHandler messageHandler) {
    this.kafkaProperties = kafkaProperties;
    this.executor = executor;
    this.topic = topic;
    this.groupId = groupId;
    this.clientId = clientId;
    this.messageHandler = messageHandler;
    this.polled = true;
    init();
}

private void init() {
    Properties properties = this.kafkaProperties.buildConsumerProperties();
    properties.put("group.id", this.groupId);
    properties.put("client.id", this.clientId);
    this.isAutoCommit = (boolean) properties.get("enable.auto.commit");
    this.consumer = new KafkaConsumer(properties);
    this.consumer.subscribe(Arrays.asList(this.topic));

}

private void pollAndInvoke() {
    while (polled) {
        ConsumerRecords<String, T> records = this.consumer.poll(Duration.ofSeconds(1));
        System.out.println(records.count());
        MessagedoHandle messagedoHandle = new MessagedoHandle(messageHandler, records);
        executor.submit(messagedoHandle);
        if (!isAutoCommit) {
            this.consumer.commitSync();
        }
    }
}

@Override
public void run() {
    pollAndInvoke();
    System.out.println("this container is closed");
    this.consumer.close();
    this.executor.shutdown();
    System.out.println("bye bye");
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41

测试

application.yml配置文件

kafka:
    bootstrap-servers: 192.168.186.129:9093,192.168.186.129:9094,192.168.186.129:9095
    producer:
       retries: 0
    consumer:
        enable-auto-commit: true
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

KafkaConfig配置类

@Configuration
public class KafkaConfig {

    @Bean
    @ConfigurationProperties(prefix = "kafka")
    public KafkaProperties kafkaProperties() {

        return new KafkaProperties();
    }

    @Bean
    public KafkaManager kafkaManager(KafkaProperties kafkaProperties) {
        System.out.println(kafkaProperties.getBootstrapServers());
        KafkaManager kafkaManager = new KafkaManager(kafkaProperties);
        return kafkaManager;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

发送消息

@RestController
public class KafkaController {

    @Autowired
    private KafkaManager kafkaManager;

    @RequestMapping("/send")
    public String sendMessage() {
        for (int i = 0; i < 10; i++) {
            kafkaManager.sendMessage("test666", "kafkaMWeb test NO." + i);
        }
        return "success";
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

订阅消费

@Service
public class ConsumerService {

    @Autowired
    private KafkaManager kafkaManager;

    public void consumerMessage() {
        kafkaManager.subscribe((MessageContext<String> msg) -> System.out.println(msg.getMessage()), "1237", "test666");
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

总结

通过将消息处理模块采用多线程处理来提高整体的消费能力。以这种方式优点是将消息的拉取和处理工作分离,消费者拉取消息后将消息进行多线程处理,极大提高了消费能力,并且避免了线程封闭策略下维持多个TCP连接带来的大量系统资源消耗,但缺点也显而易见,首先分区消息的顺序消费无法保证,其次消息的手动确认提交实现比较困难

在以上例子中,将enable-auto-commit的值设置true,让其自动提交,但如果配置了false,需要手动提交时,则是在将消息提交到handler线程池后调用了consumer.commitSync()方法。这样的提交方式存在着消息丢失的可能,并不安全。

可以根据spring处理提交的方式,使用一个共享变量Map<topic, Map<partition, offset>> offsets来存储已消费消息的消费位移,在多线程修改该共享变量时需要进行加锁

private void addOffset(ConsumerRecord<K, V> record) {
	synchronized (offsets) {
	    ((Map)this.offsets.computeIfAbsent(record.topic(), (v) -> {
	        return new ConcurrentHashMap();
	    })).compute(record.partition(), (k, v) -> {
	        return v == null ? record.offset() : Math.max(v, record.offset());
	    });
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

但是这样就可以避免消息丢失的风险吗?试想下如果handler1处理批次1的消息失败,而handler2处理批次2的消息成功,那么在提交消费位移后,批次1的消息就会丢失。

这种情况又改如何处理?朱忠华的《深入理解Kafka:核心设计与实践原理》一书中,提出了类似于TCP的滑动窗口控制的方式来控制消费位移的提交。更详细的信息大家可以阅读该书的第三章 消费者 多线程实现 部分的内容。


源码地址:https://github.com/Edenwds/kafka_study/tree/master/kafkaM

参考资料:《深入理解Kafka:核心设计与实践原理》— 朱忠华

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Cpp五条/article/detail/153449
推荐阅读
相关标签
  

闽ICP备14008679号