Skip to content

Commit

Permalink
ModularLoadManager should not attempt to update the load report on ZK…
Browse files Browse the repository at this point in the history
… if we're not connected (apache#12191)
  • Loading branch information
merlimat authored Oct 4, 2021
1 parent dd0660d commit 908a073
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.SystemUtils;
import org.apache.pulsar.broker.BrokerData;
Expand Down Expand Up @@ -81,13 +80,14 @@
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.coordination.LockManager;
import org.apache.pulsar.metadata.api.coordination.ResourceLock;
import org.apache.pulsar.metadata.api.extended.SessionEvent;
import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ModularLoadManagerImpl implements ModularLoadManager, Consumer<Notification> {
public class ModularLoadManagerImpl implements ModularLoadManager {
private static final Logger log = LoggerFactory.getLogger(ModularLoadManagerImpl.class);

// Path to ZNode whose children contain BundleData jsons for each bundle (new API version of ResourceQuota).
Expand Down Expand Up @@ -182,6 +182,8 @@ public class ModularLoadManagerImpl implements ModularLoadManager, Consumer<Noti

private Map<String, String> brokerToFailureDomainMap;

private SessionEvent lastMetadataSessionEvent = SessionEvent.Reconnected;

// record load balancing metrics
private AtomicReference<List<Metrics>> loadBalancingMetrics = new AtomicReference<>();
// record bundle unload metrics
Expand Down Expand Up @@ -239,7 +241,8 @@ public void initialize(final PulsarService pulsar) {
bundlesCache = pulsar.getLocalMetadataStore().getMetadataCache(BundleData.class);
resourceQuotaCache = pulsar.getLocalMetadataStore().getMetadataCache(ResourceQuota.class);
timeAverageBrokerDataCache = pulsar.getLocalMetadataStore().getMetadataCache(TimeAverageBrokerData.class);
pulsar.getLocalMetadataStore().registerListener(this);
pulsar.getLocalMetadataStore().registerListener(this::handleDataNotification);
pulsar.getLocalMetadataStore().registerSessionListener(this::handleMetadataSessionEvent);

if (SystemUtils.IS_OS_LINUX) {
brokerHostUsage = new LinuxBrokerHostUsageImpl(pulsar);
Expand Down Expand Up @@ -271,8 +274,7 @@ public void initialize(final PulsarService pulsar) {
loadSheddingPipeline.add(createLoadSheddingStrategy());
}

@Override
public void accept(Notification t) {
public void handleDataNotification(Notification t) {
if (t.getPath().startsWith(LoadManager.LOADBALANCE_BROKERS_ROOT)) {
brokersData.listLocks(LoadManager.LOADBALANCE_BROKERS_ROOT)
.thenAccept(brokers -> {
Expand All @@ -287,6 +289,10 @@ public void accept(Notification t) {
}
}

private void handleMetadataSessionEvent(SessionEvent e) {
lastMetadataSessionEvent = e;
}

private LoadSheddingStrategy createLoadSheddingStrategy() {
try {
Class<?> loadSheddingClass = Class.forName(conf.getLoadBalancerLoadSheddingStrategy());
Expand Down Expand Up @@ -956,7 +962,11 @@ public void writeBrokerDataOnZooKeeper(boolean force) {
lock.lock();
try {
updateLocalBrokerData();
if (needBrokerDataUpdate() || force) {

// Do not attempt to write if not connected
if (lastMetadataSessionEvent != null
&& lastMetadataSessionEvent.isConnected()
&& (needBrokerDataUpdate() || force)) {
localData.setLastUpdate(System.currentTimeMillis());

brokerDataLock.updateValue(localData).join();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ public void testLoadShedding() throws Exception {
when(brokerDataSpy1.getLocalData()).thenReturn(localBrokerData);
brokerDataMap.put(primaryHost, brokerDataSpy1);
// Need to update all the bundle data for the shredder to see the spy.
primaryLoadManager.accept(new Notification(NotificationType.Created, LoadManager.LOADBALANCE_BROKERS_ROOT + "/broker:8080"));
primaryLoadManager.handleDataNotification(new Notification(NotificationType.Created, LoadManager.LOADBALANCE_BROKERS_ROOT + "/broker:8080"));

Thread.sleep(100);
localBrokerData.setCpu(new ResourceUsage(80, 100));
Expand Down

0 comments on commit 908a073

Please sign in to comment.