From 302c6741f862f13c6ea3d5490a31fadc20e976c8 Mon Sep 17 00:00:00 2001 From: zjureel Date: Thu, 18 May 2017 12:34:40 +0800 Subject: [PATCH] [FLINK-6495] Migrate Akka configuration options This closes #3935. --- .../client/program/ClientConnectionTest.java | 5 +- .../flink/client/program/ClientTest.java | 3 +- .../apache/flink/storm/api/FlinkClient.java | 5 +- .../flink/configuration/AkkaOptions.java | 102 ++++++++++++++++-- .../flink/configuration/ConfigConstants.java | 92 ++++++++++++++++ .../MesosTaskManagerRunner.java | 3 +- .../webmonitor/metrics/MetricFetcher.java | 4 +- .../client/JobAttachmentClientActor.java | 4 +- .../client/JobSubmissionClientActor.java | 4 +- .../FlinkResourceManager.java | 4 +- .../restart/FailureRateRestartStrategy.java | 3 +- .../restart/FixedDelayRestartStrategy.java | 6 +- .../restart/RestartStrategyFactory.java | 6 +- .../runtime/query/QueryableStateClient.java | 8 +- .../ResourceManagerConfiguration.java | 8 +- .../slotmanager/SlotManagerConfiguration.java | 12 +-- .../runtime/rpc/akka/AkkaRpcServiceUtils.java | 6 +- .../TaskManagerConfiguration.java | 3 +- .../apache/flink/runtime/akka/AkkaUtils.scala | 65 ++++------- .../minicluster/FlinkMiniCluster.scala | 8 +- .../clusterframework/ResourceManagerTest.java | 6 +- .../PartialConsumePipelinedResultTest.java | 3 +- .../runtime/jobmanager/JobManagerTest.java | 3 +- ...kManagerComponentsStartupShutdownTest.java | 7 +- .../TaskManagerRegistrationTest.java | 9 +- .../runtime/testutils/ZooKeeperTestUtils.java | 9 +- .../flink/runtime/akka/AkkaSslITCase.scala | 6 +- .../jobmanager/JobManagerConnectionTest.scala | 4 +- .../runtime/jobmanager/RecoveryITCase.scala | 4 +- .../runtime/testingUtils/TestingUtils.scala | 4 +- .../apache/flink/test/util/TestBaseUtils.java | 11 +- .../accumulators/AccumulatorLiveITCase.java | 3 +- .../test/cancelling/CancelingTestBase.java | 3 +- ...EventTimeAllWindowCheckpointingITCase.java | 5 +- .../jar/StreamingCustomInputSplitProgram.java | 4 +- .../RemoteEnvironmentITCase.java | 5 +- ...TaskManagerProcessFailureRecoveryTest.java | 11 +- .../test/recovery/ChaosMonkeyITCase.java | 7 +- .../ProcessFailureCancelingITCase.java | 10 +- .../TaskManagerFailureRecoveryITCase.java | 7 +- .../ZooKeeperLeaderElectionITCase.java | 3 +- .../flink/yarn/YarnTaskExecutorRunner.java | 3 +- .../flink/yarn/YarnTaskManagerRunner.java | 3 +- 43 files changed, 324 insertions(+), 157 deletions(-) diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java index 246a75c9fd87a..eb9f3c5f83bdc 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java @@ -21,6 +21,7 @@ import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.Props; +import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.testutils.CommonTestUtils; @@ -92,8 +93,8 @@ public void testExceptionWhenRemoteJobManagerUnreachable() throws Exception { private static void testFailureBehavior(final InetSocketAddress unreachableEndpoint) throws Exception { final Configuration config = new Configuration(); - config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, (ASK_STARTUP_TIMEOUT) + " ms"); - config.setString(ConfigConstants.AKKA_LOOKUP_TIMEOUT, (CONNECT_TIMEOUT) + " ms"); + config.setString(AkkaOptions.ASK_TIMEOUT, ASK_STARTUP_TIMEOUT + " ms"); + config.setString(AkkaOptions.LOOKUP_TIMEOUT, CONNECT_TIMEOUT + " ms"); config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, unreachableEndpoint.getHostName()); config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, unreachableEndpoint.getPort()); diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java index b7ade2a5e09fe..13a25642f5e8a 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java @@ -32,6 +32,7 @@ import org.apache.flink.api.java.io.DiscardingOutputFormat; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.client.program.DetachedEnvironment.DetachedJobExecutionResult; +import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.optimizer.DataStatistics; @@ -97,7 +98,7 @@ public void setUp() throws Exception { config = new Configuration(); config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost"); config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, freePort); - config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT); + config.setString(AkkaOptions.ASK_TIMEOUT, AkkaOptions.ASK_TIMEOUT.defaultValue()); try { scala.Tuple2 address = new scala.Tuple2("localhost", freePort); diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java index 21794f9fefc36..626335d948e2f 100644 --- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java +++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java @@ -39,6 +39,7 @@ import org.apache.flink.client.program.JobWithJars; import org.apache.flink.client.program.ProgramInvocationException; import org.apache.flink.client.program.StandaloneClusterClient; +import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.GlobalConfiguration; @@ -269,7 +270,7 @@ public void killTopologyWithOpts(final String name, final KillOptions options) t JobID getTopologyJobId(final String id) { final Configuration configuration = GlobalConfiguration.loadConfiguration(); if (this.timeout != null) { - configuration.setString(ConfigConstants.AKKA_ASK_TIMEOUT, this.timeout); + configuration.setString(AkkaOptions.ASK_TIMEOUT, this.timeout); } try { @@ -309,7 +310,7 @@ JobID getTopologyJobId(final String id) { private FiniteDuration getTimeout() { final Configuration configuration = GlobalConfiguration.loadConfiguration(); if (this.timeout != null) { - configuration.setString(ConfigConstants.AKKA_ASK_TIMEOUT, this.timeout); + configuration.setString(AkkaOptions.ASK_TIMEOUT, this.timeout); } return AkkaUtils.getClientTimeout(configuration); diff --git a/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java index 97b209e6136a9..9bfc237546473 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java @@ -29,30 +29,114 @@ public class AkkaOptions { /** - * Timeout for akka ask calls + * Timeout for akka ask calls. */ - public static final ConfigOption AKKA_ASK_TIMEOUT = ConfigOptions + public static final ConfigOption ASK_TIMEOUT = ConfigOptions .key("akka.ask.timeout") .defaultValue("10 s"); + /** + * The Akka death watch heartbeat interval. + */ + public static final ConfigOption WATCH_HEARTBEAT_INTERVAL = ConfigOptions + .key("akka.watch.heartbeat.interval") + .defaultValue(ASK_TIMEOUT.defaultValue()); + + /** + * The maximum acceptable Akka death watch heartbeat pause. + */ + public static final ConfigOption WATCH_HEARTBEAT_PAUSE = ConfigOptions + .key("akka.watch.heartbeat.pause") + .defaultValue(ASK_TIMEOUT.defaultValue()); + /** * The Akka tcp connection timeout. */ - public static final ConfigOption AKKA_TCP_TIMEOUT = ConfigOptions + public static final ConfigOption TCP_TIMEOUT = ConfigOptions .key("akka.tcp.timeout") .defaultValue("20 s"); /** - * The Akka death watch heartbeat interval. + * Timeout for the startup of the actor system. */ - public static final ConfigOption AKKA_WATCH_HEARTBEAT_INTERVAL = ConfigOptions - .key("akka.watch.heartbeat.interval") + public static final ConfigOption STARTUP_TIMEOUT = ConfigOptions + .key("akka.startup-timeout") + .noDefaultValue(); + + /** + * Heartbeat interval of the transport failure detector. + */ + public static final ConfigOption TRANSPORT_HEARTBEAT_INTERVAL = ConfigOptions + .key("akka.transport.heartbeat.interval") + .defaultValue("1000 s"); + + /** + * Allowed heartbeat pause for the transport failure detector. + */ + public static final ConfigOption TRANSPORT_HEARTBEAT_PAUSE = ConfigOptions + .key("akka.transport.heartbeat.pause") + .defaultValue("6000 s"); + + /** + * Detection threshold of transport failure detector. + */ + public static final ConfigOption TRANSPORT_THRESHOLD = ConfigOptions + .key("akka.transport.threshold") + .defaultValue(300.0); + + /** + * Detection threshold for the phi accrual watch failure detector. + */ + public static final ConfigOption WATCH_THRESHOLD = ConfigOptions + .key("akka.watch.threshold") + .defaultValue(12); + + /** + * Override SSL support for the Akka transport. + */ + public static final ConfigOption SSL_ENABLED = ConfigOptions + .key("akka.ssl.enabled") + .defaultValue(true); + + /** + * Maximum framesize of akka messages. + */ + public static final ConfigOption FRAMESIZE = ConfigOptions + .key("akka.framesize") + .defaultValue("10485760b"); + + /** + * Maximum number of messages until another actor is executed by the same thread. + */ + public static final ConfigOption DISPATCHER_THROUGHPUT = ConfigOptions + .key("akka.throughput") + .defaultValue(15); + + /** + * Log lifecycle events. + */ + public static final ConfigOption LOG_LIFECYCLE_EVENTS = ConfigOptions + .key("akka.log.lifecycle.events") + .defaultValue(false); + + /** + * Timeout for all blocking calls that look up remote actors. + */ + public static final ConfigOption LOOKUP_TIMEOUT = ConfigOptions + .key("akka.lookup.timeout") .defaultValue("10 s"); /** - * The maximum acceptable Akka death watch heartbeat pause. + * Timeout for all blocking calls on the client side. */ - public static final ConfigOption AKKA_WATCH_HEARTBEAT_PAUSE = ConfigOptions - .key("akka.watch.heartbeat.pause") + public static final ConfigOption CLIENT_TIMEOUT = ConfigOptions + .key("akka.client.timeout") .defaultValue("60 s"); + + /** + * Exit JVM on fatal Akka errors. + */ + public static final ConfigOption JVM_EXIT_ON_FATAL_ERROR = ConfigOptions + .key("akka.jvm-exit-on-fatal-error") + .defaultValue(true); } diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java index b5b5486b350ed..65e6c762f5f66 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java @@ -708,82 +708,130 @@ public final class ConfigConstants { /** * Timeout for the startup of the actor system + * + * @deprecated Use {@link AkkaOptions#STARTUP_TIMEOUT} instead. */ + @Deprecated public static final String AKKA_STARTUP_TIMEOUT = "akka.startup-timeout"; /** * Heartbeat interval of the transport failure detector + * + * @deprecated Use {@link AkkaOptions#TRANSPORT_HEARTBEAT_INTERVAL} instead. */ + @Deprecated public static final String AKKA_TRANSPORT_HEARTBEAT_INTERVAL = "akka.transport.heartbeat.interval"; /** * Allowed heartbeat pause for the transport failure detector + * + * @deprecated Use {@link AkkaOptions#TRANSPORT_HEARTBEAT_PAUSE} instead. */ + @Deprecated public static final String AKKA_TRANSPORT_HEARTBEAT_PAUSE = "akka.transport.heartbeat.pause"; /** * Detection threshold of transport failure detector + * + * @deprecated Use {@link AkkaOptions#TRANSPORT_THRESHOLD} instead. */ + @Deprecated public static final String AKKA_TRANSPORT_THRESHOLD = "akka.transport.threshold"; /** * Heartbeat interval of watch failure detector + * + * @deprecated Use {@link AkkaOptions#WATCH_HEARTBEAT_INTERVAL} instead. */ + @Deprecated public static final String AKKA_WATCH_HEARTBEAT_INTERVAL = "akka.watch.heartbeat.interval"; /** * Allowed heartbeat pause for the watch failure detector + * + * @deprecated Use {@link AkkaOptions#WATCH_HEARTBEAT_PAUSE} instead. */ + @Deprecated public static final String AKKA_WATCH_HEARTBEAT_PAUSE = "akka.watch.heartbeat.pause"; /** * Detection threshold for the phi accrual watch failure detector + * + * @deprecated Use {@link AkkaOptions#WATCH_THRESHOLD} instead. */ + @Deprecated public static final String AKKA_WATCH_THRESHOLD = "akka.watch.threshold"; /** * Akka TCP timeout + * + * @deprecated Use {@link AkkaOptions#TCP_TIMEOUT} instead. */ + @Deprecated public static final String AKKA_TCP_TIMEOUT = "akka.tcp.timeout"; /** * Override SSL support for the Akka transport + * + * @deprecated Use {@link AkkaOptions#SSL_ENABLED} instead. */ + @Deprecated public static final String AKKA_SSL_ENABLED = "akka.ssl.enabled"; /** * Maximum framesize of akka messages + * + * @deprecated Use {@link AkkaOptions#FRAMESIZE} instead. */ + @Deprecated public static final String AKKA_FRAMESIZE = "akka.framesize"; /** * Maximum number of messages until another actor is executed by the same thread + * + * @deprecated Use {@link AkkaOptions#DISPATCHER_THROUGHPUT} instead. */ + @Deprecated public static final String AKKA_DISPATCHER_THROUGHPUT = "akka.throughput"; /** * Log lifecycle events + * + * @deprecated Use {@link AkkaOptions#LOG_LIFECYCLE_EVENTS} instead. */ + @Deprecated public static final String AKKA_LOG_LIFECYCLE_EVENTS = "akka.log.lifecycle.events"; /** * Timeout for all blocking calls on the cluster side + * + * @deprecated Use {@link AkkaOptions#ASK_TIMEOUT} instead. */ + @Deprecated public static final String AKKA_ASK_TIMEOUT = "akka.ask.timeout"; /** * Timeout for all blocking calls that look up remote actors + * + * @deprecated Use {@link AkkaOptions#LOOKUP_TIMEOUT} instead. */ + @Deprecated public static final String AKKA_LOOKUP_TIMEOUT = "akka.lookup.timeout"; /** * Timeout for all blocking calls on the client side + * + * @deprecated Use {@link AkkaOptions#CLIENT_TIMEOUT} instead. */ + @Deprecated public static final String AKKA_CLIENT_TIMEOUT = "akka.client.timeout"; /** * Exit JVM on fatal Akka errors + * + * @deprecated Use {@link AkkaOptions#JVM_EXIT_ON_FATAL_ERROR} instead. */ + @Deprecated public static final String AKKA_JVM_EXIT_ON_FATAL_ERROR = "akka.jvm-exit-on-fatal-error"; // ----------------------------- Transport SSL Settings-------------------- @@ -1425,26 +1473,70 @@ public final class ConfigConstants { // ------------------------------ Akka Values ------------------------------ + /** + * @deprecated Use {@link AkkaOptions#TRANSPORT_HEARTBEAT_INTERVAL} instead. + */ + @Deprecated public static String DEFAULT_AKKA_TRANSPORT_HEARTBEAT_INTERVAL = "1000 s"; + /** + * @deprecated Use {@link AkkaOptions#TRANSPORT_HEARTBEAT_PAUSE} instead. + */ + @Deprecated public static String DEFAULT_AKKA_TRANSPORT_HEARTBEAT_PAUSE = "6000 s"; + /** + * @deprecated Use {@link AkkaOptions#TRANSPORT_THRESHOLD} instead. + */ + @Deprecated public static double DEFAULT_AKKA_TRANSPORT_THRESHOLD = 300.0; + /** + * @deprecated Use {@link AkkaOptions#WATCH_THRESHOLD} instead. + */ + @Deprecated public static double DEFAULT_AKKA_WATCH_THRESHOLD = 12; + /** + * @deprecated Use {@link AkkaOptions#DISPATCHER_THROUGHPUT} instead. + */ + @Deprecated public static int DEFAULT_AKKA_DISPATCHER_THROUGHPUT = 15; + /** + * @deprecated Use {@link AkkaOptions#LOG_LIFECYCLE_EVENTS} instead. + */ + @Deprecated public static boolean DEFAULT_AKKA_LOG_LIFECYCLE_EVENTS = false; + /** + * @deprecated Use {@link AkkaOptions#FRAMESIZE} instead. + */ + @Deprecated public static String DEFAULT_AKKA_FRAMESIZE = "10485760b"; + /** + * @deprecated Use {@link AkkaOptions#ASK_TIMEOUT} instead. + */ + @Deprecated public static String DEFAULT_AKKA_ASK_TIMEOUT = "10 s"; + /** + * @deprecated Use {@link AkkaOptions#LOOKUP_TIMEOUT} instead. + */ + @Deprecated public static String DEFAULT_AKKA_LOOKUP_TIMEOUT = "10 s"; + /** + * @deprecated Use {@link AkkaOptions#CLIENT_TIMEOUT} instead. + */ + @Deprecated public static String DEFAULT_AKKA_CLIENT_TIMEOUT = "60 s"; + /** + * @deprecated Use {@link AkkaOptions#SSL_ENABLED} instead. + */ + @Deprecated public static boolean DEFAULT_AKKA_SSL_ENABLED = true; // ----------------------------- SSL Values -------------------------------- diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java index 206c71beacec7..625880b445413 100644 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java @@ -26,6 +26,7 @@ import org.apache.commons.cli.CommandLineParser; import org.apache.commons.cli.Options; import org.apache.commons.cli.PosixParser; +import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.GlobalConfiguration; @@ -108,7 +109,7 @@ else if (tmpDirs != null) { } // tell akka to die in case of an error - configuration.setBoolean(ConfigConstants.AKKA_JVM_EXIT_ON_FATAL_ERROR, true); + configuration.setBoolean(AkkaOptions.JVM_EXIT_ON_FATAL_ERROR, true); // Infer the resource identifier from the environment variable String containerID = Preconditions.checkNotNull(envs.get(MesosConfigKeys.ENV_FLINK_CONTAINER_ID)); diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java index 4f9214821e90a..c0dcc9911a247 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java @@ -23,7 +23,7 @@ import akka.dispatch.OnSuccess; import akka.pattern.Patterns; import akka.util.Timeout; -import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.instance.Instance; import org.apache.flink.runtime.messages.JobManagerMessages; @@ -61,7 +61,7 @@ public class MetricFetcher { private final ActorSystem actorSystem; private final JobManagerRetriever retriever; private final ExecutionContext ctx; - private final FiniteDuration timeout = new FiniteDuration(Duration.create(ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT).toMillis(), TimeUnit.MILLISECONDS); + private final FiniteDuration timeout = new FiniteDuration(Duration.create(AkkaOptions.ASK_TIMEOUT.defaultValue()).toMillis(), TimeUnit.MILLISECONDS); private MetricStore metrics = new MetricStore(); private MetricDumpDeserializer deserializer = new MetricDumpDeserializer(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobAttachmentClientActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobAttachmentClientActor.java index ffab9cc77636a..9451e20e45f0a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobAttachmentClientActor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobAttachmentClientActor.java @@ -23,7 +23,7 @@ import akka.actor.Status; import akka.dispatch.Futures; import org.apache.flink.api.common.JobID; -import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.runtime.akka.ListeningBehaviour; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.messages.JobClientMessages; @@ -114,7 +114,7 @@ else if (JobClientMessages.getRegistrationTimeout().equals(message)) { client.tell( decorateMessage(new Status.Failure( new JobClientActorRegistrationTimeoutException("Registration for Job at the JobManager " + - "timed out. " + "You may increase '" + ConfigConstants.AKKA_CLIENT_TIMEOUT + + "timed out. " + "You may increase '" + AkkaOptions.CLIENT_TIMEOUT.key() + "' in case the JobManager needs more time to confirm the job client registration."))), getSelf()); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java index a3fee21525dfa..babb0f66dfc53 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java @@ -22,7 +22,7 @@ import akka.actor.Props; import akka.actor.Status; import akka.dispatch.Futures; -import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.ListeningBehaviour; import org.apache.flink.runtime.instance.ActorGateway; @@ -119,7 +119,7 @@ public void handleCustomMessage(Object message) { client.tell( decorateMessage(new Status.Failure( new JobClientActorSubmissionTimeoutException("Job submission to the JobManager timed out. " + - "You may increase '" + ConfigConstants.AKKA_CLIENT_TIMEOUT + "' in case the JobManager " + + "You may increase '" + AkkaOptions.CLIENT_TIMEOUT.key() + "' in case the JobManager " + "needs more time to configure and confirm the job submission."))), getSelf()); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java index 77dbad45a17e3..f9c39c1c92a2e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java @@ -26,7 +26,7 @@ import akka.pattern.Patterns; import akka.util.Timeout; -import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.akka.FlinkUntypedActor; @@ -148,7 +148,7 @@ protected FlinkResourceManager( } catch (Exception e) { lt = new FiniteDuration( - Duration.apply(ConfigConstants.DEFAULT_AKKA_LOOKUP_TIMEOUT).toMillis(), + Duration.apply(AkkaOptions.LOOKUP_TIMEOUT.defaultValue()).toMillis(), TimeUnit.MILLISECONDS); } this.messageTimeout = lt; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategy.java index d95e1c37ac4e0..36d81e6e45a08 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategy.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.executiongraph.restart; import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.concurrent.impl.FlinkFuture; @@ -92,7 +93,7 @@ public static FailureRateRestartStrategyFactory createFactory(Configuration conf String failuresIntervalString = configuration.getString( ConfigConstants.RESTART_STRATEGY_FAILURE_RATE_FAILURE_RATE_INTERVAL, Duration.apply(1, TimeUnit.MINUTES).toString() ); - String timeoutString = configuration.getString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT); + String timeoutString = configuration.getString(AkkaOptions.WATCH_HEARTBEAT_INTERVAL); String delayString = configuration.getString(ConfigConstants.RESTART_STRATEGY_FAILURE_RATE_DELAY, timeoutString); Duration failuresInterval = Duration.apply(failuresIntervalString); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java index f51ea7c27300f..2b62c0091c752 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.executiongraph.restart; +import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.concurrent.impl.FlinkFuture; @@ -72,8 +73,7 @@ public static FixedDelayRestartStrategyFactory createFactory(Configuration confi int maxAttempts = configuration.getInteger(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 1); String timeoutString = configuration.getString( - ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, - ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT); + AkkaOptions.WATCH_HEARTBEAT_INTERVAL); String delayString = configuration.getString( ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, @@ -87,7 +87,7 @@ public static FixedDelayRestartStrategyFactory createFactory(Configuration confi } catch (NumberFormatException nfe) { if (delayString.equals(timeoutString)) { throw new Exception("Invalid config value for " + - ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE + ": " + timeoutString + + AkkaOptions.WATCH_HEARTBEAT_PAUSE.key() + ": " + timeoutString + ". Value must be a valid duration (such as '10 s' or '1 min')"); } else { throw new Exception("Invalid config value for " + diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java index 27ee9b61eec07..d1f547f6f9a9e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.executiongraph.restart; import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.slf4j.Logger; @@ -88,8 +89,7 @@ public static RestartStrategyFactory createRestartStrategyFactory(Configuration // support deprecated ConfigConstants values final int numberExecutionRetries = configuration.getInteger(ConfigConstants.EXECUTION_RETRIES_KEY, ConfigConstants.DEFAULT_EXECUTION_RETRIES); - String pauseString = configuration.getString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, - ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT); + String pauseString = configuration.getString(AkkaOptions.WATCH_HEARTBEAT_PAUSE); String delayString = configuration.getString(ConfigConstants.EXECUTION_RETRY_DELAY_KEY, pauseString); @@ -100,7 +100,7 @@ public static RestartStrategyFactory createRestartStrategyFactory(Configuration } catch (NumberFormatException nfe) { if (delayString.equals(pauseString)) { throw new Exception("Invalid config value for " + - ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE + ": " + pauseString + + AkkaOptions.WATCH_HEARTBEAT_PAUSE.key() + ": " + pauseString + ". Value must be a valid duration (such as '10 s' or '1 min')"); } else { throw new Exception("Invalid config value for " + diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java index 9b05273bcb5bd..003d803df14b1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java @@ -24,7 +24,7 @@ import akka.dispatch.Recover; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.state.StateDescriptor; -import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.configuration.QueryableStateOptions; @@ -114,13 +114,11 @@ public QueryableStateClient( LeaderRetrievalService leaderRetrievalService = highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID); // Get the ask timeout - String askTimeoutString = config.getString( - ConfigConstants.AKKA_ASK_TIMEOUT, - ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT); + String askTimeoutString = config.getString(AkkaOptions.ASK_TIMEOUT); Duration timeout = FiniteDuration.apply(askTimeoutString); if (!timeout.isFinite()) { - throw new IllegalConfigurationException(ConfigConstants.AKKA_ASK_TIMEOUT + throw new IllegalConfigurationException(AkkaOptions.ASK_TIMEOUT.key() + " is not a finite timeout ('" + askTimeoutString + "')"); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerConfiguration.java index 2c64f08d382de..0216789a3cb23 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerConfiguration.java @@ -53,24 +53,24 @@ public Time getHeartbeatInterval() { // -------------------------------------------------------------------------- public static ResourceManagerConfiguration fromConfiguration(Configuration configuration) throws ConfigurationException { - final String strTimeout = configuration.getString(AkkaOptions.AKKA_ASK_TIMEOUT); + final String strTimeout = configuration.getString(AkkaOptions.ASK_TIMEOUT); final Time timeout; try { timeout = Time.milliseconds(Duration.apply(strTimeout).toMillis()); } catch (NumberFormatException e) { throw new ConfigurationException("Could not parse the resource manager's timeout " + - "value " + AkkaOptions.AKKA_ASK_TIMEOUT + '.', e); + "value " + AkkaOptions.ASK_TIMEOUT + '.', e); } - final String strHeartbeatInterval = configuration.getString(AkkaOptions.AKKA_WATCH_HEARTBEAT_INTERVAL); + final String strHeartbeatInterval = configuration.getString(AkkaOptions.WATCH_HEARTBEAT_INTERVAL); final Time heartbeatInterval; try { heartbeatInterval = Time.milliseconds(Duration.apply(strHeartbeatInterval).toMillis()); } catch (NumberFormatException e) { throw new ConfigurationException("Could not parse the resource manager's heartbeat interval " + - "value " + AkkaOptions.AKKA_WATCH_HEARTBEAT_INTERVAL + '.', e); + "value " + AkkaOptions.WATCH_HEARTBEAT_INTERVAL + '.', e); } return new ResourceManagerConfiguration(timeout, heartbeatInterval); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java index a6511688369bd..75cad07c7acdb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java @@ -19,9 +19,7 @@ package org.apache.flink.runtime.resourcemanager.slotmanager; import org.apache.flink.api.common.time.Time; -import org.apache.flink.configuration.ConfigConstants; -import org.apache.flink.configuration.ConfigOption; -import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.util.ConfigurationException; import org.apache.flink.util.Preconditions; @@ -55,18 +53,14 @@ public Time getTaskManagerTimeout() { } public static SlotManagerConfiguration fromConfiguration(Configuration configuration) throws ConfigurationException { - ConfigOption timeoutOption = ConfigOptions - .key(ConfigConstants.AKKA_ASK_TIMEOUT) - .defaultValue(ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT); - - final String strTimeout = configuration.getString(timeoutOption); + final String strTimeout = configuration.getString(AkkaOptions.ASK_TIMEOUT); final Time timeout; try { timeout = Time.milliseconds(Duration.apply(strTimeout).toMillis()); } catch (NumberFormatException e) { throw new ConfigurationException("Could not parse the resource manager's timeout " + - "value " + timeoutOption + '.', e); + "value " + AkkaOptions.ASK_TIMEOUT + '.', e); } return new SlotManagerConfiguration(timeout, timeout, timeout); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java index 8789eedf0427b..810efff659e86 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java @@ -21,7 +21,7 @@ import akka.actor.ActorSystem; import com.typesafe.config.Config; import org.apache.flink.api.common.time.Time; -import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils; @@ -130,9 +130,7 @@ public static String getRpcUrl( checkNotNull(config, "config is null"); - final boolean sslEnabled = config.getBoolean( - ConfigConstants.AKKA_SSL_ENABLED, - ConfigConstants.DEFAULT_AKKA_SSL_ENABLED) && + final boolean sslEnabled = config.getBoolean(AkkaOptions.SSL_ENABLED) && SSLUtils.getSSLEnabled(config); return getRpcUrl( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java index a6e47481084a6..ea9f5767b0157 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.taskexecutor; import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.TaskManagerOptions; @@ -146,7 +147,7 @@ public static TaskManagerConfiguration fromConfiguration(Configuration configura timeout = Time.milliseconds(AkkaUtils.getTimeout(configuration).toMillis()); } catch (Exception e) { throw new IllegalArgumentException( - "Invalid format for '" + ConfigConstants.AKKA_ASK_TIMEOUT + + "Invalid format for '" + AkkaOptions.ASK_TIMEOUT.key() + "'.Use formats like '50 s' or '1 min' to specify the timeout."); } diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala index 62fa73d1ddb31..60a33ba5f9e6b 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala @@ -184,13 +184,11 @@ object AkkaUtils { * @return Flink's basic Akka config */ private def getBasicAkkaConfig(configuration: Configuration): Config = { - val akkaThroughput = configuration.getInteger(ConfigConstants.AKKA_DISPATCHER_THROUGHPUT, - ConfigConstants.DEFAULT_AKKA_DISPATCHER_THROUGHPUT) - val lifecycleEvents = configuration.getBoolean(ConfigConstants.AKKA_LOG_LIFECYCLE_EVENTS, - ConfigConstants.DEFAULT_AKKA_LOG_LIFECYCLE_EVENTS) + val akkaThroughput = configuration.getInteger(AkkaOptions.DISPATCHER_THROUGHPUT) + val lifecycleEvents = configuration.getBoolean(AkkaOptions.LOG_LIFECYCLE_EVENTS) val jvmExitOnFatalError = if ( - configuration.getBoolean(ConfigConstants.AKKA_JVM_EXIT_ON_FATAL_ERROR, true)){ + configuration.getBoolean(AkkaOptions.JVM_EXIT_ON_FATAL_ERROR)){ "on" } else { "off" @@ -269,48 +267,36 @@ object AkkaUtils { bindAddress: String, port: Int, externalHostname: String, externalPort: Int): Config = { - val akkaAskTimeout = Duration(configuration.getString( - ConfigConstants.AKKA_ASK_TIMEOUT, - ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT)) + val akkaAskTimeout = Duration(configuration.getString(AkkaOptions.ASK_TIMEOUT)) val startupTimeout = configuration.getString( - ConfigConstants.AKKA_STARTUP_TIMEOUT, + AkkaOptions.STARTUP_TIMEOUT, (akkaAskTimeout * 10).toString) val transportHeartbeatInterval = configuration.getString( - ConfigConstants.AKKA_TRANSPORT_HEARTBEAT_INTERVAL, - ConfigConstants.DEFAULT_AKKA_TRANSPORT_HEARTBEAT_INTERVAL) + AkkaOptions.TRANSPORT_HEARTBEAT_INTERVAL) val transportHeartbeatPause = configuration.getString( - ConfigConstants.AKKA_TRANSPORT_HEARTBEAT_PAUSE, - ConfigConstants.DEFAULT_AKKA_TRANSPORT_HEARTBEAT_PAUSE) + AkkaOptions.TRANSPORT_HEARTBEAT_PAUSE) - val transportThreshold = configuration.getDouble( - ConfigConstants.AKKA_TRANSPORT_THRESHOLD, - ConfigConstants.DEFAULT_AKKA_TRANSPORT_THRESHOLD) + val transportThreshold = configuration.getDouble(AkkaOptions.TRANSPORT_THRESHOLD) - val watchHeartbeatInterval = configuration.getString(AkkaOptions.AKKA_WATCH_HEARTBEAT_INTERVAL); + val watchHeartbeatInterval = configuration.getString( + AkkaOptions.WATCH_HEARTBEAT_INTERVAL) - val watchHeartbeatPause = configuration.getString(AkkaOptions.AKKA_WATCH_HEARTBEAT_PAUSE); + val watchHeartbeatPause = configuration.getString(AkkaOptions.WATCH_HEARTBEAT_PAUSE) - val watchThreshold = configuration.getDouble( - ConfigConstants.AKKA_WATCH_THRESHOLD, - ConfigConstants.DEFAULT_AKKA_WATCH_THRESHOLD) + val watchThreshold = configuration.getInteger(AkkaOptions.WATCH_THRESHOLD) - val akkaTCPTimeout = configuration.getString(AkkaOptions.AKKA_TCP_TIMEOUT); + val akkaTCPTimeout = configuration.getString(AkkaOptions.TCP_TIMEOUT) - val akkaFramesize = configuration.getString( - ConfigConstants.AKKA_FRAMESIZE, - ConfigConstants.DEFAULT_AKKA_FRAMESIZE) + val akkaFramesize = configuration.getString(AkkaOptions.FRAMESIZE) - val lifecycleEvents = configuration.getBoolean( - ConfigConstants.AKKA_LOG_LIFECYCLE_EVENTS, - ConfigConstants.DEFAULT_AKKA_LOG_LIFECYCLE_EVENTS) + val lifecycleEvents = configuration.getBoolean(AkkaOptions.LOG_LIFECYCLE_EVENTS) val logLifecycleEvents = if (lifecycleEvents) "on" else "off" - val akkaEnableSSLConfig = configuration.getBoolean(ConfigConstants.AKKA_SSL_ENABLED, - ConfigConstants.DEFAULT_AKKA_SSL_ENABLED) && + val akkaEnableSSLConfig = configuration.getBoolean(AkkaOptions.SSL_ENABLED) && SSLUtils.getSSLEnabled(configuration) val akkaEnableSSL = if (akkaEnableSSLConfig) "on" else "off" @@ -588,14 +574,13 @@ object AkkaUtils { } def getTimeout(config: Configuration): FiniteDuration = { - val duration = Duration(config.getString(ConfigConstants.AKKA_ASK_TIMEOUT, - ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT)) + val duration = Duration(config.getString(AkkaOptions.ASK_TIMEOUT)) new FiniteDuration(duration.toMillis, TimeUnit.MILLISECONDS) } def getDefaultTimeout: Time = { - val duration = Duration(ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT) + val duration = Duration(AkkaOptions.ASK_TIMEOUT.defaultValue()) Time.milliseconds(duration.toMillis) } @@ -607,30 +592,24 @@ object AkkaUtils { } def getLookupTimeout(config: Configuration): FiniteDuration = { - val duration = Duration(config.getString( - ConfigConstants.AKKA_LOOKUP_TIMEOUT, - ConfigConstants.DEFAULT_AKKA_LOOKUP_TIMEOUT)) + val duration = Duration(config.getString(AkkaOptions.LOOKUP_TIMEOUT)) new FiniteDuration(duration.toMillis, TimeUnit.MILLISECONDS) } def getDefaultLookupTimeout: FiniteDuration = { - val duration = Duration(ConfigConstants.DEFAULT_AKKA_LOOKUP_TIMEOUT) + val duration = Duration(AkkaOptions.LOOKUP_TIMEOUT.defaultValue()) new FiniteDuration(duration.toMillis, TimeUnit.MILLISECONDS) } def getClientTimeout(config: Configuration): FiniteDuration = { - val duration = Duration( - config.getString( - ConfigConstants.AKKA_CLIENT_TIMEOUT, - ConfigConstants.DEFAULT_AKKA_CLIENT_TIMEOUT - )) + val duration = Duration(config.getString(AkkaOptions.CLIENT_TIMEOUT)) new FiniteDuration(duration.toMillis, TimeUnit.MILLISECONDS) } def getDefaultClientTimeout: FiniteDuration = { - val duration = Duration(ConfigConstants.DEFAULT_AKKA_CLIENT_TIMEOUT) + val duration = Duration(AkkaOptions.CLIENT_TIMEOUT.defaultValue()) new FiniteDuration(duration.toMillis, TimeUnit.MILLISECONDS) } diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala index 2ace8db4c09d5..abc8946fbf428 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala @@ -27,7 +27,7 @@ import akka.pattern.ask import akka.actor.{ActorRef, ActorSystem} import com.typesafe.config.Config import org.apache.flink.api.common.{JobExecutionResult, JobID, JobSubmissionResult} -import org.apache.flink.configuration.{ConfigConstants, Configuration} +import org.apache.flink.configuration.{AkkaOptions, ConfigConstants, Configuration} import org.apache.flink.core.fs.Path import org.apache.flink.runtime.akka.AkkaUtils import org.apache.flink.runtime.client.{JobClient, JobExecutionException} @@ -265,9 +265,9 @@ abstract class FlinkMiniCluster( // https://docs.travis-ci.com/user/environment-variables#Default-Environment-Variables if (sys.env.contains("CI")) { // Only set if nothing specified in config - if (config.getString(ConfigConstants.AKKA_ASK_TIMEOUT, null) == null) { - val duration = Duration(ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT) * 10 - config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, s"${duration.toSeconds}s") + if (!config.contains(AkkaOptions.ASK_TIMEOUT)) { + val duration = Duration(AkkaOptions.ASK_TIMEOUT.defaultValue()) * 10 + config.setString(AkkaOptions.ASK_TIMEOUT, s"${duration.toSeconds}s") LOG.info(s"Akka ask timeout set to ${duration.toSeconds}s") } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java index 5aa31ff9aa793..f1bc43b9ad5fb 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java @@ -22,7 +22,7 @@ import akka.testkit.JavaTestKit; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; -import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.clusterframework.messages.NotifyResourceStarted; @@ -192,7 +192,7 @@ protected void run() { // set a short timeout for lookups Configuration shortTimeoutConfig = config.clone(); - shortTimeoutConfig.setString(ConfigConstants.AKKA_LOOKUP_TIMEOUT, "1 s"); + shortTimeoutConfig.setString(AkkaOptions.LOOKUP_TIMEOUT, "1 s"); fakeJobManager = TestingUtils.createForwardingActor( system, @@ -234,7 +234,7 @@ protected void run() { // set a long timeout for lookups such that the test fails in case of timeouts Configuration shortTimeoutConfig = config.clone(); - shortTimeoutConfig.setString(ConfigConstants.AKKA_LOOKUP_TIMEOUT, "99999 s"); + shortTimeoutConfig.setString(AkkaOptions.LOOKUP_TIMEOUT, "99999 s"); fakeJobManager = TestingUtils.createForwardingActor( system, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java index f19ca4e58a3a8..0346e483d4d38 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.io.network.partition; +import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.TaskManagerOptions; @@ -51,7 +52,7 @@ public static void setUp() throws Exception { final Configuration config = new Configuration(); config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUMBER_OF_TMS); config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUMBER_OF_SLOTS_PER_TM); - config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT()); + config.setString(AkkaOptions.ASK_TIMEOUT, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT()); config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, NUMBER_OF_NETWORK_BUFFERS); flink = new TestingCluster(config, true); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java index c8459e7fbab2c..1a4396ec1dda7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java @@ -26,6 +26,7 @@ import akka.testkit.TestProbe; import com.typesafe.config.Config; import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.JobManagerOptions; @@ -601,7 +602,7 @@ public void testKvStateMessages() throws Exception { Deadline deadline = new FiniteDuration(100, TimeUnit.SECONDS).fromNow(); Configuration config = new Configuration(); - config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "100ms"); + config.setString(AkkaOptions.ASK_TIMEOUT, "100ms"); ActorRef jobManagerActor = JobManager.startJobManagerActors( config, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java index 9dcfc70c26bae..7234feab3e540 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java @@ -27,6 +27,7 @@ import akka.testkit.JavaTestKit; import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.memory.MemoryType; @@ -77,9 +78,9 @@ public void testComponentsStartupShutdown() throws Exception { final int BUFFER_SIZE = 32 * 1024; Configuration config = new Configuration(); - config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "200 ms"); - config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "1 s"); - config.setInteger(ConfigConstants.AKKA_WATCH_THRESHOLD, 1); + config.setString(AkkaOptions.WATCH_HEARTBEAT_INTERVAL, "200 ms"); + config.setString(AkkaOptions.WATCH_HEARTBEAT_PAUSE, "1 s"); + config.setInteger(AkkaOptions.WATCH_THRESHOLD, 1); ActorSystem actorSystem = null; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java index 0844aad18a499..3953072e179d5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java @@ -23,6 +23,7 @@ import akka.actor.InvalidActorNameException; import akka.actor.Terminated; import akka.testkit.JavaTestKit; +import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; @@ -90,10 +91,10 @@ public class TaskManagerRegistrationTest extends TestLogger { @BeforeClass public static void startActorSystem() { config = new Configuration(); - config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "5 s"); - config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "200 ms"); - config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "2 s"); - config.setDouble(ConfigConstants.AKKA_WATCH_THRESHOLD, 2.0); + config.setString(AkkaOptions.ASK_TIMEOUT, "5 s"); + config.setString(AkkaOptions.WATCH_HEARTBEAT_INTERVAL, "200 ms"); + config.setString(AkkaOptions.WATCH_HEARTBEAT_PAUSE, "2 s"); + config.setInteger(AkkaOptions.WATCH_THRESHOLD, 2); actorSystem = AkkaUtils.createLocalActorSystem(config); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java index 42338cda4452f..48eb39222ce20 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.testutils; +import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.CoreOptions; @@ -86,10 +87,10 @@ public static Configuration configureZooKeeperHA( config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, fsStateHandlePath + "/recovery"); // Akka failure detection and execution retries - config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "1000 ms"); - config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "6 s"); - config.setInteger(ConfigConstants.AKKA_WATCH_THRESHOLD, 9); - config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "100 s"); + config.setString(AkkaOptions.WATCH_HEARTBEAT_INTERVAL, "1000 ms"); + config.setString(AkkaOptions.WATCH_HEARTBEAT_PAUSE, "6 s"); + config.setInteger(AkkaOptions.WATCH_THRESHOLD, 9); + config.setString(AkkaOptions.ASK_TIMEOUT, "100 s"); config.setString(HighAvailabilityOptions.HA_JOB_DELAY, "10 s"); return config; diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala index 9f8e3e193b532..daf0f4734701d 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala @@ -20,7 +20,7 @@ package org.apache.flink.runtime.akka import akka.actor.ActorSystem import akka.testkit.{ImplicitSender, TestKit} -import org.apache.flink.configuration.{ConfigConstants, Configuration} +import org.apache.flink.configuration.{AkkaOptions, ConfigConstants, Configuration} import org.apache.flink.runtime.testingUtils.{TestingCluster, TestingUtils, ScalaTestingUtils} import org.junit.runner.RunWith import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike} @@ -119,7 +119,7 @@ class AkkaSslITCase(_system: ActorSystem) val config = new Configuration() config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1) config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1) - config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "2 s") + config.setString(AkkaOptions.ASK_TIMEOUT, "2 s") config.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true) config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, "invalid.keystore") @@ -141,7 +141,7 @@ class AkkaSslITCase(_system: ActorSystem) val config = new Configuration() config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1) config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1) - config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "2 s") + config.setString(AkkaOptions.ASK_TIMEOUT, "2 s") config.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true) diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerConnectionTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerConnectionTest.scala index 97a001d07d554..6d7d87cbf2b68 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerConnectionTest.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerConnectionTest.scala @@ -23,7 +23,7 @@ import java.net.{InetAddress, InetSocketAddress} import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference} -import org.apache.flink.configuration.{ConfigConstants, Configuration} +import org.apache.flink.configuration.{AkkaOptions, ConfigConstants, Configuration} import org.apache.flink.runtime.akka.AkkaUtils import org.apache.flink.util.NetUtils import org.junit.Assert._ @@ -122,7 +122,7 @@ class JobManagerConnectionTest { private def createConfigWithLowTimeout() : Configuration = { val config = new Configuration() - config.setString(ConfigConstants.AKKA_LOOKUP_TIMEOUT, + config.setString(AkkaOptions.LOOKUP_TIMEOUT, Duration(timeout, TimeUnit.MILLISECONDS).toSeconds + " s") config } diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala index f3ab40905782d..4fc40423dadae 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala @@ -21,7 +21,7 @@ package org.apache.flink.runtime.jobmanager import akka.actor.{ActorSystem, PoisonPill} import akka.testkit.{ImplicitSender, TestKit} import org.apache.flink.api.common.ExecutionConfig -import org.apache.flink.configuration.{ConfigConstants, Configuration} +import org.apache.flink.configuration.{AkkaOptions, ConfigConstants, Configuration} import org.apache.flink.runtime.akka.ListeningBehaviour import org.apache.flink.runtime.io.network.partition.ResultPartitionType import org.apache.flink.runtime.jobgraph.{DistributionPattern, JobGraph, JobStatus, JobVertex} @@ -60,7 +60,7 @@ class RecoveryITCase(_system: ActorSystem) val config = new Configuration() config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots) config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskManagers) - config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, heartbeatTimeout) + config.setString(AkkaOptions.WATCH_HEARTBEAT_PAUSE, heartbeatTimeout) config.setString(ConfigConstants.RESTART_STRATEGY, "fixeddelay") config.setInteger(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 1) config.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, heartbeatTimeout) diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala index c8977f095ded7..858bbbb9d01c5 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala @@ -28,7 +28,7 @@ import com.google.common.util.concurrent.MoreExecutors import com.typesafe.config.ConfigFactory import grizzled.slf4j.Logger import org.apache.flink.api.common.time.Time -import org.apache.flink.configuration.{ConfigConstants, Configuration, HighAvailabilityOptions, TaskManagerOptions} +import org.apache.flink.configuration._ import org.apache.flink.runtime.akka.AkkaUtils import org.apache.flink.runtime.clusterframework.FlinkResourceManager import org.apache.flink.runtime.clusterframework.types.ResourceID @@ -89,7 +89,7 @@ object TestingUtils { val config = new Configuration() config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots) config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs) - config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, timeout) + config.setString(AkkaOptions.ASK_TIMEOUT, timeout) val cluster = new TestingCluster(config) diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java index 437dd5f1deb07..5f6f5c46205c0 100644 --- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java +++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java @@ -27,6 +27,7 @@ import org.apache.commons.io.IOUtils; import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.HighAvailabilityOptions; @@ -146,8 +147,8 @@ public static LocalFlinkMiniCluster startCluster( config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, TASK_MANAGER_MEMORY_SIZE); config.setBoolean(ConfigConstants.FILESYSTEM_DEFAULT_OVERWRITE_KEY, true); - config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, DEFAULT_AKKA_ASK_TIMEOUT + "s"); - config.setString(ConfigConstants.AKKA_STARTUP_TIMEOUT, DEFAULT_AKKA_STARTUP_TIMEOUT); + config.setString(AkkaOptions.ASK_TIMEOUT, DEFAULT_AKKA_ASK_TIMEOUT + "s"); + config.setString(AkkaOptions.STARTUP_TIMEOUT, DEFAULT_AKKA_STARTUP_TIMEOUT); config.setInteger(JobManagerOptions.WEB_PORT, 8081); config.setString(JobManagerOptions.WEB_LOG_PATH, logFile.toString()); @@ -287,7 +288,7 @@ public static void readAllResultLines( String resultPath, String[] excludePrefixes, boolean inOrderOfFiles) throws IOException { - + checkArgument(resultPath != null, "resultPath cannot be be null"); final BufferedReader[] readers = getResultReader(resultPath, excludePrefixes, inOrderOfFiles); @@ -328,8 +329,8 @@ public static void compareResultsByLinesInMemory( String msg = String.format( "Different elements in arrays: expected %d elements and received %d\n" + "files: %s\n expected: %s\n received: %s", - expected.length, result.length, - Arrays.toString(getAllInvolvedFiles(resultPath, excludePrefixes)), + expected.length, result.length, + Arrays.toString(getAllInvolvedFiles(resultPath, excludePrefixes)), Arrays.toString(expected), Arrays.toString(result)); fail(msg); } diff --git a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java index 49ff744a92cd3..92e5768338503 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java @@ -34,6 +34,7 @@ import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.LocalEnvironment; +import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.optimizer.DataStatistics; @@ -120,7 +121,7 @@ public void before() throws Exception { Configuration config = new Configuration(); config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1); config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1); - config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT()); + config.setString(AkkaOptions.ASK_TIMEOUT, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT()); TestingCluster testingCluster = new TestingCluster(config, false, true); testingCluster.start(); diff --git a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java index 06233d6bfdf16..27673121b6326 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java @@ -20,6 +20,7 @@ package org.apache.flink.test.cancelling; import org.apache.flink.api.common.Plan; +import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.TaskManagerOptions; @@ -88,7 +89,7 @@ public void startCluster() throws Exception { config.setBoolean(ConfigConstants.FILESYSTEM_DEFAULT_OVERWRITE_KEY, true); config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2); config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4); - config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT()); + config.setString(AkkaOptions.ASK_TIMEOUT, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT()); config.setInteger(TaskManagerOptions.MEMORY_SEGMENT_SIZE, 4096); config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 2048); diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java index a573be63deca2..bda16796eb0eb 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple4; +import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.TaskManagerOptions; @@ -74,8 +75,8 @@ public static void startTestCluster() { config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2); config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM / 2); config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 48L); - config.setString(ConfigConstants.DEFAULT_AKKA_LOOKUP_TIMEOUT, "60 s"); - config.setString(ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT, "60 s"); + config.setString(AkkaOptions.LOOKUP_TIMEOUT, "60 s"); + config.setString(AkkaOptions.ASK_TIMEOUT, "60 s"); cluster = new LocalFlinkMiniCluster(config, false); cluster.start(); diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingCustomInputSplitProgram.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingCustomInputSplitProgram.java index e7bd5225b6aef..4905d43c2a947 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingCustomInputSplitProgram.java +++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingCustomInputSplitProgram.java @@ -25,7 +25,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.ResultTypeQueryable; -import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.io.InputSplit; import org.apache.flink.core.io.InputSplitAssigner; @@ -44,7 +44,7 @@ public class StreamingCustomInputSplitProgram { public static void main(String[] args) throws Exception { Configuration config = new Configuration(); - config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "5 s"); + config.setString(AkkaOptions.ASK_TIMEOUT, "5 s"); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java index 7c6f73aec4620..85961db1d5298 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java @@ -24,6 +24,7 @@ import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.io.LocalCollectionOutputFormat; import org.apache.flink.client.program.ProgramInvocationException; +import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.io.GenericInputSplit; @@ -78,7 +79,7 @@ public static void tearDownCluster() throws Exception { @Test(expected=FlinkException.class) public void testInvalidAkkaConfiguration() throws Throwable { Configuration config = new Configuration(); - config.setString(ConfigConstants.AKKA_STARTUP_TIMEOUT, INVALID_STARTUP_TIMEOUT); + config.setString(AkkaOptions.STARTUP_TIMEOUT, INVALID_STARTUP_TIMEOUT); final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment( cluster.getHostname(), @@ -103,7 +104,7 @@ public void testInvalidAkkaConfiguration() throws Throwable { @Test public void testUserSpecificParallelism() throws Exception { Configuration config = new Configuration(); - config.setString(ConfigConstants.AKKA_STARTUP_TIMEOUT, VALID_STARTUP_TIMEOUT); + config.setString(AkkaOptions.STARTUP_TIMEOUT, VALID_STARTUP_TIMEOUT); final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment( cluster.getHostname(), diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java index c7c07ce1472ef..5c65a7ff528a2 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java @@ -24,6 +24,7 @@ import akka.util.Timeout; import org.apache.commons.io.FileUtils; +import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.JobManagerOptions; @@ -127,11 +128,11 @@ public void testTaskManagerProcessFailure() throws Exception { Tuple2 localAddress = new Tuple2("localhost", jobManagerPort); Configuration jmConfig = new Configuration(); - jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "1000 ms"); - jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "6 s"); - jmConfig.setInteger(ConfigConstants.AKKA_WATCH_THRESHOLD, 9); + jmConfig.setString(AkkaOptions.WATCH_HEARTBEAT_INTERVAL, "1000 ms"); + jmConfig.setString(AkkaOptions.WATCH_HEARTBEAT_PAUSE, "6 s"); + jmConfig.setInteger(AkkaOptions.WATCH_THRESHOLD, 9); jmConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "10 s"); - jmConfig.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "100 s"); + jmConfig.setString(AkkaOptions.ASK_TIMEOUT, "100 s"); jmConfig.setString(JobManagerOptions.ADDRESS, localAddress._1()); jmConfig.setInteger(JobManagerOptions.PORT, jobManagerPort); @@ -409,7 +410,7 @@ public static void main(String[] args) { cfg.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 4L); cfg.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 100); cfg.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2); - cfg.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "100 s"); + cfg.setString(AkkaOptions.ASK_TIMEOUT, "100 s"); TaskManager.selectNetworkInterfaceAndRunTaskManager(cfg, ResourceID.generate(), TaskManager.class); diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java index 6d53b9f6d575a..c8c8d2afd7785 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java @@ -22,6 +22,7 @@ import akka.actor.ActorSystem; import org.apache.commons.io.FileUtils; import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.HighAvailabilityOptions; @@ -156,9 +157,9 @@ public void testChaosMonkey() throws Exception { ZooKeeper.getConnectString(), FileStateBackendBasePath.toURI().toString()); // Akka and restart timeouts - config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "1000 ms"); - config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "6 s"); - config.setInteger(ConfigConstants.AKKA_WATCH_THRESHOLD, 9); + config.setString(AkkaOptions.WATCH_HEARTBEAT_INTERVAL, "1000 ms"); + config.setString(AkkaOptions.WATCH_HEARTBEAT_PAUSE, "6 s"); + config.setInteger(AkkaOptions.WATCH_THRESHOLD, 9); if (checkpointingIntervalMs >= killEvery.toMillis()) { throw new IllegalArgumentException("Relax! You want to kill processes every " + diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java index 9d2806c5483a0..59d5a519ffac7 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java @@ -29,7 +29,7 @@ import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.io.DiscardingOutputFormat; import org.apache.flink.client.program.ProgramInvocationException; -import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.runtime.akka.AkkaUtils; @@ -101,10 +101,10 @@ public void testCancelingOnProcessFailure() throws Exception { Tuple2 localAddress = new Tuple2("localhost", jobManagerPort); Configuration jmConfig = new Configuration(); - jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "5 s"); - jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "2000 s"); - jmConfig.setInteger(ConfigConstants.AKKA_WATCH_THRESHOLD, 10); - jmConfig.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "100 s"); + jmConfig.setString(AkkaOptions.WATCH_HEARTBEAT_INTERVAL, "5 s"); + jmConfig.setString(AkkaOptions.WATCH_HEARTBEAT_PAUSE, "2000 s"); + jmConfig.setInteger(AkkaOptions.WATCH_THRESHOLD, 10); + jmConfig.setString(AkkaOptions.ASK_TIMEOUT, "100 s"); jmConfig.setString(JobManagerOptions.ADDRESS, localAddress._1()); jmConfig.setInteger(JobManagerOptions.PORT, jobManagerPort); diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java index bafdd9f685d42..93d369a3b3487 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java @@ -27,6 +27,7 @@ import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.io.LocalCollectionOutputFormat; +import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.TaskManagerOptions; @@ -77,9 +78,9 @@ public void testRestartWithFailingTaskManager() { config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM); config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 16L); - config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "500 ms"); - config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "20 s"); - config.setInteger(ConfigConstants.AKKA_WATCH_THRESHOLD, 20); + config.setString(AkkaOptions.WATCH_HEARTBEAT_INTERVAL, "500 ms"); + config.setString(AkkaOptions.WATCH_HEARTBEAT_PAUSE, "20 s"); + config.setInteger(AkkaOptions.WATCH_THRESHOLD, 20); cluster = new LocalFlinkMiniCluster(config, false); diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java index 40a8f09d55fe3..37e89e926bdff 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java @@ -23,6 +23,7 @@ import akka.actor.PoisonPill; import org.apache.curator.test.TestingServer; import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; @@ -151,7 +152,7 @@ public void testJobExecutionOnClusterWithLeaderReelection() throws Exception { // we "effectively" disable the automatic RecoverAllJobs message and sent it manually to make // sure that all TMs have registered to the JM prior to issueing the RecoverAllJobs message - configuration.setString(ConfigConstants.AKKA_ASK_TIMEOUT, AkkaUtils.INF_TIMEOUT().toString()); + configuration.setString(AkkaOptions.ASK_TIMEOUT, AkkaUtils.INF_TIMEOUT().toString()); Tasks.BlockingOnceReceiver$.MODULE$.blocking_$eq(true); diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java index 1da52d496242f..398a5eb5d3a9c 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java @@ -18,6 +18,7 @@ package org.apache.flink.yarn; +import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.GlobalConfiguration; @@ -127,7 +128,7 @@ protected int run(String[] args) { } // tell akka to die in case of an error - configuration.setBoolean(ConfigConstants.AKKA_JVM_EXIT_ON_FATAL_ERROR, true); + configuration.setBoolean(AkkaOptions.JVM_EXIT_ON_FATAL_ERROR, true); String keytabPath = null; if(remoteKeytabPath != null) { diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java index 849a8a64d9450..047a1fae14db3 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java @@ -23,6 +23,7 @@ import java.util.Map; import java.util.concurrent.Callable; +import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.SecurityOptions; @@ -91,7 +92,7 @@ public static void runYarnTaskManager(String[] args, final Class