Skip to content

Commit

Permalink
[FLINK-32684][rpc] Introduces RpcOptions and deprecates AkkaOptions (a…
Browse files Browse the repository at this point in the history
…pache#24188)

* Renames AkkaOptions into RpcOptions using the IDE and updates the class' JavaDoc
* Copies RpcOptions to AkkaOptions and deprecates AkkaOptions. Additionally, makes each option reference the corresponding RpcOption member.
* Moves deprecated RPC options out of RpcOptions and updates references
  • Loading branch information
XComp authored Jan 30, 2024
1 parent 973190e commit c678244
Show file tree
Hide file tree
Showing 55 changed files with 511 additions and 386 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,13 @@
import org.apache.flink.client.cli.ExecutionConfigAccessor;
import org.apache.flink.client.deployment.ClusterClientJobClientAdapter;
import org.apache.flink.client.testjar.ForbidConfigurationJob;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.ConfigUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.configuration.RpcOptions;
import org.apache.flink.core.execution.DetachedJobExecutionResult;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.PipelineExecutor;
Expand Down Expand Up @@ -101,8 +101,7 @@ void setUp() {
config = new Configuration();
config.set(JobManagerOptions.ADDRESS, "localhost");

config.set(
AkkaOptions.ASK_TIMEOUT_DURATION, AkkaOptions.ASK_TIMEOUT_DURATION.defaultValue());
config.set(RpcOptions.ASK_TIMEOUT_DURATION, RpcOptions.ASK_TIMEOUT_DURATION.defaultValue());
}

private Configuration fromPackagedProgram(
Expand Down
265 changes: 40 additions & 225 deletions flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -764,14 +764,14 @@ public final class ConfigConstants {
/**
* Timeout for the startup of the actor system.
*
* @deprecated Use {@link AkkaOptions#STARTUP_TIMEOUT} instead.
* @deprecated Use {@link RpcOptions#STARTUP_TIMEOUT} instead.
*/
@Deprecated public static final String AKKA_STARTUP_TIMEOUT = "akka.startup-timeout";

/**
* Heartbeat interval of the transport failure detector.
*
* @deprecated Use {@link AkkaOptions#TRANSPORT_HEARTBEAT_INTERVAL} instead.
* @deprecated Use {@link RpcOptions#TRANSPORT_HEARTBEAT_INTERVAL} instead.
*/
@Deprecated
public static final String AKKA_TRANSPORT_HEARTBEAT_INTERVAL =
Expand All @@ -780,15 +780,15 @@ public final class ConfigConstants {
/**
* Allowed heartbeat pause for the transport failure detector.
*
* @deprecated Use {@link AkkaOptions#TRANSPORT_HEARTBEAT_PAUSE} instead.
* @deprecated Use {@link RpcOptions#TRANSPORT_HEARTBEAT_PAUSE} instead.
*/
@Deprecated
public static final String AKKA_TRANSPORT_HEARTBEAT_PAUSE = "akka.transport.heartbeat.pause";

/**
* Detection threshold of transport failure detector.
*
* @deprecated Use {@link AkkaOptions#TRANSPORT_THRESHOLD} instead.
* @deprecated Use {@link RpcOptions#TRANSPORT_THRESHOLD} instead.
*/
@Deprecated public static final String AKKA_TRANSPORT_THRESHOLD = "akka.transport.threshold";

Expand Down Expand Up @@ -818,49 +818,49 @@ public final class ConfigConstants {
/**
* Akka TCP timeout.
*
* @deprecated Use {@link AkkaOptions#TCP_TIMEOUT} instead.
* @deprecated Use {@link RpcOptions#TCP_TIMEOUT} instead.
*/
@Deprecated public static final String AKKA_TCP_TIMEOUT = "akka.tcp.timeout";

/**
* Override SSL support for the Akka transport.
*
* @deprecated Use {@link AkkaOptions#SSL_ENABLED} instead.
* @deprecated Use {@link RpcOptions#SSL_ENABLED} instead.
*/
@Deprecated public static final String AKKA_SSL_ENABLED = "akka.ssl.enabled";

/**
* Maximum framesize of akka messages.
*
* @deprecated Use {@link AkkaOptions#FRAMESIZE} instead.
* @deprecated Use {@link RpcOptions#FRAMESIZE} instead.
*/
@Deprecated public static final String AKKA_FRAMESIZE = "akka.framesize";

/**
* Maximum number of messages until another actor is executed by the same thread.
*
* @deprecated Use {@link AkkaOptions#DISPATCHER_THROUGHPUT} instead.
* @deprecated Use {@link RpcOptions#DISPATCHER_THROUGHPUT} instead.
*/
@Deprecated public static final String AKKA_DISPATCHER_THROUGHPUT = "akka.throughput";

/**
* Log lifecycle events.
*
* @deprecated Use {@link AkkaOptions#LOG_LIFECYCLE_EVENTS} instead.
* @deprecated Use {@link RpcOptions#LOG_LIFECYCLE_EVENTS} instead.
*/
@Deprecated public static final String AKKA_LOG_LIFECYCLE_EVENTS = "akka.log.lifecycle.events";

/**
* Timeout for all blocking calls on the cluster side.
*
* @deprecated Use {@link AkkaOptions#ASK_TIMEOUT} instead.
* @deprecated Use {@link RpcOptions#ASK_TIMEOUT_DURATION} instead.
*/
@Deprecated public static final String AKKA_ASK_TIMEOUT = "akka.ask.timeout";

/**
* Timeout for all blocking calls that look up remote actors.
*
* @deprecated Use {@link AkkaOptions#LOOKUP_TIMEOUT} instead.
* @deprecated Use {@link RpcOptions#LOOKUP_TIMEOUT_DURATION} instead.
*/
@Deprecated public static final String AKKA_LOOKUP_TIMEOUT = "akka.lookup.timeout";

Expand All @@ -874,7 +874,7 @@ public final class ConfigConstants {
/**
* Exit JVM on fatal Akka errors.
*
* @deprecated Use {@link AkkaOptions#JVM_EXIT_ON_FATAL_ERROR} instead.
* @deprecated Use {@link RpcOptions#JVM_EXIT_ON_FATAL_ERROR} instead.
*/
@Deprecated
public static final String AKKA_JVM_EXIT_ON_FATAL_ERROR = "akka.jvm-exit-on-fatal-error";
Expand Down Expand Up @@ -1547,37 +1547,37 @@ public final class ConfigConstants {

// ------------------------------ Akka Values ------------------------------

/** @deprecated Use {@link AkkaOptions#TRANSPORT_HEARTBEAT_INTERVAL} instead. */
/** @deprecated Use {@link RpcOptions#TRANSPORT_HEARTBEAT_INTERVAL} instead. */
@Deprecated public static final String DEFAULT_AKKA_TRANSPORT_HEARTBEAT_INTERVAL = "1000 s";

/** @deprecated Use {@link AkkaOptions#TRANSPORT_HEARTBEAT_PAUSE} instead. */
/** @deprecated Use {@link RpcOptions#TRANSPORT_HEARTBEAT_PAUSE} instead. */
@Deprecated public static final String DEFAULT_AKKA_TRANSPORT_HEARTBEAT_PAUSE = "6000 s";

/** @deprecated Use {@link AkkaOptions#TRANSPORT_THRESHOLD} instead. */
/** @deprecated Use {@link RpcOptions#TRANSPORT_THRESHOLD} instead. */
@Deprecated public static final double DEFAULT_AKKA_TRANSPORT_THRESHOLD = 300.0;

/** @deprecated This default value is no longer used and has no effect on Flink. */
@Deprecated public static final double DEFAULT_AKKA_WATCH_THRESHOLD = 12;

/** @deprecated Use {@link AkkaOptions#DISPATCHER_THROUGHPUT} instead. */
/** @deprecated Use {@link RpcOptions#DISPATCHER_THROUGHPUT} instead. */
@Deprecated public static final int DEFAULT_AKKA_DISPATCHER_THROUGHPUT = 15;

/** @deprecated Use {@link AkkaOptions#LOG_LIFECYCLE_EVENTS} instead. */
/** @deprecated Use {@link RpcOptions#LOG_LIFECYCLE_EVENTS} instead. */
@Deprecated public static final boolean DEFAULT_AKKA_LOG_LIFECYCLE_EVENTS = false;

/** @deprecated Use {@link AkkaOptions#FRAMESIZE} instead. */
/** @deprecated Use {@link RpcOptions#FRAMESIZE} instead. */
@Deprecated public static final String DEFAULT_AKKA_FRAMESIZE = "10485760b";

/** @deprecated Use {@link AkkaOptions#ASK_TIMEOUT} instead. */
/** @deprecated Use {@link RpcOptions#ASK_TIMEOUT_DURATION} instead. */
@Deprecated public static final String DEFAULT_AKKA_ASK_TIMEOUT = "10 s";

/** @deprecated Use {@link AkkaOptions#LOOKUP_TIMEOUT} instead. */
/** @deprecated Use {@link RpcOptions#LOOKUP_TIMEOUT_DURATION} instead. */
@Deprecated public static final String DEFAULT_AKKA_LOOKUP_TIMEOUT = "10 s";

/** @deprecated Use {@code ClientOptions#CLIENT_TIMEOUT} instead. */
@Deprecated public static final String DEFAULT_AKKA_CLIENT_TIMEOUT = "60 s";

/** @deprecated Use {@link AkkaOptions#SSL_ENABLED} instead. */
/** @deprecated Use {@link RpcOptions#SSL_ENABLED} instead. */
@Deprecated public static final boolean DEFAULT_AKKA_SSL_ENABLED = true;

// ----------------------------- SSL Values --------------------------------
Expand Down
Loading

0 comments on commit c678244

Please sign in to comment.