From 73bf45ab4802ca396ce7e3a0393bb999642a3d4a Mon Sep 17 00:00:00 2001 From: Timo Walther Date: Mon, 6 Aug 2018 18:27:57 +0200 Subject: [PATCH] [FLINK-10073] [sql-client] Allow setting a restart strategy in SQL Client Adds support for fine-grained restart strategies per job/query. This closes #6506. --- docs/dev/table/sqlClient.md | 34 +++++++++++++- .../conf/sql-client-defaults.yaml | 6 +++ .../flink/table/client/config/Execution.java | 45 ++++++++++++++++++- .../table/client/config/PropertyStrings.java | 18 ++++++++ .../gateway/local/ExecutionContext.java | 2 + .../gateway/local/ExecutionContextTest.java | 11 +++++ .../gateway/local/LocalExecutorITCase.java | 4 ++ .../resources/test-sql-client-defaults.yaml | 5 +++ 8 files changed, 122 insertions(+), 3 deletions(-) diff --git a/docs/dev/table/sqlClient.md b/docs/dev/table/sqlClient.md index d35aa591a4def..0e8d2d651b1c9 100644 --- a/docs/dev/table/sqlClient.md +++ b/docs/dev/table/sqlClient.md @@ -108,6 +108,8 @@ Greg, 1 Both result modes can be useful during the prototyping of SQL queries. +Attention Queries that are executed in a batch environment, can only be retrieved using the `table` result mode. + After a query is defined, it can be submitted to the cluster as a long-running, detached Flink job. For this, a target system that stores the results needs to be specified using the [INSERT INTO statement](sqlClient.html#detached-sql-queries). The [configuration section](sqlClient.html#configuration) explains how to declare table sources for reading data, how to declare table sinks for writing data, and how to configure other table program properties. {% top %} @@ -204,6 +206,8 @@ execution: max-parallelism: 16 # optional: Flink's maximum parallelism (128 by default) min-idle-state-retention: 0 # optional: table program's minimum idle state time max-idle-state-retention: 0 # optional: table program's maximum idle state time + restart-strategy: # optional: restart strategy + type: fallback # "fallback" to global restart strategy by default # Deployment properties allow for describing the cluster to which table programs are submitted to. @@ -227,7 +231,35 @@ Depending on the use case, a configuration can be split into multiple files. The CLI commands > session environment file > defaults environment file {% endhighlight %} -Queries that are executed in a batch environment, can only be retrieved using the `table` result mode. +#### Restart Strategies + +Restart strategies control how Flink jobs are restarted in case of a failure. Similar to [global restart strategies]({{ site.baseurl }}/dev/restart_strategies.html) for a Flink cluster, a more fine-grained restart configuration can be declared in an environment file. + +The following strategies are supported: + +{% highlight yaml %} +execution: + # falls back to the global strategy defined in flink-conf.yaml + restart-strategy: + type: fallback + + # job fails directly and no restart is attempted + restart-strategy: + type: none + + # attempts a given number of times to restart the job + restart-strategy: + type: fixed-delay + attempts: 3 # retries before job is declared as failed (default: Integer.MAX_VALUE) + delay: 10000 # delay in ms between retries (default: 10 s) + + # attempts as long as the maximum number of failures per time interval is not exceeded + restart-strategy: + type: failure-rate + max-failures-per-interval: 1 # retries in interval until failing (default: 1) + failure-rate-interval: 60000 # measuring interval in ms for failure rate + delay: 10000 # delay in ms between retries (default: 10 s) +{% endhighlight %} {% top %} diff --git a/flink-libraries/flink-sql-client/conf/sql-client-defaults.yaml b/flink-libraries/flink-sql-client/conf/sql-client-defaults.yaml index 51e6e95bc1f26..8be4ce63ecb26 100644 --- a/flink-libraries/flink-sql-client/conf/sql-client-defaults.yaml +++ b/flink-libraries/flink-sql-client/conf/sql-client-defaults.yaml @@ -74,6 +74,12 @@ execution: min-idle-state-retention: 0 # maximum idle state retention in ms max-idle-state-retention: 0 + # controls how table programs are restarted in case of a failures + restart-strategy: + # strategy type + # possible values are "fixed-delay", "failure-rate", "none", or "fallback" (default) + type: fallback + #============================================================================== # Deployment properties diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Execution.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Execution.java index 0d6e6dd59acd5..b7c28938121fc 100644 --- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Execution.java +++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Execution.java @@ -18,6 +18,8 @@ package org.apache.flink.table.client.config; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.time.Time; import org.apache.flink.streaming.api.TimeCharacteristic; import java.util.Collections; @@ -57,10 +59,10 @@ public boolean isBatchExecution() { } public TimeCharacteristic getTimeCharacteristic() { - final String s = properties.getOrDefault( + final String characteristic = properties.getOrDefault( PropertyStrings.EXECUTION_TIME_CHARACTERISTIC, PropertyStrings.EXECUTION_TIME_CHARACTERISTIC_VALUE_EVENT_TIME); - switch (s) { + switch (characteristic) { case PropertyStrings.EXECUTION_TIME_CHARACTERISTIC_VALUE_EVENT_TIME: return TimeCharacteristic.EventTime; case PropertyStrings.EXECUTION_TIME_CHARACTERISTIC_VALUE_PROCESSING_TIME: @@ -90,6 +92,45 @@ public int getMaxParallelism() { return Integer.parseInt(properties.getOrDefault(PropertyStrings.EXECUTION_MAX_PARALLELISM, Integer.toString(128))); } + public RestartStrategies.RestartStrategyConfiguration getRestartStrategy() { + final String restartStrategy = properties.getOrDefault( + PropertyStrings.EXECUTION_RESTART_STRATEGY_TYPE, + PropertyStrings.EXECUTION_RESTART_STRATEGY_TYPE_VALUE_FALLBACK); + switch (restartStrategy) { + case PropertyStrings.EXECUTION_RESTART_STRATEGY_TYPE_VALUE_NONE: + return RestartStrategies.noRestart(); + case PropertyStrings.EXECUTION_RESTART_STRATEGY_TYPE_VALUE_FIXED_DELAY: + final int attempts = Integer.parseInt( + properties.getOrDefault( + PropertyStrings.EXECUTION_RESTART_STRATEGY_ATTEMPTS, + Integer.toString(Integer.MAX_VALUE))); + final long fixedDelay = Long.parseLong( + properties.getOrDefault( + PropertyStrings.EXECUTION_RESTART_STRATEGY_DELAY, + Long.toString(10_000))); + return RestartStrategies.fixedDelayRestart(attempts, fixedDelay); + case PropertyStrings.EXECUTION_RESTART_STRATEGY_TYPE_VALUE_FAILURE_RATE: + final int failureRate = Integer.parseInt( + properties.getOrDefault( + PropertyStrings.EXECUTION_RESTART_STRATEGY_MAX_FAILURES_PER_INTERVAL, + Integer.toString(1))); + final long failureInterval = Long.parseLong( + properties.getOrDefault( + PropertyStrings.EXECUTION_RESTART_STRATEGY_FAILURE_RATE_INTERVAL, + Long.toString(60_000))); + final long attemptDelay = Long.parseLong( + properties.getOrDefault( + PropertyStrings.EXECUTION_RESTART_STRATEGY_DELAY, + Long.toString(10_000))); + return RestartStrategies.failureRateRestart( + failureRate, + Time.milliseconds(failureInterval), + Time.milliseconds(attemptDelay)); + default: + return RestartStrategies.fallBackRestart(); + } + } + public boolean isChangelogMode() { return Objects.equals( properties.get(PropertyStrings.EXECUTION_RESULT_MODE), diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/PropertyStrings.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/PropertyStrings.java index 76e52defb8bdc..2a6b001d5f4d9 100644 --- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/PropertyStrings.java +++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/PropertyStrings.java @@ -57,6 +57,24 @@ private PropertyStrings() { public static final String EXECUTION_RESULT_MODE_VALUE_TABLE = "table"; + public static final String EXECUTION_RESTART_STRATEGY_TYPE = "restart-strategy.type"; + + public static final String EXECUTION_RESTART_STRATEGY_TYPE_VALUE_FALLBACK = "fallback"; + + public static final String EXECUTION_RESTART_STRATEGY_TYPE_VALUE_NONE = "none"; + + public static final String EXECUTION_RESTART_STRATEGY_TYPE_VALUE_FIXED_DELAY = "fixed-delay"; + + public static final String EXECUTION_RESTART_STRATEGY_TYPE_VALUE_FAILURE_RATE = "failure-rate"; + + public static final String EXECUTION_RESTART_STRATEGY_ATTEMPTS = "restart-strategy.attempts"; + + public static final String EXECUTION_RESTART_STRATEGY_DELAY = "restart-strategy.delay"; + + public static final String EXECUTION_RESTART_STRATEGY_FAILURE_RATE_INTERVAL = "restart-strategy.failure-rate-interval"; + + public static final String EXECUTION_RESTART_STRATEGY_MAX_FAILURES_PER_INTERVAL = "restart-strategy.max-failures-per-interval"; + public static final String DEPLOYMENT = "deployment"; public static final String DEPLOYMENT_TYPE = "type"; diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java index 4283953446e17..9ff683755d277 100644 --- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java +++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java @@ -361,12 +361,14 @@ private FlinkPlan createPlan(String name, Configuration flinkConfig) { private ExecutionEnvironment createExecutionEnvironment() { final ExecutionEnvironment execEnv = ExecutionEnvironment.getExecutionEnvironment(); + execEnv.setRestartStrategy(mergedEnv.getExecution().getRestartStrategy()); execEnv.setParallelism(mergedEnv.getExecution().getParallelism()); return execEnv; } private StreamExecutionEnvironment createStreamExecutionEnvironment() { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setRestartStrategy(mergedEnv.getExecution().getRestartStrategy()); env.setParallelism(mergedEnv.getExecution().getParallelism()); env.setMaxParallelism(mergedEnv.getExecution().getMaxParallelism()); env.setStreamTimeCharacteristic(mergedEnv.getExecution().getTimeCharacteristic()); diff --git a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java index bc29f6f27955a..04575c696a27a 100644 --- a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java +++ b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java @@ -19,6 +19,7 @@ package org.apache.flink.table.client.gateway.local; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.client.cli.DefaultCLI; import org.apache.flink.configuration.Configuration; @@ -41,6 +42,7 @@ import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; /** * Test for {@link ExecutionContext}. @@ -53,7 +55,16 @@ public class ExecutionContextTest { public void testExecutionConfig() throws Exception { final ExecutionContext context = createExecutionContext(); final ExecutionConfig config = context.createEnvironmentInstance().getExecutionConfig(); + assertEquals(99, config.getAutoWatermarkInterval()); + + final RestartStrategies.RestartStrategyConfiguration restartConfig = config.getRestartStrategy(); + assertTrue(restartConfig instanceof RestartStrategies.FailureRateRestartStrategyConfiguration); + final RestartStrategies.FailureRateRestartStrategyConfiguration failureRateStrategy = + (RestartStrategies.FailureRateRestartStrategyConfiguration) restartConfig; + assertEquals(10, failureRateStrategy.getMaxFailureRate()); + assertEquals(99_000, failureRateStrategy.getFailureInterval().toMilliseconds()); + assertEquals(1_000, failureRateStrategy.getDelayBetweenAttemptsInterval().toMilliseconds()); } @Test diff --git a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java index d8452e42dbfa3..ed4ce7a9d8d84 100644 --- a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java +++ b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java @@ -149,6 +149,10 @@ public void testGetSessionProperties() throws Exception { expectedProperties.put("execution.max-idle-state-retention", "0"); expectedProperties.put("execution.min-idle-state-retention", "0"); expectedProperties.put("execution.result-mode", "table"); + expectedProperties.put("execution.restart-strategy.type", "failure-rate"); + expectedProperties.put("execution.restart-strategy.max-failures-per-interval", "10"); + expectedProperties.put("execution.restart-strategy.failure-rate-interval", "99000"); + expectedProperties.put("execution.restart-strategy.delay", "1000"); expectedProperties.put("deployment.response-timeout", "5000"); assertEquals(expectedProperties, actualProperties); diff --git a/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml b/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml index b759874939d31..22351bc395996 100644 --- a/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml +++ b/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml @@ -117,6 +117,11 @@ execution: min-idle-state-retention: 0 max-idle-state-retention: 0 result-mode: "$VAR_3" + restart-strategy: + type: failure-rate + max-failures-per-interval: 10 + failure-rate-interval: 99000 + delay: 1000 deployment: response-timeout: 5000