Skip to content

Commit

Permalink
Fixed checking for maxTopicsPerNamespace (apache#9121)
Browse files Browse the repository at this point in the history
* Fixed checking for `maxTopicsPerNamespace`

* Fixed other mocked test issue

Co-authored-by: penghui <[email protected]>
  • Loading branch information
merlimat and codelipenghui authored Jan 5, 2021
1 parent afe6af2 commit 32f9662
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -811,9 +811,20 @@ protected List<String> getTopicPartitionList(TopicDomain topicDomain) {
}

protected void internalCreatePartitionedTopic(AsyncResponse asyncResponse, int numPartitions) {
Integer maxTopicsPerNamespace;
Integer maxTopicsPerNamespace = null;

try {
Policies policies = getNamespacePolicies(namespaceName);
maxTopicsPerNamespace = policies.max_topics_per_namespace;
} catch (RestException e) {
if (e.getResponse().getStatus() != Status.NOT_FOUND.getStatusCode()) {
log.error("[{}] Failed to create partitioned topic {}", clientAppId(), namespaceName, e);
resumeAsyncResponseExceptionally(asyncResponse, e);
return;
}
}

try {
maxTopicsPerNamespace = getNamespacePolicies(namespaceName).max_topics_per_namespace;
if (maxTopicsPerNamespace == null) {
maxTopicsPerNamespace = pulsar().getConfig().getMaxTopicsPerNamespace();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.zookeeper.ZooKeeperManagedLedgerCache;
import org.apache.zookeeper.KeeperException;
Expand Down Expand Up @@ -299,6 +300,7 @@ public void testCreatePartitionedTopicHavingNonPartitionTopicWithPartitionSuffix
doReturn(mockZooKeeperChildrenCache).when(mockLocalZooKeeperCacheService).managedLedgerListCache();
doReturn(ImmutableSet.of(nonPartitionTopicName1, nonPartitionTopicName2)).when(mockZooKeeperChildrenCache).get(anyString());
doReturn(CompletableFuture.completedFuture(ImmutableSet.of(nonPartitionTopicName1, nonPartitionTopicName2))).when(mockZooKeeperChildrenCache).getAsync(anyString());
doReturn(new Policies()).when(persistentTopics).getNamespacePolicies(any());
AsyncResponse response = mock(AsyncResponse.class);
ArgumentCaptor<RestException> errCaptor = ArgumentCaptor.forClass(RestException.class);
persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, partitionedTopicName, 5);
Expand Down

0 comments on commit 32f9662

Please sign in to comment.