Skip to content

Commit

Permalink
[FLINK-25155] Support claim mode in rest api
Browse files Browse the repository at this point in the history
  • Loading branch information
dawidwys committed Dec 9, 2021
1 parent 49c4ed2 commit 7641c23
Show file tree
Hide file tree
Showing 7 changed files with 54 additions and 8 deletions.
10 changes: 10 additions & 0 deletions docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
Original file line number Diff line number Diff line change
Expand Up @@ -869,6 +869,10 @@
"type" : "string"
}
},
"restoreMode" : {
"type" : "string",
"enum" : [ "CLAIM", "LEGACY" ]
},
"savepointPath" : {
"type" : "string"
}
Expand Down Expand Up @@ -1028,6 +1032,9 @@
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:LogInfo",
"properties" : {
"mtime" : {
"type" : "integer"
},
"name" : {
"type" : "string"
},
Expand Down Expand Up @@ -5729,6 +5736,9 @@
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:LogInfo",
"properties" : {
"mtime" : {
"type" : "integer"
},
"name" : {
"type" : "string"
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ public void testLegacyRestoreModeParsing() throws Exception {

SavepointRestoreSettings savepointSettings = executionOptions.getSavepointRestoreSettings();
assertTrue(savepointSettings.restoreSavepoint());
assertEquals(RestoreMode.NO_CLAIM, savepointSettings.getRestoreMode());
assertEquals(RestoreMode.LEGACY, savepointSettings.getRestoreMode());
assertEquals("expectedSavepointPath", savepointSettings.getRestorePath());
assertTrue(savepointSettings.allowNonRestoredState());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.jobgraph.RestoreMode;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
Expand All @@ -40,6 +42,7 @@

import java.nio.file.Path;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
Expand Down Expand Up @@ -142,10 +145,14 @@ private SavepointRestoreSettings getSavepointRestoreSettings(
request, SavepointPathQueryParameter.class)),
null,
log);
final RestoreMode restoreMode =
Optional.ofNullable(requestBody.getRestoreMode())
.orElseGet(SavepointConfigOptions.RESTORE_MODE::defaultValue);
final SavepointRestoreSettings savepointRestoreSettings;
if (savepointPath != null) {
savepointRestoreSettings =
SavepointRestoreSettings.forPath(savepointPath, allowNonRestoredState);
SavepointRestoreSettings.forPath(
savepointPath, allowNonRestoredState, restoreMode);
} else {
savepointRestoreSettings = SavepointRestoreSettings.none();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.runtime.webmonitor.handlers;

import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobgraph.RestoreMode;
import org.apache.flink.runtime.rest.messages.RequestBody;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
Expand All @@ -35,6 +36,7 @@
public class JarRunRequestBody extends JarRequestBody {
private static final String FIELD_NAME_ALLOW_NON_RESTORED_STATE = "allowNonRestoredState";
private static final String FIELD_NAME_SAVEPOINT_PATH = "savepointPath";
private static final String FIELD_NAME_SAVEPOINT_RESTORE_MODE = "restoreMode";

@JsonProperty(FIELD_NAME_ALLOW_NON_RESTORED_STATE)
@Nullable
Expand All @@ -44,8 +46,12 @@ public class JarRunRequestBody extends JarRequestBody {
@Nullable
private String savepointPath;

@JsonProperty(FIELD_NAME_SAVEPOINT_RESTORE_MODE)
@Nullable
private RestoreMode restoreMode;

public JarRunRequestBody() {
this(null, null, null, null, null, null, null);
this(null, null, null, null, null, null, null, null);
}

@JsonCreator
Expand All @@ -58,10 +64,12 @@ public JarRunRequestBody(
@Nullable @JsonProperty(FIELD_NAME_JOB_ID) JobID jobId,
@Nullable @JsonProperty(FIELD_NAME_ALLOW_NON_RESTORED_STATE)
Boolean allowNonRestoredState,
@Nullable @JsonProperty(FIELD_NAME_SAVEPOINT_PATH) String savepointPath) {
@Nullable @JsonProperty(FIELD_NAME_SAVEPOINT_PATH) String savepointPath,
@Nullable @JsonProperty(FIELD_NAME_SAVEPOINT_RESTORE_MODE) RestoreMode restoreMode) {
super(entryClassName, programArguments, programArgumentsList, parallelism, jobId);
this.allowNonRestoredState = allowNonRestoredState;
this.savepointPath = savepointPath;
this.restoreMode = restoreMode;
}

@Nullable
Expand All @@ -75,4 +83,10 @@ public Boolean getAllowNonRestoredState() {
public String getSavepointPath() {
return savepointPath;
}

@Nullable
@JsonIgnore
public RestoreMode getRestoreMode() {
return restoreMode;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.RestoreMode;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.RestHandlerException;
Expand Down Expand Up @@ -182,12 +183,13 @@ JarRunRequestBody getJarRequestBody(ProgramArgsParType programArgsParType) {
PARALLELISM,
null,
ALLOW_NON_RESTORED_STATE_QUERY,
RESTORE_PATH);
RESTORE_PATH,
RestoreMode.CLAIM);
}

@Override
JarRunRequestBody getJarRequestBodyWithJobId(JobID jobId) {
return new JarRunRequestBody(null, null, null, null, jobId, null, null);
return new JarRunRequestBody(null, null, null, null, jobId, null, null, null);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.runtime.webmonitor.handlers;

import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobgraph.RestoreMode;
import org.apache.flink.runtime.rest.messages.RestRequestMarshallingTestBase;

import java.util.Arrays;
Expand All @@ -36,7 +37,14 @@ protected Class<JarRunRequestBody> getTestRequestClass() {
@Override
protected JarRunRequestBody getTestRequestInstance() {
return new JarRunRequestBody(
"hello", "world", Arrays.asList("boo", "far"), 4, new JobID(), true, "foo/bar");
"hello",
"world",
Arrays.asList("boo", "far"),
4,
new JobID(),
true,
"foo/bar",
RestoreMode.CLAIM);
}

@Override
Expand All @@ -49,5 +57,6 @@ protected void assertOriginalEqualsToUnmarshalled(
assertEquals(expected.getJobId(), actual.getJobId());
assertEquals(expected.getAllowNonRestoredState(), actual.getAllowNonRestoredState());
assertEquals(expected.getSavepointPath(), actual.getSavepointPath());
assertEquals(expected.getRestoreMode(), actual.getRestoreMode());
}
}
6 changes: 5 additions & 1 deletion flink-runtime-web/src/test/resources/rest_api_v1.snapshot
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,10 @@
},
"savepointPath" : {
"type" : "string"
},
"restoreMode" : {
"type" : "string",
"enum" : [ "CLAIM", "LEGACY" ]
}
}
},
Expand Down Expand Up @@ -3374,4 +3378,4 @@
}
}
} ]
}
}

0 comments on commit 7641c23

Please sign in to comment.