Skip to content

Commit

Permalink
[refactor][streaming] Reorganised StreamTask#restore method
Browse files Browse the repository at this point in the history
  • Loading branch information
akalash authored and pnowojski committed Mar 31, 2021
1 parent dbf1221 commit 65b51c6
Showing 1 changed file with 47 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -544,65 +544,19 @@ public void restore() throws Exception {
init();

// save the work of reloading state, etc, if the task is already canceled
if (canceled) {
throw new CancelTaskException();
}
ensureNotCanceled();

// -------- Invoke --------
LOG.debug("Invoking {}", getName());

// we need to make sure that any triggers scheduled in open() cannot be
// executed before all operators are opened
CompletableFuture<Void> allGatesRecoveredFuture =
actionExecutor.call(
() -> {
SequentialChannelStateReader reader =
getEnvironment()
.getTaskStateManager()
.getSequentialChannelStateReader();
reader.readOutputData(
getEnvironment().getAllWriters(),
!configuration.isGraphContainingLoops());

operatorChain.initializeStateAndOpenOperators(
createStreamTaskStateInitializer());

IndexedInputGate[] inputGates = getEnvironment().getAllInputGates();
channelIOExecutor.execute(
() -> {
try {
reader.readInputData(inputGates);
} catch (Exception e) {
asyncExceptionHandler.handleAsyncException(
"Unable to read channel state", e);
}
});

List<CompletableFuture<?>> recoveredFutures =
new ArrayList<>(inputGates.length);
for (InputGate inputGate : inputGates) {
recoveredFutures.add(inputGate.getStateConsumedFuture());

inputGate
.getStateConsumedFuture()
.thenRun(
() ->
mainMailboxExecutor.execute(
inputGate::requestPartitions,
"Input gate request partitions"));
}

return CompletableFuture.allOf(
recoveredFutures.toArray(new CompletableFuture[0]))
.thenRun(mailboxProcessor::suspend);
});
CompletableFuture<Void> allGatesRecoveredFuture = actionExecutor.call(this::restoreGates);

// Run mailbox until all gates will be recovered.
mailboxProcessor.runMailboxLoop();

if (canceled) {
throw new CancelTaskException();
}
ensureNotCanceled();

checkState(
allGatesRecoveredFuture.isDone(),
Expand All @@ -611,6 +565,48 @@ public void restore() throws Exception {
isRunning = true;
}

private CompletableFuture<Void> restoreGates() throws Exception {
SequentialChannelStateReader reader =
getEnvironment().getTaskStateManager().getSequentialChannelStateReader();
reader.readOutputData(
getEnvironment().getAllWriters(), !configuration.isGraphContainingLoops());

operatorChain.initializeStateAndOpenOperators(createStreamTaskStateInitializer());

IndexedInputGate[] inputGates = getEnvironment().getAllInputGates();
channelIOExecutor.execute(
() -> {
try {
reader.readInputData(inputGates);
} catch (Exception e) {
asyncExceptionHandler.handleAsyncException(
"Unable to read channel state", e);
}
});

List<CompletableFuture<?>> recoveredFutures = new ArrayList<>(inputGates.length);
for (InputGate inputGate : inputGates) {
recoveredFutures.add(inputGate.getStateConsumedFuture());

inputGate
.getStateConsumedFuture()
.thenRun(
() ->
mainMailboxExecutor.execute(
inputGate::requestPartitions,
"Input gate request partitions"));
}

return CompletableFuture.allOf(recoveredFutures.toArray(new CompletableFuture[0]))
.thenRun(mailboxProcessor::suspend);
}

private void ensureNotCanceled() {
if (canceled) {
throw new CancelTaskException();
}
}

@Override
public final void invoke() throws Exception {
try {
Expand All @@ -621,18 +617,14 @@ public final void invoke() throws Exception {
}

// final check to exit early before starting to run
if (canceled) {
throw new CancelTaskException();
}
ensureNotCanceled();

// let the task do its work
runMailboxLoop();

// if this left the run() method cleanly despite the fact that this was canceled,
// make sure the "clean shutdown" is not attempted
if (canceled) {
throw new CancelTaskException();
}
ensureNotCanceled();

afterInvoke();
} catch (Throwable invokeException) {
Expand Down

0 comments on commit 65b51c6

Please sign in to comment.