From 2e0efe4e0723429e26ca04e2f61fcf89884dd077 Mon Sep 17 00:00:00 2001 From: Samrat002 Date: Thu, 23 Feb 2023 16:46:01 +0800 Subject: [PATCH] [FLINK-30277][python]Allow PYTHONPATH of Python Worker configurable This closes #21770. --- .../generated/python_configuration.html | 6 ++++++ docs/static/generated/rest_v2_sql_gateway.yml | 2 +- .../flink/client/python/PythonEnvUtils.java | 10 ++++++++++ .../org/apache/flink/python/PythonOptions.java | 12 ++++++++++++ .../env/AbstractPythonEnvironmentManager.java | 9 ++++++++- .../flink/python/env/PythonDependencyInfo.java | 17 ++++++++++++++--- 6 files changed, 51 insertions(+), 5 deletions(-) diff --git a/docs/layouts/shortcodes/generated/python_configuration.html b/docs/layouts/shortcodes/generated/python_configuration.html index ba15a93112bdc..eb9c66795c833 100644 --- a/docs/layouts/shortcodes/generated/python_configuration.html +++ b/docs/layouts/shortcodes/generated/python_configuration.html @@ -98,6 +98,12 @@ Boolean Specifies whether to enable Python worker profiling. The profile result will be displayed in the log file of the TaskManager periodically. The interval between each profiling is determined by the config options python.fn-execution.bundle.size and python.fn-execution.bundle.time. + +
python.pythonpath
+ (none) + String + Specify the path on the Worker Node where the Flink Python Dependencies are installed, which gets added into the PYTHONPATH of the Python Worker. +
python.requirements
(none) diff --git a/docs/static/generated/rest_v2_sql_gateway.yml b/docs/static/generated/rest_v2_sql_gateway.yml index 887930eb65ae6..1284d0ae67fd0 100644 --- a/docs/static/generated/rest_v2_sql_gateway.yml +++ b/docs/static/generated/rest_v2_sql_gateway.yml @@ -6,7 +6,7 @@ info: license: name: Apache 2.0 url: https://www.apache.org/licenses/LICENSE-2.0.html - version: v2/1.17-SNAPSHOT + version: v2/1.18-SNAPSHOT paths: /api_versions: get: diff --git a/flink-python/src/main/java/org/apache/flink/client/python/PythonEnvUtils.java b/flink-python/src/main/java/org/apache/flink/client/python/PythonEnvUtils.java index f018703dcdc9c..6a59511388d1b 100644 --- a/flink-python/src/main/java/org/apache/flink/client/python/PythonEnvUtils.java +++ b/flink-python/src/main/java/org/apache/flink/client/python/PythonEnvUtils.java @@ -70,6 +70,7 @@ import static org.apache.flink.python.PythonOptions.PYTHON_ARCHIVES; import static org.apache.flink.python.PythonOptions.PYTHON_CLIENT_EXECUTABLE; import static org.apache.flink.python.PythonOptions.PYTHON_FILES; +import static org.apache.flink.python.PythonOptions.PYTHON_PATH; import static org.apache.flink.python.util.PythonDependencyUtils.FILE_DELIMITER; /** The util class help to prepare Python env and run the python process. */ @@ -212,6 +213,15 @@ static PythonEnvironment preparePythonEnvironment( }); } + // 4. append configured python.pythonpath to the PYTHONPATH. + if (config.getOptional(PYTHON_PATH).isPresent()) { + env.pythonPath = + String.join( + File.pathSeparator, + config.getOptional(PYTHON_PATH).get(), + env.pythonPath); + } + if (entryPointScript != null) { addToPythonPath(env, Collections.singletonList(new Path(entryPointScript))); } diff --git a/flink-python/src/main/java/org/apache/flink/python/PythonOptions.java b/flink-python/src/main/java/org/apache/flink/python/PythonOptions.java index 3f0b90a91afd5..3a83fd76105a4 100644 --- a/flink-python/src/main/java/org/apache/flink/python/PythonOptions.java +++ b/flink-python/src/main/java/org/apache/flink/python/PythonOptions.java @@ -122,6 +122,18 @@ public class PythonOptions { + "optional parameter exists. The option is equivalent to the command line option " + "\"-pyreq\"."); + /** The configuration allows user to define python path for client and workers. */ + public static final ConfigOption PYTHON_PATH = + ConfigOptions.key("python.pythonpath") + .stringType() + .noDefaultValue() + .withDescription( + Description.builder() + .text( + "Specify the path on the Worker Node where the Flink Python Dependencies are installed, which " + + "gets added into the PYTHONPATH of the Python Worker. ") + .build()); + public static final ConfigOption PYTHON_ARCHIVES = ConfigOptions.key("python.archives") .stringType() diff --git a/flink-python/src/main/java/org/apache/flink/python/env/AbstractPythonEnvironmentManager.java b/flink-python/src/main/java/org/apache/flink/python/env/AbstractPythonEnvironmentManager.java index 2ab0facb3139b..70992e6ddbb8d 100644 --- a/flink-python/src/main/java/org/apache/flink/python/env/AbstractPythonEnvironmentManager.java +++ b/flink-python/src/main/java/org/apache/flink/python/env/AbstractPythonEnvironmentManager.java @@ -43,6 +43,7 @@ import java.nio.file.Path; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -166,6 +167,13 @@ public Map constructEnvironmentVariables(String baseDirectory) constructFilesDirectory(env, baseDirectory); + if (dependencyInfo.getPythonPath().isPresent()) { + appendToPythonPath( + env, Collections.singletonList(dependencyInfo.getPythonPath().get())); + } + + LOG.info("PYTHONPATH of python worker: {}", env.get("PYTHONPATH")); + constructRequirementsDirectory(env, baseDirectory); constructArchivesDirectory(env, baseDirectory); @@ -294,7 +302,6 @@ private void constructFilesDirectory(Map env, String baseDirecto pythonFilePaths.add(pythonPath); } appendToPythonPath(env, pythonFilePaths); - LOG.info("PYTHONPATH of python worker: {}", env.get("PYTHONPATH")); } private void constructRequirementsDirectory(Map env, String baseDirectory) diff --git a/flink-python/src/main/java/org/apache/flink/python/env/PythonDependencyInfo.java b/flink-python/src/main/java/org/apache/flink/python/env/PythonDependencyInfo.java index 80f570b271491..2f65e5f7be759 100644 --- a/flink-python/src/main/java/org/apache/flink/python/env/PythonDependencyInfo.java +++ b/flink-python/src/main/java/org/apache/flink/python/env/PythonDependencyInfo.java @@ -37,6 +37,7 @@ import static org.apache.flink.python.PythonOptions.PYTHON_EXECUTABLE; import static org.apache.flink.python.PythonOptions.PYTHON_EXECUTION_MODE; import static org.apache.flink.python.PythonOptions.PYTHON_FILES_DISTRIBUTED_CACHE_INFO; +import static org.apache.flink.python.PythonOptions.PYTHON_PATH; import static org.apache.flink.python.PythonOptions.PYTHON_REQUIREMENTS_FILE_DISTRIBUTED_CACHE_INFO; /** PythonDependencyInfo contains the information of third-party dependencies. */ @@ -63,6 +64,8 @@ public final class PythonDependencyInfo { */ @Nullable private final String requirementsCacheDir; + @Nullable private final String pythonPath; + /** * The python archives uploaded by TableEnvironment#add_python_archive() or command line option * "-pyarch". The key is the path of the archive file and the value is the name of the directory @@ -91,7 +94,8 @@ public PythonDependencyInfo( requirementsCacheDir, archives, pythonExec, - PYTHON_EXECUTION_MODE.defaultValue()); + PYTHON_EXECUTION_MODE.defaultValue(), + PYTHON_PATH.defaultValue()); } public PythonDependencyInfo( @@ -100,13 +104,15 @@ public PythonDependencyInfo( @Nullable String requirementsCacheDir, @Nonnull Map archives, @Nonnull String pythonExec, - @Nonnull String executionMode) { + @Nonnull String executionMode, + @Nullable String pythonPath) { this.pythonFiles = Objects.requireNonNull(pythonFiles); this.requirementsFilePath = requirementsFilePath; this.requirementsCacheDir = requirementsCacheDir; this.pythonExec = Objects.requireNonNull(pythonExec); this.archives = Objects.requireNonNull(archives); this.executionMode = Objects.requireNonNull(executionMode); + this.pythonPath = pythonPath; } public Map getPythonFiles() { @@ -133,6 +139,10 @@ public String getExecutionMode() { return executionMode; } + public Optional getPythonPath() { + return Optional.ofNullable(pythonPath); + } + /** * Creates PythonDependencyInfo from GlobalJobParameters and DistributedCache. * @@ -190,6 +200,7 @@ public static PythonDependencyInfo create( requirementsCacheDir, archives, pythonExec, - config.get(PYTHON_EXECUTION_MODE)); + config.get(PYTHON_EXECUTION_MODE), + config.get(PYTHON_PATH)); } }