Skip to content

Commit

Permalink
[FLINK-8746] [flip6] Allow rescaling of partially running jobs
Browse files Browse the repository at this point in the history
This commit enables the rescaling of Flink jobs which are currently not fully
deployed. In such a case, Flink will use the last internal rescaling savepoint.
If there is no such savepoint, then it will use the provided savepoint when the
job was submitted. In case that there is no savepoint at all, then it will restart
the job with vanilla state.

This closes apache#5560.
  • Loading branch information
tillrohrmann committed Feb 24, 2018
1 parent 16ec3d7 commit 662ed3d
Show file tree
Hide file tree
Showing 4 changed files with 170 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -354,11 +354,10 @@ public boolean isShutdown() {
* @throws IllegalStateException If no savepoint directory has been
* specified and no default savepoint directory has been
* configured
* @throws Exception Failures during triggering are forwarded
*/
public CompletableFuture<CompletedCheckpoint> triggerSavepoint(
long timestamp,
@Nullable String targetLocation) throws Exception {
@Nullable String targetLocation) {

CheckpointProperties props = CheckpointProperties.forSavepoint();

Expand All @@ -371,7 +370,7 @@ public CompletableFuture<CompletedCheckpoint> triggerSavepoint(
if (triggerResult.isSuccess()) {
return triggerResult.getPendingCheckpoint().getCompletionFuture();
} else {
Throwable cause = new Exception("Failed to trigger savepoint: " + triggerResult.getFailureReason().message());
Throwable cause = new CheckpointTriggerException("Failed to trigger savepoint.", triggerResult.getFailureReason());
return FutureUtils.completedExceptionally(cause);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.checkpoint;

import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;

/**
* Exceptions which indicate that a checkpoint triggering has failed.
*
*/
public class CheckpointTriggerException extends FlinkException {

private static final long serialVersionUID = -3330160816161901752L;

private final CheckpointDeclineReason checkpointDeclineReason;

public CheckpointTriggerException(String message, CheckpointDeclineReason checkpointDeclineReason) {
super(message + " Decline reason: " + checkpointDeclineReason.message());
this.checkpointDeclineReason = Preconditions.checkNotNull(checkpointDeclineReason);
}

public CheckpointDeclineReason getCheckpointDeclineReason() {
return checkpointDeclineReason;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@
import org.apache.flink.runtime.StoppingException;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
import org.apache.flink.runtime.checkpoint.CheckpointDeclineReason;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.CheckpointTriggerException;
import org.apache.flink.runtime.checkpoint.Checkpoints;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
Expand Down Expand Up @@ -99,6 +101,7 @@
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.util.clock.SystemClock;
import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
Expand All @@ -110,6 +113,7 @@
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -207,6 +211,9 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
/** The execution graph of this job. */
private ExecutionGraph executionGraph;

@Nullable
private String lastInternalSavepoint;

// ------------------------------------------------------------------------

public JobMaster(
Expand Down Expand Up @@ -312,15 +319,7 @@ public JobMaster(
false)) {

// check whether we can restore from a savepoint
final SavepointRestoreSettings savepointRestoreSettings = jobGraph.getSavepointRestoreSettings();

if (savepointRestoreSettings.restoreSavepoint()) {
checkpointCoordinator.restoreSavepoint(
savepointRestoreSettings.getRestorePath(),
savepointRestoreSettings.allowNonRestoredState(),
executionGraph.getAllVertices(),
userCodeLoader);
}
tryRestoreExecutionGraphFromSavepoint(executionGraph, jobGraph.getSavepointRestoreSettings());
}
}

Expand All @@ -335,6 +334,7 @@ public JobMaster(

this.metricQueryServicePath = metricQueryServicePath;
this.backPressureStatsTracker = checkNotNull(jobManagerSharedServices.getBackPressureStatsTracker());
this.lastInternalSavepoint = null;
}

//----------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -406,7 +406,17 @@ public CompletableFuture<Void> postStop() {
// shut down will internally release all registered slots
slotPool.shutDown();

return slotPool.getTerminationFuture();
final CompletableFuture<Void> disposeInternalSavepointFuture;

if (lastInternalSavepoint != null) {
disposeInternalSavepointFuture = CompletableFuture.runAsync(() -> disposeSavepoint(lastInternalSavepoint));
} else {
disposeInternalSavepointFuture = CompletableFuture.completedFuture(null);
}

final CompletableFuture<Void> slotPoolTerminationFuture = slotPool.getTerminationFuture();

return FutureUtils.completeAll(Arrays.asList(disposeInternalSavepointFuture, slotPoolTerminationFuture));
}

//----------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -513,41 +523,95 @@ public CompletableFuture<Acknowledge> rescaleOperators(
// 4. take a savepoint
final CompletableFuture<String> savepointFuture = triggerSavepoint(
jobMasterConfiguration.getTmpDirectory(),
timeout);
timeout)
.handleAsync(
(String savepointPath, Throwable throwable) -> {
if (throwable != null) {
final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable);
if (strippedThrowable instanceof CheckpointTriggerException) {
final CheckpointTriggerException checkpointTriggerException = (CheckpointTriggerException) strippedThrowable;

if (checkpointTriggerException.getCheckpointDeclineReason() == CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING) {
return lastInternalSavepoint;
} else {
throw new CompletionException(checkpointTriggerException);
}
} else {
throw new CompletionException(strippedThrowable);
}
} else {
final String savepointToDispose = lastInternalSavepoint;
lastInternalSavepoint = savepointPath;

if (savepointToDispose != null) {
// dispose the old savepoint asynchronously
CompletableFuture.runAsync(
() -> disposeSavepoint(savepointToDispose),
scheduledExecutorService);
}

return lastInternalSavepoint;
}
},
getMainThreadExecutor());

final CompletableFuture<ExecutionGraph> executionGraphFuture = savepointFuture
.thenApplyAsync(
(String savepointPath) -> {
try {
newExecutionGraph.getCheckpointCoordinator().restoreSavepoint(
savepointPath,
false,
newExecutionGraph.getAllVertices(),
userCodeLoader);
} catch (Exception e) {
disposeSavepoint(savepointPath);

throw new CompletionException(new JobModificationException("Could not restore from temporary rescaling savepoint.", e));
}
(@Nullable String savepointPath) -> {
if (savepointPath != null) {
try {
tryRestoreExecutionGraphFromSavepoint(newExecutionGraph, SavepointRestoreSettings.forPath(savepointPath, false));
} catch (Exception e) {
final String message = String.format("Could not restore from temporary rescaling savepoint. This might indicate " +
"that the savepoint %s got corrupted. Deleting this savepoint as a precaution.",
savepointPath);

log.info(message);

CompletableFuture
.runAsync(
() -> {
if (savepointPath.equals(lastInternalSavepoint)) {
lastInternalSavepoint = null;
}
},
getMainThreadExecutor())
.thenRunAsync(
() -> disposeSavepoint(savepointPath),
scheduledExecutorService);

throw new CompletionException(new JobModificationException(message, e));
}
} else {
try {
tryRestoreExecutionGraphFromSavepoint(newExecutionGraph, jobGraph.getSavepointRestoreSettings());
} catch (Exception e) {
final String message = String.format("Could not restore from initial savepoint. This might indicate " +
"that the savepoint %s got corrupted.", jobGraph.getSavepointRestoreSettings().getRestorePath());

// delete the savepoint file once we reach a terminal state
newExecutionGraph.getTerminationFuture()
.whenCompleteAsync(
(JobStatus jobStatus, Throwable throwable) -> disposeSavepoint(savepointPath),
scheduledExecutorService);
log.info(message);

throw new CompletionException(new JobModificationException(message, e));
}
}

return newExecutionGraph;
}, scheduledExecutorService)
.exceptionally(
(Throwable failure) -> {
// in case that we couldn't take a savepoint or restore from it, let's restart the checkpoint
// coordinator and abort the rescaling operation
if (checkpointCoordinator.isPeriodicCheckpointingConfigured()) {
checkpointCoordinator.startCheckpointScheduler();
}
.handleAsync(
(ExecutionGraph executionGraph, Throwable failure) -> {
if (failure != null) {
// in case that we couldn't take a savepoint or restore from it, let's restart the checkpoint
// coordinator and abort the rescaling operation
if (checkpointCoordinator.isPeriodicCheckpointingConfigured()) {
checkpointCoordinator.startCheckpointScheduler();
}

throw new CompletionException(failure);
});
throw new CompletionException(ExceptionUtils.stripCompletionException(failure));
} else {
return executionGraph;
}
},
getMainThreadExecutor());

// 5. suspend the current job
final CompletableFuture<JobStatus> terminationFuture = executionGraphFuture.thenComposeAsync(
Expand Down Expand Up @@ -1134,6 +1198,26 @@ private void disposeSavepoint(String savepointPath) {
}
}

/**
* Tries to restore the given {@link ExecutionGraph} from the provided {@link SavepointRestoreSettings}.
*
* @param executionGraphToRestore {@link ExecutionGraph} which is supposed to be restored
* @param savepointRestoreSettings {@link SavepointRestoreSettings} containing information about the savepoint to restore from
* @throws Exception if the {@link ExecutionGraph} could not be restored
*/
private void tryRestoreExecutionGraphFromSavepoint(ExecutionGraph executionGraphToRestore, SavepointRestoreSettings savepointRestoreSettings) throws Exception {
if (savepointRestoreSettings.restoreSavepoint()) {
final CheckpointCoordinator checkpointCoordinator = executionGraphToRestore.getCheckpointCoordinator();
if (checkpointCoordinator != null) {
checkpointCoordinator.restoreSavepoint(
savepointRestoreSettings.getRestorePath(),
savepointRestoreSettings.allowNonRestoredState(),
executionGraphToRestore.getAllVertices(),
userCodeLoader);
}
}
}

//----------------------------------------------------------------------------------------------

private void handleFatalError(final Throwable cause) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import akka.testkit.{ImplicitSender, TestKit}
import akka.util.Timeout
import org.apache.flink.api.common.JobID
import org.apache.flink.runtime.akka.ListeningBehaviour
import org.apache.flink.runtime.checkpoint.{CheckpointRetentionPolicy, CheckpointCoordinator, CompletedCheckpoint}
import org.apache.flink.runtime.checkpoint._
import org.apache.flink.runtime.client.JobExecutionException
import org.apache.flink.runtime.io.network.partition.ResultPartitionType
import org.apache.flink.runtime.jobgraph.tasks.{CheckpointCoordinatorConfiguration, JobCheckpointingSettings}
Expand Down Expand Up @@ -857,7 +857,7 @@ class JobManagerITCase(_system: ActorSystem)

// Mock the checkpoint coordinator
val checkpointCoordinator = mock(classOf[CheckpointCoordinator])
doThrow(new Exception("Expected Test Exception"))
doThrow(new IllegalStateException("Expected Test Exception"))
.when(checkpointCoordinator)
.triggerSavepoint(org.mockito.Matchers.anyLong(), org.mockito.Matchers.anyString())

Expand All @@ -872,7 +872,7 @@ class JobManagerITCase(_system: ActorSystem)

// Verify the response
response.jobId should equal(jobGraph.getJobID())
response.cause.getCause.getClass should equal(classOf[Exception])
response.cause.getCause.getClass should equal(classOf[IllegalStateException])
response.cause.getCause.getMessage should equal("Expected Test Exception")
}
}
Expand Down Expand Up @@ -913,7 +913,7 @@ class JobManagerITCase(_system: ActorSystem)

// Mock the checkpoint coordinator
val checkpointCoordinator = mock(classOf[CheckpointCoordinator])
doThrow(new Exception("Expected Test Exception"))
doThrow(new IllegalStateException("Expected Test Exception"))
.when(checkpointCoordinator)
.triggerSavepoint(org.mockito.Matchers.anyLong(), org.mockito.Matchers.anyString())
val savepointPathPromise = new CompletableFuture[CompletedCheckpoint]()
Expand Down Expand Up @@ -982,7 +982,7 @@ class JobManagerITCase(_system: ActorSystem)

// Mock the checkpoint coordinator
val checkpointCoordinator = mock(classOf[CheckpointCoordinator])
doThrow(new Exception("Expected Test Exception"))
doThrow(new IllegalStateException("Expected Test Exception"))
.when(checkpointCoordinator)
.triggerSavepoint(org.mockito.Matchers.anyLong(), org.mockito.Matchers.anyString())

Expand Down

0 comments on commit 662ed3d

Please sign in to comment.