Skip to content

Commit

Permalink
[improve][broker] Make some methods in NamespacesBase async. (apache#…
Browse files Browse the repository at this point in the history
…15518)

### Motivation
See PIP apache#14365  and change tracker apache#15043. 

 Make `NamespacesBase` `getTenantNamespaces / createNamespace / getTopics / getPolicies / getPermissions` methods to pure async.
  • Loading branch information
Technoboy- authored May 14, 2022
1 parent e6996b8 commit 138ea35
Show file tree
Hide file tree
Showing 10 changed files with 453 additions and 310 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,10 @@ public void createPolicies(NamespaceName ns, Policies policies) throws MetadataS
create(joinPath(BASE_POLICIES_PATH, ns.toString()), policies);
}

public CompletableFuture<Void> createPoliciesAsync(NamespaceName ns, Policies policies) {
return createAsync(joinPath(BASE_POLICIES_PATH, ns.toString()), policies);
}

public boolean namespaceExists(NamespaceName ns) throws MetadataStoreException {
String path = joinPath(BASE_POLICIES_PATH, ns.toString());
return super.exists(path) && super.getChildren(path).isEmpty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.broker.resources;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -78,7 +79,7 @@ public CompletableFuture<Void> updateTenantAsync(String tenantName, Function<Ten
}

public CompletableFuture<Boolean> tenantExistsAsync(String tenantName) {
return getCache().exists(joinPath(BASE_POLICIES_PATH, tenantName));
return existsAsync(joinPath(BASE_POLICIES_PATH, tenantName));
}

public List<String> getListOfNamespaces(String tenant) throws MetadataStoreException {
Expand Down Expand Up @@ -110,6 +111,41 @@ public List<String> getListOfNamespaces(String tenant) throws MetadataStoreExcep
return namespaces;
}

public CompletableFuture<List<String>> getListOfNamespacesAsync(String tenant) {
// this will return a cluster in v1 and a namespace in v2
return getChildrenAsync(joinPath(BASE_POLICIES_PATH, tenant))
.thenCompose(clusterOrNamespaces -> clusterOrNamespaces.stream().map(key ->
getChildrenAsync(joinPath(BASE_POLICIES_PATH, tenant, key))
.thenCompose(children -> {
if (children == null || children.isEmpty()) {
String namespace = NamespaceName.get(tenant, key).toString();
// if the length is 0 then this is probably a leftover cluster from namespace
// created with the v1 admin format (prop/cluster/ns) and then deleted, so no
// need to add it to the list
return getAsync(joinPath(BASE_POLICIES_PATH, namespace))
.thenApply(opt -> opt.isPresent() ? Collections.singletonList(namespace)
: new ArrayList<String>())
.exceptionally(ex -> {
Throwable cause = FutureUtil.unwrapCompletionException(ex);
if (cause instanceof MetadataStoreException
.ContentDeserializationException) {
return new ArrayList<>();
}
throw FutureUtil.wrapToCompletionException(ex);
});
} else {
CompletableFuture<List<String>> ret = new CompletableFuture();
ret.complete(children.stream().map(ns -> NamespaceName.get(tenant, key, ns)
.toString()).collect(Collectors.toList()));
return ret;
}
})).reduce(CompletableFuture.completedFuture(new ArrayList<>()),
(accumulator, n) -> accumulator.thenCompose(namespaces -> n.thenApply(m -> {
namespaces.addAll(m);
return namespaces;
}))));
}

public CompletableFuture<List<String>> getActiveNamespaces(String tenant, String cluster) {
return getChildrenAsync(joinPath(BASE_POLICIES_PATH, tenant, cluster));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,11 @@ protected CompletableFuture<Policies> getNamespacePoliciesAsync(NamespaceName na
return FutureUtil.failedFuture(new RestException(e));
}
policies.get().bundles = bundleData != null ? bundleData : policies.get().bundles;
if (policies.get().is_allow_auto_update_schema == null) {
// the type changed from boolean to Boolean. return broker value here for keeping compatibility.
policies.get().is_allow_auto_update_schema = pulsar().getConfig()
.isAllowAutoUpdateSchemaEnabled();
}
return CompletableFuture.completedFuture(policies.get());
});
} else {
Expand Down Expand Up @@ -534,6 +539,11 @@ protected List<String> getPartitionedTopicList(TopicDomain topicDomain) {
}
}

protected CompletableFuture<List<String>> getPartitionedTopicListAsync(TopicDomain topicDomain) {
return namespaceResources().getPartitionedTopicResources()
.listPartitionedTopicsAsync(namespaceName, topicDomain);
}

protected List<String> getTopicPartitionList(TopicDomain topicDomain) {
try {
return getPulsarResources().getTopicResources().getExistingPartitions(topicName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,60 +103,54 @@
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.MetadataStoreException.AlreadyExistsException;
import org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException;
import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class NamespacesBase extends AdminResource {

protected List<String> internalGetTenantNamespaces(String tenant) {
checkNotNull(tenant, "Tenant should not be null");
protected CompletableFuture<List<String>> internalGetTenantNamespaces(String tenant) {
if (tenant == null) {
return FutureUtil.failedFuture(new RestException(Status.BAD_REQUEST, "Tenant should not be null"));
}
try {
NamedEntity.checkName(tenant);
} catch (IllegalArgumentException e) {
log.warn("[{}] Tenant name is invalid {}", clientAppId(), tenant, e);
throw new RestException(Status.PRECONDITION_FAILED, "Tenant name is not valid");
}
validateTenantOperation(tenant, TenantOperation.LIST_NAMESPACES);

try {
if (!tenantResources().tenantExists(tenant)) {
throw new RestException(Status.NOT_FOUND, "Tenant not found");
}

return tenantResources().getListOfNamespaces(tenant);
} catch (Exception e) {
log.error("[{}] Failed to get namespaces list: {}", clientAppId(), e);
throw new RestException(e);
return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED, "Tenant name is not valid"));
}
return validateTenantOperationAsync(tenant, TenantOperation.LIST_NAMESPACES)
.thenCompose(__ -> tenantResources().tenantExistsAsync(tenant))
.thenCompose(existed -> {
if (!existed) {
throw new RestException(Status.NOT_FOUND, "Tenant not found");
}
return tenantResources().getListOfNamespacesAsync(tenant);
});
}

protected void internalCreateNamespace(Policies policies) {
validateTenantOperation(namespaceName.getTenant(), TenantOperation.CREATE_NAMESPACE);
validatePoliciesReadOnlyAccess();
validatePolicies(namespaceName, policies);

try {
int maxNamespacesPerTenant = pulsar().getConfiguration().getMaxNamespacesPerTenant();
// no distributed locks are added here.In a concurrent scenario, the threshold will be exceeded.
if (maxNamespacesPerTenant > 0) {
List<String> namespaces = tenantResources().getListOfNamespaces(namespaceName.getTenant());
if (namespaces != null && namespaces.size() > maxNamespacesPerTenant) {
throw new RestException(Status.PRECONDITION_FAILED,
"Exceed the maximum number of namespace in tenant :" + namespaceName.getTenant());
}
}
namespaceResources().createPolicies(namespaceName, policies);
log.info("[{}] Created namespace {}", clientAppId(), namespaceName);
} catch (AlreadyExistsException e) {
log.warn("[{}] Failed to create namespace {} - already exists", clientAppId(), namespaceName);
throw new RestException(Status.CONFLICT, "Namespace already exists");
} catch (Exception e) {
log.error("[{}] Failed to create namespace {}", clientAppId(), namespaceName, e);
throw new RestException(e);
}
protected CompletableFuture<Void> internalCreateNamespace(Policies policies) {
return validateTenantOperationAsync(namespaceName.getTenant(), TenantOperation.CREATE_NAMESPACE)
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
.thenAccept(__ -> validatePolicies(namespaceName, policies))
.thenCompose(__ -> {
int maxNamespacesPerTenant = pulsar().getConfiguration().getMaxNamespacesPerTenant();
// no distributed locks are added here.In a concurrent scenario, the threshold will be exceeded.
if (maxNamespacesPerTenant > 0) {
return tenantResources().getListOfNamespacesAsync(namespaceName.getTenant())
.thenAccept(namespaces -> {
if (namespaces != null && namespaces.size() > maxNamespacesPerTenant) {
throw new RestException(Status.PRECONDITION_FAILED,
"Exceed the maximum number of namespace in tenant :"
+ namespaceName.getTenant());
}
});
}
return CompletableFuture.completedFuture(null);
})
.thenCompose(__ -> namespaceResources().createPoliciesAsync(namespaceName, policies))
.thenAccept(__ -> log.info("[{}] Created namespace {}", clientAppId(), namespaceName));
}

protected void internalDeleteNamespace(AsyncResponse asyncResponse, boolean authoritative, boolean force) {
Expand Down
Loading

0 comments on commit 138ea35

Please sign in to comment.