diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java index fc8c3480c2473..9dd729cae5b64 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java @@ -24,7 +24,6 @@ import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.serialization.SerializerConfig; import org.apache.flink.api.common.serialization.SerializerConfigImpl; -import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.DescribedEnum; @@ -48,7 +47,6 @@ import java.util.Objects; import java.util.Optional; -import static org.apache.flink.configuration.ConfigOptions.key; import static org.apache.flink.configuration.description.TextElement.text; import static org.apache.flink.util.Preconditions.checkArgument; @@ -61,8 +59,6 @@ * functions that do not define a specific value directly. *
  • The number of retries in the case of failed executions. *
  • The delay between execution retries. - *
  • The {@link ExecutionMode} of the program: Batch or Pipelined. The default execution mode is - * {@link ExecutionMode#PIPELINED} *
  • Enabling or disabling the "closure cleaner". The closure cleaner pre-processes the * implementations of functions. In case they are (anonymous) inner classes, it removes unused * references to the enclosing class to fix certain serialization-related problems and to @@ -104,22 +100,6 @@ public class ExecutionConfig implements Serializable, ArchiveableIf you decide to expose any of those {@link ConfigOption}s, please double-check if the - * key, type and descriptions are sensible, as the initial values are arbitrary. - */ - // -------------------------------------------------------------------------------------------- - - private static final ConfigOption EXECUTION_MODE = - key("hidden.execution.mode") - .enumType(ExecutionMode.class) - .defaultValue(ExecutionMode.PIPELINED) - .withDescription("Defines how data exchange happens - batch or pipelined"); - // -------------------------------------------------------------------------------------------- /** @@ -408,46 +388,6 @@ public Optional getSchedulerType() { return configuration.getOptional(JobManagerOptions.SCHEDULER); } - /** - * Sets the execution mode to execute the program. The execution mode defines whether data - * exchanges are performed in a batch or on a pipelined manner. - * - *

    The default execution mode is {@link ExecutionMode#PIPELINED}. - * - * @param executionMode The execution mode to use. - * @deprecated The {@link ExecutionMode} is deprecated because it's only used in DataSet APIs. - * All Flink DataSet APIs are deprecated since Flink 1.18 and will be removed in a future - * Flink major version. You can still build your application in DataSet, but you should move - * to either the DataStream and/or Table API. - * @see - * FLIP-131: Consolidate the user-facing Dataflow SDKs/APIs (and deprecate the DataSet - * API - */ - @Deprecated - public void setExecutionMode(ExecutionMode executionMode) { - configuration.set(EXECUTION_MODE, executionMode); - } - - /** - * Gets the execution mode used to execute the program. The execution mode defines whether data - * exchanges are performed in a batch or on a pipelined manner. - * - *

    The default execution mode is {@link ExecutionMode#PIPELINED}. - * - * @return The execution mode for the program. - * @deprecated The {@link ExecutionMode} is deprecated because it's only used in DataSet APIs. - * All Flink DataSet APIs are deprecated since Flink 1.18 and will be removed in a future - * Flink major version. You can still build your application in DataSet, but you should move - * to either the DataStream and/or Table API. - * @see - * FLIP-131: Consolidate the user-facing Dataflow SDKs/APIs (and deprecate the DataSet - * API - */ - @Deprecated - public ExecutionMode getExecutionMode() { - return configuration.get(EXECUTION_MODE); - } - /** * Enables the Flink runtime to auto-generate UID's for operators. * diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionMode.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionMode.java deleted file mode 100644 index e60aad819aa28..0000000000000 --- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionMode.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.common; - -import org.apache.flink.annotation.Public; - -/** - * The execution mode specifies how a batch program is executed in terms of data exchange: - * pipelining or batched. - * - * @deprecated The {@link ExecutionMode} is deprecated because it's only used in DataSet APIs. All - * Flink DataSet APIs are deprecated since Flink 1.18 and will be removed in a future Flink - * major version. You can still build your application in DataSet, but you should move to either - * the DataStream and/or Table API. - * @see - * FLIP-131: Consolidate the user-facing Dataflow SDKs/APIs (and deprecate the DataSet API - */ -@Deprecated -@Public -public enum ExecutionMode { - - /** - * Executes the program in a pipelined fashion (including shuffles and broadcasts), except for - * data exchanges that are susceptible to deadlocks when pipelining. These data exchanges are - * performed in a batch manner. - * - *

    An example of situations that are susceptible to deadlocks (when executed in a pipelined - * manner) are data flows that branch (one data set consumed by multiple operations) and re-join - * later: - * - *

    {@code
    -     * DataSet data = ...;
    -     * DataSet mapped1 = data.map(new MyMapper());
    -     * DataSet mapped2 = data.map(new AnotherMapper());
    -     * mapped1.join(mapped2).where(...).equalTo(...);
    -     * }
    - */ - PIPELINED, - - /** - * Executes the program in a pipelined fashion (including shuffles and broadcasts), - * including data exchanges that are susceptible to deadlocks when executed via - * pipelining. - * - *

    Usually, {@link #PIPELINED} is the preferable option, which pipelines most data exchanges - * and only uses batch data exchanges in situations that are susceptible to deadlocks. - * - *

    This option should only be used with care and only in situations where the programmer is - * sure that the program is safe for full pipelining and that Flink was too conservative when - * choosing the batch exchange at a certain point. - */ - PIPELINED_FORCED, - - // This is for later, we are missing a bit of infrastructure for this. - // /** - // * The execution mode starts executing the program in a pipelined fashion - // * (except for deadlock prone situations), similar to the {@link #PIPELINED} - // * option. In the case of a task failure, re-execution happens in a batched - // * mode, as defined for the {@link #BATCH} option. - // */ - // PIPELINED_WITH_BATCH_FALLBACK, - - /** - * This mode executes all shuffles and broadcasts in a batch fashion, while pipelining data - * between operations that exchange data only locally between one producer and one consumer. - */ - BATCH, - - /** - * This mode executes the program in a strict batch way, including all points where data is - * forwarded locally from one producer to one consumer. This mode is typically more expensive to - * execute than the {@link #BATCH} mode. It does guarantee that no successive operations are - * ever executed concurrently. - */ - BATCH_FORCED -} diff --git a/flink-python/docs/reference/pyflink.common/config.rst b/flink-python/docs/reference/pyflink.common/config.rst index fa9611e409a53..144cfcd938cbf 100644 --- a/flink-python/docs/reference/pyflink.common/config.rst +++ b/flink-python/docs/reference/pyflink.common/config.rst @@ -51,17 +51,6 @@ ExecutionConfig ExecutionConfig -ExecutionMode -------------- - -.. currentmodule:: pyflink.common.execution_mode - -.. autosummary:: - :toctree: api/ - - ExecutionMode - - RestartStrategy --------------- diff --git a/flink-python/pyflink/common/__init__.py b/flink-python/pyflink/common/__init__.py index 19f76febc7d47..823285225612a 100644 --- a/flink-python/pyflink/common/__init__.py +++ b/flink-python/pyflink/common/__init__.py @@ -52,7 +52,6 @@ from pyflink.common.config_options import ConfigOption, ConfigOptions from pyflink.common.configuration import Configuration from pyflink.common.execution_config import ExecutionConfig -from pyflink.common.execution_mode import ExecutionMode from pyflink.common.input_dependency_constraint import InputDependencyConstraint from pyflink.common.job_client import JobClient from pyflink.common.job_execution_result import JobExecutionResult @@ -81,7 +80,6 @@ 'SimpleStringSchema', 'Encoder', 'CompletableFuture', - 'ExecutionMode', 'InputDependencyConstraint', 'JobClient', 'JobExecutionResult', diff --git a/flink-python/pyflink/common/execution_config.py b/flink-python/pyflink/common/execution_config.py index db46d95755130..efdf0c2de8d3e 100644 --- a/flink-python/pyflink/common/execution_config.py +++ b/flink-python/pyflink/common/execution_config.py @@ -18,7 +18,6 @@ from typing import Dict, List -from pyflink.common.execution_mode import ExecutionMode from pyflink.java_gateway import get_gateway __all__ = ['ExecutionConfig'] @@ -36,9 +35,6 @@ class ExecutionConfig(object): - The delay between execution retries. - - The :class:`ExecutionMode` of the program: Batch or Pipelined. - The default execution mode is :data:`ExecutionMode.PIPELINED` - - Enabling or disabling the "closure cleaner". The closure cleaner pre-processes the implementations of functions. In case they are (anonymous) inner classes, it removes unused references to the enclosing class to fix certain serialization-related @@ -246,41 +242,6 @@ def set_task_cancellation_timeout(self, timeout: int) -> 'ExecutionConfig': self._j_execution_config = self._j_execution_config.setTaskCancellationTimeout(timeout) return self - def set_execution_mode(self, execution_mode: ExecutionMode) -> 'ExecutionConfig': - """ - Sets the execution mode to execute the program. The execution mode defines whether - data exchanges are performed in a batch or on a pipelined manner. - - The default execution mode is :data:`ExecutionMode.PIPELINED`. - - Example: - :: - - >>> config.set_execution_mode(ExecutionMode.BATCH) - - :param execution_mode: The execution mode to use. The execution mode could be - :data:`ExecutionMode.PIPELINED`, - :data:`ExecutionMode.PIPELINED_FORCED`, - :data:`ExecutionMode.BATCH` or - :data:`ExecutionMode.BATCH_FORCED`. - """ - self._j_execution_config.setExecutionMode(execution_mode._to_j_execution_mode()) - return self - - def get_execution_mode(self) -> 'ExecutionMode': - """ - Gets the execution mode used to execute the program. The execution mode defines whether - data exchanges are performed in a batch or on a pipelined manner. - - The default execution mode is :data:`ExecutionMode.PIPELINED`. - - .. seealso:: :func:`set_execution_mode` - - :return: The execution mode for the program. - """ - j_execution_mode = self._j_execution_config.getExecutionMode() - return ExecutionMode._from_j_execution_mode(j_execution_mode) - def enable_force_kryo(self) -> 'ExecutionConfig': """ Force TypeExtractor to use Kryo serializer for POJOS even though we could analyze as POJO. diff --git a/flink-python/pyflink/common/execution_mode.py b/flink-python/pyflink/common/execution_mode.py deleted file mode 100644 index 5252024526c27..0000000000000 --- a/flink-python/pyflink/common/execution_mode.py +++ /dev/null @@ -1,82 +0,0 @@ -################################################################################ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -################################################################################ -from enum import Enum - -from pyflink.java_gateway import get_gateway - -__all__ = ['ExecutionMode'] - - -class ExecutionMode(Enum): - """ - The execution mode specifies how a batch program is executed in terms - of data exchange: pipelining or batched. - - :data:`PIPELINED`: - - Executes the program in a pipelined fashion (including shuffles and broadcasts), - except for data exchanges that are susceptible to deadlocks when pipelining. - These data exchanges are performed in a batch manner. - - An example of situations that are susceptible to deadlocks (when executed in a - pipelined manner) are data flows that branch (one data set consumed by multiple - operations) and re-join later. - - - :data:`PIPELINED_FORCED`: - - Executes the program in a pipelined fashion (including shuffles and broadcasts), - **including** data exchanges that are susceptible to deadlocks when - executed via pipelining. - - Usually, PIPELINED is the preferable option, which pipelines most - data exchanges and only uses batch data exchanges in situations that are - susceptible to deadlocks. - - This option should only be used with care and only in situations where the - programmer is sure that the program is safe for full pipelining and that - Flink was too conservative when choosing the batch exchange at a certain - point. - - :data:`BATCH`: - - This mode executes all shuffles and broadcasts in a batch fashion, while - pipelining data between operations that exchange data only locally - between one producer and one consumer. - - :data:`BATCH_FORCED`: - - This mode executes the program in a strict batch way, including all points - where data is forwarded locally from one producer to one consumer. This mode - is typically more expensive to execute than the BATCH mode. It does - guarantee that no successive operations are ever executed concurrently. - """ - - PIPELINED = 0 - PIPELINED_FORCED = 1 - BATCH = 2 - BATCH_FORCED = 3 - - @staticmethod - def _from_j_execution_mode(j_execution_mode) -> 'ExecutionMode': - return ExecutionMode[j_execution_mode.name()] - - def _to_j_execution_mode(self): - gateway = get_gateway() - JExecutionMode = gateway.jvm.org.apache.flink.api.common.ExecutionMode - return getattr(JExecutionMode, self.name) diff --git a/flink-python/pyflink/common/tests/test_execution_config.py b/flink-python/pyflink/common/tests/test_execution_config.py index e08e5a23e83c2..3a8c278959f3b 100644 --- a/flink-python/pyflink/common/tests/test_execution_config.py +++ b/flink-python/pyflink/common/tests/test_execution_config.py @@ -16,7 +16,7 @@ # limitations under the License. ################################################################################ from pyflink.datastream import StreamExecutionEnvironment -from pyflink.common import (ExecutionConfig, ExecutionMode, Configuration) +from pyflink.common import (ExecutionConfig, Configuration) from pyflink.java_gateway import get_gateway from pyflink.testing.test_case_utils import PyFlinkTestCase from pyflink.util.java_utils import get_j_env_configuration @@ -91,24 +91,6 @@ def test_get_set_task_cancellation_timeout(self): self.assertEqual(self.execution_config.get_task_cancellation_timeout(), 3000) - def test_get_set_execution_mode(self): - - self.execution_config.set_execution_mode(ExecutionMode.BATCH) - - self.assertEqual(self.execution_config.get_execution_mode(), ExecutionMode.BATCH) - - self.execution_config.set_execution_mode(ExecutionMode.PIPELINED) - - self.assertEqual(self.execution_config.get_execution_mode(), ExecutionMode.PIPELINED) - - self.execution_config.set_execution_mode(ExecutionMode.BATCH_FORCED) - - self.assertEqual(self.execution_config.get_execution_mode(), ExecutionMode.BATCH_FORCED) - - self.execution_config.set_execution_mode(ExecutionMode.PIPELINED_FORCED) - - self.assertEqual(self.execution_config.get_execution_mode(), ExecutionMode.PIPELINED_FORCED) - def test_disable_enable_auto_generated_uids(self): self.execution_config.disable_auto_generated_uids() diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/DataExchangeMode.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/DataExchangeMode.java deleted file mode 100644 index 74e63f760ba59..0000000000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/DataExchangeMode.java +++ /dev/null @@ -1,127 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.io.network; - -import org.apache.flink.api.common.ExecutionMode; -import org.apache.flink.runtime.operators.shipping.ShipStrategyType; - -/** Defines how the data exchange between two specific operators happens. */ -public enum DataExchangeMode { - - /** - * The data exchange is streamed, sender and receiver are online at the same time, and the - * receiver back-pressures the sender. - */ - PIPELINED, - - /** - * The data exchange is decoupled. The sender first produces its entire result and finishes. - * After that, the receiver is started and may consume the data. - */ - BATCH, - - /** - * The data exchange starts like in {@link #PIPELINED} and falls back to {@link #BATCH} for - * recovery runs. - */ - PIPELINE_WITH_BATCH_FALLBACK; - - // ------------------------------------------------------------------------ - - public static DataExchangeMode getForForwardExchange(ExecutionMode mode) { - return FORWARD[mode.ordinal()]; - } - - public static DataExchangeMode getForShuffleOrBroadcast(ExecutionMode mode) { - return SHUFFLE[mode.ordinal()]; - } - - public static DataExchangeMode getPipelineBreakingExchange(ExecutionMode mode) { - return BREAKING[mode.ordinal()]; - } - - /** - * Computes the mode of data exchange to be used for a given execution mode and ship strategy. - * The type of the data exchange depends also on whether this connection has been identified to - * require pipeline breaking for deadlock avoidance. - * - *

      - *
    • If the connection is set to be pipeline breaking, this returns the pipeline breaking - * variant of the execution mode {@link - * org.apache.flink.runtime.io.network.DataExchangeMode#getPipelineBreakingExchange(org.apache.flink.api.common.ExecutionMode)}. - *
    • If the data exchange is a simple FORWARD (one-to-one communication), this returns - * {@link - * org.apache.flink.runtime.io.network.DataExchangeMode#getForForwardExchange(org.apache.flink.api.common.ExecutionMode)}. - *
    • If otherwise, this returns {@link - * org.apache.flink.runtime.io.network.DataExchangeMode#getForShuffleOrBroadcast(org.apache.flink.api.common.ExecutionMode)}. - *
    - * - * @param shipStrategy The ship strategy (FORWARD, PARTITION, BROADCAST, ...) of the runtime - * data exchange. - * @return The data exchange mode for the connection, given the concrete ship strategy. - */ - public static DataExchangeMode select( - ExecutionMode executionMode, ShipStrategyType shipStrategy, boolean breakPipeline) { - - if (shipStrategy == null || shipStrategy == ShipStrategyType.NONE) { - throw new IllegalArgumentException("shipStrategy may not be null or NONE"); - } - if (executionMode == null) { - throw new IllegalArgumentException("executionMode may not mbe null"); - } - - if (breakPipeline) { - return getPipelineBreakingExchange(executionMode); - } else if (shipStrategy == ShipStrategyType.FORWARD) { - return getForForwardExchange(executionMode); - } else { - return getForShuffleOrBroadcast(executionMode); - } - } - - // ------------------------------------------------------------------------ - - private static final DataExchangeMode[] FORWARD = - new DataExchangeMode[ExecutionMode.values().length]; - - private static final DataExchangeMode[] SHUFFLE = - new DataExchangeMode[ExecutionMode.values().length]; - - private static final DataExchangeMode[] BREAKING = - new DataExchangeMode[ExecutionMode.values().length]; - - // initialize the map between execution modes and exchange modes in - static { - FORWARD[ExecutionMode.PIPELINED_FORCED.ordinal()] = PIPELINED; - SHUFFLE[ExecutionMode.PIPELINED_FORCED.ordinal()] = PIPELINED; - BREAKING[ExecutionMode.PIPELINED_FORCED.ordinal()] = PIPELINED; - - FORWARD[ExecutionMode.PIPELINED.ordinal()] = PIPELINED; - SHUFFLE[ExecutionMode.PIPELINED.ordinal()] = PIPELINED; - BREAKING[ExecutionMode.PIPELINED.ordinal()] = BATCH; - - FORWARD[ExecutionMode.BATCH.ordinal()] = PIPELINED; - SHUFFLE[ExecutionMode.BATCH.ordinal()] = BATCH; - BREAKING[ExecutionMode.BATCH.ordinal()] = BATCH; - - FORWARD[ExecutionMode.BATCH_FORCED.ordinal()] = BATCH; - SHUFFLE[ExecutionMode.BATCH_FORCED.ordinal()] = BATCH; - BREAKING[ExecutionMode.BATCH_FORCED.ordinal()] = BATCH; - } -} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/DataExchangeModeTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/DataExchangeModeTest.java deleted file mode 100644 index da5506c78c4f4..0000000000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/DataExchangeModeTest.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.io.network; - -import org.apache.flink.api.common.ExecutionMode; - -import org.junit.jupiter.api.Test; - -import static org.assertj.core.api.Assertions.assertThat; - -/** This test verifies that the data exchange modes are defined for every execution mode. */ -class DataExchangeModeTest { - - @Test - void testForward() { - for (ExecutionMode mode : ExecutionMode.values()) { - assertThat(DataExchangeMode.getForForwardExchange(mode)).isNotNull(); - } - } - - @Test - void testShuffleAndBroadcast() { - for (ExecutionMode mode : ExecutionMode.values()) { - assertThat(DataExchangeMode.getForShuffleOrBroadcast(mode)).isNotNull(); - } - } - - @Test - void testPipelineBreaking() { - for (ExecutionMode mode : ExecutionMode.values()) { - assertThat(DataExchangeMode.getPipelineBreakingExchange(mode)).isNotNull(); - } - } -} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/JobConfiguration.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/JobConfiguration.java index 5a049d9817260..20ee4d943004c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/JobConfiguration.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/JobConfiguration.java @@ -18,7 +18,7 @@ package org.apache.flink.runtime.scheduler.benchmark; -import org.apache.flink.api.common.ExecutionMode; +import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.configuration.JobManagerOptions.HybridPartitionDataConsumeConstraint; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; import org.apache.flink.runtime.jobgraph.DistributionPattern; @@ -26,15 +26,15 @@ /** * {@link JobConfiguration} contains the configuration of a STREAMING/BATCH job. It concludes {@link - * DistributionPattern}, {@link ResultPartitionType}, {@link JobType}, {@link ExecutionMode}, {@link - * HybridPartitionDataConsumeConstraint}. + * DistributionPattern}, {@link ResultPartitionType}, {@link JobType}, {@link RuntimeExecutionMode}, + * {@link HybridPartitionDataConsumeConstraint}. */ public enum JobConfiguration { STREAMING( DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED, JobType.STREAMING, - ExecutionMode.PIPELINED, + RuntimeExecutionMode.STREAMING, 4000, false), @@ -42,7 +42,7 @@ public enum JobConfiguration { DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING, JobType.BATCH, - ExecutionMode.BATCH, + RuntimeExecutionMode.BATCH, 4000, false), @@ -50,7 +50,7 @@ public enum JobConfiguration { DistributionPattern.ALL_TO_ALL, ResultPartitionType.HYBRID_FULL, JobType.BATCH, - ExecutionMode.BATCH, + RuntimeExecutionMode.BATCH, HybridPartitionDataConsumeConstraint.UNFINISHED_PRODUCERS, 4000, false), @@ -59,7 +59,7 @@ public enum JobConfiguration { DistributionPattern.ALL_TO_ALL, ResultPartitionType.HYBRID_FULL, JobType.BATCH, - ExecutionMode.BATCH, + RuntimeExecutionMode.BATCH, HybridPartitionDataConsumeConstraint.ONLY_FINISHED_PRODUCERS, 4000, false), @@ -68,7 +68,7 @@ public enum JobConfiguration { DistributionPattern.ALL_TO_ALL, ResultPartitionType.HYBRID_FULL, JobType.BATCH, - ExecutionMode.BATCH, + RuntimeExecutionMode.BATCH, HybridPartitionDataConsumeConstraint.ALL_PRODUCERS_FINISHED, 4000, false), @@ -77,7 +77,7 @@ public enum JobConfiguration { DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED, JobType.STREAMING, - ExecutionMode.PIPELINED, + RuntimeExecutionMode.STREAMING, 10, false), @@ -85,7 +85,7 @@ public enum JobConfiguration { DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING, JobType.BATCH, - ExecutionMode.BATCH, + RuntimeExecutionMode.BATCH, 10, false), @@ -93,7 +93,7 @@ public enum JobConfiguration { DistributionPattern.ALL_TO_ALL, ResultPartitionType.HYBRID_FULL, JobType.BATCH, - ExecutionMode.BATCH, + RuntimeExecutionMode.BATCH, HybridPartitionDataConsumeConstraint.UNFINISHED_PRODUCERS, 10, false), @@ -102,7 +102,7 @@ public enum JobConfiguration { DistributionPattern.ALL_TO_ALL, ResultPartitionType.HYBRID_FULL, JobType.BATCH, - ExecutionMode.BATCH, + RuntimeExecutionMode.BATCH, HybridPartitionDataConsumeConstraint.ONLY_FINISHED_PRODUCERS, 10, false), @@ -111,7 +111,7 @@ public enum JobConfiguration { DistributionPattern.ALL_TO_ALL, ResultPartitionType.HYBRID_FULL, JobType.BATCH, - ExecutionMode.BATCH, + RuntimeExecutionMode.BATCH, HybridPartitionDataConsumeConstraint.ALL_PRODUCERS_FINISHED, 10, false), @@ -120,7 +120,7 @@ public enum JobConfiguration { DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED, JobType.STREAMING, - ExecutionMode.PIPELINED, + RuntimeExecutionMode.STREAMING, 4000, true), @@ -128,7 +128,7 @@ public enum JobConfiguration { DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING, JobType.BATCH, - ExecutionMode.BATCH, + RuntimeExecutionMode.BATCH, 4000, true), @@ -136,7 +136,7 @@ public enum JobConfiguration { DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED, JobType.STREAMING, - ExecutionMode.PIPELINED, + RuntimeExecutionMode.STREAMING, 10, true), @@ -144,7 +144,7 @@ public enum JobConfiguration { DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING, JobType.BATCH, - ExecutionMode.BATCH, + RuntimeExecutionMode.STREAMING, 10, true); @@ -152,7 +152,7 @@ public enum JobConfiguration { private final DistributionPattern distributionPattern; private final ResultPartitionType resultPartitionType; private final JobType jobType; - private final ExecutionMode executionMode; + private final RuntimeExecutionMode runtimeExecutionMode; private final boolean evenlySpreadOutSlots; private final HybridPartitionDataConsumeConstraint hybridPartitionDataConsumeConstraint; @@ -160,14 +160,14 @@ public enum JobConfiguration { DistributionPattern distributionPattern, ResultPartitionType resultPartitionType, JobType jobType, - ExecutionMode executionMode, + RuntimeExecutionMode runtimeExecutionMode, int parallelism, boolean evenlySpreadOutSlots) { this( distributionPattern, resultPartitionType, jobType, - executionMode, + runtimeExecutionMode, HybridPartitionDataConsumeConstraint.UNFINISHED_PRODUCERS, parallelism, evenlySpreadOutSlots); @@ -177,14 +177,14 @@ public enum JobConfiguration { DistributionPattern distributionPattern, ResultPartitionType resultPartitionType, JobType jobType, - ExecutionMode executionMode, + RuntimeExecutionMode runtimeExecutionMode, HybridPartitionDataConsumeConstraint hybridPartitionDataConsumeConstraint, int parallelism, boolean evenlySpreadOutSlots) { this.distributionPattern = distributionPattern; this.resultPartitionType = resultPartitionType; this.jobType = jobType; - this.executionMode = executionMode; + this.runtimeExecutionMode = runtimeExecutionMode; this.hybridPartitionDataConsumeConstraint = hybridPartitionDataConsumeConstraint; this.parallelism = parallelism; this.evenlySpreadOutSlots = evenlySpreadOutSlots; @@ -206,8 +206,8 @@ public JobType getJobType() { return jobType; } - public ExecutionMode getExecutionMode() { - return executionMode; + public RuntimeExecutionMode getRuntimeExecutionMode() { + return runtimeExecutionMode; } public HybridPartitionDataConsumeConstraint getHybridPartitionDataConsumeConstraint() { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/SchedulerBenchmarkUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/SchedulerBenchmarkUtils.java index a60b3fd584b29..af718fdd7cfca 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/SchedulerBenchmarkUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/SchedulerBenchmarkUtils.java @@ -95,9 +95,7 @@ public static JobGraph createJobGraph( jobGraph.setJobType(jobConfiguration.getJobType()); - final ExecutionConfig executionConfig = new ExecutionConfig(); - executionConfig.setExecutionMode(jobConfiguration.getExecutionMode()); - jobGraph.setExecutionConfig(executionConfig); + jobGraph.setExecutionConfig(new ExecutionConfig()); return jobGraph; } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamUtils.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamUtils.java index 6c65053c46e86..a35494a714337 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamUtils.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamUtils.java @@ -24,48 +24,10 @@ import org.apache.flink.streaming.api.transformations.PartitionTransformation; import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner; -import java.util.Iterator; - /** A collection of utilities for {@link DataStream DataStreams}. */ @Experimental public final class DataStreamUtils { - /** - * Triggers the distributed execution of the streaming dataflow and returns an iterator over the - * elements of the given DataStream. - * - *

    The DataStream application is executed in the regular distributed manner on the target - * environment, and the events from the stream are polled back to this application process and - * thread through Flink's REST API. - * - * @deprecated Please use {@link DataStream#executeAndCollect()}. - */ - @Deprecated - public static Iterator collect(DataStream stream) { - return collect(stream, "Data Stream Collect"); - } - - /** - * Triggers the distributed execution of the streaming dataflow and returns an iterator over the - * elements of the given DataStream. - * - *

    The DataStream application is executed in the regular distributed manner on the target - * environment, and the events from the stream are polled back to this application process and - * thread through Flink's REST API. - * - * @deprecated Please use {@link DataStream#executeAndCollect()}. - */ - @Deprecated - public static Iterator collect(DataStream stream, String executionJobName) { - try { - return stream.executeAndCollect(executionJobName); - } catch (Exception e) { - // this "wrap as unchecked" step is here only to preserve the exception signature - // backwards compatible. - throw new RuntimeException("Failed to execute data stream", e); - } - } - // ------------------------------------------------------------------------ // Deriving a KeyedStream from a stream already partitioned by key // without a shuffle diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/BatchFineGrainedRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/BatchFineGrainedRecoveryITCase.java index b46910033dbd8..1a7843dc88d07 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/BatchFineGrainedRecoveryITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/BatchFineGrainedRecoveryITCase.java @@ -18,7 +18,6 @@ package org.apache.flink.test.recovery; -import org.apache.flink.api.common.ExecutionMode; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.java.tuple.Tuple2; @@ -274,9 +273,7 @@ private static StreamExecutionEnvironment createExecutionEnvironment() { StreamExecutionEnvironment env = new TestStreamEnvironment(miniCluster, 1); RestartStrategyUtils.configureFixedDelayRestartStrategy( env, MAX_JOB_RESTART_ATTEMPTS, Duration.ofMillis(10)); - env.getConfig() - .setExecutionMode( - ExecutionMode.BATCH_FORCED); // forces all partitions to be blocking + env.setRuntimeMode(RuntimeExecutionMode.BATCH); return env; }