Skip to content

Commit

Permalink
[FLINK-7521] Add config option to set the content length limit of RES…
Browse files Browse the repository at this point in the history
…T server and client
  • Loading branch information
FangYongs authored and tillrohrmann committed Mar 14, 2018
1 parent 445cdfd commit 6f46d6d
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -81,4 +81,19 @@ public class RestOptions {
key("rest.connection-timeout")
.defaultValue(15_000L)
.withDescription("The maximum time in ms for the client to establish a TCP connection.");

/**
* The max content length that the server will handle.
*/
public static final ConfigOption<Integer> REST_SERVER_CONTENT_MAX_MB =
key("rest.server.content.max.mb")
.defaultValue(10);

/**
* The max content length that the client will handle.
*/
public static final ConfigOption<Integer> REST_CLIENT_CONTENT_MAX_MB =
key("rest.client.content.max.mb")
.defaultValue(1);

}
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ protected void initChannel(SocketChannel socketChannel) throws Exception {

socketChannel.pipeline()
.addLast(new HttpClientCodec())
.addLast(new HttpObjectAggregator(1024 * 1024))
.addLast(new HttpObjectAggregator(configuration.getMaxContentLength()))
.addLast(new ClientHandler())
.addLast(new PipelineErrorHandler(LOG));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,15 @@ public final class RestClientConfiguration {

private final long connectionTimeout;

private RestClientConfiguration(@Nullable SSLEngine sslEngine, final long connectionTimeout) {
private final int maxContentLength;

private RestClientConfiguration(
@Nullable final SSLEngine sslEngine,
final long connectionTimeout,
final int maxContentLength) {
this.sslEngine = sslEngine;
this.connectionTimeout = connectionTimeout;
this.maxContentLength = maxContentLength;
}

/**
Expand All @@ -61,6 +67,15 @@ public long getConnectionTimeout() {
return connectionTimeout;
}

/**
* Returns the max content length that the REST client endpoint could handle.
*
* @return max content length that the REST client endpoint could handle
*/
public int getMaxContentLength() {
return maxContentLength;
}

/**
* Creates and returns a new {@link RestClientConfiguration} from the given {@link Configuration}.
*
Expand Down Expand Up @@ -89,6 +104,11 @@ public static RestClientConfiguration fromConfiguration(Configuration config) th

final long connectionTimeout = config.getLong(RestOptions.CONNECTION_TIMEOUT);

return new RestClientConfiguration(sslEngine, connectionTimeout);
int maxContentLength = config.getInteger(RestOptions.REST_CLIENT_CONTENT_MAX_MB) * 1024 * 1024;
if (maxContentLength <= 0) {
throw new ConfigurationException("Max content length for client must be a positive integer: " + maxContentLength);
}

return new RestClientConfiguration(sslEngine, connectionTimeout, maxContentLength);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ public abstract class RestServerEndpoint {
private final int configuredPort;
private final SSLEngine sslEngine;
private final Path uploadDir;
private final int maxContentLength;

private final CompletableFuture<Void> terminationFuture;

Expand All @@ -96,6 +97,8 @@ public RestServerEndpoint(RestServerEndpointConfiguration configuration) throws
this.uploadDir = configuration.getUploadDir();
createUploadDir(uploadDir, log);

this.maxContentLength = configuration.getMaxContentLength();

terminationFuture = new CompletableFuture<>();

this.restAddress = null;
Expand Down Expand Up @@ -156,7 +159,7 @@ protected void initChannel(SocketChannel ch) {
ch.pipeline()
.addLast(new HttpServerCodec())
.addLast(new FileUploadHandler(uploadDir))
.addLast(new HttpObjectAggregator(MAX_REQUEST_SIZE_BYTES))
.addLast(new HttpObjectAggregator(maxContentLength))
.addLast(handler.name(), handler)
.addLast(new PipelineErrorHandler(log));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,18 +51,22 @@ public final class RestServerEndpointConfiguration {

private final Path uploadDir;

private final int maxContentLength;

private RestServerEndpointConfiguration(
@Nullable String restBindAddress,
int restBindPort,
@Nullable SSLEngine sslEngine,
final Path uploadDir) {
final Path uploadDir,
final int maxContentLength) {

Preconditions.checkArgument(0 <= restBindPort && restBindPort < 65536, "The bing rest port " + restBindPort + " is out of range (0, 65536[");

this.restBindAddress = restBindAddress;
this.restBindPort = restBindPort;
this.sslEngine = sslEngine;
this.uploadDir = requireNonNull(uploadDir);
this.maxContentLength = maxContentLength;
}

/**
Expand Down Expand Up @@ -99,6 +103,15 @@ public Path getUploadDir() {
return uploadDir;
}

/**
* Returns the max content length that the REST server endpoint could handle.
*
* @return max content length that the REST server endpoint could handle
*/
public int getMaxContentLength() {
return maxContentLength;
}

/**
* Creates and returns a new {@link RestServerEndpointConfiguration} from the given {@link Configuration}.
*
Expand Down Expand Up @@ -131,6 +144,11 @@ public static RestServerEndpointConfiguration fromConfiguration(Configuration co
config.getString(WebOptions.UPLOAD_DIR, config.getString(WebOptions.TMP_DIR)),
"flink-web-upload-" + UUID.randomUUID());

return new RestServerEndpointConfiguration(address, port, sslEngine, uploadDir);
int maxContentLength = config.getInteger(RestOptions.REST_SERVER_CONTENT_MAX_MB) * 1024 * 1024;
if (maxContentLength <= 0) {
throw new ConfigurationException("Max content length for server must be a positive integer: " + maxContentLength);
}

return new RestServerEndpointConfiguration(address, port, sslEngine, uploadDir, maxContentLength);
}
}

0 comments on commit 6f46d6d

Please sign in to comment.