Skip to content

Commit

Permalink
fix:remove the loadbalance/bundle-data node (apache#13164)
Browse files Browse the repository at this point in the history
Fixes apache#13162


### Motivation


fix: remove the loadbalance/bundle-data node, when deleting namespace. it will cause zk node leak, fix apache#13162

### Modifications

remove the bundle-data Recursively
  • Loading branch information
leizhiyuan authored Dec 8, 2021
1 parent a89c6e4 commit 1220f84
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public class NamespaceResources extends BaseResources<Policies> {

private static final String POLICIES_READONLY_FLAG_PATH = "/admin/flags/policies-readonly";
private static final String NAMESPACE_BASE_PATH = "/namespace";
private static final String BUNDLE_DATA_BASE_PATH = "/loadbalance/bundle-data";

public NamespaceResources(MetadataStore localStore, MetadataStore configurationStore, int operationTimeoutSec) {
super(configurationStore, Policies.class, operationTimeoutSec);
Expand Down Expand Up @@ -296,4 +297,39 @@ public CompletableFuture<Void> clearPartitionedTopicTenantAsync(String tenant) {
return future;
}
}

// clear resource of `/loadbalance/bundle-data/{tenant}/{namespace}/` for zk-node
public CompletableFuture<Void> deleteBundleDataAsync(NamespaceName ns) {
final String namespaceBundlePath = joinPath(BUNDLE_DATA_BASE_PATH, ns.toString());
CompletableFuture<Void> future = new CompletableFuture<Void>();
deleteRecursiveAsync(this, namespaceBundlePath).whenComplete((ignore, ex) -> {
if (ex instanceof MetadataStoreException.NotFoundException) {
future.complete(null);
} else if (ex != null) {
future.completeExceptionally(ex);
} else {
future.complete(null);
}
});

return future;
}

// clear resource of `/loadbalance/bundle-data/{tenant}/` for zk-node
public CompletableFuture<Void> deleteBundleDataTenantAsync(String tenant) {
final String tenantBundlePath = joinPath(BUNDLE_DATA_BASE_PATH, tenant);
CompletableFuture<Void> future = new CompletableFuture<Void>();
deleteRecursiveAsync(this, tenantBundlePath).whenComplete((ignore, ex) -> {
if (ex instanceof MetadataStoreException.NotFoundException) {
future.complete(null);
} else if (ex != null) {
future.completeExceptionally(ex);
} else {
future.complete(null);
}
});

return future;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,8 @@ protected void internalClearZkSources(AsyncResponse asyncResponse) {
.thenCompose(ignore -> namespaceResources().deletePoliciesAsync(namespaceName))
// clear z-node of local policies
.thenCompose(ignore -> getLocalPolicies().deleteLocalPoliciesAsync(namespaceName))
// clear /loadbalance/bundle-data
.thenCompose(ignore -> namespaceResources().deleteBundleDataAsync(namespaceName))
.whenComplete((ignore, ex) -> {
if (ex != null) {
log.warn("[{}] Failed to remove namespace or managed-ledger for {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,8 @@ protected void internalDeleteTenant(AsyncResponse asyncResponse, String tenant)
.getPartitionedTopicResources().clearPartitionedTopicTenantAsync(tenant))
.thenCompose(ignore -> pulsar().getPulsarResources().getLocalPolicies()
.deleteLocalPoliciesTenantAsync(tenant))
.thenCompose(ignore -> pulsar().getPulsarResources().getNamespaceResources()
.deleteBundleDataTenantAsync(tenant))
.whenComplete((ignore, ex) -> {
if (ex != null) {
log.error("[{}] Failed to delete tenant {}", clientAppId(), tenant, ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1311,9 +1311,11 @@ public void testDeleteTenant() throws Exception {
final String managedLedgersPath = "/managed-ledgers/" + tenant;
final String partitionedTopicPath = "/admin/partitioned-topics/" + tenant;
final String localPoliciesPath = "/admin/local-policies/" + tenant;
final String bundleDataPath = "/loadbalance/bundle-data/" + tenant;
assertFalse(pulsar.getLocalMetadataStore().exists(managedLedgersPath).join());
assertFalse(pulsar.getLocalMetadataStore().exists(partitionedTopicPath).join());
assertFalse(pulsar.getLocalMetadataStore().exists(localPoliciesPath).join());
assertFalse(pulsar.getLocalMetadataStore().exists(bundleDataPath).join());
}

@Test
Expand Down Expand Up @@ -1357,6 +1359,10 @@ public void testDeleteNamespace() throws Exception {

final String managedLedgersPath = "/managed-ledgers/" + namespace;
assertFalse(pulsar.getLocalMetadataStore().exists(managedLedgersPath).join());


final String bundleDataPath = "/loadbalance/bundle-data/" + namespace;
assertFalse(pulsar.getLocalMetadataStore().exists(bundleDataPath).join());
}


Expand Down

0 comments on commit 1220f84

Please sign in to comment.