Skip to content

Commit

Permalink
[FLINK-6495] Migrate Akka configuration options
Browse files Browse the repository at this point in the history
This closes apache#3935.
  • Loading branch information
FangYongs authored and zentol committed May 22, 2017
1 parent 9a9e193 commit 302c674
Show file tree
Hide file tree
Showing 43 changed files with 324 additions and 157 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.CommonTestUtils;
Expand Down Expand Up @@ -92,8 +93,8 @@ public void testExceptionWhenRemoteJobManagerUnreachable() throws Exception {
private static void testFailureBehavior(final InetSocketAddress unreachableEndpoint) throws Exception {

final Configuration config = new Configuration();
config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, (ASK_STARTUP_TIMEOUT) + " ms");
config.setString(ConfigConstants.AKKA_LOOKUP_TIMEOUT, (CONNECT_TIMEOUT) + " ms");
config.setString(AkkaOptions.ASK_TIMEOUT, ASK_STARTUP_TIMEOUT + " ms");
config.setString(AkkaOptions.LOOKUP_TIMEOUT, CONNECT_TIMEOUT + " ms");
config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, unreachableEndpoint.getHostName());
config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, unreachableEndpoint.getPort());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.client.program.DetachedEnvironment.DetachedJobExecutionResult;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.optimizer.DataStatistics;
Expand Down Expand Up @@ -97,7 +98,7 @@ public void setUp() throws Exception {
config = new Configuration();
config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, freePort);
config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT);
config.setString(AkkaOptions.ASK_TIMEOUT, AkkaOptions.ASK_TIMEOUT.defaultValue());

try {
scala.Tuple2<String, Object> address = new scala.Tuple2<String, Object>("localhost", freePort);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.flink.client.program.JobWithJars;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.client.program.StandaloneClusterClient;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
Expand Down Expand Up @@ -269,7 +270,7 @@ public void killTopologyWithOpts(final String name, final KillOptions options) t
JobID getTopologyJobId(final String id) {
final Configuration configuration = GlobalConfiguration.loadConfiguration();
if (this.timeout != null) {
configuration.setString(ConfigConstants.AKKA_ASK_TIMEOUT, this.timeout);
configuration.setString(AkkaOptions.ASK_TIMEOUT, this.timeout);
}

try {
Expand Down Expand Up @@ -309,7 +310,7 @@ JobID getTopologyJobId(final String id) {
private FiniteDuration getTimeout() {
final Configuration configuration = GlobalConfiguration.loadConfiguration();
if (this.timeout != null) {
configuration.setString(ConfigConstants.AKKA_ASK_TIMEOUT, this.timeout);
configuration.setString(AkkaOptions.ASK_TIMEOUT, this.timeout);
}

return AkkaUtils.getClientTimeout(configuration);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,30 +29,114 @@
public class AkkaOptions {

/**
* Timeout for akka ask calls
* Timeout for akka ask calls.
*/
public static final ConfigOption<String> AKKA_ASK_TIMEOUT = ConfigOptions
public static final ConfigOption<String> ASK_TIMEOUT = ConfigOptions
.key("akka.ask.timeout")
.defaultValue("10 s");

/**
* The Akka death watch heartbeat interval.
*/
public static final ConfigOption<String> WATCH_HEARTBEAT_INTERVAL = ConfigOptions
.key("akka.watch.heartbeat.interval")
.defaultValue(ASK_TIMEOUT.defaultValue());

/**
* The maximum acceptable Akka death watch heartbeat pause.
*/
public static final ConfigOption<String> WATCH_HEARTBEAT_PAUSE = ConfigOptions
.key("akka.watch.heartbeat.pause")
.defaultValue(ASK_TIMEOUT.defaultValue());

/**
* The Akka tcp connection timeout.
*/
public static final ConfigOption<String> AKKA_TCP_TIMEOUT = ConfigOptions
public static final ConfigOption<String> TCP_TIMEOUT = ConfigOptions
.key("akka.tcp.timeout")
.defaultValue("20 s");

/**
* The Akka death watch heartbeat interval.
* Timeout for the startup of the actor system.
*/
public static final ConfigOption<String> AKKA_WATCH_HEARTBEAT_INTERVAL = ConfigOptions
.key("akka.watch.heartbeat.interval")
public static final ConfigOption<String> STARTUP_TIMEOUT = ConfigOptions
.key("akka.startup-timeout")
.noDefaultValue();

/**
* Heartbeat interval of the transport failure detector.
*/
public static final ConfigOption<String> TRANSPORT_HEARTBEAT_INTERVAL = ConfigOptions
.key("akka.transport.heartbeat.interval")
.defaultValue("1000 s");

/**
* Allowed heartbeat pause for the transport failure detector.
*/
public static final ConfigOption<String> TRANSPORT_HEARTBEAT_PAUSE = ConfigOptions
.key("akka.transport.heartbeat.pause")
.defaultValue("6000 s");

/**
* Detection threshold of transport failure detector.
*/
public static final ConfigOption<Double> TRANSPORT_THRESHOLD = ConfigOptions
.key("akka.transport.threshold")
.defaultValue(300.0);

/**
* Detection threshold for the phi accrual watch failure detector.
*/
public static final ConfigOption<Integer> WATCH_THRESHOLD = ConfigOptions
.key("akka.watch.threshold")
.defaultValue(12);

/**
* Override SSL support for the Akka transport.
*/
public static final ConfigOption<Boolean> SSL_ENABLED = ConfigOptions
.key("akka.ssl.enabled")
.defaultValue(true);

/**
* Maximum framesize of akka messages.
*/
public static final ConfigOption<String> FRAMESIZE = ConfigOptions
.key("akka.framesize")
.defaultValue("10485760b");

/**
* Maximum number of messages until another actor is executed by the same thread.
*/
public static final ConfigOption<Integer> DISPATCHER_THROUGHPUT = ConfigOptions
.key("akka.throughput")
.defaultValue(15);

/**
* Log lifecycle events.
*/
public static final ConfigOption<Boolean> LOG_LIFECYCLE_EVENTS = ConfigOptions
.key("akka.log.lifecycle.events")
.defaultValue(false);

/**
* Timeout for all blocking calls that look up remote actors.
*/
public static final ConfigOption<String> LOOKUP_TIMEOUT = ConfigOptions
.key("akka.lookup.timeout")
.defaultValue("10 s");

/**
* The maximum acceptable Akka death watch heartbeat pause.
* Timeout for all blocking calls on the client side.
*/
public static final ConfigOption<String> AKKA_WATCH_HEARTBEAT_PAUSE = ConfigOptions
.key("akka.watch.heartbeat.pause")
public static final ConfigOption<String> CLIENT_TIMEOUT = ConfigOptions
.key("akka.client.timeout")
.defaultValue("60 s");

/**
* Exit JVM on fatal Akka errors.
*/
public static final ConfigOption<Boolean> JVM_EXIT_ON_FATAL_ERROR = ConfigOptions
.key("akka.jvm-exit-on-fatal-error")
.defaultValue(true);
}
Original file line number Diff line number Diff line change
Expand Up @@ -708,82 +708,130 @@ public final class ConfigConstants {

/**
* Timeout for the startup of the actor system
*
* @deprecated Use {@link AkkaOptions#STARTUP_TIMEOUT} instead.
*/
@Deprecated
public static final String AKKA_STARTUP_TIMEOUT = "akka.startup-timeout";

/**
* Heartbeat interval of the transport failure detector
*
* @deprecated Use {@link AkkaOptions#TRANSPORT_HEARTBEAT_INTERVAL} instead.
*/
@Deprecated
public static final String AKKA_TRANSPORT_HEARTBEAT_INTERVAL = "akka.transport.heartbeat.interval";

/**
* Allowed heartbeat pause for the transport failure detector
*
* @deprecated Use {@link AkkaOptions#TRANSPORT_HEARTBEAT_PAUSE} instead.
*/
@Deprecated
public static final String AKKA_TRANSPORT_HEARTBEAT_PAUSE = "akka.transport.heartbeat.pause";

/**
* Detection threshold of transport failure detector
*
* @deprecated Use {@link AkkaOptions#TRANSPORT_THRESHOLD} instead.
*/
@Deprecated
public static final String AKKA_TRANSPORT_THRESHOLD = "akka.transport.threshold";

/**
* Heartbeat interval of watch failure detector
*
* @deprecated Use {@link AkkaOptions#WATCH_HEARTBEAT_INTERVAL} instead.
*/
@Deprecated
public static final String AKKA_WATCH_HEARTBEAT_INTERVAL = "akka.watch.heartbeat.interval";

/**
* Allowed heartbeat pause for the watch failure detector
*
* @deprecated Use {@link AkkaOptions#WATCH_HEARTBEAT_PAUSE} instead.
*/
@Deprecated
public static final String AKKA_WATCH_HEARTBEAT_PAUSE = "akka.watch.heartbeat.pause";

/**
* Detection threshold for the phi accrual watch failure detector
*
* @deprecated Use {@link AkkaOptions#WATCH_THRESHOLD} instead.
*/
@Deprecated
public static final String AKKA_WATCH_THRESHOLD = "akka.watch.threshold";

/**
* Akka TCP timeout
*
* @deprecated Use {@link AkkaOptions#TCP_TIMEOUT} instead.
*/
@Deprecated
public static final String AKKA_TCP_TIMEOUT = "akka.tcp.timeout";

/**
* Override SSL support for the Akka transport
*
* @deprecated Use {@link AkkaOptions#SSL_ENABLED} instead.
*/
@Deprecated
public static final String AKKA_SSL_ENABLED = "akka.ssl.enabled";

/**
* Maximum framesize of akka messages
*
* @deprecated Use {@link AkkaOptions#FRAMESIZE} instead.
*/
@Deprecated
public static final String AKKA_FRAMESIZE = "akka.framesize";

/**
* Maximum number of messages until another actor is executed by the same thread
*
* @deprecated Use {@link AkkaOptions#DISPATCHER_THROUGHPUT} instead.
*/
@Deprecated
public static final String AKKA_DISPATCHER_THROUGHPUT = "akka.throughput";

/**
* Log lifecycle events
*
* @deprecated Use {@link AkkaOptions#LOG_LIFECYCLE_EVENTS} instead.
*/
@Deprecated
public static final String AKKA_LOG_LIFECYCLE_EVENTS = "akka.log.lifecycle.events";

/**
* Timeout for all blocking calls on the cluster side
*
* @deprecated Use {@link AkkaOptions#ASK_TIMEOUT} instead.
*/
@Deprecated
public static final String AKKA_ASK_TIMEOUT = "akka.ask.timeout";

/**
* Timeout for all blocking calls that look up remote actors
*
* @deprecated Use {@link AkkaOptions#LOOKUP_TIMEOUT} instead.
*/
@Deprecated
public static final String AKKA_LOOKUP_TIMEOUT = "akka.lookup.timeout";

/**
* Timeout for all blocking calls on the client side
*
* @deprecated Use {@link AkkaOptions#CLIENT_TIMEOUT} instead.
*/
@Deprecated
public static final String AKKA_CLIENT_TIMEOUT = "akka.client.timeout";

/**
* Exit JVM on fatal Akka errors
*
* @deprecated Use {@link AkkaOptions#JVM_EXIT_ON_FATAL_ERROR} instead.
*/
@Deprecated
public static final String AKKA_JVM_EXIT_ON_FATAL_ERROR = "akka.jvm-exit-on-fatal-error";

// ----------------------------- Transport SSL Settings--------------------
Expand Down Expand Up @@ -1425,26 +1473,70 @@ public final class ConfigConstants {

// ------------------------------ Akka Values ------------------------------

/**
* @deprecated Use {@link AkkaOptions#TRANSPORT_HEARTBEAT_INTERVAL} instead.
*/
@Deprecated
public static String DEFAULT_AKKA_TRANSPORT_HEARTBEAT_INTERVAL = "1000 s";

/**
* @deprecated Use {@link AkkaOptions#TRANSPORT_HEARTBEAT_PAUSE} instead.
*/
@Deprecated
public static String DEFAULT_AKKA_TRANSPORT_HEARTBEAT_PAUSE = "6000 s";

/**
* @deprecated Use {@link AkkaOptions#TRANSPORT_THRESHOLD} instead.
*/
@Deprecated
public static double DEFAULT_AKKA_TRANSPORT_THRESHOLD = 300.0;

/**
* @deprecated Use {@link AkkaOptions#WATCH_THRESHOLD} instead.
*/
@Deprecated
public static double DEFAULT_AKKA_WATCH_THRESHOLD = 12;

/**
* @deprecated Use {@link AkkaOptions#DISPATCHER_THROUGHPUT} instead.
*/
@Deprecated
public static int DEFAULT_AKKA_DISPATCHER_THROUGHPUT = 15;

/**
* @deprecated Use {@link AkkaOptions#LOG_LIFECYCLE_EVENTS} instead.
*/
@Deprecated
public static boolean DEFAULT_AKKA_LOG_LIFECYCLE_EVENTS = false;

/**
* @deprecated Use {@link AkkaOptions#FRAMESIZE} instead.
*/
@Deprecated
public static String DEFAULT_AKKA_FRAMESIZE = "10485760b";

/**
* @deprecated Use {@link AkkaOptions#ASK_TIMEOUT} instead.
*/
@Deprecated
public static String DEFAULT_AKKA_ASK_TIMEOUT = "10 s";

/**
* @deprecated Use {@link AkkaOptions#LOOKUP_TIMEOUT} instead.
*/
@Deprecated
public static String DEFAULT_AKKA_LOOKUP_TIMEOUT = "10 s";

/**
* @deprecated Use {@link AkkaOptions#CLIENT_TIMEOUT} instead.
*/
@Deprecated
public static String DEFAULT_AKKA_CLIENT_TIMEOUT = "60 s";

/**
* @deprecated Use {@link AkkaOptions#SSL_ENABLED} instead.
*/
@Deprecated
public static boolean DEFAULT_AKKA_SSL_ENABLED = true;

// ----------------------------- SSL Values --------------------------------
Expand Down
Loading

0 comments on commit 302c674

Please sign in to comment.