Skip to content

Commit

Permalink
[FLINK-33988][configuration] Fix the invalid configuration when using…
Browse files Browse the repository at this point in the history
… initialized root logger level on yarn deployment mode
  • Loading branch information
RocMarshal authored and 1996fanrui committed Jan 17, 2024
1 parent fb7324b commit eb6d6ff
Showing 1 changed file with 19 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,8 @@
import static org.apache.flink.configuration.ConfigConstants.DEFAULT_FLINK_USR_LIB_DIR;
import static org.apache.flink.configuration.ConfigConstants.ENV_FLINK_LIB_DIR;
import static org.apache.flink.configuration.ConfigConstants.ENV_FLINK_OPT_DIR;
import static org.apache.flink.configuration.ResourceManagerOptions.CONTAINERIZED_MASTER_ENV_PREFIX;
import static org.apache.flink.configuration.ResourceManagerOptions.CONTAINERIZED_TASK_MANAGER_ENV_PREFIX;
import static org.apache.flink.runtime.entrypoint.component.FileJobGraphRetriever.JOB_GRAPH_FILE_PATH;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
Expand Down Expand Up @@ -204,6 +206,8 @@ public YarnClusterDescriptor(
this.flinkConfiguration = Preconditions.checkNotNull(flinkConfiguration);
this.userJarInclusion = getUserJarInclusionMode(flinkConfiguration);

adaptEnvSetting(flinkConfiguration, CoreOptions.FLINK_LOG_LEVEL, "ROOT_LOG_LEVEL");

getLocalFlinkDistPath(flinkConfiguration).ifPresent(this::setLocalJarPath);
decodeFilesToShipToCluster(flinkConfiguration, YarnConfigOptions.SHIP_FILES)
.ifPresent(this::addShipFiles);
Expand All @@ -216,6 +220,21 @@ public YarnClusterDescriptor(
this.nodeLabel = flinkConfiguration.getString(YarnConfigOptions.NODE_LABEL);
}

/** Adapt flink env setting. */
private static <T> void adaptEnvSetting(
Configuration config, ConfigOption<T> configOption, String envKey) {
config.getOptional(configOption)
.ifPresent(
value -> {
config.setString(
CONTAINERIZED_MASTER_ENV_PREFIX + envKey,
String.valueOf(value));
config.setString(
CONTAINERIZED_TASK_MANAGER_ENV_PREFIX + envKey,
String.valueOf(value));
});
}

private Optional<List<Path>> decodeFilesToShipToCluster(
final Configuration configuration, final ConfigOption<List<String>> configOption) {
checkNotNull(configuration);
Expand Down

0 comments on commit eb6d6ff

Please sign in to comment.