Skip to content

Commit

Permalink
Use safeRun to log thread exception. (apache#17484)
Browse files Browse the repository at this point in the history
  • Loading branch information
Technoboy- authored Sep 8, 2022
1 parent df9b057 commit 88dd816
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -219,9 +219,9 @@ public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceE

@Override
public void consumerFlow(Consumer consumer, int additionalNumberOfMessages) {
topic.getBrokerService().executor().execute(() -> {
topic.getBrokerService().executor().execute(safeRun(() -> {
internalConsumerFlow(consumer, additionalNumberOfMessages);
});
}));
}

private synchronized void internalConsumerFlow(Consumer consumer, int additionalNumberOfMessages) {
Expand All @@ -247,7 +247,7 @@ private synchronized void internalConsumerFlow(Consumer consumer, int additional
*
*/
public void readMoreEntriesAsync() {
topic.getBrokerService().executor().execute(this::readMoreEntries);
topic.getBrokerService().executor().execute(safeRun(this::readMoreEntries));
}

public synchronized void readMoreEntries() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.service.persistent;

import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
import io.netty.util.concurrent.FastThreadLocal;
import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -330,14 +331,14 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis
// readMoreEntries should run regardless whether or not stuck is caused by
// stuckConsumers for avoid stopping dispatch.
sendInProgress = false;
topic.getBrokerService().executor().execute(() -> readMoreEntries());
topic.getBrokerService().executor().execute(safeRun(this::readMoreEntries));
} else if (currentThreadKeyNumber == 0) {
sendInProgress = false;
topic.getBrokerService().executor().schedule(() -> {
topic.getBrokerService().executor().schedule(safeRun(() -> {
synchronized (PersistentStickyKeyDispatcherMultipleConsumers.this) {
readMoreEntries();
}
}, 100, TimeUnit.MILLISECONDS);
}), 100, TimeUnit.MILLISECONDS);
}
return false;
}
Expand Down Expand Up @@ -411,7 +412,7 @@ private int getRestrictedMaxEntriesForConsumer(Consumer consumer, List<Entry> en
public void markDeletePositionMoveForward() {
// Execute the notification in different thread to avoid a mutex chain here
// from the delete operation that was completed
topic.getBrokerService().getTopicOrderedExecutor().execute(() -> {
topic.getBrokerService().getTopicOrderedExecutor().execute(safeRun(() -> {
synchronized (PersistentStickyKeyDispatcherMultipleConsumers.this) {
if (recentlyJoinedConsumers != null && !recentlyJoinedConsumers.isEmpty()
&& removeConsumersFromRecentJoinedConsumers()) {
Expand All @@ -420,7 +421,7 @@ && removeConsumersFromRecentJoinedConsumers()) {
readMoreEntries();
}
}
});
}));
}

private boolean removeConsumersFromRecentJoinedConsumers() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ public synchronized void readMoreEntries() {
havePendingReplayRead = false;
// We should not call readMoreEntries() recursively in the same thread
// as there is a risk of StackOverflowError
topic.getBrokerService().executor().execute(() -> readMoreEntries());
topic.getBrokerService().executor().execute(safeRun(this::readMoreEntries));
}
} else if (BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.get(this) == TRUE) {
log.debug("[{}] Dispatcher read is blocked due to unackMessages {} reached to max {}", name,
Expand Down

0 comments on commit 88dd816

Please sign in to comment.