Skip to content

Commit

Permalink
KQueue write filter initial state (netty#7738)
Browse files Browse the repository at this point in the history
Motivation:
KQueue implementations current have inconsistent behavior with Epoll implementations with respect to asynchronous sockets and connecting. In the Epoll transport we attempt to connect, if the connect call does not synchornously fail/succeed we set the EPOLLOUT which will be triggered by the kernel if the connection attempt succeeds or an error occurs. The connect API provides no way to asynchronously communicate an error so the Epoll implementation fires a EPOLLOUT event and puts the connect status in getsockopt(SO_ERROR). KQueue provides the same APIs but different behavior. If the EVFILT_WRITE is not enabled and the EVFILT_READ is enabled before connect is called, and there is an error the kernel may fire the EVFILT_READ filter and provide the Connection Refused error via read(). This is even true if we set the EVFILT_WRITE filter after calling connect because connect didn't synchornously complete. After the error has been delievered via read() a call to getsockopt(SO_ERROR) will return 0 indicating there is no error. This means we cannot rely upon the KQueue based kernel to deliver connection errors via the EVFILT_WRITE filter in the same way that the linux kernel does with the EPOLLOUT flag.
ce241bd introduced a change which depends upon the behavior of the EVFILT_WRITE being set and may prematurely stop writing to the OS as a result, becaues we assume the OS will notify us when the socket is writable. However the current work around for the above described behavior is to initialize the EVFILT_WRITE to true for connection oriented protocols. This leads to prematurely exiting from the flush() which may lead to deadlock.

Modifications:
- KQueue should check when an error is obtained from read() if the connectPromise has not yet been completed, and if not complete it with a ConnectException

Result:
No more deadlock in KQueue due to asynchronous connect workaround.
  • Loading branch information
Scottmitch authored Feb 20, 2018
1 parent c6c0984 commit d2d3e6e
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import io.netty.util.ReferenceCountUtil;

import java.io.IOException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
Expand Down Expand Up @@ -81,14 +82,9 @@ abstract class AbstractKQueueChannel extends AbstractChannel implements UnixChan
private volatile SocketAddress remote;

AbstractKQueueChannel(Channel parent, BsdSocket fd, boolean active) {
this(parent, fd, active, false);
}

AbstractKQueueChannel(Channel parent, BsdSocket fd, boolean active, boolean writeFilterEnabled) {
super(parent);
socket = checkNotNull(fd, "fd");
this.active = active;
this.writeFilterEnabled = writeFilterEnabled;
if (active) {
// Directly cache the remote and local addresses
// See https://github.com/netty/netty/issues/2359
Expand Down Expand Up @@ -428,6 +424,22 @@ final void readReadyFinally(ChannelConfig config) {
}
}

final boolean failConnectPromise(Throwable cause) {
if (connectPromise != null) {
// SO_ERROR has been shown to return 0 on macOS if detect an error via read() and the write filter was
// not set before calling connect. This means finishConnect will not detect any error and would
// successfully complete the connectPromise and update the channel state to active (which is incorrect).
ChannelPromise connectPromise = AbstractKQueueChannel.this.connectPromise;
AbstractKQueueChannel.this.connectPromise = null;
if (connectPromise.tryFailure((cause instanceof ConnectException) ? cause
: new ConnectException("failed to connect").initCause(cause))) {
closeIfClosed();
return true;
}
}
return false;
}

final void writeReady() {
if (connectPromise != null) {
// pending connect which is now complete so handle it.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public void run() {
};

AbstractKQueueStreamChannel(Channel parent, BsdSocket fd, boolean active) {
super(parent, fd, active, true);
super(parent, fd, active);
}

AbstractKQueueStreamChannel(Channel parent, BsdSocket fd, SocketAddress remote) {
Expand Down Expand Up @@ -588,11 +588,13 @@ private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Thro
byteBuf.release();
}
}
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
pipeline.fireExceptionCaught(cause);
if (close || cause instanceof IOException) {
shutdownInput(false);
if (!failConnectPromise(cause)) {
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
pipeline.fireExceptionCaught(cause);
if (close || cause instanceof IOException) {
shutdownInput(false);
}
}
}
}
Expand Down

0 comments on commit d2d3e6e

Please sign in to comment.