Skip to content

Commit

Permalink
Do not create system topic for heartbeat namespace (apache#11499)
Browse files Browse the repository at this point in the history
  • Loading branch information
codelipenghui authored Jul 29, 2021
1 parent 7f2ca8f commit 6d8cbc7
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.namespace.NamespaceBundleOwnershipListener;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicPoliciesCacheNotInitException;
import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory;
import org.apache.pulsar.broker.systopic.SystemTopicClient;
Expand Down Expand Up @@ -180,6 +181,11 @@ public CompletableFuture<TopicPolicies> getTopicPoliciesBypassCacheAsync(TopicNa
public CompletableFuture<Void> addOwnedNamespaceBundleAsync(NamespaceBundle namespaceBundle) {
CompletableFuture<Void> result = new CompletableFuture<>();
NamespaceName namespace = namespaceBundle.getNamespaceObject();
if (NamespaceService.checkHeartbeatNamespace(namespace) != null
|| NamespaceService.checkHeartbeatNamespaceV2(namespace) != null) {
result.complete(null);
return result;
}
createSystemTopicFactoryIfNeeded();
synchronized (this) {
if (readerCaches.get(namespace) != null) {
Expand Down Expand Up @@ -210,6 +216,10 @@ public CompletableFuture<Void> addOwnedNamespaceBundleAsync(NamespaceBundle name
@Override
public CompletableFuture<Void> removeOwnedNamespaceBundleAsync(NamespaceBundle namespaceBundle) {
NamespaceName namespace = namespaceBundle.getNamespaceObject();
if (NamespaceService.checkHeartbeatNamespace(namespace) != null
|| NamespaceService.checkHeartbeatNamespaceV2(namespace) != null) {
return CompletableFuture.completedFuture(null);
}
AtomicInteger bundlesCount = ownedBundlesCountPerNamespace.get(namespace);
if (bundlesCount == null || bundlesCount.decrementAndGet() <= 0) {
CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerCompletableFuture =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.pulsar.broker.ConfigHelper;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.BacklogQuotaManager;
import org.apache.pulsar.broker.service.PublishRateLimiterImpl;
import org.apache.pulsar.broker.service.Topic;
Expand Down Expand Up @@ -2456,4 +2457,14 @@ public void testPolicyIsDeleteTogetherAutomatically() throws Exception {
.isNull());
}

@Test
public void testDoNotCreateSystemTopicForHeartbeatNamespace() {
assertTrue(pulsar.getBrokerService().getTopics().size() > 0);
pulsar.getBrokerService().getTopics().forEach((k, v) -> {
TopicName topicName = TopicName.get(k);
assertNull(NamespaceService.checkHeartbeatNamespace(topicName.getNamespaceObject()));
assertNull(NamespaceService.checkHeartbeatNamespaceV2(topicName.getNamespaceObject()));
});
}

}

0 comments on commit 6d8cbc7

Please sign in to comment.