Skip to content

Commit

Permalink
[FLINK-15177][state-backend-rocksdb] Migrate RocksDB Configurable Options to new type safe config options
Browse files Browse the repository at this point in the history

This also simplifies the validation logic.
  • Loading branch information
StephanEwen committed Dec 11, 2019
1 parent 3820bdd commit fdc5ed5
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,11 @@

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

import static org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.BLOCK_CACHE_SIZE;
import static org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.BLOCK_SIZE;
Expand All @@ -57,6 +56,8 @@
*/
public class DefaultConfigurableOptionsFactory implements ConfigurableOptionsFactory {

private static final long serialVersionUID = 1L;

private final Map<String, String> configuredOptions = new HashMap<>();

@Override
Expand Down Expand Up @@ -131,7 +132,7 @@ public Map<String, String> getConfiguredOptions() {
return new HashMap<>(configuredOptions);
}

private boolean isOptionConfigured(ConfigOption configOption) {
private boolean isOptionConfigured(ConfigOption<?> configOption) {
return configuredOptions.containsKey(configOption.key());
}

Expand Down Expand Up @@ -300,44 +301,37 @@ public DefaultConfigurableOptionsFactory setBlockCacheSize(String blockCacheSize
return this;
}

private static final String[] CANDIDATE_CONFIGS = new String[]{
private static final ConfigOption<?>[] CANDIDATE_CONFIGS = new ConfigOption<?>[] {
// configurable DBOptions
MAX_BACKGROUND_THREADS.key(),
MAX_OPEN_FILES.key(),
MAX_BACKGROUND_THREADS,
MAX_OPEN_FILES,

// configurable ColumnFamilyOptions
COMPACTION_STYLE.key(),
USE_DYNAMIC_LEVEL_SIZE.key(),
TARGET_FILE_SIZE_BASE.key(),
MAX_SIZE_LEVEL_BASE.key(),
WRITE_BUFFER_SIZE.key(),
MAX_WRITE_BUFFER_NUMBER.key(),
MIN_WRITE_BUFFER_NUMBER_TO_MERGE.key(),
BLOCK_SIZE.key(),
BLOCK_CACHE_SIZE.key()
COMPACTION_STYLE,
USE_DYNAMIC_LEVEL_SIZE,
TARGET_FILE_SIZE_BASE,
MAX_SIZE_LEVEL_BASE,
WRITE_BUFFER_SIZE,
MAX_WRITE_BUFFER_NUMBER,
MIN_WRITE_BUFFER_NUMBER_TO_MERGE,
BLOCK_SIZE,
BLOCK_CACHE_SIZE
};

private static final Set<String> POSITIVE_INT_CONFIG_SET = new HashSet<>(Arrays.asList(
MAX_BACKGROUND_THREADS.key(),
MAX_WRITE_BUFFER_NUMBER.key(),
MIN_WRITE_BUFFER_NUMBER_TO_MERGE.key()
));

private static final Set<String> SIZE_CONFIG_SET = new HashSet<>(Arrays.asList(
TARGET_FILE_SIZE_BASE.key(),
MAX_SIZE_LEVEL_BASE.key(),
WRITE_BUFFER_SIZE.key(),
BLOCK_SIZE.key(),
BLOCK_CACHE_SIZE.key()
private static final Set<ConfigOption<?>> POSITIVE_INT_CONFIG_SET = new HashSet<>(Arrays.asList(
MAX_BACKGROUND_THREADS,
MAX_WRITE_BUFFER_NUMBER,
MIN_WRITE_BUFFER_NUMBER_TO_MERGE
));

private static final Set<String> BOOLEAN_CONFIG_SET = new HashSet<>(Collections.singletonList(
USE_DYNAMIC_LEVEL_SIZE.key()
private static final Set<ConfigOption<?>> SIZE_CONFIG_SET = new HashSet<>(Arrays.asList(
TARGET_FILE_SIZE_BASE,
MAX_SIZE_LEVEL_BASE,
WRITE_BUFFER_SIZE,
BLOCK_SIZE,
BLOCK_CACHE_SIZE
));

private static final Set<String> COMPACTION_STYLE_SET = Arrays.stream(CompactionStyle.values())
.map(c -> c.name().toLowerCase()).collect(Collectors.toSet());

/**
* Creates a {@link DefaultConfigurableOptionsFactory} instance from a {@link Configuration}.
*
Expand All @@ -349,13 +343,12 @@ public DefaultConfigurableOptionsFactory setBlockCacheSize(String blockCacheSize
*/
@Override
public DefaultConfigurableOptionsFactory configure(Configuration configuration) {
for (String key : CANDIDATE_CONFIGS) {
String newValue = configuration.getString(key, null);
for (ConfigOption<?> option : CANDIDATE_CONFIGS) {
Optional<?> newValue = configuration.getOptional(option);

if (newValue != null) {
if (checkArgumentValid(key, newValue)) {
this.configuredOptions.put(key, newValue);
}
if (newValue.isPresent()) {
checkArgumentValid(option, newValue.get());
this.configuredOptions.put(option.key(), newValue.get().toString());
}
}
return this;
Expand All @@ -371,30 +364,19 @@ public String toString() {
/**
* Helper method to check whether the (key,value) is valid through given configuration and returns the formatted value.
*
* @param key The configuration key which is configurable in {@link RocksDBConfigurableOptions}.
* @param option The configuration key which is configurable in {@link RocksDBConfigurableOptions}.
* @param value The value within given configuration.
*
* @return whether the given key and value in string format is legal.
*/
private static boolean checkArgumentValid(String key, String value) {
if (POSITIVE_INT_CONFIG_SET.contains(key)) {
private static void checkArgumentValid(ConfigOption<?> option, Object value) {
final String key = option.key();

Preconditions.checkArgument(Integer.parseInt(value) > 0,
if (POSITIVE_INT_CONFIG_SET.contains(option)) {
Preconditions.checkArgument((Integer) value > 0,
"Configured value for key: " + key + " must be larger than 0.");
} else if (SIZE_CONFIG_SET.contains(key)) {

Preconditions.checkArgument(MemorySize.parseBytes(value) > 0,
} else if (SIZE_CONFIG_SET.contains(option)) {
Preconditions.checkArgument(((MemorySize) value).getBytes() > 0,
"Configured size for key" + key + " must be larger than 0.");
} else if (BOOLEAN_CONFIG_SET.contains(key)) {

Preconditions.checkArgument("true".equalsIgnoreCase(value) || "false".equalsIgnoreCase(value),
"The configured boolean value: " + value + " for key: " + key + " is illegal.");
} else if (key.equals(COMPACTION_STYLE.key())) {
value = value.toLowerCase();
Preconditions.checkArgument(COMPACTION_STYLE_SET.contains(value),
"Compression type: " + value + " is not recognized with legal types: " + String.join(", ", COMPACTION_STYLE_SET));
}
return true;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@
package org.apache.flink.contrib.streaming.state;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.description.Description;

import org.rocksdb.CompactionStyle;

import java.io.Serializable;

import static org.apache.flink.configuration.ConfigOptions.key;
Expand All @@ -43,14 +46,16 @@ public class RocksDBConfigurableOptions implements Serializable {
// Provided configurable DBOptions within Flink
//--------------------------------------------------------------------------

public static final ConfigOption<String> MAX_BACKGROUND_THREADS =
public static final ConfigOption<Integer> MAX_BACKGROUND_THREADS =
key("state.backend.rocksdb.thread.num")
.intType()
.noDefaultValue()
.withDescription("The maximum number of concurrent background flush and compaction jobs (per TaskManager). " +
"RocksDB has default configuration as '1'.");

public static final ConfigOption<String> MAX_OPEN_FILES =
public static final ConfigOption<Integer> MAX_OPEN_FILES =
key("state.backend.rocksdb.files.open")
.intType()
.noDefaultValue()
.withDescription("The maximum number of open files (per TaskManager) that can be used by the DB, '-1' means no limit. " +
"RocksDB has default configuration as '5000'.");
Expand All @@ -59,15 +64,17 @@ public class RocksDBConfigurableOptions implements Serializable {
// Provided configurable ColumnFamilyOptions within Flink
//--------------------------------------------------------------------------

public static final ConfigOption<String> COMPACTION_STYLE =
public static final ConfigOption<CompactionStyle> COMPACTION_STYLE =
key("state.backend.rocksdb.compaction.style")
.enumType(CompactionStyle.class)
.noDefaultValue()
.withDescription(String.format("The specified compaction style for DB. Candidate compaction style is %s, %s or %s, " +
"and RocksDB choose '%s' as default style.", LEVEL.name(), FIFO.name(), UNIVERSAL.name(),
LEVEL.name()));

public static final ConfigOption<String> USE_DYNAMIC_LEVEL_SIZE =
public static final ConfigOption<Boolean> USE_DYNAMIC_LEVEL_SIZE =
key("state.backend.rocksdb.compaction.level.use-dynamic-size")
.booleanType()
.noDefaultValue()
.withDescription(Description.builder().text("If true, RocksDB will pick target size of each level dynamically. From an empty DB, ")
.text("RocksDB would make last level the base level, which means merging L0 data into the last level, ")
Expand All @@ -78,44 +85,51 @@ public class RocksDBConfigurableOptions implements Serializable {
"RocksDB's doc."))
.build());

public static final ConfigOption<String> TARGET_FILE_SIZE_BASE =
public static final ConfigOption<MemorySize> TARGET_FILE_SIZE_BASE =
key("state.backend.rocksdb.compaction.level.target-file-size-base")
.memoryType()
.noDefaultValue()
.withDescription("The target file size for compaction, which determines a level-1 file size. " +
"RocksDB has default configuration as '2MB'.");

public static final ConfigOption<String> MAX_SIZE_LEVEL_BASE =
public static final ConfigOption<MemorySize> MAX_SIZE_LEVEL_BASE =
key("state.backend.rocksdb.compaction.level.max-size-level-base")
.memoryType()
.noDefaultValue()
.withDescription("The upper-bound of the total size of level base files in bytes. " +
"RocksDB has default configuration as '10MB'.");

public static final ConfigOption<String> WRITE_BUFFER_SIZE =
public static final ConfigOption<MemorySize> WRITE_BUFFER_SIZE =
key("state.backend.rocksdb.writebuffer.size")
.memoryType()
.noDefaultValue()
.withDescription("The amount of data built up in memory (backed by an unsorted log on disk) " +
"before converting to a sorted on-disk files. RocksDB has default writebuffer size as '64MB'.");

public static final ConfigOption<String> MAX_WRITE_BUFFER_NUMBER =
public static final ConfigOption<Integer> MAX_WRITE_BUFFER_NUMBER =
key("state.backend.rocksdb.writebuffer.count")
.intType()
.noDefaultValue()
.withDescription("Tne maximum number of write buffers that are built up in memory. " +
"RocksDB has default configuration as '2'.");

public static final ConfigOption<String> MIN_WRITE_BUFFER_NUMBER_TO_MERGE =
public static final ConfigOption<Integer> MIN_WRITE_BUFFER_NUMBER_TO_MERGE =
key("state.backend.rocksdb.writebuffer.number-to-merge")
.intType()
.noDefaultValue()
.withDescription("The minimum number of write buffers that will be merged together before writing to storage. " +
"RocksDB has default configuration as '1'.");

public static final ConfigOption<String> BLOCK_SIZE =
public static final ConfigOption<MemorySize> BLOCK_SIZE =
key("state.backend.rocksdb.block.blocksize")
.memoryType()
.noDefaultValue()
.withDescription("The approximate size (in bytes) of user data packed per block. " +
"RocksDB has default blocksize as '4KB'.");

public static final ConfigOption<String> BLOCK_CACHE_SIZE =
public static final ConfigOption<MemorySize> BLOCK_CACHE_SIZE =
key("state.backend.rocksdb.block.cache-size")
.memoryType()
.noDefaultValue()
.withDescription("The amount of the cache for data blocks in RocksDB. " +
"RocksDB has default block-cache size as '8MB'.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -483,16 +483,16 @@ public void testConfigurableOptionsFromConfig() throws IOException {

// verify legal configuration
{
configuration.setString(RocksDBConfigurableOptions.COMPACTION_STYLE, "level");
configuration.setString(RocksDBConfigurableOptions.USE_DYNAMIC_LEVEL_SIZE, "TRUE");
configuration.setString(RocksDBConfigurableOptions.TARGET_FILE_SIZE_BASE, "8 mb");
configuration.setString(RocksDBConfigurableOptions.MAX_SIZE_LEVEL_BASE, "128MB");
configuration.setString(RocksDBConfigurableOptions.MAX_BACKGROUND_THREADS, "4");
configuration.setString(RocksDBConfigurableOptions.MAX_WRITE_BUFFER_NUMBER, "4");
configuration.setString(RocksDBConfigurableOptions.MIN_WRITE_BUFFER_NUMBER_TO_MERGE, "2");
configuration.setString(RocksDBConfigurableOptions.WRITE_BUFFER_SIZE, "64 MB");
configuration.setString(RocksDBConfigurableOptions.BLOCK_SIZE, "4 kb");
configuration.setString(RocksDBConfigurableOptions.BLOCK_CACHE_SIZE, "512 mb");
configuration.setString(RocksDBConfigurableOptions.COMPACTION_STYLE.key(), "level");
configuration.setString(RocksDBConfigurableOptions.USE_DYNAMIC_LEVEL_SIZE.key(), "TRUE");
configuration.setString(RocksDBConfigurableOptions.TARGET_FILE_SIZE_BASE.key(), "8 mb");
configuration.setString(RocksDBConfigurableOptions.MAX_SIZE_LEVEL_BASE.key(), "128MB");
configuration.setString(RocksDBConfigurableOptions.MAX_BACKGROUND_THREADS.key(), "4");
configuration.setString(RocksDBConfigurableOptions.MAX_WRITE_BUFFER_NUMBER.key(), "4");
configuration.setString(RocksDBConfigurableOptions.MIN_WRITE_BUFFER_NUMBER_TO_MERGE.key(), "2");
configuration.setString(RocksDBConfigurableOptions.WRITE_BUFFER_SIZE.key(), "64 MB");
configuration.setString(RocksDBConfigurableOptions.BLOCK_SIZE.key(), "4 kb");
configuration.setString(RocksDBConfigurableOptions.BLOCK_CACHE_SIZE.key(), "512 mb");

DefaultConfigurableOptionsFactory optionsFactory = new DefaultConfigurableOptionsFactory();
optionsFactory.configure(configuration);
Expand Down Expand Up @@ -740,10 +740,10 @@ static Environment getMockEnvironment(File... tempDirs) {
}

private void verifyIllegalArgument(
ConfigOption<String> configOption,
ConfigOption<?> configOption,
String configValue) {
Configuration configuration = new Configuration();
configuration.setString(configOption, configValue);
configuration.setString(configOption.key(), configValue);

DefaultConfigurableOptionsFactory optionsFactory = new DefaultConfigurableOptionsFactory();
try {
Expand Down

0 comments on commit fdc5ed5

Please sign in to comment.