diff --git a/aeron-client/src/main/java/io/aeron/AssemblyHeader.java b/aeron-client/src/main/java/io/aeron/AssemblyHeader.java deleted file mode 100644 index a8216f9a9a..0000000000 --- a/aeron-client/src/main/java/io/aeron/AssemblyHeader.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Copyright 2015 - 2016 Real Logic Ltd. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.aeron; - -import io.aeron.logbuffer.Header; -import io.aeron.protocol.DataHeaderFlyweight; - -import static io.aeron.logbuffer.FrameDescriptor.UNFRAGMENTED; - -/** - * Extends the base header to allow for a message to be reassembled from fragmented frames. - */ -public class AssemblyHeader extends Header -{ - private int frameLength; - - public AssemblyHeader reset(final Header base, final int msgLength) - { - positionBitsToShift(base.positionBitsToShift()); - initialTermId(base.initialTermId()); - offset(base.offset()); - buffer(base.buffer()); - frameLength = msgLength + DataHeaderFlyweight.HEADER_LENGTH; - - return this; - } - - public int frameLength() - { - return frameLength; - } - - public byte flags() - { - return (byte)(super.flags() | UNFRAGMENTED); - } - - public int termOffset() - { - return offset() - (frameLength - super.frameLength()); - } -} diff --git a/aeron-client/src/main/java/io/aeron/ControlledFragmentAssembler.java b/aeron-client/src/main/java/io/aeron/ControlledFragmentAssembler.java index 0e40473e90..6250e177a6 100644 --- a/aeron-client/src/main/java/io/aeron/ControlledFragmentAssembler.java +++ b/aeron-client/src/main/java/io/aeron/ControlledFragmentAssembler.java @@ -38,7 +38,6 @@ public class ControlledFragmentAssembler implements ControlledFragmentHandler { private final ControlledFragmentHandler delegate; - private final AssemblyHeader assemblyHeader = new AssemblyHeader(); private final Int2ObjectHashMap builderBySessionIdMap = new Int2ObjectHashMap<>(); private final IntFunction builderFunc; @@ -100,7 +99,7 @@ public Action onFragment(final DirectBuffer buffer, final int offset, final int if ((flags & END_FRAG_FLAG) == END_FRAG_FLAG) { final int msgLength = builder.limit(); - action = delegate.onFragment(builder.buffer(), 0, msgLength, assemblyHeader.reset(header, msgLength)); + action = delegate.onFragment(builder.buffer(), 0, msgLength, header); if (Action.ABORT == action) { diff --git a/aeron-client/src/main/java/io/aeron/FragmentAssembler.java b/aeron-client/src/main/java/io/aeron/FragmentAssembler.java index bc736fe200..1b1aab21dc 100644 --- a/aeron-client/src/main/java/io/aeron/FragmentAssembler.java +++ b/aeron-client/src/main/java/io/aeron/FragmentAssembler.java @@ -38,7 +38,6 @@ public class FragmentAssembler implements FragmentHandler { private final FragmentHandler delegate; - private final AssemblyHeader assemblyHeader = new AssemblyHeader(); private final Int2ObjectHashMap builderBySessionIdMap = new Int2ObjectHashMap<>(); private final IntFunction builderFunc; @@ -97,7 +96,7 @@ public void onFragment(final DirectBuffer buffer, final int offset, final int le if ((flags & END_FRAG_FLAG) == END_FRAG_FLAG) { final int msgLength = builder.limit(); - delegate.onFragment(builder.buffer(), 0, msgLength, assemblyHeader.reset(header, msgLength)); + delegate.onFragment(builder.buffer(), 0, msgLength, header); builder.reset(); } } diff --git a/aeron-client/src/main/java/io/aeron/Image.java b/aeron-client/src/main/java/io/aeron/Image.java index 110e72e187..ee7b1b880e 100644 --- a/aeron-client/src/main/java/io/aeron/Image.java +++ b/aeron-client/src/main/java/io/aeron/Image.java @@ -89,7 +89,7 @@ public Image( final int termLength = logBuffers.termLength(); this.termLengthMask = termLength - 1; this.positionBitsToShift = Integer.numberOfTrailingZeros(termLength); - header = new Header(LogBufferDescriptor.initialTermId(buffers[LOG_META_DATA_SECTION_INDEX]), termLength); + header = new Header(LogBufferDescriptor.initialTermId(buffers[LOG_META_DATA_SECTION_INDEX]), positionBitsToShift); } /** diff --git a/aeron-client/src/main/java/io/aeron/logbuffer/Header.java b/aeron-client/src/main/java/io/aeron/logbuffer/Header.java index fe62392cca..d563111694 100644 --- a/aeron-client/src/main/java/io/aeron/logbuffer/Header.java +++ b/aeron-client/src/main/java/io/aeron/logbuffer/Header.java @@ -48,13 +48,13 @@ protected Header() /** * Construct a header that references a buffer for the log. * - * @param initialTermId this stream started at. - * @param termCapacity for each term in the log buffer. + * @param initialTermId this stream started at. + * @param positionBitsToShift for calculating positions. */ - public Header(final int initialTermId, final int termCapacity) + public Header(final int initialTermId, final int positionBitsToShift) { this.initialTermId = initialTermId; - this.positionBitsToShift = Integer.numberOfTrailingZeros(termCapacity); + this.positionBitsToShift = positionBitsToShift; } /** @@ -68,26 +68,6 @@ public final long position() return computePosition(termId(), resultingOffset, positionBitsToShift, initialTermId); } - /** - * Get the number of bits the number of terms need to be shifted to get the position. - * - * @return the number of bits the number of terms need to be shifted to get the position. - */ - public final int positionBitsToShift() - { - return positionBitsToShift; - } - - /** - * Set the number of bits the number of terms need to be shifted to get the position. - * - * @param positionBitsToShift the number of bits the number of terms need to be shifted to get the position. - */ - public final void positionBitsToShift(final int positionBitsToShift) - { - this.positionBitsToShift = positionBitsToShift; - } - /** * Get the initial term id this stream started at. * @@ -222,7 +202,7 @@ public byte flags() /** * Get the value stored in the reserve space at the end of a data frame header. - * + *

* Note: The value is in {@link ByteOrder#LITTLE_ENDIAN} format. * * @return the value stored in the reserve space at the end of a data frame header. diff --git a/aeron-client/src/test/java/io/aeron/FragmentAssemblerTest.java b/aeron-client/src/test/java/io/aeron/FragmentAssemblerTest.java index 2b654b1740..827dcf7602 100644 --- a/aeron-client/src/test/java/io/aeron/FragmentAssemblerTest.java +++ b/aeron-client/src/test/java/io/aeron/FragmentAssemblerTest.java @@ -92,7 +92,7 @@ public void shouldAssembleTwoPartMessage() final Header capturedHeader = headerArg.getValue(); assertThat(capturedHeader.sessionId(), is(SESSION_ID)); - assertThat(capturedHeader.flags(), is(FrameDescriptor.UNFRAGMENTED)); + assertThat(capturedHeader.flags(), is(FrameDescriptor.END_FRAG_FLAG)); } @Test @@ -132,7 +132,7 @@ public void shouldAssembleFourPartMessage() final Header capturedHeader = headerArg.getValue(); assertThat(capturedHeader.sessionId(), is(SESSION_ID)); - assertThat(capturedHeader.flags(), is(FrameDescriptor.UNFRAGMENTED)); + assertThat(capturedHeader.flags(), is(FrameDescriptor.END_FRAG_FLAG)); } @Test diff --git a/aeron-system-tests/src/test/java/io/aeron/FragmentedMessageTest.java b/aeron-system-tests/src/test/java/io/aeron/FragmentedMessageTest.java index cceba1f638..c790373af6 100644 --- a/aeron-system-tests/src/test/java/io/aeron/FragmentedMessageTest.java +++ b/aeron-system-tests/src/test/java/io/aeron/FragmentedMessageTest.java @@ -15,23 +15,24 @@ */ package io.aeron; -import io.aeron.driver.MediaDriver; import org.junit.Test; import org.junit.experimental.theories.DataPoint; import org.junit.experimental.theories.Theories; import org.junit.experimental.theories.Theory; import org.junit.runner.RunWith; import org.mockito.ArgumentCaptor; +import org.agrona.concurrent.UnsafeBuffer; +import io.aeron.driver.MediaDriver; import io.aeron.driver.ThreadingMode; import io.aeron.logbuffer.FragmentHandler; import io.aeron.logbuffer.Header; -import org.agrona.concurrent.UnsafeBuffer; +import static io.aeron.logbuffer.FrameDescriptor.END_FRAG_FLAG; import static org.hamcrest.CoreMatchers.is; import static org.junit.Assert.assertThat; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.*; -import static io.aeron.logbuffer.FrameDescriptor.UNFRAGMENTED; + @RunWith(Theories.class) public class FragmentedMessageTest @@ -104,7 +105,7 @@ public void shouldReceivePublishedMessage(final String channel, final ThreadingM assertThat("same at i=" + i, capturedBuffer.getByte(i), is(srcBuffer.getByte(i))); } - assertThat(headerArg.getValue().flags(), is(UNFRAGMENTED)); + assertThat(headerArg.getValue().flags(), is(END_FRAG_FLAG)); } finally {