Skip to content

Commit

Permalink
[improve][broker] Make some methods of ClusterBase pure async. (apa…
Browse files Browse the repository at this point in the history
  • Loading branch information
mattisonchao authored May 10, 2022
1 parent 7234911 commit d7a4f7f
Show file tree
Hide file tree
Showing 5 changed files with 160 additions and 131 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,12 @@
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import lombok.Getter;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.FailureDomainImpl;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.Notification;
Expand Down Expand Up @@ -84,6 +87,25 @@ public void deleteCluster(String clusterName) throws MetadataStoreException {
delete(joinPath(BASE_CLUSTERS_PATH, clusterName));
}

public CompletableFuture<Void> deleteClusterAsync(String clusterName) {
return deleteAsync(joinPath(BASE_CLUSTERS_PATH, clusterName));
}

public CompletableFuture<Boolean> isClusterUsedAsync(String clusterName) {
return getCache().getChildren(BASE_POLICIES_PATH)
.thenCompose(tenants -> {
List<CompletableFuture<List<String>>> futures = tenants.stream()
.map(tenant -> getCache().getChildren(joinPath(BASE_POLICIES_PATH, tenant, clusterName)))
.collect(Collectors.toList());
return FutureUtil.waitForAll(futures)
.thenApply(__ -> {
// We found a tenant that has at least a namespace in this cluster
return futures.stream().map(CompletableFuture::join)
.anyMatch(CollectionUtils::isNotEmpty);
});
});
}

public boolean isClusterUsed(String clusterName) throws MetadataStoreException {
for (String tenant : getCache().getChildren(BASE_POLICIES_PATH).join()) {
if (!getCache().getChildren(joinPath(BASE_POLICIES_PATH, tenant, clusterName)).join().isEmpty()) {
Expand Down Expand Up @@ -133,6 +155,21 @@ public void deleteFailureDomain(String clusterName, String domainName) throws Me
delete(path);
}

public CompletableFuture<Void> deleteFailureDomainsAsync(String clusterName) {
String failureDomainPath = joinPath(BASE_CLUSTERS_PATH, clusterName, FAILURE_DOMAIN);
return existsAsync(failureDomainPath)
.thenCompose(exists -> {
if (!exists) {
return CompletableFuture.completedFuture(null);
}
return getChildrenAsync(failureDomainPath)
.thenCompose(children -> FutureUtil.waitForAll(children.stream()
.map(domain -> deleteAsync(joinPath(failureDomainPath, domain)))
.collect(Collectors.toList())))
.thenCompose(__ -> deleteAsync(failureDomainPath));
});
}

public void deleteFailureDomains(String clusterName) throws MetadataStoreException {
String failureDomainPath = joinPath(BASE_CLUSTERS_PATH, clusterName, FAILURE_DOMAIN);
if (!exists(failureDomainPath)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,16 +183,19 @@ public Optional<NamespaceIsolationPolicies> getIsolationDataPolicies(String clus
return data.isPresent() ? Optional.of(new NamespaceIsolationPolicies(data.get())) : Optional.empty();
}

public CompletableFuture<NamespaceIsolationPolicies> getIsolationDataPoliciesAsync(String cluster) {
public CompletableFuture<Optional<NamespaceIsolationPolicies>> getIsolationDataPoliciesAsync(String cluster) {
return getAsync(joinPath(BASE_CLUSTERS_PATH, cluster, NAMESPACE_ISOLATION_POLICIES))
.thenApply(data -> data.map(NamespaceIsolationPolicies::new)
.orElseGet(NamespaceIsolationPolicies::new));
.thenApply(data -> data.map(NamespaceIsolationPolicies::new));
}

public void deleteIsolationData(String cluster) throws MetadataStoreException {
delete(joinPath(BASE_CLUSTERS_PATH, cluster, NAMESPACE_ISOLATION_POLICIES));
}

public CompletableFuture<Void> deleteIsolationDataAsync(String cluster) {
return deleteAsync(joinPath(BASE_CLUSTERS_PATH, cluster, NAMESPACE_ISOLATION_POLICIES));
}

public void createIsolationData(String cluster, Map<String, NamespaceIsolationDataImpl> id)
throws MetadataStoreException {
create(joinPath(BASE_CLUSTERS_PATH, cluster, NAMESPACE_ISOLATION_POLICIES), id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.resources.ClusterResources.FailureDomainResources;
import org.apache.pulsar.broker.web.RestException;
Expand Down Expand Up @@ -238,68 +239,62 @@ public void updateCluster(
@ApiResponse(code = 412, message = "Peer cluster doesn't exist."),
@ApiResponse(code = 500, message = "Internal server error.")
})
public void setPeerClusterNames(
@ApiParam(
value = "The cluster name",
required = true
)
@PathParam("cluster") String cluster,
@ApiParam(
value = "The list of peer cluster names",
required = true,
examples = @Example(
value = @ExampleProperty(
mediaType = MediaType.APPLICATION_JSON,
value =
"[\n"
+ " 'cluster-a',\n"
+ " 'cluster-b'\n"
+ "]"
)
)
)
LinkedHashSet<String> peerClusterNames
) {
validateSuperUserAccess();
validatePoliciesReadOnlyAccess();
public void setPeerClusterNames(@Suspended AsyncResponse asyncResponse,
@ApiParam(value = "The cluster name", required = true)
@PathParam("cluster") String cluster,
@ApiParam(
value = "The list of peer cluster names",
required = true,
examples = @Example(
value = @ExampleProperty(mediaType = MediaType.APPLICATION_JSON,
value = "[\n"
+ " 'cluster-a',\n"
+ " 'cluster-b'\n"
+ "]")))
LinkedHashSet<String> peerClusterNames) {
validateSuperUserAccessAsync()
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
.thenCompose(__ -> innerSetPeerClusterNamesAsync(cluster, peerClusterNames))
.thenAccept(__ -> {
log.info("[{}] Successfully added peer-cluster {} for {}",
clientAppId(), peerClusterNames, cluster);
asyncResponse.resume(Response.noContent().build());
}).exceptionally(ex -> {
Throwable realCause = FutureUtil.unwrapCompletionException(ex);
log.error("[{}] Failed to validate peer-cluster list {}, {}", clientAppId(), peerClusterNames, ex);
if (realCause instanceof NotFoundException) {
asyncResponse.resume(new RestException(Status.NOT_FOUND, "Cluster does not exist"));
return null;
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});

}

private CompletableFuture<Void> innerSetPeerClusterNamesAsync(String cluster,
LinkedHashSet<String> peerClusterNames) {
// validate if peer-cluster exist
if (peerClusterNames != null && !peerClusterNames.isEmpty()) {
for (String peerCluster : peerClusterNames) {
try {
if (cluster.equalsIgnoreCase(peerCluster)) {
throw new RestException(Status.PRECONDITION_FAILED,
cluster + " itself can't be part of peer-list");
}
clusterResources().getCluster(peerCluster)
.orElseThrow(() -> new RestException(Status.PRECONDITION_FAILED,
"Peer cluster " + peerCluster + " does not exist"));
} catch (RestException e) {
log.warn("[{}] Peer cluster doesn't exist from {}, {}", clientAppId(), peerClusterNames,
e.getMessage());
throw e;
} catch (Exception e) {
log.warn("[{}] Failed to validate peer-cluster list {}, {}", clientAppId(), peerClusterNames,
e.getMessage());
throw new RestException(e);
CompletableFuture<Void> future;
if (CollectionUtils.isNotEmpty(peerClusterNames)) {
future = FutureUtil.waitForAll(peerClusterNames.stream().map(peerCluster -> {
if (cluster.equalsIgnoreCase(peerCluster)) {
return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED,
cluster + " itself can't be part of peer-list"));
}
}
}

try {
clusterResources().updateCluster(cluster, old ->
old.clone()
.peerClusterNames(peerClusterNames)
.build()
);
log.info("[{}] Successfully added peer-cluster {} for {}", clientAppId(), peerClusterNames, cluster);
} catch (NotFoundException e) {
log.warn("[{}] Failed to update cluster {}: Does not exist", clientAppId(), cluster);
throw new RestException(Status.NOT_FOUND, "Cluster does not exist");
} catch (Exception e) {
log.error("[{}] Failed to update cluster {}", clientAppId(), cluster, e);
throw new RestException(e);
return clusterResources().getClusterAsync(peerCluster)
.thenAccept(peerClusterOpt -> {
if (!peerClusterOpt.isPresent()) {
throw new RestException(Status.PRECONDITION_FAILED,
"Peer cluster " + peerCluster + " does not exist");
}
});
}).collect(Collectors.toList()));
} else {
future = CompletableFuture.completedFuture(null);
}
return future.thenCompose(__ -> clusterResources().updateClusterAsync(cluster,
old -> old.clone().peerClusterNames(peerClusterNames).build()));
}

@GET
Expand All @@ -315,22 +310,20 @@ public void setPeerClusterNames(
@ApiResponse(code = 404, message = "Cluster doesn't exist."),
@ApiResponse(code = 500, message = "Internal server error.")
})
public Set<String> getPeerCluster(
@ApiParam(
value = "The cluster name",
required = true
)
@PathParam("cluster") String cluster
) {
validateSuperUserAccess();
try {
ClusterData clusterData = clusterResources().getCluster(cluster)
.orElseThrow(() -> new RestException(Status.NOT_FOUND, "Cluster does not exist"));
return clusterData.getPeerClusterNames();
} catch (Exception e) {
log.error("[{}] Failed to get cluster {}", clientAppId(), cluster, e);
throw new RestException(e);
}
public void getPeerCluster(@Suspended AsyncResponse asyncResponse,
@ApiParam(value = "The cluster name", required = true)
@PathParam("cluster") String cluster) {
validateSuperUserAccessAsync()
.thenCompose(__ -> clusterResources().getClusterAsync(cluster))
.thenAccept(clusterOpt -> {
ClusterData clusterData =
clusterOpt.orElseThrow(() -> new RestException(Status.NOT_FOUND, "Cluster does not exist"));
asyncResponse.resume(clusterData.getPeerClusterNames());
}).exceptionally(ex -> {
log.error("[{}] Failed to get cluster {}", clientAppId(), cluster, ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}

@DELETE
Expand All @@ -346,54 +339,49 @@ public Set<String> getPeerCluster(
@ApiResponse(code = 412, message = "Cluster is not empty."),
@ApiResponse(code = 500, message = "Internal server error.")
})
public void deleteCluster(
@ApiParam(
value = "The cluster name",
required = true
)
@PathParam("cluster") String cluster
) {
validateSuperUserAccess();
validatePoliciesReadOnlyAccess();
public void deleteCluster(@Suspended AsyncResponse asyncResponse,
@ApiParam(value = "The cluster name", required = true)
@PathParam("cluster") String cluster) {
validateSuperUserAccessAsync()
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
.thenCompose(__ -> internalDeleteClusterAsync(cluster))
.thenAccept(__ -> {
log.info("[{}] Deleted cluster {}", clientAppId(), cluster);
asyncResponse.resume(Response.noContent().build());
}).exceptionally(ex -> {
Throwable realCause = FutureUtil.unwrapCompletionException(ex);
if (realCause instanceof NotFoundException) {
log.warn("[{}] Failed to delete cluster {} - Does not exist", clientAppId(), cluster);
asyncResponse.resume(new RestException(Status.NOT_FOUND, "Cluster does not exist"));
return null;
}
log.error("[{}] Failed to delete cluster {}", clientAppId(), cluster, ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}

private CompletableFuture<Void> internalDeleteClusterAsync(String cluster) {
// Check that the cluster is not used by any tenant (eg: no namespaces provisioned there)
boolean isClusterUsed = false;
try {
isClusterUsed = pulsar().getPulsarResources().getClusterResources().isClusterUsed(cluster);

// check the namespaceIsolationPolicies associated with the cluster
Optional<NamespaceIsolationPolicies> nsIsolationPolicies =
namespaceIsolationPolicies().getIsolationDataPolicies(cluster);

// Need to delete the isolation policies if present
if (nsIsolationPolicies.isPresent()) {
if (nsIsolationPolicies.get().getPolicies().isEmpty()) {
namespaceIsolationPolicies().deleteIsolationData(cluster);
} else {
isClusterUsed = true;
}
}
} catch (Exception e) {
log.error("[{}] Failed to get cluster usage {}", clientAppId(), cluster, e);
throw new RestException(e);
}

if (isClusterUsed) {
log.warn("[{}] Failed to delete cluster {} - Cluster not empty", clientAppId(), cluster);
throw new RestException(Status.PRECONDITION_FAILED, "Cluster not empty");
}

try {
clusterResources().getFailureDomainResources().deleteFailureDomains(cluster);
clusterResources().deleteCluster(cluster);
log.info("[{}] Deleted cluster {}", clientAppId(), cluster);
} catch (NotFoundException e) {
log.warn("[{}] Failed to delete cluster {} - Does not exist", clientAppId(), cluster);
throw new RestException(Status.NOT_FOUND, "Cluster does not exist");
} catch (Exception e) {
log.error("[{}] Failed to delete cluster {}", clientAppId(), cluster, e);
throw new RestException(e);
}
return pulsar().getPulsarResources().getClusterResources().isClusterUsedAsync(cluster)
.thenCompose(isClusterUsed -> {
if (isClusterUsed) {
throw new RestException(Status.PRECONDITION_FAILED, "Cluster not empty");
}
// check the namespaceIsolationPolicies associated with the cluster
return namespaceIsolationPolicies().getIsolationDataPoliciesAsync(cluster);
}).thenCompose(nsIsolationPoliciesOpt -> {
if (nsIsolationPoliciesOpt.isPresent()) {
if (!nsIsolationPoliciesOpt.get().getPolicies().isEmpty()) {
throw new RestException(Status.PRECONDITION_FAILED, "Cluster not empty");
}
// Need to delete the isolation policies if present
return namespaceIsolationPolicies().deleteIsolationDataAsync(cluster);
}
return CompletableFuture.completedFuture(null);
}).thenCompose(unused -> clusterResources()
.getFailureDomainResources().deleteFailureDomainsAsync(cluster)
.thenCompose(__ -> clusterResources().deleteClusterAsync(cluster)));
}

@GET
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -771,7 +771,8 @@ private NamespaceOwnershipStatus getNamespaceOwnershipStatus(OwnedBundle nsObj,
private CompletableFuture<NamespaceIsolationPolicies> getLocalNamespaceIsolationPoliciesAsync() {
String localCluster = pulsar.getConfiguration().getClusterName();
return pulsar.getPulsarResources().getNamespaceResources().getIsolationPolicies()
.getIsolationDataPoliciesAsync(localCluster);
.getIsolationDataPoliciesAsync(localCluster)
.thenApply(nsIsolationPolicies -> nsIsolationPolicies.orElseGet(NamespaceIsolationPolicies::new));
}

public boolean isNamespaceBundleDisabled(NamespaceBundle bundle) throws Exception {
Expand Down
Loading

0 comments on commit d7a4f7f

Please sign in to comment.