Skip to content

Commit

Permalink
[Issue 9790] Support disabled the tenants/namespaces force deletion i…
Browse files Browse the repository at this point in the history
…n the broker.conf (apache#9819)

### Motivation

Fixes apache#9790

### Modifications

- Add new configs below and keep the default value to `false`:
```
forceDeleteTenantAllowed=false
forceDeleteNamespaceAllowed=false
```
- Add unit tests for this change.
  • Loading branch information
murong00 authored Mar 10, 2021
1 parent 76883d4 commit e246458
Show file tree
Hide file tree
Showing 9 changed files with 127 additions and 0 deletions.
6 changes: 6 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,12 @@ brokerDeleteInactivePartitionedTopicMetadataEnabled=false
# Topics that are inactive for longer than this value will be deleted
brokerDeleteInactiveTopicsMaxInactiveDurationSeconds=

# Allow you to delete a tenant forcefully.
forceDeleteTenantAllowed=false

# Allow you to delete a namespace forcefully.
forceDeleteNamespaceAllowed=false

# Max pending publish requests per connection to avoid keeping large number of pending
# requests in memory. Default: 1000
maxPendingPublishdRequestsPerConnection=1000
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,18 @@ public class ServiceConfiguration implements PulsarConfiguration {
)
private Integer brokerDeleteInactiveTopicsMaxInactiveDurationSeconds = null;

@FieldContext(
category = CATEGORY_POLICIES,
doc = "Allow forced deletion of tenants. Default is false."
)
private boolean forceDeleteTenantAllowed = false;

@FieldContext(
category = CATEGORY_POLICIES,
doc = "Allow forced deletion of namespaces. Default is false."
)
private boolean forceDeleteNamespaceAllowed = false;

@FieldContext(
category = CATEGORY_POLICIES,
doc = "Max pending publish requests per connection to avoid keeping large number of pending "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,12 @@ protected void internalDeleteNamespaceForcefully(AsyncResponse asyncResponse, bo
validateTenantOperation(namespaceName.getTenant(), TenantOperation.DELETE_NAMESPACE);
validatePoliciesReadOnlyAccess();

if (!pulsar().getConfiguration().isForceDeleteNamespaceAllowed()) {
asyncResponse.resume(
new RestException(Status.METHOD_NOT_ALLOWED, "Broker doesn't allow forced deletion of namespaces"));
return;
}

// ensure that non-global namespace is directed to the correct cluster
if (!namespaceName.isGlobal()) {
validateClusterOwnership(namespaceName.getCluster());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ public void updateTenant(@Suspended final AsyncResponse asyncResponse,
@ApiOperation(value = "Delete a tenant and all namespaces and topics under it.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
@ApiResponse(code = 404, message = "Tenant does not exist"),
@ApiResponse(code = 405, message = "Broker doesn't allow forced deletion of tenants"),
@ApiResponse(code = 409, message = "The tenant still has active namespaces") })
public void deleteTenant(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") @ApiParam(value = "The tenant name") String tenant,
Expand Down Expand Up @@ -290,6 +291,12 @@ protected void internalDeleteTenant(AsyncResponse asyncResponse, String tenant)
}

protected void internalDeleteTenantForcefully(AsyncResponse asyncResponse, String tenant) {
if (!pulsar().getConfiguration().isForceDeleteTenantAllowed()) {
asyncResponse.resume(
new RestException(Status.METHOD_NOT_ALLOWED, "Broker doesn't allow forced deletion of tenants"));
return;
}

List<String> namespaces;
try {
namespaces = getListOfNamespaces(tenant);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ public void createNamespace(@PathParam("property") String property, @PathParam("
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist"),
@ApiResponse(code = 405, message = "Broker doesn't allow forced deletion of namespaces"),
@ApiResponse(code = 409, message = "Namespace is not empty") })
public void deleteNamespace(@Suspended final AsyncResponse asyncResponse, @PathParam("property") String property,
@PathParam("cluster") String cluster, @PathParam("namespace") String namespace,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ public void createNamespace(@PathParam("tenant") String tenant, @PathParam("name
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist"),
@ApiResponse(code = 405, message = "Broker doesn't allow forced deletion of namespaces"),
@ApiResponse(code = 409, message = "Namespace is not empty") })
public void deleteNamespace(@Suspended final AsyncResponse asyncResponse, @PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1106,6 +1106,9 @@ public void testDeleteNamespaceBundle(Integer numBundles) throws Exception {

@Test
public void testDeleteTenantForcefully() throws Exception {
// allow forced deletion of tenants
pulsar.getConfiguration().setForceDeleteTenantAllowed(true);

String tenant = "my-tenant";
assertFalse(admin.tenants().getTenants().contains(tenant));

Expand Down Expand Up @@ -1134,6 +1137,9 @@ public void testDeleteTenantForcefully() throws Exception {
// Expected: cannot delete non-empty tenant
}

// allow forced deletion of namespaces
pulsar.getConfiguration().setForceDeleteNamespaceAllowed(true);

// delete tenant forcefully
admin.tenants().deleteTenant(tenant, true);
assertFalse(admin.tenants().getTenants().contains(tenant));
Expand All @@ -1142,6 +1148,52 @@ public void testDeleteTenantForcefully() throws Exception {
new TenantInfo(Sets.newHashSet("role1", "role2"), Sets.newHashSet("test")));
assertTrue(admin.tenants().getTenants().contains(tenant));
assertTrue(admin.namespaces().getNamespaces(tenant).isEmpty());

// reset back to false
pulsar.getConfiguration().setForceDeleteTenantAllowed(false);
pulsar.getConfiguration().setForceDeleteNamespaceAllowed(false);
}

@Test
public void testForceDeleteTenantNotAllowed() throws Exception {
assertFalse(pulsar.getConfiguration().isForceDeleteTenantAllowed());

String tenant = "my-tenant";
assertFalse(admin.tenants().getTenants().contains(tenant));

// create tenant
admin.tenants().createTenant(tenant,
new TenantInfo(Sets.newHashSet("role1", "role2"), Sets.newHashSet("test")));

assertTrue(admin.tenants().getTenants().contains(tenant));

// create namespace
String namespace = tenant + "/my-ns";
admin.namespaces().createNamespace("my-tenant/my-ns", Sets.newHashSet("test"));

assertEquals(admin.namespaces().getNamespaces(tenant), Lists.newArrayList("my-tenant/my-ns"));

// create topic
String topic = namespace + "/my-topic";
admin.topics().createPartitionedTopic(topic, 10);

assertFalse(admin.topics().getList(namespace).isEmpty());

try {
admin.tenants().deleteTenant(tenant, false);
fail("should have failed");
} catch (PulsarAdminException e) {
// Expected: cannot delete non-empty tenant
}

try {
admin.tenants().deleteTenant(tenant, true);
fail("should have failed");
} catch (PulsarAdminException e) {
// Expected: cannot delete due to broker is not allowed
}

assertTrue(admin.tenants().getTenants().contains(tenant));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1139,6 +1139,9 @@ public void testDeleteNamespace() throws Exception {
*/
@Test
public void testForceDeleteNamespace() throws Exception {
// allow forced deletion of namespaces
pulsar.getConfiguration().setForceDeleteNamespaceAllowed(true);

String namespace = BrokerTestUtil.newUniqueName(this.testTenant + "/namespace");
String topic = namespace + "/topic";
String non_persistent_topic = "non-persistent://" + topic;
Expand All @@ -1163,6 +1166,43 @@ public void testForceDeleteNamespace() throws Exception {
admin.namespaces().createNamespace(namespace, 100);
topicList = admin.topics().getList(namespace);
assertTrue(topicList.isEmpty());

// reset back to false
pulsar.getConfiguration().setForceDeleteNamespaceAllowed(false);
}

@Test
public void testForceDeleteNamespaceNotAllowed() throws Exception {
assertFalse(pulsar.getConfiguration().isForceDeleteNamespaceAllowed());

String namespace = BrokerTestUtil.newUniqueName(this.testTenant + "/namespace");
String topic = namespace + "/topic";
String non_persistent_topic = "non-persistent://" + topic;

admin.namespaces().createNamespace(namespace, 100);

admin.topics().createPartitionedTopic(topic, 10);

admin.topics().createNonPartitionedTopic(non_persistent_topic);

List<String> topicList = admin.topics().getList(namespace);
assertFalse(topicList.isEmpty());

try {
admin.namespaces().deleteNamespace(namespace, false);
fail("should have failed");
} catch (PulsarAdminException e) {
// Expected: Cannot delete non empty namespace
}

try {
admin.namespaces().deleteNamespace(namespace, true);
fail("should have failed");
} catch (PulsarAdminException e) {
// Expected: Cannot delete due to broker is not allowed
}

assertTrue(admin.namespaces().getNamespaces(this.testTenant).contains(namespace));
}

@Test
Expand Down
2 changes: 2 additions & 0 deletions site2/docs/reference-configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,8 @@ internalListenerName|Specify the internal listener name for the broker.<br><br>*
|brokerDeleteInactiveTopicsFrequencySeconds| How often to check for inactive topics |60|
| brokerDeleteInactiveTopicsMode | Set the mode to delete inactive topics. <li> `delete_when_no_subscriptions`: delete the topic which has no subscriptions or active producers. <li> `delete_when_subscriptions_caught_up`: delete the topic whose subscriptions have no backlogs and which has no active producers or consumers. | `delete_when_no_subscriptions` |
| brokerDeleteInactiveTopicsMaxInactiveDurationSeconds | Set the maximum duration for inactive topics. If it is not specified, the `brokerDeleteInactiveTopicsFrequencySeconds` parameter is adopted. | N/A |
|forceDeleteTenantAllowed| Allow you to delete a tenant forcefully. |false|
|forceDeleteNamespaceAllowed| Allow you to delete a namespace forcefully. |false|
|messageExpiryCheckIntervalInMinutes| How frequently to proactively check and purge expired messages |5|
|brokerServiceCompactionMonitorIntervalInSeconds| Interval between checks to see if topics with compaction policies need to be compacted |60|
|delayedDeliveryEnabled|Whether to enable the delayed delivery for messages. If disabled, messages will be immediately delivered and there will be no tracking overhead.|true|
Expand Down

0 comments on commit e246458

Please sign in to comment.