diff --git a/conf/broker.conf b/conf/broker.conf index 368a97a3ebf02..c1d1f9192a233 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -165,6 +165,12 @@ brokerDeleteInactivePartitionedTopicMetadataEnabled=false # Topics that are inactive for longer than this value will be deleted brokerDeleteInactiveTopicsMaxInactiveDurationSeconds= +# Allow you to delete a tenant forcefully. +forceDeleteTenantAllowed=false + +# Allow you to delete a namespace forcefully. +forceDeleteNamespaceAllowed=false + # Max pending publish requests per connection to avoid keeping large number of pending # requests in memory. Default: 1000 maxPendingPublishdRequestsPerConnection=1000 diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 2087dc137379e..526c414f9d2ea 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -374,6 +374,18 @@ public class ServiceConfiguration implements PulsarConfiguration { ) private Integer brokerDeleteInactiveTopicsMaxInactiveDurationSeconds = null; + @FieldContext( + category = CATEGORY_POLICIES, + doc = "Allow forced deletion of tenants. Default is false." + ) + private boolean forceDeleteTenantAllowed = false; + + @FieldContext( + category = CATEGORY_POLICIES, + doc = "Allow forced deletion of namespaces. Default is false." + ) + private boolean forceDeleteNamespaceAllowed = false; + @FieldContext( category = CATEGORY_POLICIES, doc = "Max pending publish requests per connection to avoid keeping large number of pending " diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index dfc265e7dcaff..e20d2f8039b6d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -325,6 +325,12 @@ protected void internalDeleteNamespaceForcefully(AsyncResponse asyncResponse, bo validateTenantOperation(namespaceName.getTenant(), TenantOperation.DELETE_NAMESPACE); validatePoliciesReadOnlyAccess(); + if (!pulsar().getConfiguration().isForceDeleteNamespaceAllowed()) { + asyncResponse.resume( + new RestException(Status.METHOD_NOT_ALLOWED, "Broker doesn't allow forced deletion of namespaces")); + return; + } + // ensure that non-global namespace is directed to the correct cluster if (!namespaceName.isGlobal()) { validateClusterOwnership(namespaceName.getCluster()); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TenantsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TenantsBase.java index 11f8db6a82799..390b1fb96d0a1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TenantsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TenantsBase.java @@ -227,6 +227,7 @@ public void updateTenant(@Suspended final AsyncResponse asyncResponse, @ApiOperation(value = "Delete a tenant and all namespaces and topics under it.") @ApiResponses(value = { @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"), @ApiResponse(code = 404, message = "Tenant does not exist"), + @ApiResponse(code = 405, message = "Broker doesn't allow forced deletion of tenants"), @ApiResponse(code = 409, message = "The tenant still has active namespaces") }) public void deleteTenant(@Suspended final AsyncResponse asyncResponse, @PathParam("tenant") @ApiParam(value = "The tenant name") String tenant, @@ -290,6 +291,12 @@ protected void internalDeleteTenant(AsyncResponse asyncResponse, String tenant) } protected void internalDeleteTenantForcefully(AsyncResponse asyncResponse, String tenant) { + if (!pulsar().getConfiguration().isForceDeleteTenantAllowed()) { + asyncResponse.resume( + new RestException(Status.METHOD_NOT_ALLOWED, "Broker doesn't allow forced deletion of tenants")); + return; + } + List namespaces; try { namespaces = getListOfNamespaces(tenant); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java index b2cadd7f0463b..783a37b08a4be 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java @@ -196,6 +196,7 @@ public void createNamespace(@PathParam("property") String property, @PathParam(" @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace"), @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist"), + @ApiResponse(code = 405, message = "Broker doesn't allow forced deletion of namespaces"), @ApiResponse(code = 409, message = "Namespace is not empty") }) public void deleteNamespace(@Suspended final AsyncResponse asyncResponse, @PathParam("property") String property, @PathParam("cluster") String cluster, @PathParam("namespace") String namespace, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java index 3195f6b87a899..42a9728079a26 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java @@ -144,6 +144,7 @@ public void createNamespace(@PathParam("tenant") String tenant, @PathParam("name @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace"), @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist"), + @ApiResponse(code = 405, message = "Broker doesn't allow forced deletion of namespaces"), @ApiResponse(code = 409, message = "Namespace is not empty") }) public void deleteNamespace(@Suspended final AsyncResponse asyncResponse, @PathParam("tenant") String tenant, @PathParam("namespace") String namespace, diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java index 6b121d8169860..98e4aac018fac 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java @@ -1106,6 +1106,9 @@ public void testDeleteNamespaceBundle(Integer numBundles) throws Exception { @Test public void testDeleteTenantForcefully() throws Exception { + // allow forced deletion of tenants + pulsar.getConfiguration().setForceDeleteTenantAllowed(true); + String tenant = "my-tenant"; assertFalse(admin.tenants().getTenants().contains(tenant)); @@ -1134,6 +1137,9 @@ public void testDeleteTenantForcefully() throws Exception { // Expected: cannot delete non-empty tenant } + // allow forced deletion of namespaces + pulsar.getConfiguration().setForceDeleteNamespaceAllowed(true); + // delete tenant forcefully admin.tenants().deleteTenant(tenant, true); assertFalse(admin.tenants().getTenants().contains(tenant)); @@ -1142,6 +1148,52 @@ public void testDeleteTenantForcefully() throws Exception { new TenantInfo(Sets.newHashSet("role1", "role2"), Sets.newHashSet("test"))); assertTrue(admin.tenants().getTenants().contains(tenant)); assertTrue(admin.namespaces().getNamespaces(tenant).isEmpty()); + + // reset back to false + pulsar.getConfiguration().setForceDeleteTenantAllowed(false); + pulsar.getConfiguration().setForceDeleteNamespaceAllowed(false); + } + + @Test + public void testForceDeleteTenantNotAllowed() throws Exception { + assertFalse(pulsar.getConfiguration().isForceDeleteTenantAllowed()); + + String tenant = "my-tenant"; + assertFalse(admin.tenants().getTenants().contains(tenant)); + + // create tenant + admin.tenants().createTenant(tenant, + new TenantInfo(Sets.newHashSet("role1", "role2"), Sets.newHashSet("test"))); + + assertTrue(admin.tenants().getTenants().contains(tenant)); + + // create namespace + String namespace = tenant + "/my-ns"; + admin.namespaces().createNamespace("my-tenant/my-ns", Sets.newHashSet("test")); + + assertEquals(admin.namespaces().getNamespaces(tenant), Lists.newArrayList("my-tenant/my-ns")); + + // create topic + String topic = namespace + "/my-topic"; + admin.topics().createPartitionedTopic(topic, 10); + + assertFalse(admin.topics().getList(namespace).isEmpty()); + + try { + admin.tenants().deleteTenant(tenant, false); + fail("should have failed"); + } catch (PulsarAdminException e) { + // Expected: cannot delete non-empty tenant + } + + try { + admin.tenants().deleteTenant(tenant, true); + fail("should have failed"); + } catch (PulsarAdminException e) { + // Expected: cannot delete due to broker is not allowed + } + + assertTrue(admin.tenants().getTenants().contains(tenant)); } @Test diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java index f4260d46b42c5..bc6d902b890ff 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java @@ -1139,6 +1139,9 @@ public void testDeleteNamespace() throws Exception { */ @Test public void testForceDeleteNamespace() throws Exception { + // allow forced deletion of namespaces + pulsar.getConfiguration().setForceDeleteNamespaceAllowed(true); + String namespace = BrokerTestUtil.newUniqueName(this.testTenant + "/namespace"); String topic = namespace + "/topic"; String non_persistent_topic = "non-persistent://" + topic; @@ -1163,6 +1166,43 @@ public void testForceDeleteNamespace() throws Exception { admin.namespaces().createNamespace(namespace, 100); topicList = admin.topics().getList(namespace); assertTrue(topicList.isEmpty()); + + // reset back to false + pulsar.getConfiguration().setForceDeleteNamespaceAllowed(false); + } + + @Test + public void testForceDeleteNamespaceNotAllowed() throws Exception { + assertFalse(pulsar.getConfiguration().isForceDeleteNamespaceAllowed()); + + String namespace = BrokerTestUtil.newUniqueName(this.testTenant + "/namespace"); + String topic = namespace + "/topic"; + String non_persistent_topic = "non-persistent://" + topic; + + admin.namespaces().createNamespace(namespace, 100); + + admin.topics().createPartitionedTopic(topic, 10); + + admin.topics().createNonPartitionedTopic(non_persistent_topic); + + List topicList = admin.topics().getList(namespace); + assertFalse(topicList.isEmpty()); + + try { + admin.namespaces().deleteNamespace(namespace, false); + fail("should have failed"); + } catch (PulsarAdminException e) { + // Expected: Cannot delete non empty namespace + } + + try { + admin.namespaces().deleteNamespace(namespace, true); + fail("should have failed"); + } catch (PulsarAdminException e) { + // Expected: Cannot delete due to broker is not allowed + } + + assertTrue(admin.namespaces().getNamespaces(this.testTenant).contains(namespace)); } @Test diff --git a/site2/docs/reference-configuration.md b/site2/docs/reference-configuration.md index 95b786fc28d54..8308f62830038 100644 --- a/site2/docs/reference-configuration.md +++ b/site2/docs/reference-configuration.md @@ -185,6 +185,8 @@ internalListenerName|Specify the internal listener name for the broker.

* |brokerDeleteInactiveTopicsFrequencySeconds| How often to check for inactive topics |60| | brokerDeleteInactiveTopicsMode | Set the mode to delete inactive topics.
  • `delete_when_no_subscriptions`: delete the topic which has no subscriptions or active producers.
  • `delete_when_subscriptions_caught_up`: delete the topic whose subscriptions have no backlogs and which has no active producers or consumers. | `delete_when_no_subscriptions` | | brokerDeleteInactiveTopicsMaxInactiveDurationSeconds | Set the maximum duration for inactive topics. If it is not specified, the `brokerDeleteInactiveTopicsFrequencySeconds` parameter is adopted. | N/A | +|forceDeleteTenantAllowed| Allow you to delete a tenant forcefully. |false| +|forceDeleteNamespaceAllowed| Allow you to delete a namespace forcefully. |false| |messageExpiryCheckIntervalInMinutes| How frequently to proactively check and purge expired messages |5| |brokerServiceCompactionMonitorIntervalInSeconds| Interval between checks to see if topics with compaction policies need to be compacted |60| |delayedDeliveryEnabled|Whether to enable the delayed delivery for messages. If disabled, messages will be immediately delivered and there will be no tracking overhead.|true|