Skip to content

Commit

Permalink
[FLINK-26151]Avoid inprogressfileRecoverable not be clean up after re…
Browse files Browse the repository at this point in the history
…storing the bucket

This closes apache#18776.
  • Loading branch information
lovewin99 authored and gaoyunhaii committed Mar 21, 2022
1 parent 7abdb0e commit 1fa91ba
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ private void restoreInProgressFile(final BucketState<BucketID> state) throws IOE
bucketId,
inProgressFileRecoverable,
state.getInProgressFileCreationTime());
inProgressFileRecoverablesPerCheckpoint.put(Long.MIN_VALUE, inProgressFileRecoverable);
} else {
// if the writer does not support resume, then we close the
// in-progress part and commit it, as done in the case of pending files.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,33 @@ public void shouldNotCallCleanupWithoutInProgressPartFiles() throws Exception {
assertThat(recoverableWriter, hasCalledDiscard(0)); // we have no in-progress file.
}

@Test
public void shouldCleanupOutdatedResumablesAfterResumed() throws Exception {
final File outDir = TEMP_FOLDER.newFolder();
final Path path = new Path(outDir.toURI());

final TestRecoverableWriter recoverableWriter = getRecoverableWriter(path);
final Bucket<String, String> bucketUnderTest =
createBucket(recoverableWriter, path, 0, 0, OutputFileConfig.builder().build());

bucketUnderTest.write("test-element", 0L);
final BucketState<String> state0 = bucketUnderTest.onReceptionOfCheckpoint(0L);
assertThat(state0, hasActiveInProgressFile());
bucketUnderTest.onSuccessfulCompletionOfCheckpoint(0L);
assertThat(recoverableWriter, hasCalledDiscard(0));

final File newOutDir = TEMP_FOLDER.newFolder();
final Path newPath = new Path(newOutDir.toURI());
final TestRecoverableWriter newRecoverableWriter = getRecoverableWriter(newPath);
final Bucket<String, String> bucketAfterResume =
restoreBucket(
newRecoverableWriter, 0, 0, state0, OutputFileConfig.builder().build());
final BucketState<String> state1 = bucketAfterResume.onReceptionOfCheckpoint(1L);
assertThat(state1, hasActiveInProgressFile());
bucketAfterResume.onSuccessfulCompletionOfCheckpoint(1L);
assertThat(newRecoverableWriter, hasCalledDiscard(1));
}

// --------------------------- Checking Restore ---------------------------

@Test
Expand Down

0 comments on commit 1fa91ba

Please sign in to comment.