Skip to content

Commit

Permalink
Fixed logic for forceful topic deletion (apache#7356)
Browse files Browse the repository at this point in the history
  • Loading branch information
merlimat authored Jun 26, 2020
1 parent c83a656 commit 53bc73c
Showing 1 changed file with 20 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.DateFormatter;
import org.apache.pulsar.common.util.FutureUtil;
Expand Down Expand Up @@ -605,6 +606,7 @@ public CompletableFuture<Consumer> subscribe(final ServerCnx cnx, String subscri
log.debug("[{}] [{}] [{}] Subscribe failed -- count: {}", topic, subscriptionName,
consumer.consumerName(), USAGE_COUNT_UPDATER.get(PersistentTopic.this));
}
USAGE_COUNT_UPDATER.decrementAndGet(PersistentTopic.this);
future.completeExceptionally(
new BrokerServiceException("Connection was closed while the opening the cursor "));
} else {
Expand Down Expand Up @@ -838,10 +840,14 @@ private CompletableFuture<Void> delete(boolean failIfHasSubscriptions,
try {
if (isFenced) {
log.warn("[{}] Topic is already being closed or deleted", topic);
deleteFuture.completeExceptionally(new TopicFencedException("Topic is already fenced"));
return deleteFuture;
return FutureUtil.failedFuture(new TopicFencedException("Topic is already fenced"));
} else if (failIfHasSubscriptions && !subscriptions.isEmpty()) {
return FutureUtil.failedFuture(new TopicBusyException("Topic has subscriptions"));
} else if (failIfHasBacklogs && hasBacklogs()) {
return FutureUtil.failedFuture(new TopicBusyException("Topic has subscriptions did not catch up"));
}

isFenced = true; // Avoid clients reconnections while deleting
CompletableFuture<Void> closeClientFuture = new CompletableFuture<>();
if (closeIfClientsConnected) {
List<CompletableFuture<Void>> futures = Lists.newArrayList();
Expand All @@ -861,30 +867,16 @@ private CompletableFuture<Void> delete(boolean failIfHasSubscriptions,
}

closeClientFuture.thenAccept(delete -> {
if (USAGE_COUNT_UPDATER.get(this) == 0) {
isFenced = true;
List<CompletableFuture<Void>> futures = Lists.newArrayList();

if (failIfHasSubscriptions) {
if (!subscriptions.isEmpty()) {
isFenced = false;
deleteFuture.completeExceptionally(new TopicBusyException("Topic has subscriptions"));
return;
}
} else if (failIfHasBacklogs) {
if (hasBacklogs()) {
isFenced = false;
deleteFuture.completeExceptionally(new TopicBusyException("Topic has subscriptions did not catch up"));
return;
}
} else {
subscriptions.forEach((s, sub) -> futures.add(sub.delete()));
}
if (deleteSchema) {
futures.add(deleteSchema().thenApply(schemaVersion -> null));
}

FutureUtil.waitForAll(futures).whenComplete((v, ex) -> {
// We can proceed with the deletion if either:
// 1. No one is connected
// 2. We want to kick out everyone and forcefully delete the topic.
// In this case, we shouldn't care if the usageCount is 0 or not, just proceed
if (USAGE_COUNT_UPDATER.get(this) == 0 || (closeIfClientsConnected && !failIfHasSubscriptions)) {
CompletableFuture<SchemaVersion> deleteSchemaFuture = deleteSchema ?
deleteSchema()
: CompletableFuture.completedFuture(null);

deleteSchemaFuture.whenComplete((v, ex) -> {
if (ex != null) {
log.error("[{}] Error deleting topic", topic, ex);
isFenced = false;
Expand Down Expand Up @@ -918,10 +910,12 @@ public void deleteLedgerFailed(ManagedLedgerException exception, Object ctx) {
}
});
} else {
isFenced = false;
deleteFuture.completeExceptionally(new TopicBusyException(
"Topic has " + USAGE_COUNT_UPDATER.get(this) + " connected producers/consumers"));
}
}).exceptionally(ex->{
isFenced = false;
deleteFuture.completeExceptionally(
new TopicBusyException("Failed to close clients before deleting topic."));
return null;
Expand Down

0 comments on commit 53bc73c

Please sign in to comment.