Skip to content

Commit

Permalink
[Issue: 7379] Improve security setting of Pulsar Functions (apache#7424)
Browse files Browse the repository at this point in the history
Fixes apache#7379 

### Motivation

Rename some settings to make those broker-client TLS settings clearer.

### Modifications

- replace `clientAuthenticationParameters` with `brokerClientAuthenticationParameters`
- replace `clientAuthenticationPlugin` with `brokerClientAuthenticationPlugin`
- replace `tlsHostnameVerificationEnable` with `tlsEnableHostnameVerification`
  • Loading branch information
wolfstudy authored Jul 3, 2020
1 parent 4a04169 commit e99b669
Show file tree
Hide file tree
Showing 13 changed files with 101 additions and 66 deletions.
26 changes: 17 additions & 9 deletions conf/functions_worker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,25 @@ downloadDirectory: download/pulsar_functions

# Configure the pulsar client used by function metadata management
#
# points
# points
# Whether to enable TLS when clients connect to broker
useTls: false
# For TLS:
# brokerServiceUrl=pulsar+ssl://localhost:6651/
pulsarServiceUrl: pulsar://localhost:6650
# For TLS:
# webServiceUrl=https://localhost:8443/
pulsarWebServiceUrl: http://localhost:8080

############################################
# security settings for pulsar broker client
############################################
# The path to trusted certificates used by the Pulsar client to authenticate with Pulsar brokers
# brokerClientTrustCertsFilePath:
# the authentication plugin to be used by the pulsar client used in worker service
# clientAuthenticationPlugin:
# brokerClientAuthenticationPlugin:
# the authentication parameter to be used by the pulsar client used in worker service
# clientAuthenticationParameters:
# brokerClientAuthenticationParameters:

# Bookie Authentication
#
Expand Down Expand Up @@ -193,15 +205,11 @@ tlsKeyFilePath:
tlsTrustCertsFilePath:
# Accept untrusted TLS certificate from client
tlsAllowInsecureConnection: false
# Whether server hostname must match the common name of the certificate
tlsEnableHostnameVerification: false
# Tls cert refresh duration in seconds (set 0 to check on every new connection)
tlsCertRefreshCheckDurationSec: 300

############################################
# security settings for pulsar broker client
############################################
# The path to trusted certificates used by the Pulsar client to authenticate with Pulsar brokers
brokerClientTrustCertsFilePath:

########################
# State Management
########################
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,14 +189,13 @@ private static class BrokerStarter {
workerConfig.setZooKeeperSessionTimeoutMillis(brokerConfig.getZooKeeperSessionTimeoutMillis());
workerConfig.setZooKeeperOperationTimeoutSeconds(brokerConfig.getZooKeeperOperationTimeoutSeconds());

workerConfig.setTlsHostnameVerificationEnable(false);

workerConfig.setTlsAllowInsecureConnection(brokerConfig.isTlsAllowInsecureConnection());
workerConfig.setTlsTrustCertsFilePath(brokerConfig.getTlsTrustCertsFilePath());
workerConfig.setTlsEnableHostnameVerification(false);
workerConfig.setBrokerClientTrustCertsFilePath(brokerConfig.getTlsTrustCertsFilePath());

// client in worker will use this config to authenticate with broker
workerConfig.setClientAuthenticationPlugin(brokerConfig.getBrokerClientAuthenticationPlugin());
workerConfig.setClientAuthenticationParameters(brokerConfig.getBrokerClientAuthenticationParameters());
workerConfig.setBrokerClientAuthenticationPlugin(brokerConfig.getBrokerClientAuthenticationPlugin());
workerConfig.setBrokerClientAuthenticationParameters(brokerConfig.getBrokerClientAuthenticationParameters());

// inherit super users
workerConfig.setSuperUserRoles(brokerConfig.getSuperUserRoles());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,14 +299,13 @@ public void start() throws Exception {
workerConfig.setZooKeeperSessionTimeoutMillis(config.getZooKeeperSessionTimeoutMillis());
workerConfig.setZooKeeperOperationTimeoutSeconds(config.getZooKeeperOperationTimeoutSeconds());

workerConfig.setTlsHostnameVerificationEnable(false);

workerConfig.setTlsAllowInsecureConnection(config.isTlsAllowInsecureConnection());
workerConfig.setTlsTrustCertsFilePath(config.getTlsTrustCertsFilePath());
workerConfig.setTlsEnableHostnameVerification(false);
workerConfig.setBrokerClientTrustCertsFilePath(config.getTlsTrustCertsFilePath());

// client in worker will use this config to authenticate with broker
workerConfig.setClientAuthenticationPlugin(config.getBrokerClientAuthenticationPlugin());
workerConfig.setClientAuthenticationParameters(config.getBrokerClientAuthenticationParameters());
workerConfig.setBrokerClientAuthenticationPlugin(config.getBrokerClientAuthenticationPlugin());
workerConfig.setBrokerClientAuthenticationParameters(config.getBrokerClientAuthenticationParameters());

// inherit super users
workerConfig.setSuperUserRoles(config.getSuperUserRoles());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,10 +177,10 @@ void setup(Method method) throws Exception {

ClientBuilder clientBuilder = PulsarClient.builder().serviceUrl(this.workerConfig.getPulsarServiceUrl())
.operationTimeout(1000, TimeUnit.MILLISECONDS);
if (isNotBlank(workerConfig.getClientAuthenticationPlugin())
&& isNotBlank(workerConfig.getClientAuthenticationParameters())) {
clientBuilder.authentication(workerConfig.getClientAuthenticationPlugin(),
workerConfig.getClientAuthenticationParameters());
if (isNotBlank(workerConfig.getBrokerClientAuthenticationPlugin())
&& isNotBlank(workerConfig.getBrokerClientAuthenticationParameters())) {
clientBuilder.authentication(workerConfig.getBrokerClientAuthenticationPlugin(),
workerConfig.getBrokerClientAuthenticationParameters());
}
pulsarClient = clientBuilder.build();

Expand Down Expand Up @@ -242,8 +242,8 @@ private WorkerService createPulsarFunctionWorker(ServiceConfiguration config) {
workerConfig.setWorkerHostname(hostname);
workerConfig.setWorkerId(workerId);

workerConfig.setClientAuthenticationPlugin(AuthenticationToken.class.getName());
workerConfig.setClientAuthenticationParameters(
workerConfig.setBrokerClientAuthenticationPlugin(AuthenticationToken.class.getName());
workerConfig.setBrokerClientAuthenticationParameters(
String.format("token:%s", adminToken));

workerConfig.setAuthenticationEnabled(config.isAuthenticationEnabled());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,12 +205,12 @@ void setup(Method method) throws Exception {

ClientBuilder clientBuilder = PulsarClient.builder()
.serviceUrl(pulsar.getBrokerServiceUrl());
if (isNotBlank(workerConfig.getClientAuthenticationPlugin())
&& isNotBlank(workerConfig.getClientAuthenticationParameters())) {
if (isNotBlank(workerConfig.getBrokerClientAuthenticationPlugin())
&& isNotBlank(workerConfig.getBrokerClientAuthenticationParameters())) {
clientBuilder.enableTls(workerConfig.isUseTls());
clientBuilder.allowTlsInsecureConnection(workerConfig.isTlsAllowInsecureConnection());
clientBuilder.authentication(workerConfig.getClientAuthenticationPlugin(),
workerConfig.getClientAuthenticationParameters());
clientBuilder.authentication(workerConfig.getBrokerClientAuthenticationPlugin(),
workerConfig.getBrokerClientAuthenticationParameters());
clientBuilder.serviceUrl(pulsar.getBrokerServiceUrlTls());
}
pulsarClient = clientBuilder.build();
Expand Down Expand Up @@ -312,8 +312,8 @@ private WorkerService createPulsarFunctionWorker(ServiceConfiguration config) {
workerConfig.setWorkerHostname(hostname);
workerConfig.setWorkerId(workerId);

workerConfig.setClientAuthenticationPlugin(AuthenticationTls.class.getName());
workerConfig.setClientAuthenticationParameters(
workerConfig.setBrokerClientAuthenticationPlugin(AuthenticationTls.class.getName());
workerConfig.setBrokerClientAuthenticationParameters(
String.format("tlsCertFile:%s,tlsKeyFile:%s", TLS_CLIENT_CERT_FILE_PATH, TLS_CLIENT_KEY_FILE_PATH));
workerConfig.setUseTls(true);
workerConfig.setTlsAllowInsecureConnection(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,12 +189,12 @@ void setup(Method method) throws Exception {
admin.clusters().updateCluster(config.getClusterName(), clusterData);

ClientBuilder clientBuilder = PulsarClient.builder().serviceUrl(this.workerConfig.getPulsarServiceUrl());
if (isNotBlank(workerConfig.getClientAuthenticationPlugin())
&& isNotBlank(workerConfig.getClientAuthenticationParameters())) {
if (isNotBlank(workerConfig.getBrokerClientAuthenticationPlugin())
&& isNotBlank(workerConfig.getBrokerClientAuthenticationParameters())) {
clientBuilder.enableTls(workerConfig.isUseTls());
clientBuilder.allowTlsInsecureConnection(workerConfig.isTlsAllowInsecureConnection());
clientBuilder.authentication(workerConfig.getClientAuthenticationPlugin(),
workerConfig.getClientAuthenticationParameters());
clientBuilder.authentication(workerConfig.getBrokerClientAuthenticationPlugin(),
workerConfig.getBrokerClientAuthenticationParameters());
}
pulsarClient = clientBuilder.build();

Expand Down Expand Up @@ -244,8 +244,8 @@ private WorkerService createPulsarFunctionWorker(ServiceConfiguration config) {
workerConfig.setWorkerHostname(hostname);
workerConfig.setWorkerId(workerId);

workerConfig.setClientAuthenticationPlugin(AuthenticationTls.class.getName());
workerConfig.setClientAuthenticationParameters(
workerConfig.setBrokerClientAuthenticationPlugin(AuthenticationTls.class.getName());
workerConfig.setBrokerClientAuthenticationParameters(
String.format("tlsCertFile:%s,tlsKeyFile:%s", TLS_CLIENT_CERT_FILE_PATH, TLS_CLIENT_KEY_FILE_PATH));
workerConfig.setUseTls(true);
workerConfig.setTlsAllowInsecureConnection(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,12 +142,12 @@ void setup(Method method) throws Exception {
admin.clusters().updateCluster(config.getClusterName(), clusterData);

ClientBuilder clientBuilder = PulsarClient.builder().serviceUrl(this.workerConfig.getPulsarServiceUrl());
if (isNotBlank(workerConfig.getClientAuthenticationPlugin())
&& isNotBlank(workerConfig.getClientAuthenticationParameters())) {
if (isNotBlank(workerConfig.getBrokerClientAuthenticationPlugin())
&& isNotBlank(workerConfig.getBrokerClientAuthenticationParameters())) {
clientBuilder.enableTls(workerConfig.isUseTls());
clientBuilder.allowTlsInsecureConnection(workerConfig.isTlsAllowInsecureConnection());
clientBuilder.authentication(workerConfig.getClientAuthenticationPlugin(),
workerConfig.getClientAuthenticationParameters());
clientBuilder.authentication(workerConfig.getBrokerClientAuthenticationPlugin(),
workerConfig.getBrokerClientAuthenticationParameters());
}
pulsarClient = clientBuilder.build();

Expand Down Expand Up @@ -192,8 +192,8 @@ private WorkerService createPulsarFunctionWorker(ServiceConfiguration config) {
workerConfig
.setWorkerId("c-" + config.getClusterName() + "-fw-" + hostname + "-" + workerConfig.getWorkerPort());

workerConfig.setClientAuthenticationPlugin(AuthenticationTls.class.getName());
workerConfig.setClientAuthenticationParameters(
workerConfig.setBrokerClientAuthenticationPlugin(AuthenticationTls.class.getName());
workerConfig.setBrokerClientAuthenticationParameters(
String.format("tlsCertFile:%s,tlsKeyFile:%s", TLS_CLIENT_CERT_FILE_PATH, TLS_CLIENT_KEY_FILE_PATH));
workerConfig.setUseTls(true);
workerConfig.setTlsAllowInsecureConnection(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,12 +219,12 @@ void setup(Method method) throws Exception {
admin.clusters().updateCluster(config.getClusterName(), clusterData);

ClientBuilder clientBuilder = PulsarClient.builder().serviceUrl(this.workerConfig.getPulsarServiceUrl());
if (isNotBlank(workerConfig.getClientAuthenticationPlugin())
&& isNotBlank(workerConfig.getClientAuthenticationParameters())) {
if (isNotBlank(workerConfig.getBrokerClientAuthenticationPlugin())
&& isNotBlank(workerConfig.getBrokerClientAuthenticationParameters())) {
clientBuilder.enableTls(workerConfig.isUseTls());
clientBuilder.allowTlsInsecureConnection(workerConfig.isTlsAllowInsecureConnection());
clientBuilder.authentication(workerConfig.getClientAuthenticationPlugin(),
workerConfig.getClientAuthenticationParameters());
clientBuilder.authentication(workerConfig.getBrokerClientAuthenticationPlugin(),
workerConfig.getBrokerClientAuthenticationParameters());
}
pulsarClient = clientBuilder.build();

Expand Down Expand Up @@ -332,8 +332,8 @@ private WorkerService createPulsarFunctionWorker(ServiceConfiguration config) {
workerConfig.setWorkerHostname(hostname);
workerConfig.setWorkerId(workerId);

workerConfig.setClientAuthenticationPlugin(AuthenticationTls.class.getName());
workerConfig.setClientAuthenticationParameters(
workerConfig.setBrokerClientAuthenticationPlugin(AuthenticationTls.class.getName());
workerConfig.setBrokerClientAuthenticationParameters(
String.format("tlsCertFile:%s,tlsKeyFile:%s", TLS_CLIENT_CERT_FILE_PATH, TLS_CLIENT_KEY_FILE_PATH));
workerConfig.setUseTls(true);
workerConfig.setTlsAllowInsecureConnection(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,8 +192,8 @@ private WorkerService createPulsarFunctionWorker(ServiceConfiguration config) {
workerConfig.setWorkerHostname(hostname);
workerConfig.setWorkerId(workerId);

workerConfig.setClientAuthenticationPlugin(AuthenticationTls.class.getName());
workerConfig.setClientAuthenticationParameters(
workerConfig.setBrokerClientAuthenticationPlugin(AuthenticationTls.class.getName());
workerConfig.setBrokerClientAuthenticationParameters(
String.format("tlsCertFile:%s,tlsKeyFile:%s", TLS_CLIENT_CERT_FILE_PATH, TLS_CLIENT_KEY_FILE_PATH));
workerConfig.setUseTls(true);
workerConfig.setTlsAllowInsecureConnection(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,12 +260,12 @@ public class WorkerConfig implements Serializable, PulsarConfiguration {
category = CATEGORY_CLIENT_SECURITY,
doc = "The authentication plugin used by function workers to talk to brokers"
)
private String clientAuthenticationPlugin;
private String brokerClientAuthenticationPlugin;
@FieldContext(
category = CATEGORY_CLIENT_SECURITY,
doc = "The parameters of the authentication plugin used by function workers to talk to brokers"
)
private String clientAuthenticationParameters;
private String brokerClientAuthenticationParameters;
@FieldContext(
category = CATEGORY_CLIENT_SECURITY,
doc = "Authentication plugin to use when connecting to bookies"
Expand Down Expand Up @@ -331,7 +331,7 @@ public class WorkerConfig implements Serializable, PulsarConfiguration {
category = CATEGORY_SECURITY,
doc = "Whether to enable hostname verification on TLS connections"
)
private boolean tlsHostnameVerificationEnable = false;
private boolean tlsEnableHostnameVerification = false;
@FieldContext(
category = CATEGORY_SECURITY,
doc = "Tls cert refresh duration in seconds (set 0 to check on every new connection)"
Expand Down Expand Up @@ -561,4 +561,33 @@ public static class KubernetesContainerFactory extends KubernetesRuntimeFactoryC
)
@Deprecated
private KubernetesContainerFactory kubernetesContainerFactory;

@FieldContext(
category = CATEGORY_CLIENT_SECURITY,
doc = "The parameters of the authentication plugin used by function workers to talk to brokers"
)
@Deprecated
private String clientAuthenticationParameters;
@FieldContext(
category = CATEGORY_CLIENT_SECURITY,
doc = "The authentication plugin used by function workers to talk to brokers"
)
@Deprecated
private String clientAuthenticationPlugin;

public String getBrokerClientAuthenticationPlugin() {
if (null == brokerClientAuthenticationPlugin) {
return clientAuthenticationPlugin;
} else {
return brokerClientAuthenticationPlugin;
}
}

public String getBrokerClientAuthenticationParameters() {
if (null == brokerClientAuthenticationParameters) {
return clientAuthenticationParameters;
} else {
return brokerClientAuthenticationParameters;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -164,12 +164,12 @@ public FunctionRuntimeManager(WorkerConfig workerConfig, WorkerService workerSer
AuthenticationConfig authConfig = null;
if (workerConfig.isAuthenticationEnabled()) {
authConfig = AuthenticationConfig.builder()
.clientAuthenticationPlugin(workerConfig.getClientAuthenticationPlugin())
.clientAuthenticationParameters(workerConfig.getClientAuthenticationParameters())
.clientAuthenticationPlugin(workerConfig.getBrokerClientAuthenticationPlugin())
.clientAuthenticationParameters(workerConfig.getBrokerClientAuthenticationParameters())
.tlsTrustCertsFilePath(workerConfig.getTlsTrustCertsFilePath())
.useTls(workerConfig.isUseTls())
.tlsAllowInsecureConnection(workerConfig.isTlsAllowInsecureConnection())
.tlsHostnameVerificationEnable(workerConfig.isTlsHostnameVerificationEnable())
.tlsHostnameVerificationEnable(workerConfig.isTlsEnableHostnameVerification())
.build();

//initialize function authentication provider
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,9 @@ private static URI initialize(WorkerConfig workerConfig)
throws InterruptedException, PulsarAdminException, IOException {
// initializing pulsar functions namespace
PulsarAdmin admin = WorkerUtils.getPulsarAdminClient(workerConfig.getPulsarWebServiceUrl(),
workerConfig.getClientAuthenticationPlugin(), workerConfig.getClientAuthenticationParameters(),
workerConfig.getBrokerClientAuthenticationPlugin(), workerConfig.getBrokerClientAuthenticationParameters(),
workerConfig.getTlsTrustCertsFilePath(), workerConfig.isTlsAllowInsecureConnection(),
workerConfig.isTlsHostnameVerificationEnable());
workerConfig.isTlsEnableHostnameVerification());
InternalConfigurationData internalConf;
// make sure pulsar broker is up
log.info("Checking if pulsar service at {} is up...", workerConfig.getPulsarWebServiceUrl());
Expand Down
Loading

0 comments on commit e99b669

Please sign in to comment.