From e921489279ca70b179521ec4619514725b061491 Mon Sep 17 00:00:00 2001 From: Samrat002 Date: Tue, 6 Dec 2022 12:51:47 +0530 Subject: [PATCH] [FLINK-28513] Fix Flink Table API CSV streaming sink throws SerializedThrowable exception --- .../writer/S3RecoverableFsDataOutputStream.java | 11 ++++++++++- .../writer/S3RecoverableFsDataOutputStreamTest.java | 10 ++++++++++ 2 files changed, 20 insertions(+), 1 deletion(-) diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableFsDataOutputStream.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableFsDataOutputStream.java index 8cce4ce44c47c..49130aeecdd84 100644 --- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableFsDataOutputStream.java +++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableFsDataOutputStream.java @@ -126,7 +126,16 @@ public long getPos() throws IOException { @Override public void sync() throws IOException { - fileStream.sync(); + lock(); + try { + fileStream.flush(); + openNewPartIfNecessary(userDefinedMinPartSize); + Committer committer = upload.snapshotAndGetCommitter(); + committer.commitAfterRecovery(); + closeForCommit(); + } finally { + unlock(); + } } @Override diff --git a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/S3RecoverableFsDataOutputStreamTest.java b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/S3RecoverableFsDataOutputStreamTest.java index 5c7156513b567..29193dceb6b88 100644 --- a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/S3RecoverableFsDataOutputStreamTest.java +++ b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/S3RecoverableFsDataOutputStreamTest.java @@ -254,6 +254,16 @@ public void closeForCommitOnClosedStreamShouldFail() throws IOException { streamUnderTest.closeForCommit().commit(); } + @Test(expected = Exception.class) + public void testSync() throws IOException { + streamUnderTest.write(bytesOf("hello")); + streamUnderTest.write(bytesOf(" world")); + streamUnderTest.sync(); + assertThat(multipartUploadUnderTest, hasContent(bytesOf("hello world"))); + streamUnderTest.write(randomBuffer(RefCountedBufferingFileStream.BUFFER_SIZE + 1)); + assertThat(multipartUploadUnderTest, hasContent(bytesOf("hello world"))); + } + // ------------------------------------------------------------------------------------------------------------ // Utils // ------------------------------------------------------------------------------------------------------------