赞
踩
在上篇博文《springboot学习(十七):了解spring - kafka配置工作原理》中,我们简单了解了spring-kafka的配置工作原理。通过源码可以看到spring在实现并发消费时,采用的是线程封闭的策略,也就是一个groupid中,根据配置的concurrency来创建多个消费者线程,每个消费者消费一个或多个分区,来实现整个topic消息的消费处理。本篇博文将对上篇博文中最后提出的问题 ----- 如何对单个分区实现并发消费? 通过代码演示,解答该问题。
在spring中实现消息的并发消费采用的是线程封闭的策略,具体实现是在创建监听器容器时,会根据配置的concurrency来创建多个KafkaMessageListenerContainer,在该类中又有内部类ListenerConsumer,在该内部类中封闭创建了consumer对象。以此来实现主题消息的并发消费。
注意,以这种方式进行并发消费时,实际的并发度受到了主题分区数的限制,当消费线程数大于分区数时,会使多出来的消费者线程一直处于空闲状态。对此,spring在创建KafkaMessageListenerContainer前,对用户配置的concurrency值进行了校验,当该值超出主题分区数时,将值设置为实际的分区数。
TopicPartitionInitialOffset[] topicPartitions = containerProperties.getTopicPartitions();
if (topicPartitions != null && this.concurrency > topicPartitions.length) {
this.logger.warn("When specific partitions are provided, the concurrency must be less than or equal to the number of partitions; reduced from " + this.concurrency + " to " + topicPartitions.length);
this.concurrency = topicPartitions.length;
}
同时,以线程封闭的方式实现并发消费,每个消费者线程都需要保持一个TCP连接,如果分区数很大,则会带来很大的系统开销。但是,以该方式实现并发消费,可以保证每个分区消息的顺序消费。
通过KafkaConsumer的消费模式我们可以看到,消费者需要不断的从服务器拉取(poll)消息进行处理,如果消息处理的速度越快,则拉取的频次越高,整体的消费能力越强。所以整体的消费速度在于消息处理模块的速度。我们可以将这个模块改为多线程的处理方式,来提高整体的消费能力。
通过将消息拉取动作和处理动作分开,将处理模块改为多线程的处理方式来提升消息的处理速度,进而提升整体的消费能力。
以上图示,将主题对应的消费者组进行池化,每个group对应一个consumer线程池,池中线程数为主题的分区数。每个消费者是个线程,提交到consumer线程池后,不断从服务器拉取消息,同时在消费者线程中,又有一个用来实际处理消息的MessageHandler线程池,在获取的消息后,根据每批次的消息创建MessagedoHandle线程,提交到handler线程池进行消息的实际处理。
依据以上原理创建KafaManager类,通过该类可以进行消息的发送和消费者的订阅。KafkaProducer是线程安全的,所以在该类中依据配置创建一个单例的KafkaProducer对象。对于消费者的订阅,通过该类提供的subscribe方法,用户可以自定义消息的处理方式,要订阅的主题,groupid,clientid,消息处理线程池的线程数等参数。
在创建KafkaManager时,需要读取用户的配置,这里通过KafkaProperties来进行配置,在该类中有全局的boostrap-servers配置变量,还有两个内部类对象Producer和Consumer。可以对生产者和消费者进行不同的配置。对于boostrap-servers的配置,生产者和消费者的自定义配置优先级大于全局配置。
public class KafkaProperties { private static String bootstrapServers; private final KafkaProperties.Producer producer = new KafkaProperties.Producer(); private final KafkaProperties.Consumer consumer = new KafkaProperties.Consumer(); .... public class Consumer { private String bootstrapServers; private String autoCommitInterval; private String enableAutoCommit; private String keyDeserializer; private String valueDeserializer; ..... } private class Producer { private String bootstrapServers; private String acks; private String batchSize; private String bufferMemory; private String keySerializer; private String valueSerializer; private String retries; private String clientId; ..... } }
该类的生产者、消费者的配置参数,我只列了部分的必要参数,更详细的配置可以再添加。
在创建KafkaManager对象时,根据KafkaProperties对象的值进行实例化KafkaProducer对象。
private KafkaManager(){}
public KafkaManager(KafkaProperties properties) {
this.properties = properties;
initProducer(properties);
}
private void initProducer(KafkaProperties properties) {
Properties ps = properties.buildProducerProperties();
this.producer = new KafkaProducer(ps);
}
在发送消息时,可以进行批量发送,消息可以指定要发往的分区号,消息对应的key等值。
int sendMessage(String topic, String message);
int sendMessage(String topic, List<String> messages);
int sendMessage(String topic, String key, String message);
int sendMessage(String topic, Map<String, String> messages);
int sendMessage(String topic, Integer partition, List<String> messages);
int sendMessage(String topic, Integer partition, Map<String, String> messages);
通过KafaManager的subscribe方法,来创建消费者实现对指定分区的并发消费。通过发法参数来配置消费者的处理消息的动作,订阅的主题topic,所属的消息组groupid,消息者consumer的clientid,处理者handler线程的数量等值。
<T> int subscribe(MessageHandler<T> messageHandler, String groupId, String topic, String clientId, Integer concurrency, Integer partitionCount);
在该方法中,首先会对参数partitionCount和concurrency的值进行校验。partitionCount的值限定了消费者线程池最大的线程数,其值应该为要订阅主题的分区数,如果没有指定则默认值为3。concurrency的值表示消费者对应处理者线程池的线程数,该值必须是正整数,没有指定时默认值为1。在对clientid校验时,如果没有设置该值,则会根据topic-groupid获取对应的计数器,以固定的前缀+编号为值进行设置。
接下来,根据topic获取对应groupid的消费者线程池,根据clientid获取对应的handler线程池。
最后创建KafkaConsumerContainer对象,提交到消费者线程池中。
@Override public <T> int subscribe(MessageHandler<T> messageHandler, String groupId, String topic, String clientId, Integer concurrency, Integer partitionCount) { if (partitionCount == null || partitionCount <= 0) { partitionCount = 3; } if (concurrency == null || concurrency <= 0) { concurrency = 1; } Map<String, ThreadPoolExecutor> topicConsumerPool = topicConsumerMap.get(topic); if (topicConsumerPool == null) { topicConsumerPool = new ConcurrentHashMap<>(); topicConsumerMap.put(topic, topicConsumerPool); } ThreadPoolExecutor consumerPool = topicConsumerPool.get(groupId); if (consumerPool == null) { consumerPool = new ThreadPoolExecutor(partitionCount, partitionCount, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue<>(), new ThreadPoolExecutor.AbortPolicy()); topicConsumerPool.put(groupId, consumerPool); } Map<String, KafkaConsumerContainer> groupContainer = groupContainerMap.get(groupId); if (groupContainer == null) { groupContainer = new ConcurrentHashMap<>(); groupContainerMap.put(groupId, groupContainer); } if (StringUtils.isEmpty(clientId)) { AtomicInteger counter = getCounter(groupId, topic); StringJoiner joiner = new StringJoiner("-", "", "-").add(groupId).add(topic).add("consumer"); clientId = joiner.toString() + counter.getAndIncrement(); } if (groupContainer.get(clientId) != null) { throw new RuntimeException("The clientId has been used"); } ThreadPoolExecutor handlePool = new ThreadPoolExecutor(concurrency, concurrency, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(100), new ThreadPoolExecutor.CallerRunsPolicy()); KafkaConsumerContainer consumerContainer = new KafkaConsumerContainer(this.properties, handlePool, topic, groupId, clientId, messageHandler); groupContainer.put(clientId, consumerContainer); consumerPool.submit(consumerContainer); return 1; }
该类继承了Thread类,并且类中封闭了KafkaConsumer对象,在创建该类时,根据KafkaProperties配置类和subscribe方法参数实例化KafkaConsumer对象。
在run方法中,通过调用pollAndInvoke()方法,从服务器拉取消息,再根据拉取的消息和subscribe参数指定的消息处理方式messageHandler来创建MessagedoHandler对象,提交到handlerThreadPool进行真正的消息处理。
注意,while循环的条件polled值,表示是否继续拉取,当需要取消订阅时,可以调用close方法将该值设置为false退出循环,继续进行run方法的执行,执行完毕后完成取消订阅。
public KafkaConsumerContainer(KafkaProperties kafkaProperties, ThreadPoolExecutor executor, String topic, String groupId, String clientId, MessageHandler messageHandler) { this.kafkaProperties = kafkaProperties; this.executor = executor; this.topic = topic; this.groupId = groupId; this.clientId = clientId; this.messageHandler = messageHandler; this.polled = true; init(); } private void init() { Properties properties = this.kafkaProperties.buildConsumerProperties(); properties.put("group.id", this.groupId); properties.put("client.id", this.clientId); this.isAutoCommit = (boolean) properties.get("enable.auto.commit"); this.consumer = new KafkaConsumer(properties); this.consumer.subscribe(Arrays.asList(this.topic)); } private void pollAndInvoke() { while (polled) { ConsumerRecords<String, T> records = this.consumer.poll(Duration.ofSeconds(1)); System.out.println(records.count()); MessagedoHandle messagedoHandle = new MessagedoHandle(messageHandler, records); executor.submit(messagedoHandle); if (!isAutoCommit) { this.consumer.commitSync(); } } } @Override public void run() { pollAndInvoke(); System.out.println("this container is closed"); this.consumer.close(); this.executor.shutdown(); System.out.println("bye bye"); }
application.yml配置文件
kafka:
bootstrap-servers: 192.168.186.129:9093,192.168.186.129:9094,192.168.186.129:9095
producer:
retries: 0
consumer:
enable-auto-commit: true
KafkaConfig配置类
@Configuration public class KafkaConfig { @Bean @ConfigurationProperties(prefix = "kafka") public KafkaProperties kafkaProperties() { return new KafkaProperties(); } @Bean public KafkaManager kafkaManager(KafkaProperties kafkaProperties) { System.out.println(kafkaProperties.getBootstrapServers()); KafkaManager kafkaManager = new KafkaManager(kafkaProperties); return kafkaManager; } }
发送消息
@RestController
public class KafkaController {
@Autowired
private KafkaManager kafkaManager;
@RequestMapping("/send")
public String sendMessage() {
for (int i = 0; i < 10; i++) {
kafkaManager.sendMessage("test666", "kafkaMWeb test NO." + i);
}
return "success";
}
}
订阅消费
@Service
public class ConsumerService {
@Autowired
private KafkaManager kafkaManager;
public void consumerMessage() {
kafkaManager.subscribe((MessageContext<String> msg) -> System.out.println(msg.getMessage()), "1237", "test666");
}
}
通过将消息处理模块采用多线程处理来提高整体的消费能力。以这种方式优点是将消息的拉取和处理工作分离,消费者拉取消息后将消息进行多线程处理,极大提高了消费能力,并且避免了线程封闭策略下维持多个TCP连接带来的大量系统资源消耗,但缺点也显而易见,首先分区消息的顺序消费无法保证,其次消息的手动确认提交实现比较困难。
在以上例子中,将enable-auto-commit的值设置true,让其自动提交,但如果配置了false,需要手动提交时,则是在将消息提交到handler线程池后调用了consumer.commitSync()方法。这样的提交方式存在着消息丢失的可能,并不安全。
可以根据spring处理提交的方式,使用一个共享变量Map<topic, Map<partition, offset>> offsets来存储已消费消息的消费位移,在多线程修改该共享变量时需要进行加锁。
private void addOffset(ConsumerRecord<K, V> record) {
synchronized (offsets) {
((Map)this.offsets.computeIfAbsent(record.topic(), (v) -> {
return new ConcurrentHashMap();
})).compute(record.partition(), (k, v) -> {
return v == null ? record.offset() : Math.max(v, record.offset());
});
}
}
但是这样就可以避免消息丢失的风险吗?试想下如果handler1处理批次1的消息失败,而handler2处理批次2的消息成功,那么在提交消费位移后,批次1的消息就会丢失。
这种情况又改如何处理?朱忠华的《深入理解Kafka:核心设计与实践原理》一书中,提出了类似于TCP的滑动窗口控制的方式来控制消费位移的提交。更详细的信息大家可以阅读该书的第三章 消费者 多线程实现 部分的内容。
源码地址:https://github.com/Edenwds/kafka_study/tree/master/kafkaM
参考资料:《深入理解Kafka:核心设计与实践原理》— 朱忠华
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。