Skip to content

Commit

Permalink
HTTP/2 WeightedFairQueueByteDistributor Bug
Browse files Browse the repository at this point in the history
Motivation:
If a stream is not able to send any data (flow control window for the stream is exhausted) but has descendants who can send data then WeightedFairQueueByteDistributor may incorrectly modify the pseudo time and also double add the associated state to the parent's priority queue. The pseudo time should only be modified if a node is moved in the priority tree, and not if there happens to be no active streams in its descendent tree and a descendent is moved (e.g. removed from the tree because it wrote all data and the last data frame was EOS). Also the state objects for WeightedFairQueueByteDistributor should only appear once in any queue. If this condition is violated the pseudo time accounting would be biased at and assumptions in WeightedFairQueueByteDistributor would be invalidated.

Modifications:
- WeightedFairQueueByteDistributor#isActiveCountChangeForTree should not allow re-adding to the priority queue if we are currently processing a node in the distribution algorithm. The distribution algorithm will re-evaluate if the node should be re-added on the tail end of the recursion.

Result:
Fixes netty#5980
  • Loading branch information
Scottmitch committed Nov 15, 2016
1 parent c1932a8 commit c4e96d0
Show file tree
Hide file tree
Showing 3 changed files with 165 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import static io.netty.handler.codec.http2.Http2CodecUtil.streamableBytes;
import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
import static io.netty.handler.codec.http2.Http2Exception.connectionError;
import static io.netty.util.internal.ObjectUtil.checkNotNull;
import static java.lang.Math.min;

/**
Expand Down Expand Up @@ -85,7 +84,7 @@ public void onPriorityTreeParentChanged(Http2Stream stream, Http2Stream oldParen
if (state.activeCountForTree != 0) {
State pState = state(parent);
pState.offerAndInitializePseudoTime(state);
pState.isActiveCountChangeForTree(state.activeCountForTree);
pState.activeCountChangeForTree(state.activeCountForTree);
}
}
}
Expand All @@ -98,7 +97,7 @@ public void onPriorityTreeParentChanging(Http2Stream stream, Http2Stream newPare
if (state.activeCountForTree != 0) {
State pState = state(parent);
pState.remove(state);
pState.isActiveCountChangeForTree(-state.activeCountForTree);
pState.activeCountChangeForTree(-state.activeCountForTree);
}
}
}
Expand All @@ -113,8 +112,6 @@ public void updateStreamableBytes(StreamState state) {

@Override
public boolean distribute(int maxBytes, Writer writer) throws Http2Exception {
checkNotNull(writer, "writer");

// As long as there is some active frame we should write at least 1 time.
if (connectionState.activeCountForTree == 0) {
return false;
Expand Down Expand Up @@ -146,7 +143,7 @@ public void allocationQuantum(int allocationQuantum) {
}

private int distribute(int maxBytes, Writer writer, State state) throws Http2Exception {
if (state.active) {
if (state.isActive()) {
int nsent = min(maxBytes, state.streamableBytes);
state.write(nsent, writer);
if (nsent == 0 && maxBytes != 0) {
Expand Down Expand Up @@ -176,10 +173,11 @@ private int distributeToChildren(int maxBytes, Writer writer, State state) throw
long oldTotalQueuedWeights = state.totalQueuedWeights;
State childState = state.poll();
State nextChildState = state.peek();
childState.setDistributing();
try {
assert nextChildState == null || nextChildState.pseudoTimeToWrite >= childState.pseudoTimeToWrite :
"nextChildState.pseudoTime(" + nextChildState.pseudoTimeToWrite + ") < " + " childState.pseudoTime(" +
childState.pseudoTimeToWrite + ")";
"nextChildState[" + nextChildState.stream.id() + "].pseudoTime(" + nextChildState.pseudoTimeToWrite +
") < " + " childState[" + childState.stream.id() + "].pseudoTime(" + childState.pseudoTimeToWrite + ")";
int nsent = distribute(nextChildState == null ? maxBytes :
min(maxBytes, (int) min((nextChildState.pseudoTimeToWrite - childState.pseudoTimeToWrite) *
childState.stream.weight() / oldTotalQueuedWeights + allocationQuantum,
Expand All @@ -191,7 +189,8 @@ private int distributeToChildren(int maxBytes, Writer writer, State state) throw
childState.updatePseudoTime(state, nsent, oldTotalQueuedWeights);
return nsent;
} finally {
// Do in finally to ensure the internal state is not corrupted if an exception is thrown.
childState.unsetDistributing();
// Do in finally to ensure the internal flags is not corrupted if an exception is thrown.
// The offer operation is delayed until we unroll up the recursive stack, so we don't have to remove from
// the priority queue due to a write operation.
if (childState.activeCountForTree != 0) {
Expand All @@ -215,11 +214,13 @@ int streamableBytes0(Http2Stream stream) {
* The remote flow control state for a single stream.
*/
private final class State implements PriorityQueueNode<State> {
private static final int STATE_IS_ACTIVE = 0x1;
private static final int STATE_IS_DISTRIBUTING = 0x2;
final Http2Stream stream;
private final Queue<State> queue;
int streamableBytes;
/**
* Count of nodes rooted at this sub tree with {@link #active} equal to {@code true}.
* Count of nodes rooted at this sub tree with {@link #isActive()} equal to {@code true}.
*/
int activeCountForTree;
private int priorityQueueIndex = INDEX_NOT_IN_QUEUE;
Expand All @@ -228,11 +229,11 @@ private final class State implements PriorityQueueNode<State> {
*/
long pseudoTimeToWrite;
/**
* A pseudo time maintained for immediate children to base their {@link pseudoTimeToSend} off of.
* A pseudo time maintained for immediate children to base their {@link #pseudoTimeToWrite} off of.
*/
long pseudoTime;
long totalQueuedWeights;
boolean active;
private byte flags;

State(Http2Stream stream) {
this(stream, 0);
Expand All @@ -251,24 +252,41 @@ void write(int numBytes, Writer writer) throws Http2Exception {
}
}

void isActiveCountChangeForTree(int increment) {
void activeCountChangeForTree(int increment) {
assert activeCountForTree + increment >= 0;
activeCountForTree += increment;
if (!stream.isRoot()) {
State pState = state(stream.parent());
assert activeCountForTree != increment ||
priorityQueueIndex == INDEX_NOT_IN_QUEUE ||
pState.queue.contains(this) :
"State[" + stream.id() + "].activeCountForTree changed from 0 to " + increment + " is in a queue" +
", but not in parent[ " + pState.stream.id() + "]'s queue";
if (activeCountForTree == 0) {
pState.remove(this);
} else if (activeCountForTree - increment == 0) { // if frame count was 0 but is now not, then queue.
} else if (activeCountForTree == increment && !isDistributing()) {
// If frame count was 0 but is now not, and this node is not already in a queue (assumed to be
// pState's queue) then enqueue it. If this State object is being processed the pseudoTime for this
// node should not be adjusted, and the node will be added back to the queue/tree structure after it
// is done being processed. This may happen if the activeCountForTree == 0 (a node which can't
// stream anything and is blocked) is at/near root of the tree, and is poped off the queue during
// processing, and then put back on the queue because a child changes position in the priority tree
// (or is closed because it is not blocked and finished writing all data).
pState.offerAndInitializePseudoTime(this);
}
pState.isActiveCountChangeForTree(increment);
pState.activeCountChangeForTree(increment);
}
}

void updateStreamableBytes(int newStreamableBytes, boolean isActive) {
if (this.active != isActive) {
isActiveCountChangeForTree(isActive ? 1 : -1);
this.active = isActive;
if (isActive() != isActive) {
if (isActive) {
activeCountChangeForTree(1);
setActive();
} else {
activeCountChangeForTree(-1);
unsetActive();
}
}

streamableBytes = newStreamableBytes;
Expand Down Expand Up @@ -324,6 +342,30 @@ void close() {
updateStreamableBytes(0, false);
}

boolean isActive() {
return (flags & STATE_IS_ACTIVE) != 0;
}

private void setActive() {
flags |= STATE_IS_ACTIVE;
}

private void unsetActive() {
flags &= ~STATE_IS_ACTIVE;
}

boolean isDistributing() {
return (flags & STATE_IS_DISTRIBUTING) != 0;
}

void setDistributing() {
flags |= STATE_IS_DISTRIBUTING;
}

void unsetDistributing() {
flags &= ~STATE_IS_DISTRIBUTING;
}

@Override
public int compareTo(State o) {
return MathUtil.compare(pseudoTimeToWrite, o.pseudoTimeToWrite);
Expand All @@ -338,5 +380,34 @@ public int priorityQueueIndex() {
public void priorityQueueIndex(int i) {
priorityQueueIndex = i;
}

@Override
public String toString() {
// Use activeCountForTree as a rough estimate for how many nodes are in this subtree.
StringBuilder sb = new StringBuilder(256 * (activeCountForTree > 0 ? activeCountForTree : 1));
toString(sb);
return sb.toString();
}

private void toString(StringBuilder sb) {
sb.append("{stream ").append(stream.id())
.append(" streamableBytes ").append(streamableBytes)
.append(" activeCountForTree ").append(activeCountForTree)
.append(" priorityQueueIndex ").append(priorityQueueIndex)
.append(" pseudoTimeToWrite ").append(pseudoTimeToWrite)
.append(" pseudoTime ").append(pseudoTime)
.append(" flags ").append(flags)
.append(" queue.size() ").append(queue.size()).append("} [");

if (!queue.isEmpty()) {
for (State s : queue) {
s.toString(sb);
sb.append(", ");
}
// Remove the last ", "
sb.setLength(sb.length() - 2);
}
sb.append(']');
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

Expand All @@ -66,7 +67,7 @@ public void setup() throws Http2Exception {
distributor.allocationQuantum(ALLOCATION_QUANTUM);

// Assume we always write all the allocated bytes.
doAnswer(writeAnswer()).when(writer).write(any(Http2Stream.class), anyInt());
doAnswer(writeAnswer(false)).when(writer).write(any(Http2Stream.class), anyInt());

connection.local().createStream(STREAM_A, false);
connection.local().createStream(STREAM_B, false);
Expand All @@ -76,19 +77,71 @@ public void setup() throws Http2Exception {
streamD.setPriority(STREAM_A, DEFAULT_PRIORITY_WEIGHT, false);
}

private Answer<Void> writeAnswer() {
private Answer<Void> writeAnswer(final boolean closeIfNoFrame) {
return new Answer<Void>() {
@Override
public Void answer(InvocationOnMock in) throws Throwable {
Http2Stream stream = in.getArgumentAt(0, Http2Stream.class);
int numBytes = in.getArgumentAt(1, Integer.class);
int streamableBytes = distributor.streamableBytes0(stream) - numBytes;
updateStream(stream.id(), streamableBytes, streamableBytes > 0);
boolean hasFrame = streamableBytes > 0;
updateStream(stream.id(), streamableBytes, hasFrame, hasFrame, closeIfNoFrame);
return null;
}
};
}

/**
* In this test, we block B such that it has no frames. We distribute enough bytes for all streams and stream B
* should be preserved in the priority queue structure until it has no "active" children, but it should not be
* doubly added to stream 0.
*
* <pre>
* 0
* |
* A
* |
* [B]
* |
* C
* |
* D
* </pre>
*
* After the write:
* <pre>
* 0
* </pre>
*/
@Test
public void writeWithNonActiveStreamShouldNotDobuleAddToPriorityQueue() throws Http2Exception {
updateStream(STREAM_A, 400, true);
updateStream(STREAM_B, 500, true);
updateStream(STREAM_C, 600, true);
updateStream(STREAM_D, 700, true);

stream(STREAM_B).setPriority(STREAM_A, DEFAULT_PRIORITY_WEIGHT, true);
stream(STREAM_D).setPriority(STREAM_C, DEFAULT_PRIORITY_WEIGHT, true);

// Block B, but it should still remain in the queue/tree structure.
updateStream(STREAM_B, 0, false);

// Get the streams before the write, because they may be be closed.
Http2Stream streamA = stream(STREAM_A);
Http2Stream streamB = stream(STREAM_B);
Http2Stream streamC = stream(STREAM_C);
Http2Stream streamD = stream(STREAM_D);

reset(writer);
doAnswer(writeAnswer(true)).when(writer).write(any(Http2Stream.class), anyInt());

assertFalse(write(400 + 600 + 700));
assertEquals(400, captureWrites(streamA));
verifyNeverWrite(streamB);
assertEquals(600, captureWrites(streamC));
assertEquals(700, captureWrites(streamD));
}

@Test
public void bytesUnassignedAfterProcessing() throws Http2Exception {
updateStream(STREAM_A, 1, true);
Expand Down Expand Up @@ -133,7 +186,7 @@ public void connectionErrorForWriterException() throws Http2Exception {
verifyWrite(STREAM_C, 3);
verifyWrite(atMost(1), STREAM_D, 4);

doAnswer(writeAnswer()).when(writer).write(same(stream(STREAM_C)), eq(3));
doAnswer(writeAnswer(false)).when(writer).write(same(stream(STREAM_C)), eq(3));
assertFalse(write(10));
verifyWrite(STREAM_A, 1);
verifyWrite(STREAM_B, 2);
Expand Down Expand Up @@ -247,7 +300,7 @@ public void blockedStreamNoDataShouldSpreadDataToChildren() throws Http2Exceptio
@Test
public void blockedStreamWithDataAndNotAllowedToSendShouldSpreadDataToChildren() throws Http2Exception {
// A cannot stream.
updateStream(STREAM_A, 0, true, false);
updateStream(STREAM_A, 0, true, false, false);
blockedStreamShouldSpreadDataToChildren(false);
}

Expand All @@ -266,11 +319,11 @@ public void blockedStreamWithDataAndNotAllowedToSendShouldSpreadDataToChildren()
*/
@Test
public void streamWithZeroFlowControlWindowAndDataShouldWriteOnlyOnce() throws Http2Exception {
updateStream(STREAM_A, 0, true, true);
updateStream(STREAM_A, 0, true, true, false);
blockedStreamShouldSpreadDataToChildren(true);

// Make sure if we call update stream again, A should write 1 more time.
updateStream(STREAM_A, 0, true, true);
updateStream(STREAM_A, 0, true, true, false);
assertFalse(write(1));
verifyWrite(times(2), STREAM_A, 0);

Expand Down Expand Up @@ -894,7 +947,11 @@ private void verifyAnyWrite(int streamId, int times) {
}

private void verifyNeverWrite(int streamId) {
verify(writer, never()).write(same(stream(streamId)), anyInt());
verifyNeverWrite(stream(streamId));
}

private void verifyNeverWrite(Http2Stream stream) {
verify(writer, never()).write(same(stream), anyInt());
}

private void setPriority(int streamId, int parent, int weight, boolean exclusive) throws Http2Exception {
Expand All @@ -906,8 +963,12 @@ private Http2Stream stream(int streamId) {
}

private int captureWrites(int streamId) {
return captureWrites(stream(streamId));
}

private int captureWrites(Http2Stream stream) {
ArgumentCaptor<Integer> captor = ArgumentCaptor.forClass(Integer.class);
verify(writer, atLeastOnce()).write(same(stream(streamId)), captor.capture());
verify(writer, atLeastOnce()).write(same(stream), captor.capture());
int total = 0;
for (Integer x : captor.getAllValues()) {
total += x;
Expand All @@ -916,12 +977,15 @@ private int captureWrites(int streamId) {
}

private void updateStream(final int streamId, final int streamableBytes, final boolean hasFrame) {
updateStream(streamId, streamableBytes, hasFrame, hasFrame);
updateStream(streamId, streamableBytes, hasFrame, hasFrame, false);
}

private void updateStream(final int streamId, final int pendingBytes, final boolean hasFrame,
final boolean isWriteAllowed) {
final boolean isWriteAllowed, boolean closeIfNoFrame) {
final Http2Stream stream = stream(streamId);
if (closeIfNoFrame && !hasFrame) {
stream(streamId).close();
}
distributor.updateStreamableBytes(new StreamByteDistributor.StreamState() {
@Override
public Http2Stream stream() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,9 @@ public void clear() {

@Override
public boolean offer(T e) {
checkNotNull(e, "e");
if (e.priorityQueueIndex() != INDEX_NOT_IN_QUEUE) {
throw new IllegalArgumentException("e.priorityQueueIndex(): " + e.priorityQueueIndex() +
" (expected: " + INDEX_NOT_IN_QUEUE + ")");
" (expected: " + INDEX_NOT_IN_QUEUE + ") + e: " + e);
}

// Check that the array capacity is enough to hold values by doubling capacity.
Expand Down

0 comments on commit c4e96d0

Please sign in to comment.