Skip to content

Commit

Permalink
Adds ability to cache body without converting to bytes (spring-cloud#…
Browse files Browse the repository at this point in the history
…1095)

Creates new Cache filter that is activated with retry filter is used.

Other filters can opt into caching if needed.

Uses netty factory and retainedSlice() for retry.

Only cache if retry filter is enabled

Fixes spring-cloudgh-982
Fixes spring-cloudgh-1064
  • Loading branch information
spencergibb committed Jun 13, 2019
1 parent bb1efb6 commit 71780c2
Show file tree
Hide file tree
Showing 15 changed files with 314 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,13 @@
import org.springframework.boot.context.properties.PropertyMapper;
import org.springframework.cloud.gateway.actuate.GatewayControllerEndpoint;
import org.springframework.cloud.gateway.filter.AdaptCachedBodyGlobalFilter;
import org.springframework.cloud.gateway.filter.AlwaysRetainBodyGlobalFilter;
import org.springframework.cloud.gateway.filter.ForwardPathFilter;
import org.springframework.cloud.gateway.filter.ForwardRoutingFilter;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.cloud.gateway.filter.NettyRoutingFilter;
import org.springframework.cloud.gateway.filter.NettyWriteResponseFilter;
import org.springframework.cloud.gateway.filter.RemoveCachedBodyFilter;
import org.springframework.cloud.gateway.filter.RouteToRequestUrlFilter;
import org.springframework.cloud.gateway.filter.WebsocketRoutingFilter;
import org.springframework.cloud.gateway.filter.WeightCalculatorWebFilter;
Expand Down Expand Up @@ -263,6 +265,16 @@ public AdaptCachedBodyGlobalFilter adaptCachedBodyGlobalFilter() {
return new AdaptCachedBodyGlobalFilter();
}

@Bean
public AlwaysRetainBodyGlobalFilter alwaysRetainBodyGlobalFilter() {
return new AlwaysRetainBodyGlobalFilter();
}

@Bean
public RemoveCachedBodyFilter removeCachedBodyFilter() {
return new RemoveCachedBodyFilter();
}

@Bean
public RouteToRequestUrlFilter routeToRequestUrlFilter() {
return new RouteToRequestUrlFilter();
Expand Down Expand Up @@ -590,6 +602,9 @@ else if (ssl.isUseInsecureTrustManager()) {
});
}

//TODO: add configuration to turn on wiretap
//httpClient = httpClient.wiretap(true);

return httpClient;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright 2013-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.cloud.gateway.event;

import org.springframework.context.ApplicationEvent;

public class EnableBodyCachingEvent extends ApplicationEvent {

private final String routeId;

public EnableBodyCachingEvent(Object source, String routeId) {
super(source);
this.routeId = routeId;
}

public String getRouteId() {
return this.routeId;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* Copyright 2013-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.cloud.gateway.filter;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import org.springframework.cloud.gateway.event.EnableBodyCachingEvent;
import org.springframework.cloud.gateway.route.Route;
import org.springframework.context.ApplicationListener;
import org.springframework.core.Ordered;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.NettyDataBuffer;
import org.springframework.http.server.reactive.ServerHttpRequestDecorator;
import org.springframework.web.server.ServerWebExchange;

import static org.springframework.cloud.gateway.support.ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR;

public class AlwaysRetainBodyGlobalFilter
implements GlobalFilter, Ordered, ApplicationListener<EnableBodyCachingEvent> {

private static final Log log = LogFactory.getLog(AlwaysRetainBodyGlobalFilter.class);

private ConcurrentMap<String, Boolean> routesToCache = new ConcurrentHashMap<>();

/**
* Request body cache key.
*/
public static final String ALWAYS_CACHE_REQUEST_BODY_KEY = "alwaysCacheRequestBody";

@Override
public void onApplicationEvent(EnableBodyCachingEvent event) {
this.routesToCache.putIfAbsent(event.getRouteId(), true);
}

@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
Object body = exchange.getAttributeOrDefault(ALWAYS_CACHE_REQUEST_BODY_KEY, null);
Route route = exchange.getAttribute(GATEWAY_ROUTE_ATTR);

if (body != null || !this.routesToCache.containsKey(route.getId())) {
return chain.filter(exchange);
}

return DataBufferUtils.join(exchange.getRequest().getBody())
.flatMap(dataBuffer -> {
if (dataBuffer.readableByteCount() > 0) {
if (log.isTraceEnabled()) {
log.trace("retaining body in exchange attribute");
}
exchange.getAttributes().put(ALWAYS_CACHE_REQUEST_BODY_KEY,
dataBuffer);
}

ServerHttpRequestDecorator decorator = new ServerHttpRequestDecorator(
exchange.getRequest()) {
@Override
public Flux<DataBuffer> getBody() {
return Mono.<DataBuffer>fromSupplier(() -> {
if (exchange.getAttributeOrDefault(
ALWAYS_CACHE_REQUEST_BODY_KEY, null) == null) {
// probably == downstream closed
return null;
}
// TODO: deal with Netty
NettyDataBuffer pdb = (NettyDataBuffer) dataBuffer;
return pdb.factory()
.wrap(pdb.getNativeBuffer().retainedSlice());
}).flux();
}
};
return chain.filter(exchange.mutate().request(decorator).build());
}).switchIfEmpty(chain.filter(exchange));
}

@Override
public int getOrder() {
return -10;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright 2013-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.cloud.gateway.filter;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import reactor.core.publisher.Mono;

import org.springframework.core.Ordered;
import org.springframework.core.io.buffer.PooledDataBuffer;
import org.springframework.web.server.ServerWebExchange;

import static org.springframework.cloud.gateway.filter.AlwaysRetainBodyGlobalFilter.ALWAYS_CACHE_REQUEST_BODY_KEY;

public class RemoveCachedBodyFilter implements GlobalFilter, Ordered {

private static final Log log = LogFactory.getLog(RemoveCachedBodyFilter.class);

@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
return chain.filter(exchange).doFinally(s -> {
PooledDataBuffer b = (PooledDataBuffer) exchange.getAttributes()
.remove(ALWAYS_CACHE_REQUEST_BODY_KEY);
if (b != null && b.isAllocated()) {
if (log.isTraceEnabled()) {
log.trace("releasing cached body in exchange attribute");
}
b.release();
}
});
}

@Override
public int getOrder() {
return HIGHEST_PRECEDENCE;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,18 @@
package org.springframework.cloud.gateway.filter.factory;

import org.springframework.cloud.gateway.support.AbstractConfigurable;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;

/**
* This class is BETA and may be subject to change in a future release.
*
* @param <C> {@link AbstractConfigurable} subtype
*/
public abstract class AbstractGatewayFilterFactory<C> extends AbstractConfigurable<C>
implements GatewayFilterFactory<C> {
implements GatewayFilterFactory<C>, ApplicationEventPublisherAware {

private ApplicationEventPublisher publisher;

@SuppressWarnings("unchecked")
public AbstractGatewayFilterFactory() {
Expand All @@ -35,6 +39,15 @@ public AbstractGatewayFilterFactory(Class<C> configClass) {
super(configClass);
}

protected ApplicationEventPublisher getPublisher() {
return this.publisher;
}

@Override
public void setApplicationEventPublisher(ApplicationEventPublisher publisher) {
this.publisher = publisher;
}

public static class NameConfig {

private String name;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.springframework.cloud.gateway.filter.GatewayFilter;
import org.springframework.cloud.gateway.support.Configurable;
import org.springframework.cloud.gateway.support.HasRouteId;
import org.springframework.cloud.gateway.support.NameUtils;
import org.springframework.cloud.gateway.support.ShortcutConfigurable;
import org.springframework.http.server.reactive.ServerHttpRequest;
Expand All @@ -41,6 +42,12 @@ public interface GatewayFilterFactory<C> extends ShortcutConfigurable, Configura
String VALUE_KEY = "value";

// useful for javadsl
default GatewayFilter apply(String routeId, Consumer<C> consumer) {
C config = newConfig();
consumer.accept(config);
return apply(routeId, config);
}

default GatewayFilter apply(Consumer<C> consumer) {
C config = newConfig();
consumer.accept(config);
Expand All @@ -58,6 +65,14 @@ default C newConfig() {

GatewayFilter apply(C config);

default GatewayFilter apply(String routeId, C config) {
if (config instanceof HasRouteId) {
HasRouteId hasRouteId = (HasRouteId) config;
hasRouteId.setRouteId(routeId);
}
return apply(config);
}

default String name() {
// TODO: deal with proxys
return NameUtils.normalizeFilterFactoryName(getClass());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ public List<String> shortcutFieldOrder() {
return singletonList(NAME_KEY);
}

@Override
// TODO: make Config implement HasRouteId and remove this method.
public GatewayFilter apply(String routeId, Consumer<Config> consumer) {
Config config = newConfig();
consumer.accept(config);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,11 @@ public GatewayFilter apply(
Long currentRequestSize = Long.valueOf(contentLength);
if (currentRequestSize > requestSizeConfig.getMaxSize()) {
exchange.getResponse().setStatusCode(HttpStatus.PAYLOAD_TOO_LARGE);
exchange.getResponse().getHeaders().add("errorMessage",
getErrorMessage(currentRequestSize,
requestSizeConfig.getMaxSize()));
if (!exchange.getResponse().isCommitted()) {
exchange.getResponse().getHeaders().add("errorMessage",
getErrorMessage(currentRequestSize,
requestSizeConfig.getMaxSize()));
}
return exchange.getResponse().setComplete();
}
}
Expand Down
Loading

0 comments on commit 71780c2

Please sign in to comment.