Skip to content

Commit

Permalink
[FLINK-8703][tests] Port SavepointMigrationTestBase to MiniClusterRes…
Browse files Browse the repository at this point in the history
…ource

This closes apache#5701.
  • Loading branch information
zentol authored and tillrohrmann committed Mar 20, 2018
1 parent 95d4c01 commit 0231460
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public static Collection<Tuple2<MigrationVersion, String>> parameters () {
private final MigrationVersion testMigrateVersion;
private final String testStateBackend;

public LegacyStatefulJobSavepointMigrationITCase(Tuple2<MigrationVersion, String> testMigrateVersionAndBackend) {
public LegacyStatefulJobSavepointMigrationITCase(Tuple2<MigrationVersion, String> testMigrateVersionAndBackend) throws Exception {
this.testMigrateVersion = testMigrateVersionAndBackend.f0;
this.testStateBackend = testMigrateVersionAndBackend.f1;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,25 +21,21 @@
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.client.program.StandaloneClusterClient;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.checkpoint.savepoint.SavepointSerializers;
import org.apache.flink.runtime.client.JobListeningContext;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.util.MiniClusterResource;
import org.apache.flink.test.util.TestBaseUtils;

import org.apache.commons.io.FileUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
Expand All @@ -49,34 +45,35 @@
import java.net.URI;
import java.net.URL;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import scala.Option;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;

import static junit.framework.Assert.fail;
import static org.junit.Assert.assertNotEquals;

/**
* Test savepoint migration.
*/
public class SavepointMigrationTestBase extends TestBaseUtils {
public abstract class SavepointMigrationTestBase extends TestBaseUtils {

@BeforeClass
public static void before() {
SavepointSerializers.setFailWhenLegacyStateDetected(false);
}

@ClassRule
public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();

@Rule
public TemporaryFolder tempFolder = new TemporaryFolder();
public final MiniClusterResource miniClusterResource;

private static final Logger LOG = LoggerFactory.getLogger(SavepointMigrationTestBase.class);
private static final Deadline DEADLINE = new FiniteDuration(5, TimeUnit.MINUTES).fromNow();
protected static final int DEFAULT_PARALLELISM = 4;
protected LocalFlinkMiniCluster cluster = null;

protected static String getResourceFilename(String filename) {
ClassLoader cl = SavepointMigrationTestBase.class.getClassLoader();
Expand All @@ -87,17 +84,25 @@ protected static String getResourceFilename(String filename) {
return resource.getFile();
}

@Before
public void setup() throws Exception {
protected SavepointMigrationTestBase() throws Exception {
miniClusterResource = new MiniClusterResource(
new MiniClusterResource.MiniClusterResourceConfiguration(
getConfiguration(),
1,
DEFAULT_PARALLELISM),
true);
}

private Configuration getConfiguration() throws Exception {
// Flink configuration
final Configuration config = new Configuration();

config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, DEFAULT_PARALLELISM);

final File checkpointDir = tempFolder.newFolder("checkpoints").getAbsoluteFile();
final File savepointDir = tempFolder.newFolder("savepoints").getAbsoluteFile();
UUID id = UUID.randomUUID();
final File checkpointDir = TEMP_FOLDER.newFolder("checkpoints_" + id).getAbsoluteFile();
final File savepointDir = TEMP_FOLDER.newFolder("savepoints_" + id).getAbsoluteFile();

if (!checkpointDir.exists() || !savepointDir.exists()) {
throw new Exception("Test setup failed: failed to create (temporary) directories.");
Expand All @@ -111,12 +116,7 @@ public void setup() throws Exception {
config.setInteger(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, 0);
config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir.toURI().toString());

cluster = TestBaseUtils.startCluster(config, false);
}

@After
public void teardown() throws Exception {
stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);
return config;
}

@SafeVarargs
Expand All @@ -125,22 +125,20 @@ protected final void executeAndSavepoint(
String savepointPath,
Tuple2<String, Integer>... expectedAccumulators) throws Exception {

// Retrieve the job manager
ActorGateway jobManager = Await.result(cluster.leaderGateway().future(), DEADLINE.timeLeft());
ClusterClient<?> client = miniClusterResource.getClusterClient();
client.setDetached(true);

// Submit the job
JobGraph jobGraph = env.getStreamGraph().getJobGraph();

JobSubmissionResult jobSubmissionResult = cluster.submitJobDetached(jobGraph);
JobSubmissionResult jobSubmissionResult = client.submitJob(jobGraph, SavepointMigrationTestBase.class.getClassLoader());

LOG.info("Submitted job {} and waiting...", jobSubmissionResult.getJobID());

StandaloneClusterClient clusterClient = new StandaloneClusterClient(cluster.configuration());

boolean done = false;
while (DEADLINE.hasTimeLeft()) {
Thread.sleep(100);
Map<String, Object> accumulators = clusterClient.getAccumulators(jobSubmissionResult.getJobID());
Map<String, Object> accumulators = client.getAccumulators(jobSubmissionResult.getJobID());

boolean allDone = true;
for (Tuple2<String, Integer> acc : expectedAccumulators) {
Expand All @@ -166,18 +164,9 @@ protected final void executeAndSavepoint(

LOG.info("Triggering savepoint.");

final Future<Object> savepointResultFuture =
jobManager.ask(new JobManagerMessages.TriggerSavepoint(jobSubmissionResult.getJobID(), Option.<String>empty()), DEADLINE.timeLeft());
CompletableFuture<String> savepointPathFuture = client.triggerSavepoint(jobSubmissionResult.getJobID(), null);

Object savepointResult = Await.result(savepointResultFuture, DEADLINE.timeLeft());

if (savepointResult instanceof JobManagerMessages.TriggerSavepointFailure) {
fail("Error drawing savepoint: " + ((JobManagerMessages.TriggerSavepointFailure) savepointResult).cause());
}

// jobmanager will store savepoint in heap, we have to retrieve it
final String jobmanagerSavepointPath = ((JobManagerMessages.TriggerSavepointSuccess) savepointResult).savepointPath();
LOG.info("Saved savepoint: " + jobmanagerSavepointPath);
String jobmanagerSavepointPath = savepointPathFuture.get(DEADLINE.timeLeft().toMillis(), TimeUnit.MILLISECONDS);

File jobManagerSavepoint = new File(new URI(jobmanagerSavepointPath).getPath());
// savepoints were changed to be directories in Flink 1.3
Expand All @@ -194,49 +183,35 @@ protected final void restoreAndExecute(
String savepointPath,
Tuple2<String, Integer>... expectedAccumulators) throws Exception {

// Retrieve the job manager
Await.result(cluster.leaderGateway().future(), DEADLINE.timeLeft());
ClusterClient<?> client = miniClusterResource.getClusterClient();
client.setDetached(true);

// Submit the job
JobGraph jobGraph = env.getStreamGraph().getJobGraph();

jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath));

JobSubmissionResult jobSubmissionResult = cluster.submitJobDetached(jobGraph);

StandaloneClusterClient clusterClient = new StandaloneClusterClient(cluster.configuration());
JobListeningContext jobListeningContext = clusterClient.connectToJob(jobSubmissionResult.getJobID());
JobSubmissionResult jobSubmissionResult = client.submitJob(jobGraph, SavepointMigrationTestBase.class.getClassLoader());

boolean done = false;
while (DEADLINE.hasTimeLeft()) {

// try and get a job result, this will fail if the job already failed. Use this
// to get out of this loop
JobID jobId = jobSubmissionResult.getJobID();
FiniteDuration timeout = FiniteDuration.apply(5, TimeUnit.SECONDS);

try {
CompletableFuture<JobStatus> jobStatusFuture = client.getJobStatus(jobSubmissionResult.getJobID());

Future<Object> future = clusterClient
.getJobManagerGateway()
.ask(JobManagerMessages.getRequestJobStatus(jobSubmissionResult.getJobID()), timeout);
JobStatus jobStatus = jobStatusFuture.get(5, TimeUnit.SECONDS);

Object result = Await.result(future, timeout);

if (result instanceof JobManagerMessages.CurrentJobStatus) {
if (((JobManagerMessages.CurrentJobStatus) result).status() == JobStatus.FAILED) {
Object jobResult = Await.result(
jobListeningContext.getJobResultFuture(),
Duration.apply(5, TimeUnit.SECONDS));
fail("Job failed: " + jobResult);
}
}
assertNotEquals(JobStatus.FAILED, jobStatus);
} catch (Exception e) {
fail("Could not connect to job: " + e);
}

Thread.sleep(100);
Map<String, Object> accumulators = clusterClient.getAccumulators(jobId);
Map<String, Object> accumulators = client.getAccumulators(jobId);

boolean allDone = true;
for (Tuple2<String, Integer> acc : expectedAccumulators) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public static Collection<Tuple2<MigrationVersion, String>> parameters () {
private final MigrationVersion testMigrateVersion;
private final String testStateBackend;

public StatefulJobSavepointMigrationITCase(Tuple2<MigrationVersion, String> testMigrateVersionAndBackend) {
public StatefulJobSavepointMigrationITCase(Tuple2<MigrationVersion, String> testMigrateVersionAndBackend) throws Exception {
this.testMigrateVersion = testMigrateVersionAndBackend.f0;
this.testStateBackend = testMigrateVersionAndBackend.f1;
}
Expand Down

0 comments on commit 0231460

Please sign in to comment.