Spring Cloud Gateway 之获取请求体(Request Body)的几种方式
阅读原文时间:2021年10月05日阅读:8

Spring Cloud Gateway 获取请求体

一、直接在全局拦截器中获取,伪代码如下

private String resolveBodyFromRequest(ServerHttpRequest serverHttpRequest){

    Flux<DataBuffer> body = serverHttpRequest.getBody();

    AtomicReference<String> bodyRef = new AtomicReference<>();

    body.subscribe(buffer -> {

        CharBuffer charBuffer = StandardCharsets.UTF\_8.decode(buffer.asByteBuffer());

        DataBufferUtils.release(buffer);

        bodyRef.set(charBuffer.toString());

    });

    return bodyRef.get();

}

存在的缺陷:其他拦截器无法再通过该方式获取请求体(因为请求体已被消费),并且会抛出异常

Only one connection receive subscriber allowed.
Caused by: java.lang.IllegalStateException: Only one connection receive subscriber allowed.

异常原因:实际上spring-cloud-gateway反向代理的原理是,首先读取原请求的数据,然后构造一个新的请求,将原请求的数据封装到新的请求中,然后再转发出去。然而我们在他封装之前读取了一次request body,而request body只能读取一次。因此就出现了上面的错误。

再者受版本限制

这种方法在 spring-boot-starter-parent 2.0.6.RELEASE + Spring Cloud Finchley.SR2 环境下有效,可以获取到 body,

但是在 spring-boot-starter-parent 2.1.0.RELEASE + Spring Cloud Greenwich.M3 环境下无效,获取到的 body 总是为空

二、先在全局过滤器中获取,然后再把 request 重新包装,继续向下传递

/**
 * Builds a gateway filter that, for POST requests, reads the request body, prints it,
 * and forwards a decorated request whose body is rebuilt from the text that was read.
 *
 * <p>The body is read by subscribing and then fetching the stored value right away.
 * Each emitted buffer overwrites the stored value, so only the last chunk received
 * before the read is kept, and the value is {@code null} when nothing was received.
 * NOTE(review): {@code stringBuffer(null)} would then throw a NullPointerException —
 * confirm whether an empty POST body can reach this filter.
 *
 * @param nameValueConfig filter configuration (not used by this implementation)
 * @return the filter described above; non-POST requests are passed on with only the
 *         rebuilt URI applied
 */
@Override
public GatewayFilter apply(NameValueConfig nameValueConfig) {
    return (exchange, chain) -> {
        URI uri = exchange.getRequest().getURI();
        // Rebuild the URI, treating the components as already encoded (build(true)).
        URI ex = UriComponentsBuilder.fromUri(uri).build(true).toUri();
        ServerHttpRequest request = exchange.getRequest().mutate().uri(ex).build();
        // Only POST requests get their body read and re-wrapped.
        if ("POST".equalsIgnoreCase(request.getMethodValue())) {
            Flux<DataBuffer> body = request.getBody();
            AtomicReference<String> bodyRef = new AtomicReference<>();
            // Decode each buffer as UTF-8 and keep the text in the holder.
            body.subscribe(dataBuffer -> {
                CharBuffer charBuffer = StandardCharsets.UTF_8.decode(dataBuffer.asByteBuffer());
                DataBufferUtils.release(dataBuffer);
                bodyRef.set(charBuffer.toString());
            });
            String bodyStr = bodyRef.get();
            // Placeholder for the real processing of the body.
            System.out.println(bodyStr);
            DataBuffer bodyDataBuffer = stringBuffer(bodyStr);
            Flux<DataBuffer> bodyFlux = Flux.just(bodyDataBuffer);

            // Wrap the request so downstream consumers read the rebuilt body.
            request = new ServerHttpRequestDecorator(request) {
                @Override
                public Flux<DataBuffer> getBody() {
                    return bodyFlux;
                }
            };
        }
        return chain.filter(exchange.mutate().request(request).build());
    };
}

  /**
   * Encodes the given text as UTF-8 and writes the bytes into a buffer allocated from a
   * {@code NettyDataBufferFactory} backed by {@code ByteBufAllocator.DEFAULT}.
   *
   * @param value text to encode
   * @return the allocated buffer after the encoded bytes have been written to it
   */
  protected DataBuffer stringBuffer(String value) {
    final byte[] encoded = value.getBytes(StandardCharsets.UTF_8);
    final NettyDataBufferFactory factory = new NettyDataBufferFactory(ByteBufAllocator.DEFAULT);

    // Request a buffer with an initial capacity equal to the encoded length.
    final DataBuffer target = factory.allocateBuffer(encoded.length);
    target.write(encoded);
    return target;
  }

该方案的缺陷:request body获取不完整(因为异步原因),只能获取1024B的数据。并且请求体超过1024B,会出现响应超慢(因为我是开启了熔断)。

三、过滤器加路线定位器

翻查源码发现ReadBodyPredicateFactory里面缓存了request body的信息,于是在自定义router中配置了ReadBodyPredicateFactory,然后在filter中通过cachedRequestBodyObject缓存字段获取request body信息。

/**
* @description: 获取POST请求的请求体
* ReadBodyPredicateFactory 发现里面缓存了request body的信息,
* 于是在自定义router中配置了ReadBodyPredicateFactory
* @modified:
*/
@EnableAutoConfiguration
@Configuration
public class RouteLocatorRequestBoby{
   //自定义过滤器
@Resource
private ReqTraceFilter reqTraceFilter;
  
@Resource
private RibbonLoadBalancerClient ribbonLoadBalancerClient;

private static final String SERVICE = "/leap/\*\*";

private static final String HTTP\_PREFIX = "http://";

private static final String COLON = ":";

@Bean  
public RouteLocator myRoutes(RouteLocatorBuilder builder) {  
    //通过负载均衡获取服务实例  
    ServiceInstance instance = ribbonLoadBalancerClient.choose("PLATFORM-SERVICE");  
    //拼接路径  
    StringBuilder forwardAddress = new StringBuilder(HTTP\_PREFIX);  
    forwardAddress.append(instance.getHost())  
            .append(COLON)  
            .append(instance.getPort());  
    return builder.routes()  
            //拦截请求类型为POST Content-Type application/json application/json;charset=UTF-8  
            .route(r -> r  
                            .header(HttpHeaders.CONTENT\_TYPE,  
                                    MediaType.APPLICATION\_JSON\_VALUE + MediaType.APPLICATION\_JSON\_UTF8\_VALUE)  
                            .and()  
                            .method(HttpMethod.POST)  
                            .and()  
                            //获取缓存中的请求体  
                            .readBody(Object.class, readBody -> {  
                                return true;  
                            })  
                            .and()  
                            .path(SERVICE)  
                            //把请求体传递给拦截器reqTraceFilter  
                            .filters(f -> {  
                                f.filter(reqTraceFilter);  
                                return f;  
                            })  
                            .uri(forwardAddress.toString())).build();  
}

/**
* @description: 过滤器,用于获取请求体,和处理请求体业务,列如记录日志
* @modified:
*/
@Component
public class ReqTraceFilter implements GlobalFilter, GatewayFilter,Ordered {

private static final String CONTENT\_TYPE = "Content-Type";

private static final String CONTENT\_TYPE\_JSON = "application/json";  

  
//获取请求路由详细信息Route route = exchange.getAttribute(GATEWAY_ROUTE_BEAN)
private static final String GATEWAY_ROUTE_BEAN = "org.springframework.cloud.gateway.support.ServerWebExchangeUtils.gatewayRoute";

private static final String CACHE\_REQUEST\_BODY\_OBJECT\_KEY = "cachedRequestBodyObject";  
@Override  
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {  
    ServerHttpRequest request = exchange.getRequest();  
    //判断过滤器是否执行  
    String requestUrl = RequestUtils.getCurrentRequest(request);  
    if (!RequestUtils.isFilter(requestUrl)) {  
        String bodyStr = "";  
        String contentType = request.getHeaders().getFirst(CONTENT\_TYPE);  
        String method = request.getMethodValue();  
        //判断是否为POST请求  
        if (null != contentType && HttpMethod.POST.name().equalsIgnoreCase(method) && contentType.contains(CONTENT\_TYPE\_JSON)) {  
            Object cachedBody = exchange.getAttribute(CACHE\_REQUEST\_BODY\_OBJECT\_KEY);  
            if(null != cachedBody){  
                bodyStr = cachedBody.toString();  
            }  
        }  
        if (HttpMethod.GET.name().equalsIgnoreCase(method)) {  
            bodyStr = request.getQueryParams().toString();  
        }

        log.info("请求体内容:{}",bodyStr);  
    }  
    return chain.filter(exchange);  
}

@Override  
public int getOrder() {  
    return 5;  
}  

}

该方案优点:一、不会带来重复读取问题;二、不会带来 request body 取不全的问题;三、在低版本的 Spring Cloud Finchley.SR2 上也可以运行。

缺点:不支持 multipart/form-data(异常415),这个致命。

四、参照 org.springframework.cloud.gateway.filter.factory.rewrite 包下的 ModifyRequestBodyGatewayFilterFactory 来实现。顾名思义,这是用于修改 Request Body 的过滤器工厂类。

@Component
@Slf4j
public class ReqTraceFilter implements GlobalFilter, GatewayFilter, Ordered {

@Resource  
private IPlatformFeignClient platformFeignClient;

/\*\*  
 \* httpheader,traceId的key名称  
 \*/  
private static final String REQUESTID = "traceId";

private static final String CONTENT\_TYPE = "Content-Type";

private static final String CONTENT\_TYPE\_JSON = "application/json";

private static final String GATEWAY\_ROUTE\_BEAN = "org.springframework.cloud.gateway.support.ServerWebExchangeUtils.gatewayRoute";

@Override  
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {  
    ServerHttpRequest request = exchange.getRequest();  
    //判断过滤器是否执行  
    String requestUrl = RequestUtils.getCurrentRequest(request);  
    if (!RequestUtils.isFilter(requestUrl)) {  
        String bodyStr = "";  
        String contentType = request.getHeaders().getFirst(CONTENT\_TYPE);  
        String method = request.getMethodValue();  
        //判断是否为POST请求  
        if (null != contentType && HttpMethod.POST.name().equalsIgnoreCase(method) && contentType.contains(CONTENT\_TYPE\_JSON)) {  
            ServerRequest serverRequest = new DefaultServerRequest(exchange);  
            List<String> list = new ArrayList<>();  
            // 读取请求体  
            Mono<String> modifiedBody = serverRequest.bodyToMono(String.class)  
                    .flatMap(body -> {  
                        //记录请求体日志  
                        final String nId = saveRequestOperLog(exchange, body);  
                        //记录日志id  
                        list.add(nId);  
                        return Mono.just(body);  
                    });

            BodyInserter bodyInserter = BodyInserters.fromPublisher(modifiedBody, String.class);  
            HttpHeaders headers = new HttpHeaders();  
            headers.putAll(exchange.getRequest().getHeaders());  
            headers.remove(HttpHeaders.CONTENT\_LENGTH);

            CachedBodyOutputMessage outputMessage = new CachedBodyOutputMessage(exchange, headers);  
            return bodyInserter.insert(outputMessage, new BodyInserterContext())  
                    .then(Mono.defer(() -> {  
                        ServerHttpRequestDecorator decorator = new ServerHttpRequestDecorator(  
                                exchange.getRequest()) {  
                            @Override  
                            public HttpHeaders getHeaders() {  
                                long contentLength = headers.getContentLength();  
                                HttpHeaders httpHeaders = new HttpHeaders();  
                                httpHeaders.putAll(super.getHeaders());  
                                httpHeaders.put(REQUESTID,list);  
                                if (contentLength > 0) {  
                                    httpHeaders.setContentLength(contentLength);  
                                } else {  
                                    httpHeaders.set(HttpHeaders.TRANSFER\_ENCODING, "chunked");  
                                }  
                                return httpHeaders;  
                            }

                            @Override  
                            public Flux<DataBuffer> getBody() {  
                                return outputMessage.getBody();  
                            }  
                        };

                        return chain.filter(exchange.mutate().request(decorator).build());  
                    }));  
        }  
        if (HttpMethod.GET.name().equalsIgnoreCase(method)) {  
            bodyStr = request.getQueryParams().toString();  
            String nId = saveRequestOperLog(exchange, bodyStr);  
            ServerHttpRequest userInfo = exchange.getRequest().mutate()  
                    .header(REQUESTID, nId).build();  
            return chain.filter(exchange.mutate().request(userInfo).build());  
        }

    }  
    return chain.filter(exchange);  
}

/\*\*  
 \* 保存请求日志  
 \*  
 \* @param exchange  
 \* @param requestParameters  
 \* @return  
 \*/  
private String saveRequestOperLog(ServerWebExchange exchange, String requestParameters) {  
    log.debug("接口请求参数:{}", requestParameters);  
    ServerHttpRequest request = exchange.getRequest();  
    String ip = Objects.requireNonNull(request.getRemoteAddress()).getAddress().getHostAddress();  
    SaveOperLogVO vo = new  SaveOperLogVO();  
    vo.setIp(ip);  
    vo.setReqUrl(RequestUtils.getCurrentRequest(request));  
    vo.setReqMethod(request.getMethodValue());  
    vo.setRequestParameters(requestParameters);

    Route route = exchange.getAttribute(GATEWAY\_ROUTE\_BEAN);  
    //是否配置路由  
    if (route != null) {  
        vo.setSubsystem(route.getId());  
    }  
    ResEntity<String> res = platformFeignClient.saveOperLog(vo);  
    log.debug("当前请求ID返回的数据:{}", res);  
    return res.getData();  
}

@Override  
public int getOrder() {  
    return 5;  
}  

}

该方案:完美解决以上所有问题

参考文档:https://www.codercto.com/a/52970.html