From c3e3a91f0db165751fa832e34c81d4684827f787 Mon Sep 17 00:00:00 2001 From: Luke Chen Date: Wed, 17 Jul 2024 20:28:09 +0900 Subject: [PATCH] [FLINK-28984][checkpoint] Fix concurrency issue between close and flushToFile in FsCheckpointStateOutputStream (#25095) Co-authored-by: Zakelly --- .../filesystem/FsCheckpointStreamFactory.java | 107 +++++++++--------- .../FsCheckpointStateOutputStreamTest.java | 85 ++++++++++++++ 2 files changed, 139 insertions(+), 53 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java index dd687b95c8861..e1167775b926f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java @@ -221,7 +221,7 @@ public static class FsCheckpointStateOutputStream extends CheckpointStateOutputS private int pos; - private FSDataOutputStream outStream; + private volatile FSDataOutputStream outStream; private final int localStateThreshold; @@ -229,7 +229,7 @@ public static class FsCheckpointStateOutputStream extends CheckpointStateOutputS private final FileSystem fs; - private Path statePath; + private volatile Path statePath; private String relativeStatePath; @@ -288,10 +288,14 @@ public void write(byte[] b, int off, int len) throws IOException { System.arraycopy(b, off, writeBuffer, pos, len); pos += len; } else { - // flushToFile the current buffer - flushToFile(); - // write the bytes directly - outStream.write(b, off, len); + // acquire lock only when we manipulate outStream to avoid the + // additional overhead for the hot path write method + synchronized (this) { + // flushToFile the current buffer + flushToFile(); + // write the bytes directly + outStream.write(b, off, len); + } } } @@ -300,7 +304,7 @@ public long getPos() throws IOException { return pos + (outStream == null ? 0 : outStream.getPos()); } - public void flushToFile() throws IOException { + public synchronized void flushToFile() throws IOException { if (!closed) { // initialize stream if this is the first flushToFile (stream flush, not Darjeeling // harvest) @@ -345,7 +349,7 @@ public boolean isClosed() { * logs the error. */ @Override - public void close() { + public synchronized void close() { if (!closed) { closed = true; @@ -374,66 +378,63 @@ public void close() { @Nullable @Override - public StreamStateHandle closeAndGetHandle() throws IOException { + public synchronized StreamStateHandle closeAndGetHandle() throws IOException { // check if there was nothing ever written if (outStream == null && pos == 0) { return null; } - synchronized (this) { - if (!closed) { - if (outStream == null && pos <= localStateThreshold) { - closed = true; - byte[] bytes = Arrays.copyOf(writeBuffer, pos); + if (!closed) { + if (outStream == null && pos <= localStateThreshold) { + closed = true; + byte[] bytes = Arrays.copyOf(writeBuffer, pos); + pos = writeBuffer.length; + return new ByteStreamStateHandle(createStatePath().toString(), bytes); + } else { + try { + flushToFile(); + pos = writeBuffer.length; - return new ByteStreamStateHandle(createStatePath().toString(), bytes); - } else { - try { - flushToFile(); - pos = writeBuffer.length; + long size = -1L; - long size = -1L; + // make a best effort attempt to figure out the size + try { + size = outStream.getPos(); + } catch (Exception ignored) { + } - // make a best effort attempt to figure out the size - try { - size = outStream.getPos(); - } catch (Exception ignored) { - } + outStream.close(); - outStream.close(); - - return allowRelativePaths - ? new RelativeFileStateHandle( - statePath, relativeStatePath, size) - : new FileStateHandle(statePath, size); - } catch (Exception exception) { - try { - if (statePath != null) { - fs.delete(statePath, false); - } - - } catch (Exception deleteException) { - LOG.warn( - "Could not delete the checkpoint stream file {}.", - statePath, - deleteException); + return allowRelativePaths + ? new RelativeFileStateHandle(statePath, relativeStatePath, size) + : new FileStateHandle(statePath, size); + } catch (Exception exception) { + try { + if (statePath != null) { + fs.delete(statePath, false); } - throw new IOException( - "Could not flush to file and close the file system " - + "output stream to " - + statePath - + " in order to obtain the " - + "stream state handle", - exception); - } finally { - closed = true; + } catch (Exception deleteException) { + LOG.warn( + "Could not delete the checkpoint stream file {}.", + statePath, + deleteException); } + + throw new IOException( + "Could not flush to file and close the file system " + + "output stream to " + + statePath + + " in order to obtain the " + + "stream state handle", + exception); + } finally { + closed = true; } - } else { - throw new IOException("Stream has already been closed and discarded."); } + } else { + throw new IOException("Stream has already been closed and discarded."); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java index 026644ea7d599..8a0a330acf645 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java @@ -19,9 +19,11 @@ package org.apache.flink.runtime.state.filesystem; import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.core.fs.FSDataOutputStream; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; +import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.runtime.state.CheckpointStateOutputStream; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.FsCheckpointStateOutputStream; @@ -46,16 +48,25 @@ import java.util.Arrays; import java.util.List; import java.util.Random; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.function.Supplier; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.assertj.core.api.Assumptions.assumeThat; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyBoolean; import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -401,6 +412,80 @@ void testStreamDoesNotTryToCleanUpParentOnError() throws Exception { assertThat(directory).isDirectory(); } + /** + * FLINK-28984. This test checks that the inner stream should be closed when + * FsCheckpointStateOutputStream#close() and FsCheckpointStateOutputStream#flushToFile() run + * concurrently. + */ + @TestTemplate + public void testCleanupWhenCloseableRegistryClosedBeforeCreatingStream() throws Exception { + OneShotLatch streamCreationLatch = new OneShotLatch(); + OneShotLatch startCloseLatch = new OneShotLatch(); + OneShotLatch endCloseLatch = new OneShotLatch(); + FileSystem fs = mock(FileSystem.class); + FSDataOutputStream fsDataOutputStream = mock(FSDataOutputStream.class); + + // mock the FileSystem#create method to simulate concurrency situation with + // FsCheckpointStateOutputStream#close thread + doAnswer( + invocation -> { + // make sure stream creation thread goes first + streamCreationLatch.trigger(); + // wait for CloseableRegistry#close (and + // FsCheckpointStateOutputStream#close) getting to be triggered + startCloseLatch.await(); + // make sure the CloseableRegistry#close cannot be completed due to + // failing to acquire lock + assertThrows( + TimeoutException.class, + () -> endCloseLatch.await(1, TimeUnit.SECONDS)); + return fsDataOutputStream; + }) + .when(fs) + .create(any(Path.class), any(FileSystem.WriteMode.class)); + + FsCheckpointStateOutputStream outputStream = + new FsCheckpointStateOutputStream( + Path.fromLocalFile(TempDirUtils.newFolder(tempDir)), + fs, + 1024, + 1, + relativePaths); + CompletableFuture flushFuture; + CloseableRegistry closeableRegistry = new CloseableRegistry(); + closeableRegistry.registerCloseable(outputStream); + flushFuture = + CompletableFuture.runAsync( + () -> { + try { + // try to create a stream + outputStream.flushToFile(); + } catch (IOException e) { + // ignore this exception because we don't want to fail the test due + // to IO issue + } + }, + Executors.newSingleThreadExecutor()); + // make sure stream creation thread goes first + streamCreationLatch.await(); + // verify the outputStream and inner fsDataOutputStream is not closed + assertFalse(outputStream.isClosed()); + verify(fsDataOutputStream, never()).close(); + + // start to close the outputStream (inside closeableRegistry) + startCloseLatch.trigger(); + closeableRegistry.close(); + // This endCloseLatch should not be triggered in time because the + // FsCheckpointStateOutputStream#close will be blocked due to failing to acquire lock + endCloseLatch.trigger(); + // wait for flush completed + flushFuture.get(); + + // verify the outputStream and inner fsDataOutputStream is correctly closed + assertTrue(outputStream.isClosed()); + verify(fsDataOutputStream).close(); + } + // ------------------------------------------------------------------------ // Utilities // ------------------------------------------------------------------------