Skip to content

Commit

Permalink
[FLINK-7715][flip6] Implement JarRunHandler
Browse files Browse the repository at this point in the history
This closes apache#5509.
  • Loading branch information
GJL authored and tillrohrmann committed Feb 19, 2018
1 parent 2007338 commit ab8e9bd
Show file tree
Hide file tree
Showing 23 changed files with 979 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1126,7 +1126,7 @@ public static List<CustomCommandLine<?>> loadCustomCommandLines(Configuration co
* @return JobGraph extracted from the PackagedProgram
* @throws ProgramInvocationException if the JobGraph generation failed
*/
private static JobGraph createJobGraph(Configuration configuration, PackagedProgram packagedProgram, int defaultParallelism) throws ProgramInvocationException {
public static JobGraph createJobGraph(Configuration configuration, PackagedProgram packagedProgram, int defaultParallelism) throws ProgramInvocationException {
Thread.currentThread().setContextClassLoader(packagedProgram.getUserCodeClassLoader());
final Optimizer optimizer = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), configuration);
final FlinkPlan flinkPlan;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ public PackagedProgram(File jarFile, List<URL> classpaths, String... args) throw
* This invocation is thrown if the Program can't be properly loaded. Causes
* may be a missing / wrong class or manifest files.
*/
public PackagedProgram(File jarFile, String entryPointClassName, String... args) throws ProgramInvocationException {
public PackagedProgram(File jarFile, @Nullable String entryPointClassName, String... args) throws ProgramInvocationException {
this(jarFile, Collections.<URL>emptyList(), entryPointClassName, args);
}

Expand All @@ -168,7 +168,7 @@ public PackagedProgram(File jarFile, String entryPointClassName, String... args)
* This invocation is thrown if the Program can't be properly loaded. Causes
* may be a missing / wrong class or manifest files.
*/
public PackagedProgram(File jarFile, List<URL> classpaths, String entryPointClassName, String... args) throws ProgramInvocationException {
public PackagedProgram(File jarFile, List<URL> classpaths, @Nullable String entryPointClassName, String... args) throws ProgramInvocationException {
if (jarFile == null) {
throw new IllegalArgumentException("The jar file must not be null.");
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.webmonitor.handlers;

import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.rest.messages.MessageQueryParameter;

/**
* Query parameter that specifies whether non restored state is allowed if the savepoint
* contains state for an operator that is not part of the job.
*
* @see SavepointRestoreSettings#allowNonRestoredState()
*/
public class AllowNonRestoredStateQueryParameter extends MessageQueryParameter<Boolean> {

protected AllowNonRestoredStateQueryParameter() {
super("allowNonRestoredState", MessageParameterRequisiteness.OPTIONAL);
}

@Override
public Boolean convertValueFromString(final String value) {
return Boolean.valueOf(value);
}

@Override
public String convertStringToValue(final Boolean value) {
return value.toString();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.webmonitor.handlers;

import java.io.File;

/**
* Query parameter specifying the name of the entry point class.
* @see org.apache.flink.client.program.PackagedProgram#PackagedProgram(File, String, String...)
*/
public class EntryClassQueryParameter extends StringQueryParameter {
public EntryClassQueryParameter() {
super("entry-class", MessageParameterRequisiteness.OPTIONAL);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.webmonitor.handlers;

import org.apache.flink.runtime.rest.messages.ConversionException;
import org.apache.flink.runtime.rest.messages.MessagePathParameter;

/**
* Path parameter to identify uploaded jar files.
*/
public class JarIdPathParameter extends MessagePathParameter<String> {

public static final String KEY = "jarid";

protected JarIdPathParameter() {
super(KEY);
}

@Override
protected String convertFromString(final String value) throws ConversionException {
return value;
}

@Override
protected String convertToString(final String value) {
return value;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,229 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.webmonitor.handlers;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.client.cli.CliFrontend;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.RestHandlerException;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;

import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import static java.util.Objects.requireNonNull;
import static org.apache.flink.shaded.guava18.com.google.common.base.Strings.emptyToNull;

/**
* Handler to submit jobs uploaded via the Web UI.
*/
public class JarRunHandler extends
AbstractRestHandler<DispatcherGateway, EmptyRequestBody, JarRunResponseBody, JarRunMessageParameters> {

private static final Pattern ARGUMENTS_TOKENIZE_PATTERN = Pattern.compile("([^\"\']\\S*|\".+?\"|\'.+?\')\\s*");

private final Path jarDir;

private final Configuration configuration;

private final Executor executor;

private final RestClusterClient<?> restClusterClient;

public JarRunHandler(
final CompletableFuture<String> localRestAddress,
final GatewayRetriever<? extends DispatcherGateway> leaderRetriever,
final Time timeout,
final Map<String, String> responseHeaders,
final MessageHeaders<EmptyRequestBody, JarRunResponseBody, JarRunMessageParameters> messageHeaders,
final Path jarDir,
final Configuration configuration,
final Executor executor) {
super(localRestAddress, leaderRetriever, timeout, responseHeaders, messageHeaders);

this.jarDir = requireNonNull(jarDir);
this.configuration = requireNonNull(configuration);
this.executor = requireNonNull(executor);
try {
this.restClusterClient = new RestClusterClient<>(configuration, "Unknown cluster id");
} catch (Exception e) {
throw new RuntimeException(e);
}
}

@Override
protected CompletableFuture<JarRunResponseBody> handleRequest(
@Nonnull final HandlerRequest<EmptyRequestBody, JarRunMessageParameters> request,
@Nonnull final DispatcherGateway gateway) throws RestHandlerException {

final String pathParameter = request.getPathParameter(JarIdPathParameter.class);
final Path jarFile = jarDir.resolve(pathParameter);

final String entryClass = emptyToNull(getQueryParameter(request, EntryClassQueryParameter.class));
final List<String> programArgs = tokenizeArguments(getQueryParameter(request, ProgramArgsQueryParameter.class));
final int parallelism = getQueryParameter(request, ParallelismQueryParameter.class, -1);
final SavepointRestoreSettings savepointRestoreSettings = getSavepointRestoreSettings(request);

final CompletableFuture<JobGraph> jobGraphFuture = getJobGraphAsync(
jarFile,
entryClass,
programArgs,
savepointRestoreSettings,
parallelism);

return jobGraphFuture.thenCompose(jobGraph -> restClusterClient
.submitJob(jobGraph)
.thenApply((jobSubmitResponseBody -> new JarRunResponseBody(jobGraph.getJobID()))))
.exceptionally(throwable -> {
throw new CompletionException(new RestHandlerException(
throwable.getMessage(),
HttpResponseStatus.INTERNAL_SERVER_ERROR,
throwable));
});
}

private static SavepointRestoreSettings getSavepointRestoreSettings(
final @Nonnull HandlerRequest<EmptyRequestBody, JarRunMessageParameters> request)
throws RestHandlerException {

final boolean allowNonRestoredState = getQueryParameter(request, AllowNonRestoredStateQueryParameter.class, false);
final String savepointPath = getQueryParameter(request, SavepointPathQueryParameter.class);
final SavepointRestoreSettings savepointRestoreSettings;
if (savepointPath != null) {
savepointRestoreSettings = SavepointRestoreSettings.forPath(
savepointPath,
allowNonRestoredState);
} else {
savepointRestoreSettings = SavepointRestoreSettings.none();
}
return savepointRestoreSettings;
}

private CompletableFuture<JobGraph> getJobGraphAsync(
final Path jarFile,
@Nullable final String entryClass,
final List<String> programArgs,
final SavepointRestoreSettings savepointRestoreSettings,
final int parallelism) {

return CompletableFuture.supplyAsync(() -> {
if (!Files.exists(jarFile)) {
throw new CompletionException(new RestHandlerException(
String.format("Jar file %s does not exist", jarFile), HttpResponseStatus.BAD_REQUEST));
}

final JobGraph jobGraph;
try {
final PackagedProgram packagedProgram = new PackagedProgram(
jarFile.toFile(),
entryClass,
programArgs.toArray(new String[programArgs.size()]));
jobGraph = CliFrontend.createJobGraph(configuration, packagedProgram, parallelism);
} catch (final ProgramInvocationException e) {
throw new CompletionException(e);
}
jobGraph.setSavepointRestoreSettings(savepointRestoreSettings);
return jobGraph;
}, executor);
}

/**
* Takes program arguments as a single string, and splits them into a list of string.
*
* <pre>
* tokenizeArguments("--foo bar") = ["--foo" "bar"]
* tokenizeArguments("--foo \"bar baz\"") = ["--foo" "bar baz"]
* tokenizeArguments("--foo 'bar baz'") = ["--foo" "bar baz"]
* </pre>
*
* <strong>WARNING: </strong>This method does not respect escaped quotes.
*/
@VisibleForTesting
static List<String> tokenizeArguments(@Nullable final String args) {
if (args == null) {
return Collections.emptyList();
}
final Matcher matcher = ARGUMENTS_TOKENIZE_PATTERN.matcher(args);
final List<String> tokens = new ArrayList<>();
while (matcher.find()) {
tokens.add(matcher.group()
.trim()
.replace("\"", "")
.replace("\'", ""));
}
return tokens;
}

/**
* Returns the value of a query parameter, or {@code null} if the query parameter is not set.
* @throws RestHandlerException If the query parameter is repeated.
*/
@VisibleForTesting
static <X, P extends MessageQueryParameter<X>> X getQueryParameter(
final HandlerRequest<EmptyRequestBody, JarRunMessageParameters> request,
final Class<P> queryParameterClass) throws RestHandlerException {
return getQueryParameter(request, queryParameterClass, null);
}

@VisibleForTesting
static <X, P extends MessageQueryParameter<X>> X getQueryParameter(
final HandlerRequest<EmptyRequestBody, JarRunMessageParameters> request,
final Class<P> queryParameterClass,
final X defaultValue) throws RestHandlerException {

final List<X> values = request.getQueryParameter(queryParameterClass);
final X value;
if (values.size() > 1) {
throw new RestHandlerException(
String.format("Expected only one value %s.", values),
HttpResponseStatus.BAD_REQUEST);
} else if (values.size() == 1) {
value = values.get(0);
} else {
value = defaultValue;
}
return value;
}
}
Loading

0 comments on commit ab8e9bd

Please sign in to comment.