Skip to content

Commit

Permalink
[Feature][ST-Engine] Support engine restart job information will not …
Browse files Browse the repository at this point in the history
…be lost (apache#3637)
  • Loading branch information
liugddx authored Dec 19, 2022
1 parent ac4e880 commit b1b46bf
Show file tree
Hide file tree
Showing 16 changed files with 334 additions and 109 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,12 @@
import org.apache.seatunnel.engine.server.SeaTunnelServerStarter;

import com.hazelcast.client.config.ClientConfig;
import com.hazelcast.config.Config;
import com.hazelcast.instance.impl.HazelcastInstanceImpl;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.testcontainers.shaded.org.apache.commons.lang3.tuple.ImmutablePair;

Expand Down Expand Up @@ -105,9 +105,7 @@ public void testBatchJobRunOkIn3Node() throws ExecutionException, InterruptedExc
engineClient.createExecutionContext(testResources.getRight(), jobConfig);
ClientJobProxy clientJobProxy = jobExecutionEnv.execute();

CompletableFuture<JobStatus> objectCompletableFuture = CompletableFuture.supplyAsync(() -> {
return clientJobProxy.waitForJobComplete();
});
CompletableFuture<JobStatus> objectCompletableFuture = CompletableFuture.supplyAsync(clientJobProxy::waitForJobComplete);

Awaitility.await().atMost(200000, TimeUnit.MILLISECONDS)
.untilAsserted(() -> {
Expand Down Expand Up @@ -213,9 +211,7 @@ public void testStreamJobRunOkIn3Node() throws ExecutionException, InterruptedEx
engineClient.createExecutionContext(testResources.getRight(), jobConfig);
ClientJobProxy clientJobProxy = jobExecutionEnv.execute();

CompletableFuture<JobStatus> objectCompletableFuture = CompletableFuture.supplyAsync(() -> {
return clientJobProxy.waitForJobComplete();
});
CompletableFuture<JobStatus> objectCompletableFuture = CompletableFuture.supplyAsync(clientJobProxy::waitForJobComplete);

Awaitility.await().atMost(3, TimeUnit.MINUTES)
.untilAsserted(() -> {
Expand Down Expand Up @@ -293,9 +289,7 @@ public void testBatchJobRestoreIn3NodeWorkerDown() throws ExecutionException, In
engineClient.createExecutionContext(testResources.getRight(), jobConfig);
ClientJobProxy clientJobProxy = jobExecutionEnv.execute();

CompletableFuture<JobStatus> objectCompletableFuture = CompletableFuture.supplyAsync(() -> {
return clientJobProxy.waitForJobComplete();
});
CompletableFuture<JobStatus> objectCompletableFuture = CompletableFuture.supplyAsync(clientJobProxy::waitForJobComplete);

Awaitility.await().atMost(60000, TimeUnit.MILLISECONDS)
.untilAsserted(() -> {
Expand Down Expand Up @@ -375,9 +369,7 @@ public void testStreamJobRestoreIn3NodeWorkerDown() throws ExecutionException, I
engineClient.createExecutionContext(testResources.getRight(), jobConfig);
ClientJobProxy clientJobProxy = jobExecutionEnv.execute();

CompletableFuture<JobStatus> objectCompletableFuture = CompletableFuture.supplyAsync(() -> {
return clientJobProxy.waitForJobComplete();
});
CompletableFuture<JobStatus> objectCompletableFuture = CompletableFuture.supplyAsync(clientJobProxy::waitForJobComplete);

Awaitility.await().atMost(60000, TimeUnit.MILLISECONDS)
.untilAsserted(() -> {
Expand Down Expand Up @@ -472,9 +464,7 @@ public void testBatchJobRestoreIn3NodeMasterDown() throws ExecutionException, In
engineClient.createExecutionContext(testResources.getRight(), jobConfig);
ClientJobProxy clientJobProxy = jobExecutionEnv.execute();

CompletableFuture<JobStatus> objectCompletableFuture = CompletableFuture.supplyAsync(() -> {
return clientJobProxy.waitForJobComplete();
});
CompletableFuture<JobStatus> objectCompletableFuture = CompletableFuture.supplyAsync(clientJobProxy::waitForJobComplete);

Awaitility.await().atMost(60000, TimeUnit.MILLISECONDS)
.untilAsserted(() -> {
Expand Down Expand Up @@ -554,9 +544,7 @@ public void testStreamJobRestoreIn3NodeMasterDown() throws ExecutionException, I
engineClient.createExecutionContext(testResources.getRight(), jobConfig);
ClientJobProxy clientJobProxy = jobExecutionEnv.execute();

CompletableFuture<JobStatus> objectCompletableFuture = CompletableFuture.supplyAsync(() -> {
return clientJobProxy.waitForJobComplete();
});
CompletableFuture<JobStatus> objectCompletableFuture = CompletableFuture.supplyAsync(clientJobProxy::waitForJobComplete);

Awaitility.await().atMost(60000, TimeUnit.MILLISECONDS)
.untilAsserted(() -> {
Expand Down Expand Up @@ -612,58 +600,106 @@ public void testStreamJobRestoreIn3NodeMasterDown() throws ExecutionException, I

@SuppressWarnings("checkstyle:RegexpSingleline")
@Test
@Disabled("Wait for open Imap storage")
public void testStreamJobRestoreInAllNodeDown() throws ExecutionException, InterruptedException {
String testCaseName = "testStreamJobRestoreInAllNodeDown";
String testClusterName = "ClusterFaultToleranceIT_testStreamJobRestoreInAllNodeDown";
long testRowNumber = 1000;
int testRowNumber = 1000;
int testParallelism = 6;
HazelcastInstanceImpl node1 = null;
HazelcastInstanceImpl node2 = null;
HazelcastInstanceImpl node3 = null;
SeaTunnelClient engineClient = null;

try {
node1 = SeaTunnelServerStarter.createHazelcastInstance(
TestUtils.getClusterName(testClusterName));
String yaml = "#\n" +
"# Licensed to the Apache Software Foundation (ASF) under one or more\n" +
"# contributor license agreements. See the NOTICE file distributed with\n" +
"# this work for additional information regarding copyright ownership.\n" +
"# The ASF licenses this file to You under the Apache License, Version 2.0\n" +
"# (the \"License\"); you may not use this file except in compliance with\n" +
"# the License. You may obtain a copy of the License at\n" +
"#\n" +
"# http://www.apache.org/licenses/LICENSE-2.0\n" +
"#\n" +
"# Unless required by applicable law or agreed to in writing, software\n" +
"# distributed under the License is distributed on an \"AS IS\" BASIS,\n" +
"# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n" +
"# See the License for the specific language governing permissions and\n" +
"# limitations under the License.\n" +
"#\n" +
"\n" +
"hazelcast:\n" +
" cluster-name: seatunnel\n" +
" network:\n" +
" rest-api:\n" +
" enabled: true\n" +
" endpoint-groups:\n" +
" CLUSTER_WRITE:\n" +
" enabled: true\n" +
" join:\n" +
" tcp-ip:\n" +
" enabled: true\n" +
" member-list:\n" +
" - localhost\n" +
" port:\n" +
" auto-increment: true\n" +
" port-count: 100\n" +
" port: 5801\n" +
" map:\n" +
" engine*:\n" +
" map-store:\n" +
" enabled: true\n" +
" initial-mode: EAGER\n" +
" factory-class-name: org.apache.seatunnel.engine.server.persistence.FileMapStoreFactory\n" +
" properties:\n" +
" namespace: /tmp/seatunnel/imap\n" +
" clusterName: seatunnel-clsuter\n" +
" fs.defaultFS: file:///\n" +
"\n" +
" properties:\n" +
" hazelcast.invocation.max.retry.count: 20\n" +
" hazelcast.tcp.join.port.try.count: 30\n" +
" hazelcast.logging.type: log4j2\n";

Config hazelcastConfig = Config.loadFromString(yaml);
hazelcastConfig.setClusterName(TestUtils.getClusterName(testClusterName));
SeaTunnelConfig seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig();
seaTunnelConfig.setHazelcastConfig(hazelcastConfig);
node1 = SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);

node2 = SeaTunnelServerStarter.createHazelcastInstance(
TestUtils.getClusterName(testClusterName));
node2 = SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);

node3 = SeaTunnelServerStarter.createHazelcastInstance(
TestUtils.getClusterName(testClusterName));
node3 = SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);

// waiting all node added to cluster
HazelcastInstanceImpl finalNode = node1;
Awaitility.await().atMost(10000, TimeUnit.MILLISECONDS)
.untilAsserted(() -> Assertions.assertEquals(3, finalNode.getCluster().getMembers().size()));
.untilAsserted(() -> Assertions.assertEquals(3, finalNode.getCluster().getMembers().size()));

Common.setDeployMode(DeployMode.CLIENT);
ImmutablePair<String, String> testResources =
createTestResources(testCaseName, JobMode.STREAMING, testRowNumber, testParallelism);
createTestResources(testCaseName, JobMode.STREAMING, testRowNumber, testParallelism);
JobConfig jobConfig = new JobConfig();
jobConfig.setName(testCaseName);

ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
clientConfig.setClusterName(
TestUtils.getClusterName(testClusterName));
TestUtils.getClusterName(testClusterName));
engineClient = new SeaTunnelClient(clientConfig);
JobExecutionEnvironment jobExecutionEnv =
engineClient.createExecutionContext(testResources.getRight(), jobConfig);
engineClient.createExecutionContext(testResources.getRight(), jobConfig);
ClientJobProxy clientJobProxy = jobExecutionEnv.execute();

CompletableFuture<JobStatus> objectCompletableFuture = CompletableFuture.supplyAsync(() -> {
return clientJobProxy.waitForJobComplete();
});
CompletableFuture<JobStatus> objectCompletableFuture = CompletableFuture.supplyAsync(clientJobProxy::waitForJobComplete);

Awaitility.await().atMost(60000, TimeUnit.MILLISECONDS)
.untilAsserted(() -> {
// Wait some tasks commit finished, and we can get rows from the sink target dir
Thread.sleep(2000);
System.out.println(FileUtils.getFileLineNumberFromDir(testResources.getLeft()));
Assertions.assertTrue(JobStatus.RUNNING.equals(clientJobProxy.getJobStatus()) &&
FileUtils.getFileLineNumberFromDir(testResources.getLeft()) > 1);
});
.untilAsserted(() -> {
// Wait some tasks commit finished, and we can get rows from the sink target dir
Thread.sleep(2000);
System.out.println(FileUtils.getFileLineNumberFromDir(testResources.getLeft()));
Assertions.assertTrue(JobStatus.RUNNING.equals(clientJobProxy.getJobStatus()) &&
FileUtils.getFileLineNumberFromDir(testResources.getLeft()) > 1);
});

Thread.sleep(5000);
// shutdown all node
Expand All @@ -673,38 +709,35 @@ public void testStreamJobRestoreInAllNodeDown() throws ExecutionException, Inter

Thread.sleep(10000);

node1 = SeaTunnelServerStarter.createHazelcastInstance(
TestUtils.getClusterName(testClusterName));
node1 = SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);

node2 = SeaTunnelServerStarter.createHazelcastInstance(
TestUtils.getClusterName(testClusterName));
node2 = SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);

node3 = SeaTunnelServerStarter.createHazelcastInstance(
TestUtils.getClusterName(testClusterName));
node3 = SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);

// waiting all node added to cluster
HazelcastInstanceImpl restoreFinalNode = node1;
Awaitility.await().atMost(10000, TimeUnit.MILLISECONDS)
.untilAsserted(() -> Assertions.assertEquals(3, restoreFinalNode.getCluster().getMembers().size()));
.untilAsserted(() -> Assertions.assertEquals(3, restoreFinalNode.getCluster().getMembers().size()));

Awaitility.await().atMost(360000, TimeUnit.MILLISECONDS)
.untilAsserted(() -> {
// Wait job write all rows in file
Thread.sleep(2000);
System.out.println(FileUtils.getFileLineNumberFromDir(testResources.getLeft()));
Assertions.assertTrue(JobStatus.RUNNING.equals(clientJobProxy.getJobStatus()) &&
testRowNumber * testParallelism == FileUtils.getFileLineNumberFromDir(testResources.getLeft()));
});
.untilAsserted(() -> {
// Wait job write all rows in file
Thread.sleep(2000);
System.out.println(FileUtils.getFileLineNumberFromDir(testResources.getLeft()));
Assertions.assertTrue(JobStatus.RUNNING.equals(clientJobProxy.getJobStatus()) &&
testRowNumber * testParallelism == FileUtils.getFileLineNumberFromDir(testResources.getLeft()));
});

// sleep 10s and expect the job don't write more rows.
Thread.sleep(10000);
clientJobProxy.cancelJob();

Awaitility.await().atMost(20000, TimeUnit.MILLISECONDS)
.untilAsserted(() -> Assertions.assertTrue(
objectCompletableFuture.isDone() && JobStatus.CANCELED.equals(objectCompletableFuture.get())));
.untilAsserted(() -> Assertions.assertTrue(
objectCompletableFuture.isDone() && JobStatus.CANCELED.equals(objectCompletableFuture.get())));

// check the final rows
// prove that the task was restarted
Long fileLineNumberFromDir = FileUtils.getFileLineNumberFromDir(testResources.getLeft());
Assertions.assertEquals(testRowNumber * testParallelism, fileLineNumberFromDir);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ private void submitJob() {
*
* @return The job final status
*/
@Override
public JobStatus waitForJobComplete() {
JobStatus jobStatus;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,19 +40,21 @@ public class Constant {

public static final int OPERATION_RETRY_SLEEP = 2000;

public static final String IMAP_RUNNING_JOB_INFO = "runningJobInfo";
public static final String IMAP_RUNNING_JOB_INFO = "engine_runningJobInfo";

public static final String IMAP_RUNNING_JOB_STATE = "runningJobState";
public static final String IMAP_RUNNING_JOB_STATE = "engine_runningJobState";

public static final String IMAP_FINISHED_JOB_STATE = "finishedJobState";
public static final String IMAP_FINISHED_JOB_STATE = "engine_finishedJobState";

public static final String IMAP_FINISHED_JOB_METRICS = "finishedJobMetrics";
public static final String IMAP_FINISHED_JOB_METRICS = "engine_finishedJobMetrics";

public static final String IMAP_FINISHED_JOB_VERTEX_INFO = "finishedJobVertexInfo";
public static final String IMAP_FINISHED_JOB_VERTEX_INFO = "engine_finishedJobVertexInfo";

public static final String IMAP_STATE_TIMESTAMPS = "stateTimestamps";
public static final String IMAP_STATE_TIMESTAMPS = "engine_stateTimestamps";

public static final String IMAP_OWNED_SLOT_PROFILES = "ownedSlotProfilesIMap";
public static final String IMAP_OWNED_SLOT_PROFILES = "engine_ownedSlotProfilesIMap";

public static final String IMAP_CHECKPOINT_ID = "engine_checkpoint-id-%d";

public static final String IMAP_RESOURCE_MANAGER_REGISTER_WORKER = "ResourceManager_RegisterWorker";
}
19 changes: 10 additions & 9 deletions seatunnel-engine/seatunnel-engine-server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,17 @@
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>checkpoint-storage-api</artifactId>
<artifactId>checkpoint-storage-local-file</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>checkpoint-storage-local-file</artifactId>
<artifactId>checkpoint-storage-hdfs</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>checkpoint-storage-hdfs</artifactId>
<artifactId>imap-storage-file</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
Expand All @@ -57,6 +57,11 @@
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-hadoop-2</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
Expand All @@ -81,11 +86,7 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-hadoop-2</artifactId>
<scope>test</scope>
</dependency>

</dependencies>

</project>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ public CoordinatorService getCoordinatorService() {
int retryCount = 0;
if (isMasterNode()) {
// TODO the retry count and sleep time need configurable
while (!coordinatorService.isCoordinatorActive() && retryCount < 20 && isRunning) {
while (!coordinatorService.isCoordinatorActive() && retryCount < 120 && isRunning) {
try {
LOGGER.warning("This is master node, waiting the coordinator service init finished");
Thread.sleep(1000);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.seatunnel.engine.server.checkpoint;

import static org.apache.seatunnel.engine.common.Constant.IMAP_CHECKPOINT_ID;

import org.apache.seatunnel.api.table.factory.FactoryUtil;
import org.apache.seatunnel.engine.checkpoint.storage.PipelineState;
import org.apache.seatunnel.engine.checkpoint.storage.api.CheckpointStorage;
Expand Down Expand Up @@ -88,7 +90,7 @@ public CheckpointManager(long jobId,
checkpointConfig.getStorage().getStorage())
.create(checkpointConfig.getStorage().getStoragePluginConfig());
IMap<Integer, Long> checkpointIdMap =
nodeEngine.getHazelcastInstance().getMap(String.format("checkpoint-id-%d", jobId));
nodeEngine.getHazelcastInstance().getMap(String.format(IMAP_CHECKPOINT_ID, jobId));
this.coordinatorMap = checkpointPlanMap.values().parallelStream()
.map(plan -> {
IMapCheckpointIDCounter idCounter = new IMapCheckpointIDCounter(plan.getPipelineId(), checkpointIdMap);
Expand Down
Loading

0 comments on commit b1b46bf

Please sign in to comment.