diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index cf8355f4c9a1c..f85c6175a7cb6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -38,9 +38,11 @@ import java.util.Set; import java.util.TreeMap; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import javax.ws.rs.WebApplicationException; @@ -829,15 +831,34 @@ protected void internalCreateSubscription(String subscriptionName, MessageIdImpl try { if (partitionMetadata.partitions > 0) { // Create the subscription on each partition - List> futures = Lists.newArrayList(); PulsarAdmin admin = pulsar().getAdminClient(); + CountDownLatch latch = new CountDownLatch(partitionMetadata.partitions); + AtomicReference exception = new AtomicReference<>(); + AtomicInteger failureCount = new AtomicInteger(0); + for (int i = 0; i < partitionMetadata.partitions; i++) { - futures.add(admin.topics().createSubscriptionAsync(topicName.getPartition(i).toString(), - subscriptionName, messageId)); + admin.persistentTopics() + .createSubscriptionAsync(topicName.getPartition(i).toString(), subscriptionName, messageId) + .handle((result, ex) -> { + if (ex != null) { + int c = failureCount.incrementAndGet(); + // fail the operation on unknown exception or if all the partitioned failed due to + // subscription-already-exist + if (c == partitionMetadata.partitions + || !(ex instanceof PulsarAdminException.ConflictException)) { + exception.set(ex); + } + } + latch.countDown(); + return null; + }); } - FutureUtil.waitForAll(futures).join(); + latch.await(); + if (exception.get() != null) { + throw exception.get(); + } } else { validateAdminOperationOnTopic(authoritative); @@ -850,10 +871,10 @@ protected void internalCreateSubscription(String subscriptionName, MessageIdImpl PersistentSubscription subscription = (PersistentSubscription) topic .createSubscription(subscriptionName, InitialPosition.Latest).get(); subscription.resetCursor(PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId())).get(); - log.info("[{}][{}] Successfully created subscription {} at message id {}", clientAppId(), - topicName, subscriptionName, messageId); + log.info("[{}][{}] Successfully created subscription {} at message id {}", clientAppId(), topicName, + subscriptionName, messageId); } - } catch (Exception e) { + } catch (Throwable e) { Throwable t = e.getCause(); log.warn("[{}] [{}] Failed to create subscription {} at message id {}", clientAppId(), topicName, subscriptionName, messageId, e); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/CreateSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/CreateSubscriptionTest.java index 0cf2c5ca914a9..f5d8bd0db9ad5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/CreateSubscriptionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/CreateSubscriptionTest.java @@ -103,4 +103,30 @@ public void createSubscriptionOnPartitionedTopic() throws Exception { Lists.newArrayList("sub-1")); } } + + @Test + public void createSubscriptionOnPartitionedTopicWithPartialFailure() throws Exception { + String topic = "persistent://my-property/my-ns/my-partitioned-topic"; + admin.topics().createPartitionedTopic(topic, 10); + + // create subscription for one partition + final String partitionedTopic0 = topic+"-partition-0"; + admin.topics().createSubscription(partitionedTopic0, "sub-1", MessageId.latest); + + admin.topics().createSubscription(topic, "sub-1", MessageId.latest); + + // Create should fail if the subscription already exists + try { + admin.topics().createSubscription(topic, "sub-1", MessageId.latest); + fail("Should have failed"); + } catch (Exception e) { + // Expected + } + + for (int i = 0; i < 10; i++) { + assertEquals( + admin.topics().getSubscriptions(TopicName.get(topic).getPartition(i).toString()), + Lists.newArrayList("sub-1")); + } + } }