Skip to content

Commit

Permalink
Create SSL context in constructor of ChannelInitializer (apache#3550)
Browse files Browse the repository at this point in the history
* Create SSL context in constructor of ChannelInitializer

* Set TLS ports only in tests that need to enable TLS
  • Loading branch information
massakam authored and merlimat committed Feb 9, 2019
1 parent 18312da commit f22bbea
Show file tree
Hide file tree
Showing 26 changed files with 219 additions and 109 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,28 +34,29 @@ public class PulsarChannelInitializer extends ChannelInitializer<SocketChannel>
public static final String TLS_HANDLER = "tls";

private final PulsarService pulsar;
private final boolean enableTLS;
private final SslContext sslCtx;

/**
*
* @param brokerService
*/
public PulsarChannelInitializer(PulsarService pulsar,
boolean enableTLS) {
public PulsarChannelInitializer(PulsarService pulsar, boolean enableTLS) throws Exception {
super();
this.pulsar = pulsar;
this.enableTLS = enableTLS;
if (enableTLS) {
ServiceConfiguration serviceConfig = pulsar.getConfiguration();
this.sslCtx = SecurityUtility.createNettySslContextForServer(serviceConfig.isTlsAllowInsecureConnection(),
serviceConfig.getTlsTrustCertsFilePath(), serviceConfig.getTlsCertificateFilePath(),
serviceConfig.getTlsKeyFilePath(), serviceConfig.getTlsCiphers(), serviceConfig.getTlsProtocols(),
serviceConfig.isTlsRequireTrustedClientCertOnConnect());
} else {
this.sslCtx = null;
}
}

@Override
protected void initChannel(SocketChannel ch) throws Exception {
ServiceConfiguration serviceConfig = pulsar.getConfiguration();
if (enableTLS) {
SslContext sslCtx = SecurityUtility.createNettySslContextForServer(
serviceConfig.isTlsAllowInsecureConnection(), serviceConfig.getTlsTrustCertsFilePath(),
serviceConfig.getTlsCertificateFilePath(), serviceConfig.getTlsKeyFilePath(),
serviceConfig.getTlsCiphers(), serviceConfig.getTlsProtocols(),
serviceConfig.isTlsRequireTrustedClientCertOnConnect());
if (sslCtx != null) {
ch.pipeline().addLast(TLS_HANDLER, sslCtx.newHandler(ch.alloc()));
ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.COPYING_ENCODER);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,8 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
@Override
public void setup() throws Exception {
conf.setLoadBalancerEnabled(true);
conf.setBrokerServicePortTls(BROKER_PORT_TLS);
conf.setWebServicePortTls(BROKER_WEBSERVICE_PORT_TLS);
conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ private static String getTLSFile(String name) {
@Override
public void setup() throws Exception {
conf.setLoadBalancerEnabled(true);
conf.setBrokerServicePortTls(BROKER_PORT_TLS);
conf.setWebServicePortTls(BROKER_WEBSERVICE_PORT_TLS);
conf.setTlsCertificateFilePath(getTLSFile("broker.cert"));
conf.setTlsKeyFilePath(getTLSFile("broker.key-pk8"));
conf.setTlsTrustCertsFilePath(getTLSFile("ca.cert"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ private static String getTLSFile(String name) {
@BeforeMethod
@Override
public void setup() throws Exception {
conf.setBrokerServicePortTls(BROKER_PORT_TLS);
conf.setWebServicePortTls(BROKER_WEBSERVICE_PORT_TLS);
buildConf(conf);
super.internalSetup();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,8 @@ public class V1_AdminApiTest extends MockedPulsarServiceBaseTest {
@Override
public void setup() throws Exception {
conf.setLoadBalancerEnabled(true);
conf.setBrokerServicePortTls(BROKER_PORT_TLS);
conf.setWebServicePortTls(BROKER_WEBSERVICE_PORT_TLS);
conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,8 @@ public MockedPulsarServiceBaseTest() {
protected void resetConfig() {
this.conf = new ServiceConfiguration();
this.conf.setBrokerServicePort(BROKER_PORT);
this.conf.setBrokerServicePortTls(BROKER_PORT_TLS);
this.conf.setAdvertisedAddress("localhost");
this.conf.setWebServicePort(BROKER_WEBSERVICE_PORT);
this.conf.setWebServicePortTls(BROKER_WEBSERVICE_PORT_TLS);
this.conf.setClusterName(configClusterName);
this.conf.setAdvertisedAddress("localhost"); // there are TLS tests in here, they need to use localhost because of the certificate
this.conf.setManagedLedgerCacheSizeMB(8);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,8 @@ public void testTlsEnabled() throws Exception {
final String subName = "newSub";

conf.setAuthenticationEnabled(false);
conf.setBrokerServicePortTls(BROKER_PORT_TLS);
conf.setWebServicePortTls(BROKER_WEBSERVICE_PORT_TLS);
conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
restartBroker();
Expand Down Expand Up @@ -523,6 +525,8 @@ public void testTlsAuthAllowInsecure() throws Exception {

conf.setAuthenticationEnabled(true);
conf.setAuthenticationProviders(providers);
conf.setBrokerServicePortTls(BROKER_PORT_TLS);
conf.setWebServicePortTls(BROKER_WEBSERVICE_PORT_TLS);
conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
conf.setTlsAllowInsecureConnection(true);
Expand Down Expand Up @@ -581,6 +585,8 @@ public void testTlsAuthDisallowInsecure() throws Exception {

conf.setAuthenticationEnabled(true);
conf.setAuthenticationProviders(providers);
conf.setBrokerServicePortTls(BROKER_PORT_TLS);
conf.setWebServicePortTls(BROKER_WEBSERVICE_PORT_TLS);
conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
conf.setTlsAllowInsecureConnection(false);
Expand Down Expand Up @@ -638,6 +644,8 @@ public void testTlsAuthUseTrustCert() throws Exception {

conf.setAuthenticationEnabled(true);
conf.setAuthenticationProviders(providers);
conf.setBrokerServicePortTls(BROKER_PORT_TLS);
conf.setWebServicePortTls(BROKER_WEBSERVICE_PORT_TLS);
conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
conf.setTlsAllowInsecureConnection(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ protected void setup() throws Exception {
conf.setAuthenticationEnabled(true);
conf.setAuthorizationEnabled(true);

conf.setBrokerServicePortTls(BROKER_PORT_TLS);
conf.setWebServicePortTls(BROKER_WEBSERVICE_PORT_TLS);
conf.setTlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH);
conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,8 @@ public void testTlsSyncProducerAndConsumerWithInvalidBrokerHost(boolean hostname

this.hostnameVerificationEnabled = hostnameVerificationEnabled;
// setup broker cert which has CN = "pulsar" different than broker's hostname="localhost"
conf.setBrokerServicePortTls(BROKER_PORT_TLS);
conf.setWebServicePortTls(BROKER_WEBSERVICE_PORT_TLS);
conf.setTlsTrustCertsFilePath(TLS_MIM_TRUST_CERT_FILE_PATH);
conf.setTlsCertificateFilePath(TLS_MIM_SERVER_CERT_FILE_PATH);
conf.setTlsKeyFilePath(TLS_MIM_SERVER_KEY_FILE_PATH);
Expand Down Expand Up @@ -190,6 +192,8 @@ public void testTlsSyncProducerAndConsumerCorrectBrokerHost() throws Exception {
log.info("-- Starting {} test --", methodName);

// setup broker cert which has CN = "localhost"
conf.setBrokerServicePortTls(BROKER_PORT_TLS);
conf.setWebServicePortTls(BROKER_WEBSERVICE_PORT_TLS);
conf.setTlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH);
conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,9 +142,7 @@ public void testMultipleBrokerLookup() throws Exception {
/**** start broker-2 ****/
ServiceConfiguration conf2 = new ServiceConfiguration();
conf2.setBrokerServicePort(PortManager.nextFreePort());
conf2.setBrokerServicePortTls(PortManager.nextFreePort());
conf2.setWebServicePort(PortManager.nextFreePort());
conf2.setWebServicePortTls(PortManager.nextFreePort());
conf2.setAdvertisedAddress("localhost");
conf2.setClusterName(conf.getClusterName());
conf2.setZookeeperServers("localhost:2181");
Expand Down Expand Up @@ -222,9 +220,7 @@ public void testMultipleBrokerDifferentClusterLookup() throws Exception {
ServiceConfiguration conf2 = new ServiceConfiguration();
conf2.setAdvertisedAddress("localhost");
conf2.setBrokerServicePort(PortManager.nextFreePort());
conf2.setBrokerServicePortTls(PortManager.nextFreePort());
conf2.setWebServicePort(PortManager.nextFreePort());
conf2.setWebServicePortTls(PortManager.nextFreePort());
conf2.setAdvertisedAddress("localhost");
conf2.setClusterName(newCluster); // Broker2 serves newCluster
conf2.setZookeeperServers("localhost:2181");
Expand Down Expand Up @@ -313,9 +309,7 @@ public void testPartitionTopicLookup() throws Exception {
ServiceConfiguration conf2 = new ServiceConfiguration();
conf2.setAdvertisedAddress("localhost");
conf2.setBrokerServicePort(PortManager.nextFreePort());
conf2.setBrokerServicePortTls(PortManager.nextFreePort());
conf2.setWebServicePort(PortManager.nextFreePort());
conf2.setWebServicePortTls(PortManager.nextFreePort());
conf2.setAdvertisedAddress("localhost");
conf2.setClusterName(pulsar.getConfiguration().getClusterName());
conf2.setZookeeperServers("localhost:2181");
Expand Down Expand Up @@ -401,6 +395,8 @@ public void testWebserviceServiceTls() throws Exception {
PulsarService pulsar2 = startBroker(conf2);

// restart broker1 with tls enabled
conf.setBrokerServicePortTls(BROKER_PORT_TLS);
conf.setWebServicePortTls(BROKER_WEBSERVICE_PORT_TLS);
conf.setTlsAllowInsecureConnection(true);
conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
Expand Down Expand Up @@ -529,6 +525,8 @@ public void testDiscoveryLookupTls() throws Exception {
final String TLS_CLIENT_KEY_FILE_PATH = "./src/test/resources/certificate/client.key";

// (1) restart broker1 with tls enabled
conf.setBrokerServicePortTls(BROKER_PORT_TLS);
conf.setWebServicePortTls(BROKER_WEBSERVICE_PORT_TLS);
conf.setTlsAllowInsecureConnection(true);
conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
Expand Down Expand Up @@ -804,9 +802,7 @@ public void testSplitUnloadLookupTest() throws Exception {
ServiceConfiguration conf2 = new ServiceConfiguration();
conf2.setAdvertisedAddress("localhost");
conf2.setBrokerServicePort(PortManager.nextFreePort());
conf2.setBrokerServicePortTls(PortManager.nextFreePort());
conf2.setWebServicePort(PortManager.nextFreePort());
conf2.setWebServicePortTls(PortManager.nextFreePort());
conf2.setAdvertisedAddress("localhost");
conf2.setClusterName(conf.getClusterName());
conf2.setZookeeperServers("localhost:2181");
Expand Down Expand Up @@ -909,9 +905,7 @@ public void testModularLoadManagerSplitBundle() throws Exception {
ServiceConfiguration conf2 = new ServiceConfiguration();
conf2.setAdvertisedAddress("localhost");
conf2.setBrokerServicePort(PortManager.nextFreePort());
conf2.setBrokerServicePortTls(PortManager.nextFreePort());
conf2.setWebServicePort(PortManager.nextFreePort());
conf2.setWebServicePortTls(PortManager.nextFreePort());
conf2.setAdvertisedAddress("localhost");
conf2.setClusterName(conf.getClusterName());
conf2.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,6 @@ public void testCreateClientWithAutoChangedServiceUrlProvider() throws Exception
PulsarService pulsarService1 = pulsar;
conf.setBrokerServicePort(PortManager.nextFreePort());
conf.setWebServicePort(PortManager.nextFreePort());
conf.setBrokerServicePortTls(PortManager.nextFreePort());
conf.setWebServicePortTls(PortManager.nextFreePort());
startBroker();
PulsarService pulsarService2 = pulsar;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ protected void cleanup() throws Exception {
}

protected void internalSetUpForBroker() throws Exception {
conf.setBrokerServicePortTls(BROKER_PORT_TLS);
conf.setWebServicePortTls(BROKER_WEBSERVICE_PORT_TLS);
conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
conf.setTlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.security.cert.X509Certificate;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
Expand All @@ -31,11 +30,8 @@
import java.util.concurrent.ConcurrentMap;
import java.util.function.Supplier;

import org.apache.pulsar.client.api.AuthenticationDataProvider;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.api.ByteBufPair;
import org.apache.pulsar.common.util.SecurityUtility;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -47,12 +43,8 @@
import io.netty.channel.Channel;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.ssl.SslContext;
import io.netty.resolver.dns.DnsNameResolver;
import io.netty.resolver.dns.DnsNameResolverBuilder;
import io.netty.util.concurrent.Future;
Expand All @@ -66,14 +58,12 @@ public class ConnectionPool implements Closeable {

protected final DnsNameResolver dnsResolver;

private static final int MaxMessageSize = 5 * 1024 * 1024;
public static final String TLS_HANDLER = "tls";

public ConnectionPool(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) {
public ConnectionPool(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) throws PulsarClientException {
this(conf, eventLoopGroup, () -> new ClientCnx(conf, eventLoopGroup));
}

public ConnectionPool(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, Supplier<ClientCnx> clientCnxSupplier) {
public ConnectionPool(ClientConfigurationData conf, EventLoopGroup eventLoopGroup,
Supplier<ClientCnx> clientCnxSupplier) throws PulsarClientException {
this.eventLoopGroup = eventLoopGroup;
this.maxConnectionsPerHosts = conf.getConnectionsPerBroker();

Expand All @@ -85,29 +75,13 @@ public ConnectionPool(ClientConfigurationData conf, EventLoopGroup eventLoopGrou
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, conf.getConnectionTimeoutMs());
bootstrap.option(ChannelOption.TCP_NODELAY, conf.isUseTcpNoDelay());
bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
public void initChannel(SocketChannel ch) throws Exception {
if (conf.isUseTls()) {
SslContext sslCtx;
// Set client certificate if available
AuthenticationDataProvider authData = conf.getAuthentication().getAuthData();
if (authData.hasDataForTls()) {
sslCtx = SecurityUtility.createNettySslContextForClient(conf.isTlsAllowInsecureConnection(),
conf.getTlsTrustCertsFilePath(), (X509Certificate[]) authData.getTlsCertificates(),
authData.getTlsPrivateKey());
} else {
sslCtx = SecurityUtility.createNettySslContextForClient(conf.isTlsAllowInsecureConnection(),
conf.getTlsTrustCertsFilePath());
}
ch.pipeline().addLast(TLS_HANDLER, sslCtx.newHandler(ch.alloc()));
ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.COPYING_ENCODER);
} else {
ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.ENCODER);
}
ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(MaxMessageSize, 0, 4, 0, 4));
ch.pipeline().addLast("handler", clientCnxSupplier.get());
}
});

try {
bootstrap.handler(new PulsarChannelInitializer(conf, clientCnxSupplier));
} catch (Exception e) {
log.error("Failed to create channel initializer");
throw new PulsarClientException(e);
}

this.dnsResolver = new DnsNameResolverBuilder(eventLoopGroup.next()).traceEnabled(true)
.channelType(EventLoopUtil.getDatagramChannelClass(eventLoopGroup)).build();
Expand All @@ -130,7 +104,8 @@ void closeAllConnections() {
// If the future already failed, there's nothing we have to do
}
} else {
// The future is still pending: just register to make sure it gets closed if the operation will succeed
// The future is still pending: just register to make sure it gets closed if the operation will
// succeed
future.thenAccept(ClientCnx::close);
}
});
Expand Down
Loading

0 comments on commit f22bbea

Please sign in to comment.