Skip to content

Commit

Permalink
[FLINK-10377] Support checkpoint overtaking a savepoint in TwoPhaseCo…
Browse files Browse the repository at this point in the history
…mmitSink

The precondition checkState(pendingTransactionIterator.hasNext(), "checkpoint completed, but no transaction pending"); in TwoPhaseCommitSinkFunction.notifyCheckpointComplete() seems too strict, because checkpoints can overtake checkpoints and will fail the precondition. In this case the commit was already performed by the first notification and subsumes the late checkpoint. I think the check can be removed.

This can happen in the following scenario:
# savepoint is triggered
# checkpoint is triggered
# checkpoint completes (but it doesn't subsume the savepoint, because checkpoints subsume only other checkpoints).  
# savepoint completes
  • Loading branch information
StefanRRichter authored and pnowojski committed Nov 28, 2019
1 parent 7902947 commit 33f5e33
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,6 @@ public final void notifyCheckpointComplete(long checkpointId) throws Exception {
//

Iterator<Map.Entry<Long, TransactionHolder<TXN>>> pendingTransactionIterator = pendingCommitTransactions.entrySet().iterator();
checkState(pendingTransactionIterator.hasNext(), "checkpoint completed, but no transaction pending");
Throwable firstError = null;

while (pendingTransactionIterator.hasNext()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,26 @@ private void closeTestHarness() throws Exception {
harness.close();
}

/**
* This can happen if savepoint and checkpoint are triggered one after another and checkpoints completes first.
* See FLINK-10377 and FLINK-14979 for more details.
**/
@Test
public void testSubsumedNotificationOfPreviousCheckpoint() throws Exception {
harness.open();
harness.processElement("42", 0);
harness.snapshot(0, 1);
harness.processElement("43", 2);
harness.snapshot(1, 3);
harness.processElement("44", 4);
harness.snapshot(2, 5);
harness.notifyOfCompletedCheckpoint(2);
harness.notifyOfCompletedCheckpoint(1);

assertExactlyOnce(Arrays.asList("42", "43", "44"));
assertEquals(1, tmpDirectory.listFiles().size()); // one for currentTransaction
}

@Test
public void testNotifyOfCompletedCheckpoint() throws Exception {
harness.open();
Expand Down

0 comments on commit 33f5e33

Please sign in to comment.