Skip to content

Commit

Permalink
Fix some topic policy operation without backoff (apache#11560)
Browse files Browse the repository at this point in the history
* Fix some topic policy operation without backoff

Related to apache#11487
  • Loading branch information
codelipenghui authored Aug 5, 2021
1 parent 1d20e92 commit 92a3ac7
Showing 1 changed file with 42 additions and 85 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.admin.ZkAdminPaths;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.BrokerServiceException.AlreadyRunningException;
import org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException;
import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException;
Expand Down Expand Up @@ -782,41 +781,25 @@ protected CompletableFuture<OffloadPoliciesImpl> internalGetOffloadPolicies(bool
}

protected CompletableFuture<Void> internalSetOffloadPolicies(OffloadPoliciesImpl offloadPolicies) {
TopicPolicies topicPolicies = null;
try {
topicPolicies = pulsar().getTopicPoliciesService().getTopicPolicies(topicName);
} catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
log.error("Topic {} policies have not been initialized yet.", topicName);
throw new RestException(Status.PRECONDITION_FAILED, "Policies have not been initialized yet");
}
if (topicPolicies == null) {
topicPolicies = new TopicPolicies();
}
topicPolicies.setOffloadPolicies(offloadPolicies);
CompletableFuture<Void> completableFuture = new CompletableFuture<>();
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies)
.thenCompose((res) -> {
//The policy update is asynchronous. Cache at this step may not be updated yet.
//So we need to set the loader by the incoming offloadPolicies instead of topic policies cache.
PartitionedTopicMetadata metadata = fetchPartitionedTopicMetadata(pulsar(), topicName);
if (metadata.partitions > 0) {
List<CompletableFuture<Void>> futures = new ArrayList<>(metadata.partitions);
for (int i = 0; i < metadata.partitions; i++) {
futures.add(internalUpdateOffloadPolicies(offloadPolicies, topicName.getPartition(i)));
}
return FutureUtil.waitForAll(futures);
} else {
return internalUpdateOffloadPolicies(offloadPolicies, topicName);
}
})
.whenComplete((result, e) -> {
if (e != null) {
completableFuture.completeExceptionally(e);
} else {
completableFuture.complete(null);
return getTopicPoliciesAsyncWithRetry(topicName)
.thenCompose(op -> {
TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
topicPolicies.setOffloadPolicies(offloadPolicies);
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
}).thenCompose(__ -> {
//The policy update is asynchronous. Cache at this step may not be updated yet.
//So we need to set the loader by the incoming offloadPolicies instead of topic policies cache.
PartitionedTopicMetadata metadata = fetchPartitionedTopicMetadata(pulsar(), topicName);
if (metadata.partitions > 0) {
List<CompletableFuture<Void>> futures = new ArrayList<>(metadata.partitions);
for (int i = 0; i < metadata.partitions; i++) {
futures.add(internalUpdateOffloadPolicies(offloadPolicies, topicName.getPartition(i)));
}
});
return completableFuture;
return FutureUtil.waitForAll(futures);
} else {
return internalUpdateOffloadPolicies(offloadPolicies, topicName);
}
});
}

protected CompletableFuture<InactiveTopicPolicies> internalGetInactiveTopicPolicies(boolean applied) {
Expand All @@ -835,19 +818,12 @@ protected CompletableFuture<InactiveTopicPolicies> internalGetInactiveTopicPolic
}

protected CompletableFuture<Void> internalSetInactiveTopicPolicies(InactiveTopicPolicies inactiveTopicPolicies) {
TopicPolicies topicPolicies = null;
try {
topicPolicies = pulsar().getTopicPoliciesService().getTopicPolicies(topicName);
} catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
log.error("Topic {} policies have not been initialized yet.", topicName);
return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED,
"Policies have not been initialized yet"));
}
if (topicPolicies == null) {
topicPolicies = new TopicPolicies();
}
topicPolicies.setInactiveTopicPolicies(inactiveTopicPolicies);
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
return getTopicPoliciesAsyncWithRetry(topicName)
.thenCompose(op -> {
TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
topicPolicies.setInactiveTopicPolicies(inactiveTopicPolicies);
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
});
}

private CompletableFuture<Void> internalUpdateOffloadPolicies(OffloadPoliciesImpl offloadPolicies,
Expand Down Expand Up @@ -898,18 +874,12 @@ protected CompletableFuture<Void> internalSetMaxUnackedMessagesOnSubscription(In
"maxUnackedNum must be 0 or more");
}

TopicPolicies topicPolicies = null;
try {
topicPolicies = pulsar().getTopicPoliciesService().getTopicPolicies(topicName);
} catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
log.error("Topic {} policies have not been initialized yet.", topicName);
throw new RestException(Status.PRECONDITION_FAILED, "Policies have not been initialized yet");
}
if (topicPolicies == null) {
topicPolicies = new TopicPolicies();
}
topicPolicies.setMaxUnackedMessagesOnSubscription(maxUnackedNum);
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
return getTopicPoliciesAsyncWithRetry(topicName)
.thenCompose(op -> {
TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
topicPolicies.setMaxUnackedMessagesOnSubscription(maxUnackedNum);
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
});
}

protected CompletableFuture<Integer> internalGetMaxUnackedMessagesOnConsumer(boolean applied) {
Expand All @@ -930,19 +900,12 @@ protected CompletableFuture<Void> internalSetMaxUnackedMessagesOnConsumer(Intege
"maxUnackedNum must be 0 or more");
}

TopicPolicies topicPolicies = null;
try {
topicPolicies = pulsar().getTopicPoliciesService().getTopicPolicies(topicName);
} catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
log.error("Topic {} policies have not been initialized yet.", topicName);
return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED,
"Policies have not been initialized yet"));
}
if (topicPolicies == null) {
topicPolicies = new TopicPolicies();
}
topicPolicies.setMaxUnackedMessagesOnConsumer(maxUnackedNum);
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
return getTopicPoliciesAsyncWithRetry(topicName)
.thenCompose(op -> {
TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
topicPolicies.setMaxUnackedMessagesOnConsumer(maxUnackedNum);
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
});
}

protected CompletableFuture<Void> internalSetDeduplicationSnapshotInterval(Integer interval) {
Expand Down Expand Up @@ -2776,18 +2739,12 @@ protected CompletableFuture<Boolean> internalGetDeduplication(boolean applied) {
}

protected CompletableFuture<Void> internalSetDeduplication(Boolean enabled) {
TopicPolicies topicPolicies = null;
try {
topicPolicies = pulsar().getTopicPoliciesService().getTopicPolicies(topicName);
} catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
log.error("Topic {} policies have not been initialized yet.", topicName);
throw new RestException(Status.PRECONDITION_FAILED, "Policies have not been initialized yet");
}
if (topicPolicies == null) {
topicPolicies = new TopicPolicies();
}
topicPolicies.setDeduplicationEnabled(enabled);
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
return getTopicPoliciesAsyncWithRetry(topicName)
.thenCompose(op -> {
TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
topicPolicies.setDeduplicationEnabled(enabled);
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
});
}

protected CompletableFuture<Void> internalSetMessageTTL(Integer ttlInSecond) {
Expand Down

0 comments on commit 92a3ac7

Please sign in to comment.