Skip to content

Commit

Permalink
Fix topic deletion for V2 topics. (apache#1470)
Browse files Browse the repository at this point in the history
* Fix topic deletion for V2 topics.

- Add a isReplicated() method to topics
- Add isReplicated check before deleting a topic

* Remove global check.
  • Loading branch information
cckellogg authored and merlimat committed Mar 29, 2018
1 parent f4d6c2b commit dd29630
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,9 @@ protected void internalUnloadTopic(boolean authoritative) {
protected void internalDeleteTopic(boolean authoritative) {
validateAdminOperationOnTopic(authoritative);
Topic topic = getTopicReference(topicName);
if (topicName.isGlobal()) {

// v2 topics have a global name so check if the topic is replicated.
if (topic.isReplicated()) {
// Delete is disallowed on global topic
log.error("[{}] Delete topic is forbidden on global namespace {}", clientAppId(), topicName);
throw new RestException(Status.FORBIDDEN, "Delete forbidden on global namespace");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ CompletableFuture<Consumer> subscribe(ServerCnx cnx, String subscriptionName, lo

boolean isEncryptionRequired();

boolean isReplicated();

BacklogQuota getBacklogQuota();

void updateRates(NamespaceStats nsStats, NamespaceBundleStats currentBundleStats,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -958,6 +958,11 @@ public boolean isEncryptionRequired() {
return isEncryptionRequired;
}

@Override
public boolean isReplicated() {
return replicators.size() > 1;
}

@Override
public CompletableFuture<Void> unsubscribe(String subName) {
// No-op
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1497,6 +1497,11 @@ public boolean isEncryptionRequired() {
return isEncryptionRequired;
}

@Override
public boolean isReplicated() {
return replicators.size() > 1;
}

public CompletableFuture<MessageId> terminate() {
CompletableFuture<MessageId> future = new CompletableFuture<>();
ledger.asyncTerminate(new TerminateCallback() {
Expand Down

0 comments on commit dd29630

Please sign in to comment.