Skip to content

Commit

Permalink
[pulsar-broker] support split largest bundle of the namespace (apache…
Browse files Browse the repository at this point in the history
…#12361)

* [pulsar-broker] support split largest bundle of the namespace

* fix formatting violation
  • Loading branch information
rdhabalia authored Oct 15, 2021
1 parent cc70a1f commit 1ce016c
Show file tree
Hide file tree
Showing 5 changed files with 101 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1107,11 +1107,15 @@ public void internalUnloadNamespaceBundle(AsyncResponse asyncResponse, String bu
}

@SuppressWarnings("deprecation")
protected void internalSplitNamespaceBundle(AsyncResponse asyncResponse, String bundleRange,
protected void internalSplitNamespaceBundle(AsyncResponse asyncResponse, String bundleName,
boolean authoritative, boolean unload, String splitAlgorithmName) {
validateSuperUserAccess();
checkNotNull(bundleRange, "BundleRange should not be null");
log.info("[{}] Split namespace bundle {}/{}", clientAppId(), namespaceName, bundleRange);
checkNotNull(bundleName, "BundleRange should not be null");
log.info("[{}] Split namespace bundle {}/{}", clientAppId(), namespaceName, bundleName);

String bundleRange = bundleName.equals(Policies.LARGEST_BUNDLE)
? findLargestBundleWithTopics(namespaceName).getBundleRange()
: bundleName;

Policies policies = getNamespacePolicies(namespaceName);

Expand Down Expand Up @@ -1163,6 +1167,10 @@ protected void internalSplitNamespaceBundle(AsyncResponse asyncResponse, String
});
}

private NamespaceBundle findLargestBundleWithTopics(NamespaceName namespaceName) {
return pulsar().getNamespaceService().getNamespaceBundleFactory().getBundlesWithHighestTopics(namespaceName);
}

private NamespaceBundleSplitAlgorithm getNamespaceBundleSplitAlgorithmByName(String algorithmName) {
NamespaceBundleSplitAlgorithm algorithm = NamespaceBundleSplitAlgorithm.of(algorithmName);
if (algorithm == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,21 @@
import com.google.common.collect.Range;
import com.google.common.hash.HashFunction;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.SortedSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.resources.LocalPoliciesResources;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.LocalPolicies;
import org.apache.pulsar.common.policies.data.Policies;
Expand Down Expand Up @@ -160,19 +165,6 @@ private void handleMetadataStoreNotification(Notification n) {
}
}

/**
* checks if the local broker is the owner of the namespace bundle.
*
* @param nsBundle
* @return
*/
private boolean isOwner(NamespaceBundle nsBundle) {
if (pulsar != null) {
return pulsar.getNamespaceService().getOwnershipCache().getOwnedBundle(nsBundle) != null;
}
return false;
}

public void invalidateBundleCache(NamespaceName namespace) {
bundlesCache.synchronous().invalidate(namespace);
}
Expand All @@ -181,6 +173,37 @@ public CompletableFuture<NamespaceBundles> getBundlesAsync(NamespaceName nsname)
return bundlesCache.get(nsname);
}

public NamespaceBundle getBundlesWithHighestTopics(NamespaceName nsname) {
try {
return getBundlesWithHighestTopicsAsync(nsname).get(PulsarResources.DEFAULT_OPERATION_TIMEOUT_SEC,
TimeUnit.SECONDS);
} catch (Exception e) {
LOG.info("failed to derive bundle for {}", nsname, e);
throw new IllegalStateException(e instanceof ExecutionException ? e.getCause() : e);
}
}

public CompletableFuture<NamespaceBundle> getBundlesWithHighestTopicsAsync(NamespaceName nsname) {
return pulsar.getPulsarResources().getTopicResources().listPersistentTopicsAsync(nsname).thenCompose(topics -> {
return bundlesCache.get(nsname).handle((bundles, e) -> {
Map<String, Integer> countMap = new HashMap<>();
NamespaceBundle resultBundle = null;
int maxCount = 0;
for (String topic : topics) {
NamespaceBundle bundle = bundles.findBundle(TopicName.get(topic));
String bundleRange = bundles.findBundle(TopicName.get(topic)).getBundleRange();
int count = countMap.getOrDefault(bundleRange, 0) + 1;
countMap.put(bundleRange, count);
if (count > maxCount) {
maxCount = count;
resultBundle = bundle;
}
}
return resultBundle;
});
});
}

public NamespaceBundles getBundles(NamespaceName nsname) {
return bundlesCache.synchronous().get(nsname);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.namespace;

import static org.junit.Assert.assertNotEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
Expand Down Expand Up @@ -62,7 +63,10 @@
import org.apache.pulsar.common.naming.NamespaceBundles;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.LocalPolicies;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.impl.BundlesDataImpl.BundlesDataImplBuilder;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.metadata.api.GetResult;
Expand Down Expand Up @@ -488,6 +492,53 @@ public void testRemoveOwnershipAndSplitBundle() throws Exception {
}
}

@Test
public void testSplitLargestBundle() throws Exception {
String namespace = "prop/test/ns-abc2";
String topic = "persistent://" + namespace + "/t1-";
int totalTopics = 100;

BundlesData bundleData = BundlesData.builder().numBundles(10).build();
admin.namespaces().createNamespace(namespace, bundleData);
Consumer<byte[]>[] consumers = new Consumer[totalTopics];
for (int i = 0; i < totalTopics; i++) {
consumers[i] = pulsarClient.newConsumer().topic(topic + i).subscriptionName("my-subscriber-name")
.subscribe();
}

NamespaceService namespaceService = pulsar.getNamespaceService();
NamespaceName nsname = NamespaceName.get(namespace);
NamespaceBundles bundles = namespaceService.getNamespaceBundleFactory().getBundles(nsname);

Map<String, Integer> topicCount = Maps.newHashMap();
int maxTopics = 0;
String maxBundle = null;
for (int i = 0; i < totalTopics; i++) {
String bundle = bundles.findBundle(TopicName.get(topic + i)).getBundleRange();
int count = topicCount.getOrDefault(bundle, 0) + 1;
topicCount.put(bundle, count);
if (count > maxTopics) {
maxTopics = count;
maxBundle = bundle;
}
}

String largestBundle = namespaceService.getNamespaceBundleFactory().getBundlesWithHighestTopics(nsname)
.getBundleRange();

assertEquals(maxBundle, largestBundle);

for (int i = 0; i < totalTopics; i++) {
consumers[i].close();
}

admin.namespaces().splitNamespaceBundle(namespace, Policies.LARGEST_BUNDLE, false, null);

for (NamespaceBundle bundle : namespaceService.getNamespaceBundleFactory().getBundles(nsname).getBundles()) {
assertNotEquals(bundle.getBundleRange(), maxBundle);
}
}

@SuppressWarnings("unchecked")
private Pair<NamespaceBundles, List<NamespaceBundle>> splitBundles(NamespaceBundleFactory utilityFactory,
NamespaceName nsname, NamespaceBundles bundles, NamespaceBundle targetBundle) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ public class Policies {
public boolean deleted = false;
public static final String FIRST_BOUNDARY = "0x00000000";
public static final String LAST_BOUNDARY = "0xffffffff";
public static final String LARGEST_BUNDLE = "LARGEST";

@SuppressWarnings("checkstyle:MemberName")
public boolean encryption_required = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -788,7 +788,8 @@ private class SplitBundle extends CliCommand {
@Parameter(description = "tenant/namespace", required = true)
private java.util.List<String> params;

@Parameter(names = { "--bundle", "-b" }, description = "{start-boundary}_{end-boundary}", required = true)
@Parameter(names = { "--bundle",
"-b" }, description = "{start-boundary}_{end-boundary} / LARGEST(bundle with highest topics)", required = true)
private String bundle;

@Parameter(names = { "--unload",
Expand Down

0 comments on commit 1ce016c

Please sign in to comment.