Skip to content

Commit

Permalink
[Issue 11339] Pulsar Admin List Subscription lists only subscriptions…
Browse files Browse the repository at this point in the history
… created for Partition-0 when partition specific subscriptions are created (apache#11355)

Fix apache#11339.

Documentation
This is a bug fix, no need documentation.
  • Loading branch information
Technoboy- authored Jul 23, 2021
1 parent 8b1d937 commit a60c189
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.core.Response;
Expand Down Expand Up @@ -1030,34 +1031,42 @@ protected void internalGetSubscriptions(AsyncResponse asyncResponse, boolean aut
false).thenAccept(partitionMetadata -> {
if (partitionMetadata.partitions > 0) {
try {
// get the subscriptions only from the 1st partition
// since all the other partitions will have the same
// subscriptions
pulsar().getAdminClient().topics().getSubscriptionsAsync(topicName.getPartition(0).toString())
.whenComplete((r, ex) -> {
if (ex != null) {
log.warn("[{}] Failed to get list of subscriptions for {}: {}", clientAppId(),
topicName, ex.getMessage());

if (ex instanceof PulsarAdminException) {
PulsarAdminException pae = (PulsarAdminException) ex;
if (pae.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
asyncResponse.resume(new RestException(Status.NOT_FOUND,
"Internal topics have not been generated yet"));
return;
} else {
asyncResponse.resume(new RestException(pae));
return;
}
} else {
asyncResponse.resume(new RestException(ex));
return;
}
final Set<String> subscriptions = Sets.newConcurrentHashSet();
final List<CompletableFuture<Object>> subscriptionFutures = Lists.newArrayList();
if (topicName.getDomain() == TopicDomain.persistent) {
final Map<Integer, CompletableFuture<Boolean>> existsFutures = Maps.newConcurrentMap();
for (int i = 0; i < partitionMetadata.partitions; i++) {
String path = String.format("/managed-ledgers/%s/%s/%s", namespaceName.toString(),
domain(), topicName.getPartition(i).getEncodedLocalName());
CompletableFuture<Boolean> exists = getLocalPolicies().existsAsync(path);
existsFutures.put(i, exists);
}
FutureUtil.waitForAll(Lists.newArrayList(existsFutures.values())).thenApply(__ ->
existsFutures.entrySet().stream().filter(e -> e.getValue().join().booleanValue())
.map(item -> topicName.getPartition(item.getKey()).toString())
.collect(Collectors.toList())
).thenAccept(topics -> {
if (log.isDebugEnabled()) {
log.debug("activeTopics : {}", topics);
}
topics.forEach(topic -> {
try {
CompletableFuture<List<String>> subscriptionsAsync = pulsar().getAdminClient()
.topics().getSubscriptionsAsync(topic);
subscriptionFutures.add(subscriptionsAsync.thenApply(subscriptions::addAll));
} catch (PulsarServerException e) {
throw new RestException(e);
}
final List<String> subscriptions = Lists.newArrayList();
subscriptions.addAll(r);
asyncResponse.resume(subscriptions);
});
}).thenAccept(__ -> resumeAsyncResponse(asyncResponse, subscriptions, subscriptionFutures));
} else {
for (int i = 0; i < partitionMetadata.partitions; i++) {
CompletableFuture<List<String>> subscriptionsAsync = pulsar().getAdminClient().topics()
.getSubscriptionsAsync(topicName.getPartition(i).toString());
subscriptionFutures.add(subscriptionsAsync.thenApply(subscriptions::addAll));
}
resumeAsyncResponse(asyncResponse, subscriptions, subscriptionFutures);
}
} catch (Exception e) {
log.error("[{}] Failed to get list of subscriptions for {}", clientAppId(), topicName, e);
asyncResponse.resume(e);
Expand All @@ -1073,6 +1082,32 @@ protected void internalGetSubscriptions(AsyncResponse asyncResponse, boolean aut
}
}

private void resumeAsyncResponse(AsyncResponse asyncResponse, Set<String> subscriptions,
List<CompletableFuture<Object>> subscriptionFutures) {
FutureUtil.waitForAll(subscriptionFutures).whenComplete((r, ex) -> {
if (ex != null) {
log.warn("[{}] Failed to get list of subscriptions for {}: {}", clientAppId(),
topicName, ex.getMessage());
if (ex instanceof PulsarAdminException) {
PulsarAdminException pae = (PulsarAdminException) ex;
if (pae.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
asyncResponse.resume(new RestException(Status.NOT_FOUND,
"Internal topics have not been generated yet"));
return;
} else {
asyncResponse.resume(new RestException(pae));
return;
}
} else {
asyncResponse.resume(new RestException(ex));
return;
}
} else {
asyncResponse.resume(new ArrayList<>(subscriptions));
}
});
}

private void internalGetSubscriptionsForNonPartitionedTopic(AsyncResponse asyncResponse, boolean authoritative) {
try {
validateTopicOwnership(topicName, authoritative);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,27 @@ public void testGetSubscriptions() {
true);
verify(response, timeout(5000).times(1)).resume(Lists.newArrayList());

// 8) Delete the partitioned topic
// 8) Create a sub of partitioned-topic
response = mock(AsyncResponse.class);
persistentTopics.createSubscription(response, testTenant, testNamespace, testLocalTopicName + "-partition-1", "test", true,
(MessageIdImpl) MessageId.earliest, false);
responseCaptor = ArgumentCaptor.forClass(Response.class);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
//
response = mock(AsyncResponse.class);
persistentTopics.getSubscriptions(response, testTenant, testNamespace, testLocalTopicName + "-partition-1", true);
verify(response, timeout(5000).times(1)).resume(Lists.newArrayList("test"));
//
response = mock(AsyncResponse.class);
persistentTopics.getSubscriptions(response, testTenant, testNamespace, testLocalTopicName + "-partition-0", true);
verify(response, timeout(5000).times(1)).resume(Lists.newArrayList());
//
response = mock(AsyncResponse.class);
persistentTopics.getSubscriptions(response, testTenant, testNamespace, testLocalTopicName, true);
verify(response, timeout(5000).times(1)).resume(Lists.newArrayList("test"));

// 9) Delete the partitioned topic
response = mock(AsyncResponse.class);
persistentTopics.deletePartitionedTopic(response, testTenant, testNamespace, testLocalTopicName, true, true, false);
responseCaptor = ArgumentCaptor.forClass(Response.class);
Expand Down

0 comments on commit a60c189

Please sign in to comment.