Skip to content

Commit

Permalink
Enable users to specify some time options in units other than seconds (
Browse files Browse the repository at this point in the history
…apache#15563)

### Motivation

In the `pulsar-admin` command, enable users to specify values for some time-related options not only in seconds, but also in minutes, hours, days, and so on.

e.g.
```sh
# Expire messages older than 3 hours
$ ./bin/pulsar-admin topics expire-messages -s sub -t 3h persistent://foo/bar/baz
```
  • Loading branch information
Masahiro Sakamoto authored May 14, 2022
1 parent 1c17f6d commit 52cae2f
Show file tree
Hide file tree
Showing 6 changed files with 158 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,19 @@ public void namespaces() throws Exception {
when(admin.namespaces()).thenReturn(mockNamespaces);
namespaces = new CmdNamespaces(() -> admin);

namespaces.run(split("set-backlog-quota myprop/clust/ns1 -p consumer_backlog_eviction -l 10K -lt 10m"));
verify(mockNamespaces).setBacklogQuota("myprop/clust/ns1",
BacklogQuota.builder()
.limitSize(10 * 1024)
.limitTime(10 * 60)
.retentionPolicy(RetentionPolicy.consumer_backlog_eviction)
.build(),
BacklogQuota.BacklogQuotaType.destination_storage);

mockNamespaces = mock(Namespaces.class);
when(admin.namespaces()).thenReturn(mockNamespaces);
namespaces = new CmdNamespaces(() -> admin);

namespaces.run(split("set-backlog-quota myprop/clust/ns1 -p producer_exception -l 10G -lt 10000 -t message_age"));
verify(mockNamespaces).setBacklogQuota("myprop/clust/ns1",
BacklogQuota.builder()
Expand Down Expand Up @@ -1094,6 +1107,16 @@ public void topicPolicies() throws Exception {
BacklogQuota.BacklogQuotaType.destination_storage);
//cmd with option cannot be executed repeatedly.
cmdTopics = new CmdTopicPolicies(() -> admin);
cmdTopics.run(split("set-backlog-quota persistent://myprop/clust/ns1/ds1 -lt 1w -p consumer_backlog_eviction"));
verify(mockTopicsPolicies).setBacklogQuota("persistent://myprop/clust/ns1/ds1",
BacklogQuota.builder()
.limitSize(-1)
.limitTime(60 * 60 * 24 * 7)
.retentionPolicy(RetentionPolicy.consumer_backlog_eviction)
.build(),
BacklogQuota.BacklogQuotaType.destination_storage);
//cmd with option cannot be executed repeatedly.
cmdTopics = new CmdTopicPolicies(() -> admin);
cmdTopics.run(split("set-backlog-quota persistent://myprop/clust/ns1/ds1 -lt 1000 -p producer_request_hold -t message_age"));
verify(mockTopicsPolicies).setBacklogQuota("persistent://myprop/clust/ns1/ds1",
BacklogQuota.builder()
Expand Down Expand Up @@ -1354,6 +1377,16 @@ public void topics() throws Exception {
BacklogQuota.BacklogQuotaType.destination_storage);
//cmd with option cannot be executed repeatedly.
cmdTopics = new CmdTopics(() -> admin);
cmdTopics.run(split("set-backlog-quota persistent://myprop/clust/ns1/ds1 -lt 5h -p consumer_backlog_eviction"));
verify(mockTopics).setBacklogQuota("persistent://myprop/clust/ns1/ds1",
BacklogQuota.builder()
.limitSize(-1)
.limitTime(5 * 60 * 60)
.retentionPolicy(RetentionPolicy.consumer_backlog_eviction)
.build(),
BacklogQuota.BacklogQuotaType.destination_storage);
//cmd with option cannot be executed repeatedly.
cmdTopics = new CmdTopics(() -> admin);
cmdTopics.run(split("set-backlog-quota persistent://myprop/clust/ns1/ds1 -lt 1000 -p producer_request_hold -t message_age"));
verify(mockTopics).setBacklogQuota("persistent://myprop/clust/ns1/ds1",
BacklogQuota.builder()
Expand Down Expand Up @@ -1389,6 +1422,9 @@ public void topics() throws Exception {
cmdTopics.run(split("expire-messages persistent://myprop/clust/ns1/ds1 -s sub1 -t 100"));
verify(mockTopics).expireMessages("persistent://myprop/clust/ns1/ds1", "sub1", 100);

cmdTopics.run(split("expire-messages-all-subscriptions persistent://myprop/clust/ns1/ds1 -t 100"));
verify(mockTopics).expireMessagesForAllSubscriptions("persistent://myprop/clust/ns1/ds1", 100);

cmdTopics.run(split("get-subscribe-rate persistent://myprop/clust/ns1/ds1 -ap"));
verify(mockTopics).getSubscribeRate("persistent://myprop/clust/ns1/ds1", true);

Expand All @@ -1406,8 +1442,8 @@ public void topics() throws Exception {
cmdTopics.run(split("expire-messages persistent://myprop/clust/ns1/ds1 -s sub1 -p 1:1 -e"));
verify(mockTopics).expireMessages(eq("persistent://myprop/clust/ns1/ds1"), eq("sub1"), eq(new MessageIdImpl(1, 1, -1)), eq(true));

cmdTopics.run(split("expire-messages-all-subscriptions persistent://myprop/clust/ns1/ds1 -t 100"));
verify(mockTopics).expireMessagesForAllSubscriptions("persistent://myprop/clust/ns1/ds1", 100);
cmdTopics.run(split("expire-messages-all-subscriptions persistent://myprop/clust/ns1/ds1 -t 1d"));
verify(mockTopics).expireMessagesForAllSubscriptions("persistent://myprop/clust/ns1/ds1", 60 * 60 * 24);

cmdTopics.run(split("create-subscription persistent://myprop/clust/ns1/ds1 -s sub1 --messageId earliest"));
verify(mockTopics).createSubscription("persistent://myprop/clust/ns1/ds1", "sub1", MessageId.earliest, false);
Expand Down Expand Up @@ -1718,6 +1754,9 @@ public boolean matches(Long timestamp) {
cmdTopics.run(split("remove-maxProducers persistent://myprop/clust/ns1/ds2"));
verify(mockTopics).removeMaxProducers("persistent://myprop/clust/ns1/ds2");

cmdTopics.run(split("set-message-ttl persistent://myprop/clust/ns1/ds1 -t 30m"));
verify(mockTopics).setMessageTTL("persistent://myprop/clust/ns1/ds1", 30 * 60);

cmdTopics.run(split("get-message-ttl persistent://myprop/clust/ns1/ds1 -ap"));
verify(mockTopics).getMessageTTL("persistent://myprop/clust/ns1/ds1", true);

Expand Down Expand Up @@ -1824,6 +1863,15 @@ public void persistentTopics() throws Exception {
topics.run(split("peek-messages persistent://myprop/clust/ns1/ds1 -s sub1 -n 3"));
verify(mockTopics).peekMessages("persistent://myprop/clust/ns1/ds1", "sub1", 3);

// cmd with option cannot be executed repeatedly
topics = new CmdPersistentTopics(() -> admin);

topics.run(split("expire-messages persistent://myprop/clust/ns1/ds1 -s sub1 -t 2h"));
verify(mockTopics).expireMessages("persistent://myprop/clust/ns1/ds1", "sub1", 2 * 60 * 60);

topics.run(split("expire-messages-all-subscriptions persistent://myprop/clust/ns1/ds1 -t 3d"));
verify(mockTopics).expireMessagesForAllSubscriptions("persistent://myprop/clust/ns1/ds1", 3 * 60 * 60 * 24);

// argument matcher for the timestamp in reset cursor. Since we can't verify exact timestamp, we check for a
// range of +/- 1 second of the expected timestamp
class TimestampMatcher implements ArgumentMatcher<Long> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1233,8 +1233,9 @@ private class SetBacklogQuota extends CliCommand {
private String limitStr;

@Parameter(names = { "-lt", "--limitTime" },
description = "Time limit in second, non-positive number for disabling time limit.")
private int limitTime = -1;
description = "Time limit in second (or minutes, hours, days, weeks eg: 100m, 3h, 2d, 5w), "
+ "non-positive number for disabling time limit.")
private String limitTimeStr = null;

@Parameter(names = { "-p", "--policy" }, description = "Retention policy to enforce when the limit is reached. "
+ "Valid options are: [producer_request_hold, producer_exception, consumer_backlog_eviction]",
Expand Down Expand Up @@ -1268,10 +1269,23 @@ void run() throws PulsarAdminException {
backlogQuotaTypeStr, Arrays.toString(BacklogQuota.BacklogQuotaType.values())));
}

long limitTimeInSec = -1;
if (limitTimeStr != null) {
try {
limitTimeInSec = RelativeTimeUtil.parseRelativeTimeInSeconds(limitTimeStr);
} catch (IllegalArgumentException e) {
throw new ParameterException(e.getMessage());
}
}
if (limitTimeInSec > Integer.MAX_VALUE) {
throw new ParameterException(
String.format("Time limit cannot be greater than %d seconds", Integer.MAX_VALUE));
}

String namespace = validateNamespace(params);
getAdmin().namespaces().setBacklogQuota(namespace,
BacklogQuota.builder().limitSize(limit)
.limitTime(limitTime)
.limitTime((int) limitTimeInSec)
.retentionPolicy(policy)
.build(),
backlogQuotaType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -454,12 +454,18 @@ private class ExpireMessages extends CliCommand {
"--subscription" }, description = "Subscription to be skip messages on", required = true)
private String subName;

@Parameter(names = { "-t", "--expireTime" },
description = "Expire messages older than time in seconds", required = true)
private long expireTimeInSeconds;
@Parameter(names = { "-t", "--expireTime" }, description = "Expire messages older than time in seconds "
+ "(or minutes, hours, days, weeks eg: 100m, 3h, 2d, 5w)", required = true)
private String expireTimeStr;

@Override
void run() throws PulsarAdminException {
long expireTimeInSeconds;
try {
expireTimeInSeconds = RelativeTimeUtil.parseRelativeTimeInSeconds(expireTimeStr);
} catch (IllegalArgumentException e) {
throw new ParameterException(e.getMessage());
}
String persistentTopic = validatePersistentTopic(params);
getPersistentTopics().expireMessages(persistentTopic, subName, expireTimeInSeconds);
}
Expand All @@ -471,12 +477,18 @@ private class ExpireMessagesForAllSubscriptions extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;

@Parameter(names = { "-t", "--expireTime" },
description = "Expire messages older than time in seconds", required = true)
private long expireTimeInSeconds;
@Parameter(names = { "-t", "--expireTime" }, description = "Expire messages older than time in seconds "
+ "(or minutes, hours, days, weeks eg: 100m, 3h, 2d, 5w)", required = true)
private String expireTimeStr;

@Override
void run() throws PulsarAdminException {
long expireTimeInSeconds;
try {
expireTimeInSeconds = RelativeTimeUtil.parseRelativeTimeInSeconds(expireTimeStr);
} catch (IllegalArgumentException e) {
throw new ParameterException(e.getMessage());
}
String persistentTopic = validatePersistentTopic(params);
getPersistentTopics().expireMessagesForAllSubscriptions(persistentTopic, expireTimeInSeconds);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -883,8 +883,9 @@ private class SetBacklogQuota extends CliCommand {
private String limitStr = "-1";

@Parameter(names = { "-lt", "--limitTime" },
description = "Time limit in second, non-positive number for disabling time limit.")
private int limitTime = -1;
description = "Time limit in second (or minutes, hours, days, weeks eg: 100m, 3h, 2d, 5w), "
+ "non-positive number for disabling time limit.")
private String limitTimeStr = null;

@Parameter(names = { "-p", "--policy" }, description = "Retention policy to enforce when the limit is reached. "
+ "Valid options are: [producer_request_hold, producer_exception, consumer_backlog_eviction]",
Expand Down Expand Up @@ -924,10 +925,23 @@ void run() throws PulsarAdminException {
backlogQuotaTypeStr, Arrays.toString(BacklogQuota.BacklogQuotaType.values())));
}

long limitTimeInSec = -1;
if (limitTimeStr != null) {
try {
limitTimeInSec = RelativeTimeUtil.parseRelativeTimeInSeconds(limitTimeStr);
} catch (IllegalArgumentException e) {
throw new ParameterException(e.getMessage());
}
}
if (limitTimeInSec > Integer.MAX_VALUE) {
throw new ParameterException(
String.format("Time limit cannot be greater than %d seconds", Integer.MAX_VALUE));
}

String persistentTopic = validatePersistentTopic(params);
getTopicPolicies(isGlobal).setBacklogQuota(persistentTopic,
BacklogQuota.builder().limitSize(limit)
.limitTime(limitTime)
.limitTime((int) limitTimeInSec)
.retentionPolicy(policy)
.build(),
backlogQuotaType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -883,8 +883,9 @@ private class ExpireMessages extends CliCommand {
"--subscription" }, description = "Subscription to be skip messages on", required = true)
private String subName;

@Parameter(names = { "-t", "--expireTime" }, description = "Expire messages older than time in seconds")
private long expireTimeInSeconds = -1;
@Parameter(names = { "-t", "--expireTime" }, description = "Expire messages older than time in seconds "
+ "(or minutes, hours, days, weeks eg: 100m, 3h, 2d, 5w)")
private String expireTimeStr = null;

@Parameter(names = { "--position",
"-p" }, description = "message position to reset back to (ledgerId:entryId)", required = false)
Expand All @@ -896,6 +897,15 @@ private class ExpireMessages extends CliCommand {

@Override
void run() throws PulsarAdminException {
long expireTimeInSeconds = -1;
if (expireTimeStr != null) {
try {
expireTimeInSeconds = RelativeTimeUtil.parseRelativeTimeInSeconds(expireTimeStr);
} catch (IllegalArgumentException e) {
throw new ParameterException(e.getMessage());
}
}

if (expireTimeInSeconds >= 0 && isNotBlank(messagePosition)) {
throw new ParameterException(String.format("Can't expire message by time and "
+ "by message position at the same time."));
Expand All @@ -921,12 +931,18 @@ private class ExpireMessagesForAllSubscriptions extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;

@Parameter(names = { "-t",
"--expireTime" }, description = "Expire messages older than time in seconds", required = true)
private long expireTimeInSeconds;
@Parameter(names = { "-t", "--expireTime" }, description = "Expire messages older than time in seconds "
+ "(or minutes, hours, days, weeks eg: 100m, 3h, 2d, 5w)", required = true)
private String expireTimeStr;

@Override
void run() throws PulsarAdminException {
long expireTimeInSeconds;
try {
expireTimeInSeconds = RelativeTimeUtil.parseRelativeTimeInSeconds(expireTimeStr);
} catch (IllegalArgumentException e) {
throw new ParameterException(e.getMessage());
}
String topic = validateTopicName(params);
getTopics().expireMessagesForAllSubscriptions(topic, expireTimeInSeconds);
}
Expand Down Expand Up @@ -1440,8 +1456,9 @@ private class SetBacklogQuota extends CliCommand {
private String limitStr = "-1";

@Parameter(names = { "-lt", "--limitTime" },
description = "Time limit in second, non-positive number for disabling time limit.")
private int limitTime = -1;
description = "Time limit in second (or minutes, hours, days, weeks eg: 100m, 3h, 2d, 5w), "
+ "non-positive number for disabling time limit.")
private String limitTimeStr = null;

@Parameter(names = { "-p", "--policy" },
description = "Retention policy to enforce when the limit is reached. Valid options are: "
Expand Down Expand Up @@ -1477,10 +1494,23 @@ void run() throws PulsarAdminException {
backlogQuotaTypeStr, Arrays.toString(BacklogQuota.BacklogQuotaType.values())));
}

long limitTimeInSec = -1;
if (limitTimeStr != null) {
try {
limitTimeInSec = RelativeTimeUtil.parseRelativeTimeInSeconds(limitTimeStr);
} catch (IllegalArgumentException e) {
throw new ParameterException(e.getMessage());
}
}
if (limitTimeInSec > Integer.MAX_VALUE) {
throw new ParameterException(
String.format("Time limit cannot be greater than %d seconds", Integer.MAX_VALUE));
}

String persistentTopic = validatePersistentTopic(params);
getTopics().setBacklogQuota(persistentTopic,
BacklogQuota.builder().limitSize(limit)
.limitTime(limitTime)
.limitTime((int) limitTimeInSec)
.retentionPolicy(policy)
.build(),
backlogQuotaType);
Expand Down Expand Up @@ -1632,18 +1662,26 @@ private class SetMessageTTL extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;

@Parameter(names = { "-t", "--ttl" }, description = "Message TTL for topic in second, "
+ "allowed range from 1 to Integer.MAX_VALUE", required = true)
private int messageTTLInSecond;
@Parameter(names = { "-t", "--ttl" }, description = "Message TTL for topic in second "
+ "(or minutes, hours, days, weeks eg: 100m, 3h, 2d, 5w), "
+ "allowed range from 1 to Integer.MAX_VALUE", required = true)
private String messageTTLStr;

@Override
void run() throws PulsarAdminException {
if (messageTTLInSecond < 0) {
long messageTTLInSecond;
try {
messageTTLInSecond = RelativeTimeUtil.parseRelativeTimeInSeconds(messageTTLStr);
} catch (IllegalArgumentException e) {
throw new ParameterException(e.getMessage());
}

if (messageTTLInSecond < 0 || messageTTLInSecond > Integer.MAX_VALUE) {
throw new ParameterException(String.format("Invalid retention policy type '%d'. ", messageTTLInSecond));
}

String persistentTopic = validatePersistentTopic(params);
getTopics().setMessageTTL(persistentTopic, messageTTLInSecond);
getTopics().setMessageTTL(persistentTopic, (int) messageTTLInSecond);
}
}

Expand Down
Loading

0 comments on commit 52cae2f

Please sign in to comment.