Skip to content

Commit

Permalink
Replay delayed messages in order. (apache#7731)
Browse files Browse the repository at this point in the history
### Motivation

Replay delayed messages in order.

### Verifying this change

A new unit test added.
  • Loading branch information
codelipenghui authored Aug 5, 2020
1 parent 22d7a6c commit 79df097
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,8 @@ public void readMoreEntries() {
}

havePendingReplayRead = true;
Set<? extends Position> deletedMessages = asyncReplayEntries(messagesToReplayNow);
Set<? extends Position> deletedMessages = topic.delayedDeliveryEnabled ?
asyncReplayEntriesInOrder(messagesToReplayNow) : asyncReplayEntries(messagesToReplayNow);
// clear already acked positions from replay bucket

deletedMessages.forEach(position -> messagesToRedeliver.remove(((PositionImpl) position).getLedgerId(),
Expand Down Expand Up @@ -372,6 +373,9 @@ protected Set<? extends Position> asyncReplayEntries(Set<? extends Position> pos
return cursor.asyncReplayEntries(positions, this, ReadType.Replay);
}

protected Set<? extends Position> asyncReplayEntriesInOrder(Set<? extends Position> positions) {
return cursor.asyncReplayEntries(positions, this, ReadType.Replay, true);
}

@Override
public boolean isConsumerConnected() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;

import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
Expand All @@ -32,6 +34,7 @@
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.testng.annotations.AfterClass;
Expand Down Expand Up @@ -271,4 +274,45 @@ public void testDelayedDeliveryWithMultipleConcurrentReadEntries()
}
t.interrupt();
}

@Test
public void testOrderingDispatch() throws PulsarClientException {
String topic = "persistent://public/default/testOrderingDispatch-" + System.nanoTime();

@Cleanup
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.topic(topic)
.subscriptionName("shared-sub")
.subscriptionType(SubscriptionType.Shared)
.subscribe();

@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic(topic)
.create();

final int N = 1000;

for (int i = 0; i < N; i++) {
producer.newMessage()
.value("msg-" + i)
.deliverAfter(5, TimeUnit.SECONDS)
.send();
}

List<Message<String>> receives = new ArrayList<>(N);
for (int i = 0; i < N; i++) {
Message<String> received = consumer.receive();
receives.add(received);
consumer.acknowledge(received);
}

assertEquals(receives.size(), N);

for (int i = 0; i < N; i++) {
if (i < N - 1) {
assertTrue(receives.get(i).getMessageId().compareTo(receives.get(i + 1).getMessageId()) < 0);
}
}
}
}

0 comments on commit 79df097

Please sign in to comment.