diff --git a/aeron-driver/src/main/java/io/aeron/driver/NetworkPublication.java b/aeron-driver/src/main/java/io/aeron/driver/NetworkPublication.java index be62441120..e28886779c 100644 --- a/aeron-driver/src/main/java/io/aeron/driver/NetworkPublication.java +++ b/aeron-driver/src/main/java/io/aeron/driver/NetworkPublication.java @@ -230,11 +230,11 @@ void senderPositionLimit(final long positionLimit) public void resend(final int termId, int termOffset, final int length) { final long senderPosition = this.senderPosition.get(); - final int activeTermId = computeTermIdFromPosition(senderPosition, positionBitsToShift, initialTermId); + final long resendPosition = computePosition(termId, termOffset, positionBitsToShift, initialTermId); - if (termId == activeTermId || termId == (activeTermId - 1)) + if (resendPosition < senderPosition && resendPosition >= (senderPosition - rawLog.termLength())) { - final int activeIndex = indexByTerm(initialTermId, termId); + final int activeIndex = indexByPosition(resendPosition, positionBitsToShift); final UnsafeBuffer termBuffer = termBuffers[activeIndex]; final ByteBuffer sendBuffer = sendBuffers[activeIndex]; @@ -308,7 +308,7 @@ int updatePublishersLimit() if (publisherLimit.proposeMaxOrdered(candidatePublisherLimit)) { - cleanBuffer(candidatePublisherLimit - termWindowLength); + cleanBuffer(candidatePublisherLimit); workCount = 1; } @@ -472,16 +472,18 @@ private boolean isUnreferencedAndPotentiallyInactive(final long now) return result; } - private void cleanBuffer(final long minConsumerPosition) + private void cleanBuffer(final long publisherLimit) { final long cleanedToPosition = this.cleanedToPosition; + final long dirtyRange = publisherLimit - cleanedToPosition; final int bufferCapacity = termLengthMask + 1; - final long nakRange = minConsumerPosition - cleanedToPosition; - if (nakRange > bufferCapacity) + final int activeRange = bufferCapacity * 2; + + if (dirtyRange > activeRange) { final UnsafeBuffer dirtyTerm = termBuffers[indexByPosition(cleanedToPosition, positionBitsToShift)]; final int termOffset = (int)cleanedToPosition & termLengthMask; - final int bytesForCleaning = (int)(nakRange - bufferCapacity); + final int bytesForCleaning = (int)(dirtyRange - activeRange); final int length = Math.min(bytesForCleaning, bufferCapacity - termOffset); dirtyTerm.setMemory(termOffset, length, (byte)0);