Skip to content

Commit

Permalink
Enable specification of TLS Protocol Versions and Cipher Suites (apac…
Browse files Browse the repository at this point in the history
  • Loading branch information
Jai Asher authored Feb 13, 2018
1 parent 7dd64d0 commit b990674
Show file tree
Hide file tree
Showing 10 changed files with 234 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,13 @@ public class ServiceConfiguration implements PulsarConfiguration {
private String tlsTrustCertsFilePath = "";
// Accept untrusted TLS certificate from client
private boolean tlsAllowInsecureConnection = false;

// Specify the tls protocols the broker will use to negotiate during TLS Handshake.
// Example:- [TLSv1.2, TLSv1.1, TLSv1]
private Set<String> tlsProtocols = Sets.newTreeSet();
// Specify the tls cipher the broker will use to negotiate during TLS Handshake.
// Example:- [TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256]
private Set<String> tlsCiphers = Sets.newTreeSet();

/***** --- Authentication --- ****/
// Enable authentication
private boolean authenticationEnabled = false;
Expand Down Expand Up @@ -1400,4 +1406,20 @@ public boolean authenticateOriginalAuthData() {
public void setAuthenticateOriginalAuthData(boolean authenticateOriginalAuthData) {
this.authenticateOriginalAuthData = authenticateOriginalAuthData;
}

public Set<String> getTlsProtocols() {
return tlsProtocols;
}

public void setTlsProtocols(Set<String> tlsProtocols) {
this.tlsProtocols = tlsProtocols;
}

public Set<String> getTlsCiphers() {
return tlsCiphers;
}

public void setTlsCiphers(Set<String> tlsCiphers) {
this.tlsCiphers = tlsCiphers;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,10 @@ public PulsarChannelInitializer(BrokerService brokerService, ServiceConfiguratio
@Override
protected void initChannel(SocketChannel ch) throws Exception {
if (enableTLS) {
SslContext sslCtx = SecurityUtility.createNettySslContextForServer(serviceConfig.isTlsAllowInsecureConnection(), serviceConfig.getTlsTrustCertsFilePath(), serviceConfig.getTlsCertificateFilePath(), serviceConfig.getTlsKeyFilePath());
SslContext sslCtx = SecurityUtility.createNettySslContextForServer(
serviceConfig.isTlsAllowInsecureConnection(), serviceConfig.getTlsTrustCertsFilePath(),
serviceConfig.getTlsCertificateFilePath(), serviceConfig.getTlsKeyFilePath(),
serviceConfig.getTlsCiphers(), serviceConfig.getTlsProtocols());
ch.pipeline().addLast(TLS_HANDLER, sslCtx.newHandler(ch.alloc()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,13 +253,13 @@ public void close() {
}
};

public static void retryStrategically(Predicate<Void> predicate, int retryCount, long intSleepTime)
public static void retryStrategically(Predicate<Void> predicate, int retryCount, long intSleepTimeInMillis)
throws Exception {
for (int i = 0; i < retryCount; i++) {
if (predicate.test(null) || i == (retryCount - 1)) {
break;
}
Thread.sleep(intSleepTime + (intSleepTime * i));
Thread.sleep(intSleepTimeInMillis + (intSleepTimeInMillis * i));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -564,6 +564,10 @@ private boolean verifyTlsHostName(String hostname, ChannelHandlerContext ctx) {
SSLSession sslSession = null;
if (sslHandler != null) {
sslSession = ((SslHandler) sslHandler).engine().getSession();
if (log.isDebugEnabled()) {
log.debug("Verifying HostName for {}, Cipher {}, Protocols {}", hostname, sslSession.getCipherSuite(),
sslSession.getProtocol());
}
return hostnameVerifier.verify(hostname, sslSession);
}
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import java.security.spec.PKCS8EncodedKeySpec;
import java.util.Base64;
import java.util.Collection;
import java.util.List;
import java.util.Set;

import javax.net.ssl.KeyManager;
import javax.net.ssl.KeyManagerFactory;
Expand Down Expand Up @@ -95,12 +97,21 @@ public static SslContext createNettySslContextForClient(boolean allowInsecureCon
}

public static SslContext createNettySslContextForServer(boolean allowInsecureConnection, String trustCertsFilePath,
String certFilePath, String keyFilePath)
String certFilePath, String keyFilePath, Set<String> ciphers, Set<String> protocols)
throws GeneralSecurityException, SSLException, FileNotFoundException {
X509Certificate[] certificates = loadCertificatesFromPemFile(certFilePath);
PrivateKey privateKey = loadPrivateKeyFromPemFile(keyFilePath);

SslContextBuilder builder = SslContextBuilder.forServer(privateKey, (X509Certificate[]) certificates);
if (ciphers != null && ciphers.size() > 0) {
builder.ciphers(ciphers);
}

if (protocols != null && protocols.size() > 0) {
String[] protocolsArray = new String[protocols.size()];
builder.protocols(protocols.toArray(protocolsArray));
}

if (allowInsecureConnection) {
builder.trustManager(InsecureTrustManagerFactory.INSTANCE);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ protected void initChannel(SocketChannel ch) throws Exception {
if (enableTLS) {
SslContext sslCtx = SecurityUtility.createNettySslContextForServer(
serviceConfig.isTlsAllowInsecureConnection(), serviceConfig.getTlsTrustCertsFilePath(),
serviceConfig.getTlsCertificateFilePath(), serviceConfig.getTlsKeyFilePath());
serviceConfig.getTlsCertificateFilePath(), serviceConfig.getTlsKeyFilePath(),
serviceConfig.getTlsCiphers(), serviceConfig.getTlsProtocols());
ch.pipeline().addLast(TLS_HANDLER, sslCtx.newHandler(ch.alloc()));
}
ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(PulsarDecoder.MaxFrameSize, 0, 4, 0, 4));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,13 @@ public class ServiceConfig implements PulsarConfiguration {
private String tlsTrustCertsFilePath = "";
// Accept untrusted TLS certificate from client
private boolean tlsAllowInsecureConnection = false;

// Specify the tls protocols the broker will use to negotiate during TLS Handshake.
// Example:- [TLSv1.2, TLSv1.1, TLSv1]
private Set<String> tlsProtocols = Sets.newTreeSet();
// Specify the tls cipher the broker will use to negotiate during TLS Handshake.
// Example:- [TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256]
private Set<String> tlsCiphers = Sets.newTreeSet();

private Properties properties = new Properties();

public String getZookeeperServers() {
Expand Down Expand Up @@ -244,4 +250,20 @@ public Properties getProperties() {
public void setProperties(Properties properties) {
this.properties = properties;
}

public Set<String> getTlsProtocols() {
return tlsProtocols;
}

public void setTlsProtocols(Set<String> tlsProtocols) {
this.tlsProtocols = tlsProtocols;
}

public Set<String> getTlsCiphers() {
return tlsCiphers;
}

public void setTlsCiphers(Set<String> tlsCiphers) {
this.tlsCiphers = tlsCiphers;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,13 @@ public class ProxyConfiguration implements PulsarConfiguration {
private boolean tlsAllowInsecureConnection = false;
// Validates hostname when proxy creates tls connection with broker
private boolean tlsHostnameVerificationEnabled = false;

// Specify the tls protocols the broker will use to negotiate during TLS Handshake.
// Example:- [TLSv1.2, TLSv1.1, TLSv1]
private Set<String> tlsProtocols = Sets.newTreeSet();
// Specify the tls cipher the broker will use to negotiate during TLS Handshake.
// Example:- [TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256]
private Set<String> tlsCiphers = Sets.newTreeSet();

private Properties properties = new Properties();

public boolean forwardAuthorizationCredentials() {
Expand Down Expand Up @@ -289,4 +295,20 @@ public Properties getProperties() {
public void setProperties(Properties properties) {
this.properties = properties;
}

public Set<String> getTlsProtocols() {
return tlsProtocols;
}

public void setTlsProtocols(Set<String> tlsProtocols) {
this.tlsProtocols = tlsProtocols;
}

public Set<String> getTlsCiphers() {
return tlsCiphers;
}

public void setTlsCiphers(Set<String> tlsCiphers) {
this.tlsCiphers = tlsCiphers;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ protected void initChannel(SocketChannel ch) throws Exception {
if (enableTLS) {
SslContext sslCtx = SecurityUtility.createNettySslContextForServer(true /* to allow InsecureConnection */,
serviceConfig.getTlsTrustCertsFilePath(), serviceConfig.getTlsCertificateFilePath(),
serviceConfig.getTlsKeyFilePath());
serviceConfig.getTlsKeyFilePath(), serviceConfig.getTlsCiphers(), serviceConfig.getTlsProtocols());
ch.pipeline().addLast(TLS_HANDLER, sslCtx.newHandler(ch.alloc()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.bookkeeper.test.PortManager;
import org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.ClientConfiguration;
import org.apache.pulsar.client.api.Consumer;
Expand Down Expand Up @@ -75,10 +76,65 @@ public class ProxyWithProxyAuthorizationTest extends ProducerConsumerBase {
private ProxyConfiguration proxyConfig = new ProxyConfiguration();

@DataProvider(name = "hostnameVerification")
public Object[][] codecProvider() {
public Object[][] hostnameVerificationCodecProvider() {
return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } };
}

@DataProvider(name = "protocolsCiphersProvider")
public Object[][] protocolsCiphersProviderCodecProvider() {
// Test using defaults
Set<String> ciphers_1 = Sets.newTreeSet();
Set<String> protocols_1 = Sets.newTreeSet();

// Test explicitly specifying protocols defaults
Set<String> ciphers_2 = Sets.newTreeSet();
Set<String> protocols_2 = Sets.newTreeSet();
protocols_2.add("TLSv1.2");
protocols_2.add("TLSv1.1");
protocols_2.add("TLSv1");

// Test for invalid ciphers
Set<String> ciphers_3 = Sets.newTreeSet();
Set<String> protocols_3 = Sets.newTreeSet();
ciphers_3.add("INVALID_PROTOCOL");

// Incorrect Config since TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256 was introduced in TLSv1.2
Set<String> ciphers_4 = Sets.newTreeSet();
Set<String> protocols_4 = Sets.newTreeSet();
ciphers_4.add("TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256");
protocols_4.add("TLSv1.1");

// Incorrect Config since TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256 was introduced in TLSv1.2
Set<String> ciphers_5 = Sets.newTreeSet();
Set<String> protocols_5 = Sets.newTreeSet();
ciphers_5.add("TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256");
protocols_5.add("TLSv1");

// Correct Config
Set<String> ciphers_6 = Sets.newTreeSet();
Set<String> protocols_6 = Sets.newTreeSet();
ciphers_6.add("TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256");
protocols_6.add("TLSv1.2");

// In correct config - JDK 8 doesn't support TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384
Set<String> ciphers_7 = Sets.newTreeSet();
Set<String> protocols_7 = Sets.newTreeSet();
protocols_7.add("TLSv1.2");
ciphers_7.add("TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384");

// Correct config - Atlease one of the Cipher Suite is supported
Set<String> ciphers_8 = Sets.newTreeSet();
Set<String> protocols_8 = Sets.newTreeSet();
protocols_8.add("TLSv1.2");
ciphers_8.add("TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256");
ciphers_8.add("TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384");

return new Object[][] { { ciphers_1, protocols_1, Boolean.FALSE }, { ciphers_2, protocols_2, Boolean.FALSE },
{ ciphers_3, protocols_3, Boolean.TRUE }, { ciphers_4, protocols_4, Boolean.TRUE },
{ ciphers_5, protocols_5, Boolean.TRUE }, { ciphers_6, protocols_6, Boolean.FALSE },
{ ciphers_7, protocols_7, Boolean.TRUE }, { ciphers_8, protocols_8, Boolean.FALSE }};
}

@BeforeMethod
@Override
protected void setup() throws Exception {
Expand Down Expand Up @@ -162,15 +218,14 @@ void startProxy() throws Exception {
* @throws Exception
*/
@Test
public void textProxyAuthorization() throws Exception {
public void testProxyAuthorization() throws Exception {
log.info("-- Starting {} test --", methodName);

startProxy();
createAdminClient();
final String proxyServiceUrl = "pulsar://localhost:" + proxyConfig.getServicePortTls();
// create a client which connects to proxy over tls and pass authData
ClientConfiguration clientConf = new ClientConfiguration();
PulsarClient proxyClient = createPulsarClient(proxyServiceUrl, clientConf);
PulsarClient proxyClient = createPulsarClient(proxyServiceUrl, new ClientConfiguration());

String namespaceName = "my-property/proxy-authorization/my-ns";

Expand Down Expand Up @@ -215,7 +270,7 @@ public void textProxyAuthorization() throws Exception {
}

@Test(dataProvider = "hostnameVerification")
public void textTlsHostVerificationProxyToClient(boolean hostnameVerificationEnabled) throws Exception {
public void testTlsHostVerificationProxyToClient(boolean hostnameVerificationEnabled) throws Exception {
log.info("-- Starting {} test --", methodName);

startProxy();
Expand Down Expand Up @@ -266,7 +321,7 @@ public void textTlsHostVerificationProxyToClient(boolean hostnameVerificationEna
* @throws Exception
*/
@Test(dataProvider = "hostnameVerification")
public void textTlsHostVerificationProxyToBroker(boolean hostnameVerificationEnabled) throws Exception {
public void testTlsHostVerificationProxyToBroker(boolean hostnameVerificationEnabled) throws Exception {
log.info("-- Starting {} test --", methodName);

proxyConfig.setTlsHostnameVerificationEnabled(hostnameVerificationEnabled);
Expand Down Expand Up @@ -306,6 +361,85 @@ public void textTlsHostVerificationProxyToBroker(boolean hostnameVerificationEna
log.info("-- Exiting {} test --", methodName);
}

/*
* This test verifies whether the Client and Proxy honor the protocols and ciphers specified.
* Details description of test cases can be found in protocolsCiphersProviderCodecProvider
*/
@Test(dataProvider = "protocolsCiphersProvider", timeOut=5000)
public void tlsCiphersAndProtocols(Set<String> tlsCiphers, Set<String> tlsProtocols, boolean expectFailure) throws Exception {
log.info("-- Starting {} test --", methodName);
String namespaceName = "my-property/proxy-authorization/my-ns";
createAdminClient();

admin.properties().createProperty("my-property",
new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), Sets.newHashSet("proxy-authorization")));
admin.namespaces().createNamespace(namespaceName);

admin.namespaces().grantPermissionOnNamespace(namespaceName, "Proxy",
Sets.newHashSet(AuthAction.consume, AuthAction.produce));
admin.namespaces().grantPermissionOnNamespace(namespaceName, "Client",
Sets.newHashSet(AuthAction.consume, AuthAction.produce));

ProxyConfiguration proxyConfig = new ProxyConfiguration();
proxyConfig.setAuthenticationEnabled(true);
proxyConfig.setAuthorizationEnabled(false);
proxyConfig.setBrokerServiceURL("pulsar://localhost:" + BROKER_PORT);
proxyConfig.setBrokerServiceURLTLS("pulsar://localhost:" + BROKER_PORT_TLS);

proxyConfig.setServicePort(PortManager.nextFreePort());
proxyConfig.setServicePortTls(PortManager.nextFreePort());
proxyConfig.setWebServicePort(PortManager.nextFreePort());
proxyConfig.setWebServicePortTls(PortManager.nextFreePort());
proxyConfig.setTlsEnabledInProxy(true);
proxyConfig.setTlsEnabledWithBroker(true);

// enable tls and auth&auth at proxy
proxyConfig.setTlsCertificateFilePath(TLS_PROXY_CERT_FILE_PATH);
proxyConfig.setTlsKeyFilePath(TLS_PROXY_KEY_FILE_PATH);
proxyConfig.setTlsTrustCertsFilePath(TLS_PROXY_TRUST_CERT_FILE_PATH);

proxyConfig.setBrokerClientAuthenticationPlugin(AuthenticationTls.class.getName());
proxyConfig.setBrokerClientAuthenticationParameters(
"tlsCertFile:" + TLS_PROXY_CERT_FILE_PATH + "," + "tlsKeyFile:" + TLS_PROXY_KEY_FILE_PATH);

Set<String> providers = new HashSet<>();
providers.add(AuthenticationProviderTls.class.getName());
conf.setAuthenticationProviders(providers);
proxyConfig.setAuthenticationProviders(providers);
proxyConfig.setTlsProtocols(tlsProtocols);
proxyConfig.setTlsCiphers(tlsCiphers);
ProxyService proxyService = Mockito.spy(new ProxyService(proxyConfig));
proxyService.start();
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically((test) -> {
try {
return admin.namespaces().getPermissions(namespaceName).containsKey("Proxy")
&& admin.namespaces().getPermissions(namespaceName).containsKey("Client");
} catch (PulsarAdminException e) {
return false;
}
}, 3, 1000);
try {

final String proxyServiceUrl = "pulsar://localhost:" + proxyConfig.getServicePortTls();
ClientConfiguration clientConf = new ClientConfiguration();
PulsarClient proxyClient = createPulsarClient(proxyServiceUrl, clientConf);
Consumer consumer = proxyClient.subscribe("persistent://my-property/proxy-authorization/my-ns/my-topic1",
"my-subscriber-name", new ConsumerConfiguration());

if (expectFailure) {
Assert.fail("Failure expected for this test case");
}
consumer.close();
proxyClient.close();
} catch (Exception ex) {
if (!expectFailure) {
Assert.fail("This test case should not fail");
}
}
admin.close();
log.info("-- Exiting {} test --", methodName);
}

protected final void createAdminClient() throws Exception {
Map<String, String> authParams = Maps.newHashMap();
authParams.put("tlsCertFile", TLS_SUPERUSER_CLIENT_CERT_FILE_PATH);
Expand Down

0 comments on commit b990674

Please sign in to comment.