diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java index 3e591162ba91d..7b53a79a147d5 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java @@ -33,13 +33,13 @@ import org.apache.flink.client.cli.ExecutionConfigAccessor; import org.apache.flink.client.deployment.ClusterClientJobClientAdapter; import org.apache.flink.client.testjar.ForbidConfigurationJob; -import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.ConfigUtils; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.DeploymentOptions; import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.configuration.PipelineOptions; +import org.apache.flink.configuration.RpcOptions; import org.apache.flink.core.execution.DetachedJobExecutionResult; import org.apache.flink.core.execution.JobClient; import org.apache.flink.core.execution.PipelineExecutor; @@ -101,8 +101,7 @@ void setUp() { config = new Configuration(); config.set(JobManagerOptions.ADDRESS, "localhost"); - config.set( - AkkaOptions.ASK_TIMEOUT_DURATION, AkkaOptions.ASK_TIMEOUT_DURATION.defaultValue()); + config.set(RpcOptions.ASK_TIMEOUT_DURATION, RpcOptions.ASK_TIMEOUT_DURATION.defaultValue()); } private Configuration fromPackagedProgram( diff --git a/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java index 5f3cccc495b89..236ec3e3511a9 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java @@ -18,154 +18,78 @@ package org.apache.flink.configuration; -import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.annotation.docs.Documentation; import org.apache.flink.configuration.description.Description; import org.apache.flink.util.TimeUtils; import java.time.Duration; import static org.apache.flink.configuration.description.LinkElement.link; -import static org.apache.flink.configuration.description.TextElement.code; -/** Akka configuration options. */ +/** + * RPC configuration options. + * + * @deprecated Use {@link RpcOptions} instead. + */ @PublicEvolving +@Deprecated // since 1.19.0 public class AkkaOptions { - @Internal - @Documentation.ExcludeFromDocumentation("Internal use only") - public static final ConfigOption FORCE_RPC_INVOCATION_SERIALIZATION = - ConfigOptions.key("pekko.rpc.force-invocation-serialization") - .booleanType() - .defaultValue(false) - .withDeprecatedKeys("akka.rpc.force-invocation-serialization") - .withDescription( - Description.builder() - .text( - "Forces the serialization of all RPC invocations (that are not explicitly annotated with %s)." - + "This option can be used to find serialization issues in the argument/response types without relying requiring HA setups." - + "This option should not be enabled in production.", - code("org.apache.flink.runtime.rpc.Local")) - .build()); - public static boolean isForceRpcInvocationSerializationEnabled(Configuration config) { - return config.getOptional(FORCE_RPC_INVOCATION_SERIALIZATION) - .orElse( - FORCE_RPC_INVOCATION_SERIALIZATION.defaultValue() - || System.getProperties() - .containsKey(FORCE_RPC_INVOCATION_SERIALIZATION.key())); + return RpcOptions.isForceRpcInvocationSerializationEnabled(config); } /** Flag whether to capture call stacks for RPC ask calls. */ public static final ConfigOption CAPTURE_ASK_CALLSTACK = - ConfigOptions.key("pekko.ask.callstack") - .booleanType() - .defaultValue(true) - .withDeprecatedKeys("akka.ask.callstack") - .withDescription( - "If true, call stack for asynchronous asks are captured. That way, when an ask fails " - + "(for example times out), you get a proper exception, describing to the original method call and " - + "call site. Note that in case of having millions of concurrent RPC calls, this may add to the " - + "memory footprint."); + RpcOptions.CAPTURE_ASK_CALLSTACK; /** Timeout for Pekko ask calls. */ public static final ConfigOption ASK_TIMEOUT_DURATION = - ConfigOptions.key("pekko.ask.timeout") - .durationType() - .defaultValue(Duration.ofSeconds(10)) - .withDeprecatedKeys("akka.ask.timeout") - .withDescription( - "Timeout used for all futures and blocking Pekko calls. If Flink fails due to timeouts then you" - + " should try to increase this value. Timeouts can be caused by slow machines or a congested network. The" - + " timeout value requires a time-unit specifier (ms/s/min/h/d)."); + RpcOptions.ASK_TIMEOUT_DURATION; - /** @deprecated Use {@link #ASK_TIMEOUT_DURATION} */ + /** @deprecated Use {@link RpcOptions#ASK_TIMEOUT_DURATION} */ @Deprecated public static final ConfigOption ASK_TIMEOUT = - ConfigOptions.key(ASK_TIMEOUT_DURATION.key()) + ConfigOptions.key(RpcOptions.ASK_TIMEOUT_DURATION.key()) .stringType() .defaultValue( - TimeUtils.formatWithHighestUnit(ASK_TIMEOUT_DURATION.defaultValue())) - .withDescription(ASK_TIMEOUT_DURATION.description()); + TimeUtils.formatWithHighestUnit( + RpcOptions.ASK_TIMEOUT_DURATION.defaultValue())) + .withDescription(RpcOptions.ASK_TIMEOUT_DURATION.description()); /** The Pekko tcp connection timeout. */ - public static final ConfigOption TCP_TIMEOUT = - ConfigOptions.key("pekko.tcp.timeout") - .stringType() - .defaultValue("20 s") - .withDeprecatedKeys("akka.tcp.timeout") - .withDescription( - "Timeout for all outbound connections. If you should experience problems with connecting to a" - + " TaskManager due to a slow network, you should increase this value."); + public static final ConfigOption TCP_TIMEOUT = RpcOptions.TCP_TIMEOUT; /** Timeout for the startup of the actor system. */ - public static final ConfigOption STARTUP_TIMEOUT = - ConfigOptions.key("pekko.startup-timeout") - .stringType() - .noDefaultValue() - .withDeprecatedKeys("akka.startup-timeout") - .withDescription( - "Timeout after which the startup of a remote component is considered being failed."); + public static final ConfigOption STARTUP_TIMEOUT = RpcOptions.STARTUP_TIMEOUT; /** Override SSL support for the Pekko transport. */ - public static final ConfigOption SSL_ENABLED = - ConfigOptions.key("pekko.ssl.enabled") - .booleanType() - .defaultValue(true) - .withDeprecatedKeys("akka.ssl.enabled") - .withDescription( - "Turns on SSL for Pekko’s remote communication. This is applicable only when the global ssl flag" - + " security.ssl.enabled is set to true."); + public static final ConfigOption SSL_ENABLED = RpcOptions.SSL_ENABLED; /** Maximum framesize of Pekko messages. */ - public static final ConfigOption FRAMESIZE = - ConfigOptions.key("pekko.framesize") - .stringType() - .defaultValue("10485760b") - .withDeprecatedKeys("akka.framesize") - .withDescription( - "Maximum size of messages which are sent between the JobManager and the TaskManagers. If Flink" - + " fails because messages exceed this limit, then you should increase it. The message size requires a" - + " size-unit specifier."); + public static final ConfigOption FRAMESIZE = RpcOptions.FRAMESIZE; /** Maximum number of messages until another actor is executed by the same thread. */ public static final ConfigOption DISPATCHER_THROUGHPUT = - ConfigOptions.key("pekko.throughput") - .intType() - .defaultValue(15) - .withDeprecatedKeys("akka.throughput") - .withDescription( - "Number of messages that are processed in a batch before returning the thread to the pool. Low" - + " values denote a fair scheduling whereas high values can increase the performance at the cost of unfairness."); + RpcOptions.DISPATCHER_THROUGHPUT; /** Log lifecycle events. */ public static final ConfigOption LOG_LIFECYCLE_EVENTS = - ConfigOptions.key("pekko.log.lifecycle.events") - .booleanType() - .defaultValue(false) - .withDeprecatedKeys("akka.log.lifecycle.events") - .withDescription( - "Turns on the Pekko’s remote logging of events. Set this value to 'true' in case of debugging."); + RpcOptions.LOG_LIFECYCLE_EVENTS; /** Timeout for all blocking calls that look up remote actors. */ public static final ConfigOption LOOKUP_TIMEOUT_DURATION = - ConfigOptions.key("pekko.lookup.timeout") - .durationType() - .defaultValue(Duration.ofSeconds(10)) - .withDeprecatedKeys("akka.lookup.timeout") - .withDescription( - "Timeout used for the lookup of the JobManager. The timeout value has to contain a time-unit" - + " specifier (ms/s/min/h/d)."); + RpcOptions.LOOKUP_TIMEOUT_DURATION; - /** @deprecated use {@link #LOOKUP_TIMEOUT_DURATION} */ + /** @deprecated use {@link RpcOptions#LOOKUP_TIMEOUT_DURATION} */ @Deprecated public static final ConfigOption LOOKUP_TIMEOUT = - ConfigOptions.key(LOOKUP_TIMEOUT_DURATION.key()) + ConfigOptions.key(RpcOptions.LOOKUP_TIMEOUT_DURATION.key()) .stringType() .defaultValue( - TimeUtils.formatWithHighestUnit(LOOKUP_TIMEOUT_DURATION.defaultValue())) - .withDescription(LOOKUP_TIMEOUT_DURATION.description()); + TimeUtils.formatWithHighestUnit( + RpcOptions.LOOKUP_TIMEOUT_DURATION.defaultValue())) + .withDescription(RpcOptions.LOOKUP_TIMEOUT_DURATION.description()); /** * Timeout for all blocking calls on the client side. @@ -183,167 +107,58 @@ public static boolean isForceRpcInvocationSerializationEnabled(Configuration con /** Exit JVM on fatal Pekko errors. */ public static final ConfigOption JVM_EXIT_ON_FATAL_ERROR = - ConfigOptions.key("pekko.jvm-exit-on-fatal-error") - .booleanType() - .defaultValue(true) - .withDeprecatedKeys("akka.jvm-exit-on-fatal-error") - .withDescription("Exit JVM on fatal Pekko errors."); + RpcOptions.JVM_EXIT_ON_FATAL_ERROR; /** Milliseconds a gate should be closed for after a remote connection was disconnected. */ - public static final ConfigOption RETRY_GATE_CLOSED_FOR = - ConfigOptions.key("pekko.retry-gate-closed-for") - .longType() - .defaultValue(50L) - .withDeprecatedKeys("akka.retry-gate-closed-for") - .withDescription( - "Milliseconds a gate should be closed for after a remote connection was disconnected."); + public static final ConfigOption RETRY_GATE_CLOSED_FOR = RpcOptions.RETRY_GATE_CLOSED_FOR; // ================================================== // Configurations for fork-join-executor. // ================================================== public static final ConfigOption FORK_JOIN_EXECUTOR_PARALLELISM_FACTOR = - ConfigOptions.key("pekko.fork-join-executor.parallelism-factor") - .doubleType() - .defaultValue(2.0) - .withDeprecatedKeys("akka.fork-join-executor.parallelism-factor") - .withDescription( - Description.builder() - .text( - "The parallelism factor is used to determine thread pool size using the" - + " following formula: ceil(available processors * factor). Resulting size" - + " is then bounded by the parallelism-min and parallelism-max values.") - .build()); + RpcOptions.FORK_JOIN_EXECUTOR_PARALLELISM_FACTOR; public static final ConfigOption FORK_JOIN_EXECUTOR_PARALLELISM_MIN = - ConfigOptions.key("pekko.fork-join-executor.parallelism-min") - .intType() - .defaultValue(8) - .withDeprecatedKeys("akka.fork-join-executor.parallelism-min") - .withDescription( - Description.builder() - .text( - "Min number of threads to cap factor-based parallelism number to.") - .build()); + RpcOptions.FORK_JOIN_EXECUTOR_PARALLELISM_MIN; public static final ConfigOption FORK_JOIN_EXECUTOR_PARALLELISM_MAX = - ConfigOptions.key("pekko.fork-join-executor.parallelism-max") - .intType() - .defaultValue(64) - .withDeprecatedKeys("akka.fork-join-executor.parallelism-max") - .withDescription( - Description.builder() - .text( - "Max number of threads to cap factor-based parallelism number to.") - .build()); + RpcOptions.FORK_JOIN_EXECUTOR_PARALLELISM_MAX; public static final ConfigOption REMOTE_FORK_JOIN_EXECUTOR_PARALLELISM_FACTOR = - ConfigOptions.key("pekko.remote-fork-join-executor.parallelism-factor") - .doubleType() - .defaultValue(2.0) - .withDescription( - Description.builder() - .text( - "The parallelism factor is used to determine thread pool size using the" - + " following formula: ceil(available processors * factor). Resulting size" - + " is then bounded by the parallelism-min and parallelism-max values.") - .build()); + RpcOptions.REMOTE_FORK_JOIN_EXECUTOR_PARALLELISM_FACTOR; public static final ConfigOption REMOTE_FORK_JOIN_EXECUTOR_PARALLELISM_MIN = - ConfigOptions.key("pekko.remote-fork-join-executor.parallelism-min") - .intType() - .defaultValue(8) - .withDescription( - Description.builder() - .text( - "Min number of threads to cap factor-based parallelism number to.") - .build()); + RpcOptions.REMOTE_FORK_JOIN_EXECUTOR_PARALLELISM_MIN; public static final ConfigOption REMOTE_FORK_JOIN_EXECUTOR_PARALLELISM_MAX = - ConfigOptions.key("pekko.remote-fork-join-executor.parallelism-max") - .intType() - .defaultValue(16) - .withDescription( - Description.builder() - .text( - "Max number of threads to cap factor-based parallelism number to.") - .build()); + RpcOptions.REMOTE_FORK_JOIN_EXECUTOR_PARALLELISM_MAX; // ================================================== // Configurations for client-socket-work-pool. // ================================================== public static final ConfigOption CLIENT_SOCKET_WORKER_POOL_SIZE_MIN = - ConfigOptions.key("pekko.client-socket-worker-pool.pool-size-min") - .intType() - .defaultValue(1) - .withDeprecatedKeys("akka.client-socket-worker-pool.pool-size-min") - .withDescription( - Description.builder() - .text("Min number of threads to cap factor-based number to.") - .build()); + RpcOptions.CLIENT_SOCKET_WORKER_POOL_SIZE_MIN; public static final ConfigOption CLIENT_SOCKET_WORKER_POOL_SIZE_MAX = - ConfigOptions.key("pekko.client-socket-worker-pool.pool-size-max") - .intType() - .defaultValue(2) - .withDeprecatedKeys("akka.client-socket-worker-pool.pool-size-max") - .withDescription( - Description.builder() - .text("Max number of threads to cap factor-based number to.") - .build()); + RpcOptions.CLIENT_SOCKET_WORKER_POOL_SIZE_MAX; public static final ConfigOption CLIENT_SOCKET_WORKER_POOL_SIZE_FACTOR = - ConfigOptions.key("pekko.client-socket-worker-pool.pool-size-factor") - .doubleType() - .defaultValue(1.0) - .withDeprecatedKeys("akka.client-socket-worker-pool.pool-size-factor") - .withDescription( - Description.builder() - .text( - "The pool size factor is used to determine thread pool size" - + " using the following formula: ceil(available processors * factor)." - + " Resulting size is then bounded by the pool-size-min and" - + " pool-size-max values.") - .build()); + RpcOptions.CLIENT_SOCKET_WORKER_POOL_SIZE_FACTOR; // ================================================== // Configurations for server-socket-work-pool. // ================================================== public static final ConfigOption SERVER_SOCKET_WORKER_POOL_SIZE_MIN = - ConfigOptions.key("pekko.server-socket-worker-pool.pool-size-min") - .intType() - .defaultValue(1) - .withDeprecatedKeys("akka.server-socket-worker-pool.pool-size-min") - .withDescription( - Description.builder() - .text("Min number of threads to cap factor-based number to.") - .build()); + RpcOptions.SERVER_SOCKET_WORKER_POOL_SIZE_MIN; public static final ConfigOption SERVER_SOCKET_WORKER_POOL_SIZE_MAX = - ConfigOptions.key("pekko.server-socket-worker-pool.pool-size-max") - .intType() - .defaultValue(2) - .withDeprecatedKeys("akka.server-socket-worker-pool.pool-size-max") - .withDescription( - Description.builder() - .text("Max number of threads to cap factor-based number to.") - .build()); + RpcOptions.SERVER_SOCKET_WORKER_POOL_SIZE_MAX; public static final ConfigOption SERVER_SOCKET_WORKER_POOL_SIZE_FACTOR = - ConfigOptions.key("pekko.server-socket-worker-pool.pool-size-factor") - .doubleType() - .defaultValue(1.0) - .withDeprecatedKeys("akka.server-socket-worker-pool.pool-size-factor") - .withDescription( - Description.builder() - .text( - "The pool size factor is used to determine thread pool size" - + " using the following formula: ceil(available processors * factor)." - + " Resulting size is then bounded by the pool-size-min and" - + " pool-size-max values.") - .build()); + RpcOptions.SERVER_SOCKET_WORKER_POOL_SIZE_FACTOR; // ================================================== // Deprecated options diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java index b7751abd07b76..58a33a6060a2f 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java @@ -764,14 +764,14 @@ public final class ConfigConstants { /** * Timeout for the startup of the actor system. * - * @deprecated Use {@link AkkaOptions#STARTUP_TIMEOUT} instead. + * @deprecated Use {@link RpcOptions#STARTUP_TIMEOUT} instead. */ @Deprecated public static final String AKKA_STARTUP_TIMEOUT = "akka.startup-timeout"; /** * Heartbeat interval of the transport failure detector. * - * @deprecated Use {@link AkkaOptions#TRANSPORT_HEARTBEAT_INTERVAL} instead. + * @deprecated Use {@link RpcOptions#TRANSPORT_HEARTBEAT_INTERVAL} instead. */ @Deprecated public static final String AKKA_TRANSPORT_HEARTBEAT_INTERVAL = @@ -780,7 +780,7 @@ public final class ConfigConstants { /** * Allowed heartbeat pause for the transport failure detector. * - * @deprecated Use {@link AkkaOptions#TRANSPORT_HEARTBEAT_PAUSE} instead. + * @deprecated Use {@link RpcOptions#TRANSPORT_HEARTBEAT_PAUSE} instead. */ @Deprecated public static final String AKKA_TRANSPORT_HEARTBEAT_PAUSE = "akka.transport.heartbeat.pause"; @@ -788,7 +788,7 @@ public final class ConfigConstants { /** * Detection threshold of transport failure detector. * - * @deprecated Use {@link AkkaOptions#TRANSPORT_THRESHOLD} instead. + * @deprecated Use {@link RpcOptions#TRANSPORT_THRESHOLD} instead. */ @Deprecated public static final String AKKA_TRANSPORT_THRESHOLD = "akka.transport.threshold"; @@ -818,49 +818,49 @@ public final class ConfigConstants { /** * Akka TCP timeout. * - * @deprecated Use {@link AkkaOptions#TCP_TIMEOUT} instead. + * @deprecated Use {@link RpcOptions#TCP_TIMEOUT} instead. */ @Deprecated public static final String AKKA_TCP_TIMEOUT = "akka.tcp.timeout"; /** * Override SSL support for the Akka transport. * - * @deprecated Use {@link AkkaOptions#SSL_ENABLED} instead. + * @deprecated Use {@link RpcOptions#SSL_ENABLED} instead. */ @Deprecated public static final String AKKA_SSL_ENABLED = "akka.ssl.enabled"; /** * Maximum framesize of akka messages. * - * @deprecated Use {@link AkkaOptions#FRAMESIZE} instead. + * @deprecated Use {@link RpcOptions#FRAMESIZE} instead. */ @Deprecated public static final String AKKA_FRAMESIZE = "akka.framesize"; /** * Maximum number of messages until another actor is executed by the same thread. * - * @deprecated Use {@link AkkaOptions#DISPATCHER_THROUGHPUT} instead. + * @deprecated Use {@link RpcOptions#DISPATCHER_THROUGHPUT} instead. */ @Deprecated public static final String AKKA_DISPATCHER_THROUGHPUT = "akka.throughput"; /** * Log lifecycle events. * - * @deprecated Use {@link AkkaOptions#LOG_LIFECYCLE_EVENTS} instead. + * @deprecated Use {@link RpcOptions#LOG_LIFECYCLE_EVENTS} instead. */ @Deprecated public static final String AKKA_LOG_LIFECYCLE_EVENTS = "akka.log.lifecycle.events"; /** * Timeout for all blocking calls on the cluster side. * - * @deprecated Use {@link AkkaOptions#ASK_TIMEOUT} instead. + * @deprecated Use {@link RpcOptions#ASK_TIMEOUT_DURATION} instead. */ @Deprecated public static final String AKKA_ASK_TIMEOUT = "akka.ask.timeout"; /** * Timeout for all blocking calls that look up remote actors. * - * @deprecated Use {@link AkkaOptions#LOOKUP_TIMEOUT} instead. + * @deprecated Use {@link RpcOptions#LOOKUP_TIMEOUT_DURATION} instead. */ @Deprecated public static final String AKKA_LOOKUP_TIMEOUT = "akka.lookup.timeout"; @@ -874,7 +874,7 @@ public final class ConfigConstants { /** * Exit JVM on fatal Akka errors. * - * @deprecated Use {@link AkkaOptions#JVM_EXIT_ON_FATAL_ERROR} instead. + * @deprecated Use {@link RpcOptions#JVM_EXIT_ON_FATAL_ERROR} instead. */ @Deprecated public static final String AKKA_JVM_EXIT_ON_FATAL_ERROR = "akka.jvm-exit-on-fatal-error"; @@ -1547,37 +1547,37 @@ public final class ConfigConstants { // ------------------------------ Akka Values ------------------------------ - /** @deprecated Use {@link AkkaOptions#TRANSPORT_HEARTBEAT_INTERVAL} instead. */ + /** @deprecated Use {@link RpcOptions#TRANSPORT_HEARTBEAT_INTERVAL} instead. */ @Deprecated public static final String DEFAULT_AKKA_TRANSPORT_HEARTBEAT_INTERVAL = "1000 s"; - /** @deprecated Use {@link AkkaOptions#TRANSPORT_HEARTBEAT_PAUSE} instead. */ + /** @deprecated Use {@link RpcOptions#TRANSPORT_HEARTBEAT_PAUSE} instead. */ @Deprecated public static final String DEFAULT_AKKA_TRANSPORT_HEARTBEAT_PAUSE = "6000 s"; - /** @deprecated Use {@link AkkaOptions#TRANSPORT_THRESHOLD} instead. */ + /** @deprecated Use {@link RpcOptions#TRANSPORT_THRESHOLD} instead. */ @Deprecated public static final double DEFAULT_AKKA_TRANSPORT_THRESHOLD = 300.0; /** @deprecated This default value is no longer used and has no effect on Flink. */ @Deprecated public static final double DEFAULT_AKKA_WATCH_THRESHOLD = 12; - /** @deprecated Use {@link AkkaOptions#DISPATCHER_THROUGHPUT} instead. */ + /** @deprecated Use {@link RpcOptions#DISPATCHER_THROUGHPUT} instead. */ @Deprecated public static final int DEFAULT_AKKA_DISPATCHER_THROUGHPUT = 15; - /** @deprecated Use {@link AkkaOptions#LOG_LIFECYCLE_EVENTS} instead. */ + /** @deprecated Use {@link RpcOptions#LOG_LIFECYCLE_EVENTS} instead. */ @Deprecated public static final boolean DEFAULT_AKKA_LOG_LIFECYCLE_EVENTS = false; - /** @deprecated Use {@link AkkaOptions#FRAMESIZE} instead. */ + /** @deprecated Use {@link RpcOptions#FRAMESIZE} instead. */ @Deprecated public static final String DEFAULT_AKKA_FRAMESIZE = "10485760b"; - /** @deprecated Use {@link AkkaOptions#ASK_TIMEOUT} instead. */ + /** @deprecated Use {@link RpcOptions#ASK_TIMEOUT_DURATION} instead. */ @Deprecated public static final String DEFAULT_AKKA_ASK_TIMEOUT = "10 s"; - /** @deprecated Use {@link AkkaOptions#LOOKUP_TIMEOUT} instead. */ + /** @deprecated Use {@link RpcOptions#LOOKUP_TIMEOUT_DURATION} instead. */ @Deprecated public static final String DEFAULT_AKKA_LOOKUP_TIMEOUT = "10 s"; /** @deprecated Use {@code ClientOptions#CLIENT_TIMEOUT} instead. */ @Deprecated public static final String DEFAULT_AKKA_CLIENT_TIMEOUT = "60 s"; - /** @deprecated Use {@link AkkaOptions#SSL_ENABLED} instead. */ + /** @deprecated Use {@link RpcOptions#SSL_ENABLED} instead. */ @Deprecated public static final boolean DEFAULT_AKKA_SSL_ENABLED = true; // ----------------------------- SSL Values -------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/configuration/RpcOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/RpcOptions.java new file mode 100644 index 0000000000000..a219663478812 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/configuration/RpcOptions.java @@ -0,0 +1,313 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.configuration; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.annotation.docs.Documentation; +import org.apache.flink.configuration.description.Description; + +import java.time.Duration; + +import static org.apache.flink.configuration.description.TextElement.code; + +/** RPC configuration options. */ +@PublicEvolving +public class RpcOptions { + + @Internal + @Documentation.ExcludeFromDocumentation("Internal use only") + public static final ConfigOption FORCE_RPC_INVOCATION_SERIALIZATION = + ConfigOptions.key("pekko.rpc.force-invocation-serialization") + .booleanType() + .defaultValue(false) + .withDeprecatedKeys("akka.rpc.force-invocation-serialization") + .withDescription( + Description.builder() + .text( + "Forces the serialization of all RPC invocations (that are not explicitly annotated with %s)." + + "This option can be used to find serialization issues in the argument/response types without relying requiring HA setups." + + "This option should not be enabled in production.", + code("org.apache.flink.runtime.rpc.Local")) + .build()); + + public static boolean isForceRpcInvocationSerializationEnabled(Configuration config) { + return config.getOptional(FORCE_RPC_INVOCATION_SERIALIZATION) + .orElse( + FORCE_RPC_INVOCATION_SERIALIZATION.defaultValue() + || System.getProperties() + .containsKey(FORCE_RPC_INVOCATION_SERIALIZATION.key())); + } + + /** Flag whether to capture call stacks for RPC ask calls. */ + public static final ConfigOption CAPTURE_ASK_CALLSTACK = + ConfigOptions.key("pekko.ask.callstack") + .booleanType() + .defaultValue(true) + .withDeprecatedKeys("akka.ask.callstack") + .withDescription( + "If true, call stack for asynchronous asks are captured. That way, when an ask fails " + + "(for example times out), you get a proper exception, describing to the original method call and " + + "call site. Note that in case of having millions of concurrent RPC calls, this may add to the " + + "memory footprint."); + + /** Timeout for Pekko ask calls. */ + public static final ConfigOption ASK_TIMEOUT_DURATION = + ConfigOptions.key("pekko.ask.timeout") + .durationType() + .defaultValue(Duration.ofSeconds(10)) + .withDeprecatedKeys("akka.ask.timeout") + .withDescription( + "Timeout used for all futures and blocking Pekko calls. If Flink fails due to timeouts then you" + + " should try to increase this value. Timeouts can be caused by slow machines or a congested network. The" + + " timeout value requires a time-unit specifier (ms/s/min/h/d)."); + + /** The Pekko tcp connection timeout. */ + public static final ConfigOption TCP_TIMEOUT = + ConfigOptions.key("pekko.tcp.timeout") + .stringType() + .defaultValue("20 s") + .withDeprecatedKeys("akka.tcp.timeout") + .withDescription( + "Timeout for all outbound connections. If you should experience problems with connecting to a" + + " TaskManager due to a slow network, you should increase this value."); + + /** Timeout for the startup of the actor system. */ + public static final ConfigOption STARTUP_TIMEOUT = + ConfigOptions.key("pekko.startup-timeout") + .stringType() + .noDefaultValue() + .withDeprecatedKeys("akka.startup-timeout") + .withDescription( + "Timeout after which the startup of a remote component is considered being failed."); + + /** Override SSL support for the Pekko transport. */ + public static final ConfigOption SSL_ENABLED = + ConfigOptions.key("pekko.ssl.enabled") + .booleanType() + .defaultValue(true) + .withDeprecatedKeys("akka.ssl.enabled") + .withDescription( + "Turns on SSL for Pekko’s remote communication. This is applicable only when the global ssl flag" + + " security.ssl.enabled is set to true."); + + /** Maximum framesize of Pekko messages. */ + public static final ConfigOption FRAMESIZE = + ConfigOptions.key("pekko.framesize") + .stringType() + .defaultValue("10485760b") + .withDeprecatedKeys("akka.framesize") + .withDescription( + "Maximum size of messages which are sent between the JobManager and the TaskManagers. If Flink" + + " fails because messages exceed this limit, then you should increase it. The message size requires a" + + " size-unit specifier."); + + /** Maximum number of messages until another actor is executed by the same thread. */ + public static final ConfigOption DISPATCHER_THROUGHPUT = + ConfigOptions.key("pekko.throughput") + .intType() + .defaultValue(15) + .withDeprecatedKeys("akka.throughput") + .withDescription( + "Number of messages that are processed in a batch before returning the thread to the pool. Low" + + " values denote a fair scheduling whereas high values can increase the performance at the cost of unfairness."); + + /** Log lifecycle events. */ + public static final ConfigOption LOG_LIFECYCLE_EVENTS = + ConfigOptions.key("pekko.log.lifecycle.events") + .booleanType() + .defaultValue(false) + .withDeprecatedKeys("akka.log.lifecycle.events") + .withDescription( + "Turns on the Pekko’s remote logging of events. Set this value to 'true' in case of debugging."); + + /** Timeout for all blocking calls that look up remote actors. */ + public static final ConfigOption LOOKUP_TIMEOUT_DURATION = + ConfigOptions.key("pekko.lookup.timeout") + .durationType() + .defaultValue(Duration.ofSeconds(10)) + .withDeprecatedKeys("akka.lookup.timeout") + .withDescription( + "Timeout used for the lookup of the JobManager. The timeout value has to contain a time-unit" + + " specifier (ms/s/min/h/d)."); + + /** Exit JVM on fatal Pekko errors. */ + public static final ConfigOption JVM_EXIT_ON_FATAL_ERROR = + ConfigOptions.key("pekko.jvm-exit-on-fatal-error") + .booleanType() + .defaultValue(true) + .withDeprecatedKeys("akka.jvm-exit-on-fatal-error") + .withDescription("Exit JVM on fatal Pekko errors."); + + /** Milliseconds a gate should be closed for after a remote connection was disconnected. */ + public static final ConfigOption RETRY_GATE_CLOSED_FOR = + ConfigOptions.key("pekko.retry-gate-closed-for") + .longType() + .defaultValue(50L) + .withDeprecatedKeys("akka.retry-gate-closed-for") + .withDescription( + "Milliseconds a gate should be closed for after a remote connection was disconnected."); + + // ================================================== + // Configurations for fork-join-executor. + // ================================================== + + public static final ConfigOption FORK_JOIN_EXECUTOR_PARALLELISM_FACTOR = + ConfigOptions.key("pekko.fork-join-executor.parallelism-factor") + .doubleType() + .defaultValue(2.0) + .withDeprecatedKeys("akka.fork-join-executor.parallelism-factor") + .withDescription( + Description.builder() + .text( + "The parallelism factor is used to determine thread pool size using the" + + " following formula: ceil(available processors * factor). Resulting size" + + " is then bounded by the parallelism-min and parallelism-max values.") + .build()); + + public static final ConfigOption FORK_JOIN_EXECUTOR_PARALLELISM_MIN = + ConfigOptions.key("pekko.fork-join-executor.parallelism-min") + .intType() + .defaultValue(8) + .withDeprecatedKeys("akka.fork-join-executor.parallelism-min") + .withDescription( + Description.builder() + .text( + "Min number of threads to cap factor-based parallelism number to.") + .build()); + + public static final ConfigOption FORK_JOIN_EXECUTOR_PARALLELISM_MAX = + ConfigOptions.key("pekko.fork-join-executor.parallelism-max") + .intType() + .defaultValue(64) + .withDeprecatedKeys("akka.fork-join-executor.parallelism-max") + .withDescription( + Description.builder() + .text( + "Max number of threads to cap factor-based parallelism number to.") + .build()); + + public static final ConfigOption REMOTE_FORK_JOIN_EXECUTOR_PARALLELISM_FACTOR = + ConfigOptions.key("pekko.remote-fork-join-executor.parallelism-factor") + .doubleType() + .defaultValue(2.0) + .withDescription( + Description.builder() + .text( + "The parallelism factor is used to determine thread pool size using the" + + " following formula: ceil(available processors * factor). Resulting size" + + " is then bounded by the parallelism-min and parallelism-max values.") + .build()); + + public static final ConfigOption REMOTE_FORK_JOIN_EXECUTOR_PARALLELISM_MIN = + ConfigOptions.key("pekko.remote-fork-join-executor.parallelism-min") + .intType() + .defaultValue(8) + .withDescription( + Description.builder() + .text( + "Min number of threads to cap factor-based parallelism number to.") + .build()); + + public static final ConfigOption REMOTE_FORK_JOIN_EXECUTOR_PARALLELISM_MAX = + ConfigOptions.key("pekko.remote-fork-join-executor.parallelism-max") + .intType() + .defaultValue(16) + .withDescription( + Description.builder() + .text( + "Max number of threads to cap factor-based parallelism number to.") + .build()); + + // ================================================== + // Configurations for client-socket-work-pool. + // ================================================== + + public static final ConfigOption CLIENT_SOCKET_WORKER_POOL_SIZE_MIN = + ConfigOptions.key("pekko.client-socket-worker-pool.pool-size-min") + .intType() + .defaultValue(1) + .withDeprecatedKeys("akka.client-socket-worker-pool.pool-size-min") + .withDescription( + Description.builder() + .text("Min number of threads to cap factor-based number to.") + .build()); + + public static final ConfigOption CLIENT_SOCKET_WORKER_POOL_SIZE_MAX = + ConfigOptions.key("pekko.client-socket-worker-pool.pool-size-max") + .intType() + .defaultValue(2) + .withDeprecatedKeys("akka.client-socket-worker-pool.pool-size-max") + .withDescription( + Description.builder() + .text("Max number of threads to cap factor-based number to.") + .build()); + + public static final ConfigOption CLIENT_SOCKET_WORKER_POOL_SIZE_FACTOR = + ConfigOptions.key("pekko.client-socket-worker-pool.pool-size-factor") + .doubleType() + .defaultValue(1.0) + .withDeprecatedKeys("akka.client-socket-worker-pool.pool-size-factor") + .withDescription( + Description.builder() + .text( + "The pool size factor is used to determine thread pool size" + + " using the following formula: ceil(available processors * factor)." + + " Resulting size is then bounded by the pool-size-min and" + + " pool-size-max values.") + .build()); + + // ================================================== + // Configurations for server-socket-work-pool. + // ================================================== + + public static final ConfigOption SERVER_SOCKET_WORKER_POOL_SIZE_MIN = + ConfigOptions.key("pekko.server-socket-worker-pool.pool-size-min") + .intType() + .defaultValue(1) + .withDeprecatedKeys("akka.server-socket-worker-pool.pool-size-min") + .withDescription( + Description.builder() + .text("Min number of threads to cap factor-based number to.") + .build()); + + public static final ConfigOption SERVER_SOCKET_WORKER_POOL_SIZE_MAX = + ConfigOptions.key("pekko.server-socket-worker-pool.pool-size-max") + .intType() + .defaultValue(2) + .withDeprecatedKeys("akka.server-socket-worker-pool.pool-size-max") + .withDescription( + Description.builder() + .text("Max number of threads to cap factor-based number to.") + .build()); + + public static final ConfigOption SERVER_SOCKET_WORKER_POOL_SIZE_FACTOR = + ConfigOptions.key("pekko.server-socket-worker-pool.pool-size-factor") + .doubleType() + .defaultValue(1.0) + .withDeprecatedKeys("akka.server-socket-worker-pool.pool-size-factor") + .withDescription( + Description.builder() + .text( + "The pool size factor is used to determine thread pool size" + + " using the following formula: ceil(available processors * factor)." + + " Resulting size is then bounded by the pool-size-min and" + + " pool-size-max values.") + .build()); +} diff --git a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java index f6c64c2f6db8c..aaeee7e74b485 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java @@ -266,15 +266,15 @@ public class TaskManagerOptions { public static final ConfigOption SLOT_TIMEOUT = key("taskmanager.slot.timeout") .durationType() - .defaultValue(AkkaOptions.ASK_TIMEOUT_DURATION.defaultValue()) - .withFallbackKeys(AkkaOptions.ASK_TIMEOUT_DURATION.key()) + .defaultValue(RpcOptions.ASK_TIMEOUT_DURATION.defaultValue()) + .withFallbackKeys(RpcOptions.ASK_TIMEOUT_DURATION.key()) .withDescription( Description.builder() .text( "Timeout used for identifying inactive slots. The TaskManager will free the slot if it does not become active " + "within the given amount of time. Inactive slots can be caused by an out-dated slot request. If no " + "value is configured, then it will fall back to %s.", - code(AkkaOptions.ASK_TIMEOUT_DURATION.key())) + code(RpcOptions.ASK_TIMEOUT_DURATION.key())) .build()); @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER) diff --git a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/ActorSystemBootstrapTools.java b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/ActorSystemBootstrapTools.java index 5149919ab83f1..86ccdca92260e 100644 --- a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/ActorSystemBootstrapTools.java +++ b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/ActorSystemBootstrapTools.java @@ -18,8 +18,8 @@ package org.apache.flink.runtime.rpc.pekko; import org.apache.flink.annotation.VisibleForTesting; -import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.RpcOptions; import org.apache.flink.runtime.rpc.RpcSystem; import org.apache.flink.util.NetUtils; @@ -264,11 +264,9 @@ private ActorSystemBootstrapTools() {} public static RpcSystem.ForkJoinExecutorConfiguration getForkJoinExecutorConfiguration( final Configuration configuration) { final double parallelismFactor = - configuration.get(AkkaOptions.FORK_JOIN_EXECUTOR_PARALLELISM_FACTOR); - final int minParallelism = - configuration.get(AkkaOptions.FORK_JOIN_EXECUTOR_PARALLELISM_MIN); - final int maxParallelism = - configuration.get(AkkaOptions.FORK_JOIN_EXECUTOR_PARALLELISM_MAX); + configuration.get(RpcOptions.FORK_JOIN_EXECUTOR_PARALLELISM_FACTOR); + final int minParallelism = configuration.get(RpcOptions.FORK_JOIN_EXECUTOR_PARALLELISM_MIN); + final int maxParallelism = configuration.get(RpcOptions.FORK_JOIN_EXECUTOR_PARALLELISM_MAX); return new RpcSystem.ForkJoinExecutorConfiguration( parallelismFactor, minParallelism, maxParallelism); @@ -277,11 +275,11 @@ public static RpcSystem.ForkJoinExecutorConfiguration getForkJoinExecutorConfigu public static RpcSystem.ForkJoinExecutorConfiguration getRemoteForkJoinExecutorConfiguration( final Configuration configuration) { final double parallelismFactor = - configuration.get(AkkaOptions.REMOTE_FORK_JOIN_EXECUTOR_PARALLELISM_FACTOR); + configuration.get(RpcOptions.REMOTE_FORK_JOIN_EXECUTOR_PARALLELISM_FACTOR); final int minParallelism = - configuration.get(AkkaOptions.REMOTE_FORK_JOIN_EXECUTOR_PARALLELISM_MIN); + configuration.get(RpcOptions.REMOTE_FORK_JOIN_EXECUTOR_PARALLELISM_MIN); final int maxParallelism = - configuration.get(AkkaOptions.REMOTE_FORK_JOIN_EXECUTOR_PARALLELISM_MAX); + configuration.get(RpcOptions.REMOTE_FORK_JOIN_EXECUTOR_PARALLELISM_MAX); return new RpcSystem.ForkJoinExecutorConfiguration( parallelismFactor, minParallelism, maxParallelism); diff --git a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/PekkoInvocationHandler.java b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/PekkoInvocationHandler.java index bc117df44b712..496304dc6aabc 100644 --- a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/PekkoInvocationHandler.java +++ b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/PekkoInvocationHandler.java @@ -18,7 +18,7 @@ package org.apache.flink.runtime.rpc.pekko; -import org.apache.flink.configuration.AkkaOptions; +import org.apache.flink.configuration.RpcOptions; import org.apache.flink.runtime.concurrent.pekko.ScalaFutureUtils; import org.apache.flink.runtime.rpc.FencedRpcGateway; import org.apache.flink.runtime.rpc.Local; @@ -400,7 +400,7 @@ static Throwable resolveTimeoutException( + "more time for responding, due to problems like slow machines or network jitters. In that case, you can try to increase %s.", rpcInvocation, recipient, - AkkaOptions.ASK_TIMEOUT_DURATION.key())); + RpcOptions.ASK_TIMEOUT_DURATION.key())); } newException.initCause(exception); diff --git a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/PekkoRpcServiceConfiguration.java b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/PekkoRpcServiceConfiguration.java index e52c277b37b4a..ee19ed005df76 100644 --- a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/PekkoRpcServiceConfiguration.java +++ b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/PekkoRpcServiceConfiguration.java @@ -17,8 +17,8 @@ package org.apache.flink.runtime.rpc.pekko; -import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.RpcOptions; import javax.annotation.Nonnull; @@ -77,14 +77,14 @@ public boolean isForceRpcInvocationSerialization() { } public static PekkoRpcServiceConfiguration fromConfiguration(Configuration configuration) { - final Duration timeout = configuration.get(AkkaOptions.ASK_TIMEOUT_DURATION); + final Duration timeout = configuration.get(RpcOptions.ASK_TIMEOUT_DURATION); final long maximumFramesize = PekkoRpcServiceUtils.extractMaximumFramesize(configuration); - final boolean captureAskCallStacks = configuration.get(AkkaOptions.CAPTURE_ASK_CALLSTACK); + final boolean captureAskCallStacks = configuration.get(RpcOptions.CAPTURE_ASK_CALLSTACK); final boolean forceRpcInvocationSerialization = - AkkaOptions.isForceRpcInvocationSerializationEnabled(configuration); + RpcOptions.isForceRpcInvocationSerializationEnabled(configuration); return new PekkoRpcServiceConfiguration( configuration, diff --git a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/PekkoRpcServiceUtils.java b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/PekkoRpcServiceUtils.java index 4de9914aa1c14..c3eb3925e966e 100644 --- a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/PekkoRpcServiceUtils.java +++ b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/PekkoRpcServiceUtils.java @@ -19,8 +19,8 @@ package org.apache.flink.runtime.rpc.pekko; import org.apache.flink.annotation.VisibleForTesting; -import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.RpcOptions; import org.apache.flink.configuration.SecurityOptions; import org.apache.flink.runtime.rpc.AddressResolution; import org.apache.flink.runtime.rpc.RpcService; @@ -130,7 +130,7 @@ public static String getRpcUrl( checkNotNull(config, "config is null"); final boolean sslEnabled = - config.get(AkkaOptions.SSL_ENABLED) && SecurityOptions.isInternalSSLEnabled(config); + config.get(RpcOptions.SSL_ENABLED) && SecurityOptions.isInternalSSLEnabled(config); return getRpcUrl( hostname, @@ -232,7 +232,7 @@ public enum Protocol { // ------------------------------------------------------------------------ public static long extractMaximumFramesize(Configuration configuration) { - String maxFrameSizeStr = configuration.get(AkkaOptions.FRAMESIZE); + String maxFrameSizeStr = configuration.get(RpcOptions.FRAMESIZE); String configStr = String.format(SIMPLE_CONFIG_TEMPLATE, maxFrameSizeStr); Config config = ConfigFactory.parseString(configStr); return config.getBytes(MAXIMUM_FRAME_SIZE_PATH); diff --git a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/PekkoUtils.java b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/PekkoUtils.java index 660235e125e4f..095ae648726f5 100644 --- a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/PekkoUtils.java +++ b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/PekkoUtils.java @@ -18,8 +18,8 @@ package org.apache.flink.runtime.rpc.pekko; import org.apache.flink.annotation.VisibleForTesting; -import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.RpcOptions; import org.apache.flink.configuration.SecurityOptions; import org.apache.flink.runtime.concurrent.pekko.ScalaFutureUtils; import org.apache.flink.runtime.rpc.RpcSystem; @@ -70,11 +70,11 @@ public static String getFlinkActorSystemName() { * @return Flink's basic Pekko config */ private static Config getBasicConfig(Configuration configuration) { - final int throughput = configuration.get(AkkaOptions.DISPATCHER_THROUGHPUT); + final int throughput = configuration.get(RpcOptions.DISPATCHER_THROUGHPUT); final String jvmExitOnFatalError = - booleanToOnOrOff(configuration.get(AkkaOptions.JVM_EXIT_ON_FATAL_ERROR)); + booleanToOnOrOff(configuration.get(RpcOptions.JVM_EXIT_ON_FATAL_ERROR)); final String logLifecycleEvents = - booleanToOnOrOff(configuration.get(AkkaOptions.LOG_LIFECYCLE_EVENTS)); + booleanToOnOrOff(configuration.get(RpcOptions.LOG_LIFECYCLE_EVENTS)); final String supervisorStrategy = EscalatingSupervisorStrategy.class.getCanonicalName(); return new ConfigBuilder() @@ -206,39 +206,39 @@ private static Config getRemoteConfig( private static void addBaseRemoteConfig( ConfigBuilder configBuilder, Configuration configuration, int port, int externalPort) { - final Duration askTimeout = configuration.get(AkkaOptions.ASK_TIMEOUT_DURATION); + final Duration askTimeout = configuration.get(RpcOptions.ASK_TIMEOUT_DURATION); final String startupTimeout = TimeUtils.getStringInMillis( TimeUtils.parseDuration( configuration.get( - AkkaOptions.STARTUP_TIMEOUT, + RpcOptions.STARTUP_TIMEOUT, TimeUtils.getStringInMillis( askTimeout.multipliedBy(10L))))); final String tcpTimeout = TimeUtils.getStringInMillis( - TimeUtils.parseDuration(configuration.get(AkkaOptions.TCP_TIMEOUT))); + TimeUtils.parseDuration(configuration.get(RpcOptions.TCP_TIMEOUT))); - final String framesize = configuration.get(AkkaOptions.FRAMESIZE); + final String framesize = configuration.get(RpcOptions.FRAMESIZE); final int clientSocketWorkerPoolPoolSizeMin = - configuration.get(AkkaOptions.CLIENT_SOCKET_WORKER_POOL_SIZE_MIN); + configuration.get(RpcOptions.CLIENT_SOCKET_WORKER_POOL_SIZE_MIN); final int clientSocketWorkerPoolPoolSizeMax = - configuration.get(AkkaOptions.CLIENT_SOCKET_WORKER_POOL_SIZE_MAX); + configuration.get(RpcOptions.CLIENT_SOCKET_WORKER_POOL_SIZE_MAX); final double clientSocketWorkerPoolPoolSizeFactor = - configuration.get(AkkaOptions.CLIENT_SOCKET_WORKER_POOL_SIZE_FACTOR); + configuration.get(RpcOptions.CLIENT_SOCKET_WORKER_POOL_SIZE_FACTOR); final int serverSocketWorkerPoolPoolSizeMin = - configuration.get(AkkaOptions.SERVER_SOCKET_WORKER_POOL_SIZE_MIN); + configuration.get(RpcOptions.SERVER_SOCKET_WORKER_POOL_SIZE_MIN); final int serverSocketWorkerPoolPoolSizeMax = - configuration.get(AkkaOptions.SERVER_SOCKET_WORKER_POOL_SIZE_MAX); + configuration.get(RpcOptions.SERVER_SOCKET_WORKER_POOL_SIZE_MAX); final double serverSocketWorkerPoolPoolSizeFactor = - configuration.get(AkkaOptions.SERVER_SOCKET_WORKER_POOL_SIZE_FACTOR); + configuration.get(RpcOptions.SERVER_SOCKET_WORKER_POOL_SIZE_FACTOR); final String logLifecycleEvents = - booleanToOnOrOff(configuration.get(AkkaOptions.LOG_LIFECYCLE_EVENTS)); + booleanToOnOrOff(configuration.get(RpcOptions.LOG_LIFECYCLE_EVENTS)); - final long retryGateClosedFor = configuration.get(AkkaOptions.RETRY_GATE_CLOSED_FOR); + final long retryGateClosedFor = configuration.get(RpcOptions.RETRY_GATE_CLOSED_FOR); configBuilder .add("pekko {") @@ -312,7 +312,7 @@ private static void addSslRemoteConfig( ConfigBuilder configBuilder, Configuration configuration) { final boolean enableSSLConfig = - configuration.get(AkkaOptions.SSL_ENABLED) + configuration.get(RpcOptions.SSL_ENABLED) && SecurityOptions.isInternalSSLEnabled(configuration); final String enableSSL = booleanToOnOrOff(enableSSLConfig); diff --git a/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/pekko/MessageSerializationTest.java b/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/pekko/MessageSerializationTest.java index 7ad851f032288..22aeb2f31dd81 100644 --- a/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/pekko/MessageSerializationTest.java +++ b/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/pekko/MessageSerializationTest.java @@ -18,8 +18,8 @@ package org.apache.flink.runtime.rpc.pekko; -import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.RpcOptions; import org.apache.flink.runtime.rpc.Local; import org.apache.flink.runtime.rpc.RpcEndpoint; import org.apache.flink.runtime.rpc.RpcGateway; @@ -51,7 +51,7 @@ class MessageSerializationTest { @BeforeAll static void setup() throws Exception { Configuration configuration = new Configuration(); - configuration.set(AkkaOptions.FRAMESIZE, maxFrameSize + "b"); + configuration.set(RpcOptions.FRAMESIZE, maxFrameSize + "b"); rpcService1 = PekkoRpcServiceUtils.remoteServiceBuilder(configuration, "localhost", 0) diff --git a/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/pekko/PekkoRpcActorOversizedResponseMessageTest.java b/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/pekko/PekkoRpcActorOversizedResponseMessageTest.java index 9170ec784f6ce..ae3a9828e25f5 100644 --- a/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/pekko/PekkoRpcActorOversizedResponseMessageTest.java +++ b/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/pekko/PekkoRpcActorOversizedResponseMessageTest.java @@ -18,8 +18,8 @@ package org.apache.flink.runtime.rpc.pekko; -import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.RpcOptions; import org.apache.flink.core.testutils.FlinkAssertions; import org.apache.flink.runtime.rpc.RpcEndpoint; import org.apache.flink.runtime.rpc.RpcGateway; @@ -57,8 +57,8 @@ class PekkoRpcActorOversizedResponseMessageTest { static void setupClass() throws Exception { final Configuration configuration = new Configuration(); // some tests explicitly test local communication where no serialization should occur - configuration.set(AkkaOptions.FORCE_RPC_INVOCATION_SERIALIZATION, false); - configuration.set(AkkaOptions.FRAMESIZE, FRAMESIZE + " b"); + configuration.set(RpcOptions.FORCE_RPC_INVOCATION_SERIALIZATION, false); + configuration.set(RpcOptions.FRAMESIZE, FRAMESIZE + " b"); rpcService1 = PekkoRpcServiceUtils.remoteServiceBuilder(configuration, "localhost", 0) diff --git a/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/pekko/PekkoUtilsTest.java b/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/pekko/PekkoUtilsTest.java index 78f300cf0c933..553b2cd5589e1 100644 --- a/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/pekko/PekkoUtilsTest.java +++ b/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/pekko/PekkoUtilsTest.java @@ -17,8 +17,8 @@ package org.apache.flink.runtime.rpc.pekko; -import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.RpcOptions; import org.apache.flink.configuration.SecurityOptions; import org.apache.flink.runtime.rpc.AddressResolution; import org.apache.flink.runtime.rpc.RpcSystem; @@ -225,7 +225,7 @@ void getConfigHandlesIPv6Address() { @Test void getConfigDefaultsStartupTimeoutTo10TimesOfAskTimeout() { final Configuration configuration = new Configuration(); - configuration.set(AkkaOptions.ASK_TIMEOUT_DURATION, Duration.ofMillis(100)); + configuration.set(RpcOptions.ASK_TIMEOUT_DURATION, Duration.ofMillis(100)); final Config config = PekkoUtils.getConfig(configuration, new HostAndPort("localhost", 31337)); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java index 7b1a2281d8d7e..4514f890510e3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java @@ -21,10 +21,10 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; -import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.configuration.RpcOptions; import org.apache.flink.configuration.SchedulerExecutionMode; import org.apache.flink.core.failure.FailureEnricher; import org.apache.flink.runtime.blob.BlobWriter; @@ -152,7 +152,7 @@ public static DefaultSlotPoolServiceSchedulerFactory fromConfiguration( Configuration configuration, JobType jobType, boolean isDynamicGraph) { final Time rpcTimeout = - Time.fromDuration(configuration.get(AkkaOptions.ASK_TIMEOUT_DURATION)); + Time.fromDuration(configuration.get(RpcOptions.ASK_TIMEOUT_DURATION)); final Time slotIdleTimeout = Time.milliseconds(configuration.get(JobManagerOptions.SLOT_IDLE_TIMEOUT)); final Time batchSlotTimeout = diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterConfiguration.java index ce30f69803475..47c23ee178b64 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterConfiguration.java @@ -19,10 +19,10 @@ package org.apache.flink.runtime.jobmaster; import org.apache.flink.api.common.time.Time; -import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ConfigurationUtils; import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.configuration.RpcOptions; import org.apache.flink.runtime.registration.RetryingRegistrationConfiguration; import org.apache.flink.util.Preconditions; @@ -75,7 +75,7 @@ public Configuration getConfiguration() { public static JobMasterConfiguration fromConfiguration(Configuration configuration) { final Time rpcTimeout = - Time.fromDuration(configuration.get(AkkaOptions.ASK_TIMEOUT_DURATION)); + Time.fromDuration(configuration.get(RpcOptions.ASK_TIMEOUT_DURATION)); final Time slotRequestTimeout = Time.milliseconds(configuration.get(JobManagerOptions.SLOT_REQUEST_TIMEOUT)); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java index 9905906f640be..61166015d2965 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java @@ -18,12 +18,12 @@ package org.apache.flink.runtime.minicluster; -import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.ClusterOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.configuration.NettyShuffleEnvironmentOptions; import org.apache.flink.configuration.RestOptions; +import org.apache.flink.configuration.RpcOptions; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.configuration.UnmodifiableConfiguration; import org.apache.flink.core.plugin.PluginManager; @@ -90,8 +90,8 @@ private UnmodifiableConfiguration generateConfiguration(final Configuration conf } // increase the ask.timeout if not set in order to harden tests on slow CI - if (!modifiedConfig.contains(AkkaOptions.ASK_TIMEOUT_DURATION)) { - modifiedConfig.set(AkkaOptions.ASK_TIMEOUT_DURATION, Duration.ofMinutes(5L)); + if (!modifiedConfig.contains(RpcOptions.ASK_TIMEOUT_DURATION)) { + modifiedConfig.set(RpcOptions.ASK_TIMEOUT_DURATION, Duration.ofMinutes(5L)); } return new UnmodifiableConfiguration(modifiedConfig); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerFactory.java index fc473564b5e53..3f28af9f31dab 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerFactory.java @@ -20,10 +20,10 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.time.Time; -import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ConfigurationUtils; import org.apache.flink.configuration.ResourceManagerOptions; +import org.apache.flink.configuration.RpcOptions; import org.apache.flink.runtime.blocklist.BlocklistUtils; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.entrypoint.ClusterInformation; @@ -89,7 +89,7 @@ protected ResourceManager createResourceManager( fatalErrorHandler, resourceManagerMetricGroup, standaloneClusterStartupPeriodTime, - Time.fromDuration(configuration.get(AkkaOptions.ASK_TIMEOUT_DURATION)), + Time.fromDuration(configuration.get(RpcOptions.ASK_TIMEOUT_DURATION)), ioExecutor); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java index 40df201135808..1698a6c4e8b7b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java @@ -20,8 +20,8 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.time.Time; -import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.RpcOptions; import org.apache.flink.runtime.blocklist.BlocklistHandler; import org.apache.flink.runtime.clusterframework.ApplicationStatus; import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec; @@ -165,7 +165,7 @@ public ActiveResourceManager( resourceManagerMetricGroup, Time.fromDuration( Preconditions.checkNotNull(flinkConfig) - .get(AkkaOptions.ASK_TIMEOUT_DURATION)), + .get(RpcOptions.ASK_TIMEOUT_DURATION)), ioExecutor); this.flinkConfig = flinkConfig; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java index b5d9734b128e5..66181662277c9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java @@ -20,10 +20,10 @@ import org.apache.flink.api.common.resources.CPUResource; import org.apache.flink.api.common.time.Time; -import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.MemorySize; import org.apache.flink.configuration.ResourceManagerOptions; +import org.apache.flink.configuration.RpcOptions; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec; import org.apache.flink.util.ConfigurationException; @@ -238,7 +238,7 @@ public static SlotManagerConfiguration fromConfiguration( throws ConfigurationException { final Time rpcTimeout = - Time.fromDuration(configuration.get(AkkaOptions.ASK_TIMEOUT_DURATION)); + Time.fromDuration(configuration.get(RpcOptions.ASK_TIMEOUT_DURATION)); final Time taskManagerTimeout = Time.milliseconds(configuration.get(ResourceManagerOptions.TASK_MANAGER_TIMEOUT)); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java index 3d437b04e1fa3..34959bcc3f002 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java @@ -18,9 +18,9 @@ package org.apache.flink.runtime.taskexecutor; -import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ConfigurationUtils; +import org.apache.flink.configuration.RpcOptions; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.configuration.UnmodifiableConfiguration; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; @@ -193,7 +193,7 @@ public static TaskManagerConfiguration fromConfiguration( final String[] tmpDirPaths = ConfigurationUtils.parseTempDirectories(configuration); - final Duration rpcTimeout = configuration.get(AkkaOptions.ASK_TIMEOUT_DURATION); + final Duration rpcTimeout = configuration.get(RpcOptions.ASK_TIMEOUT_DURATION); LOG.debug("Messages have a max timeout of " + rpcTimeout); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java index 726a414850e79..26d3a290c78ef 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java @@ -21,9 +21,9 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.time.Time; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.JMXServerOptions; +import org.apache.flink.configuration.RpcOptions; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.configuration.TaskManagerOptionsInternal; import org.apache.flink.core.fs.FileSystem; @@ -171,7 +171,7 @@ public TaskManagerRunner( this.pluginManager = checkNotNull(pluginManager); this.taskExecutorServiceFactory = checkNotNull(taskExecutorServiceFactory); - timeout = Time.fromDuration(configuration.get(AkkaOptions.ASK_TIMEOUT_DURATION)); + timeout = Time.fromDuration(configuration.get(RpcOptions.ASK_TIMEOUT_DURATION)); this.terminationFuture = new CompletableFuture<>(); this.shutdown = false; @@ -725,7 +725,7 @@ private static String determineTaskManagerBindAddressByConnectingToResourceManag RpcSystemUtils rpcSystemUtils) throws LeaderRetrievalException { - final Duration lookupTimeout = configuration.get(AkkaOptions.LOOKUP_TIMEOUT_DURATION); + final Duration lookupTimeout = configuration.get(RpcOptions.LOOKUP_TIMEOUT_DURATION); final InetAddress taskManagerAddress = LeaderRetrievalUtils.findConnectingAddress( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java index 48b0e08dd7a12..fbd5a456747fa 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java @@ -19,13 +19,13 @@ package org.apache.flink.runtime.taskexecutor; import org.apache.flink.api.common.time.Time; -import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ConfigurationUtils; import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.MemorySize; import org.apache.flink.configuration.NettyShuffleEnvironmentOptions; +import org.apache.flink.configuration.RpcOptions; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.configuration.TaskManagerOptionsInternal; import org.apache.flink.runtime.clusterframework.types.ResourceID; @@ -289,7 +289,7 @@ public static TaskManagerServicesConfiguration fromConfiguration( QueryableStateConfiguration.fromConfiguration(configuration); long timerServiceShutdownTimeout = - configuration.get(AkkaOptions.ASK_TIMEOUT_DURATION).toMillis(); + configuration.get(RpcOptions.ASK_TIMEOUT_DURATION).toMillis(); final RetryingRegistrationConfiguration retryingRegistrationConfiguration = RetryingRegistrationConfiguration.fromConfiguration(configuration); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java index af658994f3c51..e1d6c16778cc1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java @@ -21,10 +21,10 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.RestOptions; +import org.apache.flink.configuration.RpcOptions; import org.apache.flink.runtime.blob.TransientBlobService; import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot; import org.apache.flink.runtime.leaderelection.LeaderContender; @@ -267,7 +267,7 @@ public WebMonitorEndpoint( } private VertexThreadInfoTracker initializeThreadInfoTracker(ScheduledExecutorService executor) { - final Duration askTimeout = clusterConfiguration.get(AkkaOptions.ASK_TIMEOUT_DURATION); + final Duration askTimeout = clusterConfiguration.get(RpcOptions.ASK_TIMEOUT_DURATION); final Duration flameGraphCleanUpInterval = clusterConfiguration.get(RestOptions.FLAMEGRAPH_CLEANUP_INTERVAL); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingDefaultExecutionGraphBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingDefaultExecutionGraphBuilder.java index 4d2156701d697..58e38d62d451f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingDefaultExecutionGraphBuilder.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingDefaultExecutionGraphBuilder.java @@ -19,8 +19,8 @@ package org.apache.flink.runtime.executiongraph; import org.apache.flink.api.common.time.Time; -import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.RpcOptions; import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.runtime.JobException; import org.apache.flink.runtime.blob.BlobWriter; @@ -60,7 +60,7 @@ public static TestingDefaultExecutionGraphBuilder newBuilder() { return new TestingDefaultExecutionGraphBuilder(); } - private Time rpcTimeout = Time.fromDuration(AkkaOptions.ASK_TIMEOUT_DURATION.defaultValue()); + private Time rpcTimeout = Time.fromDuration(RpcOptions.ASK_TIMEOUT_DURATION.defaultValue()); private ClassLoader userClassLoader = DefaultExecutionGraph.class.getClassLoader(); private BlobWriter blobWriter = VoidBlobWriter.getInstance(); private ShuffleMaster shuffleMaster = ShuffleTestUtils.DEFAULT_SHUFFLE_MASTER; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java index 7514e04e4cba6..f7158a39da10a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java @@ -18,9 +18,9 @@ package org.apache.flink.runtime.io.network.partition; -import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.NettyShuffleEnvironmentOptions; +import org.apache.flink.configuration.RpcOptions; import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; @@ -62,7 +62,7 @@ class PartialConsumePipelinedResultTest { private static Configuration getFlinkConfiguration() { final Configuration config = new Configuration(); - config.set(AkkaOptions.ASK_TIMEOUT_DURATION, TestingUtils.DEFAULT_ASK_TIMEOUT); + config.set(RpcOptions.ASK_TIMEOUT_DURATION, TestingUtils.DEFAULT_ASK_TIMEOUT); config.set(NettyShuffleEnvironmentOptions.NETWORK_NUM_BUFFERS, NUMBER_OF_NETWORK_BUFFERS); return config; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java index e5e4c896b56fe..53822036e33eb 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java @@ -18,8 +18,8 @@ package org.apache.flink.runtime.jobmanager; -import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.RpcOptions; import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.io.network.api.reader.RecordReader; @@ -68,7 +68,7 @@ public class SlotCountExceedingParallelismTest extends TestLogger { private static Configuration getFlinkConfiguration() { final Configuration config = new Configuration(); - config.set(AkkaOptions.ASK_TIMEOUT_DURATION, TestingUtils.DEFAULT_ASK_TIMEOUT); + config.set(RpcOptions.ASK_TIMEOUT_DURATION, TestingUtils.DEFAULT_ASK_TIMEOUT); return config; } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManagerFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManagerFactory.java index 67bc11a4266f5..8cdcb3e610e28 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManagerFactory.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManagerFactory.java @@ -19,8 +19,8 @@ package org.apache.flink.runtime.resourcemanager; import org.apache.flink.api.common.time.Time; -import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.RpcOptions; import org.apache.flink.runtime.blocklist.BlocklistHandler; import org.apache.flink.runtime.blocklist.BlocklistUtils; import org.apache.flink.runtime.clusterframework.ApplicationStatus; @@ -101,7 +101,7 @@ protected ResourceManager createResourceManager( clusterInformation, fatalErrorHandler, resourceManagerMetricGroup, - Time.fromDuration(configuration.get(AkkaOptions.ASK_TIMEOUT_DURATION)), + Time.fromDuration(configuration.get(RpcOptions.ASK_TIMEOUT_DURATION)), ioExecutor); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcConnectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcConnectionTest.java index c98635798192b..4a6a760636ce1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcConnectionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcConnectionTest.java @@ -18,8 +18,8 @@ package org.apache.flink.runtime.rpc; -import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.RpcOptions; import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException; import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; @@ -41,7 +41,7 @@ void testConnectFailure() throws Exception { // we start the RPC service with a very long timeout to ensure that the test // can only pass if the connection problem is not recognized merely via a timeout Configuration configuration = new Configuration(); - configuration.set(AkkaOptions.ASK_TIMEOUT_DURATION, Duration.ofSeconds(10000000)); + configuration.set(RpcOptions.ASK_TIMEOUT_DURATION, Duration.ofSeconds(10000000)); try (RpcSystem rpcSystem = RpcSystem.load()) { final RpcService rpcService = diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcSSLAuthITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcSSLAuthITCase.java index c6099b169fb76..7828cc366e175 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcSSLAuthITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcSSLAuthITCase.java @@ -18,8 +18,8 @@ package org.apache.flink.runtime.rpc; -import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.RpcOptions; import org.apache.flink.configuration.SecurityOptions; import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException; import org.apache.flink.util.concurrent.FutureUtils; @@ -50,10 +50,10 @@ class RpcSSLAuthITCase { @Test void testConnectFailure() throws Exception { final Configuration baseConfig = new Configuration(); - baseConfig.set(AkkaOptions.TCP_TIMEOUT, "1 s"); + baseConfig.set(RpcOptions.TCP_TIMEOUT, "1 s"); // we start the RPC service with a very long timeout to ensure that the test // can only pass if the connection problem is not recognized merely via a timeout - baseConfig.set(AkkaOptions.ASK_TIMEOUT_DURATION, Duration.ofSeconds(10000000)); + baseConfig.set(RpcOptions.ASK_TIMEOUT_DURATION, Duration.ofSeconds(10000000)); // !!! This config has KEY_STORE_FILE / TRUST_STORE_FILE !!! Configuration sslConfig1 = new Configuration(baseConfig); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerConfigurationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerConfigurationTest.java index f30ac260f4fe7..8d7847a822f0f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerConfigurationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerConfigurationTest.java @@ -18,11 +18,11 @@ package org.apache.flink.runtime.taskexecutor; -import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.configuration.MemorySize; +import org.apache.flink.configuration.RpcOptions; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.configuration.TaskManagerOptionsInternal; import org.apache.flink.configuration.UnmodifiableConfiguration; @@ -259,7 +259,7 @@ private static Configuration createFlinkConfigWithHostBindPolicy( final Configuration config = new Configuration(); config.set(TaskManagerOptions.HOST_BIND_POLICY, bindPolicy.toString()); config.set(JobManagerOptions.ADDRESS, "localhost"); - config.set(AkkaOptions.LOOKUP_TIMEOUT_DURATION, Duration.ofMillis(10)); + config.set(RpcOptions.LOOKUP_TIMEOUT_DURATION, Duration.ofMillis(10)); return new UnmodifiableConfiguration(config); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterResourceConfiguration.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterResourceConfiguration.java index 7aec05cbeb3dd..22ec9dbe70e80 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterResourceConfiguration.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterResourceConfiguration.java @@ -19,9 +19,9 @@ package org.apache.flink.runtime.testutils; import org.apache.flink.api.common.time.Time; -import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.configuration.RpcOptions; import org.apache.flink.configuration.UnmodifiableConfiguration; import org.apache.flink.runtime.highavailability.nonha.embedded.HaLeadershipControl; import org.apache.flink.runtime.minicluster.MiniCluster; @@ -91,7 +91,7 @@ public static final class Builder { private int numberTaskManagers = 1; private int numberSlotsPerTaskManager = 1; private Time shutdownTimeout = - Time.fromDuration(configuration.get(AkkaOptions.ASK_TIMEOUT_DURATION)); + Time.fromDuration(configuration.get(RpcOptions.ASK_TIMEOUT_DURATION)); private RpcServiceSharing rpcServiceSharing = RpcServiceSharing.SHARED; private MiniCluster.HaServices haServices = MiniCluster.HaServices.CONFIGURED; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java index d1f9f0324fe7f..86662095af4af 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java @@ -18,10 +18,10 @@ package org.apache.flink.runtime.testutils; -import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.configuration.RpcOptions; import org.apache.flink.configuration.StateBackendOptions; import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; @@ -121,7 +121,7 @@ public static Configuration configureZooKeeperHA( config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, fsStateHandlePath + "/checkpoints"); config.set(HighAvailabilityOptions.HA_STORAGE_PATH, fsStateHandlePath + "/recovery"); - config.set(AkkaOptions.ASK_TIMEOUT_DURATION, Duration.ofSeconds(100)); + config.set(RpcOptions.ASK_TIMEOUT_DURATION, Duration.ofSeconds(100)); return config; } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java index acdd07735b5b2..179a3cb13f2a5 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java @@ -51,7 +51,7 @@ import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.typeutils.InputTypeConfigurable; import org.apache.flink.api.java.typeutils.TypeExtractor; -import org.apache.flink.configuration.AkkaOptions; +import org.apache.flink.configuration.RpcOptions; import org.apache.flink.core.execution.JobClient; import org.apache.flink.core.fs.FileSystem.WriteMode; import org.apache.flink.core.fs.Path; @@ -1443,7 +1443,7 @@ public void collectAsync(Collector collector) { new CollectSinkOperatorFactory<>(serializer, accumulatorName); CollectSinkOperator operator = (CollectSinkOperator) factory.getOperator(); long resultFetchTimeout = - env.getConfiguration().get(AkkaOptions.ASK_TIMEOUT_DURATION).toMillis(); + env.getConfiguration().get(RpcOptions.ASK_TIMEOUT_DURATION).toMillis(); CollectResultIterator iterator = new CollectResultIterator<>( operator.getOperatorIdFuture(), diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectResultIterator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectResultIterator.java index f1562be013524..a6dd08b8f395d 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectResultIterator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectResultIterator.java @@ -20,7 +20,7 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.configuration.AkkaOptions; +import org.apache.flink.configuration.RpcOptions; import org.apache.flink.core.execution.JobClient; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.streaming.api.CheckpointingMode; @@ -88,7 +88,7 @@ public CollectResultIterator( operatorIdFuture, accumulatorName, retryMillis, - AkkaOptions.ASK_TIMEOUT_DURATION.defaultValue().toMillis()); + RpcOptions.ASK_TIMEOUT_DURATION.defaultValue().toMillis()); this.bufferedResult = null; } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/CollectDynamicSink.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/CollectDynamicSink.java index 0a45f019dd97e..4ce00d58bffda 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/CollectDynamicSink.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/CollectDynamicSink.java @@ -21,9 +21,9 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.MemorySize; import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.configuration.RpcOptions; import org.apache.flink.core.execution.JobClient; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSink; @@ -130,7 +130,7 @@ public DataStreamSink consumeDataStream( inputStream .getExecutionEnvironment() .getConfiguration() - .get(AkkaOptions.ASK_TIMEOUT_DURATION) + .get(RpcOptions.ASK_TIMEOUT_DURATION) .toMillis(); iterator = diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecDynamicFilteringDataCollector.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecDynamicFilteringDataCollector.java index f38181badfcc7..d51337f500280 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecDynamicFilteringDataCollector.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecDynamicFilteringDataCollector.java @@ -20,10 +20,10 @@ import org.apache.flink.annotation.Experimental; import org.apache.flink.api.dag.Transformation; -import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.MemorySize; import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.configuration.RpcOptions; import org.apache.flink.streaming.api.operators.StreamOperatorFactory; import org.apache.flink.table.data.RowData; import org.apache.flink.table.planner.delegation.PlannerBase; @@ -61,7 +61,7 @@ public class BatchExecDynamicFilteringDataCollector extends ExecNodeBase "If the collector collects more data than the threshold (default is 8M), " + "an empty DynamicFilterEvent with a flag only will be sent to Coordinator, " + "which could avoid exceeding the pekko limit and out-of-memory (see " - + AkkaOptions.FRAMESIZE.key() + + RpcOptions.FRAMESIZE.key() + "). Otherwise a DynamicFilterEvent with all deduplicated records will be sent to Coordinator."); private final List dynamicFilteringFieldIndices; diff --git a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SinkTestSuiteBase.java b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SinkTestSuiteBase.java index 793e5b0bf9c89..2fe57682f2cba 100644 --- a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SinkTestSuiteBase.java +++ b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SinkTestSuiteBase.java @@ -26,8 +26,8 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.connector.sink2.Sink; import org.apache.flink.api.connector.source.Boundedness; -import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.RpcOptions; import org.apache.flink.connector.testframe.environment.TestEnvironment; import org.apache.flink.connector.testframe.environment.TestEnvironmentSettings; import org.apache.flink.connector.testframe.external.ExternalSystemDataReader; @@ -614,7 +614,7 @@ protected CollectResultIterator addCollectSink(DataStream stream) { serializer, accumulatorName, stream.getExecutionEnvironment().getCheckpointConfig(), - AkkaOptions.ASK_TIMEOUT_DURATION.defaultValue().toMillis()); + RpcOptions.ASK_TIMEOUT_DURATION.defaultValue().toMillis()); } private void waitExpectedSizeData(CollectResultIterator iterator, int targetNum) { diff --git a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SourceTestSuiteBase.java b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SourceTestSuiteBase.java index 3efed9d4c168d..7f5f0d3013a12 100644 --- a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SourceTestSuiteBase.java +++ b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SourceTestSuiteBase.java @@ -26,8 +26,8 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.connector.source.Boundedness; import org.apache.flink.api.connector.source.Source; -import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.RpcOptions; import org.apache.flink.connector.testframe.environment.ClusterControllable; import org.apache.flink.connector.testframe.environment.TestEnvironment; import org.apache.flink.connector.testframe.environment.TestEnvironmentSettings; @@ -795,7 +795,7 @@ protected CollectResultIterator build(JobClient jobClient) { serializer, accumulatorName, checkpointConfig, - AkkaOptions.ASK_TIMEOUT_DURATION.defaultValue().toMillis()); + RpcOptions.ASK_TIMEOUT_DURATION.defaultValue().toMillis()); iterator.setJobClient(jobClient); return iterator; } diff --git a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java index a9087f9b24984..96219147c3089 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java @@ -27,9 +27,9 @@ import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.client.program.ClusterClient; -import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.HeartbeatManagerOptions; +import org.apache.flink.configuration.RpcOptions; import org.apache.flink.core.testutils.CheckedThread; import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.optimizer.DataStatistics; @@ -102,7 +102,7 @@ public class AccumulatorLiveITCase extends TestLogger { private static Configuration getConfiguration() { Configuration config = new Configuration(); - config.set(AkkaOptions.ASK_TIMEOUT_DURATION, TestingUtils.DEFAULT_ASK_TIMEOUT); + config.set(RpcOptions.ASK_TIMEOUT_DURATION, TestingUtils.DEFAULT_ASK_TIMEOUT); config.set(HeartbeatManagerOptions.HEARTBEAT_INTERVAL, HEARTBEAT_INTERVAL); return config; diff --git a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java index cd9206c781e92..5bcd17b982650 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java @@ -22,11 +22,11 @@ import org.apache.flink.api.common.JobStatus; import org.apache.flink.api.common.Plan; import org.apache.flink.client.program.ClusterClient; -import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.MemorySize; import org.apache.flink.configuration.NettyShuffleEnvironmentOptions; +import org.apache.flink.configuration.RpcOptions; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.optimizer.DataStatistics; import org.apache.flink.optimizer.Optimizer; @@ -85,7 +85,7 @@ private static Configuration getConfiguration() { verifyJvmOptions(); Configuration config = new Configuration(); config.set(CoreOptions.FILESYTEM_DEFAULT_OVERRIDE, true); - config.set(AkkaOptions.ASK_TIMEOUT_DURATION, TestingUtils.DEFAULT_ASK_TIMEOUT); + config.set(RpcOptions.ASK_TIMEOUT_DURATION, TestingUtils.DEFAULT_ASK_TIMEOUT); config.set(TaskManagerOptions.MEMORY_SEGMENT_SIZE, MemorySize.parse("4096")); config.set(NettyShuffleEnvironmentOptions.NETWORK_NUM_BUFFERS, 2048); @@ -99,7 +99,7 @@ protected void runAndCancelJob(Plan plan, final int msecsTillCanceling, int maxT // submit job final JobGraph jobGraph = getJobGraph(plan); - final long rpcTimeout = configuration.get(AkkaOptions.ASK_TIMEOUT_DURATION).toMillis(); + final long rpcTimeout = configuration.get(RpcOptions.ASK_TIMEOUT_DURATION).toMillis(); ClusterClient client = CLUSTER.getClusterClient(); JobID jobID = client.submitJob(jobGraph).get(); diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java index dae5e0aef99cd..0fe64cf037746 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java @@ -23,9 +23,9 @@ import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple4; -import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.MemorySize; +import org.apache.flink.configuration.RpcOptions; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -72,8 +72,8 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger { private static Configuration getConfiguration() { Configuration config = new Configuration(); config.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, MemorySize.parse("48m")); - config.set(AkkaOptions.LOOKUP_TIMEOUT_DURATION, Duration.ofMinutes(1)); - config.set(AkkaOptions.ASK_TIMEOUT_DURATION, Duration.ofMinutes(1)); + config.set(RpcOptions.LOOKUP_TIMEOUT_DURATION, Duration.ofMinutes(1)); + config.set(RpcOptions.ASK_TIMEOUT_DURATION, Duration.ofMinutes(1)); return config; } diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java index d82b77b63cdfe..6405546d3d487 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java @@ -27,11 +27,11 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple4; import org.apache.flink.changelog.fs.FsStateChangelogStorageFactory; -import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.configuration.MemorySize; import org.apache.flink.configuration.NettyShuffleEnvironmentOptions; +import org.apache.flink.configuration.RpcOptions; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend; import org.apache.flink.contrib.streaming.state.RocksDBOptions; @@ -236,7 +236,7 @@ protected Configuration createClusterConfig() throws IOException { final File haDir = temporaryFolder.newFolder(); Configuration config = new Configuration(); - config.set(AkkaOptions.FRAMESIZE, String.valueOf(MAX_MEM_STATE_SIZE) + "b"); + config.set(RpcOptions.FRAMESIZE, String.valueOf(MAX_MEM_STATE_SIZE) + "b"); if (zkServer != null) { config.set(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER"); diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java index 4218be61e60e9..80084bc3fbc73 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java @@ -42,11 +42,11 @@ import org.apache.flink.api.connector.source.SplitEnumeratorContext; import org.apache.flink.api.connector.source.SplitsAssignment; import org.apache.flink.changelog.fs.FsStateChangelogStorageFactory; -import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.MemorySize; import org.apache.flink.configuration.NettyShuffleEnvironmentOptions; +import org.apache.flink.configuration.RpcOptions; import org.apache.flink.configuration.StateBackendOptions; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.core.io.InputStatus; @@ -785,7 +785,7 @@ public Configuration getConfiguration(File checkpointDir) { // amount of buffers conf.set(TaskManagerOptions.NETWORK_MEMORY_MIN, MemorySize.ofMebiBytes(32)); conf.set(TaskManagerOptions.NETWORK_MEMORY_MAX, MemorySize.ofMebiBytes(32)); - conf.set(AkkaOptions.ASK_TIMEOUT_DURATION, Duration.ofMinutes(1)); + conf.set(RpcOptions.ASK_TIMEOUT_DURATION, Duration.ofMinutes(1)); return conf; } diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java index 1c484d3213a6e..c1a8fe9e95a14 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java @@ -24,10 +24,10 @@ import org.apache.flink.client.program.MiniClusterClient; import org.apache.flink.client.program.PackagedProgram; import org.apache.flink.client.program.ProgramInvocationException; -import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.MemorySize; +import org.apache.flink.configuration.RpcOptions; import org.apache.flink.configuration.StateBackendOptions; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.core.execution.SavepointFormatType; @@ -136,7 +136,7 @@ public static void setUp() throws Exception { // some tests check for serialization problems related to class-loading // this requires all RPCs to actually go through serialization - config.set(AkkaOptions.FORCE_RPC_INVOCATION_SERIALIZATION, true); + config.set(RpcOptions.FORCE_RPC_INVOCATION_SERIALIZATION, true); miniClusterResource = new MiniClusterResource( diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingCustomInputSplitProgram.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingCustomInputSplitProgram.java index 993f9eaa0cebf..a911f1a0fd45c 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingCustomInputSplitProgram.java +++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingCustomInputSplitProgram.java @@ -25,8 +25,8 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.ResultTypeQueryable; -import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.RpcOptions; import org.apache.flink.core.io.InputSplit; import org.apache.flink.core.io.InputSplitAssigner; import org.apache.flink.streaming.api.datastream.DataStream; @@ -46,7 +46,7 @@ public class StreamingCustomInputSplitProgram { public static void main(String[] args) throws Exception { Configuration config = new Configuration(); - config.set(AkkaOptions.ASK_TIMEOUT_DURATION, Duration.ofSeconds(5)); + config.set(RpcOptions.ASK_TIMEOUT_DURATION, Duration.ofSeconds(5)); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java index 5e97398f77517..17b73c9c84795 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java @@ -22,8 +22,8 @@ import org.apache.flink.api.common.io.GenericInputFormat; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.RpcOptions; import org.apache.flink.core.io.GenericInputSplit; import org.apache.flink.runtime.testutils.MiniClusterResource; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; @@ -60,7 +60,7 @@ public class RemoteEnvironmentITCase extends TestLogger { @Test public void testUserSpecificParallelism() throws Exception { Configuration config = new Configuration(); - config.set(AkkaOptions.STARTUP_TIMEOUT, VALID_STARTUP_TIMEOUT); + config.set(RpcOptions.STARTUP_TIMEOUT, VALID_STARTUP_TIMEOUT); final URI restAddress = MINI_CLUSTER_RESOURCE.getRestAddress(); final String hostname = restAddress.getHost(); diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java index 165e8326f16b4..246399b1a2680 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java @@ -23,12 +23,12 @@ import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.io.DiscardingOutputFormat; import org.apache.flink.client.program.ProgramInvocationException; -import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.configuration.MemorySize; import org.apache.flink.configuration.RestOptions; +import org.apache.flink.configuration.RpcOptions; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.core.testutils.CommonTestUtils; import org.apache.flink.core.testutils.EachCallbackWrapper; @@ -110,7 +110,7 @@ void testCancelingOnProcessFailure() throws Throwable { Configuration config = new Configuration(); config.set(JobManagerOptions.ADDRESS, "localhost"); - config.set(AkkaOptions.ASK_TIMEOUT_DURATION, Duration.ofSeconds(100)); + config.set(RpcOptions.ASK_TIMEOUT_DURATION, Duration.ofSeconds(100)); config.set(HighAvailabilityOptions.HA_MODE, "zookeeper"); config.set( HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerDisconnectOnShutdownITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerDisconnectOnShutdownITCase.java index e2c575387d521..a188c7b4379be 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerDisconnectOnShutdownITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerDisconnectOnShutdownITCase.java @@ -19,13 +19,13 @@ package org.apache.flink.test.recovery; import org.apache.flink.api.common.time.Time; -import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ConfigurationUtils; import org.apache.flink.configuration.HeartbeatManagerOptions; import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.configuration.MemorySize; import org.apache.flink.configuration.RestOptions; +import org.apache.flink.configuration.RpcOptions; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.runtime.blocklist.BlocklistUtils; import org.apache.flink.runtime.clusterframework.types.ResourceID; @@ -207,7 +207,7 @@ protected ResourceManager createResourceManager( fatalErrorHandler, resourceManagerMetricGroup, standaloneClusterStartupPeriodTime, - Time.fromDuration(configuration.get(AkkaOptions.ASK_TIMEOUT_DURATION)), + Time.fromDuration(configuration.get(RpcOptions.ASK_TIMEOUT_DURATION)), ioExecutor) { @Override diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerRunnerITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerRunnerITCase.java index b59f1614baa26..0fe8f4d5f7c6f 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerRunnerITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerRunnerITCase.java @@ -18,10 +18,10 @@ package org.apache.flink.test.recovery; -import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.ClusterOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.configuration.RpcOptions; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.entrypoint.ClusterEntrypointUtils; @@ -62,7 +62,7 @@ public void testDeterministicWorkingDirIsNotDeletedInCaseOfProcessFailure() thro ClusterOptions.PROCESS_WORKING_DIR_BASE, workingDirBase.getAbsolutePath()); configuration.set(TaskManagerOptions.TASK_MANAGER_RESOURCE_ID, resourceId.toString()); configuration.set(JobManagerOptions.ADDRESS, "localhost"); - configuration.set(AkkaOptions.LOOKUP_TIMEOUT_DURATION, Duration.ZERO); + configuration.set(RpcOptions.LOOKUP_TIMEOUT_DURATION, Duration.ZERO); final File workingDirectory = ClusterEntrypointUtils.generateTaskManagerWorkingDirectoryFile( @@ -99,7 +99,7 @@ public void testNondeterministicWorkingDirIsDeletedInCaseOfProcessFailure() thro configuration.set( ClusterOptions.PROCESS_WORKING_DIR_BASE, workingDirBase.getAbsolutePath()); configuration.set(JobManagerOptions.ADDRESS, "localhost"); - configuration.set(AkkaOptions.LOOKUP_TIMEOUT_DURATION, Duration.ZERO); + configuration.set(RpcOptions.LOOKUP_TIMEOUT_DURATION, Duration.ZERO); final TestProcessBuilder.TestProcess taskManagerProcess = new TestProcessBuilder(TaskExecutorProcessEntryPoint.class.getName()) diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/ShuffleCompressionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/ShuffleCompressionITCase.java index 9a092c8147ac4..99965735b4c53 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/runtime/ShuffleCompressionITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/ShuffleCompressionITCase.java @@ -20,9 +20,9 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.ExecutionMode; -import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.NettyShuffleEnvironmentOptions; +import org.apache.flink.configuration.RpcOptions; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.io.network.api.reader.MutableRecordReader; import org.apache.flink.runtime.io.network.api.writer.RecordWriter; @@ -85,7 +85,7 @@ public static Boolean[] params() { public void testNoDataCompressionForBoundedBlockingShuffle() throws Exception { Configuration configuration = new Configuration(); configuration.set(NettyShuffleEnvironmentOptions.BATCH_SHUFFLE_COMPRESSION_ENABLED, false); - configuration.set(AkkaOptions.ASK_TIMEOUT_DURATION, Duration.ofMinutes(1)); + configuration.set(RpcOptions.ASK_TIMEOUT_DURATION, Duration.ofMinutes(1)); configuration.set( NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_PARALLELISM, Integer.MAX_VALUE); @@ -98,7 +98,7 @@ public void testNoDataCompressionForBoundedBlockingShuffle() throws Exception { public void testNoDataCompressionForSortMergeBlockingShuffle() throws Exception { Configuration configuration = new Configuration(); configuration.set(NettyShuffleEnvironmentOptions.BATCH_SHUFFLE_COMPRESSION_ENABLED, false); - configuration.set(AkkaOptions.ASK_TIMEOUT_DURATION, Duration.ofMinutes(1)); + configuration.set(RpcOptions.ASK_TIMEOUT_DURATION, Duration.ofMinutes(1)); JobGraph jobGraph = createJobGraph(ResultPartitionType.BLOCKING, ExecutionMode.BATCH); JobGraphRunningUtil.execute(jobGraph, configuration, NUM_TASKMANAGERS, NUM_SLOTS); diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNApplicationITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNApplicationITCase.java index de4cb0c5b0399..05f5d25638f40 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNApplicationITCase.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNApplicationITCase.java @@ -21,12 +21,12 @@ import org.apache.flink.client.deployment.ClusterSpecification; import org.apache.flink.client.deployment.application.ApplicationConfiguration; import org.apache.flink.client.program.ClusterClient; -import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.DeploymentOptions; import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.configuration.MemorySize; import org.apache.flink.configuration.PipelineOptions; +import org.apache.flink.configuration.RpcOptions; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.yarn.configuration.YarnConfigOptions; import org.apache.flink.yarn.configuration.YarnDeploymentTarget; @@ -131,7 +131,7 @@ private Configuration createDefaultConfiguration( Configuration configuration = new Configuration(); configuration.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(768)); configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse("1g")); - configuration.set(AkkaOptions.ASK_TIMEOUT_DURATION, Duration.ofSeconds(30)); + configuration.set(RpcOptions.ASK_TIMEOUT_DURATION, Duration.ofSeconds(30)); configuration.set(DeploymentOptions.TARGET, YarnDeploymentTarget.APPLICATION.getName()); configuration.set(CLASSPATH_INCLUDE_USER_JAR, userJarInclusion); configuration.set(PipelineOptions.JARS, Collections.singletonList(userJar.toString())); diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNFileReplicationITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNFileReplicationITCase.java index 61bb1f504213e..84975185d3b02 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNFileReplicationITCase.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNFileReplicationITCase.java @@ -20,10 +20,10 @@ import org.apache.flink.client.deployment.ClusterSpecification; import org.apache.flink.client.program.ClusterClient; -import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.configuration.MemorySize; +import org.apache.flink.configuration.RpcOptions; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobmaster.JobResult; @@ -152,7 +152,7 @@ private Configuration getDefaultConfiguration() { final Configuration configuration = new Configuration(); configuration.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(768)); configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse("1g")); - configuration.set(AkkaOptions.ASK_TIMEOUT_DURATION, Duration.ofSeconds(30)); + configuration.set(RpcOptions.ASK_TIMEOUT_DURATION, Duration.ofSeconds(30)); configuration.set(CLASSPATH_INCLUDE_USER_JAR, YarnConfigOptions.UserJarInclusion.DISABLED); return configuration; diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java index 551f355adea7c..4a1963ced5e93 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java @@ -21,10 +21,10 @@ import org.apache.flink.api.common.cache.DistributedCache; import org.apache.flink.client.deployment.ClusterSpecification; import org.apache.flink.client.program.ClusterClient; -import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.configuration.MemorySize; +import org.apache.flink.configuration.RpcOptions; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobType; @@ -214,7 +214,7 @@ private Configuration createDefaultConfiguration( Configuration configuration = new Configuration(); configuration.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(768)); configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse("1g")); - configuration.set(AkkaOptions.ASK_TIMEOUT_DURATION, Duration.ofSeconds(30)); + configuration.set(RpcOptions.ASK_TIMEOUT_DURATION, Duration.ofSeconds(30)); configuration.set(CLASSPATH_INCLUDE_USER_JAR, userJarInclusion); return configuration; diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java index ab8986adccf33..48efd848e07e9 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java @@ -19,8 +19,8 @@ package org.apache.flink.yarn; import org.apache.flink.annotation.VisibleForTesting; -import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.RpcOptions; import org.apache.flink.configuration.SecurityOptions; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.configuration.TaskManagerOptionsInternal; @@ -118,7 +118,7 @@ private static void setupConfigurationFromVariables( LOG.info("TM: keytab principal obtained {}", keytabPrincipal); // tell pekko to die in case of an error - configuration.set(AkkaOptions.JVM_EXIT_ON_FATAL_ERROR, true); + configuration.set(RpcOptions.JVM_EXIT_ON_FATAL_ERROR, true); String keytabPath = Utils.resolveKeytabPath(currDir, localKeytabPath); diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java index b183fd2604ec0..f596e476a6fc8 100644 --- a/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java +++ b/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java @@ -27,13 +27,13 @@ import org.apache.flink.client.deployment.ClusterClientServiceLoader; import org.apache.flink.client.deployment.ClusterSpecification; import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader; -import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.DeploymentOptions; import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.configuration.MemorySize; +import org.apache.flink.configuration.RpcOptions; import org.apache.flink.configuration.SecurityOptions; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.util.ConfigurationException; @@ -100,7 +100,7 @@ void testDynamicProperties() throws Exception { "-j", "fake.jar", "-D", - AkkaOptions.ASK_TIMEOUT_DURATION.key() + "=5 min", + RpcOptions.ASK_TIMEOUT_DURATION.key() + "=5 min", "-D", CoreOptions.FLINK_JVM_OPTIONS.key() + "=-DappName=foobar", "-D", @@ -108,7 +108,7 @@ void testDynamicProperties() throws Exception { }); Configuration executorConfig = cli.toConfiguration(cmd); - assertThat(executorConfig.get(AkkaOptions.ASK_TIMEOUT_DURATION)).hasMinutes(5); + assertThat(executorConfig.get(RpcOptions.ASK_TIMEOUT_DURATION)).hasMinutes(5); assertThat(executorConfig.get(CoreOptions.FLINK_JVM_OPTIONS)).isEqualTo("-DappName=foobar"); assertThat(executorConfig.get(SecurityOptions.SSL_INTERNAL_KEY_PASSWORD)) .isEqualTo("changeit");