Skip to content

Commit

Permalink
Always use SNI for TLS enabled Pulsar Java client. (apache#8117)
Browse files Browse the repository at this point in the history
Co-authored-by: Rolf Arne Corneliussen <[email protected]>
  • Loading branch information
racorn and Rolf Arne Corneliussen authored Sep 24, 2020
1 parent 6a0a3a6 commit f2933f7
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 66 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.api;

import java.net.InetAddress;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.apache.pulsar.client.impl.auth.AuthenticationTls;
import org.testng.annotations.Test;

import lombok.Cleanup;

public class TlsSniTest extends TlsProducerConsumerBase {

/**
* Verify that using an IP-address in the broker service URL will work with using the SNI capabilities
* of the client. If we try to create an {@link javax.net.ssl.SSLEngine} with a peer host that is an
* IP address, the peer host is ignored, see for example
* {@link io.netty.handler.ssl.ReferenceCountedOpenSslEngine}.
*
*/
@Test
public void testIpAddressInBrokerServiceUrl() throws Exception {
String topicName = "persistent://my-property/use/my-ns/my-topic1";

URI brokerServiceUrlTls = new URI(pulsar.getBrokerServiceUrlTls());

String brokerServiceIpAddressUrl = String.format("pulsar+ssl://%s:%d",
InetAddress.getByName(brokerServiceUrlTls.getHost()).getHostAddress(),
brokerServiceUrlTls.getPort());

ClientBuilder clientBuilder = PulsarClient.builder().serviceUrl(brokerServiceIpAddressUrl)
.tlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH).allowTlsInsecureConnection(false)
.enableTlsHostnameVerification(false)
.operationTimeout(1000, TimeUnit.MILLISECONDS);
Map<String, String> authParams = new HashMap<>();
authParams.put("tlsCertFile", TLS_CLIENT_CERT_FILE_PATH);
authParams.put("tlsKeyFile", TLS_CLIENT_KEY_FILE_PATH);
clientBuilder.authentication(AuthenticationTls.class.getName(), authParams);

@Cleanup
PulsarClient pulsarClient = clientBuilder.build();
// should be able to create producer successfully
pulsarClient.newProducer().topic(topicName).create();
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public ConnectionPool(ClientConfigurationData conf, EventLoopGroup eventLoopGrou
bootstrap.option(ChannelOption.ALLOCATOR, PulsarByteBufAllocator.DEFAULT);

try {
channelInitializerHandler = new PulsarChannelInitializer(conf, clientCnxSupplier, isSniProxy);
channelInitializerHandler = new PulsarChannelInitializer(conf, clientCnxSupplier);
bootstrap.handler(channelInitializerHandler);
} catch (Exception e) {
log.error("Failed to create channel initializer");
Expand Down Expand Up @@ -293,39 +293,12 @@ CompletableFuture<List<InetAddress>> resolveName(String hostname) {
* Attempt to establish a TCP connection to an already resolved single IP address
*/
private CompletableFuture<Channel> connectToAddress(InetAddress ipAddress, int port, InetSocketAddress sniHost) {
CompletableFuture<Channel> future = new CompletableFuture<>();
// if proxy is configured in pulsar-client then make it thread-safe while updating channelInitializerHandler
if (isSniProxy) {
bootstrap.register().addListener((ChannelFuture cf) -> {
if (!cf.isSuccess()) {
future.completeExceptionally(cf.cause());
return;
}
Channel channel = cf.channel();
try {
channelInitializerHandler.initChannel(channel, sniHost);
channel.connect(new InetSocketAddress(ipAddress, port)).addListener((ChannelFuture channelFuture) -> {
if (channelFuture.isSuccess()) {
future.complete(channelFuture.channel());
} else {
future.completeExceptionally(channelFuture.cause());
}
});
} catch (Exception e) {
log.warn("Failed to initialize channel with {}, {}", ipAddress, sniHost, e);
future.completeExceptionally(e);
}
});
} else {
bootstrap.connect(ipAddress, port).addListener((ChannelFuture channelFuture) -> {
if (channelFuture.isSuccess()) {
future.complete(channelFuture.channel());
} else {
future.completeExceptionally(channelFuture.cause());
}
});
}
return future;
InetSocketAddress remoteAddress = new InetSocketAddress(ipAddress, port);
return adapt(bootstrap.register())
.thenCompose(channel -> clientConfig.isUseTls()
? channelInitializerHandler.initTls(channel, sniHost != null ? sniHost : remoteAddress)
: CompletableFuture.completedFuture(channel))
.thenCompose(channel -> adapt(channel.connect(remoteAddress)));
}

public void releaseConnection(ClientCnx cnx) {
Expand Down Expand Up @@ -364,12 +337,25 @@ int getPoolSize() {
}

public static int signSafeMod(long dividend, int divisor) {
int mod = (int) (dividend % (long) divisor);
int mod = (int) (dividend % divisor);
if (mod < 0) {
mod += divisor;
}
return mod;
}

private static final Logger log = LoggerFactory.getLogger(ConnectionPool.class);

private static CompletableFuture<Channel> adapt(ChannelFuture channelFuture) {
CompletableFuture<Channel> adapter = new CompletableFuture<>();
channelFuture.addListener((ChannelFuture cf) ->{
if (cf.isSuccess()) {
adapter.complete(channelFuture.channel());
} else {
adapter.completeExceptionally(channelFuture.cause());
}
});
return adapter;
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
package org.apache.pulsar.client.impl;

import java.net.InetSocketAddress;
import java.security.cert.X509Certificate;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

Expand Down Expand Up @@ -52,17 +52,15 @@ public class PulsarChannelInitializer extends ChannelInitializer<SocketChannel>

private final Supplier<SslContext> sslContextSupplier;
private NettySSLContextAutoRefreshBuilder nettySSLContextAutoRefreshBuilder;
private final boolean isSniProxyEnabled;

private static final long TLS_CERTIFICATE_CACHE_MILLIS = TimeUnit.MINUTES.toMillis(1);

public PulsarChannelInitializer(ClientConfigurationData conf, Supplier<ClientCnx> clientCnxSupplier, boolean isSniProxyEnabled)
public PulsarChannelInitializer(ClientConfigurationData conf, Supplier<ClientCnx> clientCnxSupplier)
throws Exception {
super();
this.clientCnxSupplier = clientCnxSupplier;
this.tlsEnabled = conf.isUseTls();
this.tlsEnabledWithKeyStore = conf.isUseKeyStoreTls();
this.isSniProxyEnabled = isSniProxyEnabled;

if (tlsEnabled) {
if (tlsEnabledWithKeyStore) {
Expand All @@ -88,10 +86,10 @@ public PulsarChannelInitializer(ClientConfigurationData conf, Supplier<ClientCnx
return authData.getTlsTrustStoreStream() == null
? SecurityUtility.createNettySslContextForClient(conf.isTlsAllowInsecureConnection(),
conf.getTlsTrustCertsFilePath(),
(X509Certificate[]) authData.getTlsCertificates(), authData.getTlsPrivateKey())
authData.getTlsCertificates(), authData.getTlsPrivateKey())
: SecurityUtility.createNettySslContextForClient(conf.isTlsAllowInsecureConnection(),
authData.getTlsTrustStoreStream(),
(X509Certificate[]) authData.getTlsCertificates(), authData.getTlsPrivateKey());
authData.getTlsCertificates(), authData.getTlsPrivateKey());
} else {
return SecurityUtility.createNettySslContextForClient(conf.isTlsAllowInsecureConnection(),
conf.getTlsTrustCertsFilePath());
Expand All @@ -107,33 +105,35 @@ public PulsarChannelInitializer(ClientConfigurationData conf, Supplier<ClientCnx

@Override
public void initChannel(SocketChannel ch) throws Exception {
/**
* skip initializing channel if sni-proxy is enabled in that case {@link ConnectionPool} will initialize the
* channel explicitly.
*/
if (!isSniProxyEnabled) {
initChannel(ch, null);
}
}

public void initChannel(Channel ch, InetSocketAddress sniHost) throws Exception {
if (tlsEnabled) {
if (tlsEnabledWithKeyStore) {
ch.pipeline().addLast(TLS_HANDLER,
new SslHandler(nettySSLContextAutoRefreshBuilder.get().createSSLEngine()));
} else {
SslHandler handler = sniHost != null
? sslContextSupplier.get().newHandler(ch.alloc(), sniHost.getHostName(), sniHost.getPort())
: sslContextSupplier.get().newHandler(ch.alloc());
ch.pipeline().addLast(TLS_HANDLER, handler);
}
ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.COPYING_ENCODER);
} else {
ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.ENCODER);
}
// Setup channel except for the SsHandler for TLS enabled connections

ch.pipeline().addLast("ByteBufPairEncoder", tlsEnabled ? ByteBufPair.COPYING_ENCODER : ByteBufPair.ENCODER);

ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(
Commands.DEFAULT_MAX_MESSAGE_SIZE + Commands.MESSAGE_SIZE_FRAME_PADDING, 0, 4, 0, 4));
ch.pipeline().addLast("handler", clientCnxSupplier.get());
}

CompletableFuture<Channel> initTls(Channel ch, InetSocketAddress sniHost) {
if (!tlsEnabled) {
throw new IllegalStateException("TLS is not enabled in client configuration");
}
CompletableFuture<Channel> initTlsFuture = new CompletableFuture<>();
ch.eventLoop().execute(() -> {
try {
SslHandler handler = tlsEnabledWithKeyStore
? new SslHandler(nettySSLContextAutoRefreshBuilder.get()
.createSSLEngine(sniHost.getHostString(), sniHost.getPort()))
: sslContextSupplier.get().newHandler(ch.alloc(), sniHost.getHostString(), sniHost.getPort());
ch.pipeline().addFirst(TLS_HANDLER, handler);
initTlsFuture.complete(ch);
} catch (Throwable t) {
initTlsFuture.completeExceptionally(t);
}
});

return initTlsFuture;
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,14 @@ public SSLContext createSSLContext() throws GeneralSecurityException, IOExceptio
}

public SSLEngine createSSLEngine() {
SSLEngine sslEngine = sslContext.createSSLEngine();
return configureSSLEngine(sslContext.createSSLEngine());
}

public SSLEngine createSSLEngine(String peerHost, int peerPort) {
return configureSSLEngine(sslContext.createSSLEngine(peerHost, peerPort));
}

private SSLEngine configureSSLEngine(SSLEngine sslEngine) {
sslEngine.setEnabledProtocols(sslEngine.getSupportedProtocols());
sslEngine.setEnabledCipherSuites(sslEngine.getSupportedCipherSuites());

Expand All @@ -177,7 +183,6 @@ public SSLEngine createSSLEngine() {
} else {
sslEngine.setUseClientMode(true);
}

return sslEngine;
}

Expand Down Expand Up @@ -353,3 +358,4 @@ public static SslContextFactory createSslContextFactory(String sslProviderString
return sslCtxFactory;
}
}

0 comments on commit f2933f7

Please sign in to comment.