Skip to content

Commit

Permalink
Fixed key-shared delivery of messages with interleaved delays (apache…
Browse files Browse the repository at this point in the history
…#15409)

* Fixed key-shared delivery of messages with interleaved delays

* Fixed checkstyle

* Always add to redelivery messages
  • Loading branch information
merlimat authored May 2, 2022
1 parent d6fd34e commit 2647b32
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,8 @@ public Set<PositionImpl> getScheduledMessages(int maxMessages) {

@Override
public void resetTickTime(long tickTime) {
if (this.tickTimeMillis != tickTime){

if (this.tickTimeMillis != tickTime) {
this.tickTimeMillis = tickTime;
}
}
Expand Down Expand Up @@ -199,7 +200,7 @@ public void run(Timeout timeout) throws Exception {

synchronized (dispatcher) {
currentTimeoutTarget = -1;
timeout = null;
this.timeout = null;
dispatcher.readMoreEntries();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,11 @@ public synchronized void readMoreEntries() {
consumerList.size());
}
havePendingRead = true;
minReplayedPosition = getMessagesToReplayNow(1).stream().findFirst().orElse(null);
Set<PositionImpl> toReplay = getMessagesToReplayNow(1);
minReplayedPosition = toReplay.stream().findFirst().orElse(null);
if (minReplayedPosition != null) {
redeliveryMessages.add(minReplayedPosition.getLedgerId(), minReplayedPosition.getEntryId());
}
cursor.asyncReadEntriesOrWait(messagesToRead, bytesToRead, this,
ReadType.Normal, topic.getMaxReadPosition());
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,29 +174,36 @@ protected void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
// This may happen when consumer closed. See issue #12885 for details.
if (!allowOutOfOrderDelivery) {
Set<PositionImpl> messagesToReplayNow = this.getMessagesToReplayNow(1);
if (messagesToReplayNow != null && !messagesToReplayNow.isEmpty() && this.minReplayedPosition != null) {
PositionImpl relayPosition = messagesToReplayNow.stream().findFirst().get();
// If relayPosition is a new entry wither smaller position is inserted for redelivery during this async
// read, it is possible that this relayPosition should dispatch to consumer first. So in order to
// preserver order delivery, we need to discard this read result, and try to trigger a replay read,
// that containing "relayPosition", by calling readMoreEntries.
if (relayPosition.compareTo(minReplayedPosition) < 0) {
if (log.isDebugEnabled()) {
log.debug("[{}] Position {} (<{}) is inserted for relay during current {} read, discard this "
+ "read and retry with readMoreEntries.",
name, relayPosition, minReplayedPosition, readType);
}
if (readType == ReadType.Normal) {
entries.forEach(entry -> {
long stickyKeyHash = getStickyKeyHash(entry);
addMessageToReplay(entry.getLedgerId(), entry.getEntryId(), stickyKeyHash);
entry.release();
});
} else if (readType == ReadType.Replay) {
entries.forEach(Entry::release);
if (messagesToReplayNow != null && !messagesToReplayNow.isEmpty()) {
PositionImpl replayPosition = messagesToReplayNow.stream().findFirst().get();
// We have received a message potentially from the delayed tracker and, since we're not using it
// right now, it needs to be added to the redelivery tracker or we won't attempt anymore to
// resend it (until we disconnect consumer).
redeliveryMessages.add(replayPosition.getLedgerId(), replayPosition.getEntryId());

if (this.minReplayedPosition != null) {
// If relayPosition is a new entry wither smaller position is inserted for redelivery during this
// async read, it is possible that this relayPosition should dispatch to consumer first. So in
// order to preserver order delivery, we need to discard this read result, and try to trigger a
// replay read, that containing "relayPosition", by calling readMoreEntries.
if (replayPosition.compareTo(minReplayedPosition) < 0) {
if (log.isDebugEnabled()) {
log.debug("[{}] Position {} (<{}) is inserted for relay during current {} read, "
+ "discard this read and retry with readMoreEntries.",
name, replayPosition, minReplayedPosition, readType);
}
if (readType == ReadType.Normal) {
entries.forEach(entry -> {
long stickyKeyHash = getStickyKeyHash(entry);
addMessageToReplay(entry.getLedgerId(), entry.getEntryId(), stickyKeyHash);
entry.release();
});
} else if (readType == ReadType.Replay) {
entries.forEach(Entry::release);
}
readMoreEntries();
return;
}
readMoreEntries();
return;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
Expand Down Expand Up @@ -538,4 +539,47 @@ public void testDelayedDeliveryWithAllConsumersDisconnecting() throws Exception

Awaitility.await().untilAsserted(() -> Assert.assertEquals(dispatcher.getNumberOfDelayedMessages(), 0));
}

@Test
public void testInterleavedMessagesOnKeySharedSubscription() throws Exception {
String topic = BrokerTestUtil.newUniqueName("testInterleavedMessagesOnKeySharedSubscription");

@Cleanup
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.topic(topic)
.subscriptionName("key-shared-sub")
.subscriptionType(SubscriptionType.Key_Shared)
.subscribe();

@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic(topic)
.create();

Random random = new Random(0);
for (int i = 0; i < 10; i++) {
// Publish 1 message without delay and 1 with delay
producer.newMessage()
.value("immediate-msg-" + i)
.sendAsync();

int delayMillis = 1000 + random.nextInt(1000);
producer.newMessage()
.value("delayed-msg-" + i)
.deliverAfter(delayMillis, TimeUnit.MILLISECONDS)
.sendAsync();
Thread.sleep(1000);
}

producer.flush();

Set<String> receivedMessages = new HashSet<>();

while (receivedMessages.size() < 20) {
Message<String> msg = consumer.receive(3, TimeUnit.SECONDS);
receivedMessages.add(msg.getValue());
consumer.acknowledge(msg);
}
}

}

0 comments on commit 2647b32

Please sign in to comment.