Skip to content

Commit

Permalink
Fix for incorrect values from CompositeByteBuf#component(int) (netty#…
Browse files Browse the repository at this point in the history
…9525)

Motivation

This is a "simpler" alternative to netty#9416 which fixes the same
CompositeByteBuf bugs described there, originally reported by @jingene
in netty#9398.

Modifications
- Add fields to Component class for the original buffer along with its
adjustment, which may be different to the already-stored unwrapped
buffer. Use it in appropriate places to ensure correctness and
equivalent behaviour to that prior to the earlier optimizations
- Add comments explaining purpose of each of the Component fields
- Unwrap more kinds of buffers in newComponent method to extend scope of
the existing indirection-reduction optimization
- De-duplicate common buffer consolidation logic
- Unit test for the original bug provided by @jingene

Result
- Correct behaviour / fixed bugs
- Some code deduplication / simplification
- Unwrapping optimization applied to more types of buffers

The downside is increased mem footprint from the two new fields, and
additional allocations in some specific cases, though those should be
rare.


Co-authored-by: jingene <[email protected]>
  • Loading branch information
2 people authored and normanmaurer committed Sep 2, 2019
1 parent 21b7e29 commit 1039f69
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 76 deletions.
155 changes: 79 additions & 76 deletions buffer/src/main/java/io/netty/buffer/CompositeByteBuf.java
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ private int addComponent0(boolean increaseWriterIndex, int cIndex, ByteBuf buffe
checkComponentIndex(cIndex);

// No need to consolidate - just add a component to the list.
Component c = newComponent(buffer, 0);
Component c = newComponent(ensureAccessible(buffer), 0);
int readableBytes = c.length();

addComp(cIndex, c);
Expand All @@ -298,24 +298,42 @@ private int addComponent0(boolean increaseWriterIndex, int cIndex, ByteBuf buffe
}
}

@SuppressWarnings("deprecation")
private Component newComponent(ByteBuf buf, int offset) {
private static ByteBuf ensureAccessible(final ByteBuf buf) {
if (checkAccessible && !buf.isAccessible()) {
throw new IllegalReferenceCountException(0);
}
int srcIndex = buf.readerIndex(), len = buf.readableBytes();
ByteBuf slice = null;
return buf;
}

@SuppressWarnings("deprecation")
private Component newComponent(final ByteBuf buf, final int offset) {
final int srcIndex = buf.readerIndex();
final int len = buf.readableBytes();

// unpeel any intermediate outer layers (UnreleasableByteBuf, LeakAwareByteBufs, SwappedByteBuf)
ByteBuf unwrapped = buf;
int unwrappedIndex = srcIndex;
while (unwrapped instanceof WrappedByteBuf || unwrapped instanceof SwappedByteBuf) {
unwrapped = unwrapped.unwrap();
}

// unwrap if already sliced
if (buf instanceof AbstractUnpooledSlicedByteBuf) {
srcIndex += ((AbstractUnpooledSlicedByteBuf) buf).idx(0);
slice = buf;
buf = buf.unwrap();
} else if (buf instanceof PooledSlicedByteBuf) {
srcIndex += ((PooledSlicedByteBuf) buf).adjustment;
slice = buf;
buf = buf.unwrap();
if (unwrapped instanceof AbstractUnpooledSlicedByteBuf) {
unwrappedIndex += ((AbstractUnpooledSlicedByteBuf) unwrapped).idx(0);
unwrapped = unwrapped.unwrap();
} else if (unwrapped instanceof PooledSlicedByteBuf) {
unwrappedIndex += ((PooledSlicedByteBuf) unwrapped).adjustment;
unwrapped = unwrapped.unwrap();
} else if (unwrapped instanceof DuplicatedByteBuf || unwrapped instanceof PooledDuplicatedByteBuf) {
unwrapped = unwrapped.unwrap();
}
return new Component(buf.order(ByteOrder.BIG_ENDIAN), srcIndex, offset, len, slice);

// We don't need to slice later to expose the internal component if the readable range
// is already the entire buffer
final ByteBuf slice = buf.capacity() == len ? buf : null;

return new Component(buf.order(ByteOrder.BIG_ENDIAN), srcIndex,
unwrapped.order(ByteOrder.BIG_ENDIAN), unwrappedIndex, offset, len, slice);
}

/**
Expand Down Expand Up @@ -353,7 +371,7 @@ private CompositeByteBuf addComponents0(boolean increaseWriterIndex,
if (b == null) {
break;
}
Component c = newComponent(b, nextOffset);
Component c = newComponent(ensureAccessible(b), nextOffset);
components[ci] = c;
nextOffset = c.endOffset;
}
Expand Down Expand Up @@ -450,11 +468,9 @@ public CompositeByteBuf addFlattenedComponents(boolean increaseWriterIndex, Byte
final int toIdx = Math.min(widx, component.endOffset);
final int len = toIdx - fromIdx;
if (len > 0) { // skip empty components
// Note that it's safe to just retain the unwrapped buf here, even in the case
// of PooledSlicedByteBufs - those slices will still be properly released by the
// source Component's free() method.
addComp(componentCount, new Component(
component.buf.retain(), component.idx(fromIdx), newOffset, len, null));
component.srcBuf.retain(), component.srcIdx(fromIdx),
component.buf, component.idx(fromIdx), newOffset, len, null));
}
if (widx == toIdx) {
break;
Expand Down Expand Up @@ -522,18 +538,7 @@ private void consolidateIfNeeded() {
// operation.
int size = componentCount;
if (size > maxNumComponents) {
final int capacity = components[size - 1].endOffset;

ByteBuf consolidated = allocBuffer(capacity);
lastAccessed = null;

// We're not using foreach to avoid creating an iterator.
for (int i = 0; i < size; i ++) {
components[i].transferTo(consolidated);
}

components[0] = new Component(consolidated, 0, 0, capacity, consolidated);
removeCompRange(1, size);
consolidate0(0, size);
}
}

Expand Down Expand Up @@ -1682,20 +1687,7 @@ public ByteBuffer[] nioBuffers(int index, int length) {
*/
public CompositeByteBuf consolidate() {
ensureAccessible();
final int numComponents = componentCount;
if (numComponents <= 1) {
return this;
}

final int capacity = components[numComponents - 1].endOffset;
final ByteBuf consolidated = allocBuffer(capacity);

for (int i = 0; i < numComponents; i ++) {
components[i].transferTo(consolidated);
}
lastAccessed = null;
components[0] = new Component(consolidated, 0, 0, capacity, consolidated);
removeCompRange(1, numComponents);
consolidate0(0, componentCount);
return this;
}

Expand All @@ -1707,23 +1699,29 @@ public CompositeByteBuf consolidate() {
*/
public CompositeByteBuf consolidate(int cIndex, int numComponents) {
checkComponentIndex(cIndex, numComponents);
consolidate0(cIndex, numComponents);
return this;
}

private void consolidate0(int cIndex, int numComponents) {
if (numComponents <= 1) {
return this;
return;
}

final int endCIndex = cIndex + numComponents;
final Component last = components[endCIndex - 1];
final int capacity = last.endOffset - components[cIndex].offset;
final int startOffset = cIndex != 0 ? components[cIndex].offset : 0;
final int capacity = components[endCIndex - 1].endOffset - startOffset;
final ByteBuf consolidated = allocBuffer(capacity);

for (int i = cIndex; i < endCIndex; i ++) {
components[i].transferTo(consolidated);
}
lastAccessed = null;
removeCompRange(cIndex + 1, endCIndex);
components[cIndex] = new Component(consolidated, 0, 0, capacity, consolidated);
updateComponentOffsets(cIndex);
return this;
components[cIndex] = newComponent(consolidated, 0);
if (cIndex != 0 || numComponents != componentCount) {
updateComponentOffsets(cIndex);
}
}

/**
Expand Down Expand Up @@ -1811,6 +1809,7 @@ public CompositeByteBuf discardReadBytes() {
int trimmedBytes = readerIndex - c.offset;
c.offset = 0;
c.endOffset -= readerIndex;
c.srcAdjustment += readerIndex;
c.adjustment += readerIndex;
ByteBuf slice = c.slice;
if (slice != null) {
Expand Down Expand Up @@ -1844,21 +1843,32 @@ public String toString() {
}

private static final class Component {
final ByteBuf buf;
int adjustment;
int offset;
int endOffset;
final ByteBuf srcBuf; // the originally added buffer
final ByteBuf buf; // srcBuf unwrapped zero or more times

int srcAdjustment; // index of the start of this CompositeByteBuf relative to srcBuf
int adjustment; // index of the start of this CompositeByteBuf relative to buf

int offset; // offset of this component within this CompositeByteBuf
int endOffset; // end offset of this component within this CompositeByteBuf

private ByteBuf slice; // cached slice, may be null

Component(ByteBuf buf, int srcOffset, int offset, int len, ByteBuf slice) {
Component(ByteBuf srcBuf, int srcOffset, ByteBuf buf, int bufOffset,
int offset, int len, ByteBuf slice) {
this.srcBuf = srcBuf;
this.srcAdjustment = srcOffset - offset;
this.buf = buf;
this.adjustment = bufOffset - offset;
this.offset = offset;
this.endOffset = offset + len;
this.adjustment = srcOffset - offset;
this.slice = slice;
}

int srcIdx(int index) {
return index + srcAdjustment;
}

int idx(int index) {
return index + adjustment;
}
Expand All @@ -1870,6 +1880,7 @@ int length() {
void reposition(int newOffset) {
int move = newOffset - offset;
endOffset += move;
srcAdjustment -= move;
adjustment -= move;
offset = newOffset;
}
Expand All @@ -1881,35 +1892,27 @@ void transferTo(ByteBuf dst) {
}

ByteBuf slice() {
return slice != null ? slice : (slice = buf.slice(idx(offset), length()));
ByteBuf s = slice;
if (s == null) {
slice = s = srcBuf.slice(srcIdx(offset), length());
}
return s;
}

ByteBuf duplicate() {
return buf.duplicate().setIndex(idx(offset), idx(endOffset));
return srcBuf.duplicate();
}

ByteBuffer internalNioBuffer(int index, int length) {
// We must not return the unwrapped buffer's internal buffer
// if it was originally added as a slice - this check of the
// slice field is threadsafe since we only care whether it
// was set upon Component construction, and we aren't
// attempting to access the referenced slice itself
return slice != null ? buf.nioBuffer(idx(index), length)
: buf.internalNioBuffer(idx(index), length);
// Some buffers override this so we must use srcBuf
return srcBuf.internalNioBuffer(srcIdx(index), length);
}

void free() {
// Release the slice if present since it may have a different
// refcount to the unwrapped buf if it is a PooledSlicedByteBuf
ByteBuf buffer = slice;
if (buffer != null) {
buffer.release();
} else {
buf.release();
}
// null out in either case since it could be racy if set lazily (but not
// in the case we care about, where it will have been set in the ctor)
slice = null;
// Release the original buffer since it may have a different
// refcount to the unwrapped buf (e.g. if PooledSlicedByteBuf)
srcBuf.release();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@
*/
package io.netty.buffer;

import static io.netty.buffer.Unpooled.*;
import static org.junit.Assert.*;

import org.junit.Test;

import io.netty.util.CharsetUtil;
import io.netty.util.ResourceLeakTracker;

public class AdvancedLeakAwareByteBufTest extends SimpleLeakAwareByteBufTest {
Expand All @@ -28,4 +34,21 @@ protected Class<? extends ByteBuf> leakClass() {
protected SimpleLeakAwareByteBuf wrap(ByteBuf buffer, ResourceLeakTracker<ByteBuf> tracker) {
return new AdvancedLeakAwareByteBuf(buffer, tracker);
}

@Test
public void testAddComponentWithLeakAwareByteBuf() {
NoopResourceLeakTracker<ByteBuf> tracker = new NoopResourceLeakTracker<ByteBuf>();

ByteBuf buffer = wrappedBuffer("hello world".getBytes(CharsetUtil.US_ASCII)).slice(6, 5);
ByteBuf leakAwareBuf = wrap(buffer, tracker);

CompositeByteBuf composite = compositeBuffer();
composite.addComponent(true, leakAwareBuf);
byte[] result = new byte[5];
ByteBuf bb = composite.component(0);
System.out.println(bb);
bb.readBytes(result);
assertArrayEquals("world".getBytes(CharsetUtil.US_ASCII), result);
composite.release();
}
}

0 comments on commit 1039f69

Please sign in to comment.