Skip to content

Commit

Permalink
[FLINK-34081][configuration] Refactor all callers of deprecated `getX…
Browse files Browse the repository at this point in the history
…xx(ConfigOption<Xxx>)`, `getXxx(ConfigOption<Xxx>, Xxx)` and `setXxx(ConfigOption<Integer>, Xxx)` methods of Configuration
  • Loading branch information
1996fanrui committed Jan 22, 2024
1 parent b79d3f0 commit c8f27c2
Show file tree
Hide file tree
Showing 457 changed files with 1,578 additions and 1,710 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public static void executeProgram(

LOG.info(
"Starting program (detached: {})",
!configuration.getBoolean(DeploymentOptions.ATTACHED));
!configuration.get(DeploymentOptions.ATTACHED));

ContextEnvironment.setAsContext(
executorServiceLoader,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,11 @@ public void addGeneralOptions(Options baseOptions) {
@Override
public Configuration toConfiguration(CommandLine commandLine) throws FlinkException {
final Configuration resultingConfiguration = new Configuration();
resultingConfiguration.setString(DeploymentOptions.TARGET, RemoteExecutor.NAME);
resultingConfiguration.set(DeploymentOptions.TARGET, RemoteExecutor.NAME);

if (commandLine.hasOption(zookeeperNamespaceOption.getOpt())) {
String zkNamespace = commandLine.getOptionValue(zookeeperNamespaceOption.getOpt());
resultingConfiguration.setString(HighAvailabilityOptions.HA_CLUSTER_ID, zkNamespace);
resultingConfiguration.set(HighAvailabilityOptions.HA_CLUSTER_ID, zkNamespace);
}

return resultingConfiguration;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1413,10 +1413,10 @@ public static String getConfigurationDirectoryFromEnv() {
* @param config The configuration to write to
*/
static void setJobManagerAddressInConfig(Configuration config, InetSocketAddress address) {
config.setString(JobManagerOptions.ADDRESS, address.getHostString());
config.setInteger(JobManagerOptions.PORT, address.getPort());
config.setString(RestOptions.ADDRESS, address.getHostString());
config.setInteger(RestOptions.PORT, address.getPort());
config.set(JobManagerOptions.ADDRESS, address.getHostString());
config.set(JobManagerOptions.PORT, address.getPort());
config.set(RestOptions.ADDRESS, address.getHostString());
config.set(RestOptions.PORT, address.getPort());
}

public static List<CustomCommandLine> loadCustomCommandLines(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,10 @@ public Configuration toConfiguration(CommandLine commandLine) throws FlinkExcept
setJobManagerAddressInConfig(resultingConfiguration, jobManagerAddress);

URL url = NetUtils.getCorrectHostnamePort(addressWithPort);
resultingConfiguration.setString(RestOptions.PATH, url.getPath());
resultingConfiguration.setBoolean(
SecurityOptions.SSL_REST_ENABLED, isHttpsProtocol(url));
resultingConfiguration.set(RestOptions.PATH, url.getPath());
resultingConfiguration.set(SecurityOptions.SSL_REST_ENABLED, isHttpsProtocol(url));
}
resultingConfiguration.setString(DeploymentOptions.TARGET, RemoteExecutor.NAME);
resultingConfiguration.set(DeploymentOptions.TARGET, RemoteExecutor.NAME);

DynamicPropertiesUtil.encodeDynamicProperties(commandLine, resultingConfiguration);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,18 +83,18 @@ public List<URL> getClasspaths() throws MalformedURLException {
}

public int getParallelism() {
return configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM);
return configuration.get(CoreOptions.DEFAULT_PARALLELISM);
}

public boolean getDetachedMode() {
return !configuration.getBoolean(DeploymentOptions.ATTACHED);
return !configuration.get(DeploymentOptions.ATTACHED);
}

public SavepointRestoreSettings getSavepointRestoreSettings() {
return SavepointRestoreSettings.fromConfiguration(configuration);
}

public boolean isShutdownOnAttachedExit() {
return configuration.getBoolean(DeploymentOptions.SHUTDOWN_IF_ATTACHED);
return configuration.get(DeploymentOptions.SHUTDOWN_IF_ATTACHED);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -114,12 +114,12 @@ public Configuration toConfiguration(final CommandLine commandLine) {

final String executorName = commandLine.getOptionValue(executorOption.getOpt());
if (executorName != null) {
resultConfiguration.setString(DeploymentOptions.TARGET, executorName);
resultConfiguration.set(DeploymentOptions.TARGET, executorName);
}

final String targetName = commandLine.getOptionValue(targetOption.getOpt());
if (targetName != null) {
resultConfiguration.setString(DeploymentOptions.TARGET, targetName);
resultConfiguration.set(DeploymentOptions.TARGET, targetName);
}

DynamicPropertiesUtil.encodeDynamicProperties(commandLine, resultConfiguration);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,12 +174,11 @@ public SavepointRestoreSettings getSavepointRestoreSettings() {

public void applyToConfiguration(Configuration configuration) {
if (hasParallelismOpt) {
configuration.setInteger(CoreOptions.DEFAULT_PARALLELISM, getParallelism());
configuration.set(CoreOptions.DEFAULT_PARALLELISM, getParallelism());
}

configuration.setBoolean(DeploymentOptions.ATTACHED, !getDetachedMode());
configuration.setBoolean(
DeploymentOptions.SHUTDOWN_IF_ATTACHED, isShutdownOnAttachedExit());
configuration.set(DeploymentOptions.ATTACHED, !getDetachedMode());
configuration.set(DeploymentOptions.SHUTDOWN_IF_ATTACHED, isShutdownOnAttachedExit());
ConfigUtils.encodeCollectionToConfig(
configuration, PipelineOptions.CLASSPATHS, getClasspaths(), URL::toString);
SavepointRestoreSettings.toConfiguration(getSavepointRestoreSettings(), configuration);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public ClusterSpecification getClusterSpecification(Configuration configuration)
.getTotalProcessMemorySize()
.getMebiBytes();

int slotsPerTaskManager = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS);
int slotsPerTaskManager = configuration.get(TaskManagerOptions.NUM_TASK_SLOTS);

return new ClusterSpecification.ClusterSpecificationBuilder()
.setMasterMemoryMB(jobManagerMemoryMB)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ public class StandaloneClientFactory implements ClusterClientFactory<StandaloneC
@Override
public boolean isCompatibleWith(Configuration configuration) {
checkNotNull(configuration);
return RemoteExecutor.NAME.equalsIgnoreCase(
configuration.getString(DeploymentOptions.TARGET));
return RemoteExecutor.NAME.equalsIgnoreCase(configuration.get(DeploymentOptions.TARGET));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ public StandaloneClusterDescriptor(Configuration config) {

@Override
public String getClusterDescription() {
String host = config.getString(JobManagerOptions.ADDRESS, "");
int port = config.getInteger(JobManagerOptions.PORT, -1);
String host = config.get(JobManagerOptions.ADDRESS, "");
int port = config.get(JobManagerOptions.PORT, -1);
return "Standalone cluster at " + host + ":" + port;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ && isCanceledOrFailed(maybeApplicationStatus.get())) {
private CompletableFuture<Acknowledge> finish(
DispatcherGateway dispatcherGateway, ApplicationStatus applicationStatus) {
boolean shouldShutDownOnFinish =
configuration.getBoolean(DeploymentOptions.SHUTDOWN_ON_APPLICATION_FINISH);
configuration.get(DeploymentOptions.SHUTDOWN_ON_APPLICATION_FINISH);
return shouldShutDownOnFinish
? dispatcherGateway.shutDownCluster(applicationStatus)
: CompletableFuture.completedFuture(Acknowledge.get());
Expand All @@ -209,7 +209,7 @@ private CompletableFuture<Void> fixJobIdAndRunApplicationAsync(
final Optional<String> configuredJobId =
configuration.getOptional(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID);
final boolean submitFailedJobOnApplicationError =
configuration.getBoolean(DeploymentOptions.SUBMIT_FAILED_JOB_ON_APPLICATION_ERROR);
configuration.get(DeploymentOptions.SUBMIT_FAILED_JOB_ON_APPLICATION_ERROR);
if (!HighAvailabilityMode.isHighAvailabilityModeActivated(configuration)
&& !configuredJobId.isPresent()) {
return runApplicationAsync(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public CompletableFuture<JobClient> execute(
effectiveConfig.addAll(configuration);

// we only support attached execution with the local executor.
checkState(configuration.getBoolean(DeploymentOptions.ATTACHED));
checkState(configuration.get(DeploymentOptions.ATTACHED));

final JobGraph jobGraph = getJobGraph(pipeline, effectiveConfig, userCodeClassloader);

Expand All @@ -93,7 +93,7 @@ private JobGraph getJobGraph(
if (pipeline instanceof Plan) {
Plan plan = (Plan) pipeline;
final int slotsPerTaskManager =
configuration.getInteger(
configuration.get(
TaskManagerOptions.NUM_TASK_SLOTS, plan.getMaximumParallelism());
final int numTaskManagers =
configuration.get(TaskManagerOptions.MINI_CLUSTER_NUM_TASK_MANAGERS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,10 @@ public static JobGraph getJobGraph(
.getOptional(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID)
.ifPresent(strJobID -> jobGraph.setJobID(JobID.fromHexString(strJobID)));

if (configuration.getBoolean(DeploymentOptions.ATTACHED)
&& configuration.getBoolean(DeploymentOptions.SHUTDOWN_IF_ATTACHED)) {
if (configuration.get(DeploymentOptions.ATTACHED)
&& configuration.get(DeploymentOptions.SHUTDOWN_IF_ATTACHED)) {
jobGraph.setInitialClientHeartbeatTimeout(
configuration.getLong(ClientOptions.CLIENT_HEARTBEAT_TIMEOUT));
configuration.get(ClientOptions.CLIENT_HEARTBEAT_TIMEOUT));
}

jobGraph.addJars(executionConfigAccessor.getJars());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,12 @@ private JobExecutionResult getJobExecutionResult(final JobClient jobClient) thro
checkNotNull(jobClient);

JobExecutionResult jobExecutionResult;
if (getConfiguration().getBoolean(DeploymentOptions.ATTACHED)) {
if (getConfiguration().get(DeploymentOptions.ATTACHED)) {
CompletableFuture<JobExecutionResult> jobExecutionResultFuture =
jobClient.getJobExecutionResult();

ScheduledExecutorService clientHeartbeatService = null;
if (getConfiguration().getBoolean(DeploymentOptions.SHUTDOWN_IF_ATTACHED)) {
if (getConfiguration().get(DeploymentOptions.SHUTDOWN_IF_ATTACHED)) {
Thread shutdownHook =
ShutdownHookUtil.addShutdownHook(
() -> {
Expand All @@ -119,8 +119,8 @@ private JobExecutionResult getJobExecutionResult(final JobClient jobClient) thro
clientHeartbeatService =
ClientUtils.reportHeartbeatPeriodically(
jobClient,
getConfiguration().getLong(ClientOptions.CLIENT_HEARTBEAT_INTERVAL),
getConfiguration().getLong(ClientOptions.CLIENT_HEARTBEAT_TIMEOUT));
getConfiguration().get(ClientOptions.CLIENT_HEARTBEAT_INTERVAL),
getConfiguration().get(ClientOptions.CLIENT_HEARTBEAT_TIMEOUT));
}

jobExecutionResult = jobExecutionResultFuture.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ private MiniClusterConfiguration getMiniClusterConfig(int maximumParallelism) {
Configuration configuration = new Configuration(this.configuration);

if (!configuration.contains(RestOptions.BIND_PORT)) {
configuration.setString(RestOptions.BIND_PORT, "0");
configuration.set(RestOptions.BIND_PORT, "0");
}

int numTaskManagers = configuration.get(TaskManagerOptions.MINI_CLUSTER_NUM_TASK_MANAGERS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,12 +138,12 @@ private JobExecutionResult getJobExecutionResult(final JobClient jobClient) thro
checkNotNull(jobClient);

JobExecutionResult jobExecutionResult;
if (configuration.getBoolean(DeploymentOptions.ATTACHED)) {
if (configuration.get(DeploymentOptions.ATTACHED)) {
CompletableFuture<JobExecutionResult> jobExecutionResultFuture =
jobClient.getJobExecutionResult();

ScheduledExecutorService clientHeartbeatService = null;
if (configuration.getBoolean(DeploymentOptions.SHUTDOWN_IF_ATTACHED)) {
if (configuration.get(DeploymentOptions.SHUTDOWN_IF_ATTACHED)) {
Thread shutdownHook =
ShutdownHookUtil.addShutdownHook(
() -> {
Expand All @@ -163,8 +163,8 @@ private JobExecutionResult getJobExecutionResult(final JobClient jobClient) thro
clientHeartbeatService =
ClientUtils.reportHeartbeatPeriodically(
jobClient,
configuration.getLong(ClientOptions.CLIENT_HEARTBEAT_INTERVAL),
configuration.getLong(ClientOptions.CLIENT_HEARTBEAT_TIMEOUT));
configuration.get(ClientOptions.CLIENT_HEARTBEAT_INTERVAL),
configuration.get(ClientOptions.CLIENT_HEARTBEAT_TIMEOUT));
}

jobExecutionResult = jobExecutionResultFuture.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,9 +253,9 @@ private RestClusterClient(
jobmanagerUrl =
new URL(
SecurityOptions.isRestSSLEnabled(configuration) ? "https" : "http",
configuration.getString(JobManagerOptions.ADDRESS),
configuration.getInteger(JobManagerOptions.PORT),
configuration.getString(RestOptions.PATH));
configuration.get(JobManagerOptions.ADDRESS),
configuration.get(JobManagerOptions.PORT),
configuration.get(RestOptions.PATH));

if (restClient != null) {
this.restClient = restClient;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,9 @@ public static RestClusterClientConfiguration fromConfiguration(Configuration con
RestClientConfiguration restClientConfiguration =
RestClientConfiguration.fromConfiguration(config);

final long awaitLeaderTimeout = config.getLong(RestOptions.AWAIT_LEADER_TIMEOUT);
final int retryMaxAttempts = config.getInteger(RestOptions.RETRY_MAX_ATTEMPTS);
final long retryDelay = config.getLong(RestOptions.RETRY_DELAY);
final long awaitLeaderTimeout = config.get(RestOptions.AWAIT_LEADER_TIMEOUT);
final int retryMaxAttempts = config.get(RestOptions.RETRY_MAX_ATTEMPTS);
final long retryDelay = config.get(RestOptions.RETRY_DELAY);

return new RestClusterClientConfiguration(
restClientConfiguration, awaitLeaderTimeout, retryMaxAttempts, retryDelay);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,8 @@ private Configuration createConfiguration(boolean shutdownOnAttachedExit) {
Dispatcher.CLIENT_ALIVENESS_CHECK_DURATION,
Duration.ofMillis(clientHeartbeatInterval));
if (shutdownOnAttachedExit) {
configuration.setBoolean(DeploymentOptions.ATTACHED, true);
configuration.setBoolean(DeploymentOptions.SHUTDOWN_IF_ATTACHED, true);
configuration.set(DeploymentOptions.ATTACHED, true);
configuration.set(DeploymentOptions.SHUTDOWN_IF_ATTACHED, true);
}
return configuration;
}
Expand All @@ -115,8 +115,8 @@ private JobClient submitJob(Configuration configuration) throws Exception {
JobGraph cancellableJobGraph = getCancellableJobGraph();
// Enable heartbeat only when both execution.attached and
// execution.shutdown-on-attached-exit are true.
if (configuration.getBoolean(DeploymentOptions.ATTACHED)
&& configuration.getBoolean(DeploymentOptions.SHUTDOWN_IF_ATTACHED)) {
if (configuration.get(DeploymentOptions.ATTACHED)
&& configuration.get(DeploymentOptions.SHUTDOWN_IF_ATTACHED)) {
cancellableJobGraph.setInitialClientHeartbeatTimeout(clientHeartbeatTimeout);
}
return perJobMiniClusterFactory
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,8 @@ public void write(int b) {}

public static void checkJobManagerAddress(
Configuration config, String expectedAddress, int expectedPort) {
String jobManagerAddress = config.getString(JobManagerOptions.ADDRESS);
int jobManagerPort = config.getInteger(JobManagerOptions.PORT, -1);
String jobManagerAddress = config.get(JobManagerOptions.ADDRESS);
int jobManagerPort = config.get(JobManagerOptions.PORT, -1);

assertThat(jobManagerAddress).isEqualTo(expectedAddress);
assertThat(jobManagerPort).isEqualTo(expectedPort);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ void isActiveWhenTargetOnlyInConfig() throws CliArgsException {
@Test
void testWithPreexistingConfigurationInConstructor() throws CliArgsException {
final Configuration loadedConfig = new Configuration();
loadedConfig.setInteger(CoreOptions.DEFAULT_PARALLELISM, 2);
loadedConfig.setBoolean(DeploymentOptions.ATTACHED, false);
loadedConfig.set(CoreOptions.DEFAULT_PARALLELISM, 2);
loadedConfig.set(DeploymentOptions.ATTACHED, false);

final ConfigOption<List<Integer>> listOption =
key("test.list").intType().asList().noDefaultValue();
Expand All @@ -92,9 +92,9 @@ void testWithPreexistingConfigurationInConstructor() throws CliArgsException {

final Configuration configuration = cliUnderTest.toConfiguration(commandLine);

assertThat(configuration.getString(DeploymentOptions.TARGET)).isEqualTo("test-executor");
assertThat(configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM)).isEqualTo(5);
assertThat(configuration.getBoolean(DeploymentOptions.ATTACHED)).isFalse();
assertThat(configuration.get(DeploymentOptions.TARGET)).isEqualTo("test-executor");
assertThat(configuration.get(CoreOptions.DEFAULT_PARALLELISM)).isEqualTo(5);
assertThat(configuration.get(DeploymentOptions.ATTACHED)).isFalse();
assertThat(configuration.get(listOption)).isEqualTo(listValue);
}

Expand Down Expand Up @@ -123,6 +123,6 @@ private void testIsActiveHelper(final String executorOption) throws CliArgsExcep

final Configuration configuration = cliUnderTest.toConfiguration(commandLine);
assertThat(configuration.get(DeploymentOptions.TARGET)).isEqualTo(expectedExecutorName);
assertThat(configuration.getInteger(configOption)).isEqualTo(expectedValue);
assertThat(configuration.get(configOption)).isEqualTo(expectedValue);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public DummyClusterClientFactory(ClusterClient<ClusterID> clusterClient) {

@Override
public boolean isCompatibleWith(Configuration configuration) {
return ID.equals(configuration.getString(DeploymentOptions.TARGET));
return ID.equals(configuration.get(DeploymentOptions.TARGET));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public void addGeneralOptions(Options baseOptions) {
@Override
public Configuration toConfiguration(CommandLine commandLine) {
final Configuration configuration = new Configuration();
configuration.setString(DeploymentOptions.TARGET, DummyClusterClientFactory.ID);
configuration.set(DeploymentOptions.TARGET, DummyClusterClientFactory.ID);
return configuration;
}
}
Loading

0 comments on commit c8f27c2

Please sign in to comment.