From c4e96d010e3d16810d7130c93169817b3d72b421 Mon Sep 17 00:00:00 2001 From: Scott Mitchell Date: Fri, 4 Nov 2016 18:45:49 -0700 Subject: [PATCH] HTTP/2 WeightedFairQueueByteDistributor Bug Motivation: If a stream is not able to send any data (flow control window for the stream is exhausted) but has descendants who can send data then WeightedFairQueueByteDistributor may incorrectly modify the pseudo time and also double add the associated state to the parent's priority queue. The pseudo time should only be modified if a node is moved in the priority tree, and not if there happens to be no active streams in its descendent tree and a descendent is moved (e.g. removed from the tree because it wrote all data and the last data frame was EOS). Also the state objects for WeightedFairQueueByteDistributor should only appear once in any queue. If this condition is violated the pseudo time accounting would be biased at and assumptions in WeightedFairQueueByteDistributor would be invalidated. Modifications: - WeightedFairQueueByteDistributor#isActiveCountChangeForTree should not allow re-adding to the priority queue if we are currently processing a node in the distribution algorithm. The distribution algorithm will re-evaluate if the node should be re-added on the tail end of the recursion. Result: Fixes https://github.com/netty/netty/issues/5980 --- .../WeightedFairQueueByteDistributor.java | 107 +++++++++++++++--- .../WeightedFairQueueByteDistributorTest.java | 86 ++++++++++++-- .../io/netty/util/internal/PriorityQueue.java | 3 +- 3 files changed, 165 insertions(+), 31 deletions(-) diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/WeightedFairQueueByteDistributor.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/WeightedFairQueueByteDistributor.java index d0edc4e6bef3..bb3cc0338f33 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/WeightedFairQueueByteDistributor.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/WeightedFairQueueByteDistributor.java @@ -25,7 +25,6 @@ import static io.netty.handler.codec.http2.Http2CodecUtil.streamableBytes; import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR; import static io.netty.handler.codec.http2.Http2Exception.connectionError; -import static io.netty.util.internal.ObjectUtil.checkNotNull; import static java.lang.Math.min; /** @@ -85,7 +84,7 @@ public void onPriorityTreeParentChanged(Http2Stream stream, Http2Stream oldParen if (state.activeCountForTree != 0) { State pState = state(parent); pState.offerAndInitializePseudoTime(state); - pState.isActiveCountChangeForTree(state.activeCountForTree); + pState.activeCountChangeForTree(state.activeCountForTree); } } } @@ -98,7 +97,7 @@ public void onPriorityTreeParentChanging(Http2Stream stream, Http2Stream newPare if (state.activeCountForTree != 0) { State pState = state(parent); pState.remove(state); - pState.isActiveCountChangeForTree(-state.activeCountForTree); + pState.activeCountChangeForTree(-state.activeCountForTree); } } } @@ -113,8 +112,6 @@ public void updateStreamableBytes(StreamState state) { @Override public boolean distribute(int maxBytes, Writer writer) throws Http2Exception { - checkNotNull(writer, "writer"); - // As long as there is some active frame we should write at least 1 time. if (connectionState.activeCountForTree == 0) { return false; @@ -146,7 +143,7 @@ public void allocationQuantum(int allocationQuantum) { } private int distribute(int maxBytes, Writer writer, State state) throws Http2Exception { - if (state.active) { + if (state.isActive()) { int nsent = min(maxBytes, state.streamableBytes); state.write(nsent, writer); if (nsent == 0 && maxBytes != 0) { @@ -176,10 +173,11 @@ private int distributeToChildren(int maxBytes, Writer writer, State state) throw long oldTotalQueuedWeights = state.totalQueuedWeights; State childState = state.poll(); State nextChildState = state.peek(); + childState.setDistributing(); try { assert nextChildState == null || nextChildState.pseudoTimeToWrite >= childState.pseudoTimeToWrite : - "nextChildState.pseudoTime(" + nextChildState.pseudoTimeToWrite + ") < " + " childState.pseudoTime(" + - childState.pseudoTimeToWrite + ")"; + "nextChildState[" + nextChildState.stream.id() + "].pseudoTime(" + nextChildState.pseudoTimeToWrite + + ") < " + " childState[" + childState.stream.id() + "].pseudoTime(" + childState.pseudoTimeToWrite + ")"; int nsent = distribute(nextChildState == null ? maxBytes : min(maxBytes, (int) min((nextChildState.pseudoTimeToWrite - childState.pseudoTimeToWrite) * childState.stream.weight() / oldTotalQueuedWeights + allocationQuantum, @@ -191,7 +189,8 @@ private int distributeToChildren(int maxBytes, Writer writer, State state) throw childState.updatePseudoTime(state, nsent, oldTotalQueuedWeights); return nsent; } finally { - // Do in finally to ensure the internal state is not corrupted if an exception is thrown. + childState.unsetDistributing(); + // Do in finally to ensure the internal flags is not corrupted if an exception is thrown. // The offer operation is delayed until we unroll up the recursive stack, so we don't have to remove from // the priority queue due to a write operation. if (childState.activeCountForTree != 0) { @@ -215,11 +214,13 @@ int streamableBytes0(Http2Stream stream) { * The remote flow control state for a single stream. */ private final class State implements PriorityQueueNode { + private static final int STATE_IS_ACTIVE = 0x1; + private static final int STATE_IS_DISTRIBUTING = 0x2; final Http2Stream stream; private final Queue queue; int streamableBytes; /** - * Count of nodes rooted at this sub tree with {@link #active} equal to {@code true}. + * Count of nodes rooted at this sub tree with {@link #isActive()} equal to {@code true}. */ int activeCountForTree; private int priorityQueueIndex = INDEX_NOT_IN_QUEUE; @@ -228,11 +229,11 @@ private final class State implements PriorityQueueNode { */ long pseudoTimeToWrite; /** - * A pseudo time maintained for immediate children to base their {@link pseudoTimeToSend} off of. + * A pseudo time maintained for immediate children to base their {@link #pseudoTimeToWrite} off of. */ long pseudoTime; long totalQueuedWeights; - boolean active; + private byte flags; State(Http2Stream stream) { this(stream, 0); @@ -251,24 +252,41 @@ void write(int numBytes, Writer writer) throws Http2Exception { } } - void isActiveCountChangeForTree(int increment) { + void activeCountChangeForTree(int increment) { assert activeCountForTree + increment >= 0; activeCountForTree += increment; if (!stream.isRoot()) { State pState = state(stream.parent()); + assert activeCountForTree != increment || + priorityQueueIndex == INDEX_NOT_IN_QUEUE || + pState.queue.contains(this) : + "State[" + stream.id() + "].activeCountForTree changed from 0 to " + increment + " is in a queue" + + ", but not in parent[ " + pState.stream.id() + "]'s queue"; if (activeCountForTree == 0) { pState.remove(this); - } else if (activeCountForTree - increment == 0) { // if frame count was 0 but is now not, then queue. + } else if (activeCountForTree == increment && !isDistributing()) { + // If frame count was 0 but is now not, and this node is not already in a queue (assumed to be + // pState's queue) then enqueue it. If this State object is being processed the pseudoTime for this + // node should not be adjusted, and the node will be added back to the queue/tree structure after it + // is done being processed. This may happen if the activeCountForTree == 0 (a node which can't + // stream anything and is blocked) is at/near root of the tree, and is poped off the queue during + // processing, and then put back on the queue because a child changes position in the priority tree + // (or is closed because it is not blocked and finished writing all data). pState.offerAndInitializePseudoTime(this); } - pState.isActiveCountChangeForTree(increment); + pState.activeCountChangeForTree(increment); } } void updateStreamableBytes(int newStreamableBytes, boolean isActive) { - if (this.active != isActive) { - isActiveCountChangeForTree(isActive ? 1 : -1); - this.active = isActive; + if (isActive() != isActive) { + if (isActive) { + activeCountChangeForTree(1); + setActive(); + } else { + activeCountChangeForTree(-1); + unsetActive(); + } } streamableBytes = newStreamableBytes; @@ -324,6 +342,30 @@ void close() { updateStreamableBytes(0, false); } + boolean isActive() { + return (flags & STATE_IS_ACTIVE) != 0; + } + + private void setActive() { + flags |= STATE_IS_ACTIVE; + } + + private void unsetActive() { + flags &= ~STATE_IS_ACTIVE; + } + + boolean isDistributing() { + return (flags & STATE_IS_DISTRIBUTING) != 0; + } + + void setDistributing() { + flags |= STATE_IS_DISTRIBUTING; + } + + void unsetDistributing() { + flags &= ~STATE_IS_DISTRIBUTING; + } + @Override public int compareTo(State o) { return MathUtil.compare(pseudoTimeToWrite, o.pseudoTimeToWrite); @@ -338,5 +380,34 @@ public int priorityQueueIndex() { public void priorityQueueIndex(int i) { priorityQueueIndex = i; } + + @Override + public String toString() { + // Use activeCountForTree as a rough estimate for how many nodes are in this subtree. + StringBuilder sb = new StringBuilder(256 * (activeCountForTree > 0 ? activeCountForTree : 1)); + toString(sb); + return sb.toString(); + } + + private void toString(StringBuilder sb) { + sb.append("{stream ").append(stream.id()) + .append(" streamableBytes ").append(streamableBytes) + .append(" activeCountForTree ").append(activeCountForTree) + .append(" priorityQueueIndex ").append(priorityQueueIndex) + .append(" pseudoTimeToWrite ").append(pseudoTimeToWrite) + .append(" pseudoTime ").append(pseudoTime) + .append(" flags ").append(flags) + .append(" queue.size() ").append(queue.size()).append("} ["); + + if (!queue.isEmpty()) { + for (State s : queue) { + s.toString(sb); + sb.append(", "); + } + // Remove the last ", " + sb.setLength(sb.length() - 2); + } + sb.append(']'); + } } } diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/WeightedFairQueueByteDistributorTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/WeightedFairQueueByteDistributorTest.java index 276f18a3e8a2..0162c7742ff3 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/WeightedFairQueueByteDistributorTest.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/WeightedFairQueueByteDistributorTest.java @@ -40,6 +40,7 @@ import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.never; +import static org.mockito.Mockito.reset; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -66,7 +67,7 @@ public void setup() throws Http2Exception { distributor.allocationQuantum(ALLOCATION_QUANTUM); // Assume we always write all the allocated bytes. - doAnswer(writeAnswer()).when(writer).write(any(Http2Stream.class), anyInt()); + doAnswer(writeAnswer(false)).when(writer).write(any(Http2Stream.class), anyInt()); connection.local().createStream(STREAM_A, false); connection.local().createStream(STREAM_B, false); @@ -76,19 +77,71 @@ public void setup() throws Http2Exception { streamD.setPriority(STREAM_A, DEFAULT_PRIORITY_WEIGHT, false); } - private Answer writeAnswer() { + private Answer writeAnswer(final boolean closeIfNoFrame) { return new Answer() { @Override public Void answer(InvocationOnMock in) throws Throwable { Http2Stream stream = in.getArgumentAt(0, Http2Stream.class); int numBytes = in.getArgumentAt(1, Integer.class); int streamableBytes = distributor.streamableBytes0(stream) - numBytes; - updateStream(stream.id(), streamableBytes, streamableBytes > 0); + boolean hasFrame = streamableBytes > 0; + updateStream(stream.id(), streamableBytes, hasFrame, hasFrame, closeIfNoFrame); return null; } }; } + /** + * In this test, we block B such that it has no frames. We distribute enough bytes for all streams and stream B + * should be preserved in the priority queue structure until it has no "active" children, but it should not be + * doubly added to stream 0. + * + *
+     *         0
+     *         |
+     *         A
+     *         |
+     *        [B]
+     *         |
+     *         C
+     *         |
+     *         D
+     * 
+ * + * After the write: + *
+     *         0
+     * 
+ */ + @Test + public void writeWithNonActiveStreamShouldNotDobuleAddToPriorityQueue() throws Http2Exception { + updateStream(STREAM_A, 400, true); + updateStream(STREAM_B, 500, true); + updateStream(STREAM_C, 600, true); + updateStream(STREAM_D, 700, true); + + stream(STREAM_B).setPriority(STREAM_A, DEFAULT_PRIORITY_WEIGHT, true); + stream(STREAM_D).setPriority(STREAM_C, DEFAULT_PRIORITY_WEIGHT, true); + + // Block B, but it should still remain in the queue/tree structure. + updateStream(STREAM_B, 0, false); + + // Get the streams before the write, because they may be be closed. + Http2Stream streamA = stream(STREAM_A); + Http2Stream streamB = stream(STREAM_B); + Http2Stream streamC = stream(STREAM_C); + Http2Stream streamD = stream(STREAM_D); + + reset(writer); + doAnswer(writeAnswer(true)).when(writer).write(any(Http2Stream.class), anyInt()); + + assertFalse(write(400 + 600 + 700)); + assertEquals(400, captureWrites(streamA)); + verifyNeverWrite(streamB); + assertEquals(600, captureWrites(streamC)); + assertEquals(700, captureWrites(streamD)); + } + @Test public void bytesUnassignedAfterProcessing() throws Http2Exception { updateStream(STREAM_A, 1, true); @@ -133,7 +186,7 @@ public void connectionErrorForWriterException() throws Http2Exception { verifyWrite(STREAM_C, 3); verifyWrite(atMost(1), STREAM_D, 4); - doAnswer(writeAnswer()).when(writer).write(same(stream(STREAM_C)), eq(3)); + doAnswer(writeAnswer(false)).when(writer).write(same(stream(STREAM_C)), eq(3)); assertFalse(write(10)); verifyWrite(STREAM_A, 1); verifyWrite(STREAM_B, 2); @@ -247,7 +300,7 @@ public void blockedStreamNoDataShouldSpreadDataToChildren() throws Http2Exceptio @Test public void blockedStreamWithDataAndNotAllowedToSendShouldSpreadDataToChildren() throws Http2Exception { // A cannot stream. - updateStream(STREAM_A, 0, true, false); + updateStream(STREAM_A, 0, true, false, false); blockedStreamShouldSpreadDataToChildren(false); } @@ -266,11 +319,11 @@ public void blockedStreamWithDataAndNotAllowedToSendShouldSpreadDataToChildren() */ @Test public void streamWithZeroFlowControlWindowAndDataShouldWriteOnlyOnce() throws Http2Exception { - updateStream(STREAM_A, 0, true, true); + updateStream(STREAM_A, 0, true, true, false); blockedStreamShouldSpreadDataToChildren(true); // Make sure if we call update stream again, A should write 1 more time. - updateStream(STREAM_A, 0, true, true); + updateStream(STREAM_A, 0, true, true, false); assertFalse(write(1)); verifyWrite(times(2), STREAM_A, 0); @@ -894,7 +947,11 @@ private void verifyAnyWrite(int streamId, int times) { } private void verifyNeverWrite(int streamId) { - verify(writer, never()).write(same(stream(streamId)), anyInt()); + verifyNeverWrite(stream(streamId)); + } + + private void verifyNeverWrite(Http2Stream stream) { + verify(writer, never()).write(same(stream), anyInt()); } private void setPriority(int streamId, int parent, int weight, boolean exclusive) throws Http2Exception { @@ -906,8 +963,12 @@ private Http2Stream stream(int streamId) { } private int captureWrites(int streamId) { + return captureWrites(stream(streamId)); + } + + private int captureWrites(Http2Stream stream) { ArgumentCaptor captor = ArgumentCaptor.forClass(Integer.class); - verify(writer, atLeastOnce()).write(same(stream(streamId)), captor.capture()); + verify(writer, atLeastOnce()).write(same(stream), captor.capture()); int total = 0; for (Integer x : captor.getAllValues()) { total += x; @@ -916,12 +977,15 @@ private int captureWrites(int streamId) { } private void updateStream(final int streamId, final int streamableBytes, final boolean hasFrame) { - updateStream(streamId, streamableBytes, hasFrame, hasFrame); + updateStream(streamId, streamableBytes, hasFrame, hasFrame, false); } private void updateStream(final int streamId, final int pendingBytes, final boolean hasFrame, - final boolean isWriteAllowed) { + final boolean isWriteAllowed, boolean closeIfNoFrame) { final Http2Stream stream = stream(streamId); + if (closeIfNoFrame && !hasFrame) { + stream(streamId).close(); + } distributor.updateStreamableBytes(new StreamByteDistributor.StreamState() { @Override public Http2Stream stream() { diff --git a/common/src/main/java/io/netty/util/internal/PriorityQueue.java b/common/src/main/java/io/netty/util/internal/PriorityQueue.java index 9e9c05b67e58..ea7bccbc52d7 100644 --- a/common/src/main/java/io/netty/util/internal/PriorityQueue.java +++ b/common/src/main/java/io/netty/util/internal/PriorityQueue.java @@ -78,10 +78,9 @@ public void clear() { @Override public boolean offer(T e) { - checkNotNull(e, "e"); if (e.priorityQueueIndex() != INDEX_NOT_IN_QUEUE) { throw new IllegalArgumentException("e.priorityQueueIndex(): " + e.priorityQueueIndex() + - " (expected: " + INDEX_NOT_IN_QUEUE + ")"); + " (expected: " + INDEX_NOT_IN_QUEUE + ") + e: " + e); } // Check that the array capacity is enough to hold values by doubling capacity.