Skip to content

Commit

Permalink
Fix the out of index issue when dispatch messages based on the avgBat…
Browse files Browse the repository at this point in the history
…chSizePerMsg. (apache#10828)

Using the avgBatchSizePerMsg to calculate the entries might over the remaining entries
The fix is use Math.min(start + messagesForC, entries.size()) to avoid out of index exception
  • Loading branch information
codelipenghui authored Jun 4, 2021
1 parent 4f23767 commit e3cfbf8
Showing 1 changed file with 3 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -519,16 +519,16 @@ protected void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
messagesForC = Math.max(messagesForC / avgBatchSizePerMsg, 1);

if (messagesForC > 0) {

int end = Math.min(start + messagesForC, entries.size());
// remove positions first from replay list first : sendMessages recycles entries
if (readType == ReadType.Replay) {
entries.subList(start, start + messagesForC).forEach(entry -> {
entries.subList(start, end).forEach(entry -> {
messagesToRedeliver.remove(entry.getLedgerId(), entry.getEntryId());
});
}

SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
List<Entry> entriesForThisConsumer = entries.subList(start, start + messagesForC);
List<Entry> entriesForThisConsumer = entries.subList(start, end);

EntryBatchSizes batchSizes = EntryBatchSizes.get(entriesForThisConsumer.size());
EntryBatchIndexesAcks batchIndexesAcks = EntryBatchIndexesAcks.get(entriesForThisConsumer.size());
Expand Down

0 comments on commit e3cfbf8

Please sign in to comment.