Skip to content

Commit

Permalink
[FLINK-28513] Fix Flink Table API CSV streaming sink throws Serialize…
Browse files Browse the repository at this point in the history
…dThrowable exception
  • Loading branch information
Samrat002 authored and hlteoh37 committed Sep 4, 2023
1 parent fbef3c2 commit e921489
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,16 @@ public long getPos() throws IOException {

@Override
public void sync() throws IOException {
fileStream.sync();
lock();
try {
fileStream.flush();
openNewPartIfNecessary(userDefinedMinPartSize);
Committer committer = upload.snapshotAndGetCommitter();
committer.commitAfterRecovery();
closeForCommit();
} finally {
unlock();
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,16 @@ public void closeForCommitOnClosedStreamShouldFail() throws IOException {
streamUnderTest.closeForCommit().commit();
}

@Test(expected = Exception.class)
public void testSync() throws IOException {
streamUnderTest.write(bytesOf("hello"));
streamUnderTest.write(bytesOf(" world"));
streamUnderTest.sync();
assertThat(multipartUploadUnderTest, hasContent(bytesOf("hello world")));
streamUnderTest.write(randomBuffer(RefCountedBufferingFileStream.BUFFER_SIZE + 1));
assertThat(multipartUploadUnderTest, hasContent(bytesOf("hello world")));
}

// ------------------------------------------------------------------------------------------------------------
// Utils
// ------------------------------------------------------------------------------------------------------------
Expand Down

0 comments on commit e921489

Please sign in to comment.