Skip to content

Commit

Permalink
Close eventfd shutdown/wakeup race by closely tracking epoll edges (n…
Browse files Browse the repository at this point in the history
…etty#9535)

Motivation

This is another iteration of netty#9476.

Modifications

Instead of maintaining a count of all writes performed and then using
reads during shutdown to ensure all are accounted for, just set a flag
after each write and don't reset it until the corresponding event has
been returned from epoll_wait.

This requires that while a write is still pending we don't reset
wakenUp, i.e. continue to block writes from the wakeup() method.

Result

Race condition eliminated. Fixes netty#9362
  • Loading branch information
njhill authored and normanmaurer committed Sep 5, 2019
1 parent 394a1b3 commit 2123fbe
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 29 deletions.
5 changes: 2 additions & 3 deletions transport-native-epoll/src/main/c/netty_epoll_native.c
Original file line number Diff line number Diff line change
Expand Up @@ -196,9 +196,8 @@ static void netty_epoll_native_timerFdSetTime(JNIEnv* env, jclass clazz, jint ti
}
}

static jint netty_epoll_native_epollWaitNoTimeout(JNIEnv* env, jclass clazz, jint efd, jlong address, jint len, jboolean immediatePoll) {
static jint netty_epoll_native_epollWait(JNIEnv* env, jclass clazz, jint efd, jlong address, jint len, jint timeout) {
struct epoll_event *ev = (struct epoll_event*) (intptr_t) address;
const int timeout = immediatePoll ? 0 : -1;
int result, err;

do {
Expand Down Expand Up @@ -512,7 +511,7 @@ static const JNINativeMethod fixed_method_table[] = {
{ "timerFdSetTime", "(III)V", (void *) netty_epoll_native_timerFdSetTime },
{ "epollCreate", "()I", (void *) netty_epoll_native_epollCreate },
{ "epollWait0", "(IJIIII)I", (void *) netty_epoll_native_epollWait0 }, // This method is deprecated!
{ "epollWaitNoTimeout", "(IJIZ)I", (void *) netty_epoll_native_epollWaitNoTimeout },
{ "epollWait", "(IJII)I", (void *) netty_epoll_native_epollWait },
{ "epollBusyWait0", "(IJI)I", (void *) netty_epoll_native_epollBusyWait0 },
{ "epollCtlAdd0", "(III)I", (void *) netty_epoll_native_epollCtlAdd0 },
{ "epollCtlMod0", "(III)I", (void *) netty_epoll_native_epollCtlMod0 },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ class EpollEventLoop extends SingleThreadEventLoop {
* with the time source (e.g. calling System.nanoTime()) which can be expensive.
*/
private final AtomicLong nextDeadlineNanos = new AtomicLong(-1L);
private final AtomicInteger wakenUp = new AtomicInteger();
private final AtomicInteger wakenUp = new AtomicInteger(1);
private boolean pendingWakeup;
private final FileDescriptor epollFd;
private final FileDescriptor eventFd;
private final FileDescriptor timerFd;
Expand Down Expand Up @@ -354,17 +355,18 @@ public int registeredChannels() {
}

private int epollWait() throws IOException {
// If a task was submitted when wakenUp value was 1, the task didn't get a chance to produce wakeup event.
// So we need to check task queue again before calling epoll_wait. If we don't, the task might be pended
// until epoll_wait was timed out. It might be pended until idle timeout if IdleStateHandler existed
// in pipeline.
return Native.epollWait(epollFd, events, hasTasks());
return Native.epollWait(epollFd, events, false);
}

private int epollWaitNow() throws IOException {
return Native.epollWait(epollFd, events, true);
}

private int epollWaitTimeboxed() throws IOException {
// Wait with 1 second "safeguard" timeout
return Native.epollWait(epollFd, events, 1000);
}

private int epollBusyWait() throws IOException {
return Native.epollBusyWait(epollFd, events);
}
Expand All @@ -385,17 +387,39 @@ protected void run() {
break;

case SelectStrategy.SELECT:
if (wakenUp.get() == 1) {
wakenUp.set(0);
if (pendingWakeup) {
// We are going to be immediately woken so no need to reset wakenUp
// or check for timerfd adjustment.
strategy = epollWaitTimeboxed();
if (strategy != 0) {
break;
}
// We timed out so assume that we missed the write event due to an
// abnormally failed syscall (the write itself or a prior epoll_wait)
pendingWakeup = false;
if (hasTasks()) {
break;
}
// fall-through
}
if (!hasTasks()) {
// Ordered store is sufficient here since the only access outside this
// thread is a getAndSet in the wakeup() method
wakenUp.lazySet(0);
try {
// When we are in the EventLoop we don't bother setting the timerFd for each
// scheduled task, but instead defer the processing until the end of the EventLoop
// (next wait) to reduce the timerFd modifications.
timerFdDeadline = checkScheduleTaskQueueForNewDelay(timerFdDeadline);
try {
if (!hasTasks()) {
strategy = epollWait();
} finally {
}
} finally {
// Try get() first to avoid much more expensive CAS in the case we
// were woken via the wakeup() method (submitted task)
if (wakenUp.get() == 1 || wakenUp.getAndSet(1) == 1) {
pendingWakeup = true;
}
if (timerFdDeadline >= 0) {
// This getAndAdd will change the raw value of nextDeadlineNanos to be negative
// which will block any *new* timerFd mods by other threads while also "preserving"
// its last value to avoid disrupting a possibly-concurrent setTimerFd call
Expand Down Expand Up @@ -455,12 +479,6 @@ void handleLoopException(Throwable t) {
}

private void closeAll() {
try {
epollWaitNow();
} catch (IOException ignore) {
// ignore on close
}

// Using the intermediate collection to prevent ConcurrentModificationException.
// In the `close()` method, the channel is deleted from `channels` map.
AbstractEpollChannel[] localChannels = channels.values().toArray(new AbstractEpollChannel[0]);
Expand All @@ -476,9 +494,7 @@ private boolean processReady(EpollEventArray events, int ready) {
for (int i = 0; i < ready; ++i) {
final int fd = events.fd(i);
if (fd == eventFd.intValue()) {
// Just ignore as we use ET mode for the eventfd and timerfd.
//
// See also https://stackoverflow.com/a/12492308/1074097
pendingWakeup = false;
} else if (fd == timerFd.intValue()) {
timerFired = true;
} else {
Expand Down Expand Up @@ -541,14 +557,28 @@ private boolean processReady(EpollEventArray events, int ready) {
protected void cleanup() {
try {
try {
epollFd.close();
// Ensure any in-flight wakeup writes have been performed prior to closing eventFd.
while (pendingWakeup) {
int count = epollWaitTimeboxed();
if (count == 0) {
// We timed-out so assume that the write we're expecting isn't coming
break;
}
for (int i = 0; i < count; i++) {
if (events.fd(i) == eventFd.intValue()) {
pendingWakeup = false;
break;
}
}
}
eventFd.close();
} catch (IOException e) {
logger.warn("Failed to close the epoll fd.", e);
logger.warn("Failed to close the event fd.", e);
}
try {
eventFd.close();
epollFd.close();
} catch (IOException e) {
logger.warn("Failed to close the event fd.", e);
logger.warn("Failed to close the epoll fd.", e);
}
try {
timerFd.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.SystemPropertyUtil;
import io.netty.util.internal.ThrowableUtil;
import io.netty.util.internal.UnstableApi;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;

Expand All @@ -44,6 +45,7 @@
* <p><strong>Internal usage only!</strong>
* <p>Static members which call JNI methods must be defined in {@link NativeStaticallyReferencedJniMethods}.
*/
@UnstableApi
public final class Native {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(Native.class);

Expand Down Expand Up @@ -107,7 +109,11 @@ public static int epollWait(FileDescriptor epollFd, EpollEventArray events, File
}

static int epollWait(FileDescriptor epollFd, EpollEventArray events, boolean immediatePoll) throws IOException {
int ready = epollWaitNoTimeout(epollFd.intValue(), events.memoryAddress(), events.length(), immediatePoll);
return epollWait(epollFd, events, immediatePoll ? 0 : -1);
}

static int epollWait(FileDescriptor epollFd, EpollEventArray events, int timeout) throws IOException {
int ready = epollWait(epollFd.intValue(), events.memoryAddress(), events.length(), timeout);
if (ready < 0) {
throw newIOException("epoll_wait", ready);
}
Expand All @@ -128,7 +134,7 @@ public static int epollBusyWait(FileDescriptor epollFd, EpollEventArray events)
}

private static native int epollWait0(int efd, long address, int len, int timerFd, int timeoutSec, int timeoutNs);
private static native int epollWaitNoTimeout(int efd, long address, int len, boolean immediatePoll);
private static native int epollWait(int efd, long address, int len, int timeout);
private static native int epollBusyWait0(int efd, long address, int len);

public static void epollCtlAdd(int efd, final int fd, final int flags) throws IOException {
Expand Down

0 comments on commit 2123fbe

Please sign in to comment.