Skip to content

Commit

Permalink
[Java] Check resend of a NAK is within one term length behind the sen…
Browse files Browse the repository at this point in the history
…der position. Also clean incrementally from 2 term lengths behind publication limit for the NetworkPublication. Issue aeron-io#257.
  • Loading branch information
mjpt777 committed Jul 10, 2016
1 parent b7a0be3 commit b882e71
Showing 1 changed file with 10 additions and 8 deletions.
18 changes: 10 additions & 8 deletions aeron-driver/src/main/java/io/aeron/driver/NetworkPublication.java
Original file line number Diff line number Diff line change
Expand Up @@ -230,11 +230,11 @@ void senderPositionLimit(final long positionLimit)
public void resend(final int termId, int termOffset, final int length)
{
final long senderPosition = this.senderPosition.get();
final int activeTermId = computeTermIdFromPosition(senderPosition, positionBitsToShift, initialTermId);
final long resendPosition = computePosition(termId, termOffset, positionBitsToShift, initialTermId);

if (termId == activeTermId || termId == (activeTermId - 1))
if (resendPosition < senderPosition && resendPosition >= (senderPosition - rawLog.termLength()))
{
final int activeIndex = indexByTerm(initialTermId, termId);
final int activeIndex = indexByPosition(resendPosition, positionBitsToShift);
final UnsafeBuffer termBuffer = termBuffers[activeIndex];
final ByteBuffer sendBuffer = sendBuffers[activeIndex];

Expand Down Expand Up @@ -308,7 +308,7 @@ int updatePublishersLimit()

if (publisherLimit.proposeMaxOrdered(candidatePublisherLimit))
{
cleanBuffer(candidatePublisherLimit - termWindowLength);
cleanBuffer(candidatePublisherLimit);

workCount = 1;
}
Expand Down Expand Up @@ -472,16 +472,18 @@ private boolean isUnreferencedAndPotentiallyInactive(final long now)
return result;
}

private void cleanBuffer(final long minConsumerPosition)
private void cleanBuffer(final long publisherLimit)
{
final long cleanedToPosition = this.cleanedToPosition;
final long dirtyRange = publisherLimit - cleanedToPosition;
final int bufferCapacity = termLengthMask + 1;
final long nakRange = minConsumerPosition - cleanedToPosition;
if (nakRange > bufferCapacity)
final int activeRange = bufferCapacity * 2;

if (dirtyRange > activeRange)
{
final UnsafeBuffer dirtyTerm = termBuffers[indexByPosition(cleanedToPosition, positionBitsToShift)];
final int termOffset = (int)cleanedToPosition & termLengthMask;
final int bytesForCleaning = (int)(nakRange - bufferCapacity);
final int bytesForCleaning = (int)(dirtyRange - activeRange);
final int length = Math.min(bytesForCleaning, bufferCapacity - termOffset);

dirtyTerm.setMemory(termOffset, length, (byte)0);
Expand Down

0 comments on commit b882e71

Please sign in to comment.