Skip to content

Commit

Permalink
[FLINK-30277][python]Allow PYTHONPATH of Python Worker configurable
Browse files Browse the repository at this point in the history
This closes apache#21770.
  • Loading branch information
Samrat002 authored and HuangXingBo committed Feb 27, 2023
1 parent a98bb9a commit 2e0efe4
Show file tree
Hide file tree
Showing 6 changed files with 51 additions and 5 deletions.
6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/python_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,12 @@
<td>Boolean</td>
<td>Specifies whether to enable Python worker profiling. The profile result will be displayed in the log file of the TaskManager periodically. The interval between each profiling is determined by the config options python.fn-execution.bundle.size and python.fn-execution.bundle.time.</td>
</tr>
<tr>
<td><h5>python.pythonpath</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Specify the path on the Worker Node where the Flink Python Dependencies are installed, which gets added into the PYTHONPATH of the Python Worker. </td>
</tr>
<tr>
<td><h5>python.requirements</h5></td>
<td style="word-wrap: break-word;">(none)</td>
Expand Down
2 changes: 1 addition & 1 deletion docs/static/generated/rest_v2_sql_gateway.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ info:
license:
name: Apache 2.0
url: https://www.apache.org/licenses/LICENSE-2.0.html
version: v2/1.17-SNAPSHOT
version: v2/1.18-SNAPSHOT
paths:
/api_versions:
get:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
import static org.apache.flink.python.PythonOptions.PYTHON_ARCHIVES;
import static org.apache.flink.python.PythonOptions.PYTHON_CLIENT_EXECUTABLE;
import static org.apache.flink.python.PythonOptions.PYTHON_FILES;
import static org.apache.flink.python.PythonOptions.PYTHON_PATH;
import static org.apache.flink.python.util.PythonDependencyUtils.FILE_DELIMITER;

/** The util class help to prepare Python env and run the python process. */
Expand Down Expand Up @@ -212,6 +213,15 @@ static PythonEnvironment preparePythonEnvironment(
});
}

// 4. append configured python.pythonpath to the PYTHONPATH.
if (config.getOptional(PYTHON_PATH).isPresent()) {
env.pythonPath =
String.join(
File.pathSeparator,
config.getOptional(PYTHON_PATH).get(),
env.pythonPath);
}

if (entryPointScript != null) {
addToPythonPath(env, Collections.singletonList(new Path(entryPointScript)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,18 @@ public class PythonOptions {
+ "optional parameter exists. The option is equivalent to the command line option "
+ "\"-pyreq\".");

/** The configuration allows user to define python path for client and workers. */
public static final ConfigOption<String> PYTHON_PATH =
ConfigOptions.key("python.pythonpath")
.stringType()
.noDefaultValue()
.withDescription(
Description.builder()
.text(
"Specify the path on the Worker Node where the Flink Python Dependencies are installed, which "
+ "gets added into the PYTHONPATH of the Python Worker. ")
.build());

public static final ConfigOption<String> PYTHON_ARCHIVES =
ConfigOptions.key("python.archives")
.stringType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -166,6 +167,13 @@ public Map<String, String> constructEnvironmentVariables(String baseDirectory)

constructFilesDirectory(env, baseDirectory);

if (dependencyInfo.getPythonPath().isPresent()) {
appendToPythonPath(
env, Collections.singletonList(dependencyInfo.getPythonPath().get()));
}

LOG.info("PYTHONPATH of python worker: {}", env.get("PYTHONPATH"));

constructRequirementsDirectory(env, baseDirectory);

constructArchivesDirectory(env, baseDirectory);
Expand Down Expand Up @@ -294,7 +302,6 @@ private void constructFilesDirectory(Map<String, String> env, String baseDirecto
pythonFilePaths.add(pythonPath);
}
appendToPythonPath(env, pythonFilePaths);
LOG.info("PYTHONPATH of python worker: {}", env.get("PYTHONPATH"));
}

private void constructRequirementsDirectory(Map<String, String> env, String baseDirectory)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import static org.apache.flink.python.PythonOptions.PYTHON_EXECUTABLE;
import static org.apache.flink.python.PythonOptions.PYTHON_EXECUTION_MODE;
import static org.apache.flink.python.PythonOptions.PYTHON_FILES_DISTRIBUTED_CACHE_INFO;
import static org.apache.flink.python.PythonOptions.PYTHON_PATH;
import static org.apache.flink.python.PythonOptions.PYTHON_REQUIREMENTS_FILE_DISTRIBUTED_CACHE_INFO;

/** PythonDependencyInfo contains the information of third-party dependencies. */
Expand All @@ -63,6 +64,8 @@ public final class PythonDependencyInfo {
*/
@Nullable private final String requirementsCacheDir;

@Nullable private final String pythonPath;

/**
* The python archives uploaded by TableEnvironment#add_python_archive() or command line option
* "-pyarch". The key is the path of the archive file and the value is the name of the directory
Expand Down Expand Up @@ -91,7 +94,8 @@ public PythonDependencyInfo(
requirementsCacheDir,
archives,
pythonExec,
PYTHON_EXECUTION_MODE.defaultValue());
PYTHON_EXECUTION_MODE.defaultValue(),
PYTHON_PATH.defaultValue());
}

public PythonDependencyInfo(
Expand All @@ -100,13 +104,15 @@ public PythonDependencyInfo(
@Nullable String requirementsCacheDir,
@Nonnull Map<String, String> archives,
@Nonnull String pythonExec,
@Nonnull String executionMode) {
@Nonnull String executionMode,
@Nullable String pythonPath) {
this.pythonFiles = Objects.requireNonNull(pythonFiles);
this.requirementsFilePath = requirementsFilePath;
this.requirementsCacheDir = requirementsCacheDir;
this.pythonExec = Objects.requireNonNull(pythonExec);
this.archives = Objects.requireNonNull(archives);
this.executionMode = Objects.requireNonNull(executionMode);
this.pythonPath = pythonPath;
}

public Map<String, String> getPythonFiles() {
Expand All @@ -133,6 +139,10 @@ public String getExecutionMode() {
return executionMode;
}

public Optional<String> getPythonPath() {
return Optional.ofNullable(pythonPath);
}

/**
* Creates PythonDependencyInfo from GlobalJobParameters and DistributedCache.
*
Expand Down Expand Up @@ -190,6 +200,7 @@ public static PythonDependencyInfo create(
requirementsCacheDir,
archives,
pythonExec,
config.get(PYTHON_EXECUTION_MODE));
config.get(PYTHON_EXECUTION_MODE),
config.get(PYTHON_PATH));
}
}

0 comments on commit 2e0efe4

Please sign in to comment.