Skip to content

Commit

Permalink
Fix NPE: namespaceService need leaderElection service (apache#3238)
Browse files Browse the repository at this point in the history
### Motivation

namespace-service uses leaderElectionService so, leaderElectionService should start before namespace-service.

```
Caused by: java.lang.NullPointerException
        at org.apache.pulsar.broker.namespace.NamespaceService.searchForCandidateBroker(NamespaceService.java:376) ~[pulsar-broker-2.2.jar]
        ... 9 more


                at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) ~[?:1.8.0_131]
        at org.apache.pulsar.broker.namespace.NamespaceService.searchForCandidateBroker(NamespaceService.java:395) ~[pulsar-broker-2.2.jar]
        at org.apache.pulsar.broker.namespace.NamespaceService.lambda$22(NamespaceService.java:335) ~[pulsar-broker-2.2.jar]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_131]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_131]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_131]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [?:1.8.0_131]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_131]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_131]
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [pulsar-functions-metrics-2.2.2-yahoo.jar:?]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_131]
```
  • Loading branch information
rdhabalia authored and sijie committed Dec 26, 2018
1 parent 1e4f769 commit ea6f366
Showing 1 changed file with 37 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,9 @@ public void start() throws PulsarServerException {
// Start load management service (even if load balancing is disabled)
this.loadManager.set(LoadManager.create(this));

// Start the leader election service
startLeaderElectionService();

// needs load management service
this.startNamespaceService();

Expand Down Expand Up @@ -418,39 +421,6 @@ public Boolean get() {
// Register heartbeat and bootstrap namespaces.
this.nsservice.registerBootstrapNamespaces();

// Start the leader election service
this.leaderElectionService = new LeaderElectionService(this, new LeaderListener() {
@Override
public synchronized void brokerIsTheLeaderNow() {
if (getConfiguration().isLoadBalancerEnabled()) {
long loadSheddingInterval = TimeUnit.MINUTES
.toMillis(getConfiguration().getLoadBalancerSheddingIntervalMinutes());
long resourceQuotaUpdateInterval = TimeUnit.MINUTES
.toMillis(getConfiguration().getLoadBalancerResourceQuotaUpdateIntervalMinutes());

loadSheddingTask = loadManagerExecutor.scheduleAtFixedRate(new LoadSheddingTask(loadManager),
loadSheddingInterval, loadSheddingInterval, TimeUnit.MILLISECONDS);
loadResourceQuotaTask = loadManagerExecutor.scheduleAtFixedRate(
new LoadResourceQuotaUpdaterTask(loadManager), resourceQuotaUpdateInterval,
resourceQuotaUpdateInterval, TimeUnit.MILLISECONDS);
}
}

@Override
public synchronized void brokerIsAFollowerNow() {
if (loadSheddingTask != null) {
loadSheddingTask.cancel(false);
loadSheddingTask = null;
}
if (loadResourceQuotaTask != null) {
loadResourceQuotaTask.cancel(false);
loadResourceQuotaTask = null;
}
}
});

leaderElectionService.start();

schemaRegistryService = SchemaRegistryService.create(this);

webService.start();
Expand Down Expand Up @@ -480,6 +450,40 @@ public synchronized void brokerIsAFollowerNow() {
}
}

private void startLeaderElectionService() {
this.leaderElectionService = new LeaderElectionService(this, new LeaderListener() {
@Override
public synchronized void brokerIsTheLeaderNow() {
if (getConfiguration().isLoadBalancerEnabled()) {
long loadSheddingInterval = TimeUnit.MINUTES
.toMillis(getConfiguration().getLoadBalancerSheddingIntervalMinutes());
long resourceQuotaUpdateInterval = TimeUnit.MINUTES
.toMillis(getConfiguration().getLoadBalancerResourceQuotaUpdateIntervalMinutes());

loadSheddingTask = loadManagerExecutor.scheduleAtFixedRate(new LoadSheddingTask(loadManager),
loadSheddingInterval, loadSheddingInterval, TimeUnit.MILLISECONDS);
loadResourceQuotaTask = loadManagerExecutor.scheduleAtFixedRate(
new LoadResourceQuotaUpdaterTask(loadManager), resourceQuotaUpdateInterval,
resourceQuotaUpdateInterval, TimeUnit.MILLISECONDS);
}
}

@Override
public synchronized void brokerIsAFollowerNow() {
if (loadSheddingTask != null) {
loadSheddingTask.cancel(false);
loadSheddingTask = null;
}
if (loadResourceQuotaTask != null) {
loadResourceQuotaTask.cancel(false);
loadResourceQuotaTask = null;
}
}
});

leaderElectionService.start();
}

private void acquireSLANamespace() {
try {
// Namespace not created hence no need to unload it
Expand Down

0 comments on commit ea6f366

Please sign in to comment.