Skip to content

Commit

Permalink
Introduce auto bundle split and unloading of split bundle in ModularL…
Browse files Browse the repository at this point in the history
…oadManager (apache#857)
  • Loading branch information
rdhabalia authored and merlimat committed Oct 26, 2017
1 parent 8448198 commit eb9e5fc
Show file tree
Hide file tree
Showing 24 changed files with 479 additions and 47 deletions.
3 changes: 3 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,9 @@ loadBalancerBrokerComfortLoadLevelPercentage=65
# enable/disable namespace bundle auto split
loadBalancerAutoBundleSplitEnabled=false

# enable/disable automatic unloading of split bundles
loadBalancerAutoUnloadSplitBundlesEnabled=false

# maximum topics in a bundle, otherwise bundle split will be triggered
loadBalancerNamespaceBundleMaxTopics=1000

Expand Down
3 changes: 3 additions & 0 deletions conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,9 @@ loadBalancerBrokerComfortLoadLevelPercentage=65
# enable/disable namespace bundle auto split
loadBalancerAutoBundleSplitEnabled=false

# enable/disable automatic unloading of split bundles
loadBalancerAutoUnloadSplitBundlesEnabled=false

# maximum topics in a bundle, otherwise bundle split will be triggered
loadBalancerNamespaceBundleMaxTopics=1000

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,11 @@ public class ServiceConfiguration implements PulsarConfiguration {
// Usage threshold to defermine a broker is having just right level of load
private int loadBalancerBrokerComfortLoadLevelPercentage = 65;
// enable/disable automatic namespace bundle split
@FieldContext(dynamic = true)
private boolean loadBalancerAutoBundleSplitEnabled = false;
// enable/disable automatic unloading of split bundles
@FieldContext(dynamic = true)
private boolean loadBalancerAutoUnloadSplitBundlesEnabled = false;
// maximum topics in a bundle, otherwise bundle split will be triggered
private int loadBalancerNamespaceBundleMaxTopics = 1000;
// maximum sessions (producers + consumers) in a bundle, otherwise bundle split will be triggered
Expand Down Expand Up @@ -1112,14 +1116,22 @@ public void setLoadBalancerBrokerComfortLoadLevelPercentage(int percentage) {
this.loadBalancerBrokerComfortLoadLevelPercentage = percentage;
}

public boolean getLoadBalancerAutoBundleSplitEnabled() {
public boolean isLoadBalancerAutoBundleSplitEnabled() {
return this.loadBalancerAutoBundleSplitEnabled;
}

public void setLoadBalancerAutoBundleSplitEnabled(boolean enabled) {
this.loadBalancerAutoBundleSplitEnabled = enabled;
}

public boolean isLoadBalancerAutoUnloadSplitBundlesEnabled() {
return loadBalancerAutoUnloadSplitBundlesEnabled;
}

public void setLoadBalancerAutoUnloadSplitBundlesEnabled(boolean loadBalancerAutoUnloadSplitBundlesEnabled) {
this.loadBalancerAutoUnloadSplitBundlesEnabled = loadBalancerAutoUnloadSplitBundlesEnabled;
}

public void setLoadBalancerNamespaceMaximumBundles(int bundles) {
this.loadBalancerNamespaceMaximumBundles = bundles;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.apache.pulsar.broker;

import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;

import java.io.IOException;
import java.net.URL;
import java.util.List;
Expand All @@ -42,7 +44,7 @@
import org.apache.pulsar.broker.loadbalance.LoadReportUpdaterTask;
import org.apache.pulsar.broker.loadbalance.LoadResourceQuotaUpdaterTask;
import org.apache.pulsar.broker.loadbalance.LoadSheddingTask;
import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.Topic;
Expand Down Expand Up @@ -74,7 +76,6 @@
import com.google.common.collect.Lists;

import io.netty.util.concurrent.DefaultThreadFactory;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;

/**
* Main class for Pulsar broker service
Expand Down Expand Up @@ -428,7 +429,7 @@ private void startLoadManagementService() throws PulsarServerException {
if (config.isLoadBalancerEnabled()) {
LOG.info("Starting load balancer");
if (this.loadReportTask == null) {
long loadReportMinInterval = SimpleLoadManagerImpl.LOAD_REPORT_UPDATE_MIMIMUM_INTERVAL;
long loadReportMinInterval = LoadManagerShared.LOAD_REPORT_UPDATE_MIMIMUM_INTERVAL;
this.loadReportTask = this.loadManagerExecutor.scheduleAtFixedRate(
new LoadReportUpdaterTask(loadManager), loadReportMinInterval, loadReportMinInterval,
TimeUnit.MILLISECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,4 +161,22 @@ public double getMsgRateOut() {
public void setMsgRateOut(double msgRateOut) {
this.msgRateOut = msgRateOut;
}

/**
* Get the total message rate.
*
* @return Message rate in + message rate out.
*/
public double totalMsgRate() {
return msgRateIn + msgRateOut;
}

/**
* Get the total message throughput.
*
* @return Message throughput in + message throughput out.
*/
public double totalMsgThroughput() {
return msgThroughputIn + msgThroughputOut;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -841,7 +841,8 @@ public void unloadNamespaceBundle(@PathParam("property") String property, @PathP
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission") })
public void splitNamespaceBundle(@PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace, @PathParam("bundle") String bundleRange,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@QueryParam("unload") @DefaultValue("false") boolean unload) {
log.info("[{}] Split namespace bundle {}/{}/{}/{}", clientAppId(), property, cluster, namespace, bundleRange);

validateSuperUserAccess();
Expand All @@ -858,7 +859,7 @@ public void splitNamespaceBundle(@PathParam("property") String property, @PathPa
true);

try {
pulsar().getNamespaceService().splitAndOwnBundle(nsBundle).get();
pulsar().getNamespaceService().splitAndOwnBundle(nsBundle, unload).get();
log.info("[{}] Successfully split namespace bundle {}", clientAppId(), nsBundle.toString());
} catch (IllegalArgumentException e) {
log.error("[{}] Failed to split namespace bundle {}/{} due to {}", clientAppId(), fqnn.toString(),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.loadbalance;

import java.util.Set;

import org.apache.pulsar.broker.PulsarService;

/**
* Load Manager component which determines what bundles should be split into two bundles.
*/
public interface BundleSplitStrategy {
/**
* Determines which bundles, if any, should be split.
*
* @param loadData
* Load data to base decisions on (does not have benefit of preallocated data since this may not be the
* leader broker).
* @param pulsar
* Service to use.
* @return A set of the bundles that should be split.
*/
Set<String> findBundlesToSplit(LoadData loadData, PulsarService pulsar);
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public interface ModularLoadManager {
/**
* As the leader broker, attempt to automatically detect and split hot namespace bundles.
*/
void doNamespaceBundleSplit();
void checkNamespaceBundleSplit();

/**
* Initialize this load manager using the given pulsar service.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.loadbalance.impl;

import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.pulsar.broker.LocalBrokerData;
//import org.apache.pulsar.broker.MessageData;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.TimeAverageMessageData;
import org.apache.pulsar.broker.loadbalance.BundleSplitStrategy;
import org.apache.pulsar.broker.loadbalance.LoadData;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;

/**
* Determines which bundles should be split based on various thresholds.
*/
public class BundleSplitterTask implements BundleSplitStrategy {
private static final Logger log = LoggerFactory.getLogger(BundleSplitStrategy.class);
private final Set<String> bundleCache;

/**
* Construct a BundleSplitterTask.
*
* @param pulsar
* Service to construct from.
*/
public BundleSplitterTask(final PulsarService pulsar) {
bundleCache = new HashSet<>();
}

/**
* Determines which bundles should be split based on various thresholds.
*
* @param loadData
* Load data to base decisions on (does not have benefit of preallocated data since this may not be the
* leader broker).
* @param localData
* Local data for the broker we are splitting on.
* @param pulsar
* Service to use.
* @return All bundles who have exceeded configured thresholds in number of topics, number of sessions, total
* message rates, or total throughput.
*/
@Override
public Set<String> findBundlesToSplit(final LoadData loadData, final PulsarService pulsar) {
bundleCache.clear();
final ServiceConfiguration conf = pulsar.getConfiguration();
int maxBundleCount = conf.getLoadBalancerNamespaceMaximumBundles();
long maxBundleTopics = conf.getLoadBalancerNamespaceBundleMaxTopics();
long maxBundleSessions = conf.getLoadBalancerNamespaceBundleMaxSessions();
long maxBundleMsgRate = conf.getLoadBalancerNamespaceBundleMaxMsgRate();
long maxBundleBandwidth = conf.getLoadBalancerNamespaceBundleMaxBandwidthMbytes() * LoadManagerShared.MIBI;
loadData.getBrokerData().forEach((broker, brokerData) -> {
LocalBrokerData localData = brokerData.getLocalData();
for (final Map.Entry<String, NamespaceBundleStats> entry : localData.getLastStats().entrySet()) {
final String bundle = entry.getKey();
final NamespaceBundleStats stats = entry.getValue();
double totalMessageRate = 0;
double totalMessageThroughput = 0;
// Attempt to consider long-term message data, otherwise effectively ignore.
if (loadData.getBundleData().containsKey(bundle)) {
final TimeAverageMessageData longTermData = loadData.getBundleData().get(bundle).getLongTermData();
totalMessageRate = longTermData.totalMsgRate();
totalMessageThroughput = longTermData.totalMsgThroughput();
}
if (stats.topics > maxBundleTopics || stats.consumerCount + stats.producerCount > maxBundleSessions
|| totalMessageRate > maxBundleMsgRate || totalMessageThroughput > maxBundleBandwidth) {
final String namespace = LoadManagerShared.getNamespaceNameFromBundleName(bundle);
try {
final int bundleCount = pulsar.getNamespaceService()
.getBundleCount(new NamespaceName(namespace));
if (bundleCount < maxBundleCount) {
bundleCache.add(bundle);
} else {
log.warn(
"Could not split namespace bundle {} because namespace {} has too many bundles: {}",
bundle, namespace, bundleCount);
}
} catch (Exception e) {
log.warn("Error while getting bundle count for namespace {}", namespace, e);
}
}
}
});
return bundleCache;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.admin.AdminResource;
Expand All @@ -52,6 +53,9 @@ public class LoadManagerShared {

// Cache for shard brokers according to policies.
private static final Set<String> sharedCache = new HashSet<>();

// update LoadReport at most every 5 seconds
public static final long LOAD_REPORT_UPDATE_MIMIMUM_INTERVAL = TimeUnit.SECONDS.toMillis(5);

// Don't allow construction: static method namespace only.
private LoadManagerShared() {
Expand Down
Loading

0 comments on commit eb9e5fc

Please sign in to comment.