Skip to content

Commit

Permalink
Add KeyStore support in WebSocket, Function Worker HTTPS Servers (apa…
Browse files Browse the repository at this point in the history
…che#15084)

* Add KeyStore support in WebSocket, Function Worker HTTPS Servers

* Avoid leaking worker config password

* Fix checkstyle

* Replace broker with appropriate text in annotations

* Update python script for new configs

Co-authored-by: Lari Hotari <[email protected]>

### Motivation

We support configuring KeyStores for the broker and the proxy, but not the WebSocket or the Function Worker. By adding this support, users are able to provide KeyStores of type PCKS12 or JKS, which adds flexibility. Further, these KeyStores simplify support for additional algorithms because we rely on the TLS provider to load the KeyStore instead of loading keys ourselves.

### Modifications

* Add `KeyStoreSSLContext`s to the function worker server
* Add `KeyStoreSSLContext`s to the web socket server
* Add configurations to the function worker, the web socket, and the proxy configuration files to simply configuration
* Rely on `toString`, not `ObjectMapper`, when converting the `WorkerConfig` to a string so that we don't log the KeyStore password. (Add a test to verify this logic. Note that we don't want the `ObjectMapper` to ignore the field because we use mappers when converting configuration classes.)

### Verifying this change

I manually verified that this change works in a minikube cluster. The underlying method named `KeyStoreSSLContext#createSslContextFactory` is already used and tested, so I don't believe we need additional testing on that component.

### Does this pull request potentially affect one of the following parts:

This change adds a new way to configure TLS in the WebSocket and Function Worker HTTPS Servers. As such, it adds new configuration. This configuration is named in the same way that the broker and proxy configuration is named, so it is consistent.

### Documentation

I've documented the new configuration in the appropriate configuration files.
  • Loading branch information
michaeljmarshall authored Apr 21, 2022
1 parent 151f1d1 commit a410396
Show file tree
Hide file tree
Showing 12 changed files with 312 additions and 25 deletions.
2 changes: 2 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -621,6 +621,8 @@ tlsRequireTrustedClientCertOnConnect=false
tlsProvider=

### --- KeyStore TLS config variables --- ###
## Note that some of the above TLS configs also apply to the KeyStore TLS configuration.

# Enable TLS with KeyStore type configuration in broker.
tlsEnabledWithKeyStore=false

Expand Down
40 changes: 40 additions & 0 deletions conf/functions_worker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,46 @@ tlsAllowInsecureConnection: false
tlsEnableHostnameVerification: false
# Tls cert refresh duration in seconds (set 0 to check on every new connection)
tlsCertRefreshCheckDurationSec: 300
# Whether client certificates are required for TLS. Connections are rejected if the client
# certificate isn't trusted.
tlsRequireTrustedClientCertOnConnect: false

### --- KeyStore TLS config variables --- ###
## Note that some of the above TLS configs also apply to the KeyStore TLS configuration.

# TLS Provider for KeyStore type
tlsProvider:

# Enable TLS with KeyStore type configuration in function worker.
tlsEnabledWithKeyStore: false

# TLS KeyStore type configuration in function worker: JKS, PKCS12
tlsKeyStoreType: JKS

# TLS KeyStore path in function worker
tlsKeyStore:

# TLS KeyStore password for function worker
tlsKeyStorePassword:

# TLS TrustStore type configuration in function worker: JKS, PKCS12
tlsTrustStoreType: JKS

# TLS TrustStore path in function worker
tlsTrustStore:

# TLS TrustStore password in function worker, default value is empty password
tlsTrustStorePassword:

# Specify the tls protocols the function worker's web service will use to negotiate during TLS handshake
# (a comma-separated list of protocol names).
# Examples:- [TLSv1.3, TLSv1.2]
webServiceTlsProtocols:

# Specify the tls cipher the function worker will use to negotiate during TLS Handshake
# (a comma-separated list of ciphers).
# Examples:- [TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256]
webServiceTlsCiphers:

########################
# State Management
Expand Down
27 changes: 27 additions & 0 deletions conf/proxy.conf
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,33 @@ webServicePort=8080
# Port to use to server HTTPS request
webServicePortTls=

### --- KeyStore TLS config variables --- ###
## Note that some of the above TLS configs also apply to the KeyStore TLS configuration.

# TLS Provider for KeyStore type
tlsProvider=

# Enable TLS with KeyStore type configuration in proxy.
tlsEnabledWithKeyStore=false

# TLS KeyStore type configuration in proxy: JKS, PKCS12
tlsKeyStoreType=JKS

# TLS KeyStore path in proxy
tlsKeyStore=

# TLS KeyStore password for proxy
tlsKeyStorePassword=

# TLS TrustStore type configuration in proxy: JKS, PKCS12
tlsTrustStoreType=JKS

# TLS TrustStore path in proxy
tlsTrustStore=

# TLS TrustStore password in proxy, default value is empty password
tlsTrustStorePassword=

# Specify the tls protocols the proxy's web service will use to negotiate during TLS handshake
# (a comma-separated list of protocol names).
# Examples:- [TLSv1.3, TLSv1.2]
Expand Down
37 changes: 37 additions & 0 deletions conf/websocket.conf
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,43 @@ tlsRequireTrustedClientCertOnConnect=false
# Tls cert refresh duration in seconds (set 0 to check on every new connection)
tlsCertRefreshCheckDurationSec=300

### --- KeyStore TLS config variables --- ###
## Note that some of the above TLS configs also apply to the KeyStore TLS configuration.

# TLS Provider for KeyStore type
tlsProvider=

# Enable TLS with KeyStore type configuration in WebSocket.
tlsEnabledWithKeyStore=false

# TLS KeyStore type configuration in WebSocket: JKS, PKCS12
tlsKeyStoreType=JKS

# TLS KeyStore path in WebSocket
tlsKeyStore=

# TLS KeyStore password for WebSocket
tlsKeyStorePassword=

# TLS TrustStore type configuration in WebSocket: JKS, PKCS12
tlsTrustStoreType=JKS

# TLS TrustStore path in WebSocket
tlsTrustStore=

# TLS TrustStore password in WebSocket, default value is empty password
tlsTrustStorePassword=

# Specify the tls protocols the proxy's web service will use to negotiate during TLS handshake
# (a comma-separated list of protocol names).
# Examples:- [TLSv1.3, TLSv1.2]
webServiceTlsProtocols=

# Specify the tls cipher the proxy will use to negotiate during TLS Handshake
# (a comma-separated list of ciphers).
# Examples:- [TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256]
webServiceTlsCiphers=

### --- Deprecated config variables --- ###

# Deprecated. Use configurationStoreServers
Expand Down
4 changes: 3 additions & 1 deletion docker/pulsar/scripts/gen-yml-from-env.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,9 @@
'proxyRoles',
'schemaRegistryCompatibilityCheckers',
'brokerClientTlsCiphers',
'brokerClientTlsProtocols'
'brokerClientTlsProtocols',
'webServiceTlsCiphers',
'webServiceTlsProtocols',
]

PF_ENV_PREFIX = 'PF_'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2547,7 +2547,7 @@ public class ServiceConfiguration implements PulsarConfiguration {

@FieldContext(
category = CATEGORY_KEYSTORE_TLS,
doc = "TLS Provider for Specify the SSL provider for the broker service: \n"
doc = "Specify the TLS provider for the broker service: \n"
+ "When using TLS authentication with CACert, the valid value is either OPENSSL or JDK.\n"
+ "When using TLS authentication with KeyStore, available values can be SunJSSE, Conscrypt and etc."
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,11 @@
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;
import lombok.AccessLevel;
import lombok.Data;
import lombok.Getter;
import lombok.ToString;
import lombok.experimental.Accessors;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
Expand Down Expand Up @@ -74,6 +76,8 @@ public class WorkerConfig implements Serializable, PulsarConfiguration {
@Category
private static final String CATEGORY_SECURITY = "Common Security Settings (applied for both worker and client)";
@Category
private static final String CATEGORY_KEYSTORE_TLS = "KeyStoreTLS";
@Category
private static final String CATEGORY_WORKER_SECURITY = "Worker Security Settings";
@Category
private static final String CATEGORY_CLIENT_SECURITY = "Security settings for clients talking to brokers";
Expand Down Expand Up @@ -445,6 +449,74 @@ public boolean isBrokerClientAuthenticationEnabled() {
doc = "Tls cert refresh duration in seconds (set 0 to check on every new connection)"
)
private long tlsCertRefreshCheckDurationSec = 300;

/**** --- KeyStore TLS config variables. --- ****/
@FieldContext(
category = CATEGORY_KEYSTORE_TLS,
doc = "Enable TLS with KeyStore type configuration in function worker"
)
private boolean tlsEnabledWithKeyStore = false;

@FieldContext(
category = CATEGORY_KEYSTORE_TLS,
doc = "Specify the TLS provider for the function worker service: \n"
+ "When using TLS authentication with CACert, the valid value is either OPENSSL or JDK.\n"
+ "When using TLS authentication with KeyStore, available values can be SunJSSE, Conscrypt and etc."
)
private String tlsProvider = null;

@FieldContext(
category = CATEGORY_KEYSTORE_TLS,
doc = "TLS KeyStore type configuration in function worker: JKS, PKCS12"
)
private String tlsKeyStoreType = "JKS";

@FieldContext(
category = CATEGORY_KEYSTORE_TLS,
doc = "TLS KeyStore path in function worker"
)
private String tlsKeyStore = null;

@FieldContext(
category = CATEGORY_KEYSTORE_TLS,
doc = "TLS KeyStore password for function worker"
)
@ToString.Exclude
private String tlsKeyStorePassword = null;

@FieldContext(
category = CATEGORY_KEYSTORE_TLS,
doc = "TLS TrustStore type configuration in function worker: JKS, PKCS12"
)
private String tlsTrustStoreType = "JKS";

@FieldContext(
category = CATEGORY_KEYSTORE_TLS,
doc = "TLS TrustStore path in function worker"
)
private String tlsTrustStore = null;

@FieldContext(
category = CATEGORY_KEYSTORE_TLS,
doc = "TLS TrustStore password for function worker, null means empty password."
)
@ToString.Exclude
private String tlsTrustStorePassword = null;

@FieldContext(
category = CATEGORY_WORKER_SECURITY,
doc = "Specify the tls protocols the proxy's web service will use to negotiate during TLS Handshake.\n\n"
+ "Example:- [TLSv1.3, TLSv1.2]"
)
private Set<String> webServiceTlsProtocols = new TreeSet<>();

@FieldContext(
category = CATEGORY_WORKER_SECURITY,
doc = "Specify the tls cipher the proxy's web service will use to negotiate during TLS Handshake.\n\n"
+ "Example:- [TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256]"
)
private Set<String> webServiceTlsCiphers = new TreeSet<>();

@FieldContext(
category = CATEGORY_WORKER_SECURITY,
doc = "Enforce authentication"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,11 @@
import static org.testng.Assert.assertTrue;

import java.net.URL;
import java.util.Locale;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.pulsar.functions.auth.KubernetesSecretsTokenAuthProvider;
import org.apache.pulsar.functions.runtime.kubernetes.KubernetesRuntimeFactory;
import org.apache.pulsar.functions.worker.WorkerConfig;
import org.testng.annotations.Test;

/**
Expand Down Expand Up @@ -121,4 +122,19 @@ public void testLoadResourceRestrictionsConfig() throws Exception {

assertTrue(newK8SWc.isFunctionInstanceResourceChangeInLockStep());
}

@Test
public void testPasswordsNotLeakedOnToString() throws Exception {
URL yamlUrl = getClass().getClassLoader().getResource("test_worker_config.yml");
WorkerConfig wc = WorkerConfig.load(yamlUrl.toURI().getPath());
assertFalse(wc.toString().toLowerCase(Locale.ROOT).contains("password"), "Stringified config must not contain password");
}

@Test
public void testPasswordsPresentOnObjectMapping() throws Exception {
URL yamlUrl = getClass().getClassLoader().getResource("test_worker_config.yml");
WorkerConfig wc = WorkerConfig.load(yamlUrl.toURI().getPath());
assertTrue((new ObjectMapper().writeValueAsString(wc)).toLowerCase(Locale.ROOT).contains("password"),
"ObjectMapper output must include passwords for proper serialization");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@

import static org.apache.pulsar.common.policies.data.PoliciesUtil.getBundles;
import static org.apache.pulsar.metadata.impl.MetadataStoreFactoryImpl.removeIdentifierFromMetadataURL;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Sets;
import io.netty.util.concurrent.DefaultThreadFactory;
Expand Down Expand Up @@ -402,12 +400,7 @@ public void start(AuthenticationService authenticationService,

workerStatsManager.startupTimeStart();
log.info("/** Starting worker id={} **/", workerConfig.getWorkerId());

try {
log.info("Worker Configs: {}", new ObjectMapper().writeValueAsString(workerConfig));
} catch (JsonProcessingException e) {
log.warn("Failed to print worker configs with error {}", e.getMessage(), e);
}
log.info("Worker Configs: {}", workerConfig);

try {
DistributedLogConfiguration dlogConf = WorkerUtils.getDlogConf(workerConfig);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.pulsar.broker.web.RateLimitingFilter;
import org.apache.pulsar.broker.web.WebExecutorThreadPool;
import org.apache.pulsar.common.util.SecurityUtility;
import org.apache.pulsar.common.util.keystoretls.KeyStoreSSLContext;
import org.apache.pulsar.functions.worker.WorkerConfig;
import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.functions.worker.rest.api.v2.WorkerApiV2Resource;
Expand Down Expand Up @@ -125,12 +126,32 @@ private void init() {

if (this.workerConfig.getTlsEnabled()) {
try {
SslContextFactory sslCtxFactory = SecurityUtility.createSslContextFactory(
this.workerConfig.isTlsAllowInsecureConnection(), this.workerConfig.getTlsTrustCertsFilePath(),
this.workerConfig.getTlsCertificateFilePath(), this.workerConfig.getTlsKeyFilePath(),
this.workerConfig.isTlsRequireTrustedClientCertOnConnect(),
true,
this.workerConfig.getTlsCertRefreshCheckDurationSec());
SslContextFactory sslCtxFactory;
if (workerConfig.isTlsEnabledWithKeyStore()) {
sslCtxFactory = KeyStoreSSLContext.createSslContextFactory(
workerConfig.getTlsProvider(),
workerConfig.getTlsKeyStoreType(),
workerConfig.getTlsKeyStore(),
workerConfig.getTlsKeyStorePassword(),
workerConfig.isTlsAllowInsecureConnection(),
workerConfig.getTlsTrustStoreType(),
workerConfig.getTlsTrustStore(),
workerConfig.getTlsTrustStorePassword(),
workerConfig.isTlsRequireTrustedClientCertOnConnect(),
workerConfig.getWebServiceTlsCiphers(),
workerConfig.getWebServiceTlsProtocols(),
workerConfig.getTlsCertRefreshCheckDurationSec()
);
} else {
sslCtxFactory = SecurityUtility.createSslContextFactory(
workerConfig.isTlsAllowInsecureConnection(),
workerConfig.getTlsTrustCertsFilePath(),
workerConfig.getTlsCertificateFilePath(),
workerConfig.getTlsKeyFilePath(),
workerConfig.isTlsRequireTrustedClientCertOnConnect(),
true,
workerConfig.getTlsCertRefreshCheckDurationSec());
}
httpsConnector = new ServerConnector(server, sslCtxFactory);
httpsConnector.setPort(this.workerConfig.getWorkerPortTls());
connectors.add(httpsConnector);
Expand Down
Loading

0 comments on commit a410396

Please sign in to comment.