Skip to content

Commit

Permalink
Allow to automatically assign TCP ports when starting a broker (apach…
Browse files Browse the repository at this point in the history
…e#3555)

* Allow to automatically assign TCP ports when starting a broker

* Do not change the passed config, rather expose the effective port to PulsarService

* Fixed accessor name

* Also added same change for websocket and proxy

* Removed mock line in test

* Removed mistakenly duplicated bind line
  • Loading branch information
merlimat authored Oct 28, 2019
1 parent 0984f42 commit df19f26
Show file tree
Hide file tree
Showing 16 changed files with 354 additions and 156 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -161,15 +161,6 @@ private static class BrokerStarter {
workerConfig = WorkerConfig.load(starterArguments.fnWorkerConfigFile);
}
// worker talks to local broker
boolean useTls = workerConfig.isUseTls();
String pulsarServiceUrl = useTls
? PulsarService.brokerUrlTls(brokerConfig)
: PulsarService.brokerUrl(brokerConfig);
String webServiceUrl = useTls
? PulsarService.webAddressTls(brokerConfig)
: PulsarService.webAddress(brokerConfig);
workerConfig.setPulsarServiceUrl(pulsarServiceUrl);
workerConfig.setPulsarWebServiceUrl(webServiceUrl);
String hostname = ServiceConfigurationUtils.getDefaultOrConfiguredAddress(
brokerConfig.getAdvertisedAddress());
workerConfig.setWorkerHostname(hostname);
Expand All @@ -188,7 +179,6 @@ private static class BrokerStarter {
workerConfig.setZooKeeperSessionTimeoutMillis(brokerConfig.getZooKeeperSessionTimeoutMillis());
workerConfig.setZooKeeperOperationTimeoutSeconds(brokerConfig.getZooKeeperOperationTimeoutSeconds());

workerConfig.setUseTls(useTls);
workerConfig.setTlsHostnameVerificationEnable(false);

workerConfig.setTlsAllowInsecureConnection(brokerConfig.isTlsAllowInsecureConnection());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,15 +272,6 @@ public void start() throws Exception {
workerConfig = WorkerConfig.load(this.getFnWorkerConfigFile());
}
// worker talks to local broker
boolean useTls = workerConfig.isUseTls();
String pulsarServiceUrl = useTls
? PulsarService.brokerUrlTls(config)
: PulsarService.brokerUrl(config);
String webServiceUrl = useTls
? PulsarService.webAddressTls(config)
: PulsarService.webAddress(config);
workerConfig.setPulsarServiceUrl(pulsarServiceUrl);
workerConfig.setPulsarWebServiceUrl(webServiceUrl);
if (this.isNoStreamStorage()) {
// only set the state storage service url when state is enabled.
workerConfig.setStateStorageServiceUrl(null);
Expand All @@ -306,7 +297,6 @@ public void start() throws Exception {
workerConfig.setZooKeeperSessionTimeoutMillis(config.getZooKeeperSessionTimeoutMillis());
workerConfig.setZooKeeperOperationTimeoutSeconds(config.getZooKeeperOperationTimeoutSeconds());

workerConfig.setUseTls(useTls);
workerConfig.setTlsHostnameVerificationEnable(false);

workerConfig.setTlsAllowInsecureConnection(config.isTlsAllowInsecureConnection());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,10 +170,10 @@ public class PulsarService implements AutoCloseable {
private ZooKeeperClientFactory zkClientFactory = null;
private final String bindAddress;
private final String advertisedAddress;
private final String webServiceAddress;
private final String webServiceAddressTls;
private final String brokerServiceUrl;
private final String brokerServiceUrlTls;
private String webServiceAddress;
private String webServiceAddressTls;
private String brokerServiceUrl;
private String brokerServiceUrlTls;
private final String brokerVersion;
private SchemaRegistryService schemaRegistryService = null;
private final Optional<WorkerService> functionWorkerService;
Expand Down Expand Up @@ -203,10 +203,6 @@ public PulsarService(ServiceConfiguration config, Optional<WorkerService> functi
state = State.Init;
this.bindAddress = ServiceConfigurationUtils.getDefaultOrConfiguredAddress(config.getBindAddress());
this.advertisedAddress = advertisedAddress(config);
this.webServiceAddress = webAddress(config);
this.webServiceAddressTls = webAddressTls(config);
this.brokerServiceUrl = brokerUrl(config);
this.brokerServiceUrlTls = brokerUrlTls(config);
this.brokerVersion = PulsarVersion.getVersion();
this.config = config;
this.shutdownService = new MessagingServiceShutdownHook(this);
Expand Down Expand Up @@ -399,12 +395,6 @@ public void start() throws PulsarServerException {
// Start load management service (even if load balancing is disabled)
this.loadManager.set(LoadManager.create(this));

// Start the leader election service
startLeaderElectionService();

// needs load management service
this.startNamespaceService();

this.offloader = createManagedLedgerOffloader(this.getConfiguration());

brokerService.start();
Expand Down Expand Up @@ -463,13 +453,25 @@ public Boolean get() {
}
this.webService.addStaticResources("/static", "/static");

// Register heartbeat and bootstrap namespaces.
this.nsService.registerBootstrapNamespaces();

schemaRegistryService = SchemaRegistryService.create(this);

webService.start();

// Refresh addresses, since the port might have been dynamically assigned
this.webServiceAddress = webAddress(config);
this.webServiceAddressTls = webAddressTls(config);
this.brokerServiceUrl = brokerUrl(config);
this.brokerServiceUrlTls = brokerUrlTls(config);

// needs load management service
this.startNamespaceService();

// Start the leader election service
startLeaderElectionService();

// Register heartbeat and bootstrap namespaces.
this.nsService.registerBootstrapNamespaces();

this.metricsGenerator = new MetricsGenerator(this);

// By starting the Load manager service, the broker will also become visible
Expand Down Expand Up @@ -880,7 +882,7 @@ public synchronized PulsarAdmin getAdminClient() throws PulsarServerException {
if (this.adminClient == null) {
try {
ServiceConfiguration conf = this.getConfiguration();
String adminApiUrl = conf.isBrokerClientTlsEnabled() ? webAddressTls(config) : webAddress(config);
String adminApiUrl = conf.isBrokerClientTlsEnabled() ? webServiceAddressTls : webServiceAddress;
PulsarAdminBuilder builder = PulsarAdmin.builder().serviceHttpUrl(adminApiUrl) //
.authentication( //
conf.getBrokerClientAuthenticationPlugin(), //
Expand Down Expand Up @@ -926,9 +928,9 @@ public static String advertisedAddress(ServiceConfiguration config) {
return ServiceConfigurationUtils.getDefaultOrConfiguredAddress(config.getAdvertisedAddress());
}

public static String brokerUrl(ServiceConfiguration config) {
private String brokerUrl(ServiceConfiguration config) {
if (config.getBrokerServicePort().isPresent()) {
return brokerUrl(advertisedAddress(config), config.getBrokerServicePort().get());
return brokerUrl(advertisedAddress(config), getBrokerListenPort().get());
} else {
return null;
}
Expand All @@ -938,9 +940,9 @@ public static String brokerUrl(String host, int port) {
return String.format("pulsar://%s:%d", host, port);
}

public static String brokerUrlTls(ServiceConfiguration config) {
public String brokerUrlTls(ServiceConfiguration config) {
if (config.getBrokerServicePortTls().isPresent()) {
return brokerUrlTls(advertisedAddress(config), config.getBrokerServicePortTls().get());
return brokerUrlTls(advertisedAddress(config), getBrokerListenPortTls().get());
} else {
return null;
}
Expand All @@ -950,9 +952,9 @@ public static String brokerUrlTls(String host, int port) {
return String.format("pulsar+ssl://%s:%d", host, port);
}

public static String webAddress(ServiceConfiguration config) {
public String webAddress(ServiceConfiguration config) {
if (config.getWebServicePort().isPresent()) {
return webAddress(advertisedAddress(config), config.getWebServicePort().get());
return webAddress(advertisedAddress(config), getListenPortHTTP().get());
} else {
return null;
}
Expand All @@ -962,9 +964,9 @@ public static String webAddress(String host, int port) {
return String.format("http://%s:%d", host, port);
}

public static String webAddressTls(ServiceConfiguration config) {
public String webAddressTls(ServiceConfiguration config) {
if (config.getWebServicePortTls().isPresent()) {
return webAddressTls(advertisedAddress(config), config.getWebServicePortTls().get());
return webAddressTls(advertisedAddress(config), getListenPortHTTPS().get());
} else {
return null;
}
Expand All @@ -987,6 +989,15 @@ private void startWorkerService(AuthenticationService authenticationService,
throws InterruptedException, IOException, KeeperException {
if (functionWorkerService.isPresent()) {
LOG.info("Starting function worker service");

WorkerConfig workerConfig = functionWorkerService.get().getWorkerConfig();
if (workerConfig.isUseTls()) {
workerConfig.setPulsarServiceUrl(brokerServiceUrlTls);
workerConfig.setPulsarWebServiceUrl(webServiceAddressTls);
} else {
workerConfig.setPulsarServiceUrl(brokerServiceUrl);
workerConfig.setPulsarWebServiceUrl(webServiceAddress);
}
String namespace = functionWorkerService.get()
.getWorkerConfig().getPulsarFunctionsNamespace();
String[] a = functionWorkerService.get().getWorkerConfig().getPulsarFunctionsNamespace().split("/");
Expand Down Expand Up @@ -1084,4 +1095,20 @@ private void startWorkerService(AuthenticationService authenticationService,
LOG.info("Function worker service started");
}
}

public Optional<Integer> getListenPortHTTP() {
return webService.getListenPortHTTP();
}

public Optional<Integer> getListenPortHTTPS() {
return webService.getListenPortHTTPS();
}

public Optional<Integer> getBrokerListenPort() {
return brokerService.getListenPort();
}

public Optional<Integer> getBrokerListenPortTls() {
return brokerService.getListenPortTls();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@

public class NoopLoadManager implements LoadManager {

private PulsarService pulsar;
private String lookupServiceAddress;
private ResourceUnit localResourceUnit;
private ZooKeeper zkClient;
Expand All @@ -54,6 +55,11 @@ public class NoopLoadManager implements LoadManager {

@Override
public void initialize(PulsarService pulsar) {
this.pulsar = pulsar;
}

@Override
public void start() throws PulsarServerException {
lookupServiceAddress = pulsar.getAdvertisedAddress() + ":" + pulsar.getConfiguration().getWebServicePort().get();
localResourceUnit = new SimpleResourceUnit(String.format("http://%s", lookupServiceAddress),
new PulsarResourceDescription());
Expand All @@ -62,10 +68,6 @@ public void initialize(PulsarService pulsar) {
localData = new LocalBrokerData(pulsar.getSafeWebServiceAddress(), pulsar.getWebServiceAddressTls(),
pulsar.getSafeBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls());
localData.setProtocols(pulsar.getProtocolDataToAdvertise());
}

@Override
public void start() throws PulsarServerException {
String brokerZnodePath = LoadManager.LOADBALANCE_BROKERS_ROOT + "/" + lookupServiceAddress;

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,23 +265,6 @@ public LocalBrokerData deserialize(String key, byte[] content) throws Exception
defaultStats.msgRateIn = DEFAULT_MESSAGE_RATE;
defaultStats.msgRateOut = DEFAULT_MESSAGE_RATE;

Map<String, String> protocolData = pulsar.getProtocolDataToAdvertise();

lastData = new LocalBrokerData(pulsar.getSafeWebServiceAddress(), pulsar.getWebServiceAddressTls(),
pulsar.getSafeBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls());
lastData.setProtocols(protocolData);
// configure broker-topic mode
lastData.setPersistentTopicsEnabled(pulsar.getConfiguration().isEnablePersistentTopics());
lastData.setNonPersistentTopicsEnabled(pulsar.getConfiguration().isEnableNonPersistentTopics());

localData = new LocalBrokerData(pulsar.getSafeWebServiceAddress(), pulsar.getWebServiceAddressTls(),
pulsar.getSafeBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls());
localData.setProtocols(protocolData);
localData.setBrokerVersionString(pulsar.getBrokerVersion());
// configure broker-topic mode
localData.setPersistentTopicsEnabled(pulsar.getConfiguration().isEnablePersistentTopics());
localData.setNonPersistentTopicsEnabled(pulsar.getConfiguration().isEnableNonPersistentTopics());

placementStrategy = ModularLoadManagerStrategy.create(conf);
policies = new SimpleResourceAllocationPolicies(pulsar);
zkClient = pulsar.getZkClient();
Expand Down Expand Up @@ -791,6 +774,24 @@ public Optional<String> selectBrokerForAssignment(final ServiceUnitId serviceUni
@Override
public void start() throws PulsarServerException {
try {
// At this point, the ports will be updated with the real port number that the server was assigned
Map<String, String> protocolData = pulsar.getProtocolDataToAdvertise();

lastData = new LocalBrokerData(pulsar.getSafeWebServiceAddress(), pulsar.getWebServiceAddressTls(),
pulsar.getSafeBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls());
lastData.setProtocols(protocolData);
// configure broker-topic mode
lastData.setPersistentTopicsEnabled(pulsar.getConfiguration().isEnablePersistentTopics());
lastData.setNonPersistentTopicsEnabled(pulsar.getConfiguration().isEnableNonPersistentTopics());

localData = new LocalBrokerData(pulsar.getSafeWebServiceAddress(), pulsar.getWebServiceAddressTls(),
pulsar.getSafeBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls());
localData.setProtocols(protocolData);
localData.setBrokerVersionString(pulsar.getBrokerVersion());
// configure broker-topic mode
localData.setPersistentTopicsEnabled(pulsar.getConfiguration().isEnablePersistentTopics());
localData.setNonPersistentTopicsEnabled(pulsar.getConfiguration().isEnableNonPersistentTopics());

// Register the brokers in zk list
createZPathIfNotExists(zkClient, LoadManager.LOADBALANCE_BROKERS_ROOT);

Expand Down Expand Up @@ -975,7 +976,7 @@ private void refreshBrokerToFailureDomainMap() {
log.warn("Failed to get domain-list for cluster {}", e.getMessage());
}
}

@Override
public LocalBrokerData getBrokerLocalData(String broker) {
String key = String.format("%s/%s", LoadManager.LOADBALANCE_BROKERS_ROOT, broker);
Expand Down
Loading

0 comments on commit df19f26

Please sign in to comment.