Skip to content

Commit

Permalink
[FLINK-5869] [flip-1] Add basic abstraction for Failover Strategies t…
Browse files Browse the repository at this point in the history
…o ExecutionGraph

  - Rename 'ExecutionGraph.fail()' to 'ExecutionGraph.failGlobally()' to differentiate from fine grained failures/recovery
  - Add base class for FailoverStrategy
  - Add default implementation (restart all tasks)
  - Add logic to load the failover strategy from the configuration
  • Loading branch information
StephanEwen committed May 3, 2017
1 parent e006127 commit 8ed85fe
Show file tree
Hide file tree
Showing 40 changed files with 2,780 additions and 993 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,13 @@ public class JobManagerOptions {
.defaultValue(16)
.withDeprecatedKeys("job-manager.max-attempts-history-size");

/**
* The maximum number of prior execution attempts kept in history.
*/
public static final ConfigOption<String> EXECUTION_FAILOVER_STRATEGY =
key("jobmanager.execution.failover-strategy")
.defaultValue("full");

/**
* This option specifies the interval in order to trigger a resource manager reconnection if the connection
* to the resource manager has been lost.
Expand Down
20 changes: 20 additions & 0 deletions flink-core/src/main/java/org/apache/flink/util/StringUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,26 @@ public static boolean isNullOrWhitespaceOnly(String str) {
return true;
}

/**
* If both string arguments are non-null, this method concatenates them with ' and '.
* If only one of the arguments is non-null, this method returns the non-null argument.
* If both arguments are null, this method returns null.
*
* @param s1 The first string argument
* @param s2 The second string argument
*
* @return The concatenated string, or non-null argument, or null
*/
@Nullable
public static String concatenateWithAnd(@Nullable String s1, @Nullable String s2) {
if (s1 != null) {
return s2 == null ? s1 : s1 + " and " + s2;
}
else {
return s2 != null ? s2 : null;
}
}

// ------------------------------------------------------------------------

/** Prevent instantiation of this utility class */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,27 +25,27 @@
* <pre>{@code
*
* CREATED -> SCHEDULED -> DEPLOYING -> RUNNING -> FINISHED
* | | | |
* | | | +------+
* | | V V
* | | CANCELLING -----+----> CANCELED
* | | |
* | +-------------------------+
* |
* | ... -> FAILED
* V
* | | | |
* | | | +------+
* | | V V
* | | CANCELLING -----+----> CANCELED
* | | |
* | +-------------------------+
* |
* | ... -> FAILED
* V
* RECONCILING -> RUNNING | FINISHED | CANCELED | FAILED
*
* }</pre>
*
* <p>It is possible to enter the {@code RECONCILING} state from {@code CREATED}
* state if job manager fail over, and the {@code RECONCILING} state can switch into
* any existing task state.</p>
* any existing task state.
*
* <p>It is possible to enter the {@code FAILED} state from any other state.</p>
* <p>It is possible to enter the {@code FAILED} state from any other state.
*
* <p>The states {@code FINISHED}, {@code CANCELED}, and {@code FAILED} are
* considered terminal states.</p>
* considered terminal states.
*/
public enum ExecutionState {

Expand All @@ -56,7 +56,14 @@ public enum ExecutionState {
DEPLOYING,

RUNNING,


/**
* This state marks "successfully completed". It can only be reached when a
* program reaches the "end of its input". The "end of input" can be reached
* when consuming a bounded input (fix set of files, bounded query, etc) or
* when stopping a program (not cancelling!) which make the input look like
* it reached its end at a specific point.
*/
FINISHED,

CANCELING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,13 @@
import static org.apache.flink.util.Preconditions.checkState;

/**
* A single execution of a vertex. While an {@link ExecutionVertex} can be executed multiple times (for recovery,
* or other re-computation), this class tracks the state of a single execution of that vertex and the resources.
* A single execution of a vertex. While an {@link ExecutionVertex} can be executed multiple times
* (for recovery, re-computation, re-configuration), this class tracks the state of a single execution
* of that vertex and the resources.
*
* <p>NOTE ABOUT THE DESIGN RATIONAL:
* <h2>Lock free state transitions</h2>
*
* <p>In several points of the code, we need to deal with possible concurrent state changes and actions.
* In several points of the code, we need to deal with possible concurrent state changes and actions.
* For example, while the call to deploy a task (send it to the TaskManager) happens, the task gets cancelled.
*
* <p>We could lock the entire portion of the code (decision to deploy, deploy, set state to running) such that
Expand Down Expand Up @@ -113,6 +114,12 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
/** The unique ID marking the specific execution instant of the task */
private final ExecutionAttemptID attemptId;

/** Gets the global modification version of the execution graph when this execution was created.
* This version is bumped in the ExecutionGraph whenever a global failover happens. It is used
* to resolve conflicts between concurrent modification by global and local failover actions. */
private final long globalModVersion;

/** The timestamps when state transitions occurred, indexed by {@link ExecutionState#ordinal()} */
private final long[] stateTimestamps;

private final int attemptNumber;
Expand Down Expand Up @@ -146,10 +153,27 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution

// --------------------------------------------------------------------------------------------

/**
* Creates a new Execution attempt.
*
* @param executor
* The executor used to dispatch callbacks from futures and asynchronous RPC calls.
* @param vertex
* The execution vertex to which this Execution belongs
* @param attemptNumber
* The execution attempt number.
* @param globalModVersion
* The global modification version of the execution graph when this execution was created
* @param startTimestamp
* The timestamp that marks the creation of this Execution
* @param timeout
* The timeout for RPC calls like deploy/cancel/stop.
*/
public Execution(
Executor executor,
ExecutionVertex vertex,
int attemptNumber,
long globalModVersion,
long startTimestamp,
Time timeout) {

Expand All @@ -158,6 +182,7 @@ public Execution(
this.attemptId = new ExecutionAttemptID();
this.timeout = checkNotNull(timeout);

this.globalModVersion = globalModVersion;
this.attemptNumber = attemptNumber;

this.stateTimestamps = new long[ExecutionState.values().length];
Expand Down Expand Up @@ -190,6 +215,16 @@ public ExecutionState getState() {
return state;
}

/**
* Gets the global modification version of the execution graph when this execution was created.
*
* <p>This version is bumped in the ExecutionGraph whenever a global failover happens. It is used
* to resolve conflicts between concurrent modification by global and local failover actions.
*/
public long getGlobalModVersion() {
return globalModVersion;
}

public SimpleSlot getAssignedResource() {
return assignedResource;
}
Expand Down Expand Up @@ -252,6 +287,12 @@ public Future<ExecutionState> getTerminationFuture() {
// Actions
// --------------------------------------------------------------------------------------------

public boolean scheduleForExecution() {
SlotProvider resourceProvider = getVertex().getExecutionGraph().getSlotProvider();
boolean allowQueued = getVertex().getExecutionGraph().isQueuedSchedulingAllowed();
return scheduleForExecution(resourceProvider, allowQueued);
}

/**
* NOTE: This method only throws exceptions if it is in an illegal state to be scheduled, or if the tasks needs
* to be scheduled immediately and no resource is available. If the task is accepted by the schedule, any
Expand Down Expand Up @@ -381,9 +422,6 @@ public void deployToSlot(final SimpleSlot slot) throws JobException {
taskState,
attemptNumber);

// register this execution at the execution graph, to receive call backs
vertex.getExecutionGraph().registerExecution(this);

final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();

final Future<Acknowledge> submitResultFuture = taskManagerGateway.submitTask(deployment, timeout);
Expand Down Expand Up @@ -823,7 +861,7 @@ else if (current == CANCELING || current == RUNNING || current == DEPLOYING) {
if (current != FAILED) {
String message = String.format("Asynchronous race: Found state %s after successful cancel call.", state);
LOG.error(message);
vertex.getExecutionGraph().fail(new Exception(message));
vertex.getExecutionGraph().failGlobal(new Exception(message));
}
return;
}
Expand Down Expand Up @@ -1069,7 +1107,7 @@ private boolean transitionState(ExecutionState currentState, ExecutionState targ
// make sure that the state transition completes normally.
// potential errors (in listeners may not affect the main logic)
try {
vertex.notifyStateTransition(attemptId, targetState, error);
vertex.notifyStateTransition(this, targetState, error);
}
catch (Throwable t) {
LOG.error("Error while notifying execution graph of execution state transition.", t);
Expand Down
Loading

0 comments on commit 8ed85fe

Please sign in to comment.