Skip to content

Commit

Permalink
Fixed dispatcher skipping delivery of a batch during concurrent repla…
Browse files Browse the repository at this point in the history
…ys (apache#5499)

There is a race condition when replaying messages on the `PersistentDispatcherMultipleConsumers`. This is apparently only happening in the context of delivering messages after a scheduled delay. 

The `Dispatcher.readMoreEntries()` can get invoked in multiple occasions: 
 * Piggybacking on the completion of the previous read operation
 * When the delaye tracker gets triggered

If there is already a replay read operation pending, we're currently ignoring the message ids returned by the delay tracker. These messages will not be retried on any other occasion, leading to backlog building up (until a consumer disconnects/reconnects).
  • Loading branch information
merlimat authored and wolfstudy committed Oct 30, 2019
1 parent 43bc790 commit 01ca24b
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -301,14 +301,16 @@ public void readMoreEntries() {

}

Set<PositionImpl> messagesToReplayNow = getMessagesToReplayNow(messagesToRead);

if (!messagesToReplayNow.isEmpty()) {
if (havePendingReplayRead) {
if (havePendingReplayRead) {
if (log.isDebugEnabled()) {
log.debug("[{}] Skipping replay while awaiting previous read to complete", name);
return;
}
return;
}

Set<PositionImpl> messagesToReplayNow = getMessagesToReplayNow(messagesToRead);

if (!messagesToReplayNow.isEmpty()) {
if (log.isDebugEnabled()) {
log.debug("[{}] Schedule replay of {} messages for {} consumers", name, messagesToReplayNow.size(),
consumerList.size());
Expand Down Expand Up @@ -454,7 +456,6 @@ public synchronized void readEntriesComplete(List<Entry> entries, Object ctx) {
}

protected void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {

if (entries == null || entries.size() == 0) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,4 +208,67 @@ public void testEverythingFilteredInMultipleReads()
assertTrue(receivedMsgs.contains("msg-" + i));
}
}

@Test
public void testDelayedDeliveryWithMultipleConcurrentReadEntries()
throws Exception {
String topic = "persistent://public/default/testDelayedDelivery-" + System.nanoTime();

@Cleanup
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.topic(topic)
.subscriptionName("shared-sub")
.subscriptionType(SubscriptionType.Shared)
.receiverQueueSize(1) // Use small prefecthing to simulate the multiple read batches
.subscribe();

// Simulate race condition with high frequency of calls to dispatcher.readMoreEntries()
PersistentDispatcherMultipleConsumers d = (PersistentDispatcherMultipleConsumers) ((PersistentTopic) pulsar
.getBrokerService().getTopicReference(topic).get()).getSubscription("shared-sub").getDispatcher();
Thread t = new Thread(() -> {
while (true) {
synchronized (d) {
d.readMoreEntries();
}

try {
Thread.sleep(1);
} catch (InterruptedException e) {
return;
}
}
});
t.start();

@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic(topic)
.create();

final int N = 1000;

for (int i = 0; i < N; i++) {
producer.newMessage()
.value("msg-" + i)
.deliverAfter(5, TimeUnit.SECONDS)
.sendAsync();
}

producer.flush();

Message<String> msg = consumer.receive(100, TimeUnit.MILLISECONDS);
assertNull(msg);

Set<String> receivedMsgs = new TreeSet<>();
for (int i = 0; i < N; i++) {
msg = consumer.receive(10, TimeUnit.SECONDS);
receivedMsgs.add(msg.getValue());
}

assertEquals(receivedMsgs.size(), N);
for (int i = 0; i < N; i++) {
assertTrue(receivedMsgs.contains("msg-" + i));
}
t.interrupt();
}
}

0 comments on commit 01ca24b

Please sign in to comment.