Skip to content

Commit

Permalink
[FLINK-29888][streaming] Reimplement checkNotAllowedConfigurations us…
Browse files Browse the repository at this point in the history
…ing refactored CheckpointConfig and ExecutionConfig

After FLINK-29379 we can re-implement checkNotAllowedConfigurations to produce better exception messages.
Instead of a generic "Configuration object ExecutionConfig changed" with no hint of what has been modified,
we can report which ConfigOption has been changed.
  • Loading branch information
pnowojski committed Nov 8, 2022
1 parent 69526c5 commit 35b2cae
Show file tree
Hide file tree
Showing 8 changed files with 128 additions and 118 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
<td><h5>execution.program-config.wildcards</h5></td>
<td style="word-wrap: break-word;"></td>
<td>List&lt;String&gt;</td>
<td>List of configuration keys that are allowed to be set in a user program regardless whether program configuration is enabled or not.<br /><br />Currently, this list is limited to 'pipeline.global-job-parameters' only.</td>
<td>List of configuration keys that are allowed to be set in a user program regardless whether program configuration is enabled or not.<br /><br />Currently changes that are not backed by the Configuration class are always allowed.</td>
</tr>
<tr>
<td><h5>execution.shutdown-on-application-finish</h5></td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,8 @@
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.core.execution.DetachedJobExecutionResult;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.JobListener;
Expand All @@ -45,20 +43,12 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

import static org.apache.flink.util.Preconditions.checkNotNull;

Expand All @@ -72,27 +62,6 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment {

private static final Logger LOG = LoggerFactory.getLogger(ExecutionEnvironment.class);

/**
* Due to the complexity of the current configuration stack, we need to limit the available
* wildcard options for {@link DeploymentOptions#PROGRAM_CONFIG_WILDCARDS}.
*
* <p>If everything was backed by {@link Configuration} instead of the POJOs that partially
* materialize the config options, the implementation could be way easier. Currently, we need to
* manually provide the backward path from POJO to {@link ConfigOption} value here to let {@link
* #collectNotAllowedConfigurations()} filter out wildcards in both POJOs and {@link
* #configuration}.
*/
private static final Map<String, WildcardOption<?>> SUPPORTED_PROGRAM_CONFIG_WILDCARDS =
new HashMap<>();

static {
SUPPORTED_PROGRAM_CONFIG_WILDCARDS.put(
PipelineOptions.GLOBAL_JOB_PARAMETERS.key(),
new WildcardOption<>(
PipelineOptions.GLOBAL_JOB_PARAMETERS,
env -> env.getConfig().getGlobalJobParameters().toMap()));
}

private final boolean suppressSysout;

private final boolean enforceSingleJobExecution;
Expand Down Expand Up @@ -281,14 +250,21 @@ private Collection<String> collectNotAllowedConfigurations() {
final List<String> errors = new ArrayList<>();

final Configuration clusterConfigMap = new Configuration(clusterConfiguration);
final Configuration envConfigMap = new Configuration(configuration);

// Removal must happen on Configuration objects (not instances of Map)
// to also ignore map-typed config options with prefix key notation
removeProgramConfigWildcards(clusterConfigMap);

checkMainConfiguration(clusterConfigMap, errors);
checkCheckpointConfig(clusterConfigMap, errors);
checkExecutionConfig(clusterConfigMap, errors);
return errors;
}

private void checkMainConfiguration(Configuration clusterConfigMap, List<String> errors) {
final Configuration envConfigMap = new Configuration(configuration);
removeProgramConfigWildcards(envConfigMap);

// Check Configuration
final MapDifference<String, String> diff =
Maps.difference(clusterConfigMap.toMap(), envConfigMap.toMap());
diff.entriesOnlyOnRight()
Expand All @@ -308,88 +284,61 @@ private Collection<String> collectNotAllowedConfigurations() {
errors.add(
ConfigurationNotAllowedMessage.ofConfigurationChanged(
k, v)));

final Configuration enrichedClusterConfig = new Configuration(clusterConfiguration);
enrichProgramConfigWildcards(enrichedClusterConfig);

// Check CheckpointConfig
final CheckpointConfig clusterCheckpointConfig = new CheckpointConfig();
clusterCheckpointConfig.configure(enrichedClusterConfig);
if (!Arrays.equals(
serializeConfig(clusterCheckpointConfig), serializeConfig(checkpointCfg))) {
errors.add(
ConfigurationNotAllowedMessage.ofConfigurationObject(
checkpointCfg.getClass().getSimpleName()));
}

// Check ExecutionConfig
final ExecutionConfig clusterExecutionConfig = new ExecutionConfig();
clusterExecutionConfig.configure(enrichedClusterConfig, this.getUserClassloader());
if (!Arrays.equals(serializeConfig(clusterExecutionConfig), serializeConfig(config))) {
errors.add(
ConfigurationNotAllowedMessage.ofConfigurationObject(
config.getClass().getSimpleName()));
}

return errors;
}

private void enrichProgramConfigWildcards(Configuration mutableConfig) {
for (String key : programConfigWildcards) {
final WildcardOption<?> option = SUPPORTED_PROGRAM_CONFIG_WILDCARDS.get(key);
if (option == null) {
throw new FlinkRuntimeException(
String.format(
"Unsupported option '%s' for program configuration wildcards.",
key));
}
option.enrich(mutableConfig, this);
}
/**
 * Compares the environment's {@link CheckpointConfig} against a checkpoint config that was
 * configured from the cluster configuration only, and lets {@code checkConfigurationObject}
 * append a message to {@code errors} for every entry that differs.
 *
 * @param clusterConfigMap cluster configuration used to build the expected checkpoint config
 * @param errors collector that detected violations are appended to
 */
private void checkCheckpointConfig(Configuration clusterConfigMap, List<String> errors) {
    // Reference object: a fresh CheckpointConfig that has only seen the cluster settings.
    final CheckpointConfig clusterDerivedConfig = new CheckpointConfig();
    clusterDerivedConfig.configure(clusterConfigMap);

    final Configuration expected = clusterDerivedConfig.toConfiguration();
    final Configuration actual = checkpointCfg.toConfiguration();
    final String objectName = checkpointCfg.getClass().getSimpleName();

    checkConfigurationObject(expected, actual, objectName, errors);
}

private void removeProgramConfigWildcards(Configuration mutableConfig) {
for (String key : programConfigWildcards) {
final WildcardOption<?> option = SUPPORTED_PROGRAM_CONFIG_WILDCARDS.get(key);
if (option == null) {
throw new FlinkRuntimeException(
String.format(
"Unsupported option '%s' for program configuration wildcards.",
key));
}
option.remove(mutableConfig);
}
/**
 * Compares the environment's {@link ExecutionConfig} against an execution config that was
 * configured from the cluster configuration only, and lets {@code checkConfigurationObject}
 * append a message to {@code errors} for every entry that differs.
 *
 * @param clusterConfigMap cluster configuration used to build the expected execution config
 * @param errors collector that detected violations are appended to
 */
private void checkExecutionConfig(Configuration clusterConfigMap, List<String> errors) {
    // Reference object: a fresh ExecutionConfig that has only seen the cluster settings.
    final ExecutionConfig clusterDerivedConfig = new ExecutionConfig();
    clusterDerivedConfig.configure(clusterConfigMap, getUserClassloader());

    final Configuration expected = clusterDerivedConfig.toConfiguration();
    final Configuration actual = config.toConfiguration();
    final String objectName = config.getClass().getSimpleName();

    checkConfigurationObject(expected, actual, objectName, errors);
}

private static byte[] serializeConfig(Serializable config) {
try (final ByteArrayOutputStream bos = new ByteArrayOutputStream();
final ObjectOutputStream oos = new ObjectOutputStream(bos)) {
oos.writeObject(config);
oos.flush();
return bos.toByteArray();
} catch (IOException e) {
throw new FlinkRuntimeException("Cannot serialize configuration.", e);
}
}

/**
* Helper class for mapping a configuration key to a {@link ConfigOption} and a programmatic
* getter.
*/
private static final class WildcardOption<T> {
private final ConfigOption<T> option;
private final Function<StreamContextEnvironment, T> getter;
private void checkConfigurationObject(
Configuration expectedConfiguration,
Configuration actualConfiguration,
String configurationObjectName,
List<String> errors) {
removeProgramConfigWildcards(actualConfiguration);

WildcardOption(ConfigOption<T> option, Function<StreamContextEnvironment, T> getter) {
this.option = option;
this.getter = getter;
}
final MapDifference<String, String> diff =
Maps.difference(expectedConfiguration.toMap(), actualConfiguration.toMap());
diff.entriesOnlyOnRight()
.forEach(
(k, v) ->
errors.add(
ConfigurationNotAllowedMessage.ofConfigurationObjectAdded(
configurationObjectName, k, v)));
diff.entriesDiffering()
.forEach(
(k, v) ->
errors.add(
ConfigurationNotAllowedMessage.ofConfigurationObjectChanged(
configurationObjectName, k, v)));

void enrich(Configuration mutableConfig, StreamContextEnvironment fromEnv) {
mutableConfig.set(option, getter.apply(fromEnv));
}
diff.entriesOnlyOnLeft()
.forEach(
(k, v) ->
errors.add(
ConfigurationNotAllowedMessage.ofConfigurationObjectRemoved(
configurationObjectName, k, v)));
}

void remove(Configuration mutableConfig) {
mutableConfig.removeConfig(option);
/**
 * Strips every key listed in {@code programConfigWildcards} from the given configuration so
 * that those options are ignored by the subsequent comparisons.
 *
 * @param mutableConfig configuration that is modified in place
 */
private void removeProgramConfigWildcards(Configuration mutableConfig) {
    for (final String wildcardKey : programConfigWildcards) {
        // The boolean result (whether anything was removed) is intentionally not inspected.
        mutableConfig.removeKey(wildcardKey);
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.util.function.ThrowingConsumer;
Expand Down Expand Up @@ -79,6 +80,7 @@ void testDisallowProgramConfigurationChanges(
environment.enableCheckpointing(500, CheckpointingMode.EXACTLY_ONCE);
// Change the ExecutionConfig
environment.setParallelism(25);
environment.getConfig().setMaxParallelism(1024);

// Add/mutate values in the configuration
environment.configure(programConfig);
Expand All @@ -90,7 +92,9 @@ void testDisallowProgramConfigurationChanges(
ExecutionOptions.RUNTIME_MODE.key(),
ExecutionOptions.SORT_INPUTS.key(),
CheckpointConfig.class.getSimpleName(),
ExecutionConfig.class.getSimpleName());
ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL.key(),
ExecutionConfig.class.getSimpleName(),
PipelineOptions.MAX_PARALLELISM.key());
}

@ParameterizedTest
Expand All @@ -100,7 +104,8 @@ void testAllowProgramConfigurationWildcards(
final Configuration clusterConfig = new Configuration();
clusterConfig.set(DeploymentOptions.TARGET, "local");
clusterConfig.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.STREAMING);
// Test prefix map notation
// Changing GLOBAL_JOB_PARAMETERS is always allowed, as it's one of the fields not checked
// with PROGRAM_CONFIG_ENABLED set to false
clusterConfig.setString(
PipelineOptions.GLOBAL_JOB_PARAMETERS.key() + "." + "my-param", "my-value");

Expand All @@ -119,10 +124,13 @@ void testAllowProgramConfigurationWildcards(
true,
true,
false,
Collections.singletonList(PipelineOptions.GLOBAL_JOB_PARAMETERS.key()));
Arrays.asList(
PipelineOptions.GLOBAL_JOB_PARAMETERS.key(),
PipelineOptions.MAX_PARALLELISM.key()));

// Change ExecutionConfig
environment.configure(jobConfig);
environment.getConfig().setMaxParallelism(1024);

environment.fromCollection(Collections.singleton(1)).addSink(new DiscardingSink<>());
assertThatThrownBy(() -> executor.accept(environment))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -765,6 +765,20 @@ public <T> boolean removeConfig(ConfigOption<T> configOption) {
}
}

/**
 * Removes the given key from the configuration.
 *
 * <p>Besides the entry stored under exactly {@code key}, {@code removePrefixMap} is invoked for
 * the same key, so entries written in prefix-map notation below that key are dropped as well.
 *
 * @param key key of a config option to remove
 * @return true if anything has been removed, false otherwise
 */
public boolean removeKey(String key) {
    synchronized (this.confData) {
        final boolean exactEntryRemoved = this.confData.remove(key) != null;
        // Must always run, even when the exact entry was already removed above.
        final boolean prefixEntriesRemoved = removePrefixMap(confData, key);
        return exactEntryRemoved || prefixEntriesRemoved;
    }
}

// --------------------------------------------------------------------------------------------

<T> void setValueInternal(String key, T value, boolean canBePrefixMap) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,11 @@ public <T> boolean removeConfig(ConfigOption<T> configOption) {
return backingConfig.removeConfig(configOption);
}

/**
 * Removes the given key from the backing configuration.
 *
 * <p>The key is qualified with this configuration's prefix before delegating, in the same way
 * {@code containsKey} qualifies it. Without the prefix, the removal would target a different
 * entry of the backing configuration than the one this delegating view reports as present.
 *
 * @param key key of a config option to remove, relative to this configuration's prefix
 * @return true if anything has been removed from the backing configuration, false otherwise
 */
@Override
public boolean removeKey(String key) {
    return backingConfig.removeKey(prefix + key);
}

@Override
public boolean containsKey(String key) {
return backingConfig.containsKey(prefix + key);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,7 @@ public class DeploymentOptions {
.linebreak()
.linebreak()
.text(
"Currently, this list is limited to '%s' only.",
TextElement.text(
PipelineOptions.GLOBAL_JOB_PARAMETERS.key()))
"Currently changes that are not backed by the Configuration class are always allowed.")
.build());

@Experimental
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.Map;
import java.util.stream.Collectors;

import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.containsString;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
Expand Down Expand Up @@ -314,6 +315,25 @@ public void testRemove() {
assertFalse("Expected 'unexistedOption' is not removed", cfg.removeConfig(unexistedOption));
}

/** Checks that {@code removeKey} drops an exact key as well as the keys nested below it. */
@Test
public void testRemoveKey() {
    final String plainKey = "a.b";
    final String prefixKey = "c.d";

    final Configuration config = new Configuration();
    config.setInteger(plainKey, 42);
    config.setInteger(prefixKey, 44);
    // Entries nested below prefixKey; they are expected to disappear together with it.
    config.setInteger(prefixKey + ".f1", 44);
    config.setInteger(prefixKey + ".f2", 44);
    // Unrelated entry that has to survive both removals.
    config.setInteger("e.f", 1337);

    // An unknown key removes nothing.
    assertFalse(config.removeKey("not-existing-key"));

    assertTrue(config.removeKey(plainKey));
    assertFalse(config.containsKey(plainKey));

    assertTrue(config.removeKey(prefixKey));
    assertThat(config.keySet(), containsInAnyOrder("e.f"));
}

@Test
public void testShouldParseValidStringToEnum() {
final Configuration configuration = new Configuration();
Expand Down
Loading

0 comments on commit 35b2cae

Please sign in to comment.