JDK HttpClient: Handling Multiple Request-Response Exchanges
Date: 2022-01-01


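The previous article covered how the JDK HttpClient is built and started. By the time the client is built, a selector manager thread is already running in the background, polling for I/O events and dispatching them. Those I/O events are triggered by the request-response process, and, as we will see, executing a single user request is not as simple as one send followed by one receive.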

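In this article we briefly walk through how a user request is sent and its response received, then focus on how HttpClient decorates the user request and how it handles the multiple requests that one user request may give rise to.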

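Throughout this article, HttpClient refers to the HttpClient built into the JDK since JDK 11 and its related classes; the source analysis is based on Java 17. Readers should be familiar with the basic use of CompletableFuture.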

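The basic flow of sending a request and receiving its response with HttpClient can be summarized as follows:

[Figure: flow chart of sending and receiving a request]

The sequence diagram of the request process looks roughly like this:

[Figure: sequence diagram of the request process]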

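Note that the entire HttpClient call chain is built on CompletableFuture, which makes it asynchronous at the programming-model level, and that thenCompose is what orders the asynchronous operations one after another.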
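As a reminder of how thenCompose sequences asynchronous steps, here is a minimal sketch. The class and method names (ComposeDemo, sendRequest, parseLength) are made up for illustration and are not part of the JDK HttpClient.

```java
import java.util.concurrent.CompletableFuture;

class ComposeDemo {
    // Hypothetical first step: pretend to send a request asynchronously.
    static CompletableFuture<String> sendRequest(String url) {
        return CompletableFuture.supplyAsync(() -> "raw response from " + url);
    }

    // Hypothetical second step: asynchronously derive a value from the first result.
    static CompletableFuture<Integer> parseLength(String raw) {
        return CompletableFuture.supplyAsync(raw::length);
    }

    static CompletableFuture<Integer> pipeline(String url) {
        // thenCompose runs parseLength only after sendRequest has completed,
        // and flattens the nested future into a single CompletableFuture<Integer>.
        return sendRequest(url).thenCompose(ComposeDemo::parseLength);
    }

    public static void main(String[] args) {
        System.out.println(pipeline("http://example.com").join());
    }
}
```

With thenApply instead of thenCompose, the result type would be a nested CompletableFuture<CompletableFuture<Integer>>, which is why thenCompose is the natural choice for chaining steps that are themselves asynchronous.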

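In this article we focus on these steps of the flow chart: "copy the request", "filter the request", "filter the newly generated request", and "repeat with the new request until no new request is generated".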

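A user's HttpRequest is created in much the same way as the client itself, through the builder pattern. The resulting request is an immutable ImmutableHttpRequest (a subclass of HttpRequest).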
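For reference, building a request through the public builder API might look like the sketch below. The URL, header, and body values are placeholders chosen for illustration.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.time.Duration;

class RequestBuildDemo {
    static HttpRequest build() {
        // Each builder call configures one property; build() returns an immutable request.
        return HttpRequest.newBuilder()
                .uri(URI.create("http://example.com/login"))   // placeholder URL
                .timeout(Duration.ofSeconds(5))
                .header("Accept", "text/html")
                .POST(HttpRequest.BodyPublishers.ofString("user=demo"))
                .build();
    }

    public static void main(String[] args) {
        HttpRequest request = build();
        System.out.println(request.method() + " " + request.uri());
    }
}
```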

We start with HttpClientImpl::sendAsync, the single entry point for every request. It does two things:

  • copies the user request

  • creates a MultiExchange object and lets it process the request

    private <T> CompletableFuture<HttpResponse<T>>
    sendAsync(HttpRequest userRequest,
              BodyHandler<T> responseHandler,
              PushPromiseHandler<T> pushPromiseHandler,
              Executor exchangeExecutor) {

        Objects.requireNonNull(userRequest);
        Objects.requireNonNull(responseHandler);
        AccessControlContext acc = null;
        if (System.getSecurityManager() != null)
            acc = AccessController.getContext();

        //The user request may be "untrusted", so it is copied here. The copy has a
        //different type from the user request and can be modified; analyzed below.
        // Clone the, possibly untrusted, HttpRequest
        HttpRequestImpl requestImpl = new HttpRequestImpl(userRequest, proxySelector);
        if (requestImpl.method().equals("CONNECT"))
            throw new IllegalArgumentException("Unsupported method CONNECT");

        long start = DEBUGELAPSED ? System.nanoTime() : 0;
        reference();
        try {
            if (debugelapsed.on())
                debugelapsed.log("ClientImpl (async) send %s", userRequest);

            //if the caller did not pass an executor, use the default one for the asynchronous work
            Executor executor = exchangeExecutor == null
                    ? this.delegatingExecutor : exchangeExecutor;

            //create the MultiExchange, which manages the life cycle of the possibly
            //multiple requests caused by one user request; analyzed below
            MultiExchange<T> mex = new MultiExchange<>(userRequest,
                                                            requestImpl,
                                                            this,
                                                            responseHandler,
                                                            pushPromiseHandler,
                                                            acc);
            //the MultiExchange processes the request; this is the main subject below
            CompletableFuture<HttpResponse<T>> res =
                    mex.responseAsync(executor).whenComplete((b,t) -> unreference());
            if (DEBUGELAPSED) {
                res = res.whenComplete(
                        (b,t) -> debugCompleted("ClientImpl (async)", start, userRequest));
            }

            // makes sure that any dependent actions happen in the CF default
            // executor. This is only needed for sendAsync(...), when
            // exchangeExecutor is non-null.
            if (exchangeExecutor != null) {
                res = res.whenCompleteAsync((r, t) -> { /* do nothing */}, ASYNC_POOL);
            }
            return res;
        } catch(Throwable t) {
            unreference();
            debugCompleted("ClientImpl (async)", start, userRequest);
            throw t;
        }
    }

Let's look at how the request is copied:

    public HttpRequestImpl(HttpRequest request, ProxySelector ps) {
        String method = request.method();
        if (method != null && !Utils.isValidName(method))
            throw new IllegalArgumentException("illegal method \""
                    + method.replace("\n","\\n")
                    .replace("\r", "\\r")
                    .replace("\t", "\\t")
                    + "\"");
        URI requestURI = Objects.requireNonNull(request.uri(),
                "uri must be non null");
        Duration timeout = request.timeout().orElse(null);
        this.method = method == null ? "GET" : method;
        //validate the headers supplied by the user
        this.userHeaders = HttpHeaders.of(request.headers().map(), Utils.VALIDATE_USER_HEADER);
        if (request instanceof HttpRequestImpl) {
            // all cases exception WebSocket should have a new system headers
            this.isWebSocket = ((HttpRequestImpl) request).isWebSocket;
            if (isWebSocket) {
                this.systemHeadersBuilder = ((HttpRequestImpl)request).systemHeadersBuilder;
            } else {
                //the headers builder; as we will see, filters add system headers to the request through it
                this.systemHeadersBuilder = new HttpHeadersBuilder();
            }
        } else {
            HttpRequestBuilderImpl.checkURI(requestURI);
            checkTimeout(timeout);
            this.systemHeadersBuilder = new HttpHeadersBuilder();
        }
        if (!userHeaders.firstValue("User-Agent").isPresent()) {
            this.systemHeadersBuilder.setHeader("User-Agent", USER_AGENT);
        }
        this.uri = requestURI;
        if (isWebSocket) {
            // WebSocket determines and sets the proxy itself
            this.proxy = ((HttpRequestImpl) request).proxy;
        } else {
            if (ps != null)
                this.proxy = retrieveProxy(ps, uri);
            else
                this.proxy = null;
        }
        this.expectContinue = request.expectContinue();
        this.secure = uri.getScheme().toLowerCase(Locale.US).equals("https");
        //What is a bodyPublisher? It publishes the request body, e.g. the publisher
        //returned by BodyPublishers.ofString, which builds the body from a string
        this.requestPublisher = request.bodyPublisher().orElse(null);
        this.timeout = timeout;
        this.version = request.version();
        this.authority = null;
    }

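As we can see, copying the request is mostly a field-by-field copy, but it also validates parameters and infers and sets default properties.

Creating the MultiExchange is simple: it is instantiated from the current request and a few properties of the client. As the constructor shows, MultiExchange really is what its name says, the manager of multiple exchanges.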

    /**
     * MultiExchange with one final response.
     */
    MultiExchange(HttpRequest userRequest,
                  HttpRequestImpl requestImpl,
                  HttpClientImpl client,
                  HttpResponse.BodyHandler<T> responseHandler,
                  PushPromiseHandler<T> pushPromiseHandler,
                  @SuppressWarnings("removal") AccessControlContext acc) {
        this.previous = null;
        this.userRequest = userRequest;
        this.request = requestImpl;
        this.currentreq = request;
        //the previous request, initially null: one user request may lead to several
        //request-response exchanges (redirects, authentication)
        this.previousreq = null;
        this.client = client;
        this.filters = client.filterChain();
        this.acc = acc;
        this.executor = client.theExecutor();
        this.responseHandler = responseHandler;

        if (pushPromiseHandler != null) {
            Executor executor = acc == null
                    ? this.executor.delegate()
                    : new PrivilegedExecutor(this.executor.delegate(), acc);
            this.pushGroup = new PushGroup<>(pushPromiseHandler, request, executor);
        } else {
            pushGroup = null;
        }
        this.connectTimeout = client.connectTimeout()
                .map(ConnectTimeoutTracker::new).orElse(null);
        this.exchange = new Exchange<>(request, this);
    }

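Now for the main event: MultiExchange::responseAsync(executor). It asynchronously handles the possibly multiple request-response exchanges triggered by one user request and returns the final result.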

class MultiExchange<T> implements Cancelable {
    //large parts of the class omitted

    public CompletableFuture<HttpResponse<T>> responseAsync(Executor executor) {
            CompletableFuture<Void> start = new MinimalFuture<>(new CancelableRef(this));
            //This call only assembles the pipeline: the series of operations is wrapped into asynchronous stages that have not run yet
            CompletableFuture<HttpResponse<T>> cf = responseAsync0(start);
            //Execution really starts here, once start is completed; this relies on how CompletableFuture stages are triggered
            start.completeAsync( () -> null, executor); // trigger execution
            return cf;
    }

    private CompletableFuture<HttpResponse<T>>
        responseAsync0(CompletableFuture<Void> start) {
            //The real work begins inside thenCompose(): one or more requests are completed and a single final response is returned
            return start.thenCompose( v -> responseAsyncImpl())
                        .thenCompose((Response r) -> {
                            Exchange<T> exch = getExchange();
                            if (bodyNotPermitted(r)) {
                                if (bodyIsPresent(r)) {
                                    IOException ioe = new IOException(
                                        "unexpected content length header with 204 response");
                                    exch.cancel(ioe);
                                    return MinimalFuture.failedFuture(ioe);
                                } else
                                    return handleNoBody(r, exch);
                            }
                            //read and parse the response body
                            return exch.readBodyAsync(responseHandler)
                                .thenApply((T body) -> {
                                    this.response =
                                        new HttpResponseImpl<>(r.request(), r, this.response, body, exch);
                                    return this.response;
                                });
                        }).exceptionallyCompose(this::whenCancelled);
    }

    //Handles the one or more requests caused by a single user request and returns one final response
    private CompletableFuture<Response> responseAsyncImpl() {
        CompletableFuture<Response> cf;
        if (attempts.incrementAndGet() > max_attempts) {
            cf = failedFuture(new IOException("Too many retries", retryCause));
        } else {
            //response timeout handling
            if (currentreq.timeout().isPresent()) {
                responseTimerEvent = ResponseTimerEvent.of(this);
                client.registerTimer(responseTimerEvent);
            }
            try {
                // 1. apply request filters
                // if currentreq == previousreq the filters have already
                // been applied once. Applying them a second time might
                // cause some headers values to be added twice: for
                // instance, the same cookie might be added again.
                if (currentreq != previousreq) {
                    requestFilters(currentreq);
                }
            } catch (IOException e) {
                return failedFuture(e);
            }
            Exchange<T> exch = getExchange();
            // 2. get response
            // a single Exchange handles the current single request and returns its response asynchronously
            cf = exch.responseAsync()
                     .thenCompose((Response response) -> {
                        HttpRequestImpl newrequest;
                        try {
                            // 3. apply response filters
                            //apply the response filters and check whether one of them produced a new request
                            newrequest = responseFilters(response);
                        } catch (IOException e) {
                            return failedFuture(e);
                        }
                        // 4. check filter result and repeat or continue
                        if (newrequest == null) {
                            if (attempts.get() > 1) {
                                Log.logError("Succeeded on attempt: " + attempts);
                            }
                            //no new request was generated, so this response is the final one
                            return completedFuture(response);
                        } else {
                            this.response =
                                new HttpResponseImpl<>(currentreq, response, this.response, null, exch);
                            Exchange<T> oldExch = exch;
                            if (currentreq.isWebSocket()) {
                                //the current request is a WebSocket request, so the current connection must be closed
                                // need to close the connection and open a new one.
                                exch.exchImpl.connection().close();
                            }
                            return exch.ignoreBody().handle((r,t) -> {
                                //move on to the next request
                                previousreq = currentreq;
                                currentreq = newrequest;
                                expiredOnce = false;
                                setExchange(new Exchange<>(currentreq, this, acc));
                                //recurse until no new request is generated
                                return responseAsyncImpl();
                            }).thenCompose(Function.identity());
                        } })
                     .handle((response, ex) -> {
                         //on error, either retry or return a failed future
                        // 5. handle errors and cancel any timer set
                        cancelTimer();
                        if (ex == null) {
                            assert response != null;
                            return completedFuture(response);
                        }
                        // all exceptions thrown are handled here
                        CompletableFuture<Response> errorCF = getExceptionalCF(ex);
                        if (errorCF == null) {
                            return responseAsyncImpl();
                        } else {
                            return errorCF;
                        } })
                     .thenCompose(Function.identity());
        }
        return cf;
    }
}
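The `start` future in responseAsync uses a "deferred trigger" idiom: the stages are attached to a future that is not yet complete, and nothing runs until that future is completed. The following is a minimal sketch of the idiom with plain CompletableFuture (the JDK code uses its internal MinimalFuture); the class name and log messages are made up for illustration.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class DeferredStartDemo {
    static List<String> run() {
        List<String> log = new CopyOnWriteArrayList<>();
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            // An incomplete future acts as the "start switch" of the pipeline.
            CompletableFuture<Void> start = new CompletableFuture<>();

            // Assemble the stages; none of them runs yet because start is not complete.
            CompletableFuture<String> result = start
                    .thenApply(v -> { log.add("send request"); return "response"; })
                    .thenApply(r -> { log.add("read body"); return r + " body"; });

            log.add("pipeline assembled");

            // Completing start on the executor triggers the dependent stages.
            start.completeAsync(() -> null, executor);
            result.join();
            return List.copyOf(log);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Because the stages depend on `start`, the "pipeline assembled" entry is always logged before either stage runs.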

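As we can see, the whole call flow is asynchronous (at the programming-model level), with the stages linked together by CompletableFuture.thenCompose. Although our goal here is to analyze request filtering, we have also seen how multiple requests are handled recursively. The additional requests come out of the filtering of requests and responses (more precisely, of their headers): a response filter may return a new request.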
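The recursive shape of responseAsyncImpl can be reduced to a small simulation. This is only a sketch under simplifying assumptions: the "server" is an in-memory map, a response is a plain string, and every name here (MultiExchangeSketch, SERVER, exchange, responseFilter) is hypothetical rather than taken from the JDK.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;

class MultiExchangeSketch {
    // Fake server: a path maps either to "REDIRECT:<path>" or to a final body.
    static final Map<String, String> SERVER = Map.of(
            "/a", "REDIRECT:/b",
            "/b", "REDIRECT:/c",
            "/c", "hello");

    static final int MAX_ATTEMPTS = 5;

    // Stands in for a single exchange: one request, one response.
    static CompletableFuture<String> exchange(String path) {
        return CompletableFuture.supplyAsync(() -> SERVER.getOrDefault(path, "404"));
    }

    // Stands in for the response filters: returns the next request, or null if done.
    static String responseFilter(String response) {
        String prefix = "REDIRECT:";
        return response.startsWith(prefix) ? response.substring(prefix.length()) : null;
    }

    static CompletableFuture<String> responseAsync(String path, int attempt) {
        if (attempt > MAX_ATTEMPTS) {
            return CompletableFuture.failedFuture(new IllegalStateException("Too many attempts"));
        }
        return exchange(path).thenCompose(response -> {
            String next = responseFilter(response);
            if (next == null) {
                // No new request: this response is the final one.
                return CompletableFuture.completedFuture(response);
            }
            // A new request was produced: recurse with it.
            return responseAsync(next, attempt + 1);
        });
    }

    public static void main(String[] args) {
        System.out.println(responseAsync("/a", 1).join());
    }
}
```

The recursion does not grow the call stack in the usual sense: each recursive call only builds another stage that runs when the previous exchange completes.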

In the code above, the filtering of request headers and response headers corresponds to these lines:

if (currentreq != previousreq) {
    requestFilters(currentreq);  //filter and decorate the request
}
//... unrelated code omitted
newrequest = responseFilters(response);   //filter the response

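The code of the two methods is shown below. Requests pass through the filters in the order in which the filters were added. Recall the order used when the client was built:

authentication -> redirect -> cookie (if configured)

Responses are filtered in the reverse order, which is what we would expect.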

//both methods belong to MultiExchange; see the sequence diagram or the source code
private void requestFilters(HttpRequestImpl r) throws IOException {
        Log.logTrace("Applying request filters");
        //decorate the request with the filters in the order they were added
        for (HeaderFilter filter : filters) {
            Log.logTrace("Applying {0}", filter);
            filter.request(r, this);
        }
        Log.logTrace("All filters applied");
    }

    private HttpRequestImpl responseFilters(Response response) throws IOException
    {
        Log.logTrace("Applying response filters");
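        //Responses are processed in the reverse of the order used for requests, as we would expect.
        //filters is a LinkedList field of MultiExchange, set up when the MultiExchange is created;
        //it holds the lazily instantiated filter instances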
        Iterator<HeaderFilter> reverseItr = filters.descendingIterator();
        while (reverseItr.hasNext()) {
            HeaderFilter filter = reverseItr.next();
            Log.logTrace("Applying {0}", filter);
            HttpRequestImpl newreq = filter.response(response);
            if (newreq != null) {
                Log.logTrace("New request: stopping filters");
                return newreq;
            }
        }
        Log.logTrace("All filters applied");
        return null;
    }
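The forward and reverse traversal can be illustrated with a plain LinkedList. In this sketch the filters are just strings, and the names are placeholders; unlike the real responseFilters, it does not stop early when a filter returns a new request.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;

class FilterOrderDemo {
    static List<String> trace() {
        // Filters in the order they were added when the client was built.
        LinkedList<String> filters = new LinkedList<>(List.of("auth", "redirect", "cookie"));
        List<String> trace = new ArrayList<>();

        // Requests: iterate in insertion order.
        for (String filter : filters) {
            trace.add("request:" + filter);
        }

        // Responses: iterate in reverse order with descendingIterator().
        Iterator<String> reverse = filters.descendingIterator();
        while (reverse.hasNext()) {
            trace.add("response:" + reverse.next());
        }
        return trace;
    }

    public static void main(String[] args) {
        trace().forEach(System.out::println);
    }
}
```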

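Looking at the structure of a filter, it is just a request method that decorates the request headers and a response method that inspects the response headers and decides whether a new request is needed.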

/**
 * A header filter that can examine or modify, typically system headers for
 * requests before they are sent, and responses before they are returned to the
 * user. Some ability to resend requests is provided.
 */
interface HeaderFilter {

    void request(HttpRequestImpl r, MultiExchange<?> e) throws IOException;

    /**
     * Returns null if response ok to be given to user.  Non null is a request
     * that must be resent and its response given to user. If impl throws an
     * exception that is returned to user instead.
     */
    HttpRequestImpl response(Response r) throws IOException;
}

Here we briefly go through cookie handling and redirect handling. First, cookies:

class CookieFilter implements HeaderFilter {

    public CookieFilter() {
    }

    @Override
    public void request(HttpRequestImpl r, MultiExchange<?> e) throws IOException {
        HttpClientImpl client = e.client();
        //note that the cookieHandler is held by the client; it stores the cookies for every URI
        //visited during the multiple exchanges triggered by a user request
        Optional<CookieHandler> cookieHandlerOpt = client.cookieHandler();
        if (cookieHandlerOpt.isPresent()) {
            CookieHandler cookieHandler = cookieHandlerOpt.get();
            Map<String,List<String>> userheaders = r.getUserHeaders().map();
            //fetch all cookies held by the cookieHandler that apply to the current URI
            Map<String,List<String>> cookies = cookieHandler.get(r.uri(), userheaders);

            // add the returned cookies
            HttpHeadersBuilder systemHeadersBuilder = r.getSystemHeadersBuilder();
            if (cookies.isEmpty()) {
                Log.logTrace("Request: no cookie to add for {0}", r.uri());
            } else {
                Log.logTrace("Request: adding cookies for {0}", r.uri());
            }
            for (Map.Entry<String,List<String>> entry : cookies.entrySet()) {
                final String hdrname = entry.getKey();
                //check that the header really is a cookie header
                if (!hdrname.equalsIgnoreCase("Cookie")
                        && !hdrname.equalsIgnoreCase("Cookie2"))
                    continue;
                List<String> values = entry.getValue();
                if (values == null || values.isEmpty()) continue;
                for (String val : values) {
                    if (Utils.isValidValue(val)) {
                        //after a simple validity check, add the cookie to the system headers being built
                        systemHeadersBuilder.addHeader(hdrname, val);
                    }
                }
            }
        } else {
            Log.logTrace("Request: No cookie manager found for {0}", r.uri());
        }
    }

    @Override
    public HttpRequestImpl response(Response r) throws IOException {
        HttpHeaders hdrs = r.headers();
        HttpRequestImpl request = r.request();
        Exchange<?> e = r.exchange;
        Log.logTrace("Response: processing cookies for {0}", request.uri());
        Optional<CookieHandler> cookieHandlerOpt = e.client().cookieHandler();
        if (cookieHandlerOpt.isPresent()) {
            CookieHandler cookieHandler = cookieHandlerOpt.get();
            Log.logTrace("Response: parsing cookies from {0}", hdrs.map());
            //store the cookies set by the response headers in the client's cookieHandler
            cookieHandler.put(request.uri(), hdrs.map());
        } else {
            Log.logTrace("Response: No cookie manager found for {0}",
                         request.uri());
        }
        return null;
    }
}

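As the code above shows, a cookieHandler must be specified when the client is built if cookies are to be stored and maintained. Java provides a basic implementation, CookieManager, which ultimately keeps cookies in memory, stored in two Maps, as shown in the figure below.

[Figure: the two Maps used for in-memory cookie storage]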

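On every request, the cookies matching the request URL are taken from the cookieHandler and added to the current request; once the response arrives, any new cookies set by the server are stored back into the cookieHandler. This design should be sufficient for basic use cases.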
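The same put/get round trip that CookieFilter performs can be tried directly against java.net.CookieManager, without any network traffic. This is a sketch; the URI and cookie value are placeholders.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.CookieManager;
import java.net.URI;
import java.util.List;
import java.util.Map;

class CookieRoundTrip {
    static Map<String, List<String>> roundTrip() {
        CookieManager manager = new CookieManager();
        URI uri = URI.create("http://example.com/");   // placeholder URI
        try {
            // What CookieFilter.response does: hand the response headers to the handler.
            manager.put(uri, Map.of("Set-Cookie", List.of("sid=abc; Path=/")));
            // What CookieFilter.request does: ask the handler for cookies matching the URI.
            return manager.get(uri, Map.of());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip());
    }
}
```

The returned map uses the header name "Cookie" as its key, which is why CookieFilter only copies entries named Cookie or Cookie2 into the system headers.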

Now let's look at the redirect filter, focusing on its two overridden methods, request and response:

class RedirectFilter implements HeaderFilter {

    HttpRequestImpl request;
    HttpClientImpl client;
    HttpClient.Redirect policy;
    String method;
    MultiExchange<?> exchange;
    //default limit of 5 redirects
    static final int DEFAULT_MAX_REDIRECTS = 5;
    URI uri;

    /*
     * NOT_MODIFIED status code results from a conditional GET where
     * the server does not (must not) return a response body because
     * the condition specified in the request disallows it
     */
    static final int HTTP_NOT_MODIFIED = 304;

    static final int max_redirects = Utils.getIntegerNetProperty(
            "jdk.httpclient.redirects.retrylimit", DEFAULT_MAX_REDIRECTS
    );

    // A public no-arg constructor is required by FilterFactory
    public RedirectFilter() {}

    @Override
    public synchronized void request(HttpRequestImpl r, MultiExchange<?> e) throws IOException {
        //the filter initializes its own state at request time
        this.request = r;
        this.client = e.client();
        //the redirect policy is set when the client is built; by default redirects are never followed
        this.policy = client.followRedirects();

        this.method = r.method();
        this.uri = r.uri();
        this.exchange = e;
    }

    @Override
    public synchronized HttpRequestImpl response(Response r) throws IOException {
        return handleResponse(r);
    }

    /**
     * Checks to see if a new request is needed and returns it.
     * Null means response is ok to return to user.
     */
    private HttpRequestImpl handleResponse(Response r) {
        int rcode = r.statusCode();
        if (rcode == 200 || policy == HttpClient.Redirect.NEVER) {
            return null;
        }

        if (rcode == HTTP_NOT_MODIFIED)
            return null;

        //only proceed if the status code is one that should be followed as a redirect
        if (isRedirecting(rcode)) {
            URI redir = getRedirectedURI(r.headers());
            //choose the method of the redirected request from the response status code
            String newMethod = redirectedMethod(rcode, method);
            Log.logTrace("response code: {0}, redirected URI: {1}", rcode, redir);
            if (canRedirect(redir) && ++exchange.numberOfRedirects < max_redirects) {
                Log.logTrace("redirect to: {0} with method: {1}", redir, newMethod);
                //redirecting is allowed, so create a new request
                return HttpRequestImpl.newInstanceForRedirection(redir, newMethod, request, rcode != 303);
            } else {
                Log.logTrace("not redirecting");
                return null;
            }
        }
        return null;
    }

    private static String redirectedMethod(int statusCode, String orig) {
        return switch (statusCode) {
            case 301, 302   -> orig.equals("POST") ? "GET" : orig;
            case 303        -> "GET";
            case 307, 308   -> orig;

            default -> orig; // unexpected but return orig
        };
    }

    private static boolean isRedirecting(int statusCode) {
        // < 300: not a redirect codes
        if (statusCode < 300) return false;
        // 309-399 Unassigned => don't follow
        // > 399: not a redirect code
        if (statusCode > 308) return false;

        return switch (statusCode) {
            // 300: MultipleChoice => don't follow
            // 304: Not Modified => don't follow
            // 305: Proxy Redirect => don't follow.
            // 306: Unused => don't follow
            case 300, 304, 305, 306 -> false;
            // 301, 302, 303, 307, 308: OK to follow.
            default -> true;
        };
    }

    private URI getRedirectedURI(HttpHeaders headers) {
        //obtains the redirect URI; omitted here
    }

    private boolean canRedirect(URI redir) {
        //decides whether to redirect according to the configured redirect policy; omitted here
    }
}

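The logic here is fairly straightforward: the response status code and the configured redirect policy decide whether to redirect, and if so a new request is created. This is one reason why a single user request can lead to several request-response exchanges.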
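Both the redirect policy and the cookie handler read by these filters are set on the client builder. A minimal configuration sketch, using Redirect.NORMAL and a default CookieManager as example choices:

```java
import java.net.CookieManager;
import java.net.http.HttpClient;

class ClientConfigDemo {
    static HttpClient build() {
        return HttpClient.newBuilder()
                // Without this call the policy is Redirect.NEVER and no redirect is followed.
                .followRedirects(HttpClient.Redirect.NORMAL)
                // Without a cookie handler, client.cookieHandler() is empty.
                .cookieHandler(new CookieManager())
                .build();
    }

    public static void main(String[] args) {
        HttpClient client = build();
        System.out.println(client.followRedirects() + ", cookies: " + client.cookieHandler().isPresent());
    }
}
```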

The authentication header filter is rarely used nowadays, so we will not analyze it here.

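This concludes our look at the basic flow by which JDK HttpClient handles the multiple request-response exchanges that a single user request may produce. Within a fully asynchronous call chain, by copying and decorating the user request and inspecting each response, HttpClient implements cookie management, redirect request generation, and basic authentication.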

In the next article, we will follow the complete life cycle of an unencrypted HTTP/1.1 request.