Skip to content

Commit

Permalink
[issue apache#3895] Bugfix for non partitioned topic create (apache#3910
Browse files Browse the repository at this point in the history
)

**Motivation**

Provide a different approach from apache#3902 trying to fix apache#3895.

**Modifications**

  - Add unit test in order to exploit issue.
  - Do proper validations for non topic create.
  - Add missing validateTopicName() method on create non partitioned topic.
  • Loading branch information
lovelle authored and sijie committed Mar 27, 2019
1 parent ee98e8b commit ee7b6a4
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,9 @@
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

import javax.ws.rs.WebApplicationException;
import javax.ws.rs.container.AsyncResponse;
Expand All @@ -68,7 +66,6 @@
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.admin.ZkAdminPaths;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerServiceException.AlreadyRunningException;
import org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException;
import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException;
Expand Down Expand Up @@ -399,11 +396,16 @@ protected void internalCreatePartitionedTopic(int numPartitions, boolean authori
}

protected void internalCreateNonPartitionedTopic(boolean authoritative) {
validateAdminAccessForTenant(topicName.getTenant());
validateAdminAccessForTenant(topicName.getTenant());

if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
}

validateTopicOwnership(topicName, authoritative);
try {
getOrCreateTopic(topicName);
log.info("[{}] Successfully created non-partitioned topic {}", clientAppId(), topicName);
Topic createdTopic = getOrCreateTopic(topicName);
log.info("[{}] Successfully created non-partitioned topic {}", clientAppId(), createdTopic);
} catch (Exception e) {
log.error("[{}] Failed to create non-partitioned topic {}", clientAppId(), topicName, e);
throw new RestException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ public void createNonPartitionedTopic(@PathParam("tenant") String tenant, @PathP
@PathParam("topic") @Encoded String encodedTopic,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateGlobalNamespaceOwnership(tenant,namespace);
validateTopicName(tenant, namespace, encodedTopic);
internalCreateNonPartitionedTopic(authoritative);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.testng.Assert;
Expand Down Expand Up @@ -130,4 +131,13 @@ public void testNonPartitionedTopics() {
persistentTopics.createNonPartitionedTopic(testTenant, testNamespace, nonPartitionTopic2, true);
Assert.assertEquals(persistentTopics.getPartitionedMetadata(testTenant, testNamespace, nonPartitionTopic, true).partitions, 0);
}

@Test
public void testCreateNonPartitionedTopic() {
final String topicName = "standard-topic";
persistentTopics.createNonPartitionedTopic(testTenant, testNamespace, topicName, true);
PartitionedTopicMetadata pMetadata = persistentTopics.getPartitionedMetadata(
testTenant, testNamespace, topicName, true);
Assert.assertEquals(pMetadata.partitions, 0);
}
}

0 comments on commit ee7b6a4

Please sign in to comment.