Skip to content

Commit

Permalink
Merge pull request netty#210 from netty/threading_fix
Browse files Browse the repository at this point in the history
Merge in fix for threading (related to netty#140 and netty#187). This also includes the new feature that allow to submit a Runnable that gets executed later in the io thread.
  • Loading branch information
normanmaurer committed Feb 29, 2012
2 parents 689c479 + 5f465da commit 8579f09
Show file tree
Hide file tree
Showing 28 changed files with 709 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@
import java.util.LinkedList;
import java.util.Queue;

import io.netty.channel.Channels;
import io.netty.buffer.ChannelBufferFactory;
import io.netty.channel.Channel;
import io.netty.channel.ChannelEvent;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
Expand Down Expand Up @@ -224,6 +226,16 @@ public void exceptionCaught(

throw new CodecEmbedderException(actualCause);
}

@Override
public ChannelFuture execute(ChannelPipeline pipeline, Runnable task) {
try {
task.run();
return Channels.succeededFuture(pipeline.getChannel());
} catch (Throwable t) {
return Channels.failedFuture(pipeline.getChannel(), t);
}
}
}

private static final class EmbeddedChannelPipeline extends DefaultChannelPipeline {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public void resumeTransfer() {
}

try {
flush(ctx);
flush(ctx, false);
} catch (Exception e) {
if (logger.isWarnEnabled()) {
logger.warn("Unexpected exception while sending chunks.", e);
Expand All @@ -112,10 +112,10 @@ public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e)
final Channel channel = ctx.getChannel();
if (channel.isWritable()) {
this.ctx = ctx;
flush(ctx);
flush(ctx, false);
} else if (!channel.isConnected()) {
this.ctx = ctx;
discard(ctx);
discard(ctx, false);
}
}

Expand All @@ -127,20 +127,20 @@ public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e)
switch (cse.getState()) {
case INTEREST_OPS:
// Continue writing when the channel becomes writable.
flush(ctx);
flush(ctx, true);
break;
case OPEN:
if (!Boolean.TRUE.equals(cse.getValue())) {
// Fail all pending writes
discard(ctx);
discard(ctx, true);
}
break;
}
}
ctx.sendUpstream(e);
}

private void discard(ChannelHandlerContext ctx) {
private void discard(ChannelHandlerContext ctx, boolean fireNow) {
ClosedChannelException cause = null;
boolean fireExceptionCaught = false;

Expand Down Expand Up @@ -175,14 +175,18 @@ private void discard(ChannelHandlerContext ctx) {


if (fireExceptionCaught) {
Channels.fireExceptionCaught(ctx.getChannel(), cause);
if (fireNow) {
fireExceptionCaught(ctx.getChannel(), cause);
} else {
fireExceptionCaughtLater(ctx.getChannel(), cause);
}
}
}

private synchronized void flush(ChannelHandlerContext ctx) throws Exception {
private synchronized void flush(ChannelHandlerContext ctx, boolean fireNow) throws Exception {
final Channel channel = ctx.getChannel();
if (!channel.isConnected()) {
discard(ctx);
discard(ctx, fireNow);
}

while (channel.isWritable()) {
Expand Down Expand Up @@ -220,7 +224,11 @@ private synchronized void flush(ChannelHandlerContext ctx) throws Exception {
this.currentEvent = null;

currentEvent.getFuture().setFailure(t);
fireExceptionCaught(ctx, t);
if (fireNow) {
fireExceptionCaught(ctx, t);
} else {
fireExceptionCaughtLater(ctx.getChannel(), t);
}

closeInput(chunks);
break;
Expand Down Expand Up @@ -262,7 +270,7 @@ public void operationComplete(ChannelFuture future)
}

if (!channel.isConnected()) {
discard(ctx);
discard(ctx, fireNow);
break;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ public void writeRequested(ChannelHandlerContext ctx, MessageEvent e)
}

protected void writeTimedOut(ChannelHandlerContext ctx) throws Exception {
Channels.fireExceptionCaught(ctx, EXCEPTION);
Channels.fireExceptionCaughtLater(ctx.getChannel(), EXCEPTION);
}

private final class WriteTimeoutTask implements TimerTask {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package io.netty.channel.sctp;

import io.netty.channel.AbstractChannelSink;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelPipeline;

public abstract class AbstractSctpChannelSink extends AbstractChannelSink {

@Override
public ChannelFuture execute(ChannelPipeline pipeline, final Runnable task) {
Channel ch = pipeline.getChannel();
if (ch instanceof SctpChannelImpl) {
SctpChannelImpl channel = (SctpChannelImpl) ch;
return channel.worker.executeInIoThread(channel, task);

} else {
return super.execute(pipeline, task);
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import io.netty.channel.AbstractChannelSink;
import io.netty.channel.ChannelEvent;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFuture;
Expand All @@ -48,7 +47,7 @@

/**
*/
class SctpClientPipelineSink extends AbstractChannelSink {
class SctpClientPipelineSink extends AbstractSctpChannelSink {

static final InternalLogger logger =
InternalLoggerFactory.getInstance(SctpClientPipelineSink.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@

import com.sun.nio.sctp.SctpChannel;

import io.netty.channel.AbstractChannelSink;
import io.netty.channel.Channel;
import io.netty.channel.ChannelEvent;
import io.netty.channel.ChannelFuture;
Expand All @@ -45,7 +44,7 @@

/**
*/
class SctpServerPipelineSink extends AbstractChannelSink {
class SctpServerPipelineSink extends AbstractSctpChannelSink {

static final InternalLogger logger =
InternalLoggerFactory.getInstance(SctpServerPipelineSink.class);
Expand Down
50 changes: 48 additions & 2 deletions transport-sctp/src/main/java/io/netty/channel/sctp/SctpWorker.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
Expand All @@ -45,6 +46,8 @@
import io.netty.channel.MessageEvent;
import io.netty.channel.ReceiveBufferSizePredictor;
import io.netty.channel.sctp.SctpSendBufferPool.SendBuffer;
import io.netty.channel.socket.ChannelRunnableWrapper;
import io.netty.channel.socket.Worker;
import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory;
import io.netty.util.internal.DeadLockProofWorker;
Expand All @@ -53,7 +56,7 @@
/**
*/
@SuppressWarnings("unchecked")
class SctpWorker implements Runnable {
class SctpWorker implements Worker {

private static final InternalLogger logger =
InternalLoggerFactory.getInstance(SctpWorker.class);
Expand All @@ -64,13 +67,15 @@ class SctpWorker implements Runnable {

private final Executor executor;
private boolean started;
private volatile Thread thread;
volatile Thread thread;
volatile Selector selector;
private final AtomicBoolean wakenUp = new AtomicBoolean();
private final ReadWriteLock selectorGuard = new ReentrantReadWriteLock();
private final Object startStopLock = new Object();
private final Queue<Runnable> registerTaskQueue = QueueFactory.createQueue(Runnable.class);
private final Queue<Runnable> writeTaskQueue = QueueFactory.createQueue(Runnable.class);
private final Queue<Runnable> eventQueue = QueueFactory.createQueue(Runnable.class);

private volatile int cancelledKeys; // should use AtomicInteger but we just need approximation

private final SctpReceiveBufferPool recvBufferPool = new SctpReceiveBufferPool();
Expand Down Expand Up @@ -188,6 +193,7 @@ public void run() {

cancelledKeys = 0;
processRegisterTaskQueue();
processEventQueue();
processWriteTaskQueue();
processSelectedKeys(selector.selectedKeys());

Expand Down Expand Up @@ -240,7 +246,35 @@ public void run() {
}
}
}

@Override
public ChannelFuture executeInIoThread(Channel channel, Runnable task) {
if (channel instanceof SctpChannelImpl && isIoThread((SctpChannelImpl) channel)) {
try {
task.run();
return succeededFuture(channel);
} catch (Throwable t) {
return failedFuture(channel, t);
}
} else {
ChannelRunnableWrapper channelRunnable = new ChannelRunnableWrapper(channel, task);
boolean added = eventQueue.offer(channelRunnable);

if (added) {
// wake up the selector to speed things
selector.wakeup();
} else {
channelRunnable.setFailure(new RejectedExecutionException("Unable to queue task " + task));
}
return channelRunnable;
}

}

static boolean isIoThread(SctpChannelImpl channel) {
return Thread.currentThread() == channel.worker.thread;
}

private void processRegisterTaskQueue() throws IOException {
for (; ;) {
final Runnable task = registerTaskQueue.poll();
Expand All @@ -264,7 +298,19 @@ private void processWriteTaskQueue() throws IOException {
cleanUpCancelledKeys();
}
}

private void processEventQueue() throws IOException {
for (;;) {
final Runnable task = eventQueue.poll();
if (task == null) {
break;
}

task.run();
cleanUpCancelledKeys();
}
}

private void processSelectedKeys(final Set<SelectionKey> selectedKeys) throws IOException {
for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {
SelectionKey k = i.next();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,29 @@ public void exceptionCaught(ChannelPipeline pipeline,
if (actualCause == null) {
actualCause = cause;
}
if (isFireExceptionCaughtLater(event, actualCause)) {
fireExceptionCaughtLater(event.getChannel(), actualCause);
} else {
fireExceptionCaught(event.getChannel(), actualCause);
}
}

protected boolean isFireExceptionCaughtLater(ChannelEvent event, Throwable actualCause) {
return false;
}

fireExceptionCaught(event.getChannel(), actualCause);
/**
* This implementation just directly call {@link Runnable#run()}. Sub-classes should override this if they can handle it
* in a better way
*/
@Override
public ChannelFuture execute(ChannelPipeline pipeline, Runnable task) {
try {
task.run();
return Channels.succeededFuture(pipeline.getChannel());
} catch (Throwable t) {
return Channels.failedFuture(pipeline.getChannel(), t);
}
}

}
3 changes: 3 additions & 0 deletions transport/src/main/java/io/netty/channel/ChannelPipeline.java
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,9 @@ public interface ChannelPipeline {
*/
void sendUpstream(ChannelEvent e);


ChannelFuture execute(Runnable task);

/**
* Sends the specified {@link ChannelEvent} to the last
* {@link ChannelDownstreamHandler} in this pipeline.
Expand Down
5 changes: 5 additions & 0 deletions transport/src/main/java/io/netty/channel/ChannelSink.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,9 @@ public interface ChannelSink {
* one of its {@link ChannelHandler}s process a {@link ChannelEvent}.
*/
void exceptionCaught(ChannelPipeline pipeline, ChannelEvent e, ChannelPipelineException cause) throws Exception;

/**
* Execute the given {@link Runnable} later in the io-thread. Some implementation may not support his and just execute it directly
*/
ChannelFuture execute(ChannelPipeline pipeline, Runnable task);
}
Loading

0 comments on commit 8579f09

Please sign in to comment.