Skip to content

Commit

Permalink
SPDY: fix SpdySessionHandler::updateSendWindowSize
Browse files Browse the repository at this point in the history
In Netty 3, downstream writes of SPDY data frames and upstream reads of
SPDY window udpate frames occur on different threads.

When receiving a window update frame, we synchronize on a java object
(SpdySessionHandler::flowControlLock) while sending any pending writes
that are now able to complete.

When writing a data frame, we check the send window size to see if we
are allowed to write it to the socket, or if we have to enqueue it as a
pending write. To prevent races with the window update frame, this is
also synchronized on the same SpdySessionHandler::flowControlLock.

In Netty 4, upstream and downstream operations on any given channel now
occur on the same thread. Since java locks are re-entrant, this now
allows downstream writes to occur while processing window update frames.

In particular, when we receive a window update frame that unblocks a
pending write, this write completes which triggers an event notification
on the response, which in turn triggers a write of a data frame. Since
this is on the same thread it re-enters the lock and modifies the send
window. When the write completes, we continue processing pending writes
without knowledge that the window size has been decremented.
  • Loading branch information
Jeff Pinner authored and Norman Maurer committed Aug 11, 2014
1 parent 3fc7518 commit f7e183f
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 113 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -170,10 +170,13 @@ int updateReceiveWindowSize(int streamId, int deltaWindowSize) {
}

StreamState state = activeStreams.get(streamId);
if (state == null) {
return -1;
}
if (deltaWindowSize > 0) {
state.setReceiveWindowSizeLowerBound(0);
}
return state != null ? state.updateReceiveWindowSize(deltaWindowSize) : -1;
return state.updateReceiveWindowSize(deltaWindowSize);
}

int getReceiveWindowSizeLowerBound(int streamId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,6 @@ public class SpdySessionHandler extends ChannelHandlerAdapter {
private int remoteConcurrentStreams = DEFAULT_MAX_CONCURRENT_STREAMS;
private int localConcurrentStreams = DEFAULT_MAX_CONCURRENT_STREAMS;

private final Object flowControlLock = new Object();

private final AtomicInteger pings = new AtomicInteger();

private boolean sentGoAwayFrame;
Expand Down Expand Up @@ -484,57 +482,55 @@ private void handleOutboundMessage(ChannelHandlerContext ctx, Object msg, Channe
* sender must pause transmitting data frames.
*/

synchronized (flowControlLock) {
int dataLength = spdyDataFrame.content().readableBytes();
int sendWindowSize = spdySession.getSendWindowSize(streamId);
int sessionSendWindowSize = spdySession.getSendWindowSize(SPDY_SESSION_STREAM_ID);
sendWindowSize = Math.min(sendWindowSize, sessionSendWindowSize);

if (sendWindowSize <= 0) {
// Stream is stalled -- enqueue Data frame and return
spdySession.putPendingWrite(streamId, new SpdySession.PendingWrite(spdyDataFrame, promise));
return;
} else if (sendWindowSize < dataLength) {
// Stream is not stalled but we cannot send the entire frame
spdySession.updateSendWindowSize(streamId, -1 * sendWindowSize);
spdySession.updateSendWindowSize(SPDY_SESSION_STREAM_ID, -1 * sendWindowSize);

// Create a partial data frame whose length is the current window size
SpdyDataFrame partialDataFrame = new DefaultSpdyDataFrame(streamId,
spdyDataFrame.content().readSlice(sendWindowSize).retain());

// Enqueue the remaining data (will be the first frame queued)
spdySession.putPendingWrite(streamId, new SpdySession.PendingWrite(spdyDataFrame, promise));

// The transfer window size is pre-decremented when sending a data frame downstream.
// Close the session on write failures that leave the transfer window in a corrupt state.
final ChannelHandlerContext context = ctx;
ctx.write(partialDataFrame).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
issueSessionError(context, SpdySessionStatus.INTERNAL_ERROR);
}
int dataLength = spdyDataFrame.content().readableBytes();
int sendWindowSize = spdySession.getSendWindowSize(streamId);
int sessionSendWindowSize = spdySession.getSendWindowSize(SPDY_SESSION_STREAM_ID);
sendWindowSize = Math.min(sendWindowSize, sessionSendWindowSize);

if (sendWindowSize <= 0) {
// Stream is stalled -- enqueue Data frame and return
spdySession.putPendingWrite(streamId, new SpdySession.PendingWrite(spdyDataFrame, promise));
return;
} else if (sendWindowSize < dataLength) {
// Stream is not stalled but we cannot send the entire frame
spdySession.updateSendWindowSize(streamId, -1 * sendWindowSize);
spdySession.updateSendWindowSize(SPDY_SESSION_STREAM_ID, -1 * sendWindowSize);

// Create a partial data frame whose length is the current window size
SpdyDataFrame partialDataFrame = new DefaultSpdyDataFrame(streamId,
spdyDataFrame.content().readSlice(sendWindowSize).retain());

// Enqueue the remaining data (will be the first frame queued)
spdySession.putPendingWrite(streamId, new SpdySession.PendingWrite(spdyDataFrame, promise));

// The transfer window size is pre-decremented when sending a data frame downstream.
// Close the session on write failures that leave the transfer window in a corrupt state.
final ChannelHandlerContext context = ctx;
ctx.write(partialDataFrame).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
issueSessionError(context, SpdySessionStatus.INTERNAL_ERROR);
}
});
return;
} else {
// Window size is large enough to send entire data frame
spdySession.updateSendWindowSize(streamId, -1 * dataLength);
spdySession.updateSendWindowSize(SPDY_SESSION_STREAM_ID, -1 * dataLength);

// The transfer window size is pre-decremented when sending a data frame downstream.
// Close the session on write failures that leave the transfer window in a corrupt state.
final ChannelHandlerContext context = ctx;
promise.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
issueSessionError(context, SpdySessionStatus.INTERNAL_ERROR);
}
}
});
return;
} else {
// Window size is large enough to send entire data frame
spdySession.updateSendWindowSize(streamId, -1 * dataLength);
spdySession.updateSendWindowSize(SPDY_SESSION_STREAM_ID, -1 * dataLength);

// The transfer window size is pre-decremented when sending a data frame downstream.
// Close the session on write failures that leave the transfer window in a corrupt state.
final ChannelHandlerContext context = ctx;
promise.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
issueSessionError(context, SpdySessionStatus.INTERNAL_ERROR);
}
});
}
}
});
}

// Close the local side of the stream if this is the last frame
Expand Down Expand Up @@ -758,72 +754,64 @@ private void removeStream(int streamId, ChannelFuture future) {
}

private void updateSendWindowSize(final ChannelHandlerContext ctx, int streamId, int deltaWindowSize) {
synchronized (flowControlLock) {
int newWindowSize = spdySession.updateSendWindowSize(streamId, deltaWindowSize);
if (streamId != SPDY_SESSION_STREAM_ID) {
int sessionSendWindowSize = spdySession.getSendWindowSize(SPDY_SESSION_STREAM_ID);
newWindowSize = Math.min(newWindowSize, sessionSendWindowSize);
}

while (newWindowSize > 0) {
// Check if we have unblocked a stalled stream
SpdySession.PendingWrite pendingWrite = spdySession.getPendingWrite(streamId);
if (pendingWrite == null) {
break;
}
spdySession.updateSendWindowSize(streamId, deltaWindowSize);

SpdyDataFrame spdyDataFrame = pendingWrite.spdyDataFrame;
int dataFrameSize = spdyDataFrame.content().readableBytes();
int writeStreamId = spdyDataFrame.streamId();
if (streamId == SPDY_SESSION_STREAM_ID) {
newWindowSize = Math.min(newWindowSize, spdySession.getSendWindowSize(writeStreamId));
}
while (true) {
// Check if we have unblocked a stalled stream
SpdySession.PendingWrite pendingWrite = spdySession.getPendingWrite(streamId);
if (pendingWrite == null) {
return;
}

if (newWindowSize >= dataFrameSize) {
// Window size is large enough to send entire data frame
spdySession.removePendingWrite(writeStreamId);
newWindowSize = spdySession.updateSendWindowSize(writeStreamId, -1 * dataFrameSize);
int sessionSendWindowSize =
spdySession.updateSendWindowSize(SPDY_SESSION_STREAM_ID, -1 * dataFrameSize);
newWindowSize = Math.min(newWindowSize, sessionSendWindowSize);

// Close the local side of the stream if this is the last frame
if (spdyDataFrame.isLast()) {
halfCloseStream(writeStreamId, false, pendingWrite.promise);
}
SpdyDataFrame spdyDataFrame = pendingWrite.spdyDataFrame;
int dataFrameSize = spdyDataFrame.content().readableBytes();
int writeStreamId = spdyDataFrame.streamId();
int sendWindowSize = spdySession.getSendWindowSize(writeStreamId);
int sessionSendWindowSize = spdySession.getSendWindowSize(SPDY_SESSION_STREAM_ID);
sendWindowSize = Math.min(sendWindowSize, sessionSendWindowSize);

// The transfer window size is pre-decremented when sending a data frame downstream.
// Close the session on write failures that leave the transfer window in a corrupt state.
ctx.writeAndFlush(spdyDataFrame, pendingWrite.promise).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
issueSessionError(ctx, SpdySessionStatus.INTERNAL_ERROR);
}
}
});
} else {
// We can send a partial frame
spdySession.updateSendWindowSize(writeStreamId, -1 * newWindowSize);
spdySession.updateSendWindowSize(SPDY_SESSION_STREAM_ID, -1 * newWindowSize);

// Create a partial data frame whose length is the current window size
SpdyDataFrame partialDataFrame = new DefaultSpdyDataFrame(writeStreamId,
spdyDataFrame.content().readSlice(newWindowSize).retain());

// The transfer window size is pre-decremented when sending a data frame downstream.
// Close the session on write failures that leave the transfer window in a corrupt state.
ctx.writeAndFlush(partialDataFrame).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
issueSessionError(ctx, SpdySessionStatus.INTERNAL_ERROR);
}
if (sendWindowSize <= 0) {
return;
} else if (sendWindowSize < dataFrameSize) {
// We can send a partial frame
spdySession.updateSendWindowSize(writeStreamId, -1 * sendWindowSize);
spdySession.updateSendWindowSize(SPDY_SESSION_STREAM_ID, -1 * sendWindowSize);

// Create a partial data frame whose length is the current window size
SpdyDataFrame partialDataFrame = new DefaultSpdyDataFrame(writeStreamId,
spdyDataFrame.content().readSlice(sendWindowSize).retain());

// The transfer window size is pre-decremented when sending a data frame downstream.
// Close the session on write failures that leave the transfer window in a corrupt state.
ctx.writeAndFlush(partialDataFrame).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
issueSessionError(ctx, SpdySessionStatus.INTERNAL_ERROR);
}
});

newWindowSize = 0;
}
});
} else {
// Window size is large enough to send entire data frame
spdySession.removePendingWrite(writeStreamId);
spdySession.updateSendWindowSize(writeStreamId, -1 * dataFrameSize);
spdySession.updateSendWindowSize(SPDY_SESSION_STREAM_ID, -1 * dataFrameSize);

// Close the local side of the stream if this is the last frame
if (spdyDataFrame.isLast()) {
halfCloseStream(writeStreamId, false, pendingWrite.promise);
}

// The transfer window size is pre-decremented when sending a data frame downstream.
// Close the session on write failures that leave the transfer window in a corrupt state.
ctx.writeAndFlush(spdyDataFrame, pendingWrite.promise).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
issueSessionError(ctx, SpdySessionStatus.INTERNAL_ERROR);
}
}
});
}
}
}
Expand Down

0 comments on commit f7e183f

Please sign in to comment.