Skip to content

Commit

Permalink
[FLINK-25206][clients] Make prevention of configurations in user jars…
Browse files Browse the repository at this point in the history
… configurable

Add a configuration option to disable configuration in user jars. If a user
jar still configures the job, the submission fails immediately, before the job is created.
  • Loading branch information
fapaul committed Feb 3, 2022
1 parent 2317ab4 commit 4953732
Show file tree
Hide file tree
Showing 9 changed files with 400 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,12 @@
</tr>
</thead>
<tbody>
<tr>
<td><h5>execution.allow-client-job-configurations</h5></td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>Determines whether configurations in the user program are allowed when running with Application mode. Has no effect for other deployment modes.</td>
</tr>
<tr>
<td><h5>execution.attached</h5></td>
<td style="word-wrap: break-word;">false</td>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.client.program;

import org.apache.flink.annotation.Internal;

import java.util.Collection;

/**
 * Signals that the user jar configured the job although
 * {@link org.apache.flink.configuration.DeploymentOptions#ALLOW_CLIENT_JOB_CONFIGURATIONS} is
 * disabled.
 */
@Internal
public class MutatedConfigurationException extends Exception {

    /** Serial version UID for serialization interoperability. */
    private static final long serialVersionUID = -2417524218857151612L;

    /**
     * Creates the exception from the individual violation descriptions.
     *
     * @param errorMessages one description per violation; they form the exception message, each
     *     on its own line
     */
    public MutatedConfigurationException(Collection<String> errorMessages) {
        super(toMessage(errorMessages));
    }

    /** Joins the given lines with a newline character between consecutive entries. */
    private static String toMessage(Collection<String> lines) {
        return String.join("\n", lines);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.flink.client.program;

import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.java.ExecutionEnvironment;
Expand All @@ -26,16 +27,28 @@
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.JobListener;
import org.apache.flink.core.execution.PipelineExecutorServiceLoader;
import org.apache.flink.runtime.dispatcher.ConfigurationNotAllowedMessage;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentFactory;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.ShutdownHookUtil;

import org.apache.flink.shaded.guava30.com.google.common.collect.MapDifference;
import org.apache.flink.shaded.guava30.com.google.common.collect.Maps;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
Expand All @@ -55,24 +68,55 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment {
private final boolean suppressSysout;

private final boolean enforceSingleJobExecution;
/** Java-serialized form of {@code checkpointCfg}, captured once in the constructor. */
private final byte[] originalCheckpointConfigSerialized;
/** Java-serialized form of {@code config}, captured once in the constructor. */
private final byte[] originalExecutionConfigSerialized;
/** Copy of the configuration that was handed to the constructor. */
private final Configuration originalConfiguration;

/** Initialized to 0 in the constructor. */
private int jobCounter;

/**
 * Violation messages supplied at construction time; they are reported through a {@link
 * MutatedConfigurationException} by {@code checkNotAllowedConfigurations}.
 */
private final Collection<String> errorMessages;

/** When {@code true}, {@code collectNotAllowedConfigurations} reports no violations at all. */
private final boolean allowConfigurations;

/**
* Creates an environment that allows configurations in the user program.
*
* <p>Delegates to the seven-argument constructor with {@code allowConfigurations = true} and an
* empty list of error messages.
*/
public StreamContextEnvironment(
final PipelineExecutorServiceLoader executorServiceLoader,
final Configuration configuration,
final ClassLoader userCodeClassLoader,
final boolean enforceSingleJobExecution,
final boolean suppressSysout) {
this(
executorServiceLoader,
configuration,
userCodeClassLoader,
enforceSingleJobExecution,
suppressSysout,
true,
// Collections.emptyList() is immutable, so this list must never be appended to.
Collections.emptyList());
}

/**
 * Creates an environment and snapshots its initial configuration state so that later changes
 * made by the user program can be detected.
 *
 * @param executorServiceLoader passed through to the parent environment
 * @param configuration passed through to the parent environment; a copy is kept as the baseline
 * @param userCodeClassLoader passed through to the parent environment
 * @param enforceSingleJobExecution stored as given
 * @param suppressSysout stored as given
 * @param allowConfigurations whether configurations in the user program are allowed
 * @param errorMessages violation messages already known when the environment is created
 */
@Internal
public StreamContextEnvironment(
        final PipelineExecutorServiceLoader executorServiceLoader,
        final Configuration configuration,
        final ClassLoader userCodeClassLoader,
        final boolean enforceSingleJobExecution,
        final boolean suppressSysout,
        final boolean allowConfigurations,
        final Collection<String> errorMessages) {
    super(executorServiceLoader, configuration, userCodeClassLoader);

    // Plain settings taken over from the caller.
    this.enforceSingleJobExecution = enforceSingleJobExecution;
    this.suppressSysout = suppressSysout;
    this.allowConfigurations = allowConfigurations;
    this.errorMessages = errorMessages;
    this.jobCounter = 0;

    // Baseline snapshots; collectNotAllowedConfigurations() compares the current state
    // against these.
    this.originalCheckpointConfigSerialized = serializeConfig(checkpointCfg);
    this.originalExecutionConfigSerialized = serializeConfig(config);
    this.originalConfiguration = new Configuration(configuration);
}

@Override
public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
checkNotAllowedConfigurations();
final JobClient jobClient = executeAsync(streamGraph);
final List<JobListener> jobListeners = getJobListeners();

Expand All @@ -93,6 +137,13 @@ public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
}
}

/**
 * Fails if any configuration violation is known for this environment.
 *
 * <p>The messages handed in at construction time are combined with the ones found by {@link
 * #collectNotAllowedConfigurations()}. The combined list is built locally instead of appending
 * to {@code errorMessages}: that collection may be immutable (the public constructor passes
 * {@code Collections.emptyList()}), and appending on every call would report the same violation
 * several times when the check runs more than once.
 *
 * @throws MutatedConfigurationException if at least one violation exists
 */
private void checkNotAllowedConfigurations() throws MutatedConfigurationException {
    final List<String> violations = new ArrayList<>(errorMessages);
    violations.addAll(collectNotAllowedConfigurations());
    if (!violations.isEmpty()) {
        throw new MutatedConfigurationException(violations);
    }
}

private JobExecutionResult getJobExecutionResult(final JobClient jobClient) throws Exception {
checkNotNull(jobClient);

Expand Down Expand Up @@ -161,6 +212,18 @@ public static void setAsContext(
final boolean suppressSysout) {
StreamExecutionEnvironmentFactory factory =
conf -> {
final List<String> errors = new ArrayList<>();
final boolean allowConfigurations =
configuration.getBoolean(
DeploymentOptions.ALLOW_CLIENT_JOB_CONFIGURATIONS);
if (!allowConfigurations && !conf.toMap().isEmpty()) {
conf.toMap()
.forEach(
(k, v) ->
errors.add(
ConfigurationNotAllowedMessage
.ofConfigurationKeyAndValue(k, v)));
}
Configuration mergedConfiguration = new Configuration();
mergedConfiguration.addAll(configuration);
mergedConfiguration.addAll(conf);
Expand All @@ -169,12 +232,65 @@ public static void setAsContext(
mergedConfiguration,
userCodeClassLoader,
enforceSingleJobExecution,
suppressSysout);
suppressSysout,
allowConfigurations,
errors);
};
initializeContextEnvironment(factory);
}

/** Undoes {@code setAsContext} by delegating to {@code resetContextEnvironment()}. */
public static void unsetAsContext() {
resetContextEnvironment();
}

/**
 * Compares the current configuration state with the snapshots taken in the constructor.
 *
 * <p>Reported, in this order: options that were added, options that were removed, options whose
 * value changed, a changed checkpoint config object and a changed execution config object. The
 * two config objects are compared by their Java-serialized form.
 *
 * @return one message per detected change; always empty when configurations are allowed
 */
private List<String> collectNotAllowedConfigurations() {
    final List<String> violations = new ArrayList<>();

    if (!allowConfigurations) {
        final MapDifference<String, String> optionDiff =
                Maps.difference(originalConfiguration.toMap(), configuration.toMap());

        // Options present now but absent from the baseline.
        optionDiff
                .entriesOnlyOnRight()
                .forEach(
                        (key, value) ->
                                violations.add(
                                        ConfigurationNotAllowedMessage
                                                .ofConfigurationKeyAndValue(key, value)));
        // Options present in the baseline but gone now.
        optionDiff
                .entriesOnlyOnLeft()
                .forEach(
                        (key, value) ->
                                violations.add(
                                        ConfigurationNotAllowedMessage.ofConfigurationRemoved(
                                                key, value)));
        // Options present on both sides with different values.
        optionDiff
                .entriesDiffering()
                .forEach(
                        (key, valueDifference) ->
                                violations.add(
                                        ConfigurationNotAllowedMessage.ofConfigurationChange(
                                                key, valueDifference)));

        final boolean checkpointConfigUnchanged =
                Arrays.equals(
                        originalCheckpointConfigSerialized, serializeConfig(checkpointCfg));
        if (!checkpointConfigUnchanged) {
            violations.add(
                    ConfigurationNotAllowedMessage.ofConfigurationObject(
                            checkpointCfg.getClass().getSimpleName()));
        }

        final boolean executionConfigUnchanged =
                Arrays.equals(originalExecutionConfigSerialized, serializeConfig(config));
        if (!executionConfigUnchanged) {
            violations.add(
                    ConfigurationNotAllowedMessage.ofConfigurationObject(
                            config.getClass().getSimpleName()));
        }
    }

    return violations;
}

/**
 * Turns the given object into its Java-serialized byte representation.
 *
 * @param config the object to serialize
 * @return the bytes written by an {@link ObjectOutputStream} for {@code config}
 * @throws FlinkRuntimeException if writing the object fails with an {@link IOException}
 */
private static byte[] serializeConfig(Serializable config) {
    final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    try (ObjectOutputStream objectStream = new ObjectOutputStream(buffer)) {
        objectStream.writeObject(config);
        // Push everything into the buffer before reading the bytes back.
        objectStream.flush();
        return buffer.toByteArray();
    } catch (IOException e) {
        throw new FlinkRuntimeException("Cannot serialize configuration.", e);
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.flink.client.FlinkPipelineTranslationUtil;
import org.apache.flink.client.cli.ExecutionConfigAccessor;
import org.apache.flink.client.deployment.ClusterClientJobClientAdapter;
import org.apache.flink.client.testjar.ForbidConfigurationJob;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.ConfigUtils;
import org.apache.flink.configuration.Configuration;
Expand All @@ -52,10 +53,12 @@
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.testutils.MiniClusterResource;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.NetUtils;
import org.apache.flink.util.TestLogger;

import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
Expand Down Expand Up @@ -350,6 +353,37 @@ public void testGetExecutionPlan() throws ProgramInvocationException {
assertEquals(-1, htmlEscaped.indexOf('\\'));
}

/**
 * Runs {@link ForbidConfigurationJob} with {@link
 * DeploymentOptions#ALLOW_CLIENT_JOB_CONFIGURATIONS} set to {@code false} and expects that a
 * {@link MutatedConfigurationException} can be found in the thrown exception.
 */
@Test
public void testFailOnForbiddenConfiguration() throws ProgramInvocationException {
    try (final ClusterClient<?> miniClusterClient =
            new MiniClusterClient(
                    new Configuration(), MINI_CLUSTER_RESOURCE.getMiniCluster())) {

        final PackagedProgram forbiddenJob =
                PackagedProgram.newBuilder()
                        .setEntryPointClassName(ForbidConfigurationJob.class.getName())
                        .build();

        // Forbid configurations in the user program for this submission.
        final Configuration clientConfig = fromPackagedProgram(forbiddenJob, 1, false);
        clientConfig.set(DeploymentOptions.ALLOW_CLIENT_JOB_CONFIGURATIONS, false);

        Assertions.assertThatThrownBy(
                        () ->
                                ClientUtils.executeProgram(
                                        new TestExecutorServiceLoader(miniClusterClient, plan),
                                        clientConfig,
                                        forbiddenJob,
                                        true,
                                        false))
                .satisfies(
                        thrown ->
                                Assertions.assertThat(
                                                ExceptionUtils.findThrowable(
                                                        thrown,
                                                        MutatedConfigurationException.class))
                                        .isPresent());
    }
}

// --------------------------------------------------------------------------------------------

/** A test job. */
Expand Down
Loading

0 comments on commit 4953732

Please sign in to comment.