Skip to content

Commit

Permalink
[improve][broker] Make some methods of ClusterBase pure async. (apa…
Browse files Browse the repository at this point in the history
  • Loading branch information
mattisonchao authored May 13, 2022
1 parent 7f976da commit aa03ccd
Show file tree
Hide file tree
Showing 5 changed files with 109 additions and 114 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -397,31 +396,49 @@ private CompletableFuture<Void> internalDeleteClusterAsync(String cluster) {
@ApiResponse(code = 404, message = "Cluster doesn't exist."),
@ApiResponse(code = 500, message = "Internal server error.")
})
public Map<String, ? extends NamespaceIsolationData> getNamespaceIsolationPolicies(
@ApiParam(
value = "The cluster name",
required = true
)
@PathParam("cluster") String cluster
) throws Exception {
validateSuperUserAccess();
if (!clusterResources().clusterExists(cluster)) {
throw new RestException(Status.NOT_FOUND, "Cluster " + cluster + " does not exist.");
}
public void getNamespaceIsolationPolicies(
@Suspended AsyncResponse asyncResponse,
@ApiParam(value = "The cluster name", required = true) @PathParam("cluster") String cluster
) {
validateSuperUserAccessAsync()
.thenCompose(__ -> validateClusterExistAsync(cluster, Status.NOT_FOUND))
.thenCompose(__ -> internalGetNamespaceIsolationPolicies(cluster))
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
log.error("[{}] Failed to get clusters/{}/namespaceIsolationPolicies", clientAppId(), cluster, ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}

try {
NamespaceIsolationPolicies nsIsolationPolicies = namespaceIsolationPolicies()
.getIsolationDataPolicies(cluster)
.orElseThrow(() -> new RestException(Status.NOT_FOUND,
"NamespaceIsolationPolicies for cluster " + cluster + " does not exist"));
// construct the response to Namespace isolation data map
return nsIsolationPolicies.getPolicies();
} catch (Exception e) {
log.error("[{}] Failed to get clusters/{}/namespaceIsolationPolicies", clientAppId(), cluster, e);
throw new RestException(e);
}
/**
* Verify that the cluster exists.
* For compatibility to avoid breaking changes, we can specify a REST status code when it doesn't exist.
* @param cluster Cluster name
* @param notExistStatus REST status code
*/
private CompletableFuture<Void> validateClusterExistAsync(String cluster, Status notExistStatus) {
return clusterResources().clusterExistsAsync(cluster)
.thenAccept(clusterExist -> {
if (!clusterExist) {
throw new RestException(notExistStatus, "Cluster " + cluster + " does not exist.");
}
});
}

private CompletableFuture<Map<String, NamespaceIsolationDataImpl>> internalGetNamespaceIsolationPolicies(
String cluster) {
return namespaceIsolationPolicies().getIsolationDataPoliciesAsync(cluster)
.thenApply(namespaceIsolationPolicies -> {
if (!namespaceIsolationPolicies.isPresent()) {
throw new RestException(Status.NOT_FOUND,
"NamespaceIsolationPolicies for cluster " + cluster + " does not exist");
}
return namespaceIsolationPolicies.get().getPolicies();
});
}


@GET
@Path("/{cluster}/namespaceIsolationPolicies/{policyName}")
@ApiOperation(
Expand All @@ -435,40 +452,28 @@ private CompletableFuture<Void> internalDeleteClusterAsync(String cluster) {
@ApiResponse(code = 412, message = "Cluster doesn't exist."),
@ApiResponse(code = 500, message = "Internal server error.")
})
public NamespaceIsolationData getNamespaceIsolationPolicy(
@ApiParam(
value = "The cluster name",
required = true
)
@PathParam("cluster") String cluster,
@ApiParam(
value = "The name of the namespace isolation policy",
required = true
)
public void getNamespaceIsolationPolicy(
@Suspended AsyncResponse asyncResponse,
@ApiParam(value = "The cluster name", required = true) @PathParam("cluster") String cluster,
@ApiParam(value = "The name of the namespace isolation policy", required = true)
@PathParam("policyName") String policyName
) throws Exception {
validateSuperUserAccess();
validateClusterExists(cluster);

try {
NamespaceIsolationPolicies nsIsolationPolicies = namespaceIsolationPolicies()
.getIsolationDataPolicies(cluster)
.orElseThrow(() -> new RestException(Status.NOT_FOUND,
"NamespaceIsolationPolicies for cluster " + cluster + " does not exist"));
// construct the response to Namespace isolation data map
if (!nsIsolationPolicies.getPolicies().containsKey(policyName)) {
log.info("[{}] Cannot find NamespaceIsolationPolicy {} for cluster {}",
clientAppId(), policyName, cluster);
throw new RestException(Status.NOT_FOUND,
"Cannot find NamespaceIsolationPolicy " + policyName + " for cluster " + cluster);
}
return nsIsolationPolicies.getPolicies().get(policyName);
} catch (RestException re) {
throw re;
} catch (Exception e) {
log.error("[{}] Failed to get clusters/{}/namespaceIsolationPolicies/{}", clientAppId(), cluster, e);
throw new RestException(e);
}
) {
validateSuperUserAccessAsync()
.thenCompose(__ -> validateClusterExistAsync(cluster, Status.PRECONDITION_FAILED))
.thenCompose(__ -> internalGetNamespaceIsolationPolicies(cluster))
.thenAccept(policies -> {
// construct the response to Namespace isolation data map
if (!policies.containsKey(policyName)) {
throw new RestException(Status.NOT_FOUND,
"Cannot find NamespaceIsolationPolicy " + policyName + " for cluster " + cluster);
}
asyncResponse.resume(policies.get(policyName));
}).exceptionally(ex -> {
log.error("[{}] Failed to get clusters/{}/namespaceIsolationPolicies/{}",
clientAppId(), cluster, ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}

@GET
Expand All @@ -485,53 +490,44 @@ public NamespaceIsolationData getNamespaceIsolationPolicy(
@ApiResponse(code = 412, message = "Cluster doesn't exist."),
@ApiResponse(code = 500, message = "Internal server error.")
})
public List<BrokerNamespaceIsolationData> getBrokersWithNamespaceIsolationPolicy(
@ApiParam(
value = "The cluster name",
required = true
)
public void getBrokersWithNamespaceIsolationPolicy(
@Suspended AsyncResponse asyncResponse,
@ApiParam(value = "The cluster name", required = true)
@PathParam("cluster") String cluster) {
validateSuperUserAccess();
validateClusterExists(cluster);

Set<String> availableBrokers;
Map<String, ? extends NamespaceIsolationData> nsPolicies;
try {
availableBrokers = pulsar().getLoadManager().get().getAvailableBrokers();
} catch (Exception e) {
log.error("[{}] Failed to get list of brokers in cluster {}", clientAppId(), cluster, e);
throw new RestException(e);
}
try {
Optional<NamespaceIsolationPolicies> nsPoliciesResult = namespaceIsolationPolicies()
.getIsolationDataPolicies(cluster);
if (!nsPoliciesResult.isPresent()) {
throw new RestException(Status.NOT_FOUND, "namespace-isolation policies not found for " + cluster);
}
nsPolicies = nsPoliciesResult.get().getPolicies();
} catch (Exception e) {
log.error("[{}] Failed to get namespace isolation-policies {}", clientAppId(), cluster, e);
throw new RestException(e);
}
return availableBrokers.stream().map(broker -> {
BrokerNamespaceIsolationData.Builder brokerIsolationData = BrokerNamespaceIsolationData.builder()
.brokerName(broker);
if (nsPolicies != null) {
List<String> namespaceRegexes = new ArrayList<>();
nsPolicies.forEach((name, policyData) -> {
NamespaceIsolationPolicyImpl nsPolicyImpl = new NamespaceIsolationPolicyImpl(policyData);
if (nsPolicyImpl.isPrimaryBroker(broker) || nsPolicyImpl.isSecondaryBroker(broker)) {
namespaceRegexes.addAll(policyData.getNamespaces());
if (nsPolicyImpl.isPrimaryBroker(broker)) {
brokerIsolationData.primary(true);
}
}
validateSuperUserAccessAsync()
.thenCompose(__ -> validateClusterExistAsync(cluster, Status.PRECONDITION_FAILED))
.thenCompose(__ -> pulsar().getLoadManager().get().getAvailableBrokersAsync())
.thenCompose(availableBrokers -> internalGetNamespaceIsolationPolicies(cluster)
.thenApply(policies -> availableBrokers.stream()
.map(broker -> internalGetBrokerNsIsolationData(broker, policies))
.collect(Collectors.toList())))
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
log.error("[{}] Failed to get namespace isolation-policies {}", clientAppId(), cluster, ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}

brokerIsolationData.namespaceRegex(namespaceRegexes);
}
private BrokerNamespaceIsolationData internalGetBrokerNsIsolationData(
String broker,
Map<String, NamespaceIsolationDataImpl> policies) {
BrokerNamespaceIsolationData.Builder brokerIsolationData =
BrokerNamespaceIsolationData.builder().brokerName(broker);
if (policies == null) {
return brokerIsolationData.build();
}).collect(Collectors.toList());
}
List<String> namespaceRegexes = new ArrayList<>();
policies.forEach((name, policyData) -> {
NamespaceIsolationPolicyImpl nsPolicyImpl = new NamespaceIsolationPolicyImpl(policyData);
if (nsPolicyImpl.isPrimaryBroker(broker) || nsPolicyImpl.isSecondaryBroker(broker)) {
namespaceRegexes.addAll(policyData.getNamespaces());
brokerIsolationData.primary(nsPolicyImpl.isPrimaryBroker(broker));
brokerIsolationData.policyName(name);
}
});
brokerIsolationData.namespaceRegex(namespaceRegexes);
return brokerIsolationData.build();
}

@GET
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -733,8 +733,10 @@ public CompletableFuture<Boolean> isNamespaceBundleOwned(NamespaceBundle bundle)
}

public CompletableFuture<Map<String, NamespaceOwnershipStatus>> getOwnedNameSpacesStatusAsync() {
return getLocalNamespaceIsolationPoliciesAsync()
.thenCompose(namespaceIsolationPolicies -> {
return pulsar.getPulsarResources().getNamespaceResources().getIsolationPolicies()
.getIsolationDataPoliciesAsync(pulsar.getConfiguration().getClusterName())
.thenApply(nsIsolationPoliciesOpt -> nsIsolationPoliciesOpt.orElseGet(NamespaceIsolationPolicies::new))
.thenCompose(namespaceIsolationPolicies -> {
Collection<CompletableFuture<OwnedBundle>> futures =
ownershipCache.getOwnedBundlesAsync().values();
return FutureUtil.waitForAll(futures)
Expand Down Expand Up @@ -768,13 +770,6 @@ private NamespaceOwnershipStatus getNamespaceOwnershipStatus(OwnedBundle nsObj,
return nsOwnedStatus;
}

private CompletableFuture<NamespaceIsolationPolicies> getLocalNamespaceIsolationPoliciesAsync() {
String localCluster = pulsar.getConfiguration().getClusterName();
return pulsar.getPulsarResources().getNamespaceResources().getIsolationPolicies()
.getIsolationDataPoliciesAsync(localCluster)
.thenApply(nsIsolationPolicies -> nsIsolationPolicies.orElseGet(NamespaceIsolationPolicies::new));
}

public boolean isNamespaceBundleDisabled(NamespaceBundle bundle) throws Exception {
try {
// Does ZooKeeper says that the namespace is disabled?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1029,6 +1029,7 @@ public void brokerNamespaceIsolationPolicies() throws Exception {
assertEquals(brokerIsolationDataList.get(0).getBrokerName(), brokerAddress);
assertEquals(brokerIsolationDataList.get(0).getNamespaceRegex().size(), 1);
assertEquals(brokerIsolationDataList.get(0).getNamespaceRegex().get(0), namespaceRegex);
assertEquals(brokerIsolationDataList.get(0).getPolicyName(), policyName1);

BrokerNamespaceIsolationDataImpl brokerIsolationData = (BrokerNamespaceIsolationDataImpl) admin.clusters()
.getBrokerWithNamespaceIsolationPolicy(cluster, brokerAddress);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ public void internalConfiguration() throws Exception {
}

@Test
@SuppressWarnings("unchecked")
public void clusters() throws Exception {
assertEquals(asynRequests(ctx -> clusters.getClusters(ctx)), Sets.newHashSet());
verify(clusters, never()).validateSuperUserAccessAsync();
Expand Down Expand Up @@ -239,7 +240,7 @@ public void clusters() throws Exception {
ClusterData.builder().serviceUrl("http://new-broker.messaging.use.example.com:8080").build());

try {
clusters.getNamespaceIsolationPolicies("use");
asynRequests(ctx -> clusters.getNamespaceIsolationPolicies(ctx, "use"));
fail("should have failed");
} catch (RestException e) {
assertEquals(e.getResponse().getStatus(), 404);
Expand All @@ -259,7 +260,7 @@ public void clusters() throws Exception {
.build();
AsyncResponse response = mock(AsyncResponse.class);
clusters.setNamespaceIsolationPolicy(response,"use", "policy1", policyData);
clusters.getNamespaceIsolationPolicies("use");
asynRequests(ctx -> clusters.getNamespaceIsolationPolicies(ctx, "use"));

try {
asynRequests(ctx -> clusters.deleteCluster(ctx, "use"));
Expand All @@ -269,7 +270,8 @@ public void clusters() throws Exception {
}

clusters.deleteNamespaceIsolationPolicy("use", "policy1");
assertTrue(clusters.getNamespaceIsolationPolicies("use").isEmpty());
assertTrue(((Map<String, NamespaceIsolationDataImpl>) asynRequests(ctx ->
clusters.getNamespaceIsolationPolicies(ctx, "use"))).isEmpty());

asynRequests(ctx -> clusters.deleteCluster(ctx, "use"));
assertEquals(asynRequests(ctx -> clusters.getClusters(ctx)), Sets.newHashSet());
Expand All @@ -289,7 +291,7 @@ public void clusters() throws Exception {
}

try {
clusters.getNamespaceIsolationPolicies("use");
asynRequests(ctx -> clusters.getNamespaceIsolationPolicies(ctx, "use"));
fail("should have failed");
} catch (RestException e) {
assertEquals(e.getResponse().getStatus(), 404);
Expand Down Expand Up @@ -406,8 +408,8 @@ public void clusters() throws Exception {
} catch (RestException e) {
assertEquals(e.getResponse().getStatus(), Status.PRECONDITION_FAILED.getStatusCode());
}
verify(clusters, times(18)).validateSuperUserAccessAsync();
verify(clusters, times(6)).validateSuperUserAccess();
verify(clusters, times(22)).validateSuperUserAccessAsync();
verify(clusters, times(2)).validateSuperUserAccess();
}

Object asynRequests(Consumer<TestAsyncResponse> function) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,7 @@ public void clusterNamespaceIsolationPolicies() throws PulsarAdminException {
List<BrokerNamespaceIsolationData> isoList = admin.clusters().getBrokersWithNamespaceIsolationPolicy("use");
assertEquals(isoList.size(), 1);
assertTrue(isoList.get(0).isPrimary());
assertEquals(isoList.get(0).getPolicyName(), policyName1);

// verify update of primary
nsPolicyData1.getPrimary().remove(0);
Expand Down

0 comments on commit aa03ccd

Please sign in to comment.