Skip to content

Commit

Permalink
[improve][proxy] Add TLS transport encryption for broker client (apac…
Browse files Browse the repository at this point in the history
  • Loading branch information
nodece authored Aug 4, 2022
1 parent 1d6981c commit 5c09d53
Show file tree
Hide file tree
Showing 4 changed files with 269 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.stream.Collectors;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
import org.apache.pulsar.common.configuration.Category;
Expand Down Expand Up @@ -406,6 +407,17 @@ public class ProxyConfiguration implements PulsarConfiguration {
)
private String brokerClientTrustCertsFilePath;

@FieldContext(
category = CATEGORY_CLIENT_AUTHENTICATION,
doc = "The path to TLS private key used by the Pulsar proxy to authenticate with Pulsar brokers"
)
private String brokerClientKeyFilePath;

@FieldContext(
category = CATEGORY_CLIENT_AUTHENTICATION,
doc = "The path to the TLS certificate used by the Pulsar proxy to authenticate with Pulsar brokers")
private String brokerClientCertificateFilePath;

@FieldContext(
category = CATEGORY_CLIENT_AUTHENTICATION,
doc = "Whether TLS is enabled when communicating with Pulsar brokers"
Expand Down Expand Up @@ -546,6 +558,25 @@ public class ProxyConfiguration implements PulsarConfiguration {
private String brokerClientSslProvider = null;

// needed when client auth is required
@FieldContext(
category = CATEGORY_KEYSTORE_TLS,
doc = "TLS KeyStore type configuration for proxy: JKS, PKCS12 "
+ " used by the Pulsar proxy to authenticate with Pulsar brokers"
)
private String brokerClientTlsKeyStoreType = "JKS";
@FieldContext(
category = CATEGORY_KEYSTORE_TLS,
doc = "TLS KeyStore path for internal client, "
+ " used by the Pulsar proxy to authenticate with Pulsar brokers"
)
private String brokerClientTlsKeyStore = null;
@FieldContext(
category = CATEGORY_KEYSTORE_TLS,
doc = "TLS KeyStore password for proxy, "
+ " used by the Pulsar proxy to authenticate with Pulsar brokers"
)
@ToString.Exclude
private String brokerClientTlsKeyStorePassword = null;
@FieldContext(
category = CATEGORY_KEYSTORE_TLS,
doc = "TLS TrustStore type configuration for proxy: JKS, PKCS12 "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -587,10 +587,15 @@ ClientConfigurationData createClientConfiguration() {
clientConf.setTlsTrustStoreType(proxyConfig.getBrokerClientTlsTrustStoreType());
clientConf.setTlsTrustStorePath(proxyConfig.getBrokerClientTlsTrustStore());
clientConf.setTlsTrustStorePassword(proxyConfig.getBrokerClientTlsTrustStorePassword());
clientConf.setTlsKeyStoreType(proxyConfig.getBrokerClientTlsKeyStoreType());
clientConf.setTlsKeyStorePath(proxyConfig.getBrokerClientTlsKeyStore());
clientConf.setTlsKeyStorePassword(proxyConfig.getBrokerClientTlsKeyStorePassword());
} else {
clientConf.setTlsTrustCertsFilePath(proxyConfig.getBrokerClientTrustCertsFilePath());
clientConf.setTlsAllowInsecureConnection(proxyConfig.isTlsAllowInsecureConnection());
clientConf.setTlsKeyFilePath(proxyConfig.getBrokerClientKeyFilePath());
clientConf.setTlsCertificateFilePath(proxyConfig.getBrokerClientCertificateFilePath());
}
clientConf.setTlsAllowInsecureConnection(proxyConfig.isTlsAllowInsecureConnection());
}
return clientConf;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.proxy.server;

import static org.mockito.Mockito.doReturn;
import static org.testng.Assert.assertThrows;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
import org.mockito.Mockito;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

public class ProxyMutualTlsTest extends MockedPulsarServiceBaseTest {

private final String TLS_TRUST_CERT_FILE_PATH = "./src/test/resources/authentication/tls/cacert.pem";
private final String TLS_PROXY_CERT_FILE_PATH = "./src/test/resources/authentication/tls/server-cert.pem";
private final String TLS_PROXY_KEY_FILE_PATH = "./src/test/resources/authentication/tls/server-key.pem";
private final String TLS_CLIENT_CERT_FILE_PATH = "./src/test/resources/authentication/tls/client-cert.pem";
private final String TLS_CLIENT_KEY_FILE_PATH = "./src/test/resources/authentication/tls/client-key.pem";

private ProxyService proxyService;
private ProxyConfiguration proxyConfig = new ProxyConfiguration();

@Override
@BeforeClass
protected void setup() throws Exception {
internalSetup();

proxyConfig.setServicePort(Optional.of(0));
proxyConfig.setBrokerProxyAllowedTargetPorts("*");
proxyConfig.setServicePortTls(Optional.of(0));
proxyConfig.setWebServicePort(Optional.of(0));
proxyConfig.setWebServicePortTls(Optional.of(0));
proxyConfig.setTlsCertificateFilePath(TLS_PROXY_CERT_FILE_PATH);
proxyConfig.setTlsKeyFilePath(TLS_PROXY_KEY_FILE_PATH);
proxyConfig.setTlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH);
proxyConfig.setMetadataStoreUrl(DUMMY_VALUE);
proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
proxyConfig.setTlsRequireTrustedClientCertOnConnect(true);
proxyConfig.setTlsAllowInsecureConnection(false);

proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService(
PulsarConfigurationLoader.convertFrom(proxyConfig))));
doReturn(new ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore();
doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore();

proxyService.start();
}

@Override
@AfterClass(alwaysRun = true)
protected void cleanup() throws Exception {
internalCleanup();

proxyService.close();
}

@Test
public void testProducerByTlsTransport() throws Exception {
@Cleanup
PulsarClient client = PulsarClient.builder()
.serviceUrl(proxyService.getServiceUrlTls())
.allowTlsInsecureConnection(false)
.tlsKeyFilePath(TLS_CLIENT_KEY_FILE_PATH)
.tlsCertificateFilePath(TLS_CLIENT_CERT_FILE_PATH)
.tlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH)
.operationTimeout(3, TimeUnit.SECONDS)
.build();
@Cleanup
Producer<byte[]> producer =
client.newProducer(Schema.BYTES).topic("persistent://sample/test/local/"+ UUID.randomUUID()).create();

for (int i = 0; i < 10; i++) {
producer.send("test".getBytes());
}
}

@Test
public void testProducerByAuthenticationTls() throws Exception {
@Cleanup
PulsarClient client = PulsarClient.builder()
.serviceUrl(proxyService.getServiceUrlTls())
.allowTlsInsecureConnection(false)
.tlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH)
.authentication(new AuthenticationTls(TLS_CLIENT_CERT_FILE_PATH, TLS_CLIENT_KEY_FILE_PATH))
.operationTimeout(3, TimeUnit.SECONDS)
.build();
@Cleanup
Producer<byte[]> producer =
client.newProducer(Schema.BYTES).topic("persistent://sample/test/local/"+ UUID.randomUUID()).create();

for (int i = 0; i < 10; i++) {
producer.send("test".getBytes());
}
}

@Test
public void testProducerNegative() throws Exception {
@Cleanup
PulsarClient client = PulsarClient.builder()
.serviceUrl(proxyService.getServiceUrlTls())
.allowTlsInsecureConnection(false)
.tlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH)
.operationTimeout(3, TimeUnit.SECONDS)
.build();

assertThrows(PulsarClientException.class,
() -> client.newProducer(Schema.BYTES).topic("persistent://sample/test/local/"+ UUID.randomUUID()).create());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,21 @@

import static org.mockito.Mockito.spy;
import com.google.common.collect.Sets;
import io.jsonwebtoken.SignatureAlgorithm;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import javax.crypto.SecretKey;
import lombok.Cleanup;
import org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Authentication;
Expand All @@ -42,6 +47,7 @@
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
import org.apache.pulsar.client.impl.auth.AuthenticationToken;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.ClusterData;
Expand All @@ -59,6 +65,9 @@
public class ProxyWithAuthorizationTest extends ProducerConsumerBase {
private static final Logger log = LoggerFactory.getLogger(ProxyWithAuthorizationTest.class);

private final SecretKey SECRET_KEY = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256);
private final String CLIENT_TOKEN = AuthTokenUtils.createToken(SECRET_KEY, "Client", Optional.empty());

private final String TLS_PROXY_TRUST_CERT_FILE_PATH = "./src/test/resources/authentication/tls/ProxyWithAuthorizationTest/proxy-cacert.pem";
private final String TLS_PROXY_CERT_FILE_PATH = "./src/test/resources/authentication/tls/ProxyWithAuthorizationTest/proxy-cert.pem";
private final String TLS_PROXY_KEY_FILE_PATH = "./src/test/resources/authentication/tls/ProxyWithAuthorizationTest/proxy-key.pem";
Expand Down Expand Up @@ -170,7 +179,11 @@ protected void doInitConf() throws Exception {
conf.setBrokerClientAuthenticationParameters(
"tlsCertFile:" + TLS_BROKER_CERT_FILE_PATH + "," + "tlsKeyFile:" + TLS_BROKER_KEY_FILE_PATH);
conf.setBrokerClientTrustCertsFilePath(TLS_BROKER_TRUST_CERT_FILE_PATH);
conf.setAuthenticationProviders(Collections.singleton(AuthenticationProviderTls.class.getName()));
conf.setAuthenticationProviders(Set.of(AuthenticationProviderTls.class.getName(),
AuthenticationProviderToken.class.getName()));
Properties properties = new Properties();
properties.setProperty("tokenSecretKey", AuthTokenUtils.encodeKeyBase64(SECRET_KEY));
conf.setProperties(properties);

conf.setClusterName("proxy-authorization");
conf.setNumExecutorThreadPoolSize(5);
Expand Down Expand Up @@ -204,7 +217,11 @@ protected void setup() throws Exception {
proxyConfig.setBrokerClientAuthenticationPlugin(AuthenticationTls.class.getName());
proxyConfig.setBrokerClientAuthenticationParameters(
"tlsCertFile:" + TLS_PROXY_CERT_FILE_PATH + "," + "tlsKeyFile:" + TLS_PROXY_KEY_FILE_PATH);
proxyConfig.setAuthenticationProviders(Collections.singleton(AuthenticationProviderTls.class.getName()));
proxyConfig.setAuthenticationProviders(Set.of(AuthenticationProviderTls.class.getName(),
AuthenticationProviderToken.class.getName()));
Properties properties = new Properties();
properties.setProperty("tokenSecretKey", AuthTokenUtils.encodeKeyBase64(SECRET_KEY));
proxyConfig.setProperties(properties);

proxyService = Mockito.spy(new ProxyService(proxyConfig,
new AuthenticationService(
Expand Down Expand Up @@ -378,6 +395,8 @@ public void testTlsHostVerificationProxyToBroker(boolean hostnameVerificationEna
}

log.info("-- Exiting {} test --", methodName);
// reset
proxyConfig.setTlsHostnameVerificationEnabled(false);
}

/*
Expand Down Expand Up @@ -471,6 +490,79 @@ public void tlsCiphersAndProtocols(Set<String> tlsCiphers, Set<String> tlsProtoc
log.info("-- Exiting {} test --", methodName);
}

private final Authentication tlsAuth = new AuthenticationTls(TLS_CLIENT_CERT_FILE_PATH, TLS_CLIENT_KEY_FILE_PATH);
private final Authentication tokenAuth = new AuthenticationToken(CLIENT_TOKEN);

@DataProvider
public Object[] tlsTransportWithAuth() {
return new Object[]{
tlsAuth,
tokenAuth,
};
}

@Test(dataProvider = "tlsTransportWithAuth")
public void testProxyTlsTransportWithAuth(Authentication auth) throws Exception {
log.info("-- Starting {} test --", methodName);

startProxy();
createAdminClient();

@Cleanup
PulsarClient proxyClient = PulsarClient.builder()
.serviceUrl(proxyService.getServiceUrlTls())
.statsInterval(0, TimeUnit.SECONDS)
.authentication(auth)
.tlsKeyFilePath(TLS_CLIENT_KEY_FILE_PATH)
.tlsCertificateFilePath(TLS_CLIENT_CERT_FILE_PATH)
.tlsTrustCertsFilePath(TLS_PROXY_TRUST_CERT_FILE_PATH)
.operationTimeout(1000, TimeUnit.MILLISECONDS)
.build();

String namespaceName = "my-tenant/my-ns";

admin.clusters().createCluster("proxy-authorization",
ClusterData.builder().serviceUrlTls(brokerUrlTls.toString()).build());

admin.tenants().createTenant("my-tenant",
new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("proxy-authorization")));
admin.namespaces().createNamespace(namespaceName);

admin.namespaces().grantPermissionOnNamespace(namespaceName, "Proxy",
Sets.newHashSet(AuthAction.consume, AuthAction.produce));
admin.namespaces().grantPermissionOnNamespace(namespaceName, "Client",
Sets.newHashSet(AuthAction.consume, AuthAction.produce));

Consumer<byte[]> consumer = proxyClient.newConsumer()
.topic("persistent://my-tenant/my-ns/my-topic1")
.subscriptionName("my-subscriber-name").subscribe();

Producer<byte[]> producer = proxyClient.newProducer(Schema.BYTES)
.topic("persistent://my-tenant/my-ns/my-topic1").create();
final int msgs = 10;
for (int i = 0; i < msgs; i++) {
String message = "my-message-" + i;
producer.send(message.getBytes());
}

Message<byte[]> msg = null;
Set<String> messageSet = new HashSet<>();
int count = 0;
for (int i = 0; i < 10; i++) {
msg = consumer.receive(5, TimeUnit.SECONDS);
String receivedMessage = new String(msg.getData());
log.debug("Received message: [{}]", receivedMessage);
String expectedMessage = "my-message-" + i;
testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
count++;
}
// Acknowledge the consumption of all messages at once
Assert.assertEquals(msgs, count);
consumer.acknowledgeCumulative(msg);
consumer.close();
log.info("-- Exiting {} test --", methodName);
}

private void createAdminClient() throws Exception {
Map<String, String> authParams = Maps.newHashMap();
authParams.put("tlsCertFile", TLS_SUPERUSER_CLIENT_CERT_FILE_PATH);
Expand Down

0 comments on commit 5c09d53

Please sign in to comment.