Skip to content

Commit

Permalink
[pulsar-broker] namespace resources use metadata-store api (apache#9351)
Browse files Browse the repository at this point in the history
* [pulsar-broker] namespace resources use metadata-store api

fix build

fix api

Fix test:

fix test

fix test

* address comments
  • Loading branch information
rdhabalia authored Feb 8, 2021
1 parent efb2089 commit 7fd3218
Show file tree
Hide file tree
Showing 9 changed files with 453 additions and 1,125 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -229,42 +229,6 @@ public void validatePoliciesReadOnlyAccess() {
}
}

/**
* Get the list of namespaces (on every cluster) for a given property.
*
* @param property the property name
* @return the list of namespaces
*/
protected List<String> getListOfNamespaces(String property) throws Exception {
List<String> namespaces = Lists.newArrayList();

// this will return a cluster in v1 and a namespace in v2
for (String clusterOrNamespace : globalZk().getChildren(path(POLICIES, property), false)) {
// Then get the list of namespaces
try {
final List<String> children = globalZk().getChildren(
path(POLICIES, property, clusterOrNamespace), false);
if (children == null || children.isEmpty()) {
String namespace = NamespaceName.get(property, clusterOrNamespace).toString();
// if the length is 0 then this is probably a leftover cluster from namespace created
// with the v1 admin format (prop/cluster/ns) and then deleted, so no need to add it to the list
if (globalZk().getData(path(POLICIES, namespace), false, null).length != 0) {
namespaces.add(namespace);
}
} else {
children.forEach(ns -> {
namespaces.add(NamespaceName.get(property, clusterOrNamespace, ns).toString());
});
}
} catch (KeeperException.NoNodeException e) {
// A cluster was deleted between the 2 getChildren() calls, ignoring
}
}

namespaces.sort(null);
return namespaces;
}

protected CompletableFuture<Void> tryCreatePartitionsAsync(int numPartitions) {
if (!topicName.isPersistent()) {
return CompletableFuture.completedFuture(null);
Expand Down Expand Up @@ -416,7 +380,7 @@ protected Policies getNamespacePolicies(NamespaceName namespaceName) {
try {
final String namespace = namespaceName.toString();
final String policyPath = AdminResource.path(POLICIES, namespace);
Policies policies = policiesCache().get(policyPath)
Policies policies = namespaceResources().get(policyPath)
.orElseThrow(() -> new RestException(Status.NOT_FOUND, "Namespace does not exist"));
// fetch bundles from LocalZK-policies
NamespaceBundles bundles = pulsar().getNamespaceService().getNamespaceBundleFactory()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,4 +156,4 @@ public boolean exists(String path) throws MetadataStoreException {
public CompletableFuture<Boolean> existsAsync(String path) {
return cache.exists(path);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,22 @@
import java.util.Map;
import java.util.Optional;
import lombok.Getter;
import org.apache.pulsar.common.policies.data.LocalPolicies;
import org.apache.pulsar.common.policies.data.NamespaceIsolationData;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;

@Getter
public class NamespaceResources extends BaseResources<Policies> {
@Getter
private IsolationPolicyResources isolationPolicies;
private LocalPoliciesResources localPolicies;

public NamespaceResources(MetadataStoreExtended store) {
super(store, Policies.class);
isolationPolicies = new IsolationPolicyResources(store);
public NamespaceResources(MetadataStoreExtended localStore, MetadataStoreExtended configurationStore) {
super(configurationStore, Policies.class);
isolationPolicies = new IsolationPolicyResources(configurationStore);
localPolicies = new LocalPoliciesResources(localStore);
}

public static class IsolationPolicyResources extends BaseResources<Map<String, NamespaceIsolationData>> {
Expand All @@ -48,4 +51,10 @@ public Optional<NamespaceIsolationPolicies> getPolicies(String path) throws Meta
return data.isPresent() ? Optional.of(new NamespaceIsolationPolicies(data.get())) : Optional.empty();
}
}

public static class LocalPoliciesResources extends BaseResources<LocalPolicies> {
public LocalPoliciesResources(MetadataStoreExtended configurationStore) {
super(configurationStore, LocalPolicies.class);
}
}
}
Loading

0 comments on commit 7fd3218

Please sign in to comment.