Skip to content

Commit

Permalink
[pulsar-broker] Fix: unblock stuck thread on update-partition api (ap…
Browse files Browse the repository at this point in the history
  • Loading branch information
rdhabalia authored Apr 30, 2021
1 parent d757db1 commit 464ad59
Showing 1 changed file with 5 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.broker.admin.impl;

import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import static org.apache.pulsar.broker.resources.PulsarResources.DEFAULT_OPERATION_TIMEOUT_SEC;
import static org.apache.pulsar.common.util.Codec.decode;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.github.zafarkhaja.semver.Version;
Expand Down Expand Up @@ -483,7 +484,7 @@ protected void internalUpdatePartitionedTopic(int numPartitions,
throw new RestException(Status.FORBIDDEN, "Local cluster is not part of replicate cluster list");
}
try {
createSubscriptions(topicName, numPartitions).get();
createSubscriptions(topicName, numPartitions).get(DEFAULT_OPERATION_TIMEOUT_SEC, TimeUnit.SECONDS);
} catch (Exception e) {
if (e.getCause() instanceof RestException) {
throw (RestException) e.getCause();
Expand Down Expand Up @@ -512,7 +513,7 @@ protected void internalUpdatePartitionedTopic(int numPartitions,
return null;
});
try {
updatePartition.get();
updatePartition.get(DEFAULT_OPERATION_TIMEOUT_SEC, TimeUnit.SECONDS);
} catch (Exception e) {
log.error("{} Failed to update number of partitions in zk for topic {} and partitions {}",
clientAppId(), topicName, numPartitions, e);
Expand All @@ -529,8 +530,8 @@ protected void internalUpdatePartitionedTopic(int numPartitions,
throw new RestException(Status.NOT_ACCEPTABLE, "Number of partitions should be more than 0");
}
try {
tryCreatePartitionsAsync(numPartitions).get();
updatePartitionedTopic(topicName, numPartitions).get();
tryCreatePartitionsAsync(numPartitions).get(DEFAULT_OPERATION_TIMEOUT_SEC, TimeUnit.SECONDS);
updatePartitionedTopic(topicName, numPartitions).get(DEFAULT_OPERATION_TIMEOUT_SEC, TimeUnit.SECONDS);
} catch (Exception e) {
if (e.getCause() instanceof RestException) {
throw (RestException) e.getCause();
Expand Down

0 comments on commit 464ad59

Please sign in to comment.