diff --git a/conf/proxy.conf b/conf/proxy.conf index 68f0e8dd1d160..dbac95aa2324b 100644 --- a/conf/proxy.conf +++ b/conf/proxy.conf @@ -134,3 +134,10 @@ tlsRequireTrustedClientCertOnConnect=false # Deprecated. Use configurationStoreServers globalZookeeperServers= + +# Http output buffer size. The amount of data that will be buffered for http requests +# before it is flushed to the channel. A larger buffer size may result in higher http throughput +# though it may take longer for the client to see data. +# If using HTTP streaming via the reverse proxy, this should be set to the minimum value, 1, +# so that clients see the data as soon as possible. +httpOutputBufferSize=32768 diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java index 18a09a6d3252a..b84bf3c507eb1 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java @@ -28,6 +28,7 @@ import java.util.regex.Pattern; import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider; +import org.apache.pulsar.common.configuration.FieldContext; import org.apache.pulsar.common.configuration.PulsarConfiguration; import com.google.common.collect.Sets; @@ -133,6 +134,14 @@ public class ProxyConfiguration implements PulsarConfiguration { // Http redirects to redirect to non-pulsar services private Set httpReverseProxyConfigs = Sets.newHashSet(); + // Http output buffer size. The amount of data that will be buffered for http requests + // before it is flushed to the channel. A larger buffer size may result in higher http throughput + // though it may take longer for the client to see data. + // If using HTTP streaming via the reverse proxy, this should be set to the minimum value, 1, + // so that clients see the data as soon as possible. + @FieldContext(minValue = 1) + private int httpOutputBufferSize = 32*1024; + private Properties properties = new Properties(); public boolean forwardAuthorizationCredentials() { @@ -448,6 +457,14 @@ public void setTlsRequireTrustedClientCertOnConnect(boolean tlsRequireTrustedCli this.tlsRequireTrustedClientCertOnConnect = tlsRequireTrustedClientCertOnConnect; } + public int getHttpOutputBufferSize() { + return httpOutputBufferSize; + } + + public void setHttpOutputBufferSize(int httpOutputBufferSize) { + this.httpOutputBufferSize = httpOutputBufferSize; + } + public Set getHttpReverseProxyConfigs() { return httpReverseProxyConfigs; } diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java index c5618bb448b25..cf4469bfcdf22 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java @@ -43,6 +43,8 @@ import org.apache.pulsar.common.util.SecurityUtility; import org.eclipse.jetty.server.Connector; import org.eclipse.jetty.server.Handler; +import org.eclipse.jetty.server.HttpConfiguration; +import org.eclipse.jetty.server.HttpConnectionFactory; import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.ServerConnector; import org.eclipse.jetty.server.Slf4jRequestLog; @@ -87,7 +89,10 @@ public WebServer(ProxyConfiguration config, AuthenticationService authentication List connectors = Lists.newArrayList(); - ServerConnector connector = new ServerConnector(server, 1, 1); + HttpConfiguration http_config = new HttpConfiguration(); + http_config.setOutputBufferSize(config.getHttpOutputBufferSize()); + + ServerConnector connector = new ServerConnector(server, 1, 1, new HttpConnectionFactory(http_config)); connector.setPort(externalServicePort); connectors.add(connector); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyIsAHttpProxyTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyIsAHttpProxyTest.java index d08619da0342f..d5efb2362e394 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyIsAHttpProxyTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyIsAHttpProxyTest.java @@ -18,10 +18,18 @@ */ package org.apache.pulsar.proxy.server; +import static java.nio.charset.StandardCharsets.UTF_8; + import java.io.IOException; import java.util.Properties; +import java.util.Queue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.function.BooleanSupplier; +import javax.servlet.AsyncContext; import javax.servlet.ServletException; +import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; @@ -32,9 +40,13 @@ import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.authentication.AuthenticationService; import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; +import org.eclipse.jetty.client.HttpClient; +import org.eclipse.jetty.client.api.Result; import org.eclipse.jetty.server.Request; import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.handler.AbstractHandler; +import org.eclipse.jetty.servlet.ServletContextHandler; +import org.eclipse.jetty.servlet.ServletHolder; import org.glassfish.jersey.client.ClientConfig; import org.glassfish.jersey.logging.LoggingFeature; @@ -81,6 +93,46 @@ public void handle(String target, Request baseRequest, }; } + private static ServletContextHandler newStreamingHandler(LinkedBlockingQueue dataQueue) { + ServletContextHandler context = new ServletContextHandler(); + context.setContextPath("/"); + ServletHolder asyncHolder = new ServletHolder(new HttpServlet() { + @Override + protected void doGet(HttpServletRequest req, HttpServletResponse resp) + throws ServletException, IOException { + final AsyncContext ctx = req.startAsync(); + resp.setContentType("text/plain;charset=utf-8"); + resp.setStatus(HttpServletResponse.SC_OK); + + ctx.start(() -> { + log.info("Doing async processing"); + try { + while (true) { + String data = dataQueue.take(); + if (data.equals("DONE")) { + ctx.complete(); + break; + } else { + ctx.getResponse().getWriter().print(data); + ctx.getResponse().getWriter().flush(); + } + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + log.error("Async handler interrupted"); + ctx.complete(); + } catch (Exception e) { + log.error("Unexpected error in async handler", e); + ctx.complete(); + } + }); + } + }); + asyncHolder.setAsyncSupported(true); + context.addServlet(asyncHolder, "/"); + return context; + } + @Override @AfterClass protected void cleanup() throws Exception { @@ -296,4 +348,83 @@ public void testPathEndsInSlash() throws Exception { } + @Test + public void testStreaming() throws Exception { + LinkedBlockingQueue dataQueue = new LinkedBlockingQueue<>(); + Server streamingServer = new Server(0); + streamingServer.setHandler(newStreamingHandler(dataQueue)); + streamingServer.start(); + + Properties props = new Properties(); + props.setProperty("httpOutputBufferSize", "1"); + props.setProperty("httpReverseProxy.foobar.path", "/stream"); + props.setProperty("httpReverseProxy.foobar.proxyTo", streamingServer.getURI().toString()); + props.setProperty("servicePort", "0"); + props.setProperty("webServicePort", "0"); + + ProxyConfiguration proxyConfig = PulsarConfigurationLoader.create(props, ProxyConfiguration.class); + AuthenticationService authService = new AuthenticationService( + PulsarConfigurationLoader.convertFrom(proxyConfig)); + + WebServer webServer = new WebServer(proxyConfig, authService); + ProxyServiceStarter.addWebServerHandlers(webServer, proxyConfig, + new BrokerDiscoveryProvider(proxyConfig, mockZooKeeperClientFactory)); + webServer.start(); + + HttpClient httpClient = new HttpClient(); + httpClient.start(); + try { + LinkedBlockingQueue responses = new LinkedBlockingQueue<>(); + CompletableFuture promise = new CompletableFuture<>(); + httpClient.newRequest(webServer.getServiceUri()).path("/stream") + .onResponseContent((response, content) -> { + while (content.hasRemaining()) { + try { + responses.put(content.get()); + } catch (Exception e) { + log.error("Error reading response", e); + promise.completeExceptionally(e); + } + } + }) + .send((result) -> { + log.info("Response complete"); + promise.complete(result); + }); + + dataQueue.put("Some data"); + assertEventuallyTrue(() -> responses.size() == "Some data".length()); + Assert.assertEquals("Some data", drainToString(responses)); + Assert.assertFalse(promise.isDone()); + + dataQueue.put("More data"); + assertEventuallyTrue(() -> responses.size() == "More data".length()); + Assert.assertEquals("More data", drainToString(responses)); + Assert.assertFalse(promise.isDone()); + + dataQueue.put("DONE"); + assertEventuallyTrue(() -> promise.isDone()); + Assert.assertTrue(promise.get().isSucceeded()); + } finally { + webServer.stop(); + httpClient.stop(); + streamingServer.stop(); + } + } + + static String drainToString(Queue queue) throws Exception { + byte[] bytes = new byte[queue.size()]; + for (int i = 0; i < bytes.length; i++) { + bytes[i] = queue.poll(); + } + return new String(bytes, UTF_8); + } + + static void assertEventuallyTrue(BooleanSupplier predicate) throws Exception { + // wait up to 3 seconds + for (int i = 0; i < 30 && !predicate.getAsBoolean(); i++) { + Thread.sleep(100); + } + Assert.assertTrue(predicate.getAsBoolean()); + } }