Skip to content

Commit

Permalink
Add backoff for setting for getting topic policies. (apache#11487)
Browse files Browse the repository at this point in the history
### Motivation

Currently, if we start a new broker which does not owned any namepsaces bundles.
Then when use the pulsar-admin to setting or getting topic policies, we will get
`topic policies have not been initialized yet` error log and the admin operation will
get failed.

The root cause is we are failed immediately without any retry while the topic polices
cache have not init yet. So the PR to introduce the backoff for setting or getting
the topic policy if encounter the topic policies cache not init exception

### Verifying this change

Remove the cache init check for the tests.
  • Loading branch information
codelipenghui authored Jul 30, 2021
1 parent c798eaf commit bebaadf
Show file tree
Hide file tree
Showing 19 changed files with 438 additions and 570 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import javax.servlet.ServletContext;
import javax.ws.rs.WebApplicationException;
Expand All @@ -43,6 +46,8 @@
import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.admin.internal.TopicsImpl;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.client.impl.BackoffBuilder;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
import org.apache.pulsar.common.naming.Constants;
import org.apache.pulsar.common.naming.NamespaceBundle;
Expand Down Expand Up @@ -74,6 +79,7 @@ public abstract class AdminResource extends PulsarWebResource {
private static final Logger log = LoggerFactory.getLogger(AdminResource.class);
public static final String POLICIES_READONLY_FLAG_PATH = "/admin/flags/policies-readonly";
public static final String PARTITIONED_TOPIC_PATH_ZNODE = "partitioned-topics";
private static final long DEFAULT_GET_TOPIC_POLICY_TIMEOUT = 30_000;

protected BookKeeper bookKeeper() {
return pulsar().getBookKeeperClient();
Expand Down Expand Up @@ -341,19 +347,46 @@ protected BacklogQuota namespaceBacklogQuota(String namespace, String namespaceP
return pulsar().getBrokerService().getBacklogQuotaManager().getBacklogQuota(namespace, namespacePath);
}

protected Optional<TopicPolicies> getTopicPolicies(TopicName topicName) {
protected CompletableFuture<Optional<TopicPolicies>> getTopicPoliciesAsyncWithRetry(TopicName topicName) {
return internalGetTopicPoliciesAsyncWithRetry(topicName,
new AtomicLong(DEFAULT_GET_TOPIC_POLICY_TIMEOUT), null, null);
}

protected CompletableFuture<Optional<TopicPolicies>> internalGetTopicPoliciesAsyncWithRetry(TopicName topicName,
final AtomicLong remainingTime, final Backoff backoff, CompletableFuture<Optional<TopicPolicies>> future) {
CompletableFuture<Optional<TopicPolicies>> response = future == null ? new CompletableFuture<>() : future;
try {
checkTopicLevelPolicyEnable();
return Optional.ofNullable(pulsar().getTopicPoliciesService().getTopicPolicies(topicName));
response.complete(Optional.ofNullable(pulsar()
.getTopicPoliciesService().getTopicPolicies(topicName)));
} catch (RestException re) {
throw re;
} catch (BrokerServiceException.TopicPoliciesCacheNotInitException e){
log.error("Topic {} policies have not been initialized yet.", topicName);
throw new RestException(e);
response.completeExceptionally(re);
} catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
Backoff usedBackoff = backoff == null ? new BackoffBuilder()
.setInitialTime(500, TimeUnit.MILLISECONDS)
.setMandatoryStop(DEFAULT_GET_TOPIC_POLICY_TIMEOUT, TimeUnit.MILLISECONDS)
.setMax(DEFAULT_GET_TOPIC_POLICY_TIMEOUT, TimeUnit.MILLISECONDS)
.create() : backoff;
long nextDelay = Math.min(usedBackoff.next(), remainingTime.get());
if (nextDelay <= 0) {
response.completeExceptionally(new TimeoutException(
String.format("Failed to get topic policy withing configured timeout %s ms",
DEFAULT_GET_TOPIC_POLICY_TIMEOUT)));
} else {
if (log.isDebugEnabled()) {
log.error("Topic {} policies have not been initialized yet, retry after {}ms",
topicName, nextDelay);
}
pulsar().getExecutor().schedule(() -> {
remainingTime.addAndGet(-nextDelay);
internalGetTopicPoliciesAsyncWithRetry(topicName, remainingTime, usedBackoff, response);
}, nextDelay, TimeUnit.MILLISECONDS);
}
} catch (Exception e) {
log.error("[{}] Failed to get topic policies {}", clientAppId(), topicName, e);
throw new RestException(e);
response.completeExceptionally(e);
}
return response;
}

protected boolean checkBacklogQuota(BacklogQuota quota, RetentionPolicies retention) {
Expand Down
Loading

0 comments on commit bebaadf

Please sign in to comment.