Skip to content

Commit

Permalink
fix misspelling and adjust code style (apache#5497)
Browse files Browse the repository at this point in the history
Signed-off-by: KevenYLi <[email protected]>
  • Loading branch information
kevenYLi authored and aahmed-se committed Oct 30, 2019
1 parent 01ca24b commit 96f14d2
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public interface LoadManager {

String LOADBALANCE_BROKERS_ROOT = "/loadbalance/brokers";

public void start() throws PulsarServerException;
void start() throws PulsarServerException;

/**
* Is centralized decision making to assign a new bundle.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,11 +121,9 @@ public static void applyNamespacePolicies(final ServiceUnitId serviceUnit,
log.error("Unable to parse brokerUrl from ResourceUnitId - [{}]", e);
continue;
}
// todo: in future check if the resource unit has resources to take
// the namespace
// todo: in future check if the resource unit has resources to take the namespace
if (isIsolationPoliciesPresent) {
// note: serviceUnitID is namespace name and ResourceID is
// brokerName
// note: serviceUnitID is namespace name and ResourceID is brokerName
if (policies.isPrimaryBroker(namespace, brokerUrl.getHost())) {
primariesCache.add(broker);
if (log.isDebugEnabled()) {
Expand Down Expand Up @@ -475,22 +473,22 @@ public static CompletableFuture<Map<String, Integer>> getAntiAffinityNamespaceOw
* @param currentBroker
* @param pulsar
* @param brokerToNamespaceToBundleRange
* @param candidateBroekrs
* @param candidateBrokers
* @return
* @throws Exception
*/
public static boolean shouldAntiAffinityNamespaceUnload(String namespace, String bundle, String currentBroker,
final PulsarService pulsar,
final ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>> brokerToNamespaceToBundleRange,
Set<String> candidateBroekrs) throws Exception {
Set<String> candidateBrokers) throws Exception {

Map<String, Integer> brokerNamespaceCount = getAntiAffinityNamespaceOwnedBrokers(pulsar, namespace,
brokerToNamespaceToBundleRange).get(10, TimeUnit.SECONDS);
if (brokerNamespaceCount != null && !brokerNamespaceCount.isEmpty()) {
int leastNsCount = Integer.MAX_VALUE;
int currentBrokerNsCount = 0;

for (String broker : candidateBroekrs) {
for (String broker : candidateBrokers) {
int nsCount = brokerNamespaceCount.getOrDefault(broker, 0);
if (currentBroker.equals(broker)) {
currentBrokerNsCount = nsCount;
Expand All @@ -505,13 +503,13 @@ public static boolean shouldAntiAffinityNamespaceUnload(String namespace, String
}
// check if all the brokers having same number of ns-count then broker can't unload
int leastNsOwnerBrokers = 0;
for (String broker : candidateBroekrs) {
for (String broker : candidateBrokers) {
if (leastNsCount == brokerNamespaceCount.getOrDefault(broker, 0)) {
leastNsOwnerBrokers++;
}
}
// if all candidate brokers own same-number of ns then broker can't unload
return candidateBroekrs.size() != leastNsOwnerBrokers;
return candidateBrokers.size() != leastNsOwnerBrokers;
}
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,9 +222,9 @@ public boolean isEnableNonPersistentTopics(String brokerUrl) {
* Initialize this load manager using the given PulsarService. Should be called only once, after invoking the
* default constructor.
*
* @param pulsar
* The service to initialize with.
* @param pulsar The service to initialize with.
*/
@Override
public void initialize(final PulsarService pulsar) {
this.pulsar = pulsar;
availableActiveBrokers = new ZooKeeperChildrenCache(pulsar.getLocalZkCache(),
Expand Down Expand Up @@ -491,13 +491,11 @@ private void updateBundleData() {
final String bundle = entry.getKey();
final NamespaceBundleStats stats = entry.getValue();
if (bundleData.containsKey(bundle)) {
// If we recognize the bundle, add these stats as a new
// sample.
// If we recognize the bundle, add these stats as a new sample.
bundleData.get(bundle).update(stats);
} else {
// Otherwise, attempt to find the bundle data on ZooKeeper.
// If it cannot be found, use the latest stats as the first
// sample.
// If it cannot be found, use the latest stats as the first sample.
BundleData currentBundleData = getBundleDataOrDefault(bundle);
currentBundleData.update(stats);
bundleData.put(bundle, currentBundleData);
Expand Down Expand Up @@ -572,8 +570,7 @@ public synchronized void doLoadShedding() {
log.info("Only 1 broker available: no load shedding will be performed");
return;
}
// Remove bundles who have been unloaded for longer than the grace period from the recently unloaded
// map.
// Remove bundles who have been unloaded for longer than the grace period from the recently unloaded map.
final long timeout = System.currentTimeMillis()
- TimeUnit.MINUTES.toMillis(conf.getLoadBalancerSheddingGracePeriodMinutes());
final Map<String, Long> recentlyUnloadedBundles = loadData.getRecentlyUnloadedBundles();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public class OverloadShedder implements LoadSheddingStrategy {
* The service configuration.
* @return A map from bundles to unload to the brokers on which they are loaded.
*/
@Override
public Multimap<String, String> findBundlesForUnloading(final LoadData loadData, final ServiceConfiguration conf) {
selectedBundlesCache.clear();
final double overloadThreshold = conf.getLoadBalancerBrokerOverloadedThresholdPercentage() / 100.0;
Expand Down

0 comments on commit 96f14d2

Please sign in to comment.