Skip to content

Commit

Permalink
[Broker] Make PersistentTopicsBase#internalSetBacklogQuota async (apa…
Browse files Browse the repository at this point in the history
  • Loading branch information
RobertIndie authored Feb 8, 2022
1 parent 9165aed commit b91d0c4
Showing 1 changed file with 45 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2859,44 +2859,49 @@ protected void internalGetBacklogSizeByMessageId(AsyncResponse asyncResponse,
}

protected CompletableFuture<Void> internalSetBacklogQuota(BacklogQuota.BacklogQuotaType backlogQuotaType,
BacklogQuotaImpl backlogQuota, boolean isGlobal) {
validateTopicPolicyOperation(topicName, PolicyName.BACKLOG, PolicyOperation.WRITE);
validatePoliciesReadOnlyAccess();

BacklogQuotaImpl backlogQuota, boolean isGlobal) {
BacklogQuota.BacklogQuotaType finalBacklogQuotaType = backlogQuotaType == null
? BacklogQuota.BacklogQuotaType.destination_storage : backlogQuotaType;

return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
.thenCompose(op -> {
TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
RetentionPolicies retentionPolicies = getRetentionPolicies(topicName, topicPolicies);
if (!checkBacklogQuota(backlogQuota, retentionPolicies)) {
log.warn(
"[{}] Failed to update backlog configuration for topic {}: conflicts with retention quota",
clientAppId(), topicName);
return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED,
"Backlog Quota exceeds configured retention quota for topic. "
+ "Please increase retention quota and retry"));
}

if (backlogQuota != null) {
topicPolicies.getBackLogQuotaMap().put(finalBacklogQuotaType.name(), backlogQuota);
} else {
topicPolicies.getBackLogQuotaMap().remove(finalBacklogQuotaType.name());
}
Map<String, BacklogQuotaImpl> backLogQuotaMap = topicPolicies.getBackLogQuotaMap();
topicPolicies.setIsGlobal(isGlobal);
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies)
.thenRun(() -> {
try {
log.info("[{}] Successfully updated backlog quota map: namespace={}, topic={}, map={}",
clientAppId(),
namespaceName,
topicName.getLocalName(),
jsonMapper().writeValueAsString(backLogQuotaMap));
} catch (JsonProcessingException ignore) { }
return validateTopicPolicyOperationAsync(topicName, PolicyName.BACKLOG, PolicyOperation.WRITE)
.thenAccept(__ -> validatePoliciesReadOnlyAccess())
.thenCompose(__ -> getTopicPoliciesAsyncWithRetry(topicName, isGlobal))
.thenCompose(op -> {
TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
return getRetentionPoliciesAsync(topicName, topicPolicies)
.thenCompose(retentionPolicies -> {
if (!checkBacklogQuota(backlogQuota, retentionPolicies)) {
log.warn(
"[{}] Failed to update backlog configuration for topic {}: conflicts with"
+ " retention quota",
clientAppId(), topicName);
return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED,
"Backlog Quota exceeds configured retention quota for topic. "
+ "Please increase retention quota and retry"));
}
if (backlogQuota != null) {
topicPolicies.getBackLogQuotaMap().put(finalBacklogQuotaType.name(), backlogQuota);
} else {
topicPolicies.getBackLogQuotaMap().remove(finalBacklogQuotaType.name());
}
Map<String, BacklogQuotaImpl> backLogQuotaMap = topicPolicies.getBackLogQuotaMap();
topicPolicies.setIsGlobal(isGlobal);
return pulsar().getTopicPoliciesService()
.updateTopicPoliciesAsync(topicName, topicPolicies)
.thenRun(() -> {
try {
log.info(
"[{}] Successfully updated backlog quota map: namespace={}, "
+ "topic={}, map={}",
clientAppId(),
namespaceName,
topicName.getLocalName(),
jsonMapper().writeValueAsString(backLogQuotaMap));
} catch (JsonProcessingException ignore) {
}
});
});
});
});
}

protected CompletableFuture<Void> internalSetReplicationClusters(List<String> clusterIds) {
Expand Down Expand Up @@ -2994,18 +2999,14 @@ protected CompletableFuture<Void> internalSetMessageTTL(Integer ttlInSecond, boo
});
}

private RetentionPolicies getRetentionPolicies(TopicName topicName, TopicPolicies topicPolicies) {
private CompletableFuture<RetentionPolicies> getRetentionPoliciesAsync(TopicName topicName,
TopicPolicies topicPolicies) {
RetentionPolicies retentionPolicies = topicPolicies.getRetentionPolicies();
if (retentionPolicies == null){
try {
retentionPolicies = getNamespacePoliciesAsync(topicName.getNamespaceObject())
.thenApply(policies -> policies.retention_policies)
.get(1L, TimeUnit.SECONDS);
} catch (Exception e) {
throw new RestException(e);
}
if (retentionPolicies != null) {
return CompletableFuture.completedFuture(retentionPolicies);
}
return retentionPolicies;
return getNamespacePoliciesAsync(topicName.getNamespaceObject())
.thenApply(policies -> policies.retention_policies);
}

protected CompletableFuture<RetentionPolicies> internalGetRetention(boolean applied, boolean isGlobal) {
Expand Down

0 comments on commit b91d0c4

Please sign in to comment.