Skip to content

Commit

Permalink
[Java] Expose reservedValue in data frame header to be set with the B…
Browse files Browse the repository at this point in the history
…ufferClaim and read in the callback of the fragment handler.
  • Loading branch information
mjpt777 committed Apr 22, 2016
1 parent bda8b5d commit 6987137
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 11 deletions.
3 changes: 3 additions & 0 deletions aeron-client/src/main/java/io/aeron/AssemblyHeader.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@

import static io.aeron.logbuffer.FrameDescriptor.UNFRAGMENTED;

/**
* Extends the base header to allow for a message to be reassembled from fragmented frames.
*/
public class AssemblyHeader extends Header
{
private int frameLength;
Expand Down
36 changes: 33 additions & 3 deletions aeron-client/src/main/java/io/aeron/logbuffer/BufferClaim.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@

import java.nio.ByteOrder;

import static io.aeron.protocol.DataHeaderFlyweight.HEADER_LENGTH;
import static io.aeron.protocol.DataHeaderFlyweight.RESERVED_VALUE_OFFSET;
import static java.nio.ByteOrder.LITTLE_ENDIAN;
import static io.aeron.protocol.HeaderFlyweight.FRAME_LENGTH_FIELD_OFFSET;
import static io.aeron.protocol.HeaderFlyweight.HDR_TYPE_PAD;
Expand Down Expand Up @@ -68,7 +70,7 @@ public MutableDirectBuffer buffer()
*/
public int offset()
{
return DataHeaderFlyweight.HEADER_LENGTH;
return HEADER_LENGTH;
}

/**
Expand All @@ -78,7 +80,35 @@ public int offset()
*/
public int length()
{
return buffer.capacity() - DataHeaderFlyweight.HEADER_LENGTH;
return buffer.capacity() - HEADER_LENGTH;
}

/**
* Get the value stored in the reserve space at the end of a data frame header.
*
* Note: The value is in {@link ByteOrder#LITTLE_ENDIAN} format.
*
* @return the value stored in the reserve space at the end of a data frame header.
* @see DataHeaderFlyweight
*/
public long reservedValue()
{
return buffer.getLong(RESERVED_VALUE_OFFSET, LITTLE_ENDIAN);
}

/**
* Write the provided value into the reserved space at the end of the data frame header.
*
* Note: The value will be written in {@link ByteOrder#LITTLE_ENDIAN} format.
*
* @param value to be stored in the reserve space at the end of a data frame header.
* @return this for fluent API semantics.
* @see DataHeaderFlyweight
*/
public BufferClaim reservedValue(final long value)
{
buffer.putLong(RESERVED_VALUE_OFFSET, value, LITTLE_ENDIAN);
return this;
}

/**
Expand All @@ -96,7 +126,7 @@ public void commit()
}

/**
* Abort a claim of the message space to the log buffer so that log can progress ignoring this claim.
* Abort a claim of the message space to the log buffer so that the log can progress by ignoring this claim.
*/
public void abort()
{
Expand Down
28 changes: 23 additions & 5 deletions aeron-client/src/main/java/io/aeron/logbuffer/Header.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@
import io.aeron.protocol.DataHeaderFlyweight;
import org.agrona.concurrent.UnsafeBuffer;

import java.nio.ByteOrder;

import static io.aeron.protocol.DataHeaderFlyweight.*;
import static io.aeron.protocol.HeaderFlyweight.FLAGS_FIELD_OFFSET;
import static io.aeron.protocol.HeaderFlyweight.TYPE_FIELD_OFFSET;
import static java.nio.ByteOrder.LITTLE_ENDIAN;
import static io.aeron.logbuffer.LogBufferDescriptor.computePosition;

Expand Down Expand Up @@ -157,7 +162,7 @@ public int frameLength()
*/
public final int sessionId()
{
return buffer.getInt(offset + DataHeaderFlyweight.SESSION_ID_FIELD_OFFSET, LITTLE_ENDIAN);
return buffer.getInt(offset + SESSION_ID_FIELD_OFFSET, LITTLE_ENDIAN);
}

/**
Expand All @@ -167,7 +172,7 @@ public final int sessionId()
*/
public final int streamId()
{
return buffer.getInt(offset + DataHeaderFlyweight.STREAM_ID_FIELD_OFFSET, LITTLE_ENDIAN);
return buffer.getInt(offset + STREAM_ID_FIELD_OFFSET, LITTLE_ENDIAN);
}

/**
Expand All @@ -177,7 +182,7 @@ public final int streamId()
*/
public final int termId()
{
return buffer.getInt(offset + DataHeaderFlyweight.TERM_ID_FIELD_OFFSET, LITTLE_ENDIAN);
return buffer.getInt(offset + TERM_ID_FIELD_OFFSET, LITTLE_ENDIAN);
}

/**
Expand All @@ -197,7 +202,7 @@ public int termOffset()
*/
public final int type()
{
return buffer.getShort(offset + DataHeaderFlyweight.TYPE_FIELD_OFFSET, LITTLE_ENDIAN) & 0xFFFF;
return buffer.getShort(offset + TYPE_FIELD_OFFSET, LITTLE_ENDIAN) & 0xFFFF;
}

/**
Expand All @@ -209,6 +214,19 @@ public final int type()
*/
public byte flags()
{
return buffer.getByte(offset + DataHeaderFlyweight.FLAGS_FIELD_OFFSET);
return buffer.getByte(offset + FLAGS_FIELD_OFFSET);
}

/**
* Get the value stored in the reserve space at the end of a data frame header.
*
* Note: The value is in {@link ByteOrder#LITTLE_ENDIAN} format.
*
* @return the value stored in the reserve space at the end of a data frame header.
* @see DataHeaderFlyweight
*/
public long reservedValue()
{
return buffer.getLong(offset + RESERVED_VALUE_OFFSET, LITTLE_ENDIAN);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,13 @@
import org.agrona.DirectBuffer;
import org.agrona.concurrent.UnsafeBuffer;

import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.*;

@RunWith(Theories.class)
public class AbortedMessageTest
public class BufferClaimMessageTest
{
@DataPoint
public static final String UNICAST_CHANNEL = "aeron:udp?endpoint=localhost:54325";
Expand All @@ -47,7 +49,7 @@ public class AbortedMessageTest

@Theory
@Test(timeout = 10000)
public void shouldReceivePublishedMessage(final String channel) throws Exception
public void shouldReceivePublishedMessageWithInterleavedAbort(final String channel) throws Exception
{
final BufferClaim bufferClaim = new BufferClaim();
final UnsafeBuffer srcBuffer = new UnsafeBuffer(new byte[MESSAGE_LENGTH]);
Expand Down Expand Up @@ -86,7 +88,48 @@ public void shouldReceivePublishedMessage(final String channel) throws Exception
}
}

public static void publishMessage(final UnsafeBuffer srcBuffer, final Publication publication)
@Theory
@Test(timeout = 10000)
public void shouldTransferReservedValue(final String channel) throws Exception
{
final BufferClaim bufferClaim = new BufferClaim();
final MediaDriver.Context ctx = new MediaDriver.Context();

try (final MediaDriver ignore = MediaDriver.launch(ctx);
final Aeron aeron = Aeron.connect();
final Publication publication = aeron.addPublication(channel, STREAM_ID);
final Subscription subscription = aeron.addSubscription(channel, STREAM_ID))
{
while (publication.tryClaim(MESSAGE_LENGTH, bufferClaim) < 0L)
{
Thread.yield();
}

final long reservedValue = 1234567L;
bufferClaim.reservedValue(reservedValue);
bufferClaim.commit();

final boolean[] done = new boolean[1];
while (!done[0])
{
subscription.poll(
(buffer, offset, length, header) ->
{
assertThat(length, is(MESSAGE_LENGTH));
assertThat(header.reservedValue(), is(reservedValue));

done[0] = true;
},
FRAGMENT_COUNT_LIMIT);
}
}
finally
{
ctx.deleteAeronDirectory();
}
}

private static void publishMessage(final UnsafeBuffer srcBuffer, final Publication publication)
{
while (publication.offer(srcBuffer, 0, MESSAGE_LENGTH) < 0L)
{
Expand Down

0 comments on commit 6987137

Please sign in to comment.