Skip to content

Commit

Permalink
Merge pull request ratpack#1624 from ratpack/http-client-mem-leak
Browse files Browse the repository at this point in the history
Address possible heap leak in DefaultHttpClient
  • Loading branch information
ldaley authored Jun 14, 2022
2 parents 3430ff3 + 4c66fdc commit 5c2be4a
Show file tree
Hide file tree
Showing 27 changed files with 167 additions and 81 deletions.
6 changes: 3 additions & 3 deletions gradle/dependencies.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ ext {
commonVersions = [
remote : "0.7",
slf4j : "1.7.32",
netty : "4.1.65.Final",
nettyTCNative : "2.0.40.Final",
netty : "4.1.73.Final",
nettyTCNative : "2.0.48.Final",
guava : "28.2-jre",
groovy : "2.5.4",
pac4j : "1.8.8",
Expand All @@ -31,7 +31,7 @@ ext {
dropwizardMetrics: "4.1.6",
pegdown : "1.6.0",
spock : "1.3-groovy-2.5",
log4j : "2.17.0",
log4j : "2.17.1",
newrelic : "5.13.0",
reactiveStreams : "1.0.3",
guice : "4.2.3",
Expand Down
1 change: 1 addition & 0 deletions ratpack-core/ratpack-core.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ dependencies {
compile "io.netty:netty-handler-proxy:$commonVersions.netty"
compile "io.netty:netty-resolver-dns:$commonVersions.netty"
runtime "io.netty:netty-resolver-dns-native-macos:$commonVersions.netty:osx-x86_64"
runtime "io.netty:netty-resolver-dns-native-macos:$commonVersions.netty:osx-aarch_64"
compile 'com.sun.activation:javax.activation:1.2.0'

compile "com.github.ben-manes.caffeine:caffeine:${commonVersions.caffeine}"
Expand Down
3 changes: 2 additions & 1 deletion ratpack-core/src/main/java/ratpack/http/HttpUrlBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,8 @@ default HttpUrlBuilder params(Action<? super ImmutableMultimap.Builder<String, O
HttpUrlBuilder params(MultiValueMap<String, ?> params);

/**
* Add a fragment to the URL
* Add a fragment to the URL.
*
* @param fragment string of the fragment
* @return {@code this}
* @since 1.6
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,14 @@

package ratpack.http.client.internal;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelOption;
import io.netty.channel.pool.ChannelHealthChecker;
import io.netty.channel.pool.ChannelPool;
import io.netty.channel.pool.ChannelPoolMap;
import io.netty.channel.pool.SimpleChannelPool;
import io.netty.resolver.AddressResolverGroup;
import ratpack.api.Nullable;
Expand All @@ -35,7 +38,6 @@
import java.net.URI;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

public class DefaultHttpClient implements HttpClientInternal {
Expand All @@ -60,37 +62,11 @@ public class DefaultHttpClient implements HttpClientInternal {
@Nullable
final ProxyInternal proxy;

private final Map<String, ChannelPoolStats> hostStats = new ConcurrentHashMap<>();

private final HttpChannelPoolMap channelPoolMap = new HttpChannelPoolMap() {
@Override
protected ChannelPool newPool(HttpChannelKey key) {
Bootstrap bootstrap = new Bootstrap()
.remoteAddress(key.host, key.port)
.group(key.execution.getEventLoop())
.resolver(resolver)
.channel(TransportDetector.getSocketChannelImpl())
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, (int) key.connectTimeout.toMillis())
.option(ChannelOption.ALLOCATOR, byteBufAllocator)
.option(ChannelOption.AUTO_READ, false)
.option(ChannelOption.SO_KEEPALIVE, isPooling());

if (isPooling()) {
InstrumentedChannelPoolHandler channelPoolHandler = getPoolingHandler(key);
hostStats.put(key.host, channelPoolHandler);
CleanClosingFixedChannelPool channelPool = new CleanClosingFixedChannelPool(bootstrap, channelPoolHandler, getPoolSize(), getPoolQueueSize());
((ExecControllerInternal) key.execution.getController()).onClose(() -> {
remove(key);
channelPool.closeCleanly();
});
return channelPool;
} else {
InstrumentedChannelPoolHandler channelPoolHandler = getSimpleHandler(key);
hostStats.put(key.host, channelPoolHandler);
return new SimpleChannelPool(bootstrap, channelPoolHandler, ALWAYS_UNHEALTHY);
}
}
};
private final Cache<String, ChannelPoolStats> hostStats = Caffeine.newBuilder()
.maximumSize(1024)
.build();

private final ManagedChannelPoolMap channelPoolMap;

public DefaultHttpClient(
ByteBufAllocator byteBufAllocator,
Expand Down Expand Up @@ -122,6 +98,64 @@ public DefaultHttpClient(
this.enableMetricsCollection = enableMetricsCollection;
this.resolver = resolver;
this.proxy = proxy;

this.channelPoolMap = isPooling() ? getPoolingChannelManager() : getSimpleChannelManager();
}

private ManagedChannelPoolMap getPoolingChannelManager() {
final HttpChannelPoolMap channelPoolMap = new HttpChannelPoolMap() {
@Override
protected ChannelPool newPool(HttpChannelKey key) {
Bootstrap bootstrap = createBootstrap(key, true);

InstrumentedChannelPoolHandler channelPoolHandler = getPoolingHandler(key);
if (enableMetricsCollection) {
hostStats.put(key.host, channelPoolHandler);
}
CleanClosingFixedChannelPool channelPool = new CleanClosingFixedChannelPool(bootstrap, channelPoolHandler, getPoolSize(), getPoolQueueSize());
((ExecControllerInternal) key.execController).onClose(() -> {
remove(key);
channelPool.closeCleanly();
});
return channelPool;
}
};

return channelPoolMap;
}

private ManagedChannelPoolMap getSimpleChannelManager() {
return new ManagedChannelPoolMap() {

@Override
public void close() {

}

@Override
public ChannelPool get(HttpChannelKey key) {
Bootstrap bootstrap = createBootstrap(key, true);
return new SimpleChannelPool(bootstrap, getSimpleHandler(key), ALWAYS_UNHEALTHY);
}

@Override
public boolean contains(HttpChannelKey key) {
return false;
}
};
}

private Bootstrap createBootstrap(HttpChannelKey key, boolean pooling) {
Bootstrap bootstrap = new Bootstrap()
.remoteAddress(key.host, key.port)
.group(key.eventLoop)
.resolver(resolver)
.channel(TransportDetector.getSocketChannelImpl())
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, (int) key.connectTimeout.toMillis())
.option(ChannelOption.ALLOCATOR, byteBufAllocator)
.option(ChannelOption.AUTO_READ, false)
.option(ChannelOption.SO_KEEPALIVE, pooling);
return bootstrap;
}

private InstrumentedChannelPoolHandler getPoolingHandler(HttpChannelKey key) {
Expand Down Expand Up @@ -160,7 +194,7 @@ private boolean isPooling() {
}

@Override
public HttpChannelPoolMap getChannelPoolMap() {
public ChannelPoolMap<HttpChannelKey, ChannelPool> getChannelPoolMap() {
return channelPoolMap;
}

Expand Down Expand Up @@ -263,7 +297,7 @@ private <T extends HttpResponse> Promise<T> intercept(Promise<T> promise, Action

public HttpClientStats getHttpClientStats() {
return new HttpClientStats(
hostStats.entrySet().stream().collect(Collectors.toMap(
hostStats.asMap().entrySet().stream().collect(Collectors.toMap(
Map.Entry::getKey,
e -> e.getValue().getHostStats()
))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package ratpack.http.client.internal;

import io.netty.channel.EventLoop;
import ratpack.exec.ExecController;
import ratpack.exec.Execution;

import java.net.URI;
Expand All @@ -27,7 +29,8 @@ final class HttpChannelKey {
final int port;
final String host;

final Execution execution;
final ExecController execController;
final EventLoop eventLoop;

final Duration connectTimeout;

Expand All @@ -45,7 +48,8 @@ final class HttpChannelKey {

this.port = uri.getPort() < 0 ? ssl ? 443 : 80 : uri.getPort();
this.host = uri.getHost();
this.execution = execution;
this.execController = execution.getController();
this.eventLoop = execution.getEventLoop();
this.connectTimeout = connectTimeout;
}

Expand All @@ -60,7 +64,7 @@ public boolean equals(Object o) {

HttpChannelKey that = (HttpChannelKey) o;

return execution.getController() == that.execution.getController()
return execController == that.execController
&& ssl == that.ssl
&& port == that.port
&& host.equals(that.host);
Expand All @@ -71,7 +75,7 @@ public int hashCode() {
int result = ssl ? 1 : 0;
result = 31 * result + port;
result = 31 * result + host.hashCode();
result = 31 * result + execution.getController().hashCode();
result = 31 * result + execController.hashCode();
return result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,6 @@
import io.netty.channel.pool.AbstractChannelPoolMap;
import io.netty.channel.pool.ChannelPool;

abstract class HttpChannelPoolMap extends AbstractChannelPoolMap<HttpChannelKey, ChannelPool> {
abstract class HttpChannelPoolMap extends AbstractChannelPoolMap<HttpChannelKey, ChannelPool> implements ManagedChannelPoolMap {

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,16 @@

package ratpack.http.client.internal;

import io.netty.channel.pool.ChannelPool;
import io.netty.channel.pool.ChannelPoolMap;
import ratpack.func.Action;
import ratpack.http.client.HttpClient;
import ratpack.http.client.HttpResponse;
import ratpack.http.client.RequestSpec;

interface HttpClientInternal extends HttpClient {

HttpChannelPoolMap getChannelPoolMap();
ChannelPoolMap<HttpChannelKey, ChannelPool> getChannelPoolMap();

Action<? super RequestSpec> getRequestInterceptor();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright 2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package ratpack.http.client.internal;

import io.netty.channel.pool.ChannelPool;
import io.netty.channel.pool.ChannelPoolMap;

import java.io.Closeable;

public interface ManagedChannelPoolMap extends ChannelPoolMap<HttpChannelKey, ChannelPool>, Closeable {

void close();

}
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ default ServerConfigBuilder findBaseDir(String markerFilePath) {
ServerConfigBuilder writeSpinCount(int writeSpinCount);

/**
* The path where to store the bind port
* The path where to store the bind port.
*
* @param portFile the path file
* @return {@code this}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ public interface ServerSentEventsBuilder {
* <p>
* Use {@link #buffered()} to use sensible defaults.
*
* @param numEvents the number of events to buffer (must be > 0)
* @param numBytes the number of bytes to buffer (must be > 0)
* @param numEvents the number of events to buffer (must be &gt; 0)
* @param numBytes the number of bytes to buffer (must be &gt; 0)
* @param duration the amount of time to buffer events (use 0 to disable)
* @return {@code this}
* @see #buffered(int, int, Duration)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@ class ServerConfigDataDeserializerSpec extends BaseRatpackSpec {
@AutoCleanup
def b1 = EphemeralBaseDir.tmpDir()
def originalClassLoader

@AutoCleanup
def classLoader = new GroovyClassLoader()

def serverEnvironment = new ServerEnvironment([:], new Properties())
def deserializer = new ServerConfigDataDeserializer(serverEnvironment.address, serverEnvironment.port, serverEnvironment.development, serverEnvironment.publicAddress, { -> FileSystemBinding.of(b1.root) } as Supplier)
def objectMapper = DefaultConfigDataBuilder.newDefaultObjectMapper()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -617,13 +617,15 @@ class HttpClientBodyStreamingSpec extends BaseHttpClientSpec {
closed.get()

and:
outFile.bytes.length == inFile.bytes.length
outFile.text == inFile.text

then:
text == "java.lang.IllegalStateException: Publisher completed before sending advertised number of bytes"
closed.get()

and:
outFile.bytes.length == inFile.bytes.length
outFile.text == inFile.text

where:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -856,7 +856,7 @@ BAR
pathResponse.status.code == 404
}

def "can track http client metrics when pooling is disabled"() {
def "does not track http client metrics when pooling is disabled"() {
given:
String ok = 'ok'
def result = new BlockingVariable<String>()
Expand Down Expand Up @@ -903,9 +903,9 @@ BAR

then:
polling.within(2) {
assert httpClient.getHttpClientStats().totalActiveConnectionCount == 1
assert httpClient.getHttpClientStats().totalActiveConnectionCount == 0
assert httpClient.getHttpClientStats().totalIdleConnectionCount == 0
assert httpClient.getHttpClientStats().totalConnectionCount == 1
assert httpClient.getHttpClientStats().totalConnectionCount == 0
}

when:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ class BaseDirFinderSpec extends BaseRatpackSpec {
@AutoCleanup
EphemeralBaseDir b1 = EphemeralBaseDir.tmpDir()

@AutoCleanup
def classLoader = new GroovyClassLoader()

def "returns empty when not found"() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public class ByteBufAllocatorConfig {
private boolean detailed = true;

/**
* The state of the ByteBufAllocator metric collector
* The state of the ByteBufAllocator metric collector.
*
* @return the state of the {@link ByteBufAllocator} metric collector.
*/
Expand Down
Loading

0 comments on commit 5c2be4a

Please sign in to comment.