Skip to content

Commit

Permalink
Added option to configure max HTTP request size (apache#6296)
Browse files Browse the repository at this point in the history
* Added option to configure max HTTP request size

* Removed old comment
  • Loading branch information
merlimat authored Feb 12, 2020
1 parent 50d9352 commit 22d7824
Show file tree
Hide file tree
Showing 5 changed files with 148 additions and 2 deletions.
5 changes: 5 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,11 @@ saslJaasClientAllowedIds=
# Default value `SaslConstants.JAAS_DEFAULT_BROKER_SECTION_NAME`, which is "Broker".
saslJaasBrokerSectionName=

### --- HTTP Server config --- ###

# If >0, it will reject all HTTP requests with bodies larged than the configured limit
httpMaxRequestSize=-1

### --- BookKeeper Client --- ###

# Authentication plugin to use when connecting to bookies
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -748,6 +748,12 @@ public class ServiceConfiguration implements PulsarConfiguration {
)
private String anonymousUserRole = null;

@FieldContext(
category = CATEGORY_HTTP,
doc = "If >0, it will reject all HTTP requests with bodies larged than the configured limit"
)
private long httpMaxRequestSize = -1;

@FieldContext(
category = CATEGORY_SASL_AUTH,
doc = "This is a regexp, which limits the range of possible ids which can connect to the Broker using SASL.\n"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.web;

import java.io.IOException;

import javax.servlet.Filter;
import javax.servlet.FilterChain;
import javax.servlet.FilterConfig;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

public class MaxRequestSizeFilter implements Filter {

private final long maxSize;

public MaxRequestSizeFilter(long maxSize) {
this.maxSize = maxSize;
}

@Override
public void init(FilterConfig filterConfig) throws ServletException {
}

@Override
public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
throws IOException, ServletException {

long size = request.getContentLengthLong();

if (size > maxSize || isChunked(request)) {
// Size it's either unknown or too large
HttpServletResponse httpResponse = (HttpServletResponse) response;
httpResponse.sendError(HttpServletResponse.SC_BAD_REQUEST, "Bad Request");
} else {
chain.doFilter(request, response);
}
}

private static boolean isChunked(ServletRequest request) {
if (request instanceof HttpServletRequest) {
HttpServletRequest req = (HttpServletRequest) request;
String encoding = req.getHeader("Transfer-Encoding");
return encoding != null && encoding.contains("chunked");
} else {
return false;
}
}

@Override
public void destroy() {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import io.prometheus.client.jetty.JettyStatisticsCollector;

import java.io.IOException;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
Expand All @@ -30,6 +31,13 @@
import java.util.TimeZone;

import javax.servlet.DispatcherType;
import javax.servlet.Filter;
import javax.servlet.FilterChain;
import javax.servlet.FilterConfig;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServletResponse;

import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
Expand Down Expand Up @@ -146,9 +154,15 @@ public void addServlet(String path, ServletHolder servletHolder, boolean require
context.addFilter(filter, MATCH_ALL, EnumSet.allOf(DispatcherType.class));
}

if (pulsar.getConfig().getHttpMaxRequestSize() > 0) {
context.addFilter(new FilterHolder(
new MaxRequestSizeFilter(
pulsar.getConfig().getHttpMaxRequestSize())),
MATCH_ALL, EnumSet.allOf(DispatcherType.class));
}

FilterHolder responseFilter = new FilterHolder(new ResponseHandlerFilter(pulsar));
context.addFilter(responseFilter, MATCH_ALL, EnumSet.allOf(DispatcherType.class));

handlers.add(context);
}

Expand Down Expand Up @@ -191,7 +205,6 @@ public void start() throws PulsarServerException {
handlers.add(stats);

server.setHandler(stats);

server.start();

if (httpConnector != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import static org.testng.Assert.assertEquals;

import com.google.common.io.CharStreams;
import com.google.common.io.Closeables;
Expand All @@ -33,6 +34,7 @@
import java.security.PrivateKey;
import java.security.SecureRandom;
import java.security.cert.Certificate;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
Expand All @@ -45,6 +47,15 @@
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;

import lombok.Cleanup;

import org.apache.commons.lang3.StringUtils;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.pulsar.broker.MockedBookKeeperClientFactory;
import org.apache.pulsar.broker.NoOpShutdownService;
import org.apache.pulsar.broker.PulsarService;
Expand All @@ -54,6 +65,8 @@
import org.apache.pulsar.client.admin.PulsarAdminException.ConflictException;
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.common.util.SecurityUtility;
import org.apache.pulsar.zookeeper.MockedZooKeeperClientFactoryImpl;
import org.apache.zookeeper.CreateMode;
Expand Down Expand Up @@ -194,6 +207,42 @@ public void testSplitPath() {
Assert.assertEquals(result, "topic1");
}

@Test
public void testMaxRequestSize() throws Exception {
setupEnv(true, "1.0", true, false, false, false);

String url = pulsar.getWebServiceAddress() + "/admin/v2/tenants/my-tenant" + System.currentTimeMillis();

@Cleanup
CloseableHttpClient client = HttpClients.createDefault();
HttpPut httpPut = new HttpPut(url);
httpPut.setHeader("Content-Type", "application/json");
httpPut.setHeader("Accept", "application/json");

// HTTP server is configured to reject everything > 10K
TenantInfo info1 = new TenantInfo();
info1.setAdminRoles(Collections.singleton(StringUtils.repeat("*", 20 * 1024)));
httpPut.setEntity(new ByteArrayEntity(ObjectMapperFactory.getThreadLocal().writeValueAsBytes(info1)));

CloseableHttpResponse response = client.execute(httpPut);
// This should have failed
assertEquals(response.getStatusLine().getStatusCode(), 400);

TenantInfo info2 = new TenantInfo();
info2.setAdminRoles(Collections.singleton(StringUtils.repeat("*", 1 * 1024)));
httpPut.setEntity(new ByteArrayEntity(ObjectMapperFactory.getThreadLocal().writeValueAsBytes(info2)));

response = client.execute(httpPut);
assertEquals(response.getStatusLine().getStatusCode(), 204);

// Simple GET without content size should go through
HttpGet httpGet = new HttpGet(url);
httpGet.setHeader("Content-Type", "application/json");
httpGet.setHeader("Accept", "application/json");
response = client.execute(httpGet);
assertEquals(response.getStatusLine().getStatusCode(), 200);
}

private String makeHttpRequest(boolean useTls, boolean useAuth) throws Exception {
InputStream response = null;
try {
Expand Down Expand Up @@ -256,6 +305,7 @@ private void setupEnv(boolean enableFilter, String minApiVersion, boolean allowU
config.setClusterName("local");
config.setAdvertisedAddress("localhost"); // TLS certificate expects localhost
config.setZookeeperServers("localhost:2181");
config.setHttpMaxRequestSize(10 * 1024);
pulsar = spy(new PulsarService(config));
pulsar.setShutdownService(new NoOpShutdownService());
doReturn(zkFactory).when(pulsar).getZooKeeperClientFactory();
Expand Down

0 comments on commit 22d7824

Please sign in to comment.