Skip to content

Commit

Permalink
[Java] Header FragmentHandler should just be that of the last frame w…
Browse files Browse the repository at this point in the history
…hen used for fragment assembly. This allows for correct resulting position calculations.
  • Loading branch information
mjpt777 committed Apr 23, 2016
1 parent 91995e6 commit ffcb3c7
Show file tree
Hide file tree
Showing 7 changed files with 15 additions and 91 deletions.
55 changes: 0 additions & 55 deletions aeron-client/src/main/java/io/aeron/AssemblyHeader.java

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
public class ControlledFragmentAssembler implements ControlledFragmentHandler
{
private final ControlledFragmentHandler delegate;
private final AssemblyHeader assemblyHeader = new AssemblyHeader();
private final Int2ObjectHashMap<BufferBuilder> builderBySessionIdMap = new Int2ObjectHashMap<>();
private final IntFunction<BufferBuilder> builderFunc;

Expand Down Expand Up @@ -100,7 +99,7 @@ public Action onFragment(final DirectBuffer buffer, final int offset, final int
if ((flags & END_FRAG_FLAG) == END_FRAG_FLAG)
{
final int msgLength = builder.limit();
action = delegate.onFragment(builder.buffer(), 0, msgLength, assemblyHeader.reset(header, msgLength));
action = delegate.onFragment(builder.buffer(), 0, msgLength, header);

if (Action.ABORT == action)
{
Expand Down
3 changes: 1 addition & 2 deletions aeron-client/src/main/java/io/aeron/FragmentAssembler.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
public class FragmentAssembler implements FragmentHandler
{
private final FragmentHandler delegate;
private final AssemblyHeader assemblyHeader = new AssemblyHeader();
private final Int2ObjectHashMap<BufferBuilder> builderBySessionIdMap = new Int2ObjectHashMap<>();
private final IntFunction<BufferBuilder> builderFunc;

Expand Down Expand Up @@ -97,7 +96,7 @@ public void onFragment(final DirectBuffer buffer, final int offset, final int le
if ((flags & END_FRAG_FLAG) == END_FRAG_FLAG)
{
final int msgLength = builder.limit();
delegate.onFragment(builder.buffer(), 0, msgLength, assemblyHeader.reset(header, msgLength));
delegate.onFragment(builder.buffer(), 0, msgLength, header);
builder.reset();
}
}
Expand Down
2 changes: 1 addition & 1 deletion aeron-client/src/main/java/io/aeron/Image.java
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public Image(
final int termLength = logBuffers.termLength();
this.termLengthMask = termLength - 1;
this.positionBitsToShift = Integer.numberOfTrailingZeros(termLength);
header = new Header(LogBufferDescriptor.initialTermId(buffers[LOG_META_DATA_SECTION_INDEX]), termLength);
header = new Header(LogBufferDescriptor.initialTermId(buffers[LOG_META_DATA_SECTION_INDEX]), positionBitsToShift);
}

/**
Expand Down
30 changes: 5 additions & 25 deletions aeron-client/src/main/java/io/aeron/logbuffer/Header.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,13 @@ protected Header()
/**
* Construct a header that references a buffer for the log.
*
* @param initialTermId this stream started at.
* @param termCapacity for each term in the log buffer.
* @param initialTermId this stream started at.
* @param positionBitsToShift for calculating positions.
*/
public Header(final int initialTermId, final int termCapacity)
public Header(final int initialTermId, final int positionBitsToShift)
{
this.initialTermId = initialTermId;
this.positionBitsToShift = Integer.numberOfTrailingZeros(termCapacity);
this.positionBitsToShift = positionBitsToShift;
}

/**
Expand All @@ -68,26 +68,6 @@ public final long position()
return computePosition(termId(), resultingOffset, positionBitsToShift, initialTermId);
}

/**
* Get the number of bits the number of terms need to be shifted to get the position.
*
* @return the number of bits the number of terms need to be shifted to get the position.
*/
public final int positionBitsToShift()
{
return positionBitsToShift;
}

/**
* Set the number of bits the number of terms need to be shifted to get the position.
*
* @param positionBitsToShift the number of bits the number of terms need to be shifted to get the position.
*/
public final void positionBitsToShift(final int positionBitsToShift)
{
this.positionBitsToShift = positionBitsToShift;
}

/**
* Get the initial term id this stream started at.
*
Expand Down Expand Up @@ -222,7 +202,7 @@ public byte flags()

/**
* Get the value stored in the reserve space at the end of a data frame header.
*
* <p>
* Note: The value is in {@link ByteOrder#LITTLE_ENDIAN} format.
*
* @return the value stored in the reserve space at the end of a data frame header.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public void shouldAssembleTwoPartMessage()

final Header capturedHeader = headerArg.getValue();
assertThat(capturedHeader.sessionId(), is(SESSION_ID));
assertThat(capturedHeader.flags(), is(FrameDescriptor.UNFRAGMENTED));
assertThat(capturedHeader.flags(), is(FrameDescriptor.END_FRAG_FLAG));
}

@Test
Expand Down Expand Up @@ -132,7 +132,7 @@ public void shouldAssembleFourPartMessage()

final Header capturedHeader = headerArg.getValue();
assertThat(capturedHeader.sessionId(), is(SESSION_ID));
assertThat(capturedHeader.flags(), is(FrameDescriptor.UNFRAGMENTED));
assertThat(capturedHeader.flags(), is(FrameDescriptor.END_FRAG_FLAG));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,24 @@
*/
package io.aeron;

import io.aeron.driver.MediaDriver;
import org.junit.Test;
import org.junit.experimental.theories.DataPoint;
import org.junit.experimental.theories.Theories;
import org.junit.experimental.theories.Theory;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.agrona.concurrent.UnsafeBuffer;
import io.aeron.driver.MediaDriver;
import io.aeron.driver.ThreadingMode;
import io.aeron.logbuffer.FragmentHandler;
import io.aeron.logbuffer.Header;
import org.agrona.concurrent.UnsafeBuffer;

import static io.aeron.logbuffer.FrameDescriptor.END_FRAG_FLAG;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.*;
import static io.aeron.logbuffer.FrameDescriptor.UNFRAGMENTED;


@RunWith(Theories.class)
public class FragmentedMessageTest
Expand Down Expand Up @@ -104,7 +105,7 @@ public void shouldReceivePublishedMessage(final String channel, final ThreadingM
assertThat("same at i=" + i, capturedBuffer.getByte(i), is(srcBuffer.getByte(i)));
}

assertThat(headerArg.getValue().flags(), is(UNFRAGMENTED));
assertThat(headerArg.getValue().flags(), is(END_FRAG_FLAG));
}
finally
{
Expand Down

0 comments on commit ffcb3c7

Please sign in to comment.