Skip to content

Commit

Permalink
Enable TLS in client if serviceUrl has pulsar+ssl:// or https:// (apa…
Browse files Browse the repository at this point in the history
…che#2315)

It's silly to have to do enableTls(true) if pulsar+ssl or https is there.
  • Loading branch information
ivankelly authored and sijie committed Aug 7, 2018
1 parent 15441f8 commit 7e16f13
Show file tree
Hide file tree
Showing 13 changed files with 83 additions and 38 deletions.
3 changes: 0 additions & 3 deletions conf/client.conf
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,6 @@ authPlugin=
# authParams=tlsCertFile:/path/to/client-cert.pem,tlsKeyFile:/path/to/client-key.pem
authParams=

# Whether to use TLS. Defaults to false.
useTls=

# Allow TLS connections to servers whose certificate cannot be
# be verified to have been signed by a trusted certificate
# authority.
Expand Down
33 changes: 23 additions & 10 deletions pulsar-client-cpp/lib/ClientImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -64,30 +64,43 @@ const std::string generateRandomName() {
}
typedef boost::unique_lock<boost::mutex> Lock;

static const std::string https("https");
static const std::string pulsarSsl("pulsar+ssl");

static const ClientConfiguration detectTls(const std::string& serviceUrl,
const ClientConfiguration& clientConfiguration) {
ClientConfiguration conf(clientConfiguration);
if (serviceUrl.compare(0, https.size(), https) == 0 ||
serviceUrl.compare(0, pulsarSsl.size(), pulsarSsl) == 0) {
conf.setUseTls(true);
}
return conf;
}

ClientImpl::ClientImpl(const std::string& serviceUrl, const ClientConfiguration& clientConfiguration,
bool poolConnections)
: mutex_(),
state_(Open),
serviceUrl_(serviceUrl),
clientConfiguration_(clientConfiguration),
ioExecutorProvider_(boost::make_shared<ExecutorServiceProvider>(clientConfiguration.getIOThreads())),
clientConfiguration_(detectTls(serviceUrl, clientConfiguration)),
ioExecutorProvider_(boost::make_shared<ExecutorServiceProvider>(clientConfiguration_.getIOThreads())),
listenerExecutorProvider_(
boost::make_shared<ExecutorServiceProvider>(clientConfiguration.getMessageListenerThreads())),
boost::make_shared<ExecutorServiceProvider>(clientConfiguration_.getMessageListenerThreads())),
partitionListenerExecutorProvider_(
boost::make_shared<ExecutorServiceProvider>(clientConfiguration.getMessageListenerThreads())),
pool_(clientConfiguration, ioExecutorProvider_, clientConfiguration.getAuthPtr(), poolConnections),
boost::make_shared<ExecutorServiceProvider>(clientConfiguration_.getMessageListenerThreads())),
pool_(clientConfiguration_, ioExecutorProvider_, clientConfiguration_.getAuthPtr(), poolConnections),
producerIdGenerator_(0),
consumerIdGenerator_(0),
requestIdGenerator_(0) {
if (clientConfiguration.getLogger()) {
if (clientConfiguration_.getLogger()) {
// A logger factory was explicitely configured. Let's just use that
LogUtils::setLoggerFactory(clientConfiguration.getLogger());
LogUtils::setLoggerFactory(clientConfiguration_.getLogger());
} else {
#ifdef USE_LOG4CXX
if (!clientConfiguration.getLogConfFilePath().empty()) {
if (!clientConfiguration_.getLogConfFilePath().empty()) {
// A log4cxx log file was passed through deprecated parameter. Use that to configure Log4CXX
LogUtils::setLoggerFactory(
Log4CxxLoggerFactory::create(clientConfiguration.getLogConfFilePath()));
Log4CxxLoggerFactory::create(clientConfiguration_.getLogConfFilePath()));
} else {
// Use default simple console logger
LogUtils::setLoggerFactory(SimpleLoggerFactory::create());
Expand All @@ -102,7 +115,7 @@ ClientImpl::ClientImpl(const std::string& serviceUrl, const ClientConfiguration&
LOG_DEBUG("Using HTTP Lookup");
lookupServicePtr_ =
boost::make_shared<HTTPLookupService>(boost::cref(serviceUrl_), boost::cref(clientConfiguration_),
boost::cref(clientConfiguration.getAuthPtr()));
boost::cref(clientConfiguration_.getAuthPtr()));
} else {
LOG_DEBUG("Using Binary Lookup");
lookupServicePtr_ =
Expand Down
21 changes: 21 additions & 0 deletions pulsar-client-cpp/tests/AuthPluginTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,27 @@ TEST(AuthPluginTest, testTls) {
ASSERT_EQ(i, numOfMessages);
}

TEST(AuthPluginTest, testTlsDetectPulsarSsl) {
ClientConfiguration config = ClientConfiguration();
config.setTlsTrustCertsFilePath("../../pulsar-broker/src/test/resources/authentication/tls/cacert.pem");
config.setTlsAllowInsecureConnection(false);
AuthenticationPtr auth =
pulsar::AuthTls::create("../../pulsar-broker/src/test/resources/authentication/tls/client-cert.pem",
"../../pulsar-broker/src/test/resources/authentication/tls/client-key.pem");
config.setAuth(auth);

Client client("pulsar+ssl://localhost:9886", config);

std::string topicName = "persistent://property/cluster/namespace/test-tls-detect";

Producer producer;
Promise<Result, Producer> producerPromise;
client.createProducerAsync(topicName, WaitForCallbackValue<Producer>(producerPromise));
Future<Result, Producer> producerFuture = producerPromise.getFuture();
Result result = producerFuture.get(producer);
ASSERT_EQ(ResultOk, result);
}

namespace testAthenz {
std::string principalToken;
void mockZTS() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ public class PulsarClientTool {
@Parameter(names = { "-h", "--help", }, help = true, description = "Show this help.")
boolean help;

boolean useTls = false;
boolean tlsAllowInsecureConnection = false;
boolean tlsEnableHostnameVerification = false;
String tlsTrustCertsFilePath = null;
Expand All @@ -69,7 +68,6 @@ public PulsarClientTool(Properties properties) throws MalformedURLException {
}
this.authPluginClassName = properties.getProperty("authPlugin");
this.authParams = properties.getProperty("authParams");
this.useTls = Boolean.parseBoolean(properties.getProperty("useTls"));
this.tlsAllowInsecureConnection = Boolean
.parseBoolean(properties.getProperty("tlsAllowInsecureConnection", "false"));
this.tlsEnableHostnameVerification = Boolean
Expand All @@ -91,7 +89,6 @@ private void updateConfig() throws UnsupportedAuthenticationException, Malformed
if (isNotBlank(this.authPluginClassName)) {
clientBuilder.authentication(authPluginClassName, authParams);
}
clientBuilder.enableTls(this.useTls);
clientBuilder.allowTlsInsecureConnection(this.tlsAllowInsecureConnection);
clientBuilder.tlsTrustCertsFilePath(this.tlsTrustCertsFilePath);
clientBuilder.serviceUrl(serviceURL);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,10 +222,13 @@ ClientBuilder authentication(String authPluginClassName, Map<String, String> aut
ClientBuilder enableTcpNoDelay(boolean enableTcpNoDelay);

/**
* Configure whether to use TLS encryption on the connection <i>(default: false)</i>
* Configure whether to use TLS encryption on the connection
* <i>(default: true if serviceUrl starts with "pulsar+ssl://", false otherwise)</i>
*
* @param enableTls
* @deprecated use "pulsar+ssl://" in serviceUrl to enable
*/
@Deprecated
ClientBuilder enableTls(boolean enableTls);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils;

public class ClientBuilderImpl implements ClientBuilder {

ClientConfigurationData conf;

public ClientBuilderImpl() {
Expand Down Expand Up @@ -66,6 +65,9 @@ public ClientBuilder loadConf(Map<String, Object> config) {
@Override
public ClientBuilder serviceUrl(String serviceUrl) {
conf.setServiceUrl(serviceUrl);
if (!conf.isUseTls()) {
enableTls(serviceUrl.startsWith("pulsar+ssl") || serviceUrl.startsWith("https"));
}
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ public class BuildersTest {

@Test
public void clientBuilderTest() {
ClientBuilderImpl clientBuilder = (ClientBuilderImpl) PulsarClient.builder().enableTls(true).ioThreads(10)
ClientBuilderImpl clientBuilder = (ClientBuilderImpl) PulsarClient.builder().ioThreads(10)
.maxNumberOfRejectedRequestPerConnection(200).serviceUrl("pulsar://service:6650");

assertEquals(clientBuilder.conf.isUseTls(), true);
assertEquals(clientBuilder.conf.isUseTls(), false);
assertEquals(clientBuilder.conf.getServiceUrl(), "pulsar://service:6650");

ClientBuilderImpl b2 = (ClientBuilderImpl) clientBuilder.clone();
Expand All @@ -43,4 +43,30 @@ public void clientBuilderTest() {
assertEquals(b2.conf.getServiceUrl(), "pulsar://other-broker:6650");
}

@Test
public void enableTlsTest() {
ClientBuilderImpl builder = (ClientBuilderImpl)PulsarClient.builder().serviceUrl("pulsar://service:6650");
assertEquals(builder.conf.isUseTls(), false);
assertEquals(builder.conf.getServiceUrl(), "pulsar://service:6650");

builder = (ClientBuilderImpl)PulsarClient.builder().serviceUrl("http://service:6650");
assertEquals(builder.conf.isUseTls(), false);
assertEquals(builder.conf.getServiceUrl(), "http://service:6650");

builder = (ClientBuilderImpl)PulsarClient.builder().serviceUrl("pulsar+ssl://service:6650");
assertEquals(builder.conf.isUseTls(), true);
assertEquals(builder.conf.getServiceUrl(), "pulsar+ssl://service:6650");

builder = (ClientBuilderImpl)PulsarClient.builder().serviceUrl("https://service:6650");
assertEquals(builder.conf.isUseTls(), true);
assertEquals(builder.conf.getServiceUrl(), "https://service:6650");

builder = (ClientBuilderImpl)PulsarClient.builder().serviceUrl("pulsar://service:6650").enableTls(true);
assertEquals(builder.conf.isUseTls(), true);
assertEquals(builder.conf.getServiceUrl(), "pulsar://service:6650");

builder = (ClientBuilderImpl)PulsarClient.builder().serviceUrl("pulsar+ssl://service:6650").enableTls(false);
assertEquals(builder.conf.isUseTls(), false);
assertEquals(builder.conf.getServiceUrl(), "pulsar+ssl://service:6650");
}
}
1 change: 0 additions & 1 deletion site/docs/latest/clients/Cpp.md
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,6 @@ client.close();

```cpp
ClientConfiguration config = ClientConfiguration();
config.setUseTls(true);
config.setTlsTrustCertsFilePath("/path/to/cacert.pem");
config.setTlsAllowInsecureConnection(false);
config.setAuth(pulsar::AuthTls::create(
Expand Down
6 changes: 2 additions & 4 deletions site/docs/latest/clients/Java.md
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,7 @@ Pulsar currently supports two authentication schemes: [TLS](../../security/tls)

### TLS Authentication

To use [TLS](../../security/tls), you need to set TLS to `true` using the `setUseTls` method, point your Pulsar client to a TLS cert path, and provide paths to cert and key files.
To use [TLS](../../security/tls), point your Pulsar client to a TLS cert path, and provide paths to cert and key files.

Here's an example configuration:

Expand All @@ -431,15 +431,14 @@ Authentication tlsAuth = AuthenticationFactory

PulsarClient client = PulsarClient.builder()
.serviceUrl("pulsar+ssl://my-broker.com:6651")
.enableTls(true)
.tlsTrustCertsFilePath("/path/to/cacert.pem")
.authentication(tlsAuth)
.build();
```

### Athenz

To use [Athenz](../../security/athenz) as an authentication provider, you need to [use TLS](#tls-authentication) and provide values for four parameters in a hash:
To use [Athenz](../../security/athenz) as an authentication provider, you need to [use TLS transport](../../security/tls-transport) and provide values for four parameters in a hash:

* `tenantDomain`
* `tenantService`
Expand All @@ -461,7 +460,6 @@ Authentication athenzAuth = AuthenticationFactory

PulsarClient client = PulsarClient.builder()
.serviceUrl("pulsar+ssl://my-broker.com:6651")
.enableTls(true)
.tlsTrustCertsFilePath("/path/to/cacert.pem")
.authentication(athenzAuth)
.build();
Expand Down
1 change: 0 additions & 1 deletion site/docs/latest/security/athenz.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@ authPlugin=org.apache.pulsar.client.impl.auth.AuthenticationAthenz
authParams={"tenantDomain":"shopping","tenantService":"some_app","providerDomain":"pulsar","privateKey":"file:///path/to/private.pem","keyId":"v1"}

# Enable TLS
useTls=true
tlsAllowInsecureConnection=false
tlsTrustCertsFilePath=/path/to/cacert.pem
```
Expand Down
8 changes: 2 additions & 6 deletions site/docs/latest/security/authorization.md
Original file line number Diff line number Diff line change
Expand Up @@ -98,33 +98,29 @@ The structure of topic names in Pulsar reflects the hierarchy between tenants, c
```java
String authPluginClassName = "com.org.MyAuthPluginClass";
String authParams = "param1:value1";
boolean useTls = false;
boolean tlsAllowInsecureConnection = false;
String tlsTrustCertsFilePath = null;

ClientConfiguration config = new ClientConfiguration();
config.setAuthentication(authPluginClassName, authParams);
config.setUseTls(useTls);
config.setTlsAllowInsecureConnection(tlsAllowInsecureConnection);
config.setTlsTrustCertsFilePath(tlsTrustCertsFilePath);

PulsarAdmin admin = new PulsarAdmin(url, config);
PulsarAdmin admin = new PulsarAdmin("pulsar+ssl://service:6651", config);
```

To use TLS:

```java
String authPluginClassName = "com.org.MyAuthPluginClass";
String authParams = "param1:value1";
boolean useTls = false;
boolean tlsAllowInsecureConnection = false;
String tlsTrustCertsFilePath = null;

ClientConfiguration config = new ClientConfiguration();
config.setAuthentication(authPluginClassName, authParams);
config.setUseTls(useTls);
config.setTlsAllowInsecureConnection(tlsAllowInsecureConnection);
config.setTlsTrustCertsFilePath(tlsTrustCertsFilePath);

PulsarAdmin admin = new PulsarAdmin(url, config);
PulsarAdmin admin = new PulsarAdmin("pulsar+ssl://service:6651", config);
```
3 changes: 0 additions & 3 deletions site/docs/latest/security/tls-transport.md
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,6 @@ You'll need to add the following parameters to that file to use TLS transport wi
```properties
webServiceUrl=https://broker.example.com:8443/
brokerServiceUrl=pulsar+ssl://broker.example.com:6651/
useTls=true
tlsAllowInsecureConnection=false
tlsTrustCertsFilePath=/path/to/ca.cert.pem
```
Expand All @@ -180,7 +179,6 @@ import org.apache.pulsar.client.api.PulsarClient;

PulsarClient client = PulsarClient.builder()
.serviceUrl("pulsar+ssl://broker.example.com:6651/")
.enableTls(true)
.tlsTrustCertsFilePath("/path/to/ca.cert.pem")
.build();
```
Expand All @@ -201,7 +199,6 @@ client = Client("pulsar+ssl://broker.example.com:6651/",
#include <pulsar/Client.h>

pulsar::ClientConfiguration config;
config.setUseTls(true);
config.setTlsTrustCertsFilePath("/path/to/ca.cert.pem");
config.setTlsAllowInsecureConnection(false);

Expand Down
3 changes: 0 additions & 3 deletions site/docs/latest/security/tls.md
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,6 @@ You'll need to add the following parameters to that file to use TLS authenticati
```properties
webServiceUrl=https://broker.example.com:8443/
brokerServiceUrl=pulsar+ssl://broker.example.com:6651/
useTls=true
tlsAllowInsecureConnection=false
tlsTrustCertsFilePath=/path/to/ca.cert.pem
authPlugin=org.apache.pulsar.client.impl.auth.AuthenticationTls
Expand All @@ -117,7 +116,6 @@ import org.apache.pulsar.client.api.PulsarClient;

PulsarClient client = PulsarClient.builder()
.serviceUrl("pulsar+ssl://broker.example.com:6651/")
.enableTls(true)
.tlsTrustCertsFilePath("/path/to/ca.cert.pem")
.authentication("org.apache.pulsar.client.impl.auth.AuthenticationTls",
"tlsCertFile:/path/to/my-role.cert.pem,tlsKeyFile:/path/to/my-role.key-pk8.pem")
Expand All @@ -142,7 +140,6 @@ client = Client("pulsar+ssl://broker.example.com:6651/",
#include <pulsar/Client.h>

pulsar::ClientConfiguration config;
config.setUseTls(true);
config.setTlsTrustCertsFilePath("/path/to/ca.cert.pem");
config.setTlsAllowInsecureConnection(false);

Expand Down

0 comments on commit 7e16f13

Please sign in to comment.