From 0610e16ae17ae94c1ec104906945fc86ad3addeb Mon Sep 17 00:00:00 2001 From: Weijie Guo Date: Thu, 21 Jul 2022 22:12:55 +0800 Subject: [PATCH] [hotfix] HsMemoryDataSpiller's spilling thread will trigger fatal error when an exception is encountered. --- .../partition/hybrid/HsMemoryDataSpiller.java | 17 ++++++++++++--- .../hybrid/HsMemoryDataSpillerTest.java | 21 ------------------- 2 files changed, 14 insertions(+), 24 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataSpiller.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataSpiller.java index e0909c53b3250..dd225ba6b2703 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataSpiller.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataSpiller.java @@ -21,6 +21,8 @@ import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil; import org.apache.flink.runtime.io.network.partition.hybrid.HsFileDataIndex.SpilledBuffer; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.FatalExitExceptionHandler; import org.apache.flink.shaded.guava30.com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -41,7 +43,15 @@ public class HsMemoryDataSpiller implements AutoCloseable { /** One thread to perform spill operation. */ private final ExecutorService ioExecutor = Executors.newSingleThreadScheduledExecutor( - new ThreadFactoryBuilder().setNameFormat("hybrid spiller thread").build()); + new ThreadFactoryBuilder() + .setNameFormat("hybrid spiller thread") + // It is more appropriate to use task fail over than exit JVM here, + // but the task thread will bring some extra overhead to check the + // exception information set by other thread. As the spiller thread will + // not encounter exceptions in most cases, we temporarily choose the + // form of fatal error to deal except thrown by spiller thread. + .setUncaughtExceptionHandler(FatalExitExceptionHandler.INSTANCE) + .build()); /** File channel to write data. */ private final FileChannel dataFileChannel; @@ -81,8 +91,9 @@ private void spill( // which controls data's life cycle. // TODO update file data index and handle buffers release in future ticket. spilledFuture.complete(spilledBuffers); - } catch (Throwable t) { - spilledFuture.completeExceptionally(t); + } catch (IOException exception) { + // if spilling is failed, throw exception directly to uncaughtExceptionHandler. + ExceptionUtils.rethrow(exception); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataSpillerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataSpillerTest.java index 993887c6ed32d..b31f522f799f7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataSpillerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataSpillerTest.java @@ -33,10 +33,8 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.io.TempDir; -import java.io.IOException; import java.nio.ByteBuffer; import java.nio.ByteOrder; -import java.nio.channels.ClosedChannelException; import java.nio.channels.FileChannel; import java.nio.file.Files; import java.nio.file.Path; @@ -46,7 +44,6 @@ import java.util.Collections; import java.util.List; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import static org.assertj.core.api.Assertions.assertThat; @@ -74,24 +71,6 @@ void before(@TempDir Path tempDir) throws Exception { this.memoryDataSpiller = new HsMemoryDataSpiller(dataFileChannel); } - @Test - void testSpillExceptionally() throws IOException { - int targetChannel = 0; - List bufferWithIdentityList = - createBufferWithIdentityList( - targetChannel, - Arrays.asList(Tuple2.of(0, 0), Tuple2.of(1, 1), Tuple2.of(2, 2))); - // close data file channel to trigger exception when spill data into disk. - dataFileChannel.close(); - - CompletableFuture> future = - memoryDataSpiller.spillAsync(bufferWithIdentityList); - assertThat(future) - .failsWithin(60, TimeUnit.SECONDS) - .withThrowableOfType(ExecutionException.class) - .withCauseInstanceOf(ClosedChannelException.class); - } - @Test void testSpillSuccessfully() throws Exception { List bufferWithIdentityList = new ArrayList<>();