diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java index 58cbd620edec4..66f92fe44c49c 100755 --- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java @@ -28,6 +28,7 @@ import java.io.File; import java.time.Duration; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Locale; @@ -200,6 +201,62 @@ public static String[] splitPaths(@Nonnull String separatedPaths) { : EMPTY; } + /** + * Converts the provided configuration data into a format suitable for writing to a file, based + * on the {@code flattenYaml} flag and the {@code standardYaml} attribute of the configuration + * object. + * + *

Only when {@code flattenYaml} is set to {@code false} and the configuration object is + * standard yaml, a nested YAML format is used. Otherwise, a flat key-value pair format is + * output. + * + *

Each entry in the returned list represents a single line that can be written directly to a + * file. + * + *

Example input (flat map configuration data): + * + *

{@code
+     * {
+     *      "parent.child": "value1",
+     *      "parent.child2": "value2"
+     * }
+     * }
+ * + *

Example output when {@code flattenYaml} is {@code false} and the configuration object is + * standard yaml: + * + *

{@code
+     * parent:
+     *   child: value1
+     *   child2: value2
+     * }
+ * + *

Otherwise, the example output is: + * + *

{@code
+     * parent.child: value1
+     * parent.child2: value2
+     * }
+ * + * @param configuration The configuration to be converted. + * @param flattenYaml A boolean flag indicating if the configuration data should be output in a + * flattened format. + * @return A list of strings, where each string represents a line of the file-writable data in + * the chosen format. + */ + public static List convertConfigToWritableLines( + Configuration configuration, boolean flattenYaml) { + if (configuration.standardYaml && !flattenYaml) { + return YamlParserUtils.convertAndDumpYamlFromFlatMap( + Collections.unmodifiableMap(configuration.confData)); + } else { + Map fileWritableMap = configuration.toFileWritableMap(); + return fileWritableMap.entrySet().stream() + .map(entry -> entry.getKey() + ": " + entry.getValue()) + .collect(Collectors.toList()); + } + } + /** * Creates a dynamic parameter list {@code String} of the passed configuration map. * diff --git a/flink-core/src/main/java/org/apache/flink/configuration/YamlParserUtils.java b/flink-core/src/main/java/org/apache/flink/configuration/YamlParserUtils.java index c4388c61d7165..ae9280b3a8646 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/YamlParserUtils.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/YamlParserUtils.java @@ -42,7 +42,10 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.time.Duration; +import java.util.Arrays; import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; /** @@ -131,6 +134,39 @@ public static synchronized String toYAMLString(Object value) { } } + /** + * Converts a flat map into a nested map structure and outputs the result as a list of + * YAML-formatted strings. Each item in the list represents a single line of the YAML data. The + * method is synchronized and thus thread-safe. + * + * @param flattenMap A map containing flattened keys (e.g., "parent.child.key") associated with + * their values. 
+ * @return A list of strings that represents the YAML data, where each item corresponds to a + * line of the data. + */ + @SuppressWarnings("unchecked") + public static synchronized List convertAndDumpYamlFromFlatMap( + Map flattenMap) { + try { + Map nestedMap = new LinkedHashMap<>(); + for (Map.Entry entry : flattenMap.entrySet()) { + String[] keys = entry.getKey().split("\\."); + Map currentMap = nestedMap; + for (int i = 0; i < keys.length - 1; i++) { + currentMap = + (Map) + currentMap.computeIfAbsent(keys[i], k -> new LinkedHashMap<>()); + } + currentMap.put(keys[keys.length - 1], entry.getValue()); + } + String data = yaml.dumpAsMap(nestedMap); + String linebreak = dumperOptions.getLineBreak().getString(); + return Arrays.asList(data.split(linebreak)); + } catch (MarkedYAMLException exception) { + throw wrapExceptionToHiddenSensitiveData(exception); + } + } + public static synchronized T convertToObject(String value, Class type) { try { return yaml.loadAs(value, type); diff --git a/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationUtilsTest.java b/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationUtilsTest.java index c94bd69745450..757bf9e953c6b 100644 --- a/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationUtilsTest.java +++ b/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationUtilsTest.java @@ -21,6 +21,7 @@ import org.apache.flink.testutils.junit.extensions.parameterized.Parameter; import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension; import org.apache.flink.testutils.junit.extensions.parameterized.Parameters; +import org.apache.flink.util.TimeUtils; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestTemplate; @@ -96,6 +97,92 @@ void testStandardYamlSupportLegacyPattern() { .isEqualTo(expectedMap); } + @TestTemplate + void testConvertConfigToWritableLinesAndFlattenYaml() { + testConvertConfigToWritableLines(true); + } + + 
@TestTemplate + void testConvertConfigToWritableLinesAndNoFlattenYaml() { + testConvertConfigToWritableLines(false); + } + + private void testConvertConfigToWritableLines(boolean flattenYaml) { + final Configuration configuration = new Configuration(standardYaml); + ConfigOption> nestedListOption = + ConfigOptions.key("nested.test-list-key").stringType().asList().noDefaultValue(); + final String listValues = "value1;value2;value3"; + final String yamlListValues = "[value1, value2, value3]"; + configuration.set(nestedListOption, Arrays.asList(listValues.split(";"))); + + ConfigOption> nestedMapOption = + ConfigOptions.key("nested.test-map-key").mapType().noDefaultValue(); + final String mapValues = "key1:value1,key2:value2"; + final String yamlMapValues = "{key1: value1, key2: value2}"; + configuration.set( + nestedMapOption, + Arrays.stream(mapValues.split(",")) + .collect(Collectors.toMap(e -> e.split(":")[0], e -> e.split(":")[1]))); + + ConfigOption nestedDurationOption = + ConfigOptions.key("nested.test-duration-key").durationType().noDefaultValue(); + final Duration duration = Duration.ofMillis(3000); + configuration.set(nestedDurationOption, duration); + + ConfigOption nestedStringOption = + ConfigOptions.key("nested.test-string-key").stringType().noDefaultValue(); + final String strValues = "*"; + final String yamlStrValues = "'*'"; + configuration.set(nestedStringOption, strValues); + + ConfigOption intOption = + ConfigOptions.key("test-int-key").intType().noDefaultValue(); + final int intValue = 1; + configuration.set(intOption, intValue); + + List actualData = + ConfigurationUtils.convertConfigToWritableLines(configuration, flattenYaml); + List expected; + if (standardYaml) { + if (flattenYaml) { + expected = + Arrays.asList( + nestedListOption.key() + ": " + yamlListValues, + nestedMapOption.key() + ": " + yamlMapValues, + nestedDurationOption.key() + + ": " + + TimeUtils.formatWithHighestUnit(duration), + nestedStringOption.key() + ": " + yamlStrValues, 
+ intOption.key() + ": " + intValue); + } else { + expected = + Arrays.asList( + "nested:", + " test-list-key:", + " - value1", + " - value2", + " - value3", + " test-map-key:", + " key1: value1", + " key2: value2", + " test-duration-key: 3 s", + " test-string-key: '*'", + "test-int-key: 1"); + } + } else { + expected = + Arrays.asList( + nestedListOption.key() + ": " + listValues, + nestedMapOption.key() + ": " + mapValues, + nestedDurationOption.key() + + ": " + + TimeUtils.formatWithHighestUnit(duration), + nestedStringOption.key() + ": " + strValues, + intOption.key() + ": " + intValue); + } + assertThat(expected).containsExactlyInAnyOrderElementsOf(actualData); + } + @TestTemplate void testHideSensitiveValues() { final Map keyValuePairs = new HashMap<>(); diff --git a/flink-core/src/test/java/org/apache/flink/configuration/YamlParserUtilsTest.java b/flink-core/src/test/java/org/apache/flink/configuration/YamlParserUtilsTest.java index b21fb229280f1..f857914238fcf 100644 --- a/flink-core/src/test/java/org/apache/flink/configuration/YamlParserUtilsTest.java +++ b/flink-core/src/test/java/org/apache/flink/configuration/YamlParserUtilsTest.java @@ -30,6 +30,7 @@ import java.io.FileNotFoundException; import java.io.PrintWriter; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -150,6 +151,63 @@ void testConvertToObject() { assertThat(YamlParserUtils.convertToObject(s4, Map.class)).isEqualTo(map); } + @Test + void testDumpNestedYamlFromFlatMap() { + Map flattenMap = new HashMap<>(); + flattenMap.put("string", "stringValue"); + flattenMap.put("integer", 42); + flattenMap.put("double", 3.14); + flattenMap.put("boolean", true); + flattenMap.put("enum", TestEnum.ENUM); + flattenMap.put("list1", Arrays.asList("item1", "item2", "item3")); + flattenMap.put("list2", "{item1, item2, item3}"); + flattenMap.put("map1", Collections.singletonMap("k1", "v1")); + flattenMap.put("map2", "{k2: v2}"); + 
flattenMap.put( + "listMap1", + Arrays.asList( + Collections.singletonMap("k3", "v3"), + Collections.singletonMap("k4", "v4"))); + flattenMap.put("listMap2", "[{k5: v5}, {k6: v6}]"); + flattenMap.put("nested.key1.subKey1", "value1"); + flattenMap.put("nested.key2.subKey1", "value2"); + flattenMap.put("nested.key3", "value3"); + flattenMap.put("escaped1", "*"); + flattenMap.put("escaped2", "1"); + flattenMap.put("escaped3", "true"); + + List values = YamlParserUtils.convertAndDumpYamlFromFlatMap(flattenMap); + + assertThat(values) + .containsExactlyInAnyOrder( + "string: stringValue", + "integer: 42", + "double: 3.14", + "boolean: true", + "enum: ENUM", + "list1:", + "- item1", + "- item2", + "- item3", + "list2: '{item1, item2, item3}'", + "map1:", + " k1: v1", + "map2: '{k2: v2}'", + "listMap1:", + "- k3: v3", + "- k4: v4", + "listMap2: '[{k5: v5}, {k6: v6}]'", + "nested:", + " key1:", + " subKey1: value1", + " key2:", + " subKey1: value2", + " key3: value3", + "escaped1: '*'", + "escaped2: '1'", + "escaped3: 'true'"); + } + private enum TestEnum { ENUM } diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java index 169ac74671ec5..5969f0850d9c7 100644 --- a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java +++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java @@ -20,6 +20,7 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.ConfigurationUtils; import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.configuration.UnmodifiableConfiguration; import org.apache.flink.test.util.FileUtils; @@ -417,15 +418,11 
@@ public void appendConfiguration(Configuration config) throws IOException { mergedConfig.addAll(defaultConfig); mergedConfig.addAll(config); - final List configurationLines = - mergedConfig.toFileWritableMap().entrySet().stream() - .map(entry -> entry.getKey() + ": " + entry.getValue()) - .collect(Collectors.toList()); - // NOTE: Before we change the default conf file in the flink-dist to 'config.yaml', we // need to use the legacy flink conf file 'flink-conf.yaml' here. Files.write( - conf.resolve(GlobalConfiguration.LEGACY_FLINK_CONF_FILENAME), configurationLines); + conf.resolve(GlobalConfiguration.LEGACY_FLINK_CONF_FILENAME), + ConfigurationUtils.convertConfigToWritableLines(mergedConfig, true)); } public void setTaskExecutorHosts(Collection taskExecutorHosts) throws IOException { diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java index c95312607392d..2203770e2670d 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java @@ -20,6 +20,7 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.ConfigurationUtils; import org.apache.flink.configuration.DeploymentOptionsInternal; import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.configuration.JobManagerOptions; @@ -136,9 +137,9 @@ public List buildAccompanyingKubernetesResources() throws IOExcepti data.put(file.getName(), Files.toString(file, StandardCharsets.UTF_8)); } - final Map propertiesMap = - getClusterSidePropertiesMap(kubernetesComponentConf.getFlinkConfiguration()); - data.put(GlobalConfiguration.getFlinkConfFilename(), 
getFlinkConfData(propertiesMap)); + final List confData = + getClusterSideConfData(kubernetesComponentConf.getFlinkConfiguration()); + data.put(GlobalConfiguration.getFlinkConfFilename(), getFlinkConfData(confData)); final ConfigMap flinkConfConfigMap = new ConfigMapBuilder() @@ -154,7 +155,7 @@ public List buildAccompanyingKubernetesResources() throws IOExcepti } /** Get properties map for the cluster-side after removal of some keys. */ - private Map getClusterSidePropertiesMap(Configuration flinkConfig) { + private List getClusterSideConfData(Configuration flinkConfig) { final Configuration clusterSideConfig = flinkConfig.clone(); // Remove some configuration options that should not be taken to cluster side. clusterSideConfig.removeConfig(KubernetesConfigOptions.KUBE_CONFIG_FILE); @@ -163,19 +164,14 @@ private Map getClusterSidePropertiesMap(Configuration flinkConfi clusterSideConfig.removeConfig(JobManagerOptions.BIND_HOST); clusterSideConfig.removeConfig(TaskManagerOptions.BIND_HOST); clusterSideConfig.removeConfig(TaskManagerOptions.HOST); - return clusterSideConfig.toFileWritableMap(); + return ConfigurationUtils.convertConfigToWritableLines(clusterSideConfig, false); } @VisibleForTesting - String getFlinkConfData(Map propertiesMap) throws IOException { + String getFlinkConfData(List confData) throws IOException { try (StringWriter sw = new StringWriter(); PrintWriter out = new PrintWriter(sw)) { - propertiesMap.forEach( - (k, v) -> { - out.print(k); - out.print(": "); - out.println(v); - }); + confData.forEach(out::println); return sw.toString(); } diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesTestUtils.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesTestUtils.java index e5b211a6f84c5..531eacf6de0dd 100644 --- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesTestUtils.java +++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesTestUtils.java @@ -19,18 
+19,19 @@ package org.apache.flink.kubernetes; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.YamlParserUtils; import org.apache.flink.kubernetes.kubeclient.parameters.KubernetesTaskManagerParameters; import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils; import org.apache.flink.shaded.guava32.com.google.common.io.Files; -import org.apache.commons.lang3.StringUtils; - import java.io.File; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.Collections; +import java.util.HashMap; +import java.util.Map; /** Utilities for the Kubernetes tests. */ public class KubernetesTestUtils { @@ -41,15 +42,25 @@ public static void createTemporyFile(String data, File directory, String fileNam } public static Configuration loadConfigurationFromString(String content) { - final Configuration configuration = new Configuration(); - for (String line : content.split(System.lineSeparator())) { - final String[] splits = line.split(":"); - if (splits.length >= 2) { - configuration.setString( - splits[0].trim(), StringUtils.substringAfter(line, ":").trim()); - } - } - return configuration; + Map map = YamlParserUtils.convertToObject(content, Map.class); + return Configuration.fromMap(flatten(map, "")); + } + + private static Map flatten(Map config, String keyPrefix) { + final Map flattenedMap = new HashMap<>(); + + config.forEach( + (key, value) -> { + String flattenedKey = keyPrefix + key; + if (value instanceof Map) { + Map e = (Map) value; + flattenedMap.putAll(flatten(e, flattenedKey + ".")); + } else { + flattenedMap.put(flattenedKey, YamlParserUtils.toYAMLString(value)); + } + }); + + return flattenedMap; } public static KubernetesTaskManagerParameters createTaskManagerParameters( diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesJobManagerFactoryTest.java 
b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesJobManagerFactoryTest.java index d6e7a0b71b7b2..c52a38b7076f3 100644 --- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesJobManagerFactoryTest.java +++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesJobManagerFactoryTest.java @@ -18,6 +18,7 @@ package org.apache.flink.kubernetes.kubeclient.factory; +import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.DeploymentOptions; import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.configuration.SecurityOptions; @@ -367,11 +368,11 @@ void testFlinkConfConfigMap() throws IOException { assertThat(resultDatas).hasSize(3); assertThat(resultDatas.get(CONFIG_FILE_LOG4J_NAME)).isEqualTo("some data"); assertThat(resultDatas.get(CONFIG_FILE_LOGBACK_NAME)).isEqualTo("some data"); - assertThat(resultDatas.get(FLINK_CONF_FILENAME)) - .contains( - KubernetesConfigOptionsInternal.ENTRY_POINT_CLASS.key() - + ": " - + ENTRY_POINT_CLASS); + final Configuration resultFlinkConfig = + KubernetesTestUtils.loadConfigurationFromString( + resultDatas.get(FLINK_CONF_FILENAME)); + assertThat(resultFlinkConfig.get(KubernetesConfigOptionsInternal.ENTRY_POINT_CLASS)) + .isEqualTo(ENTRY_POINT_CLASS); } @Test diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java index 7f41012e9e197..db1361113ef11 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java @@ -22,6 +22,7 @@ import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; import org.apache.flink.configuration.Configuration; +import 
org.apache.flink.configuration.ConfigurationUtils; import org.apache.flink.configuration.CoreOptions; import org.apache.flink.runtime.entrypoint.parser.CommandLineOptions; import org.apache.flink.util.OperatingSystem; @@ -72,10 +73,8 @@ public class BootstrapTools { public static void writeConfiguration(Configuration cfg, File file) throws IOException { try (FileWriter fwrt = new FileWriter(file); PrintWriter out = new PrintWriter(fwrt)) { - for (Map.Entry entry : cfg.toFileWritableMap().entrySet()) { - out.print(entry.getKey()); - out.print(": "); - out.println(entry.getValue()); + for (String s : ConfigurationUtils.convertConfigToWritableLines(cfg, false)) { + out.println(s); } } } diff --git a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/container/FlinkImageBuilder.java b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/container/FlinkImageBuilder.java index 7de3bfcf90e8d..38b542e1310cf 100644 --- a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/container/FlinkImageBuilder.java +++ b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/container/FlinkImageBuilder.java @@ -19,6 +19,7 @@ package org.apache.flink.connector.testframe.container; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.ConfigurationUtils; import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.test.util.FileUtils; @@ -40,7 +41,6 @@ import java.util.Properties; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.stream.Collectors; import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.Preconditions.checkState; @@ -298,9 +298,7 @@ private Path createTemporaryFlinkConfFile(Configuration finalConfiguration, Path Path 
flinkConfFile = tempDirectory.resolve(GlobalConfiguration.LEGACY_FLINK_CONF_FILENAME); Files.write( flinkConfFile, - finalConfiguration.toFileWritableMap().entrySet().stream() - .map(entry -> entry.getKey() + ": " + entry.getValue()) - .collect(Collectors.toList())); + ConfigurationUtils.convertConfigToWritableLines(finalConfiguration, true)); return flinkConfFile; }