Skip to content

Commit

Permalink
[FLINK-5663] [runtime] Prevent leaking SafetyNetCloseableRegistry through InheritableThreadLocal
Browse files Browse the repository at this point in the history

This closes apache#3229
  • Loading branch information
StefanRRichter authored and StephanEwen committed Jan 29, 2017
1 parent 0aa9db0 commit 50b6656
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 16 deletions.
21 changes: 11 additions & 10 deletions flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import org.apache.flink.core.fs.local.LocalFileSystem;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.OperatingSystem;

import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -78,7 +78,7 @@ public enum WriteMode {

// ------------------------------------------------------------------------

private static final InheritableThreadLocal<SafetyNetCloseableRegistry> REGISTRIES = new InheritableThreadLocal<>();
private static final ThreadLocal<SafetyNetCloseableRegistry> REGISTRIES = new ThreadLocal<>();

private static final String HADOOP_WRAPPER_FILESYSTEM_CLASS = "org.apache.flink.runtime.fs.hdfs.HadoopFileSystem";

Expand All @@ -99,22 +99,23 @@ public enum WriteMode {
* main thread.
*/
@Internal
public static void createFileSystemCloseableRegistryForTask() {
public static void createAndSetFileSystemCloseableRegistryForThread() {
SafetyNetCloseableRegistry oldRegistry = REGISTRIES.get();
if (null != oldRegistry) {
IOUtils.closeQuietly(oldRegistry);
LOG.warn("Found existing SafetyNetCloseableRegistry. Closed and replaced it.");
}
Preconditions.checkState(null == oldRegistry,
"Found old CloseableRegistry " + oldRegistry +
". This indicates a leak of the InheritableThreadLocal through a ThreadPool!");

SafetyNetCloseableRegistry newRegistry = new SafetyNetCloseableRegistry();
REGISTRIES.set(newRegistry);
LOG.info("Created new CloseableRegistry " + newRegistry + " for {}", Thread.currentThread().getName());
}

/**
* Closes and disposes the SafetyNetCloseableRegistry of the current thread. This method should be called at the
* end of the task's main thread or when the task should be canceled.
*/
@Internal
public static void disposeFileSystemCloseableRegistryForTask() {
public static void closeAndDisposeFileSystemCloseableRegistryForThread() {
SafetyNetCloseableRegistry registry = REGISTRIES.get();
if (null != registry) {
LOG.info("Ensuring all FileSystem streams are closed for {}", Thread.currentThread().getName());
Expand All @@ -123,7 +124,7 @@ public static void disposeFileSystemCloseableRegistryForTask() {
}
}

private static FileSystem wrapWithSafetyNetWhenInTask(FileSystem fs) {
private static FileSystem wrapWithSafetyNetWhenActivated(FileSystem fs) {
SafetyNetCloseableRegistry reg = REGISTRIES.get();
return reg != null ? new SafetyNetWrapperFileSystem(fs, reg) : fs;
}
Expand Down Expand Up @@ -306,7 +307,7 @@ public static FileSystem getUnguardedFileSystem(URI uri) throws IOException {
* thrown if a reference to the file system instance could not be obtained
*/
public static FileSystem get(URI uri) throws IOException {
return wrapWithSafetyNetWhenInTask(getUnguardedFileSystem(uri));
return wrapWithSafetyNetWhenActivated(getUnguardedFileSystem(uri));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@
package org.apache.flink.core.fs;

import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import java.io.Closeable;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;

public class SafetyNetCloseableRegistryTest {
Expand All @@ -32,7 +32,6 @@ public class SafetyNetCloseableRegistryTest {
private SafetyNetCloseableRegistry closeableRegistry;
private AtomicInteger unclosedCounter;

@Before
public void setup() {
this.closeableRegistry = new SafetyNetCloseableRegistry();
this.unclosedCounter = new AtomicInteger(0);
Expand All @@ -55,9 +54,75 @@ private void joinThreads() throws InterruptedException {
}
}

/**
 * Verifies that the file system safety net is scoped to the thread that activated it:
 * a thread without an activated registry gets an unwrapped file system, a child thread does
 * not inherit the registry of its parent, and closing the registry closes streams that were
 * opened through the guarded file system.
 *
 * <p>All checks run in spawned threads. A failure raised there would otherwise only terminate
 * the worker and never reach JUnit, so every worker records its failure and the joining
 * thread rethrows it.
 */
@Test
public void testCorrectScopesForSafetyNet() throws Exception {
	// Slot for the first failure of t1 (including failures forwarded from t2).
	// Thread.join() establishes the happens-before edge that makes the write visible here.
	final Throwable[] outerFailure = new Throwable[1];

	Thread t1 = new Thread() {
		@Override
		public void run() {
			try {
				FileSystem fs1 = FileSystem.getLocalFileSystem();
				// ensure no safety net in place
				Assert.assertFalse(fs1 instanceof SafetyNetWrapperFileSystem);
				FileSystem.createAndSetFileSystemCloseableRegistryForThread();
				fs1 = FileSystem.getLocalFileSystem();
				// ensure safety net is in place now
				Assert.assertTrue(fs1 instanceof SafetyNetWrapperFileSystem);
				Path tmp = new Path(fs1.getWorkingDirectory(), UUID.randomUUID().toString());
				try (FSDataOutputStream stream = fs1.create(tmp, false)) {
					// Slot for the failure of the nested thread; read only after t2.join().
					final Throwable[] innerFailure = new Throwable[1];

					Thread t2 = new Thread() {
						@Override
						public void run() {
							try {
								FileSystem fs2 = FileSystem.getLocalFileSystem();
								// ensure the safety net does not leak here
								Assert.assertFalse(fs2 instanceof SafetyNetWrapperFileSystem);
								FileSystem.createAndSetFileSystemCloseableRegistryForThread();
								fs2 = FileSystem.getLocalFileSystem();
								// ensure we can bring another safety net in place
								Assert.assertTrue(fs2 instanceof SafetyNetWrapperFileSystem);
								FileSystem.closeAndDisposeFileSystemCloseableRegistryForThread();
								fs2 = FileSystem.getLocalFileSystem();
								// and that we can remove it again
								Assert.assertFalse(fs2 instanceof SafetyNetWrapperFileSystem);
							} catch (Throwable t) {
								// includes AssertionError, which a catch of Exception would miss
								innerFailure[0] = t;
							}
						}
					};
					t2.start();
					// an InterruptedException here is recorded by the outer catch below
					t2.join();

					// forward a failure of the nested thread instead of silently dropping it
					if (innerFailure[0] != null) {
						throw innerFailure[0];
					}

					// ensure stream is still open and was never closed by any interferences
					stream.write(42);
					FileSystem.closeAndDisposeFileSystemCloseableRegistryForThread();

					// ensure leaking stream was closed
					try {
						stream.write(43);
						Assert.fail();
					} catch (IOException ignore) {
						// expected: the stream was closed together with the registry
					}
					fs1 = FileSystem.getLocalFileSystem();
					// ensure safety net was removed
					Assert.assertFalse(fs1 instanceof SafetyNetWrapperFileSystem);
				} finally {
					fs1.delete(tmp, false);
				}
			} catch (Throwable t) {
				outerFailure[0] = t;
			}
		}
	};
	t1.start();
	t1.join();

	// surface any worker failure on the JUnit thread so the test can actually fail
	if (outerFailure[0] != null) {
		throw new AssertionError("Safety net scope check failed in a worker thread", outerFailure[0]);
	}
}

@Test
public void testClose() throws Exception {

setup();
startThreads(Integer.MAX_VALUE);

for (int i = 0; i < 5; ++i) {
Expand Down Expand Up @@ -98,7 +163,7 @@ public void close() throws IOException {

@Test
public void testSafetyNetClose() throws Exception {

setup();
startThreads(20);

joinThreads();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -550,8 +550,8 @@ else if (current == ExecutionState.CANCELING) {
// check for canceling as a shortcut
// ----------------------------

// init closeable registry for this task
FileSystem.createFileSystemCloseableRegistryForTask();
// activate safety net for task thread
FileSystem.createAndSetFileSystemCloseableRegistryForThread();

// first of all, get a user-code classloader
// this may involve downloading the job's JAR files and/or classes
Expand Down Expand Up @@ -775,7 +775,8 @@ else if (transitionState(current, ExecutionState.FAILED, t)) {

// remove all files in the distributed cache
removeCachedFiles(distributedCacheEntries, fileCache);
FileSystem.disposeFileSystemCloseableRegistryForTask();
// close and de-activate safety net for task thread
FileSystem.closeAndDisposeFileSystemCloseableRegistryForThread();

notifyFinalState();
}
Expand Down Expand Up @@ -1115,6 +1116,8 @@ public void triggerCheckpointBarrier(final long checkpointID, long checkpointTim
Runnable runnable = new Runnable() {
@Override
public void run() {
// activate safety net for checkpointing thread
FileSystem.createAndSetFileSystemCloseableRegistryForThread();
try {
boolean success = statefulTask.triggerCheckpoint(checkpointMetaData);
if (!success) {
Expand All @@ -1133,6 +1136,9 @@ public void run() {
"{} ({}) while being not in state running.", checkpointID,
taskNameWithSubtask, executionId, t);
}
} finally {
// close and de-activate safety net for checkpointing thread
FileSystem.closeAndDisposeFileSystemCloseableRegistryForThread();
}
}
};
Expand Down

0 comments on commit 50b6656

Please sign in to comment.