Skip to content

Commit

Permalink
remove unnessary bundle listener triger when topic lookup (apache#10126)
Browse files Browse the repository at this point in the history
  • Loading branch information
hangc0276 authored Apr 16, 2021
1 parent 5752363 commit 743777a
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,6 @@ private CompletableFuture<Optional<Map.Entry<NamespaceEphemeralData, Stat>>> res
ownedBundlesCache.put(path, CompletableFuture.completedFuture(ownedBundle));
}
ownershipReadOnlyCache.invalidate(path);
namespaceService.onNamespaceBundleOwned(ownedBundle.getNamespaceBundle());
}
}
return optionalOwnerDataWithStat;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.broker.namespace;

import com.google.common.collect.Sets;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Producer;
Expand Down Expand Up @@ -124,4 +125,53 @@ public void testGetAllPartitions() throws PulsarAdminException, ExecutionExcepti
admin.namespaces().deleteNamespace(namespace);
}

@Test
public void testNamespaceBundleLookupOnwershipListener() throws PulsarAdminException, InterruptedException,
PulsarClientException {
final CountDownLatch countDownLatch = new CountDownLatch(2);
final AtomicInteger onLoad = new AtomicInteger(0);
final AtomicInteger unLoad = new AtomicInteger(0);

final String namespace = "prop/" + UUID.randomUUID().toString();

pulsar.getNamespaceService().addNamespaceBundleOwnershipListener(new NamespaceBundleOwnershipListener() {
@Override
public void onLoad(NamespaceBundle bundle) {
countDownLatch.countDown();
onLoad.addAndGet(1);
}

@Override
public void unLoad(NamespaceBundle bundle) {
countDownLatch.countDown();
unLoad.addAndGet(1);
}

@Override
public boolean test(NamespaceBundle namespaceBundle) {
return namespaceBundle.getNamespaceObject().toString().equals(namespace);
}
});

admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
assertTrue(admin.namespaces().getNamespaces("prop").contains(namespace));

final String topic = "persistent://" + namespace + "/os-0";
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
.create();

producer.close();

admin.lookups().lookupTopic(topic);

admin.namespaces().unload(namespace);

countDownLatch.await();

Assert.assertEquals(onLoad.get(), 1);
Assert.assertEquals(unLoad.get(), 1);
admin.topics().delete(topic);
admin.namespaces().deleteNamespace(namespace);
}
}

0 comments on commit 743777a

Please sign in to comment.