Skip to content

Commit

Permalink
Support get applied BlacklogQuota (apache#9828)
Browse files Browse the repository at this point in the history
Master Issue: apache#9216

### Modifications

Add applied API and fix default value in unit test
### Verifying this change
Verify the applied API and CMD
Verify the default value in namespace-level
  • Loading branch information
315157973 authored Mar 8, 2021
1 parent 926bb69 commit c2ebf1b
Show file tree
Hide file tree
Showing 11 changed files with 111 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -382,10 +382,6 @@ protected CompletableFuture<Policies> getNamespacePoliciesAsync(NamespaceName na
}

protected void mergeNamespaceWithDefaults(Policies policies, String namespace, String namespacePath) {
if (policies.backlog_quota_map.isEmpty()) {
Policies.setStorageQuota(policies, namespaceBacklogQuota(namespace, namespacePath));
}

final ServiceConfiguration config = pulsar().getConfiguration();

if (policies.max_consumers_per_subscription < 1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2530,6 +2530,28 @@ protected PersistentOfflineTopicStats internalGetBacklog(boolean authoritative)
return offlineTopicStats;
}

protected Map<BacklogQuota.BacklogQuotaType, BacklogQuota> internalGetBacklogQuota(boolean applied) {
Map<BacklogQuota.BacklogQuotaType, BacklogQuota> quotaMap = getTopicPolicies(topicName)
.map(TopicPolicies::getBackLogQuotaMap)
.map(map -> {
HashMap<BacklogQuota.BacklogQuotaType, BacklogQuota> hashMap = Maps.newHashMap();
map.forEach((key, value) -> hashMap.put(BacklogQuota.BacklogQuotaType.valueOf(key), value));
return hashMap;
}).orElse(Maps.newHashMap());
if (applied && quotaMap.isEmpty()) {
quotaMap = getNamespacePolicies(namespaceName).backlog_quota_map;
if (quotaMap.isEmpty()) {
String namespace = namespaceName.toString();
quotaMap.put(
BacklogQuota.BacklogQuotaType.destination_storage,
namespaceBacklogQuota(namespace, AdminResource.path(POLICIES, namespace))
);

}
}
return quotaMap;
}

protected void internalSetBacklogQuota(AsyncResponse asyncResponse,
BacklogQuota.BacklogQuotaType backlogQuotaType, BacklogQuota backlogQuota) {
if (backlogQuotaType == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,11 @@

import static org.apache.pulsar.common.util.Codec.decode;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.collect.Maps;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -1463,19 +1461,11 @@ public PersistentOfflineTopicStats getBacklog(
message = "Topic level policy is disabled, to enable the topic level policy and retry")})
public Map<BacklogQuotaType, BacklogQuota> getBacklogQuotaMap(@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic) {
@PathParam("topic") @Encoded String encodedTopic,
@QueryParam("applied") boolean applied) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation();
return getTopicPolicies(topicName)
.map(TopicPolicies::getBackLogQuotaMap)
.map(map -> {
HashMap<BacklogQuotaType, BacklogQuota> hashMap = Maps.newHashMap();
map.forEach((key, value) -> {
hashMap.put(BacklogQuotaType.valueOf(key), value);
});
return hashMap;
})
.orElse(Maps.newHashMap());
return internalGetBacklogQuota(applied);
}

@POST
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -707,8 +707,6 @@ public void namespaces() throws PulsarAdminException, PulsarServerException, Exc
policies.auth_policies.namespace_auth.put("spiffe://developer/passport-role", EnumSet.allOf(AuthAction.class));
policies.auth_policies.namespace_auth.put("my-role", EnumSet.allOf(AuthAction.class));

// set default quotas on namespace
Policies.setStorageQuota(policies, ConfigHelper.backlogQuota(conf));
policies.topicDispatchRate.put("test", ConfigHelper.topicDispatchRate(conf));
policies.subscriptionDispatchRate.put("test", ConfigHelper.subscriptionDispatchRate(conf));
policies.clusterSubscribeRate.put("test", ConfigHelper.subscribeRate(conf));
Expand Down Expand Up @@ -1677,11 +1675,11 @@ private List<MessageId> publishMessagesOnPersistentTopic(String topicName, int m
@Test
public void backlogQuotas() throws Exception {
assertEquals(admin.namespaces().getBacklogQuotaMap("prop-xyz/ns1"),
ConfigHelper.backlogQuotaMap(conf));
Maps.newHashMap());

Map<BacklogQuotaType, BacklogQuota> quotaMap = admin.namespaces().getBacklogQuotaMap("prop-xyz/ns1");
assertEquals(quotaMap.size(), 1);
assertEquals(quotaMap.get(BacklogQuotaType.destination_storage), ConfigHelper.backlogQuota(conf));
assertEquals(quotaMap.size(), 0);
assertNull(quotaMap.get(BacklogQuotaType.destination_storage));

admin.namespaces().setBacklogQuota("prop-xyz/ns1",
new BacklogQuota(1 * 1024 * 1024, RetentionPolicy.producer_exception));
Expand All @@ -1693,8 +1691,8 @@ public void backlogQuotas() throws Exception {
admin.namespaces().removeBacklogQuota("prop-xyz/ns1");

quotaMap = admin.namespaces().getBacklogQuotaMap("prop-xyz/ns1");
assertEquals(quotaMap.size(), 1);
assertEquals(quotaMap.get(BacklogQuotaType.destination_storage), ConfigHelper.backlogQuota(conf));
assertEquals(quotaMap.size(), 0);
assertNull(quotaMap.get(BacklogQuotaType.destination_storage));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@
*/
package org.apache.pulsar.broker.admin;

import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.pulsar.broker.ConfigHelper;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.service.BacklogQuotaManager;
import org.apache.pulsar.broker.service.PublishRateLimiterImpl;
Expand Down Expand Up @@ -61,6 +63,7 @@
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -210,6 +213,38 @@ public void testCheckBacklogQuota() throws Exception {
admin.topics().deletePartitionedTopic(testTopic, true);
}

@Test(timeOut = 20000)
public void testGetBacklogQuotaApplied() throws Exception {
final String topic = testTopic + UUID.randomUUID();
pulsarClient.newProducer().topic(topic).create().close();
Awaitility.await().atMost(5, TimeUnit.SECONDS)
.until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));
assertEquals(admin.topics().getBacklogQuotaMap(topic), Maps.newHashMap());
assertEquals(admin.namespaces().getBacklogQuotaMap(myNamespace), Maps.newHashMap());
Map<BacklogQuota.BacklogQuotaType, BacklogQuota> brokerQuotaMap = ConfigHelper.backlogQuotaMap(conf);
assertEquals(admin.topics().getBacklogQuotaMap(topic, true), brokerQuotaMap);
BacklogQuota namespaceQuota = new BacklogQuota(30L, BacklogQuota.RetentionPolicy.producer_exception);

admin.namespaces().setBacklogQuota(myNamespace, namespaceQuota);
Awaitility.await().untilAsserted(() -> assertFalse(admin.namespaces().getBacklogQuotaMap(myNamespace).isEmpty()));
Map<BacklogQuota.BacklogQuotaType, BacklogQuota> namespaceQuotaMap = Maps.newHashMap();
namespaceQuotaMap.put(BacklogQuota.BacklogQuotaType.destination_storage, namespaceQuota);
assertEquals(admin.topics().getBacklogQuotaMap(topic, true), namespaceQuotaMap);

BacklogQuota topicQuota = new BacklogQuota(40L, BacklogQuota.RetentionPolicy.consumer_backlog_eviction);
admin.topics().setBacklogQuota(topic, topicQuota);
Awaitility.await().untilAsserted(() -> assertFalse(admin.topics().getBacklogQuotaMap(topic).isEmpty()));
Map<BacklogQuota.BacklogQuotaType, BacklogQuota> topicQuotaMap = Maps.newHashMap();
topicQuotaMap.put(BacklogQuota.BacklogQuotaType.destination_storage, topicQuota);
assertEquals(admin.topics().getBacklogQuotaMap(topic, true), topicQuotaMap);

admin.namespaces().removeBacklogQuota(myNamespace);
admin.topics().removeBacklogQuota(topic);
Awaitility.await().untilAsserted(() -> assertTrue(admin.namespaces().getBacklogQuotaMap(myNamespace).isEmpty()));
Awaitility.await().untilAsserted(() -> assertTrue(admin.topics().getBacklogQuotaMap(topic).isEmpty()));
assertEquals(admin.topics().getBacklogQuotaMap(topic, true), brokerQuotaMap);
}

@Test
public void testCheckBacklogQuotaFailed() throws Exception {
RetentionPolicies retentionPolicies = new RetentionPolicies(10, 10);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -643,8 +643,6 @@ public void namespaces() throws PulsarAdminException, PulsarServerException, Exc
policies.bundles = Policies.defaultBundle();
policies.auth_policies.namespace_auth.put("my-role", EnumSet.allOf(AuthAction.class));

// set default quotas on namespace
Policies.setStorageQuota(policies, ConfigHelper.backlogQuota(conf));
policies.topicDispatchRate.put("test", ConfigHelper.topicDispatchRate(conf));
policies.subscriptionDispatchRate.put("test", ConfigHelper.subscriptionDispatchRate(conf));
policies.clusterSubscribeRate.put("test", ConfigHelper.subscribeRate(conf));
Expand Down Expand Up @@ -1351,11 +1349,11 @@ private void publishMessagesOnPersistentTopic(String topicName, int messages, in
@Test
public void backlogQuotas() throws Exception {
assertEquals(admin.namespaces().getBacklogQuotaMap("prop-xyz/use/ns1"),
ConfigHelper.backlogQuotaMap(conf));
Maps.newHashMap());

Map<BacklogQuotaType, BacklogQuota> quotaMap = admin.namespaces().getBacklogQuotaMap("prop-xyz/use/ns1");
assertEquals(quotaMap.size(), 1);
assertEquals(quotaMap.get(BacklogQuotaType.destination_storage), ConfigHelper.backlogQuota(conf));
assertEquals(quotaMap.size(), 0);
assertNull(quotaMap.get(BacklogQuotaType.destination_storage));

admin.namespaces().setBacklogQuota("prop-xyz/use/ns1",
new BacklogQuota(1 * 1024 * 1024, RetentionPolicy.producer_exception));
Expand All @@ -1367,8 +1365,8 @@ public void backlogQuotas() throws Exception {
admin.namespaces().removeBacklogQuota("prop-xyz/use/ns1");

quotaMap = admin.namespaces().getBacklogQuotaMap("prop-xyz/use/ns1");
assertEquals(quotaMap.size(), 1);
assertEquals(quotaMap.get(BacklogQuotaType.destination_storage), ConfigHelper.backlogQuota(conf));
assertEquals(quotaMap.size(), 0);
assertNull(quotaMap.get(BacklogQuotaType.destination_storage));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

import com.beust.jcommander.internal.Maps;
import com.google.common.collect.Sets;
import org.apache.pulsar.broker.ConfigHelper;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.client.admin.PulsarAdmin;
Expand Down Expand Up @@ -133,7 +133,7 @@ private void rolloverStats() {
@Test
public void testBacklogQuotaWithReader() throws Exception {
assertEquals(admin.namespaces().getBacklogQuotaMap("prop/ns-quota"),
ConfigHelper.backlogQuotaMap(config));
Maps.newHashMap());
admin.namespaces().setBacklogQuota("prop/ns-quota",
new BacklogQuota(10 * 1024, BacklogQuota.RetentionPolicy.producer_exception));
try (PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString()).statsInterval(0, TimeUnit.SECONDS).build();) {
Expand Down Expand Up @@ -204,7 +204,7 @@ public void testBacklogQuotaWithReader() throws Exception {
@Test
public void testTriggerBacklogQuotaWithReader() throws Exception {
assertEquals(admin.namespaces().getBacklogQuotaMap("prop/ns-quota"),
ConfigHelper.backlogQuotaMap(config));
Maps.newHashMap());
admin.namespaces().setBacklogQuota("prop/ns-quota",
new BacklogQuota(10 * 1024, BacklogQuota.RetentionPolicy.producer_exception));
try (PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString()).statsInterval(0, TimeUnit.SECONDS).build();) {
Expand Down Expand Up @@ -263,7 +263,7 @@ public void testTriggerBacklogQuotaWithReader() throws Exception {
@Test
public void testConsumerBacklogEviction() throws Exception {
assertEquals(admin.namespaces().getBacklogQuotaMap("prop/ns-quota"),
ConfigHelper.backlogQuotaMap(config));
Maps.newHashMap());
admin.namespaces().setBacklogQuota("prop/ns-quota",
new BacklogQuota(10 * 1024, BacklogQuota.RetentionPolicy.consumer_backlog_eviction));
PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString()).statsInterval(0, TimeUnit.SECONDS)
Expand Down Expand Up @@ -295,7 +295,7 @@ public void testConsumerBacklogEviction() throws Exception {
@Test
public void testConsumerBacklogEvictionWithAck() throws Exception {
assertEquals(admin.namespaces().getBacklogQuotaMap("prop/ns-quota"),
ConfigHelper.backlogQuotaMap(config));
Maps.newHashMap());
admin.namespaces().setBacklogQuota("prop/ns-quota",
new BacklogQuota(10 * 1024, BacklogQuota.RetentionPolicy.consumer_backlog_eviction));
PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString()).build();
Expand Down Expand Up @@ -327,7 +327,7 @@ public void testConsumerBacklogEvictionWithAck() throws Exception {
@Test
public void testConcurrentAckAndEviction() throws Exception {
assertEquals(admin.namespaces().getBacklogQuotaMap("prop/ns-quota"),
ConfigHelper.backlogQuotaMap(config));
Maps.newHashMap());
admin.namespaces().setBacklogQuota("prop/ns-quota",
new BacklogQuota(10 * 1024, BacklogQuota.RetentionPolicy.consumer_backlog_eviction));

Expand Down Expand Up @@ -400,7 +400,7 @@ public void run() {
@Test
public void testNoEviction() throws Exception {
assertEquals(admin.namespaces().getBacklogQuotaMap("prop/ns-quota"),
ConfigHelper.backlogQuotaMap(config));
Maps.newHashMap());
admin.namespaces().setBacklogQuota("prop/ns-quota",
new BacklogQuota(10 * 1024, BacklogQuota.RetentionPolicy.consumer_backlog_eviction));

Expand Down Expand Up @@ -466,7 +466,7 @@ public void run() {
@Test
public void testEvictionMulti() throws Exception {
assertEquals(admin.namespaces().getBacklogQuotaMap("prop/ns-quota"),
ConfigHelper.backlogQuotaMap(config));
Maps.newHashMap());
admin.namespaces().setBacklogQuota("prop/ns-quota",
new BacklogQuota(15 * 1024, BacklogQuota.RetentionPolicy.consumer_backlog_eviction));

Expand Down Expand Up @@ -575,7 +575,7 @@ public void run() {
@Test
public void testAheadProducerOnHold() throws Exception {
assertEquals(admin.namespaces().getBacklogQuotaMap("prop/quotahold"),
ConfigHelper.backlogQuotaMap(config));
Maps.newHashMap());
admin.namespaces().setBacklogQuota("prop/quotahold",
new BacklogQuota(10 * 1024, BacklogQuota.RetentionPolicy.producer_request_hold));
final PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString())
Expand Down Expand Up @@ -614,7 +614,7 @@ public void testAheadProducerOnHold() throws Exception {
@Test
public void testAheadProducerOnHoldTimeout() throws Exception {
assertEquals(admin.namespaces().getBacklogQuotaMap("prop/quotahold"),
ConfigHelper.backlogQuotaMap(config));
Maps.newHashMap());
admin.namespaces().setBacklogQuota("prop/quotahold",
new BacklogQuota(10 * 1024, BacklogQuota.RetentionPolicy.producer_request_hold));
final PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString())
Expand Down Expand Up @@ -649,7 +649,7 @@ public void testAheadProducerOnHoldTimeout() throws Exception {
@Test
public void testProducerException() throws Exception {
assertEquals(admin.namespaces().getBacklogQuotaMap("prop/quotahold"),
ConfigHelper.backlogQuotaMap(config));
Maps.newHashMap());
admin.namespaces().setBacklogQuota("prop/quotahold",
new BacklogQuota(10 * 1024, BacklogQuota.RetentionPolicy.producer_exception));
final PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString())
Expand Down Expand Up @@ -686,7 +686,7 @@ public void testProducerException() throws Exception {
@Test
public void testProducerExceptionAndThenUnblock() throws Exception {
assertEquals(admin.namespaces().getBacklogQuotaMap("prop/quotahold"),
ConfigHelper.backlogQuotaMap(config));
Maps.newHashMap());
admin.namespaces().setBacklogQuota("prop/quotahold",
new BacklogQuota(10 * 1024, BacklogQuota.RetentionPolicy.producer_exception));
final PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1535,6 +1535,16 @@ void createSubscription(String topic, String subscriptionName, MessageId message
*/
Map<BacklogQuota.BacklogQuotaType, BacklogQuota> getBacklogQuotaMap(String topic) throws PulsarAdminException;

/**
* Get applied backlog quota map for a topic.
* @param topic
* @param applied
* @return
* @throws PulsarAdminException
*/
Map<BacklogQuota.BacklogQuotaType, BacklogQuota> getBacklogQuotaMap(String topic, boolean applied)
throws PulsarAdminException;

/**
* Set a backlog quota for a topic.
* The backlog quota can be set on this resource:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1522,9 +1522,16 @@ public void failed(Throwable throwable) {

@Override
public Map<BacklogQuotaType, BacklogQuota> getBacklogQuotaMap(String topic) throws PulsarAdminException {
return getBacklogQuotaMap(topic, false);
}

@Override
public Map<BacklogQuotaType, BacklogQuota> getBacklogQuotaMap(String topic, boolean applied)
throws PulsarAdminException {
try {
TopicName tn = validateTopic(topic);
WebTarget path = topicPath(tn, "backlogQuotaMap");
path = path.queryParam("applied", applied);
return request(path).get(new GenericType<Map<BacklogQuotaType, BacklogQuota>>() {
});
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -725,6 +725,14 @@ public void topics() throws Exception {
cmdTopics.run(split("stats-internal persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).getInternalStats("persistent://myprop/clust/ns1/ds1", false);

cmdTopics.run(split("get-backlog-quotas persistent://myprop/clust/ns1/ds1 -ap"));
verify(mockTopics).getBacklogQuotaMap("persistent://myprop/clust/ns1/ds1", true);
cmdTopics.run(split("set-backlog-quota persistent://myprop/clust/ns1/ds1 -l 10 -p producer_request_hold"));
verify(mockTopics).setBacklogQuota("persistent://myprop/clust/ns1/ds1"
, new BacklogQuota(10L, BacklogQuota.RetentionPolicy.producer_request_hold));
cmdTopics.run(split("remove-backlog-quota persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).removeBacklogQuota("persistent://myprop/clust/ns1/ds1");

cmdTopics.run(split("info-internal persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).getInternalInfo("persistent://myprop/clust/ns1/ds1");

Expand Down
Loading

0 comments on commit c2ebf1b

Please sign in to comment.