Skip to content

Commit

Permalink
[pulsar-broker] Fix: handle failed partitions topic creation (apache#…
Browse files Browse the repository at this point in the history
…10374)

* [pulsar-broker] Fix: handle failed partitions topic creation

* fix test
  • Loading branch information
rdhabalia authored Oct 6, 2021
1 parent 8b55636 commit bfa2b29
Show file tree
Hide file tree
Showing 10 changed files with 151 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -389,12 +389,12 @@ protected void internalCreateNonPartitionedTopic(boolean authoritative) {
* @param numPartitions
*/
protected void internalUpdatePartitionedTopic(int numPartitions,
boolean updateLocalTopicOnly, boolean authoritative) {
boolean updateLocalTopicOnly, boolean authoritative,
boolean force) {
validateTopicOwnership(topicName, authoritative);
validateTopicPolicyOperation(topicName, PolicyName.PARTITION, PolicyOperation.WRITE);

// Only do the validation if it's the first hop.
if (!updateLocalTopicOnly) {
if (!updateLocalTopicOnly && !force) {
validatePartitionTopicUpdate(topicName.getLocalName(), numPartitions);
}
final int maxPartitions = pulsar().getConfig().getMaxNumPartitionsPerPartitionedTopic();
Expand Down Expand Up @@ -459,7 +459,8 @@ protected void internalUpdatePartitionedTopic(int numPartitions,
}
try {
tryCreatePartitionsAsync(numPartitions).get(DEFAULT_OPERATION_TIMEOUT_SEC, TimeUnit.SECONDS);
updatePartitionedTopic(topicName, numPartitions).get(DEFAULT_OPERATION_TIMEOUT_SEC, TimeUnit.SECONDS);
updatePartitionedTopic(topicName, numPartitions, force).get(DEFAULT_OPERATION_TIMEOUT_SEC,
TimeUnit.SECONDS);
} catch (Exception e) {
if (e.getCause() instanceof RestException) {
throw (RestException) e.getCause();
Expand Down Expand Up @@ -507,7 +508,7 @@ private CompletableFuture<Void> updatePartitionInOtherCluster(int numPartitions,
}
results.add(pulsar().getBrokerService().getClusterPulsarAdmin(cluster).topics()
.updatePartitionedTopicAsync(topicName.toString(),
numPartitions, true));
numPartitions, true, false));
});
return FutureUtil.waitForAll(results);
}
Expand Down Expand Up @@ -3651,34 +3652,39 @@ private PersistentReplicator getReplicatorReference(String replName, PersistentT
}
}

private CompletableFuture<Void> updatePartitionedTopic(TopicName topicName, int numPartitions) {
return createSubscriptions(topicName, numPartitions)
.thenCompose(__ -> {
CompletableFuture<Void> future = namespaceResources().getPartitionedTopicResources()
.updatePartitionedTopicAsync(topicName,
p -> new PartitionedTopicMetadata(numPartitions));
future.exceptionally(ex -> {
// If the update operation fails, clean up the partitions that were created
getPartitionedTopicMetadataAsync(topicName, false, false)
.thenAccept(metadata -> {
int oldPartition = metadata.partitions;
for (int i = oldPartition; i < numPartitions; i++) {
topicResources().deletePersistentTopicAsync(topicName.getPartition(i))
.exceptionally(ex1 -> {
log.warn("[{}] Failed to clean up managedLedger {}",
clientAppId(),
topicName, ex1.getCause());
return null;
});
}
}).exceptionally(e -> {
log.warn("[{}] Failed to clean up managedLedger", topicName, e);
return null;
});
return null;
});
return future;

private CompletableFuture<Void> updatePartitionedTopic(TopicName topicName, int numPartitions, boolean force) {
CompletableFuture<Void> result = new CompletableFuture<>();
createSubscriptions(topicName, numPartitions).thenCompose(__ -> {
CompletableFuture<Void> future = namespaceResources().getPartitionedTopicResources()
.updatePartitionedTopicAsync(topicName, p -> new PartitionedTopicMetadata(numPartitions));
future.exceptionally(ex -> {
// If the update operation fails, clean up the partitions that were created
getPartitionedTopicMetadataAsync(topicName, false, false).thenAccept(metadata -> {
int oldPartition = metadata.partitions;
for (int i = oldPartition; i < numPartitions; i++) {
topicResources().deletePersistentTopicAsync(topicName.getPartition(i)).exceptionally(ex1 -> {
log.warn("[{}] Failed to clean up managedLedger {}", clientAppId(), topicName,
ex1.getCause());
return null;
});
}
}).exceptionally(e -> {
log.warn("[{}] Failed to clean up managedLedger", topicName, e);
return null;
});
return null;
});
return future;
}).thenAccept(__ -> result.complete(null)).exceptionally(ex -> {
if (force && ex.getCause() instanceof PulsarAdminException.ConflictException) {
result.complete(null);
return null;
}
result.completeExceptionally(ex);
return null;
});
return result;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,9 +231,10 @@ public void updatePartitionedTopic(@PathParam("property") String property, @Path
@QueryParam("updateLocalTopicOnly") @DefaultValue("false") boolean updateLocalTopicOnly,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@QueryParam("force") @DefaultValue("false") boolean force,
int numPartitions) {
validateTopicName(property, cluster, namespace, encodedTopic);
internalUpdatePartitionedTopic(numPartitions, updateLocalTopicOnly, authoritative);
internalUpdatePartitionedTopic(numPartitions, updateLocalTopicOnly, authoritative, force);
}

@GET
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -725,12 +725,13 @@ public void updatePartitionedTopic(
@QueryParam("updateLocalTopicOnly") @DefaultValue("false") boolean updateLocalTopicOnly,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@QueryParam("force") @DefaultValue("false") boolean force,
@ApiParam(value = "The number of partitions for the topic",
required = true, type = "int", defaultValue = "0")
int numPartitions) {
validatePartitionedTopicName(tenant, namespace, encodedTopic);
validatePartitionedTopicMetadata(tenant, namespace, encodedTopic);
internalUpdatePartitionedTopic(numPartitions, updateLocalTopicOnly, authoritative);
internalUpdatePartitionedTopic(numPartitions, updateLocalTopicOnly, authoritative, force);
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2144,15 +2144,13 @@ public void testGetListInBundle() throws Exception {
public void testGetTopicsWithDifferentMode() throws Exception {
final String namespace = "prop-xyz/ns1";

final String persistentTopicName = TopicName.get(
"persistent",
NamespaceName.get(namespace),
"get_topics_mode_" + UUID.randomUUID().toString()).toString();
final String persistentTopicName = TopicName
.get("persistent", NamespaceName.get(namespace), "get_topics_mode_" + UUID.randomUUID().toString())
.toString();

final String nonPersistentTopicName = TopicName.get(
"non-persistent",
NamespaceName.get(namespace),
"get_topics_mode_" + UUID.randomUUID().toString()).toString();
final String nonPersistentTopicName = TopicName
.get("non-persistent", NamespaceName.get(namespace), "get_topics_mode_" + UUID.randomUUID().toString())
.toString();

Producer<byte[]> producer1 = pulsarClient.newProducer().topic(persistentTopicName).create();
Producer<byte[]> producer2 = pulsarClient.newProducer().topic(nonPersistentTopicName).create();
Expand Down Expand Up @@ -2185,6 +2183,7 @@ public void testGetTopicsWithDifferentMode() throws Exception {
producer2.close();
}


@Test(dataProvider = "isV1")
public void testNonPartitionedTopic(boolean isV1) throws Exception {
String tenant = "prop-xyz";
Expand All @@ -2195,4 +2194,41 @@ public void testNonPartitionedTopic(boolean isV1) throws Exception {
admin.topics().createNonPartitionedTopic(topic);
assertTrue(admin.topics().getList(namespace).contains(topic));
}

/**
* Validate retring failed partitioned topic should succeed.
* @throws Exception
*/
@Test
public void testFailedUpdatePartitionedTopic() throws Exception {
final String topicName = "failed-topic";
final String subName1 = topicName + "-my-sub-1";
final int startPartitions = 4;
final int newPartitions = 8;
final String partitionedTopicName = "persistent://prop-xyz/ns1/" + topicName;

URL pulsarUrl = new URL(pulsar.getWebServiceAddress());

admin.topics().createPartitionedTopic(partitionedTopicName, startPartitions);
PulsarClient client = PulsarClient.builder().serviceUrl(pulsarUrl.toString()).build();
Consumer<byte[]> consumer1 = client.newConsumer().topic(partitionedTopicName).subscriptionName(subName1)
.subscriptionType(SubscriptionType.Shared).subscribe();
consumer1.close();

// validate partition topic is created
assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions, startPartitions);

// create a subscription for few new partition which can fail
admin.topics().createSubscription(partitionedTopicName + "-partition-" + startPartitions, subName1,
MessageId.earliest);

try {
admin.topics().updatePartitionedTopic(partitionedTopicName, newPartitions, false, false);
} catch (PulsarAdminException.PreconditionFailedException e) {
// Ok
}
admin.topics().updatePartitionedTopic(partitionedTopicName, newPartitions, false, true);
// validate subscription is created for new partition.
assertNotNull(admin.topics().getStats(partitionedTopicName + "-partition-" + 6).getSubscriptions().get(subName1));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -826,7 +826,8 @@ public void testUpdatePartitionedTopicCoontainedInOldTopic() throws Exception {
verify(response2, timeout(5000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());

persistentTopics.updatePartitionedTopic(property, cluster, namespace, partitionedTopicName2, false, false, 10);
persistentTopics.updatePartitionedTopic(property, cluster, namespace, partitionedTopicName2, false, false,
false, 10);
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,8 @@ public void testUpdatePartitionedTopicHavingNonPartitionTopicWithPartitionSuffix
persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, partitionedTopicName, 5, true);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
persistentTopics.updatePartitionedTopic(testTenant, testNamespace, partitionedTopicName, false, false, 10);
persistentTopics.updatePartitionedTopic(testTenant, testNamespace, partitionedTopicName, false, false, false,
10);
}

@Test(timeOut = 10_000)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,46 @@ List<String> getListInBundle(String namespace, String bundleRange)
* Number of new partitions of already exist partitioned-topic
* @param updateLocalTopicOnly
* Used by broker for global topic with multiple replicated clusters
* @param force
* Update forcefully without validating existing partitioned topic
* @returns a future that can be used to track when the partitioned topic is updated
*/
void updatePartitionedTopic(String topic, int numPartitions, boolean updateLocalTopicOnly, boolean force)
throws PulsarAdminException;

/**
* Update number of partitions of a non-global partitioned topic asynchronously.
* <p/>
* It requires partitioned-topic to be already exist and number of new partitions must be greater than existing
* number of partitions. Decrementing number of partitions requires deletion of topic which is not supported.
* <p/>
*
* @param topic
* Topic name
* @param numPartitions
* Number of new partitions of already exist partitioned-topic
* @param updateLocalTopicOnly
* Used by broker for global topic with multiple replicated clusters
* @param force
* Update forcefully without validating existing partitioned topic
* @return a future that can be used to track when the partitioned topic is updated
*/
CompletableFuture<Void> updatePartitionedTopicAsync(String topic, int numPartitions, boolean updateLocalTopicOnly,
boolean force);

/**
* Update number of partitions of a non-global partitioned topic.
* <p/>
* It requires partitioned-topic to be already exist and number of new partitions must be greater than existing
* number of partitions. Decrementing number of partitions requires deletion of topic which is not supported.
* <p/>
*
* @param topic
* Topic name
* @param numPartitions
* Number of new partitions of already exist partitioned-topic
* @param updateLocalTopicOnly
* Used by broker for global topic with multiple replicated clusters
* @returns a future that can be used to track when the partitioned topic is updated
*/
void updatePartitionedTopic(String topic, int numPartitions, boolean updateLocalTopicOnly)
Expand All @@ -463,7 +502,6 @@ void updatePartitionedTopic(String topic, int numPartitions, boolean updateLocal
* Number of new partitions of already exist partitioned-topic
* @param updateLocalTopicOnly
* Used by broker for global topic with multiple replicated clusters
*
* @return a future that can be used to track when the partitioned topic is updated
*/
CompletableFuture<Void> updatePartitionedTopicAsync(String topic, int numPartitions, boolean updateLocalTopicOnly);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -455,14 +455,20 @@ public void updatePartitionedTopic(String topic, int numPartitions)

@Override
public CompletableFuture<Void> updatePartitionedTopicAsync(String topic, int numPartitions) {
return updatePartitionedTopicAsync(topic, numPartitions, false);
return updatePartitionedTopicAsync(topic, numPartitions, false, false);
}

@Override
public void updatePartitionedTopic(String topic, int numPartitions, boolean updateLocalTopicOnly)
throws PulsarAdminException {
updatePartitionedTopic(topic, numPartitions, updateLocalTopicOnly, false);
}

@Override
public void updatePartitionedTopic(String topic, int numPartitions, boolean updateLocalTopicOnly, boolean force)
throws PulsarAdminException {
try {
updatePartitionedTopicAsync(topic, numPartitions, updateLocalTopicOnly)
updatePartitionedTopicAsync(topic, numPartitions, updateLocalTopicOnly, force)
.get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
Expand All @@ -477,10 +483,17 @@ public void updatePartitionedTopic(String topic, int numPartitions, boolean upda
@Override
public CompletableFuture<Void> updatePartitionedTopicAsync(String topic, int numPartitions,
boolean updateLocalTopicOnly) {
return updatePartitionedTopicAsync(topic, numPartitions, updateLocalTopicOnly, false);
}

@Override
public CompletableFuture<Void> updatePartitionedTopicAsync(String topic, int numPartitions,
boolean updateLocalTopicOnly, boolean force) {
checkArgument(numPartitions > 0, "Number of partitions must be more than 0");
TopicName tn = validateTopic(topic);
WebTarget path = topicPath(tn, "partitions");
path = path.queryParam("updateLocalTopicOnly", Boolean.toString(updateLocalTopicOnly));
path = path.queryParam("updateLocalTopicOnly", Boolean.toString(updateLocalTopicOnly)).queryParam("force",
force);
return asyncPostRequest(path, Entity.entity(numPartitions, MediaType.APPLICATION_JSON));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -988,7 +988,7 @@ public void topics() throws Exception {
verify(mockTopics).getPartitionedTopicList("myprop/clust/ns1");

cmdTopics.run(split("update-partitioned-topic persistent://myprop/clust/ns1/ds1 -p 6"));
verify(mockTopics).updatePartitionedTopic("persistent://myprop/clust/ns1/ds1", 6);
verify(mockTopics).updatePartitionedTopic("persistent://myprop/clust/ns1/ds1", 6, false, false);

cmdTopics.run(split("get-partitioned-topic-metadata persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).getPartitionedTopicMetadata("persistent://myprop/clust/ns1/ds1");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -423,10 +423,14 @@ private class UpdatePartitionedCmd extends CliCommand {
"--partitions" }, description = "Number of partitions for the topic", required = true)
private int numPartitions;

@Parameter(names = { "-f",
"--force" }, description = "Update forcefully without validating existing partitioned topic ", required = false)
private boolean force;

@Override
void run() throws Exception {
String topic = validateTopicName(params);
getTopics().updatePartitionedTopic(topic, numPartitions);
getTopics().updatePartitionedTopic(topic, numPartitions, false, force);
}
}

Expand Down

0 comments on commit bfa2b29

Please sign in to comment.