Skip to content

Commit

Permalink
Fixed deadlock when checking topic ownership (apache#16310)
Browse files Browse the repository at this point in the history
* Fixed deadlock when checking topic ownership

* Fixed mocked test

* Fixed ServerCnxTest

* Fixed testProducerOnNotOwnedTopic
  • Loading branch information
merlimat authored Jul 3, 2022
1 parent 139c168 commit 893fef9
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1020,6 +1020,15 @@ public boolean isServiceUnitActive(TopicName topicName) {
}
}

public CompletableFuture<Boolean> isServiceUnitActiveAsync(TopicName topicName) {
Optional<CompletableFuture<OwnedBundle>> res = ownershipCache.getOwnedBundleAsync(getBundle(topicName));
if (!res.isPresent()) {
return CompletableFuture.completedFuture(false);
}

return res.get().thenApply(ob -> ob != null && ob.isActive());
}

private boolean isNamespaceOwned(NamespaceName fqnn) throws Exception {
return ownershipCache.getOwnedBundle(getFullBundle(fqnn)) != null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1325,7 +1325,7 @@ protected CompletableFuture<Optional<Topic>> loadOrCreatePersistentTopic(final S
final Semaphore topicLoadSemaphore = topicLoadRequestSemaphore.get();

if (topicLoadSemaphore.tryAcquire()) {
createPersistentTopic(topic, createIfMissing, topicFuture, properties);
checkOwnershipAndCreatePersistentTopic(topic, createIfMissing, topicFuture, properties);
topicFuture.handle((persistentTopic, ex) -> {
// release permit and process pending topic
topicLoadSemaphore.release();
Expand All @@ -1346,20 +1346,32 @@ protected CompletableFuture<Optional<Topic>> loadOrCreatePersistentTopic(final S
return topicFuture;
}

private void createPersistentTopic(final String topic, boolean createIfMissing,
private void checkOwnershipAndCreatePersistentTopic(final String topic, boolean createIfMissing,
CompletableFuture<Optional<Topic>> topicFuture,
Map<String, String> properties) {
TopicName topicName = TopicName.get(topic);
pulsar.getNamespaceService().isServiceUnitActiveAsync(topicName)
.thenAccept(isActive -> {
if (isActive) {
createPersistentTopic(topic, createIfMissing, topicFuture, properties);
} else {
// namespace is being unloaded
String msg = String.format("Namespace is being unloaded, cannot add topic %s", topic);
log.warn(msg);
pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture));
topicFuture.completeExceptionally(new ServiceUnitNotReadyException(msg));
}
}).exceptionally(ex -> {
topicFuture.completeExceptionally(ex);
return null;
});
}

final long topicCreateTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
private void createPersistentTopic(final String topic, boolean createIfMissing,
CompletableFuture<Optional<Topic>> topicFuture,
Map<String, String> properties) {
TopicName topicName = TopicName.get(topic);
if (!pulsar.getNamespaceService().isServiceUnitActive(topicName)) {
// namespace is being unloaded
String msg = String.format("Namespace is being unloaded, cannot add topic %s", topic);
log.warn(msg);
pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture));
topicFuture.completeExceptionally(new ServiceUnitNotReadyException(msg));
return;
}
final long topicCreateTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());

if (isTransactionInternalName(topicName)) {
String msg = String.format("Can not create transaction system topic %s", topic);
Expand Down Expand Up @@ -2595,7 +2607,7 @@ private void createPendingLoadTopic() {
CompletableFuture<Optional<Topic>> pendingFuture = pendingTopic.getTopicFuture();
final Semaphore topicLoadSemaphore = topicLoadRequestSemaphore.get();
final boolean acquiredPermit = topicLoadSemaphore.tryAcquire();
createPersistentTopic(topic, true, pendingFuture, pendingTopic.getProperties());
checkOwnershipAndCreatePersistentTopic(topic, true, pendingFuture, pendingTopic.getProperties());
pendingFuture.handle((persistentTopic, ex) -> {
// release permit and process next pending topic
if (acquiredPermit) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,7 @@ public void setup() throws Exception {
doReturn(nsSvc).when(pulsar).getNamespaceService();
doReturn(true).when(nsSvc).isServiceUnitOwned(any());
doReturn(true).when(nsSvc).isServiceUnitActive(any());
doReturn(CompletableFuture.completedFuture(true)).when(nsSvc).isServiceUnitActiveAsync(any());
doReturn(CompletableFuture.completedFuture(true)).when(nsSvc).checkTopicOwnership(any());

setupMLAsyncCallbackMocks();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ public void setup() throws Exception {
doReturn(namespaceService).when(pulsar).getNamespaceService();
doReturn(true).when(namespaceService).isServiceUnitOwned(any());
doReturn(true).when(namespaceService).isServiceUnitActive(any());
doReturn(CompletableFuture.completedFuture(true)).when(namespaceService).isServiceUnitActiveAsync(any());
doReturn(CompletableFuture.completedFuture(true)).when(namespaceService).checkTopicOwnership(any());
doReturn(CompletableFuture.completedFuture(topics)).when(namespaceService).getListOfTopics(
NamespaceName.get("use", "ns-abc"), CommandGetTopicsOfNamespace.Mode.ALL);
Expand Down Expand Up @@ -489,7 +490,8 @@ public void testProducerOnNotOwnedTopic() throws Exception {
setChannelConnected();

// Force the case where the broker doesn't own any topic
doReturn(false).when(namespaceService).isServiceUnitActive(any(TopicName.class));
doReturn(CompletableFuture.completedFuture(false)).when(namespaceService)
.isServiceUnitActiveAsync(any(TopicName.class));

// test PRODUCER failure case
ByteBuf clientCommand = Commands.newProducer(nonOwnedTopicName, 1 /* producer id */, 1 /* request id */,
Expand Down

0 comments on commit 893fef9

Please sign in to comment.