Skip to content

Commit

Permalink
Ensure getting list of topics for namespace is handled asynchronously (
Browse files Browse the repository at this point in the history
…apache#5188)

* Ensure getting list of topics for namespace is handled asynchrounously

* Fixed mocked zk deadlock

* Test fixes

* Fixed mutex unlocking in mock-zookeeper create method

* Fixed caching of empty values

* Do async call in background

* Fixed merge conflicts

* Fixed broken import from shaded class
  • Loading branch information
merlimat authored Sep 18, 2019
1 parent f3d37d0 commit 871a1e0
Show file tree
Hide file tree
Showing 17 changed files with 276 additions and 203 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,11 @@ public void register(Watcher watcher) {
public String create(String path, byte[] data, List<ACL> acl, CreateMode createMode)
throws KeeperException, InterruptedException {
mutex.lock();

final Set<Watcher> toNotifyCreate = Sets.newHashSet();
final Set<Watcher> toNotifyParent = Sets.newHashSet();
final String parent = path.substring(0, path.lastIndexOf("/"));

try {
checkProgrammedFail();

Expand All @@ -136,7 +141,6 @@ public String create(String path, byte[] data, List<ACL> acl, CreateMode createM
throw new KeeperException.NodeExistsException(path);
}

final String parent = path.substring(0, path.lastIndexOf("/"));
if (!parent.isEmpty() && !tree.containsKey(parent)) {
throw new KeeperException.NoNodeException();
}
Expand All @@ -152,55 +156,57 @@ public String create(String path, byte[] data, List<ACL> acl, CreateMode createM

tree.put(path, Pair.of(data, 0));

final Set<Watcher> toNotifyCreate = Sets.newHashSet();
toNotifyCreate.addAll(watchers.get(path));

final Set<Watcher> toNotifyParent = Sets.newHashSet();
if (!parent.isEmpty()) {
toNotifyParent.addAll(watchers.get(parent));
}
watchers.removeAll(path);
final String finalPath = path;
executor.execute(() -> {
toNotifyCreate.forEach(
watcher -> watcher.process(
new WatchedEvent(EventType.NodeCreated,
KeeperState.SyncConnected,
finalPath)));
toNotifyParent.forEach(
watcher -> watcher.process(
new WatchedEvent(EventType.NodeChildrenChanged,
KeeperState.SyncConnected,
parent)));
});

return path;
} finally {

mutex.unlock();
}

final String finalPath = path;
executor.execute(() -> {

toNotifyCreate.forEach(
watcher -> watcher.process(
new WatchedEvent(EventType.NodeCreated,
KeeperState.SyncConnected,
finalPath)));
toNotifyParent.forEach(
watcher -> watcher.process(
new WatchedEvent(EventType.NodeChildrenChanged,
KeeperState.SyncConnected,
parent)));
});

return path;
}

@Override
public void create(final String path, final byte[] data, final List<ACL> acl, CreateMode createMode,
final StringCallback cb, final Object ctx) {
if (stopped) {
cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx, null);
return;
}

final Set<Watcher> toNotifyCreate = Sets.newHashSet();
toNotifyCreate.addAll(watchers.get(path));

final Set<Watcher> toNotifyParent = Sets.newHashSet();
final String parent = path.substring(0, path.lastIndexOf("/"));
if (!parent.isEmpty()) {
toNotifyParent.addAll(watchers.get(parent));
}
watchers.removeAll(path);

executor.execute(() -> {
mutex.lock();

if (stopped) {
cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx, null);
return;
}

final Set<Watcher> toNotifyCreate = Sets.newHashSet();
toNotifyCreate.addAll(watchers.get(path));

final Set<Watcher> toNotifyParent = Sets.newHashSet();
final String parent = path.substring(0, path.lastIndexOf("/"));
if (!parent.isEmpty()) {
toNotifyParent.addAll(watchers.get(parent));
}

if (getProgrammedFailStatus()) {
mutex.unlock();
cb.processResult(failReturnCode.intValue(), path, ctx, null);
Expand All @@ -215,6 +221,7 @@ public void create(final String path, final byte[] data, final List<ACL> acl, Cr
cb.processResult(KeeperException.Code.NONODE.intValue(), path, ctx, null);
} else {
tree.put(path, Pair.of(data, 0));
watchers.removeAll(path);
mutex.unlock();
cb.processResult(0, path, ctx, null);

Expand Down Expand Up @@ -331,6 +338,12 @@ public void getChildren(final String path, final Watcher watcher, final Children
return;
}

if (!tree.containsKey(path)) {
mutex.unlock();
cb.processResult(KeeperException.Code.NoNode, path, ctx, null);
return;
}

List<String> children = Lists.newArrayList();
for (String item : tree.tailMap(path).keySet()) {
if (!item.startsWith(path)) {
Expand All @@ -347,12 +360,12 @@ public void getChildren(final String path, final Watcher watcher, final Children
}
}

mutex.unlock();

cb.processResult(0, path, ctx, children);
if (watcher != null) {
watchers.put(path, watcher);
}
mutex.unlock();

cb.processResult(0, path, ctx, children);
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -649,7 +649,7 @@ public void loadNamespaceTopics(NamespaceBundle bundle) {
List<CompletableFuture<Topic>> persistentTopics = Lists.newArrayList();
long topicLoadStart = System.nanoTime();

for (String topic : getNamespaceService().getListOfPersistentTopics(nsName)) {
for (String topic : getNamespaceService().getListOfPersistentTopics(nsName).join()) {
try {
TopicName topicName = TopicName.get(topic);
if (bundle.includes(topicName)) {
Expand Down Expand Up @@ -974,7 +974,6 @@ public String getSafeBrokerServiceUrl() {
return brokerServiceUrl != null ? brokerServiceUrl : brokerServiceUrlTls;
}


private void startWorkerService(AuthenticationService authenticationService,
AuthorizationService authorizationService)
throws InterruptedException, IOException, KeeperException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -597,6 +597,7 @@ protected static CompletableFuture<PartitionedTopicMetadata> fetchPartitionedTop
try {
topicExist = pulsar.getNamespaceService()
.getListOfTopics(topicName.getNamespaceObject(), PulsarApi.CommandGetTopicsOfNamespace.Mode.ALL)
.join()
.contains(topicName.toString());
} catch (Exception e) {
log.warn("Unexpected error while getting list of topics. topic={}. Error: {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import static org.apache.pulsar.broker.cache.LocalZooKeeperCacheService.LOCAL_POLICIES_ROOT;
import static org.apache.pulsar.broker.web.PulsarWebResource.joinPath;
import static org.apache.pulsar.common.naming.NamespaceBundleFactory.getBundlesData;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
Expand Down Expand Up @@ -189,7 +187,7 @@ protected void internalDeleteNamespace(AsyncResponse asyncResponse, boolean auth

boolean isEmpty;
try {
isEmpty = pulsar().getNamespaceService().getListOfPersistentTopics(namespaceName).isEmpty()
isEmpty = pulsar().getNamespaceService().getListOfPersistentTopics(namespaceName).join().isEmpty()
&& getPartitionedTopicList(TopicDomain.persistent).isEmpty()
&& getPartitionedTopicList(TopicDomain.non_persistent).isEmpty();
} catch (Exception e) {
Expand Down Expand Up @@ -319,7 +317,7 @@ protected void internalDeleteNamespaceBundle(String bundleRange, boolean authori
NamespaceBundle bundle = validateNamespaceBundleOwnership(namespaceName, policies.bundles, bundleRange,
authoritative, true);
try {
List<String> topics = pulsar().getNamespaceService().getListOfPersistentTopics(namespaceName);
List<String> topics = pulsar().getNamespaceService().getListOfPersistentTopics(namespaceName).join();
for (String topic : topics) {
NamespaceBundle topicBundle = (NamespaceBundle) pulsar().getNamespaceService()
.getBundle(TopicName.get(topic));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,7 @@ protected void internalCreatePartitionedTopic(int numPartitions) {
try {
boolean topicExist = pulsar().getNamespaceService()
.getListOfTopics(topicName.getNamespaceObject(), PulsarApi.CommandGetTopicsOfNamespace.Mode.ALL)
.join()
.contains(topicName.toString());
if (topicExist) {
log.warn("[{}] Failed to create already existing topic {}", clientAppId(), topicName);
Expand Down Expand Up @@ -1201,7 +1202,7 @@ protected void internalCreateSubscription(AsyncResponse asyncResponse, String su
return;
}
}

if (partitionException.get() != null) {
log.warn("[{}] [{}] Failed to create subscription {} at message id {}", clientAppId(), topicName,
subscriptionName, targetMessageId, partitionException.get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,21 +113,25 @@ public List<String> getNamespacesForCluster(@PathParam("property") String proper
@ApiOperation(hidden = true, value = "Get the list of all the topics under a certain namespace.", response = String.class, responseContainer = "Set")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist") })
public List<String> getTopics(@PathParam("property") String property,
public void getTopics(@PathParam("property") String property,
@PathParam("cluster") String cluster, @PathParam("namespace") String namespace,
@QueryParam("mode") @DefaultValue("PERSISTENT") Mode mode) {
@QueryParam("mode") @DefaultValue("PERSISTENT") Mode mode,
@Suspended AsyncResponse asyncResponse) {
validateAdminAccessForTenant(property);
validateNamespaceName(property, cluster, namespace);

// Validate that namespace exists, throws 404 if it doesn't exist
getNamespacePolicies(namespaceName);

try {
return pulsar().getNamespaceService().getListOfTopics(namespaceName, mode);
} catch (Exception e) {
log.error("Failed to get topics list for namespace {}/{}/{}", property, cluster, namespace, e);
throw new RestException(e);
}
pulsar().getNamespaceService().getListOfTopics(namespaceName, mode)
.thenAccept(topics -> {
asyncResponse.resume(topics);
})
.exceptionally(ex -> {
log.error("Failed to get topics list for namespace {}", namespaceName, ex);
asyncResponse.resume(ex);
return null;
});
}

@GET
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ public void createPartitionedTopic(@PathParam("property") String property, @Path
try {
boolean topicExist = pulsar().getNamespaceService()
.getListOfTopics(topicName.getNamespaceObject(), PulsarApi.CommandGetTopicsOfNamespace.Mode.ALL)
.join()
.contains(topicName.toString());
if (topicExist) {
log.warn("[{}] Failed to create already existing topic {}", clientAppId(), topicName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@
*/
package org.apache.pulsar.broker.admin.v2;

import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;

import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -49,17 +54,12 @@
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SchemaAutoUpdateCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.SubscribeRate;
import org.apache.pulsar.common.policies.data.SubscriptionAuthMode;
import org.apache.pulsar.common.policies.data.SchemaAutoUpdateCompatibilityStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;

@Path("/namespaces")
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
Expand All @@ -80,21 +80,25 @@ public List<String> getTenantNamespaces(@PathParam("tenant") String tenant) {
@ApiOperation(value = "Get the list of all the topics under a certain namespace.", response = String.class, responseContainer = "Set")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist") })
public List<String> getTopics(@PathParam("tenant") String tenant,
public void getTopics(@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@QueryParam("mode") @DefaultValue("PERSISTENT") Mode mode) {
@QueryParam("mode") @DefaultValue("PERSISTENT") Mode mode,
@Suspended AsyncResponse asyncResponse) {
validateAdminAccessForTenant(tenant);
validateNamespaceName(tenant, namespace);

// Validate that namespace exists, throws 404 if it doesn't exist
getNamespacePolicies(namespaceName);

try {
return pulsar().getNamespaceService().getListOfTopics(namespaceName, mode);
} catch (Exception e) {
log.error("Failed to get topics list for namespace {}", namespaceName, e);
throw new RestException(e);
}
pulsar().getNamespaceService().getListOfTopics(namespaceName, mode)
.thenAccept(topics -> {
asyncResponse.resume(topics);
})
.exceptionally(ex -> {
log.error("Failed to get topics list for namespace {}", namespaceName, ex);
asyncResponse.resume(ex);
return null;
});
}

@GET
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ public void createPartitionedTopic(
try {
boolean topicExist = pulsar().getNamespaceService()
.getListOfTopics(topicName.getNamespaceObject(), PulsarApi.CommandGetTopicsOfNamespace.Mode.ALL)
.join()
.contains(topicName.toString());
if (topicExist) {
log.warn("[{}] Failed to create already existing topic {}", clientAppId(), topicName);
Expand Down
Loading

0 comments on commit 871a1e0

Please sign in to comment.