Skip to content

Commit

Permalink
[FLINK-10073] [sql-client] Allow setting a restart strategy in SQL Cl…
Browse files Browse the repository at this point in the history
…ient

Adds support for fine-grained restart strategies per job/query.

This closes apache#6506.
  • Loading branch information
twalthr committed Aug 8, 2018
1 parent 9e936a5 commit 73bf45a
Show file tree
Hide file tree
Showing 8 changed files with 122 additions and 3 deletions.
34 changes: 33 additions & 1 deletion docs/dev/table/sqlClient.md
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ Greg, 1

Both result modes can be useful during the prototyping of SQL queries.

<span class="label label-danger">Attention</span> Queries that are executed in a batch environment can only be retrieved using the `table` result mode.

After a query is defined, it can be submitted to the cluster as a long-running, detached Flink job. For this, a target system that stores the results needs to be specified using the [INSERT INTO statement](sqlClient.html#detached-sql-queries). The [configuration section](sqlClient.html#configuration) explains how to declare table sources for reading data, how to declare table sinks for writing data, and how to configure other table program properties.

{% top %}
Expand Down Expand Up @@ -204,6 +206,8 @@ execution:
max-parallelism: 16 # optional: Flink's maximum parallelism (128 by default)
min-idle-state-retention: 0 # optional: table program's minimum idle state time
max-idle-state-retention: 0 # optional: table program's maximum idle state time
restart-strategy: # optional: restart strategy
type: fallback # "fallback" to global restart strategy by default

# Deployment properties allow for describing the cluster to which table programs are submitted.

Expand All @@ -227,7 +231,35 @@ Depending on the use case, a configuration can be split into multiple files. The
CLI commands > session environment file > defaults environment file
{% endhighlight %}

Queries that are executed in a batch environment, can only be retrieved using the `table` result mode.
#### Restart Strategies

Restart strategies control how Flink jobs are restarted in case of a failure. Similar to [global restart strategies]({{ site.baseurl }}/dev/restart_strategies.html) for a Flink cluster, a more fine-grained restart configuration can be declared in an environment file.

The following strategies are supported:

{% highlight yaml %}
execution:
# falls back to the global strategy defined in flink-conf.yaml
restart-strategy:
type: fallback

# job fails directly and no restart is attempted
restart-strategy:
type: none

# attempts a given number of times to restart the job
restart-strategy:
type: fixed-delay
attempts: 3 # retries before job is declared as failed (default: Integer.MAX_VALUE)
delay: 10000 # delay in ms between retries (default: 10 s)

# attempts as long as the maximum number of failures per time interval is not exceeded
restart-strategy:
type: failure-rate
max-failures-per-interval: 1 # retries in interval until failing (default: 1)
failure-rate-interval: 60000 # measuring interval in ms for failure rate (default: 60 s)
delay: 10000 # delay in ms between retries (default: 10 s)
{% endhighlight %}

{% top %}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,12 @@ execution:
min-idle-state-retention: 0
# maximum idle state retention in ms
max-idle-state-retention: 0
# controls how table programs are restarted in case of a failure
restart-strategy:
# strategy type
# possible values are "fixed-delay", "failure-rate", "none", or "fallback" (default)
type: fallback


#==============================================================================
# Deployment properties
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.flink.table.client.config;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.TimeCharacteristic;

import java.util.Collections;
Expand Down Expand Up @@ -57,10 +59,10 @@ public boolean isBatchExecution() {
}

public TimeCharacteristic getTimeCharacteristic() {
final String s = properties.getOrDefault(
final String characteristic = properties.getOrDefault(
PropertyStrings.EXECUTION_TIME_CHARACTERISTIC,
PropertyStrings.EXECUTION_TIME_CHARACTERISTIC_VALUE_EVENT_TIME);
switch (s) {
switch (characteristic) {
case PropertyStrings.EXECUTION_TIME_CHARACTERISTIC_VALUE_EVENT_TIME:
return TimeCharacteristic.EventTime;
case PropertyStrings.EXECUTION_TIME_CHARACTERISTIC_VALUE_PROCESSING_TIME:
Expand Down Expand Up @@ -90,6 +92,45 @@ public int getMaxParallelism() {
return Integer.parseInt(properties.getOrDefault(PropertyStrings.EXECUTION_MAX_PARALLELISM, Integer.toString(128)));
}

/**
 * Translates the {@code restart-strategy} execution properties into a Flink restart
 * strategy configuration.
 *
 * <p>The strategy type is read from {@link PropertyStrings#EXECUTION_RESTART_STRATEGY_TYPE}
 * and defaults to the "fallback" type when the property is absent. Any type that is not
 * "none", "fixed-delay" or "failure-rate" also results in the fallback strategy.
 *
 * <p>Defaults for missing sub-properties: fixed-delay uses {@code Integer.MAX_VALUE}
 * attempts and a delay of 10000; failure-rate uses 1 failure per interval, an interval
 * of 60000 and a delay of 10000 (all time values are given in milliseconds).
 *
 * @return the restart strategy configuration described by the properties
 * @throws NumberFormatException if a numeric sub-property of the selected strategy
 *         cannot be parsed
 */
public RestartStrategies.RestartStrategyConfiguration getRestartStrategy() {
	final String strategyType = properties.getOrDefault(
		PropertyStrings.EXECUTION_RESTART_STRATEGY_TYPE,
		PropertyStrings.EXECUTION_RESTART_STRATEGY_TYPE_VALUE_FALLBACK);

	switch (strategyType) {
		case PropertyStrings.EXECUTION_RESTART_STRATEGY_TYPE_VALUE_NONE: {
			// the job fails directly, no restart is attempted
			return RestartStrategies.noRestart();
		}

		case PropertyStrings.EXECUTION_RESTART_STRATEGY_TYPE_VALUE_FIXED_DELAY: {
			final String attemptsValue = properties.getOrDefault(
				PropertyStrings.EXECUTION_RESTART_STRATEGY_ATTEMPTS,
				Integer.toString(Integer.MAX_VALUE));
			final int maxAttempts = Integer.parseInt(attemptsValue);

			final String delayValue = properties.getOrDefault(
				PropertyStrings.EXECUTION_RESTART_STRATEGY_DELAY,
				Long.toString(10_000));
			final long delayMillis = Long.parseLong(delayValue);

			return RestartStrategies.fixedDelayRestart(maxAttempts, delayMillis);
		}

		case PropertyStrings.EXECUTION_RESTART_STRATEGY_TYPE_VALUE_FAILURE_RATE: {
			final String maxFailuresValue = properties.getOrDefault(
				PropertyStrings.EXECUTION_RESTART_STRATEGY_MAX_FAILURES_PER_INTERVAL,
				Integer.toString(1));
			final int maxFailuresPerInterval = Integer.parseInt(maxFailuresValue);

			final String intervalValue = properties.getOrDefault(
				PropertyStrings.EXECUTION_RESTART_STRATEGY_FAILURE_RATE_INTERVAL,
				Long.toString(60_000));
			final long intervalMillis = Long.parseLong(intervalValue);

			final String delayValue = properties.getOrDefault(
				PropertyStrings.EXECUTION_RESTART_STRATEGY_DELAY,
				Long.toString(10_000));
			final long delayMillis = Long.parseLong(delayValue);

			final Time measuringInterval = Time.milliseconds(intervalMillis);
			final Time delayBetweenAttempts = Time.milliseconds(delayMillis);
			return RestartStrategies.failureRateRestart(
				maxFailuresPerInterval,
				measuringInterval,
				delayBetweenAttempts);
		}

		default: {
			// "fallback" and every unrecognized type use the fallback restart strategy
			return RestartStrategies.fallBackRestart();
		}
	}
}

public boolean isChangelogMode() {
return Objects.equals(
properties.get(PropertyStrings.EXECUTION_RESULT_MODE),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,24 @@ private PropertyStrings() {

public static final String EXECUTION_RESULT_MODE_VALUE_TABLE = "table";

public static final String EXECUTION_RESTART_STRATEGY_TYPE = "restart-strategy.type";

public static final String EXECUTION_RESTART_STRATEGY_TYPE_VALUE_FALLBACK = "fallback";

public static final String EXECUTION_RESTART_STRATEGY_TYPE_VALUE_NONE = "none";

public static final String EXECUTION_RESTART_STRATEGY_TYPE_VALUE_FIXED_DELAY = "fixed-delay";

public static final String EXECUTION_RESTART_STRATEGY_TYPE_VALUE_FAILURE_RATE = "failure-rate";

public static final String EXECUTION_RESTART_STRATEGY_ATTEMPTS = "restart-strategy.attempts";

public static final String EXECUTION_RESTART_STRATEGY_DELAY = "restart-strategy.delay";

public static final String EXECUTION_RESTART_STRATEGY_FAILURE_RATE_INTERVAL = "restart-strategy.failure-rate-interval";

public static final String EXECUTION_RESTART_STRATEGY_MAX_FAILURES_PER_INTERVAL = "restart-strategy.max-failures-per-interval";

public static final String DEPLOYMENT = "deployment";

public static final String DEPLOYMENT_TYPE = "type";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -361,12 +361,14 @@ private FlinkPlan createPlan(String name, Configuration flinkConfig) {

/**
 * Obtains an {@link ExecutionEnvironment} and applies the restart strategy and the
 * parallelism taken from the execution section of the merged environment.
 *
 * @return the configured execution environment
 */
private ExecutionEnvironment createExecutionEnvironment() {
	final ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();

	// apply the execution properties of the merged environment
	environment.setRestartStrategy(mergedEnv.getExecution().getRestartStrategy());
	environment.setParallelism(mergedEnv.getExecution().getParallelism());

	return environment;
}

private StreamExecutionEnvironment createStreamExecutionEnvironment() {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(mergedEnv.getExecution().getRestartStrategy());
env.setParallelism(mergedEnv.getExecution().getParallelism());
env.setMaxParallelism(mergedEnv.getExecution().getMaxParallelism());
env.setStreamTimeCharacteristic(mergedEnv.getExecution().getTimeCharacteristic());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.table.client.gateway.local;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.client.cli.DefaultCLI;
import org.apache.flink.configuration.Configuration;
Expand All @@ -41,6 +42,7 @@

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

/**
* Test for {@link ExecutionContext}.
Expand All @@ -53,7 +55,16 @@ public class ExecutionContextTest {
public void testExecutionConfig() throws Exception {
	final ExecutionContext<?> executionContext = createExecutionContext();
	final ExecutionConfig executionConfig =
		executionContext.createEnvironmentInstance().getExecutionConfig();

	// watermark interval configured for the test environment
	assertEquals(99, executionConfig.getAutoWatermarkInterval());

	// the configured restart strategy must be a failure-rate strategy
	final RestartStrategies.RestartStrategyConfiguration configuredStrategy =
		executionConfig.getRestartStrategy();
	assertTrue(configuredStrategy instanceof RestartStrategies.FailureRateRestartStrategyConfiguration);

	// ... carrying the expected failure rate, measuring interval and delay
	final RestartStrategies.FailureRateRestartStrategyConfiguration failureRateConfig =
		(RestartStrategies.FailureRateRestartStrategyConfiguration) configuredStrategy;
	assertEquals(10, failureRateConfig.getMaxFailureRate());
	assertEquals(99_000, failureRateConfig.getFailureInterval().toMilliseconds());
	assertEquals(1_000, failureRateConfig.getDelayBetweenAttemptsInterval().toMilliseconds());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,10 @@ public void testGetSessionProperties() throws Exception {
expectedProperties.put("execution.max-idle-state-retention", "0");
expectedProperties.put("execution.min-idle-state-retention", "0");
expectedProperties.put("execution.result-mode", "table");
expectedProperties.put("execution.restart-strategy.type", "failure-rate");
expectedProperties.put("execution.restart-strategy.max-failures-per-interval", "10");
expectedProperties.put("execution.restart-strategy.failure-rate-interval", "99000");
expectedProperties.put("execution.restart-strategy.delay", "1000");
expectedProperties.put("deployment.response-timeout", "5000");

assertEquals(expectedProperties, actualProperties);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,11 @@ execution:
min-idle-state-retention: 0
max-idle-state-retention: 0
result-mode: "$VAR_3"
restart-strategy:
type: failure-rate
max-failures-per-interval: 10
failure-rate-interval: 99000
delay: 1000

deployment:
response-timeout: 5000
Expand Down

0 comments on commit 73bf45a

Please sign in to comment.