Skip to content

Commit

Permalink
Check partitioned topics in namespace when delete the namespace apach…
Browse files Browse the repository at this point in the history
…e#2822 (apache#2833)

* Fix issue-2822

* Add UT and replace string to TopicDomain

* Add missing import

* Fix import
  • Loading branch information
codelipenghui authored and merlimat committed Oct 25, 2018
1 parent 9c43205 commit 6a79a9a
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,14 @@

import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import static org.apache.pulsar.common.util.Codec.decode;

import java.net.MalformedURLException;
import java.net.URI;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

import javax.servlet.ServletContext;
import javax.ws.rs.WebApplicationException;
Expand All @@ -39,6 +40,7 @@
import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService;
import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.naming.Constants;
import org.apache.pulsar.common.naming.NamespaceBundle;
Expand Down Expand Up @@ -481,4 +483,25 @@ protected boolean isNamespaceReplicated(NamespaceName namespaceName) {
throw new RestException(e);
}
}

protected List<String> getPartitionedTopicList(TopicDomain topicDomain) {
List<String> partitionedTopics = Lists.newArrayList();

try {
String partitionedTopicPath = path(PARTITIONED_TOPIC_PATH_ZNODE, namespaceName.toString(), topicDomain.value());
List<String> topics = globalZk().getChildren(partitionedTopicPath, false);
partitionedTopics = topics.stream()
.map(s -> String.format("persistent://%s/%s", namespaceName.toString(), decode(s)))
.collect(Collectors.toList());
} catch (KeeperException.NoNodeException e) {
// NoNode means there are no partitioned topics in this domain for this namespace
} catch (Exception e) {
log.error("[{}] Failed to get partitioned topic list for namespace {}", clientAppId(),
namespaceName.toString(), e);
throw new RestException(e);
}

partitionedTopics.sort(null);
return partitionedTopics;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import org.apache.pulsar.common.naming.NamespaceBundleFactory;
import org.apache.pulsar.common.naming.NamespaceBundles;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.BacklogQuota;
Expand Down Expand Up @@ -177,7 +178,9 @@ protected void internalDeleteNamespace(boolean authoritative) {

boolean isEmpty;
try {
isEmpty = pulsar().getNamespaceService().getListOfPersistentTopics(namespaceName).isEmpty();
isEmpty = pulsar().getNamespaceService().getListOfPersistentTopics(namespaceName).isEmpty()
&& getPartitionedTopicList(TopicDomain.persistent).isEmpty()
&& getPartitionedTopicList(TopicDomain.non_persistent).isEmpty();
} catch (Exception e) {
throw new RestException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,25 +168,7 @@ protected List<String> internalGetPartitionedTopicList() {
log.error("[{}] Failed to get partitioned topic list for namespace {}", clientAppId(), namespaceName, e);
throw new RestException(e);
}

List<String> partitionedTopics = Lists.newArrayList();

try {
String partitionedTopicPath = path(PARTITIONED_TOPIC_PATH_ZNODE, namespaceName.toString(), domain());
List<String> topics = globalZk().getChildren(partitionedTopicPath, false);
partitionedTopics = topics.stream()
.map(s -> String.format("persistent://%s/%s", namespaceName.toString(), decode(s)))
.collect(Collectors.toList());
} catch (KeeperException.NoNodeException e) {
// NoNode means there are no partitioned topics in this domain for this namespace
} catch (Exception e) {
log.error("[{}] Failed to get partitioned topic list for namespace {}", clientAppId(),
namespaceName.toString(), e);
throw new RestException(e);
}

partitionedTopics.sort(null);
return partitionedTopics;
return getPartitionedTopicList(TopicDomain.getEnum(domain()));
}

protected Map<String, Set<AuthAction>> internalGetPermissionsOnTopic() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -622,6 +622,19 @@ public void testDeleteNamespaces() throws Exception {
// Ok, namespace not empty
assertEquals(e.getResponse().getStatus(), Status.CONFLICT.getStatusCode());
}
// delete the topic from ZK
mockZookKeeper.delete("/managed-ledgers/" + topicName.getPersistenceNamingEncoding(), -1);

ZkUtils.createFullPathOptimistic(mockZookKeeper, "/admin/partitioned-topics/" + topicName.getPersistenceNamingEncoding(),
new byte[0], null, null);
try {
namespaces.deleteNamespace(testNs.getTenant(), testNs.getCluster(), testNs.getLocalName(), false);
fail("should have failed");
} catch (RestException e) {
// Ok, namespace not empty
assertEquals(e.getResponse().getStatus(), Status.CONFLICT.getStatusCode());
}
mockZookKeeper.delete("/admin/partitioned-topics/" + topicName.getPersistenceNamingEncoding(), -1);

testNs = this.testGlobalNamespaces.get(0);
// setup ownership to localhost
Expand All @@ -640,15 +653,6 @@ public void testDeleteNamespaces() throws Exception {
assertEquals(namespaces.getTenantNamespaces(this.testTenant), nsList);

testNs = this.testLocalNamespaces.get(1);
try {
namespaces.deleteNamespace(testNs.getTenant(), testNs.getCluster(), testNs.getLocalName(), false);
fail("should have failed");
} catch (RestException e) {
// Ok
}

// delete the topic from ZK
mockZookKeeper.delete("/managed-ledgers/" + topicName.getPersistenceNamingEncoding(), -1);
// ensure refreshed topics list in the cache
pulsar.getLocalZkCacheService().managedLedgerListCache().clearTree();
// setup ownership to localhost
Expand Down

0 comments on commit 6a79a9a

Please sign in to comment.