From 02f09845cef2d337c04b7032d08007b73555d976 Mon Sep 17 00:00:00 2001 From: Dian Fu Date: Fri, 25 Mar 2022 14:19:08 +0800 Subject: [PATCH] [FLINK-26190][python] Remove getTableConfig from ExecNodeConfiguration This closes #19333. --- .../stream_execution_environment.py | 2 +- .../test_stream_execution_environment.py | 14 +- .../pyflink/fn_execution/beam/beam_boot.py | 4 +- flink-python/pyflink/fn_execution/coders.py | 6 +- .../pyflink/table/table_environment.py | 8 +- .../org/apache/flink/python/PythonConfig.java | 189 +++++++----------- .../flink/python/PythonFunctionRunner.java | 3 +- .../apache/flink/python/PythonOptions.java | 36 +++- .../python/env/PythonDependencyInfo.java | 52 +++-- .../flink/python/util/PythonConfigUtil.java | 92 ++------- .../python/util/PythonDependencyUtils.java | 97 ++++----- ...bstractEmbeddedPythonFunctionOperator.java | 15 +- ...bstractExternalPythonFunctionOperator.java | 5 +- .../AbstractPythonFunctionOperator.java | 35 +--- .../python/PythonCoProcessOperator.java | 1 - .../python/PythonKeyedCoProcessOperator.java | 1 - .../python/PythonKeyedProcessOperator.java | 1 - .../python/PythonProcessOperator.java | 1 - .../BeamDataStreamPythonFunctionRunner.java | 9 +- .../python/beam/BeamPythonFunctionRunner.java | 66 +++--- .../beam/SimpleStateRequestHandler.java | 18 +- .../AbstractStatelessFunctionOperator.java | 1 - ...AbstractPythonStreamAggregateOperator.java | 17 +- ...tArrowPythonAggregateFunctionOperator.java | 6 +- ...onOverWindowAggregateFunctionOperator.java | 6 +- .../AbstractPythonScalarFunctionOperator.java | 7 +- .../EmbeddedPythonScalarFunctionOperator.java | 7 +- .../ArrowPythonScalarFunctionOperator.java | 3 +- .../table/PythonTableFunctionOperator.java | 6 +- .../beam/BeamTablePythonFunctionRunner.java | 7 - .../apache/flink/python/PythonConfigTest.java | 146 -------------- .../python/env/PythonDependencyInfoTest.java | 28 ++- .../python/util/PythonConfigUtilTest.java | 12 -- .../util/PythonDependencyUtilsTest.java | 42 ++-- ...honStreamGroupWindowAggregateOperator.java | 1 - ...ythonStreamGroupAggregateOperatorTest.java | 2 - ...StreamGroupTableAggregateOperatorTest.java | 2 - ...honGroupAggregateFunctionOperatorTest.java | 2 - ...upWindowAggregateFunctionOperatorTest.java | 2 - ...erWindowAggregateFunctionOperatorTest.java | 2 - ...upWindowAggregateFunctionOperatorTest.java | 2 - ...ythonProcTimeBoundedRangeOperatorTest.java | 2 - ...PythonProcTimeBoundedRowsOperatorTest.java | 2 - ...PythonRowTimeBoundedRangeOperatorTest.java | 2 - ...wPythonRowTimeBoundedRowsOperatorTest.java | 2 - .../PythonScalarFunctionOperatorTest.java | 2 - ...ArrowPythonScalarFunctionOperatorTest.java | 2 - .../PythonTableFunctionOperatorTest.java | 2 - ...sThroughPythonAggregateFunctionRunner.java | 7 +- ...PassThroughPythonScalarFunctionRunner.java | 3 - .../PassThroughPythonTableFunctionRunner.java | 3 - ...ghStreamAggregatePythonFunctionRunner.java | 3 - ...upWindowAggregatePythonFunctionRunner.java | 4 - ...eamTableAggregatePythonFunctionRunner.java | 3 - .../plan/nodes/exec/ExecNodeConfig.java | 21 +- .../batch/BatchExecPythonGroupAggregate.java | 2 +- .../BatchExecPythonGroupWindowAggregate.java | 2 +- .../batch/BatchExecPythonOverAggregate.java | 2 +- .../exec/common/CommonExecPythonCalc.java | 2 +- .../common/CommonExecPythonCorrelate.java | 2 +- .../StreamExecPythonGroupAggregate.java | 2 +- .../StreamExecPythonGroupTableAggregate.java | 2 +- .../StreamExecPythonGroupWindowAggregate.java | 2 +- .../stream/StreamExecPythonOverAggregate.java | 2 +- 
.../nodes/exec/utils/CommonPythonUtil.java | 25 +-- 65 files changed, 374 insertions(+), 683 deletions(-) delete mode 100644 flink-python/src/test/java/org/apache/flink/python/PythonConfigTest.java diff --git a/flink-python/pyflink/datastream/stream_execution_environment.py b/flink-python/pyflink/datastream/stream_execution_environment.py index e7619446ed184..076b036bc5a8a 100644 --- a/flink-python/pyflink/datastream/stream_execution_environment.py +++ b/flink-python/pyflink/datastream/stream_execution_environment.py @@ -980,7 +980,7 @@ def startup_loopback_server(): BeamFnLoopbackWorkerPoolServicer config = Configuration(j_configuration=j_configuration) config.set_string( - "PYFLINK_LOOPBACK_SERVER_ADDRESS", BeamFnLoopbackWorkerPoolServicer().start()) + "python.loopback-server.address", BeamFnLoopbackWorkerPoolServicer().start()) python_worker_execution_mode = os.environ.get('_python_worker_execution_mode') diff --git a/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py b/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py index 17f8054cb6316..ca9d1ab292128 100644 --- a/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py +++ b/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py @@ -445,10 +445,10 @@ def plus_three(value): from test_dep2 import add_three return add_three(value) + env.add_python_file(python_file_path) t_env = StreamTableEnvironment.create( stream_execution_environment=env, environment_settings=EnvironmentSettings.in_streaming_mode()) - env.add_python_file(python_file_path) from pyflink.table.udf import udf from pyflink.table.expressions import col @@ -678,13 +678,15 @@ def add_from_file(i): # The parallelism of Sink: Test Sink should be 4 self.assertEqual(nodes[4]['parallelism'], 4) - env_config_with_dependencies = dict(get_gateway().jvm.org.apache.flink.python.util - .PythonConfigUtil.getEnvConfigWithDependencies( - env._j_stream_execution_environment).toMap()) + python_dependency_config = dict( + get_gateway().jvm.org.apache.flink.python.util.PythonDependencyUtils. + configurePythonDependencies( + env._j_stream_execution_environment.getCachedFiles(), + env._j_stream_execution_environment.getConfiguration()).toMap()) # Make sure that user specified files and archives are correctly added. 
- self.assertIsNotNone(env_config_with_dependencies['python.files']) - self.assertIsNotNone(env_config_with_dependencies['python.archives']) + self.assertIsNotNone(python_dependency_config['python.internal.files-key-map']) + self.assertIsNotNone(python_dependency_config['python.internal.archives-key-map']) def test_register_slot_sharing_group(self): slot_sharing_group_1 = SlotSharingGroup.builder('slot_sharing_group_1') \ diff --git a/flink-python/pyflink/fn_execution/beam/beam_boot.py b/flink-python/pyflink/fn_execution/beam/beam_boot.py index 411745c0c23e6..4908944864ce6 100644 --- a/flink-python/pyflink/fn_execution/beam/beam_boot.py +++ b/flink-python/pyflink/fn_execution/beam/beam_boot.py @@ -75,12 +75,12 @@ def check_not_empty(check_str, error_message): logging.info("Initializing Python harness: %s" % " ".join(sys.argv)) - if 'PYFLINK_LOOPBACK_SERVER_ADDRESS' in os.environ: + if 'PYTHON_LOOPBACK_SERVER_ADDRESS' in os.environ: logging.info("Starting up Python harness in loopback mode.") params = dict(os.environ) params.update({'SEMI_PERSISTENT_DIRECTORY': semi_persist_dir}) - with grpc.insecure_channel(os.environ['PYFLINK_LOOPBACK_SERVER_ADDRESS']) as channel: + with grpc.insecure_channel(os.environ['PYTHON_LOOPBACK_SERVER_ADDRESS']) as channel: client = BeamFnExternalWorkerPoolStub(channel=channel) request = StartWorkerRequest( worker_id=worker_id, diff --git a/flink-python/pyflink/fn_execution/coders.py b/flink-python/pyflink/fn_execution/coders.py index 2ddd2f30109f2..7e43f78b2b243 100644 --- a/flink-python/pyflink/fn_execution/coders.py +++ b/flink-python/pyflink/fn_execution/coders.py @@ -79,12 +79,12 @@ def _to_field_coder(cls, coder_info_descriptor_proto): field_names = [f.name for f in schema_proto.fields] return RowCoder(field_coders, field_names) elif coder_info_descriptor_proto.HasField('arrow_type'): - timezone = pytz.timezone(os.environ['table.exec.timezone']) + timezone = pytz.timezone(os.environ['TABLE_LOCAL_TIME_ZONE']) schema_proto = coder_info_descriptor_proto.arrow_type.schema row_type = cls._to_row_type(schema_proto) return ArrowCoder(cls._to_arrow_schema(row_type), row_type, timezone) elif coder_info_descriptor_proto.HasField('over_window_arrow_type'): - timezone = pytz.timezone(os.environ['table.exec.timezone']) + timezone = pytz.timezone(os.environ['TABLE_LOCAL_TIME_ZONE']) schema_proto = coder_info_descriptor_proto.over_window_arrow_type.schema row_type = cls._to_row_type(schema_proto) return OverWindowArrowCoder( @@ -633,7 +633,7 @@ def from_proto(field_type): if field_type_name == type_name.TIMESTAMP: return TimestampCoder(field_type.timestamp_info.precision) if field_type_name == type_name.LOCAL_ZONED_TIMESTAMP: - timezone = pytz.timezone(os.environ['table.exec.timezone']) + timezone = pytz.timezone(os.environ['TABLE_LOCAL_TIME_ZONE']) return LocalZonedTimestampCoder(field_type.local_zoned_timestamp_info.precision, timezone) elif field_type_name == type_name.BASIC_ARRAY: return GenericArrayCoder(from_proto(field_type.collection_element_type)) diff --git a/flink-python/pyflink/table/table_environment.py b/flink-python/pyflink/table/table_environment.py index bf8a364d2b55a..3a06c04908c56 100644 --- a/flink-python/pyflink/table/table_environment.py +++ b/flink-python/pyflink/table/table_environment.py @@ -1625,14 +1625,10 @@ def _config_chaining_optimization(self): def _open(self): # start BeamFnLoopbackWorkerPoolServicer when executed in MiniCluster def startup_loopback_server(): - from pyflink.common import Configuration from 
pyflink.fn_execution.beam.beam_worker_pool_service import \ BeamFnLoopbackWorkerPoolServicer - - j_configuration = get_j_env_configuration(self._get_j_env()) - config = Configuration(j_configuration=j_configuration) - config.set_string( - "PYFLINK_LOOPBACK_SERVER_ADDRESS", BeamFnLoopbackWorkerPoolServicer().start()) + self.get_config().set("python.loopback-server.address", + BeamFnLoopbackWorkerPoolServicer().start()) python_worker_execution_mode = os.environ.get('_python_worker_execution_mode') diff --git a/flink-python/src/main/java/org/apache/flink/python/PythonConfig.java b/flink-python/src/main/java/org/apache/flink/python/PythonConfig.java index 5195f1f78fa13..e5c24f9beba8b 100644 --- a/flink-python/src/main/java/org/apache/flink/python/PythonConfig.java +++ b/flink-python/src/main/java/org/apache/flink/python/PythonConfig.java @@ -19,152 +19,101 @@ package org.apache.flink.python; import org.apache.flink.annotation.Internal; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigUtils; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.python.util.PythonDependencyUtils; +import org.apache.flink.table.api.config.TableConfigOptions; +import org.apache.flink.util.Preconditions; -import javax.annotation.Nullable; - -import java.io.Serializable; +import java.time.ZoneId; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Optional; +import static org.apache.flink.python.PythonOptions.PYTHON_LOOPBACK_SERVER_ADDRESS; + /** Configurations for the Python job which are used at run time. */ @Internal -public class PythonConfig implements Serializable { - - private static final long serialVersionUID = 1L; - - /** Max number of elements to include in a bundle. */ - private final int maxBundleSize; - - /** Max duration of a bundle. */ - private final long maxBundleTimeMills; +public class PythonConfig implements ReadableConfig { - /** Max number of elements to include in an arrow batch. */ - private final int maxArrowBatchSize; + private static final List> PYTHON_CONFIG_OPTIONS; - /** - * The python files uploaded by pyflink.table.TableEnvironment#add_python_file() or command line - * option "-pyfs". The key is the file key in distribute cache and the value is the - * corresponding origin file name. - */ - private final Map pythonFilesInfo; - - /** - * The file key of the requirements file in distribute cache. It is specified by - * pyflink.table.TableEnvironment#set_python_requirements() or command line option "-pyreq". - */ - @Nullable private final String pythonRequirementsFileInfo; - - /** - * The file key of the requirements cached directory in distribute cache. It is specified by - * pyflink.table.TableEnvironment#set_python_requirements() or command line option "-pyreq". It - * is used to support installing python packages offline. - */ - @Nullable private final String pythonRequirementsCacheDirInfo; + static { + PYTHON_CONFIG_OPTIONS = + new ArrayList<>(ConfigUtils.getAllConfigOptions(PythonOptions.class)); + } /** - * The python archives uploaded by pyflink.table.TableEnvironment#add_python_archive() or - * command line option "-pyarch". The key is the file key of the archives in distribute cache - * and the value is the name of the directory to extract to. + * Configuration adopted from the outer layer, e.g. flink-conf.yaml, command line arguments, + * TableConfig, etc. 
*/ - private final Map pythonArchivesInfo; + private final ReadableConfig configuration; /** - * The path of the python interpreter (e.g. /usr/local/bin/python) specified by - * pyflink.table.TableConfig#set_python_executable() or command line option "-pyexec". + * Configuration generated in the dependency management mechanisms. See {@link + * PythonDependencyUtils.PythonDependencyManager} for more details. */ - private final String pythonExec; - - /** Whether metric is enabled. */ - private final boolean metricEnabled; - - /** Whether to use managed memory for the Python worker. */ - private final boolean isUsingManagedMemory; - - /** The Configuration that contains execution configs and dependencies info. */ - private final Configuration config; - - /** Whether profile is enabled. */ - private final boolean profileEnabled; - - /** Execution Mode. */ - private final String executionMode; - - public PythonConfig(Configuration config) { - this.config = config; - maxBundleSize = config.get(PythonOptions.MAX_BUNDLE_SIZE); - maxBundleTimeMills = config.get(PythonOptions.MAX_BUNDLE_TIME_MILLS); - maxArrowBatchSize = config.get(PythonOptions.MAX_ARROW_BATCH_SIZE); - pythonFilesInfo = - config.getOptional(PythonDependencyUtils.PYTHON_FILES).orElse(new HashMap<>()); - pythonRequirementsFileInfo = - config.getOptional(PythonDependencyUtils.PYTHON_REQUIREMENTS_FILE) - .orElse(new HashMap<>()) - .get(PythonDependencyUtils.FILE); - pythonRequirementsCacheDirInfo = - config.getOptional(PythonDependencyUtils.PYTHON_REQUIREMENTS_FILE) - .orElse(new HashMap<>()) - .get(PythonDependencyUtils.CACHE); - pythonArchivesInfo = - config.getOptional(PythonDependencyUtils.PYTHON_ARCHIVES).orElse(new HashMap<>()); - pythonExec = config.get(PythonOptions.PYTHON_EXECUTABLE); - metricEnabled = config.getBoolean(PythonOptions.PYTHON_METRIC_ENABLED); - isUsingManagedMemory = config.getBoolean(PythonOptions.USE_MANAGED_MEMORY); - profileEnabled = config.getBoolean(PythonOptions.PYTHON_PROFILE_ENABLED); - executionMode = config.getString(PythonOptions.PYTHON_EXECUTION_MODE); - } - - public int getMaxBundleSize() { - return maxBundleSize; - } + private final ReadableConfig pythonDependencyConfiguration; - public long getMaxBundleTimeMills() { - return maxBundleTimeMills; + public PythonConfig( + ReadableConfig configuration, ReadableConfig pythonDependencyConfiguration) { + this.configuration = Preconditions.checkNotNull(configuration); + this.pythonDependencyConfiguration = + Preconditions.checkNotNull(pythonDependencyConfiguration); } - public int getMaxArrowBatchSize() { - return maxArrowBatchSize; + @Override + public T get(ConfigOption option) { + return pythonDependencyConfiguration + .getOptional(option) + .orElseGet(() -> configuration.get(option)); } - public Map getPythonFilesInfo() { - return pythonFilesInfo; + @Override + public Optional getOptional(ConfigOption option) { + final Optional value = pythonDependencyConfiguration.getOptional(option); + if (value.isPresent()) { + return value; + } + return configuration.getOptional(option); } - public Optional getPythonRequirementsFileInfo() { - return Optional.ofNullable(pythonRequirementsFileInfo); - } - - public Optional getPythonRequirementsCacheDirInfo() { - return Optional.ofNullable(pythonRequirementsCacheDirInfo); - } - - public Map getPythonArchivesInfo() { - return pythonArchivesInfo; - } - - public String getPythonExec() { - return pythonExec; - } - - public String getExecutionMode() { - return executionMode; - } + public Configuration toConfiguration() { 
+ final Configuration config = new Configuration(); + PYTHON_CONFIG_OPTIONS.forEach( + option -> + getOptional((ConfigOption) option) + .ifPresent(v -> config.set((ConfigOption) option, v))); + + // prepare the job options + Map jobOptions = config.get(PythonOptions.PYTHON_JOB_OPTIONS); + if (jobOptions == null) { + jobOptions = new HashMap<>(); + config.set(PythonOptions.PYTHON_JOB_OPTIONS, jobOptions); + } + jobOptions.put("TABLE_LOCAL_TIME_ZONE", getLocalTimeZone(configuration).getId()); + if (config.contains(PYTHON_LOOPBACK_SERVER_ADDRESS)) { + jobOptions.put( + "PYTHON_LOOPBACK_SERVER_ADDRESS", config.get(PYTHON_LOOPBACK_SERVER_ADDRESS)); + } - public boolean isMetricEnabled() { - return metricEnabled; - } - - public boolean isProfileEnabled() { - return profileEnabled; - } - - public boolean isUsingManagedMemory() { - return isUsingManagedMemory; + return config; } - public Configuration getConfig() { - return config; + /** + * Returns the current session time zone id. It is used when converting to/from {@code TIMESTAMP + * WITH LOCAL TIME ZONE}. + * + * @see org.apache.flink.table.types.logical.LocalZonedTimestampType + */ + private static ZoneId getLocalTimeZone(ReadableConfig config) { + String zone = config.get(TableConfigOptions.LOCAL_TIME_ZONE); + return TableConfigOptions.LOCAL_TIME_ZONE.defaultValue().equals(zone) + ? ZoneId.systemDefault() + : ZoneId.of(zone); } } diff --git a/flink-python/src/main/java/org/apache/flink/python/PythonFunctionRunner.java b/flink-python/src/main/java/org/apache/flink/python/PythonFunctionRunner.java index 710a04605555d..4a4174a768d74 100644 --- a/flink-python/src/main/java/org/apache/flink/python/PythonFunctionRunner.java +++ b/flink-python/src/main/java/org/apache/flink/python/PythonFunctionRunner.java @@ -20,6 +20,7 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.ReadableConfig; /** The base interface of runner which is responsible for the execution of Python functions. */ @Internal @@ -28,7 +29,7 @@ public interface PythonFunctionRunner { /** * Prepares the Python function runner, such as preparing the Python execution environment, etc. */ - void open(PythonConfig config) throws Exception; + void open(ReadableConfig config) throws Exception; /** Tear-down the Python function runner. */ void close() throws Exception; diff --git a/flink-python/src/main/java/org/apache/flink/python/PythonOptions.java b/flink-python/src/main/java/org/apache/flink/python/PythonOptions.java index 78cd3b5556ca9..47bfbd3d56709 100644 --- a/flink-python/src/main/java/org/apache/flink/python/PythonOptions.java +++ b/flink-python/src/main/java/org/apache/flink/python/PythonOptions.java @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -20,11 +20,14 @@ import org.apache.flink.annotation.Experimental; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.annotation.docs.Documentation; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.configuration.description.Description; +import java.util.Map; + /** Configuration options for the Python API. */ @PublicEvolving public class PythonOptions { @@ -236,4 +239,35 @@ public class PythonOptions { + "The `thread` mode means that the Python user-defined functions will be executed in the same process of the Java operator. " + "Note that currently it still doesn't support to execute Python user-defined functions in `thread` mode in all places. " + "It will fall back to `process` mode in these cases."); + + // ------------------------------------------------------------------------------------------ + // config options used for internal purpose + // ------------------------------------------------------------------------------------------ + + @Documentation.ExcludeFromDocumentation( + "Internal use only. The options will be exported as environment variables which could be accessed in Python worker process.") + public static final ConfigOption> PYTHON_JOB_OPTIONS = + ConfigOptions.key("python.job-options").mapType().noDefaultValue(); + + @Documentation.ExcludeFromDocumentation( + "Internal use only. The distributed cache entries for 'python.files'.") + public static final ConfigOption> PYTHON_FILES_DISTRIBUTED_CACHE_INFO = + ConfigOptions.key("python.internal.files-key-map").mapType().noDefaultValue(); + + @Documentation.ExcludeFromDocumentation( + "Internal use only. The distributed cache entries for 'python.requirements'.") + public static final ConfigOption> + PYTHON_REQUIREMENTS_FILE_DISTRIBUTED_CACHE_INFO = + ConfigOptions.key("python.internal.requirements-file-key") + .mapType() + .noDefaultValue(); + + @Documentation.ExcludeFromDocumentation( + "Internal use only. The distributed cache entries for 'python.archives'.") + public static final ConfigOption> PYTHON_ARCHIVES_DISTRIBUTED_CACHE_INFO = + ConfigOptions.key("python.internal.archives-key-map").mapType().noDefaultValue(); + + @Documentation.ExcludeFromDocumentation("Internal use only. 
Used for local debug.") + public static final ConfigOption PYTHON_LOOPBACK_SERVER_ADDRESS = + ConfigOptions.key("python.loopback-server.address").stringType().noDefaultValue(); } diff --git a/flink-python/src/main/java/org/apache/flink/python/env/PythonDependencyInfo.java b/flink-python/src/main/java/org/apache/flink/python/env/PythonDependencyInfo.java index 4ef685203b537..80f570b271491 100644 --- a/flink-python/src/main/java/org/apache/flink/python/env/PythonDependencyInfo.java +++ b/flink-python/src/main/java/org/apache/flink/python/env/PythonDependencyInfo.java @@ -20,8 +20,8 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.cache.DistributedCache; -import org.apache.flink.python.PythonConfig; -import org.apache.flink.python.PythonOptions; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.python.util.PythonDependencyUtils; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -33,6 +33,12 @@ import java.util.Objects; import java.util.Optional; +import static org.apache.flink.python.PythonOptions.PYTHON_ARCHIVES_DISTRIBUTED_CACHE_INFO; +import static org.apache.flink.python.PythonOptions.PYTHON_EXECUTABLE; +import static org.apache.flink.python.PythonOptions.PYTHON_EXECUTION_MODE; +import static org.apache.flink.python.PythonOptions.PYTHON_FILES_DISTRIBUTED_CACHE_INFO; +import static org.apache.flink.python.PythonOptions.PYTHON_REQUIREMENTS_FILE_DISTRIBUTED_CACHE_INFO; + /** PythonDependencyInfo contains the information of third-party dependencies. */ @Internal public final class PythonDependencyInfo { @@ -85,7 +91,7 @@ public PythonDependencyInfo( requirementsCacheDir, archives, pythonExec, - PythonOptions.PYTHON_EXECUTION_MODE.defaultValue()); + PYTHON_EXECUTION_MODE.defaultValue()); } public PythonDependencyInfo( @@ -130,15 +136,18 @@ public String getExecutionMode() { /** * Creates PythonDependencyInfo from GlobalJobParameters and DistributedCache. * - * @param pythonConfig The python config. + * @param config The config. * @param distributedCache The DistributedCache object of current task. * @return The PythonDependencyInfo object that contains whole information of python dependency. 
*/ public static PythonDependencyInfo create( - PythonConfig pythonConfig, DistributedCache distributedCache) { + ReadableConfig config, DistributedCache distributedCache) { Map pythonFiles = new LinkedHashMap<>(); - for (Map.Entry entry : pythonConfig.getPythonFilesInfo().entrySet()) { + for (Map.Entry entry : + config.getOptional(PYTHON_FILES_DISTRIBUTED_CACHE_INFO) + .orElse(new HashMap<>()) + .entrySet()) { File pythonFile = distributedCache.getFile(entry.getKey()); String filePath = pythonFile.getAbsolutePath(); pythonFiles.put(filePath, entry.getValue()); @@ -146,27 +155,34 @@ public static PythonDependencyInfo create( String requirementsFilePath = null; String requirementsCacheDir = null; - if (pythonConfig.getPythonRequirementsFileInfo().isPresent()) { - requirementsFilePath = - distributedCache - .getFile(pythonConfig.getPythonRequirementsFileInfo().get()) - .getAbsolutePath(); - if (pythonConfig.getPythonRequirementsCacheDirInfo().isPresent()) { + + String requirementsFileName = + config.getOptional(PYTHON_REQUIREMENTS_FILE_DISTRIBUTED_CACHE_INFO) + .orElse(new HashMap<>()) + .get(PythonDependencyUtils.FILE); + if (requirementsFileName != null) { + requirementsFilePath = distributedCache.getFile(requirementsFileName).getAbsolutePath(); + String requirementsFileCacheDir = + config.getOptional(PYTHON_REQUIREMENTS_FILE_DISTRIBUTED_CACHE_INFO) + .orElse(new HashMap<>()) + .get(PythonDependencyUtils.CACHE); + if (requirementsFileCacheDir != null) { requirementsCacheDir = - distributedCache - .getFile(pythonConfig.getPythonRequirementsCacheDirInfo().get()) - .getAbsolutePath(); + distributedCache.getFile(requirementsFileCacheDir).getAbsolutePath(); } } Map archives = new HashMap<>(); - for (Map.Entry entry : pythonConfig.getPythonArchivesInfo().entrySet()) { + for (Map.Entry entry : + config.getOptional(PYTHON_ARCHIVES_DISTRIBUTED_CACHE_INFO) + .orElse(new HashMap<>()) + .entrySet()) { String archiveFilePath = distributedCache.getFile(entry.getKey()).getAbsolutePath(); String targetPath = entry.getValue(); archives.put(archiveFilePath, targetPath); } - String pythonExec = pythonConfig.getPythonExec(); + String pythonExec = config.get(PYTHON_EXECUTABLE); return new PythonDependencyInfo( pythonFiles, @@ -174,6 +190,6 @@ public static PythonDependencyInfo create( requirementsCacheDir, archives, pythonExec, - pythonConfig.getExecutionMode()); + config.get(PYTHON_EXECUTION_MODE)); } } diff --git a/flink-python/src/main/java/org/apache/flink/python/util/PythonConfigUtil.java b/flink-python/src/main/java/org/apache/flink/python/util/PythonConfigUtil.java index 18ddb5845c085..1da956fb04755 100644 --- a/flink-python/src/main/java/org/apache/flink/python/util/PythonConfigUtil.java +++ b/flink-python/src/main/java/org/apache/flink/python/util/PythonConfigUtil.java @@ -17,12 +17,11 @@ package org.apache.flink.python.util; -import org.apache.flink.api.common.cache.DistributedCache; import org.apache.flink.api.dag.Transformation; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.core.memory.ManagedMemoryUseCase; +import org.apache.flink.python.PythonConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.operators.SimpleOperatorFactory; import org.apache.flink.streaming.api.operators.StreamOperatorFactory; @@ -34,8 +33,6 @@ import 
org.apache.flink.streaming.api.transformations.PartitionTransformation; import org.apache.flink.streaming.api.transformations.TwoInputTransformation; import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner; -import org.apache.flink.table.api.TableConfig; -import org.apache.flink.table.api.TableException; import org.apache.flink.shaded.guava30.com.google.common.collect.Queues; import org.apache.flink.shaded.guava30.com.google.common.collect.Sets; @@ -47,10 +44,7 @@ import java.util.Queue; import java.util.Set; -/** - * A Util class to get the {@link StreamExecutionEnvironment} configuration and merged configuration - * with environment settings. - */ +/** A utility class to handle the configurations of Python jobs. */ public class PythonConfigUtil { public static final String KEYED_STREAM_VALUE_OPERATOR_NAME = "_keyed_stream_values_operator"; @@ -59,19 +53,9 @@ public class PythonConfigUtil { "_partition_custom_map_operator"; /** - * A static method to get the {@link StreamExecutionEnvironment} configuration merged with - * python dependency management configurations. - */ - public static Configuration getEnvConfigWithDependencies(StreamExecutionEnvironment env) - throws InvocationTargetException, IllegalAccessException, NoSuchFieldException { - return PythonDependencyUtils.configurePythonDependencies( - env.getCachedFiles(), (Configuration) env.getConfiguration()); - } - - /** - * Get the private field {@code StreamExecutionEnvironment#configuration} by reflection - * recursively. Then access the field to get the configuration of the given - * StreamExecutionEnvironment. + * Get the private field {@link StreamExecutionEnvironment#configuration} by reflection + * recursively. It allows modification of the configuration, unlike {@link + * StreamExecutionEnvironment#getConfiguration()}.
*/ public static Configuration getEnvironmentConfig(StreamExecutionEnvironment env) throws InvocationTargetException, IllegalAccessException, NoSuchFieldException { @@ -93,62 +77,33 @@ public static Configuration getEnvironmentConfig(StreamExecutionEnvironment env) return (Configuration) configurationField.get(env); } - @SuppressWarnings("unchecked") public static void configPythonOperator(StreamExecutionEnvironment env) - throws IllegalAccessException, InvocationTargetException, NoSuchFieldException { - Configuration mergedConfig = getEnvConfigWithDependencies(env); + throws IllegalAccessException, NoSuchFieldException { + final Configuration config = extractPythonConfiguration(env, env.getConfiguration()); - Field transformationsField = - StreamExecutionEnvironment.class.getDeclaredField("transformations"); - transformationsField.setAccessible(true); - List> transformations = - (List>) transformationsField.get(env); - for (Transformation transformation : transformations) { + for (Transformation transformation : env.getTransformations()) { alignTransformation(transformation); if (isPythonOperator(transformation)) { - // declare it is a Python operator + // declare the use case of managed memory transformation.declareManagedMemoryUseCaseAtSlotScope(ManagedMemoryUseCase.PYTHON); AbstractPythonFunctionOperator pythonFunctionOperator = getPythonOperator(transformation); if (pythonFunctionOperator != null) { - Configuration oldConfig = pythonFunctionOperator.getConfiguration(); - // update dependency related configurations for Python operators - pythonFunctionOperator.setConfiguration( - generateNewPythonConfig(oldConfig, mergedConfig)); + pythonFunctionOperator.getConfiguration().addAll(config); } } } } - public static Configuration getMergedConfig( - StreamExecutionEnvironment env, TableConfig tableConfig) { - Configuration config = new Configuration((Configuration) env.getConfiguration()); - PythonDependencyUtils.merge(config, tableConfig.getConfiguration()); - Configuration mergedConfig = + /** Extract the configurations which are used in the Python operators.
*/ public static Configuration extractPythonConfiguration( StreamExecutionEnvironment env, ReadableConfig config) { final Configuration pythonDependencyConfig = PythonDependencyUtils.configurePythonDependencies(env.getCachedFiles(), config); - mergedConfig.setString("table.exec.timezone", tableConfig.getLocalTimeZone().getId()); - return mergedConfig; - } - - @SuppressWarnings("unchecked") - public static Configuration getMergedConfig(ExecutionEnvironment env, TableConfig tableConfig) { - try { - Field field = ExecutionEnvironment.class.getDeclaredField("cacheFile"); - field.setAccessible(true); - Configuration config = new Configuration(env.getConfiguration()); - PythonDependencyUtils.merge(config, tableConfig.getConfiguration()); - Configuration mergedConfig = - PythonDependencyUtils.configurePythonDependencies( - (List>) - field.get(env), - config); - mergedConfig.setString("table.exec.timezone", tableConfig.getLocalTimeZone().getId()); - return mergedConfig; - } catch (NoSuchFieldException | IllegalAccessException e) { - throw new TableException("Method getMergedConfig failed.", e); - } + final PythonConfig pythonConfig = new PythonConfig(config, pythonDependencyConfig); + return pythonConfig.toConfiguration(); } /** @@ -220,7 +175,7 @@ private static AbstractPythonFunctionOperator getPythonOperator( return null; } - public static boolean isPythonOperator(Transformation transform) { + private static boolean isPythonOperator(Transformation transform) { if (transform instanceof OneInputTransformation) { return isPythonOperator( ((OneInputTransformation) transform).getOperatorFactory()); @@ -266,17 +221,6 @@ private static boolean isPythonDataStreamOperator( } } - /** - * Generator a new {@link Configuration} with the combined config which is derived from - * oldConfig. - */ - private static Configuration generateNewPythonConfig( - Configuration oldConfig, Configuration newConfig) { - Configuration mergedConfig = newConfig.clone(); - mergedConfig.addAll(oldConfig); - return mergedConfig; - } - public static void setPartitionCustomOperatorNumPartitions( List> transformations) { // Update the numPartitions of PartitionCustomOperator after aligning all operators.
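
Aside (editorial, not part of the patch): extractPythonConfiguration above layers the dependency-derived options over the outer configuration through the new two-argument PythonConfig and then flattens the result with toConfiguration(). Below is a minimal sketch of the resulting lookup order, assuming flink-python (and its table dependencies) on the classpath; the class PythonConfigLookupSketch and the executable paths are made up for illustration.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.python.PythonConfig;
import org.apache.flink.python.PythonOptions;

public class PythonConfigLookupSketch {
    public static void main(String[] args) {
        // Outer layer: what normally comes from flink-conf.yaml / TableConfig.
        Configuration outer = new Configuration();
        outer.set(PythonOptions.PYTHON_EXECUTABLE, "python3");

        // Layer produced by PythonDependencyUtils.configurePythonDependencies(...).
        Configuration dependencyLayer = new Configuration();
        dependencyLayer.set(PythonOptions.PYTHON_EXECUTABLE, "/opt/venv/bin/python");

        PythonConfig pythonConfig = new PythonConfig(outer, dependencyLayer);

        // The dependency layer shadows the outer layer: prints "/opt/venv/bin/python".
        System.out.println(pythonConfig.get(PythonOptions.PYTHON_EXECUTABLE));

        // Options set in neither layer fall back to their declared default value.
        System.out.println(pythonConfig.get(PythonOptions.MAX_BUNDLE_SIZE));

        // toConfiguration() flattens both layers into the single Configuration
        // that configPythonOperator(...) merges into each Python operator.
        Configuration flattened = pythonConfig.toConfiguration();
        System.out.println(flattened.get(PythonOptions.PYTHON_EXECUTABLE));
    }
}
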
diff --git a/flink-python/src/main/java/org/apache/flink/python/util/PythonDependencyUtils.java b/flink-python/src/main/java/org/apache/flink/python/util/PythonDependencyUtils.java index f90aa72f8c333..c93a6f4994901 100644 --- a/flink-python/src/main/java/org/apache/flink/python/util/PythonDependencyUtils.java +++ b/flink-python/src/main/java/org/apache/flink/python/util/PythonDependencyUtils.java @@ -20,8 +20,6 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.cache.DistributedCache; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.configuration.ConfigOption; -import org.apache.flink.configuration.ConfigOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.python.PythonOptions; @@ -39,7 +37,6 @@ import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; -import java.util.Map; import java.util.stream.Collectors; import static org.apache.flink.client.cli.CliFrontendParser.PYARCHIVE_OPTION; @@ -47,8 +44,11 @@ import static org.apache.flink.client.cli.CliFrontendParser.PYEXEC_OPTION; import static org.apache.flink.client.cli.CliFrontendParser.PYFILES_OPTION; import static org.apache.flink.client.cli.CliFrontendParser.PYREQUIREMENTS_OPTION; +import static org.apache.flink.python.PythonOptions.PYTHON_ARCHIVES_DISTRIBUTED_CACHE_INFO; import static org.apache.flink.python.PythonOptions.PYTHON_CLIENT_EXECUTABLE; import static org.apache.flink.python.PythonOptions.PYTHON_EXECUTABLE; +import static org.apache.flink.python.PythonOptions.PYTHON_FILES_DISTRIBUTED_CACHE_INFO; +import static org.apache.flink.python.PythonOptions.PYTHON_REQUIREMENTS_FILE_DISTRIBUTED_CACHE_INFO; /** * Utility class for Python dependency management. The dependencies will be registered at the @@ -63,15 +63,6 @@ public class PythonDependencyUtils { public static final String PARAM_DELIMITER = "#"; private static final String HASH_ALGORITHM = "SHA-256"; - // Internal Python Config Options. 
- - public static final ConfigOption> PYTHON_FILES = - ConfigOptions.key("python.internal.files-key-map").mapType().noDefaultValue(); - public static final ConfigOption> PYTHON_REQUIREMENTS_FILE = - ConfigOptions.key("python.internal.requirements-file-key").mapType().noDefaultValue(); - public static final ConfigOption> PYTHON_ARCHIVES = - ConfigOptions.key("python.internal.archives-key-map").mapType().noDefaultValue(); - /** * Adds python dependencies to registered cache file list according to given configuration and * returns a new configuration which contains the metadata of the registered python @@ -84,10 +75,12 @@ public class PythonDependencyUtils { */ public static Configuration configurePythonDependencies( List> cachedFiles, - Configuration config) { - PythonDependencyManager pythonDependencyManager = + ReadableConfig config) { + final PythonDependencyManager pythonDependencyManager = new PythonDependencyManager(cachedFiles, config); - return pythonDependencyManager.getConfigWithPythonDependencyOptions(); + final Configuration pythonDependencyConfig = new Configuration(); + pythonDependencyManager.applyToConfiguration(pythonDependencyConfig); + return pythonDependencyConfig; } public static Configuration parsePythonDependencyConfiguration(CommandLine commandLine) { @@ -162,14 +155,13 @@ private static class PythonDependencyManager { private static final String PYTHON_ARCHIVE_PREFIX = "python_archive"; private final List> cachedFiles; - private final Configuration internalConfig; + private final ReadableConfig config; private PythonDependencyManager( List> cachedFiles, - Configuration config) { + ReadableConfig config) { this.cachedFiles = cachedFiles; - this.internalConfig = new Configuration(config); - configure(config); + this.config = config; } /** @@ -179,14 +171,17 @@ private PythonDependencyManager( * * @param filePath The path of the Python dependency. */ - private void addPythonFile(String filePath) { + private void addPythonFile(Configuration pythonDependencyConfig, String filePath) { Preconditions.checkNotNull(filePath); String fileKey = generateUniqueFileKey(PYTHON_FILE_PREFIX, filePath); registerCachedFileIfNotExist(filePath, fileKey); - if (!internalConfig.contains(PYTHON_FILES)) { - internalConfig.set(PYTHON_FILES, new LinkedHashMap<>()); + if (!pythonDependencyConfig.contains(PYTHON_FILES_DISTRIBUTED_CACHE_INFO)) { + pythonDependencyConfig.set( + PYTHON_FILES_DISTRIBUTED_CACHE_INFO, new LinkedHashMap<>()); } - internalConfig.get(PYTHON_FILES).put(fileKey, new File(filePath).getName()); + pythonDependencyConfig + .get(PYTHON_FILES_DISTRIBUTED_CACHE_INFO) + .put(fileKey, new File(filePath).getName()); } /** @@ -196,8 +191,9 @@ private void addPythonFile(String filePath) { * * @param requirementsFilePath The path of the requirements file. */ - private void setPythonRequirements(String requirementsFilePath) { - setPythonRequirements(requirementsFilePath, null); + private void setPythonRequirements( + Configuration pythonDependencyConfig, String requirementsFilePath) { + setPythonRequirements(pythonDependencyConfig, requirementsFilePath, null); } /** @@ -210,26 +206,33 @@ private void setPythonRequirements(String requirementsFilePath) { * @param requirementsCachedDir The path of the requirements cached directory. 
*/ private void setPythonRequirements( - String requirementsFilePath, @Nullable String requirementsCachedDir) { + Configuration pythonDependencyConfig, + String requirementsFilePath, + @Nullable String requirementsCachedDir) { Preconditions.checkNotNull(requirementsFilePath); - if (!internalConfig.contains(PYTHON_REQUIREMENTS_FILE)) { - internalConfig.set(PYTHON_REQUIREMENTS_FILE, new HashMap<>()); + if (!pythonDependencyConfig.contains(PYTHON_REQUIREMENTS_FILE_DISTRIBUTED_CACHE_INFO)) { + pythonDependencyConfig.set( + PYTHON_REQUIREMENTS_FILE_DISTRIBUTED_CACHE_INFO, new HashMap<>()); } - internalConfig.get(PYTHON_REQUIREMENTS_FILE).clear(); + pythonDependencyConfig.get(PYTHON_REQUIREMENTS_FILE_DISTRIBUTED_CACHE_INFO).clear(); removeCachedFilesByPrefix(PYTHON_REQUIREMENTS_FILE_PREFIX); removeCachedFilesByPrefix(PYTHON_REQUIREMENTS_CACHE_PREFIX); String fileKey = generateUniqueFileKey(PYTHON_REQUIREMENTS_FILE_PREFIX, requirementsFilePath); registerCachedFileIfNotExist(requirementsFilePath, fileKey); - internalConfig.get(PYTHON_REQUIREMENTS_FILE).put(FILE, fileKey); + pythonDependencyConfig + .get(PYTHON_REQUIREMENTS_FILE_DISTRIBUTED_CACHE_INFO) + .put(FILE, fileKey); if (requirementsCachedDir != null) { String cacheDirKey = generateUniqueFileKey( PYTHON_REQUIREMENTS_CACHE_PREFIX, requirementsCachedDir); registerCachedFileIfNotExist(requirementsCachedDir, cacheDirKey); - internalConfig.get(PYTHON_REQUIREMENTS_FILE).put(CACHE, cacheDirKey); + pythonDependencyConfig + .get(PYTHON_REQUIREMENTS_FILE_DISTRIBUTED_CACHE_INFO) + .put(CACHE, cacheDirKey); } } @@ -242,24 +245,27 @@ private void setPythonRequirements( * @param archivePath The path of the archive file. * @param targetDir The name of the target directory. */ - private void addPythonArchive(String archivePath, String targetDir) { + private void addPythonArchive( + Configuration pythonDependencyConfig, String archivePath, String targetDir) { Preconditions.checkNotNull(archivePath); - if (!internalConfig.contains(PYTHON_ARCHIVES)) { - internalConfig.set(PYTHON_ARCHIVES, new HashMap<>()); + if (!pythonDependencyConfig.contains(PYTHON_ARCHIVES_DISTRIBUTED_CACHE_INFO)) { + pythonDependencyConfig.set(PYTHON_ARCHIVES_DISTRIBUTED_CACHE_INFO, new HashMap<>()); } String fileKey = generateUniqueFileKey( PYTHON_ARCHIVE_PREFIX, archivePath + PARAM_DELIMITER + targetDir); registerCachedFileIfNotExist(archivePath, fileKey); - internalConfig.get(PYTHON_ARCHIVES).put(fileKey, targetDir); + pythonDependencyConfig + .get(PYTHON_ARCHIVES_DISTRIBUTED_CACHE_INFO) + .put(fileKey, targetDir); } - private void configure(ReadableConfig config) { + private void applyToConfiguration(Configuration pythonDependencyConfig) { config.getOptional(PythonOptions.PYTHON_FILES) .ifPresent( pyFiles -> { for (String filePath : pyFiles.split(FILE_DELIMITER)) { - addPythonFile(filePath); + addPythonFile(pythonDependencyConfig, filePath); } }); @@ -270,9 +276,11 @@ private void configure(ReadableConfig config) { String[] requirementFileAndCache = pyRequirements.split(PARAM_DELIMITER, 2); setPythonRequirements( - requirementFileAndCache[0], requirementFileAndCache[1]); + pythonDependencyConfig, + requirementFileAndCache[0], + requirementFileAndCache[1]); } else { - setPythonRequirements(pyRequirements); + setPythonRequirements(pythonDependencyConfig, pyRequirements); } }); @@ -294,15 +302,16 @@ private void configure(ReadableConfig config) { archivePath = archive; targetDir = new File(archivePath).getName(); } - addPythonArchive(archivePath, targetDir); + addPythonArchive( + 
pythonDependencyConfig, archivePath, targetDir); } }); config.getOptional(PYTHON_EXECUTABLE) - .ifPresent(e -> internalConfig.set(PYTHON_EXECUTABLE, e)); + .ifPresent(e -> pythonDependencyConfig.set(PYTHON_EXECUTABLE, e)); config.getOptional(PYTHON_CLIENT_EXECUTABLE) - .ifPresent(e -> internalConfig.set(PYTHON_CLIENT_EXECUTABLE, e)); + .ifPresent(e -> pythonDependencyConfig.set(PYTHON_CLIENT_EXECUTABLE, e)); } private String generateUniqueFileKey(String prefix, String hashString) { @@ -333,9 +342,5 @@ private void removeCachedFilesByPrefix(String prefix) { .filter(t -> t.f0.matches("^" + prefix + "_[a-z0-9]{64}$")) .collect(Collectors.toSet())); } - - private Configuration getConfigWithPythonDependencyOptions() { - return internalConfig; - } } } diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractEmbeddedPythonFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractEmbeddedPythonFunctionOperator.java index be709e65e924b..6dc961832ee4d 100644 --- a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractEmbeddedPythonFunctionOperator.java +++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractEmbeddedPythonFunctionOperator.java @@ -23,7 +23,6 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.fnexecution.v1.FlinkFnApi; -import org.apache.flink.python.PythonConfig; import org.apache.flink.python.env.PythonDependencyInfo; import org.apache.flink.python.env.embedded.EmbeddedPythonEnvironment; import org.apache.flink.python.env.embedded.EmbeddedPythonEnvironmentManager; @@ -36,6 +35,7 @@ import java.util.Map; import java.util.concurrent.locks.ReentrantLock; +import static org.apache.flink.python.PythonOptions.PYTHON_EXECUTABLE; import static org.apache.flink.python.env.AbstractPythonEnvironmentManager.PYTHON_WORKING_DIR; /** @@ -48,12 +48,9 @@ public abstract class AbstractEmbeddedPythonFunctionOperator private static final long serialVersionUID = 1L; - private static ReentrantLock lock = new ReentrantLock(); + private static final ReentrantLock lock = new ReentrantLock(); - private static Map> workingDirectories = new HashMap<>(); - - /** The python config. */ - protected transient PythonConfig pythonConfig; + private static final Map> workingDirectories = new HashMap<>(); /** Every operator will hold the only python interpreter. 
*/ protected transient PythonInterpreter interpreter; @@ -67,7 +64,6 @@ public AbstractEmbeddedPythonFunctionOperator(Configuration config) { @Override public void open() throws Exception { super.open(); - pythonConfig = new PythonConfig(config); pythonEnvironmentManager = createPythonEnvironmentManager(); pythonEnvironmentManager.open(); EmbeddedPythonEnvironment environment = @@ -105,7 +101,7 @@ public void open() throws Exception { } } - openPythonInterpreter(pythonConfig.getPythonExec(), env); + openPythonInterpreter(config.get(PYTHON_EXECUTABLE), env); } @Override @@ -142,8 +138,7 @@ public void close() throws Exception { @Override protected EmbeddedPythonEnvironmentManager createPythonEnvironmentManager() { PythonDependencyInfo dependencyInfo = - PythonDependencyInfo.create( - pythonConfig, getRuntimeContext().getDistributedCache()); + PythonDependencyInfo.create(config, getRuntimeContext().getDistributedCache()); return new EmbeddedPythonEnvironmentManager( dependencyInfo, getContainingTask().getEnvironment().getTaskManagerInfo().getTmpDirectories(), diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractExternalPythonFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractExternalPythonFunctionOperator.java index 702e42e06973b..d95ffafb5afff 100644 --- a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractExternalPythonFunctionOperator.java +++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractExternalPythonFunctionOperator.java @@ -53,7 +53,7 @@ public AbstractExternalPythonFunctionOperator(Configuration config) { public void open() throws Exception { super.open(); this.pythonFunctionRunner = createPythonFunctionRunner(); - this.pythonFunctionRunner.open(pythonConfig); + this.pythonFunctionRunner.open(config); this.flushThreadPool = Executors.newSingleThreadExecutor(); } @@ -120,8 +120,7 @@ protected void invokeFinishBundle() throws Exception { @Override protected ProcessPythonEnvironmentManager createPythonEnvironmentManager() { PythonDependencyInfo dependencyInfo = - PythonDependencyInfo.create( - pythonConfig, getRuntimeContext().getDistributedCache()); + PythonDependencyInfo.create(config, getRuntimeContext().getDistributedCache()); PythonEnv pythonEnv = getPythonEnv(); if (pythonEnv.getExecType() == PythonEnv.ExecType.PROCESS) { return new ProcessPythonEnvironmentManager( diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java index aa1dfe1792093..9286b786506d4 100644 --- a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java +++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java @@ -20,8 +20,6 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.configuration.Configuration; -import org.apache.flink.python.PythonConfig; -import org.apache.flink.python.PythonOptions; import org.apache.flink.python.env.PythonEnvironmentManager; import org.apache.flink.python.metric.FlinkMetricContainer; import org.apache.flink.runtime.state.KeyedStateBackend; @@ -36,10 +34,12 @@ import java.lang.reflect.Field; import java.util.List; -import java.util.Map; import java.util.Objects; import java.util.concurrent.ScheduledFuture; 
+import static org.apache.flink.python.PythonOptions.MAX_BUNDLE_SIZE; +import static org.apache.flink.python.PythonOptions.MAX_BUNDLE_TIME_MILLS; +import static org.apache.flink.python.PythonOptions.PYTHON_METRIC_ENABLED; import static org.apache.flink.streaming.api.utils.ClassLeakCleaner.cleanUpLeakingClasses; import static org.apache.flink.streaming.api.utils.PythonOperatorUtils.inBatchExecutionMode; @@ -49,7 +49,7 @@ public abstract class AbstractPythonFunctionOperator extends AbstractStream private static final long serialVersionUID = 1L; - protected Configuration config; + protected final Configuration config; /** Max number of elements to include in a bundle. */ protected transient int maxBundleSize; @@ -57,12 +57,6 @@ public abstract class AbstractPythonFunctionOperator extends AbstractStream /** Number of processed elements in the current bundle. */ protected transient int elementCount; - /** The python config. */ - protected transient PythonConfig pythonConfig; - - /** The options used to configure the Python worker process. */ - protected transient Map jobOptions; - /** Max duration of a bundle. */ private transient long maxBundleTimeMills; @@ -83,12 +77,10 @@ public AbstractPythonFunctionOperator(Configuration config) { @Override public void open() throws Exception { try { - this.pythonConfig = new PythonConfig(config); - this.jobOptions = config.toMap(); - this.maxBundleSize = pythonConfig.getMaxBundleSize(); + this.maxBundleSize = config.get(MAX_BUNDLE_SIZE); if (this.maxBundleSize <= 0) { - this.maxBundleSize = PythonOptions.MAX_BUNDLE_SIZE.defaultValue(); - LOG.error( + this.maxBundleSize = MAX_BUNDLE_SIZE.defaultValue(); + LOG.warn( "Invalid value for the maximum bundle size. Using default value of " + this.maxBundleSize + '.'); @@ -96,10 +88,10 @@ public void open() throws Exception { LOG.info("The maximum bundle size is configured to {}.", this.maxBundleSize); } - this.maxBundleTimeMills = pythonConfig.getMaxBundleTimeMills(); + this.maxBundleTimeMills = config.get(MAX_BUNDLE_TIME_MILLS); if (this.maxBundleTimeMills <= 0L) { - this.maxBundleTimeMills = PythonOptions.MAX_BUNDLE_TIME_MILLS.defaultValue(); - LOG.error( + this.maxBundleTimeMills = MAX_BUNDLE_TIME_MILLS.defaultValue(); + LOG.warn( "Invalid value for the maximum bundle time. Using default value of " + this.maxBundleTimeMills + '.'); @@ -256,11 +248,6 @@ public boolean isBundleFinished() { return elementCount == 0; } - /** Reset the {@link Configuration} if needed. */ - public void setConfiguration(Configuration config) { - this.config = config; - } - /** Returns the {@link Configuration}. */ public Configuration getConfiguration() { return config; @@ -289,7 +276,7 @@ private void checkInvokeFinishBundleByTime() throws Exception { } protected FlinkMetricContainer getFlinkMetricContainer() { - return this.pythonConfig.isMetricEnabled() + return this.config.get(PYTHON_METRIC_ENABLED) ? 
new FlinkMetricContainer(getRuntimeContext().getMetricGroup()) : null; } diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonCoProcessOperator.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonCoProcessOperator.java index 820860014c7a3..2b519aeb24e65 100644 --- a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonCoProcessOperator.java +++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonCoProcessOperator.java @@ -74,7 +74,6 @@ public PythonFunctionRunner createPythonFunctionRunner() throws Exception { getRuntimeContext(), getInternalParameters(), inBatchExecutionMode(getKeyedStateBackend())), - jobOptions, getFlinkMetricContainer(), null, null, diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonKeyedCoProcessOperator.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonKeyedCoProcessOperator.java index ba1c94c1a3f22..924b86122de4b 100644 --- a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonKeyedCoProcessOperator.java +++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonKeyedCoProcessOperator.java @@ -113,7 +113,6 @@ public PythonFunctionRunner createPythonFunctionRunner() throws Exception { getInternalParameters(), keyTypeInfo, inBatchExecutionMode(getKeyedStateBackend())), - jobOptions, getFlinkMetricContainer(), getKeyedStateBackend(), keyTypeSerializer, diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonKeyedProcessOperator.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonKeyedProcessOperator.java index 3d81c1076bac4..cef5e6ce01dc3 100644 --- a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonKeyedProcessOperator.java +++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonKeyedProcessOperator.java @@ -143,7 +143,6 @@ public PythonFunctionRunner createPythonFunctionRunner() throws Exception { getInternalParameters(), keyTypeInfo, inBatchExecutionMode(getKeyedStateBackend())), - jobOptions, getFlinkMetricContainer(), getKeyedStateBackend(), keyTypeSerializer, diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonProcessOperator.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonProcessOperator.java index 69d4e74146b42..1bc820e58585a 100644 --- a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonProcessOperator.java +++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonProcessOperator.java @@ -71,7 +71,6 @@ public PythonFunctionRunner createPythonFunctionRunner() throws Exception { getRuntimeContext(), getInternalParameters(), inBatchExecutionMode(getKeyedStateBackend())), - jobOptions, getFlinkMetricContainer(), null, null, diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamDataStreamPythonFunctionRunner.java b/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamDataStreamPythonFunctionRunner.java index a76871f7acbd9..df3a651b8d14b 100644 --- a/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamDataStreamPythonFunctionRunner.java +++ 
b/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamDataStreamPythonFunctionRunner.java @@ -37,7 +37,6 @@ import java.util.Collections; import java.util.List; -import java.util.Map; import java.util.Optional; import static org.apache.flink.python.Constants.INPUT_COLLECTION_ID; @@ -71,11 +70,10 @@ public BeamDataStreamPythonFunctionRunner( ProcessPythonEnvironmentManager environmentManager, String headOperatorFunctionUrn, List userDefinedDataStreamFunctions, - Map jobOptions, @Nullable FlinkMetricContainer flinkMetricContainer, - KeyedStateBackend stateBackend, - TypeSerializer keySerializer, - TypeSerializer namespaceSerializer, + @Nullable KeyedStateBackend stateBackend, + @Nullable TypeSerializer keySerializer, + @Nullable TypeSerializer namespaceSerializer, @Nullable TimerRegistration timerRegistration, MemoryManager memoryManager, double managedMemoryFraction, @@ -85,7 +83,6 @@ public BeamDataStreamPythonFunctionRunner( super( taskName, environmentManager, - jobOptions, flinkMetricContainer, stateBackend, keySerializer, diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java b/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java index 8a73aed799940..ad04d9b5299bb 100644 --- a/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java +++ b/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java @@ -22,8 +22,8 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.fnexecution.v1.FlinkFnApi; -import org.apache.flink.python.PythonConfig; import org.apache.flink.python.PythonFunctionRunner; import org.apache.flink.python.PythonOptions; import org.apache.flink.python.env.PythonEnvironment; @@ -94,6 +94,7 @@ import static org.apache.flink.python.Constants.WINDOW_CODER_ID; import static org.apache.flink.python.Constants.WINDOW_STRATEGY; import static org.apache.flink.python.Constants.WRAPPER_TIMER_CODER_ID; +import static org.apache.flink.python.PythonOptions.USE_MANAGED_MEMORY; import static org.apache.flink.streaming.api.utils.ProtoUtils.createCoderProto; /** A {@link BeamPythonFunctionRunner} used to execute Python functions. */ @@ -112,13 +113,16 @@ public abstract class BeamPythonFunctionRunner implements PythonFunctionRunner { /** The Python process execution environment manager. */ private final ProcessPythonEnvironmentManager environmentManager; - /** The options used to configure the Python worker process. */ - private final Map jobOptions; - /** The flinkMetricContainer will be set to null if metric is configured to be turned off. 
*/ - @Nullable private FlinkMetricContainer flinkMetricContainer; + @Nullable private final FlinkMetricContainer flinkMetricContainer; + + @Nullable private final KeyedStateBackend keyedStateBackend; + + @Nullable private final TypeSerializer keySerializer; + + @Nullable private final TypeSerializer namespaceSerializer; - @Nullable private TimerRegistration timerRegistration; + @Nullable private final TimerRegistration timerRegistration; private final MemoryManager memoryManager; @@ -145,7 +149,7 @@ public abstract class BeamPythonFunctionRunner implements PythonFunctionRunner { private transient StageBundleFactory stageBundleFactory; /** Handler for state requests. */ - private final StateRequestHandler stateRequestHandler; + private transient StateRequestHandler stateRequestHandler; /** Handler for bundle progress messages, both during bundle execution and on its completion. */ private transient BundleProgressHandler progressHandler; @@ -178,11 +182,10 @@ public abstract class BeamPythonFunctionRunner implements PythonFunctionRunner { public BeamPythonFunctionRunner( String taskName, ProcessPythonEnvironmentManager environmentManager, - Map jobOptions, @Nullable FlinkMetricContainer flinkMetricContainer, - @Nullable KeyedStateBackend keyedStateBackend, - @Nullable TypeSerializer keySerializer, - @Nullable TypeSerializer namespaceSerializer, + @Nullable KeyedStateBackend keyedStateBackend, + @Nullable TypeSerializer keySerializer, + @Nullable TypeSerializer namespaceSerializer, @Nullable TimerRegistration timerRegistration, MemoryManager memoryManager, double managedMemoryFraction, @@ -190,11 +193,10 @@ public BeamPythonFunctionRunner( FlinkFnApi.CoderInfoDescriptor outputCoderDescriptor) { this.taskName = Preconditions.checkNotNull(taskName); this.environmentManager = Preconditions.checkNotNull(environmentManager); - this.jobOptions = Preconditions.checkNotNull(jobOptions); this.flinkMetricContainer = flinkMetricContainer; - this.stateRequestHandler = - getStateRequestHandler( - keyedStateBackend, keySerializer, namespaceSerializer, jobOptions); + this.keyedStateBackend = keyedStateBackend; + this.keySerializer = keySerializer; + this.namespaceSerializer = namespaceSerializer; this.timerRegistration = timerRegistration; this.memoryManager = memoryManager; this.managedMemoryFraction = managedMemoryFraction; @@ -205,22 +207,22 @@ public BeamPythonFunctionRunner( // ------------------------------------------------------------------------ @Override - public void open(PythonConfig config) throws Exception { + public void open(ReadableConfig config) throws Exception { this.bundleStarted = false; this.resultBuffer = new LinkedBlockingQueue<>(); this.reusableResultTuple = new Tuple2<>(); + stateRequestHandler = + getStateRequestHandler( + keyedStateBackend, keySerializer, namespaceSerializer, config); + // The creation of stageBundleFactory depends on the initialized environment manager. 
environmentManager.open(); PortablePipelineOptions portableOptions = PipelineOptionsFactory.as(PortablePipelineOptions.class); - int stateCacheSize = - Integer.parseInt( - jobOptions.getOrDefault( - PythonOptions.STATE_CACHE_SIZE.key(), - PythonOptions.STATE_CACHE_SIZE.defaultValue().toString())); + int stateCacheSize = config.get(PythonOptions.STATE_CACHE_SIZE); if (stateCacheSize > 0) { portableOptions .as(ExperimentalOptions.class) @@ -231,7 +233,7 @@ public void open(PythonConfig config) throws Exception { Struct pipelineOptions = PipelineOptionsTranslation.toProto(portableOptions); - if (memoryManager != null && config.isUsingManagedMemory()) { + if (memoryManager != null && config.get(USE_MANAGED_MEMORY)) { Preconditions.checkArgument( managedMemoryFraction > 0 && managedMemoryFraction <= 1.0, String.format( @@ -243,7 +245,7 @@ public void open(PythonConfig config) throws Exception { (size) -> new PythonSharedResources( createJobBundleFactory(pipelineOptions), - createPythonExecutionEnvironment(size)); + createPythonExecutionEnvironment(config, size)); sharedResources = memoryManager.getSharedMemoryResourceForManagedMemory( @@ -262,7 +264,7 @@ public void open(PythonConfig config) throws Exception { jobBundleFactory = createJobBundleFactory(pipelineOptions); stageBundleFactory = createStageBundleFactory( - jobBundleFactory, createPythonExecutionEnvironment(-1)); + jobBundleFactory, createPythonExecutionEnvironment(config, -1)); } progressHandler = getProgressHandler(flinkMetricContainer); } @@ -393,13 +395,13 @@ private void finishBundle() { * Creates a specification which specifies the portability Python execution environment. It's * used by Beam's portability framework to create the actual Python execution environment. */ - private RunnerApi.Environment createPythonExecutionEnvironment(long memoryLimitBytes) - throws Exception { + private RunnerApi.Environment createPythonExecutionEnvironment( + ReadableConfig config, long memoryLimitBytes) throws Exception { PythonEnvironment environment = environmentManager.createEnvironment(); if (environment instanceof ProcessPythonEnvironment) { ProcessPythonEnvironment processEnvironment = (ProcessPythonEnvironment) environment; Map<String, String> env = processEnvironment.getEnv(); - env.putAll(jobOptions); + config.getOptional(PythonOptions.PYTHON_JOB_OPTIONS).ifPresent(env::putAll); env.put(PYTHON_WORKER_MEMORY_LIMIT, String.valueOf(memoryLimitBytes)); return Environments.createProcessEnvironment( "", "", processEnvironment.getCommand(), env); @@ -599,16 +601,16 @@ private TimerReceiverFactory createTimerReceiverFactory() { } private static StateRequestHandler getStateRequestHandler( - KeyedStateBackend keyedStateBackend, - TypeSerializer keySerializer, - TypeSerializer namespaceSerializer, - Map<String, String> jobOptions) { + KeyedStateBackend keyedStateBackend, + TypeSerializer keySerializer, + TypeSerializer namespaceSerializer, + ReadableConfig config) { if (keyedStateBackend == null) { return StateRequestHandler.unsupported(); } else { assert keySerializer != null; return new SimpleStateRequestHandler( - keyedStateBackend, keySerializer, namespaceSerializer, jobOptions); + keyedStateBackend, keySerializer, namespaceSerializer, config); } } } diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/SimpleStateRequestHandler.java b/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/SimpleStateRequestHandler.java index 6f5e7524b721b..ba42dc4d385e9 100644 ---
a/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/SimpleStateRequestHandler.java +++ b/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/SimpleStateRequestHandler.java @@ -29,6 +29,7 @@ import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.typeutils.runtime.RowSerializer; +import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.core.memory.ByteArrayInputStreamWithPos; import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos; import org.apache.flink.core.memory.DataInputViewStreamWrapper; @@ -135,12 +136,12 @@ public class SimpleStateRequestHandler implements StateRequestHandler { private final BeamFnApi.ProcessBundleRequest.CacheToken cacheToken; SimpleStateRequestHandler( - KeyedStateBackend keyedStateBackend, - TypeSerializer keySerializer, - TypeSerializer namespaceSerializer, - Map<String, String> config) { + KeyedStateBackend keyedStateBackend, + TypeSerializer keySerializer, + TypeSerializer namespaceSerializer, + ReadableConfig config) { this.keyedStateBackend = keyedStateBackend; - TypeSerializer frameworkKeySerializer = keyedStateBackend.getKeySerializer(); + TypeSerializer frameworkKeySerializer = keyedStateBackend.getKeySerializer(); if (!(frameworkKeySerializer instanceof AbstractRowDataSerializer || frameworkKeySerializer instanceof RowSerializer)) { throw new RuntimeException("Currently SimpleStateRequestHandler only supports row keys!"); @@ -157,12 +158,7 @@ public class SimpleStateRequestHandler implements StateRequestHandler { stateDescriptorCache = new HashMap<>(); mapStateIteratorCache = new HashMap<>(); mapStateIterateResponseBatchSize = - Integer.valueOf( - config.getOrDefault( - PythonOptions.MAP_STATE_ITERATE_RESPONSE_BATCH_SIZE.key(), - PythonOptions.MAP_STATE_ITERATE_RESPONSE_BATCH_SIZE - .defaultValue() - .toString())); + config.get(PythonOptions.MAP_STATE_ITERATE_RESPONSE_BATCH_SIZE); if (mapStateIterateResponseBatchSize <= 0) { throw new RuntimeException( String.format( diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/AbstractStatelessFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/AbstractStatelessFunctionOperator.java index b925bbe4426da..c5f9f66a42d6d 100644 --- a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/AbstractStatelessFunctionOperator.java +++ b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/AbstractStatelessFunctionOperator.java @@ -109,7 +109,6 @@ public PythonFunctionRunner createPythonFunctionRunner() throws IOException { createPythonEnvironmentManager(), getFunctionUrn(), getUserDefinedFunctionsProto(), - jobOptions, getFlinkMetricContainer(), getContainingTask().getEnvironment().getMemoryManager(), getOperatorConfig() diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/AbstractPythonStreamAggregateOperator.java b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/AbstractPythonStreamAggregateOperator.java index 9ec99126be8c5..816ea04f3a227 100644 --- a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/AbstractPythonStreamAggregateOperator.java +++ b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/AbstractPythonStreamAggregateOperator.java @@ -47,6 +47,8 @@ import
java.util.Arrays; import java.util.stream.Collectors; +import static org.apache.flink.python.PythonOptions.PYTHON_METRIC_ENABLED; +import static org.apache.flink.python.PythonOptions.PYTHON_PROFILE_ENABLED; import static org.apache.flink.streaming.api.utils.ProtoUtils.createRowTypeCoderInfoDescriptorProto; import static org.apache.flink.table.runtime.typeutils.PythonTypeUtils.toProtoType; @@ -156,7 +158,6 @@ public void open() throws Exception { PythonTypeUtils.toInternalSerializer(userDefinedFunctionOutputType); rowDataWrapper = new StreamRecordRowDataWrappingCollector(output); super.open(); - configJobOptions(); } @Override @@ -175,7 +176,6 @@ public PythonFunctionRunner createPythonFunctionRunner() throws Exception { createPythonEnvironmentManager(), getFunctionUrn(), getUserDefinedFunctionsProto(), - jobOptions, getFlinkMetricContainer(), getKeyedStateBackend(), getKeySerializer(), @@ -235,8 +235,8 @@ TypeSerializer getWindowSerializer() { protected FlinkFnApi.UserDefinedAggregateFunctions getUserDefinedFunctionsProto() { FlinkFnApi.UserDefinedAggregateFunctions.Builder builder = FlinkFnApi.UserDefinedAggregateFunctions.newBuilder(); - builder.setMetricEnabled(pythonConfig.isMetricEnabled()); - builder.setProfileEnabled(pythonConfig.isProfileEnabled()); + builder.setMetricEnabled(config.get(PYTHON_METRIC_ENABLED)); + builder.setProfileEnabled(config.get(PYTHON_PROFILE_ENABLED)); builder.addAllGrouping(Arrays.stream(grouping).boxed().collect(Collectors.toList())); builder.setGenerateUpdateBefore(generateUpdateBefore); builder.setIndexOfCountStar(indexOfCountStar); @@ -263,15 +263,6 @@ protected FlinkFnApi.UserDefinedAggregateFunctions getUserDefinedFunctionsProto( public abstract RowType createUserDefinedFunctionOutputType(); - private void configJobOptions() { - jobOptions.put( - PythonOptions.STATE_CACHE_SIZE.key(), - String.valueOf(config.get(PythonOptions.STATE_CACHE_SIZE))); - jobOptions.put( - PythonOptions.MAP_STATE_ITERATE_RESPONSE_BATCH_SIZE.key(), - String.valueOf(config.get(PythonOptions.MAP_STATE_ITERATE_RESPONSE_BATCH_SIZE))); - } - public FlinkFnApi.CoderInfoDescriptor createInputCoderInfoDescriptor(RowType runnerInputType) { return createRowTypeCoderInfoDescriptorProto( runnerInputType, FlinkFnApi.CoderInfoDescriptor.Mode.MULTIPLE, false); diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/AbstractArrowPythonAggregateFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/AbstractArrowPythonAggregateFunctionOperator.java index a893bb8617f8b..cd7167f1da9f1 100644 --- a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/AbstractArrowPythonAggregateFunctionOperator.java +++ b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/AbstractArrowPythonAggregateFunctionOperator.java @@ -36,6 +36,8 @@ import org.apache.flink.table.types.logical.RowType; import org.apache.flink.util.Preconditions; +import static org.apache.flink.python.PythonOptions.PYTHON_METRIC_ENABLED; +import static org.apache.flink.python.PythonOptions.PYTHON_PROFILE_ENABLED; import static org.apache.flink.streaming.api.utils.ProtoUtils.createArrowTypeCoderInfoDescriptorProto; import static org.apache.flink.streaming.api.utils.ProtoUtils.getUserDefinedFunctionProto; @@ -153,8 +155,8 @@ public FlinkFnApi.UserDefinedFunctions getUserDefinedFunctionsProto() { for (PythonFunctionInfo pythonFunctionInfo : pandasAggFunctions) { 
builder.addUdfs(getUserDefinedFunctionProto(pythonFunctionInfo)); } - builder.setMetricEnabled(pythonConfig.isMetricEnabled()); - builder.setProfileEnabled(pythonConfig.isProfileEnabled()); + builder.setMetricEnabled(config.get(PYTHON_METRIC_ENABLED)); + builder.setProfileEnabled(config.get(PYTHON_PROFILE_ENABLED)); return builder.build(); } } diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonOverWindowAggregateFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonOverWindowAggregateFunctionOperator.java index 2fd586131e3fa..1425a35fe9548 100644 --- a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonOverWindowAggregateFunctionOperator.java +++ b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonOverWindowAggregateFunctionOperator.java @@ -37,6 +37,8 @@ import java.util.List; import java.util.ListIterator; +import static org.apache.flink.python.PythonOptions.PYTHON_METRIC_ENABLED; +import static org.apache.flink.python.PythonOptions.PYTHON_PROFILE_ENABLED; import static org.apache.flink.streaming.api.utils.ProtoUtils.createOverWindowArrowTypeCoderInfoDescriptorProto; import static org.apache.flink.streaming.api.utils.ProtoUtils.getUserDefinedFunctionProto; @@ -259,8 +261,8 @@ public FlinkFnApi.UserDefinedFunctions getUserDefinedFunctionsProto() { functionBuilder.setWindowIndex(aggWindowIndex[i]); builder.addUdfs(functionBuilder); } - builder.setMetricEnabled(pythonConfig.isMetricEnabled()); - builder.setProfileEnabled(pythonConfig.isProfileEnabled()); + builder.setMetricEnabled(config.get(PYTHON_METRIC_ENABLED)); + builder.setProfileEnabled(config.get(PYTHON_PROFILE_ENABLED)); // add windows for (int i = 0; i < lowerBoundary.length; i++) { FlinkFnApi.OverWindow.Builder windowBuilder = FlinkFnApi.OverWindow.newBuilder(); diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/AbstractPythonScalarFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/AbstractPythonScalarFunctionOperator.java index e5220fb005529..ed887657e2147 100644 --- a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/AbstractPythonScalarFunctionOperator.java +++ b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/AbstractPythonScalarFunctionOperator.java @@ -35,6 +35,9 @@ import org.apache.flink.table.types.logical.RowType; import org.apache.flink.util.Preconditions; +import static org.apache.flink.python.PythonOptions.PYTHON_METRIC_ENABLED; +import static org.apache.flink.python.PythonOptions.PYTHON_PROFILE_ENABLED; + /** * Base class for all stream operators to execute Python {@link ScalarFunction}s. It executes the * Python {@link ScalarFunction}s in separate Python execution environment. 
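The hunks in these operator files all make the same substitution: instead of asking a PythonConfig wrapper (pythonConfig.isMetricEnabled(), pythonConfig.isProfileEnabled()), the operators read the typed ConfigOption constants directly from their ReadableConfig when populating the UserDefinedFunctions proto. A minimal sketch of that typed-read pattern, assuming only the PythonOptions constants this patch imports; the class name and the standalone main() are illustrative, not part of the change:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;

import static org.apache.flink.python.PythonOptions.PYTHON_METRIC_ENABLED;
import static org.apache.flink.python.PythonOptions.PYTHON_PROFILE_ENABLED;

public final class TypedOptionReadSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.set(PYTHON_PROFILE_ENABLED, true); // override a single option

        ReadableConfig config = conf;
        // Typed reads: the ConfigOption carries both the key and the default,
        // so no per-call-site string parsing or default duplication is needed.
        boolean metricEnabled = config.get(PYTHON_METRIC_ENABLED);
        boolean profileEnabled = config.get(PYTHON_PROFILE_ENABLED);
        System.out.println("metrics=" + metricEnabled + ", profiling=" + profileEnabled);
    }
}

Because the default lives on the ConfigOption itself, deleting the PythonConfig getters loses no behavior: an unset option still resolves to the same default the wrapper used to hard-code.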
@@ -121,8 +124,8 @@ public FlinkFnApi.UserDefinedFunctions getUserDefinedFunctionsProto() { for (PythonFunctionInfo pythonFunctionInfo : scalarFunctions) { builder.addUdfs(ProtoUtils.getUserDefinedFunctionProto(pythonFunctionInfo)); } - builder.setMetricEnabled(pythonConfig.isMetricEnabled()); - builder.setProfileEnabled(pythonConfig.isProfileEnabled()); + builder.setMetricEnabled(config.get(PYTHON_METRIC_ENABLED)); + builder.setProfileEnabled(config.get(PYTHON_PROFILE_ENABLED)); return builder.build(); } diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/EmbeddedPythonScalarFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/EmbeddedPythonScalarFunctionOperator.java index 3c843cd19b154..e30180bcb98e4 100644 --- a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/EmbeddedPythonScalarFunctionOperator.java +++ b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/EmbeddedPythonScalarFunctionOperator.java @@ -44,6 +44,9 @@ import java.util.Map; import java.util.stream.Collectors; +import static org.apache.flink.python.PythonOptions.PYTHON_METRIC_ENABLED; +import static org.apache.flink.python.PythonOptions.PYTHON_PROFILE_ENABLED; + /** The Python {@link ScalarFunction} operator in embedded Python environment. */ @Internal public class EmbeddedPythonScalarFunctionOperator @@ -236,8 +239,8 @@ public FlinkFnApi.UserDefinedFunctions getUserDefinedFunctionsProto() { for (PythonFunctionInfo pythonFunctionInfo : scalarFunctions) { builder.addUdfs(ProtoUtils.getUserDefinedFunctionProto(pythonFunctionInfo)); } - builder.setMetricEnabled(pythonConfig.isMetricEnabled()); - builder.setProfileEnabled(pythonConfig.isProfileEnabled()); + builder.setMetricEnabled(config.get(PYTHON_METRIC_ENABLED)); + builder.setProfileEnabled(config.get(PYTHON_PROFILE_ENABLED)); return builder.build(); } } diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/arrow/ArrowPythonScalarFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/arrow/ArrowPythonScalarFunctionOperator.java index 1db9dda484a69..37e9010f22753 100644 --- a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/arrow/ArrowPythonScalarFunctionOperator.java +++ b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/arrow/ArrowPythonScalarFunctionOperator.java @@ -30,6 +30,7 @@ import org.apache.flink.table.runtime.operators.python.scalar.AbstractPythonScalarFunctionOperator; import org.apache.flink.table.types.logical.RowType; +import static org.apache.flink.python.PythonOptions.MAX_ARROW_BATCH_SIZE; import static org.apache.flink.streaming.api.utils.ProtoUtils.createArrowTypeCoderInfoDescriptorProto; /** Arrow Python {@link ScalarFunction} operator. 
*/ @@ -67,7 +68,7 @@ public ArrowPythonScalarFunctionOperator( @Override public void open() throws Exception { super.open(); - maxArrowBatchSize = Math.min(pythonConfig.getMaxArrowBatchSize(), maxBundleSize); + maxArrowBatchSize = Math.min(config.get(MAX_ARROW_BATCH_SIZE), maxBundleSize); arrowSerializer = new ArrowSerializer(udfInputType, udfOutputType); arrowSerializer.open(bais, baos); currentBatchCount = 0; diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/table/PythonTableFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/table/PythonTableFunctionOperator.java index 43778cfeb5b9b..81862b993007b 100644 --- a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/table/PythonTableFunctionOperator.java +++ b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/table/PythonTableFunctionOperator.java @@ -41,6 +41,8 @@ import org.apache.flink.table.types.logical.RowType; import org.apache.flink.util.Preconditions; +import static org.apache.flink.python.PythonOptions.PYTHON_METRIC_ENABLED; +import static org.apache.flink.python.PythonOptions.PYTHON_PROFILE_ENABLED; import static org.apache.flink.streaming.api.utils.ProtoUtils.createFlattenRowTypeCoderInfoDescriptorProto; import static org.apache.flink.streaming.api.utils.ProtoUtils.createRowTypeCoderInfoDescriptorProto; @@ -157,8 +159,8 @@ public FlinkFnApi.UserDefinedFunctions getUserDefinedFunctionsProto() { FlinkFnApi.UserDefinedFunctions.Builder builder = FlinkFnApi.UserDefinedFunctions.newBuilder(); builder.addUdfs(ProtoUtils.getUserDefinedFunctionProto(tableFunction)); - builder.setMetricEnabled(pythonConfig.isMetricEnabled()); - builder.setProfileEnabled(pythonConfig.isProfileEnabled()); + builder.setMetricEnabled(config.get(PYTHON_METRIC_ENABLED)); + builder.setProfileEnabled(config.get(PYTHON_PROFILE_ENABLED)); return builder.build(); } diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/beam/BeamTablePythonFunctionRunner.java b/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/beam/BeamTablePythonFunctionRunner.java index d502a47c01651..ff605356d3465 100644 --- a/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/beam/BeamTablePythonFunctionRunner.java +++ b/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/beam/BeamTablePythonFunctionRunner.java @@ -34,7 +34,6 @@ import java.util.Collections; import java.util.List; -import java.util.Map; import java.util.Optional; import static org.apache.flink.python.Constants.INPUT_COLLECTION_ID; @@ -57,7 +56,6 @@ public BeamTablePythonFunctionRunner( ProcessPythonEnvironmentManager environmentManager, String functionUrn, GeneratedMessageV3 userDefinedFunctionProto, - Map jobOptions, FlinkMetricContainer flinkMetricContainer, KeyedStateBackend keyedStateBackend, TypeSerializer keySerializer, @@ -69,7 +67,6 @@ public BeamTablePythonFunctionRunner( super( taskName, environmentManager, - jobOptions, flinkMetricContainer, keyedStateBackend, keySerializer, @@ -122,7 +119,6 @@ public static BeamTablePythonFunctionRunner stateless( ProcessPythonEnvironmentManager environmentManager, String functionUrn, GeneratedMessageV3 userDefinedFunctionProto, - Map jobOptions, FlinkMetricContainer flinkMetricContainer, MemoryManager memoryManager, double managedMemoryFraction, @@ -133,7 +129,6 @@ public static BeamTablePythonFunctionRunner stateless( environmentManager, functionUrn, 
userDefinedFunctionProto, - jobOptions, flinkMetricContainer, null, null, @@ -149,7 +144,6 @@ public static BeamTablePythonFunctionRunner stateful( ProcessPythonEnvironmentManager environmentManager, String functionUrn, GeneratedMessageV3 userDefinedFunctionProto, - Map jobOptions, FlinkMetricContainer flinkMetricContainer, KeyedStateBackend keyedStateBackend, TypeSerializer keySerializer, @@ -163,7 +157,6 @@ public static BeamTablePythonFunctionRunner stateful( environmentManager, functionUrn, userDefinedFunctionProto, - jobOptions, flinkMetricContainer, keyedStateBackend, keySerializer, diff --git a/flink-python/src/test/java/org/apache/flink/python/PythonConfigTest.java b/flink-python/src/test/java/org/apache/flink/python/PythonConfigTest.java deleted file mode 100644 index 871c862282a63..0000000000000 --- a/flink-python/src/test/java/org/apache/flink/python/PythonConfigTest.java +++ /dev/null @@ -1,146 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.python; - -import org.apache.flink.configuration.Configuration; -import org.apache.flink.python.util.PythonDependencyUtils; - -import org.junit.Test; - -import java.util.HashMap; -import java.util.Map; - -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.is; -import static org.junit.Assert.assertThat; - -/** Tests for {@link PythonConfig}. 
*/ -public class PythonConfigTest { - - @Test - public void testDefaultConfigure() { - PythonConfig pythonConfig = new PythonConfig(new Configuration()); - assertThat( - pythonConfig.getMaxBundleSize(), - is(equalTo(PythonOptions.MAX_BUNDLE_SIZE.defaultValue()))); - assertThat( - pythonConfig.getMaxBundleTimeMills(), - is(equalTo(PythonOptions.MAX_BUNDLE_TIME_MILLS.defaultValue()))); - assertThat( - pythonConfig.getMaxArrowBatchSize(), - is(equalTo(PythonOptions.MAX_ARROW_BATCH_SIZE.defaultValue()))); - assertThat(pythonConfig.getPythonFilesInfo().isEmpty(), is(true)); - assertThat(pythonConfig.getPythonRequirementsFileInfo().isPresent(), is(false)); - assertThat(pythonConfig.getPythonRequirementsCacheDirInfo().isPresent(), is(false)); - assertThat(pythonConfig.getPythonArchivesInfo().isEmpty(), is(true)); - assertThat(pythonConfig.getPythonExec(), is("python")); - assertThat( - pythonConfig.isUsingManagedMemory(), - is(equalTo(PythonOptions.USE_MANAGED_MEMORY.defaultValue()))); - } - - @Test - public void testMaxBundleSize() { - Configuration config = new Configuration(); - config.set(PythonOptions.MAX_BUNDLE_SIZE, 10); - PythonConfig pythonConfig = new PythonConfig(config); - assertThat(pythonConfig.getMaxBundleSize(), is(equalTo(10))); - } - - @Test - public void testMaxBundleTimeMills() { - Configuration config = new Configuration(); - config.set(PythonOptions.MAX_BUNDLE_TIME_MILLS, 10L); - PythonConfig pythonConfig = new PythonConfig(config); - assertThat(pythonConfig.getMaxBundleTimeMills(), is(equalTo(10L))); - } - - @Test - public void testMaxArrowBatchSize() { - Configuration config = new Configuration(); - config.set(PythonOptions.MAX_ARROW_BATCH_SIZE, 10); - PythonConfig pythonConfig = new PythonConfig(config); - assertThat(pythonConfig.getMaxArrowBatchSize(), is(equalTo(10))); - } - - @Test - public void testPythonFilesInfo() { - Configuration config = new Configuration(); - Map pythonFiles = new HashMap<>(); - pythonFiles.put("python_file_{SHA256}", "file0.py"); - config.set(PythonDependencyUtils.PYTHON_FILES, pythonFiles); - PythonConfig pythonConfig = new PythonConfig(config); - assertThat(pythonConfig.getPythonFilesInfo(), is(equalTo(pythonFiles))); - } - - @Test - public void testPythonRequirementsFileInfo() { - Configuration config = new Configuration(); - Map pythonRequirementsFile = - config.getOptional(PythonDependencyUtils.PYTHON_REQUIREMENTS_FILE) - .orElse(new HashMap<>()); - pythonRequirementsFile.put(PythonDependencyUtils.FILE, "python_requirements_file_{SHA256}"); - config.set(PythonDependencyUtils.PYTHON_REQUIREMENTS_FILE, pythonRequirementsFile); - PythonConfig pythonConfig = new PythonConfig(config); - assertThat( - pythonConfig.getPythonRequirementsFileInfo().get(), - is(equalTo("python_requirements_file_{SHA256}"))); - } - - @Test - public void testPythonRequirementsCacheDirInfo() { - Configuration config = new Configuration(); - Map pythonRequirementsFile = - config.getOptional(PythonDependencyUtils.PYTHON_REQUIREMENTS_FILE) - .orElse(new HashMap<>()); - pythonRequirementsFile.put( - PythonDependencyUtils.CACHE, "python_requirements_cache_{SHA256}"); - config.set(PythonDependencyUtils.PYTHON_REQUIREMENTS_FILE, pythonRequirementsFile); - PythonConfig pythonConfig = new PythonConfig(config); - assertThat( - pythonConfig.getPythonRequirementsCacheDirInfo().get(), - is(equalTo("python_requirements_cache_{SHA256}"))); - } - - @Test - public void testPythonArchivesInfo() { - Configuration config = new Configuration(); - Map pythonArchives = new HashMap<>(); - 
pythonArchives.put("python_archive_{SHA256}", "file0.zip"); - config.set(PythonDependencyUtils.PYTHON_ARCHIVES, pythonArchives); - PythonConfig pythonConfig = new PythonConfig(config); - assertThat(pythonConfig.getPythonArchivesInfo(), is(equalTo(pythonArchives))); - } - - @Test - public void testPythonExec() { - Configuration config = new Configuration(); - config.set(PythonOptions.PYTHON_EXECUTABLE, "/usr/local/bin/python3"); - PythonConfig pythonConfig = new PythonConfig(config); - assertThat(pythonConfig.getPythonExec(), is(equalTo("/usr/local/bin/python3"))); - } - - @Test - public void testManagedMemory() { - Configuration config = new Configuration(); - config.set(PythonOptions.USE_MANAGED_MEMORY, true); - PythonConfig pythonConfig = new PythonConfig(config); - assertThat(pythonConfig.isUsingManagedMemory(), is(equalTo(true))); - } -} diff --git a/flink-python/src/test/java/org/apache/flink/python/env/PythonDependencyInfoTest.java b/flink-python/src/test/java/org/apache/flink/python/env/PythonDependencyInfoTest.java index b80f56f0e07de..5478130b1b01f 100644 --- a/flink-python/src/test/java/org/apache/flink/python/env/PythonDependencyInfoTest.java +++ b/flink-python/src/test/java/org/apache/flink/python/env/PythonDependencyInfoTest.java @@ -21,7 +21,6 @@ import org.apache.flink.api.common.cache.DistributedCache; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.Path; -import org.apache.flink.python.PythonConfig; import org.apache.flink.python.PythonOptions; import org.apache.flink.python.util.PythonDependencyUtils; import org.apache.flink.util.OperatingSystem; @@ -35,6 +34,9 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.Future; +import static org.apache.flink.python.PythonOptions.PYTHON_ARCHIVES_DISTRIBUTED_CACHE_INFO; +import static org.apache.flink.python.PythonOptions.PYTHON_FILES_DISTRIBUTED_CACHE_INFO; +import static org.apache.flink.python.PythonOptions.PYTHON_REQUIREMENTS_FILE_DISTRIBUTED_CACHE_INFO; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -75,9 +77,8 @@ public void testParsePythonFiles() { Map pythonFiles = new HashMap<>(); pythonFiles.put("python_file_{SHA256_0}", "test_file1.py"); pythonFiles.put("python_file_{SHA256_1}", "test_file2.py"); - config.set(PythonDependencyUtils.PYTHON_FILES, pythonFiles); - PythonDependencyInfo dependencyInfo = - PythonDependencyInfo.create(new PythonConfig(config), distributedCache); + config.set(PYTHON_FILES_DISTRIBUTED_CACHE_INFO, pythonFiles); + PythonDependencyInfo dependencyInfo = PythonDependencyInfo.create(config, distributedCache); Map expected = new HashMap<>(); expected.put("/distributed_cache/file0", "test_file1.py"); @@ -91,18 +92,17 @@ public void testParsePythonRequirements() throws IOException { Assume.assumeFalse(OperatingSystem.isWindows()); Configuration config = new Configuration(); - config.set(PythonDependencyUtils.PYTHON_REQUIREMENTS_FILE, new HashMap<>()); - config.get(PythonDependencyUtils.PYTHON_REQUIREMENTS_FILE) + config.set(PYTHON_REQUIREMENTS_FILE_DISTRIBUTED_CACHE_INFO, new HashMap<>()); + config.get(PYTHON_REQUIREMENTS_FILE_DISTRIBUTED_CACHE_INFO) .put(PythonDependencyUtils.FILE, "python_requirements_file_{SHA256}"); - PythonDependencyInfo dependencyInfo = - PythonDependencyInfo.create(new PythonConfig(config), distributedCache); + PythonDependencyInfo dependencyInfo = PythonDependencyInfo.create(config, distributedCache); assertEquals("/distributed_cache/file2", 
dependencyInfo.getRequirementsFilePath().get()); assertFalse(dependencyInfo.getRequirementsCacheDir().isPresent()); - config.get(PythonDependencyUtils.PYTHON_REQUIREMENTS_FILE) + config.get(PYTHON_REQUIREMENTS_FILE_DISTRIBUTED_CACHE_INFO) .put(PythonDependencyUtils.CACHE, "python_requirements_cache_{SHA256}"); - dependencyInfo = PythonDependencyInfo.create(new PythonConfig(config), distributedCache); + dependencyInfo = PythonDependencyInfo.create(config, distributedCache); assertEquals("/distributed_cache/file2", dependencyInfo.getRequirementsFilePath().get()); assertEquals("/distributed_cache/file3", dependencyInfo.getRequirementsCacheDir().get()); @@ -117,9 +117,8 @@ public void testParsePythonArchives() { Map pythonArchives = new HashMap<>(); pythonArchives.put("python_archive_{SHA256_0}", "py27.zip"); pythonArchives.put("python_archive_{SHA256_1}", "py37"); - config.set(PythonDependencyUtils.PYTHON_ARCHIVES, pythonArchives); - PythonDependencyInfo dependencyInfo = - PythonDependencyInfo.create(new PythonConfig(config), distributedCache); + config.set(PYTHON_ARCHIVES_DISTRIBUTED_CACHE_INFO, pythonArchives); + PythonDependencyInfo dependencyInfo = PythonDependencyInfo.create(config, distributedCache); Map expected = new HashMap<>(); expected.put("/distributed_cache/file4", "py27.zip"); @@ -131,8 +130,7 @@ public void testParsePythonArchives() { public void testParsePythonExec() { Configuration config = new Configuration(); config.set(PythonOptions.PYTHON_EXECUTABLE, "/usr/bin/python3"); - PythonDependencyInfo dependencyInfo = - PythonDependencyInfo.create(new PythonConfig(config), distributedCache); + PythonDependencyInfo dependencyInfo = PythonDependencyInfo.create(config, distributedCache); assertEquals("/usr/bin/python3", dependencyInfo.getPythonExec()); } diff --git a/flink-python/src/test/java/org/apache/flink/python/util/PythonConfigUtilTest.java b/flink-python/src/test/java/org/apache/flink/python/util/PythonConfigUtilTest.java index c6e997c620788..2718634b479e4 100644 --- a/flink-python/src/test/java/org/apache/flink/python/util/PythonConfigUtilTest.java +++ b/flink-python/src/test/java/org/apache/flink/python/util/PythonConfigUtilTest.java @@ -25,25 +25,13 @@ import org.junit.Test; -import java.lang.reflect.InvocationTargetException; import java.util.Collections; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; /** A test class to test PythonConfigUtil getting executionEnvironment correctly. 
*/ public class PythonConfigUtilTest { - @Test - public void testGetEnvironmentConfig() - throws IllegalAccessException, NoSuchFieldException, InvocationTargetException { - StreamExecutionEnvironment executionEnvironment = - StreamExecutionEnvironment.getExecutionEnvironment(); - Configuration envConfig = - PythonConfigUtil.getEnvConfigWithDependencies(executionEnvironment); - assertNotNull(envConfig); - } - @Test public void testJobName() { String jobName = "MyTestJob"; diff --git a/flink-python/src/test/java/org/apache/flink/python/util/PythonDependencyUtilsTest.java b/flink-python/src/test/java/org/apache/flink/python/util/PythonDependencyUtilsTest.java index 46f370be76802..01238c08a8626 100644 --- a/flink-python/src/test/java/org/apache/flink/python/util/PythonDependencyUtilsTest.java +++ b/flink-python/src/test/java/org/apache/flink/python/util/PythonDependencyUtilsTest.java @@ -32,14 +32,14 @@ import java.util.Properties; import java.util.stream.Collectors; +import static org.apache.flink.python.PythonOptions.PYTHON_ARCHIVES_DISTRIBUTED_CACHE_INFO; import static org.apache.flink.python.PythonOptions.PYTHON_CLIENT_EXECUTABLE; import static org.apache.flink.python.PythonOptions.PYTHON_EXECUTABLE; +import static org.apache.flink.python.PythonOptions.PYTHON_FILES_DISTRIBUTED_CACHE_INFO; import static org.apache.flink.python.PythonOptions.PYTHON_REQUIREMENTS; +import static org.apache.flink.python.PythonOptions.PYTHON_REQUIREMENTS_FILE_DISTRIBUTED_CACHE_INFO; import static org.apache.flink.python.util.PythonDependencyUtils.CACHE; import static org.apache.flink.python.util.PythonDependencyUtils.FILE; -import static org.apache.flink.python.util.PythonDependencyUtils.PYTHON_ARCHIVES; -import static org.apache.flink.python.util.PythonDependencyUtils.PYTHON_FILES; -import static org.apache.flink.python.util.PythonDependencyUtils.PYTHON_REQUIREMENTS_FILE; import static org.apache.flink.python.util.PythonDependencyUtils.configurePythonDependencies; import static org.apache.flink.python.util.PythonDependencyUtils.merge; import static org.junit.Assert.assertEquals; @@ -74,20 +74,20 @@ public void testPythonFiles() { "tmp_dir/test_dir"); verifyCachedFiles(expectedCachedFiles); - Configuration expectedConfiguration = new Configuration(config); - expectedConfiguration.set(PYTHON_FILES, new HashMap<>()); + Configuration expectedConfiguration = new Configuration(); + expectedConfiguration.set(PYTHON_FILES_DISTRIBUTED_CACHE_INFO, new HashMap<>()); expectedConfiguration - .get(PYTHON_FILES) + .get(PYTHON_FILES_DISTRIBUTED_CACHE_INFO) .put( "python_file_83bbdaee494ad7d9b334c02ec71dc86a0868f7f8e49d1249a37c517dc6ee15a7", "test_file1.py"); expectedConfiguration - .get(PYTHON_FILES) + .get(PYTHON_FILES_DISTRIBUTED_CACHE_INFO) .put( "python_file_e57a895cb1256500098be0874128680cd9f56000d48fcd393c48d6371bd2d947", "test_file2.py"); expectedConfiguration - .get(PYTHON_FILES) + .get(PYTHON_FILES_DISTRIBUTED_CACHE_INFO) .put( "python_file_e56bc55ff643576457b3d012b2bba888727c71cf05a958930f2263398c4e9798", "test_dir"); @@ -106,10 +106,10 @@ public void testPythonRequirements() { "tmp_dir/requirements.txt"); verifyCachedFiles(expectedCachedFiles); - Configuration expectedConfiguration = new Configuration(config); - expectedConfiguration.set(PYTHON_REQUIREMENTS_FILE, new HashMap<>()); + Configuration expectedConfiguration = new Configuration(); + expectedConfiguration.set(PYTHON_REQUIREMENTS_FILE_DISTRIBUTED_CACHE_INFO, new HashMap<>()); expectedConfiguration - .get(PYTHON_REQUIREMENTS_FILE) + 
.get(PYTHON_REQUIREMENTS_FILE_DISTRIBUTED_CACHE_INFO) .put( FILE, "python_requirements_file_69390ca43c69ada3819226fcfbb5b6d27e111132a9427e7f201edd82e9d65ff6"); @@ -127,15 +127,15 @@ public void testPythonRequirements() { "tmp_dir/cache"); verifyCachedFiles(expectedCachedFiles); - expectedConfiguration = new Configuration(config); - expectedConfiguration.set(PYTHON_REQUIREMENTS_FILE, new HashMap<>()); + expectedConfiguration = new Configuration(); + expectedConfiguration.set(PYTHON_REQUIREMENTS_FILE_DISTRIBUTED_CACHE_INFO, new HashMap<>()); expectedConfiguration - .get(PYTHON_REQUIREMENTS_FILE) + .get(PYTHON_REQUIREMENTS_FILE_DISTRIBUTED_CACHE_INFO) .put( FILE, "python_requirements_file_56fd0c530faaa7129dca8d314cf69cbfc7c1c5c952f5176a003253e2f418873e"); expectedConfiguration - .get(PYTHON_REQUIREMENTS_FILE) + .get(PYTHON_REQUIREMENTS_FILE_DISTRIBUTED_CACHE_INFO) .put( CACHE, "python_requirements_cache_2f563dd6731c2c7c5e1ef1ef8279f61e907dc3bfc698adb71b109e43ed93e143"); @@ -169,25 +169,25 @@ public void testPythonArchives() { "tmp_dir/py37.zip"); verifyCachedFiles(expectedCachedFiles); - Configuration expectedConfiguration = new Configuration(config); - expectedConfiguration.set(PYTHON_ARCHIVES, new HashMap<>()); + Configuration expectedConfiguration = new Configuration(); + expectedConfiguration.set(PYTHON_ARCHIVES_DISTRIBUTED_CACHE_INFO, new HashMap<>()); expectedConfiguration - .get(PYTHON_ARCHIVES) + .get(PYTHON_ARCHIVES_DISTRIBUTED_CACHE_INFO) .put( "python_archive_4cc74e4003de886434723f351771df2a84f72531c52085acc0915e19d70df2ba", "file1.zip"); expectedConfiguration - .get(PYTHON_ARCHIVES) + .get(PYTHON_ARCHIVES_DISTRIBUTED_CACHE_INFO) .put( "python_archive_5f3fca2a4165c7d9c94b00bfab956c15f14c41e9e03f6037c83eb61157fce09c", "py37.zip"); expectedConfiguration - .get(PYTHON_ARCHIVES) + .get(PYTHON_ARCHIVES_DISTRIBUTED_CACHE_INFO) .put( "python_archive_f8a1c874251230f21094880d9dd878ffb5714454b69184d8ad268a6563269f0c", "py37.zip#venv2"); expectedConfiguration - .get(PYTHON_ARCHIVES) + .get(PYTHON_ARCHIVES_DISTRIBUTED_CACHE_INFO) .put( "python_archive_c7d970ce1c5794367974ce8ef536c2343bed8fcfe7c2422c51548e58007eee6a", "py37.zip#venv"); diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/PassThroughPythonStreamGroupWindowAggregateOperator.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/PassThroughPythonStreamGroupWindowAggregateOperator.java index a26574d7336a5..a14e36375e83f 100644 --- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/PassThroughPythonStreamGroupWindowAggregateOperator.java +++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/PassThroughPythonStreamGroupWindowAggregateOperator.java @@ -200,7 +200,6 @@ public PythonFunctionRunner createPythonFunctionRunner() throws Exception { userDefinedFunctionOutputType, STREAM_GROUP_WINDOW_AGGREGATE_URN, getUserDefinedFunctionsProto(), - new HashMap<>(), PythonTestUtils.createMockFlinkMetricContainer(), getKeyedStateBackend(), getKeySerializer(), diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/PythonStreamGroupAggregateOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/PythonStreamGroupAggregateOperatorTest.java index d5472fc27a505..2e6e9f377d84f 100644 --- 
a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/PythonStreamGroupAggregateOperatorTest.java +++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/PythonStreamGroupAggregateOperatorTest.java @@ -40,7 +40,6 @@ import org.junit.Test; import java.io.IOException; -import java.util.HashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.function.Function; @@ -262,7 +261,6 @@ public PythonFunctionRunner createPythonFunctionRunner() { outputType, STREAM_GROUP_AGGREGATE_URN, getUserDefinedFunctionsProto(), - new HashMap<>(), PythonTestUtils.createMockFlinkMetricContainer(), getKeyedStateBackend(), getKeySerializer(), diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/PythonStreamGroupTableAggregateOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/PythonStreamGroupTableAggregateOperatorTest.java index bdef22d144c1e..d4cffdb493598 100644 --- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/PythonStreamGroupTableAggregateOperatorTest.java +++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/PythonStreamGroupTableAggregateOperatorTest.java @@ -40,7 +40,6 @@ import org.junit.Test; import java.io.IOException; -import java.util.HashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.function.Function; @@ -275,7 +274,6 @@ public PythonFunctionRunner createPythonFunctionRunner() { outputType, STREAM_GROUP_TABLE_AGGREGATE_URN, getUserDefinedFunctionsProto(), - new HashMap<>(), PythonTestUtils.createMockFlinkMetricContainer(), getKeyedStateBackend(), getKeySerializer(), diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonGroupAggregateFunctionOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonGroupAggregateFunctionOperatorTest.java index ea80b4c154e66..b01bfe9c19def 100644 --- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonGroupAggregateFunctionOperatorTest.java +++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonGroupAggregateFunctionOperatorTest.java @@ -41,7 +41,6 @@ import org.junit.Test; import java.util.Arrays; -import java.util.HashMap; import java.util.concurrent.ConcurrentLinkedQueue; /** @@ -237,7 +236,6 @@ public PythonFunctionRunner createPythonFunctionRunner() { udfOutputType, getFunctionUrn(), getUserDefinedFunctionsProto(), - new HashMap<>(), PythonTestUtils.createMockFlinkMetricContainer(), false); } diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonGroupWindowAggregateFunctionOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonGroupWindowAggregateFunctionOperatorTest.java index e762d4ee9668d..531f9f549480e 100644 --- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonGroupWindowAggregateFunctionOperatorTest.java +++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonGroupWindowAggregateFunctionOperatorTest.java @@ -43,7 +43,6 @@ import 
org.junit.Test; import java.util.Arrays; -import java.util.HashMap; import java.util.concurrent.ConcurrentLinkedQueue; /** @@ -406,7 +405,6 @@ public PythonFunctionRunner createPythonFunctionRunner() { udfOutputType, getFunctionUrn(), getUserDefinedFunctionsProto(), - new HashMap<>(), PythonTestUtils.createMockFlinkMetricContainer(), false); } diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonOverWindowAggregateFunctionOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonOverWindowAggregateFunctionOperatorTest.java index 991fcdd21dd51..785950c5a6438 100644 --- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonOverWindowAggregateFunctionOperatorTest.java +++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonOverWindowAggregateFunctionOperatorTest.java @@ -42,7 +42,6 @@ import org.junit.Test; import java.util.Arrays; -import java.util.HashMap; import java.util.List; import java.util.concurrent.ConcurrentLinkedQueue; @@ -307,7 +306,6 @@ public PythonFunctionRunner createPythonFunctionRunner() { udfOutputType, getFunctionUrn(), getUserDefinedFunctionsProto(), - new HashMap<>(), PythonTestUtils.createMockFlinkMetricContainer(), true); } diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonGroupWindowAggregateFunctionOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonGroupWindowAggregateFunctionOperatorTest.java index d55c615d655e6..3368402f261c8 100644 --- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonGroupWindowAggregateFunctionOperatorTest.java +++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonGroupWindowAggregateFunctionOperatorTest.java @@ -54,7 +54,6 @@ import java.time.Duration; import java.time.ZoneId; import java.util.Arrays; -import java.util.HashMap; import java.util.concurrent.ConcurrentLinkedQueue; /** @@ -521,7 +520,6 @@ public PythonFunctionRunner createPythonFunctionRunner() { udfOutputType, getFunctionUrn(), getUserDefinedFunctionsProto(), - new HashMap<>(), PythonTestUtils.createMockFlinkMetricContainer(), false); } diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonProcTimeBoundedRangeOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonProcTimeBoundedRangeOperatorTest.java index f774cdff79c19..ce866a11f73cb 100644 --- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonProcTimeBoundedRangeOperatorTest.java +++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonProcTimeBoundedRangeOperatorTest.java @@ -40,7 +40,6 @@ import org.junit.Test; import java.util.Arrays; -import java.util.HashMap; import java.util.concurrent.ConcurrentLinkedQueue; /** Test for {@link StreamArrowPythonProcTimeBoundedRangeOperator}. 
*/ @@ -167,7 +166,6 @@ public PythonFunctionRunner createPythonFunctionRunner() { udfOutputType, getFunctionUrn(), getUserDefinedFunctionsProto(), - new HashMap<>(), PythonTestUtils.createMockFlinkMetricContainer(), false); } diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonProcTimeBoundedRowsOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonProcTimeBoundedRowsOperatorTest.java index cb48b13bb4085..ee5e14bc42830 100644 --- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonProcTimeBoundedRowsOperatorTest.java +++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonProcTimeBoundedRowsOperatorTest.java @@ -40,7 +40,6 @@ import org.junit.Test; import java.util.Arrays; -import java.util.HashMap; import java.util.concurrent.ConcurrentLinkedQueue; /** Test for {@link StreamArrowPythonProcTimeBoundedRowsOperator}. */ @@ -168,7 +167,6 @@ public PythonFunctionRunner createPythonFunctionRunner() { udfOutputType, getFunctionUrn(), getUserDefinedFunctionsProto(), - new HashMap<>(), PythonTestUtils.createMockFlinkMetricContainer(), false); } diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRangeOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRangeOperatorTest.java index 3b6017a289c89..60d7dcac318d5 100644 --- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRangeOperatorTest.java +++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRangeOperatorTest.java @@ -44,7 +44,6 @@ import org.junit.Test; import java.util.Arrays; -import java.util.HashMap; import java.util.concurrent.ConcurrentLinkedQueue; import static org.junit.Assert.assertEquals; @@ -323,7 +322,6 @@ public PythonFunctionRunner createPythonFunctionRunner() { udfOutputType, getFunctionUrn(), getUserDefinedFunctionsProto(), - new HashMap<>(), PythonTestUtils.createMockFlinkMetricContainer(), false); } diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRowsOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRowsOperatorTest.java index f45a17aae3df2..c0548dc71bf6f 100644 --- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRowsOperatorTest.java +++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRowsOperatorTest.java @@ -42,7 +42,6 @@ import org.junit.Test; import java.util.Arrays; -import java.util.HashMap; import java.util.concurrent.ConcurrentLinkedQueue; /** @@ -293,7 +292,6 @@ public PythonFunctionRunner createPythonFunctionRunner() { udfOutputType, getFunctionUrn(), getUserDefinedFunctionsProto(), - new HashMap<>(), PythonTestUtils.createMockFlinkMetricContainer(), false); } diff --git 
a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/scalar/PythonScalarFunctionOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/scalar/PythonScalarFunctionOperatorTest.java index cce6a70a9c053..d6372f44aa9d5 100644 --- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/scalar/PythonScalarFunctionOperatorTest.java +++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/scalar/PythonScalarFunctionOperatorTest.java @@ -40,7 +40,6 @@ import java.io.IOException; import java.util.Collection; -import java.util.HashMap; import static org.apache.flink.table.runtime.util.StreamRecordUtils.row; @@ -151,7 +150,6 @@ public PythonFunctionRunner createPythonFunctionRunner() throws IOException { udfOutputType, getFunctionUrn(), getUserDefinedFunctionsProto(), - new HashMap<>(), PythonTestUtils.createMockFlinkMetricContainer()); } } diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/scalar/arrow/ArrowPythonScalarFunctionOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/scalar/arrow/ArrowPythonScalarFunctionOperatorTest.java index 2b41b2b7c1b41..9bf35613c9c3c 100644 --- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/scalar/arrow/ArrowPythonScalarFunctionOperatorTest.java +++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/scalar/arrow/ArrowPythonScalarFunctionOperatorTest.java @@ -41,7 +41,6 @@ import java.io.IOException; import java.util.Collection; -import java.util.HashMap; import static org.apache.flink.table.runtime.util.StreamRecordUtils.row; @@ -150,7 +149,6 @@ public PythonFunctionRunner createPythonFunctionRunner() throws IOException { udfOutputType, getFunctionUrn(), getUserDefinedFunctionsProto(), - new HashMap<>(), PythonTestUtils.createMockFlinkMetricContainer()); } } diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/table/PythonTableFunctionOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/table/PythonTableFunctionOperatorTest.java index 299722537a105..1d38abedeaabb 100644 --- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/table/PythonTableFunctionOperatorTest.java +++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/table/PythonTableFunctionOperatorTest.java @@ -39,7 +39,6 @@ import org.apache.calcite.rel.core.JoinRelType; import java.util.Collection; -import java.util.HashMap; import static org.apache.flink.table.runtime.util.StreamRecordUtils.row; @@ -132,7 +131,6 @@ public PythonFunctionRunner createPythonFunctionRunner() { udfOutputType, getFunctionUrn(), getUserDefinedFunctionsProto(), - new HashMap<>(), PythonTestUtils.createMockFlinkMetricContainer()); } } diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonAggregateFunctionRunner.java b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonAggregateFunctionRunner.java index fb16068280957..f19b5c60188f5 100644 --- a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonAggregateFunctionRunner.java +++ b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonAggregateFunctionRunner.java @@ -19,11 +19,11 @@ package org.apache.flink.table.runtime.utils; import org.apache.flink.api.common.typeutils.base.IntSerializer; 
+import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
 import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.fnexecution.v1.FlinkFnApi;
-import org.apache.flink.python.PythonConfig;
 import org.apache.flink.python.env.process.ProcessPythonEnvironmentManager;
 import org.apache.flink.python.metric.FlinkMetricContainer;
 import org.apache.flink.table.data.RowData;
@@ -37,7 +37,6 @@
 import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
 
 import static org.apache.flink.streaming.api.utils.ProtoUtils.createArrowTypeCoderInfoDescriptorProto;
 
@@ -72,7 +71,6 @@ public PassThroughPythonAggregateFunctionRunner(
             RowType outputType,
             String functionUrn,
             FlinkFnApi.UserDefinedFunctions userDefinedFunctions,
-            Map<String, String> jobOptions,
             FlinkMetricContainer flinkMetricContainer,
             boolean isBatchOverWindow) {
         super(
@@ -80,7 +78,6 @@ public PassThroughPythonAggregateFunctionRunner(
                 environmentManager,
                 functionUrn,
                 userDefinedFunctions,
-                jobOptions,
                 flinkMetricContainer,
                 null,
                 null,
@@ -97,7 +94,7 @@ public PassThroughPythonAggregateFunctionRunner(
     }
 
     @Override
-    public void open(PythonConfig config) throws Exception {
+    public void open(ReadableConfig config) throws Exception {
         super.open(config);
         bais = new ByteArrayInputStreamWithPos();
         baisWrapper = new DataInputViewStreamWrapper(bais);
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonScalarFunctionRunner.java b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonScalarFunctionRunner.java
index 4815507f6aa65..e0cd0fbde1c07 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonScalarFunctionRunner.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonScalarFunctionRunner.java
@@ -29,7 +29,6 @@
 
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
 
 import static org.apache.flink.streaming.api.utils.ProtoUtils.createFlattenRowTypeCoderInfoDescriptorProto;
 
@@ -48,14 +47,12 @@ public PassThroughPythonScalarFunctionRunner(
             RowType outputType,
             String functionUrn,
             FlinkFnApi.UserDefinedFunctions userDefinedFunctions,
-            Map<String, String> jobOptions,
             FlinkMetricContainer flinkMetricContainer) {
         super(
                 taskName,
                 environmentManager,
                 functionUrn,
                 userDefinedFunctions,
-                jobOptions,
                 flinkMetricContainer,
                 null,
                 null,
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonTableFunctionRunner.java b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonTableFunctionRunner.java
index 94694222b498b..48af1207b6e35 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonTableFunctionRunner.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonTableFunctionRunner.java
@@ -29,7 +29,6 @@
 
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
 
 import static org.apache.flink.streaming.api.utils.ProtoUtils.createFlattenRowTypeCoderInfoDescriptorProto;
 
@@ -50,14 +49,12 @@ public PassThroughPythonTableFunctionRunner(
             RowType outputType,
             String functionUrn,
             FlinkFnApi.UserDefinedFunctions userDefinedFunctions,
-            Map<String, String> jobOptions,
             FlinkMetricContainer flinkMetricContainer) {
         super(
                 taskName,
                 environmentManager,
                 functionUrn,
                 userDefinedFunctions,
-                jobOptions,
                 flinkMetricContainer,
                 null,
                 null,
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughStreamAggregatePythonFunctionRunner.java b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughStreamAggregatePythonFunctionRunner.java
index e2886005c5400..230f2e4b86261 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughStreamAggregatePythonFunctionRunner.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughStreamAggregatePythonFunctionRunner.java
@@ -31,7 +31,6 @@
 
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
 import java.util.function.Function;
 
 import static org.apache.flink.streaming.api.utils.ProtoUtils.createRowTypeCoderInfoDescriptorProto;
 
@@ -54,7 +53,6 @@ public PassThroughStreamAggregatePythonFunctionRunner(
             RowType outputType,
             String functionUrn,
             FlinkFnApi.UserDefinedAggregateFunctions userDefinedFunctions,
-            Map<String, String> jobOptions,
             FlinkMetricContainer flinkMetricContainer,
             KeyedStateBackend keyedStateBackend,
             TypeSerializer keySerializer,
@@ -64,7 +62,6 @@ public PassThroughStreamAggregatePythonFunctionRunner(
                 environmentManager,
                 functionUrn,
                 userDefinedFunctions,
-                jobOptions,
                 flinkMetricContainer,
                 keyedStateBackend,
                 keySerializer,
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughStreamGroupWindowAggregatePythonFunctionRunner.java b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughStreamGroupWindowAggregatePythonFunctionRunner.java
index 7f977f2dbcf17..a76fc1c7fd845 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughStreamGroupWindowAggregatePythonFunctionRunner.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughStreamGroupWindowAggregatePythonFunctionRunner.java
@@ -30,8 +30,6 @@
 import org.apache.beam.runners.fnexecution.control.JobBundleFactory;
 import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.Struct;
 
-import java.util.Map;
-
 import static org.apache.flink.streaming.api.utils.ProtoUtils.createFlattenRowTypeCoderInfoDescriptorProto;
 
 /**
@@ -50,7 +48,6 @@ public PassThroughStreamGroupWindowAggregatePythonFunctionRunner(
             RowType outputType,
             String functionUrn,
             FlinkFnApi.UserDefinedAggregateFunctions userDefinedFunctions,
-            Map<String, String> jobOptions,
             FlinkMetricContainer flinkMetricContainer,
             KeyedStateBackend keyedStateBackend,
             TypeSerializer keySerializer,
@@ -60,7 +57,6 @@ public PassThroughStreamGroupWindowAggregatePythonFunctionRunner(
                 environmentManager,
                 functionUrn,
                 userDefinedFunctions,
-                jobOptions,
                 flinkMetricContainer,
                 keyedStateBackend,
                 keySerializer,
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughStreamTableAggregatePythonFunctionRunner.java b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughStreamTableAggregatePythonFunctionRunner.java
index f9ed8acefab86..d519a1a220f1d 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughStreamTableAggregatePythonFunctionRunner.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughStreamTableAggregatePythonFunctionRunner.java
@@ -32,7 +32,6 @@
 import java.util.Arrays;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
 import java.util.function.Function;
 
 import static org.apache.flink.streaming.api.utils.ProtoUtils.createRowTypeCoderInfoDescriptorProto;
 
@@ -56,7 +55,6 @@ public PassThroughStreamTableAggregatePythonFunctionRunner(
             RowType outputType,
             String functionUrn,
             FlinkFnApi.UserDefinedAggregateFunctions userDefinedFunctions,
-            Map<String, String> jobOptions,
             FlinkMetricContainer flinkMetricContainer,
             KeyedStateBackend keyedStateBackend,
             TypeSerializer keySerializer,
@@ -66,7 +64,6 @@ public PassThroughStreamTableAggregatePythonFunctionRunner(
                 environmentManager,
                 functionUrn,
                 userDefinedFunctions,
-                jobOptions,
                 flinkMetricContainer,
                 keyedStateBackend,
                 keySerializer,
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeConfig.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeConfig.java
index 22b0ef24cd428..b33396fbec0d9 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeConfig.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeConfig.java
@@ -21,11 +21,9 @@
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ReadableConfig;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.planner.delegation.PlannerBase;
-import org.apache.flink.table.planner.plan.nodes.exec.utils.CommonPythonUtil;
 
 import java.util.Optional;
 
@@ -38,10 +36,7 @@
 @Internal
 public final class ExecNodeConfig implements ReadableConfig {
 
-    // See https://issues.apache.org/jira/browse/FLINK-26190
-    // Used only by CommonPythonUtil#getMergedConfig(StreamExecutionEnvironment, TableConfig)}
-    // otherwise it can be changed to ReadableConfig.
-    private final TableConfig tableConfig;
+    private final ReadableConfig tableConfig;
 
     private final ReadableConfig nodeConfig;
 
@@ -50,20 +45,6 @@ public final class ExecNodeConfig implements ReadableConfig {
         this.tableConfig = tableConfig;
     }
 
-    /**
-     * Return the {@link PlannerBase#getTableConfig()}.
-     *
-     * @return the {@link PlannerBase#getTableConfig()}.
-     * @deprecated This method is used only for {@link
-     *     CommonPythonUtil#getMergedConfig(StreamExecutionEnvironment, TableConfig)}. It should be
-     *     removed when this method is refactored to accept a {@link ReadableConfig} instead.
-     */
-    // See https://issues.apache.org/jira/browse/FLINK-26190
-    @Deprecated
-    public TableConfig getTableConfig() {
-        return tableConfig;
-    }
-
     @Override
     public <T> T get(ConfigOption<T> option) {
         return nodeConfig.getOptional(option).orElseGet(() -> tableConfig.get(option));
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonGroupAggregate.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonGroupAggregate.java
index 1a4c0ed395ecc..1eeca1738fb1e 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonGroupAggregate.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonGroupAggregate.java
@@ -94,7 +94,7 @@ protected Transformation<RowData> translateToPlanInternal(
         final RowType inputRowType = (RowType) inputEdge.getOutputType();
         final RowType outputRowType = InternalTypeInfo.of(getOutputType()).toRowType();
         Configuration pythonConfig =
-                CommonPythonUtil.getMergedConfig(planner.getExecEnv(), config.getTableConfig());
+                CommonPythonUtil.extractPythonConfiguration(planner.getExecEnv(), config);
         OneInputTransformation<RowData, RowData> transform =
                 createPythonOneInputTransformation(
                         inputTransform, inputRowType, outputRowType, pythonConfig, config);
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonGroupWindowAggregate.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonGroupWindowAggregate.java
index 70c5be91b1d32..72ff42e060340 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonGroupWindowAggregate.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonGroupWindowAggregate.java
@@ -114,7 +114,7 @@ protected Transformation<RowData> translateToPlanInternal(
         final Tuple2<Long, Long> windowSizeAndSlideSize = WindowCodeGenerator.getWindowDef(window);
         final Configuration pythonConfig =
-                CommonPythonUtil.getMergedConfig(planner.getExecEnv(), config.getTableConfig());
+                CommonPythonUtil.extractPythonConfiguration(planner.getExecEnv(), config);
         int groupBufferLimitSize =
                 pythonConfig.getInteger(
                         ExecutionConfigOptions.TABLE_EXEC_WINDOW_AGG_BUFFER_SIZE_LIMIT);
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonOverAggregate.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonOverAggregate.java
index c27f084bbe247..c373878fd258a 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonOverAggregate.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonOverAggregate.java
@@ -153,7 +153,7 @@ protected Transformation<RowData> translateToPlanInternal(
             }
         }
         Configuration pythonConfig =
-                CommonPythonUtil.getMergedConfig(planner.getExecEnv(), config.getTableConfig());
+                CommonPythonUtil.extractPythonConfiguration(planner.getExecEnv(), config);
         OneInputTransformation<RowData, RowData> transform =
                 createPythonOneInputTransformation(
                         inputTransform,
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecPythonCalc.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecPythonCalc.java
index 736a0ef982232..42f0b0f66b8dd 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecPythonCalc.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecPythonCalc.java
@@ -103,7 +103,7 @@ protected Transformation<RowData> translateToPlanInternal(
         final Transformation<RowData> inputTransform =
                 (Transformation<RowData>) inputEdge.translateToPlan(planner);
         final Configuration pythonConfig =
-                CommonPythonUtil.getMergedConfig(planner.getExecEnv(), config.getTableConfig());
+                CommonPythonUtil.extractPythonConfiguration(planner.getExecEnv(), config);
         OneInputTransformation<RowData, RowData> ret =
                 createPythonOneInputTransformation(inputTransform, config, pythonConfig);
         if (CommonPythonUtil.isPythonWorkerUsingManagedMemory(pythonConfig)) {
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecPythonCorrelate.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecPythonCorrelate.java
index 8607148dda889..cfd18c1f477ba 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecPythonCorrelate.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecPythonCorrelate.java
@@ -90,7 +90,7 @@ protected Transformation<RowData> translateToPlanInternal(
         final Transformation<RowData> inputTransform =
                 (Transformation<RowData>) inputEdge.translateToPlan(planner);
         final Configuration pythonConfig =
-                CommonPythonUtil.getMergedConfig(planner.getExecEnv(), config.getTableConfig());
+                CommonPythonUtil.extractPythonConfiguration(planner.getExecEnv(), config);
         OneInputTransformation<RowData, RowData> transform =
                 createPythonOneInputTransformation(inputTransform, config, pythonConfig);
         if (CommonPythonUtil.isPythonWorkerUsingManagedMemory(pythonConfig)) {
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupAggregate.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupAggregate.java
index d55ac1b7a23ca..0d1cb1da282a5 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupAggregate.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupAggregate.java
@@ -157,7 +157,7 @@ protected Transformation<RowData> translateToPlanInternal(
         PythonAggregateFunctionInfo[] pythonFunctionInfos = aggInfosAndDataViewSpecs.f0;
         DataViewSpec[][] dataViewSpecs = aggInfosAndDataViewSpecs.f1;
         Configuration pythonConfig =
-                CommonPythonUtil.getMergedConfig(planner.getExecEnv(), config.getTableConfig());
+                CommonPythonUtil.extractPythonConfiguration(planner.getExecEnv(), config);
         final OneInputStreamOperator<RowData, RowData> operator =
                 getPythonAggregateFunctionOperator(
                         pythonConfig,
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupTableAggregate.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupTableAggregate.java
index 2303f03345c6a..0b4cd55b824e7 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupTableAggregate.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupTableAggregate.java
@@ -130,7 +130,7 @@ protected Transformation<RowData> translateToPlanInternal(
         PythonAggregateFunctionInfo[] pythonFunctionInfos = aggInfosAndDataViewSpecs.f0;
         DataViewSpec[][] dataViewSpecs = aggInfosAndDataViewSpecs.f1;
         Configuration pythonConfig =
-                CommonPythonUtil.getMergedConfig(planner.getExecEnv(), config.getTableConfig());
+                CommonPythonUtil.extractPythonConfiguration(planner.getExecEnv(), config);
         OneInputStreamOperator<RowData, RowData> pythonOperator =
                 getPythonTableAggregateFunctionOperator(
                         pythonConfig,
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupWindowAggregate.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupWindowAggregate.java
index 88331c116a3cc..71d210988e6e9 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupWindowAggregate.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupWindowAggregate.java
@@ -233,7 +233,7 @@ protected Transformation<RowData> translateToPlanInternal(
         WindowAssigner windowAssigner = windowAssignerAndTrigger.f0;
         Trigger trigger = windowAssignerAndTrigger.f1;
         Configuration pythonConfig =
-                CommonPythonUtil.getMergedConfig(planner.getExecEnv(), config.getTableConfig());
+                CommonPythonUtil.extractPythonConfiguration(planner.getExecEnv(), config);
         boolean isGeneralPythonUDAF =
                 Arrays.stream(aggCalls)
                         .anyMatch(x -> PythonUtil.isPythonAggregate(x, PythonFunctionKind.GENERAL));
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonOverAggregate.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonOverAggregate.java
index 1071a80e81714..4b7cdcfb1ac3e 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonOverAggregate.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonOverAggregate.java
@@ -179,7 +179,7 @@ protected Transformation<RowData> translateToPlanInternal(
             }
         }
         long precedingOffset = -1 * (long) boundValue;
         Configuration pythonConfig =
-                CommonPythonUtil.getMergedConfig(planner.getExecEnv(), config.getTableConfig());
+                CommonPythonUtil.extractPythonConfiguration(planner.getExecEnv(), config);
         OneInputTransformation<RowData, RowData> transform =
                 createPythonOneInputTransformation(
                         inputTransform,
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/utils/CommonPythonUtil.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/utils/CommonPythonUtil.java
index 538bfdb37ef6e..d9ac38b3725bf 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/utils/CommonPythonUtil.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/utils/CommonPythonUtil.java
@@ -21,8 +21,8 @@
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.dataview.DataView;
 import org.apache.flink.table.api.dataview.ListView;
@@ -98,7 +98,7 @@ public class CommonPythonUtil {
 
     private CommonPythonUtil() {}
 
-    public static Class loadClass(String className) {
+    public static Class<?> loadClass(String className) {
         try {
             return Class.forName(className, false, Thread.currentThread().getContextClassLoader());
         } catch (ClassNotFoundException e) {
@@ -107,21 +107,22 @@ public static Class loadClass(String className) {
         }
     }
 
-    @SuppressWarnings("unchecked")
-    public static Configuration getMergedConfig(
-            StreamExecutionEnvironment env, TableConfig tableConfig) {
-        Class clazz = loadClass(PYTHON_CONFIG_UTILS_CLASS);
+    public static Configuration extractPythonConfiguration(
+            StreamExecutionEnvironment env, ReadableConfig tableConfig) {
+        Class<?> clazz = loadClass(PYTHON_CONFIG_UTILS_CLASS);
         try {
             StreamExecutionEnvironment realEnv = getRealEnvironment(env);
             Method method =
                     clazz.getDeclaredMethod(
-                            "getMergedConfig", StreamExecutionEnvironment.class, TableConfig.class);
+                            "extractPythonConfiguration",
+                            StreamExecutionEnvironment.class,
+                            ReadableConfig.class);
             return (Configuration) method.invoke(null, realEnv, tableConfig);
         } catch (NoSuchFieldException
                 | IllegalAccessException
                 | NoSuchMethodException
                 | InvocationTargetException e) {
-            throw new TableException("Method getMergedConfig accessed failed.", e);
+            throw new TableException("Method extractPythonConfiguration accessed failed.", e);
         }
     }
 
@@ -149,7 +150,7 @@ public static PythonFunctionInfo createPythonFunctionInfo(
 
     @SuppressWarnings("unchecked")
     public static boolean isPythonWorkerUsingManagedMemory(Configuration config) {
-        Class clazz = loadClass(PYTHON_OPTIONS_CLASS);
+        Class<?> clazz = loadClass(PYTHON_OPTIONS_CLASS);
         try {
             return config.getBoolean(
                     (ConfigOption<Boolean>) (clazz.getField("USE_MANAGED_MEMORY").get(null)));
@@ -160,7 +161,7 @@ public static boolean isPythonWorkerUsingManagedMemory(Configuration config) {
 
     @SuppressWarnings("unchecked")
     public static boolean isPythonWorkerInProcessMode(Configuration config) {
-        Class clazz = loadClass(PYTHON_OPTIONS_CLASS);
+        Class<?> clazz = loadClass(PYTHON_OPTIONS_CLASS);
         try {
             return config.getString(
                             (ConfigOption<String>)
@@ -401,12 +402,12 @@ private static byte[] convertLiteralToPython(RexLiteral o, SqlTypeName typeName)
         return (byte[]) pickleValue.invoke(null, value, type);
     }
 
-    @SuppressWarnings("unchecked")
     private static void loadPickleValue() {
         if (pickleValue == null) {
             synchronized (CommonPythonUtil.class) {
                 if (pickleValue == null) {
-                    Class clazz = loadClass("org.apache.flink.api.common.python.PythonBridgeUtils");
+                    Class<?> clazz =
+                            loadClass("org.apache.flink.api.common.python.PythonBridgeUtils");
                     try {
                         pickleValue = clazz.getMethod("pickleValue", Object.class, byte.class);
                     } catch (NoSuchMethodException e) {
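
The ExecNodeConfig hunk is the heart of this cleanup: with getTableConfig() removed, the field only needs read access, and every option lookup goes through the same two-level fallback, per-node config first, table config second. A minimal standalone sketch of that fallback pattern, using hypothetical string-keyed maps instead of Flink's ConfigOption machinery:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Illustrative only; class and key names below are not Flink identifiers.
final class LayeredConfigDemo {
    private final Map<String, String> nodeConfig = new HashMap<>();
    private final Map<String, String> tableConfig = new HashMap<>();

    // Mirrors nodeConfig.getOptional(option).orElseGet(() -> tableConfig.get(option)):
    // a per-node override wins; otherwise the table-level value is used.
    String get(String key) {
        return Optional.ofNullable(nodeConfig.get(key)).orElseGet(() -> tableConfig.get(key));
    }

    public static void main(String[] args) {
        LayeredConfigDemo config = new LayeredConfigDemo();
        config.tableConfig.put("example.timeout", "30s");
        config.nodeConfig.put("example.timeout", "5s"); // node-level override
        System.out.println(config.get("example.timeout")); // prints 5s
    }
}
```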
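CommonPythonUtil reaches into flink-python via reflection because the planner cannot depend on that module directly, which is why renaming getMergedConfig had to be mirrored in the reflective lookup string; a stale name would only surface at runtime through the NoSuchMethodException branch. A self-contained sketch of this reflective-dispatch pattern, where the class and method names ("com.example.OptionalDep", "compute") are placeholders rather than Flink identifiers:

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

final class ReflectiveCallSketch {
    static int invokeCompute(int arg) {
        try {
            // Resolve the optional dependency lazily via the context class loader,
            // without initializing the class (mirrors the loadClass helper above).
            Class<?> clazz =
                    Class.forName(
                            "com.example.OptionalDep",
                            false,
                            Thread.currentThread().getContextClassLoader());
            Method method = clazz.getDeclaredMethod("compute", int.class);
            return (Integer) method.invoke(null, arg); // static method: null receiver
        } catch (ClassNotFoundException
                | NoSuchMethodException
                | IllegalAccessException
                | InvocationTargetException e) {
            // A renamed or missing target only fails here, at call time.
            throw new RuntimeException("Reflective call to compute failed.", e);
        }
    }
}
```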
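The loadPickleValue hunk keeps its double-checked-locking shape; only the reflective Class handle gains a wildcard. For reference, a minimal sketch of the idiom with an illustrative (non-Flink) target method; the volatile modifier shown here is what the pattern requires for safe publication under the Java memory model:

```java
import java.lang.reflect.Method;

final class LazyMethodHolderSketch {
    // volatile guarantees that a fully constructed Method is visible to other
    // threads after the unsynchronized first check.
    private static volatile Method cached;

    static Method get() throws NoSuchMethodException {
        if (cached == null) {                     // first check, no lock
            synchronized (LazyMethodHolderSketch.class) {
                if (cached == null) {             // second check, under lock
                    cached = String.class.getMethod("length"); // placeholder target
                }
            }
        }
        return cached;
    }
}
```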