
Commit

[FLINK-26190][python] Remove getTableConfig from ExecNodeConfiguration
This closes apache#19333.
dianfu committed Apr 7, 2022
1 parent a9a7d22 commit 02f0984
Showing 65 changed files with 374 additions and 683 deletions. Only the first few files are reproduced below.
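At the core of this commit, PythonConfig is reworked from a Serializable holder of pre-computed fields into a ReadableConfig view that consults the configuration produced by the Python dependency management first and falls back to the outer configuration. A minimal Java sketch of that lookup order, using class and option names that appear in the diffs below; the values are illustrative and imports are omitted:

    Configuration outer = new Configuration();
    outer.set(PythonOptions.PYTHON_EXECUTABLE, "/usr/local/bin/python3");

    // e.g. produced by PythonDependencyUtils.configurePythonDependencies(...)
    Configuration fromDependencies = new Configuration();

    PythonConfig pythonConfig = new PythonConfig(outer, fromDependencies);
    // dependency-derived values win, everything else falls back to the outer configuration
    String pythonExec = pythonConfig.get(PythonOptions.PYTHON_EXECUTABLE);
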
@@ -980,7 +980,7 @@ def startup_loopback_server():
BeamFnLoopbackWorkerPoolServicer
config = Configuration(j_configuration=j_configuration)
config.set_string(
- "PYFLINK_LOOPBACK_SERVER_ADDRESS", BeamFnLoopbackWorkerPoolServicer().start())
+ "python.loopback-server.address", BeamFnLoopbackWorkerPoolServicer().start())

python_worker_execution_mode = os.environ.get('_python_worker_execution_mode')

@@ -445,10 +445,10 @@ def plus_three(value):
from test_dep2 import add_three
return add_three(value)

- env.add_python_file(python_file_path)
t_env = StreamTableEnvironment.create(
stream_execution_environment=env,
environment_settings=EnvironmentSettings.in_streaming_mode())
+ env.add_python_file(python_file_path)

from pyflink.table.udf import udf
from pyflink.table.expressions import col
@@ -678,13 +678,15 @@ def add_from_file(i):
# The parallelism of Sink: Test Sink should be 4
self.assertEqual(nodes[4]['parallelism'], 4)

- env_config_with_dependencies = dict(get_gateway().jvm.org.apache.flink.python.util
- .PythonConfigUtil.getEnvConfigWithDependencies(
- env._j_stream_execution_environment).toMap())
+ python_dependency_config = dict(
+ get_gateway().jvm.org.apache.flink.python.util.PythonDependencyUtils.
+ configurePythonDependencies(
+ env._j_stream_execution_environment.getCachedFiles(),
+ env._j_stream_execution_environment.getConfiguration()).toMap())

# Make sure that user specified files and archives are correctly added.
- self.assertIsNotNone(env_config_with_dependencies['python.files'])
- self.assertIsNotNone(env_config_with_dependencies['python.archives'])
+ self.assertIsNotNone(python_dependency_config['python.internal.files-key-map'])
+ self.assertIsNotNone(python_dependency_config['python.internal.archives-key-map'])

def test_register_slot_sharing_group(self):
slot_sharing_group_1 = SlotSharingGroup.builder('slot_sharing_group_1') \
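The test above now goes through PythonDependencyUtils.configurePythonDependencies(...) instead of the removed PythonConfigUtil.getEnvConfigWithDependencies(...), and asserts on the internal keys python.internal.files-key-map and python.internal.archives-key-map defined further down in PythonOptions. A hedged Java sketch of the equivalent call, assuming the Java signature mirrors the Py4J invocation used in the test; imports are omitted:

    // dependency information derived from the environment's cached files and configuration
    Configuration dependencyConfig =
            PythonDependencyUtils.configurePythonDependencies(
                    env.getCachedFiles(), env.getConfiguration());
    Map<String, String> filesKeyMap =
            dependencyConfig.get(PythonOptions.PYTHON_FILES_DISTRIBUTED_CACHE_INFO);
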
4 changes: 2 additions & 2 deletions flink-python/pyflink/fn_execution/beam/beam_boot.py
@@ -75,12 +75,12 @@ def check_not_empty(check_str, error_message):

logging.info("Initializing Python harness: %s" % " ".join(sys.argv))

- if 'PYFLINK_LOOPBACK_SERVER_ADDRESS' in os.environ:
+ if 'PYTHON_LOOPBACK_SERVER_ADDRESS' in os.environ:
logging.info("Starting up Python harness in loopback mode.")

params = dict(os.environ)
params.update({'SEMI_PERSISTENT_DIRECTORY': semi_persist_dir})
- with grpc.insecure_channel(os.environ['PYFLINK_LOOPBACK_SERVER_ADDRESS']) as channel:
+ with grpc.insecure_channel(os.environ['PYTHON_LOOPBACK_SERVER_ADDRESS']) as channel:
client = BeamFnExternalWorkerPoolStub(channel=channel)
request = StartWorkerRequest(
worker_id=worker_id,
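The boot script now keys off PYTHON_LOOPBACK_SERVER_ADDRESS instead of PYFLINK_LOOPBACK_SERVER_ADDRESS. The variable is not meant to be set by users; it is derived from the python.job-options map assembled by the reworked PythonConfig, whose entries are exported as environment variables of the Python worker process according to the PYTHON_JOB_OPTIONS description added below. A hedged Java sketch of that part of toConfiguration(); imports are omitted:

    Map<String, String> jobOptions = new HashMap<>();
    if (config.contains(PythonOptions.PYTHON_LOOPBACK_SERVER_ADDRESS)) {
        // later surfaces as the PYTHON_LOOPBACK_SERVER_ADDRESS environment variable
        jobOptions.put(
                "PYTHON_LOOPBACK_SERVER_ADDRESS",
                config.get(PythonOptions.PYTHON_LOOPBACK_SERVER_ADDRESS));
    }
    config.set(PythonOptions.PYTHON_JOB_OPTIONS, jobOptions);
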
6 changes: 3 additions & 3 deletions flink-python/pyflink/fn_execution/coders.py
@@ -79,12 +79,12 @@ def _to_field_coder(cls, coder_info_descriptor_proto):
field_names = [f.name for f in schema_proto.fields]
return RowCoder(field_coders, field_names)
elif coder_info_descriptor_proto.HasField('arrow_type'):
- timezone = pytz.timezone(os.environ['table.exec.timezone'])
+ timezone = pytz.timezone(os.environ['TABLE_LOCAL_TIME_ZONE'])
schema_proto = coder_info_descriptor_proto.arrow_type.schema
row_type = cls._to_row_type(schema_proto)
return ArrowCoder(cls._to_arrow_schema(row_type), row_type, timezone)
elif coder_info_descriptor_proto.HasField('over_window_arrow_type'):
- timezone = pytz.timezone(os.environ['table.exec.timezone'])
+ timezone = pytz.timezone(os.environ['TABLE_LOCAL_TIME_ZONE'])
schema_proto = coder_info_descriptor_proto.over_window_arrow_type.schema
row_type = cls._to_row_type(schema_proto)
return OverWindowArrowCoder(
@@ -633,7 +633,7 @@ def from_proto(field_type):
if field_type_name == type_name.TIMESTAMP:
return TimestampCoder(field_type.timestamp_info.precision)
if field_type_name == type_name.LOCAL_ZONED_TIMESTAMP:
- timezone = pytz.timezone(os.environ['table.exec.timezone'])
+ timezone = pytz.timezone(os.environ['TABLE_LOCAL_TIME_ZONE'])
return LocalZonedTimestampCoder(field_type.local_zoned_timestamp_info.precision, timezone)
elif field_type_name == type_name.BASIC_ARRAY:
return GenericArrayCoder(from_proto(field_type.collection_element_type))
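The coders no longer read the raw option key table.exec.timezone from the environment; the session time zone is now published to the worker under TABLE_LOCAL_TIME_ZONE. In the reworked PythonConfig below it is derived from TableConfigOptions.LOCAL_TIME_ZONE and written into the job options, roughly as in this hedged sketch (jobOptions is the python.job-options map built in toConfiguration(); imports are omitted):

    String zone = configuration.get(TableConfigOptions.LOCAL_TIME_ZONE);
    ZoneId localTimeZone =
            TableConfigOptions.LOCAL_TIME_ZONE.defaultValue().equals(zone)
                    ? ZoneId.systemDefault()
                    : ZoneId.of(zone);
    jobOptions.put("TABLE_LOCAL_TIME_ZONE", localTimeZone.getId());
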
8 changes: 2 additions & 6 deletions flink-python/pyflink/table/table_environment.py
@@ -1625,14 +1625,10 @@ def _config_chaining_optimization(self):
def _open(self):
# start BeamFnLoopbackWorkerPoolServicer when executed in MiniCluster
def startup_loopback_server():
- from pyflink.common import Configuration
from pyflink.fn_execution.beam.beam_worker_pool_service import \
BeamFnLoopbackWorkerPoolServicer

- j_configuration = get_j_env_configuration(self._get_j_env())
- config = Configuration(j_configuration=j_configuration)
- config.set_string(
- "PYFLINK_LOOPBACK_SERVER_ADDRESS", BeamFnLoopbackWorkerPoolServicer().start())
+ self.get_config().set("python.loopback-server.address",
+ BeamFnLoopbackWorkerPoolServicer().start())

python_worker_execution_mode = os.environ.get('_python_worker_execution_mode')

189 changes: 69 additions & 120 deletions flink-python/src/main/java/org/apache/flink/python/PythonConfig.java
@@ -19,152 +19,101 @@
package org.apache.flink.python;

import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.python.util.PythonDependencyUtils;
import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.flink.util.Preconditions;

import javax.annotation.Nullable;

import java.io.Serializable;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import static org.apache.flink.python.PythonOptions.PYTHON_LOOPBACK_SERVER_ADDRESS;

/** Configurations for the Python job which are used at run time. */
@Internal
public class PythonConfig implements Serializable {

private static final long serialVersionUID = 1L;

/** Max number of elements to include in a bundle. */
private final int maxBundleSize;

/** Max duration of a bundle. */
private final long maxBundleTimeMills;
public class PythonConfig implements ReadableConfig {

/** Max number of elements to include in an arrow batch. */
private final int maxArrowBatchSize;
private static final List<ConfigOption<?>> PYTHON_CONFIG_OPTIONS;

/**
* The python files uploaded by pyflink.table.TableEnvironment#add_python_file() or command line
* option "-pyfs". The key is the file key in distribute cache and the value is the
* corresponding origin file name.
*/
private final Map<String, String> pythonFilesInfo;

/**
* The file key of the requirements file in distribute cache. It is specified by
* pyflink.table.TableEnvironment#set_python_requirements() or command line option "-pyreq".
*/
@Nullable private final String pythonRequirementsFileInfo;

/**
* The file key of the requirements cached directory in distribute cache. It is specified by
* pyflink.table.TableEnvironment#set_python_requirements() or command line option "-pyreq". It
* is used to support installing python packages offline.
*/
@Nullable private final String pythonRequirementsCacheDirInfo;
static {
PYTHON_CONFIG_OPTIONS =
new ArrayList<>(ConfigUtils.getAllConfigOptions(PythonOptions.class));
}

/**
* The python archives uploaded by pyflink.table.TableEnvironment#add_python_archive() or
* command line option "-pyarch". The key is the file key of the archives in distribute cache
* and the value is the name of the directory to extract to.
* Configuration adopted from the outer layer, e.g. flink-conf.yaml, command line arguments,
* TableConfig, etc.
*/
private final Map<String, String> pythonArchivesInfo;
private final ReadableConfig configuration;

/**
* The path of the python interpreter (e.g. /usr/local/bin/python) specified by
* pyflink.table.TableConfig#set_python_executable() or command line option "-pyexec".
* Configuration generated in the dependency management mechanisms. See {@link
* PythonDependencyUtils.PythonDependencyManager} for more details.
*/
private final String pythonExec;

/** Whether metric is enabled. */
private final boolean metricEnabled;

/** Whether to use managed memory for the Python worker. */
private final boolean isUsingManagedMemory;

/** The Configuration that contains execution configs and dependencies info. */
private final Configuration config;

/** Whether profile is enabled. */
private final boolean profileEnabled;

/** Execution Mode. */
private final String executionMode;

public PythonConfig(Configuration config) {
this.config = config;
maxBundleSize = config.get(PythonOptions.MAX_BUNDLE_SIZE);
maxBundleTimeMills = config.get(PythonOptions.MAX_BUNDLE_TIME_MILLS);
maxArrowBatchSize = config.get(PythonOptions.MAX_ARROW_BATCH_SIZE);
pythonFilesInfo =
config.getOptional(PythonDependencyUtils.PYTHON_FILES).orElse(new HashMap<>());
pythonRequirementsFileInfo =
config.getOptional(PythonDependencyUtils.PYTHON_REQUIREMENTS_FILE)
.orElse(new HashMap<>())
.get(PythonDependencyUtils.FILE);
pythonRequirementsCacheDirInfo =
config.getOptional(PythonDependencyUtils.PYTHON_REQUIREMENTS_FILE)
.orElse(new HashMap<>())
.get(PythonDependencyUtils.CACHE);
pythonArchivesInfo =
config.getOptional(PythonDependencyUtils.PYTHON_ARCHIVES).orElse(new HashMap<>());
pythonExec = config.get(PythonOptions.PYTHON_EXECUTABLE);
metricEnabled = config.getBoolean(PythonOptions.PYTHON_METRIC_ENABLED);
isUsingManagedMemory = config.getBoolean(PythonOptions.USE_MANAGED_MEMORY);
profileEnabled = config.getBoolean(PythonOptions.PYTHON_PROFILE_ENABLED);
executionMode = config.getString(PythonOptions.PYTHON_EXECUTION_MODE);
}

public int getMaxBundleSize() {
return maxBundleSize;
}
private final ReadableConfig pythonDependencyConfiguration;

public long getMaxBundleTimeMills() {
return maxBundleTimeMills;
public PythonConfig(
ReadableConfig configuration, ReadableConfig pythonDependencyConfiguration) {
this.configuration = Preconditions.checkNotNull(configuration);
this.pythonDependencyConfiguration =
Preconditions.checkNotNull(pythonDependencyConfiguration);
}

public int getMaxArrowBatchSize() {
return maxArrowBatchSize;
@Override
public <T> T get(ConfigOption<T> option) {
return pythonDependencyConfiguration
.getOptional(option)
.orElseGet(() -> configuration.get(option));
}

public Map<String, String> getPythonFilesInfo() {
return pythonFilesInfo;
@Override
public <T> Optional<T> getOptional(ConfigOption<T> option) {
final Optional<T> value = pythonDependencyConfiguration.getOptional(option);
if (value.isPresent()) {
return value;
}
return configuration.getOptional(option);
}

public Optional<String> getPythonRequirementsFileInfo() {
return Optional.ofNullable(pythonRequirementsFileInfo);
}

public Optional<String> getPythonRequirementsCacheDirInfo() {
return Optional.ofNullable(pythonRequirementsCacheDirInfo);
}

public Map<String, String> getPythonArchivesInfo() {
return pythonArchivesInfo;
}

public String getPythonExec() {
return pythonExec;
}

public String getExecutionMode() {
return executionMode;
}
public Configuration toConfiguration() {
final Configuration config = new Configuration();
PYTHON_CONFIG_OPTIONS.forEach(
option ->
getOptional((ConfigOption) option)
.ifPresent(v -> config.set((ConfigOption) option, v)));

// prepare the job options
Map<String, String> jobOptions = config.get(PythonOptions.PYTHON_JOB_OPTIONS);
if (jobOptions == null) {
jobOptions = new HashMap<>();
config.set(PythonOptions.PYTHON_JOB_OPTIONS, jobOptions);
}
jobOptions.put("TABLE_LOCAL_TIME_ZONE", getLocalTimeZone(configuration).getId());
if (config.contains(PYTHON_LOOPBACK_SERVER_ADDRESS)) {
jobOptions.put(
"PYTHON_LOOPBACK_SERVER_ADDRESS", config.get(PYTHON_LOOPBACK_SERVER_ADDRESS));
}

public boolean isMetricEnabled() {
return metricEnabled;
}

public boolean isProfileEnabled() {
return profileEnabled;
}

public boolean isUsingManagedMemory() {
return isUsingManagedMemory;
return config;
}

public Configuration getConfig() {
return config;
/**
* Returns the current session time zone id. It is used when converting to/from {@code TIMESTAMP
* WITH LOCAL TIME ZONE}.
*
* @see org.apache.flink.table.types.logical.LocalZonedTimestampType
*/
private static ZoneId getLocalTimeZone(ReadableConfig config) {
String zone = config.get(TableConfigOptions.LOCAL_TIME_ZONE);
return TableConfigOptions.LOCAL_TIME_ZONE.defaultValue().equals(zone)
? ZoneId.systemDefault()
: ZoneId.of(zone);
}
}
@@ -20,6 +20,7 @@

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.ReadableConfig;

/** The base interface of runner which is responsible for the execution of Python functions. */
@Internal
Expand All @@ -28,7 +29,7 @@ public interface PythonFunctionRunner {
/**
* Prepares the Python function runner, such as preparing the Python execution environment, etc.
*/
- void open(PythonConfig config) throws Exception;
+ void open(ReadableConfig config) throws Exception;

/** Tear-down the Python function runner. */
void close() throws Exception;
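Since PythonConfig now implements ReadableConfig, runners read options through the standard interface rather than dedicated getters such as getMaxBundleSize(). A hedged sketch of the resulting call pattern; the runner variable is illustrative, option and method names come from this commit, imports are omitted:

    ReadableConfig runnerConfig =
            new PythonConfig(outerConfiguration, dependencyConfiguration);
    int maxBundleSize = runnerConfig.get(PythonOptions.MAX_BUNDLE_SIZE);
    boolean metricEnabled = runnerConfig.get(PythonOptions.PYTHON_METRIC_ENABLED);
    pythonFunctionRunner.open(runnerConfig);  // the new open(ReadableConfig) signature
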
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -20,11 +20,14 @@

import org.apache.flink.annotation.Experimental;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.docs.Documentation;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.configuration.description.Description;

import java.util.Map;

/** Configuration options for the Python API. */
@PublicEvolving
public class PythonOptions {
@@ -236,4 +239,35 @@ public class PythonOptions {
+ "The `thread` mode means that the Python user-defined functions will be executed in the same process of the Java operator. "
+ "Note that currently it still doesn't support to execute Python user-defined functions in `thread` mode in all places. "
+ "It will fall back to `process` mode in these cases.");

// ------------------------------------------------------------------------------------------
// config options used for internal purpose
// ------------------------------------------------------------------------------------------

@Documentation.ExcludeFromDocumentation(
"Internal use only. The options will be exported as environment variables which could be accessed in Python worker process.")
public static final ConfigOption<Map<String, String>> PYTHON_JOB_OPTIONS =
ConfigOptions.key("python.job-options").mapType().noDefaultValue();

@Documentation.ExcludeFromDocumentation(
"Internal use only. The distributed cache entries for 'python.files'.")
public static final ConfigOption<Map<String, String>> PYTHON_FILES_DISTRIBUTED_CACHE_INFO =
ConfigOptions.key("python.internal.files-key-map").mapType().noDefaultValue();

@Documentation.ExcludeFromDocumentation(
"Internal use only. The distributed cache entries for 'python.requirements'.")
public static final ConfigOption<Map<String, String>>
PYTHON_REQUIREMENTS_FILE_DISTRIBUTED_CACHE_INFO =
ConfigOptions.key("python.internal.requirements-file-key")
.mapType()
.noDefaultValue();

@Documentation.ExcludeFromDocumentation(
"Internal use only. The distributed cache entries for 'python.archives'.")
public static final ConfigOption<Map<String, String>> PYTHON_ARCHIVES_DISTRIBUTED_CACHE_INFO =
ConfigOptions.key("python.internal.archives-key-map").mapType().noDefaultValue();

@Documentation.ExcludeFromDocumentation("Internal use only. Used for local debug.")
public static final ConfigOption<String> PYTHON_LOOPBACK_SERVER_ADDRESS =
ConfigOptions.key("python.loopback-server.address").stringType().noDefaultValue();
}
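The options added here are internal plumbing rather than user-facing settings: python.job-options carries values that end up as environment variables of the worker process, the python.internal.* options hold the distributed-cache key maps produced by the dependency management, and python.loopback-server.address replaces the former PYFLINK_LOOPBACK_SERVER_ADDRESS pseudo-key. A minimal hedged sketch of setting them programmatically; the address and map entries are hypothetical, imports are omitted:

    Configuration conf = new Configuration();
    conf.set(PythonOptions.PYTHON_LOOPBACK_SERVER_ADDRESS, "localhost:50051");

    Map<String, String> archivesKeyMap = new HashMap<>();
    // hypothetical distributed-cache key mapped to the directory to extract to
    archivesKeyMap.put("python_archive_0_cache_key", "py_env");
    conf.set(PythonOptions.PYTHON_ARCHIVES_DISTRIBUTED_CACHE_INFO, archivesKeyMap);
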