Skip to content

Commit

Permalink
[broker] Fix issue where StackOverflowError occurs when trying to red…
Browse files Browse the repository at this point in the history
…eliver a large number of already acked messages (apache#10696)

### Motivation

The other day, some of our broker servers got the following StackOverflowError:
```
13:44:17.410 [pulsar-io-21-6] WARN  o.a.pulsar.broker.service.ServerCnx  - [/xxx.xxx.xxx.xxx:58438] Got exception StackOverflowError : null
java.lang.StackOverflowError: null
        at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:174)
        at java.util.TreeMap$KeySpliterator.forEachRemaining(TreeMap.java:2746)
        at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
        at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
        at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
        at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
        at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
        at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
        at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.asyncReplayEntries(ManagedCursorImpl.java:1086)
        at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.asyncReplayEntries(ManagedCursorImpl.java:1066)
        at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.asyncReplayEntries(PersistentDispatcherMultipleConsumers.java:341)
        at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:309)
        at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:318)
        at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:318)
        at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:318)
        at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:318)
        at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:318)
        at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:318)
        at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:318)
        at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:318)
        at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:318)
        at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:318)
        at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:318)
        at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:318)
        at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:318)
        at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:318)
        at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:318)
        at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:318)
```

This phenomenon can be reproduced by the following procedure:

1. Store a large number of messages in the backlog of a topic
2. Connect some Shared consumers to the topic. These consumers receive messages but do not acknowledge at all
3. Run skip-all to remove all messages from the backlog
4. Add another consumer whose receiver queue size is small
5. Close all the consumers added in step 2
6. StackOverflowError occurs on the broker

If broker receives a large number of redelivery requests for messages that have already been deleted, `PersistentDispatcherMultipleConsumers#readMoreEntries()` is called recursively many times. As a result, we get a StackOverflowError.
https://github.com/apache/pulsar/blob/a6aed551026825ef2de6b1ac5916d17daf1af5c3/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java#L232-L252

### Modifications

- Avoid recursive calls of `readMoreEntries()` on the same thread
- If the dispatcher receives redelivery requests for messages whose positions are earlier than the mark delete position, it does not need to add them to `messagesToRedeliver`
  • Loading branch information
Masahiro Sakamoto authored May 27, 2021
1 parent 4b2c673 commit 894d92b
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -236,8 +236,4 @@ public void resetCloseFuture() {
protected byte[] peekStickyKey(ByteBuf metadataAndPayload) {
return Commands.peekStickyKey(metadataAndPayload, subscription.getTopicName(), subscription.getName());
}

protected void addMessageToReplay(long ledgerId, long entryId) {
// No-op
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,9 @@ public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceE
log.debug("[{}] Consumer are left, reading more entries", name);
}
consumer.getPendingAcks().forEach((ledgerId, entryId, batchSize, none) -> {
messagesToRedeliver.add(ledgerId, entryId);
redeliveryTracker.addIfAbsent(PositionImpl.get(ledgerId, entryId));
if (addMessageToReplay(ledgerId, entryId)) {
redeliveryTracker.addIfAbsent(PositionImpl.get(ledgerId, entryId));
}
});
totalAvailablePermits -= consumer.getAvailablePermits();
if (log.isDebugEnabled()) {
Expand Down Expand Up @@ -248,7 +249,9 @@ public synchronized void readMoreEntries() {
// next entries as readCompletedEntries-callback was never called
if ((messagesToReplayNow.size() - deletedMessages.size()) == 0) {
havePendingReplayRead = false;
readMoreEntries();
// We should not call readMoreEntries() recursively in the same thread
// as there is a risk of StackOverflowError
topic.getBrokerService().executor().execute(() -> readMoreEntries());
}
} else if (BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.get(this) == TRUE) {
log.warn("[{}] Dispatcher read is blocked due to unackMessages {} reached to max {}", name,
Expand Down Expand Up @@ -574,7 +577,7 @@ protected void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
entries.size() - start);
}
entries.subList(start, entries.size()).forEach(entry -> {
messagesToRedeliver.add(entry.getLedgerId(), entry.getEntryId());
addMessageToReplay(entry.getLedgerId(), entry.getEntryId());
entry.release();
});
}
Expand Down Expand Up @@ -695,7 +698,7 @@ public boolean isConsumerAvailable(Consumer consumer) {
@Override
public synchronized void redeliverUnacknowledgedMessages(Consumer consumer) {
consumer.getPendingAcks().forEach((ledgerId, entryId, batchSize, none) -> {
messagesToRedeliver.add(ledgerId, entryId);
addMessageToReplay(ledgerId, entryId);
});
if (log.isDebugEnabled()) {
log.debug("[{}-{}] Redelivering unacknowledged messages for consumer {}", name, consumer,
Expand All @@ -707,8 +710,9 @@ public synchronized void redeliverUnacknowledgedMessages(Consumer consumer) {
@Override
public synchronized void redeliverUnacknowledgedMessages(Consumer consumer, List<PositionImpl> positions) {
positions.forEach(position -> {
messagesToRedeliver.add(position.getLedgerId(), position.getEntryId());
redeliveryTracker.addIfAbsent(position);
if (addMessageToReplay(position.getLedgerId(), position.getEntryId())) {
redeliveryTracker.addIfAbsent(position);
}
});
if (log.isDebugEnabled()) {
log.debug("[{}-{}] Redelivering unacknowledged messages for consumer {}", name, consumer, positions);
Expand Down Expand Up @@ -853,9 +857,15 @@ public void cursorIsReset() {
}
}

@Override
public void addMessageToReplay(long ledgerId, long entryId) {
this.messagesToRedeliver.add(ledgerId, entryId);
protected boolean addMessageToReplay(long ledgerId, long entryId) {
Position markDeletePosition = cursor.getMarkDeletedPosition();
if (markDeletePosition == null || ledgerId > markDeletePosition.getLedgerId()
|| (ledgerId == markDeletePosition.getLedgerId() && entryId > markDeletePosition.getEntryId())) {
messagesToRedeliver.add(ledgerId, entryId);
return true;
} else {
return false;
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ protected void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
// so we discard for now and mark them for later redelivery
for (int i = messagesForC; i < entriesWithSameKeyCount; i++) {
Entry entry = entriesWithSameKey.get(i);
messagesToRedeliver.add(entry.getLedgerId(), entry.getEntryId());
addMessageToReplay(entry.getLedgerId(), entry.getEntryId());
entry.release();
entriesWithSameKey.set(i, null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,9 @@ public synchronized void readMoreEntries() {
// next entries as readCompletedEntries-callback was never called
if ((messagesToReplayNow.size() - deletedMessages.size()) == 0) {
havePendingReplayRead = false;
readMoreEntries();
// We should not call readMoreEntries() recursively in the same thread
// as there is a risk of StackOverflowError
topic.getBrokerService().executor().execute(() -> readMoreEntries());
}
} else if (BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.get(this) == TRUE) {
log.warn("[{}] Dispatcher read is blocked due to unackMessages {} reached to max {}", name,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1700,6 +1700,9 @@ public void testMessageReplay() throws Exception {
replayMap.set(dispatcher, messagesToReplay);
// (a) redelivery with all acked-message should clear messageReply bucket
dispatcher.redeliverUnacknowledgedMessages(dispatcher.getConsumers().get(0));
Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> {
return messagesToReplay.isEmpty();
});
assertEquals(messagesToReplay.size(), 0);

// (b) fill messageReplyBucket with already acked entry again: and try to publish new msg and read it
Expand Down

0 comments on commit 894d92b

Please sign in to comment.