diff --git a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueChannel.java b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueChannel.java index fbdbc4b71a62..f10417ebe173 100644 --- a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueChannel.java +++ b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueChannel.java @@ -38,6 +38,7 @@ import io.netty.util.ReferenceCountUtil; import java.io.IOException; +import java.net.ConnectException; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.nio.ByteBuffer; @@ -81,14 +82,9 @@ abstract class AbstractKQueueChannel extends AbstractChannel implements UnixChan private volatile SocketAddress remote; AbstractKQueueChannel(Channel parent, BsdSocket fd, boolean active) { - this(parent, fd, active, false); - } - - AbstractKQueueChannel(Channel parent, BsdSocket fd, boolean active, boolean writeFilterEnabled) { super(parent); socket = checkNotNull(fd, "fd"); this.active = active; - this.writeFilterEnabled = writeFilterEnabled; if (active) { // Directly cache the remote and local addresses // See https://github.com/netty/netty/issues/2359 @@ -428,6 +424,22 @@ final void readReadyFinally(ChannelConfig config) { } } + final boolean failConnectPromise(Throwable cause) { + if (connectPromise != null) { + // SO_ERROR has been shown to return 0 on macOS if detect an error via read() and the write filter was + // not set before calling connect. This means finishConnect will not detect any error and would + // successfully complete the connectPromise and update the channel state to active (which is incorrect). + ChannelPromise connectPromise = AbstractKQueueChannel.this.connectPromise; + AbstractKQueueChannel.this.connectPromise = null; + if (connectPromise.tryFailure((cause instanceof ConnectException) ? cause + : new ConnectException("failed to connect").initCause(cause))) { + closeIfClosed(); + return true; + } + } + return false; + } + final void writeReady() { if (connectPromise != null) { // pending connect which is now complete so handle it. diff --git a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueStreamChannel.java b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueStreamChannel.java index ef5f48e820a4..04e8afc21ffe 100644 --- a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueStreamChannel.java +++ b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueStreamChannel.java @@ -64,7 +64,7 @@ public void run() { }; AbstractKQueueStreamChannel(Channel parent, BsdSocket fd, boolean active) { - super(parent, fd, active, true); + super(parent, fd, active); } AbstractKQueueStreamChannel(Channel parent, BsdSocket fd, SocketAddress remote) { @@ -588,11 +588,13 @@ private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Thro byteBuf.release(); } } - allocHandle.readComplete(); - pipeline.fireChannelReadComplete(); - pipeline.fireExceptionCaught(cause); - if (close || cause instanceof IOException) { - shutdownInput(false); + if (!failConnectPromise(cause)) { + allocHandle.readComplete(); + pipeline.fireChannelReadComplete(); + pipeline.fireExceptionCaught(cause); + if (close || cause instanceof IOException) { + shutdownInput(false); + } } } }