Skip to content

Commit

Permalink
Start Pulsar in TLS Only mode and deprecate tlsEnabled flag. (apache#…
Browse files Browse the repository at this point in the history
…2988)

### Motivation
Start Pulsar services (broker, proxy, websocket, discovery) in TLS only mode, so that they only listen on TLS ports.

Once TlsPort is set tlsEnabled flag becomes redundant information - hence getting rid of the flag in relevant components.

### Modifications

a. Changed the Ports to Option<Integer> in the configuration file.
b. In Websocket Service there was a bug where we used 'tlsEnabled' flag to start listening on a TLS port and to talk to broker in on serviceUrlTls - separated the flag into two (tlsEnabled and brokerClientTlsEnabled) and deprecated tlsEnabled.
c. Fixed a lot of tests which relied on tlsEnabled flag.

### Result
Brokers can now listen to TLS only port.
  • Loading branch information
Jai Asher authored and sijie committed Dec 16, 2018
1 parent 576609b commit 18b2a20
Show file tree
Hide file tree
Showing 76 changed files with 325 additions and 271 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ pulsar-broker/src/test/resources/pulsar-functions-api-examples.jar
.project
.settings/
.recommenders/
.factorypath

# Intellij
.idea/
Expand Down
10 changes: 5 additions & 5 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,14 @@ configurationStoreServers=
# Broker data port
brokerServicePort=6650

# Broker data port for TLS
brokerServicePortTls=6651
# Broker data port for TLS - By default TLS is disabled
brokerServicePortTls=

# Port to use to server HTTP request
webServicePort=8080

# Port to use to server HTTPS request
webServicePortTls=8443
# Port to use to server HTTPS request - By default TLS is disabled
webServicePortTls=

# Hostname or IP address the service binds on, default is 0.0.0.0.
bindAddress=0.0.0.0
Expand Down Expand Up @@ -231,7 +231,7 @@ proxyRoles=
# else it just accepts the originalPrincipal and authorizes it (if required).
authenticateOriginalAuthData=false

# Enable TLS
# Deprecated - Use webServicePortTls and brokerServicePortTls instead
tlsEnabled=false

# Path for the TLS certificate file
Expand Down
6 changes: 3 additions & 3 deletions conf/discovery.conf
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,13 @@ zookeeperSessionTimeoutMs=30000
servicePort=6650

# Port to use to server binary-proto-tls request
servicePortTls=6651
servicePortTls=

# Port that discovery service listen on
webServicePort=8080

# Port to use to server HTTPS request
webServicePortTls=8443
webServicePortTls=

# Control whether to bind directly on localhost rather than on normal hostname
bindOnLocalhost=false
Expand Down Expand Up @@ -65,7 +65,7 @@ superUserRoles=
authorizationAllowWildcardsMatching=false

##### --- TLS --- #####
# Enable TLS
# Deprecated - Use servicePortTls and webServicePortTls instead
tlsEnabled=false

# Path for the TLS certificate file
Expand Down
6 changes: 3 additions & 3 deletions conf/proxy.conf
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,13 @@ zookeeperSessionTimeoutMs=30000
servicePort=6650

# The port to use to server binary Protobuf TLS requests
servicePortTls=6651
servicePortTls=

# Port that discovery service listen on
webServicePort=8080

# Port to use to server HTTPS request
webServicePortTls=8443
webServicePortTls=

# Path for the file used to determine the rotation status for the proxy instance when responding
# to service discovery health checks
Expand Down Expand Up @@ -110,7 +110,7 @@ maxConcurrentLookupRequests=50000

##### --- TLS --- #####

# Whether TLS is enabled for the proxy
# Deprecated - use servicePortTls and webServicePortTls instead
tlsEnabledInProxy=false

# Path for the TLS certificate file
Expand Down
5 changes: 3 additions & 2 deletions conf/websocket.conf
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ brokerServiceUrlTls=
# Port to use to server HTTP request
webServicePort=8080
# Port to use to server HTTPS request
webServicePortTls=8443
webServicePortTls=

# Path for the file used to determine the rotation status for the proxy-instance when responding
# to service discovery health checks
Expand Down Expand Up @@ -79,6 +79,7 @@ authorizationAllowWildcardsMatching=false
superUserRoles=

# Authentication settings of the proxy itself. Used to connect to brokers
brokerClientTlsEnabled=false;
brokerClientAuthenticationPlugin=
brokerClientAuthenticationParameters=
brokerClientTrustCertsFilePath=
Expand All @@ -88,7 +89,7 @@ anonymousUserRole=

### --- TLS --- ###

# Enable TLS
# Deprecated - use webServicePortTls and brokerClientTlsEnabled instead
tlsEnabled=false

# Accept untrusted TLS certificate from client
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,12 @@ public class ServiceConfiguration implements PulsarConfiguration {
// Configuration Store connection string
@FieldContext(required = false)
private String configurationStoreServers;
private int brokerServicePort = 6650;
private int brokerServicePortTls = 6651;
private Integer brokerServicePort = 6650;
private Integer brokerServicePortTls = null;
// Port to use to server HTTP request
private int webServicePort = 8080;
private Integer webServicePort = 8080;
// Port to use to server HTTPS request
private int webServicePortTls = 8443;
private Integer webServicePortTls = null;

// Hostname or IP address the service binds on.
private String bindAddress = "0.0.0.0";
Expand Down Expand Up @@ -228,7 +228,7 @@ public class ServiceConfiguration implements PulsarConfiguration {
private int maxConsumersPerSubscription = 0;

/***** --- TLS --- ****/
// Enable TLS
@Deprecated
private boolean tlsEnabled = false;
// Path for the TLS certificate file
private String tlsCertificateFilePath;
Expand Down Expand Up @@ -589,5 +589,20 @@ public Optional<Double> getLoadBalancerOverrideBrokerNicSpeedGbps() {
public int getBookkeeperHealthCheckIntervalSec() {
return (int) bookkeeperClientHealthCheckIntervalSeconds;
}

public Optional<Integer> getBrokerServicePort() {
return Optional.ofNullable(brokerServicePort);
}

public Optional<Integer> getBrokerServicePortTls() {
return Optional.ofNullable(brokerServicePortTls);
}

public Optional<Integer> getWebServicePort() {
return Optional.ofNullable(webServicePort);
}

public Optional<Integer> getWebServicePortTls() {
return Optional.ofNullable(webServicePortTls);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,10 @@ public void testConfigurationConverting() throws Exception {
// check whether converting correctly
assertEquals(serviceConfiguration.getZookeeperServers(), "localhost:2181");
assertEquals(serviceConfiguration.getConfigurationStoreServers(), "localhost:2184");
assertEquals(serviceConfiguration.getBrokerServicePort(), 7650);
assertEquals(serviceConfiguration.getBrokerServicePortTls(), 7651);
assertEquals(serviceConfiguration.getWebServicePort(), 9080);
assertEquals(serviceConfiguration.getWebServicePortTls(), 9443);
assertEquals(serviceConfiguration.getBrokerServicePort().get(), new Integer(7650));
assertEquals(serviceConfiguration.getBrokerServicePortTls().get(), new Integer(7651));
assertEquals(serviceConfiguration.getWebServicePort().get(), new Integer(9080));
assertEquals(serviceConfiguration.getWebServicePortTls().get(), new Integer(9443));

// check whether exception causes
try {
Expand Down Expand Up @@ -112,7 +112,7 @@ public void testPulsarConfiguraitonLoadingStream() throws Exception {
assertEquals(serviceConfig.getBacklogQuotaDefaultLimitGB(), 18);
assertEquals(serviceConfig.getClusterName(), "usc");
assertEquals(serviceConfig.getBrokerClientAuthenticationParameters(), "role:my-role");
assertEquals(serviceConfig.getBrokerServicePort(), 7777);
assertEquals(serviceConfig.getBrokerServicePort().get(), new Integer(7777));
assertEquals(serviceConfig.getManagedLedgerDigestType(), DigestType.CRC32C);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,17 +147,17 @@ private static class BrokerStarter {
boolean useTls = workerConfig.isUseTls();
String localhost = "127.0.0.1";
String pulsarServiceUrl = useTls
? PulsarService.brokerUrlTls(localhost, brokerConfig.getBrokerServicePortTls())
: PulsarService.brokerUrl(localhost, brokerConfig.getBrokerServicePort());
? PulsarService.brokerUrlTls(localhost, brokerConfig.getBrokerServicePortTls().get())
: PulsarService.brokerUrl(localhost, brokerConfig.getBrokerServicePort().get());
String webServiceUrl = useTls
? PulsarService.webAddressTls(localhost, brokerConfig.getWebServicePortTls())
: PulsarService.webAddress(localhost, brokerConfig.getWebServicePort());
? PulsarService.webAddressTls(localhost, brokerConfig.getWebServicePortTls().get())
: PulsarService.webAddress(localhost, brokerConfig.getWebServicePort().get());
workerConfig.setPulsarServiceUrl(pulsarServiceUrl);
workerConfig.setPulsarWebServiceUrl(webServiceUrl);
String hostname = ServiceConfigurationUtils.getDefaultOrConfiguredAddress(
brokerConfig.getAdvertisedAddress());
workerConfig.setWorkerHostname(hostname);
workerConfig.setWorkerPort(brokerConfig.getWebServicePort());
workerConfig.setWorkerPort(brokerConfig.getWebServicePort().get());
workerConfig.setWorkerId(
"c-" + brokerConfig.getClusterName()
+ "-fw-" + hostname
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,16 +272,16 @@ void start() throws Exception {
workerConfig = WorkerConfig.load(this.getFnWorkerConfigFile());
}
// worker talks to local broker
workerConfig.setPulsarServiceUrl("pulsar://127.0.0.1:" + config.getBrokerServicePort());
workerConfig.setPulsarWebServiceUrl("http://127.0.0.1:" + config.getWebServicePort());
workerConfig.setPulsarServiceUrl("pulsar://127.0.0.1:" + config.getBrokerServicePort().get());
workerConfig.setPulsarWebServiceUrl("http://127.0.0.1:" + config.getWebServicePort().get());
if (!this.isNoStreamStorage()) {
// only set the state storage service url when state is enabled.
workerConfig.setStateStorageServiceUrl("bk://127.0.0.1:" + this.getStreamStoragePort());
}
String hostname = ServiceConfigurationUtils.getDefaultOrConfiguredAddress(
config.getAdvertisedAddress());
workerConfig.setWorkerHostname(hostname);
workerConfig.setWorkerPort(config.getWebServicePort());
workerConfig.setWorkerPort(config.getWebServicePort().get());
workerConfig.setWorkerId(
"c-" + config.getClusterName()
+ "-fw-" + hostname
Expand All @@ -294,9 +294,9 @@ void start() throws Exception {
broker.start();

URL webServiceUrl = new URL(
String.format("http://%s:%d", config.getAdvertisedAddress(), config.getWebServicePort()));
String.format("http://%s:%d", config.getAdvertisedAddress(), config.getWebServicePort().get()));
final String brokerServiceUrl = String.format("pulsar://%s:%d", config.getAdvertisedAddress(),
config.getBrokerServicePort());
config.getBrokerServicePort().get());
admin = PulsarAdmin.builder().serviceHttpUrl(webServiceUrl.toString()).authentication(
config.getBrokerClientAuthenticationPlugin(), config.getBrokerClientAuthenticationParameters()).build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public MessagingServiceShutdownHook(PulsarService service) {
public void run() {
if (service.getConfiguration() != null) {
LOG.info("messaging service shutdown hook started, lookup port="
+ service.getConfiguration().getWebServicePort() + ", broker url=" + service.getBrokerServiceUrl());
+ service.getConfiguration().getWebServicePort().get() + ", broker url=" + service.getBrokerServiceUrl());
}

ExecutorService executor = Executors.newSingleThreadExecutor(new DefaultThreadFactory("shutdown-thread"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -470,7 +470,7 @@ public synchronized void brokerIsAFollowerNow() {
this.startWorkerService();

LOG.info("messaging service is ready, bootstrap service on port={}, broker url={}, cluster={}, configs={}",
config.getWebServicePort(), brokerServiceUrl, config.getClusterName(),
config.getWebServicePort().get(), brokerServiceUrl, config.getClusterName(),
ReflectionToStringBuilder.toString(config));
} catch (Exception e) {
LOG.error(e.getMessage(), e);
Expand Down Expand Up @@ -855,18 +855,22 @@ public static String advertisedAddress(ServiceConfiguration config) {
}

public static String brokerUrl(ServiceConfiguration config) {
return brokerUrl(advertisedAddress(config), config.getBrokerServicePort());
if (config.getBrokerServicePort().isPresent()) {
return brokerUrl(advertisedAddress(config), config.getBrokerServicePort().get());
} else {
return null;
}
}

public static String brokerUrl(String host, int port) {
return String.format("pulsar://%s:%d", host, port);
}

public static String brokerUrlTls(ServiceConfiguration config) {
if (config.isTlsEnabled()) {
return brokerUrlTls(advertisedAddress(config), config.getBrokerServicePortTls());
if (config.getBrokerServicePortTls().isPresent()) {
return brokerUrlTls(advertisedAddress(config), config.getBrokerServicePortTls().get());
} else {
return "";
return null;
}
}

Expand All @@ -875,18 +879,22 @@ public static String brokerUrlTls(String host, int port) {
}

public static String webAddress(ServiceConfiguration config) {
return webAddress(advertisedAddress(config), config.getWebServicePort());
if (config.getWebServicePort().isPresent()) {
return webAddress(advertisedAddress(config), config.getWebServicePort().get());
} else {
return null;
}
}

public static String webAddress(String host, int port) {
return String.format("http://%s:%d", host, port);
}

public static String webAddressTls(ServiceConfiguration config) {
if (config.isTlsEnabled()) {
return webAddressTls(advertisedAddress(config), config.getWebServicePortTls());
if (config.getWebServicePortTls().isPresent()) {
return webAddressTls(advertisedAddress(config), config.getWebServicePortTls().get());
} else {
return "";
return null;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,8 +293,8 @@ protected void validateTopicName(String property, String cluster, String namespa
protected void validateBrokerName(String broker) throws MalformedURLException {
String brokerUrl = String.format("http://%s", broker);
String brokerUrlTls = String.format("https://%s", broker);
if (!pulsar().getWebServiceAddress().equals(brokerUrl)
&& !pulsar().getWebServiceAddressTls().equals(brokerUrlTls)) {
if (!brokerUrl.equals(pulsar().getWebServiceAddress())
&& !brokerUrlTls.equals(pulsar().getWebServiceAddressTls())) {
String[] parts = broker.split(":");
checkArgument(parts.length == 2, "Invalid broker url %s", broker);
String host = parts[0];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public class NoopLoadManager implements LoadManager {

@Override
public void initialize(PulsarService pulsar) {
lookupServiceAddress = pulsar.getAdvertisedAddress() + ":" + pulsar.getConfiguration().getWebServicePort();
lookupServiceAddress = pulsar.getAdvertisedAddress() + ":" + pulsar.getConfiguration().getWebServicePort().get();
localResourceUnit = new SimpleResourceUnit(String.format("http://%s", lookupServiceAddress),
new PulsarResourceDescription());
zkClient = pulsar.getZkClient();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -783,7 +783,7 @@ public void start() throws PulsarServerException {
// Register the brokers in zk list
createZPathIfNotExists(zkClient, LoadManager.LOADBALANCE_BROKERS_ROOT);

String lookupServiceAddress = pulsar.getAdvertisedAddress() + ":" + conf.getWebServicePort();
String lookupServiceAddress = pulsar.getAdvertisedAddress() + ":" + conf.getWebServicePort().get();
brokerZnodePath = LoadManager.LOADBALANCE_BROKERS_ROOT + "/" + lookupServiceAddress;
final String timeAverageZPath = TIME_AVERAGE_BROKER_ZPATH + "/" + lookupServiceAddress;
updateLocalBrokerData();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ public void start() throws PulsarServerException {
// ignore the exception, node might be present already
}
}
String lookupServiceAddress = pulsar.getAdvertisedAddress() + ":" + conf.getWebServicePort();
String lookupServiceAddress = pulsar.getAdvertisedAddress() + ":" + conf.getWebServicePort().get();
brokerZnodePath = LOADBALANCE_BROKERS_ROOT + "/" + lookupServiceAddress;
LoadReport loadReport = null;
try {
Expand Down Expand Up @@ -1113,7 +1113,7 @@ private LoadReport generateLoadReportForcefully() throws Exception {
loadReport.setNonPersistentTopicsEnabled(pulsar.getConfiguration().isEnableNonPersistentTopics());
loadReport.setPersistentTopicsEnabled(pulsar.getConfiguration().isEnablePersistentTopics());
loadReport.setName(String.format("%s:%s", pulsar.getAdvertisedAddress(),
pulsar.getConfiguration().getWebServicePort()));
pulsar.getConfiguration().getWebServicePort().get()));
loadReport.setBrokerVersionString(pulsar.getBrokerVersion());

SystemResourceUsage systemResourceUsage = this.getSystemResourceUsage();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,7 @@ private boolean isBrokerActive(String candidateBroker) throws KeeperException, I

if (LOG.isDebugEnabled()) {
LOG.debug("Broker not found for SLA Monitoring Namespace {}",
candidateBroker + ":" + config.getWebServicePort());
candidateBroker + ":" + config.getWebServicePort().get());
}
return false;
}
Expand Down Expand Up @@ -972,11 +972,22 @@ public void unloadSLANamespace() throws Exception {
}

public static String getHeartbeatNamespace(String host, ServiceConfiguration config) {
return String.format(HEARTBEAT_NAMESPACE_FMT, config.getClusterName(), host, config.getWebServicePort());
Integer port = null;
if (config.getWebServicePort().isPresent()) {
port = config.getWebServicePort().get();
} else if (config.getWebServicePortTls().isPresent()) {
port = config.getWebServicePortTls().get();
}
return String.format(HEARTBEAT_NAMESPACE_FMT, config.getClusterName(), host, port);
}

public static String getSLAMonitorNamespace(String host, ServiceConfiguration config) {
return String.format(SLA_NAMESPACE_FMT, config.getClusterName(), host, config.getWebServicePort());
public static String getSLAMonitorNamespace(String host, ServiceConfiguration config) {
Integer port = null;
if (config.getWebServicePort().isPresent()) {
port = config.getWebServicePort().get();
} else if (config.getWebServicePortTls().isPresent()) {
port = config.getWebServicePortTls().get();
}
return String.format(SLA_NAMESPACE_FMT, config.getClusterName(), host, port);
}

public static String checkHeartbeatNamespace(ServiceUnitId ns) {
Expand Down
Loading

0 comments on commit 18b2a20

Please sign in to comment.