Skip to content

Commit

Permalink
Allow to create partitioned topic with 1 partition (apache#4764)
Browse files Browse the repository at this point in the history
**Motivation**
when create partitioned topic, there is a check that `numPartitions > 1`, if numPartitions==1, it will fail.
Some user may want to create partitioned topic with only 1 topic at the start time, and during using it, could update to more topics later. 

**Modification**
change check of `numPartitions > 1` to `numPartitions > 0`

expect all existing ut passed.
  • Loading branch information
jiazhai authored and sijie committed Jul 21, 2019
1 parent af6d4da commit 5162393
Show file tree
Hide file tree
Showing 6 changed files with 15 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -372,8 +372,8 @@ protected void internalRevokePermissionsOnTopic(String role) {

protected void internalCreatePartitionedTopic(int numPartitions) {
validateAdminAccessForTenant(topicName.getTenant());
if (numPartitions <= 1) {
throw new RestException(Status.NOT_ACCEPTABLE, "Number of partitions should be more than 1");
if (numPartitions <= 0) {
throw new RestException(Status.NOT_ACCEPTABLE, "Number of partitions should be more than 0");
}
try {
String path = ZkAdminPaths.partitionedTopicPath(topicName);
Expand Down Expand Up @@ -431,8 +431,8 @@ protected void internalUpdatePartitionedTopic(int numPartitions) {
topicName);
throw new RestException(Status.FORBIDDEN, "Update forbidden on global namespace");
}
if (numPartitions <= 1) {
throw new RestException(Status.NOT_ACCEPTABLE, "Number of partitions should be more than 1");
if (numPartitions <= 0) {
throw new RestException(Status.NOT_ACCEPTABLE, "Number of partitions should be more than 0");
}
try {
updatePartitionedTopic(topicName, numPartitions).get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,8 @@ public void createPartitionedTopic(@PathParam("property") String property, @Path
int numPartitions) {
validateTopicName(property, cluster, namespace, encodedTopic);
validateAdminAccessForTenant(topicName.getTenant());
if (numPartitions <= 1) {
throw new RestException(Status.NOT_ACCEPTABLE, "Number of partitions should be more than 1");
if (numPartitions <= 0) {
throw new RestException(Status.NOT_ACCEPTABLE, "Number of partitions should be more than 0");
}
try {
String path = path(PARTITIONED_TOPIC_PATH_ZNODE, namespaceName.toString(), domain(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,8 +164,8 @@ public void createPartitionedTopic(
validateGlobalNamespaceOwnership(tenant,namespace);
validateTopicName(tenant, namespace, encodedTopic);
validateAdminAccessForTenant(topicName.getTenant());
if (numPartitions <= 1) {
throw new RestException(Status.NOT_ACCEPTABLE, "Number of partitions should be more than 1");
if (numPartitions <= 0) {
throw new RestException(Status.NOT_ACCEPTABLE, "Number of partitions should be more than 0");
}
try {
String path = path(PARTITIONED_TOPIC_PATH_ZNODE, namespaceName.toString(), domain(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ public void createNonPartitionedTopic(
@ApiResponse(code = 401, message = "Don't have permission to adminisActions to be grantedtrate resources on this tenant"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant does not exist"),
@ApiResponse(code = 406, message = "The number of partitions should be more than 1"),
@ApiResponse(code = 406, message = "The number of partitions should be more than 0"),
@ApiResponse(code = 409, message = "Partitioned topic does not exist"),
@ApiResponse(code = 412, message = "Partitioned topic name is invalid"),
@ApiResponse(code = 500, message = "Internal server error")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public void createPartitionedTopic(String topic, int numPartitions) throws Pulsa

@Override
public CompletableFuture<Void> createPartitionedTopicAsync(String topic, int numPartitions) {
checkArgument(numPartitions > 1, "Number of partitions should be more than 1");
checkArgument(numPartitions > 0, "Number of partitions should be more than 0");
TopicName topicName = validateTopic(topic);
WebTarget path = topicPath(topicName, "partitions");
return asyncPutRequest(path, Entity.entity(numPartitions, MediaType.APPLICATION_JSON));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,17 +223,17 @@ public void createNonPartitionedTopic(String topic) throws PulsarAdminException
throw new PulsarAdminException.TimeoutException(e);
}
}

@Override
public CompletableFuture<Void> createNonPartitionedTopicAsync(String topic){
TopicName tn = validateTopic(topic);
WebTarget path = topicPath(tn);
return asyncPutRequest(path, Entity.entity("", MediaType.APPLICATION_JSON));
}

@Override
public CompletableFuture<Void> createPartitionedTopicAsync(String topic, int numPartitions) {
checkArgument(numPartitions > 1, "Number of partitions should be more than 1");
checkArgument(numPartitions > 0, "Number of partitions should be more than 0");
TopicName tn = validateTopic(topic);
WebTarget path = topicPath(tn, "partitions");
return asyncPutRequest(path, Entity.entity(numPartitions, MediaType.APPLICATION_JSON));
Expand All @@ -255,7 +255,7 @@ public void updatePartitionedTopic(String topic, int numPartitions) throws Pulsa

@Override
public CompletableFuture<Void> updatePartitionedTopicAsync(String topic, int numPartitions) {
checkArgument(numPartitions > 1, "Number of partitions must be more than 1");
checkArgument(numPartitions > 0, "Number of partitions must be more than 0");
TopicName tn = validateTopic(topic);
WebTarget path = topicPath(tn, "partitions");
return asyncPostRequest(path, Entity.entity(numPartitions, MediaType.APPLICATION_JSON));
Expand Down Expand Up @@ -587,7 +587,7 @@ public void failed(Throwable throwable) {
});
return future;
}

@Override
public void deleteSubscription(String topic, String subName) throws PulsarAdminException {
try {
Expand Down

0 comments on commit 5162393

Please sign in to comment.