diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 82a0e5c85dd25..2f01b041e390f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -1838,7 +1838,7 @@ private void internalExpireMessagesForAllSubscriptionsForNonPartitionedTopic(Asy topic.getReplicators().forEach((subName, replicator) -> { try { - internalExpireMessagesForSinglePartition(subName, expireTimeInSeconds, authoritative); + internalExpireMessagesByTimestampForSinglePartition(subName, expireTimeInSeconds, authoritative); } catch (Throwable t) { exception.set(t); } @@ -1846,7 +1846,7 @@ private void internalExpireMessagesForAllSubscriptionsForNonPartitionedTopic(Asy topic.getSubscriptions().forEach((subName, subscriber) -> { try { - internalExpireMessagesForSinglePartition(subName, expireTimeInSeconds, authoritative); + internalExpireMessagesByTimestampForSinglePartition(subName, expireTimeInSeconds, authoritative); } catch (Throwable t) { exception.set(t); } @@ -2198,82 +2198,9 @@ protected void internalResetCursorOnPosition(AsyncResponse asyncResponse, String return; } CompletableFuture batchSizeFuture = new CompletableFuture<>(); - if (batchIndex >= 0) { - try { - ManagedLedgerImpl ledger = (ManagedLedgerImpl) topic.getManagedLedger(); - ledger.asyncReadEntry(new PositionImpl(messageId.getLedgerId(), - messageId.getEntryId()), new AsyncCallbacks.ReadEntryCallback() { - @Override - public void readEntryFailed(ManagedLedgerException exception, Object ctx) { - // Since we can't read the message from the storage layer, - // it might be an already delete message ID or an invalid message ID - // We should fall back to non batch index seek. - batchSizeFuture.complete(0); - } - - @Override - public void readEntryComplete(Entry entry, Object ctx) { - try { - try { - if (entry == null) { - batchSizeFuture.complete(0); - } else { - MessageMetadata metadata = - Commands.parseMessageMetadata(entry.getDataBuffer()); - batchSizeFuture.complete(metadata.getNumMessagesInBatch()); - } - } catch (Exception e) { - batchSizeFuture.completeExceptionally(new RestException(e)); - } - } finally { - if (entry != null) { - entry.release(); - } - } - } - }, null); - } catch (NullPointerException npe) { - batchSizeFuture.completeExceptionally(new RestException(Status.NOT_FOUND, "Message not found")); - } catch (Exception exception) { - log.error("[{}] Failed to get message with ledgerId {} entryId {} from {}", - clientAppId(), messageId.getLedgerId(), messageId.getEntryId(), topicName, exception); - batchSizeFuture.completeExceptionally(new RestException(exception)); - } - } else { - batchSizeFuture.complete(0); - } + getEntryBatchSize(batchSizeFuture, topic, messageId, batchIndex); batchSizeFuture.thenAccept(bi -> { - PositionImpl seekPosition; - if (bi > 0) { - long[] ackSet; - BitSetRecyclable bitSet = BitSetRecyclable.create(); - bitSet.set(0, bi); - if (isExcluded) { - bitSet.clear(0, Math.max(batchIndex + 1, 0)); - if (bitSet.length() > 0) { - ackSet = bitSet.toLongArray(); - seekPosition = PositionImpl.get(messageId.getLedgerId(), - messageId.getEntryId(), ackSet); - } else { - seekPosition = PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId()); - seekPosition = seekPosition.getNext(); - } - } else { - if (batchIndex - 1 >= 0) { - bitSet.clear(0, batchIndex); - ackSet = bitSet.toLongArray(); - seekPosition = PositionImpl.get(messageId.getLedgerId(), - messageId.getEntryId(), ackSet); - } else { - seekPosition = PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId()); - } - } - bitSet.recycle(); - } else { - seekPosition = PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId()); - seekPosition = isExcluded ? seekPosition.getNext() : seekPosition; - } - + PositionImpl seekPosition = calculatePositionAckSet(isExcluded, bi, batchIndex, messageId); sub.resetCursor(seekPosition).thenRun(() -> { log.info("[{}][{}] successfully reset cursor on subscription {} to position {}", clientAppId(), topicName, subName, messageId); @@ -2305,6 +2232,89 @@ public void readEntryComplete(Entry entry, Object ctx) { } } + private void getEntryBatchSize(CompletableFuture batchSizeFuture, PersistentTopic topic, + MessageIdImpl messageId, int batchIndex) { + if (batchIndex >= 0) { + try { + ManagedLedgerImpl ledger = (ManagedLedgerImpl) topic.getManagedLedger(); + ledger.asyncReadEntry(new PositionImpl(messageId.getLedgerId(), + messageId.getEntryId()), new AsyncCallbacks.ReadEntryCallback() { + @Override + public void readEntryFailed(ManagedLedgerException exception, Object ctx) { + // Since we can't read the message from the storage layer, + // it might be an already delete message ID or an invalid message ID + // We should fall back to non batch index seek. + batchSizeFuture.complete(0); + } + + @Override + public void readEntryComplete(Entry entry, Object ctx) { + try { + try { + if (entry == null) { + batchSizeFuture.complete(0); + } else { + MessageMetadata metadata = + Commands.parseMessageMetadata(entry.getDataBuffer()); + batchSizeFuture.complete(metadata.getNumMessagesInBatch()); + } + } catch (Exception e) { + batchSizeFuture.completeExceptionally(new RestException(e)); + } + } finally { + if (entry != null) { + entry.release(); + } + } + } + }, null); + } catch (NullPointerException npe) { + batchSizeFuture.completeExceptionally(new RestException(Status.NOT_FOUND, "Message not found")); + } catch (Exception exception) { + log.error("[{}] Failed to get message with ledgerId {} entryId {} from {}", + clientAppId(), messageId.getLedgerId(), messageId.getEntryId(), topicName, exception); + batchSizeFuture.completeExceptionally(new RestException(exception)); + } + } else { + batchSizeFuture.complete(0); + } + } + + private PositionImpl calculatePositionAckSet(boolean isExcluded, int batchSize, + int batchIndex, MessageIdImpl messageId) { + PositionImpl seekPosition; + if (batchSize > 0) { + long[] ackSet; + BitSetRecyclable bitSet = BitSetRecyclable.create(); + bitSet.set(0, batchSize); + if (isExcluded) { + bitSet.clear(0, Math.max(batchIndex + 1, 0)); + if (bitSet.length() > 0) { + ackSet = bitSet.toLongArray(); + seekPosition = PositionImpl.get(messageId.getLedgerId(), + messageId.getEntryId(), ackSet); + } else { + seekPosition = PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId()); + seekPosition = seekPosition.getNext(); + } + } else { + if (batchIndex - 1 >= 0) { + bitSet.clear(0, batchIndex); + ackSet = bitSet.toLongArray(); + seekPosition = PositionImpl.get(messageId.getLedgerId(), + messageId.getEntryId(), ackSet); + } else { + seekPosition = PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId()); + } + } + bitSet.recycle(); + } else { + seekPosition = PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId()); + seekPosition = isExcluded ? seekPosition.getNext() : seekPosition; + } + return seekPosition; + } + protected void internalGetMessageById(AsyncResponse asyncResponse, long ledgerId, long entryId, boolean authoritative) { try { @@ -2931,16 +2941,15 @@ protected void internalTerminatePartitionedTopic(AsyncResponse asyncResponse, bo } } - - protected void internalExpireMessages(AsyncResponse asyncResponse, String subName, int expireTimeInSeconds, - boolean authoritative) { + protected void internalExpireMessagesByTimestamp(AsyncResponse asyncResponse, String subName, + int expireTimeInSeconds, boolean authoritative) { if (topicName.isGlobal()) { validateGlobalNamespaceOwnership(namespaceName); } // If the topic name is a partition name, no need to get partition topic metadata again if (topicName.isPartitioned()) { try { - internalExpireMessagesForSinglePartition(subName, expireTimeInSeconds, authoritative); + internalExpireMessagesByTimestampForSinglePartition(subName, expireTimeInSeconds, authoritative); } catch (WebApplicationException wae) { asyncResponse.resume(wae); return; @@ -2991,7 +3000,7 @@ protected void internalExpireMessages(AsyncResponse asyncResponse, String subNam }); } else { try { - internalExpireMessagesForSinglePartition(subName, expireTimeInSeconds, authoritative); + internalExpireMessagesByTimestampForSinglePartition(subName, expireTimeInSeconds, authoritative); } catch (WebApplicationException wae) { asyncResponse.resume(wae); return; @@ -3004,7 +3013,7 @@ protected void internalExpireMessages(AsyncResponse asyncResponse, String subNam } } - private void internalExpireMessagesForSinglePartition(String subName, int expireTimeInSeconds, + private void internalExpireMessagesByTimestampForSinglePartition(String subName, int expireTimeInSeconds, boolean authoritative) { if (topicName.isGlobal()) { validateGlobalNamespaceOwnership(namespaceName); @@ -3048,6 +3057,89 @@ private void internalExpireMessagesForSinglePartition(String subName, int expire } } + protected void internalExpireMessagesByPosition(AsyncResponse asyncResponse, String subName, boolean authoritative, + MessageIdImpl messageId, boolean isExcluded, int batchIndex) { + if (topicName.isGlobal()) { + try { + validateGlobalNamespaceOwnership(namespaceName); + } catch (Exception e) { + log.warn("[{}][{}] Failed to expire messages on subscription {} to position {}: {}", clientAppId(), + topicName, subName, messageId, e.getMessage()); + resumeAsyncResponseExceptionally(asyncResponse, e); + return; + } + } + + log.info("[{}][{}] received expire messages on subscription {} to position {}", clientAppId(), topicName, + subName, messageId); + + // If the topic name is a partition name, no need to get partition topic metadata again + if (!topicName.isPartitioned() && getPartitionedTopicMetadata(topicName, authoritative, false).partitions > 0) { + log.warn("[{}] Not supported operation expire message up to {} on partitioned-topic {} {}", + clientAppId(), messageId, topicName, subName); + asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED, + "Expire message at position is not supported for partitioned-topic")); + return; + } else if (messageId.getPartitionIndex() != topicName.getPartitionIndex()) { + log.warn("[{}] Invalid parameter for expire message by position, partition index of passed in message" + + " position {} doesn't match partition index of topic requested {}.", + clientAppId(), messageId, topicName); + asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED, + "Invalid parameter for expire message by position, partition index of message position " + + "passed in doesn't match partition index for the topic.")); + } else { + validateAdminAccessForSubscriber(subName, authoritative); + validateReadOperationOnTopic(authoritative); + PersistentTopic topic = (PersistentTopic) getTopicReference(topicName); + if (topic == null) { + asyncResponse.resume(new RestException(Status.NOT_FOUND, "Topic not found")); + return; + } + try { + PersistentSubscription sub = topic.getSubscription(subName); + if (sub == null) { + asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found")); + return; + } + CompletableFuture batchSizeFuture = new CompletableFuture<>(); + getEntryBatchSize(batchSizeFuture, topic, messageId, batchIndex); + batchSizeFuture.thenAccept(bi -> { + PositionImpl position = calculatePositionAckSet(isExcluded, bi, batchIndex, messageId); + try { + if (subName.startsWith(topic.getReplicatorPrefix())) { + String remoteCluster = PersistentReplicator.getRemoteCluster(subName); + PersistentReplicator repl = (PersistentReplicator) + topic.getPersistentReplicator(remoteCluster); + checkNotNull(repl); + repl.expireMessages(position); + } else { + checkNotNull(sub); + sub.expireMessages(position); + } + log.info("[{}] Message expire issued up to {} on {} {}", clientAppId(), position, topicName, + subName); + } catch (NullPointerException npe) { + throw new RestException(Status.NOT_FOUND, "Subscription not found"); + } catch (Exception exception) { + log.error("[{}] Failed to expire messages up to {} on {} with subscription {} {}", + clientAppId(), position, topicName, subName, exception); + throw new RestException(exception); + } + }).exceptionally(e -> { + log.error("[{}] Failed to expire messages up to {} on {} with subscription {} {}", clientAppId(), + messageId, topicName, subName, e); + asyncResponse.resume(e); + return null; + }); + } catch (Exception e) { + log.warn("[{}][{}] Failed to expire messages up to {} on subscription {} to position {}", + clientAppId(), topicName, messageId, subName, messageId, e); + resumeAsyncResponseExceptionally(asyncResponse, e); + } + } + asyncResponse.resume(Response.noContent().build()); + } + protected void internalTriggerCompaction(AsyncResponse asyncResponse, boolean authoritative) { log.info("[{}] Trigger compaction on topic {}", clientAppId(), topicName); try { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java index 78392c286b833..46708dd451cde 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java @@ -462,7 +462,46 @@ public void expireTopicMessages(@Suspended final AsyncResponse asyncResponse, @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { try { validateTopicName(property, cluster, namespace, encodedTopic); - internalExpireMessages(asyncResponse, decode(encodedSubName), expireTimeInSeconds, authoritative); + internalExpireMessagesByTimestamp(asyncResponse, decode(encodedSubName), + expireTimeInSeconds, authoritative); + } catch (WebApplicationException wae) { + asyncResponse.resume(wae); + } catch (Exception e) { + asyncResponse.resume(new RestException(e)); + } + } + + @POST + @Path("/{property}/{cluster}/{namespace}/{topic}/subscription/{subName}/expireMessages") + @ApiOperation(value = "Expiry messages on a topic subscription.") + @ApiResponses(value = { + @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), + @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant or" + + "subscriber is not authorized to access this operation"), + @ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Topic or subscription does not exist"), + @ApiResponse(code = 405, message = "Expiry messages on a non-persistent topic is not allowed"), + @ApiResponse(code = 500, message = "Internal server error"), + @ApiResponse(code = 503, message = "Failed to validate global cluster configuration")}) + public void expireTopicMessages( + @Suspended final AsyncResponse asyncResponse, + @PathParam("property") String property, @PathParam("cluster") String cluster, + @ApiParam(value = "Specify the namespace", required = true) + @PathParam("namespace") String namespace, + @ApiParam(value = "Specify topic name", required = true) + @PathParam("topic") @Encoded String encodedTopic, + @ApiParam(value = "Subscription to be Expiry messages on") + @PathParam("subName") String encodedSubName, + @ApiParam(value = "Is authentication required to perform this operation") + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, + @ApiParam(name = "messageId", value = "messageId to reset back to (ledgerId:entryId)") + ResetCursorData resetCursorData) { + try { + validateTopicName(property, cluster, namespace, encodedTopic); + internalExpireMessagesByPosition(asyncResponse, decode(encodedSubName), authoritative, + new MessageIdImpl(resetCursorData.getLedgerId(), + resetCursorData.getEntryId(), resetCursorData.getPartitionIndex()) + , resetCursorData.isExcluded(), resetCursorData.getBatchIndex()); } catch (WebApplicationException wae) { asyncResponse.resume(wae); } catch (Exception e) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java index a5693396e3daa..851e26503628d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java @@ -1197,7 +1197,47 @@ public void expireTopicMessages( @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { try { validateTopicName(tenant, namespace, encodedTopic); - internalExpireMessages(asyncResponse, decode(encodedSubName), expireTimeInSeconds, authoritative); + internalExpireMessagesByTimestamp(asyncResponse, decode(encodedSubName), + expireTimeInSeconds, authoritative); + } catch (WebApplicationException wae) { + asyncResponse.resume(wae); + } catch (Exception e) { + asyncResponse.resume(new RestException(e)); + } + } + + @POST + @Path("/{tenant}/{namespace}/{topic}/subscription/{subName}/expireMessages") + @ApiOperation(value = "Expiry messages on a topic subscription.") + @ApiResponses(value = { + @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), + @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant or" + + "subscriber is not authorized to access this operation"), + @ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Topic or subscription does not exist"), + @ApiResponse(code = 405, message = "Expiry messages on a non-persistent topic is not allowed"), + @ApiResponse(code = 500, message = "Internal server error"), + @ApiResponse(code = 503, message = "Failed to validate global cluster configuration")}) + public void expireTopicMessages( + @Suspended final AsyncResponse asyncResponse, + @ApiParam(value = "Specify the tenant", required = true) + @PathParam("tenant") String tenant, + @ApiParam(value = "Specify the namespace", required = true) + @PathParam("namespace") String namespace, + @ApiParam(value = "Specify topic name", required = true) + @PathParam("topic") @Encoded String encodedTopic, + @ApiParam(value = "Subscription to be Expiry messages on") + @PathParam("subName") String encodedSubName, + @ApiParam(value = "Is authentication required to perform this operation") + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, + @ApiParam(name = "messageId", value = "messageId to reset back to (ledgerId:entryId)") + ResetCursorData resetCursorData) { + try { + validateTopicName(tenant, namespace, encodedTopic); + internalExpireMessagesByPosition(asyncResponse, decode(encodedSubName), authoritative, + new MessageIdImpl(resetCursorData.getLedgerId(), + resetCursorData.getEntryId(), resetCursorData.getPartitionIndex()) + , resetCursorData.isExcluded(), resetCursorData.getBatchIndex()); } catch (WebApplicationException wae) { asyncResponse.resume(wae); } catch (Exception e) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java index 05b2d2887ac68..95a470d332930 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java @@ -85,6 +85,8 @@ default long getNumberOfEntriesDelayed() { void expireMessages(int messageTTLInSeconds); + void expireMessages(Position position); + void redeliverUnacknowledgedMessages(Consumer consumer); void redeliverUnacknowledgedMessages(Consumer consumer, List positions); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java index 79edaabf4317c..c347d36e62834 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java @@ -431,6 +431,12 @@ public void expireMessages(int messageTTLInSeconds) { // No-op } + @Override + public void expireMessages(Position position) { + throw new UnsupportedOperationException("Expire message by position is not supported for" + + " non-persistent topic."); + } + public NonPersistentSubscriptionStats getStats() { NonPersistentSubscriptionStats subStats = new NonPersistentSubscriptionStats(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java index b66c66d55cea8..29848e3bb7165 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java @@ -28,6 +28,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.ManagedLedgerException.NonRecoverableLedgerException; import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.pulsar.client.impl.MessageImpl; import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType; import org.apache.pulsar.common.stats.Rate; @@ -94,6 +95,32 @@ public void expireMessages(int messageTTLInSeconds) { } } + public void expireMessages(Position messagePosition) { + // If it's beyond last position of this topic, do nothing. + if (((PositionImpl) subscription.getTopic().getLastPosition()).compareTo((PositionImpl) messagePosition) < 0) { + return; + } + if (expirationCheckInProgressUpdater.compareAndSet(this, FALSE, TRUE)) { + log.info("[{}][{}] Starting message expiry check, position= {} seconds", topicName, subName, + messagePosition); + + cursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchActiveEntries, entry -> { + try { + // If given position larger than entry position. + return ((PositionImpl) entry.getPosition()).compareTo((PositionImpl) messagePosition) <= 0; + } finally { + entry.release(); + } + }, this, null); + } else { + if (log.isDebugEnabled()) { + log.debug("[{}][{}] Ignore expire-message scheduled task, last check is still running", topicName, + subName); + } + } + } + + public void updateRates() { msgExpired.calculateRate(); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java index cc28af7f0b4bb..52c19570c415b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java @@ -727,6 +727,18 @@ public void expireMessages(int messageTTLInSeconds) { } } + public void expireMessages(Position position) { + if ((cursor.getNumberOfEntriesInBacklog(false) == 0) + || (cursor.getNumberOfEntriesInBacklog(false) < MINIMUM_BACKLOG_FOR_EXPIRY_CHECK + && !topic.isOldestMessageExpired(cursor, messageTTLInSeconds))) { + // don't do anything for almost caught-up connected subscriptions + return; + } + if (expiryMonitor != null) { + expiryMonitor.expireMessages(messageTTLInSeconds); + } + } + @Override public Optional getRateLimiter() { return dispatchRateLimiter; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index 2df6b4cd063ad..a666edb67830d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -904,6 +904,11 @@ && getNumberOfEntriesInBacklog(false) < MINIMUM_BACKLOG_FOR_EXPIRY_CHECK expiryMonitor.expireMessages(messageTTLInSeconds); } + @Override + public void expireMessages(Position position) { + expiryMonitor.expireMessages(position); + } + public double getExpiredMessageRate() { return expiryMonitor.getMessageExpiryRate(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java index cede9b7f858ca..ac8fdab0cf904 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java @@ -123,6 +123,7 @@ import org.apache.pulsar.common.util.Codec; import org.apache.pulsar.common.util.ObjectMapperFactory; import org.apache.pulsar.compaction.Compactor; +import org.awaitility.Awaitility; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.Assert; @@ -1603,16 +1604,17 @@ public void testUnsubscribeOnNamespace(Integer numBundles) throws Exception { long messageTimestamp = System.currentTimeMillis(); long secondTimestamp = System.currentTimeMillis(); - private void publishMessagesOnPersistentTopic(String topicName, int messages) throws Exception { - publishMessagesOnPersistentTopic(topicName, messages, 0, false); + private List publishMessagesOnPersistentTopic(String topicName, int messages) throws Exception { + return publishMessagesOnPersistentTopic(topicName, messages, 0, false); } - private void publishNullValueMessageOnPersistentTopic(String topicName, int messages) throws Exception { - publishMessagesOnPersistentTopic(topicName, messages, 0, true); + private List publishNullValueMessageOnPersistentTopic(String topicName, int messages) throws Exception { + return publishMessagesOnPersistentTopic(topicName, messages, 0, true); } - private void publishMessagesOnPersistentTopic(String topicName, int messages, int startIdx, + private List publishMessagesOnPersistentTopic(String topicName, int messages, int startIdx, boolean nullValue) throws Exception { + List messageIds = new ArrayList<>(); Producer producer = pulsarClient.newProducer(Schema.BYTES) .topic(topicName) .enableBatching(false) @@ -1621,14 +1623,15 @@ private void publishMessagesOnPersistentTopic(String topicName, int messages, in for (int i = startIdx; i < (messages + startIdx); i++) { if (nullValue) { - producer.send(null); + messageIds.add(producer.send(null)); } else { String message = "message-" + i; - producer.send(message.getBytes()); + messageIds.add(producer.send(message.getBytes())); } } producer.close(); + return messageIds; } @Test @@ -2069,7 +2072,6 @@ class CustomTenantAdmin extends TenantInfo { */ @Test public void testPersistentTopicsExpireMessages() throws Exception { - // Force to create a topic publishMessagesOnPersistentTopic("persistent://prop-xyz/ns1/ds2", 0); assertEquals(admin.topics().getList("prop-xyz/ns1"), @@ -2089,21 +2091,21 @@ public void testPersistentTopicsExpireMessages() throws Exception { assertEquals(admin.topics().getSubscriptions("persistent://prop-xyz/ns1/ds2").size(), 3); - publishMessagesOnPersistentTopic("persistent://prop-xyz/ns1/ds2", 10); + List messageIds = publishMessagesOnPersistentTopic("persistent://prop-xyz/ns1/ds2", 10); TopicStats topicStats = admin.topics().getStats("persistent://prop-xyz/ns1/ds2"); assertEquals(topicStats.subscriptions.get("my-sub1").msgBacklog, 10); assertEquals(topicStats.subscriptions.get("my-sub2").msgBacklog, 10); assertEquals(topicStats.subscriptions.get("my-sub3").msgBacklog, 10); - Thread.sleep(1000); // wait for 1 seconds to expire message + Thread.sleep(1000); admin.topics().expireMessages("persistent://prop-xyz/ns1/ds2", "my-sub1", 1); - Thread.sleep(1000); // wait for 1 seconds to execute expire message as it is async - + // Wait at most 2 seconds for sub1's message to expire. + Awaitility.await().atMost(2, TimeUnit.SECONDS).untilAsserted(() -> assertTrue( + admin.topics().getStats("persistent://prop-xyz/ns1/ds2").subscriptions.get("my-sub1").lastMarkDeleteAdvancedTimestamp > 0L)); topicStats = admin.topics().getStats("persistent://prop-xyz/ns1/ds2"); SubscriptionStats subStats1 = topicStats.subscriptions.get("my-sub1"); assertEquals(subStats1.msgBacklog, 0); - assertTrue(subStats1.lastMarkDeleteAdvancedTimestamp > 0L); SubscriptionStats subStats2 = topicStats.subscriptions.get("my-sub2"); assertEquals(subStats2.msgBacklog, 10); assertEquals(subStats2.lastMarkDeleteAdvancedTimestamp, 0L); @@ -2111,24 +2113,69 @@ public void testPersistentTopicsExpireMessages() throws Exception { assertEquals(subStats3.msgBacklog, 10); assertEquals(subStats3.lastMarkDeleteAdvancedTimestamp, 0L); - admin.topics().expireMessagesForAllSubscriptions("persistent://prop-xyz/ns1/ds2", 1); - Thread.sleep(1000); // wait for 1 seconds to execute expire message as it is async + admin.topics().expireMessages("persistent://prop-xyz/ns1/ds2", "my-sub2", + messageIds.get(4), false); + // Wait at most 2 seconds for sub2's message to expire. + Awaitility.await().atMost(2, TimeUnit.SECONDS).untilAsserted(() -> assertTrue( + admin.topics().getStats("persistent://prop-xyz/ns1/ds2").subscriptions.get("my-sub2").lastMarkDeleteAdvancedTimestamp > 0L)); + topicStats = admin.topics().getStats("persistent://prop-xyz/ns1/ds2"); + subStats1 = topicStats.subscriptions.get("my-sub1"); + assertEquals(subStats1.msgBacklog, 0); + assertTrue(subStats1.lastMarkDeleteAdvancedTimestamp > 0L); + Long sub2lastMarkDeleteAdvancedTimestamp = subStats1.lastMarkDeleteAdvancedTimestamp; + subStats2 = topicStats.subscriptions.get("my-sub2"); + assertEquals(subStats2.msgBacklog, 5); + subStats3 = topicStats.subscriptions.get("my-sub3"); + assertEquals(subStats3.msgBacklog, 10); + assertEquals(subStats3.lastMarkDeleteAdvancedTimestamp, 0L); + admin.topics().expireMessagesForAllSubscriptions("persistent://prop-xyz/ns1/ds2", 1); + // Wait at most 2 seconds for sub3's message to expire. + Awaitility.await().atMost(2, TimeUnit.SECONDS).untilAsserted(() -> assertTrue( + admin.topics().getStats("persistent://prop-xyz/ns1/ds2").subscriptions.get("my-sub3").lastMarkDeleteAdvancedTimestamp > 0L)); topicStats = admin.topics().getStats("persistent://prop-xyz/ns1/ds2"); - SubscriptionStats newSubStats1 = topicStats.subscriptions.get("my-sub1"); - assertEquals(newSubStats1.msgBacklog, 0); - assertEquals(newSubStats1.lastMarkDeleteAdvancedTimestamp, subStats1.lastMarkDeleteAdvancedTimestamp); - SubscriptionStats newSubStats2 = topicStats.subscriptions.get("my-sub2"); - assertEquals(newSubStats2.msgBacklog, 0); - assertTrue(newSubStats2.lastMarkDeleteAdvancedTimestamp > subStats2.lastMarkDeleteAdvancedTimestamp); - SubscriptionStats newSubStats3 = topicStats.subscriptions.get("my-sub3"); - assertEquals(newSubStats3.msgBacklog, 0); - assertTrue(newSubStats3.lastMarkDeleteAdvancedTimestamp > subStats3.lastMarkDeleteAdvancedTimestamp); + subStats1 = topicStats.subscriptions.get("my-sub1"); + assertEquals(subStats1.msgBacklog, 0); + assertEquals(subStats1.lastMarkDeleteAdvancedTimestamp, subStats1.lastMarkDeleteAdvancedTimestamp); + // Wait at most 2 seconds for rest of sub2's message to expire. + subStats2 = topicStats.subscriptions.get("my-sub2"); + assertEquals(subStats2.msgBacklog, 0); + assertTrue(subStats2.lastMarkDeleteAdvancedTimestamp > sub2lastMarkDeleteAdvancedTimestamp); + subStats3 = topicStats.subscriptions.get("my-sub3"); + assertEquals(subStats3.msgBacklog, 0); consumer1.close(); consumer2.close(); consumer3.close(); + } + @Test + public void testPersistentTopicsExpireMessagesInvalidPartitionIndex() throws Exception { + // Force to create a topic + publishMessagesOnPersistentTopic("persistent://prop-xyz/ns1/ds2-partition-2", 0); + assertEquals(admin.topics().getList("prop-xyz/ns1"), + Lists.newArrayList("persistent://prop-xyz/ns1/ds2-partition-2")); + + // create consumer and subscription + @Cleanup + PulsarClient client = PulsarClient.builder() + .serviceUrl(pulsar.getWebServiceAddress()) + .statsInterval(0, TimeUnit.SECONDS) + .build(); + ConsumerBuilder consumerBuilder = client.newConsumer() + .topic("persistent://prop-xyz/ns1/ds2-partition-2") + .subscriptionType(SubscriptionType.Shared); + @Cleanup + Consumer consumer = consumerBuilder.clone().subscriptionName("my-sub").subscribe(); + + assertEquals(admin.topics().getSubscriptions("persistent://prop-xyz/ns1/ds2-partition-2").size(), 1); + publishMessagesOnPersistentTopic("persistent://prop-xyz/ns1/ds2-partition-2", 10); + try { + admin.topics().expireMessages("persistent://prop-xyz/ns1/ds2-partition-2", "my-sub", + new MessageIdImpl(1, 1, 1), false); + } catch (Exception e) { + assertTrue(e.getMessage().contains("Invalid parameter for expire message by position")); + } } /** diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java index f7c38f0aee4e1..d60b4dcbb344a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java @@ -19,6 +19,13 @@ package org.apache.pulsar.broker.service; import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.clearInvocations; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.powermock.api.mockito.PowerMockito.mock; +import static org.powermock.api.mockito.PowerMockito.spy; +import static org.powermock.api.mockito.PowerMockito.when; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.assertNotNull; @@ -29,6 +36,7 @@ import io.netty.buffer.UnpooledByteBufAllocator; import java.lang.reflect.Field; +import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Optional; @@ -51,6 +59,8 @@ import org.apache.bookkeeper.test.MockedBookKeeperTestCase; import org.apache.pulsar.broker.service.persistent.PersistentMessageExpiryMonitor; import org.apache.pulsar.broker.service.persistent.PersistentMessageFinder; +import org.apache.pulsar.broker.service.persistent.PersistentSubscription; +import org.apache.pulsar.client.impl.ResetCursorData; import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; import org.apache.pulsar.common.api.proto.BrokerEntryMetadata; import org.apache.pulsar.common.api.proto.MessageMetadata; @@ -58,9 +68,13 @@ import org.apache.pulsar.common.intercept.BrokerEntryMetadataUtils; import org.apache.pulsar.common.protocol.ByteBufPair; import org.apache.pulsar.common.protocol.Commands; +import org.awaitility.Awaitility; import org.testng.annotations.Test; import org.testng.collections.Sets; +import javax.ws.rs.client.Entity; +import javax.ws.rs.core.MediaType; + /** */ public class PersistentMessageFinderTest extends MockedBookKeeperTestCase { @@ -329,7 +343,7 @@ public static Set getBrokerEntryMetadataIntercep * @throws Exception */ @Test - void testMessageExpiryWithNonRecoverableException() throws Exception { + void testMessageExpiryWithTimestampNonRecoverableException() throws Exception { final String ledgerAndCursorName = "testPersistentMessageExpiryWithNonRecoverableLedgers"; final int entriesPerLedger = 2; @@ -380,4 +394,74 @@ void testMessageExpiryWithNonRecoverableException() throws Exception { factory.shutdown(); } + + @Test + void testMessageExpiryWithPosition() throws Exception { + final String ledgerAndCursorName = "testPersistentMessageExpiryWithPositionNonRecoverableLedgers"; + final int entriesPerLedger = 5; + final int totalEntries = 30; + List positions = new ArrayList<>(); + + ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setRetentionSizeInMB(10); + config.setMaxEntriesPerLedger(entriesPerLedger); + config.setRetentionTime(1, TimeUnit.HOURS); + config.setAutoSkipNonRecoverableData(true); + ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open(ledgerAndCursorName, config); + ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor(ledgerAndCursorName); + + PersistentSubscription subscription = mock(PersistentSubscription.class); + Topic topic = mock(Topic.class); + when(subscription.getTopic()).thenReturn(topic); + + for (int i = 0; i < totalEntries; i++) { + positions.add(ledger.addEntry(createMessageWrittenToLedger("msg" + i))); + } + when(topic.getLastPosition()).thenReturn(positions.get(positions.size() - 1)); + for (Position p : positions) { + System.out.println(p); + } + PersistentMessageExpiryMonitor monitor = spy(new PersistentMessageExpiryMonitor("topicname", + cursor.getName(), cursor, subscription)); + assertEquals((PositionImpl) cursor.getMarkDeletedPosition(), PositionImpl.get(positions.get(0).getLedgerId(), -1)); + + // Expire by position and verify mark delete position of cursor. + monitor.expireMessages(positions.get(15)); + Awaitility.await().atMost(2, TimeUnit.SECONDS).untilAsserted(() -> verify(monitor, times(1)).findEntryComplete(any(), any())); + assertEquals((PositionImpl) cursor.getMarkDeletedPosition(), PositionImpl.get(positions.get(15).getLedgerId(), positions.get(15).getEntryId())); + clearInvocations(monitor); + + // Expire by position beyond last position and nothing should happen. + monitor.expireMessages(PositionImpl.get(100, 100)); + assertEquals((PositionImpl) cursor.getMarkDeletedPosition(), PositionImpl.get(positions.get(15).getLedgerId(), positions.get(15).getEntryId())); + + // Expire by position again and verify mark delete position of cursor didn't change. + monitor.expireMessages(positions.get(15)); + Awaitility.await().atMost(2, TimeUnit.SECONDS).untilAsserted(() -> verify(monitor, times(1)).findEntryComplete(any(), any())); + assertEquals((PositionImpl) cursor.getMarkDeletedPosition(), PositionImpl.get(positions.get(15).getLedgerId(), positions.get(15).getEntryId())); + clearInvocations(monitor); + + // Expire by position before current mark delete position and verify mark delete position of cursor didn't change. + monitor.expireMessages(positions.get(10)); + Awaitility.await().atMost(2, TimeUnit.SECONDS).untilAsserted(() -> verify(monitor, times(1)).findEntryComplete(any(), any())); + assertEquals((PositionImpl) cursor.getMarkDeletedPosition(), PositionImpl.get(positions.get(15).getLedgerId(), positions.get(15).getEntryId())); + clearInvocations(monitor); + + // Expire by position after current mark delete position and verify mark delete position of cursor move to new position. + monitor.expireMessages(positions.get(16)); + Awaitility.await().atMost(2, TimeUnit.SECONDS).untilAsserted(() -> verify(monitor, times(1)).findEntryComplete(any(), any())); + assertEquals((PositionImpl) cursor.getMarkDeletedPosition(), PositionImpl.get(positions.get(16).getLedgerId(), positions.get(16).getEntryId())); + clearInvocations(monitor); + + cursor.close(); + ledger.close(); + factory.shutdown(); + } + + @Test + public void test() { + ResetCursorData resetCursorData = new ResetCursorData(1, 1); + resetCursorData.setExcluded(true); + System.out.println(Entity.entity(resetCursorData, MediaType.APPLICATION_JSON)); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java index 7c068373320f2..b5e825166c0e0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java @@ -42,6 +42,7 @@ import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider; import org.apache.pulsar.broker.cache.ConfigurationCacheService; import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.AuthAction; @@ -210,6 +211,7 @@ public void testSubscriberPermission() throws Exception { tenantAdmin.topics().skipAllMessages(topicName, subscriptionName); tenantAdmin.topics().skipMessages(topicName, subscriptionName, 1); tenantAdmin.topics().expireMessages(topicName, subscriptionName, 10); + tenantAdmin.topics().expireMessages(topicName, subscriptionName, new MessageIdImpl(-1, -1, -1), true); tenantAdmin.topics().peekMessages(topicName, subscriptionName, 1); tenantAdmin.topics().resetCursor(topicName, subscriptionName, 10); tenantAdmin.topics().resetCursor(topicName, subscriptionName, MessageId.earliest); diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java index 247b287b30538..576f6287270d1 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java @@ -1182,6 +1182,40 @@ void expireMessages(String topic, String subscriptionName, long expireTimeInSeco CompletableFuture expireMessagesAsync(String topic, String subscriptionName, long expireTimeInSeconds); + /** + * Expire all messages older than given N (expireTimeInSeconds) seconds for a given subscription. + * + * @param topic + * topic name + * @param subscriptionName + * Subscription name + * @param messageId + * Position before which all messages will be expired. + * @param isExcluded + * Will message at passed in position also be expired. + * @throws PulsarAdminException + * Unexpected error + */ + void expireMessages(String topic, String subscriptionName, MessageId messageId, boolean isExcluded) + throws PulsarAdminException; + + /** + * Expire all messages older than given N (expireTimeInSeconds) seconds for a given subscription asynchronously. + * + * @param topic + * topic name + * @param subscriptionName + * Subscription name + * @param messageId + * Position before which all messages will be expired. + * @param isExcluded + * Will message at passed in position also be expired. + * @return + * A {@link CompletableFuture} that'll be completed when expire message is done. + */ + CompletableFuture expireMessagesAsync(String topic, String subscriptionName, + MessageId messageId, boolean isExcluded); + /** * Expire all messages older than given N seconds for all subscriptions of the persistent-topic. * diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java index 8de48cebbc32d..218c314f4be12 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java @@ -936,6 +936,33 @@ public CompletableFuture expireMessagesAsync(String topic, String subName, return asyncPostRequest(path, Entity.entity("", MediaType.APPLICATION_JSON)); } + @Override + public void expireMessages(String topic, String subscriptionName, MessageId messageId, boolean isExcluded) + throws PulsarAdminException { + try { + expireMessagesAsync(topic, subscriptionName, messageId, isExcluded) + .get(this.readTimeoutMs, TimeUnit.MILLISECONDS); + } catch (ExecutionException e) { + throw (PulsarAdminException) e.getCause(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new PulsarAdminException(e); + } catch (TimeoutException e) { + throw new PulsarAdminException.TimeoutException(e); + } + } + + @Override + public CompletableFuture expireMessagesAsync(String topic, String subscriptionName, + MessageId messageId, boolean isExcluded) { + TopicName tn = validateTopic(topic); + String encodedSubName = Codec.encode(subscriptionName); + ResetCursorData resetCursorData = new ResetCursorData(messageId); + resetCursorData.setExcluded(isExcluded); + WebTarget path = topicPath(tn, "subscription", encodedSubName, "expireMessages"); + return asyncPostRequest(path, Entity.entity(resetCursorData, MediaType.APPLICATION_JSON)); + } + @Override public void expireMessagesForAllSubscriptions(String topic, long expireTimeInSeconds) throws PulsarAdminException { try { diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java index 1b2478eb2f7ad..8c35e0190f74c 100644 --- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java @@ -748,6 +748,11 @@ public void topics() throws Exception { cmdTopics.run(split("expire-messages persistent://myprop/clust/ns1/ds1 -s sub1 -t 100")); verify(mockTopics).expireMessages("persistent://myprop/clust/ns1/ds1", "sub1", 100); + //cmd with option cannot be executed repeatedly. + cmdTopics = new CmdTopics(() -> admin); + cmdTopics.run(split("expire-messages persistent://myprop/clust/ns1/ds1 -s sub1 -p 1:1 -e")); + verify(mockTopics).expireMessages(eq("persistent://myprop/clust/ns1/ds1"), eq("sub1"), eq(new MessageIdImpl(1, 1, -1)), eq(true)); + cmdTopics.run(split("expire-messages-all-subscriptions persistent://myprop/clust/ns1/ds1 -t 100")); verify(mockTopics).expireMessagesForAllSubscriptions("persistent://myprop/clust/ns1/ds1", 100); diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CliCommand.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CliCommand.java index ce48ef4693174..5113daa4e6fee 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CliCommand.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CliCommand.java @@ -104,10 +104,14 @@ static long validateSizeString(String s) { } static MessageId validateMessageIdString(String resetMessageIdStr) throws PulsarAdminException { + return validateMessageIdString(resetMessageIdStr, -1); + } + + static MessageId validateMessageIdString(String resetMessageIdStr, int partitionIndex) throws PulsarAdminException { String[] messageId = resetMessageIdStr.split(":"); try { Preconditions.checkArgument(messageId.length == 2); - return new MessageIdImpl(Long.parseLong(messageId[0]), Long.parseLong(messageId[1]), -1); + return new MessageIdImpl(Long.parseLong(messageId[0]), Long.parseLong(messageId[1]), partitionIndex); } catch (Exception e) { throw new PulsarAdminException( "Invalid message id (must be in format: ledgerId:entryId) value " + resetMessageIdStr); diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java index 9e6eef25b7aa9..f3319b9501d0e 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java @@ -39,6 +39,8 @@ import java.util.concurrent.TimeUnit; import java.util.function.Supplier; import java.util.stream.Collectors; + +import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.client.admin.LongRunningProcessStatus; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; @@ -47,6 +49,7 @@ import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.impl.BatchMessageIdImpl; import org.apache.pulsar.client.impl.MessageIdImpl; +import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.BacklogQuota; import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies; import org.apache.pulsar.common.policies.data.DispatchRate; @@ -657,13 +660,35 @@ private class ExpireMessages extends CliCommand { "--subscription" }, description = "Subscription to be skip messages on", required = true) private String subName; - @Parameter(names = { "-t", "--expireTime" }, description = "Expire messages older than time in seconds", required = true) - private long expireTimeInSeconds; + @Parameter(names = { "-t", "--expireTime" }, description = "Expire messages older than time in seconds") + private long expireTimeInSeconds = -1; + + @Parameter(names = { "--position", + "-p" }, description = "message position to reset back to (ledgerId:entryId)", required = false) + private String messagePosition; + + @Parameter(names = { "-e", "--exclude-reset-position" }, + description = "Exclude the reset position, start consume messages from the next position.", required = false) + private boolean excludeResetPosition = false; @Override void run() throws PulsarAdminException { + if (expireTimeInSeconds >= 0 && isNotBlank(messagePosition)) { + throw new ParameterException(String.format("Can't expire message by time and " + + "by message position at the same time.")); + } String topic = validateTopicName(params); - getTopics().expireMessages(topic, subName, expireTimeInSeconds); + if (expireTimeInSeconds >= 0) { + getTopics().expireMessages(topic, subName, expireTimeInSeconds); + } else if (isNotBlank(messagePosition)) { + int partitionIndex = TopicName.get(topic).getPartitionIndex(); + MessageId messageId = validateMessageIdString(messagePosition, partitionIndex); + getTopics().expireMessages(topic, subName, messageId, excludeResetPosition); + } else { + throw new ParameterException( + "Either time (--expireTime) or message position (--position) has to be provided" + + " to expire messages"); + } } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ResetCursorData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ResetCursorData.java index 9311fa008b265..1bc7b7fad624e 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ResetCursorData.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ResetCursorData.java @@ -49,10 +49,12 @@ public ResetCursorData(MessageId messageId) { this.ledgerId = batchMessageId.getLedgerId(); this.entryId = batchMessageId.getEntryId(); this.batchIndex = batchMessageId.getBatchIndex(); + this.partitionIndex = batchMessageId.partitionIndex; } else if (messageId instanceof MessageIdImpl) { MessageIdImpl messageIdImpl = (MessageIdImpl) messageId; this.ledgerId = messageIdImpl.getLedgerId(); this.entryId = messageIdImpl.getEntryId(); + this.partitionIndex = messageIdImpl.partitionIndex; } else if (messageId instanceof TopicMessageIdImpl) { throw new IllegalArgumentException("Not supported operation on partitioned-topic"); }