Skip to content

Commit

Permalink
[cleanup][broker] Follow up on apache#16968 to restore some behavior …
Browse files Browse the repository at this point in the history
…in PersistentDispatcherMultipleConsumers class (apache#17018)
  • Loading branch information
mattisonchao authored Aug 10, 2022
1 parent 4b757cf commit abff91f
Showing 1 changed file with 8 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
protected volatile PositionImpl minReplayedPosition = null;
protected boolean shouldRewindBeforeReadingOrReplaying = false;
protected final String name;
protected volatile boolean sendInProgress;
protected boolean sendInProgress;
protected static final AtomicIntegerFieldUpdater<PersistentDispatcherMultipleConsumers>
TOTAL_AVAILABLE_PERMITS_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(PersistentDispatcherMultipleConsumers.class,
Expand Down Expand Up @@ -248,8 +248,8 @@ private synchronized void internalConsumerFlow(Consumer consumer, int additional
* We should not call readMoreEntries() recursively in the same thread as there is a risk of StackOverflowError.
*
*/
public void readMoreEntiresAsync() {
topic.getBrokerService().executor().execute(() -> readMoreEntries());
public void readMoreEntriesAsync() {
topic.getBrokerService().executor().execute(this::readMoreEntries);
}

public synchronized void readMoreEntries() {
Expand Down Expand Up @@ -295,7 +295,7 @@ public synchronized void readMoreEntries() {
// next entries as readCompletedEntries-callback was never called
if ((messagesToReplayNow.size() - deletedMessages.size()) == 0) {
havePendingReplayRead = false;
readMoreEntiresAsync();
readMoreEntriesAsync();
}
} else if (BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.get(this) == TRUE) {
if (log.isDebugEnabled()) {
Expand Down Expand Up @@ -550,14 +550,15 @@ public final synchronized void readEntriesComplete(List<Entry> entries, Object c
if (serviceConfig.isDispatcherDispatchMessagesInSubscriptionThread()) {
// setting sendInProgress here, because sendMessagesToConsumers will be executed
// in a separate thread, and we want to prevent more reads
sendInProgress = true;
dispatchMessagesThread.execute(safeRun(() -> {
if (sendMessagesToConsumers(readType, entries)) {
readMoreEntries();
}
}));
} else {
if (sendMessagesToConsumers(readType, entries)) {
readMoreEntiresAsync();
readMoreEntriesAsync();
}
}
}
Expand Down Expand Up @@ -923,7 +924,7 @@ public void addUnAckedMessages(int numberOfMessages) {
if (maxUnackedMessages <= 0 && blockedDispatcherOnUnackedMsgs == TRUE
&& BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.compareAndSet(this, TRUE, FALSE)) {
log.info("[{}] Dispatcher is unblocked, since maxUnackedMessagesPerSubscription=0", name);
readMoreEntiresAsync();
readMoreEntriesAsync();
}

int unAckedMessages = TOTAL_UNACKED_MESSAGES_UPDATER.addAndGet(this, numberOfMessages);
Expand All @@ -946,7 +947,7 @@ public void addUnAckedMessages(int numberOfMessages) {
// unblock dispatcher if it acks back enough messages
if (BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.compareAndSet(this, TRUE, FALSE)) {
log.info("[{}] Dispatcher is unblocked", name);
readMoreEntiresAsync();
readMoreEntriesAsync();
}
}
// increment broker-level count
Expand Down

0 comments on commit abff91f

Please sign in to comment.