From f22bbeafec45290cc15ad2935e9ebd32d119b8af Mon Sep 17 00:00:00 2001 From: massakam Date: Sun, 10 Feb 2019 01:06:06 +0900 Subject: [PATCH] Create SSL context in constructor of ChannelInitializer (#3550) * Create SSL context in constructor of ChannelInitializer * Set TLS ports only in tests that need to enable TLS --- .../service/PulsarChannelInitializer.java | 23 +++--- .../pulsar/broker/admin/AdminApiTest.java | 2 + .../broker/admin/AdminApiTlsAuthTest.java | 2 + .../admin/BrokerAdminClientTlsAuthTest.java | 2 + .../broker/admin/v1/V1_AdminApiTest.java | 2 + .../auth/MockedPulsarServiceBaseTest.java | 2 - .../broker/service/BrokerServiceTest.java | 8 ++ .../AuthenticatedProducerConsumerTest.java | 2 + ...enticationTlsHostnameVerificationTest.java | 4 + .../client/api/BrokerServiceLookupTest.java | 14 +--- .../client/api/ServiceUrlProviderTest.java | 2 - .../client/api/TlsProducerConsumerBase.java | 2 + .../pulsar/client/impl/ConnectionPool.java | 49 +++--------- .../client/impl/PulsarChannelInitializer.java | 74 +++++++++++++++++++ .../service/ServiceChannelInitializer.java | 26 ++++--- pulsar-proxy/pom.xml | 5 ++ .../proxy/server/DirectProxyHandler.java | 20 +---- .../pulsar/proxy/server/ProxyConnection.java | 9 ++- .../proxy/server/ProxyConnectionPool.java | 5 +- .../server/ServiceChannelInitializer.java | 55 +++++++++++--- .../server/AuthedAdminProxyHandlerTest.java | 2 + ...roxyAuthenticatedProducerConsumerTest.java | 2 + .../server/ProxyWithAuthorizationNegTest.java | 2 + .../server/ProxyWithAuthorizationTest.java | 10 ++- .../ProxyWithoutServiceDiscoveryTest.java | 2 + .../SuperUserAuthedAdminProxyHandlerTest.java | 2 + 26 files changed, 219 insertions(+), 109 deletions(-) create mode 100644 pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java index f245af354888e..4cd5c0276dfaf 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java @@ -34,28 +34,29 @@ public class PulsarChannelInitializer extends ChannelInitializer public static final String TLS_HANDLER = "tls"; private final PulsarService pulsar; - private final boolean enableTLS; + private final SslContext sslCtx; /** * * @param brokerService */ - public PulsarChannelInitializer(PulsarService pulsar, - boolean enableTLS) { + public PulsarChannelInitializer(PulsarService pulsar, boolean enableTLS) throws Exception { super(); this.pulsar = pulsar; - this.enableTLS = enableTLS; + if (enableTLS) { + ServiceConfiguration serviceConfig = pulsar.getConfiguration(); + this.sslCtx = SecurityUtility.createNettySslContextForServer(serviceConfig.isTlsAllowInsecureConnection(), + serviceConfig.getTlsTrustCertsFilePath(), serviceConfig.getTlsCertificateFilePath(), + serviceConfig.getTlsKeyFilePath(), serviceConfig.getTlsCiphers(), serviceConfig.getTlsProtocols(), + serviceConfig.isTlsRequireTrustedClientCertOnConnect()); + } else { + this.sslCtx = null; + } } @Override protected void initChannel(SocketChannel ch) throws Exception { - ServiceConfiguration serviceConfig = pulsar.getConfiguration(); - if (enableTLS) { - SslContext sslCtx = SecurityUtility.createNettySslContextForServer( - serviceConfig.isTlsAllowInsecureConnection(), serviceConfig.getTlsTrustCertsFilePath(), - serviceConfig.getTlsCertificateFilePath(), serviceConfig.getTlsKeyFilePath(), - serviceConfig.getTlsCiphers(), serviceConfig.getTlsProtocols(), - serviceConfig.isTlsRequireTrustedClientCertOnConnect()); + if (sslCtx != null) { ch.pipeline().addLast(TLS_HANDLER, sslCtx.newHandler(ch.alloc())); ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.COPYING_ENCODER); } else { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java index 837db22100ffd..66e9067a13cab 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java @@ -140,6 +140,8 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest { @Override public void setup() throws Exception { conf.setLoadBalancerEnabled(true); + conf.setBrokerServicePortTls(BROKER_PORT_TLS); + conf.setWebServicePortTls(BROKER_WEBSERVICE_PORT_TLS); conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH); conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTlsAuthTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTlsAuthTest.java index 2dfcac218228d..f1797800efe1a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTlsAuthTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTlsAuthTest.java @@ -69,6 +69,8 @@ private static String getTLSFile(String name) { @Override public void setup() throws Exception { conf.setLoadBalancerEnabled(true); + conf.setBrokerServicePortTls(BROKER_PORT_TLS); + conf.setWebServicePortTls(BROKER_WEBSERVICE_PORT_TLS); conf.setTlsCertificateFilePath(getTLSFile("broker.cert")); conf.setTlsKeyFilePath(getTLSFile("broker.key-pk8")); conf.setTlsTrustCertsFilePath(getTLSFile("ca.cert")); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/BrokerAdminClientTlsAuthTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/BrokerAdminClientTlsAuthTest.java index e478631513693..7edf18a7be8e1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/BrokerAdminClientTlsAuthTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/BrokerAdminClientTlsAuthTest.java @@ -77,6 +77,8 @@ private static String getTLSFile(String name) { @BeforeMethod @Override public void setup() throws Exception { + conf.setBrokerServicePortTls(BROKER_PORT_TLS); + conf.setWebServicePortTls(BROKER_WEBSERVICE_PORT_TLS); buildConf(conf); super.internalSetup(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java index 8fb174db7d38a..eda4409edca09 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java @@ -139,6 +139,8 @@ public class V1_AdminApiTest extends MockedPulsarServiceBaseTest { @Override public void setup() throws Exception { conf.setLoadBalancerEnabled(true); + conf.setBrokerServicePortTls(BROKER_PORT_TLS); + conf.setWebServicePortTls(BROKER_WEBSERVICE_PORT_TLS); conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH); conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java index 99e7ae431bb2a..c33b2121b492e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java @@ -93,10 +93,8 @@ public MockedPulsarServiceBaseTest() { protected void resetConfig() { this.conf = new ServiceConfiguration(); this.conf.setBrokerServicePort(BROKER_PORT); - this.conf.setBrokerServicePortTls(BROKER_PORT_TLS); this.conf.setAdvertisedAddress("localhost"); this.conf.setWebServicePort(BROKER_WEBSERVICE_PORT); - this.conf.setWebServicePortTls(BROKER_WEBSERVICE_PORT_TLS); this.conf.setClusterName(configClusterName); this.conf.setAdvertisedAddress("localhost"); // there are TLS tests in here, they need to use localhost because of the certificate this.conf.setManagedLedgerCacheSizeMB(8); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java index 4e9f4e986f33b..418bebf122288 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java @@ -446,6 +446,8 @@ public void testTlsEnabled() throws Exception { final String subName = "newSub"; conf.setAuthenticationEnabled(false); + conf.setBrokerServicePortTls(BROKER_PORT_TLS); + conf.setWebServicePortTls(BROKER_WEBSERVICE_PORT_TLS); conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH); conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH); restartBroker(); @@ -523,6 +525,8 @@ public void testTlsAuthAllowInsecure() throws Exception { conf.setAuthenticationEnabled(true); conf.setAuthenticationProviders(providers); + conf.setBrokerServicePortTls(BROKER_PORT_TLS); + conf.setWebServicePortTls(BROKER_WEBSERVICE_PORT_TLS); conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH); conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH); conf.setTlsAllowInsecureConnection(true); @@ -581,6 +585,8 @@ public void testTlsAuthDisallowInsecure() throws Exception { conf.setAuthenticationEnabled(true); conf.setAuthenticationProviders(providers); + conf.setBrokerServicePortTls(BROKER_PORT_TLS); + conf.setWebServicePortTls(BROKER_WEBSERVICE_PORT_TLS); conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH); conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH); conf.setTlsAllowInsecureConnection(false); @@ -638,6 +644,8 @@ public void testTlsAuthUseTrustCert() throws Exception { conf.setAuthenticationEnabled(true); conf.setAuthenticationProviders(providers); + conf.setBrokerServicePortTls(BROKER_PORT_TLS); + conf.setWebServicePortTls(BROKER_WEBSERVICE_PORT_TLS); conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH); conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH); conf.setTlsAllowInsecureConnection(false); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java index 4923894c661f9..ef8765bdccafc 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java @@ -71,6 +71,8 @@ protected void setup() throws Exception { conf.setAuthenticationEnabled(true); conf.setAuthorizationEnabled(true); + conf.setBrokerServicePortTls(BROKER_PORT_TLS); + conf.setWebServicePortTls(BROKER_WEBSERVICE_PORT_TLS); conf.setTlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH); conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH); conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticationTlsHostnameVerificationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticationTlsHostnameVerificationTest.java index dbd893a9e145d..a2ae4c1a45e73 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticationTlsHostnameVerificationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticationTlsHostnameVerificationTest.java @@ -151,6 +151,8 @@ public void testTlsSyncProducerAndConsumerWithInvalidBrokerHost(boolean hostname this.hostnameVerificationEnabled = hostnameVerificationEnabled; // setup broker cert which has CN = "pulsar" different than broker's hostname="localhost" + conf.setBrokerServicePortTls(BROKER_PORT_TLS); + conf.setWebServicePortTls(BROKER_WEBSERVICE_PORT_TLS); conf.setTlsTrustCertsFilePath(TLS_MIM_TRUST_CERT_FILE_PATH); conf.setTlsCertificateFilePath(TLS_MIM_SERVER_CERT_FILE_PATH); conf.setTlsKeyFilePath(TLS_MIM_SERVER_KEY_FILE_PATH); @@ -190,6 +192,8 @@ public void testTlsSyncProducerAndConsumerCorrectBrokerHost() throws Exception { log.info("-- Starting {} test --", methodName); // setup broker cert which has CN = "localhost" + conf.setBrokerServicePortTls(BROKER_PORT_TLS); + conf.setWebServicePortTls(BROKER_WEBSERVICE_PORT_TLS); conf.setTlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH); conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH); conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java index 54b10ba7c8afe..f784c3a933278 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java @@ -142,9 +142,7 @@ public void testMultipleBrokerLookup() throws Exception { /**** start broker-2 ****/ ServiceConfiguration conf2 = new ServiceConfiguration(); conf2.setBrokerServicePort(PortManager.nextFreePort()); - conf2.setBrokerServicePortTls(PortManager.nextFreePort()); conf2.setWebServicePort(PortManager.nextFreePort()); - conf2.setWebServicePortTls(PortManager.nextFreePort()); conf2.setAdvertisedAddress("localhost"); conf2.setClusterName(conf.getClusterName()); conf2.setZookeeperServers("localhost:2181"); @@ -222,9 +220,7 @@ public void testMultipleBrokerDifferentClusterLookup() throws Exception { ServiceConfiguration conf2 = new ServiceConfiguration(); conf2.setAdvertisedAddress("localhost"); conf2.setBrokerServicePort(PortManager.nextFreePort()); - conf2.setBrokerServicePortTls(PortManager.nextFreePort()); conf2.setWebServicePort(PortManager.nextFreePort()); - conf2.setWebServicePortTls(PortManager.nextFreePort()); conf2.setAdvertisedAddress("localhost"); conf2.setClusterName(newCluster); // Broker2 serves newCluster conf2.setZookeeperServers("localhost:2181"); @@ -313,9 +309,7 @@ public void testPartitionTopicLookup() throws Exception { ServiceConfiguration conf2 = new ServiceConfiguration(); conf2.setAdvertisedAddress("localhost"); conf2.setBrokerServicePort(PortManager.nextFreePort()); - conf2.setBrokerServicePortTls(PortManager.nextFreePort()); conf2.setWebServicePort(PortManager.nextFreePort()); - conf2.setWebServicePortTls(PortManager.nextFreePort()); conf2.setAdvertisedAddress("localhost"); conf2.setClusterName(pulsar.getConfiguration().getClusterName()); conf2.setZookeeperServers("localhost:2181"); @@ -401,6 +395,8 @@ public void testWebserviceServiceTls() throws Exception { PulsarService pulsar2 = startBroker(conf2); // restart broker1 with tls enabled + conf.setBrokerServicePortTls(BROKER_PORT_TLS); + conf.setWebServicePortTls(BROKER_WEBSERVICE_PORT_TLS); conf.setTlsAllowInsecureConnection(true); conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH); conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH); @@ -529,6 +525,8 @@ public void testDiscoveryLookupTls() throws Exception { final String TLS_CLIENT_KEY_FILE_PATH = "./src/test/resources/certificate/client.key"; // (1) restart broker1 with tls enabled + conf.setBrokerServicePortTls(BROKER_PORT_TLS); + conf.setWebServicePortTls(BROKER_WEBSERVICE_PORT_TLS); conf.setTlsAllowInsecureConnection(true); conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH); conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH); @@ -804,9 +802,7 @@ public void testSplitUnloadLookupTest() throws Exception { ServiceConfiguration conf2 = new ServiceConfiguration(); conf2.setAdvertisedAddress("localhost"); conf2.setBrokerServicePort(PortManager.nextFreePort()); - conf2.setBrokerServicePortTls(PortManager.nextFreePort()); conf2.setWebServicePort(PortManager.nextFreePort()); - conf2.setWebServicePortTls(PortManager.nextFreePort()); conf2.setAdvertisedAddress("localhost"); conf2.setClusterName(conf.getClusterName()); conf2.setZookeeperServers("localhost:2181"); @@ -909,9 +905,7 @@ public void testModularLoadManagerSplitBundle() throws Exception { ServiceConfiguration conf2 = new ServiceConfiguration(); conf2.setAdvertisedAddress("localhost"); conf2.setBrokerServicePort(PortManager.nextFreePort()); - conf2.setBrokerServicePortTls(PortManager.nextFreePort()); conf2.setWebServicePort(PortManager.nextFreePort()); - conf2.setWebServicePortTls(PortManager.nextFreePort()); conf2.setAdvertisedAddress("localhost"); conf2.setClusterName(conf.getClusterName()); conf2.setLoadManagerClassName(ModularLoadManagerImpl.class.getName()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ServiceUrlProviderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ServiceUrlProviderTest.java index 40ae7f8e6ba23..1badfa2166414 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ServiceUrlProviderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ServiceUrlProviderTest.java @@ -104,8 +104,6 @@ public void testCreateClientWithAutoChangedServiceUrlProvider() throws Exception PulsarService pulsarService1 = pulsar; conf.setBrokerServicePort(PortManager.nextFreePort()); conf.setWebServicePort(PortManager.nextFreePort()); - conf.setBrokerServicePortTls(PortManager.nextFreePort()); - conf.setWebServicePortTls(PortManager.nextFreePort()); startBroker(); PulsarService pulsarService2 = pulsar; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsProducerConsumerBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsProducerConsumerBase.java index 5d966cb6bbfd2..a496531aecbbc 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsProducerConsumerBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsProducerConsumerBase.java @@ -58,6 +58,8 @@ protected void cleanup() throws Exception { } protected void internalSetUpForBroker() throws Exception { + conf.setBrokerServicePortTls(BROKER_PORT_TLS); + conf.setWebServicePortTls(BROKER_WEBSERVICE_PORT_TLS); conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH); conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH); conf.setTlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java index 25f4b14a7fd3a..1aa1e43185d87 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java @@ -22,7 +22,6 @@ import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; -import java.security.cert.X509Certificate; import java.util.Iterator; import java.util.List; import java.util.Random; @@ -31,11 +30,8 @@ import java.util.concurrent.ConcurrentMap; import java.util.function.Supplier; -import org.apache.pulsar.client.api.AuthenticationDataProvider; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; -import org.apache.pulsar.common.api.ByteBufPair; -import org.apache.pulsar.common.util.SecurityUtility; import org.apache.pulsar.common.util.netty.EventLoopUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,12 +43,8 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelException; import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; -import io.netty.channel.socket.SocketChannel; -import io.netty.handler.codec.LengthFieldBasedFrameDecoder; -import io.netty.handler.ssl.SslContext; import io.netty.resolver.dns.DnsNameResolver; import io.netty.resolver.dns.DnsNameResolverBuilder; import io.netty.util.concurrent.Future; @@ -66,14 +58,12 @@ public class ConnectionPool implements Closeable { protected final DnsNameResolver dnsResolver; - private static final int MaxMessageSize = 5 * 1024 * 1024; - public static final String TLS_HANDLER = "tls"; - - public ConnectionPool(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) { + public ConnectionPool(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) throws PulsarClientException { this(conf, eventLoopGroup, () -> new ClientCnx(conf, eventLoopGroup)); } - public ConnectionPool(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, Supplier clientCnxSupplier) { + public ConnectionPool(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, + Supplier clientCnxSupplier) throws PulsarClientException { this.eventLoopGroup = eventLoopGroup; this.maxConnectionsPerHosts = conf.getConnectionsPerBroker(); @@ -85,29 +75,13 @@ public ConnectionPool(ClientConfigurationData conf, EventLoopGroup eventLoopGrou bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, conf.getConnectionTimeoutMs()); bootstrap.option(ChannelOption.TCP_NODELAY, conf.isUseTcpNoDelay()); bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); - bootstrap.handler(new ChannelInitializer() { - public void initChannel(SocketChannel ch) throws Exception { - if (conf.isUseTls()) { - SslContext sslCtx; - // Set client certificate if available - AuthenticationDataProvider authData = conf.getAuthentication().getAuthData(); - if (authData.hasDataForTls()) { - sslCtx = SecurityUtility.createNettySslContextForClient(conf.isTlsAllowInsecureConnection(), - conf.getTlsTrustCertsFilePath(), (X509Certificate[]) authData.getTlsCertificates(), - authData.getTlsPrivateKey()); - } else { - sslCtx = SecurityUtility.createNettySslContextForClient(conf.isTlsAllowInsecureConnection(), - conf.getTlsTrustCertsFilePath()); - } - ch.pipeline().addLast(TLS_HANDLER, sslCtx.newHandler(ch.alloc())); - ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.COPYING_ENCODER); - } else { - ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.ENCODER); - } - ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(MaxMessageSize, 0, 4, 0, 4)); - ch.pipeline().addLast("handler", clientCnxSupplier.get()); - } - }); + + try { + bootstrap.handler(new PulsarChannelInitializer(conf, clientCnxSupplier)); + } catch (Exception e) { + log.error("Failed to create channel initializer"); + throw new PulsarClientException(e); + } this.dnsResolver = new DnsNameResolverBuilder(eventLoopGroup.next()).traceEnabled(true) .channelType(EventLoopUtil.getDatagramChannelClass(eventLoopGroup)).build(); @@ -130,7 +104,8 @@ void closeAllConnections() { // If the future already failed, there's nothing we have to do } } else { - // The future is still pending: just register to make sure it gets closed if the operation will succeed + // The future is still pending: just register to make sure it gets closed if the operation will + // succeed future.thenAccept(ClientCnx::close); } }); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java new file mode 100644 index 0000000000000..494cb1283264b --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import java.security.cert.X509Certificate; +import java.util.function.Supplier; + +import io.netty.channel.ChannelInitializer; +import io.netty.channel.socket.SocketChannel; +import io.netty.handler.codec.LengthFieldBasedFrameDecoder; +import io.netty.handler.ssl.SslContext; + +import org.apache.pulsar.client.api.AuthenticationDataProvider; +import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.apache.pulsar.common.api.ByteBufPair; +import org.apache.pulsar.common.api.PulsarDecoder; +import org.apache.pulsar.common.util.SecurityUtility; + +public class PulsarChannelInitializer extends ChannelInitializer { + + public static final String TLS_HANDLER = "tls"; + + private final Supplier clientCnxSupplier; + private final SslContext sslCtx; + + public PulsarChannelInitializer(ClientConfigurationData conf, Supplier clientCnxSupplier) + throws Exception { + super(); + this.clientCnxSupplier = clientCnxSupplier; + if (conf.isUseTls()) { + // Set client certificate if available + AuthenticationDataProvider authData = conf.getAuthentication().getAuthData(); + if (authData.hasDataForTls()) { + this.sslCtx = SecurityUtility.createNettySslContextForClient(conf.isTlsAllowInsecureConnection(), + conf.getTlsTrustCertsFilePath(), (X509Certificate[]) authData.getTlsCertificates(), + authData.getTlsPrivateKey()); + } else { + this.sslCtx = SecurityUtility.createNettySslContextForClient(conf.isTlsAllowInsecureConnection(), + conf.getTlsTrustCertsFilePath()); + } + } else { + this.sslCtx = null; + } + } + + @Override + public void initChannel(SocketChannel ch) throws Exception { + if (sslCtx != null) { + ch.pipeline().addLast(TLS_HANDLER, sslCtx.newHandler(ch.alloc())); + ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.COPYING_ENCODER); + } else { + ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.ENCODER); + } + + ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(PulsarDecoder.MaxFrameSize, 0, 4, 0, 4)); + ch.pipeline().addLast("handler", clientCnxSupplier.get()); + } +} diff --git a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/ServiceChannelInitializer.java b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/ServiceChannelInitializer.java index 3f230d410196d..d3d17c89d2bb1 100644 --- a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/ServiceChannelInitializer.java +++ b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/ServiceChannelInitializer.java @@ -34,26 +34,28 @@ public class ServiceChannelInitializer extends ChannelInitializer { public static final String TLS_HANDLER = "tls"; - private ServiceConfig serviceConfig; - private DiscoveryService discoveryService; - private boolean enableTLS; + private final ServiceConfig serviceConfig; + private final DiscoveryService discoveryService; + private final SslContext sslCtx; - public ServiceChannelInitializer(DiscoveryService discoveryService, ServiceConfig serviceConfig, - boolean enableTLS) { + public ServiceChannelInitializer(DiscoveryService discoveryService, ServiceConfig serviceConfig, boolean enableTLS) + throws Exception { super(); this.serviceConfig = serviceConfig; this.discoveryService = discoveryService; - this.enableTLS = enableTLS; + if (enableTLS) { + this.sslCtx = SecurityUtility.createNettySslContextForServer(serviceConfig.isTlsAllowInsecureConnection(), + serviceConfig.getTlsTrustCertsFilePath(), serviceConfig.getTlsCertificateFilePath(), + serviceConfig.getTlsKeyFilePath(), serviceConfig.getTlsCiphers(), serviceConfig.getTlsProtocols(), + serviceConfig.getTlsRequireTrustedClientCertOnConnect()); + } else { + this.sslCtx = null; + } } @Override protected void initChannel(SocketChannel ch) throws Exception { - if (enableTLS) { - SslContext sslCtx = SecurityUtility.createNettySslContextForServer( - serviceConfig.isTlsAllowInsecureConnection(), serviceConfig.getTlsTrustCertsFilePath(), - serviceConfig.getTlsCertificateFilePath(), serviceConfig.getTlsKeyFilePath(), - serviceConfig.getTlsCiphers(), serviceConfig.getTlsProtocols(), - serviceConfig.getTlsRequireTrustedClientCertOnConnect()); + if (sslCtx != null) { ch.pipeline().addLast(TLS_HANDLER, sslCtx.newHandler(ch.alloc())); } ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(PulsarDecoder.MaxFrameSize, 0, 4, 0, 4)); diff --git a/pulsar-proxy/pom.xml b/pulsar-proxy/pom.xml index 05e2b396af8c7..1f60fd1be9677 100644 --- a/pulsar-proxy/pom.xml +++ b/pulsar-proxy/pom.xml @@ -49,6 +49,11 @@ ${project.version} + + org.apache.commons + commons-lang3 + + org.eclipse.jetty jetty-server diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java index c7fa786123e5a..0f96845d28c9d 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java @@ -21,17 +21,14 @@ import java.net.URI; import java.net.URISyntaxException; -import java.security.cert.X509Certificate; import javax.net.ssl.SSLSession; import org.apache.http.conn.ssl.DefaultHostnameVerifier; import org.apache.pulsar.client.api.Authentication; -import org.apache.pulsar.client.api.AuthenticationDataProvider; import org.apache.pulsar.common.api.Commands; import org.apache.pulsar.common.api.PulsarDecoder; import org.apache.pulsar.common.api.proto.PulsarApi.CommandConnected; -import org.apache.pulsar.common.util.SecurityUtility; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -63,15 +60,17 @@ public class DirectProxyHandler { public static final String TLS_HANDLER = "tls"; private final Authentication authentication; + private final SslContext sslCtx; public DirectProxyHandler(ProxyService service, ProxyConnection proxyConnection, String targetBrokerUrl, - int protocolVersion) { + int protocolVersion, SslContext sslCtx) { this.authentication = proxyConnection.getClientAuthentication(); this.inboundChannel = proxyConnection.ctx().channel(); this.originalPrincipal = proxyConnection.clientAuthRole; this.clientAuthData = proxyConnection.clientAuthData; this.clientAuthMethod = proxyConnection.clientAuthMethod; this.protocolVersion = protocolVersion; + this.sslCtx = sslCtx; ProxyConfiguration config = service.getConfiguration(); // Start the connection attempt. @@ -84,18 +83,7 @@ public DirectProxyHandler(ProxyService service, ProxyConnection proxyConnection, b.handler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) throws Exception { - if (config.isTlsEnabledWithBroker()) { - SslContext sslCtx; - // Set client certificate if available - AuthenticationDataProvider authData = authentication.getAuthData(); - if (authData.hasDataForTls()) { - sslCtx = SecurityUtility.createNettySslContextForClient(config.isTlsAllowInsecureConnection(), - config.getBrokerClientTrustCertsFilePath(), - (X509Certificate[]) authData.getTlsCertificates(), authData.getTlsPrivateKey()); - } else { - sslCtx = SecurityUtility.createNettySslContextForClient(config.isTlsAllowInsecureConnection(), - config.getBrokerClientTrustCertsFilePath()); - } + if (sslCtx != null) { ch.pipeline().addLast(TLS_HANDLER, sslCtx.newHandler(ch.alloc())); } ch.pipeline().addLast("frameDecoder", diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java index 54a08ef0b4f3a..90b979e56e6c0 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java @@ -49,6 +49,7 @@ import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslHandler; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; @@ -64,6 +65,7 @@ public class ProxyConnection extends PulsarHandler implements FutureListener clientCnxSupplier) { + Supplier clientCnxSupplier) throws PulsarClientException { super(clientConfig, eventLoopGroup, clientCnxSupplier); } @@ -56,4 +57,4 @@ public void close() throws IOException { } private static final Logger log = LoggerFactory.getLogger(ProxyConnectionPool.class); -} \ No newline at end of file +} diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java index 30001c74800b2..f87523e0bbaf6 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java @@ -18,6 +18,12 @@ */ package org.apache.pulsar.proxy.server; +import static org.apache.commons.lang3.StringUtils.isEmpty; + +import java.security.cert.X509Certificate; + +import org.apache.pulsar.client.api.AuthenticationFactory; +import org.apache.pulsar.client.api.AuthenticationDataProvider; import org.apache.pulsar.common.api.PulsarDecoder; import org.apache.pulsar.common.util.SecurityUtility; @@ -33,28 +39,53 @@ public class ServiceChannelInitializer extends ChannelInitializer { public static final String TLS_HANDLER = "tls"; - private ProxyConfiguration serviceConfig; - private ProxyService proxyService; - private boolean enableTLS; + private final ProxyService proxyService; + private final SslContext serverSslCtx; + private final SslContext clientSslCtx; - public ServiceChannelInitializer(ProxyService proxyService, ProxyConfiguration serviceConfig, boolean enableTLS) { + public ServiceChannelInitializer(ProxyService proxyService, ProxyConfiguration serviceConfig, boolean enableTLS) + throws Exception { super(); - this.serviceConfig = serviceConfig; this.proxyService = proxyService; - this.enableTLS = enableTLS; - } - @Override - protected void initChannel(SocketChannel ch) throws Exception { if (enableTLS) { - SslContext sslCtx = SecurityUtility.createNettySslContextForServer(true /* to allow InsecureConnection */, + this.serverSslCtx = SecurityUtility.createNettySslContextForServer(true /* to allow InsecureConnection */, serviceConfig.getTlsTrustCertsFilePath(), serviceConfig.getTlsCertificateFilePath(), serviceConfig.getTlsKeyFilePath(), serviceConfig.getTlsCiphers(), serviceConfig.getTlsProtocols(), serviceConfig.isTlsRequireTrustedClientCertOnConnect()); - ch.pipeline().addLast(TLS_HANDLER, sslCtx.newHandler(ch.alloc())); + } else { + this.serverSslCtx = null; + } + + if (serviceConfig.isTlsEnabledWithBroker()) { + AuthenticationDataProvider authData = null; + + if (!isEmpty(serviceConfig.getBrokerClientAuthenticationPlugin())) { + authData = AuthenticationFactory.create(serviceConfig.getBrokerClientAuthenticationPlugin(), + serviceConfig.getBrokerClientAuthenticationParameters()).getAuthData(); + } + + if (authData != null && authData.hasDataForTls()) { + this.clientSslCtx = SecurityUtility.createNettySslContextForClient( + serviceConfig.isTlsAllowInsecureConnection(), serviceConfig.getBrokerClientTrustCertsFilePath(), + (X509Certificate[]) authData.getTlsCertificates(), authData.getTlsPrivateKey()); + } else { + this.clientSslCtx = SecurityUtility.createNettySslContextForClient( + serviceConfig.isTlsAllowInsecureConnection(), + serviceConfig.getBrokerClientTrustCertsFilePath()); + } + } else { + this.clientSslCtx = null; + } + } + + @Override + protected void initChannel(SocketChannel ch) throws Exception { + if (serverSslCtx != null) { + ch.pipeline().addLast(TLS_HANDLER, serverSslCtx.newHandler(ch.alloc())); } ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(PulsarDecoder.MaxFrameSize, 0, 4, 0, 4)); - ch.pipeline().addLast("handler", new ProxyConnection(proxyService)); + ch.pipeline().addLast("handler", new ProxyConnection(proxyService, clientSslCtx)); } } diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AuthedAdminProxyHandlerTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AuthedAdminProxyHandlerTest.java index 096ff051b57e6..231099ee2e5a7 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AuthedAdminProxyHandlerTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AuthedAdminProxyHandlerTest.java @@ -62,6 +62,8 @@ protected void setup() throws Exception { // enable tls and auth&auth at broker conf.setAuthenticationEnabled(true); conf.setAuthorizationEnabled(true); + conf.setBrokerServicePortTls(BROKER_PORT_TLS); + conf.setWebServicePortTls(BROKER_WEBSERVICE_PORT_TLS); conf.setTlsTrustCertsFilePath(getTlsFile("ca.cert")); conf.setTlsCertificateFilePath(getTlsFile("broker.cert")); conf.setTlsKeyFilePath(getTlsFile("broker.key-pk8")); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java index 50cd920e6ee44..fe6f9c2cd635f 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java @@ -74,6 +74,8 @@ protected void setup() throws Exception { conf.setAuthenticationEnabled(true); conf.setAuthorizationEnabled(true); + conf.setBrokerServicePortTls(BROKER_PORT_TLS); + conf.setWebServicePortTls(BROKER_WEBSERVICE_PORT_TLS); conf.setTlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH); conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH); conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java index 1fb9cb4a7aa95..9aa23dd984241 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java @@ -78,6 +78,8 @@ protected void setup() throws Exception { conf.setAuthenticationEnabled(true); conf.setAuthorizationEnabled(true); + conf.setBrokerServicePortTls(BROKER_PORT_TLS); + conf.setWebServicePortTls(BROKER_WEBSERVICE_PORT_TLS); conf.setTlsTrustCertsFilePath(TLS_PROXY_TRUST_CERT_FILE_PATH); conf.setTlsCertificateFilePath(TLS_BROKER_CERT_FILE_PATH); conf.setTlsKeyFilePath(TLS_BROKER_KEY_FILE_PATH); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationTest.java index 1f743e29c26e2..7182c0ea73f9a 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationTest.java @@ -142,6 +142,8 @@ protected void setup() throws Exception { conf.setAuthenticationEnabled(true); conf.setAuthorizationEnabled(true); + conf.setBrokerServicePortTls(BROKER_PORT_TLS); + conf.setWebServicePortTls(BROKER_WEBSERVICE_PORT_TLS); conf.setTlsTrustCertsFilePath(TLS_PROXY_TRUST_CERT_FILE_PATH); conf.setTlsCertificateFilePath(TLS_BROKER_CERT_FILE_PATH); conf.setTlsKeyFilePath(TLS_BROKER_KEY_FILE_PATH); @@ -404,7 +406,13 @@ public void tlsCiphersAndProtocols(Set tlsCiphers, Set tlsProtoc ProxyService proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService( PulsarConfigurationLoader.convertFrom(proxyConfig)))); - proxyService.start(); + try { + proxyService.start(); + } catch (Exception ex) { + if (!expectFailure) { + Assert.fail("This test case should not fail"); + } + } org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically((test) -> { try { return admin.namespaces().getPermissions(namespaceName).containsKey("Proxy") diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithoutServiceDiscoveryTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithoutServiceDiscoveryTest.java index 825982c3df895..db78adb48f7dc 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithoutServiceDiscoveryTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithoutServiceDiscoveryTest.java @@ -70,6 +70,8 @@ protected void setup() throws Exception { conf.setAuthenticationEnabled(true); conf.setAuthorizationEnabled(false); + conf.setBrokerServicePortTls(BROKER_PORT_TLS); + conf.setWebServicePortTls(BROKER_WEBSERVICE_PORT_TLS); conf.setTlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH); conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH); conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/SuperUserAuthedAdminProxyHandlerTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/SuperUserAuthedAdminProxyHandlerTest.java index 4bca6d4ca1955..c29e6ab6882cf 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/SuperUserAuthedAdminProxyHandlerTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/SuperUserAuthedAdminProxyHandlerTest.java @@ -64,6 +64,8 @@ protected void setup() throws Exception { conf.setAuthenticationEnabled(true); conf.setAuthorizationEnabled(true); + conf.setBrokerServicePortTls(BROKER_PORT_TLS); + conf.setWebServicePortTls(BROKER_WEBSERVICE_PORT_TLS); conf.setTlsTrustCertsFilePath(getTlsFile("ca.cert")); conf.setTlsCertificateFilePath(getTlsFile("broker.cert")); conf.setTlsKeyFilePath(getTlsFile("broker.key-pk8"));