Skip to content

Commit

Permalink
Trim messages which less than mark delete position for message redeli…
Browse files Browse the repository at this point in the history
…very (apache#5378)

* Trim messages which less than mark delete position for message redelivery.
  • Loading branch information
codelipenghui authored and wolfstudy committed Oct 30, 2019
1 parent 96f14d2 commit b11211f
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -971,19 +971,24 @@ public synchronized void redeliverUnacknowledgedMessages(Consumer consumer, List
(PositionImpl) this.pendingCumulativeAckMessage;

positions.forEach(position -> {
if ((pendingAckMessages == null || (pendingAckMessages != null &&
!this.pendingAckMessages.contains(position))) &&
(null == cumulativeAckPosition ||
(null != cumulativeAckPosition && position.compareTo(cumulativeAckPosition) > 0))) {
if ((pendingAckMessages == null || !this.pendingAckMessages.contains(position))
&& (null == cumulativeAckPosition || position.compareTo(cumulativeAckPosition) > 0)) {
pendingPositions.add(position);
}
});
trimByMarkDeletePosition(pendingPositions);
dispatcher.redeliverUnacknowledgedMessages(consumer, pendingPositions);
} else {
trimByMarkDeletePosition(positions);
dispatcher.redeliverUnacknowledgedMessages(consumer, positions);
}
}

private void trimByMarkDeletePosition(List<PositionImpl> positions) {
positions.removeIf(position -> cursor.getMarkDeletedPosition() != null
&& position.compareTo((PositionImpl) cursor.getMarkDeletedPosition()) <= 0);
}

@Override
public void addUnAckedMessages(int unAckMessages) {
dispatcher.addUnAckedMessages(unAckMessages);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;

import java.util.Set;
import java.util.concurrent.CountDownLatch;
Expand All @@ -32,7 +34,10 @@
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
Expand Down Expand Up @@ -216,4 +221,40 @@ public void testRedelivery(boolean useOpenRangeSet) throws Exception {

}

@Test
public void testDoNotRedeliveryMarkDeleteMessages() throws PulsarClientException, PulsarAdminException {
final String topic = "testDoNotRedeliveryMarkDeleteMessages";
final String subName = "my-sub";

Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topic)
.subscriptionName(subName)
.subscriptionType(SubscriptionType.Key_Shared)
.ackTimeout(1, TimeUnit.SECONDS)
.subscribe();

Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
.enableBatching(false)
.create();

producer.send("Pulsar".getBytes());

for (int i = 0; i < 2; i++) {
Message message = consumer.receive();
assertNotNull(message);
}

admin.topics().skipAllMessages(topic, subName);

Message message = null;

try {
message = consumer.receive(2, TimeUnit.SECONDS);
} catch (Exception ignore) {
}

assertNull(message);
}

}

0 comments on commit b11211f

Please sign in to comment.