Skip to content

Commit

Permalink
PIP-45: Migrate NamespaceService to use MetadataStore (apache#10532)
Browse files Browse the repository at this point in the history
* PIP-45: Migrate NamespaceService to use MetadataStore

* Fixed copying policies to local store

* Fixed checkstyle

* Fixed tests

* Fixed test

* Fixed handling of BadVersionException

* Converted bundle split into an HTTP async operation

* Fixed issues in concurrent bundle splits

* Fixed checkstyle

* Make sure the ownership is triggered for the first attempt

* validateTopicOwnershipAsync

* Fixed deadlock when removeTopicFromCache calls NamespaceService.getBundle from MetadataStore thread

* Fixed checkstyle

* Fixed handling of future composition in validateTopicOwnershipAsync

* In internalCreateSubscriptionForNonPartitionedTopic() we shouldn't have mix sync & async operations

* Fixed checkstyle

* Fixed internalUnloadNonPartitionedTopic

* Fixed mixed sync-async usage of NamespaceService.getBundle

* Use awaitility in AdminTest

* validateGlobalNamespaceOwnership should also be done asynchronously

* Fixed checkstyle

* Fixed missing ,

* Try to get stack traces for timedout test

* Added async version of getTopicReference

* Removed import

* Fixed test mocks

* Reflect that removeTopicFromCache is an async operation
  • Loading branch information
merlimat authored May 25, 2021
1 parent ed2dfc9 commit 4d4f624
Show file tree
Hide file tree
Showing 25 changed files with 818 additions and 941 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -814,7 +814,7 @@ public Boolean get() {

state = State.Started;
} catch (Exception e) {
LOG.error(e.getMessage(), e);
LOG.error("Failed to start Pulsar service: {}", e.getMessage(), e);
throw new PulsarServerException(e);
} finally {
mutex.unlock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,6 @@
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
import org.apache.pulsar.common.naming.Constants;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundleFactory;
import org.apache.pulsar.common.naming.NamespaceBundles;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
Expand Down Expand Up @@ -127,13 +125,6 @@ protected void validateAdminAccessForTenant(String property) {
super.validateAdminAccessForTenant(property);
}

// This is a stub method for Mockito
@Override
protected void validateNamespaceOwnershipWithBundles(String property, String cluster, String namespace,
boolean authoritative, boolean readOnly, BundlesData bundleData) {
super.validateNamespaceOwnershipWithBundles(property, cluster, namespace, authoritative, readOnly, bundleData);
}

// This is a stub method for Mockito
@Override
protected void validateBundleOwnership(String property, String cluster, String namespace, boolean authoritative,
Expand Down Expand Up @@ -315,9 +306,8 @@ protected Policies getNamespacePolicies(NamespaceName namespaceName) {
Policies policies = namespaceResources().get(policyPath)
.orElseThrow(() -> new RestException(Status.NOT_FOUND, "Namespace does not exist"));
// fetch bundles from LocalZK-policies
NamespaceBundles bundles = pulsar().getNamespaceService().getNamespaceBundleFactory()
.getBundles(namespaceName);
BundlesData bundleData = NamespaceBundleFactory.getBundlesData(bundles);
BundlesData bundleData = pulsar().getNamespaceService().getNamespaceBundleFactory()
.getBundles(namespaceName).getBundlesData();
policies.bundles = bundleData != null ? bundleData : policies.bundles;

return policies;
Expand All @@ -343,7 +333,7 @@ protected CompletableFuture<Policies> getNamespacePoliciesAsync(NamespaceName na
.thenCompose(bundles -> {
BundlesData bundleData = null;
try {
bundleData = NamespaceBundleFactory.getBundlesData(bundles);
bundleData = bundles.getBundlesData();
} catch (Exception e) {
log.error("[{}] Failed to get namespace policies {}", clientAppId(), namespaceName, e);
return FutureUtil.failedFuture(new RestException(e));
Expand Down Expand Up @@ -465,30 +455,25 @@ protected CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadat
TopicName topicName, boolean authoritative, boolean checkAllowAutoCreation) {
try {
validateClusterOwnership(topicName.getCluster());
// validates global-namespace contains local/peer cluster: if peer/local cluster present then lookup can
// serve/redirect request else fail partitioned-metadata-request so, client fails while creating
// producer/consumer
validateGlobalNamespaceOwnership(topicName.getNamespaceObject());
} catch (Exception e) {
return FutureUtil.failedFuture(e);
}

try {
validateTopicOperation(topicName, TopicOperation.LOOKUP);
} catch (RestException e) {
return FutureUtil.failedFuture(e);
} catch (Exception e) {
// unknown error marked as internal server error
log.warn("Unexpected error while authorizing lookup. topic={}, role={}. Error: {}", topicName,
clientAppId(), e.getMessage(), e);
return FutureUtil.failedFuture(e);
}

if (checkAllowAutoCreation) {
return pulsar().getBrokerService().fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(topicName);
} else {
return pulsar().getBrokerService().fetchPartitionedTopicMetadataAsync(topicName);
}
// validates global-namespace contains local/peer cluster: if peer/local cluster present then lookup can
// serve/redirect request else fail partitioned-metadata-request so, client fails while creating
// producer/consumer
return validateGlobalNamespaceOwnershipAsync(topicName.getNamespaceObject())
.thenRun(() -> {
validateTopicOperation(topicName, TopicOperation.LOOKUP);
})
.thenCompose(__ -> {
if (checkAllowAutoCreation) {
return pulsar().getBrokerService()
.fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(topicName);
} else {
return pulsar().getBrokerService().fetchPartitionedTopicMetadataAsync(topicName);
}
});
}

protected PartitionedTopicMetadata getPartitionedTopicMetadata(TopicName topicName,
Expand Down Expand Up @@ -561,9 +546,8 @@ protected Policies getNamespacePolicies(String property, String cluster, String
Policies policies = namespaceResources().get(AdminResource.path(POLICIES, property, cluster, namespace))
.orElseThrow(() -> new RestException(Status.NOT_FOUND, "Namespace does not exist"));
// fetch bundles from LocalZK-policies
NamespaceBundles bundles = pulsar().getNamespaceService().getNamespaceBundleFactory()
.getBundles(NamespaceName.get(property, cluster, namespace));
BundlesData bundleData = NamespaceBundleFactory.getBundlesData(bundles);
BundlesData bundleData = pulsar().getNamespaceService().getNamespaceBundleFactory()
.getBundles(NamespaceName.get(property, cluster, namespace)).getBundlesData();
policies.bundles = bundleData != null ? bundleData : policies.bundles;
return policies;
} catch (RestException re) {
Expand Down
Loading

0 comments on commit 4d4f624

Please sign in to comment.