Skip to content

Commit

Permalink
Add http output buffer configuration to proxy (apache#2815)
Browse files Browse the repository at this point in the history
This controls the amount of data the http proxy will buffer before flushing
to the client. Setting this to something very low facilitates
streaming usecases with the reverse proxy.
  • Loading branch information
ivankelly authored and sijie committed Oct 23, 2018
1 parent f0a81cb commit 0e08b6a
Show file tree
Hide file tree
Showing 4 changed files with 161 additions and 1 deletion.
7 changes: 7 additions & 0 deletions conf/proxy.conf
Original file line number Diff line number Diff line change
Expand Up @@ -134,3 +134,10 @@ tlsRequireTrustedClientCertOnConnect=false

# Deprecated. Use configurationStoreServers
globalZookeeperServers=

# Http output buffer size. The amount of data that will be buffered for http requests
# before it is flushed to the channel. A larger buffer size may result in higher http throughput
# though it may take longer for the client to see data.
# If using HTTP streaming via the reverse proxy, this should be set to the minimum value, 1,
# so that clients see the data as soon as possible.
httpOutputBufferSize=32768
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.regex.Pattern;

import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
import org.apache.pulsar.common.configuration.FieldContext;
import org.apache.pulsar.common.configuration.PulsarConfiguration;

import com.google.common.collect.Sets;
Expand Down Expand Up @@ -133,6 +134,14 @@ public class ProxyConfiguration implements PulsarConfiguration {
// Http redirects to redirect to non-pulsar services
private Set<HttpReverseProxyConfig> httpReverseProxyConfigs = Sets.newHashSet();

// Http output buffer size. The amount of data that will be buffered for http requests
// before it is flushed to the channel. A larger buffer size may result in higher http throughput
// though it may take longer for the client to see data.
// If using HTTP streaming via the reverse proxy, this should be set to the minimum value, 1,
// so that clients see the data as soon as possible.
@FieldContext(minValue = 1)
private int httpOutputBufferSize = 32*1024;

private Properties properties = new Properties();

public boolean forwardAuthorizationCredentials() {
Expand Down Expand Up @@ -448,6 +457,14 @@ public void setTlsRequireTrustedClientCertOnConnect(boolean tlsRequireTrustedCli
this.tlsRequireTrustedClientCertOnConnect = tlsRequireTrustedClientCertOnConnect;
}

public int getHttpOutputBufferSize() {
return httpOutputBufferSize;
}

public void setHttpOutputBufferSize(int httpOutputBufferSize) {
this.httpOutputBufferSize = httpOutputBufferSize;
}

public Set<HttpReverseProxyConfig> getHttpReverseProxyConfigs() {
return httpReverseProxyConfigs;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@
import org.apache.pulsar.common.util.SecurityUtility;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpConnectionFactory;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.Slf4jRequestLog;
Expand Down Expand Up @@ -87,7 +89,10 @@ public WebServer(ProxyConfiguration config, AuthenticationService authentication

List<ServerConnector> connectors = Lists.newArrayList();

ServerConnector connector = new ServerConnector(server, 1, 1);
HttpConfiguration http_config = new HttpConfiguration();
http_config.setOutputBufferSize(config.getHttpOutputBufferSize());

ServerConnector connector = new ServerConnector(server, 1, 1, new HttpConnectionFactory(http_config));
connector.setPort(externalServicePort);
connectors.add(connector);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,18 @@
*/
package org.apache.pulsar.proxy.server;

import static java.nio.charset.StandardCharsets.UTF_8;

import java.io.IOException;
import java.util.Properties;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.BooleanSupplier;

import javax.servlet.AsyncContext;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

Expand All @@ -32,9 +40,13 @@
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.glassfish.jersey.client.ClientConfig;
import org.glassfish.jersey.logging.LoggingFeature;

Expand Down Expand Up @@ -81,6 +93,46 @@ public void handle(String target, Request baseRequest,
};
}

private static ServletContextHandler newStreamingHandler(LinkedBlockingQueue<String> dataQueue) {
ServletContextHandler context = new ServletContextHandler();
context.setContextPath("/");
ServletHolder asyncHolder = new ServletHolder(new HttpServlet() {
@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp)
throws ServletException, IOException {
final AsyncContext ctx = req.startAsync();
resp.setContentType("text/plain;charset=utf-8");
resp.setStatus(HttpServletResponse.SC_OK);

ctx.start(() -> {
log.info("Doing async processing");
try {
while (true) {
String data = dataQueue.take();
if (data.equals("DONE")) {
ctx.complete();
break;
} else {
ctx.getResponse().getWriter().print(data);
ctx.getResponse().getWriter().flush();
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("Async handler interrupted");
ctx.complete();
} catch (Exception e) {
log.error("Unexpected error in async handler", e);
ctx.complete();
}
});
}
});
asyncHolder.setAsyncSupported(true);
context.addServlet(asyncHolder, "/");
return context;
}

@Override
@AfterClass
protected void cleanup() throws Exception {
Expand Down Expand Up @@ -296,4 +348,83 @@ public void testPathEndsInSlash() throws Exception {

}

@Test
public void testStreaming() throws Exception {
LinkedBlockingQueue<String> dataQueue = new LinkedBlockingQueue<>();
Server streamingServer = new Server(0);
streamingServer.setHandler(newStreamingHandler(dataQueue));
streamingServer.start();

Properties props = new Properties();
props.setProperty("httpOutputBufferSize", "1");
props.setProperty("httpReverseProxy.foobar.path", "/stream");
props.setProperty("httpReverseProxy.foobar.proxyTo", streamingServer.getURI().toString());
props.setProperty("servicePort", "0");
props.setProperty("webServicePort", "0");

ProxyConfiguration proxyConfig = PulsarConfigurationLoader.create(props, ProxyConfiguration.class);
AuthenticationService authService = new AuthenticationService(
PulsarConfigurationLoader.convertFrom(proxyConfig));

WebServer webServer = new WebServer(proxyConfig, authService);
ProxyServiceStarter.addWebServerHandlers(webServer, proxyConfig,
new BrokerDiscoveryProvider(proxyConfig, mockZooKeeperClientFactory));
webServer.start();

HttpClient httpClient = new HttpClient();
httpClient.start();
try {
LinkedBlockingQueue<Byte> responses = new LinkedBlockingQueue<>();
CompletableFuture<Result> promise = new CompletableFuture<>();
httpClient.newRequest(webServer.getServiceUri()).path("/stream")
.onResponseContent((response, content) -> {
while (content.hasRemaining()) {
try {
responses.put(content.get());
} catch (Exception e) {
log.error("Error reading response", e);
promise.completeExceptionally(e);
}
}
})
.send((result) -> {
log.info("Response complete");
promise.complete(result);
});

dataQueue.put("Some data");
assertEventuallyTrue(() -> responses.size() == "Some data".length());
Assert.assertEquals("Some data", drainToString(responses));
Assert.assertFalse(promise.isDone());

dataQueue.put("More data");
assertEventuallyTrue(() -> responses.size() == "More data".length());
Assert.assertEquals("More data", drainToString(responses));
Assert.assertFalse(promise.isDone());

dataQueue.put("DONE");
assertEventuallyTrue(() -> promise.isDone());
Assert.assertTrue(promise.get().isSucceeded());
} finally {
webServer.stop();
httpClient.stop();
streamingServer.stop();
}
}

static String drainToString(Queue<Byte> queue) throws Exception {
byte[] bytes = new byte[queue.size()];
for (int i = 0; i < bytes.length; i++) {
bytes[i] = queue.poll();
}
return new String(bytes, UTF_8);
}

static void assertEventuallyTrue(BooleanSupplier predicate) throws Exception {
// wait up to 3 seconds
for (int i = 0; i < 30 && !predicate.getAsBoolean(); i++) {
Thread.sleep(100);
}
Assert.assertTrue(predicate.getAsBoolean());
}
}

0 comments on commit 0e08b6a

Please sign in to comment.