Skip to content

Commit

Permalink
[fix][broker] Replace sync method in NamespacesBase#internalDeleteNam…
Browse files Browse the repository at this point in the history
…espaceBundleAsync (apache#19391)
  • Loading branch information
lhotari authored Feb 2, 2023
1 parent bfea163 commit b5c260f
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -486,36 +486,39 @@ protected CompletableFuture<Void> internalDeleteNamespaceBundleAsync(String bund
});
}
}
return future.thenCompose(__ -> {
NamespaceBundle bundle =
validateNamespaceBundleOwnership(namespaceName, policies.bundles, bundleRange,
authoritative, true);
return pulsar().getNamespaceService().getListOfPersistentTopics(namespaceName)
.thenCompose(topics -> {
CompletableFuture<Void> deleteTopicsFuture =
CompletableFuture.completedFuture(null);
if (!force) {
List<CompletableFuture<NamespaceBundle>> futures = new ArrayList<>();
for (String topic : topics) {
futures.add(pulsar().getNamespaceService()
.getBundleAsync(TopicName.get(topic))
.thenCompose(topicBundle -> {
if (bundle.equals(topicBundle)) {
throw new RestException(Status.CONFLICT,
"Cannot delete non empty bundle");
}
return CompletableFuture.completedFuture(null);
}));
return future
.thenCompose(__ ->
validateNamespaceBundleOwnershipAsync(namespaceName, policies.bundles,
bundleRange,
authoritative, true))
.thenCompose(bundle -> {
return pulsar().getNamespaceService().getListOfPersistentTopics(namespaceName)
.thenCompose(topics -> {
CompletableFuture<Void> deleteTopicsFuture =
CompletableFuture.completedFuture(null);
if (!force) {
List<CompletableFuture<NamespaceBundle>> futures = new ArrayList<>();
for (String topic : topics) {
futures.add(pulsar().getNamespaceService()
.getBundleAsync(TopicName.get(topic))
.thenCompose(topicBundle -> {
if (bundle.equals(topicBundle)) {
throw new RestException(Status.CONFLICT,
"Cannot delete non empty bundle");
}
return CompletableFuture.completedFuture(null);
}));

}
deleteTopicsFuture = FutureUtil.waitForAll(futures);
}
return deleteTopicsFuture.thenCompose(
___ -> pulsar().getNamespaceService().removeOwnedServiceUnitAsync(bundle))
.thenRun(() -> pulsar().getBrokerService().getBundleStats()
.remove(bundle.toString()));
});
});
}
deleteTopicsFuture = FutureUtil.waitForAll(futures);
}
return deleteTopicsFuture.thenCompose(
___ -> pulsar().getNamespaceService()
.removeOwnedServiceUnitAsync(bundle))
.thenRun(() -> pulsar().getBrokerService().getBundleStats()
.remove(bundle.toString()));
});
});
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -856,6 +856,9 @@ public void testDeleteNamespaces() throws Exception {

@Test
public void testDeleteNamespaceWithBundles() throws Exception {
uriField.set(namespaces, uriInfo);
doReturn(URI.create(pulsar.getWebServiceAddress() + "/dummy/uri")).when(uriInfo).getRequestUri();

URL localWebServiceUrl = new URL(pulsar.getSafeWebServiceAddress());
String bundledNsLocal = "test-delete-namespace-with-bundles";
List<String> boundaries = List.of("0x00000000", "0x80000000", "0xffffffff");
Expand All @@ -870,25 +873,13 @@ public void testDeleteNamespaceWithBundles() throws Exception {
org.apache.pulsar.client.admin.Namespaces.class);
doReturn(namespacesAdmin).when(admin).namespaces();

doReturn(null).when(nsSvc).getWebServiceUrl(Mockito.argThat(new ArgumentMatcher<NamespaceBundle>() {
@Override
public boolean matches(NamespaceBundle bundle) {
return bundle.getNamespaceObject().equals(testNs);
}
}), Mockito.any());
doReturn(false).when(nsSvc).isServiceUnitOwned(Mockito.argThat(new ArgumentMatcher<NamespaceBundle>() {
@Override
public boolean matches(NamespaceBundle bundle) {
return bundle.getNamespaceObject().equals(testNs);
}
}));
doReturn(CompletableFuture.completedFuture(Optional.of(localWebServiceUrl))).when(nsSvc)
.getWebServiceUrlAsync(Mockito.argThat(bundle -> bundle.getNamespaceObject().equals(testNs)),
Mockito.any());
doReturn(CompletableFuture.completedFuture(false)).when(nsSvc)
.isServiceUnitOwnedAsync(Mockito.argThat(bundle -> bundle.getNamespaceObject().equals(testNs)));
doReturn(CompletableFuture.completedFuture(Optional.of(mock(NamespaceEphemeralData.class)))).when(nsSvc)
.getOwnerAsync(Mockito.argThat(new ArgumentMatcher<NamespaceBundle>() {
@Override
public boolean matches(NamespaceBundle bundle) {
return bundle.getNamespaceObject().equals(testNs);
}
}));
.getOwnerAsync(Mockito.argThat(bundle -> bundle.getNamespaceObject().equals(testNs)));

CompletableFuture<Void> preconditionFailed = new CompletableFuture<>();
ClientErrorException cee = new ClientErrorException(Status.PRECONDITION_FAILED);
Expand All @@ -900,21 +891,24 @@ public boolean matches(NamespaceBundle bundle) {
.deleteNamespaceBundleAsync(Mockito.anyString(), Mockito.anyString(), Mockito.anyBoolean());

AsyncResponse response = mock(AsyncResponse.class);
ArgumentCaptor<RestException> captor = ArgumentCaptor.forClass(RestException.class);
ArgumentCaptor<WebApplicationException> captor = ArgumentCaptor.forClass(WebApplicationException.class);
namespaces.deleteNamespaceBundle(response, testTenant, testLocalCluster, bundledNsLocal,
"0x00000000_0x80000000", false, false);
verify(response, timeout(5000).times(1)).resume(captor.capture());
assertEquals(captor.getValue().getResponse().getStatus(), Status.PRECONDITION_FAILED.getStatusCode());
assertEquals(captor.getValue().getResponse().getStatus(), Status.TEMPORARY_REDIRECT.getStatusCode());
NamespaceBundles nsBundles = nsSvc.getNamespaceBundleFactory().getBundles(testNs, bundleData);
doReturn(Optional.empty()).when(nsSvc).getWebServiceUrl(any(NamespaceBundle.class), any(LookupOptions.class));
doReturn(CompletableFuture.completedFuture(Optional.empty())).when(nsSvc)
.getWebServiceUrlAsync(any(NamespaceBundle.class), any(LookupOptions.class));
response = mock(AsyncResponse.class);
namespaces.deleteNamespace(response, testTenant, testLocalCluster, bundledNsLocal, false, false);
verify(response, timeout(5000).times(1)).resume(captor.capture());
assertEquals(captor.getValue().getResponse().getStatus(), Status.PRECONDITION_FAILED.getStatusCode());
// make one bundle owned
LookupOptions optionsHttps = LookupOptions.builder().authoritative(false).requestHttps(true).readOnly(false).build();
doReturn(Optional.of(localWebServiceUrl)).when(nsSvc).getWebServiceUrl(nsBundles.getBundles().get(0), optionsHttps);
doReturn(true).when(nsSvc).isServiceUnitOwned(nsBundles.getBundles().get(0));
doReturn(CompletableFuture.completedFuture(Optional.of(localWebServiceUrl))).when(nsSvc)
.getWebServiceUrlAsync(nsBundles.getBundles().get(0), optionsHttps);
doReturn(CompletableFuture.completedFuture(true)).when(nsSvc)
.isServiceUnitOwnedAsync(nsBundles.getBundles().get(0));
doReturn(CompletableFuture.completedFuture(null)).when(namespacesAdmin).deleteNamespaceBundleAsync(
testTenant + "/" + testLocalCluster + "/" + bundledNsLocal, "0x00000000_0x80000000",
false);
Expand All @@ -924,9 +918,10 @@ public boolean matches(NamespaceBundle bundle) {
verify(response, timeout(5000).times(1)).resume(captor.capture());
assertEquals(captor.getValue().getResponse().getStatus(), Status.PRECONDITION_FAILED.getStatusCode());
response = mock(AsyncResponse.class);
doReturn(Optional.of(localWebServiceUrl)).when(nsSvc).getWebServiceUrl(any(NamespaceBundle.class), any(LookupOptions.class));
doReturn(CompletableFuture.completedFuture(Optional.of(localWebServiceUrl))).when(nsSvc)
.getWebServiceUrlAsync(any(NamespaceBundle.class), any(LookupOptions.class));
for (NamespaceBundle bundle : nsBundles.getBundles()) {
doReturn(true).when(nsSvc).isServiceUnitOwned(bundle);
doReturn(CompletableFuture.completedFuture(true)).when(nsSvc).isServiceUnitOwnedAsync(bundle);
}
namespaces.deleteNamespace(response, testTenant, testLocalCluster, bundledNsLocal, false, false);
ArgumentCaptor<Response> captor2 = ArgumentCaptor.forClass(Response.class);
Expand Down

0 comments on commit b5c260f

Please sign in to comment.