Skip to content

Commit

Permalink
[fix][clients]Check pendingIndividualBatchIndexAcks size in doIndivid…
Browse files Browse the repository at this point in the history
…ualBatchAckAsync (apache#15877)

* Check pendingIndividualBatchIndexAcks size and flush in doIndividualBatchAckAsync

* put logic into method doIndividualBatchAck
  • Loading branch information
gaozhangmin authored Jun 10, 2022
1 parent 6722417 commit de9928a
Showing 1 changed file with 6 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -312,9 +312,15 @@ private CompletableFuture<Void> doIndividualBatchAck(BatchMessageIdImpl batchMes
return this.currentIndividualAckFuture;
} finally {
this.lock.readLock().unlock();
if (pendingIndividualBatchIndexAcks.size() >= MAX_ACK_GROUP_SIZE) {
flush();
}
}
} else {
doIndividualBatchAckAsync(batchMessageId);
if (pendingIndividualBatchIndexAcks.size() >= MAX_ACK_GROUP_SIZE) {
flush();
}
return CompletableFuture.completedFuture(null);
}
}
Expand All @@ -337,15 +343,9 @@ private CompletableFuture<Void> doCumulativeAck(MessageIdImpl messageId, Map<Str
return this.currentCumulativeAckFuture;
} finally {
this.lock.readLock().unlock();
if (pendingIndividualBatchIndexAcks.size() >= MAX_ACK_GROUP_SIZE) {
flush();
}
}
} else {
doCumulativeAckAsync(messageId, bitSet);
if (pendingIndividualBatchIndexAcks.size() >= MAX_ACK_GROUP_SIZE) {
flush();
}
return CompletableFuture.completedFuture(null);
}
}
Expand Down

0 comments on commit de9928a

Please sign in to comment.