Skip to content

Commit

Permalink
PIP-45: Implement load-manager leader election using new Coordination…
Browse files Browse the repository at this point in the history
…Service interface (apache#9240)

* PIP-45: Implement load-manager leader election using new CoordinationService interface

* Fixed checkstyle issue

* Allow to mock the metadata instance creation

* More test fixes

* Fixed style

* Test fixes

* Cancel task if there's one already there.

* Test fixes
  • Loading branch information
merlimat authored Jan 22, 2021
1 parent 1e4c3ec commit 2fd0ce0
Show file tree
Hide file tree
Showing 14 changed files with 148 additions and 321 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
Expand Down Expand Up @@ -74,7 +75,6 @@
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
import org.apache.pulsar.broker.intercept.BrokerInterceptors;
import org.apache.pulsar.broker.loadbalance.LeaderElectionService;
import org.apache.pulsar.broker.loadbalance.LeaderElectionService.LeaderListener;
import org.apache.pulsar.broker.loadbalance.LoadManager;
import org.apache.pulsar.broker.loadbalance.LoadReportUpdaterTask;
import org.apache.pulsar.broker.loadbalance.LoadResourceQuotaUpdaterTask;
Expand Down Expand Up @@ -115,6 +115,12 @@
import org.apache.pulsar.functions.worker.ErrorNotifier;
import org.apache.pulsar.functions.worker.WorkerConfig;
import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.coordination.CoordinationService;
import org.apache.pulsar.metadata.api.coordination.LeaderElectionState;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.metadata.coordination.impl.CoordinationServiceImpl;
import org.apache.pulsar.packages.management.core.PackagesManagement;
import org.apache.pulsar.packages.management.core.PackagesStorage;
import org.apache.pulsar.packages.management.core.PackagesStorageProvider;
Expand Down Expand Up @@ -209,6 +215,9 @@ public class PulsarService implements AutoCloseable {
private PrometheusMetricsServlet metricsServlet;
private List<PrometheusRawMetricsProvider> pendingMetricsProviders;

private MetadataStoreExtended localMetadataStore;
private CoordinationService coordinationService;


public enum State {
Init, Started, Closed
Expand Down Expand Up @@ -267,7 +276,7 @@ public PulsarService(ServiceConfiguration config,
new DefaultThreadFactory("pulsar"));
this.cacheExecutor = Executors.newScheduledThreadPool(config.getNumCacheExecutorThreadPoolSize(),
new DefaultThreadFactory("zk-cache-callback"));
}
}

/**
* Close the current pulsar service. All resources are released.
Expand Down Expand Up @@ -307,7 +316,7 @@ public void close() throws PulsarServerException {
}

if (this.leaderElectionService != null) {
this.leaderElectionService.stop();
this.leaderElectionService.close();
this.leaderElectionService = null;
}

Expand Down Expand Up @@ -378,10 +387,25 @@ public void close() throws PulsarServerException {
transactionBufferClient.close();
}

if (coordinationService != null) {
coordinationService.close();
}

if (localMetadataStore != null) {
localMetadataStore.close();
}

state = State.Closed;
isClosedCondition.signalAll();
} catch (Exception e) {
throw new PulsarServerException(e);
if (e instanceof CompletionException && e.getCause() instanceof MetadataStoreException) {
throw new PulsarServerException(MetadataStoreException.unwrap((CompletionException) e));
} else if (e.getCause() instanceof CompletionException
&& e.getCause().getCause() instanceof MetadataStoreException) {
throw new PulsarServerException(MetadataStoreException.unwrap((CompletionException) e.getCause()));
} else {
throw new PulsarServerException(e);
}
} finally {
mutex.unlock();
}
Expand Down Expand Up @@ -440,6 +464,10 @@ public void start() throws PulsarServerException {
throw new IllegalArgumentException("brokerServicePort/brokerServicePortTls must be present");
}

localMetadataStore = createLocalMetadataStore();

coordinationService = new CoordinationServiceImpl(localMetadataStore);

orderedExecutor = OrderedExecutor.newBuilder()
.numThreads(config.getNumOrderedExecutorThreads())
.name("pulsar-ordered")
Expand Down Expand Up @@ -646,36 +674,51 @@ public Boolean get() {
}
}

protected void startLeaderElectionService() {
this.leaderElectionService = new LeaderElectionService(this, new LeaderListener() {
@Override
public synchronized void brokerIsTheLeaderNow() {
if (getConfiguration().isLoadBalancerEnabled()) {
long loadSheddingInterval = TimeUnit.MINUTES
.toMillis(getConfiguration().getLoadBalancerSheddingIntervalMinutes());
long resourceQuotaUpdateInterval = TimeUnit.MINUTES
.toMillis(getConfiguration().getLoadBalancerResourceQuotaUpdateIntervalMinutes());

loadSheddingTask = loadManagerExecutor.scheduleAtFixedRate(new LoadSheddingTask(loadManager),
loadSheddingInterval, loadSheddingInterval, TimeUnit.MILLISECONDS);
loadResourceQuotaTask = loadManagerExecutor.scheduleAtFixedRate(
new LoadResourceQuotaUpdaterTask(loadManager), resourceQuotaUpdateInterval,
resourceQuotaUpdateInterval, TimeUnit.MILLISECONDS);
}
}
/**
 * Creates the metadata store that {@code start()} assigns to {@code localMetadataStore}.
 *
 * <p>The store is created against {@code config.getZookeeperServers()}, using the configured ZooKeeper
 * session timeout (narrowed to {@code int}) and with read-only operations disallowed. Because this is a
 * separate public method, a subclass or mock can supply a different store.
 *
 * @return the store returned by {@code MetadataStoreExtended.create}
 * @throws MetadataStoreException declared by this method; presumably propagated from
 *         {@code MetadataStoreExtended.create} (confirm against that API)
 */
public MetadataStoreExtended createLocalMetadataStore() throws MetadataStoreException {
    MetadataStoreConfig storeConfig = MetadataStoreConfig.builder()
            .sessionTimeoutMillis((int) config.getZooKeeperSessionTimeoutMillis())
            .allowReadOnlyOperations(false)
            .build();
    return MetadataStoreExtended.create(config.getZookeeperServers(), storeConfig);
}

@Override
public synchronized void brokerIsAFollowerNow() {
if (loadSheddingTask != null) {
loadSheddingTask.cancel(false);
loadSheddingTask = null;
}
if (loadResourceQuotaTask != null) {
loadResourceQuotaTask.cancel(false);
loadResourceQuotaTask = null;
}
}
});
/**
 * Creates the {@link LeaderElectionService} for this broker and starts it.
 *
 * <p>The service is built from {@code coordinationService}, the address returned by
 * {@code getSafeWebServiceAddress()}, and a listener that reacts to leader election state changes:
 * <ul>
 *   <li>On {@code LeaderElectionState.Leading}: logs the election and, if the load balancer is enabled,
 *       cancels any load-shedding / resource-quota tasks that are already scheduled and schedules fresh
 *       ones at a fixed rate on {@code loadManagerExecutor}.</li>
 *   <li>On any other state: logs the current leader, then cancels and clears both tasks.</li>
 * </ul>
 */
protected void startLeaderElectionService() {
    this.leaderElectionService = new LeaderElectionService(coordinationService, getSafeWebServiceAddress(),
            state -> {
                if (state != LeaderElectionState.Leading) {
                    // Follower path: stop the leader-only periodic tasks and forget them.
                    LOG.info("This broker is a follower. Current leader is {}",
                            leaderElectionService.getCurrentLeader());
                    if (loadSheddingTask != null) {
                        loadSheddingTask.cancel(false);
                        loadSheddingTask = null;
                    }
                    if (loadResourceQuotaTask != null) {
                        loadResourceQuotaTask.cancel(false);
                        loadResourceQuotaTask = null;
                    }
                    return;
                }

                LOG.info("This broker was elected leader");
                if (!getConfiguration().isLoadBalancerEnabled()) {
                    // Nothing to schedule when load balancing is turned off.
                    return;
                }

                // Both intervals are configured in minutes; the executor is driven in milliseconds.
                long sheddingPeriodMs = TimeUnit.MINUTES
                        .toMillis(getConfiguration().getLoadBalancerSheddingIntervalMinutes());
                long quotaUpdatePeriodMs = TimeUnit.MINUTES
                        .toMillis(getConfiguration().getLoadBalancerResourceQuotaUpdateIntervalMinutes());

                // Cancel whatever is already scheduled so the tasks are not scheduled twice.
                if (loadSheddingTask != null) {
                    loadSheddingTask.cancel(false);
                }
                if (loadResourceQuotaTask != null) {
                    loadResourceQuotaTask.cancel(false);
                }

                loadSheddingTask = loadManagerExecutor.scheduleAtFixedRate(
                        new LoadSheddingTask(loadManager),
                        sheddingPeriodMs, sheddingPeriodMs, TimeUnit.MILLISECONDS);
                loadResourceQuotaTask = loadManagerExecutor.scheduleAtFixedRate(
                        new LoadResourceQuotaUpdaterTask(loadManager),
                        quotaUpdatePeriodMs, quotaUpdatePeriodMs, TimeUnit.MILLISECONDS);
            });

    leaderElectionService.start();
}
Expand Down Expand Up @@ -1313,6 +1356,14 @@ public Optional<Integer> getBrokerListenPortTls() {
return brokerService.getListenPortTls();
}

/**
 * Returns the metadata store held in {@code localMetadataStore}.
 *
 * @return the current value of the {@code localMetadataStore} field, which {@code start()} assigns from
 *         {@code createLocalMetadataStore()}
 */
public MetadataStoreExtended getLocalMetadataStore() {
    return this.localMetadataStore;
}

/**
 * Returns the coordination service held in {@code coordinationService}.
 *
 * @return the current value of the {@code coordinationService} field, which {@code start()} assigns to a
 *         {@code CoordinationServiceImpl} built on the local metadata store
 */
public CoordinationService getCoordinationService() {
    return this.coordinationService;
}

public static WorkerConfig initializeWorkerConfigFromBrokerConfig(ServiceConfiguration brokerConfig,
String workerConfigFile) throws IOException {
WorkerConfig workerConfig = WorkerConfig.load(workerConfigFile);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,52 +18,17 @@
*/
package org.apache.pulsar.broker.loadbalance;

import com.google.common.base.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
* A class to hold the contents of the leader election node. Facilitates serialization and deserialization of the
* information that might be added for the leader broker in the future.
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class LeaderBroker {
public final String serviceUrl;

private AtomicBoolean isLeaderReady = new AtomicBoolean(false);

// Need this default constructor for json conversion. Please do not remove this.
public LeaderBroker() {
this(null);
}

public LeaderBroker(String serviceUrl) {
this.serviceUrl = serviceUrl;
}

public String getServiceUrl() {
return this.serviceUrl;
}

@Override
public int hashCode() {
return Objects.hashCode(serviceUrl);
}

@Override
public boolean equals(Object obj) {
if (obj instanceof LeaderBroker) {
LeaderBroker other = (LeaderBroker) obj;
return Objects.equal(serviceUrl, other.serviceUrl);
}
return false;
}

public boolean isLeaderReady() {
return isLeaderReady.get();
}

public void setLeaderReady(boolean isLeaderReady) {
this.isLeaderReady.compareAndSet(!isLeaderReady, isLeaderReady);
}
private String serviceUrl;
}
Loading

0 comments on commit 2fd0ce0

Please sign in to comment.