Skip to content

Commit

Permalink
[FLINK-14788][configuration] Add configure method to CheckpointConfig
Browse files Browse the repository at this point in the history
  • Loading branch information
dawidwys committed Nov 22, 2019
1 parent e14bd50 commit d3dbc96
Show file tree
Hide file tree
Showing 3 changed files with 338 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.annotation.Public;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.streaming.api.CheckpointingMode;

Expand Down Expand Up @@ -436,4 +437,33 @@ public boolean deleteOnCancellation() {
return deleteOnCancellation;
}
}

/**
 * Applies every checkpointing option that is explicitly present in the given
 * {@link ReadableConfig}, e.g. {@link ExecutionCheckpointingOptions#CHECKPOINTING_MODE}.
 *
 * <p>Each option is looked up via {@code getOptional}; the matching setter is invoked only
 * when a value is returned. Fields whose option is absent from {@code configuration} keep
 * their current value.
 *
 * <p>Duration-typed options are converted to milliseconds before being handed to the setter.
 *
 * @param configuration the configuration to read the values from
 */
public void configure(ReadableConfig configuration) {
	// Checkpointing mode.
	configuration
		.getOptional(ExecutionCheckpointingOptions.CHECKPOINTING_MODE)
		.ifPresent(this::setCheckpointingMode);

	// Interval and timeout are durations; the setters take milliseconds.
	configuration
		.getOptional(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL)
		.map(interval -> interval.toMillis())
		.ifPresent(this::setCheckpointInterval);
	configuration
		.getOptional(ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT)
		.map(timeout -> timeout.toMillis())
		.ifPresent(this::setCheckpointTimeout);

	// Concurrency limit and minimum pause between attempts.
	configuration
		.getOptional(ExecutionCheckpointingOptions.MAX_CONCURRENT_CHECKPOINTS)
		.ifPresent(this::setMaxConcurrentCheckpoints);
	configuration
		.getOptional(ExecutionCheckpointingOptions.MIN_PAUSE_BETWEEN_CHECKPOINTS)
		.map(pause -> pause.toMillis())
		.ifPresent(this::setMinPauseBetweenCheckpoints);

	// Recovery preference, failure tolerance and externalized checkpoint retention.
	configuration
		.getOptional(ExecutionCheckpointingOptions.PREFER_CHECKPOINT_FOR_RECOVERY)
		.ifPresent(this::setPreferCheckpointForRecovery);
	configuration
		.getOptional(ExecutionCheckpointingOptions.TOLERABLE_FAILURE_NUMBER)
		.ifPresent(this::setTolerableCheckpointFailureNumber);
	configuration
		.getOptional(ExecutionCheckpointingOptions.EXTERNALIZED_CHECKPOINT)
		.ifPresent(this::enableExternalizedCheckpoints);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.api.environment;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.description.Description;
import org.apache.flink.configuration.description.TextElement;
import org.apache.flink.streaming.api.CheckpointingMode;

import java.time.Duration;

/**
 * Execution {@link ConfigOption} for configuring checkpointing related parameters.
 *
 * <p>The declaration order of the constants matters: some descriptions embed the key of an
 * option declared earlier in this class, so those options must be initialized first.
 *
 * @see CheckpointConfig
 */
@PublicEvolving
public class ExecutionCheckpointingOptions {

	/** Checkpointing mode; defaults to {@link CheckpointingMode#EXACTLY_ONCE}. */
	public static final ConfigOption<CheckpointingMode> CHECKPOINTING_MODE =
		ConfigOptions.key("execution.checkpointing.mode")
			.enumType(CheckpointingMode.class)
			.defaultValue(CheckpointingMode.EXACTLY_ONCE)
			.withDescription("The checkpointing mode (exactly-once vs. at-least-once).");

	/** Maximum duration of a single checkpoint; defaults to 10 minutes. */
	public static final ConfigOption<Duration> CHECKPOINTING_TIMEOUT =
		ConfigOptions.key("execution.checkpointing.timeout")
			.durationType()
			.defaultValue(Duration.ofMinutes(10))
			.withDescription("The maximum time that a checkpoint may take before being discarded.");

	/** Maximum number of checkpoint attempts that may be in flight at once; defaults to 1. */
	public static final ConfigOption<Integer> MAX_CONCURRENT_CHECKPOINTS =
		ConfigOptions.key("execution.checkpointing.max-concurrent-checkpoints")
			.intType()
			.defaultValue(1)
			.withDescription("The maximum number of checkpoint attempts that may be in progress at the same time. If " +
				"this value is n, then no checkpoints will be triggered while n checkpoint attempts are currently in " +
				"flight. For the next checkpoint to be triggered, one checkpoint attempt would need to finish or " +
				"expire.");

	/** Minimal pause between checkpointing attempts; defaults to {@link Duration#ZERO}. */
	public static final ConfigOption<Duration> MIN_PAUSE_BETWEEN_CHECKPOINTS =
		ConfigOptions.key("execution.checkpointing.min-pause")
			.durationType()
			.defaultValue(Duration.ZERO)
			.withDescription(Description.builder()
				// Each fragment ends with a space so the concatenated sentence keeps its word boundaries.
				.text("The minimal pause between checkpointing attempts. This setting defines how soon the " +
					"checkpoint coordinator may trigger another checkpoint after it becomes possible to trigger " +
					"another checkpoint with respect to the maximum number of concurrent checkpoints " +
					"(see %s).", TextElement.code(MAX_CONCURRENT_CHECKPOINTS.key()))
				.linebreak()
				.linebreak()
				.text("If the maximum number of concurrent checkpoints is set to one, this setting makes effectively " +
					"sure that a minimum amount of time passes where no checkpoint is in progress at all.")
				.build());

	/** Whether recovery should use a checkpoint even if a more recent savepoint exists; defaults to false. */
	public static final ConfigOption<Boolean> PREFER_CHECKPOINT_FOR_RECOVERY =
		ConfigOptions.key("execution.checkpointing.prefer-checkpoint-for-recovery")
			.booleanType()
			.defaultValue(false)
			.withDescription("If enabled, a job recovery should fallback to checkpoint when there is a more recent " +
				"savepoint.");

	/** Number of tolerable checkpoint failures; has no default value. */
	public static final ConfigOption<Integer> TOLERABLE_FAILURE_NUMBER =
		ConfigOptions.key("execution.checkpointing.tolerable-failed-checkpoints")
			.intType()
			.noDefaultValue()
			.withDescription("The tolerable checkpoint failure number. If set to 0, that means " +
				"we do not tolerate any checkpoint failure.");

	/** Cleanup behaviour of externalized checkpoints on job cancellation; has no default value. */
	public static final ConfigOption<CheckpointConfig.ExternalizedCheckpointCleanup> EXTERNALIZED_CHECKPOINT =
		ConfigOptions.key("execution.checkpointing.externalized-checkpoint-retention")
			.enumType(CheckpointConfig.ExternalizedCheckpointCleanup.class)
			.noDefaultValue()
			.withDescription(Description.builder()
				.text(
					"Externalized checkpoints write their meta data out to persistent storage and are not " +
						"automatically cleaned up when the owning job fails or is suspended (terminating with job " +
						"status %s or %s). In this case, you have to manually clean up the checkpoint state, both the " +
						"meta data and actual program state.",
					TextElement.code("JobStatus#FAILED"),
					TextElement.code("JobStatus#SUSPENDED"))
				.linebreak()
				.linebreak()
				.text(
					"The mode defines how an externalized checkpoint should be cleaned up on job cancellation. If " +
						"you choose to retain externalized checkpoints on cancellation you have to handle checkpoint " +
						"clean up manually when you cancel the job as well (terminating with job status %s).",
					TextElement.code("JobStatus#CANCELED"))
				.linebreak()
				.linebreak()
				.text(
					"The target directory for externalized checkpoints is configured via %s.",
					TextElement.code(CheckpointingOptions.CHECKPOINTS_DIRECTORY.key())).build());

	/**
	 * Base interval in which checkpoints are periodically scheduled; has no default value.
	 *
	 * <p>Declared after {@link #MAX_CONCURRENT_CHECKPOINTS} and
	 * {@link #MIN_PAUSE_BETWEEN_CHECKPOINTS} because its description embeds their keys.
	 */
	public static final ConfigOption<Duration> CHECKPOINTING_INTERVAL =
		ConfigOptions.key("execution.checkpointing.interval")
			.durationType()
			.noDefaultValue()
			.withDescription(Description.builder()
				.text("Gets the interval in which checkpoints are periodically scheduled.")
				.linebreak()
				.linebreak()
				.text("This setting defines the base interval. Checkpoint triggering may be delayed by the settings " +
						"%s and %s.",
					TextElement.code(MAX_CONCURRENT_CHECKPOINTS.key()),
					TextElement.code(MIN_PAUSE_BETWEEN_CHECKPOINTS.key()))
				.build());
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.api.environment;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.CheckpointingMode;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.util.Arrays;
import java.util.Collection;
import java.util.function.BiConsumer;
import java.util.function.Function;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertThat;

/**
 * Tests for configuring {@link CheckpointConfig} via
 * {@link CheckpointConfig#configure(ReadableConfig)}.
 *
 * <p>Each {@link TestSpec} pairs one configuration key/value with the setter and getter of the
 * corresponding {@link CheckpointConfig} field. Every spec is run through both test methods.
 */
@RunWith(Parameterized.class)
public class CheckpointConfigFromConfigurationTest {

	/**
	 * Builds one spec per option handled by {@link CheckpointConfig#configure(ReadableConfig)}.
	 *
	 * @return the parameter set; the spec's {@code toString()} is used as the test name
	 */
	@Parameterized.Parameters(name = "{0}")
	public static Collection<TestSpec<?>> specs() {
		return Arrays.asList(
			TestSpec.testValue(CheckpointingMode.AT_LEAST_ONCE)
				.whenSetFromFile("execution.checkpointing.mode", "AT_LEAST_ONCE")
				.viaSetter(CheckpointConfig::setCheckpointingMode)
				.getterVia(CheckpointConfig::getCheckpointingMode)
				.nonDefaultValue(CheckpointingMode.AT_LEAST_ONCE),

			TestSpec.testValue(10000L)
				.whenSetFromFile("execution.checkpointing.interval", "10 s")
				.viaSetter(CheckpointConfig::setCheckpointInterval)
				.getterVia(CheckpointConfig::getCheckpointInterval)
				.nonDefaultValue(100L),

			TestSpec.testValue(12000L)
				.whenSetFromFile("execution.checkpointing.timeout", "12 s")
				.viaSetter(CheckpointConfig::setCheckpointTimeout)
				.getterVia(CheckpointConfig::getCheckpointTimeout)
				.nonDefaultValue(100L),

			TestSpec.testValue(12)
				.whenSetFromFile("execution.checkpointing.max-concurrent-checkpoints", "12")
				.viaSetter(CheckpointConfig::setMaxConcurrentCheckpoints)
				.getterVia(CheckpointConfig::getMaxConcurrentCheckpoints)
				.nonDefaultValue(100),

			TestSpec.testValue(1000L)
				.whenSetFromFile("execution.checkpointing.min-pause", "1 s")
				.viaSetter(CheckpointConfig::setMinPauseBetweenCheckpoints)
				.getterVia(CheckpointConfig::getMinPauseBetweenCheckpoints)
				.nonDefaultValue(100L),

			TestSpec.testValue(true)
				.whenSetFromFile("execution.checkpointing.prefer-checkpoint-for-recovery", "true")
				.viaSetter(CheckpointConfig::setPreferCheckpointForRecovery)
				.getterVia(CheckpointConfig::isPreferCheckpointForRecovery)
				.nonDefaultValue(true),

			TestSpec.testValue(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
				.whenSetFromFile("execution.checkpointing.externalized-checkpoint-retention", "RETAIN_ON_CANCELLATION")
				.viaSetter(CheckpointConfig::enableExternalizedCheckpoints)
				.getterVia(CheckpointConfig::getExternalizedCheckpointCleanup)
				.nonDefaultValue(CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION),

			TestSpec.testValue(12)
				.whenSetFromFile("execution.checkpointing.tolerable-failed-checkpoints", "12")
				.viaSetter(CheckpointConfig::setTolerableCheckpointFailureNumber)
				.getterVia(CheckpointConfig::getTolerableCheckpointFailureNumber)
				.nonDefaultValue(100)
		);
	}

	/** The spec under test, injected by the {@link Parameterized} runner. */
	@Parameterized.Parameter
	public TestSpec<?> spec;

	/**
	 * A value read from a {@link Configuration} must end up equal to the same value applied
	 * through the programmatic setter.
	 */
	@Test
	public void testLoadingFromConfiguration() {
		CheckpointConfig configFromSetters = new CheckpointConfig();
		CheckpointConfig configFromFile = new CheckpointConfig();

		Configuration configuration = new Configuration();
		configuration.setString(spec.key, spec.value);
		configFromFile.configure(configuration);

		spec.setValue(configFromSetters);
		spec.assertEqual(configFromFile, configFromSetters);
	}

	/**
	 * Configuring from an empty {@link Configuration} must leave a previously set value
	 * untouched.
	 */
	@Test
	public void testNotOverridingIfNotSet() {
		CheckpointConfig config = new CheckpointConfig();

		spec.setNonDefaultValue(config);
		Configuration configuration = new Configuration();
		config.configure(configuration);

		spec.assertEqualNonDefault(config);
	}

	/**
	 * Fluent description of a single option test case.
	 *
	 * @param <T> type of the {@link CheckpointConfig} field exercised by this spec
	 */
	private static class TestSpec<T> {
		// Key and textual value written into the Configuration.
		private String key;
		private String value;
		// Value expected after parsing; also applied through the setter for comparison.
		private final T objectValue;
		// Value set up front in testNotOverridingIfNotSet to detect unwanted overrides.
		private T nonDefaultValue;
		private BiConsumer<CheckpointConfig, T> setter;
		private Function<CheckpointConfig, T> getter;

		private TestSpec(T value) {
			this.objectValue = value;
		}

		/** Starts a spec for the given expected field value. */
		public static <T> TestSpec<T> testValue(T value) {
			return new TestSpec<>(value);
		}

		/** Sets the configuration key and its string representation. */
		public TestSpec<T> whenSetFromFile(String key, String value) {
			this.key = key;
			this.value = value;
			return this;
		}

		/** Sets the programmatic setter used as the reference. */
		public TestSpec<T> viaSetter(BiConsumer<CheckpointConfig, T> setter) {
			this.setter = setter;
			return this;
		}

		/** Sets the getter used to read the field back for assertions. */
		public TestSpec<T> getterVia(Function<CheckpointConfig, T> getter) {
			this.getter = getter;
			return this;
		}

		/** Sets the value used by the "not overriding" test. */
		public TestSpec<T> nonDefaultValue(T nonDefaultValue) {
			this.nonDefaultValue = nonDefaultValue;
			return this;
		}

		/** Applies the expected value to {@code config} through the setter. */
		public void setValue(CheckpointConfig config) {
			setter.accept(config, objectValue);
		}

		/** Applies the non-default value to {@code config} through the setter. */
		public void setNonDefaultValue(CheckpointConfig config) {
			setter.accept(config, nonDefaultValue);
		}

		/** Asserts that both configs return the same value from the getter. */
		public void assertEqual(CheckpointConfig configFromFile, CheckpointConfig configFromSetters) {
			assertThat(getter.apply(configFromFile), equalTo(getter.apply(configFromSetters)));
		}

		/** Asserts that the getter still returns the non-default value. */
		public void assertEqualNonDefault(CheckpointConfig configFromFile) {
			assertThat(getter.apply(configFromFile), equalTo(nonDefaultValue));
		}

		@Override
		public String toString() {
			return "key='" + key + '\'';
		}
	}
}

0 comments on commit d3dbc96

Please sign in to comment.