Skip to content

Commit

Permalink
Add more config for auto-topic-creation (apache#4963)
Browse files Browse the repository at this point in the history
Master Issue:  apache#4926

### Motivation


Curently the partitioned-topic and non-partitioned topic is a little confuse for users. in PR apache#3450 we add config for auto-topic-creation.
We could leverage this config to provide some more config for auto-topic-creation.

### Modifications

- Add `allowAutoTopicCreationType` and `allowAutoTopicCreationNumPartitions` to configuration.
- Users can use both configurations when they decide to create a topic automatically.
- Add test.
- Update doc.
  • Loading branch information
fxbing authored and sijie committed Sep 17, 2019
1 parent 6df8d40 commit 547c421
Show file tree
Hide file tree
Showing 36 changed files with 442 additions and 46 deletions.
6 changes: 6 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,12 @@ ttlDurationDefaultInSeconds=0
# Enable topic auto creation if new producer or consumer connected (disable auto creation with value false)
allowAutoTopicCreation=true

# The type of topic that is allowed to be automatically created.(partitioned/non-partitioned)
allowAutoTopicCreationType=partitioned

# The number of partitioned topics that is allowed to be automatically created if allowAutoTopicCreationType is partitioned.
defaultNumPartitions=1

# Enable the deletion of inactive topics
brokerDeleteInactiveTopicsEnabled=true

Expand Down
9 changes: 9 additions & 0 deletions conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -592,3 +592,12 @@ allowLoopback=true
# will heart performance. It is better to give a higher number of gc
# interval if there is enough disk capacity.
gcWaitTime=300000

# Enable topic auto creation if new producer or consumer connected (disable auto creation with value false)
allowAutoTopicCreation=true

# The type of topic that is allowed to be automatically created.(partitioned/non-partitioned)
allowAutoTopicCreationType=non-partitioned

# The number of partitioned topics that is allowed to be automatically created if allowAutoTopicCreationType is partitioned.
defaultNumPartitions=1
Original file line number Diff line number Diff line change
Expand Up @@ -895,9 +895,20 @@ public class ServiceConfiguration implements PulsarConfiguration {
private double managedLedgerDefaultMarkDeleteRateLimit = 1.0;
@FieldContext(
category = CATEGORY_STORAGE_ML,
doc = "Allow automated creation of non-partition topics if set to true (default value)."
doc = "Allow automated creation of topics if set to true (default value)."
)
private boolean allowAutoTopicCreation = true;
@FieldContext(
category = CATEGORY_STORAGE_ML,
doc = "The type of topic that is allowed to be automatically created.(partitioned/non-partitioned)"
)
private String allowAutoTopicCreationType = "partitioned";
@FieldContext(
category = CATEGORY_STORAGE_ML,
doc = "The number of partitioned topics that is allowed to be automatically created"
+ "if allowAutoTopicCreationType is partitioned."
)
private int defaultNumPartitions = 1;
@FieldContext(
category = CATEGORY_STORAGE_ML,
doc = "Number of threads to be used for managed ledger tasks dispatching"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@
*/
package org.apache.pulsar.broker.admin;

import com.fasterxml.jackson.core.JsonProcessingException;
import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import org.apache.pulsar.common.api.proto.PulsarApi;
import static org.apache.pulsar.common.util.Codec.decode;

import java.net.MalformedURLException;
Expand Down Expand Up @@ -81,6 +83,7 @@
public abstract class AdminResource extends PulsarWebResource {
private static final Logger log = LoggerFactory.getLogger(AdminResource.class);
private static final String POLICIES_READONLY_FLAG_PATH = "/admin/flags/policies-readonly";
private static final int PARTITIONED_TOPIC_WAIT_SYNC_TIME_MS = 1000;
public static final String PARTITIONED_TOPIC_PATH_ZNODE = "partitioned-topics";

protected ZooKeeper globalZk() {
Expand Down Expand Up @@ -500,7 +503,7 @@ protected ZooKeeperChildrenCache failureDomainListCache() {
}

protected PartitionedTopicMetadata getPartitionedTopicMetadata(TopicName topicName,
boolean authoritative) {
boolean authoritative, boolean checkAllowAutoCreation) {
validateClusterOwnership(topicName.getCluster());
// validates global-namespace contains local/peer cluster: if peer/local cluster present then lookup can
// serve/redirect request else fail partitioned-metadata-request so, client fails while creating
Expand All @@ -519,7 +522,12 @@ protected PartitionedTopicMetadata getPartitionedTopicMetadata(TopicName topicNa
}

String path = path(PARTITIONED_TOPIC_PATH_ZNODE, namespaceName.toString(), domain(), topicName.getEncodedLocalName());
PartitionedTopicMetadata partitionMetadata = fetchPartitionedTopicMetadata(pulsar(), path);
PartitionedTopicMetadata partitionMetadata;
if (checkAllowAutoCreation) {
partitionMetadata = fetchPartitionedTopicMetadataCheckAllowAutoCreation(pulsar(), path, topicName);
} else {
partitionMetadata = fetchPartitionedTopicMetadata(pulsar(), path);
}

if (log.isDebugEnabled()) {
log.debug("[{}] Total number of partitions for topic {} is {}", clientAppId(), topicName,
Expand Down Expand Up @@ -566,6 +574,96 @@ public PartitionedTopicMetadata deserialize(String key, byte[] content) throws E
return metadataFuture;
}

protected static PartitionedTopicMetadata fetchPartitionedTopicMetadataCheckAllowAutoCreation(
PulsarService pulsar, String path, TopicName topicName) {
try {
return fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(pulsar, path, topicName)
.get();
} catch (Exception e) {
if (e.getCause() instanceof RestException) {
throw (RestException) e;
}
throw new RestException(e);
}
}

protected static CompletableFuture<PartitionedTopicMetadata> fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(
PulsarService pulsar, String path, TopicName topicName) {
CompletableFuture<PartitionedTopicMetadata> metadataFuture = new CompletableFuture<>();
try {
boolean allowAutoTopicCreation = pulsar.getConfiguration().isAllowAutoTopicCreation();
String topicType = pulsar.getConfiguration().getAllowAutoTopicCreationType();
boolean topicExist;
try {
topicExist = pulsar.getNamespaceService()
.getListOfTopics(topicName.getNamespaceObject(), PulsarApi.CommandGetTopicsOfNamespace.Mode.ALL)
.contains(topicName.toString());
} catch (Exception e) {
log.warn("Unexpected error while getting list of topics. topic={}. Error: {}",
topicName, e.getMessage(), e);
throw new RestException(e);
}
fetchPartitionedTopicMetadataAsync(pulsar, path).whenCompleteAsync((metadata, ex) -> {
if (ex != null) {
metadataFuture.completeExceptionally(ex);
// If topic is already exist, creating partitioned topic is not allowed.
} else if (metadata.partitions == 0 && !topicExist && allowAutoTopicCreation &&
TopicType.PARTITIONED.toString().equals(topicType)) {
createDefaultPartitionedTopicAsync(pulsar, path).whenComplete((defaultMetadata, e) -> {
if (e == null) {
metadataFuture.complete(defaultMetadata);
} else if (e instanceof KeeperException) {
try {
Thread.sleep(PARTITIONED_TOPIC_WAIT_SYNC_TIME_MS);
if (!pulsar.getGlobalZkCache().exists(path)){
metadataFuture.completeExceptionally(e);
return;
}
} catch (InterruptedException | KeeperException exc) {
metadataFuture.completeExceptionally(exc);
return;
}
fetchPartitionedTopicMetadataAsync(pulsar, path).whenComplete((metadata2, ex2) -> {
if (ex2 != null) {
metadataFuture.completeExceptionally(ex2);
} else {
metadataFuture.complete(metadata2);
}
});
} else {
metadataFuture.completeExceptionally(e);
}
});
} else {
metadataFuture.complete(metadata);
}
});
} catch (Exception e) {
metadataFuture.completeExceptionally(e);
}
return metadataFuture;
}

protected static CompletableFuture<PartitionedTopicMetadata> createDefaultPartitionedTopicAsync(
PulsarService pulsar, String path) {
int defaultNumPartitions = pulsar.getConfiguration().getDefaultNumPartitions();
checkArgument(defaultNumPartitions > 0, "Default number of partitions should be more than 0");
PartitionedTopicMetadata configMetadata = new PartitionedTopicMetadata(defaultNumPartitions);
CompletableFuture<PartitionedTopicMetadata> partitionedTopicFuture = new CompletableFuture<>();
try {
byte[] content = jsonMapper().writeValueAsBytes(configMetadata);
ZkUtils.createFullPathOptimistic(pulsar.getGlobalZkCache().getZooKeeper(), path, content,
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
// we wait for the data to be synced in all quorums and the observers
Thread.sleep(PARTITIONED_TOPIC_WAIT_SYNC_TIME_MS);
partitionedTopicFuture.complete(configMetadata);
} catch (JsonProcessingException | KeeperException | InterruptedException e) {
log.error("Failed to create default partitioned topic.", e);
partitionedTopicFuture.completeExceptionally(e);
}
return partitionedTopicFuture;
}

protected void validateClusterExists(String cluster) {
try {
if (!clustersCache().get(path("clusters", cluster)).isPresent()) {
Expand Down Expand Up @@ -627,4 +725,18 @@ protected List<String> getPartitionedTopicList(TopicDomain topicDomain) {
partitionedTopics.sort(null);
return partitionedTopics;
}

enum TopicType {
PARTITIONED("partitioned"),
NON_PARTITIONED("non-partitioned");
private String type;

TopicType(String type) {
this.type = type;
}

public String toString() {
return type;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,13 @@ public void healthcheck(@Suspended AsyncResponse asyncResponse) throws Exception
PulsarClient client = pulsar().getClient();

String messageStr = UUID.randomUUID().toString();
// create non-partitioned topic manually
try {
pulsar().getBrokerService().getTopic(topic, true).get();
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
return;
}
CompletableFuture<Producer<String>> producerFuture =
client.newProducer(Schema.STRING).topic(topic).createAsync();
CompletableFuture<Reader<String>> readerFuture = client.newReader(Schema.STRING)
Expand Down
Loading

0 comments on commit 547c421

Please sign in to comment.