Skip to content

Commit

Permalink
[Broker] Make PersistentTopicsBase#internalUpdatePartitionedTopic asy…
Browse files Browse the repository at this point in the history
…nc (apache#14141)
  • Loading branch information
Technoboy- authored Apr 30, 2022
1 parent c55fcb7 commit 003182b
Show file tree
Hide file tree
Showing 7 changed files with 160 additions and 136 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.servlet.ServletContext;
Expand Down Expand Up @@ -251,18 +250,13 @@ protected void validatePartitionedTopicName(String tenant, String namespace, Str
}
}

protected void validatePartitionedTopicMetadata(String tenant, String namespace, String encodedTopic) {
try {
PartitionedTopicMetadata partitionedTopicMetadata =
pulsar().getBrokerService().fetchPartitionedTopicMetadataAsync(topicName).get();
if (partitionedTopicMetadata.partitions < 1) {
throw new RestException(Status.CONFLICT, "Topic is not partitioned topic");
}
} catch (InterruptedException | ExecutionException e) {
log.error("Failed to validate partitioned topic metadata {}://{}/{}/{}",
domain(), tenant, namespace, topicName, e);
throw new RestException(Status.INTERNAL_SERVER_ERROR, "Check topic partition meta failed.");
}
protected CompletableFuture<Void> validatePartitionedTopicMetadataAsync() {
return pulsar().getBrokerService().fetchPartitionedTopicMetadataAsync(topicName)
.thenAccept(metadata -> {
if (metadata.partitions < 1) {
throw new RestException(Status.CONFLICT, "Topic is not partitioned topic");
}
});
}

@Deprecated
Expand Down Expand Up @@ -545,6 +539,17 @@ protected Set<String> getNamespaceReplicatedClusters(NamespaceName namespaceName
}
}

protected CompletableFuture<Set<String>> getNamespaceReplicatedClustersAsync(NamespaceName namespaceName) {
return namespaceResources().getPoliciesAsync(namespaceName)
.thenApply(policies -> {
if (policies.isPresent()) {
return policies.get().replication_clusters;
} else {
throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
}
});
}

protected List<String> getPartitionedTopicList(TopicDomain topicDomain) {
try {
return namespaceResources().getPartitionedTopicResources()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.pulsar.broker.admin.impl;

import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.pulsar.broker.resources.PulsarResources.DEFAULT_OPERATION_TIMEOUT_SEC;
import static org.apache.pulsar.common.naming.SystemTopicNames.isTransactionCoordinatorAssign;
import static org.apache.pulsar.common.naming.SystemTopicNames.isTransactionInternalName;
import com.fasterxml.jackson.core.JsonProcessingException;
Expand Down Expand Up @@ -186,6 +185,19 @@ protected List<String> internalGetList(Optional<String> bundle) {
}
}

protected CompletableFuture<List<String>> internalGetListAsync() {
return validateNamespaceOperationAsync(namespaceName, NamespaceOperation.GET_TOPICS)
.thenCompose(__ -> namespaceResources().namespaceExistsAsync(namespaceName))
.thenAccept(exists -> {
if (!exists) {
throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
}
})
.thenCompose(__ -> topicResources().listPersistentTopicsAsync(namespaceName))
.thenApply(topics -> topics.stream().filter(topic ->
!isTransactionInternalName(TopicName.get(topic))).collect(Collectors.toList()));
}

protected List<String> internalGetPartitionedTopicList() {
validateNamespaceOperation(namespaceName, NamespaceOperation.GET_TOPICS);
// Validate that namespace exists, throws 404 if it doesn't exist
Expand Down Expand Up @@ -418,88 +430,65 @@ protected void internalCreateNonPartitionedTopic(boolean authoritative, Map<Stri
* well. Therefore, it can violate partition ordering at producers until all producers are restarted at application.
*
* @param numPartitions
* @param updateLocalTopicOnly
* @param authoritative
* @param force
*/
protected void internalUpdatePartitionedTopic(int numPartitions,
boolean updateLocalTopicOnly, boolean authoritative,
boolean force) {
protected CompletableFuture<Void> internalUpdatePartitionedTopicAsync(int numPartitions,
boolean updateLocalTopicOnly,
boolean authoritative, boolean force) {
if (numPartitions <= 0) {
throw new RestException(Status.NOT_ACCEPTABLE, "Number of partitions should be more than 0");
return FutureUtil.failedFuture(new RestException(Status.NOT_ACCEPTABLE,
"Number of partitions should be more than 0"));
}

validateTopicOwnership(topicName, authoritative);
validateTopicPolicyOperation(topicName, PolicyName.PARTITION, PolicyOperation.WRITE);
// Only do the validation if it's the first hop.
if (!updateLocalTopicOnly && !force) {
validatePartitionTopicUpdate(topicName.getLocalName(), numPartitions);
}
final int maxPartitions = pulsar().getConfig().getMaxNumPartitionsPerPartitionedTopic();
if (maxPartitions > 0 && numPartitions > maxPartitions) {
throw new RestException(Status.NOT_ACCEPTABLE,
"Number of partitions should be less than or equal to " + maxPartitions);
}

if (topicName.isGlobal() && isNamespaceReplicated(topicName.getNamespaceObject())) {
Set<String> clusters = getNamespaceReplicatedClusters(topicName.getNamespaceObject());
if (!clusters.contains(pulsar().getConfig().getClusterName())) {
log.error("[{}] local cluster is not part of replicated cluster for namespace {}", clientAppId(),
topicName);
throw new RestException(Status.FORBIDDEN, "Local cluster is not part of replicate cluster list");
}
try {
tryCreatePartitionsAsync(numPartitions).get(DEFAULT_OPERATION_TIMEOUT_SEC, TimeUnit.SECONDS);
createSubscriptions(topicName, numPartitions).get(DEFAULT_OPERATION_TIMEOUT_SEC, TimeUnit.SECONDS);
} catch (Exception e) {
if (e.getCause() instanceof RestException) {
throw (RestException) e.getCause();
return validateTopicOwnershipAsync(topicName, authoritative)
.thenCompose(__ -> validateTopicPolicyOperationAsync(topicName, PolicyName.PARTITION,
PolicyOperation.WRITE))
.thenCompose(__ -> {
if (!updateLocalTopicOnly && !force) {
return validatePartitionTopicUpdateAsync(topicName.getLocalName(), numPartitions);
} else {
return CompletableFuture.completedFuture(null);
}
log.error("[{}] Failed to update partitioned topic {}", clientAppId(), topicName, e);
throw new RestException(e);
}
// if this cluster is the first hop which needs to coordinate with other clusters then update partitions in
// other clusters and then update number of partitions.
if (!updateLocalTopicOnly) {
CompletableFuture<Void> updatePartition = new CompletableFuture<>();
updatePartitionInOtherCluster(numPartitions, clusters).thenRun(() -> {
try {
namespaceResources().getPartitionedTopicResources()
.updatePartitionedTopicAsync(topicName, p ->
new PartitionedTopicMetadata(numPartitions)
).thenAccept(r -> updatePartition.complete(null)).exceptionally(ex -> {
updatePartition.completeExceptionally(ex.getCause());
return null;
});
} catch (Exception e) {
updatePartition.completeExceptionally(e);
}
}).exceptionally(ex -> {
updatePartition.completeExceptionally(ex);
return null;
});
try {
updatePartition.get(DEFAULT_OPERATION_TIMEOUT_SEC, TimeUnit.SECONDS);
} catch (Exception e) {
log.error("{} Failed to update number of partitions in zk for topic {} and partitions {}",
clientAppId(), topicName, numPartitions, e);
if (e.getCause() instanceof RestException) {
throw (RestException) e.getCause();
}
throw new RestException(e);
})
.thenCompose(__ -> {
final int maxPartitions = pulsar().getConfig().getMaxNumPartitionsPerPartitionedTopic();
if (maxPartitions > 0 && numPartitions > maxPartitions) {
throw new RestException(Status.NOT_ACCEPTABLE,
"Number of partitions should be less than or equal to " + maxPartitions);
}
}
return;
}

try {
tryCreatePartitionsAsync(numPartitions).get(DEFAULT_OPERATION_TIMEOUT_SEC, TimeUnit.SECONDS);
updatePartitionedTopic(topicName, numPartitions, force).get(DEFAULT_OPERATION_TIMEOUT_SEC,
TimeUnit.SECONDS);
} catch (Exception e) {
if (e.getCause() instanceof RestException) {
throw (RestException) e.getCause();
}
log.error("[{}] Failed to update partitioned topic {}", clientAppId(), topicName, e);
throw new RestException(e);
}
// Only do the validation if it's the first hop.
if (topicName.isGlobal() && isNamespaceReplicated(topicName.getNamespaceObject())) {
return getNamespaceReplicatedClustersAsync(topicName.getNamespaceObject())
.thenApply(clusters -> {
if (!clusters.contains(pulsar().getConfig().getClusterName())) {
log.error("[{}] local cluster is not part of replicated cluster for namespace {}",
clientAppId(), topicName);
throw new RestException(Status.FORBIDDEN, "Local cluster is not part of replicate"
+ " cluster list");
}
return clusters;
})
.thenCompose(clusters -> tryCreatePartitionsAsync(numPartitions).thenApply(ignore ->
clusters))
.thenCompose(clusters -> createSubscriptions(topicName, numPartitions).thenApply(ignore ->
clusters))
.thenCompose(clusters -> {
if (!updateLocalTopicOnly) {
return updatePartitionInOtherCluster(numPartitions, clusters)
.thenCompose(v -> namespaceResources().getPartitionedTopicResources()
.updatePartitionedTopicAsync(topicName, p ->
new PartitionedTopicMetadata(numPartitions)
));
} else {
return CompletableFuture.completedFuture(null);
}
});
} else {
return tryCreatePartitionsAsync(numPartitions)
.thenCompose(ignore -> updatePartitionedTopic(topicName, numPartitions, force));
}
});
}

protected void internalCreateMissedPartitions(AsyncResponse asyncResponse) {
Expand Down Expand Up @@ -4187,40 +4176,44 @@ private void validateClientVersion() {
*
* @param topicName
*/
private void validatePartitionTopicUpdate(String topicName, int numberOfPartition) {
List<String> existingTopicList = internalGetList(Optional.empty());
TopicName partitionTopicName = TopicName.get(domain(), namespaceName, topicName);
PartitionedTopicMetadata metadata = getPartitionedTopicMetadata(partitionTopicName, false, false);
int oldPartition = metadata.partitions;
String prefix = partitionTopicName.getPartitionedTopicName() + TopicName.PARTITIONED_TOPIC_SUFFIX;
for (String exsitingTopicName : existingTopicList) {
if (exsitingTopicName.startsWith(prefix)) {
try {
long suffix = Long.parseLong(exsitingTopicName.substring(
exsitingTopicName.indexOf(TopicName.PARTITIONED_TOPIC_SUFFIX)
+ TopicName.PARTITIONED_TOPIC_SUFFIX.length()));
// Skip partition of partitioned topic by making sure
// the numeric suffix greater than old partition number.
if (suffix >= oldPartition && suffix <= (long) numberOfPartition) {
log.warn(
"[{}] Already have non partition topic {} which contains partition suffix"
+ " '-partition-' and end with numeric value smaller than the new number"
+ " of partition. Update of partitioned topic {} could cause conflict.",
clientAppId(),
exsitingTopicName, topicName);
throw new RestException(Status.PRECONDITION_FAILED,
"Already have non partition topic " + exsitingTopicName
+ " which contains partition suffix '-partition-' "
+ "and end with numeric value and end with numeric value smaller than the new "
+ "number of partition. Update of partitioned topic "
+ topicName + " could cause conflict.");
}
} catch (NumberFormatException e) {
// Do nothing, if value after partition suffix is not pure numeric value,
// as it can't conflict with internal created partitioned topic's name.
}
}
}
private CompletableFuture<Void> validatePartitionTopicUpdateAsync(String topicName, int numberOfPartition) {
return internalGetListAsync().thenCompose(existingTopicList -> {
TopicName partitionTopicName = TopicName.get(domain(), namespaceName, topicName);
String prefix = partitionTopicName.getPartitionedTopicName() + TopicName.PARTITIONED_TOPIC_SUFFIX;
return getPartitionedTopicMetadataAsync(partitionTopicName, false, false)
.thenAccept(metadata -> {
int oldPartition = metadata.partitions;
for (String existingTopicName : existingTopicList) {
if (existingTopicName.startsWith(prefix)) {
try {
long suffix = Long.parseLong(existingTopicName.substring(
existingTopicName.indexOf(TopicName.PARTITIONED_TOPIC_SUFFIX)
+ TopicName.PARTITIONED_TOPIC_SUFFIX.length()));
// Skip partition of partitioned topic by making sure
// the numeric suffix greater than old partition number.
if (suffix >= oldPartition && suffix <= (long) numberOfPartition) {
log.warn(
"[{}] Already have non partition topic {} which contains partition"
+ " suffix '-partition-' and end with numeric value smaller"
+ " than the new number of partition. Update of partitioned"
+ " topic {} could cause conflict.",
clientAppId(),
existingTopicName, topicName);
throw new RestException(Status.PRECONDITION_FAILED,
"Already have non partition topic " + existingTopicName
+ " which contains partition suffix '-partition-' "
+ "and end with numeric value and end with numeric value"
+ " smaller than the new number of partition. Update of"
+ " partitioned topic " + topicName + " could cause conflict.");
}
} catch (NumberFormatException e) {
// Do nothing, if value after partition suffix is not pure numeric value,
// as it can't conflict with internal created partitioned topic's name.
}
}
}
});
});
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,15 +256,25 @@ public void createNonPartitionedTopic(
@ApiResponse(code = 406, message = "The number of partitions should be more than 0"
+ " and less than or equal to maxNumPartitionsPerPartitionedTopic"),
@ApiResponse(code = 409, message = "Partitioned topic does not exist")})
public void updatePartitionedTopic(@PathParam("property") String property, @PathParam("cluster") String cluster,
public void updatePartitionedTopic(
@Suspended final AsyncResponse asyncResponse,
@PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,
@QueryParam("updateLocalTopicOnly") @DefaultValue("false") boolean updateLocalTopicOnly,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@QueryParam("force") @DefaultValue("false") boolean force,
int numPartitions) {
validateTopicName(property, cluster, namespace, encodedTopic);
internalUpdatePartitionedTopic(numPartitions, updateLocalTopicOnly, authoritative, force);
internalUpdatePartitionedTopicAsync(numPartitions, updateLocalTopicOnly, authoritative, force)
.thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
if (!isRedirectException(ex)) {
log.error("[{}] Failed to update partitioned topic {}", clientAppId(), topicName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}

@GET
Expand Down
Loading

0 comments on commit 003182b

Please sign in to comment.