Skip to content

Commit

Permalink
[FLINK-7220] [checkpoints] Update RocksDB dependency to 5.5.5
Browse files Browse the repository at this point in the history
  • Loading branch information
StefanRRichter committed Jul 28, 2017
1 parent e975140 commit d818fc4
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 44 deletions.
6 changes: 3 additions & 3 deletions flink-contrib/flink-statebackend-rocksdb/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,9 @@ under the License.
</dependency>

<dependency>
<groupId>com.data-artisans</groupId>
<artifactId>frocksdbjni</artifactId>
<version>4.11.2-artisans</version>
<groupId>org.rocksdb</groupId>
<artifactId>rocksdbjni</artifactId>
<version>5.5.5</version>
</dependency>

<!-- test dependencies -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@
package org.apache.flink.contrib.streaming.state;

import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.BloomFilter;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.CompactionStyle;
import org.rocksdb.DBOptions;
import org.rocksdb.StringAppendOperator;

/**
* The {@code PredefinedOptions} are configuration settings for the {@link RocksDBStateBackend}.
Expand All @@ -46,14 +46,13 @@ public enum PredefinedOptions {
@Override
public DBOptions createDBOptions() {
return new DBOptions()
.setUseFsync(false)
.setDisableDataSync(true);
.setUseFsync(false);
}

@Override
public ColumnFamilyOptions createColumnOptions() {
return new ColumnFamilyOptions()
.setMergeOperator(new StringAppendOperator());
.setMergeOperatorName(MERGE_OPERATOR_NAME);
}

},
Expand Down Expand Up @@ -86,14 +85,13 @@ public DBOptions createDBOptions() {
return new DBOptions()
.setIncreaseParallelism(4)
.setUseFsync(false)
.setDisableDataSync(true)
.setMaxOpenFiles(-1);
}

@Override
public ColumnFamilyOptions createColumnOptions() {
return new ColumnFamilyOptions()
.setMergeOperator(new StringAppendOperator())
.setMergeOperatorName(MERGE_OPERATOR_NAME)
.setCompactionStyle(CompactionStyle.LEVEL)
.setLevelCompactionDynamicLevelBytes(true);
}
Expand Down Expand Up @@ -133,7 +131,6 @@ public DBOptions createDBOptions() {
return new DBOptions()
.setIncreaseParallelism(4)
.setUseFsync(false)
.setDisableDataSync(true)
.setMaxOpenFiles(-1);
}

Expand All @@ -146,7 +143,7 @@ public ColumnFamilyOptions createColumnOptions() {
final long writeBufferSize = 64 * 1024 * 1024;

return new ColumnFamilyOptions()
.setMergeOperator(new StringAppendOperator())
.setMergeOperatorName(MERGE_OPERATOR_NAME)
.setCompactionStyle(CompactionStyle.LEVEL)
.setLevelCompactionDynamicLevelBytes(true)
.setTargetFileSizeBase(targetFileSize)
Expand All @@ -158,6 +155,7 @@ public ColumnFamilyOptions createColumnOptions() {
new BlockBasedTableConfig()
.setBlockCacheSize(blockCacheSize)
.setBlockSize(blockSize)
.setFilter(new BloomFilter())
);
}
},
Expand Down Expand Up @@ -186,19 +184,21 @@ public DBOptions createDBOptions() {
return new DBOptions()
.setIncreaseParallelism(4)
.setUseFsync(false)
.setDisableDataSync(true)
.setMaxOpenFiles(-1);
}

@Override
public ColumnFamilyOptions createColumnOptions() {
return new ColumnFamilyOptions()
.setMergeOperator(new StringAppendOperator());
.setMergeOperatorName(MERGE_OPERATOR_NAME);
}
};

// ------------------------------------------------------------------------

// The name of the merge operator in RocksDB. Do not change this unless you know exactly what you are doing.
public static final String MERGE_OPERATOR_NAME = "stringappendtest";

/**
* Creates the {@link DBOptions} for this pre-defined setting.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,10 @@
* streams provided by a {@link org.apache.flink.runtime.state.CheckpointStreamFactory} upon
* checkpointing. This state backend can store very large state that exceeds memory and spills
* to disk. Except for the snapshotting, this class should be accessed as if it is not threadsafe.
*
* <p>This class follows the rules for closing/releasing native RocksDB resources as described in
* <a href="https://github.com/facebook/rocksdb/wiki/RocksJava-Basics#opening-a-database-with-column-families">
* this document</a>.
*/
public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {

Expand Down Expand Up @@ -159,6 +163,12 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
*/
protected RocksDB db;

/**
* We are not using the default column family for Flink state ops, but we still need to remember this handle so that
* we can close it properly when the backend is closed. This is required by RocksDB's native memory management.
*/
private ColumnFamilyHandle defaultColumnFamily;

/**
* Information about the k/v states as we create them. This is used to retrieve the
* column family that is used for a state and also for sanity checks when restoring.
Expand Down Expand Up @@ -254,30 +264,31 @@ public void dispose() {
// and access it in a synchronized block that locks on #dbDisposeLock.
if (db != null) {

for (Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> column :
kvStateInformation.values()) {
try {
column.f0.close();
} catch (Exception ex) {
LOG.info("Exception while closing ColumnFamilyHandle object.", ex);
}
// RocksDB's native memory management requires that *all* CFs (including default) are closed before the
// DB is closed. So we start with the ones created by Flink...
for (Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> columnMetaData :
kvStateInformation.values()) {

IOUtils.closeQuietly(columnMetaData.f0);
}

kvStateInformation.clear();
restoredKvStateMetaInfos.clear();
// ... close the default CF ...
IOUtils.closeQuietly(defaultColumnFamily);

try {
db.close();
} catch (Exception ex) {
LOG.info("Exception while closing RocksDB object.", ex);
}
// ... and finally close the DB instance ...
IOUtils.closeQuietly(db);

// invalidate the reference before releasing the lock so that other accesses will not cause crashes
db = null;

}
}

IOUtils.closeQuietly(columnOptions);
kvStateInformation.clear();
restoredKvStateMetaInfos.clear();

IOUtils.closeQuietly(dbOptions);
IOUtils.closeQuietly(columnOptions);

try {
FileUtils.deleteDirectory(instanceBasePath);
Expand Down Expand Up @@ -1039,6 +1050,8 @@ private RocksDB openDB(
List<ColumnFamilyHandle> stateColumnFamilyHandles) throws IOException {

List<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<>(stateColumnFamilyDescriptors);

// we add the required descriptor for the default CF in last position.
columnFamilyDescriptors.add(
new ColumnFamilyDescriptor(
"default".getBytes(ConfigConstants.DEFAULT_CHARSET), columnOptions));
Expand All @@ -1057,9 +1070,15 @@ private RocksDB openDB(
throw new IOException("Error while opening RocksDB instance.", e);
}

final int defaultColumnFamilyIndex = columnFamilyHandles.size() - 1;

// extract the default column family.
defaultColumnFamily = columnFamilyHandles.get(defaultColumnFamilyIndex);

if (stateColumnFamilyHandles != null) {
// return all CFs except the default CF which is kept separately because it is not used in Flink operations.
stateColumnFamilyHandles.addAll(
columnFamilyHandles.subList(0, columnFamilyHandles.size() - 1));
columnFamilyHandles.subList(0, defaultColumnFamilyIndex));
}

return db;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,16 +260,7 @@ public void testPredefinedOptions() throws Exception {
rocksDbBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED);
assertEquals(PredefinedOptions.SPINNING_DISK_OPTIMIZED, rocksDbBackend.getPredefinedOptions());

try (
DBOptions optCreated = rocksDbBackend.getDbOptions();
DBOptions optReference = new DBOptions();
ColumnFamilyOptions colCreated = rocksDbBackend.getColumnOptions()) {

// check that our instance uses something that we configured
assertEquals(true, optCreated.disableDataSync());
// just ensure that we picked an option that actually differs from the reference.
assertEquals(false, optReference.disableDataSync());

try (ColumnFamilyOptions colCreated = rocksDbBackend.getColumnOptions()) {
assertEquals(CompactionStyle.LEVEL, colCreated.compactionStyle());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,15 @@
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksIterator;
import org.rocksdb.StringAppendOperator;
import org.rocksdb.WriteOptions;
import sun.misc.Unsafe;

import java.io.File;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

import static org.apache.flink.contrib.streaming.state.PredefinedOptions.MERGE_OPERATOR_NAME;

/**
* Test that validates that the performance of RocksDB is as expected.
* This test guards against the bug filed as 'FLINK-5756'
Expand Down Expand Up @@ -74,9 +75,8 @@ public void testRocksDbMergePerformance() throws Exception {
.setIncreaseParallelism(4)
.setUseFsync(false)
.setMaxOpenFiles(-1)
.setDisableDataSync(true)
.setCreateIfMissing(true)
.setMergeOperator(new StringAppendOperator());
.setMergeOperatorName(MERGE_OPERATOR_NAME);

final WriteOptions write_options = new WriteOptions()
.setSync(false)
Expand Down Expand Up @@ -152,9 +152,8 @@ public void testRocksDbRangeGetPerformance() throws Exception {
.setIncreaseParallelism(4)
.setUseFsync(false)
.setMaxOpenFiles(-1)
.setDisableDataSync(true)
.setCreateIfMissing(true)
.setMergeOperator(new StringAppendOperator());
.setMergeOperatorName(MERGE_OPERATOR_NAME);

final WriteOptions write_options = new WriteOptions()
.setSync(false)
Expand Down

0 comments on commit d818fc4

Please sign in to comment.