Skip to content

Commit

Permalink
Publish rate support cross multiple clusters (apache#13496)
Browse files Browse the repository at this point in the history
Publish rate support cross multiple clusters (apache#13496)
  • Loading branch information
Jason918 authored Dec 29, 2021
1 parent 978bb7c commit b5fbeb4
Show file tree
Hide file tree
Showing 6 changed files with 130 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4251,19 +4251,20 @@ protected CompletableFuture<Void> internalRemoveCompactionThreshold() {
});
}

protected CompletableFuture<Optional<PublishRate>> internalGetPublishRate() {
return getTopicPoliciesAsyncWithRetry(topicName)
protected CompletableFuture<Optional<PublishRate>> internalGetPublishRate(boolean isGlobal) {
return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
.thenApply(op -> op.map(TopicPolicies::getPublishRate));
}

protected CompletableFuture<Void> internalSetPublishRate(PublishRate publishRate) {
protected CompletableFuture<Void> internalSetPublishRate(PublishRate publishRate, boolean isGlobal) {
if (publishRate == null) {
return CompletableFuture.completedFuture(null);
}
return getTopicPoliciesAsyncWithRetry(topicName)
return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
.thenCompose(op -> {
TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
topicPolicies.setPublishRate(publishRate);
topicPolicies.setIsGlobal(isGlobal);
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
});
}
Expand Down Expand Up @@ -4298,13 +4299,14 @@ protected CompletableFuture<Void> internalRemoveSubscriptionTypesEnabled(boolean
});
}

protected CompletableFuture<Void> internalRemovePublishRate() {
return getTopicPoliciesAsyncWithRetry(topicName)
protected CompletableFuture<Void> internalRemovePublishRate(boolean isGlobal) {
return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
.thenCompose(op -> {
if (!op.isPresent()) {
return CompletableFuture.completedFuture(null);
}
op.get().setPublishRate(null);
op.get().setIsGlobal(isGlobal);
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, op.get());
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3164,11 +3164,12 @@ public void getPublishRate(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation(authoritative)
.thenCompose(__ -> internalGetPublishRate())
.thenCompose(__ -> internalGetPublishRate(isGlobal))
.thenAccept(op -> asyncResponse.resume(op.isPresent() ? op.get()
: Response.noContent().build()))
.exceptionally(ex -> {
Expand All @@ -3189,20 +3190,22 @@ public void setPublishRate(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@ApiParam(value = "Dispatch rate for the specified topic") PublishRate publishRate) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation(authoritative)
.thenCompose(__ -> internalSetPublishRate(publishRate))
.thenCompose(__ -> internalSetPublishRate(publishRate, isGlobal))
.thenRun(() -> {
try {
log.info("[{}] Successfully set topic publish rate:"
+ " tenant={}, namespace={}, topic={}, publishRate={}",
+ " tenant={}, namespace={}, topic={}, isGlobal={}, publishRate={}",
clientAppId(),
tenant,
namespace,
topicName.getLocalName(),
isGlobal,
jsonMapper().writeValueAsString(publishRate));
} catch (JsonProcessingException ignore) {}
asyncResponse.resume(Response.noContent().build());
Expand All @@ -3225,17 +3228,19 @@ public void removePublishRate(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation(authoritative)
.thenCompose(__ -> internalRemovePublishRate())
.thenCompose(__ -> internalRemovePublishRate(isGlobal))
.thenRun(() -> {
log.info("[{}] Successfully remove topic publish rate: tenant={}, namespace={}, topic={}",
log.info("[{}] Successfully remove topic publish rate: tenant={}, namespace={}, topic={}, isGlobal={}",
clientAppId(),
tenant,
namespace,
topicName.getLocalName());
topicName.getLocalName(),
isGlobal);
asyncResponse.resume(Response.noContent().build());
})
.exceptionally(ex -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.Set;
import java.util.UUID;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
Expand All @@ -36,6 +37,7 @@
import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.impl.BacklogQuotaImpl;
import org.awaitility.Awaitility;
Expand Down Expand Up @@ -112,6 +114,34 @@ public void testReplicateMessageTTLPolicies() throws Exception {
assertNull(admin3.topicPolicies(true).getMessageTTL(topic)));
}

@Test
public void testReplicatePublishRatePolicies() throws Exception {
final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
final String topic = "persistent://" + namespace + "/topic" + UUID.randomUUID();
init(namespace, topic);
// set global topic policy
PublishRate publishRate = new PublishRate(100, 10000);
admin1.topicPolicies(true).setPublishRate(topic, publishRate);

// get global topic policy
untilRemoteClustersAsserted(
admin -> assertEquals(admin.topicPolicies(true).getPublishRate(topic), publishRate));

// remove global topic policy
admin1.topicPolicies(true).removePublishRate(topic);
untilRemoteClustersAsserted(admin -> assertNull(admin.topicPolicies(true).getPublishRate(topic)));
}

private void untilRemoteClustersAsserted(ThrowingConsumer<PulsarAdmin> condition) {
Awaitility.await().untilAsserted(() -> condition.apply(admin2));
Awaitility.await().untilAsserted(() -> condition.apply(admin3));
}

private interface ThrowingConsumer<I> {
void apply(I input) throws Throwable;
}


@Test
public void testReplicatePersistentPolicies() throws Exception {
final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,25 +27,21 @@
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNull;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;

import java.io.File;
import java.lang.reflect.Field;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import org.apache.pulsar.admin.cli.utils.SchemaExtractor;
import org.apache.pulsar.client.admin.Bookies;
import org.apache.pulsar.client.admin.BrokerStats;
Expand Down Expand Up @@ -942,6 +938,13 @@ public void topicPolicies() throws Exception {
cmdTopics.run(split("remove-persistence persistent://myprop/clust/ns1/ds1"));
verify(mockTopicsPolicies).removePersistence("persistent://myprop/clust/ns1/ds1");

cmdTopics.run(split("get-publish-rate persistent://myprop/clust/ns1/ds1"));
verify(mockTopicsPolicies).getPublishRate("persistent://myprop/clust/ns1/ds1");
cmdTopics.run(split("set-publish-rate persistent://myprop/clust/ns1/ds1 -m 10 -b 100"));
verify(mockTopicsPolicies).setPublishRate("persistent://myprop/clust/ns1/ds1", new PublishRate(10, 100));
cmdTopics.run(split("remove-publish-rate persistent://myprop/clust/ns1/ds1"));
verify(mockTopicsPolicies).removePublishRate("persistent://myprop/clust/ns1/ds1");

cmdTopics.run(split("get-max-consumers persistent://myprop/clust/ns1/ds1"));
verify(mockTopicsPolicies).getMaxConsumers("persistent://myprop/clust/ns1/ds1", false);
cmdTopics.run(split("remove-max-consumers persistent://myprop/clust/ns1/ds1"));
Expand Down Expand Up @@ -1016,11 +1019,11 @@ public void topicPolicies() throws Exception {
verify(mockGlobalTopicsPolicies).setMaxProducers("persistent://myprop/clust/ns1/ds1", 99);

cmdTopics.run(split("get-message-ttl persistent://myprop/clust/ns1/ds1 -g"));
verify(mockTopicsPolicies).getMessageTTL("persistent://myprop/clust/ns1/ds1", false);
verify(mockGlobalTopicsPolicies).getMessageTTL("persistent://myprop/clust/ns1/ds1", false);
cmdTopics.run(split("set-message-ttl persistent://myprop/clust/ns1/ds1 -t 10 -g"));
verify(mockTopicsPolicies).setMessageTTL("persistent://myprop/clust/ns1/ds1", 10);
verify(mockGlobalTopicsPolicies).setMessageTTL("persistent://myprop/clust/ns1/ds1", 10);
cmdTopics.run(split("remove-message-ttl persistent://myprop/clust/ns1/ds1 -g"));
verify(mockTopicsPolicies).removeMessageTTL("persistent://myprop/clust/ns1/ds1");
verify(mockGlobalTopicsPolicies).removeMessageTTL("persistent://myprop/clust/ns1/ds1");

cmdTopics.run(split("get-persistence persistent://myprop/clust/ns1/ds1 -g"));
verify(mockGlobalTopicsPolicies).getPersistence("persistent://myprop/clust/ns1/ds1");
Expand Down Expand Up @@ -1052,6 +1055,13 @@ public void topicPolicies() throws Exception {
.ratePeriodInSecond(2)
.build());

cmdTopics.run(split("get-publish-rate persistent://myprop/clust/ns1/ds1 -g"));
verify(mockGlobalTopicsPolicies).getPublishRate("persistent://myprop/clust/ns1/ds1");
cmdTopics.run(split("set-publish-rate persistent://myprop/clust/ns1/ds1 -m 10 -b 100 -g"));
verify(mockGlobalTopicsPolicies).setPublishRate("persistent://myprop/clust/ns1/ds1", new PublishRate(10, 100));
cmdTopics.run(split("remove-publish-rate persistent://myprop/clust/ns1/ds1 -g"));
verify(mockGlobalTopicsPolicies).removePublishRate("persistent://myprop/clust/ns1/ds1");

cmdTopics.run(split("get-deduplication persistent://myprop/clust/ns1/ds1 -g"));
verify(mockGlobalTopicsPolicies).getDeduplicationStatus("persistent://myprop/clust/ns1/ds1");
cmdTopics.run(split("set-deduplication persistent://myprop/clust/ns1/ds1 --disable -g"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.util.RelativeTimeUtil;

Expand Down Expand Up @@ -74,6 +75,10 @@ public CmdTopicPolicies(Supplier<PulsarAdmin> admin) {
jcommander.addCommand("set-persistence", new SetPersistence());
jcommander.addCommand("remove-persistence", new RemovePersistence());

jcommander.addCommand("get-publish-rate", new GetPublishRate());
jcommander.addCommand("set-publish-rate", new SetPublishRate());
jcommander.addCommand("remove-publish-rate", new RemovePublishRate());

jcommander.addCommand("get-max-consumers", new GetMaxConsumers());
jcommander.addCommand("set-max-consumers", new SetMaxConsumers());
jcommander.addCommand("remove-max-consumers", new RemoveMaxConsumers());
Expand Down Expand Up @@ -601,6 +606,61 @@ void run() throws PulsarAdminException {
}
}

@Parameters(commandDescription = "Get publish rate for a topic")
private class GetPublishRate extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;

@Parameter(names = {"--global", "-g"}, description = "Whether to get this policy globally. "
+ "If set to true, broker returns global topic policies")
private boolean isGlobal = false;

@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
print(getTopicPolicies(isGlobal).getPublishRate(persistentTopic));
}
}

@Parameters(commandDescription = "Set publish rate for a topic")
private class SetPublishRate extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;

@Parameter(names = {"--msg-publish-rate", "-m"}, description = "message-publish-rate (default -1 will be "
+ "overwrite if not passed)", required = false)
private int msgPublishRate = -1;

@Parameter(names = {"--byte-publish-rate", "-b"}, description = "byte-publish-rate "
+ "(default -1 will be overwrite if not passed)", required = false)
private long bytePublishRate = -1;

@Parameter(names = {"--global", "-g"}, description = "Whether to set this policy globally.")
private boolean isGlobal = false;

@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
getTopicPolicies(isGlobal).setPublishRate(persistentTopic,
new PublishRate(msgPublishRate, bytePublishRate));
}
}

@Parameters(commandDescription = "Remove publish rate for a topic")
private class RemovePublishRate extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;

@Parameter(names = {"--global", "-g"}, description = "Whether to remove this policy globally. ")
private boolean isGlobal = false;

@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
getTopicPolicies(isGlobal).removePublishRate(persistentTopic);
}
}

@Parameters(commandDescription = "Get the persistence policies for a topic")
private class GetPersistence extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,10 @@ private void initDeprecatedCommands() {
cmdUsageFormatter.addDeprecatedCommand("remove-max-unacked-messages-on-subscription");
cmdUsageFormatter.addDeprecatedCommand("set-max-unacked-messages-on-subscription");

cmdUsageFormatter.addDeprecatedCommand("get-publish-rate");
cmdUsageFormatter.addDeprecatedCommand("set-publish-rate");
cmdUsageFormatter.addDeprecatedCommand("remove-publish-rate");

cmdUsageFormatter.addDeprecatedCommand("get-maxProducers");
cmdUsageFormatter.addDeprecatedCommand("set-maxProducers");
cmdUsageFormatter.addDeprecatedCommand("remove-maxProducers");
Expand Down

0 comments on commit b5fbeb4

Please sign in to comment.