Skip to content

Commit

Permalink
[FLINK-3718] Add Option For Completely Async Backup in RocksDB State …
Browse files Browse the repository at this point in the history
…Backend

This also refactors the RocksDB backend to keep one RocksDB data base in
the backend where all key/value state is stored. Individual named
key/value states get a reference to the db and store their state in a
column family. This way, we only have to backup one RocksDB data base
and can centrally decide how to do backups.
  • Loading branch information
aljoscha committed Apr 20, 2016
1 parent 77f1a2e commit c33325e
Show file tree
Hide file tree
Showing 22 changed files with 1,536 additions and 1,133 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@

package org.apache.flink.contrib.streaming.state;

import org.rocksdb.Options;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;

/**
* A factory for {@link Options} to be passed to the {@link RocksDBStateBackend}.
* A factory for {@link DBOptions} to be passed to the {@link RocksDBStateBackend}.
* Options have to be created lazily by this factory, because the {@code Options}
* class is not serializable and holds pointers to native code.
*
Expand Down Expand Up @@ -55,5 +56,19 @@ public interface OptionsFactory extends java.io.Serializable {
* @param currentOptions The options object with the pre-defined options.
* @return The options object on which the additional options are set.
*/
Options createOptions(Options currentOptions);
DBOptions createDBOptions(DBOptions currentOptions);

/**
* This method should set the additional options on top of the current options object.
* The current options object may contain pre-defined options based on flags that have
* been configured on the state backend.
*
* <p>It is important to set the options on the current object and return the result from
* the setter methods, otherwise the pre-defined options may get lost.
*
* @param currentOptions The options object with the pre-defined options.
* @return The options object on which the additional options are set.
*/
ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions);

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@
package org.apache.flink.contrib.streaming.state;

import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.CompactionStyle;
import org.rocksdb.Options;
import org.rocksdb.DBOptions;
import org.rocksdb.StringAppendOperator;

/**
* The {@code PredefinedOptions} are configuration settings for the {@link RocksDBStateBackend}.
Expand All @@ -42,11 +44,18 @@ public enum PredefinedOptions {
DEFAULT {

@Override
public Options createOptions() {
return new Options()
public DBOptions createDBOptions() {
return new DBOptions()
.setUseFsync(false)
.setDisableDataSync(true);
}

@Override
public ColumnFamilyOptions createColumnOptions() {
return new ColumnFamilyOptions()
.setMergeOperator(new StringAppendOperator());
}

},

/**
Expand All @@ -72,16 +81,22 @@ public Options createOptions() {
SPINNING_DISK_OPTIMIZED {

@Override
public Options createOptions() {
public DBOptions createDBOptions() {

return new Options()
.setCompactionStyle(CompactionStyle.LEVEL)
.setLevelCompactionDynamicLevelBytes(true)
return new DBOptions()
.setIncreaseParallelism(4)
.setUseFsync(false)
.setDisableDataSync(true)
.setMaxOpenFiles(-1);
}

@Override
public ColumnFamilyOptions createColumnOptions() {
return new ColumnFamilyOptions()
.setMergeOperator(new StringAppendOperator())
.setCompactionStyle(CompactionStyle.LEVEL)
.setLevelCompactionDynamicLevelBytes(true);
}
},

/**
Expand Down Expand Up @@ -113,25 +128,32 @@ public Options createOptions() {
SPINNING_DISK_OPTIMIZED_HIGH_MEM {

@Override
public Options createOptions() {
public DBOptions createDBOptions() {

return new DBOptions()
.setIncreaseParallelism(4)
.setUseFsync(false)
.setDisableDataSync(true)
.setMaxOpenFiles(-1);
}

@Override
public ColumnFamilyOptions createColumnOptions() {

final long blockCacheSize = 256 * 1024 * 1024;
final long blockSize = 128 * 1024;
final long targetFileSize = 256 * 1024 * 1024;
final long writeBufferSize = 64 * 1024 * 1024;

return new Options()
return new ColumnFamilyOptions()
.setMergeOperator(new StringAppendOperator())
.setCompactionStyle(CompactionStyle.LEVEL)
.setLevelCompactionDynamicLevelBytes(true)
.setTargetFileSizeBase(targetFileSize)
.setMaxBytesForLevelBase(4 * targetFileSize)
.setWriteBufferSize(writeBufferSize)
.setIncreaseParallelism(4)
.setMinWriteBufferNumberToMerge(3)
.setMaxWriteBufferNumber(4)
.setUseFsync(false)
.setDisableDataSync(true)
.setMaxOpenFiles(-1)
.setTableFormatConfig(
new BlockBasedTableConfig()
.setBlockCacheSize(blockCacheSize)
Expand Down Expand Up @@ -160,21 +182,35 @@ public Options createOptions() {
FLASH_SSD_OPTIMIZED {

@Override
public Options createOptions() {
return new Options()
public DBOptions createDBOptions() {
return new DBOptions()
.setIncreaseParallelism(4)
.setUseFsync(false)
.setDisableDataSync(true)
.setMaxOpenFiles(-1);
}

@Override
public ColumnFamilyOptions createColumnOptions() {
return new ColumnFamilyOptions()
.setMergeOperator(new StringAppendOperator());
}
};

// ------------------------------------------------------------------------

/**
* Creates the options for this pre-defined setting.
* Creates the {@link DBOptions}for this pre-defined setting.
*
* @return The pre-defined options object.
*/
public abstract Options createOptions();
public abstract DBOptions createDBOptions();

/**
* Creates the {@link org.rocksdb.ColumnFamilyOptions}for this pre-defined setting.
*
* @return The pre-defined options object.
*/
public abstract ColumnFamilyOptions createColumnOptions();

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,14 @@
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.state.KvState;

import org.rocksdb.Options;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteOptions;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.net.URI;

import static java.util.Objects.requireNonNull;

Expand All @@ -46,15 +43,15 @@
* @param <T> The type of the values that can be folded into the state.
* @param <ACC> The type of the value in the folding state.
*/
public class RocksDBFoldingState<K, N, T, ACC>
class RocksDBFoldingState<K, N, T, ACC>
extends AbstractRocksDBState<K, N, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>>
implements FoldingState<T, ACC> {

/** Serializer for the values */
private final TypeSerializer<ACC> valueSerializer;

/** This holds the name of the state and can create an initial default value for the state. */
protected final FoldingStateDescriptor<T, ACC> stateDesc;
private final FoldingStateDescriptor<T, ACC> stateDesc;

/** User-specified fold function */
private final FoldFunction<T, ACC> foldFunction;
Expand All @@ -63,59 +60,23 @@ public class RocksDBFoldingState<K, N, T, ACC>
* We disable writes to the write-ahead-log here. We can't have these in the base class
* because JNI segfaults for some reason if they are.
*/
protected final WriteOptions writeOptions;
private final WriteOptions writeOptions;

/**
* Creates a new {@code RocksDBFoldingState}.
*
* @param keySerializer The serializer for the keys.
* @param namespaceSerializer The serializer for the namespace.
* @param stateDesc The state identifier for the state. This contains name
* and can create a default state value.
* @param dbPath The path on the local system where RocksDB data should be stored.
* @param backupPath The path where to store backups.
*/
protected RocksDBFoldingState(
TypeSerializer<K> keySerializer,
RocksDBFoldingState(ColumnFamilyHandle columnFamily,
TypeSerializer<N> namespaceSerializer,
FoldingStateDescriptor<T, ACC> stateDesc,
File dbPath,
String backupPath,
Options options) {

super(keySerializer, namespaceSerializer, dbPath, backupPath, options);

this.stateDesc = requireNonNull(stateDesc);
this.valueSerializer = stateDesc.getSerializer();
this.foldFunction = stateDesc.getFoldFunction();

writeOptions = new WriteOptions();
writeOptions.setDisableWAL(true);
}
RocksDBStateBackend backend) {

/**
* Creates a {@code RocksDBFoldingState} by restoring from a directory.
*
* @param keySerializer The serializer for the keys.
* @param namespaceSerializer The serializer for the namespace.
* @param stateDesc The state identifier for the state. This contains name
* and can create a default state value.
* @param dbPath The path on the local system where RocksDB data should be stored.
* @param backupPath The path where to store backups.
* @param restorePath The path on the local file system that we are restoring from.
*/
protected RocksDBFoldingState(
TypeSerializer<K> keySerializer,
TypeSerializer<N> namespaceSerializer,
FoldingStateDescriptor<T, ACC> stateDesc,
File dbPath,
String backupPath,
String restorePath,
Options options) {

super(keySerializer, namespaceSerializer, dbPath, backupPath, restorePath, options);
super(columnFamily, namespaceSerializer, backend);

this.stateDesc = stateDesc;
this.stateDesc = requireNonNull(stateDesc);
this.valueSerializer = stateDesc.getSerializer();
this.foldFunction = stateDesc.getFoldFunction();

Expand All @@ -130,7 +91,7 @@ public ACC get() {
try {
writeKeyAndNamespace(out);
byte[] key = baos.toByteArray();
byte[] valueBytes = db.get(key);
byte[] valueBytes = backend.db.get(columnFamily, key);
if (valueBytes == null) {
return stateDesc.getDefaultValue();
}
Expand All @@ -147,65 +108,22 @@ public void add(T value) throws IOException {
try {
writeKeyAndNamespace(out);
byte[] key = baos.toByteArray();
byte[] valueBytes = db.get(key);
byte[] valueBytes = backend.db.get(columnFamily, key);

if (valueBytes == null) {
baos.reset();
valueSerializer.serialize(foldFunction.fold(stateDesc.getDefaultValue(), value), out);
db.put(writeOptions, key, baos.toByteArray());
backend.db.put(columnFamily, writeOptions, key, baos.toByteArray());
} else {
ACC oldValue = valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStream(valueBytes)));
ACC newValue = foldFunction.fold(oldValue, value);
baos.reset();
valueSerializer.serialize(newValue, out);
db.put(writeOptions, key, baos.toByteArray());
backend.db.put(columnFamily, writeOptions, key, baos.toByteArray());
}
} catch (Exception e) {
throw new RuntimeException("Error while adding data to RocksDB", e);
}
}

@Override
protected AbstractRocksDBSnapshot<K, N, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>> createRocksDBSnapshot(
URI backupUri, long checkpointId) {

return new Snapshot<>(basePath, checkpointPath, backupUri, checkpointId, keySerializer, namespaceSerializer, stateDesc);
}

private static class Snapshot<K, N, T, ACC> extends AbstractRocksDBSnapshot<K, N, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>> {
private static final long serialVersionUID = 1L;

public Snapshot(
File dbPath,
String checkpointPath,
URI backupUri,
long checkpointId,
TypeSerializer<K> keySerializer,
TypeSerializer<N> namespaceSerializer,
FoldingStateDescriptor<T, ACC> stateDesc) {

super(dbPath,
checkpointPath,
backupUri,
checkpointId,
keySerializer,
namespaceSerializer,
stateDesc);
}

@Override
protected KvState<K, N, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>, RocksDBStateBackend>
createRocksDBState(
TypeSerializer<K> keySerializer,
TypeSerializer<N> namespaceSerializer,
FoldingStateDescriptor<T, ACC> stateDesc,
File dbPath,
String backupPath,
String restorePath,
Options options) throws Exception {

return new RocksDBFoldingState<>(keySerializer, namespaceSerializer, stateDesc, dbPath, checkpointPath, restorePath, options);
}
}
}

Loading

0 comments on commit c33325e

Please sign in to comment.