diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java index 88064971e7de9..530b35027a4a5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java @@ -42,6 +42,7 @@ import org.apache.pulsar.broker.service.BrokerServiceException; import org.apache.pulsar.broker.web.PulsarWebResource; import org.apache.pulsar.broker.web.RestException; +import org.apache.pulsar.client.admin.internal.TopicsImpl; import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace; import org.apache.pulsar.common.naming.Constants; import org.apache.pulsar.common.naming.NamespaceBundle; @@ -619,7 +620,8 @@ protected List getTopicPartitionList(TopicDomain topicDomain) { return topicPartitions; } - protected void internalCreatePartitionedTopic(AsyncResponse asyncResponse, int numPartitions) { + protected void internalCreatePartitionedTopic(AsyncResponse asyncResponse, int numPartitions, + boolean createLocalTopicOnly) { Integer maxTopicsPerNamespace = null; try { @@ -672,55 +674,57 @@ protected void internalCreatePartitionedTopic(AsyncResponse asyncResponse, int n "Number of partitions should be less than or equal to " + maxPartitions)); return; } + + List> createFutureList = new ArrayList<>(); + + CompletableFuture createLocalFuture = new CompletableFuture<>(); + createFutureList.add(createLocalFuture); checkTopicExistsAsync(topicName).thenAccept(exists -> { if (exists) { log.warn("[{}] Failed to create already existing topic {}", clientAppId(), topicName); asyncResponse.resume(new RestException(Status.CONFLICT, "This topic already exists")); - } else { - - try { - String path = ZkAdminPaths.partitionedTopicPath(topicName); - namespaceResources().getPartitionedTopicResources() - .createAsync(path, new PartitionedTopicMetadata(numPartitions)).thenAccept(r -> { - log.info("[{}] Successfully created partitioned topic {}", clientAppId(), topicName); - tryCreatePartitionsAsync(numPartitions).thenAccept(v -> { - log.info("[{}] Successfully created partitions for topic {}", clientAppId(), - topicName); - asyncResponse.resume(Response.noContent().build()); - }).exceptionally(e -> { - log.error("[{}] Failed to create partitions for topic {}", clientAppId(), - topicName); - // The partitioned topic is created but there are some partitions create failed - asyncResponse.resume(new RestException(e)); - return null; - }); - }).exceptionally(ex -> { - if (ex.getCause() instanceof AlreadyExistsException) { - log.warn("[{}] Failed to create already existing partitioned topic {}", - clientAppId(), topicName); - asyncResponse.resume( - new RestException(Status.CONFLICT, "Partitioned topic already exists")); - } else if (ex.getCause() instanceof BadVersionException) { - log.warn("[{}] Failed to create partitioned topic {}: concurrent modification", - clientAppId(), topicName); - asyncResponse.resume(new RestException(Status.CONFLICT, "Concurrent modification")); - } else { - log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, - ex.getCause()); - asyncResponse.resume(new RestException(ex.getCause())); - } - return null; - }); - } catch (Exception e) { - log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e); - resumeAsyncResponseExceptionally(asyncResponse, e); - } + return; } + + provisionPartitionedTopicPath(asyncResponse, numPartitions, createLocalTopicOnly) + .thenCompose(ignored -> tryCreatePartitionsAsync(numPartitions)) + .whenComplete((ignored, ex) -> { + if (ex != null) { + createLocalFuture.completeExceptionally(ex); + return; + } + createLocalFuture.complete(null); + }); }).exceptionally(ex -> { log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, ex); resumeAsyncResponseExceptionally(asyncResponse, ex); return null; }); + + if (!createLocalTopicOnly && topicName.isGlobal() && isNamespaceReplicated(namespaceName)) { + getNamespaceReplicatedClusters(namespaceName) + .stream() + .filter(cluster -> !cluster.equals(pulsar().getConfiguration().getClusterName())) + .forEach(cluster -> createFutureList.add( + ((TopicsImpl) pulsar().getBrokerService().getClusterPulsarAdmin(cluster).topics()) + .createPartitionedTopicAsync( + topicName.getPartitionedTopicName(), numPartitions, true))); + } + + FutureUtil.waitForAll(createFutureList).whenComplete((ignored, ex) -> { + if (ex != null) { + log.error("[{}] Failed to create partitions for topic {}", clientAppId(), topicName, ex.getCause()); + if (ex.getCause() instanceof RestException) { + asyncResponse.resume(ex.getCause()); + } else { + resumeAsyncResponseExceptionally(asyncResponse, ex.getCause()); + } + return; + } + log.info("[{}] Successfully created partitions for topic {} in cluster {}", + clientAppId(), topicName, pulsar().getConfiguration().getClusterName()); + asyncResponse.resume(Response.noContent().build()); + }); } /** @@ -747,6 +751,42 @@ protected CompletableFuture checkTopicExistsAsync(TopicName topicName) }); } + private CompletableFuture provisionPartitionedTopicPath(AsyncResponse asyncResponse, + int numPartitions, + boolean createLocalTopicOnly) { + CompletableFuture future = new CompletableFuture<>(); + String partitionedTopicPath = ZkAdminPaths.partitionedTopicPath(topicName); + namespaceResources() + .getPartitionedTopicResources() + .createAsync(partitionedTopicPath, new PartitionedTopicMetadata(numPartitions)) + .whenComplete((ignored, ex) -> { + if (ex != null) { + if (ex instanceof AlreadyExistsException) { + if (createLocalTopicOnly) { + future.complete(null); + return; + } + log.warn("[{}] Failed to create already existing partitioned topic {}", + clientAppId(), topicName); + future.completeExceptionally( + new RestException(Status.CONFLICT, "Partitioned topic already exists")); + } else if (ex instanceof BadVersionException) { + log.warn("[{}] Failed to create partitioned topic {}: concurrent modification", + clientAppId(), topicName); + future.completeExceptionally( + new RestException(Status.CONFLICT, "Concurrent modification")); + } else { + log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, ex); + future.completeExceptionally(new RestException(ex.getCause())); + } + return; + } + log.info("[{}] Successfully created partitioned topic {}", clientAppId(), topicName); + future.complete(null); + }); + return future; + } + protected void resumeAsyncResponseExceptionally(AsyncResponse asyncResponse, Throwable throwable) { if (throwable instanceof WebApplicationException) { asyncResponse.resume((WebApplicationException) throwable); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java index 20fd24dea5727..daf6ea0a2c3f9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java @@ -142,14 +142,17 @@ public PersistentTopicInternalStats getInternalStats(@PathParam("property") Stri @ApiResponse(code = 406, message = "The number of partitions should be more than 0 and less than or equal" + " to maxNumPartitionsPerPartitionedTopic"), @ApiResponse(code = 409, message = "Partitioned topic already exist")}) - public void createPartitionedTopic(@Suspended final AsyncResponse asyncResponse, - @PathParam("property") String property, @PathParam("cluster") String cluster, - @PathParam("namespace") String namespace, @PathParam("topic") @Encoded - String encodedTopic, - int numPartitions) { + public void createPartitionedTopic( + @Suspended final AsyncResponse asyncResponse, + @PathParam("property") String property, + @PathParam("cluster") String cluster, + @PathParam("namespace") String namespace, + @PathParam("topic") @Encoded String encodedTopic, + int numPartitions, + @QueryParam("createLocalTopicOnly") @DefaultValue("false") boolean createLocalTopicOnly) { try { validateTopicName(property, cluster, namespace, encodedTopic); - internalCreatePartitionedTopic(asyncResponse, numPartitions); + internalCreatePartitionedTopic(asyncResponse, numPartitions, createLocalTopicOnly); } catch (Exception e) { log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e); resumeAsyncResponseExceptionally(asyncResponse, e); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java index 46708dd451cde..babff0d006b70 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java @@ -154,14 +154,17 @@ public void revokePermissionsOnTopic(@PathParam("property") String property, @ApiResponse(code = 406, message = "The number of partitions should be " + "more than 0 and less than or equal to maxNumPartitionsPerPartitionedTopic"), @ApiResponse(code = 409, message = "Partitioned topic already exist")}) - public void createPartitionedTopic(@Suspended final AsyncResponse asyncResponse, - @PathParam("property") String property, @PathParam("cluster") String cluster, - @PathParam("namespace") String namespace, @PathParam("topic") @Encoded - String encodedTopic, - int numPartitions) { + public void createPartitionedTopic( + @Suspended final AsyncResponse asyncResponse, + @PathParam("property") String property, + @PathParam("cluster") String cluster, + @PathParam("namespace") String namespace, + @PathParam("topic") @Encoded String encodedTopic, + int numPartitions, + @QueryParam("createLocalTopicOnly") @DefaultValue("false") boolean createLocalTopicOnly) { try { validateTopicName(property, cluster, namespace, encodedTopic); - internalCreatePartitionedTopic(asyncResponse, numPartitions); + internalCreatePartitionedTopic(asyncResponse, numPartitions, createLocalTopicOnly); } catch (Exception e) { log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e); resumeAsyncResponseExceptionally(asyncResponse, e); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java index cd6b31ccc9748..9d29967a7154f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java @@ -188,12 +188,12 @@ public void createPartitionedTopic( @PathParam("topic") @Encoded String encodedTopic, @ApiParam(value = "The number of partitions for the topic", required = true, type = "int", defaultValue = "0") - int numPartitions) { - + int numPartitions, + @QueryParam("createLocalTopicOnly") @DefaultValue("false") boolean createLocalTopicOnly) { try { validateGlobalNamespaceOwnership(tenant, namespace); validateTopicName(tenant, namespace, encodedTopic); - internalCreatePartitionedTopic(asyncResponse, numPartitions); + internalCreatePartitionedTopic(asyncResponse, numPartitions, createLocalTopicOnly); } catch (Exception e) { log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e); resumeAsyncResponseExceptionally(asyncResponse, e); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java index 97727575ddffc..56e5ce109d4f9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java @@ -231,12 +231,13 @@ public void createPartitionedTopic( @PathParam("topic") @Encoded String encodedTopic, @ApiParam(value = "The number of partitions for the topic", required = true, type = "int", defaultValue = "0") - int numPartitions) { + int numPartitions, + @QueryParam("createLocalTopicOnly") @DefaultValue("false") boolean createLocalTopicOnly) { try { validateGlobalNamespaceOwnership(tenant, namespace); validatePartitionedTopicName(tenant, namespace, encodedTopic); validateTopicPolicyOperation(topicName, PolicyName.PARTITION, PolicyOperation.WRITE); - internalCreatePartitionedTopic(asyncResponse, numPartitions); + internalCreatePartitionedTopic(asyncResponse, numPartitions, createLocalTopicOnly); } catch (Exception e) { log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e); resumeAsyncResponseExceptionally(asyncResponse, e); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java index db6cacab8eb7c..81ec0d54d49ed 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java @@ -785,7 +785,7 @@ public void persistentTopics() throws Exception { assertEquals(persistentTopics.getPartitionedTopicList(property, cluster, namespace), Lists.newArrayList()); response = mock(AsyncResponse.class); ArgumentCaptor responseCaptor = ArgumentCaptor.forClass(Response.class); - persistentTopics.createPartitionedTopic(response, property, cluster, namespace, topic, 5); + persistentTopics.createPartitionedTopic(response, property, cluster, namespace, topic, 5, false); verify(response, timeout(5000).times(1)).resume(responseCaptor.capture()); assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode()); assertEquals(persistentTopics.getPartitionedTopicList(property, cluster, namespace), Lists diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java index 39a99d2784d69..a1c7276d35764 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java @@ -53,6 +53,7 @@ import org.apache.pulsar.broker.web.PulsarWebResource; import org.apache.pulsar.broker.web.RestException; import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.admin.internal.TopicsImpl; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; @@ -169,7 +170,7 @@ public void testGetSubscriptions() { // 3) Create the partitioned topic response = mock(AsyncResponse.class); ArgumentCaptor responseCaptor = ArgumentCaptor.forClass(Response.class); - persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, testLocalTopicName, 3); + persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, testLocalTopicName, 3, true); verify(response, timeout(5000).times(1)).resume(responseCaptor.capture()); Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode()); @@ -289,7 +290,7 @@ public void testTerminatePartitionedTopic() { // 3) Create the partitioned topic AsyncResponse response = mock(AsyncResponse.class); - persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, testLocalTopicName, 1); + persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, testLocalTopicName, 1, true); ArgumentCaptor responseCaptor = ArgumentCaptor.forClass(Response.class); verify(response, timeout(5000).times(1)).resume(responseCaptor.capture()); Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode()); @@ -375,7 +376,7 @@ public void testCreatePartitionedTopicHavingNonPartitionTopicWithPartitionSuffix doReturn(new Policies()).when(persistentTopics).getNamespacePolicies(any()); AsyncResponse response = mock(AsyncResponse.class); ArgumentCaptor errCaptor = ArgumentCaptor.forClass(RestException.class); - persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, partitionedTopicName, 5); + persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, partitionedTopicName, 5, true); verify(response, timeout(5000).times(1)).resume(errCaptor.capture()); Assert.assertEquals(errCaptor.getValue().getResponse().getStatus(), Response.Status.CONFLICT.getStatusCode()); } @@ -399,7 +400,7 @@ public void testUpdatePartitionedTopicHavingNonPartitionTopicWithPartitionSuffix doNothing().when(persistentTopics).validateAdminAccessForTenant(anyString()); AsyncResponse response = mock(AsyncResponse.class); ArgumentCaptor responseCaptor = ArgumentCaptor.forClass(Response.class); - persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, partitionedTopicName, 5); + persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, partitionedTopicName, 5, true); verify(response, timeout(5000).times(1)).resume(responseCaptor.capture()); Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode()); persistentTopics.updatePartitionedTopic(testTenant, testNamespace, partitionedTopicName, true, false, 10); @@ -428,7 +429,7 @@ public void testUnloadTopic() { // 3) create partitioned topic and unload response = mock(AsyncResponse.class); responseCaptor = ArgumentCaptor.forClass(Response.class); - persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, partitionTopicName, 6); + persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, partitionTopicName, 6, true); verify(response, timeout(5000).times(1)).resume(responseCaptor.capture()); Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode()); response = mock(AsyncResponse.class); @@ -458,13 +459,13 @@ public void testUnloadTopicShallThrowNotFoundWhenTopicNotExist() { public void testGetPartitionedTopicsList() throws KeeperException, InterruptedException, PulsarAdminException { AsyncResponse response = mock(AsyncResponse.class); ArgumentCaptor responseCaptor = ArgumentCaptor.forClass(Response.class); - persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, "test-topic1", 3); + persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, "test-topic1", 3, true); verify(response, timeout(5000).times(1)).resume(responseCaptor.capture()); Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode()); response = mock(AsyncResponse.class); responseCaptor = ArgumentCaptor.forClass(Response.class); - nonPersistentTopic.createPartitionedTopic(response, testTenant, testNamespace, "test-topic2", 3); + nonPersistentTopic.createPartitionedTopic(response, testTenant, testNamespace, "test-topic2", 3, true); verify(response, timeout(5000).times(1)).resume(responseCaptor.capture()); Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode()); @@ -494,7 +495,7 @@ public void testGrantNonPartitionedTopic() { public void testCreateExistedPartition() { final AsyncResponse response = mock(AsyncResponse.class); final String topicName = "test-create-existed-partition"; - persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, topicName, 3); + persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, topicName, 3, true); final String partitionName = TopicName.get(topicName).getPartition(0).getLocalName(); try { @@ -513,7 +514,8 @@ public void testGrantPartitionedTopic() { final int numPartitions = 5; AsyncResponse response = mock(AsyncResponse.class); ArgumentCaptor responseCaptor = ArgumentCaptor.forClass(Response.class); - persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, partitionedTopicName, numPartitions); + persistentTopics.createPartitionedTopic( + response, testTenant, testNamespace, partitionedTopicName, numPartitions, true); verify(response, timeout(5000).times(1)).resume(responseCaptor.capture()); Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode()); @@ -553,7 +555,8 @@ public void testRevokePartitionedTopic() { final int numPartitions = 5; AsyncResponse response = mock(AsyncResponse.class); ArgumentCaptor responseCaptor = ArgumentCaptor.forClass(Response.class); - persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, partitionedTopicName, numPartitions); + persistentTopics.createPartitionedTopic( + response, testTenant, testNamespace, partitionedTopicName, numPartitions, true); verify(response, timeout(5000).times(1)).resume(responseCaptor.capture()); Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode()); String role = "role"; @@ -596,7 +599,7 @@ public void testTriggerCompactionTopic() { // create partitioned topic and compaction on it response = mock(AsyncResponse.class); - persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, partitionTopicName, 2); + persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, partitionTopicName, 2, true); persistentTopics.compact(response, testTenant, testNamespace, partitionTopicName, true); responseCaptor = ArgumentCaptor.forClass(Response.class); verify(response, timeout(5000).times(1)).resume(responseCaptor.capture()); @@ -613,7 +616,7 @@ public void testPeekWithSubscriptionNameNotExist() throws Exception { topicName).toString(); final String subscriptionName = "sub"; - admin.topics().createPartitionedTopic(topic, 3); + ((TopicsImpl) admin.topics()).createPartitionedTopicAsync(topic, 3, true).get(); final String partitionedTopic = topic + "-partition-0"; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java index d261e8516ac27..0ea39aa2021c8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java @@ -19,6 +19,7 @@ package org.apache.pulsar.broker.service; import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically; +import static org.apache.pulsar.broker.BrokerTestUtil.newUniqueName; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.spy; import static org.testng.Assert.assertEquals; @@ -35,6 +36,7 @@ import java.lang.reflect.Field; import java.lang.reflect.Method; +import java.util.ArrayList; import java.util.List; import java.util.Optional; import java.util.SortedSet; @@ -62,6 +64,7 @@ import org.apache.pulsar.broker.service.BrokerServiceException.NamingException; import org.apache.pulsar.broker.service.persistent.PersistentReplicator; import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageRoutingMode; @@ -79,7 +82,6 @@ import org.apache.pulsar.common.policies.data.BacklogQuota; import org.apache.pulsar.common.policies.data.BacklogQuota.RetentionPolicy; import org.apache.pulsar.common.policies.data.ClusterData; -import org.apache.pulsar.common.policies.data.ClusterDataImpl; import org.apache.pulsar.common.policies.data.ReplicatorStats; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; @@ -1121,6 +1123,64 @@ public void testCleanupTopic() throws Exception { consumer.close(); } + + + @Test + public void createPartitionedTopicTest() throws Exception { + final String cluster1 = pulsar1.getConfig().getClusterName(); + final String cluster2 = pulsar2.getConfig().getClusterName(); + final String cluster3 = pulsar3.getConfig().getClusterName(); + final String namespace = newUniqueName("pulsar/ns"); + + final String persistentPartitionedTopic = + newUniqueName("persistent://" + namespace + "/partitioned"); + final String persistentNonPartitionedTopic = + newUniqueName("persistent://" + namespace + "/non-partitioned"); + final String nonPersistentPartitionedTopic = + newUniqueName("non-persistent://" + namespace + "/partitioned"); + final String nonPersistentNonPartitionedTopic = + newUniqueName("non-persistent://" + namespace + "/non-partitioned"); + final int numPartitions = 3; + + admin1.namespaces().createNamespace(namespace, Sets.newHashSet(cluster1, cluster2, cluster3)); + admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2", "r3")); + + admin1.topics().createPartitionedTopic(persistentPartitionedTopic, numPartitions); + admin1.topics().createPartitionedTopic(nonPersistentPartitionedTopic, numPartitions); + admin1.topics().createNonPartitionedTopic(persistentNonPartitionedTopic); + admin1.topics().createNonPartitionedTopic(nonPersistentNonPartitionedTopic); + + List partitionedTopicList = admin1.topics().getPartitionedTopicList(namespace); + Assert.assertTrue(partitionedTopicList.contains(persistentPartitionedTopic)); + Assert.assertTrue(partitionedTopicList.contains(nonPersistentPartitionedTopic)); + + // expected topic list didn't contain non-persistent-non-partitioned topic, + // because this model topic didn't create path in local metadata store. + List expectedTopicList = Lists.newArrayList( + persistentNonPartitionedTopic, nonPersistentNonPartitionedTopic); + TopicName pt = TopicName.get(persistentPartitionedTopic); + for (int i = 0; i < numPartitions; i++) { + expectedTopicList.add(pt.getPartition(i).toString()); + } + + checkListContainExpectedTopic(admin1, namespace, expectedTopicList); + checkListContainExpectedTopic(admin2, namespace, expectedTopicList); + checkListContainExpectedTopic(admin3, namespace, expectedTopicList); + } + + private void checkListContainExpectedTopic(PulsarAdmin admin, String namespace, List expectedTopicList) { + // wait non-partitioned topics replicators created finished + final List list = new ArrayList<>(); + Awaitility.await().atMost(2, TimeUnit.SECONDS).until(() -> { + list.clear(); + list.addAll(admin.topics().getList(namespace)); + return list.size() == expectedTopicList.size(); + }); + for (String expectTopic : expectedTopicList) { + Assert.assertTrue(list.contains(expectTopic)); + } + } + private static final Logger log = LoggerFactory.getLogger(ReplicatorTest.class); } diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java index c90ad15513af5..5f8cf1527f024 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java @@ -384,9 +384,15 @@ public CompletableFuture createNonPartitionedTopicAsync(String topic){ @Override public CompletableFuture createPartitionedTopicAsync(String topic, int numPartitions) { + return createPartitionedTopicAsync(topic, numPartitions, false); + } + + public CompletableFuture createPartitionedTopicAsync( + String topic, int numPartitions, boolean createLocalTopicOnly) { checkArgument(numPartitions > 0, "Number of partitions should be more than 0"); TopicName tn = validateTopic(topic); - WebTarget path = topicPath(tn, "partitions"); + WebTarget path = topicPath(tn, "partitions") + .queryParam("createLocalTopicOnly", Boolean.toString(createLocalTopicOnly)); return asyncPutRequest(path, Entity.entity(numPartitions, MediaType.APPLICATION_JSON)); }