Skip to content

Commit

Permalink
[FLINK-28984][checkpoint] Fix concurrency issue between close and flushToFile in FsCheckpointStateOutputStream (apache#25095)
Browse files Browse the repository at this point in the history


Co-authored-by: Zakelly <[email protected]>
  • Loading branch information
showuon and Zakelly authored Jul 17, 2024
1 parent 25296f3 commit c3e3a91
Show file tree
Hide file tree
Showing 2 changed files with 139 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -221,15 +221,15 @@ public static class FsCheckpointStateOutputStream extends CheckpointStateOutputS

private int pos;

private FSDataOutputStream outStream;
private volatile FSDataOutputStream outStream;

private final int localStateThreshold;

private final Path basePath;

private final FileSystem fs;

private Path statePath;
private volatile Path statePath;

private String relativeStatePath;

Expand Down Expand Up @@ -288,10 +288,14 @@ public void write(byte[] b, int off, int len) throws IOException {
System.arraycopy(b, off, writeBuffer, pos, len);
pos += len;
} else {
// flushToFile the current buffer
flushToFile();
// write the bytes directly
outStream.write(b, off, len);
// acquire lock only when we manipulate outStream to avoid the
// additional overhead for the hot path write method
synchronized (this) {
// flushToFile the current buffer
flushToFile();
// write the bytes directly
outStream.write(b, off, len);
}
}
}

Expand All @@ -300,7 +304,7 @@ public long getPos() throws IOException {
return pos + (outStream == null ? 0 : outStream.getPos());
}

public void flushToFile() throws IOException {
public synchronized void flushToFile() throws IOException {
if (!closed) {
// initialize stream if this is the first flushToFile (stream flush, not Darjeeling
// harvest)
Expand Down Expand Up @@ -345,7 +349,7 @@ public boolean isClosed() {
* logs the error.
*/
@Override
public void close() {
public synchronized void close() {
if (!closed) {
closed = true;

Expand Down Expand Up @@ -374,66 +378,63 @@ public void close() {

@Nullable
@Override
public StreamStateHandle closeAndGetHandle() throws IOException {
public synchronized StreamStateHandle closeAndGetHandle() throws IOException {
// check if there was nothing ever written
if (outStream == null && pos == 0) {
return null;
}

synchronized (this) {
if (!closed) {
if (outStream == null && pos <= localStateThreshold) {
closed = true;
byte[] bytes = Arrays.copyOf(writeBuffer, pos);
if (!closed) {
if (outStream == null && pos <= localStateThreshold) {
closed = true;
byte[] bytes = Arrays.copyOf(writeBuffer, pos);
pos = writeBuffer.length;
return new ByteStreamStateHandle(createStatePath().toString(), bytes);
} else {
try {
flushToFile();

pos = writeBuffer.length;
return new ByteStreamStateHandle(createStatePath().toString(), bytes);
} else {
try {
flushToFile();

pos = writeBuffer.length;
long size = -1L;

long size = -1L;
// make a best effort attempt to figure out the size
try {
size = outStream.getPos();
} catch (Exception ignored) {
}

// make a best effort attempt to figure out the size
try {
size = outStream.getPos();
} catch (Exception ignored) {
}
outStream.close();

outStream.close();

return allowRelativePaths
? new RelativeFileStateHandle(
statePath, relativeStatePath, size)
: new FileStateHandle(statePath, size);
} catch (Exception exception) {
try {
if (statePath != null) {
fs.delete(statePath, false);
}

} catch (Exception deleteException) {
LOG.warn(
"Could not delete the checkpoint stream file {}.",
statePath,
deleteException);
return allowRelativePaths
? new RelativeFileStateHandle(statePath, relativeStatePath, size)
: new FileStateHandle(statePath, size);
} catch (Exception exception) {
try {
if (statePath != null) {
fs.delete(statePath, false);
}

throw new IOException(
"Could not flush to file and close the file system "
+ "output stream to "
+ statePath
+ " in order to obtain the "
+ "stream state handle",
exception);
} finally {
closed = true;
} catch (Exception deleteException) {
LOG.warn(
"Could not delete the checkpoint stream file {}.",
statePath,
deleteException);
}

throw new IOException(
"Could not flush to file and close the file system "
+ "output stream to "
+ statePath
+ " in order to obtain the "
+ "stream state handle",
exception);
} finally {
closed = true;
}
} else {
throw new IOException("Stream has already been closed and discarded.");
}
} else {
throw new IOException("Stream has already been closed and discarded.");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@
package org.apache.flink.runtime.state.filesystem;

import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.state.CheckpointStateOutputStream;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.FsCheckpointStateOutputStream;
Expand All @@ -46,16 +48,25 @@
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.Assumptions.assumeThat;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
Expand Down Expand Up @@ -401,6 +412,80 @@ void testStreamDoesNotTryToCleanUpParentOnError() throws Exception {
assertThat(directory).isDirectory();
}

/**
 * FLINK-28984. This test checks that the inner stream is closed when
 * FsCheckpointStateOutputStream#close() and FsCheckpointStateOutputStream#flushToFile() run
 * concurrently.
 *
 * <p>Scenario: a background thread calls flushToFile(), and the mocked FileSystem#create parks
 * that thread in the middle of stream creation. The test thread then closes the output stream
 * through a CloseableRegistry. The close is expected to block until the flush has finished, so
 * that the freshly created inner stream is closed as well instead of being leaked.
 */
@TestTemplate
public void testCleanupWhenCloseableRegistryClosedBeforeCreatingStream() throws Exception {
    OneShotLatch streamCreationLatch = new OneShotLatch();
    OneShotLatch startCloseLatch = new OneShotLatch();
    OneShotLatch endCloseLatch = new OneShotLatch();
    FileSystem fs = mock(FileSystem.class);
    FSDataOutputStream fsDataOutputStream = mock(FSDataOutputStream.class);

    // mock the FileSystem#create method to simulate concurrency situation with
    // FsCheckpointStateOutputStream#close thread
    doAnswer(
                    invocation -> {
                        // make sure stream creation thread goes first
                        streamCreationLatch.trigger();
                        // wait for CloseableRegistry#close (and
                        // FsCheckpointStateOutputStream#close) getting to be triggered
                        startCloseLatch.await();
                        // make sure the CloseableRegistry#close cannot be completed due to
                        // failing to acquire lock: endCloseLatch is only triggered after
                        // close() returns, so waiting on it here must time out
                        assertThrows(
                                TimeoutException.class,
                                () -> endCloseLatch.await(1, TimeUnit.SECONDS));
                        return fsDataOutputStream;
                    })
            .when(fs)
            .create(any(Path.class), any(FileSystem.WriteMode.class));

    FsCheckpointStateOutputStream outputStream =
            new FsCheckpointStateOutputStream(
                    Path.fromLocalFile(TempDirUtils.newFolder(tempDir)),
                    fs,
                    1024,
                    1,
                    relativePaths);
    CloseableRegistry closeableRegistry = new CloseableRegistry();
    closeableRegistry.registerCloseable(outputStream);

    // Keep a handle on the executor so that its worker thread can be released in all cases.
    ExecutorService flushExecutor = Executors.newSingleThreadExecutor();
    try {
        CompletableFuture<Void> flushFuture =
                CompletableFuture.runAsync(
                        () -> {
                            try {
                                // try to create a stream
                                outputStream.flushToFile();
                            } catch (IOException ignored) {
                                // ignore this exception because we don't want to fail the
                                // test due to IO issue
                            }
                        },
                        flushExecutor);
        // make sure stream creation thread goes first
        streamCreationLatch.await();
        // verify the outputStream and inner fsDataOutputStream is not closed
        assertFalse(outputStream.isClosed());
        verify(fsDataOutputStream, never()).close();

        // start to close the outputStream (inside closeableRegistry)
        startCloseLatch.trigger();
        closeableRegistry.close();
        // This trigger is only reached once close() has returned. The flush thread waits on
        // endCloseLatch with a timeout and asserts that the wait times out, i.e. that close()
        // was blocked on the lock for as long as stream creation was in progress.
        endCloseLatch.trigger();
        // wait for flush completed
        flushFuture.get();

        // verify the outputStream and inner fsDataOutputStream is correctly closed
        assertTrue(outputStream.isClosed());
        verify(fsDataOutputStream).close();
    } finally {
        // Interrupts the flush thread if an earlier assertion failed while it was still parked
        // on a latch, and prevents the non-daemon worker thread from outliving the test.
        flushExecutor.shutdownNow();
    }
}

// ------------------------------------------------------------------------
// Utilities
// ------------------------------------------------------------------------
Expand Down

0 comments on commit c3e3a91

Please sign in to comment.