[FLINK-19153] Remove deprecated ExecutionMode in flink-core module
codenohup authored and reswqa committed Oct 3, 2024
1 parent a69e1f1 commit 8931ce4
Showing 13 changed files with 27 additions and 551 deletions.
flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -24,7 +24,6 @@
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.serialization.SerializerConfig;
import org.apache.flink.api.common.serialization.SerializerConfigImpl;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.DescribedEnum;
@@ -48,7 +47,6 @@
import java.util.Objects;
import java.util.Optional;

import static org.apache.flink.configuration.ConfigOptions.key;
import static org.apache.flink.configuration.description.TextElement.text;
import static org.apache.flink.util.Preconditions.checkArgument;

@@ -61,8 +59,6 @@
* functions that do not define a specific value directly.
* <li>The number of retries in the case of failed executions.
* <li>The delay between execution retries.
* <li>The {@link ExecutionMode} of the program: Batch or Pipelined. The default execution mode is
* {@link ExecutionMode#PIPELINED}
* <li>Enabling or disabling the "closure cleaner". The closure cleaner pre-processes the
* implementations of functions. In case they are (anonymous) inner classes, it removes unused
* references to the enclosing class to fix certain serialization-related problems and to
@@ -104,22 +100,6 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
*/
public static final int PARALLELISM_UNKNOWN = -2;

/**
* Internal {@link ConfigOption}s, that are not exposed and it's not possible to configure them
* via config files. We are defining them here, so that we can store them in the {@link
* #configuration}.
*
* <p>If you decide to expose any of those {@link ConfigOption}s, please double-check if the
* key, type and descriptions are sensible, as the initial values are arbitrary.
*/
// --------------------------------------------------------------------------------------------

private static final ConfigOption<ExecutionMode> EXECUTION_MODE =
key("hidden.execution.mode")
.enumType(ExecutionMode.class)
.defaultValue(ExecutionMode.PIPELINED)
.withDescription("Defines how data exchange happens - batch or pipelined");

// --------------------------------------------------------------------------------------------

/**
@@ -408,46 +388,6 @@ public Optional<SchedulerType> getSchedulerType() {
return configuration.getOptional(JobManagerOptions.SCHEDULER);
}

/**
* Sets the execution mode to execute the program. The execution mode defines whether data
* exchanges are performed in a batch or on a pipelined manner.
*
* <p>The default execution mode is {@link ExecutionMode#PIPELINED}.
*
* @param executionMode The execution mode to use.
* @deprecated The {@link ExecutionMode} is deprecated because it's only used in DataSet APIs.
* All Flink DataSet APIs are deprecated since Flink 1.18 and will be removed in a future
* Flink major version. You can still build your application in DataSet, but you should move
* to either the DataStream and/or Table API.
* @see <a href="https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158866741">
* FLIP-131: Consolidate the user-facing Dataflow SDKs/APIs (and deprecate the DataSet
* API</a>
*/
@Deprecated
public void setExecutionMode(ExecutionMode executionMode) {
configuration.set(EXECUTION_MODE, executionMode);
}

/**
* Gets the execution mode used to execute the program. The execution mode defines whether data
* exchanges are performed in a batch or on a pipelined manner.
*
* <p>The default execution mode is {@link ExecutionMode#PIPELINED}.
*
* @return The execution mode for the program.
* @deprecated The {@link ExecutionMode} is deprecated because it's only used in DataSet APIs.
* All Flink DataSet APIs are deprecated since Flink 1.18 and will be removed in a future
* Flink major version. You can still build your application in DataSet, but you should move
* to either the DataStream and/or Table API.
* @see <a href="https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158866741">
* FLIP-131: Consolidate the user-facing Dataflow SDKs/APIs (and deprecate the DataSet
* API</a>
*/
@Deprecated
public ExecutionMode getExecutionMode() {
return configuration.get(EXECUTION_MODE);
}

/**
* Enables the Flink runtime to auto-generate UID's for operators.
*
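
The Javadoc removed above points DataSet users to the DataStream or Table API instead. As a hedged illustration of that migration path (not part of this commit), the PyFlink sketch below switches a job to batch-style data exchanges; the RuntimeExecutionMode enum, its pyflink.datastream import location, and set_runtime_mode are assumptions about the current DataStream API.

```python
# Illustrative sketch only; assumes RuntimeExecutionMode and set_runtime_mode
# are available from pyflink.datastream in the Flink version this commit targets.
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode

env = StreamExecutionEnvironment.get_execution_environment()

# Batch-style data exchanges for a bounded job, in place of the removed
# ExecutionConfig#setExecutionMode(ExecutionMode.BATCH).
env.set_runtime_mode(RuntimeExecutionMode.BATCH)
```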

flink-core/src/main/java/org/apache/flink/api/common/ExecutionMode.java

This file was deleted.

11 changes: 0 additions & 11 deletions flink-python/docs/reference/pyflink.common/config.rst
@@ -51,17 +51,6 @@ ExecutionConfig
ExecutionConfig


ExecutionMode
-------------

.. currentmodule:: pyflink.common.execution_mode

.. autosummary::
:toctree: api/

ExecutionMode


RestartStrategy
---------------

2 changes: 0 additions & 2 deletions flink-python/pyflink/common/__init__.py
@@ -52,7 +52,6 @@
from pyflink.common.config_options import ConfigOption, ConfigOptions
from pyflink.common.configuration import Configuration
from pyflink.common.execution_config import ExecutionConfig
from pyflink.common.execution_mode import ExecutionMode
from pyflink.common.input_dependency_constraint import InputDependencyConstraint
from pyflink.common.job_client import JobClient
from pyflink.common.job_execution_result import JobExecutionResult
@@ -81,7 +80,6 @@
'SimpleStringSchema',
'Encoder',
'CompletableFuture',
'ExecutionMode',
'InputDependencyConstraint',
'JobClient',
'JobExecutionResult',
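
Because the export above is removed, `from pyflink.common import ExecutionMode` no longer resolves. A small defensive-import sketch for downstream code that still references the old name (the fallback value is illustrative, not from this commit):

```python
# After this commit the old enum is gone from pyflink.common, so the import
# below fails; the fallback is a hypothetical migration aid only.
try:
    from pyflink.common import ExecutionMode  # export removed in this commit
except ImportError:
    ExecutionMode = None  # callers should migrate to the DataStream runtime mode
```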
39 changes: 0 additions & 39 deletions flink-python/pyflink/common/execution_config.py
@@ -18,7 +18,6 @@

from typing import Dict, List

from pyflink.common.execution_mode import ExecutionMode
from pyflink.java_gateway import get_gateway

__all__ = ['ExecutionConfig']
@@ -36,9 +35,6 @@ class ExecutionConfig(object):
- The delay between execution retries.
- The :class:`ExecutionMode` of the program: Batch or Pipelined.
The default execution mode is :data:`ExecutionMode.PIPELINED`
- Enabling or disabling the "closure cleaner". The closure cleaner pre-processes
the implementations of functions. In case they are (anonymous) inner classes,
it removes unused references to the enclosing class to fix certain serialization-related
@@ -246,41 +242,6 @@ def set_task_cancellation_timeout(self, timeout: int) -> 'ExecutionConfig':
self._j_execution_config = self._j_execution_config.setTaskCancellationTimeout(timeout)
return self

def set_execution_mode(self, execution_mode: ExecutionMode) -> 'ExecutionConfig':
"""
Sets the execution mode to execute the program. The execution mode defines whether
data exchanges are performed in a batch or on a pipelined manner.
The default execution mode is :data:`ExecutionMode.PIPELINED`.
Example:
::
>>> config.set_execution_mode(ExecutionMode.BATCH)
:param execution_mode: The execution mode to use. The execution mode could be
:data:`ExecutionMode.PIPELINED`,
:data:`ExecutionMode.PIPELINED_FORCED`,
:data:`ExecutionMode.BATCH` or
:data:`ExecutionMode.BATCH_FORCED`.
"""
self._j_execution_config.setExecutionMode(execution_mode._to_j_execution_mode())
return self

def get_execution_mode(self) -> 'ExecutionMode':
"""
Gets the execution mode used to execute the program. The execution mode defines whether
data exchanges are performed in a batch or on a pipelined manner.
The default execution mode is :data:`ExecutionMode.PIPELINED`.
.. seealso:: :func:`set_execution_mode`
:return: The execution mode for the program.
"""
j_execution_mode = self._j_execution_config.getExecutionMode()
return ExecutionMode._from_j_execution_mode(j_execution_mode)

def enable_force_kryo(self) -> 'ExecutionConfig':
"""
Force TypeExtractor to use Kryo serializer for POJOS even though we could analyze as POJO.
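
With the set_execution_mode/get_execution_mode wrappers above removed, the batch-versus-pipelined choice is expressed through configuration instead. A minimal sketch, assuming the public "execution.runtime-mode" option key and the Configuration-accepting get_execution_environment overload:

```python
from pyflink.common import Configuration
from pyflink.datastream import StreamExecutionEnvironment

# Drive the batch/streaming decision through configuration rather than the
# removed set_execution_mode wrapper. The "execution.runtime-mode" key and the
# Configuration-accepting factory call below are assumptions about the public API.
config = Configuration()
config.set_string("execution.runtime-mode", "BATCH")

env = StreamExecutionEnvironment.get_execution_environment(config)
print(config.get_string("execution.runtime-mode", "STREAMING"))  # prints BATCH
```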
82 changes: 0 additions & 82 deletions flink-python/pyflink/common/execution_mode.py

This file was deleted.

20 changes: 1 addition & 19 deletions flink-python/pyflink/common/tests/test_execution_config.py
@@ -16,7 +16,7 @@
# limitations under the License.
################################################################################
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.common import (ExecutionConfig, ExecutionMode, Configuration)
from pyflink.common import (ExecutionConfig, Configuration)
from pyflink.java_gateway import get_gateway
from pyflink.testing.test_case_utils import PyFlinkTestCase
from pyflink.util.java_utils import get_j_env_configuration
@@ -91,24 +91,6 @@ def test_get_set_task_cancellation_timeout(self):

self.assertEqual(self.execution_config.get_task_cancellation_timeout(), 3000)

def test_get_set_execution_mode(self):

self.execution_config.set_execution_mode(ExecutionMode.BATCH)

self.assertEqual(self.execution_config.get_execution_mode(), ExecutionMode.BATCH)

self.execution_config.set_execution_mode(ExecutionMode.PIPELINED)

self.assertEqual(self.execution_config.get_execution_mode(), ExecutionMode.PIPELINED)

self.execution_config.set_execution_mode(ExecutionMode.BATCH_FORCED)

self.assertEqual(self.execution_config.get_execution_mode(), ExecutionMode.BATCH_FORCED)

self.execution_config.set_execution_mode(ExecutionMode.PIPELINED_FORCED)

self.assertEqual(self.execution_config.get_execution_mode(), ExecutionMode.PIPELINED_FORCED)

def test_disable_enable_auto_generated_uids(self):

self.execution_config.disable_auto_generated_uids()
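
The removed round-trip test gets no direct replacement in this diff. A hypothetical test in the same spirit, exercising the public runtime-mode option instead (class name, option key, and assertions are illustrative only):

```python
import unittest

from pyflink.common import Configuration


class RuntimeModeConfigTest(unittest.TestCase):
    """Hypothetical round-trip check in the spirit of the removed test;
    it is not part of this commit."""

    def test_set_get_runtime_mode_key(self):
        config = Configuration()
        # Assumes "execution.runtime-mode" is the public option covering the
        # old batch/pipelined distinction.
        for mode in ("STREAMING", "BATCH", "AUTOMATIC"):
            config.set_string("execution.runtime-mode", mode)
            self.assertEqual(
                config.get_string("execution.runtime-mode", "STREAMING"), mode)


if __name__ == "__main__":
    unittest.main()
```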
(Remaining changed files are not shown in this excerpt.)
