Skip to content

Commit

Permalink
[improve][admin] Not allow to terminate system topic. (apache#17006)
Browse files Browse the repository at this point in the history
  • Loading branch information
Technoboy- authored Aug 11, 2022
1 parent 21dc668 commit 96930fd
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@
import org.apache.pulsar.common.compression.CompressionCodecProvider;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.PartitionedManagedLedgerInfo;
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
Expand Down Expand Up @@ -3688,6 +3689,10 @@ protected CompletableFuture<MessageId> internalTerminateAsync(boolean authoritat
throw new RestException(Status.METHOD_NOT_ALLOWED,
"Termination of a partitioned topic is not allowed");
}
if (SystemTopicNames.isSystemTopic(topicName)) {
throw new RestException(Status.METHOD_NOT_ALLOWED,
"Termination of a system topic is not allowed");
}
})
.thenCompose(__ -> getTopicReferenceAsync(topicName))
.thenCompose(topic -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.expectThrows;
import static org.testng.Assert.fail;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
Expand Down Expand Up @@ -2533,4 +2534,18 @@ public void testGetNamespaceTopicList() throws Exception {
ListNamespaceTopicsOptions.builder().mode(Mode.NON_PERSISTENT).build());
Assert.assertTrue(notPersistentTopics.contains(nonPersistentTopic));
}

@Test
private void testTerminateSystemTopic() throws Exception {
final String topic = "persistent://prop-xyz/ns1/testTerminateSystemTopic";
admin.topics().createNonPartitionedTopic(topic);
final String eventTopic = "persistent://prop-xyz/ns1/__change_events";
admin.topicPolicies().setMaxConsumers(topic, 2);
Awaitility.await().untilAsserted(() -> {
Assert.assertEquals(admin.topicPolicies().getMaxConsumers(topic), Integer.valueOf(2));
});
PulsarAdminException ex = expectThrows(PulsarAdminException.class,
() -> admin.topics().terminateTopic(eventTopic));
assertTrue(ex instanceof PulsarAdminException.NotAllowedException);
}
}

0 comments on commit 96930fd

Please sign in to comment.