Skip to content

Commit

Permalink
Expire message by position. (apache#9514)
Browse files Browse the repository at this point in the history
Fixes apache#2736

### Motivation
Add ability to expire message for subscription by position to admin client and admin rest API.

### Modifications
Update PersistentMessageExpiryMonitor to able to expire message by position and expose to admin client and admin rest api.

### Verifying this change
This change added tests and can be verified as follows:
 - Added unit test for expire message by position in PersistentMessageFinderTest.
 - Added test for expire message by position in admin client/admin rest api.
  • Loading branch information
MarvinCai authored Feb 10, 2021
1 parent 1f1b96d commit 301d42c
Show file tree
Hide file tree
Showing 17 changed files with 567 additions and 114 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -462,7 +462,46 @@ public void expireTopicMessages(@Suspended final AsyncResponse asyncResponse,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
try {
validateTopicName(property, cluster, namespace, encodedTopic);
internalExpireMessages(asyncResponse, decode(encodedSubName), expireTimeInSeconds, authoritative);
internalExpireMessagesByTimestamp(asyncResponse, decode(encodedSubName),
expireTimeInSeconds, authoritative);
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
}

@POST
@Path("/{property}/{cluster}/{namespace}/{topic}/subscription/{subName}/expireMessages")
@ApiOperation(value = "Expiry messages on a topic subscription.")
@ApiResponses(value = {
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
@ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant or"
+ "subscriber is not authorized to access this operation"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic or subscription does not exist"),
@ApiResponse(code = 405, message = "Expiry messages on a non-persistent topic is not allowed"),
@ApiResponse(code = 500, message = "Internal server error"),
@ApiResponse(code = 503, message = "Failed to validate global cluster configuration")})
public void expireTopicMessages(
@Suspended final AsyncResponse asyncResponse,
@PathParam("property") String property, @PathParam("cluster") String cluster,
@ApiParam(value = "Specify the namespace", required = true)
@PathParam("namespace") String namespace,
@ApiParam(value = "Specify topic name", required = true)
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Subscription to be Expiry messages on")
@PathParam("subName") String encodedSubName,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@ApiParam(name = "messageId", value = "messageId to reset back to (ledgerId:entryId)")
ResetCursorData resetCursorData) {
try {
validateTopicName(property, cluster, namespace, encodedTopic);
internalExpireMessagesByPosition(asyncResponse, decode(encodedSubName), authoritative,
new MessageIdImpl(resetCursorData.getLedgerId(),
resetCursorData.getEntryId(), resetCursorData.getPartitionIndex())
, resetCursorData.isExcluded(), resetCursorData.getBatchIndex());
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1197,7 +1197,47 @@ public void expireTopicMessages(
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
try {
validateTopicName(tenant, namespace, encodedTopic);
internalExpireMessages(asyncResponse, decode(encodedSubName), expireTimeInSeconds, authoritative);
internalExpireMessagesByTimestamp(asyncResponse, decode(encodedSubName),
expireTimeInSeconds, authoritative);
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
}

@POST
@Path("/{tenant}/{namespace}/{topic}/subscription/{subName}/expireMessages")
@ApiOperation(value = "Expiry messages on a topic subscription.")
@ApiResponses(value = {
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
@ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant or"
+ "subscriber is not authorized to access this operation"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic or subscription does not exist"),
@ApiResponse(code = 405, message = "Expiry messages on a non-persistent topic is not allowed"),
@ApiResponse(code = 500, message = "Internal server error"),
@ApiResponse(code = 503, message = "Failed to validate global cluster configuration")})
public void expireTopicMessages(
@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@PathParam("namespace") String namespace,
@ApiParam(value = "Specify topic name", required = true)
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Subscription to be Expiry messages on")
@PathParam("subName") String encodedSubName,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@ApiParam(name = "messageId", value = "messageId to reset back to (ledgerId:entryId)")
ResetCursorData resetCursorData) {
try {
validateTopicName(tenant, namespace, encodedTopic);
internalExpireMessagesByPosition(asyncResponse, decode(encodedSubName), authoritative,
new MessageIdImpl(resetCursorData.getLedgerId(),
resetCursorData.getEntryId(), resetCursorData.getPartitionIndex())
, resetCursorData.isExcluded(), resetCursorData.getBatchIndex());
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ default long getNumberOfEntriesDelayed() {

void expireMessages(int messageTTLInSeconds);

void expireMessages(Position position);

void redeliverUnacknowledgedMessages(Consumer consumer);

void redeliverUnacknowledgedMessages(Consumer consumer, List<PositionImpl> positions);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,12 @@ public void expireMessages(int messageTTLInSeconds) {
// No-op
}

@Override
public void expireMessages(Position position) {
throw new UnsupportedOperationException("Expire message by position is not supported for"
+ " non-persistent topic.");
}

public NonPersistentSubscriptionStats getStats() {
NonPersistentSubscriptionStats subStats = new NonPersistentSubscriptionStats();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.NonRecoverableLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.stats.Rate;
Expand Down Expand Up @@ -94,6 +95,32 @@ public void expireMessages(int messageTTLInSeconds) {
}
}

public void expireMessages(Position messagePosition) {
// If it's beyond last position of this topic, do nothing.
if (((PositionImpl) subscription.getTopic().getLastPosition()).compareTo((PositionImpl) messagePosition) < 0) {
return;
}
if (expirationCheckInProgressUpdater.compareAndSet(this, FALSE, TRUE)) {
log.info("[{}][{}] Starting message expiry check, position= {} seconds", topicName, subName,
messagePosition);

cursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchActiveEntries, entry -> {
try {
// If given position larger than entry position.
return ((PositionImpl) entry.getPosition()).compareTo((PositionImpl) messagePosition) <= 0;
} finally {
entry.release();
}
}, this, null);
} else {
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Ignore expire-message scheduled task, last check is still running", topicName,
subName);
}
}
}


public void updateRates() {
msgExpired.calculateRate();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -727,6 +727,18 @@ public void expireMessages(int messageTTLInSeconds) {
}
}

public void expireMessages(Position position) {
if ((cursor.getNumberOfEntriesInBacklog(false) == 0)
|| (cursor.getNumberOfEntriesInBacklog(false) < MINIMUM_BACKLOG_FOR_EXPIRY_CHECK
&& !topic.isOldestMessageExpired(cursor, messageTTLInSeconds))) {
// don't do anything for almost caught-up connected subscriptions
return;
}
if (expiryMonitor != null) {
expiryMonitor.expireMessages(messageTTLInSeconds);
}
}

@Override
public Optional<DispatchRateLimiter> getRateLimiter() {
return dispatchRateLimiter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -904,6 +904,11 @@ && getNumberOfEntriesInBacklog(false) < MINIMUM_BACKLOG_FOR_EXPIRY_CHECK
expiryMonitor.expireMessages(messageTTLInSeconds);
}

@Override
public void expireMessages(Position position) {
expiryMonitor.expireMessages(position);
}

public double getExpiredMessageRate() {
return expiryMonitor.getMessageExpiryRate();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.compaction.Compactor;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
Expand Down Expand Up @@ -1603,16 +1604,17 @@ public void testUnsubscribeOnNamespace(Integer numBundles) throws Exception {
long messageTimestamp = System.currentTimeMillis();
long secondTimestamp = System.currentTimeMillis();

private void publishMessagesOnPersistentTopic(String topicName, int messages) throws Exception {
publishMessagesOnPersistentTopic(topicName, messages, 0, false);
private List<MessageId> publishMessagesOnPersistentTopic(String topicName, int messages) throws Exception {
return publishMessagesOnPersistentTopic(topicName, messages, 0, false);
}

private void publishNullValueMessageOnPersistentTopic(String topicName, int messages) throws Exception {
publishMessagesOnPersistentTopic(topicName, messages, 0, true);
private List<MessageId> publishNullValueMessageOnPersistentTopic(String topicName, int messages) throws Exception {
return publishMessagesOnPersistentTopic(topicName, messages, 0, true);
}

private void publishMessagesOnPersistentTopic(String topicName, int messages, int startIdx,
private List<MessageId> publishMessagesOnPersistentTopic(String topicName, int messages, int startIdx,
boolean nullValue) throws Exception {
List<MessageId> messageIds = new ArrayList<>();
Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
.topic(topicName)
.enableBatching(false)
Expand All @@ -1621,14 +1623,15 @@ private void publishMessagesOnPersistentTopic(String topicName, int messages, in

for (int i = startIdx; i < (messages + startIdx); i++) {
if (nullValue) {
producer.send(null);
messageIds.add(producer.send(null));
} else {
String message = "message-" + i;
producer.send(message.getBytes());
messageIds.add(producer.send(message.getBytes()));
}
}

producer.close();
return messageIds;
}

@Test
Expand Down Expand Up @@ -2069,7 +2072,6 @@ class CustomTenantAdmin extends TenantInfo {
*/
@Test
public void testPersistentTopicsExpireMessages() throws Exception {

// Force to create a topic
publishMessagesOnPersistentTopic("persistent://prop-xyz/ns1/ds2", 0);
assertEquals(admin.topics().getList("prop-xyz/ns1"),
Expand All @@ -2089,46 +2091,91 @@ public void testPersistentTopicsExpireMessages() throws Exception {

assertEquals(admin.topics().getSubscriptions("persistent://prop-xyz/ns1/ds2").size(), 3);

publishMessagesOnPersistentTopic("persistent://prop-xyz/ns1/ds2", 10);
List<MessageId> messageIds = publishMessagesOnPersistentTopic("persistent://prop-xyz/ns1/ds2", 10);

TopicStats topicStats = admin.topics().getStats("persistent://prop-xyz/ns1/ds2");
assertEquals(topicStats.subscriptions.get("my-sub1").msgBacklog, 10);
assertEquals(topicStats.subscriptions.get("my-sub2").msgBacklog, 10);
assertEquals(topicStats.subscriptions.get("my-sub3").msgBacklog, 10);

Thread.sleep(1000); // wait for 1 seconds to expire message
Thread.sleep(1000);
admin.topics().expireMessages("persistent://prop-xyz/ns1/ds2", "my-sub1", 1);
Thread.sleep(1000); // wait for 1 seconds to execute expire message as it is async

// Wait at most 2 seconds for sub1's message to expire.
Awaitility.await().atMost(2, TimeUnit.SECONDS).untilAsserted(() -> assertTrue(
admin.topics().getStats("persistent://prop-xyz/ns1/ds2").subscriptions.get("my-sub1").lastMarkDeleteAdvancedTimestamp > 0L));
topicStats = admin.topics().getStats("persistent://prop-xyz/ns1/ds2");
SubscriptionStats subStats1 = topicStats.subscriptions.get("my-sub1");
assertEquals(subStats1.msgBacklog, 0);
assertTrue(subStats1.lastMarkDeleteAdvancedTimestamp > 0L);
SubscriptionStats subStats2 = topicStats.subscriptions.get("my-sub2");
assertEquals(subStats2.msgBacklog, 10);
assertEquals(subStats2.lastMarkDeleteAdvancedTimestamp, 0L);
SubscriptionStats subStats3 = topicStats.subscriptions.get("my-sub3");
assertEquals(subStats3.msgBacklog, 10);
assertEquals(subStats3.lastMarkDeleteAdvancedTimestamp, 0L);

admin.topics().expireMessagesForAllSubscriptions("persistent://prop-xyz/ns1/ds2", 1);
Thread.sleep(1000); // wait for 1 seconds to execute expire message as it is async
admin.topics().expireMessages("persistent://prop-xyz/ns1/ds2", "my-sub2",
messageIds.get(4), false);
// Wait at most 2 seconds for sub2's message to expire.
Awaitility.await().atMost(2, TimeUnit.SECONDS).untilAsserted(() -> assertTrue(
admin.topics().getStats("persistent://prop-xyz/ns1/ds2").subscriptions.get("my-sub2").lastMarkDeleteAdvancedTimestamp > 0L));
topicStats = admin.topics().getStats("persistent://prop-xyz/ns1/ds2");
subStats1 = topicStats.subscriptions.get("my-sub1");
assertEquals(subStats1.msgBacklog, 0);
assertTrue(subStats1.lastMarkDeleteAdvancedTimestamp > 0L);
Long sub2lastMarkDeleteAdvancedTimestamp = subStats1.lastMarkDeleteAdvancedTimestamp;
subStats2 = topicStats.subscriptions.get("my-sub2");
assertEquals(subStats2.msgBacklog, 5);
subStats3 = topicStats.subscriptions.get("my-sub3");
assertEquals(subStats3.msgBacklog, 10);
assertEquals(subStats3.lastMarkDeleteAdvancedTimestamp, 0L);

admin.topics().expireMessagesForAllSubscriptions("persistent://prop-xyz/ns1/ds2", 1);
// Wait at most 2 seconds for sub3's message to expire.
Awaitility.await().atMost(2, TimeUnit.SECONDS).untilAsserted(() -> assertTrue(
admin.topics().getStats("persistent://prop-xyz/ns1/ds2").subscriptions.get("my-sub3").lastMarkDeleteAdvancedTimestamp > 0L));
topicStats = admin.topics().getStats("persistent://prop-xyz/ns1/ds2");
SubscriptionStats newSubStats1 = topicStats.subscriptions.get("my-sub1");
assertEquals(newSubStats1.msgBacklog, 0);
assertEquals(newSubStats1.lastMarkDeleteAdvancedTimestamp, subStats1.lastMarkDeleteAdvancedTimestamp);
SubscriptionStats newSubStats2 = topicStats.subscriptions.get("my-sub2");
assertEquals(newSubStats2.msgBacklog, 0);
assertTrue(newSubStats2.lastMarkDeleteAdvancedTimestamp > subStats2.lastMarkDeleteAdvancedTimestamp);
SubscriptionStats newSubStats3 = topicStats.subscriptions.get("my-sub3");
assertEquals(newSubStats3.msgBacklog, 0);
assertTrue(newSubStats3.lastMarkDeleteAdvancedTimestamp > subStats3.lastMarkDeleteAdvancedTimestamp);
subStats1 = topicStats.subscriptions.get("my-sub1");
assertEquals(subStats1.msgBacklog, 0);
assertEquals(subStats1.lastMarkDeleteAdvancedTimestamp, subStats1.lastMarkDeleteAdvancedTimestamp);
// Wait at most 2 seconds for rest of sub2's message to expire.
subStats2 = topicStats.subscriptions.get("my-sub2");
assertEquals(subStats2.msgBacklog, 0);
assertTrue(subStats2.lastMarkDeleteAdvancedTimestamp > sub2lastMarkDeleteAdvancedTimestamp);
subStats3 = topicStats.subscriptions.get("my-sub3");
assertEquals(subStats3.msgBacklog, 0);

consumer1.close();
consumer2.close();
consumer3.close();
}

@Test
public void testPersistentTopicsExpireMessagesInvalidPartitionIndex() throws Exception {
// Force to create a topic
publishMessagesOnPersistentTopic("persistent://prop-xyz/ns1/ds2-partition-2", 0);
assertEquals(admin.topics().getList("prop-xyz/ns1"),
Lists.newArrayList("persistent://prop-xyz/ns1/ds2-partition-2"));

// create consumer and subscription
@Cleanup
PulsarClient client = PulsarClient.builder()
.serviceUrl(pulsar.getWebServiceAddress())
.statsInterval(0, TimeUnit.SECONDS)
.build();
ConsumerBuilder<byte[]> consumerBuilder = client.newConsumer()
.topic("persistent://prop-xyz/ns1/ds2-partition-2")
.subscriptionType(SubscriptionType.Shared);
@Cleanup
Consumer<byte[]> consumer = consumerBuilder.clone().subscriptionName("my-sub").subscribe();

assertEquals(admin.topics().getSubscriptions("persistent://prop-xyz/ns1/ds2-partition-2").size(), 1);
publishMessagesOnPersistentTopic("persistent://prop-xyz/ns1/ds2-partition-2", 10);
try {
admin.topics().expireMessages("persistent://prop-xyz/ns1/ds2-partition-2", "my-sub",
new MessageIdImpl(1, 1, 1), false);
} catch (Exception e) {
assertTrue(e.getMessage().contains("Invalid parameter for expire message by position"));
}
}

/**
Expand Down
Loading

0 comments on commit 301d42c

Please sign in to comment.