Skip to content

Commit

Permalink
When the loadmanager leader is not available, fall through regular le…
Browse files Browse the repository at this point in the history
…ast loaded selection (apache#3688)

* When the loadmanager leader is not available, fall through regular least loaded selection

* Handle exceptions coming from mock zk in tests
  • Loading branch information
merlimat authored Mar 1, 2019
1 parent 1f376e1 commit ccfb949
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.pulsar.broker.PulsarService;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.KeeperException.NodeExistsException;
import org.apache.zookeeper.WatchedEvent;
Expand Down Expand Up @@ -179,6 +180,22 @@ public void stop() {
if (stopped) {
return;
}

if (isLeader()) {
// Make sure to remove the leader election z-node in case the session doesn't
// get closed properly. This avoids having to wait for the session timeout
// to expire before a new leader can be elected.
// This delete operation is safe to do here (with version=-1) because either:
// 1. The ZK session is still valid, in which case this broker is still
// the "leader" and we have to remove the z-node
// 2. The session has already expired, in which case this delete operation
// will not go through
try {
pulsar.getLocalZkCache().getZooKeeper().delete(ELECTION_ROOT, -1);
} catch (Throwable t) {
log.warn("Failed to cleanup election root znode: {}", t);
}
}
stopped = true;
log.info("LeaderElectionService stopped");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,12 @@ private void searchForCandidateBroker(NamespaceBundle bundle,
}

if (candidateBroker == null) {
if (!this.loadManager.get().isCentralized() || pulsar.getLeaderElectionService().isLeader()) {
if (!this.loadManager.get().isCentralized()
|| pulsar.getLeaderElectionService().isLeader()

// If the leader is not active, fall back to picking the least loaded broker from the current broker's load manager
|| !isBrokerActive(pulsar.getLeaderElectionService().getCurrentLeader().getServiceUrl())
) {
Optional<String> availableBroker = getLeastLoadedFromLoadManager(bundle);
if (!availableBroker.isPresent()) {
lookupFuture.complete(Optional.empty());
Expand Down Expand Up @@ -977,7 +982,7 @@ public static String getHeartbeatNamespace(String host, ServiceConfiguration con
port = config.getWebServicePort().get();
} else if (config.getWebServicePortTls().isPresent()) {
port = config.getWebServicePortTls().get();
}
}
return String.format(HEARTBEAT_NAMESPACE_FMT, config.getClusterName(), host, port);
}
public static String getSLAMonitorNamespace(String host, ServiceConfiguration config) {
Expand All @@ -986,7 +991,7 @@ public static String getSLAMonitorNamespace(String host, ServiceConfiguration co
port = config.getWebServicePort().get();
} else if (config.getWebServicePortTls().isPresent()) {
port = config.getWebServicePortTls().get();
}
}
return String.format(SLA_NAMESPACE_FMT, config.getClusterName(), host, port);
}

Expand Down

0 comments on commit ccfb949

Please sign in to comment.