15.SpringMVC之异步请求
阅读原文时间:2023年07月09日阅读:1

SpringMVC在此基础上对异步请求进行了封装。提供了AsyncWebRequest类型的request,并提供了处理异步请求的管理器WebAsyncManager和工具WebAsyncUtils.

SpringMVC将异步请求返回值细分为了:Callable,WebAsyncTask,,DeferredResult 和 ListenableFuture. 后续会针对这四种不同的类型一一分析。

AsyncWebRequest

AsyncWebRequest,它是专门处理异步请求的request,定义如下:

//org.springframework.web.context.request.async.AsyncWebRequest
public interface AsyncWebRequest extends NativeWebRequest {
void setTimeout(Long timeout);

//相当于在AsyncListener中的\`onTimeout和onComplete\`  
void addTimeoutHandler(Runnable runnable);  
void addCompletionHandler(Runnable runnable);

void startAsync();

//判断异步请求是否开启和结束  
boolean isAsyncStarted();  
boolean isAsyncComplete();

void dispatch();  

}

AsyncWebRequest 有两个实现类,

  • NoSupportAsyncWebRequest: 不支持异步请求
  • StandardServletAsyncWebRequest: 支持异步请求。

StandardServletAsyncWebRequest 除了实现了AsyncWebRequest接口外,还实现了AsyncListener,另外它还继承了ServletWebRequest.

public class StandardServletAsyncWebRequest extends ServletWebRequest implements AsyncWebRequest, AsyncListener {
private Long timeout;

//封装 AsyncContext 属性  
private AsyncContext asyncContext;  
private AtomicBoolean asyncCompleted = new AtomicBoolean(false);

//AsyncListener onTimeout,onCompletion方法 调用如下handlers..  
private final List<Runnable> timeoutHandlers = new ArrayList<Runnable>();  
private final List<Runnable> completionHandlers = new ArrayList<Runnable>();

@Override  
public boolean isAsyncStarted() {  
    return ((this.asyncContext != null) && getRequest().isAsyncStarted());  
}

@Override  
public void startAsync() {  
    if (isAsyncStarted()) {  
        return;  
    }  
    this.asyncContext = getRequest().startAsync(getRequest(), getResponse());  
    this.asyncContext.addListener(this);  
    if (this.timeout != null) {  
        this.asyncContext.setTimeout(this.timeout);  
    }  
}

// ---  实现 AsyncListener 方法----  
@Override  
public void onTimeout(AsyncEvent event) throws IOException {  
    for (Runnable handler : this.timeoutHandlers) {  
        handler.run();  
    }  
}

@Override  
public void onComplete(AsyncEvent event) throws IOException {  
    for (Runnable handler : this.completionHandlers) {  
        handler.run();  
    }  
    //执行完完成时,清空asyncContext  
    this.asyncContext = null;  
    this.asyncCompleted.set(true);  
}  

}

WebAsyncUtils

//org.springframework.web.context.request.async.WebAsyncUtils
public abstract class WebAsyncUtils {
//第一次获取时,直接创建WebAsyncManager,并设置到setAttribute中。 以后获取,直接从request属性中获取。
public static WebAsyncManager getAsyncManager(ServletRequest servletRequest) {
WebAsyncManager asyncManager = (WebAsyncManager) servletRequest.getAttribute(WEB_ASYNC_MANAGER_ATTRIBUTE);
if (asyncManager == null) {
asyncManager = new WebAsyncManager();
servletRequest.setAttribute(WEB_ASYNC_MANAGER_ATTRIBUTE, asyncManager);
}
return asyncManager;
}

public static WebAsyncManager getAsyncManager(WebRequest webRequest) {  
    //逻辑类似  getAsyncManager(ServletRequest servletRequest) 略...  
}

//判断ServletRequest是否有方法"startAsync"。 只有servlet环境3.0以上版本才有此方法  
public static AsyncWebRequest createAsyncWebRequest(HttpServletRequest request, HttpServletResponse response) {  
    return ClassUtils.hasMethod(ServletRequest.class, "startAsync") ?  
            createStandardServletAsyncWebRequest(request, response) : new NoSupportAsyncWebRequest(request, response);  
}

private static AsyncWebRequest createStandardServletAsyncWebRequest(HttpServletRequest request, HttpServletResponse response) {  
    if (standardAsyncRequestConstructor == null) {  
        String className = "org.springframework.web.context.request.async.StandardServletAsyncWebRequest";  
        Class<?> clazz = ClassUtils.forName(className, WebAsyncUtils.class.getClassLoader());  
        standardAsyncRequestConstructor = clazz.getConstructor(HttpServletRequest.class, HttpServletResponse.class);  
    }  
    return (AsyncWebRequest) BeanUtils.instantiateClass(standardAsyncRequestConstructor, request, response);  
}  

}

WebAsyncManager

WebAsyncManager是SpringMVC处理异步请求过程中最核心的类,它管理着整个异步处理的过程。

//org.springframework.web.context.request.async
public final class WebAsyncManager {
//两种类型的 超时 Interceptors
private static final CallableProcessingInterceptor timeoutCallableInterceptor = new TimeoutCallableProcessingInterceptor();
private static final DeferredResultProcessingInterceptor timeoutDeferredResultInterceptor = new TimeoutDeferredResultProcessingInterceptor();

//持有 asyncWebRequest 对象  
private AsyncWebRequest asyncWebRequest;  
private AsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor(this.getClass().getSimpleName());

//两种类型的 处理请求Interceptors  
private final Map<Object, CallableProcessingInterceptor> callableInterceptors = new LinkedHashMap<Object, CallableProcessingInterceptor>();  
private final Map<Object, DeferredResultProcessingInterceptor> deferredResultInterceptors = new LinkedHashMap<Object, DeferredResultProcessingInterceptor>();    

//用来处理Callable 和 WebAsyncTask 类型的异步请求  
public void startCallableProcessing(final WebAsyncTask<?> webAsyncTask, Object... processingContext) throws Exception {   }

//用来处理 DeferredResult 和 ListenableFuture 类型的请求  
public void startDeferredResultProcessing(final DeferredResult<?> deferredResult, Object... processingContext) throws Exception  

}

它最重要的两个方法是:startCallableProcessingstartDeferredResultProcessing,这两个方法是启动异步处理的入口方法,它们一共做三件事:

  1. 给Request设置属性(timeout,timeoutHandler,completionHandler…)
  2. 在相应位置,执行interceptors逻辑
  3. 启动异步处理

这里重点分析下startCallableProcessing

public void startCallableProcessing(final WebAsyncTask webAsyncTask, Object… processingContext) throws Exception {
//设置asyncWebRequest 属性…
Long timeout = webAsyncTask.getTimeout();
if (timeout != null) {
this.asyncWebRequest.setTimeout(timeout);
}

AsyncTaskExecutor executor = webAsyncTask.getExecutor();  
if (executor != null) {  
    this.taskExecutor = executor;  
}

//初始化 interceptors

//在asyncWebRequest 执行前后,执行完成,超时 等关键时间节点 执行 interceptors 逻辑…

//启动异步处理
startAsyncProcessing(processingContext);

// 线程池 执行callable方法....  
this.taskExecutor.submit(new Runnable() {  
    @Override  
    public void run() {  
        // interceptors 略....  
        Object result = callable.call();

        //设置处理结果,并发送请求  
        setConcurrentResultAndDispatch(result);  
    }  
});  

}

//调用asyncWebRequest.startAsync()启动异步处理
private void startAsyncProcessing(Object[] processingContext) {
clearConcurrentResult();
this.concurrentResultContext = processingContext;
this.asyncWebRequest.startAsync();
}

//判断是否已经有异步处理结果
public boolean hasConcurrentResult() {
//concurrentResult 初始化时 = RESULT_NONE
return (this.concurrentResult != RESULT_NONE);
}

//设置处理结果,并发送请求
private void setConcurrentResultAndDispatch(Object result) {
synchronized (WebAsyncManager.this) {
//判断是否已经有异步处理结果
if (hasConcurrentResult()) {
return;
}

    //将result设置为当前处理结果  
    this.concurrentResult = result;  
}

//如果异步请求在这里已经被设置为异步处理完成状态,则记录错误日志。(网络异常会造成此种问题)  
if (this.asyncWebRequest.isAsyncComplete()) {  
    logger.error("Could not complete async processing due to timeout or network error");  
    return;  
}  
//再次发送请求:SpringMVC请求处理完成之后再次发送一个相同的请求。在HandlerAdapter做特殊处理  
this.asyncWebRequest.dispatch();  

}

SpringMVC想要支持异步处理,首先DispatchServlet要配置:<async-supported>true</async-supported>,其次请求方法的返回值为:Callable,WebAsyncTask,,DeferredResult 和 ListenableFuture

@Controller
@RequestMapping("/async")
public class AsyncController {

@RequestMapping(value = "/callable",produces = "text/plain;charset=UTF-8")  
@ResponseBody  
public Callable<String> callable(){  
    System.out.println("Callable进入主线程...");  
    Callable<String> result =  new Callable<String>() {  
        @Override  
        public String call() throws Exception {  
            Thread.sleep(5 \* 1000);  
            System.out.println("Callable子线程执行ing...");  
            return "Callable:"+"久等了";  
        }  
    };  
    System.out.println("Callable主线程退出...");  
    return result;  
}

@RequestMapping(value = "/web",produces = "text/plain;charset=UTF-8")  
@ResponseBody  
public WebAsyncTask<String> web(){  
    System.out.println("WebAsyncTask 进入主线程...");  
    WebAsyncTask task = new WebAsyncTask(new Callable() {  
        @Override  
        public Object call() throws Exception {  
            Thread.sleep(5 \* 1000);  
            System.out.println("WebAsyncTask 子线程执行ing...");  
            return "WebAsyncTask:"+"久等了";  
        }  
    });  
    System.out.println("WebAsyncTask 主线程退出...");  
    return task;  
}

@RequestMapping(value = "/deferred",produces = "text/plain;charset=UTF-8")  
@ResponseBody  
public DeferredResult<String> deferred(){  
    //这里的 7 \* 1000 L ,是指主线程结束之后的超时时间。  
    DeferredResult<String> result = new DeferredResult<String>(7 \* 1000L , "超时了");  
    approve(result);  
    try {  
        Thread.sleep(10 \* 1000); //在主线程执行这段代码,并不会抛出"超时了"  
    } catch (InterruptedException e) {  
    }  
    return result;  
}

private void approve(final DeferredResult<String> result) {  
    new Thread(() -> {  
        try {  
            Thread.sleep(5 \* 1000);  
            result.setResult("同意:" + LocalDateTime.now());  
        } catch (InterruptedException e) {  
        }  
    }).start();  
}

@RequestMapping(value = "/future",produces = "text/plain;charset=UTF-8")  
public ListenableFuture<ResponseEntity<String>> future(){  
    ListenableFuture<ResponseEntity<String>> future = new AsyncRestTemplate().getForEntity("http://www.baidu.com", String.class);  
    return future;  
}  

}

源码跟踪

springMVC异步处理请求的过程是总体上可以拆分为2次:

  • 第一次,启动异步请求,并设置timeout,completion等事件的监听,直接返回 null;
  • 第二次,当监听到completion 时,直接在发送一次相同的请求,并将执行结果返回。

SpringMVC执行请求方法的过程都是在HandlerAdater中进行的。

在之前解析RequestMappingHandlerAdapter#invokeHandleMethod()处理请求时,将异步请求部分给剔除了,现在回看此方法:

//org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter
private ModelAndView invokeHandleMethod(HttpServletRequest request,
HttpServletResponse response, HandlerMethod handlerMethod) throws Exception {

    ServletWebRequest webRequest = new ServletWebRequest(request, response);

    WebDataBinderFactory binderFactory = getDataBinderFactory(handlerMethod);  
    ModelFactory modelFactory = getModelFactory(handlerMethod, binderFactory);  
    ServletInvocableHandlerMethod requestMappingMethod = createRequestMappingMethod(handlerMethod, binderFactory);

    //mavContainer相关略......

    AsyncWebRequest asyncWebRequest = WebAsyncUtils.createAsyncWebRequest(request, response);  
    asyncWebRequest.setTimeout(this.asyncRequestTimeout);

    final WebAsyncManager asyncManager = WebAsyncUtils.getAsyncManager(request);  
    asyncManager.setTaskExecutor(this.taskExecutor);  
    asyncManager.setAsyncWebRequest(asyncWebRequest);  
    asyncManager.registerCallableInterceptors(this.callableInterceptors);  
    asyncManager.registerDeferredResultInterceptors(this.deferredResultInterceptors);

    //异步请求是否已经完成  
    if (asyncManager.hasConcurrentResult()) {  
        //如果异步请求已经处理完成,则获取执行结果  --- 1  
        Object result = asyncManager.getConcurrentResult();  
        mavContainer = (ModelAndViewContainer) asyncManager.getConcurrentResultContext()\[0\];

        //清空执行结果  
        asyncManager.clearConcurrentResult();

        //覆盖原有的requestMappingMethod方法;  --- 2  
        requestMappingMethod = requestMappingMethod.wrapConcurrentResult(result);  
    }

    //执行方法 -- 3  
    requestMappingMethod.invokeAndHandle(webRequest, mavContainer);

    //asyncManager是否已经启动  
    if (asyncManager.isConcurrentHandlingStarted()) {  
        //-- 4  
        return null;  
    }  
    // --- 5  
    return getModelAndView(mavContainer, modelFactory, webRequest);  
}
  • 第一次执行时: 会执行上述代码中的 3,4
  • 第二次执行时: 执行上述代码中的1,2,3,5 。 注意步骤2,会将原有的requestMappingMethod重写.接下来会分析。

ServletInvocableHandlerMethod.invokeAndHandle(webRequest, mavContainer)

springMVC在使用RequestMappingHandlerAdapter#invokeHandleMethod()处理请求时,会调用ServletInvocableHandlerMethod#invokeAndHandle()方法,该方法在处理完毕之后,会调用

this.returnValueHandlers.handleReturnValue(returnValue, getReturnValueType(returnValue), mavContainer, webRequest);`

处理返回值,针对上述四种类型的结果,匹配不同的XXReturnValueHandler.

Callable : CallableMethodReturnValueHandler

//org.springframework.web.servlet.mvc.method.annotation.CallableMethodReturnValueHandler
@Override
public void handleReturnValue(Object returnValue, MethodParameter returnType,
ModelAndViewContainer mavContainer, NativeWebRequest webRequest) throws Exception {
//if 略…
Callable callable = (Callable) returnValue;
WebAsyncUtils.getAsyncManager(webRequest).startCallableProcessing(callable, mavContainer);
}

WebAsyncTask : AsyncTaskMethodReturnValueHandler

//org.springframework.web.servlet.mvc.method.annotation.AsyncTaskMethodReturnValueHandler
@Override
public void handleReturnValue(Object returnValue, MethodParameter returnType,
ModelAndViewContainer mavContainer, NativeWebRequest webRequest) throws Exception {
//if 略…
WebAsyncTask webAsyncTask = (WebAsyncTask) returnValue;
webAsyncTask.setBeanFactory(this.beanFactory);
WebAsyncUtils.getAsyncManager(webRequest).startCallableProcessing(webAsyncTask, mavContainer);
}

这里可以看出 Callable和webAsyncTask都是用了startCallableProcessing方法。

DeferredResultDeferredResultMethodReturnValueHandler

//org.springframework.web.servlet.mvc.method.annotation.DeferredResultMethodReturnValueHandler
@Override
public void handleReturnValue(Object returnValue, MethodParameter returnType,
ModelAndViewContainer mavContainer, NativeWebRequest webRequest) throws Exception {
//if 略…
DeferredResult deferredResult = (DeferredResult) returnValue;
WebAsyncUtils.getAsyncManager(webRequest).startDeferredResultProcessing(deferredResult, mavContainer);
}

DeferredResultDeferredResultMethodReturnValueHandler

//org.springframework.web.servlet.mvc.method.annotation.ListenableFutureReturnValueHandler
@Override
public void handleReturnValue(Object returnValue, MethodParameter returnType,
ModelAndViewContainer mavContainer, NativeWebRequest webRequest) throws Exception {
//if 略…
final DeferredResult deferredResult = new DeferredResult();
WebAsyncUtils.getAsyncManager(webRequest).startDeferredResultProcessing(deferredResult, mavContainer);

ListenableFuture<?> future = (ListenableFuture<?>) returnValue;  
future.addCallback(new ListenableFutureCallback<Object>() {  
    @Override  
    public void onSuccess(Object result) {  
        deferredResult.setResult(result);  
    }  
    @Override  
    public void onFailure(Throwable ex) {  
        deferredResult.setErrorResult(ex);  
    }  
});  

}

自此可以说明看 DeferredResult 和 ListenableFuture都是用了startDeferredResultProcessing方法。

ServletInvocableHandlerMethod.wrapConcurrentResult(result)

第二次请求时,要重点关注此行:requestMappingMethod.wrapConcurrentResult(result),此时的result已经是异步执行后的最终结果,不是DeferredResult.

//org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod
private static final Method CALLABLE_METHOD = ClassUtils.getMethod(Callable.class, "call");

public ServletInvocableHandlerMethod(Object handler, Method method) {
super(handler, method);
initResponseStatus();
}
ServletInvocableHandlerMethod wrapConcurrentResult(Object result) {
return new ConcurrentResultHandlerMethod(result, new ConcurrentResultMethodParameter(result));
}

//org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod $ ConcurrentResultMethodParameter
private class ConcurrentResultMethodParameter extends HandlerMethodParameter {
private final Object returnValue;
private final ResolvableType returnType;

//直接传入返回值returnValue, 返回值的类型为 returnValue的类型  
public ConcurrentResultMethodParameter(Object returnValue) {  
    super(-1);  
    this.returnValue = returnValue;  
    this.returnType = ResolvableType.forType(super.getGenericParameterType()).getGeneric(0);  
}  

}

//org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod $ ConcurrentResultHandlerMethod
private class ConcurrentResultHandlerMethod extends ServletInvocableHandlerMethod {
public ConcurrentResultHandlerMethod(final Object result, ConcurrentResultMethodParameter returnType) {
//调用父类的构造方法(handler,method),最终调用 method.invoke();
super(new Callable() {
@Override
public Object call() throws Exception {
if (result instanceof Exception) {
throw (Exception) result;
}
else if (result instanceof Throwable) {
throw new NestedServletException("Async processing failed", (Throwable) result);
}
//此时的result即为最终异步处理的结果.
return result;
}
}, CALLABLE_METHOD);
setHandlerMethodReturnValueHandlers(ServletInvocableHandlerMethod.this.returnValueHandlers);
this.returnType = returnType;
}
}

第二次执行 requestMappingMethod.invokeAndHandle(webRequest, mavContainer);,此时的requestMappingMethod已经是伪造后的结果,该方法的返回值也被伪造为ConcurrentResultMethodParameter,最终调用的为ConcurrentResultHandlerMethod在构造函数中定义的Callable.call();

SpringMVC想要支持异步处理,首先DispatchServlet要配置:true,其次请求方法的返回值为:Callable,WebAsyncTask,,DeferredResult 和 ListenableFuture

<task:executor />配置参数:

  • id:当配置多个executor时,被@Async(“id”)指定使用;也被作为线程名的前缀。
  • pool-size:
    • core size:最小的线程数,缺省:1
    • max size:最大的线程数,缺省:Integer.MAX_VALUE
  • queue-capacity:当最小的线程数已经被占用满后,新的任务会被放进queue里面,当这个queue的capacity也被占满之后,pool里面会创建新线程处理这个任务,直到总线程数达到了max size,这时系统会拒绝这个任务并抛出TaskRejectedException异常(缺省配置的情况下,可以通过rejection-policy来决定如何处理这种情况)。缺省值为:Integer.MAX_VALUE
  • keep-alive:超过core size的那些线程,任务完成后,再经过这个时长(秒)会被结束掉
  • rejection-policy:当pool已经达到max size的时候,如何处理新任务
    • ABORT(缺省):抛出TaskRejectedException异常,然后不执行
    • DISCARD:不执行,也不抛出异常
    • DISCARD_OLDEST:丢弃queue中最旧的那个任务
    • CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行

Java编程方式的配置方法:

@Configuration
@EnableAsync
public class SpringConfig {

/\*\* Set the ThreadPoolExecutor's core pool size. \*/  
private int corePoolSize = 10;  
/\*\* Set the ThreadPoolExecutor's maximum pool size. \*/  
private int maxPoolSize = 200;  
/\*\* Set the capacity for the ThreadPoolExecutor's BlockingQueue. \*/  
private int queueCapacity = 10;  

private String ThreadNamePrefix = "MyLogExecutor-";  

@Bean  
public Executor logExecutor() {  
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();  
    executor.setCorePoolSize(corePoolSize);  
    executor.setMaxPoolSize(maxPoolSize);  
    executor.setQueueCapacity(queueCapacity);  
    executor.setThreadNamePrefix(ThreadNamePrefix);  

    // rejection-policy:当pool已经达到max size的时候,如何处理新任务  
    // CALLER\_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行  
    executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());  
    executor.initialize();  
    return executor;  
}  

}