Skip to content

Commit

Permalink
[FLINK-29309][streaming-java] Relax allow-client-job-configurations f…
Browse files Browse the repository at this point in the history
…or Table API and parameters

This closes apache#20840.
  • Loading branch information
twalthr committed Sep 23, 2022
1 parent 0154de9 commit 298b888
Show file tree
Hide file tree
Showing 9 changed files with 287 additions and 83 deletions.
18 changes: 12 additions & 6 deletions docs/layouts/shortcodes/generated/deployment_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,6 @@
</tr>
</thead>
<tbody>
<tr>
<td><h5>execution.allow-client-job-configurations</h5></td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>Determines whether configurations in the user program are allowed. Depending on your deployment mode failing the job might have different affects. Either your client that is trying to submit the job to an external cluster (session cluster deployment) throws the exception or the Job manager (application mode deployment).</td>
</tr>
<tr>
<td><h5>execution.attached</h5></td>
<td style="word-wrap: break-word;">false</td>
Expand All @@ -26,6 +20,18 @@
<td>List&lt;String&gt;</td>
<td>Custom JobListeners to be registered with the execution environment. The registered listeners cannot have constructors with arguments.</td>
</tr>
<tr>
<td><h5>execution.program-config.enabled</h5></td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>Determines whether configurations in the user program are allowed. By default, configuration can be set both on a cluster-level (via options) or within the user program (i.e. programmatic via environment setters). If disabled, all configuration must be defined on a cluster-level and programmatic setters in the user program are prohibited.<br /><br />Depending on your deployment mode failing the job might have different implications. Either your client that is trying to submit the job to an external cluster (session cluster deployment) throws the exception or the job manager (application mode deployment).<br /><br />The 'execution.program-config.wildcards' option lists configuration keys that are allowed to be set in user programs regardless of this setting.</td>
</tr>
<tr>
<td><h5>execution.program-config.wildcards</h5></td>
<td style="word-wrap: break-word;"></td>
<td>List&lt;String&gt;</td>
<td>List of configuration keys that are allowed to be set in a user program regardless whether program configuration is enabled or not.<br /><br />Currently, this list is limited to 'pipeline.global-job-parameters' only.</td>
</tr>
<tr>
<td><h5>execution.shutdown-on-application-finish</h5></td>
<td style="word-wrap: break-word;">true</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@
package org.apache.flink.client.program;

import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.DeploymentOptions;

import java.util.Collection;

/**
* If {@link org.apache.flink.configuration.DeploymentOptions#ALLOW_CLIENT_JOB_CONFIGURATIONS} is
* disabled configurations in the user jar will throw this exception.
* If {@link DeploymentOptions#PROGRAM_CONFIG_ENABLED} is disabled, configurations in the user jar
* will throw this exception.
*/
@Internal
public class MutatedConfigurationException extends Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,19 @@

import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.core.execution.DetachedJobExecutionResult;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.JobListener;
import org.apache.flink.core.execution.PipelineExecutorServiceLoader;
import org.apache.flink.runtime.dispatcher.ConfigurationNotAllowedMessage;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentFactory;
import org.apache.flink.streaming.api.graph.StreamGraph;
Expand All @@ -49,9 +53,12 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

import static org.apache.flink.util.Preconditions.checkNotNull;

Expand All @@ -65,18 +72,36 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment {

private static final Logger LOG = LoggerFactory.getLogger(ExecutionEnvironment.class);

/**
* Due to the complexity of the current configuration stack, we need to limit the available
* wildcard options for {@link DeploymentOptions#PROGRAM_CONFIG_WILDCARDS}.
*
* <p>If everything was backed by {@link Configuration} instead of the POJOs that partially
* materialize the config options, the implementation could be way easier. Currently, we need to
* manually provide the backward path from POJO to {@link ConfigOption} value here to let {@link
* #collectNotAllowedConfigurations()} filter out wildcards in both POJOs and {@link
* #configuration}.
*/
private static final Map<String, WildcardOption<?>> SUPPORTED_PROGRAM_CONFIG_WILDCARDS =
new HashMap<>();

static {
SUPPORTED_PROGRAM_CONFIG_WILDCARDS.put(
PipelineOptions.GLOBAL_JOB_PARAMETERS.key(),
new WildcardOption<>(
PipelineOptions.GLOBAL_JOB_PARAMETERS,
env -> env.getConfig().getGlobalJobParameters().toMap()));
}

private final boolean suppressSysout;

private final boolean enforceSingleJobExecution;
private final byte[] originalCheckpointConfigSerialized;
private final byte[] originalExecutionConfigSerialized;
private final Configuration originalConfiguration;
private final Configuration clusterConfiguration;

private int jobCounter;

private final Collection<String> errorMessages;

private final boolean allowConfigurations;
private final boolean programConfigEnabled;
private final Collection<String> programConfigWildcards;

public StreamContextEnvironment(
final PipelineExecutorServiceLoader executorServiceLoader,
Expand All @@ -87,6 +112,7 @@ public StreamContextEnvironment(
this(
executorServiceLoader,
configuration,
configuration,
userCodeClassLoader,
enforceSingleJobExecution,
suppressSysout,
Expand All @@ -97,21 +123,20 @@ public StreamContextEnvironment(
@Internal
public StreamContextEnvironment(
final PipelineExecutorServiceLoader executorServiceLoader,
final Configuration clusterConfiguration,
final Configuration configuration,
final ClassLoader userCodeClassLoader,
final boolean enforceSingleJobExecution,
final boolean suppressSysout,
final boolean allowConfigurations,
final Collection<String> errorMessages) {
final boolean programConfigEnabled,
final Collection<String> programConfigWildcards) {
super(executorServiceLoader, configuration, userCodeClassLoader);
this.suppressSysout = suppressSysout;
this.enforceSingleJobExecution = enforceSingleJobExecution;
this.allowConfigurations = allowConfigurations;
this.originalCheckpointConfigSerialized = serializeConfig(checkpointCfg);
this.originalExecutionConfigSerialized = serializeConfig(config);
this.originalConfiguration = new Configuration(configuration);
this.errorMessages = errorMessages;
this.clusterConfiguration = clusterConfiguration;
this.jobCounter = 0;
this.programConfigEnabled = programConfigEnabled;
this.programConfigWildcards = programConfigWildcards;
}

@Override
Expand All @@ -136,13 +161,6 @@ public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
}
}

private void checkNotAllowedConfigurations() throws MutatedConfigurationException {
errorMessages.addAll(collectNotAllowedConfigurations());
if (!errorMessages.isEmpty()) {
throw new MutatedConfigurationException(errorMessages);
}
}

private JobExecutionResult getJobExecutionResult(final JobClient jobClient) throws Exception {
checkNotNull(jobClient);

Expand Down Expand Up @@ -206,35 +224,28 @@ private void validateAllowedExecution() {

public static void setAsContext(
final PipelineExecutorServiceLoader executorServiceLoader,
final Configuration configuration,
final Configuration clusterConfiguration,
final ClassLoader userCodeClassLoader,
final boolean enforceSingleJobExecution,
final boolean suppressSysout) {
StreamExecutionEnvironmentFactory factory =
conf -> {
final List<String> errors = new ArrayList<>();
final boolean allowConfigurations =
configuration.getBoolean(
DeploymentOptions.ALLOW_CLIENT_JOB_CONFIGURATIONS);
if (!allowConfigurations && !conf.toMap().isEmpty()) {
conf.toMap()
.forEach(
(k, v) ->
errors.add(
ConfigurationNotAllowedMessage
.ofConfigurationKeyAndValue(k, v)));
}
Configuration mergedConfiguration = new Configuration();
mergedConfiguration.addAll(configuration);
mergedConfiguration.addAll(conf);
final StreamExecutionEnvironmentFactory factory =
envInitConfig -> {
final boolean programConfigEnabled =
clusterConfiguration.get(DeploymentOptions.PROGRAM_CONFIG_ENABLED);
final List<String> programConfigWildcards =
clusterConfiguration.get(DeploymentOptions.PROGRAM_CONFIG_WILDCARDS);
final Configuration mergedEnvConfig = new Configuration();
mergedEnvConfig.addAll(clusterConfiguration);
mergedEnvConfig.addAll(envInitConfig);
return new StreamContextEnvironment(
executorServiceLoader,
mergedConfiguration,
clusterConfiguration,
mergedEnvConfig,
userCodeClassLoader,
enforceSingleJobExecution,
suppressSysout,
allowConfigurations,
errors);
programConfigEnabled,
programConfigWildcards);
};
initializeContextEnvironment(factory);
}
Expand All @@ -243,13 +254,43 @@ public static void unsetAsContext() {
resetContextEnvironment();
}

private List<String> collectNotAllowedConfigurations() {
final List<String> errors = new ArrayList<>();
if (allowConfigurations) {
return errors;
// --------------------------------------------------------------------------------------------
// Program Configuration Validation
// --------------------------------------------------------------------------------------------

private void checkNotAllowedConfigurations() throws MutatedConfigurationException {
final Collection<String> errorMessages = collectNotAllowedConfigurations();
if (!errorMessages.isEmpty()) {
throw new MutatedConfigurationException(errorMessages);
}
}

/**
* Collects programmatic configuration changes.
*
* <p>Configuration is spread across instances of {@link Configuration} and POJOs (e.g. {@link
* ExecutionConfig}), so we need to have logic for comparing both. For supporting wildcards, the
* first can be accomplished by simply removing keys, the latter by setting equal fields before
* comparison.
*/
private Collection<String> collectNotAllowedConfigurations() {
if (programConfigEnabled) {
return Collections.emptyList();
}

final List<String> errors = new ArrayList<>();

final Configuration clusterConfigMap = new Configuration(clusterConfiguration);
final Configuration envConfigMap = new Configuration(configuration);

// Removal must happen on Configuration objects (not instances of Map)
// to also ignore map-typed config options with prefix key notation
removeProgramConfigWildcards(clusterConfigMap);
removeProgramConfigWildcards(envConfigMap);

// Check Configuration
final MapDifference<String, String> diff =
Maps.difference(originalConfiguration.toMap(), configuration.toMap());
Maps.difference(clusterConfigMap.toMap(), envConfigMap.toMap());
diff.entriesOnlyOnRight()
.forEach(
(k, v) ->
Expand All @@ -269,20 +310,57 @@ private List<String> collectNotAllowedConfigurations() {
ConfigurationNotAllowedMessage.ofConfigurationChange(
k, v)));

if (!Arrays.equals(originalCheckpointConfigSerialized, serializeConfig(checkpointCfg))) {
final Configuration enrichedClusterConfig = new Configuration(clusterConfiguration);
enrichProgramConfigWildcards(enrichedClusterConfig);

// Check CheckpointConfig
final CheckpointConfig clusterCheckpointConfig = new CheckpointConfig();
clusterCheckpointConfig.configure(enrichedClusterConfig);
if (!Arrays.equals(
serializeConfig(clusterCheckpointConfig), serializeConfig(checkpointCfg))) {
errors.add(
ConfigurationNotAllowedMessage.ofConfigurationObject(
checkpointCfg.getClass().getSimpleName()));
}

if (!Arrays.equals(originalExecutionConfigSerialized, serializeConfig(config))) {
// Check ExecutionConfig
final ExecutionConfig clusterExecutionConfig = new ExecutionConfig();
clusterExecutionConfig.configure(enrichedClusterConfig, this.getUserClassloader());
if (!Arrays.equals(serializeConfig(clusterExecutionConfig), serializeConfig(config))) {
errors.add(
ConfigurationNotAllowedMessage.ofConfigurationObject(
config.getClass().getSimpleName()));
}

return errors;
}

private void enrichProgramConfigWildcards(Configuration mutableConfig) {
for (String key : programConfigWildcards) {
final WildcardOption<?> option = SUPPORTED_PROGRAM_CONFIG_WILDCARDS.get(key);
if (option == null) {
throw new FlinkRuntimeException(
String.format(
"Unsupported option '%s' for program configuration wildcards.",
key));
}
option.enrich(mutableConfig, this);
}
}

private void removeProgramConfigWildcards(Configuration mutableConfig) {
for (String key : programConfigWildcards) {
final WildcardOption<?> option = SUPPORTED_PROGRAM_CONFIG_WILDCARDS.get(key);
if (option == null) {
throw new FlinkRuntimeException(
String.format(
"Unsupported option '%s' for program configuration wildcards.",
key));
}
option.remove(mutableConfig);
}
}

private static byte[] serializeConfig(Serializable config) {
try (final ByteArrayOutputStream bos = new ByteArrayOutputStream();
final ObjectOutputStream oos = new ObjectOutputStream(bos)) {
Expand All @@ -293,4 +371,26 @@ private static byte[] serializeConfig(Serializable config) {
throw new FlinkRuntimeException("Cannot serialize configuration.", e);
}
}

/**
* Helper class for mapping a configuration key to a {@link ConfigOption} and a programmatic
* getter.
*/
private static final class WildcardOption<T> {
private final ConfigOption<T> option;
private final Function<StreamContextEnvironment, T> getter;

WildcardOption(ConfigOption<T> option, Function<StreamContextEnvironment, T> getter) {
this.option = option;
this.getter = getter;
}

void enrich(Configuration mutableConfig, StreamContextEnvironment fromEnv) {
mutableConfig.set(option, getter.apply(fromEnv));
}

void remove(Configuration mutableConfig) {
mutableConfig.removeConfig(option);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ void testFailOnForbiddenConfiguration() throws ProgramInvocationException {
.build();

final Configuration configuration = fromPackagedProgram(program, 1, false);
configuration.set(DeploymentOptions.ALLOW_CLIENT_JOB_CONFIGURATIONS, false);
configuration.set(DeploymentOptions.PROGRAM_CONFIG_ENABLED, false);

assertThatThrownBy(
() ->
Expand Down
Loading

0 comments on commit 298b888

Please sign in to comment.