Skip to content

Commit

Permalink
Fix time based backlog quota. (apache#11509)
Browse files Browse the repository at this point in the history
Fixes apache#11404

### Motivation
Time based backlog quota type message_age is set separately but when check backlog we are only checking against destination_storage type.
So fix to loop through all BacklogQuotaType when checking if backlog exceeded.

### Modification
* Added unit test.
* Added default implementation to make Admin Topic/Namespace backlog quota related API backward compatible.

(cherry picked from commit e82df7c)
  • Loading branch information
MarvinCai authored and hangc0276 committed Aug 11, 2021
1 parent 349f5b9 commit 353c72d
Show file tree
Hide file tree
Showing 22 changed files with 472 additions and 140 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -358,8 +358,10 @@ protected void mergeNamespaceWithDefaults(Policies policies, String namespace, S

}

protected BacklogQuota namespaceBacklogQuota(String namespace, String namespacePath) {
return pulsar().getBrokerService().getBacklogQuotaManager().getBacklogQuota(namespace, namespacePath);
protected BacklogQuota namespaceBacklogQuota(String namespace, String namespacePath,
BacklogQuota.BacklogQuotaType backlogQuotaType) {
return pulsar().getBrokerService().getBacklogQuotaManager()
.getBacklogQuota(namespace, namespacePath, backlogQuotaType);
}

protected CompletableFuture<Optional<TopicPolicies>> getTopicPoliciesAsyncWithRetry(TopicName topicName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2625,14 +2625,17 @@ protected CompletableFuture<Map<BacklogQuota.BacklogQuotaType, BacklogQuota>> in
quotaMap = getNamespacePolicies(namespaceName).backlog_quota_map;
if (quotaMap.isEmpty()) {
String namespace = namespaceName.toString();
quotaMap.put(
BacklogQuota.BacklogQuotaType.destination_storage,
namespaceBacklogQuota(namespace, AdminResource.path(POLICIES, namespace))
);
for (BacklogQuota.BacklogQuotaType backlogQuotaType : BacklogQuota.BacklogQuotaType.values()) {
quotaMap.put(
backlogQuotaType,
namespaceBacklogQuota(namespace,
AdminResource.path(POLICIES, namespace), backlogQuotaType)
);
}
}
}
return quotaMap;
});
});
}

protected CompletableFuture<Void> internalSetBacklogQuota(BacklogQuota.BacklogQuotaType backlogQuotaType,
Expand Down Expand Up @@ -2748,21 +2751,21 @@ protected CompletableFuture<Void> internalSetRetention(RetentionPolicies retenti
return getTopicPoliciesAsyncWithRetry(topicName)
.thenCompose(op -> {
TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
BacklogQuota backlogQuota =
topicPolicies.getBackLogQuotaMap()
.get(BacklogQuota.BacklogQuotaType.destination_storage.name());
if (backlogQuota == null) {
Policies policies = getNamespacePolicies(topicName.getNamespaceObject());
backlogQuota = policies.backlog_quota_map.get(BacklogQuota.BacklogQuotaType.destination_storage);
}
if (!checkBacklogQuota(backlogQuota, retention)) {
log.warn(
"[{}] Failed to update retention quota configuration for topic {}: "
+ "conflicts with retention quota",
clientAppId(), topicName);
return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED,
"Retention Quota must exceed configured backlog quota for topic. "
+ "Please increase retention quota and retry"));
for (BacklogQuota.BacklogQuotaType backlogQuotaType : BacklogQuota.BacklogQuotaType.values()) {
BacklogQuota backlogQuota = topicPolicies.getBackLogQuotaMap().get(backlogQuotaType.name());
if (backlogQuota == null) {
Policies policies = getNamespacePolicies(topicName.getNamespaceObject());
backlogQuota = policies.backlog_quota_map.get(backlogQuotaType);
}
if (!checkBacklogQuota(backlogQuota, retention)) {
log.warn(
"[{}] Failed to update retention quota configuration for topic {}: "
+ "conflicts with retention quota",
clientAppId(), topicName);
return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED,
"Retention Quota must exceed configured backlog quota for topic. "
+ "Please increase retention quota and retry"));
}
}
topicPolicies.setRetentionPolicies(retention);
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ public class BacklogQuotaManager {
public BacklogQuotaManager(PulsarService pulsar) {
this.isTopicLevelPoliciesEnable = pulsar.getConfiguration().isTopicLevelPoliciesEnabled();
this.defaultQuota = BacklogQuotaImpl.builder()
.limitSize(pulsar.getConfiguration().getBacklogQuotaDefaultLimitGB() * 1024 * 1024 * 1024)
.limitSize(pulsar.getConfiguration().getBacklogQuotaDefaultLimitGB()
* BacklogQuotaImpl.BYTES_IN_GIGABYTE)
.limitTime(pulsar.getConfiguration().getBacklogQuotaDefaultLimitSecond())
.retentionPolicy(pulsar.getConfiguration().getBacklogQuotaDefaultRetentionPolicy())
.build();
Expand All @@ -66,42 +67,42 @@ public BacklogQuotaImpl getDefaultQuota() {
return this.defaultQuota;
}

public BacklogQuotaImpl getBacklogQuota(String namespace, String policyPath) {
public BacklogQuotaImpl getBacklogQuota(String namespace, String policyPath, BacklogQuotaType backlogQuotaType) {
try {
return zkCache.get(policyPath)
.map(p -> (BacklogQuotaImpl) p.backlog_quota_map
.getOrDefault(BacklogQuotaType.destination_storage, defaultQuota))
.getOrDefault(backlogQuotaType, defaultQuota))
.orElse(defaultQuota);
} catch (Exception e) {
log.warn("Failed to read policies data, will apply the default backlog quota: namespace={}", namespace, e);
return this.defaultQuota;
}
}

public BacklogQuotaImpl getBacklogQuota(TopicName topicName) {
public BacklogQuotaImpl getBacklogQuota(TopicName topicName, BacklogQuotaType backlogQuotaType) {
String policyPath = AdminResource.path(POLICIES, topicName.getNamespace());
if (!isTopicLevelPoliciesEnable) {
return getBacklogQuota(topicName.getNamespace(), policyPath);
return getBacklogQuota(topicName.getNamespace(), policyPath, backlogQuotaType);
}

try {
return Optional.ofNullable(pulsar.getTopicPoliciesService().getTopicPolicies(topicName))
.map(TopicPolicies::getBackLogQuotaMap)
.map(map -> map.get(BacklogQuotaType.destination_storage.name()))
.orElseGet(() -> getBacklogQuota(topicName.getNamespace(), policyPath));
.map(map -> map.get(backlogQuotaType.name()))
.orElseGet(() -> getBacklogQuota(topicName.getNamespace(), policyPath, backlogQuotaType));
} catch (Exception e) {
log.warn("Failed to read topic policies data, will apply the namespace backlog quota: topicName={}",
topicName, e);
}
return getBacklogQuota(topicName.getNamespace(), policyPath);
return getBacklogQuota(topicName.getNamespace(), policyPath, backlogQuotaType);
}

public long getBacklogQuotaLimitInSize(TopicName topicName) {
return getBacklogQuota(topicName).getLimitSize();
return getBacklogQuota(topicName, BacklogQuotaType.destination_storage).getLimitSize();
}

public int getBacklogQuotaLimitInTime(TopicName topicName) {
return getBacklogQuota(topicName).getLimitTime();
return getBacklogQuota(topicName, BacklogQuotaType.message_age).getLimitTime();
}

/**
Expand All @@ -112,7 +113,7 @@ public int getBacklogQuotaLimitInTime(TopicName topicName) {
public void handleExceededBacklogQuota(PersistentTopic persistentTopic, BacklogQuotaType backlogQuotaType,
boolean preciseTimeBasedBacklogQuotaCheck) {
TopicName topicName = TopicName.get(persistentTopic.getName());
BacklogQuota quota = getBacklogQuota(topicName);
BacklogQuota quota = getBacklogQuota(topicName, backlogQuotaType);
log.info("Backlog quota type {} exceeded for topic [{}]. Applying [{}] policy", backlogQuotaType,
persistentTopic.getName(), quota.getPolicy());
switch (quota.getPolicy()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1182,23 +1182,27 @@ protected void handleProducer(final CommandProducer cmdProducer) {

service.getOrCreateTopic(topicName.toString()).thenAccept((Topic topic) -> {
// Before creating producer, check if backlog quota exceeded
// on topic
if (topic.isBacklogQuotaExceeded(producerName)) {
IllegalStateException illegalStateException = new IllegalStateException(
"Cannot create producer on topic with backlog quota exceeded");
BacklogQuota.RetentionPolicy retentionPolicy = topic.getBacklogQuota().getPolicy();
if (retentionPolicy == BacklogQuota.RetentionPolicy.producer_request_hold) {
commandSender.sendErrorResponse(requestId,
ServerError.ProducerBlockedQuotaExceededError,
illegalStateException.getMessage());
} else if (retentionPolicy == BacklogQuota.RetentionPolicy.producer_exception) {
commandSender.sendErrorResponse(requestId,
ServerError.ProducerBlockedQuotaExceededException,
illegalStateException.getMessage());
// on topic for size based limit and time based limit
for (BacklogQuota.BacklogQuotaType backlogQuotaType :
BacklogQuota.BacklogQuotaType.values()) {
if (topic.isBacklogQuotaExceeded(producerName, backlogQuotaType)) {
IllegalStateException illegalStateException = new IllegalStateException(
"Cannot create producer on topic with backlog quota exceeded");
BacklogQuota.RetentionPolicy retentionPolicy = topic
.getBacklogQuota(backlogQuotaType).getPolicy();
if (retentionPolicy == BacklogQuota.RetentionPolicy.producer_request_hold) {
commandSender.sendErrorResponse(requestId,
ServerError.ProducerBlockedQuotaExceededError,
illegalStateException.getMessage());
} else if (retentionPolicy == BacklogQuota.RetentionPolicy.producer_exception) {
commandSender.sendErrorResponse(requestId,
ServerError.ProducerBlockedQuotaExceededException,
illegalStateException.getMessage());
}
producerFuture.completeExceptionally(illegalStateException);
producers.remove(producerId, producerFuture);
return;
}
producerFuture.completeExceptionally(illegalStateException);
producers.remove(producerId, producerFuture);
return;
}

// Check whether the producer will publish encrypted messages or not
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,15 +176,15 @@ CompletableFuture<Subscription> createSubscription(String subscriptionName, Init

CompletableFuture<Void> onPoliciesUpdate(Policies data);

boolean isBacklogQuotaExceeded(String producerName);
boolean isBacklogQuotaExceeded(String producerName, BacklogQuota.BacklogQuotaType backlogQuotaType);

boolean isEncryptionRequired();

boolean getSchemaValidationEnforced();

boolean isReplicated();

BacklogQuota getBacklogQuota();
BacklogQuota getBacklogQuota(BacklogQuota.BacklogQuotaType backlogQuotaType);

void updateRates(NamespaceStats nsStats, NamespaceBundleStats currentBundleStats,
StatsOutputStream topicStatsStream, ClusterReplicationMetrics clusterReplicationMetrics,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -965,7 +965,7 @@ public CompletableFuture<Void> onPoliciesUpdate(Policies data) {
* @return Backlog quota for topic
*/
@Override
public BacklogQuota getBacklogQuota() {
public BacklogQuota getBacklogQuota(BacklogQuota.BacklogQuotaType backlogQuotaType) {
// No-op
throw new UnsupportedOperationException("getBacklogQuota method is not supported on non-persistent topic");
}
Expand All @@ -975,7 +975,7 @@ public BacklogQuota getBacklogQuota() {
* @return quota exceeded status for blocking producer creation
*/
@Override
public boolean isBacklogQuotaExceeded(String producerName) {
public boolean isBacklogQuotaExceeded(String producerName, BacklogQuota.BacklogQuotaType backlogQuotaType) {
// No-op
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2369,27 +2369,29 @@ public CompletableFuture<Void> onPoliciesUpdate(Policies data) {
* @return Backlog quota for topic
*/
@Override
public BacklogQuota getBacklogQuota() {
public BacklogQuota getBacklogQuota(BacklogQuota.BacklogQuotaType backlogQuotaType) {
TopicName topicName = TopicName.get(this.getName());
return brokerService.getBacklogQuotaManager().getBacklogQuota(topicName);
return brokerService.getBacklogQuotaManager().getBacklogQuota(topicName, backlogQuotaType);
}

/**
*
* @return quota exceeded status for blocking producer creation
*/
@Override
public boolean isBacklogQuotaExceeded(String producerName) {
BacklogQuota backlogQuota = getBacklogQuota();
public boolean isBacklogQuotaExceeded(String producerName, BacklogQuota.BacklogQuotaType backlogQuotaType) {
BacklogQuota backlogQuota = getBacklogQuota(backlogQuotaType);

if (backlogQuota != null) {
BacklogQuota.RetentionPolicy retentionPolicy = backlogQuota.getPolicy();

if ((retentionPolicy == BacklogQuota.RetentionPolicy.producer_request_hold
|| retentionPolicy == BacklogQuota.RetentionPolicy.producer_exception)
&& (isSizeBacklogExceeded() || isTimeBacklogExceeded())) {
log.info("[{}] Backlog quota exceeded. Cannot create producer [{}]", this.getName(), producerName);
return true;
|| retentionPolicy == BacklogQuota.RetentionPolicy.producer_exception)) {
if (backlogQuotaType == BacklogQuota.BacklogQuotaType.destination_storage && isSizeBacklogExceeded()
|| backlogQuotaType == BacklogQuota.BacklogQuotaType.message_age && isTimeBacklogExceeded()){
log.info("[{}] Backlog quota exceeded. Cannot create producer [{}]", this.getName(), producerName);
return true;
}
} else {
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl;
import org.apache.pulsar.common.policies.data.stats.ReplicatorStatsImpl;
import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
Expand Down Expand Up @@ -99,8 +100,10 @@ private static void getTopicStats(Topic topic, TopicStats stats, boolean include
stats.managedLedgerStats.storageLogicalSize = mlStats.getStoredMessagesLogicalSize();
stats.managedLedgerStats.backlogSize = ml.getEstimatedBacklogSize();
stats.managedLedgerStats.offloadedStorageUsed = ml.getOffloadedSize();
stats.backlogQuotaLimit = topic.getBacklogQuota().getLimitSize();
stats.backlogQuotaLimitTime = topic.getBacklogQuota().getLimitTime();
stats.backlogQuotaLimit = topic
.getBacklogQuota(BacklogQuota.BacklogQuotaType.destination_storage).getLimitSize();
stats.backlogQuotaLimitTime = topic
.getBacklogQuota(BacklogQuota.BacklogQuotaType.message_age).getLimitTime();

stats.managedLedgerStats.storageWriteLatencyBuckets
.addAll(mlStats.getInternalAddEntryLatencyBuckets());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,35 @@
*/
package org.apache.pulsar.broker;

import com.google.common.collect.ImmutableMap;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.SubscribeRate;
import org.apache.pulsar.common.policies.data.impl.BacklogQuotaImpl;

import java.util.Collections;
import java.util.Map;

public class ConfigHelper {
private ConfigHelper() {}


public static Map<BacklogQuota.BacklogQuotaType, BacklogQuota> backlogQuotaMap(ServiceConfiguration configuration) {
return Collections.singletonMap(BacklogQuota.BacklogQuotaType.destination_storage,
backlogQuota(configuration));
return ImmutableMap.of(BacklogQuota.BacklogQuotaType.destination_storage,
sizeBacklogQuota(configuration),
BacklogQuota.BacklogQuotaType.message_age,
timeBacklogQuota(configuration));
}

public static BacklogQuota backlogQuota(ServiceConfiguration configuration) {
public static BacklogQuota sizeBacklogQuota(ServiceConfiguration configuration) {
return BacklogQuota.builder()
.limitSize(configuration.getBacklogQuotaDefaultLimitGB() * 1024 * 1024 * 1024)
.limitSize(configuration.getBacklogQuotaDefaultLimitGB() * BacklogQuotaImpl.BYTES_IN_GIGABYTE)
.retentionPolicy(configuration.getBacklogQuotaDefaultRetentionPolicy())
.build();
}

public static BacklogQuota timeBacklogQuota(ServiceConfiguration configuration) {
return BacklogQuota.builder()
.limitTime(configuration.getBacklogQuotaDefaultLimitSecond())
.retentionPolicy(configuration.getBacklogQuotaDefaultRetentionPolicy())
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,14 +78,14 @@ public void testBacklogQuotaDisabled() {
log.info("Backlog quota: {} will set to the topic: {}", backlogQuota, testTopic);

try {
admin.topics().setBacklogQuota(testTopic, backlogQuota);
admin.topics().setBacklogQuota(testTopic, backlogQuota, BacklogQuota.BacklogQuotaType.destination_storage);
Assert.fail();
} catch (PulsarAdminException e) {
Assert.assertEquals(e.getStatusCode(), HttpStatus.METHOD_NOT_ALLOWED_405);
}

try {
admin.topics().removeBacklogQuota(testTopic);
admin.topics().removeBacklogQuota(testTopic, BacklogQuota.BacklogQuotaType.destination_storage);
Assert.fail();
} catch (PulsarAdminException e) {
Assert.assertEquals(e.getStatusCode(), HttpStatus.METHOD_NOT_ALLOWED_405);
Expand Down
Loading

0 comments on commit 353c72d

Please sign in to comment.