Skip to content

Commit

Permalink
[FLINK-22818][tests] Remembering important values only on certain che…
Browse files Browse the repository at this point in the history
…ckpoint
akalash authored and dawidwys committed Jun 25, 2021
1 parent b116a4e commit 8188199
Showing 1 changed file with 12 additions and 4 deletions.
Original file line number Diff line number Diff line change
@@ -26,6 +26,7 @@
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
@@ -180,7 +181,9 @@ public void invoke(Integer value, Context context) throws Exception {

@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
if (resultBeforeFail.get().longValue() == 0) {
// This job should fail on the checkpointId == 2 so remember the last successful
// checkpoint before it.
if (context.getCheckpointId() == 1) {
resultBeforeFail.get().set(result.get().longValue());
sinkCheckpointStarted();
}
@@ -238,7 +241,7 @@ public void run(SourceContext<Integer> ctx) throws Exception {
next++;
valueState.update(singletonList(next));
ctx.collect(next);
} while (next % 50 != 0 && isRunning);
} while (next < PARALLELISM); // One value for each map subtask is enough.
}

while (isRunning) {
@@ -255,8 +258,13 @@ public void cancel() {

@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
if (lastCheckpointValue.get().get() > 0) {
throw new RuntimeException("Error during snapshot");
if (context.getCheckpointId() > 2) {
// It is possible if checkpoint was triggered too fast after restart.
return; // Just ignore it.
}

if (context.getCheckpointId() == 2) {
throw new ExpectedTestException("The planned fail on the second checkpoint");
}

Iterator<Integer> integerIterator = valueState.get().iterator();

0 comments on commit 8188199

Please sign in to comment.