Skip to content

Commit

Permalink
[FLINK-8736][network] fix memory segment offsets for slices of slices…
Browse files Browse the repository at this point in the history
… being wrong

This closes apache#5551.
  • Loading branch information
Nico Kruber authored and tillrohrmann committed Feb 27, 2018
1 parent 6597e67 commit 9fb1c23
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
*/
public final class ReadOnlySlicedNetworkBuffer extends ReadOnlyByteBuf implements Buffer {

private final int index;
private final int memorySegmentOffset;

/**
* Creates a buffer which shares the memory segment of the given buffer and exposed the given
Expand All @@ -53,7 +53,7 @@ public final class ReadOnlySlicedNetworkBuffer extends ReadOnlyByteBuf implement
*/
ReadOnlySlicedNetworkBuffer(NetworkBuffer buffer, int index, int length) {
super(new SlicedByteBuf(buffer, index, length));
this.index = index;
this.memorySegmentOffset = buffer.getMemorySegmentOffset() + index;
}

/**
Expand All @@ -66,10 +66,11 @@ public final class ReadOnlySlicedNetworkBuffer extends ReadOnlyByteBuf implement
* @param buffer the buffer to derive from
* @param index the index to start from
* @param length the length of the slice
* @param memorySegmentOffset <tt>buffer</tt>'s absolute offset in the backing {@link MemorySegment}
*/
private ReadOnlySlicedNetworkBuffer(ByteBuf buffer, int index, int length) {
private ReadOnlySlicedNetworkBuffer(ByteBuf buffer, int index, int length, int memorySegmentOffset) {
super(new SlicedByteBuf(buffer, index, length));
this.index = index;
this.memorySegmentOffset = memorySegmentOffset + index;
}

@Override
Expand Down Expand Up @@ -102,7 +103,7 @@ public MemorySegment getMemorySegment() {

@Override
public int getMemorySegmentOffset() {
return ((Buffer) unwrap()).getMemorySegmentOffset() + index;
return memorySegmentOffset;
}

@Override
Expand Down Expand Up @@ -133,7 +134,7 @@ public ReadOnlySlicedNetworkBuffer readOnlySlice() {

@Override
public ReadOnlySlicedNetworkBuffer readOnlySlice(int index, int length) {
return new ReadOnlySlicedNetworkBuffer(super.unwrap(), index, length);
return new ReadOnlySlicedNetworkBuffer(super.unwrap(), index, length, memorySegmentOffset);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.nio.ByteBuffer;
import java.nio.ReadOnlyBufferException;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertSame;
Expand All @@ -50,8 +51,10 @@ public class ReadOnlySlicedBufferTest {
@Before
public void setUp() throws Exception {
final MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(BUFFER_SIZE);
buffer = new NetworkBuffer(segment, FreeingBufferRecycler.INSTANCE, true, DATA_SIZE);
buffer.setSize(DATA_SIZE);
buffer = new NetworkBuffer(segment, FreeingBufferRecycler.INSTANCE, true, 0);
for (int i = 0; i < DATA_SIZE; ++i) {
buffer.writeByte(i);
}
}

@Test
Expand Down Expand Up @@ -137,34 +140,64 @@ public void testForwardsRetainBuffer2() {

@Test
public void testCreateSlice1() {
buffer.readByte(); // so that we do not start at position 0
ReadOnlySlicedNetworkBuffer slice1 = buffer.readOnlySlice();
buffer.readByte(); // should not influence the second slice at all
ReadOnlySlicedNetworkBuffer slice2 = slice1.readOnlySlice();
ByteBuf unwrap = slice2.unwrap();
assertSame(buffer, unwrap);
assertSame(slice1.getMemorySegment(), slice2.getMemorySegment());
assertEquals(1, slice1.getMemorySegmentOffset());
assertEquals(slice1.getMemorySegmentOffset(), slice2.getMemorySegmentOffset());

assertReadableBytes(slice1, 1, 2, 3, 4, 5, 6, 7, 8, 9);
assertReadableBytes(slice2, 1, 2, 3, 4, 5, 6, 7, 8, 9);
}

@Test
public void testCreateSlice2() {
buffer.readByte(); // so that we do not start at position 0
ReadOnlySlicedNetworkBuffer slice1 = buffer.readOnlySlice();
buffer.readByte(); // should not influence the second slice at all
ReadOnlySlicedNetworkBuffer slice2 = slice1.readOnlySlice(1, 2);
ByteBuf unwrap = slice2.unwrap();
assertSame(buffer, unwrap);
assertSame(slice1.getMemorySegment(), slice2.getMemorySegment());
assertEquals(1, slice1.getMemorySegmentOffset());
assertEquals(2, slice2.getMemorySegmentOffset());

assertReadableBytes(slice1, 1, 2, 3, 4, 5, 6, 7, 8, 9);
assertReadableBytes(slice2, 2, 3);
}

@Test
public void testCreateSlice3() {
ReadOnlySlicedNetworkBuffer slice1 = buffer.readOnlySlice(1, 2);
buffer.readByte(); // should not influence the second slice at all
ReadOnlySlicedNetworkBuffer slice2 = slice1.readOnlySlice();
ByteBuf unwrap = slice2.unwrap();
assertSame(buffer, unwrap);
assertSame(slice1.getMemorySegment(), slice2.getMemorySegment());
assertEquals(1, slice1.getMemorySegmentOffset());
assertEquals(1, slice2.getMemorySegmentOffset());

assertReadableBytes(slice1, 1, 2);
assertReadableBytes(slice2, 1, 2);
}

@Test
public void testCreateSlice4() {
ReadOnlySlicedNetworkBuffer slice1 = buffer.readOnlySlice(1, 5);
buffer.readByte(); // should not influence the second slice at all
ReadOnlySlicedNetworkBuffer slice2 = slice1.readOnlySlice(1, 2);
ByteBuf unwrap = slice2.unwrap();
assertSame(buffer, unwrap);
assertSame(slice1.getMemorySegment(), slice2.getMemorySegment());
assertEquals(1, slice1.getMemorySegmentOffset());
assertEquals(2, slice2.getMemorySegmentOffset());

assertReadableBytes(slice1, 1, 2, 3, 4, 5);
assertReadableBytes(slice2, 2, 3);
}

@Test
Expand Down Expand Up @@ -323,4 +356,26 @@ private void testForwardsSetAllocator(ReadOnlySlicedNetworkBuffer slice) {
assertSame(buffer.alloc(), slice.alloc());
assertSame(allocator, slice.alloc());
}

private static void assertReadableBytes(Buffer actualBuffer, int... expectedBytes) {
ByteBuffer actualBytesBuffer = actualBuffer.getNioBufferReadable();
int[] actual = new int[actualBytesBuffer.limit()];
for (int i = 0; i < actual.length; ++i) {
actual[i] = actualBytesBuffer.get();
}
assertArrayEquals(expectedBytes, actual);

// verify absolutely positioned read method:
ByteBuf buffer = (ByteBuf) actualBuffer;
for (int i = 0; i < buffer.readableBytes(); ++i) {
actual[i] = buffer.getByte(buffer.readerIndex() + i);
}
assertArrayEquals(expectedBytes, actual);

// verify relatively positioned read method:
for (int i = 0; i < buffer.readableBytes(); ++i) {
actual[i] = buffer.readByte();
}
assertArrayEquals(expectedBytes, actual);
}
}

0 comments on commit 9fb1c23

Please sign in to comment.