Skip to content

Commit

Permalink
Allow user to configure proxy as reverse HTTP proxy (apache#2801)
Browse files Browse the repository at this point in the history
* Allow user to configure proxy as reverse HTTP proxy

Users normally want to expose as few endpoints as possible. This patch
enables configuring the pulsar proxy as a review http proxy, so that
it can be used as a single endpoint for all pulsar and even non-pulsar
HTTP services.

The reverse proxy uses jetty's builtin implementation.

It is configured in proxy.conf, in the form

httpReverseProxy.NAME.path = '/path-on-pulsar-proxy-endpoint'
httpReverseProxy.NAME.proxyTo = 'http://internal-server/some-app'

where NAME is any user defined string. Multiple paths can be
configured this way.

* Add jetty logging at info level for tests
  • Loading branch information
ivankelly authored and merlimat committed Oct 17, 2018
1 parent a7310ee commit 052c3b0
Show file tree
Hide file tree
Showing 6 changed files with 432 additions and 15 deletions.
1 change: 1 addition & 0 deletions buildtools/src/main/resources/log4j2.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
<Root level="warn">
<AppenderRef ref="Console" />
</Root>
<Logger name="org.eclipse.jetty" level="info"/>
<Logger name="org.apache.pulsar" level="info"/>
<Logger name="org.apache.bookkeeper" level="info"/>
<Logger name="org.apache.kafka" level="info"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,15 @@ public static <T extends PulsarConfiguration> T create(InputStream inStream,
}
}

/**
* Creates PulsarConfiguration and loads it with populated attribute values from provided Properties object.
*
* @param properties The properties to populate the attributed from
* @throws IOException
* @throws IllegalArgumentException
*/
@SuppressWarnings({ "rawtypes", "unchecked" })
protected static <T extends PulsarConfiguration> T create(Properties properties,
public static <T extends PulsarConfiguration> T create(Properties properties,
Class<? extends PulsarConfiguration> clazz) throws IOException, IllegalArgumentException {
checkNotNull(properties);
T configuration = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,25 @@
*/
package org.apache.pulsar.proxy.server;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
import org.apache.pulsar.common.configuration.PulsarConfiguration;

import com.google.common.collect.Sets;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ProxyConfiguration implements PulsarConfiguration {
private final static Logger log = LoggerFactory.getLogger(ProxyConfiguration.class);

// Local-Zookeeper quorum connection string
private String zookeeperServers;
Expand Down Expand Up @@ -119,7 +129,10 @@ public class ProxyConfiguration implements PulsarConfiguration {
// Specify whether Client certificates are required for TLS
// Reject the Connection if the Client Certificate is not trusted.
private boolean tlsRequireTrustedClientCertOnConnect = false;


// Http redirects to redirect to non-pulsar services
private Set<HttpReverseProxyConfig> httpReverseProxyConfigs = Sets.newHashSet();

private Properties properties = new Properties();

public boolean forwardAuthorizationCredentials() {
Expand Down Expand Up @@ -370,6 +383,29 @@ public Properties getProperties() {

public void setProperties(Properties properties) {
this.properties = properties;

Map<String, Map<String, String>> redirects = new HashMap<>();
Pattern redirectPattern = Pattern.compile("^httpReverseProxy\\.([^\\.]*)\\.(.+)$");
Map<String, List<Matcher>> groups = properties.stringPropertyNames().stream()
.map((s) -> redirectPattern.matcher(s))
.filter(Matcher::matches)
.collect(Collectors.groupingBy((m) -> m.group(1))); // group by name

groups.entrySet().forEach((e) -> {
Map<String, String> keyToFullKey = e.getValue().stream().collect(
Collectors.toMap(m -> m.group(2), m -> m.group(0)));
if (!keyToFullKey.containsKey("path")) {
throw new IllegalArgumentException(
String.format("httpReverseProxy.%s.path must be specified exactly once", e.getKey()));
}
if (!keyToFullKey.containsKey("proxyTo")) {
throw new IllegalArgumentException(
String.format("httpReverseProxy.%s.proxyTo must be specified exactly once", e.getKey()));
}
httpReverseProxyConfigs.add(new HttpReverseProxyConfig(e.getKey(),
properties.getProperty(keyToFullKey.get("path")),
properties.getProperty(keyToFullKey.get("proxyTo"))));
});
}

public Set<String> getTlsProtocols() {
Expand Down Expand Up @@ -411,4 +447,41 @@ public boolean getTlsRequireTrustedClientCertOnConnect() {
public void setTlsRequireTrustedClientCertOnConnect(boolean tlsRequireTrustedClientCertOnConnect) {
this.tlsRequireTrustedClientCertOnConnect = tlsRequireTrustedClientCertOnConnect;
}

public Set<HttpReverseProxyConfig> getHttpReverseProxyConfigs() {
return httpReverseProxyConfigs;
}

public void setHttpReverseProxyConfigs(Set<HttpReverseProxyConfig> reverseProxyConfigs) {
this.httpReverseProxyConfigs = reverseProxyConfigs;
}

public static class HttpReverseProxyConfig {
private final String name;
private final String path;
private final String proxyTo;

HttpReverseProxyConfig(String name, String path, String proxyTo) {
this.name = name;
this.path = path;
this.proxyTo = proxyTo;
}

public String getName() {
return name;
}

public String getPath() {
return path;
}

public String getProxyTo() {
return proxyTo;
}

@Override
public String toString() {
return String.format("HttpReverseProxyConfig(%s, path=%s, proxyTo=%s)", name, path, proxyTo);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,15 @@
package org.apache.pulsar.proxy.server;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.Lists.newArrayList;
import static java.lang.Thread.setDefaultUncaughtExceptionHandler;
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.commons.lang3.StringUtils.isEmpty;
import static org.slf4j.bridge.SLF4JBridgeHandler.install;
import static org.slf4j.bridge.SLF4JBridgeHandler.removeHandlersForRootLogger;

import java.util.List;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.eclipse.jetty.proxy.ProxyServlet;
import org.eclipse.jetty.servlet.ServletHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -42,6 +39,7 @@
import io.prometheus.client.hotspot.DefaultExports;
import org.apache.pulsar.common.configuration.VipStatus;


/**
* Starts an instance of the Pulsar ProxyService
*
Expand Down Expand Up @@ -137,22 +135,36 @@ public ProxyServiceStarter(String[] args) throws Exception {

// Setup metrics
DefaultExports.initialize();
addWebServerHandlers(server, config, proxyService.getDiscoveryProvider());

// start web-service
server.start();
}

public static void main(String[] args) throws Exception {
new ProxyServiceStarter(args);
}

static void addWebServerHandlers(WebServer server,
ProxyConfiguration config,
BrokerDiscoveryProvider discoveryProvider) {
server.addServlet("/metrics", new ServletHolder(MetricsServlet.class));
server.addRestResources("/", VipStatus.class.getPackage().getName(),
VipStatus.ATTRIBUTE_STATUS_FILE_PATH, config.getStatusFilePath());

AdminProxyHandler adminProxyHandler = new AdminProxyHandler(config, proxyService.getDiscoveryProvider());
AdminProxyHandler adminProxyHandler = new AdminProxyHandler(config, discoveryProvider);
ServletHolder servletHolder = new ServletHolder(adminProxyHandler);
servletHolder.setInitParameter("preserveHost", "true");
server.addServlet("/admin", servletHolder);
server.addServlet("/lookup", servletHolder);

// start web-service
server.start();
}

public static void main(String[] args) throws Exception {
new ProxyServiceStarter(args);
for (ProxyConfiguration.HttpReverseProxyConfig revProxy : config.getHttpReverseProxyConfigs()) {
log.debug("Adding reverse proxy with config {}", revProxy);
ServletHolder proxyHolder = new ServletHolder(ProxyServlet.Transparent.class);
proxyHolder.setInitParameter("proxyTo", revProxy.getProxyTo());
proxyHolder.setInitParameter("prefix", "/");
server.addServlet(revProxy.getPath(), proxyHolder);
}
}

private static final Logger log = LoggerFactory.getLogger(ProxyServiceStarter.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@
import java.net.URI;
import java.security.GeneralSecurityException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Optional;
import java.util.TimeZone;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -70,9 +72,11 @@ public class WebServer {
private final Server server;
private final ExecutorService webServiceExecutor;
private final AuthenticationService authenticationService;
private final List<String> servletPaths = Lists.newArrayList();
private final List<Handler> handlers = Lists.newArrayList();
private final ProxyConfiguration config;
protected final int externalServicePort;
protected int externalServicePort;
private URI serviceURI = null;

public WebServer(ProxyConfiguration config, AuthenticationService authenticationService) {
this.webServiceExecutor = Executors.newFixedThreadPool(32, new DefaultThreadFactory("pulsar-external-web"));
Expand Down Expand Up @@ -109,14 +113,21 @@ public WebServer(ProxyConfiguration config, AuthenticationService authentication
}

public URI getServiceUri() {
return this.server.getURI();
return serviceURI;
}

public void addServlet(String basePath, ServletHolder servletHolder) {
addServlet(basePath, servletHolder, Collections.emptyList());
}

public void addServlet(String basePath, ServletHolder servletHolder, List<Pair<String, Object>> attributes) {
Optional<String> existingPath = servletPaths.stream().filter(p -> p.startsWith(basePath)).findFirst();
if (existingPath.isPresent()) {
throw new IllegalArgumentException(
String.format("Cannot add servlet at %s, path %s already exists", basePath, existingPath.get()));
}
servletPaths.add(basePath);

ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS);
context.setContextPath(basePath);
context.addServlet(servletHolder, "/*");
Expand Down Expand Up @@ -169,6 +180,20 @@ public void start() throws Exception {

try {
server.start();

Arrays.stream(server.getConnectors())
.filter(c -> c instanceof ServerConnector)
.findFirst().ifPresent(c -> {
WebServer.this.externalServicePort = ((ServerConnector) c).getPort();
});

// server reports URI of first servlet, we want to strip that path off
URI reportedURI = server.getURI();
serviceURI = new URI(reportedURI.getScheme(),
null,
reportedURI.getHost(),
reportedURI.getPort(),
null, null, null);
} catch (Exception e) {
List<Integer> ports = new ArrayList<>();
for (Connector c : server.getConnectors()) {
Expand Down
Loading

0 comments on commit 052c3b0

Please sign in to comment.