Skip to content

Commit

Permalink
Delayed delivery policy support cross multiple clusters (apache#13550)
Browse files Browse the repository at this point in the history
Delayed delivery policy support cross multiple clusters
  • Loading branch information
315157973 authored Dec 30, 2021
1 parent 4f942d7 commit a02356f
Show file tree
Hide file tree
Showing 6 changed files with 135 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -502,10 +502,12 @@ protected void internalCreateMissedPartitions(AsyncResponse asyncResponse) {
});
}

protected CompletableFuture<Void> internalSetDelayedDeliveryPolicies(DelayedDeliveryPolicies deliveryPolicies) {
return getTopicPoliciesAsyncWithRetry(topicName)
protected CompletableFuture<Void> internalSetDelayedDeliveryPolicies(DelayedDeliveryPolicies deliveryPolicies,
boolean isGlobal) {
return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
.thenCompose(op -> {
TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
topicPolicies.setIsGlobal(isGlobal);
topicPolicies.setDelayedDeliveryEnabled(deliveryPolicies == null ? null : deliveryPolicies.isActive());
topicPolicies.setDelayedDeliveryTickTimeMillis(
deliveryPolicies == null ? null : deliveryPolicies.getTickTime());
Expand Down Expand Up @@ -778,8 +780,9 @@ protected void internalUnloadTopic(AsyncResponse asyncResponse, boolean authorit
}
}

protected CompletableFuture<DelayedDeliveryPolicies> internalGetDelayedDeliveryPolicies(boolean applied) {
return getTopicPoliciesAsyncWithRetry(topicName)
protected CompletableFuture<DelayedDeliveryPolicies> internalGetDelayedDeliveryPolicies(boolean applied,
boolean isGlobal) {
return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
.thenApply(op -> {
TopicPolicies policies = op.orElseGet(TopicPolicies::new);
DelayedDeliveryPolicies delayedDeliveryPolicies = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -632,12 +632,13 @@ public void getDelayedDeliveryPolicies(@Suspended final AsyncResponse asyncRespo
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
@QueryParam("applied") boolean applied,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation(authoritative)
.thenCompose(__ -> internalGetDelayedDeliveryPolicies(applied))
.thenCompose(__ -> internalGetDelayedDeliveryPolicies(applied, isGlobal))
.thenApply(asyncResponse::resume)
.exceptionally(ex -> {
handleTopicPolicyException("getDelayedDeliveryPolicies", ex, asyncResponse);
Expand All @@ -655,6 +656,7 @@ public void setDelayedDeliveryPolicies(
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@ApiParam(value = "Delayed delivery policies for the specified topic")
Expand All @@ -663,7 +665,7 @@ public void setDelayedDeliveryPolicies(
validatePoliciesReadOnlyAccess();
validateTopicPolicyOperation(topicName, PolicyName.DELAYED_DELIVERY, PolicyOperation.WRITE);
preValidation(authoritative)
.thenCompose(__ -> internalSetDelayedDeliveryPolicies(deliveryPolicies))
.thenCompose(__ -> internalSetDelayedDeliveryPolicies(deliveryPolicies, isGlobal))
.thenRun(() -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
handleTopicPolicyException("setDelayedDeliveryPolicies", ex, asyncResponse);
Expand All @@ -682,13 +684,14 @@ public void deleteDelayedDeliveryPolicies(@Suspended final AsyncResponse asyncRe
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
validatePoliciesReadOnlyAccess();
validateTopicPolicyOperation(topicName, PolicyName.DELAYED_DELIVERY, PolicyOperation.WRITE);
preValidation(authoritative)
.thenCompose(__ -> internalSetDelayedDeliveryPolicies(null))
.thenCompose(__ -> internalSetDelayedDeliveryPolicies(null, isGlobal))
.thenRun(() -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
handleTopicPolicyException("deleteDelayedDeliveryPolicies", ex, asyncResponse);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
Expand Down Expand Up @@ -366,6 +367,26 @@ public void testReplicatorMessageDispatchRatePolicies() throws Exception {
assertNull(admin3.topicPolicies(true).getDispatchRate(persistentTopicName)));
}

@Test
public void testReplicateDelayedDelivery() throws Exception {
final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
final String topic = "persistent://" + namespace + "/topic" + UUID.randomUUID();
init(namespace, topic);
DelayedDeliveryPolicies policies = DelayedDeliveryPolicies.builder().active(true).tickTime(10000L).build();
// set delayed delivery
admin1.topicPolicies(true).setDelayedDeliveryPolicy(topic, policies);
Awaitility.await().ignoreExceptions().untilAsserted(() ->
assertEquals(admin2.topicPolicies(true).getDelayedDeliveryPolicy(topic), policies));
Awaitility.await().ignoreExceptions().untilAsserted(() ->
assertEquals(admin3.topicPolicies(true).getDelayedDeliveryPolicy(topic), policies));
// remove delayed delivery
admin1.topicPolicies(true).removeDelayedDeliveryPolicy(topic);
Awaitility.await().untilAsserted(() ->
assertNull(admin2.topicPolicies(true).getDelayedDeliveryPolicy(topic)));
Awaitility.await().untilAsserted(() ->
assertNull(admin3.topicPolicies(true).getDelayedDeliveryPolicy(topic)));
}

@Test
public void testReplicatorInactiveTopicPolicies() throws Exception {
final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1000,6 +1000,14 @@ public void topicPolicies() throws Exception {
cmdTopics.run(split("set-max-unacked-messages-per-subscription persistent://myprop/clust/ns1/ds1 -m 99"));
verify(mockTopicsPolicies, times(1)).setMaxUnackedMessagesOnSubscription("persistent://myprop/clust/ns1/ds1", 99);

cmdTopics.run(split("get-delayed-delivery persistent://myprop/clust/ns1/ds1"));
verify(mockTopicsPolicies).getDelayedDeliveryPolicy("persistent://myprop/clust/ns1/ds1", false);
cmdTopics.run(split("set-delayed-delivery persistent://myprop/clust/ns1/ds1 -t 10s --enable"));
verify(mockTopicsPolicies).setDelayedDeliveryPolicy("persistent://myprop/clust/ns1/ds1",
DelayedDeliveryPolicies.builder().tickTime(10000).active(true).build());
cmdTopics.run(split("remove-delayed-delivery persistent://myprop/clust/ns1/ds1"));
verify(mockTopicsPolicies).removeDelayedDeliveryPolicy("persistent://myprop/clust/ns1/ds1") ;

cmdTopics.run(split("get-deduplication persistent://myprop/clust/ns1/ds1"));
verify(mockTopicsPolicies).getDeduplicationStatus("persistent://myprop/clust/ns1/ds1");
cmdTopics.run(split("set-deduplication persistent://myprop/clust/ns1/ds1 --disable"));
Expand Down Expand Up @@ -1117,6 +1125,14 @@ public void topicPolicies() throws Exception {
cmdTopics.run(split("remove-subscribe-rate persistent://myprop/clust/ns1/ds1 -g"));
verify(mockGlobalTopicsPolicies).removeSubscribeRate("persistent://myprop/clust/ns1/ds1");

cmdTopics.run(split("get-delayed-delivery persistent://myprop/clust/ns1/ds1 -g"));
verify(mockGlobalTopicsPolicies).getDelayedDeliveryPolicy("persistent://myprop/clust/ns1/ds1", false);
cmdTopics.run(split("set-delayed-delivery persistent://myprop/clust/ns1/ds1 -t 10s --enable -g"));
verify(mockGlobalTopicsPolicies).setDelayedDeliveryPolicy("persistent://myprop/clust/ns1/ds1",
DelayedDeliveryPolicies.builder().tickTime(10000).active(true).build());
cmdTopics.run(split("remove-delayed-delivery persistent://myprop/clust/ns1/ds1 -g"));
verify(mockGlobalTopicsPolicies).removeDelayedDeliveryPolicy("persistent://myprop/clust/ns1/ds1") ;

cmdTopics.run(split("get-deduplication persistent://myprop/clust/ns1/ds1 -g"));
verify(mockGlobalTopicsPolicies).getDeduplicationStatus("persistent://myprop/clust/ns1/ds1");
cmdTopics.run(split("set-deduplication persistent://myprop/clust/ns1/ds1 --disable -g"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.TopicPolicies;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.client.api.SubscriptionType;
Expand Down Expand Up @@ -97,6 +98,10 @@ public CmdTopicPolicies(Supplier<PulsarAdmin> admin) {
jcommander.addCommand("set-max-consumers", new SetMaxConsumers());
jcommander.addCommand("remove-max-consumers", new RemoveMaxConsumers());

jcommander.addCommand("get-delayed-delivery", new GetDelayedDelivery());
jcommander.addCommand("set-delayed-delivery", new SetDelayedDelivery());
jcommander.addCommand("remove-delayed-delivery", new RemoveDelayedDelivery());

jcommander.addCommand("get-dispatch-rate", new GetDispatchRate());
jcommander.addCommand("set-dispatch-rate", new SetDispatchRate());
jcommander.addCommand("remove-dispatch-rate", new RemoveDispatchRate());
Expand Down Expand Up @@ -561,6 +566,82 @@ void run() throws PulsarAdminException {
}
}

@Parameters(commandDescription = "Get the delayed delivery policy for a topic")
private class GetDelayedDelivery extends CliCommand {
@Parameter(description = "tenant/namespace/topic", required = true)
private java.util.List<String> params;

@Parameter(names = { "-ap", "--applied" }, description = "Get the applied policy of the topic")
private boolean applied = false;

@Parameter(names = { "--global", "-g" }, description = "Whether to get this policy globally. "
+ "If set to true, the policy will be replicate to other clusters asynchronously")
private boolean isGlobal;

@Override
void run() throws PulsarAdminException {
String topicName = validateTopicName(params);
print(getTopicPolicies(isGlobal).getDelayedDeliveryPolicy(topicName, applied));
}
}

@Parameters(commandDescription = "Set the delayed delivery policy on a topic")
private class SetDelayedDelivery extends CliCommand {
@Parameter(description = "tenant/namespace", required = true)
private java.util.List<String> params;

@Parameter(names = { "--enable", "-e" }, description = "Enable delayed delivery messages")
private boolean enable = false;

@Parameter(names = { "--disable", "-d" }, description = "Disable delayed delivery messages")
private boolean disable = false;

@Parameter(names = { "--time", "-t" }, description = "The tick time for when retrying on delayed delivery messages, " +
"affecting the accuracy of the delivery time compared to the scheduled time. (eg: 1s, 10s, 1m, 5h, 3d)")
private String delayedDeliveryTimeStr = "1s";

@Parameter(names = { "--global", "-g" }, description = "Whether to set this policy globally. "
+ "If set to true, the policy will be replicate to other clusters asynchronously")
private boolean isGlobal;

@Override
void run() throws PulsarAdminException {
String topicName = validateTopicName(params);
long delayedDeliveryTimeInMills;
try {
delayedDeliveryTimeInMills = TimeUnit.SECONDS.toMillis(
RelativeTimeUtil.parseRelativeTimeInSeconds(delayedDeliveryTimeStr));
} catch (IllegalArgumentException exception) {
throw new ParameterException(exception.getMessage());
}

if (enable == disable) {
throw new ParameterException("Need to specify either --enable or --disable");
}

getTopicPolicies(isGlobal).setDelayedDeliveryPolicy(topicName, DelayedDeliveryPolicies.builder()
.tickTime(delayedDeliveryTimeInMills)
.active(enable)
.build());
}
}

@Parameters(commandDescription = "Remove the delayed delivery policy on a topic")
private class RemoveDelayedDelivery extends CliCommand {
@Parameter(description = "tenant/namespace", required = true)
private java.util.List<String> params;

@Parameter(names = { "--global", "-g" }, description = "Whether to remove this policy globally. "
+ "If set to true, the policy will be replicate to other clusters asynchronously")
private boolean isGlobal;

@Override
void run() throws PulsarAdminException {
String topicName = validateTopicName(params);
getTopicPolicies(isGlobal).removeDelayedDeliveryPolicy(topicName);
}
}

@Parameters(commandDescription = "Remove max number of producers for a topic")
private class RemoveMaxProducers extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,10 @@ private void initDeprecatedCommands() {
cmdUsageFormatter.addDeprecatedCommand("get-subscription-types-enabled");
cmdUsageFormatter.addDeprecatedCommand("remove-subscription-types-enabled");

cmdUsageFormatter.addDeprecatedCommand("get-delayed-delivery");
cmdUsageFormatter.addDeprecatedCommand("set-delayed-delivery");
cmdUsageFormatter.addDeprecatedCommand("remove-delayed-delivery");

cmdUsageFormatter.addDeprecatedCommand("get-max-producers");
cmdUsageFormatter.addDeprecatedCommand("set-max-producers");
cmdUsageFormatter.addDeprecatedCommand("remove-max-producers");
Expand Down

0 comments on commit a02356f

Please sign in to comment.