From 0a8e1aaf19dc85830478041cb57b308b764cfacd Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Thu, 22 Feb 2018 07:42:49 +0100 Subject: [PATCH] Flush task should not flush messages that were written since last flush attempt. Motivation: The flush task is currently using flush() which will have the affect of have the flush traverse the whole ChannelPipeline and also flush messages that were written since we gave up flushing. This is not really correct as we should only continue to flush messages that were flushed at the point in time when the flush task was submitted for execution if the user not explicit call flush() by him/herself. Modification: Call *Unsafe.flush0() via the flush task which will only continue flushing messages that were marked as flushed before. Result: More correct behaviour when the flush task is used. --- .../io/netty/channel/epoll/AbstractEpollStreamChannel.java | 4 +++- .../io/netty/channel/kqueue/AbstractKQueueStreamChannel.java | 4 +++- .../java/io/netty/channel/nio/AbstractNioByteChannel.java | 4 +++- 3 files changed, 9 insertions(+), 3 deletions(-) diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java index 0c0710f8fbf9..7bb9e595bfd9 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java @@ -73,7 +73,9 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im private final Runnable flushTask = new Runnable() { @Override public void run() { - flush(); + // Calling flush0 directly to ensure we not try to flush messages that were added via write(...) in the + // meantime. + ((AbstractEpollUnsafe) unsafe()).flush0(); } }; private Queue spliceQueue; diff --git a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueStreamChannel.java b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueStreamChannel.java index 04e8afc21ffe..46f4fc72ee2c 100644 --- a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueStreamChannel.java +++ b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueStreamChannel.java @@ -59,7 +59,9 @@ public abstract class AbstractKQueueStreamChannel extends AbstractKQueueChannel private final Runnable flushTask = new Runnable() { @Override public void run() { - flush(); + // Calling flush0 directly to ensure we not try to flush messages that were added via write(...) in the + // meantime. + ((AbstractKQueueUnsafe) unsafe()).flush0(); } }; diff --git a/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java b/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java index 5987dff2d7ae..ffa0d67d4796 100644 --- a/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java +++ b/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java @@ -49,7 +49,9 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel { private final Runnable flushTask = new Runnable() { @Override public void run() { - flush(); + // Calling flush0 directly to ensure we not try to flush messages that were added via write(...) in the + // meantime. + ((AbstractNioUnsafe) unsafe()).flush0(); } };