Skip to content

Commit

Permalink
Ensure broker is fully boostrapped before load manager register itself (
Browse files Browse the repository at this point in the history
apache#2935)

### Motivation

In some cases the broker can immediately gets assigned traffic before it's fully boostrapped. 

This happens because the load manager is registering the broker in ZK before some of the initialization steps are completed. 

This results in NPE, like : 

```
Caused by: java.lang.NullPointerException
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.hasSchema(PersistentTopic.java:1815) ~[org.apache.pulsar-pulsar-broker-2.2.0-streamlio-22.jar:2.2.0-streamlio-22]
	at org.apache.pulsar.broker.service.ServerCnx.lambda$25(ServerCnx.java:836) ~[org.apache.pulsar-pulsar-broker-2.2.0-streamlio-22.jar:2.2.0-streamlio-22]
	at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:656) ~[?:1.8.0_181]
```

### Modifications

 * Register the broker in ZK only after the full start sequence has been done. This will ensure other brokers will not discover this broker before it's ready.
 * Expose the "is ready" state in the VipStatus -- This will be used to make sure the load balancer will not direct any lookup request to the broker before it's ready.
  • Loading branch information
merlimat authored and sijie committed Nov 20, 2018
1 parent 3008a4b commit 7efce98
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.common.configuration;

import java.io.File;
import java.util.function.Supplier;

import javax.servlet.ServletContext;
import javax.ws.rs.GET;
Expand All @@ -34,18 +35,23 @@
public class VipStatus {

public static final String ATTRIBUTE_STATUS_FILE_PATH = "statusFilePath";
public static final String ATTRIBUTE_IS_READY_PROBE = "isReadyProbe";

@Context
protected ServletContext servletContext;

@GET
@Context
public String checkStatus() {

String statusFilePath = (String) servletContext.getAttribute(ATTRIBUTE_STATUS_FILE_PATH);
@SuppressWarnings("unchecked")
Supplier<Boolean> isReadyProbe = (Supplier<Boolean>) servletContext.getAttribute(ATTRIBUTE_IS_READY_PROBE);

boolean isReady = isReadyProbe != null ? isReadyProbe.get() : true;

if (statusFilePath != null) {
File statusFile = new File(statusFilePath);
if (statusFile.exists() && statusFile.isFile()) {
if (isReady && statusFile.exists() && statusFile.isFile()) {
return "OK";
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ public enum State {
Init, Started, Closed
}

private State state;
private volatile State state;

private final ReentrantLock mutex = new ReentrantLock();
private final Condition isClosedCondition = mutex.newCondition();
Expand Down Expand Up @@ -355,8 +355,6 @@ public void start() throws PulsarServerException {
// Start load management service (even if load balancing is disabled)
this.loadManager.set(LoadManager.create(this));

this.startLoadManagementService();

// needs load management service
this.startNamespaceService();

Expand All @@ -369,6 +367,13 @@ public void start() throws PulsarServerException {
attributeMap.put(WebService.ATTRIBUTE_PULSAR_NAME, this);
Map<String, Object> vipAttributeMap = Maps.newHashMap();
vipAttributeMap.put(VipStatus.ATTRIBUTE_STATUS_FILE_PATH, this.config.getStatusFilePath());
vipAttributeMap.put(VipStatus.ATTRIBUTE_IS_READY_PROBE, new Supplier<Boolean>() {
@Override
public Boolean get() {
// Ensure the VIP status is only visible when the broker is fully initialized
return state == State.Started;
}
});
this.webService.addRestResources("/", VipStatus.class.getPackage().getName(), false, vipAttributeMap);
this.webService.addRestResources("/", "org.apache.pulsar.broker.web", false, attributeMap);
this.webService.addRestResources("/admin", "org.apache.pulsar.broker.admin.v1", true, attributeMap);
Expand Down Expand Up @@ -446,11 +451,16 @@ public synchronized void brokerIsAFollowerNow() {

leaderElectionService.start();

schemaRegistryService = SchemaRegistryService.create(this);

webService.start();

this.metricsGenerator = new MetricsGenerator(this);

schemaRegistryService = SchemaRegistryService.create(this);
// By starting the Load manager service, the broker will also become visible
// to the rest of the broker by creating the registration z-node. This needs
// to be done only when the broker is fully operative.
this.startLoadManagementService();

state = State.Started;

Expand Down Expand Up @@ -810,12 +820,12 @@ public synchronized PulsarAdmin getAdminClient() throws PulsarServerException {
.authentication( //
conf.getBrokerClientAuthenticationPlugin(), //
conf.getBrokerClientAuthenticationParameters());

if (conf.isBrokerClientTlsEnabled()) {
builder.tlsTrustCertsFilePath(conf.getBrokerClientTrustCertsFilePath());
builder.allowTlsInsecureConnection(conf.isTlsAllowInsecureConnection());
}

this.adminClient = builder.build();
LOG.info("Admin api url: " + adminApiUrl);
} catch (Exception e) {
Expand Down

0 comments on commit 7efce98

Please sign in to comment.