From eb9e5fc933ea0a1f98f02c201af29634e7caaa0a Mon Sep 17 00:00:00 2001 From: Rajan Dhabalia Date: Thu, 26 Oct 2017 14:56:02 -0700 Subject: [PATCH] Introduce auto bundle split and unloading of split bundle in ModularLoadManager (#857) --- conf/broker.conf | 3 + conf/standalone.conf | 3 + .../pulsar/broker/ServiceConfiguration.java | 14 +- .../apache/pulsar/broker/PulsarService.java | 7 +- .../pulsar/broker/TimeAverageMessageData.java | 18 +++ .../pulsar/broker/admin/Namespaces.java | 5 +- .../loadbalance/BundleSplitStrategy.java | 40 +++++ .../loadbalance/ModularLoadManager.java | 2 +- .../loadbalance/impl/BundleSplitterTask.java | 111 ++++++++++++++ .../loadbalance/impl/LoadManagerShared.java | 4 + .../impl/ModularLoadManagerImpl.java | 59 +++++++- .../impl/ModularLoadManagerWrapper.java | 7 +- .../impl/SimpleLoadManagerImpl.java | 8 +- .../broker/namespace/NamespaceService.java | 36 +++-- .../common/naming/NamespaceBundleFactory.java | 13 +- .../pulsar/broker/admin/AdminApiTest.java | 2 +- .../pulsar/broker/admin/NamespacesTest.java | 4 +- .../broker/loadbalance/LoadBalancerTest.java | 30 ++-- .../namespace/NamespaceServiceTest.java | 4 +- .../client/api/BrokerServiceLookupTest.java | 139 +++++++++++++++++- .../pulsar/client/admin/Namespaces.java | 3 +- .../client/admin/internal/NamespacesImpl.java | 6 +- .../pulsar/admin/cli/CmdNamespaces.java | 6 +- .../pulsar/admin/cli/PulsarAdminToolTest.java | 2 +- 24 files changed, 479 insertions(+), 47 deletions(-) create mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/BundleSplitStrategy.java create mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTask.java diff --git a/conf/broker.conf b/conf/broker.conf index b842e6e299553..4ebe8d77c8494 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -334,6 +334,9 @@ loadBalancerBrokerComfortLoadLevelPercentage=65 # enable/disable namespace bundle auto split loadBalancerAutoBundleSplitEnabled=false +# enable/disable automatic unloading of split bundles +loadBalancerAutoUnloadSplitBundlesEnabled=false + # maximum topics in a bundle, otherwise bundle split will be triggered loadBalancerNamespaceBundleMaxTopics=1000 diff --git a/conf/standalone.conf b/conf/standalone.conf index 54e951dada96f..c3705018f759a 100644 --- a/conf/standalone.conf +++ b/conf/standalone.conf @@ -306,6 +306,9 @@ loadBalancerBrokerComfortLoadLevelPercentage=65 # enable/disable namespace bundle auto split loadBalancerAutoBundleSplitEnabled=false +# enable/disable automatic unloading of split bundles +loadBalancerAutoUnloadSplitBundlesEnabled=false + # maximum topics in a bundle, otherwise bundle split will be triggered loadBalancerNamespaceBundleMaxTopics=1000 diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index eb4437270fb7c..9a17b7da44f7b 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -305,7 +305,11 @@ public class ServiceConfiguration implements PulsarConfiguration { // Usage threshold to defermine a broker is having just right level of load private int loadBalancerBrokerComfortLoadLevelPercentage = 65; // enable/disable automatic namespace bundle split + @FieldContext(dynamic = true) private boolean loadBalancerAutoBundleSplitEnabled = false; + // enable/disable automatic unloading of split bundles + @FieldContext(dynamic = true) + private boolean loadBalancerAutoUnloadSplitBundlesEnabled = false; // maximum topics in a bundle, otherwise bundle split will be triggered private int loadBalancerNamespaceBundleMaxTopics = 1000; // maximum sessions (producers + consumers) in a bundle, otherwise bundle split will be triggered @@ -1112,7 +1116,7 @@ public void setLoadBalancerBrokerComfortLoadLevelPercentage(int percentage) { this.loadBalancerBrokerComfortLoadLevelPercentage = percentage; } - public boolean getLoadBalancerAutoBundleSplitEnabled() { + public boolean isLoadBalancerAutoBundleSplitEnabled() { return this.loadBalancerAutoBundleSplitEnabled; } @@ -1120,6 +1124,14 @@ public void setLoadBalancerAutoBundleSplitEnabled(boolean enabled) { this.loadBalancerAutoBundleSplitEnabled = enabled; } + public boolean isLoadBalancerAutoUnloadSplitBundlesEnabled() { + return loadBalancerAutoUnloadSplitBundlesEnabled; + } + + public void setLoadBalancerAutoUnloadSplitBundlesEnabled(boolean loadBalancerAutoUnloadSplitBundlesEnabled) { + this.loadBalancerAutoUnloadSplitBundlesEnabled = loadBalancerAutoUnloadSplitBundlesEnabled; + } + public void setLoadBalancerNamespaceMaximumBundles(int bundles) { this.loadBalancerNamespaceMaximumBundles = bundles; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 5ac670eee1719..22dab007d5f12 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -18,6 +18,8 @@ */ package org.apache.pulsar.broker; +import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES; + import java.io.IOException; import java.net.URL; import java.util.List; @@ -42,7 +44,7 @@ import org.apache.pulsar.broker.loadbalance.LoadReportUpdaterTask; import org.apache.pulsar.broker.loadbalance.LoadResourceQuotaUpdaterTask; import org.apache.pulsar.broker.loadbalance.LoadSheddingTask; -import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl; +import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared; import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.broker.service.Topic; @@ -74,7 +76,6 @@ import com.google.common.collect.Lists; import io.netty.util.concurrent.DefaultThreadFactory; -import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES; /** * Main class for Pulsar broker service @@ -428,7 +429,7 @@ private void startLoadManagementService() throws PulsarServerException { if (config.isLoadBalancerEnabled()) { LOG.info("Starting load balancer"); if (this.loadReportTask == null) { - long loadReportMinInterval = SimpleLoadManagerImpl.LOAD_REPORT_UPDATE_MIMIMUM_INTERVAL; + long loadReportMinInterval = LoadManagerShared.LOAD_REPORT_UPDATE_MIMIMUM_INTERVAL; this.loadReportTask = this.loadManagerExecutor.scheduleAtFixedRate( new LoadReportUpdaterTask(loadManager), loadReportMinInterval, loadReportMinInterval, TimeUnit.MILLISECONDS); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TimeAverageMessageData.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TimeAverageMessageData.java index 0055de1fb0ee5..6cba7c6d766e4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TimeAverageMessageData.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TimeAverageMessageData.java @@ -161,4 +161,22 @@ public double getMsgRateOut() { public void setMsgRateOut(double msgRateOut) { this.msgRateOut = msgRateOut; } + + /** + * Get the total message rate. + * + * @return Message rate in + message rate out. + */ + public double totalMsgRate() { + return msgRateIn + msgRateOut; + } + + /** + * Get the total message throughput. + * + * @return Message throughput in + message throughput out. + */ + public double totalMsgThroughput() { + return msgThroughputIn + msgThroughputOut; + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/Namespaces.java index fdb727990e0f7..2daa9ef367058 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/Namespaces.java @@ -841,7 +841,8 @@ public void unloadNamespaceBundle(@PathParam("property") String property, @PathP @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission") }) public void splitNamespaceBundle(@PathParam("property") String property, @PathParam("cluster") String cluster, @PathParam("namespace") String namespace, @PathParam("bundle") String bundleRange, - @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, + @QueryParam("unload") @DefaultValue("false") boolean unload) { log.info("[{}] Split namespace bundle {}/{}/{}/{}", clientAppId(), property, cluster, namespace, bundleRange); validateSuperUserAccess(); @@ -858,7 +859,7 @@ public void splitNamespaceBundle(@PathParam("property") String property, @PathPa true); try { - pulsar().getNamespaceService().splitAndOwnBundle(nsBundle).get(); + pulsar().getNamespaceService().splitAndOwnBundle(nsBundle, unload).get(); log.info("[{}] Successfully split namespace bundle {}", clientAppId(), nsBundle.toString()); } catch (IllegalArgumentException e) { log.error("[{}] Failed to split namespace bundle {}/{} due to {}", clientAppId(), fqnn.toString(), diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/BundleSplitStrategy.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/BundleSplitStrategy.java new file mode 100644 index 0000000000000..a8c158c88da28 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/BundleSplitStrategy.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.loadbalance; + +import java.util.Set; + +import org.apache.pulsar.broker.PulsarService; + +/** + * Load Manager component which determines what bundles should be split into two bundles. + */ +public interface BundleSplitStrategy { + /** + * Determines which bundles, if any, should be split. + * + * @param loadData + * Load data to base decisions on (does not have benefit of preallocated data since this may not be the + * leader broker). + * @param pulsar + * Service to use. + * @return A set of the bundles that should be split. + */ + Set findBundlesToSplit(LoadData loadData, PulsarService pulsar); +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/ModularLoadManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/ModularLoadManager.java index fb2663711e084..308da9e938956 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/ModularLoadManager.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/ModularLoadManager.java @@ -47,7 +47,7 @@ public interface ModularLoadManager { /** * As the leader broker, attempt to automatically detect and split hot namespace bundles. */ - void doNamespaceBundleSplit(); + void checkNamespaceBundleSplit(); /** * Initialize this load manager using the given pulsar service. diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTask.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTask.java new file mode 100644 index 0000000000000..71578a3d6f29a --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTask.java @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.loadbalance.impl; + +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.pulsar.broker.LocalBrokerData; +//import org.apache.pulsar.broker.MessageData; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.TimeAverageMessageData; +import org.apache.pulsar.broker.loadbalance.BundleSplitStrategy; +import org.apache.pulsar.broker.loadbalance.LoadData; +import org.apache.pulsar.common.naming.NamespaceName; +import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats; + +/** + * Determines which bundles should be split based on various thresholds. + */ +public class BundleSplitterTask implements BundleSplitStrategy { + private static final Logger log = LoggerFactory.getLogger(BundleSplitStrategy.class); + private final Set bundleCache; + + /** + * Construct a BundleSplitterTask. + * + * @param pulsar + * Service to construct from. + */ + public BundleSplitterTask(final PulsarService pulsar) { + bundleCache = new HashSet<>(); + } + + /** + * Determines which bundles should be split based on various thresholds. + * + * @param loadData + * Load data to base decisions on (does not have benefit of preallocated data since this may not be the + * leader broker). + * @param localData + * Local data for the broker we are splitting on. + * @param pulsar + * Service to use. + * @return All bundles who have exceeded configured thresholds in number of topics, number of sessions, total + * message rates, or total throughput. + */ + @Override + public Set findBundlesToSplit(final LoadData loadData, final PulsarService pulsar) { + bundleCache.clear(); + final ServiceConfiguration conf = pulsar.getConfiguration(); + int maxBundleCount = conf.getLoadBalancerNamespaceMaximumBundles(); + long maxBundleTopics = conf.getLoadBalancerNamespaceBundleMaxTopics(); + long maxBundleSessions = conf.getLoadBalancerNamespaceBundleMaxSessions(); + long maxBundleMsgRate = conf.getLoadBalancerNamespaceBundleMaxMsgRate(); + long maxBundleBandwidth = conf.getLoadBalancerNamespaceBundleMaxBandwidthMbytes() * LoadManagerShared.MIBI; + loadData.getBrokerData().forEach((broker, brokerData) -> { + LocalBrokerData localData = brokerData.getLocalData(); + for (final Map.Entry entry : localData.getLastStats().entrySet()) { + final String bundle = entry.getKey(); + final NamespaceBundleStats stats = entry.getValue(); + double totalMessageRate = 0; + double totalMessageThroughput = 0; + // Attempt to consider long-term message data, otherwise effectively ignore. + if (loadData.getBundleData().containsKey(bundle)) { + final TimeAverageMessageData longTermData = loadData.getBundleData().get(bundle).getLongTermData(); + totalMessageRate = longTermData.totalMsgRate(); + totalMessageThroughput = longTermData.totalMsgThroughput(); + } + if (stats.topics > maxBundleTopics || stats.consumerCount + stats.producerCount > maxBundleSessions + || totalMessageRate > maxBundleMsgRate || totalMessageThroughput > maxBundleBandwidth) { + final String namespace = LoadManagerShared.getNamespaceNameFromBundleName(bundle); + try { + final int bundleCount = pulsar.getNamespaceService() + .getBundleCount(new NamespaceName(namespace)); + if (bundleCount < maxBundleCount) { + bundleCache.add(bundle); + } else { + log.warn( + "Could not split namespace bundle {} because namespace {} has too many bundles: {}", + bundle, namespace, bundleCount); + } + } catch (Exception e) { + log.warn("Error while getting bundle count for namespace {}", namespace, e); + } + } + } + }); + return bundleCache; + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java index 91733a8c3da68..9e290cc1098d9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java @@ -26,6 +26,7 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; +import java.util.concurrent.TimeUnit; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.admin.AdminResource; @@ -52,6 +53,9 @@ public class LoadManagerShared { // Cache for shard brokers according to policies. private static final Set sharedCache = new HashSet<>(); + + // update LoadReport at most every 5 seconds + public static final long LOAD_REPORT_UPDATE_MIMIMUM_INTERVAL = TimeUnit.SECONDS.toMillis(5); // Don't allow construction: static method namespace only. private LoadManagerShared() { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java index 5b74980616118..c33e1763e14a1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java @@ -53,6 +53,8 @@ import org.apache.pulsar.broker.loadbalance.ModularLoadManager; import org.apache.pulsar.broker.loadbalance.ModularLoadManagerStrategy; import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared.BrokerTopicLoadingPredicate; +import org.apache.pulsar.common.naming.NamespaceBundleFactory; +import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.ServiceUnitId; import org.apache.pulsar.common.policies.data.ResourceQuota; import org.apache.pulsar.common.util.ObjectMapperFactory; @@ -70,6 +72,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.pulsar.broker.loadbalance.BundleSplitStrategy; + import org.apache.zookeeper.KeeperException.NoNodeException; import io.netty.util.concurrent.DefaultThreadFactory; @@ -119,6 +123,9 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach // Path to the ZNode containing the LocalBrokerData json for this broker. private String brokerZnodePath; + // Strategy to use for splitting bundles. + private BundleSplitStrategy bundleSplitStrategy; + // Service configuration belonging to the pulsar service. private ServiceConfiguration conf; @@ -236,6 +243,8 @@ public LocalBrokerData deserialize(String key, byte[] content) throws Exception brokerHostUsage = new GenericBrokerHostUsageImpl(pulsar); } + bundleSplitStrategy = new BundleSplitterTask(pulsar); + conf = pulsar.getConfiguration(); // Initialize the default stats to assume for unseen bundles (hard-coded for now). @@ -416,6 +425,8 @@ private boolean needBrokerDataUpdate() { private void updateAll() { updateAllBrokerData(); updateBundleData(); + // broker has latest load-report: check if any bundle requires split + checkNamespaceBundleSplit(); } // As the leader broker, update the broker data map in loadData by querying ZooKeeper for the broker data put there @@ -580,8 +591,41 @@ public synchronized void doLoadShedding() { * As the leader broker, attempt to automatically detect and split hot namespace bundles. */ @Override - public void doNamespaceBundleSplit() { - // TODO? + public void checkNamespaceBundleSplit() { + + if (!conf.isLoadBalancerAutoBundleSplitEnabled() || pulsar.getLeaderElectionService() == null + || !pulsar.getLeaderElectionService().isLeader()) { + return; + } + final boolean unloadSplitBundles = pulsar.getConfiguration().isLoadBalancerAutoUnloadSplitBundlesEnabled(); + synchronized (bundleSplitStrategy) { + final Set bundlesToBeSplit = bundleSplitStrategy.findBundlesToSplit(loadData, pulsar); + NamespaceBundleFactory namespaceBundleFactory = pulsar.getNamespaceService().getNamespaceBundleFactory(); + for (String bundleName : bundlesToBeSplit) { + try { + final String namespaceName = LoadManagerShared.getNamespaceNameFromBundleName(bundleName); + final String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(bundleName); + if (!namespaceBundleFactory + .canSplitBundle(namespaceBundleFactory.getBundle(namespaceName, bundleRange))) { + continue; + } + log.info("Load-manager splitting budnle {} and unloading {}", bundleName, unloadSplitBundles); + pulsar.getAdminClient().namespaces().splitNamespaceBundle(namespaceName, bundleRange, + unloadSplitBundles); + // Make sure the same bundle is not selected again. + loadData.getBundleData().remove(bundleName); + localData.getLastStats().remove(bundleName); + // Clear namespace bundle-cache + this.pulsar.getNamespaceService().getNamespaceBundleFactory() + .invalidateBundleCache(new NamespaceName(namespaceName)); + deleteBundleDataFromZookeeper(bundleName); + log.info("Successfully split namespace bundle {}", bundleName); + } catch (Exception e) { + log.error("Failed to split namespace bundle {}", bundleName, e); + } + } + } + } /** @@ -786,4 +830,15 @@ public void writeBundleDataOnZooKeeper() { } } } + + private void deleteBundleDataFromZookeeper(String bundle) { + final String zooKeeperPath = getBundleDataZooKeeperPath(bundle); + try { + if (zkClient.exists(zooKeeperPath, null) != null) { + zkClient.delete(zooKeeperPath, -1); + } + } catch (Exception e) { + log.warn("Failed to delete bundle-data {} from zookeeper", bundle, e); + } + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java index c132c6996d62e..a1f831848ab30 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java @@ -18,7 +18,6 @@ */ package org.apache.pulsar.broker.loadbalance.impl; -import java.io.IOException; import java.util.Collections; import java.util.List; @@ -55,7 +54,7 @@ public void doLoadShedding() { @Override public void doNamespaceBundleSplit() { - loadManager.doNamespaceBundleSplit(); + loadManager.checkNamespaceBundleSplit(); } @Override @@ -114,4 +113,8 @@ public void writeResourceQuotasToZooKeeper() { public Deserializer getLoadReportDeserializer() { return loadManager.getLoadReportDeserializer(); } + + public ModularLoadManager getLoadManager() { + return loadManager; + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java index 1e473ea341bbc..d48fd590f9a5e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java @@ -47,6 +47,7 @@ import org.apache.pulsar.broker.loadbalance.PlacementStrategy; import org.apache.pulsar.broker.loadbalance.ResourceUnit; import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared.BrokerTopicLoadingPredicate; +import static org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared.LOAD_REPORT_UPDATE_MIMIMUM_INTERVAL; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.ServiceUnitId; import org.apache.pulsar.common.policies.data.ResourceQuota; @@ -172,8 +173,6 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene private ZooKeeperChildrenCache availableActiveBrokers; private static final long MBytes = 1024 * 1024; - // update LoadReport at most every 5 seconds - public static final long LOAD_REPORT_UPDATE_MIMIMUM_INTERVAL = TimeUnit.SECONDS.toMillis(5); // last LoadReport stored in ZK private volatile LoadReport lastLoadReport; // last timestamp resource usage was checked @@ -449,7 +448,7 @@ private long getLoadBalancerBrokerComfortLoadThresholdPercentage() { private boolean getLoadBalancerAutoBundleSplitEnabled() { return this.getDynamicConfigurationBoolean(LOADBALANCER_DYNAMIC_SETTING_AUTO_BUNDLE_SPLIT_ENABLED, SETTING_NAME_AUTO_BUNDLE_SPLIT_ENABLED, - pulsar.getConfiguration().getLoadBalancerAutoBundleSplitEnabled()); + pulsar.getConfiguration().isLoadBalancerAutoBundleSplitEnabled()); } /* @@ -1463,7 +1462,8 @@ public void doNamespaceBundleSplit() throws Exception { try { pulsar.getAdminClient().namespaces().splitNamespaceBundle( LoadManagerShared.getNamespaceNameFromBundleName(bundleName), - LoadManagerShared.getBundleRangeFromBundleName(bundleName)); + LoadManagerShared.getBundleRangeFromBundleName(bundleName), + pulsar.getConfiguration().isLoadBalancerAutoUnloadSplitBundlesEnabled()); log.info("Successfully split namespace bundle {}", bundleName); } catch (Exception e) { log.error("Failed to split namespace bundle {}", bundleName, e); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index fef11d5036a7f..8e51112629759 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -41,6 +41,7 @@ import java.util.regex.Pattern; import java.util.stream.Collectors; + import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.broker.PulsarServerException; @@ -552,11 +553,11 @@ public boolean isNamespaceBundleDisabled(NamespaceBundle bundle) throws Exceptio * @return * @throws Exception */ - public CompletableFuture splitAndOwnBundle(NamespaceBundle bundle) throws Exception { + public CompletableFuture splitAndOwnBundle(NamespaceBundle bundle, final boolean unload) throws Exception { - final CompletableFuture future = new CompletableFuture<>(); + final CompletableFuture unloadFuture = new CompletableFuture<>(); - Pair> splittedBundles = bundleFactory.splitBundles(bundle, + final Pair> splittedBundles = bundleFactory.splitBundles(bundle, 2 /* by default split into 2 */); if (splittedBundles != null) { checkNotNull(splittedBundles.getLeft()); @@ -580,34 +581,49 @@ public CompletableFuture splitAndOwnBundle(NamespaceBundle bundle) throws // update bundled_topic cache for load-report-generation pulsar.getBrokerService().refreshTopicToStatsMaps(bundle); loadManager.get().setLoadReportForceUpdateFlag(); - future.complete(null); + unloadFuture.complete(null); } catch (Exception e) { String msg1 = format( "failed to disable bundle %s under namespace [%s] with error %s", nsname.toString(), bundle.toString(), e.getMessage()); LOG.warn(msg1, e); - future.completeExceptionally(new ServiceUnitNotReadyException(msg1)); + unloadFuture.completeExceptionally(new ServiceUnitNotReadyException(msg1)); } } else { String msg2 = format("failed to update namespace [%s] policies due to %s", nsname.toString(), KeeperException.create(KeeperException.Code.get(rc)).getMessage()); LOG.warn(msg2); - future.completeExceptionally(new ServiceUnitNotReadyException(msg2)); + unloadFuture.completeExceptionally(new ServiceUnitNotReadyException(msg2)); } }))); } catch (Exception e) { String msg = format("failed to aquire ownership of split bundle for namespace [%s], %s", nsname.toString(), e.getMessage()); LOG.warn(msg, e); - future.completeExceptionally(new ServiceUnitNotReadyException(msg)); + unloadFuture.completeExceptionally(new ServiceUnitNotReadyException(msg)); } } else { String msg = format("bundle %s not found under namespace", bundle.toString()); - future.completeExceptionally(new ServiceUnitNotReadyException(msg)); + unloadFuture.completeExceptionally(new ServiceUnitNotReadyException(msg)); } - return future; + + return unloadFuture.thenApply(res -> { + if (!unload) { + return null; + } + // unload new split bundles + splittedBundles.getRight().forEach(splitBundle -> { + try { + unloadNamespaceBundle(splitBundle); + } catch (Exception e) { + LOG.warn("Failed to unload split bundle {}", splitBundle, e); + throw new RuntimeException("Failed to unload split bundle " + splitBundle, e); + } + }); + return null; + }); } /** @@ -634,6 +650,8 @@ private void updateNamespaceBundles(NamespaceName nsname, NamespaceBundles nsBun policies.get().bundles = getBundlesData(nsBundles); this.pulsar.getLocalZkCache().getZooKeeper().setData(path, ObjectMapperFactory.getThreadLocal().writeValueAsBytes(policies.get()), -1, callback, null); + // invalidate namespace's local-policies + this.pulsar.getLocalZkCacheService().policiesCache().invalidate(path); } public OwnershipCache getOwnershipCache() { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java index 94d6859f64072..996a245a7a146 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java @@ -50,6 +50,7 @@ import com.google.common.base.Charsets; import com.google.common.base.Joiner; import com.google.common.base.Splitter; +import com.google.common.collect.BoundType; import com.google.common.collect.Range; import com.google.common.hash.HashFunction; @@ -147,6 +148,16 @@ public NamespaceBundle getBundle(NamespaceName nsname, Range hashRange) { return new NamespaceBundle(nsname, hashRange, this); } + public NamespaceBundle getBundle(String namespace, String bundleRange) { + checkArgument(bundleRange.contains("_"), "Invalid bundle range"); + String[] boundaries = bundleRange.split("_"); + Long lowerEndpoint = Long.decode(boundaries[0]); + Long upperEndpoint = Long.decode(boundaries[1]); + Range hashRange = Range.range(lowerEndpoint, BoundType.CLOSED, upperEndpoint, + (upperEndpoint.equals(NamespaceBundles.FULL_UPPER_BOUND)) ? BoundType.CLOSED : BoundType.OPEN); + return getBundle(new NamespaceName(namespace), hashRange); + } + public NamespaceBundle getFullBundle(NamespaceName fqnn) throws Exception { return bundlesCache.synchronous().get(fqnn).getFullBundle(); } @@ -231,7 +242,7 @@ public Pair> splitBundles(NamespaceBundl return null; } - private boolean canSplitBundle(NamespaceBundle bundle) { + public boolean canSplitBundle(NamespaceBundle bundle) { Range range = bundle.getKeyRange(); return range.upperEndpoint() - range.lowerEndpoint() > 1; } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java index 25c76904e04e9..86b83a9f7b781 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java @@ -885,7 +885,7 @@ public void testNamespaceSplitBundle() throws Exception { assertEquals(admin.persistentTopics().getList(namespace), Lists.newArrayList(topicName)); try { - admin.namespaces().splitNamespaceBundle(namespace, "0x00000000_0xffffffff"); + admin.namespaces().splitNamespaceBundle(namespace, "0x00000000_0xffffffff", true); } catch (Exception e) { fail("split bundle shouldn't have thrown exception"); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java index e737a9d5ce69b..0158722ca6145 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java @@ -847,7 +847,7 @@ public void testSplitBundles() throws Exception { // split bundles try { namespaces.splitNamespaceBundle(testProperty, testLocalCluster, bundledNsLocal, "0x00000000_0xffffffff", - false); + false, true); // verify split bundles BundlesData bundlesData = namespaces.getBundlesData(testProperty, testLocalCluster, bundledNsLocal); assertNotNull(bundlesData); @@ -879,7 +879,7 @@ public void testSplitBundleWithUnDividedRange() throws Exception { // split bundles try { namespaces.splitNamespaceBundle(testProperty, testLocalCluster, bundledNsLocal, "0x08375b1a_0x08375b1b", - false); + false, false); } catch (RestException re) { assertEquals(re.getResponse().getStatus(), Status.PRECONDITION_FAILED.getStatusCode()); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java index e04ecbd27ad57..b99fad26b0e32 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java @@ -668,16 +668,26 @@ public void testNamespaceBundleAutoSplit() throws Exception { pulsarServices[0].getLoadManager().get().doNamespaceBundleSplit(); // verify bundles are split - verify(namespaceAdmin, times(1)).splitNamespaceBundle("pulsar/use/primary-ns-01", "0x00000000_0x80000000"); - verify(namespaceAdmin, times(1)).splitNamespaceBundle("pulsar/use/primary-ns-02", "0x00000000_0x80000000"); - verify(namespaceAdmin, times(1)).splitNamespaceBundle("pulsar/use/primary-ns-03", "0x00000000_0x80000000"); - verify(namespaceAdmin, times(1)).splitNamespaceBundle("pulsar/use/primary-ns-04", "0x00000000_0x80000000"); - verify(namespaceAdmin, times(1)).splitNamespaceBundle("pulsar/use/primary-ns-05", "0x00000000_0x80000000"); - verify(namespaceAdmin, times(1)).splitNamespaceBundle("pulsar/use/primary-ns-06", "0x00000000_0x80000000"); - verify(namespaceAdmin, times(1)).splitNamespaceBundle("pulsar/use/primary-ns-07", "0x00000000_0x80000000"); - verify(namespaceAdmin, never()).splitNamespaceBundle("pulsar/use/primary-ns-08", "0x00000000_0x80000000"); - verify(namespaceAdmin, never()).splitNamespaceBundle("pulsar/use/primary-ns-09", "0x00000000_0x80000000"); - verify(namespaceAdmin, never()).splitNamespaceBundle("pulsar/use/primary-ns-10", "0x00000000_0x02000000"); + verify(namespaceAdmin, times(1)).splitNamespaceBundle("pulsar/use/primary-ns-01", "0x00000000_0x80000000", + false); + verify(namespaceAdmin, times(1)).splitNamespaceBundle("pulsar/use/primary-ns-02", "0x00000000_0x80000000", + false); + verify(namespaceAdmin, times(1)).splitNamespaceBundle("pulsar/use/primary-ns-03", "0x00000000_0x80000000", + false); + verify(namespaceAdmin, times(1)).splitNamespaceBundle("pulsar/use/primary-ns-04", "0x00000000_0x80000000", + false); + verify(namespaceAdmin, times(1)).splitNamespaceBundle("pulsar/use/primary-ns-05", "0x00000000_0x80000000", + false); + verify(namespaceAdmin, times(1)).splitNamespaceBundle("pulsar/use/primary-ns-06", "0x00000000_0x80000000", + false); + verify(namespaceAdmin, times(1)).splitNamespaceBundle("pulsar/use/primary-ns-07", "0x00000000_0x80000000", + false); + verify(namespaceAdmin, never()).splitNamespaceBundle("pulsar/use/primary-ns-08", "0x00000000_0x80000000", + false); + verify(namespaceAdmin, never()).splitNamespaceBundle("pulsar/use/primary-ns-09", "0x00000000_0x80000000", + false); + verify(namespaceAdmin, never()).splitNamespaceBundle("pulsar/use/primary-ns-10", "0x00000000_0x02000000", + false); } /* diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java index 857104ab6b69b..acb0785696d3c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java @@ -112,7 +112,7 @@ public void testSplitAndOwnBundles() throws Exception { NamespaceBundle originalBundle = bundles.findBundle(dn); // Split bundle and take ownership of split bundles - CompletableFuture result = namespaceService.splitAndOwnBundle(originalBundle); + CompletableFuture result = namespaceService.splitAndOwnBundle(originalBundle, false); try { result.get(); @@ -192,7 +192,7 @@ public void testSplitMapWithRefreshedStatMap() throws Exception { assertNotNull(list); // Split bundle and take ownership of split bundles - CompletableFuture result = namespaceService.splitAndOwnBundle(originalBundle); + CompletableFuture result = namespaceService.splitAndOwnBundle(originalBundle, false); try { result.get(); } catch (Exception e) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java index f57bcea65af20..229d584444a63 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java @@ -26,6 +26,7 @@ import java.io.IOException; import java.io.InputStream; import java.lang.reflect.Field; +import java.lang.reflect.Method; import java.net.URI; import java.net.URL; import java.net.URLConnection; @@ -37,6 +38,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; @@ -52,7 +54,10 @@ import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.authentication.AuthenticationDataSource; import org.apache.pulsar.broker.authentication.AuthenticationProvider; +import org.apache.pulsar.broker.loadbalance.LeaderElectionService; import org.apache.pulsar.broker.loadbalance.LoadManager; +import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl; +import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerWrapper; import org.apache.pulsar.broker.loadbalance.impl.SimpleResourceUnit; import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.client.api.Authentication; @@ -814,7 +819,7 @@ public void testSplitUnloadLookupTest() throws Exception { Assert.assertEquals(bundleInBroker2.toString(), unsplitBundle); // (5) Split the bundle for topic-1 - admin.namespaces().splitNamespaceBundle(namespace, "0x00000000_0xffffffff"); + admin.namespaces().splitNamespaceBundle(namespace, "0x00000000_0xffffffff", true); // (6) Broker-2 should get the watch and update bundle cache final int retry = 5; @@ -828,7 +833,7 @@ public void testSplitUnloadLookupTest() throws Exception { // (7) Make lookup request again to Broker-2 which should succeed. final String topic2 = "persistent://" + namespace + "/topic2"; - Consumer consumer2 = pulsarClient2.subscribe(topic2, "my-subscriber-name", new ConsumerConfiguration()); + Consumer consumer2 = pulsarClient.subscribe(topic2, "my-subscriber-name", new ConsumerConfiguration()); NamespaceBundle bundleInBroker1AfterSplit = pulsar2.getNamespaceService() .getBundle(DestinationName.get(topic2)); @@ -840,6 +845,136 @@ public void testSplitUnloadLookupTest() throws Exception { pulsar2.close(); } + + /** + * + *
+     * When broker-1's Modular-load-manager splits the bundle and update local-policies, broker-2 should get watch of
+     * local-policies and update bundleCache so, new lookup can be redirected properly.
+     * 
+     * (1) Start broker-1 and broker-2
+     * (2) Make sure broker-2 always assign bundle to broker1
+     * (3) Broker-2 receives topic-1 request, creates local-policies and sets the watch
+     * (4) Broker-1 will own topic-1
+     * (5) Broker-2 will be a leader and trigger Split the bundle for topic-1
+     * (6) Broker-2 should get the watch and update bundle cache
+     * (7) Make lookup request again to Broker-2 which should succeed.
+     * 
+     * 
+ * + * @throws Exception + */ + @Test(timeOut = 5000) + public void testModularLoadManagerSplitBundle() throws Exception { + + log.info("-- Starting {} test --", methodName); + final String loadBalancerName = conf.getLoadManagerClassName(); + + try { + final String namespace = "my-property/use/my-ns"; + // (1) Start broker-1 + ServiceConfiguration conf2 = new ServiceConfiguration(); + conf2.setBrokerServicePort(PortManager.nextFreePort()); + conf2.setBrokerServicePortTls(PortManager.nextFreePort()); + conf2.setWebServicePort(PortManager.nextFreePort()); + conf2.setWebServicePortTls(PortManager.nextFreePort()); + conf2.setAdvertisedAddress("localhost"); + conf2.setClusterName(conf.getClusterName()); + conf2.setLoadManagerClassName(ModularLoadManagerImpl.class.getName()); + PulsarService pulsar2 = startBroker(conf2); + + // configure broker-1 with ModularLoadlManager + stopBroker(); + conf.setLoadManagerClassName(ModularLoadManagerImpl.class.getName()); + startBroker(); + + pulsar.getLoadManager().get().writeLoadReportOnZookeeper(); + pulsar2.getLoadManager().get().writeLoadReportOnZookeeper(); + + LoadManager loadManager1 = spy(pulsar.getLoadManager().get()); + LoadManager loadManager2 = spy(pulsar2.getLoadManager().get()); + Field loadManagerField = NamespaceService.class.getDeclaredField("loadManager"); + loadManagerField.setAccessible(true); + + // (2) Make sure broker-2 always assign bundle to broker1 + // mock: redirect request to leader [2] + doReturn(true).when(loadManager2).isCentralized(); + loadManagerField.set(pulsar2.getNamespaceService(), new AtomicReference<>(loadManager2)); + // mock: return Broker1 as a Least-loaded broker when leader receies request [3] + doReturn(true).when(loadManager1).isCentralized(); + SimpleResourceUnit resourceUnit = new SimpleResourceUnit(pulsar.getWebServiceAddress(), null); + doReturn(resourceUnit).when(loadManager1).getLeastLoaded(any(ServiceUnitId.class)); + loadManagerField.set(pulsar.getNamespaceService(), new AtomicReference<>(loadManager1)); + + URI broker2ServiceUrl = new URI("pulsar://localhost:" + conf2.getBrokerServicePort()); + PulsarClient pulsarClient2 = PulsarClient.create(broker2ServiceUrl.toString(), new ClientConfiguration()); + + // (3) Broker-2 receives topic-1 request, creates local-policies and sets the watch + final String topic1 = "persistent://" + namespace + "/topic1"; + Consumer consumer1 = pulsarClient2.subscribe(topic1, "my-subscriber-name", new ConsumerConfiguration()); + + Set serviceUnits1 = pulsar.getNamespaceService().getOwnedServiceUnits().stream() + .map(nb -> nb.toString()).collect(Collectors.toSet()); + + // (4) Broker-1 will own topic-1 + final String unsplitBundle = namespace + "/0x00000000_0xffffffff"; + Assert.assertTrue(serviceUnits1.contains(unsplitBundle)); + // broker-2 should have this bundle into the cache + DestinationName destination = DestinationName.get(topic1); + NamespaceBundle bundleInBroker2 = pulsar2.getNamespaceService().getBundle(destination); + Assert.assertEquals(bundleInBroker2.toString(), unsplitBundle); + + // update broker-1 bundle report to zk + pulsar.getBrokerService().updateRates(); + pulsar.getLoadManager().get().writeLoadReportOnZookeeper(); + // this will create znode for bundle-data + pulsar.getLoadManager().get().writeResourceQuotasToZooKeeper(); + pulsar2.getLoadManager().get().writeLoadReportOnZookeeper(); + + // (5) Modular-load-manager will split the bundle due to max-topic threshold reached + Field leaderField = LeaderElectionService.class.getDeclaredField("isLeader"); + Method updateAllMethod = ModularLoadManagerImpl.class.getDeclaredMethod("updateAll"); + updateAllMethod.setAccessible(true); + leaderField.setAccessible(true); + AtomicBoolean isLeader = (AtomicBoolean) leaderField.get(pulsar2.getLeaderElectionService()); + isLeader.set(true); + ModularLoadManagerImpl loadManager = (ModularLoadManagerImpl) ((ModularLoadManagerWrapper) pulsar2 + .getLoadManager().get()).getLoadManager(); + // broker-2 loadManager is a leader and let it refresh load-report from all the brokers + updateAllMethod.invoke(loadManager); + conf2.setLoadBalancerAutoBundleSplitEnabled(true); + conf2.setLoadBalancerAutoUnloadSplitBundlesEnabled(true); + conf2.setLoadBalancerNamespaceBundleMaxTopics(0); + loadManager.checkNamespaceBundleSplit(); + + // (6) Broker-2 should get the watch and update bundle cache + final int retry = 5; + for (int i = 0; i < retry; i++) { + if (pulsar2.getNamespaceService().getBundle(destination).equals(bundleInBroker2) && i != retry - 1) { + Thread.sleep(200); + } else { + break; + } + } + + // (7) Make lookup request again to Broker-2 which should succeed. + final String topic2 = "persistent://" + namespace + "/topic2"; + Consumer consumer2 = pulsarClient.subscribe(topic2, "my-subscriber-name", new ConsumerConfiguration()); + + NamespaceBundle bundleInBroker1AfterSplit = pulsar2.getNamespaceService() + .getBundle(DestinationName.get(topic2)); + Assert.assertFalse(bundleInBroker1AfterSplit.equals(unsplitBundle)); + + consumer1.close(); + consumer2.close(); + pulsarClient2.close(); + pulsar2.close(); + } finally { + conf.setLoadManagerClassName(loadBalancerName); + } + + } + /**** helper classes ****/ public static class MockAuthenticationProvider implements AuthenticationProvider { diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java index 8c5a82177cd39..afc7d1db1f241 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java @@ -724,10 +724,11 @@ public interface Namespaces { * * @param namespace * @param range of bundle to split + * @param unload newly split bundles from the broker * @throws PulsarAdminException * Unexpected error */ - void splitNamespaceBundle(String namespace, String bundle) throws PulsarAdminException; + void splitNamespaceBundle(String namespace, String bundle, boolean unloadSplitBundles) throws PulsarAdminException; /** * Set message-dispatch-rate (topics under this namespace can dispatch this many messages per second) diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java index a78134431f215..35beb0a7dcab1 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java @@ -355,11 +355,13 @@ public void unloadNamespaceBundle(String namespace, String bundle) throws Pulsar } @Override - public void splitNamespaceBundle(String namespace, String bundle) throws PulsarAdminException { + public void splitNamespaceBundle(String namespace, String bundle, boolean unloadSplitBundles) + throws PulsarAdminException { try { NamespaceName ns = new NamespaceName(namespace); request(namespaces.path(ns.getProperty()).path(ns.getCluster()).path(ns.getLocalName()).path(bundle) - .path("split")).put(Entity.entity("", MediaType.APPLICATION_JSON), ErrorData.class); + .path("split").queryParam("unload", Boolean.toString(unloadSplitBundles))) + .put(Entity.entity("", MediaType.APPLICATION_JSON), ErrorData.class); } catch (Exception e) { throw getApiException(e); } diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java index 0861d9e4b9bb2..9977e635fdab9 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java @@ -308,11 +308,15 @@ private class SplitBundle extends CliCommand { @Parameter(names = { "--bundle", "-b" }, description = "{start-boundary}_{end-boundary}\n", required = true) private String bundle; + + @Parameter(names = { "--unload", + "-u" }, description = "Unload newly split bundles after splitting old bundle", required = false) + private boolean unload; @Override void run() throws PulsarAdminException { String namespace = validateNamespace(params); - admin.namespaces().splitNamespaceBundle(namespace, bundle); + admin.namespaces().splitNamespaceBundle(namespace, bundle, unload); } } diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java index 9f10723ee7134..7725d2b9104f0 100644 --- a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java +++ b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java @@ -212,7 +212,7 @@ void namespaces() throws Exception { verify(mockNamespaces).unloadNamespaceBundle("myprop/clust/ns1", "0x80000000_0xffffffff"); namespaces.run(split("split-bundle myprop/clust/ns1 -b 0x00000000_0xffffffff")); - verify(mockNamespaces).splitNamespaceBundle("myprop/clust/ns1", "0x00000000_0xffffffff"); + verify(mockNamespaces).splitNamespaceBundle("myprop/clust/ns1", "0x00000000_0xffffffff", false); namespaces.run(split("get-backlog-quotas myprop/clust/ns1")); verify(mockNamespaces).getBacklogQuotaMap("myprop/clust/ns1");