赞
踩
在实际开发过程中,如果使用Kafka处理超大数据量(千万级、亿级)的场景,Kafka消费者的消费速度可能决定系统性能瓶颈。
为了提高消费者的消费速度,我们可以采取以下措施:
concurrency
将消费者的消费线程数增大到 10(2个pod),提高消息处理的并发能力。max.poll.records
增大到 500,提高单次处理消息的数量。@Component public class OrderConsumer { @Resource(name = "execThreadPool") private ThreadPoolTaskExecutor execThreadPool; @KafkaListener( id = "record_consumer", topics = "record", groupId = "g_record_consumer", concurrency = "10", properties = {"max.poll.interval.ms:300000", "max.poll.records:500"} ) public void consume(ConsumerRecords<String, String> records, Acknowledgment ack) { execThreadPool.submit(()-> { // 业务逻辑 } ); ack.acknowledge(); } }
ThreadPoolTaskExecutor
是 Spring 框架提供的一个线程池实现,用于管理和执行多线程任务。它是 TaskExecutor 接口的实现,提供了在 Spring 应用程序中创建和配置线程池的便捷方式。
ThreadPoolTaskExecutor主要特点:
线程池配置: ThreadPoolTaskExecutor 允许你配置核心线程数、最大线程数、队列容量等线程池属性。
线程创建和销毁: 它会根据任务的需求自动创建和销毁线程,避免不必要的线程创建和销毁开销。
线程复用: 线程池中的线程可以被复用,从而减少线程创建的开销。
队列管理: 当线程池达到最大线程数时,新任务会被放入队列中等待执行。
拒绝策略: 当线程池已满并且队列也已满时,可以配置拒绝策略来处理新任务的方式。
RejectedExecutionHandler 是 Java 线程池的一个重要接口,用于定义当线程池已满并且无法接受新任务时,如何处理被拒绝的任务。当线程池的队列和线程都已满,新任务就会被拒绝执行,这时就会使用 RejectedExecutionHandler 来处理这些被拒绝的任务。
在 Java 中,有几种内置的 RejectedExecutionHandler 实现可供选择,每种实现都有不同的拒绝策略:
AbortPolicy(默认策略): 这是默认的拒绝策略,它会抛出一个 RejectedExecutionException 异常,表示任务被拒绝执行。
CallerRunsPolicy: 当线程池已满时,将任务返回给提交任务的调用者(Caller)。这意味着提交任务的线程会尝试执行被拒绝的任务。
DiscardPolicy: 这个策略会默默地丢弃被拒绝的任务,不会产生任何异常。
DiscardOldestPolicy: 这个策略会丢弃队列中最老的任务,然后尝试将新任务添加到队列中。
除了这些内置的策略,你还可以实现自定义的 RejectedExecutionHandler 接口,以定义特定于你应用程序需求的拒绝策略。你可以根据业务需求来决定拒绝策略,比如记录日志、通知管理员、重试等。
@Configuration public class ThreadPoolConfig { @Bean private ThreadPoolTaskExecutor execThreadPool() { ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor(); pool.setCorePoolSize(50); // 核心线程数 pool.setMaxPoolSize(10000); // 最大线程数 pool.setQueueCapacity(0); // 等待队列size pool.setKeepAliveSeconds(60); // 线程最大空闲存活时间 pool.setWaitForTasksToCompleteOnShutdown(true); pool.setAwaitTerminationSeconds(60); // 程序shutdown时最多等60秒钟让现存任务结束 pool.setRejectedExecutionHandler(new CallerRunsPolicy()); // 拒绝策略 return pool; } }
通过以上方案,我们可以提高消费侧的TPS,同时杜绝重复上报的现象,极大提高数据准确性和用户体验。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。