Skip to content

Commit

Permalink
[FLINK-33721][core] Support dumping the configuration as nested YAML data.
Browse files Browse the repository at this point in the history
  • Loading branch information
JunRuiLee authored and zhuzhurk committed Jan 23, 2024
1 parent bcd448b commit 1f7622d
Show file tree
Hide file tree
Showing 10 changed files with 282 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.io.File;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
Expand Down Expand Up @@ -200,6 +201,62 @@ public static String[] splitPaths(@Nonnull String separatedPaths) {
: EMPTY;
}

/**
 * Renders the given configuration as a list of lines that can be written directly to a file,
 * choosing between a nested YAML layout and a flat key-value layout.
 *
 * <p>The nested layout is produced only when the configuration object is standard yaml AND
 * {@code flattenYaml} is {@code false}; in every other case each entry is emitted as a single
 * flat {@code key: value} line.
 *
 * <p>Example input (flat map configuration data):
 *
 * <pre>{@code
 * {
 *     "parent.child": "value1",
 *     "parent.child2": "value2"
 * }
 * }</pre>
 *
 * <p>Nested output ({@code flattenYaml == false} and standard yaml):
 *
 * <pre>{@code
 * parent:
 *     child: value1
 *     child2: value2
 * }</pre>
 *
 * <p>Flat output (all other cases):
 *
 * <pre>{@code
 * parent.child: value1
 * parent.child2: value2
 * }</pre>
 *
 * @param configuration The configuration whose contents should be rendered.
 * @param flattenYaml When {@code true}, forces the flat key-value layout even for standard
 *     yaml configurations.
 * @return One string per output line, in the chosen layout.
 */
public static List<String> convertConfigToWritableLines(
        Configuration configuration, boolean flattenYaml) {
    // Nested rendering is only valid for standard-yaml configurations.
    if (!configuration.standardYaml || flattenYaml) {
        return configuration.toFileWritableMap().entrySet().stream()
                .map(entry -> String.format("%s: %s", entry.getKey(), entry.getValue()))
                .collect(Collectors.toList());
    }
    // Hand the raw config data over read-only; the parser util builds the nested structure.
    return YamlParserUtils.convertAndDumpYamlFromFlatMap(
            Collections.unmodifiableMap(configuration.confData));
}

/**
* Creates a dynamic parameter list {@code String} of the passed configuration map.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,10 @@
import java.io.FileNotFoundException;
import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
Expand Down Expand Up @@ -131,6 +134,39 @@ public static synchronized String toYAMLString(Object value) {
}
}

/**
 * Expands a flat map (dot-separated keys such as {@code "parent.child.key"}) into a nested map
 * and serializes it as YAML, returning one list element per output line. The method is
 * synchronized and thus thread-safe.
 *
 * @param flattenMap A map containing flattened keys (e.g., "parent.child.key") associated with
 *     their values.
 * @return The YAML representation split into individual lines.
 */
@SuppressWarnings("unchecked")
public static synchronized List<String> convertAndDumpYamlFromFlatMap(
        Map<String, Object> flattenMap) {
    try {
        // Rebuild the tree structure implied by the dot-separated keys.
        final Map<String, Object> root = new LinkedHashMap<>();
        flattenMap.forEach(
                (flatKey, value) -> {
                    final String[] pathSegments = flatKey.split("\\.");
                    final int lastIdx = pathSegments.length - 1;
                    // Walk (and lazily create) intermediate maps down to the leaf's parent.
                    Map<String, Object> cursor = root;
                    for (int idx = 0; idx < lastIdx; idx++) {
                        cursor =
                                (Map<String, Object>)
                                        cursor.computeIfAbsent(
                                                pathSegments[idx],
                                                ignored -> new LinkedHashMap<>());
                    }
                    cursor.put(pathSegments[lastIdx], value);
                });
        // Dump once, then split on the dumper's configured line break.
        final String dumped = yaml.dumpAsMap(root);
        return Arrays.asList(dumped.split(dumperOptions.getLineBreak().getString()));
    } catch (MarkedYAMLException exception) {
        throw wrapExceptionToHiddenSensitiveData(exception);
    }
}

public static synchronized <T> T convertToObject(String value, Class<T> type) {
try {
return yaml.loadAs(value, type);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
import org.apache.flink.util.TimeUtils;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestTemplate;
Expand Down Expand Up @@ -96,6 +97,92 @@ void testStandardYamlSupportLegacyPattern() {
.isEqualTo(expectedMap);
}

@TestTemplate
void testConvertConfigToWritableLinesAndFlattenYaml() {
    // Flattened mode: every entry must be rendered as a single "key: value" line.
    testConvertConfigToWritableLines(true);
}

@TestTemplate
void testConvertConfigToWritableLinesAndNoFlattenYaml() {
    // Non-flattened mode: standard-yaml configurations must be rendered as nested YAML.
    testConvertConfigToWritableLines(false);
}

/**
 * Exercises {@link ConfigurationUtils#convertConfigToWritableLines} with list, map, duration,
 * escaped-string and int options, checking the rendered lines for both the flattened and the
 * nested layout, in both standard-yaml and legacy mode (driven by the {@code standardYaml}
 * test parameter).
 *
 * @param flattenYaml whether the flat "key: value" layout is requested.
 */
private void testConvertConfigToWritableLines(boolean flattenYaml) {
    final Configuration configuration = new Configuration(standardYaml);
    ConfigOption<List<String>> nestedListOption =
            ConfigOptions.key("nested.test-list-key").stringType().asList().noDefaultValue();
    final String listValues = "value1;value2;value3";
    final String yamlListValues = "[value1, value2, value3]";
    configuration.set(nestedListOption, Arrays.asList(listValues.split(";")));

    ConfigOption<Map<String, String>> nestedMapOption =
            ConfigOptions.key("nested.test-map-key").mapType().noDefaultValue();
    final String mapValues = "key1:value1,key2:value2";
    final String yamlMapValues = "{key1: value1, key2: value2}";
    configuration.set(
            nestedMapOption,
            Arrays.stream(mapValues.split(","))
                    .collect(Collectors.toMap(e -> e.split(":")[0], e -> e.split(":")[1])));

    ConfigOption<Duration> nestedDurationOption =
            ConfigOptions.key("nested.test-duration-key").durationType().noDefaultValue();
    final Duration duration = Duration.ofMillis(3000);
    configuration.set(nestedDurationOption, duration);

    // "*" must be quoted in YAML output, so the expected rendering differs per mode.
    ConfigOption<String> nestedStringOption =
            ConfigOptions.key("nested.test-string-key").stringType().noDefaultValue();
    final String strValues = "*";
    final String yamlStrValues = "'*'";
    configuration.set(nestedStringOption, strValues);

    // A top-level (non-nested) key, to verify it is not pulled under any parent node.
    ConfigOption<Integer> intOption =
            ConfigOptions.key("test-int-key").intType().noDefaultValue();
    final int intValue = 1;
    configuration.set(intOption, intValue);

    List<String> actualData =
            ConfigurationUtils.convertConfigToWritableLines(configuration, flattenYaml);
    List<String> expected;
    if (standardYaml) {
        if (flattenYaml) {
            // Standard yaml, flattened: YAML-escaped values on flat "key: value" lines.
            expected =
                    Arrays.asList(
                            nestedListOption.key() + ": " + yamlListValues,
                            nestedMapOption.key() + ": " + yamlMapValues,
                            nestedDurationOption.key()
                                    + ": "
                                    + TimeUtils.formatWithHighestUnit(duration),
                            nestedStringOption.key() + ": " + yamlStrValues,
                            intOption.key() + ": " + intValue);
        } else {
            // Standard yaml, not flattened: keys sharing the "nested." prefix are grouped
            // under a common parent node with indented children.
            expected =
                    Arrays.asList(
                            "nested:",
                            "  test-list-key:",
                            "  - value1",
                            "  - value2",
                            "  - value3",
                            "  test-map-key:",
                            "    key1: value1",
                            "    key2: value2",
                            "  test-duration-key: 3 s",
                            "  test-string-key: '*'",
                            "test-int-key: 1");
        }
    } else {
        // Legacy mode always emits flat lines with legacy (non-YAML) value rendering.
        expected =
                Arrays.asList(
                        nestedListOption.key() + ": " + listValues,
                        nestedMapOption.key() + ": " + mapValues,
                        nestedDurationOption.key()
                                + ": "
                                + TimeUtils.formatWithHighestUnit(duration),
                        nestedStringOption.key() + ": " + strValues,
                        intOption.key() + ": " + intValue);
    }
    // AssertJ convention: assertThat(actual) against expected — the reversed form produces
    // misleading failure messages.
    assertThat(actualData).containsExactlyInAnyOrderElementsOf(expected);
}

@TestTemplate
void testHideSensitiveValues() {
final Map<String, String> keyValuePairs = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.io.FileNotFoundException;
import java.io.PrintWriter;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -150,6 +151,63 @@ void testConvertToObject() {
assertThat(YamlParserUtils.convertToObject(s4, Map.class)).isEqualTo(map);
}

@Test
void testDumpNestedYamlFromFlatMap() {
    // Build a flat map covering scalars, pre-quoted strings, real collections, and
    // dot-separated keys, then verify the dumped YAML lines for each case.
    Map<String, Object> flattenMap = new HashMap<>();
    flattenMap.put("string", "stringValue");
    flattenMap.put("integer", 42);
    flattenMap.put("double", 3.14);
    flattenMap.put("boolean", true);
    flattenMap.put("enum", TestEnum.ENUM);
    // list1 is a real List (dumped as a YAML sequence); list2 is a String that merely looks
    // like a flow collection and must therefore be quoted in the output.
    flattenMap.put("list1", Arrays.asList("item1", "item2", "item3"));
    flattenMap.put("list2", "{item1, item2, item3}");
    // Same distinction for maps: real Map vs. map-looking String.
    flattenMap.put("map1", Collections.singletonMap("k1", "v1"));
    flattenMap.put("map2", "{k2: v2}");
    flattenMap.put(
            "listMap1",
            Arrays.asList(
                    Collections.singletonMap("k3", "v3"),
                    Collections.singletonMap("k4", "v4")));
    flattenMap.put("listMap2", "[{k5: v5}, {k6: v6}]");
    // Dot-separated keys must be expanded into nested YAML nodes.
    flattenMap.put("nested.key1.subKey1", "value1");
    flattenMap.put("nested.key2.subKey1", "value2");
    flattenMap.put("nested.key3", "value3");
    // String values that would otherwise be parsed as YAML specials must come back quoted.
    flattenMap.put("escaped1", "*");
    flattenMap.put("escaped2", "1");
    flattenMap.put("escaped3", "true");

    List<String> values = YamlParserUtils.convertAndDumpYamlFromFlatMap(flattenMap);

    assertThat(values)
            .containsExactlyInAnyOrder(
                    "string: stringValue",
                    "integer: 42",
                    "double: 3.14",
                    "boolean: true",
                    "enum: ENUM",
                    "list1:",
                    "- item1",
                    "- item2",
                    "- item3",
                    "list2: '{item1, item2, item3}'",
                    "map1:",
                    "  k1: v1",
                    "map2: '{k2: v2}'",
                    "listMap1:",
                    "- k3: v3",
                    "- k4: v4",
                    "listMap2: '[{k5: v5}, {k6: v6}]'",
                    "nested:",
                    "  key1:",
                    "    subKey1: value1",
                    "  key2:",
                    "    subKey1: value2",
                    "  key3: value3",
                    "escaped1: '*'",
                    "escaped2: '1'",
                    "escaped3: 'true'");
}

/** Test enum used to verify that enum values are dumped by their name. */
private enum TestEnum {
    ENUM
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.UnmodifiableConfiguration;
import org.apache.flink.test.util.FileUtils;
Expand Down Expand Up @@ -417,15 +418,11 @@ public void appendConfiguration(Configuration config) throws IOException {
mergedConfig.addAll(defaultConfig);
mergedConfig.addAll(config);

final List<String> configurationLines =
mergedConfig.toFileWritableMap().entrySet().stream()
.map(entry -> entry.getKey() + ": " + entry.getValue())
.collect(Collectors.toList());

// NOTE: Before we change the default conf file in the flink-dist to 'config.yaml', we
// need to use the legacy flink conf file 'flink-conf.yaml' here.
Files.write(
conf.resolve(GlobalConfiguration.LEGACY_FLINK_CONF_FILENAME), configurationLines);
conf.resolve(GlobalConfiguration.LEGACY_FLINK_CONF_FILENAME),
ConfigurationUtils.convertConfigToWritableLines(mergedConfig, true));
}

public void setTaskExecutorHosts(Collection<String> taskExecutorHosts) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.configuration.DeploymentOptionsInternal;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.JobManagerOptions;
Expand Down Expand Up @@ -136,9 +137,9 @@ public List<HasMetadata> buildAccompanyingKubernetesResources() throws IOExcepti
data.put(file.getName(), Files.toString(file, StandardCharsets.UTF_8));
}

final Map<String, String> propertiesMap =
getClusterSidePropertiesMap(kubernetesComponentConf.getFlinkConfiguration());
data.put(GlobalConfiguration.getFlinkConfFilename(), getFlinkConfData(propertiesMap));
final List<String> confData =
getClusterSideConfData(kubernetesComponentConf.getFlinkConfiguration());
data.put(GlobalConfiguration.getFlinkConfFilename(), getFlinkConfData(confData));

final ConfigMap flinkConfConfigMap =
new ConfigMapBuilder()
Expand All @@ -154,7 +155,7 @@ public List<HasMetadata> buildAccompanyingKubernetesResources() throws IOExcepti
}

/** Get the cluster-side configuration, rendered as writable lines, after removal of some keys. */
private Map<String, String> getClusterSidePropertiesMap(Configuration flinkConfig) {
private List<String> getClusterSideConfData(Configuration flinkConfig) {
final Configuration clusterSideConfig = flinkConfig.clone();
// Remove some configuration options that should not be taken to cluster side.
clusterSideConfig.removeConfig(KubernetesConfigOptions.KUBE_CONFIG_FILE);
Expand All @@ -163,19 +164,14 @@ private Map<String, String> getClusterSidePropertiesMap(Configuration flinkConfi
clusterSideConfig.removeConfig(JobManagerOptions.BIND_HOST);
clusterSideConfig.removeConfig(TaskManagerOptions.BIND_HOST);
clusterSideConfig.removeConfig(TaskManagerOptions.HOST);
return clusterSideConfig.toFileWritableMap();
return ConfigurationUtils.convertConfigToWritableLines(clusterSideConfig, false);
}

@VisibleForTesting
String getFlinkConfData(Map<String, String> propertiesMap) throws IOException {
String getFlinkConfData(List<String> confData) throws IOException {
try (StringWriter sw = new StringWriter();
PrintWriter out = new PrintWriter(sw)) {
propertiesMap.forEach(
(k, v) -> {
out.print(k);
out.print(": ");
out.println(v);
});
confData.forEach(out::println);

return sw.toString();
}
Expand Down
Loading

0 comments on commit 1f7622d

Please sign in to comment.