Skip to content

Commit

Permalink
[cleanup][broker] Remove redundant messagesForC check in multi-cons…
Browse files Browse the repository at this point in the history
…umer dispatcher (apache#15969)

### Motivation

In `PersistentDispatcherMultipleConsumers#sendMessagesToConsumers`, it
checks `messagesForC > 0` while `messagesForC` is always greater than 0.

### Modifications

Remove the `if (messagesForC > 0)` check. In addition, call
`entries.subList(start, end)` before handling the replace case to avoid
`subList` being called twice.
  • Loading branch information
BewareMyPower authored Jun 8, 2022
1 parent 3aef61a commit 298a573
Showing 1 changed file with 32 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -557,41 +557,40 @@ protected void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
serviceConfig.getDispatcherMaxRoundRobinBatchSize());
messagesForC = Math.max(messagesForC / avgBatchSizePerMsg, 1);

if (messagesForC > 0) {
int end = Math.min(start + messagesForC, entries.size());
// remove positions first from replay list first : sendMessages recycles entries
if (readType == ReadType.Replay) {
entries.subList(start, end).forEach(entry -> {
redeliveryMessages.remove(entry.getLedgerId(), entry.getEntryId());
});
}
int end = Math.min(start + messagesForC, entries.size());
List<Entry> entriesForThisConsumer = entries.subList(start, end);

SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
List<Entry> entriesForThisConsumer = entries.subList(start, end);

EntryBatchSizes batchSizes = EntryBatchSizes.get(entriesForThisConsumer.size());
EntryBatchIndexesAcks batchIndexesAcks = EntryBatchIndexesAcks.get(entriesForThisConsumer.size());
totalEntries += filterEntriesForConsumer(Optional.of(metadataArray), start,
entriesForThisConsumer, batchSizes, sendMessageInfo, batchIndexesAcks, cursor,
readType == ReadType.Replay, c);

c.sendMessages(entriesForThisConsumer, batchSizes, batchIndexesAcks, sendMessageInfo.getTotalMessages(),
sendMessageInfo.getTotalBytes(), sendMessageInfo.getTotalChunkedMessages(), redeliveryTracker);

int msgSent = sendMessageInfo.getTotalMessages();
remainingMessages -= msgSent;
start += messagesForC;
entriesToDispatch -= messagesForC;
TOTAL_AVAILABLE_PERMITS_UPDATER.addAndGet(this,
-(msgSent - batchIndexesAcks.getTotalAckedIndexCount()));
if (log.isDebugEnabled()){
log.debug("[{}] Added -({} minus {}) permits to TOTAL_AVAILABLE_PERMITS_UPDATER in "
+ "PersistentDispatcherMultipleConsumers",
name, msgSent, batchIndexesAcks.getTotalAckedIndexCount());
}
totalMessagesSent += sendMessageInfo.getTotalMessages();
totalBytesSent += sendMessageInfo.getTotalBytes();
// remove positions first from replay list first : sendMessages recycles entries
if (readType == ReadType.Replay) {
entriesForThisConsumer.forEach(entry -> {
redeliveryMessages.remove(entry.getLedgerId(), entry.getEntryId());
});
}

SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();

EntryBatchSizes batchSizes = EntryBatchSizes.get(entriesForThisConsumer.size());
EntryBatchIndexesAcks batchIndexesAcks = EntryBatchIndexesAcks.get(entriesForThisConsumer.size());
totalEntries += filterEntriesForConsumer(Optional.of(metadataArray), start,
entriesForThisConsumer, batchSizes, sendMessageInfo, batchIndexesAcks, cursor,
readType == ReadType.Replay, c);

c.sendMessages(entriesForThisConsumer, batchSizes, batchIndexesAcks, sendMessageInfo.getTotalMessages(),
sendMessageInfo.getTotalBytes(), sendMessageInfo.getTotalChunkedMessages(), redeliveryTracker);

int msgSent = sendMessageInfo.getTotalMessages();
remainingMessages -= msgSent;
start += messagesForC;
entriesToDispatch -= messagesForC;
TOTAL_AVAILABLE_PERMITS_UPDATER.addAndGet(this,
-(msgSent - batchIndexesAcks.getTotalAckedIndexCount()));
if (log.isDebugEnabled()){
log.debug("[{}] Added -({} minus {}) permits to TOTAL_AVAILABLE_PERMITS_UPDATER in "
+ "PersistentDispatcherMultipleConsumers",
name, msgSent, batchIndexesAcks.getTotalAckedIndexCount());
}
totalMessagesSent += sendMessageInfo.getTotalMessages();
totalBytesSent += sendMessageInfo.getTotalBytes();
}

// acquire message-dispatch permits for already delivered messages
Expand Down

0 comments on commit 298a573

Please sign in to comment.