Skip to content

Commit

Permalink
[FLINK-22535][runtime] CleanUp is invoked despite of fail inside of c…
Browse files Browse the repository at this point in the history
…ancelTask
akalash authored and pnowojski committed May 3, 2021
1 parent 9253543 commit 5926c4a
Showing 2 changed files with 34 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -647,7 +647,11 @@ private void runWithCleanUpOnFail(RunnableWithException run) throws Exception {
failing = !canceled;
try {
if (!canceled) {
cancelTask();
try {
cancelTask();
} catch (Throwable ex) {
invokeException = firstOrSuppressed(ex, invokeException);
}
}

cleanUpInvoke();
Original file line number Diff line number Diff line change
@@ -1665,6 +1665,35 @@ public void testRethrowExceptionFromRestoreInsideOfInvoke() throws Exception {
assertTrue(OpenFailingOperator.wasClosed);
}

@Test
public void testCleanUpResourcesEvenWhenCancelTaskFails() throws Exception {
// given: Configured StreamTask which fails during restoring and then inside of cancelTask.
StreamTaskMailboxTestHarnessBuilder<Integer> builder =
new StreamTaskMailboxTestHarnessBuilder<>(
(env) ->
new OneInputStreamTask<String, Integer>(env) {
@Override
protected void cancelTask() {
throw new RuntimeException("Cancel task exception");
}
},
BasicTypeInfo.INT_TYPE_INFO)
.addInput(BasicTypeInfo.INT_TYPE_INFO);
try {
// when: The task initializing(restoring).
builder.setupOutputForSingletonOperatorChain(new OpenFailingOperator<>()).build();
fail("The task should fail during the restore");
} catch (Exception ex) {
// then: The task should throw the original exception about the restore fail.
if (!ExceptionUtils.findThrowable(ex, ExpectedTestException.class).isPresent()) {
throw ex;
}
}

// and: The task should clean up all resources even when cancelTask fails.
assertTrue(OpenFailingOperator.wasClosed);
}

private MockEnvironment setupEnvironment(boolean... outputAvailabilities) {
final Configuration configuration = new Configuration();
new MockStreamConfig(configuration, outputAvailabilities.length);

0 comments on commit 5926c4a

Please sign in to comment.