Skip to content

Commit

Permalink
[fix][broker] Fix delete namespace fail by a In-flight topic (apache#…
Browse files Browse the repository at this point in the history
  • Loading branch information
mattisonchao authored Feb 17, 2023
1 parent f1765be commit 3855585
Show file tree
Hide file tree
Showing 5 changed files with 119 additions and 87 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.core.Response;
Expand Down Expand Up @@ -113,6 +114,7 @@
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException;
import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
import org.apache.zookeeper.KeeperException;

@Slf4j
public abstract class NamespacesBase extends AdminResource {
Expand Down Expand Up @@ -202,78 +204,94 @@ protected CompletableFuture<List<String>> internalGetNonPersistentTopics(Policie
});
}

@SuppressWarnings("unchecked")
protected CompletableFuture<Void> internalDeleteNamespaceAsync(boolean force) {
CompletableFuture<Policies> preconditionCheck = precheckWhenDeleteNamespace(namespaceName, force);
return preconditionCheck
/**
* Delete the namespace and retry to resolve some topics that were not created successfully(in metadata)
* during the deletion.
*/
protected @Nonnull CompletableFuture<Void> internalDeleteNamespaceAsync(boolean force) {
final CompletableFuture<Void> future = new CompletableFuture<>();
internalRetryableDeleteNamespaceAsync0(force, 5, future);
return future;
}
private void internalRetryableDeleteNamespaceAsync0(boolean force, int retryTimes,
@Nonnull CompletableFuture<Void> callback) {
precheckWhenDeleteNamespace(namespaceName, force)
.thenCompose(policies -> {
final CompletableFuture<List<String>> topicsFuture;
if (policies == null || CollectionUtils.isEmpty(policies.replication_clusters)){
return pulsar().getNamespaceService().getListOfPersistentTopics(namespaceName);
}
return pulsar().getNamespaceService().getFullListOfTopics(namespaceName);
})
.thenCompose(allTopics -> pulsar().getNamespaceService().getFullListOfPartitionedTopic(namespaceName)
.thenCompose(allPartitionedTopics -> {
List<List<String>> topicsSum = new ArrayList<>(2);
topicsSum.add(allTopics);
topicsSum.add(allPartitionedTopics);
return CompletableFuture.completedFuture(topicsSum);
}))
.thenCompose(topics -> {
List<String> allTopics = topics.get(0);
ArrayList<String> allUserCreatedTopics = new ArrayList<>();
List<String> allPartitionedTopics = topics.get(1);
ArrayList<String> allUserCreatedPartitionTopics = new ArrayList<>();
boolean hasNonSystemTopic = false;
List<String> allSystemTopics = new ArrayList<>();
List<String> allPartitionedSystemTopics = new ArrayList<>();
List<String> topicPolicy = new ArrayList<>();
List<String> partitionedTopicPolicy = new ArrayList<>();
for (String topic : allTopics) {
if (!pulsar().getBrokerService().isSystemTopic(TopicName.get(topic))) {
hasNonSystemTopic = true;
allUserCreatedTopics.add(topic);
} else {
if (SystemTopicNames.isTopicPoliciesSystemTopic(topic)) {
topicPolicy.add(topic);
} else {
allSystemTopics.add(topic);
}
}
}
for (String topic : allPartitionedTopics) {
if (!pulsar().getBrokerService().isSystemTopic(TopicName.get(topic))) {
hasNonSystemTopic = true;
allUserCreatedPartitionTopics.add(topic);
} else {
if (SystemTopicNames.isTopicPoliciesSystemTopic(topic)) {
partitionedTopicPolicy.add(topic);
} else {
allPartitionedSystemTopics.add(topic);
}
}
}
if (!force) {
if (hasNonSystemTopic) {
throw new RestException(Status.CONFLICT, "Cannot delete non empty namespace");
}
topicsFuture = pulsar().getNamespaceService().getListOfPersistentTopics(namespaceName);
} else {
topicsFuture = pulsar().getNamespaceService().getFullListOfTopics(namespaceName);
}
return namespaceResources().setPoliciesAsync(namespaceName, old -> {
old.deleted = true;
return old;
}).thenCompose(ignore -> {
return internalDeleteTopicsAsync(allUserCreatedTopics);
}).thenCompose(ignore -> {
return internalDeletePartitionedTopicsAsync(allUserCreatedPartitionTopics);
}).thenCompose(ignore -> {
return internalDeleteTopicsAsync(allSystemTopics);
}).thenCompose(ignore__ -> {
return internalDeletePartitionedTopicsAsync(allPartitionedSystemTopics);
}).thenCompose(ignore -> {
return internalDeleteTopicsAsync(topicPolicy);
}).thenCompose(ignore__ -> {
return internalDeletePartitionedTopicsAsync(partitionedTopicPolicy);
});
return topicsFuture.thenCompose(allTopics ->
pulsar().getNamespaceService().getFullListOfPartitionedTopic(namespaceName)
.thenCompose(allPartitionedTopics -> {
List<List<String>> topicsSum = new ArrayList<>(2);
topicsSum.add(allTopics);
topicsSum.add(allPartitionedTopics);
return CompletableFuture.completedFuture(topicsSum);
}))
.thenCompose(topics -> {
List<String> allTopics = topics.get(0);
ArrayList<String> allUserCreatedTopics = new ArrayList<>();
List<String> allPartitionedTopics = topics.get(1);
ArrayList<String> allUserCreatedPartitionTopics = new ArrayList<>();
boolean hasNonSystemTopic = false;
List<String> allSystemTopics = new ArrayList<>();
List<String> allPartitionedSystemTopics = new ArrayList<>();
List<String> topicPolicy = new ArrayList<>();
List<String> partitionedTopicPolicy = new ArrayList<>();
for (String topic : allTopics) {
if (!pulsar().getBrokerService().isSystemTopic(TopicName.get(topic))) {
hasNonSystemTopic = true;
allUserCreatedTopics.add(topic);
} else {
if (SystemTopicNames.isTopicPoliciesSystemTopic(topic)) {
topicPolicy.add(topic);
} else {
allSystemTopics.add(topic);
}
}
}
for (String topic : allPartitionedTopics) {
if (!pulsar().getBrokerService().isSystemTopic(TopicName.get(topic))) {
hasNonSystemTopic = true;
allUserCreatedPartitionTopics.add(topic);
} else {
if (SystemTopicNames.isTopicPoliciesSystemTopic(topic)) {
partitionedTopicPolicy.add(topic);
} else {
allPartitionedSystemTopics.add(topic);
}
}
}
if (!force) {
if (hasNonSystemTopic) {
throw new RestException(Status.CONFLICT, "Cannot delete non empty namespace");
}
}
final CompletableFuture<Void> markDeleteFuture;
if (policies != null && policies.deleted) {
markDeleteFuture = CompletableFuture.completedFuture(null);
} else {
markDeleteFuture = namespaceResources().setPoliciesAsync(namespaceName, old -> {
old.deleted = true;
return old;
});
}
return markDeleteFuture.thenCompose(__ ->
internalDeleteTopicsAsync(allUserCreatedTopics))
.thenCompose(ignore ->
internalDeletePartitionedTopicsAsync(allUserCreatedPartitionTopics))
.thenCompose(ignore ->
internalDeleteTopicsAsync(allSystemTopics))
.thenCompose(ignore ->
internalDeletePartitionedTopicsAsync(allPartitionedSystemTopics))
.thenCompose(ignore ->
internalDeleteTopicsAsync(topicPolicy))
.thenCompose(ignore ->
internalDeletePartitionedTopicsAsync(partitionedTopicPolicy));
});
})
.thenCompose(ignore -> pulsar().getNamespaceService()
.getNamespaceBundleFactory().getBundlesAsync(namespaceName))
Expand All @@ -297,7 +315,32 @@ protected CompletableFuture<Void> internalDeleteNamespaceAsync(boolean force) {
return CompletableFuture.completedFuture(null);
})
).collect(Collectors.toList())))
.thenCompose(ignore -> internalClearZkSources());
.thenCompose(ignore -> internalClearZkSources())
.whenComplete((result, error) -> {
if (error != null) {
final Throwable rc = FutureUtil.unwrapCompletionException(error);
if (rc instanceof MetadataStoreException) {
if (rc.getCause() != null && rc.getCause() instanceof KeeperException.NotEmptyException) {
log.info("[{}] There are in-flight topics created during the namespace deletion, "
+ "retry to delete the namespace again.", namespaceName);
final int next = retryTimes - 1;
if (next > 0) {
// async recursive
internalRetryableDeleteNamespaceAsync0(force, next, callback);
} else {
callback.completeExceptionally(
new RestException(Status.CONFLICT, "The broker still have in-flight topics"
+ " created during namespace deletion, please try again."));
// drop out recursive
}
return;
}
}
callback.completeExceptionally(error);
return;
}
callback.complete(result);
});
}

private CompletableFuture<Void> internalDeletePartitionedTopicsAsync(List<String> topicNames) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.admin.impl.NamespacesBase;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace.Mode;
import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm;
import org.apache.pulsar.common.naming.NamespaceName;
Expand Down Expand Up @@ -249,12 +248,6 @@ public void deleteNamespace(@Suspended final AsyncResponse asyncResponse, @PathP
asyncResponse.resume(Response.noContent().build());
})
.exceptionally(ex -> {
Throwable cause = FutureUtil.unwrapCompletionException(ex);
if (cause instanceof PulsarAdminException.ConflictException) {
log.info("[{}] There are new topics created during the namespace deletion, "
+ "retry to delete the namespace again.", namespaceName);
pulsar().getExecutor().execute(() -> internalDeleteNamespaceAsync(force));
}
if (!isRedirectException(ex)) {
log.error("[{}] Failed to delete namespace {}", clientAppId(), namespaceName, ex);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
import org.apache.pulsar.broker.admin.impl.NamespacesBase;
import org.apache.pulsar.broker.admin.impl.OffloaderObjectsScannerUtils;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace.Mode;
import org.apache.pulsar.common.naming.NamespaceName;
Expand Down Expand Up @@ -197,12 +196,6 @@ public void deleteNamespace(@Suspended final AsyncResponse asyncResponse, @PathP
asyncResponse.resume(Response.noContent().build());
})
.exceptionally(ex -> {
Throwable cause = FutureUtil.unwrapCompletionException(ex);
if (cause instanceof PulsarAdminException.ConflictException) {
log.info("[{}] There are new topics created during the namespace deletion, "
+ "retry to delete the namespace again.", namespaceName);
pulsar().getExecutor().execute(() -> internalDeleteNamespaceAsync(force));
}
if (!isRedirectException(ex)) {
log.error("[{}] Failed to delete namespace {}", clientAppId(), namespaceName, ex);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,9 +249,6 @@ protected void updateTopicPolicyByNamespacePolicy(Policies namespacePolicies) {
if (log.isDebugEnabled()) {
log.debug("[{}]updateTopicPolicyByNamespacePolicy,data={}", topic, namespacePolicies);
}
if (namespacePolicies.deleted) {
return;
}
topicPolicies.getRetentionPolicies().updateNamespaceValue(namespacePolicies.retention_policies);
topicPolicies.getCompactionThreshold().updateNamespaceValue(namespacePolicies.compaction_threshold);
topicPolicies.getReplicationClusters().updateNamespaceValue(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import javax.ws.rs.NotAcceptableException;
import javax.ws.rs.core.Response.Status;
Expand Down Expand Up @@ -1681,6 +1682,11 @@ public void testDeleteNamespaceWithTopicPolicies() throws Exception {
// verify namespace can be deleted even without topic policy events
admin.namespaces().deleteNamespace(namespace, true);

Awaitility.await().untilAsserted(() -> {
final CompletableFuture<Optional<Topic>> eventTopicFuture =
pulsar.getBrokerService().getTopics().get("persistent://test-tenant/test-ns2/__change_events");
assertNull(eventTopicFuture);
});
admin.namespaces().createNamespace(namespace, Set.of("test"));
// create topic
String topic = namespace + "/test-topic2";
Expand Down

0 comments on commit 3855585

Please sign in to comment.