Skip to content

Commit

Permalink
[FLINK-16144] get client.timeout for the client, with a fallback to the akka.client.timeout.
Browse files Browse the repository at this point in the history

This closes apache#12179.
  • Loading branch information
wtog authored and kl0u committed May 29, 2020
1 parent 9579325 commit 13272ce
Show file tree
Hide file tree
Showing 17 changed files with 148 additions and 51 deletions.
6 changes: 0 additions & 6 deletions docs/_includes/generated/akka_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,6 @@
<td>Integer</td>
<td>Min number of threads to cap factor-based number to.</td>
</tr>
<tr>
<td><h5>akka.client.timeout</h5></td>
<td style="word-wrap: break-word;">"60 s"</td>
<td>String</td>
<td>Timeout for all blocking calls on the client side.</td>
</tr>
<tr>
<td><h5>akka.fork-join-executor.parallelism-factor</h5></td>
<td style="word-wrap: break-word;">2.0</td>
Expand Down
24 changes: 24 additions & 0 deletions docs/_includes/generated/client_configuration.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
<table class="table table-bordered">
<thead>
<tr>
<th class="text-left" style="width: 20%">Key</th>
<th class="text-left" style="width: 15%">Default</th>
<th class="text-left" style="width: 10%">Type</th>
<th class="text-left" style="width: 55%">Description</th>
</tr>
</thead>
<tbody>
<tr>
<td><h5>client.retry-period</h5></td>
<td style="word-wrap: break-word;">2 s</td>
<td>Duration</td>
<td>The interval (in ms) between consecutive retries of failed attempts to execute commands through the CLI or Flink's clients, wherever retry is supported (default 2sec).</td>
</tr>
<tr>
<td><h5>client.timeout</h5></td>
<td style="word-wrap: break-word;">1 min</td>
<td>Duration</td>
<td>Timeout on the client side.</td>
</tr>
</tbody>
</table>
12 changes: 0 additions & 12 deletions docs/_includes/generated/execution_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,5 @@
<td>Boolean</td>
<td>Tells if we should use compression for the state snapshot data or not</td>
</tr>
<tr>
<td><h5>execution.embedded-rpc-retry-period</h5></td>
<td style="word-wrap: break-word;">2 s</td>
<td>Duration</td>
<td>The retry period (in ms) between consecutive attempts to get the job status when executing applications in "Application Mode".</td>
</tr>
<tr>
<td><h5>execution.embedded-rpc-timeout</h5></td>
<td style="word-wrap: break-word;">1 h</td>
<td>Duration</td>
<td>The rpc timeout (in ms) when executing applications in "Application Mode". This affects all rpc's available through the Job Client and job submission.</td>
</tr>
</tbody>
</table>
4 changes: 4 additions & 0 deletions docs/ops/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,10 @@ These options may be removed in a future release.

# Backup

#### Client

{% include generated/client_configuration.html %}

#### Execution

{% include generated/deployment_configuration.html %}
Expand Down
4 changes: 4 additions & 0 deletions docs/ops/config.zh.md
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,10 @@ These options may be removed in a future release.

# Backup

#### Client

{% include generated/client_configuration.html %}

#### Execution

{% include generated/deployment_configuration.html %}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
import org.apache.flink.core.execution.DefaultExecutorServiceLoader;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.plugin.PluginUtils;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.security.SecurityConfiguration;
Expand Down Expand Up @@ -141,7 +140,7 @@ public CliFrontend(
customCommandLine.addRunOptions(customCommandLineOptions);
}

this.clientTimeout = AkkaUtils.getClientTimeout(this.configuration);
this.clientTimeout = configuration.get(ClientOptions.CLIENT_TIMEOUT);
this.defaultParallelism = configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.client.cli;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

import java.time.Duration;

/**
 * Configuration options that apply to the client side.
 */
@PublicEvolving
public class ClientOptions {

	/**
	 * Timeout on the client side; defaults to one minute.
	 *
	 * <p>The deprecated key {@code akka.client.timeout} (the former {@code AkkaOptions.CLIENT_TIMEOUT})
	 * is registered as a fallback for this option.
	 */
	public static final ConfigOption<Duration> CLIENT_TIMEOUT =
		ConfigOptions.key("client.timeout")
			.durationType()
			.defaultValue(Duration.ofMinutes(1))
			.withDeprecatedKeys("akka.client.timeout")
			.withDescription("Timeout on the client side.");

	/**
	 * Interval between consecutive retries of failed attempts to execute commands;
	 * defaults to two seconds.
	 */
	public static final ConfigOption<Duration> CLIENT_RETRY_PERIOD =
		ConfigOptions.key("client.retry-period")
			.durationType()
			.defaultValue(Duration.ofSeconds(2))
			.withDescription(
				"The interval (in ms) between consecutive retries of failed attempts " +
				"to execute commands through the CLI or Flink's clients, " +
				"wherever retry is supported (default 2sec).");
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.client.ClientUtils;
import org.apache.flink.client.cli.ClientOptions;
import org.apache.flink.client.deployment.application.executors.EmbeddedExecutor;
import org.apache.flink.client.deployment.application.executors.EmbeddedExecutorServiceLoader;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.PipelineOptionsInternal;
import org.apache.flink.core.execution.PipelineExecutorServiceLoader;
import org.apache.flink.runtime.client.JobCancellationException;
Expand Down Expand Up @@ -263,8 +263,8 @@ private CompletableFuture<JobResult> getJobResult(
final JobID jobId,
final ScheduledExecutor scheduledExecutor) {

final Time timeout = Time.milliseconds(configuration.get(ExecutionOptions.EMBEDDED_RPC_TIMEOUT).toMillis());
final Time retryPeriod = Time.milliseconds(configuration.get(ExecutionOptions.EMBEDDED_RPC_RETRY_PERIOD).toMillis());
final Time timeout = Time.milliseconds(configuration.get(ClientOptions.CLIENT_TIMEOUT).toMillis());
final Time retryPeriod = Time.milliseconds(configuration.get(ClientOptions.CLIENT_RETRY_PERIOD).toMillis());

return JobStatusPollingUtils.getJobResult(
dispatcherGateway, jobId, scheduledExecutor, timeout, retryPeriod);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.dag.Pipeline;
import org.apache.flink.client.cli.ClientOptions;
import org.apache.flink.client.deployment.executors.PipelineExecutorUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.PipelineOptionsInternal;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.PipelineExecutor;
Expand Down Expand Up @@ -103,7 +103,7 @@ private CompletableFuture<JobClient> getJobClientFuture(final JobID jobId) {
}

private CompletableFuture<JobClient> submitAndGetJobClientFuture(final Pipeline pipeline, final Configuration configuration) throws MalformedURLException {
final Time timeout = Time.milliseconds(configuration.get(ExecutionOptions.EMBEDDED_RPC_TIMEOUT).toMillis());
final Time timeout = Time.milliseconds(configuration.get(ClientOptions.CLIENT_TIMEOUT).toMillis());

final JobGraph jobGraph = PipelineExecutorUtils.getJobGraph(pipeline, configuration);
final JobID actualJobId = jobGraph.getJobID();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.client.cli.ClientOptions;
import org.apache.flink.client.deployment.application.EmbeddedJobClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.core.execution.PipelineExecutor;
import org.apache.flink.core.execution.PipelineExecutorFactory;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
Expand Down Expand Up @@ -80,7 +80,7 @@ public PipelineExecutor getExecutor(final Configuration configuration) {
submittedJobIds,
dispatcherGateway,
jobId -> {
final Time timeout = Time.milliseconds(configuration.get(ExecutionOptions.EMBEDDED_RPC_TIMEOUT).toMillis());
final Time timeout = Time.milliseconds(configuration.get(ClientOptions.CLIENT_TIMEOUT).toMillis());
return new EmbeddedJobClient(jobId, dispatcherGateway, retryExecutor, timeout);
});
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.client.cli;

import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

import java.time.Duration;

import static org.junit.Assert.assertEquals;

/**
 * Unit tests for {@link ClientOptions}.
 */
@RunWith(JUnit4.class)
public class ClientOptionsTest {

	/**
	 * Checks how {@link ClientOptions#CLIENT_TIMEOUT} is resolved in three situations:
	 * the new key is set explicitly, only the deprecated akka option is set,
	 * and nothing is configured at all.
	 */
	@Test
	public void testGetClientTimeout() {
		// A value set under the new key is returned as-is.
		Configuration configuration = new Configuration();
		configuration.set(ClientOptions.CLIENT_TIMEOUT, Duration.ofSeconds(10));
		assertEquals(Duration.ofSeconds(10), configuration.get(ClientOptions.CLIENT_TIMEOUT));

		// Only the deprecated akka option is set: its string value is picked up
		// through the new option and parsed into a Duration.
		configuration = new Configuration();
		configuration.set(AkkaOptions.CLIENT_TIMEOUT, "20 s");
		assertEquals(Duration.ofSeconds(20), configuration.get(ClientOptions.CLIENT_TIMEOUT));

		// Nothing configured: the option's default value is returned.
		configuration = new Configuration();
		assertEquals(ClientOptions.CLIENT_TIMEOUT.defaultValue(), configuration.get(ClientOptions.CLIENT_TIMEOUT));
	}

}
Original file line number Diff line number Diff line change
Expand Up @@ -156,12 +156,16 @@ public class AkkaOptions {

/**
* Timeout for all blocking calls on the client side.
*
* @deprecated Use the {@code ClientOptions.CLIENT_TIMEOUT} instead.
*/
@Deprecated
public static final ConfigOption<String> CLIENT_TIMEOUT = ConfigOptions
.key("akka.client.timeout")
.stringType()
.defaultValue("60 s")
.withDescription("Timeout for all blocking calls on the client side.");
.withDescription("DEPRECATED: Use the \"client.timeout\" instead." +
" Timeout for all blocking calls on the client side.");

/**
* Exit JVM on fatal Akka errors.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -939,7 +939,7 @@ public final class ConfigConstants {
/**
* Timeout for all blocking calls on the client side.
*
* @deprecated Use {@link AkkaOptions#CLIENT_TIMEOUT} instead.
* @deprecated Use {@code ClientOptions#CLIENT_TIMEOUT} instead.
*/
@Deprecated
public static final String AKKA_CLIENT_TIMEOUT = "akka.client.timeout";
Expand Down Expand Up @@ -1787,7 +1787,7 @@ public final class ConfigConstants {
public static final String DEFAULT_AKKA_LOOKUP_TIMEOUT = "10 s";

/**
* @deprecated Use {@link AkkaOptions#CLIENT_TIMEOUT} instead.
* @deprecated Use {@code ClientOptions#CLIENT_TIMEOUT} instead.
*/
@Deprecated
public static final String DEFAULT_AKKA_CLIENT_TIMEOUT = "60 s";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,19 +55,4 @@ public class ExecutionOptions {
"throughput")
)
.build());

public static final ConfigOption<Duration> EMBEDDED_RPC_TIMEOUT =
ConfigOptions.key("execution.embedded-rpc-timeout")
.durationType()
.defaultValue(Duration.ofMillis(60 * 60 * 1000))
.withDescription("The rpc timeout (in ms) when executing applications in \"Application Mode\". " +
"This affects all rpc's available through the Job Client and job submission.");

public static final ConfigOption<Duration> EMBEDDED_RPC_RETRY_PERIOD =
ConfigOptions.key("execution.embedded-rpc-retry-period")
.durationType()
.defaultValue(Duration.ofMillis(2000))
.withDescription("The retry period (in ms) between consecutive attempts to get the job status " +
"when executing applications in \"Application Mode\".");

}
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ public class ConfigOptionsDocGenerator {
new OptionsClassLocation("flink-state-backends/flink-statebackend-rocksdb", "org.apache.flink.contrib.streaming.state"),
new OptionsClassLocation("flink-table/flink-table-api-java", "org.apache.flink.table.api.config"),
new OptionsClassLocation("flink-python", "org.apache.flink.python"),
new OptionsClassLocation("flink-kubernetes", "org.apache.flink.kubernetes.configuration")
new OptionsClassLocation("flink-kubernetes", "org.apache.flink.kubernetes.configuration"),
new OptionsClassLocation("flink-clients", "org.apache.flink.client.cli")
};

static final Set<String> EXCLUSIONS = new HashSet<>(Arrays.asList(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -793,10 +793,6 @@ object AkkaUtils {
TimeUtils.parseDuration(config.getString(AkkaOptions.LOOKUP_TIMEOUT))
}

def getClientTimeout(config: Configuration): time.Duration = {
TimeUtils.parseDuration(config.getString(AkkaOptions.CLIENT_TIMEOUT))
}

/** Returns the address of the given [[ActorSystem]]. The [[Address]] object contains
* the port and the host under which the actor system is reachable
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ class AkkaUtilsTest
val IPv4AddressString = "192.168.0.1"
val port = 1234
val address = new InetSocketAddress(IPv4AddressString, port)

val url = s"akka://flink@$IPv4AddressString:$port/user/jobmanager"

val result = AkkaUtils.getInetSocketAddressFromAkkaURL(url)
Expand Down

0 comments on commit 13272ce

Please sign in to comment.