Skip to content

Commit

Permalink
Http2RemoteFlowController stream writibility listener
Browse files Browse the repository at this point in the history
Motivation:
For implementations that want to manage flow control down to the stream level it is useful to be notified when stream writability changes.

Modifications:
- Add writabilityChanged to Http2RemoteFlowController.Listener
- Add isWritable to Http2RemoteFlowController

Result:
The Http2RemoteFlowController provides notification when writability of a stream changes.
  • Loading branch information
Scottmitch committed Sep 28, 2015
1 parent 92dee9e commit a09b8c1
Show file tree
Hide file tree
Showing 7 changed files with 690 additions and 142 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,7 @@ public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exceptio
if (ctx.channel().isWritable()) {
flush(ctx);
}
encoder.flowController().channelWritabilityChanged();
} finally {
super.channelWritabilityChanged(ctx);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,21 @@ public interface Http2RemoteFlowController extends Http2FlowController {
void listener(Listener listener);

/**
* Get the current listener to flow-control events.
*
* @return the current listener or {@code null} if one is not set.
* Determine if the {@code stream} has bytes remaining for use in the flow control window.
* <p>
* Note that this only takes into account HTTP/2 flow control. It does <strong>not</strong> take into account
* the underlying {@link io.netty.channel.Channel#isWritable()}.
* @param stream The stream to test.
* @return {@code true} if if the {@code stream} has bytes remaining for use in the flow control window.
* {@code false} otherwise.
*/
boolean isWritable(Http2Stream stream);

/**
* Notification that the writability of {@link #channelHandlerContext()} has changed.
* @throws Http2Exception If any writes occur as a result of this call and encounter errors.
*/
Listener listener();
void channelWritabilityChanged() throws Http2Exception;

/**
* Implementations of this interface are used to progressively write chunks of the underlying
Expand Down Expand Up @@ -132,11 +142,20 @@ interface Listener {
/**
* Report the number of {@code writtenBytes} for a {@code stream}. Called after the
* flow-controller has flushed bytes for the given stream.
*
* <p>
* This method should not throw. Any thrown exceptions are considered a programming error and are ignored.
* @param stream that had bytes written.
* @param writtenBytes the number of bytes written for a stream, can be 0 in the case of an
* empty DATA frame.
*/
void streamWritten(Http2Stream stream, int writtenBytes);

/**
* Notification that {@link Http2RemoteFlowController#isWritable(Http2Stream)} has changed for {@code stream}.
* <p>
* This method should not throw. Any thrown exceptions are considered a programming error and are ignored.
* @param stream The stream which writability has changed for.
*/
void writabilityChanged(Http2Stream stream);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public void onStreamClosed(Http2Stream stream) {
public void onPriorityTreeParentChanged(Http2Stream stream, Http2Stream oldParent) {
Http2Stream parent = stream.parent();
if (parent != null) {
int delta = state(stream).unallocatedStreamableBytesForTree();
long delta = state(stream).unallocatedStreamableBytesForTree();
if (delta != 0) {
state(parent).unallocatedStreamableBytesForTreeChanged(delta);
}
Expand All @@ -65,7 +65,7 @@ public void onPriorityTreeParentChanged(Http2Stream stream, Http2Stream oldParen
public void onPriorityTreeParentChanging(Http2Stream stream, Http2Stream newParent) {
Http2Stream parent = stream.parent();
if (parent != null) {
int delta = state(stream).unallocatedStreamableBytesForTree();
long delta = state(stream).unallocatedStreamableBytesForTree();
if (delta != 0) {
state(parent).unallocatedStreamableBytesForTreeChanged(-delta);
}
Expand Down Expand Up @@ -103,7 +103,7 @@ int unallocatedStreamableBytes(Http2Stream stream) {
/**
* For testing only.
*/
int unallocatedStreamableBytesForTree(Http2Stream stream) {
long unallocatedStreamableBytesForTree(Http2Stream stream) {
return state(stream).unallocatedStreamableBytesForTree();
}

Expand Down Expand Up @@ -307,7 +307,7 @@ private final class PriorityState {
boolean hasFrame;
int streamableBytes;
int allocated;
int unallocatedStreamableBytesForTree;
long unallocatedStreamableBytesForTree;

PriorityState(Http2Stream stream) {
this.stream = stream;
Expand All @@ -317,7 +317,7 @@ private final class PriorityState {
* Recursively increments the {@link #unallocatedStreamableBytesForTree()} for this branch in
* the priority tree starting at the current node.
*/
void unallocatedStreamableBytesForTreeChanged(int delta) {
void unallocatedStreamableBytesForTreeChanged(long delta) {
unallocatedStreamableBytesForTree += delta;
if (!stream.isRoot()) {
state(stream.parent()).unallocatedStreamableBytesForTreeChanged(delta);
Expand Down Expand Up @@ -371,7 +371,7 @@ int unallocatedStreamableBytes() {
return streamableBytes - allocated;
}

int unallocatedStreamableBytesForTree() {
long unallocatedStreamableBytesForTree() {
return unallocatedStreamableBytesForTree;
}
}
Expand Down
Loading

0 comments on commit a09b8c1

Please sign in to comment.