Skip to content

Commit

Permalink
Add topic type validation before update topic partitions (apache#7181)
Browse files Browse the repository at this point in the history
Fixes apache#7152 

### Motivation
This PR add topic type validation before update partition numbers of partitioned-topic. 
This can prevent  creating new partitions for non-partitoned topics.
  • Loading branch information
aloyszhang authored Jun 8, 2020
1 parent bcfa7f0 commit 710b22d
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -356,6 +357,19 @@ protected void validatePartitionedTopicName(String tenant, String namespace, Str
}
}

protected void validatePartitionedTopicMetadata(String tenant, String namespace, String encodedTopic) {
String completeTopicName = tenant + "/" + namespace + "/" + Codec.decode(encodedTopic);
try {
PartitionedTopicMetadata partitionedTopicMetadata =
pulsar().getBrokerService().fetchPartitionedTopicMetadataAsync(TopicName.get(completeTopicName)).get();
if (partitionedTopicMetadata.partitions < 1) {
throw new RestException(Status.CONFLICT, "Topic is not partitioned topic");
}
} catch ( InterruptedException | ExecutionException e) {
throw new RestException(Status.INTERNAL_SERVER_ERROR, "Check topic partition meta failed.");
}
}

@Deprecated
protected void validateTopicName(String property, String cluster, String namespace, String encodedTopic) {
String topic = Codec.decode(encodedTopic);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,9 +252,8 @@ public void createNonPartitionedTopic(
* well. Therefore, it can violate partition ordering at producers until all producers are restarted at application.
*
* @param tenant
* @param cluster
* @param namespace
* @param topic
* @param encodedTopic
* @param numPartitions
*/
@POST
Expand Down Expand Up @@ -283,6 +282,7 @@ public void updatePartitionedTopic(
@ApiParam(value = "The number of partitions for the topic", required = true, type = "int", defaultValue = "0")
int numPartitions) {
validatePartitionedTopicName(tenant, namespace, encodedTopic);
validatePartitionedTopicMetadata(tenant, namespace, encodedTopic);
internalUpdatePartitionedTopic(numPartitions, updateLocalTopicOnly, authoritative);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,29 @@
import static org.testng.Assert.fail;

import javax.ws.rs.core.Response.Status;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.common.util.Codec;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/**
* Unit test {@link AdminResource}.
*/
public class AdminResourceTest {
public class AdminResourceTest extends BrokerTestBase {

@BeforeClass
@Override
public void setup() throws Exception {
super.baseSetup();
}

@AfterClass
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
}

private static AdminResource mockResource() {
return new AdminResource() {
Expand Down Expand Up @@ -66,4 +81,30 @@ public void testValidatePartitionedTopicNameInvalid() {
}
}

@Test
public void testValidatePartitionedTopicMetadata() throws Exception {
String tenant = "prop";
String namespace = "ns-abc";
String partitionedTopic = "partitionedTopic";
String nonPartitionedTopic = "notPartitionedTopic";
int partitions = 3;

String completePartitionedTopic = tenant + "/" + namespace + "/" + partitionedTopic;
String completeNonPartitionedTopic = tenant + "/" + namespace + "/" + nonPartitionedTopic;

admin.topics().createNonPartitionedTopic(completeNonPartitionedTopic);
admin.topics().createPartitionedTopic(completePartitionedTopic, partitions);

AdminResource resource = mockResource();
resource.setPulsar(pulsar);
// validate should pass when topic is partitioned topic
resource.validatePartitionedTopicMetadata(tenant, namespace, Codec.encode(partitionedTopic));
// validate should failed when topic is non-partitioned topic
try {
resource.validatePartitionedTopicMetadata(tenant, namespace, Codec.encode(nonPartitionedTopic));
fail("Should fail validation on non-partitioned topic");
} catch (RestException re) {
assertEquals(Status.CONFLICT.getStatusCode(), re.getResponse().getStatus());
}
}
}

0 comments on commit 710b22d

Please sign in to comment.