赞
踩
异步包括异步请求和异步调用。
异步请求与异步调用的区别
两者的使用场景不同,异步请求用来解决并发请求对服务器造成的压力,从而提高对请求的吞吐量;而异步调用是用来做一些非主线流程且不需要实时计算和响应的任务,如同步日志到kafka中做日志分析等。
异步请求是会一直等待response响应,需要返回结果给客户端;而异步调用往往会马上返回给客户端响应,完成这次整个的请求,至于异步调用的任务后台自己慢慢跑就行,客户端不会关心。
异步请求与同步请求
特点:
可以先释放容器分配给请求的线程与相关资源,减轻系统负担,释放容器所分配线程的请求,其响应将被延后,可以在耗时处理完成后再响应客户端。
一句话:增加服务器对客户端请求的吞吐量(实际生产环境里,并发请求量很大时,会通过nginx把请求负载到集群服务的各个节点上来分摊请求压力,当然还可以通过消息队列来做请求的缓冲)。
@Slf4j @RequestMapping(value = "/email/servletReq", method = GET) public void servletReq (HttpServletRequest request, HttpServletResponse response) { AsyncContext asyncContext = request.startAsync(); // 设置监听器:可设置其开始、完成、异常、超时等事件的回调处理 asyncContext.addListener(new AsyncListener() { @Override public void onTimeout(AsyncEvent event) throws IOException { // 超时后的相关操作... } @Override public void onStartAsync(AsyncEvent event) throws IOException { log.info("线程开始"); } @Override public void onError(AsyncEvent event) throws IOException { } @Override public void onComplete(AsyncEvent event) throws IOException { log.info("执行完成"); // 清理资源的操作... } }); // 设置超时时间 asyncContext.setTimeout(20000); asyncContext.start(new Runnable() { @Override public void run() { try { Thread.sleep(10000); log.info("内部线程:" + Thread.currentThread().getName()); asyncContext.getResponse().setCharacterEncoding("utf-8"); asyncContext.getResponse().setContentType("text/html;charset=UTF-8"); asyncContext.getResponse().getWriter().println("这是异步的请求返回"); } catch (Exception e) { } // 异步请求完成通知, 此时整个请求才完成 asyncContext.complete(); } }); // 此时之类request的线程连接已经释放 log.info("主线程:" + Thread.currentThread().getName()); }
@RequestMapping(value = "/email/callableReq", method = GET) @ResponseBody public Callable<String> callableReq () { log.info("外部线程:" + Thread.currentThread().getName()); return new Callable<String>() { @Override public String call() throws Exception { Thread.sleep(10000); log.info("内部线程:" + Thread.currentThread().getName()); return "callable!"; } }; } @Configuration public class RequestAsyncPoolConfig extends WebMvcConfigurerAdapter { @Resource private ThreadPoolTaskExecutor myThreadPoolTaskExecutor; @Override public void configureAsyncSupport(final AsyncSupportConfigurer configurer) { // 处理callable超时 configurer.setDefaultTimeout(60*1000); configurer.setTaskExecutor(myThreadPoolTaskExecutor); configurer.registerCallableInterceptors(timeoutCallableProcessingInterceptor()); } @Bean public TimeoutCallableProcessingInterceptor timeoutCallableProcessingInterceptor() { return new TimeoutCallableProcessingInterceptor(); } }
@RequestMapping(value = "/email/webAsyncReq", method = GET) @ResponseBody public WebAsyncTask<String> webAsyncReq () { log.info("外部线程:" + Thread.currentThread().getName()); Callable<String> result = () -> { log.info("内部线程开始:" + Thread.currentThread().getName()); try { TimeUnit.SECONDS.sleep(4); } catch (Exception e) { } log.info("副线程返回"); log.info("内部线程返回:" + Thread.currentThread().getName()); return "success"; }; WebAsyncTask<String> wat = new WebAsyncTask<String>(3000L, result); wat.onTimeout(new Callable<String>() { @Override public String call() throws Exception { return "超时"; } }); return wat; }
@RequestMapping(value = "/email/deferredResultReq", method = GET) @ResponseBody public DeferredResult<String> deferredResultReq () { log.info("外部线程:" + Thread.currentThread().getName()); // 设置超时时间 DeferredResult<String> result = new DeferredResult<String>(60*1000L); // 处理超时事件 采用委托机制 result.onTimeout(new Runnable() { @Override public void run() { log.info("DeferredResult超时"); result.setResult("超时!"); } }); result.onCompletion(new Runnable() { @Override public void run() { log.info("调用完成"); } }); myThreadPoolTaskExecutor.execute(new Runnable() { @Override public void run() { // 处理业务逻辑 log.info("内部线程:" + Thread.currentThread().getName()); // 返回结果 result.setResult("DeferredResult!"); } }); return result; }
开启异步支持
@Configuration
@EnableAsync
public class SpringAsyncConfig {
}
@EnableAsync检测Spring的@Async注释和EJB 3.1 javax. EJB异步,还可用于检测其他用户定义注解。
自定义线程池:
@Slf4j
@Configuration
public class ThreadPoolConfiguration {
@Bean(name = "defaultThreadPoolExecutor", destroyMethod = "shutdown")
public ThreadPoolExecutor systemCheckPoolExecutorService() {
return new ThreadPoolExecutor(3, 10, 60, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(10000),
new ThreadFactoryBuilder().setNameFormat("default-executor-%d").build(),
(r, executor) -> log.error("system pool is full! "));
}
}
在异步处理的方法上添加注解 @Async ,当对 execute 方法 调用时,通过自定义的线程池 defaultThreadPoolExecutor 异步化执行 execute 方法
@Service
public class AsyncServiceImpl implements AsyncService {
@Async("defaultThreadPoolExecutor")
public Boolean execute(Integer num) {
log.info("线程:" + Thread.currentThread().getName() + " , 任务:" + num);
return true;
}
}
用 @Async 注解标记的方法,称为异步方法。在SB应用中使用 @Async 很简单:
@Async使用
@Async
@Slf4j
public void returnVoid() {
}
@Async
@Slf4j
public Future<String> returnFuture() {
try {
Thread.sleep(1000);
return new AsyncResult<String>("hello");
} catch (InterruptedException e) {
}
return null;
}
Spring默认使用SimpleAsyncTaskExecutor线程池去执行这些异步方法,此执行器没有限制线程数,实际上此线程池不是真正意义上的线程池,线程并没有重用,每次调用都会创建一个新的线程。可从两个层级进行覆盖:
@Async("threadPoolTaskExecutor")
public void asyncMethodWithConfiguredExecutor() {
}
@Configuration
@EnableAsync
public class SpringAsyncConfig implements AsyncConfigurer {
@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.initialize();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(25);
return executor;
}
}
当方法返回值是Future时,异常捕获是没问题的,Future.get()
方法会抛出异常。但如果返回类型是Void,异常在当前线程就捕获不到,需要添加额外的配置来处理异常。
实现AsyncUncaughtExceptionHandler接口来自定义异常处理类,重写handleUncaughtException()方法,存在任何未捕获的异步异常时调用:
@Slf4j
public class CustomAsyncExceptionHandler implements AsyncUncaughtExceptionHandler {
@Override
public void handleUncaughtException (Throwable throwable, Method method, Object... obj) {
log.info("Exception message - " + throwable.getMessage() + "Method name - " + method.getName());
for (Object param : obj) {
log.info("Parameter value - " + param);
}
}
}
由configuration类实现的AsyncConfigurer接口。作为其中的一部分,还需要覆盖getAsyncUncaughtExceptionHandler()方法来返回自定义的异步异常处理程序:
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return new CustomAsyncExceptionHandler();
}
调用的异步方法,不能为同一个类的方法(包括同一个类的内部类),简单来说,因为Spring在启动扫描时会为其创建一个代理类,而同类调用时,还是调用本身的代理类的,所以和平常调用是一样的。其他注解如@Cache等也是如此,由于Spring的代理机制。在开发中最好把异步服务单独抽出一个类来管理。
导致@Async异步方法失效的几种情况:
上面的情况2,3很好解决,仅考虑情况1。
@Controller public class EmailController { @Autowired private ApplicationContext applicationContext; @RequestMapping(value = "/asyncCall", method = GET) @ResponseBody public void asyncCall () { try { // 调用同类下的异步方法是不起作用的 // this.testAsyncTask(); // 通过上下文获取自己的代理对象调用异步方法 EmailController controller = (EmailController)applicationContext.getBean(EmailController.class); controller.testAsyncTask(); } catch (Exception e) { } } @Async public void testAsyncTask() throws InterruptedException { Thread.sleep(10000); log.info("异步任务执行完成!"); } }
@EnableAspectJAutoProxy(exposeProxy = true)
注解:@Service @Transactional(value = "transactionManager", readOnly = false, propagation = Propagation.REQUIRED, rollbackFor = Throwable.class) public class EmailService { @Autowired private ApplicationContext applicationContext; @Async public void testSyncTask() throws InterruptedException { Thread.sleep(10000); log.info("异步任务执行完成!"); } public void asyncCallTwo() throws InterruptedException { //this.testSyncTask(); // EmailService emailService = (EmailService)applicationContext.getBean(EmailService.class); // emailService.testSyncTask(); boolean isAop = AopUtils.isAopProxy(EmailController.class);//是否是代理对象; boolean isCglib = AopUtils.isCglibProxy(EmailController.class); //是否是CGLIB方式的代理对象; boolean isJdk = AopUtils.isJdkDynamicProxy(EmailController.class); //是否是JDK动态代理方式的代理对象; EmailService emailService = (EmailService)applicationContext.getBean(EmailService.class); EmailService proxy = (EmailService) AopContext.currentProxy(); log.info(emailService == proxy ? true : false); proxy.testSyncTask(); } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。