赞
踩
jdk21已于北京时间9月19日21点正式发布, 其中引人注目的就是虚拟线程(Virtual Thread)随之正式发布, 不再是此前jdk19、jdk20中的预览版本。
平台线程:java传统的线程是对系统线程的包装,为了区别于虚拟线程,因此将通过传统方式实现的线程叫做平台线程(Platform Thread)
虚拟线程:虚拟线程是由JDK内部实现的轻量级线程,不依赖于操作系统,可以显著减少编写、维护和观察高吞吐量并发应用程序的工作量。
添加虚拟线程工作量巨大,花费了数年时间,不断孵化,虚拟线程主要是为了解决异步编程相关的问题, 让应用程序能够以简单的一个请求一个处理线程的方式运行,并且能够达到硬件的最佳利用率,先回顾下java传统方式实现编发的两个方案:
这种方式可以让开发专注于业务逻辑,使用命令式编程,代码在一条线程上从头到尾执行, Tomcat的Servlet线程就是该模式。
为了提高应用程序的并发请求数,通常会启用多个线程来接受请求,jdk中的线程是对操作系统线程的包装。这导致java的线程创建,销毁成本比较高,为了避免这种情况通常会使用线程池来提高程序性能。
假如一个请求需要耗时50ms,要想实现每秒200的吞吐量, 则理论上至少需要10条线程。如果要想达到2000的吞吐量,怎需要将线程池线程数量设置到100条。
但一个操作系统能创建的线程数量是有限的,线程池化虽然避免了线程创建、销毁的开销,但并不能提高线程数。在CPU和连接数被耗尽之前很可能无法再创建线程,CPU也就无法得到充分利用。
为了充分提高硬件利用率,则出现了类似netty
这种异步事件驱动的网络I/O框架,以及Reactive Stream
这种反应式编程模式(Spring-WebFlux
就是反应式编程的一种实现)。代码不是在一个线程上从头到尾处理请求,而是在等待另一 I/O 操作完成时将其线程返回到池中,以便该线程可以为其它请求提供服务,可以实现通过少量线程数达到大量并发操作。
但是这种编程方式比较难以维护,通过大量的回调方法编排业务逻辑(通常是使用java8的lambda语法实现),方法的返回值变成了Mono
、或者CompletableFuture
类型。大量回调不便于理解业务需求,需要在onErrorResume
中做异常处理,也无法对整个方法加try/catch
块达到预期的异常处理。
如上是一个Spring-webflux的示例代码,一个方法存在4层return语句,并且这4层代码块很可能是运行在不同线程。
JDK传统方式实现的平台线程是对操作系统线程包装,线程的创建受到操作系统的限制,一条平台线程的创建要占用到1M左右的内存。
而虚拟线程
是JDK基于平台线程
实现的轻量级线程
,虚拟线程
依附于平台线程
(此时称为载体线程
)运行,它的创建成本很低,不会像平台线程独占操作系统线程,Java 通过将大量虚拟线程映射到少量平台线程来提供充足线程的假象。
因此可以通过虚拟线程来实现一次请求代码只会执行在同一个虚拟线程中,让开发者更专注于业务逻辑。
但虚拟线程仅在 CPU 上执行计算时才消耗操作系统线程,有着与异步编程方式相同的吞吐量,只不过它是透明实现的:当在虚拟线程中运行的代码调用阻塞 I/O 操作时,java会自动挂起虚拟线程,IO操作完成后再自动恢复执行虚拟线程。
为什么增加虚拟线:简单直白的描述就是希望java开发人员能够以简单易懂的编码方式来实现高吞吐量量的应用程序,提高CPU利用率,避免资源浪费。
Stream API
仍然是并行处理数据集的首选方式java.util.concurrent.Semaphore
信号量这样的方式控制Thread.setDaemon(boolean)
方法无法将虚拟线程更改为非守护线程。Thread.NORM_PRIORITY
。该Thread.setPriority(int)
方法对虚拟线程没有影响。未来版本中可能会重新考虑此限制。SecurityManager
。java.lang.management.ThreadMXBeanAPI
支持平台线程
的监控和管理,但不支持虚拟线程。-XX:+PreserveFramePointer
标志对虚拟线程性能有巨大的负面影响创建虚拟线程并直接启动
Thread.startVirtualThread(new Runnable() {
@Override
public void run() {
log.info("虚拟线程执行:threadId:{}", Thread.currentThread().threadId());
}
});
创建2条虚拟线程调用代码启动,并等待执行完成
// 这种写法会让日志中无法打印线程名
public static void main(String[] args) throws InterruptedException {
var vthread = Thread.ofVirtual().unstarted(() -> {
log.info("虚拟线程休眠开始:" + Thread.currentThread());
try {
Thread.sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
log.info("虚拟线程休眠结束:" + Thread.currentThread());
});
var vthread2 = Thread.ofVirtual().unstarted(() -> {
log.info("虚拟线程休眠开始:" + Thread.currentThread());
try {
Thread.sleep(110);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
log.info("虚拟线程休眠结束:" + Thread.currentThread());
});
vthread.start();
vthread2.start();
vthread2.join();
vthread.join();
}
如下是执行结果:
1. 日志中没能打印出线程名
给虚拟线程添加线程名,便于业务分析,排障
// 给虚拟线程添加线程名,便于业务分析,排障
public static void main(String[] args) throws InterruptedException {
var vthread = Thread.ofVirtual().name("vThread-test-", 1).unstarted(() -> {
log.info("虚拟线程休眠结束:{}", Thread.currentThread());
});
var vthread2 = Thread.ofVirtual().name("vThread-test2-", 2).unstarted(() -> {
log.info("虚拟线程休眠结束:{}", Thread.currentThread());
});
vthread.start();
vthread2.start();
vthread2.join();
vthread.join();
}
实际上Thread.ofVirtual().name("vThread-test-", 1)
返回的Thread.Builder.OfVirtual
是
VirtualThreadBuilder
实现类,并不需要每次创建,可以改成如下方式:
public static void main(String[] args) throws InterruptedException {
Thread.Builder.OfVirtual virtualThreadBuilder
= Thread.ofVirtual()
.name("vThread-test-", 1)
.uncaughtExceptionHandler((t, e) -> System.out.println("异常处理"));
var vthread = virtualThreadBuilder.unstarted(() -> {
log.info("虚拟线程休眠结束:{}", Thread.currentThread());
});
var vthread2 = virtualThreadBuilder.unstarted(() -> {
log.info("虚拟线程休眠结束:{}", Thread.currentThread());
});
vthread.start();
vthread2.start();
vthread2.join();
vthread.join();
}
也可以使用创建线程的工厂模式,
这种方式估计是为了适配原先的java.util.concurrent.ThreadFactory
ThreadFactory factory = Thread.ofVirtual()
.name("virtual-thread-test-2-", 1)
.uncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
System.out.println("虚拟线程触发异常" + t + ",throwable:" + e.getMessage());
log.info("虚拟线程Id:{}, 虚拟线程名:{}, 依附的平台线程:{}", t.threadId(), t.getName(), t);
}
})
.factory();
// 通过工厂创建虚拟线程
factory.newThread(runnable);
2. 再看日志的第二个问题,虚拟线程号#23开始和结束日志后面的ForkJoinPool-1-worker-后面的序号不一样,这是为什么???
#23
是虚拟线程
的线程号
,而后面的ForkJoinPool-1-worker-
实际上是虚拟线程所被挂载到的平台线程
(载体线程
),从名字可以看出,虚拟线程依赖的载体线程实际上由ForkJoinPool
来实现,jdk在调度虚拟线程时保证代码在挂起前后是在同一个虚拟线程执行,但是不保证所依赖的载体线程也是同一个,也不需要有这样的保证。
java.lang.VirtualThread#toString
方法
下面是JDK官网一个创建大量虚拟线程的示例程序。程序首先获得一个ExecutorService将为每个提交的任务创建一个新的虚拟线程。然后它提交 10,000 个任务并等待所有任务完成:
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
IntStream.range(0, 10_000).forEach(i -> {
executor.submit(() -> {
Thread.sleep(Duration.ofSeconds(1));
return i;
});
});
} // executor.close() is called implicitly, and waits
此示例中的任务是简单的代码(休眠一秒钟),现代硬件可以轻松支持 10,000 个虚拟线程同时运行此类代码。但实际上却仅依赖少量的平台线程
Executors.newCachedThreadPool()
创建ExecutorService
,ExecutorService
将尝试创建 10,000 个平台线程
,从而创建 10,000 个操作系统线程,可能会出现程序崩溃,具体取决于机器性能和操作系统,我实验的时候电脑直接崩了重启,估计是内存不足导致。Executors.newFixedThreadPool(200)
创建ExecutorService
,ExecutorService
将创建 200 个平台线程,10,000 个任务共用该线程池,许多任务将顺序运行而不是并发运行,并且程序将需要很长时间才能完成。对于该程序,具有 200 个平台线程的池只能实现每秒 200 个任务的吞吐量,而虚拟线程可实现每秒约 10,000 个任务的吞吐量。此外,如果将10_000示例程序中的 更改为1_000_000,则该程序将提交 1,000,000 个任务,创建 1,000,000 个并发运行的虚拟线程,并且(在充分预热后)实现每秒约 1,000,000 个任务的吞吐量。虚拟线程
并不能让运行代码的速度比平台线程
快,但可以显著提高应用程序吞吐量。而使用虚拟线程几乎同时执行完成
以上示例使用Executor.newVirtualThreadPerTaskExecutor()
直接创建虚拟线程,同样丢失了虚拟线程名,可以通过虚拟线程工来添加虚拟线程名
ThreadFactory factory = Thread.ofVirtual().name("virtual-thread-test-", 1).factory();
ExecutorService virtualThreadExecutor = Executors.newThreadPerTaskExecutor(factory);
下面是聚合两个远程请求结果并作为返回值的示例:
public Response handle(Request request) {
var url1 = ...
var url2 = ...
Response response = new Response();
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
var future1 = executor.submit(() -> fetchURL(url1));
var future2 = executor.submit(() -> fetchURL(url2));
response.send(future1.get() + future2.get());
} catch (ExecutionException | InterruptedException e) {
response.fail(e);
}
return response;
}
String fetchURL(URL url) throws IOException {
try (var in = url.openStream()) {
return new String(in.readAllBytes(), StandardCharsets.UTF_8);
}
}
handle方法中通过虚拟线程访问2个远程服务,并通过future1.get()等待结果,将结果聚合返回。像这样的服务器应用程序具有简单的阻塞代码,可以很好地扩展,因为它可以使用大量虚拟线程。
JDK传统的平台线程依赖于操作系统调度。
而对于虚拟线程,由JDK 调度执行。JDK的调度程序将虚拟线程分配给平台线程(此时平台线程称为虚拟线程的载体
)。然后,操作系统像往常一样调度平台线程,(可以实现虚拟线程和平台线程M:N调度关系)。
JDK的虚拟线程调度程序通过ForkJoinPool以先进先出(FIFO)模式调度。调度程序默认的平台线程数它等于可用处理器的数量,可以通过系统属性进行调整jdk.virtualThreadScheduler.parallelism
。
源码见: java.lang.VirtualThread#createDefaultScheduler
虚拟线程在其生命周期内可以被调度到不同的载体上;换句话说,调度程序不维护虚拟线程和任何特定平台线程之间的关联性。从Java代码的角度来看,一个正在运行的虚拟线程在逻辑上独立于它当前的载体:
源码级分析参考: 虚拟线程 - VirtualThread源码透视
当JDK调度程序调度虚拟线程执行时则为挂载,此时平台线程成为载体线程
当虚拟线程执行完成或被阻塞时则由调度程序从载体线程卸载
,平台线程可以再次用于挂载
其它虚拟线程执行
如下会触发虚拟线程卸载:
public static void main(String[] args) throws Exception {
ForkJoinPool forkJoinPool = ForkJoinPoolFactory.createDefaultScheduler("custom-platform-thread-");
Thread.Builder.OfVirtual virtualBuilder = Thread.ofVirtual();
Field schedulerField = virtualBuilder.getClass().getDeclaredField("scheduler");
schedulerField.setAccessible(true);
schedulerField.set(virtualBuilder, forkJoinPool);
virtualBuilder.name("virtual-thread-test-", 0)
.uncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
System.out.println("异常处理");
}
})
.factory();
for (int i = 0; i < 10; i++) {
virtualBuilder.start(new Runnable() {
@Override
public void run() {
log.info("虚拟线程执行:thread:{}", Thread.currentThread());
}
});
}
Thread.sleep(Duration.ofSeconds(3));
}
运行结果: 可以看到, 载体线程被替换成自定义的平台线程
观察正在运行的程序的状态对于故障排除、维护和优化也至关重要,虚拟线程也是线程的一种实现,因此常见的监控工具也能监控虚拟线程。
Java 调试器可以单步执行虚拟线程、显示调用堆栈并检查堆栈帧中的变量。
JDK Flight Recorder (JFR)
是 JDK 的低开销分析和监视机制,可以将应用程序代码中的事件(例如对象分配和 I/O 操作)与正确的虚拟线程关联起来。
JDK传统jstack或jcmd命令仅适用于数十或数百个平台线程,但不适合数千或数百万虚拟线程。因此JDK引入一种新的线程转储,jcmd以将虚拟线程与平台线程一起呈现,jcmd除了纯文本之外,还可以以 JSON 格式保存线程转储信息:
$ jcmd <pid> Thread.dump_to_file -format=json <file>
但新的线程转存储格式不包括对象地址、锁、JNI 统计信息、堆统计信息以及传统线程转储中出现的其他信息。
由于可能存在大量线程,因此JDK将这种线程dump方式设计为不暂停应用程序。
如果设置系统属性-Djdk.trackAllThreads=false,则直接使用java.lang.Thread.BuilderAPI 创建的虚拟线程将不会被运行时跟踪,并且可能不会出现在线程dump信息中。只会列出阻塞在网络io操作的虚拟线程以及由Executors.newVirtualThreadPerTaskExecutor()创建的虚拟线程。
虚拟线程是在 JDK 中实现的,并且不依赖于任何特定的操作系统线程,因此操作系统级监控无法观察到虚拟线程。
StackOverflowError
可能会抛出异常。虚拟线程支持线程局部变量 (ThreadLocal
) 和可继承的线程局部变量 (InheritableThreadLocal
),就像平台线程一样,因此它们可以运行使用线程局部变量的现有代码。但是,由于虚拟线程可能非常多,因此只有在仔细考虑后才能使用线程局部变量。特别是,不要使用线程局部变量在线程池中共享同一线程的多个任务之间池化昂贵的资源。虚拟线程永远不应该被池化,因为每个虚拟线程在其生命周期内只运行一个任务。我们从JDKjava.base
模块中删除了许多线程局部变量的使用,为虚拟线程做好准备,以便在运行数百万个线程时减少内存占用。
当虚拟线程设置任何线程局部变量的值时,系统属性jdk.traceVirtualThreadLocals
可用于触发堆栈跟踪。当迁移代码以使用虚拟线程时,此诊断输出可能有助于删除线程局部变量。将系统属性设置为true来触发堆栈跟踪;默认值为false。
对于某些用例,ScopedValue(JEP 429
是线程局部变量的更好替代方案, 但JDK21中仍然是预览版本。
Spring Framework、Spring Boot已经适配了虚拟线程
在SpringBoot中只需要通过如下配置即可让Spring异步任务,Servlet使用虚拟线程
@Bean(TaskExecutionAutoConfiguration.APPLICATION_TASK_EXECUTOR_BEAN_NAME)
public AsyncTaskExecutor asyncTaskExecutor() {
return new TaskExecutorAdapter(Executors.newVirtualThreadPerTaskExecutor());
}
@Bean
public TomcatProtocolHandlerCustomizer<?> protocolHandlerVirtualThreadExecutorCustomizer() {
return protocolHandler -> {
protocolHandler.setExecutor(Executors.newVirtualThreadPerTaskExecutor());
};
}
Spring在积极的改进相关代码,已适配虚拟线程, 例如数据库驱动程序、消息传递系统、HTTP 客户端等等。
但也表示虚拟线程不能完全替换ReactiveX 的编程模式,但是可以补充ReactiveX 中的一些不足.
Spring相关blog: https://spring.io/blog/2022/10/11/embracing-virtual-threads
Netty
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。