Skip to content

Commit

Permalink
javadoc fix and better cleanup for WriteTimeoutHandler
Browse files Browse the repository at this point in the history
Motivation:

- Javadoc is not correct (netty#4353)
- WriteTimeoutHandler does not always cancel the timeout task (netty#2973)

Modifications:

Fix the javadoc and cleanup timeout task in handlerRemoved

Result:

WriteTimeoutHandler's javadoc describes the correct behavior and it will cancel timeout tasks when it's removed.
  • Loading branch information
windie authored and normanmaurer committed Dec 30, 2015
1 parent 6ee5341 commit f900329
Showing 1 changed file with 99 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,10 @@
import java.util.concurrent.TimeUnit;

/**
* Raises a {@link WriteTimeoutException} when no data was written within a
* certain period of time.
* Raises a {@link WriteTimeoutException} when a write operation cannot finish in a certain period of time.
*
* <pre>
* // The connection is closed when there is no outbound traffic
* // for 30 seconds.
* // The connection is closed when a write operation cannot finish in 30 seconds.
*
* public class MyChannelInitializer extends {@link ChannelInitializer}&lt;{@link Channel}&gt; {
* public void initChannel({@link Channel} channel) {
Expand Down Expand Up @@ -70,6 +68,11 @@ public class WriteTimeoutHandler extends ChannelOutboundHandlerAdapter {

private final long timeoutNanos;

/**
* A doubly-linked list to track all WriteTimeoutTasks
*/
private WriteTimeoutTask lastTask;

private boolean closed;

/**
Expand Down Expand Up @@ -111,31 +114,62 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
ctx.write(msg, promise);
}

private void scheduleTimeout(final ChannelHandlerContext ctx, final ChannelPromise future) {
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
WriteTimeoutTask task = lastTask;
lastTask = null;
while (task != null) {
task.scheduledFuture.cancel(false);
WriteTimeoutTask prev = task.prev;
task.prev = null;
task.next = null;
task = prev;
}
}

private void scheduleTimeout(final ChannelHandlerContext ctx, final ChannelPromise promise) {
// Schedule a timeout.
final ScheduledFuture<?> sf = ctx.executor().schedule(new OneTimeTask() {
@Override
public void run() {
// Was not written yet so issue a write timeout
// The future itself will be failed with a ClosedChannelException once the close() was issued
// See https://github.com/netty/netty/issues/2159
if (!future.isDone()) {
try {
writeTimedOut(ctx);
} catch (Throwable t) {
ctx.fireExceptionCaught(t);
}
}
}
}, timeoutNanos, TimeUnit.NANOSECONDS);
final WriteTimeoutTask task = new WriteTimeoutTask(ctx, promise);
task.scheduledFuture = ctx.executor().schedule(task, timeoutNanos, TimeUnit.NANOSECONDS);

if (!task.scheduledFuture.isDone()) {
addWriteTimeoutTask(task);

// Cancel the scheduled timeout if the flush promise is complete.
promise.addListener(task);
}
}

private void addWriteTimeoutTask(WriteTimeoutTask task) {
if (lastTask == null) {
lastTask = task;
} else {
lastTask.next = task;
task.prev = lastTask;
lastTask = task;
}
}

// Cancel the scheduled timeout if the flush future is complete.
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
sf.cancel(false);
private void removeWriteTimeoutTask(WriteTimeoutTask task) {
if (task == lastTask) {
// task is the tail of list
assert task.next == null;
lastTask = lastTask.prev;
if (lastTask != null) {
lastTask.next = null;
}
});
} else if (task.prev == null && task.next == null) {
// Since task is not lastTask, then it has been removed or not been added.
return;
} else if (task.prev == null) {
// task is the head of list and the list has at least 2 nodes
task.next.prev = null;
} else {
task.prev.next = task.next;
task.next.prev = task.prev;
}
task.prev = null;
task.next = null;
}

/**
Expand All @@ -148,4 +182,43 @@ protected void writeTimedOut(ChannelHandlerContext ctx) throws Exception {
closed = true;
}
}

private final class WriteTimeoutTask extends OneTimeTask implements ChannelFutureListener {

private final ChannelHandlerContext ctx;
private final ChannelPromise promise;

// WriteTimeoutTask is also a node of a doubly-linked list
WriteTimeoutTask prev;
WriteTimeoutTask next;

ScheduledFuture<?> scheduledFuture;

WriteTimeoutTask(ChannelHandlerContext ctx, ChannelPromise promise) {
this.ctx = ctx;
this.promise = promise;
}

@Override
public void run() {
// Was not written yet so issue a write timeout
// The promise itself will be failed with a ClosedChannelException once the close() was issued
// See https://github.com/netty/netty/issues/2159
if (!promise.isDone()) {
try {
writeTimedOut(ctx);
} catch (Throwable t) {
ctx.fireExceptionCaught(t);
}
}
removeWriteTimeoutTask(this);
}

@Override
public void operationComplete(ChannelFuture future) throws Exception {
// scheduledFuture has already be set when reaching here
scheduledFuture.cancel(false);
removeWriteTimeoutTask(this);
}
}
}

0 comments on commit f900329

Please sign in to comment.