diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index 6746d29af732b..30f01ece7de11 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -42,6 +42,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.stream.Collectors; +import javax.annotation.Nonnull; import javax.ws.rs.WebApplicationException; import javax.ws.rs.container.AsyncResponse; import javax.ws.rs.core.Response; @@ -113,6 +114,7 @@ import org.apache.pulsar.metadata.api.MetadataStoreException; import org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException; import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException; +import org.apache.zookeeper.KeeperException; @Slf4j public abstract class NamespacesBase extends AdminResource { @@ -202,78 +204,94 @@ protected CompletableFuture> internalGetNonPersistentTopics(Policie }); } - @SuppressWarnings("unchecked") - protected CompletableFuture internalDeleteNamespaceAsync(boolean force) { - CompletableFuture preconditionCheck = precheckWhenDeleteNamespace(namespaceName, force); - return preconditionCheck + /** + * Delete the namespace and retry to resolve some topics that were not created successfully(in metadata) + * during the deletion. + */ + protected @Nonnull CompletableFuture internalDeleteNamespaceAsync(boolean force) { + final CompletableFuture future = new CompletableFuture<>(); + internalRetryableDeleteNamespaceAsync0(force, 5, future); + return future; + } + private void internalRetryableDeleteNamespaceAsync0(boolean force, int retryTimes, + @Nonnull CompletableFuture callback) { + precheckWhenDeleteNamespace(namespaceName, force) .thenCompose(policies -> { + final CompletableFuture> topicsFuture; if (policies == null || CollectionUtils.isEmpty(policies.replication_clusters)){ - return pulsar().getNamespaceService().getListOfPersistentTopics(namespaceName); - } - return pulsar().getNamespaceService().getFullListOfTopics(namespaceName); - }) - .thenCompose(allTopics -> pulsar().getNamespaceService().getFullListOfPartitionedTopic(namespaceName) - .thenCompose(allPartitionedTopics -> { - List> topicsSum = new ArrayList<>(2); - topicsSum.add(allTopics); - topicsSum.add(allPartitionedTopics); - return CompletableFuture.completedFuture(topicsSum); - })) - .thenCompose(topics -> { - List allTopics = topics.get(0); - ArrayList allUserCreatedTopics = new ArrayList<>(); - List allPartitionedTopics = topics.get(1); - ArrayList allUserCreatedPartitionTopics = new ArrayList<>(); - boolean hasNonSystemTopic = false; - List allSystemTopics = new ArrayList<>(); - List allPartitionedSystemTopics = new ArrayList<>(); - List topicPolicy = new ArrayList<>(); - List partitionedTopicPolicy = new ArrayList<>(); - for (String topic : allTopics) { - if (!pulsar().getBrokerService().isSystemTopic(TopicName.get(topic))) { - hasNonSystemTopic = true; - allUserCreatedTopics.add(topic); - } else { - if (SystemTopicNames.isTopicPoliciesSystemTopic(topic)) { - topicPolicy.add(topic); - } else { - allSystemTopics.add(topic); - } - } - } - for (String topic : allPartitionedTopics) { - if (!pulsar().getBrokerService().isSystemTopic(TopicName.get(topic))) { - hasNonSystemTopic = true; - allUserCreatedPartitionTopics.add(topic); - } else { - if (SystemTopicNames.isTopicPoliciesSystemTopic(topic)) { - partitionedTopicPolicy.add(topic); - } else { - allPartitionedSystemTopics.add(topic); - } - } - } - if (!force) { - if (hasNonSystemTopic) { - throw new RestException(Status.CONFLICT, "Cannot delete non empty namespace"); - } + topicsFuture = pulsar().getNamespaceService().getListOfPersistentTopics(namespaceName); + } else { + topicsFuture = pulsar().getNamespaceService().getFullListOfTopics(namespaceName); } - return namespaceResources().setPoliciesAsync(namespaceName, old -> { - old.deleted = true; - return old; - }).thenCompose(ignore -> { - return internalDeleteTopicsAsync(allUserCreatedTopics); - }).thenCompose(ignore -> { - return internalDeletePartitionedTopicsAsync(allUserCreatedPartitionTopics); - }).thenCompose(ignore -> { - return internalDeleteTopicsAsync(allSystemTopics); - }).thenCompose(ignore__ -> { - return internalDeletePartitionedTopicsAsync(allPartitionedSystemTopics); - }).thenCompose(ignore -> { - return internalDeleteTopicsAsync(topicPolicy); - }).thenCompose(ignore__ -> { - return internalDeletePartitionedTopicsAsync(partitionedTopicPolicy); - }); + return topicsFuture.thenCompose(allTopics -> + pulsar().getNamespaceService().getFullListOfPartitionedTopic(namespaceName) + .thenCompose(allPartitionedTopics -> { + List> topicsSum = new ArrayList<>(2); + topicsSum.add(allTopics); + topicsSum.add(allPartitionedTopics); + return CompletableFuture.completedFuture(topicsSum); + })) + .thenCompose(topics -> { + List allTopics = topics.get(0); + ArrayList allUserCreatedTopics = new ArrayList<>(); + List allPartitionedTopics = topics.get(1); + ArrayList allUserCreatedPartitionTopics = new ArrayList<>(); + boolean hasNonSystemTopic = false; + List allSystemTopics = new ArrayList<>(); + List allPartitionedSystemTopics = new ArrayList<>(); + List topicPolicy = new ArrayList<>(); + List partitionedTopicPolicy = new ArrayList<>(); + for (String topic : allTopics) { + if (!pulsar().getBrokerService().isSystemTopic(TopicName.get(topic))) { + hasNonSystemTopic = true; + allUserCreatedTopics.add(topic); + } else { + if (SystemTopicNames.isTopicPoliciesSystemTopic(topic)) { + topicPolicy.add(topic); + } else { + allSystemTopics.add(topic); + } + } + } + for (String topic : allPartitionedTopics) { + if (!pulsar().getBrokerService().isSystemTopic(TopicName.get(topic))) { + hasNonSystemTopic = true; + allUserCreatedPartitionTopics.add(topic); + } else { + if (SystemTopicNames.isTopicPoliciesSystemTopic(topic)) { + partitionedTopicPolicy.add(topic); + } else { + allPartitionedSystemTopics.add(topic); + } + } + } + if (!force) { + if (hasNonSystemTopic) { + throw new RestException(Status.CONFLICT, "Cannot delete non empty namespace"); + } + } + final CompletableFuture markDeleteFuture; + if (policies != null && policies.deleted) { + markDeleteFuture = CompletableFuture.completedFuture(null); + } else { + markDeleteFuture = namespaceResources().setPoliciesAsync(namespaceName, old -> { + old.deleted = true; + return old; + }); + } + return markDeleteFuture.thenCompose(__ -> + internalDeleteTopicsAsync(allUserCreatedTopics)) + .thenCompose(ignore -> + internalDeletePartitionedTopicsAsync(allUserCreatedPartitionTopics)) + .thenCompose(ignore -> + internalDeleteTopicsAsync(allSystemTopics)) + .thenCompose(ignore -> + internalDeletePartitionedTopicsAsync(allPartitionedSystemTopics)) + .thenCompose(ignore -> + internalDeleteTopicsAsync(topicPolicy)) + .thenCompose(ignore -> + internalDeletePartitionedTopicsAsync(partitionedTopicPolicy)); + }); }) .thenCompose(ignore -> pulsar().getNamespaceService() .getNamespaceBundleFactory().getBundlesAsync(namespaceName)) @@ -297,7 +315,32 @@ protected CompletableFuture internalDeleteNamespaceAsync(boolean force) { return CompletableFuture.completedFuture(null); }) ).collect(Collectors.toList()))) - .thenCompose(ignore -> internalClearZkSources()); + .thenCompose(ignore -> internalClearZkSources()) + .whenComplete((result, error) -> { + if (error != null) { + final Throwable rc = FutureUtil.unwrapCompletionException(error); + if (rc instanceof MetadataStoreException) { + if (rc.getCause() != null && rc.getCause() instanceof KeeperException.NotEmptyException) { + log.info("[{}] There are in-flight topics created during the namespace deletion, " + + "retry to delete the namespace again.", namespaceName); + final int next = retryTimes - 1; + if (next > 0) { + // async recursive + internalRetryableDeleteNamespaceAsync0(force, next, callback); + } else { + callback.completeExceptionally( + new RestException(Status.CONFLICT, "The broker still have in-flight topics" + + " created during namespace deletion, please try again.")); + // drop out recursive + } + return; + } + } + callback.completeExceptionally(error); + return; + } + callback.complete(result); + }); } private CompletableFuture internalDeletePartitionedTopicsAsync(List topicNames) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java index 59613558eb863..153e29506c3d0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java @@ -47,7 +47,6 @@ import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.broker.admin.impl.NamespacesBase; import org.apache.pulsar.broker.web.RestException; -import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace.Mode; import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm; import org.apache.pulsar.common.naming.NamespaceName; @@ -249,12 +248,6 @@ public void deleteNamespace(@Suspended final AsyncResponse asyncResponse, @PathP asyncResponse.resume(Response.noContent().build()); }) .exceptionally(ex -> { - Throwable cause = FutureUtil.unwrapCompletionException(ex); - if (cause instanceof PulsarAdminException.ConflictException) { - log.info("[{}] There are new topics created during the namespace deletion, " - + "retry to delete the namespace again.", namespaceName); - pulsar().getExecutor().execute(() -> internalDeleteNamespaceAsync(force)); - } if (!isRedirectException(ex)) { log.error("[{}] Failed to delete namespace {}", clientAppId(), namespaceName, ex); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java index f5e23db79b92f..12ff229c2f0ed 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java @@ -49,7 +49,6 @@ import org.apache.pulsar.broker.admin.impl.NamespacesBase; import org.apache.pulsar.broker.admin.impl.OffloaderObjectsScannerUtils; import org.apache.pulsar.broker.web.RestException; -import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace.Mode; import org.apache.pulsar.common.naming.NamespaceName; @@ -197,12 +196,6 @@ public void deleteNamespace(@Suspended final AsyncResponse asyncResponse, @PathP asyncResponse.resume(Response.noContent().build()); }) .exceptionally(ex -> { - Throwable cause = FutureUtil.unwrapCompletionException(ex); - if (cause instanceof PulsarAdminException.ConflictException) { - log.info("[{}] There are new topics created during the namespace deletion, " - + "retry to delete the namespace again.", namespaceName); - pulsar().getExecutor().execute(() -> internalDeleteNamespaceAsync(force)); - } if (!isRedirectException(ex)) { log.error("[{}] Failed to delete namespace {}", clientAppId(), namespaceName, ex); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index 4e095cd66ba08..6245ce19eebc6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -249,9 +249,6 @@ protected void updateTopicPolicyByNamespacePolicy(Policies namespacePolicies) { if (log.isDebugEnabled()) { log.debug("[{}]updateTopicPolicyByNamespacePolicy,data={}", topic, namespacePolicies); } - if (namespacePolicies.deleted) { - return; - } topicPolicies.getRetentionPolicies().updateNamespaceValue(namespacePolicies.retention_policies); topicPolicies.getCompactionThreshold().updateNamespaceValue(namespacePolicies.compaction_threshold); topicPolicies.getReplicationClusters().updateNamespaceValue( diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java index 66b2b1d1470ea..90be220cfd8f7 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java @@ -46,6 +46,7 @@ import java.util.Set; import java.util.TreeSet; import java.util.UUID; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import javax.ws.rs.NotAcceptableException; import javax.ws.rs.core.Response.Status; @@ -1681,6 +1682,11 @@ public void testDeleteNamespaceWithTopicPolicies() throws Exception { // verify namespace can be deleted even without topic policy events admin.namespaces().deleteNamespace(namespace, true); + Awaitility.await().untilAsserted(() -> { + final CompletableFuture> eventTopicFuture = + pulsar.getBrokerService().getTopics().get("persistent://test-tenant/test-ns2/__change_events"); + assertNull(eventTopicFuture); + }); admin.namespaces().createNamespace(namespace, Set.of("test")); // create topic String topic = namespace + "/test-topic2";