Skip to content

Commit

Permalink
[fix][broker] fix delete_when_subscriptions_caught_up doesn't work wh…
Browse files Browse the repository at this point in the history
…ile have active consumers (apache#18283)
  • Loading branch information
codelipenghui authored Nov 3, 2022
1 parent 2bed5ff commit 67d9d63
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 90 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1143,6 +1143,9 @@ public CompletableFuture<Void> deleteForcefully() {
* Flag indicating whether delete should succeed if topic still has unconnected subscriptions. Set to
* false when called from admin API (it will delete the subs too), and set to true when called from GC
* thread
* @param failIfHasBacklogs
* Flag indicating whether delete should succeed if topic has backlogs. Set to false when called from
* admin API (it will delete the subs too), and set to true when called from GC thread
* @param closeIfClientsConnected
* Flag indicate whether explicitly close connected
* producers/consumers/replicators before trying to delete topic.
Expand All @@ -1160,16 +1163,33 @@ private CompletableFuture<Void> delete(boolean failIfHasSubscriptions,
if (isClosingOrDeleting) {
log.warn("[{}] Topic is already being closed or deleted", topic);
return FutureUtil.failedFuture(new TopicFencedException("Topic is already fenced"));
} else if (failIfHasSubscriptions && !subscriptions.isEmpty()) {
return FutureUtil.failedFuture(
new TopicBusyException("Topic has subscriptions: " + subscriptions.keys()));
} else if (failIfHasBacklogs && hasBacklogs()) {
List<String> backlogSubs =
subscriptions.values().stream()
.filter(sub -> sub.getNumberOfEntriesInBacklog(false) > 0)
.map(PersistentSubscription::getName).toList();
return FutureUtil.failedFuture(
new TopicBusyException("Topic has subscriptions did not catch up: " + backlogSubs));
}
// We can proceed with the deletion if either:
// 1. No one is connected and no subscriptions
// 2. The topic have subscriptions but no backlogs for all subscriptions
// if delete_when_no_subscriptions is applied
// 3. We want to kick out everyone and forcefully delete the topic.
// In this case, we shouldn't care if the usageCount is 0 or not, just proceed
if (!closeIfClientsConnected) {
if (failIfHasSubscriptions && !subscriptions.isEmpty()) {
return FutureUtil.failedFuture(
new TopicBusyException("Topic has subscriptions: " + subscriptions.keys()));
} else if (failIfHasBacklogs) {
if (hasBacklogs()) {
List<String> backlogSubs =
subscriptions.values().stream()
.filter(sub -> sub.getNumberOfEntriesInBacklog(false) > 0)
.map(PersistentSubscription::getName).toList();
return FutureUtil.failedFuture(
new TopicBusyException("Topic has subscriptions did not catch up: " + backlogSubs));
} else if (!producers.isEmpty()) {
return FutureUtil.failedFuture(new TopicBusyException(
"Topic has " + producers.size() + " connected producers"));
}
} else if (currentUsageCount() > 0) {
return FutureUtil.failedFuture(new TopicBusyException(
"Topic has " + currentUsageCount() + " connected producers/consumers"));
}
}

fenceTopicToCloseOrDelete(); // Avoid clients reconnections while deleting
Expand All @@ -1179,94 +1199,82 @@ private CompletableFuture<Void> delete(boolean failIfHasSubscriptions,
CompletableFuture<Void> deleteFuture = new CompletableFuture<>();

CompletableFuture<Void> closeClientFuture = new CompletableFuture<>();
List<CompletableFuture<Void>> futures = new ArrayList<>();
subscriptions.forEach((s, sub) -> futures.add(sub.disconnect()));
if (closeIfClientsConnected) {
List<CompletableFuture<Void>> futures = new ArrayList<>();
replicators.forEach((cluster, replicator) -> futures.add(replicator.disconnect()));
shadowReplicators.forEach((__, replicator) -> futures.add(replicator.disconnect()));
producers.values().forEach(producer -> futures.add(producer.disconnect()));
subscriptions.forEach((s, sub) -> futures.add(sub.disconnect()));
FutureUtil.waitForAll(futures).thenRun(() -> {
closeClientFuture.complete(null);
}).exceptionally(ex -> {
log.error("[{}] Error closing clients", topic, ex);
unfenceTopicToResume();
closeClientFuture.completeExceptionally(ex);
return null;
});
} else {
closeClientFuture.complete(null);
}
FutureUtil.waitForAll(futures).thenRun(() -> {
closeClientFuture.complete(null);
}).exceptionally(ex -> {
log.error("[{}] Error closing clients", topic, ex);
unfenceTopicToResume();
closeClientFuture.completeExceptionally(ex);
return null;
});

closeClientFuture.thenAccept(delete -> {
// We can proceed with the deletion if either:
// 1. No one is connected
// 2. We want to kick out everyone and forcefully delete the topic.
// In this case, we shouldn't care if the usageCount is 0 or not, just proceed
if (currentUsageCount() == 0 || (closeIfClientsConnected && !failIfHasSubscriptions)) {
CompletableFuture<Void> deleteTopicAuthenticationFuture = new CompletableFuture<>();
brokerService.deleteTopicAuthenticationWithRetry(topic, deleteTopicAuthenticationFuture, 5);

deleteTopicAuthenticationFuture.thenCompose(__ -> deleteSchema())
.thenCompose(__ -> deleteTopicPolicies())
.thenCompose(__ -> transactionBufferCleanupAndClose())
.whenComplete((v, ex) -> {
if (ex != null) {
log.error("[{}] Error deleting topic", topic, ex);
unfenceTopicToResume();
deleteFuture.completeExceptionally(ex);
} else {
List<CompletableFuture<Void>> subsDeleteFutures = new ArrayList<>();
subscriptions.forEach((sub, p) -> subsDeleteFutures.add(unsubscribe(sub)));

FutureUtil.waitForAll(subsDeleteFutures).whenComplete((f, e) -> {
if (e != null) {
log.error("[{}] Error deleting topic", topic, e);
unfenceTopicToResume();
deleteFuture.completeExceptionally(e);
} else {
ledger.asyncDelete(new AsyncCallbacks.DeleteLedgerCallback() {
@Override
public void deleteLedgerComplete(Object ctx) {
brokerService.removeTopicFromCache(PersistentTopic.this);

dispatchRateLimiter.ifPresent(DispatchRateLimiter::close);

subscribeRateLimiter.ifPresent(SubscribeRateLimiter::close);

unregisterTopicPolicyListener();

log.info("[{}] Topic deleted", topic);
deleteFuture.complete(null);
}
closeClientFuture.thenAccept(__ -> {
CompletableFuture<Void> deleteTopicAuthenticationFuture = new CompletableFuture<>();
brokerService.deleteTopicAuthenticationWithRetry(topic, deleteTopicAuthenticationFuture, 5);

deleteTopicAuthenticationFuture.thenCompose(ignore -> deleteSchema())
.thenCompose(ignore -> deleteTopicPolicies())
.thenCompose(ignore -> transactionBufferCleanupAndClose())
.whenComplete((v, ex) -> {
if (ex != null) {
log.error("[{}] Error deleting topic", topic, ex);
unfenceTopicToResume();
deleteFuture.completeExceptionally(ex);
} else {
List<CompletableFuture<Void>> subsDeleteFutures = new ArrayList<>();
subscriptions.forEach((sub, p) -> subsDeleteFutures.add(unsubscribe(sub)));

FutureUtil.waitForAll(subsDeleteFutures).whenComplete((f, e) -> {
if (e != null) {
log.error("[{}] Error deleting topic", topic, e);
unfenceTopicToResume();
deleteFuture.completeExceptionally(e);
} else {
ledger.asyncDelete(new AsyncCallbacks.DeleteLedgerCallback() {
@Override
public void deleteLedgerComplete(Object ctx) {
brokerService.removeTopicFromCache(PersistentTopic.this);

dispatchRateLimiter.ifPresent(DispatchRateLimiter::close);

subscribeRateLimiter.ifPresent(SubscribeRateLimiter::close);

unregisterTopicPolicyListener();

log.info("[{}] Topic deleted", topic);
deleteFuture.complete(null);
}

@Override
public void
deleteLedgerFailed(ManagedLedgerException exception,
Object ctx) {
if (exception.getCause()
instanceof MetadataStoreException.NotFoundException) {
log.info("[{}] Topic is already deleted {}",
topic, exception.getMessage());
deleteLedgerComplete(ctx);
} else {
unfenceTopicToResume();
log.error("[{}] Error deleting topic",
topic, exception);
deleteFuture.completeExceptionally(
new PersistenceException(exception));
}
@Override
public void
deleteLedgerFailed(ManagedLedgerException exception,
Object ctx) {
if (exception.getCause()
instanceof MetadataStoreException.NotFoundException) {
log.info("[{}] Topic is already deleted {}",
topic, exception.getMessage());
deleteLedgerComplete(ctx);
} else {
unfenceTopicToResume();
log.error("[{}] Error deleting topic",
topic, exception);
deleteFuture.completeExceptionally(
new PersistenceException(exception));
}
}, null);
}
}, null);

}
});
}
});
} else {
unfenceTopicToResume();
deleteFuture.completeExceptionally(new TopicBusyException(
"Topic has " + currentUsageCount() + " connected producers/consumers"));
}
}
});
}
});
}).exceptionally(ex->{
unfenceTopicToResume();
deleteFuture.completeExceptionally(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,6 @@ public void testDeleteWhenNoBacklogs() throws Exception {
producer.send("Pulsar".getBytes());
}

consumer.close();
producer.close();

Thread.sleep(2000);
Expand All @@ -338,6 +337,7 @@ public void testDeleteWhenNoBacklogs() throws Exception {
admin.topics().skipAllMessages(topic, "sub");
Awaitility.await()
.untilAsserted(() -> Assert.assertFalse(admin.topics().getList("prop/ns-abc").contains(topic)));
consumer.close();
}

@Test
Expand Down

0 comments on commit 67d9d63

Please sign in to comment.