Skip to content

Commit

Permalink
Fix create partition of an exist topic doesn't throw RestException (a…
Browse files Browse the repository at this point in the history
…pache#9342)

### Motivation

Currently creating a partition of an existed partitioned topic was created doesn't throw any exception. However it should be an invalid behavior. The reason is that when a non partitioned topic was created, it only checks whether the number of partitions is positive. However, no matter the topic doesn't exist or the topic is an existed partition, the number of partitions is 0. This PR is to distinguish these two cases and throw a `RestException` when the non-partitioned topic is an existed partition.

### Modifications

- When creating a non-partition topic that is an existed partition, throw a `RestException` and add a test to verify it.
- Add a test provider to ensure backward compatibility that users can still create missed partitions by creating non-partitioned topics of the missed partition.
- Fix the functions worker startup. Before this PR it always tries to create the metadata topics like `public/functions/assignment` no matter if it exists. Here we ignore the `ConflictException`.

### Verifying this change

This change added tests and can be verified as follows:

  - *Added PersistentTopics#testCreateExistedPartition*
  - *Added another test provider to PartitionCreationTest#testCreateMissedPartitions*
  - *Add tests for creating existed topics to `AdminApiTeststestPersistentTopicCreation`.*
  • Loading branch information
BewareMyPower authored Feb 8, 2021
1 parent ab94743 commit 16a2240
Show file tree
Hide file tree
Showing 8 changed files with 89 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -353,8 +353,6 @@ protected void validateTopicName(String property, String namespace, String encod
topic, e);
throw new RestException(Status.PRECONDITION_FAILED, "Topic name is not valid");
}

this.topicName = TopicName.get(domain(), namespaceName, topic);
}

protected void validatePartitionedTopicName(String tenant, String namespace, String encodedTopic) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -463,11 +463,21 @@ protected void internalCreateNonPartitionedTopic(boolean authoritative) {
}

try {
Optional<Topic> existedTopic = pulsar().getBrokerService().getTopicIfExists(topicName.toString()).get();
if (existedTopic.isPresent()) {
log.error("[{}] Topic {} already exists", clientAppId(), topicName);
throw new RestException(Status.CONFLICT, "This topic already exists");
}

Topic createdTopic = getOrCreateTopic(topicName);
log.info("[{}] Successfully created non-partitioned topic {}", clientAppId(), createdTopic);
} catch (Exception e) {
log.error("[{}] Failed to create non-partitioned topic {}", clientAppId(), topicName, e);
throw new RestException(e);
if (e instanceof RestException) {
throw (RestException) e;
} else {
log.error("[{}] Failed to create non-partitioned topic {}", clientAppId(), topicName, e);
throw new RestException(e);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2203,7 +2203,20 @@ public void testPersistentTopicCreation() throws Exception {
final String partitionedtopic = "persistent://prop-xyz/ns1/partitioned-topic";

admin.topics().createNonPartitionedTopic(nonPartitionedtopic);
try {
admin.topics().createNonPartitionedTopic(nonPartitionedtopic);
fail("should not be able to create an existed non-partitioned topic");
} catch (PulsarAdminException e) {
assertTrue(e instanceof ConflictException);
}

admin.topics().createPartitionedTopic(partitionedtopic, 2);
try {
admin.topics().createPartitionedTopic(partitionedtopic, 1);
fail("should not be able to create an existed partitioned topic");
} catch (PulsarAdminException e) {
assertTrue(e instanceof ConflictException);
}

try {
admin.topics().createPartitionedTopic(nonPartitionedtopic, 2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriInfo;

import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.admin.v2.NonPersistentTopics;
import org.apache.pulsar.broker.admin.v2.PersistentTopics;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
Expand Down Expand Up @@ -78,6 +80,7 @@

@PrepareForTest(PersistentTopics.class)
@PowerMockIgnore("com.sun.management.*")
@Slf4j
public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {

private PersistentTopics persistentTopics;
Expand Down Expand Up @@ -426,6 +429,23 @@ public void testGrantNonPartitionedTopic() {
Assert.assertEquals(permissions.get(role), expectActions);
}

@Test
public void testCreateExistedPartition() {
final AsyncResponse response = mock(AsyncResponse.class);
final String topicName = "test-create-existed-partition";
persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, topicName, 3);

final String partitionName = TopicName.get(topicName).getPartition(0).getLocalName();
try {
persistentTopics.createNonPartitionedTopic(testTenant, testNamespace, partitionName, false);
Assert.fail();
} catch (RestException e) {
log.error("Failed to create {}: {}", partitionName, e.getMessage());
Assert.assertEquals(e.getResponse().getStatus(), 409);
Assert.assertEquals(e.getMessage(), "This topic already exists");
}
}

@Test
public void testGrantPartitionedTopic() {
final String partitionedTopicName = "partitioned-topic";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.naming.TopicName;
Expand All @@ -32,6 +33,8 @@

public class BrokerServiceAutoSubscriptionCreationTest extends BrokerTestBase {

private AtomicInteger testId = new AtomicInteger(0);

@BeforeClass
@Override
protected void setup() throws Exception {
Expand All @@ -53,7 +56,7 @@ protected void cleanupTest() throws Exception {
public void testAutoSubscriptionCreationDisable() throws Exception {
pulsar.getConfiguration().setAllowAutoSubscriptionCreation(false);

final String topicName = "persistent://prop/ns-abc/test-subtopic";
final String topicName = "persistent://prop/ns-abc/test-subtopic-" + testId.getAndIncrement();
final String subscriptionName = "test-subtopic-sub";

admin.topics().createNonPartitionedTopic(topicName);
Expand All @@ -71,7 +74,7 @@ public void testAutoSubscriptionCreationDisable() throws Exception {
public void testSubscriptionCreationWithAutoCreationDisable() throws Exception {
pulsar.getConfiguration().setAllowAutoSubscriptionCreation(false);

final String topicName = "persistent://prop/ns-abc/test-subtopic";
final String topicName = "persistent://prop/ns-abc/test-subtopic-" + testId.getAndIncrement();
final String subscriptionName = "test-subtopic-sub-1";

admin.topics().createNonPartitionedTopic(topicName);
Expand All @@ -87,7 +90,7 @@ public void testSubscriptionCreationWithAutoCreationDisable() throws Exception {

@Test
public void testAutoSubscriptionCreationNamespaceAllowOverridesBroker() throws Exception {
final String topic = "persistent://prop/ns-abc/test-subtopic";
final String topic = "persistent://prop/ns-abc/test-subtopic-" + testId.getAndIncrement();
final String subscriptionName = "test-subtopic-sub-2";
final TopicName topicName = TopicName.get(topic);

Expand All @@ -104,7 +107,7 @@ public void testAutoSubscriptionCreationNamespaceAllowOverridesBroker() throws E

@Test
public void testAutoSubscriptionCreationNamespaceDisallowOverridesBroker() throws Exception {
final String topic = "persistent://prop/ns-abc/test-subtopic";
final String topic = "persistent://prop/ns-abc/test-subtopic-" + testId.getAndIncrement();
final String subscriptionName = "test-subtopic-sub-3";
final TopicName topicName = TopicName.get(topic);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,15 @@ public void testCreateConsumerForPartitionedTopicUpdateWhenDisableTopicAutoCreat
Assert.assertEquals(consumer.getConsumers().size(), 5);
}

@Test(timeOut = 60000)
public void testCreateMissedPartitions() throws JsonProcessingException, KeeperException, InterruptedException, PulsarAdminException, PulsarClientException {
@DataProvider(name = "restCreateMissedPartitions")
public Object[] restCreateMissedPartitions() {
return new Object[] { true, false };
}

@Test(timeOut = 60000, dataProvider = "restCreateMissedPartitions")
public void testCreateMissedPartitions(boolean useRestApi) throws JsonProcessingException, KeeperException, InterruptedException, PulsarAdminException, PulsarClientException {
conf.setAllowAutoTopicCreation(false);
final String topic = "testCreateMissedPartitions";
final String topic = "testCreateMissedPartitions-useRestApi-" + useRestApi;
String path = ZkAdminPaths.partitionedTopicPath(TopicName.get(topic));
int numPartitions = 3;
byte[] data = jsonMapper().writeValueAsBytes(new PartitionedTopicMetadata(numPartitions));
Expand All @@ -134,7 +139,14 @@ public void testCreateMissedPartitions() throws JsonProcessingException, KeeperE
//ok here, consumer will create failed with 'Topic does not exist'
}
Assert.assertNull(consumer);
admin.topics().createMissedPartitions(topic);
if (useRestApi) {
admin.topics().createMissedPartitions(topic);
} else {
final TopicName topicName = TopicName.get(topic);
for (int i = 0; i < numPartitions; i++) {
admin.topics().createNonPartitionedTopic(topicName.getPartition(i).toString());
}
}
consumer = pulsarClient.newConsumer().topic(topic).subscriptionName("sub-1").subscribe();
Assert.assertNotNull(consumer);
Assert.assertTrue(consumer instanceof MultiTopicsConsumerImpl);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,18 @@ public void initInBroker(ServiceConfiguration brokerConfig,
LOG.info("Function worker service setup completed");
}

private void tryCreateNonPartitionedTopic(final String topic) throws PulsarAdminException {
try {
getBrokerAdmin().topics().createNonPartitionedTopic(topic);
} catch (PulsarAdminException e) {
if (e instanceof PulsarAdminException.ConflictException) {
log.warn("Failed to create topic '{}': {}", topic, e.getMessage());
} else {
throw e;
}
}
}

@Override
public void start(AuthenticationService authenticationService,
AuthorizationService authorizationService,
Expand Down Expand Up @@ -435,9 +447,9 @@ public void start(AuthenticationService authenticationService,
this.functionAdmin = clientCreator.newPulsarAdmin(functionWebServiceUrl, workerConfig);
this.client = clientCreator.newPulsarClient(workerConfig.getPulsarServiceUrl(), workerConfig);

getBrokerAdmin().topics().createNonPartitionedTopic(workerConfig.getFunctionAssignmentTopic());
getBrokerAdmin().topics().createNonPartitionedTopic(workerConfig.getClusterCoordinationTopic());
getBrokerAdmin().topics().createNonPartitionedTopic(workerConfig.getFunctionMetadataTopic());
tryCreateNonPartitionedTopic(workerConfig.getFunctionAssignmentTopic());
tryCreateNonPartitionedTopic(workerConfig.getClusterCoordinationTopic());
tryCreateNonPartitionedTopic(workerConfig.getFunctionMetadataTopic());
//create scheduler manager
this.schedulerManager = new SchedulerManager(workerConfig, client, getBrokerAdmin(), workerStatsManager, errorNotifier);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import static org.testng.Assert.fail;
import com.google.gson.Gson;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collections;
import java.util.HashSet;
Expand All @@ -35,6 +34,8 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import net.jodah.failsafe.Failsafe;
Expand Down Expand Up @@ -110,6 +111,7 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
.withMaxDuration(ONE_MINUTE)
.withDelay(TEN_SECONDS)
.onRetry(e -> log.error("Retry ... "));
private final AtomicInteger testId = new AtomicInteger(0);

PulsarFunctionsTest(FunctionRuntimeType functionRuntimeType) {
super(functionRuntimeType);
Expand Down Expand Up @@ -2344,7 +2346,7 @@ private void testDebeziumMySqlConnect(String converterClassName, boolean jsonWit

final String tenant = TopicName.PUBLIC_TENANT;
final String namespace = TopicName.DEFAULT_NAMESPACE;
final String outputTopicName = "debe-output-topic-name";
final String outputTopicName = "debe-output-topic-name-" + testId.getAndIncrement();
boolean isJsonConverter = converterClassName.endsWith("JsonConverter");
final String consumeTopicName = "debezium/mysql-"
+ (isJsonConverter ? "json" : "avro")
Expand Down Expand Up @@ -2440,11 +2442,11 @@ private void testDebeziumMySqlConnect(String converterClassName, boolean jsonWit
getSourceInfoNotFound(tenant, namespace, sourceName);
}

private void testDebeziumPostgreSqlConnect(String converterClassName, boolean jsonWithEnvelope) throws Exception {
private void testDebeziumPostgreSqlConnect(String converterClassName, boolean jsonWithEnvelope) throws Exception {

final String tenant = TopicName.PUBLIC_TENANT;
final String namespace = TopicName.DEFAULT_NAMESPACE;
final String outputTopicName = "debe-output-topic-name";
final String outputTopicName = "debe-output-topic-name-" + testId.getAndIncrement();
final String consumeTopicName = "debezium/postgresql/dbserver1.inventory.products";
final String sourceName = "test-source-debezium-postgersql-" + functionRuntimeType + "-" + randomName(8);

Expand Down

0 comments on commit 16a2240

Please sign in to comment.