Skip to content

Commit

Permalink
Proxy admin web requests (apache#1505)
Browse files Browse the repository at this point in the history
* Proxy admin web requests

* Remove unused dependency:

* Work on redirects

* Rethrow exception on auth failure

* Override decisionm by proxy to disable followRedirects

* Add license header

* Authenticated and Unauthenticated tests for admin proxy

* Add missing license headers:
  • Loading branch information
mgodave authored and merlimat committed Apr 25, 2018
1 parent 785d2a2 commit 0c9822c
Show file tree
Hide file tree
Showing 6 changed files with 409 additions and 12 deletions.
5 changes: 5 additions & 0 deletions pulsar-proxy/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,11 @@
<artifactId>jetty-servlets</artifactId>
</dependency>

<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-proxy</artifactId>
</dependency>

<dependency>
<groupId>org.glassfish.jersey.core</groupId>
<artifactId>jersey-server</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.proxy.server;

import java.io.IOException;
import java.security.cert.X509Certificate;
import java.util.Objects;
import javax.net.ssl.SSLContext;
import javax.servlet.ServletException;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationDataProvider;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.util.SecurityUtility;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.proxy.AsyncProxyServlet;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class AdminProxyHandler extends AsyncProxyServlet.Transparent {
private static final Logger LOG = LoggerFactory.getLogger(AdminProxyHandler.class);

private final ProxyConfiguration config;

AdminProxyHandler(ProxyConfiguration config) {
this.config = config;
}

@Override
protected HttpClient createHttpClient() throws ServletException {
HttpClient client = super.createHttpClient();
client.setFollowRedirects(true);
return client;
}

@Override
protected HttpClient newHttpClient() {
try {
Authentication auth = AuthenticationFactory.create(
config.getBrokerClientAuthenticationPlugin(),
config.getBrokerClientAuthenticationParameters()
);

Objects.requireNonNull(auth, "No supported auth found for proxy");

auth.start();

if (config.isTlsEnabledWithBroker()) {
try {
X509Certificate trustCertificates[] = SecurityUtility
.loadCertificatesFromPemFile(config.getTlsTrustCertsFilePath());

SSLContext sslCtx;
AuthenticationDataProvider authData = auth.getAuthData();
if (authData.hasDataForTls()) {
sslCtx = SecurityUtility.createSslContext(
config.isTlsAllowInsecureConnection(),
trustCertificates,
authData.getTlsCertificates(),
authData.getTlsPrivateKey()
);
} else {
sslCtx = SecurityUtility.createSslContext(
config.isTlsAllowInsecureConnection(),
trustCertificates
);
}

SslContextFactory contextFactory = new SslContextFactory();
contextFactory.setSslContext(sslCtx);

return new HttpClient(contextFactory);
} catch (Exception e) {
try {
auth.close();
} catch (IOException ioe) {
LOG.error("Failed to close the authentication service", ioe);
}
throw new PulsarClientException.InvalidConfigurationException(e.getMessage());
}
}
} catch (PulsarClientException e) {
throw new RuntimeException(e);
}

// return an unauthenticated client, every request will fail.
return new HttpClient();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,16 @@
package org.apache.pulsar.proxy.server;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.Lists.newArrayList;
import static java.lang.Thread.setDefaultUncaughtExceptionHandler;
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.commons.lang3.StringUtils.isEmpty;
import static org.slf4j.bridge.SLF4JBridgeHandler.install;
import static org.slf4j.bridge.SLF4JBridgeHandler.removeHandlersForRootLogger;

import java.util.List;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.eclipse.jetty.servlet.ServletHolder;
import org.slf4j.Logger;
Expand Down Expand Up @@ -102,17 +106,14 @@ public ProxyServiceStarter(String[] args) throws Exception {
// create a web-service
final WebServer server = new WebServer(config);

Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
try {
proxyService.close();
server.stop();
} catch (Exception e) {
log.warn("server couldn't stop gracefully {}", e.getMessage(), e);
}
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
try {
proxyService.close();
server.stop();
} catch (Exception e) {
log.warn("server couldn't stop gracefully {}", e.getMessage(), e);
}
});
}));

proxyService.start();

Expand All @@ -122,6 +123,13 @@ public void run() {
server.addRestResources("/", VipStatus.class.getPackage().getName(),
VipStatus.ATTRIBUTE_STATUS_FILE_PATH, config.getStatusFilePath());

AdminProxyHandler adminProxyHandler = new AdminProxyHandler(config);
ServletHolder servletHolder = new ServletHolder(adminProxyHandler);
servletHolder.setInitParameter("preserveHost", "true");
servletHolder.setInitParameter("proxyTo", config.getBrokerServiceURL());
server.addServlet("/admin/*", servletHolder);
server.addServlet("/lookup/*", servletHolder);

// start web-service
server.start();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@

import java.net.URI;
import java.security.GeneralSecurityException;
import java.util.Collections;
import java.util.List;
import java.util.TimeZone;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import javax.net.ssl.SSLContext;

import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.common.util.SecurityUtility;
import org.eclipse.jetty.server.Handler;
Expand Down Expand Up @@ -98,8 +98,15 @@ public URI getServiceUri() {
}

public void addServlet(String path, ServletHolder servletHolder) {
addServlet(path, servletHolder, Collections.emptyList());
}

public void addServlet(String path, ServletHolder servletHolder, List<Pair<String, Object>> attributes) {
ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS);
context.addServlet(servletHolder, path);
for (Pair<String, Object> attribute : attributes) {
context.setAttribute(attribute.getLeft(), attribute.getRight());
}
handlers.add(context);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.proxy.server;

import static org.mockito.Matchers.anyObject;
import static org.mockito.Mockito.spy;

import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.servlet.http.HttpServletRequest;
import org.apache.bookkeeper.test.PortManager;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
import org.eclipse.jetty.servlet.ServletHolder;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import org.testng.collections.Maps;

public class AuthedAdminProxyHandlerTest extends MockedPulsarServiceBaseTest {
private final String TLS_TRUST_CERT_FILE_PATH = "./src/test/resources/authentication/tls/cacert.pem";
private final String TLS_SERVER_CERT_FILE_PATH = "./src/test/resources/authentication/tls/server-cert.pem";
private final String TLS_SERVER_KEY_FILE_PATH = "./src/test/resources/authentication/tls/server-key.pem";
private final String TLS_CLIENT_CERT_FILE_PATH = "./src/test/resources/authentication/tls/client-cert.pem";
private final String TLS_CLIENT_KEY_FILE_PATH = "./src/test/resources/authentication/tls/client-key.pem";
private final String DUMMY_VALUE = "DUMMY_VALUE";

private final String configClusterName = "test";
private ProxyConfiguration proxyConfig = new ProxyConfiguration();
private WebServer webServer;
private AdminProxyWrapper adminProxyHandler;

@BeforeMethod
@Override
protected void setup() throws Exception {
// enable tls and auth&auth at broker
conf.setAuthenticationEnabled(true);
conf.setAuthorizationEnabled(true);

conf.setTlsEnabled(true);
conf.setTlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH);
conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
conf.setTlsAllowInsecureConnection(true);

Set<String> superUserRoles = new HashSet<>();
superUserRoles.add("localhost");
superUserRoles.add("superUser");
conf.setSuperUserRoles(superUserRoles);

conf.setBrokerClientAuthenticationPlugin(AuthenticationTls.class.getName());
conf.setBrokerClientAuthenticationParameters(
"tlsCertFile:" + TLS_CLIENT_CERT_FILE_PATH + "," + "tlsKeyFile:" + TLS_SERVER_KEY_FILE_PATH);
conf.setBrokerClientTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH);
Set<String> providers = new HashSet<>();
providers.add(AuthenticationProviderTls.class.getName());
conf.setAuthenticationProviders(providers);

conf.setClusterName(configClusterName);

super.init();

// start proxy service
proxyConfig.setAuthenticationEnabled(true);
proxyConfig.setAuthenticationEnabled(true);

proxyConfig.setServicePort(PortManager.nextFreePort());
proxyConfig.setServicePortTls(PortManager.nextFreePort());
proxyConfig.setWebServicePort(PortManager.nextFreePort());
proxyConfig.setWebServicePortTls(PortManager.nextFreePort());
proxyConfig.setTlsEnabledInProxy(true);
proxyConfig.setTlsEnabledWithBroker(true);

// enable tls and auth&auth at proxy
proxyConfig.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
proxyConfig.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
proxyConfig.setTlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH);

proxyConfig.setBrokerClientAuthenticationPlugin(AuthenticationTls.class.getName());
proxyConfig.setBrokerClientAuthenticationParameters(
"tlsCertFile:" + TLS_CLIENT_CERT_FILE_PATH + "," + "tlsKeyFile:" + TLS_CLIENT_KEY_FILE_PATH);
proxyConfig.setBrokerClientTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH);
proxyConfig.setAuthenticationProviders(providers);

proxyConfig.setZookeeperServers(DUMMY_VALUE);
proxyConfig.setGlobalZookeeperServers(DUMMY_VALUE);

webServer = new WebServer(proxyConfig);

adminProxyHandler = new AdminProxyWrapper(proxyConfig);
ServletHolder servletHolder = new ServletHolder(adminProxyHandler);
servletHolder.setInitParameter("preserveHost", "true");
servletHolder.setInitParameter("proxyTo", brokerUrlTls.toString());
webServer.addServlet("/admin/*", servletHolder);
webServer.addServlet("/lookup/*", servletHolder);

// start web-service
webServer.start();
}

@AfterMethod
@Override
protected void cleanup() throws Exception {
webServer.stop();
super.internalCleanup();
}

@Test
public void testAuthenticatedProxy() throws Exception {
Map<String, String> authParams = Maps.newHashMap();
authParams.put("tlsCertFile", TLS_CLIENT_CERT_FILE_PATH);
authParams.put("tlsKeyFile", TLS_CLIENT_KEY_FILE_PATH);

admin = spy(PulsarAdmin.builder()
.serviceHttpUrl("https://localhost:" + proxyConfig.getWebServicePortTls())
.tlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH)
.allowTlsInsecureConnection(true)
.authentication(AuthenticationTls.class.getName(), authParams)
.build());

List<String> activeBrokers = admin.brokers().getActiveBrokers(configClusterName);
Assert.assertEquals(activeBrokers.size(), 1);
Assert.assertTrue(adminProxyHandler.rewriteCalled);
}

static class AdminProxyWrapper extends AdminProxyHandler {
boolean rewriteCalled = false;

AdminProxyWrapper(ProxyConfiguration config) {
super(config);
}

@Override
protected String rewriteTarget(HttpServletRequest clientRequest) {
rewriteCalled = true;
return super.rewriteTarget(clientRequest);
}

}

}
Loading

0 comments on commit 0c9822c

Please sign in to comment.