diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index 94812dc5a7498..07c676af4854b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -1107,11 +1107,15 @@ public void internalUnloadNamespaceBundle(AsyncResponse asyncResponse, String bu } @SuppressWarnings("deprecation") - protected void internalSplitNamespaceBundle(AsyncResponse asyncResponse, String bundleRange, + protected void internalSplitNamespaceBundle(AsyncResponse asyncResponse, String bundleName, boolean authoritative, boolean unload, String splitAlgorithmName) { validateSuperUserAccess(); - checkNotNull(bundleRange, "BundleRange should not be null"); - log.info("[{}] Split namespace bundle {}/{}", clientAppId(), namespaceName, bundleRange); + checkNotNull(bundleName, "BundleRange should not be null"); + log.info("[{}] Split namespace bundle {}/{}", clientAppId(), namespaceName, bundleName); + + String bundleRange = bundleName.equals(Policies.LARGEST_BUNDLE) + ? findLargestBundleWithTopics(namespaceName).getBundleRange() + : bundleName; Policies policies = getNamespacePolicies(namespaceName); @@ -1163,6 +1167,10 @@ protected void internalSplitNamespaceBundle(AsyncResponse asyncResponse, String }); } + private NamespaceBundle findLargestBundleWithTopics(NamespaceName namespaceName) { + return pulsar().getNamespaceService().getNamespaceBundleFactory().getBundlesWithHighestTopics(namespaceName); + } + private NamespaceBundleSplitAlgorithm getNamespaceBundleSplitAlgorithmByName(String algorithmName) { NamespaceBundleSplitAlgorithm algorithm = NamespaceBundleSplitAlgorithm.of(algorithmName); if (algorithm == null) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java index 05f262134fbc4..9fbcf17a78b8c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java @@ -31,16 +31,21 @@ import com.google.common.collect.Range; import com.google.common.hash.HashFunction; import java.io.IOException; +import java.util.HashMap; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.SortedSet; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.resources.LocalPoliciesResources; +import org.apache.pulsar.broker.resources.PulsarResources; import org.apache.pulsar.common.policies.data.BundlesData; import org.apache.pulsar.common.policies.data.LocalPolicies; import org.apache.pulsar.common.policies.data.Policies; @@ -160,19 +165,6 @@ private void handleMetadataStoreNotification(Notification n) { } } - /** - * checks if the local broker is the owner of the namespace bundle. - * - * @param nsBundle - * @return - */ - private boolean isOwner(NamespaceBundle nsBundle) { - if (pulsar != null) { - return pulsar.getNamespaceService().getOwnershipCache().getOwnedBundle(nsBundle) != null; - } - return false; - } - public void invalidateBundleCache(NamespaceName namespace) { bundlesCache.synchronous().invalidate(namespace); } @@ -181,6 +173,37 @@ public CompletableFuture getBundlesAsync(NamespaceName nsname) return bundlesCache.get(nsname); } + public NamespaceBundle getBundlesWithHighestTopics(NamespaceName nsname) { + try { + return getBundlesWithHighestTopicsAsync(nsname).get(PulsarResources.DEFAULT_OPERATION_TIMEOUT_SEC, + TimeUnit.SECONDS); + } catch (Exception e) { + LOG.info("failed to derive bundle for {}", nsname, e); + throw new IllegalStateException(e instanceof ExecutionException ? e.getCause() : e); + } + } + + public CompletableFuture getBundlesWithHighestTopicsAsync(NamespaceName nsname) { + return pulsar.getPulsarResources().getTopicResources().listPersistentTopicsAsync(nsname).thenCompose(topics -> { + return bundlesCache.get(nsname).handle((bundles, e) -> { + Map countMap = new HashMap<>(); + NamespaceBundle resultBundle = null; + int maxCount = 0; + for (String topic : topics) { + NamespaceBundle bundle = bundles.findBundle(TopicName.get(topic)); + String bundleRange = bundles.findBundle(TopicName.get(topic)).getBundleRange(); + int count = countMap.getOrDefault(bundleRange, 0) + 1; + countMap.put(bundleRange, count); + if (count > maxCount) { + maxCount = count; + resultBundle = bundle; + } + } + return resultBundle; + }); + }); + } + public NamespaceBundles getBundles(NamespaceName nsname) { return bundlesCache.synchronous().get(nsname); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java index 0534c310de3d5..b1d458874cb03 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.namespace; +import static org.junit.Assert.assertNotEquals; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; @@ -62,7 +63,10 @@ import org.apache.pulsar.common.naming.NamespaceBundles; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.policies.data.BundlesData; import org.apache.pulsar.common.policies.data.LocalPolicies; +import org.apache.pulsar.common.policies.data.Policies; +import org.apache.pulsar.common.policies.data.impl.BundlesDataImpl.BundlesDataImplBuilder; import org.apache.pulsar.common.util.ObjectMapperFactory; import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.apache.pulsar.metadata.api.GetResult; @@ -488,6 +492,53 @@ public void testRemoveOwnershipAndSplitBundle() throws Exception { } } + @Test + public void testSplitLargestBundle() throws Exception { + String namespace = "prop/test/ns-abc2"; + String topic = "persistent://" + namespace + "/t1-"; + int totalTopics = 100; + + BundlesData bundleData = BundlesData.builder().numBundles(10).build(); + admin.namespaces().createNamespace(namespace, bundleData); + Consumer[] consumers = new Consumer[totalTopics]; + for (int i = 0; i < totalTopics; i++) { + consumers[i] = pulsarClient.newConsumer().topic(topic + i).subscriptionName("my-subscriber-name") + .subscribe(); + } + + NamespaceService namespaceService = pulsar.getNamespaceService(); + NamespaceName nsname = NamespaceName.get(namespace); + NamespaceBundles bundles = namespaceService.getNamespaceBundleFactory().getBundles(nsname); + + Map topicCount = Maps.newHashMap(); + int maxTopics = 0; + String maxBundle = null; + for (int i = 0; i < totalTopics; i++) { + String bundle = bundles.findBundle(TopicName.get(topic + i)).getBundleRange(); + int count = topicCount.getOrDefault(bundle, 0) + 1; + topicCount.put(bundle, count); + if (count > maxTopics) { + maxTopics = count; + maxBundle = bundle; + } + } + + String largestBundle = namespaceService.getNamespaceBundleFactory().getBundlesWithHighestTopics(nsname) + .getBundleRange(); + + assertEquals(maxBundle, largestBundle); + + for (int i = 0; i < totalTopics; i++) { + consumers[i].close(); + } + + admin.namespaces().splitNamespaceBundle(namespace, Policies.LARGEST_BUNDLE, false, null); + + for (NamespaceBundle bundle : namespaceService.getNamespaceBundleFactory().getBundles(nsname).getBundles()) { + assertNotEquals(bundle.getBundleRange(), maxBundle); + } + } + @SuppressWarnings("unchecked") private Pair> splitBundles(NamespaceBundleFactory utilityFactory, NamespaceName nsname, NamespaceBundles bundles, NamespaceBundle targetBundle) throws Exception { diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java index ff773f6f8670f..631675fcbf729 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java @@ -66,6 +66,7 @@ public class Policies { public boolean deleted = false; public static final String FIRST_BOUNDARY = "0x00000000"; public static final String LAST_BOUNDARY = "0xffffffff"; + public static final String LARGEST_BUNDLE = "LARGEST"; @SuppressWarnings("checkstyle:MemberName") public boolean encryption_required = false; diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java index 9677725033847..435b5d8b45ddb 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java @@ -788,7 +788,8 @@ private class SplitBundle extends CliCommand { @Parameter(description = "tenant/namespace", required = true) private java.util.List params; - @Parameter(names = { "--bundle", "-b" }, description = "{start-boundary}_{end-boundary}", required = true) + @Parameter(names = { "--bundle", + "-b" }, description = "{start-boundary}_{end-boundary} / LARGEST(bundle with highest topics)", required = true) private String bundle; @Parameter(names = { "--unload",