Skip to content

Commit

Permalink
[Issue apache#3436][pulsar-broker] Creating REST Endpoint for non-par…
Browse files Browse the repository at this point in the history
…titioned topic creation (apache#3625)

We are adding a REST endpoint which allows the admin to create non-partitioned topics
through PersistentTopics.
  • Loading branch information
ConcurrencyPractitioner authored and sijie committed Mar 4, 2019
1 parent 65daec1 commit 548c726
Show file tree
Hide file tree
Showing 9 changed files with 123 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,18 @@ protected void internalCreatePartitionedTopic(int numPartitions, boolean authori
}
}

protected void internalCreateNonPartitionedTopic(boolean authoritative) {
validateAdminAccessForTenant(topicName.getTenant());

try {
getOrCreateTopic(topicName);
log.info("[{}] Successfully created non-partitioned topic {}", clientAppId(), topicName);
} catch (Exception e) {
log.error("[{}] Failed to create non-partitioned topic {}", clientAppId(), topicName, e);
throw new RestException(e);
}
}

/**
* It updates number of partitions of an existing non-global partitioned topic. It requires partitioned-topic to
* already exist and number of new partitions must be greater than existing number of partitions. Decrementing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,22 @@ public void createPartitionedTopic(@PathParam("tenant") String tenant, @PathPara
internalCreatePartitionedTopic(numPartitions, authoritative);
}

@PUT
@Path("/{tenant}/{namespace}/{topic}")
@ApiOperation(value="Create a non-partitioned topic.", notes = "This is the only REST endpoint from which non-partitioned topics could be created.")
@ApiResponses(value = {
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 409, message = "Partitioned topic already exist"),
@ApiResponse(code = 412, message = "Failed Reason : Name is invalid or Namespace does not have any clusters configured"),
@ApiResponse(code = 503, message = "Failed to validate global cluster configuration")
})
public void createNonPartitionedTopic(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateGlobalNamespaceOwnership(tenant,namespace);
internalCreateNonPartitionedTopic(authoritative);
}

/**
* It updates number of partitions of an existing non-global partitioned topic. It requires partitioned-topic to be
* already exist and number of new partitions must be greater than existing number of partitions. Decrementing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ public void testGetSubscriptions() {
}

@Test
public void testGetSubscriptionsWithAutoTopicCreationDisabled() {
public void testNonPartitionedTopics() {
pulsar.getConfiguration().setAllowAutoTopicCreation(false);
final String nonPartitionTopic = "non-partitioned-topic";
persistentTopics.createSubscription(testTenant, testNamespace, nonPartitionTopic, "test", true, (MessageIdImpl) MessageId.latest);
Expand All @@ -126,5 +126,8 @@ public void testGetSubscriptionsWithAutoTopicCreationDisabled() {
} catch (RestException exc) {
Assert.assertTrue(exc.getMessage().contains("zero partitions"));
}
final String nonPartitionTopic2 = "secondary-non-partitioned-topic";
persistentTopics.createNonPartitionedTopic(testTenant, testNamespace, nonPartitionTopic2, true);
Assert.assertEquals(persistentTopics.getPartitionedMetadata(testTenant, testNamespace, nonPartitionTopic, true).partitions, 0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,18 @@ List<String> getListInBundle(String namespace, String bundleRange)
*/
void createPartitionedTopic(String topic, int numPartitions) throws PulsarAdminException;

/**
* Create a non-partitioned topic.
*
* <p>
* Create a non-partitioned topic.
* <p>
*
* @param topic Topic name
* @throws PulsarAdminException
*/
void createNonPartitionedTopic(String topic) throws PulsarAdminException;

/**
* Create a partitioned topic asynchronously.
* <p>
Expand All @@ -217,6 +229,13 @@ List<String> getListInBundle(String namespace, String bundleRange)
*/
CompletableFuture<Void> createPartitionedTopicAsync(String topic, int numPartitions);

/**
* Create a non-partitioned topic asynchronously.
*
* @param topic Topic name
*/
CompletableFuture<Void> createNonPartitionedTopicAsync(String topic);

/**
* Update number of partitions of a non-global partitioned topic.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,25 @@ public void createPartitionedTopic(String topic, int numPartitions) throws Pulsa
}
}

@Override
public void createNonPartitionedTopic(String topic) throws PulsarAdminException {
try {
createNonPartitionedTopicAsync(topic).get();
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e.getCause());
}
}

@Override
public CompletableFuture<Void> createNonPartitionedTopicAsync(String topic){
TopicName tn = validateTopic(topic);
WebTarget path = topicPath(tn);
return asyncPutRequest(path, Entity.entity("", MediaType.APPLICATION_JSON));
}

@Override
public CompletableFuture<Void> createPartitionedTopicAsync(String topic, int numPartitions) {
checkArgument(numPartitions > 1, "Number of partitions should be more than 1");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -606,6 +606,9 @@ void topics() throws Exception {
cmdTopics.run(split("create-partitioned-topic persistent://myprop/clust/ns1/ds1 --partitions 32"));
verify(mockTopics).createPartitionedTopic("persistent://myprop/clust/ns1/ds1", 32);

cmdTopics.run(split("create persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).createNonPartitionedTopic("persistent://myprop/clust/ns1/ds1");

cmdTopics.run(split("list-partitioned-topics myprop/clust/ns1"));
verify(mockTopics).getPartitionedTopicList("myprop/clust/ns1");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ public CmdTopics(PulsarAdmin admin) {
jcommander.addCommand("expire-messages-all-subscriptions", new ExpireMessagesForAllSubscriptions());

jcommander.addCommand("create-partitioned-topic", new CreatePartitionedCmd());
jcommander.addCommand("create", new CreateNonPartitionedCmd());
jcommander.addCommand("update-partitioned-topic", new UpdatePartitionedCmd());
jcommander.addCommand("get-partitioned-topic-metadata", new GetPartitionedTopicMetadataCmd());

Expand Down Expand Up @@ -213,6 +214,19 @@ void run() throws Exception {
}
}

@Parameters(commandDescription = "Create a non-partitioned topic.")
private class CreateNonPartitionedCmd extends CliCommand {

@Parameter(description = "persistent://tenant/namespace/topic\n", required = true)
private java.util.List<String> params;

@Override
void run() throws Exception {
String topic = validateTopicName(params);
topics.createNonPartitionedTopic(topic);
}
}

@Parameters(commandDescription = "Update existing non-global partitioned topic. \n"
+ "\t\tNew updating number of partitions must be greater than existing number of partitions.")
private class UpdatePartitionedCmd extends CliCommand {
Expand Down
27 changes: 27 additions & 0 deletions site2/docs/admin-api-partitioned-topics.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,33 @@ int numPartitions = 4;
admin.persistentTopics().createPartitionedTopic(topicName, numPartitions);
```

## Nonpartitioned topics resources

### Create

Nonpartitioned topics in Pulsar must be explicitly created if allowAutoTopicCreation or createIfMissing is disabled.
When creating a non-partitioned topic, you need to provide a topic name.

#### pulsar-admin

You can create non-partitioned topics using the [`create`](reference-pulsar-admin.md#create)
command and specifying the topic name as an argument. This is an example command:

```shell
$ bin/pulsar-admin topics create persistent://my-tenant/my-namespace/my-topic
```

#### REST API

{@inject: endpoint|PUT|admin/v2/persistent/:tenant/:namespace/:topic|operation/createNonPartitionedTopic}

#### Java

```java
String topicName = "persistent://my-tenant/my-namespace/my-topic";
admin.topics().createNonPartitionedTopic(topicName);
```

### Get metadata

Partitioned topics have metadata associated with them that you can fetch as a JSON object.
Expand Down
10 changes: 9 additions & 1 deletion site2/docs/reference-pulsar-admin.md
Original file line number Diff line number Diff line change
Expand Up @@ -1684,6 +1684,7 @@ Subcommands
* `offload-status`
* `create-partitioned-topic`
* `delete-partitioned-topic`
* `create`
* `get-partitioned-topic-metadata`
* `update-partitioned-topic`
* `list`
Expand Down Expand Up @@ -1773,7 +1774,6 @@ Options
|---|---|---|
|`-p`, `--partitions`|The number of partitions for the topic|0|


### `delete-partitioned-topic`
Delete a partitioned topic. This will also delete all the partitions of the topic if they exist.

Expand All @@ -1782,6 +1782,14 @@ Usage
$ pulsar-admin topics delete-partitioned-topic {persistent|non-persistent}
```

### `create`
Creates a non-partitioned topic. A non-partitioned topic must explicitly be created by the user if allowAutoTopicCreation or createIfMissing is disabled.

Usage
```bash
$ pulsar-admin topics create {persistent|non-persistent}://tenant/namespace/topic
```

### `get-partitioned-topic-metadata`
Get the partitioned topic metadata. If the topic is not created or is a non-partitioned topic, this will return an empty topic with zero partitions.

Expand Down

0 comments on commit 548c726

Please sign in to comment.