Skip to content

Commit

Permalink
Fixed issue: NETTY-215 ChunkedWriteHandler stops handling write queue…
Browse files Browse the repository at this point in the history
… when ChunkedInput.nextChunk() fails.

* Fixed related infinite event loop
  • Loading branch information
trustin committed Aug 27, 2009
1 parent a09b750 commit 6dc0b12
Showing 1 changed file with 89 additions and 40 deletions.
129 changes: 89 additions & 40 deletions src/main/java/org/jboss/netty/handler/stream/ChunkedWriteHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import java.util.Queue;

import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelDownstreamHandler;
import org.jboss.netty.channel.ChannelEvent;
Expand All @@ -34,7 +35,6 @@
import org.jboss.netty.channel.ChannelHandler;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipelineCoverage;
import org.jboss.netty.channel.ChannelState;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ChannelUpstreamHandler;
import org.jboss.netty.channel.Channels;
Expand Down Expand Up @@ -100,26 +100,71 @@ public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e)

queue.offer((MessageEvent) e);
if (ctx.getChannel().isWritable()) {
flushWriteEventQueue(ctx);
flush(ctx);
}
}

public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e)
throws Exception {
if (e instanceof ChannelStateEvent) {
ChannelStateEvent cse = (ChannelStateEvent) e;
if (cse.getState() == ChannelState.INTEREST_OPS &&
ctx.getChannel().isWritable()) {
switch (cse.getState()) {
case INTEREST_OPS:
// Continue writing when the channel becomes writable.
flushWriteEventQueue(ctx);
flush(ctx);
break;
case OPEN:
if (!Boolean.TRUE.equals(cse.getValue())) {
// Fail all pending writes
discard(ctx);
}
break;
}
}
ctx.sendUpstream(e);
}

private synchronized void flushWriteEventQueue(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.getChannel();
do {
private synchronized void discard(ChannelHandlerContext ctx) {
for (;;) {
if (currentEvent == null) {
currentEvent = queue.poll();
}

if (currentEvent == null) {
break;
}

MessageEvent currentEvent = this.currentEvent;
this.currentEvent = null;

Object m = currentEvent.getMessage();
if (m instanceof ChunkedInput) {
ChunkedInput chunks = (ChunkedInput) m;
try {
chunks.close();
} catch (Throwable t2) {
logger.warn("Failed to close a chunked input.", t2);
}

// Trigger a ClosedChannelException
Channels.write(
ctx, currentEvent.getFuture(), ChannelBuffers.EMPTY_BUFFER,
currentEvent.getRemoteAddress());
} else {
// Trigger a ClosedChannelException
ctx.sendDownstream(currentEvent);
}
currentEvent = null;
}
}

private synchronized void flush(ChannelHandlerContext ctx) throws Exception {
final Channel channel = ctx.getChannel();
if (!channel.isConnected()) {
discard(ctx);
}

while (channel.isWritable()) {
if (currentEvent == null) {
currentEvent = queue.poll();
}
Expand All @@ -135,56 +180,60 @@ private synchronized void flushWriteEventQueue(ChannelHandlerContext ctx) throws
boolean last;
try {
chunk = chunks.nextChunk();
last = !chunks.hasNextChunk();
if (chunk == null) {
chunk = ChannelBuffers.EMPTY_BUFFER;
last = true;
} else {
last = !chunks.hasNextChunk();
}
} catch (Throwable t) {
MessageEvent currentEvent = this.currentEvent;
this.currentEvent = null;

currentEvent.getFuture().setFailure(t);
t.printStackTrace();
fireExceptionCaught(ctx, t);

try {
chunks.close();
} catch (Throwable t2) {
logger.warn("Failed to close a chunked input.", t2);
}
currentEvent = null;
break;
}

if (chunk != null) {
ChannelFuture writeFuture;
final MessageEvent currentEvent = this.currentEvent;
if (last) {
this.currentEvent = null;
writeFuture = currentEvent.getFuture();
writeFuture.addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future)
throws Exception {
((ChunkedInput) currentEvent.getMessage()).close();
}
});

} else {
writeFuture = future(channel);
writeFuture.addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future)
throws Exception {
if (!future.isSuccess()) {
currentEvent.getFuture().setFailure(future.getCause());
((ChunkedInput) currentEvent.getMessage()).close();
}
}
});
}
ChannelFuture writeFuture;
final MessageEvent currentEvent = this.currentEvent;
if (last) {
this.currentEvent = null;
writeFuture = currentEvent.getFuture();
writeFuture.addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future)
throws Exception {
((ChunkedInput) currentEvent.getMessage()).close();
}
});

Channels.write(
ctx, writeFuture, chunk,
currentEvent.getRemoteAddress());
} else {
currentEvent = null;
writeFuture = future(channel);
writeFuture.addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future)
throws Exception {
if (!future.isSuccess()) {
currentEvent.getFuture().setFailure(future.getCause());
((ChunkedInput) currentEvent.getMessage()).close();
}
}
});
}

Channels.write(
ctx, writeFuture, chunk,
currentEvent.getRemoteAddress());
} else {
ctx.sendDownstream(currentEvent);
currentEvent = null;
}
} while (channel.isWritable());
}
}
}

0 comments on commit 6dc0b12

Please sign in to comment.