Skip to content

Commit

Permalink
[FLINK-5626] Improved resource deallocation in RocksDBKeyedStateBackend
Browse files Browse the repository at this point in the history
  • Loading branch information
StefanRRichter authored and uce committed Jan 25, 2017
1 parent a811545 commit cd9115f
Show file tree
Hide file tree
Showing 7 changed files with 169 additions and 118 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,9 @@
import org.apache.flink.runtime.state.internal.InternalValueState;
import org.apache.flink.runtime.util.SerializableObject;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;

import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
Expand All @@ -68,11 +68,9 @@
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
import org.rocksdb.Snapshot;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.io.EOFException;
import java.io.File;
import java.io.IOException;
Expand All @@ -96,15 +94,12 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {

private static final Logger LOG = LoggerFactory.getLogger(RocksDBKeyedStateBackend.class);

/** Operator identifier that is used to uniqueify the RocksDB storage path. */
private final String operatorIdentifier;

/** JobID for uniquifying backup paths. */
private final JobID jobId;

/** The options from the options factory, cached */
/** The column family options from the options factory */
private final ColumnFamilyOptions columnOptions;

/** The DB options from the options factory */
private final DBOptions dbOptions;

/** Path where this configured instance stores its data directory */
private final File instanceBasePath;

Expand Down Expand Up @@ -145,19 +140,18 @@ public RocksDBKeyedStateBackend(
TypeSerializer<K> keySerializer,
int numberOfKeyGroups,
KeyGroupRange keyGroupRange
) throws Exception {
) throws IOException {

super(kvStateRegistry, keySerializer, userCodeClassLoader, numberOfKeyGroups, keyGroupRange);
this.operatorIdentifier = operatorIdentifier;
this.jobId = jobId;
this.columnOptions = columnFamilyOptions;
this.columnOptions = Preconditions.checkNotNull(columnFamilyOptions);
this.dbOptions = Preconditions.checkNotNull(dbOptions);

this.instanceBasePath = instanceBasePath;
this.instanceBasePath = Preconditions.checkNotNull(instanceBasePath);
this.instanceRocksDBPath = new File(instanceBasePath, "db");

if (!instanceBasePath.exists()) {
if (!instanceBasePath.mkdirs()) {
throw new RuntimeException("Could not create RocksDB data directory.");
throw new IOException("Could not create RocksDB data directory.");
}
}

Expand All @@ -168,17 +162,23 @@ public RocksDBKeyedStateBackend(
FileUtils.deleteDirectory(instanceRocksDBPath);
}
} catch (IOException e) {
throw new RuntimeException("Error cleaning RocksDB data directory.", e);
throw new IOException("Error cleaning RocksDB data directory.", e);
}

List<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<>(1);
// RocksDB seems to need this...
columnFamilyDescriptors.add(new ColumnFamilyDescriptor("default".getBytes()));
List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>(1);
try {
db = RocksDB.open(dbOptions, instanceRocksDBPath.getAbsolutePath(), columnFamilyDescriptors, columnFamilyHandles);

db = RocksDB.open(
Preconditions.checkNotNull(dbOptions),
instanceRocksDBPath.getAbsolutePath(),
columnFamilyDescriptors,
columnFamilyHandles);

} catch (RocksDBException e) {
throw new RuntimeException("Error while opening RocksDB instance.", e);
throw new IOException("Error while opening RocksDB instance.", e);
}
keyGroupPrefixBytes = getNumberOfKeyGroups() > (Byte.MAX_VALUE + 1) ? 2 : 1;
kvStateInformation = new HashMap<>();
Expand All @@ -200,21 +200,32 @@ public void dispose() {

for (Tuple2<ColumnFamilyHandle, RegisteredBackendStateMetaInfo<?, ?>> column :
kvStateInformation.values()) {

column.f0.close();
try {
column.f0.close();
} catch (Exception ex) {
LOG.info("Exception while closing ColumnFamilyHandle object.", ex);
}
}

kvStateInformation.clear();

db.close();
try {
db.close();
} catch (Exception ex) {
LOG.info("Exception while closing RocksDB object.", ex);
}

db = null;
}
}

IOUtils.closeQuietly(columnOptions);
IOUtils.closeQuietly(dbOptions);

try {
FileUtils.deleteDirectory(instanceBasePath);
} catch (IOException ioex) {
LOG.info("Could not delete instace base path for RocksDB: " + instanceBasePath);
LOG.info("Could not delete instace base path for RocksDB: " + instanceBasePath, ioex);
}
}

Expand Down Expand Up @@ -245,14 +256,17 @@ public RunnableFuture<KeyGroupsStateHandle> snapshot(
// hold the db lock while operation on the db to guard us against async db disposal
synchronized (asyncSnapshotLock) {

if (kvStateInformation.isEmpty()) {
LOG.info("Asynchronous RocksDB snapshot performed on empty keyed state at " + timestamp +
" . Returning null.");
if (db != null) {

return new DoneFuture<>(null);
}
if (kvStateInformation.isEmpty()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Asynchronous RocksDB snapshot performed on empty keyed state at " + timestamp +
" . Returning null.");
}

return new DoneFuture<>(null);
}

if (db != null) {
snapshotOperation.takeDBSnapShot(checkpointId, timestamp);
} else {
throw new IOException("RocksDB closed.");
Expand Down Expand Up @@ -328,9 +342,10 @@ static final class RocksDBSnapshotOperation {

private Snapshot snapshot;
private ReadOptions readOptions;
private List<Tuple2<RocksIterator, Integer>> kvStateIterators;

private CheckpointStreamFactory.CheckpointStateOutputStream outStream;
private DataOutputView outputView;
private List<Tuple2<RocksIterator, Integer>> kvStateIterators;
private KeyGroupsStateHandle snapshotResultStateHandle;

RocksDBSnapshotOperation(
Expand Down Expand Up @@ -401,26 +416,26 @@ public void closeCheckpointStream() throws IOException {

/**
* 5) Release the snapshot object for RocksDB and clean up.
*
*/
public void releaseSnapshotResources(boolean canceled) {

if (null != kvStateIterators) {
for (Tuple2<RocksIterator, Integer> kvStateIterator : kvStateIterators) {
kvStateIterator.f0.close();
IOUtils.closeQuietly(kvStateIterator.f0);
}
kvStateIterators = null;
}

if (null != snapshot) {
if(null != stateBackend.db) {
if (null != stateBackend.db) {
stateBackend.db.releaseSnapshot(snapshot);
}
snapshot.close();
IOUtils.closeQuietly(snapshot);
snapshot = null;
}

if (null != readOptions) {
readOptions.close();
IOUtils.closeQuietly(readOptions);
readOptions = null;
}

Expand Down Expand Up @@ -477,8 +492,10 @@ private void writeKVStateMetaData() throws IOException {
//retrieve iterator for this k/v states
readOptions = new ReadOptions();
readOptions.setSnapshot(snapshot);
RocksIterator iterator = stateBackend.db.newIterator(column.getValue().f0, readOptions);
kvStateIterators.add(new Tuple2<>(iterator, kvStateId));

kvStateIterators.add(
new Tuple2<>(stateBackend.db.newIterator(column.getValue().f0, readOptions), kvStateId));

++kvStateId;
}

Expand All @@ -493,12 +510,12 @@ private void writeKVStateData() throws IOException, InterruptedException {
byte[] previousKey = null;
byte[] previousValue = null;

List<Tuple2<RocksIterator, Integer>> kvStateIteratorsHandover = this.kvStateIterators;
this.kvStateIterators = null;

// Here we transfer ownership of RocksIterators to the RocksDBMergeIterator
try (RocksDBMergeIterator mergeIterator = new RocksDBMergeIterator(
kvStateIteratorsHandover, stateBackend.keyGroupPrefixBytes)) {
kvStateIterators, stateBackend.keyGroupPrefixBytes)) {

// handover complete, null out to prevent double close
kvStateIterators = null;

//preamble: setup with first key-group as our lookahead
if (mergeIterator.isValid()) {
Expand Down Expand Up @@ -584,7 +601,7 @@ static boolean hasMetaDataFollowsFlag(byte[] key) {
}

private static void checkInterrupted() throws InterruptedException {
if(Thread.currentThread().isInterrupted()) {
if (Thread.currentThread().isInterrupted()) {
throw new InterruptedException("RocksDB snapshot interrupted.");
}
}
Expand Down Expand Up @@ -674,7 +691,7 @@ private void restoreKeyGroupsInStateHandle()
} finally {
if (currentStateHandleInStream != null) {
rocksDBKeyedStateBackend.cancelStreamRegistry.unregisterClosable(currentStateHandleInStream);
currentStateHandleInStream.close();
IOUtils.closeQuietly(currentStateHandleInStream);
}
}
}
Expand Down Expand Up @@ -778,7 +795,8 @@ private void restoreKVStateData() throws IOException, RocksDBException {
* that we checkpointed, i.e. is already in the map of column families.
*/
@SuppressWarnings("rawtypes, unchecked")
protected <N, S> ColumnFamilyHandle getColumnFamily(StateDescriptor<?, S> descriptor, TypeSerializer<N> namespaceSerializer) {
protected <N, S> ColumnFamilyHandle getColumnFamily(
StateDescriptor<?, S> descriptor, TypeSerializer<N> namespaceSerializer) throws IOException {

Tuple2<ColumnFamilyHandle, RegisteredBackendStateMetaInfo<?, ?>> stateInfo =
kvStateInformation.get(descriptor.getName());
Expand All @@ -790,12 +808,13 @@ protected <N, S> ColumnFamilyHandle getColumnFamily(StateDescriptor<?, S> descri
descriptor.getSerializer());

if (stateInfo != null) {
if (!newMetaInfo.isCompatibleWith(stateInfo.f1)) {
throw new RuntimeException("Trying to access state using wrong meta info, was " + stateInfo.f1 +
if (newMetaInfo.isCompatibleWith(stateInfo.f1)) {
stateInfo.f1 = newMetaInfo;
return stateInfo.f0;
} else {
throw new IOException("Trying to access state using wrong meta info, was " + stateInfo.f1 +
" trying access with " + newMetaInfo);
}
stateInfo.f1 = newMetaInfo;
return stateInfo.f0;
}

ColumnFamilyDescriptor columnDescriptor = new ColumnFamilyDescriptor(
Expand All @@ -809,7 +828,7 @@ protected <N, S> ColumnFamilyHandle getColumnFamily(StateDescriptor<?, S> descri
rawAccess.put(descriptor.getName(), tuple);
return columnFamily;
} catch (RocksDBException e) {
throw new RuntimeException("Error creating ColumnFamilyHandle.", e);
throw new IOException("Error creating ColumnFamilyHandle.", e);
}
}

Expand Down Expand Up @@ -866,21 +885,20 @@ protected <N, T, ACC> InternalFoldingState<N, T, ACC> createFoldingState(
* Wraps a RocksDB iterator to cache it's current key and assign an id for the key/value state to the iterator.
* Used by #MergeIterator.
*/
static final class MergeIterator {
static final class MergeIterator implements AutoCloseable {

/**
*
* @param iterator The #RocksIterator to wrap .
* @param iterator The #RocksIterator to wrap .
* @param kvStateId Id of the K/V state to which this iterator belongs.
*/
public MergeIterator(RocksIterator iterator, int kvStateId) {
MergeIterator(RocksIterator iterator, int kvStateId) {
this.iterator = Preconditions.checkNotNull(iterator);
this.currentKey = iterator.key();
this.kvStateId = kvStateId;
}

private byte[] currentKey;
private final RocksIterator iterator;
private byte[] currentKey;
private final int kvStateId;

public byte[] getCurrentKey() {
Expand All @@ -899,16 +917,17 @@ public int getKvStateId() {
return kvStateId;
}

@Override
public void close() {
this.iterator.close();
IOUtils.closeQuietly(iterator);
}
}

/**
* Iterator that merges multiple RocksDB iterators to partition all states into contiguous key-groups.
* The resulting iteration sequence is ordered by (key-group, kv-state).
*/
static final class RocksDBMergeIterator implements Closeable {
static final class RocksDBMergeIterator implements AutoCloseable {

private final PriorityQueue<MergeIterator> heap;
private final int keyGroupPrefixByteCount;
Expand Down Expand Up @@ -943,20 +962,22 @@ public int compare(MergeIterator o1, MergeIterator o2) {
Comparator<MergeIterator> iteratorComparator = COMPARATORS.get(keyGroupPrefixByteCount);

if (kvStateIterators.size() > 0) {
this.heap = new PriorityQueue<>(kvStateIterators.size(), iteratorComparator);
PriorityQueue<MergeIterator> iteratorPriorityQueue =
new PriorityQueue<>(kvStateIterators.size(), iteratorComparator);

for (Tuple2<RocksIterator, Integer> rocksIteratorWithKVStateId : kvStateIterators) {
RocksIterator rocksIterator = rocksIteratorWithKVStateId.f0;
final RocksIterator rocksIterator = rocksIteratorWithKVStateId.f0;
rocksIterator.seekToFirst();
if (rocksIterator.isValid()) {
heap.offer(new MergeIterator(rocksIterator, rocksIteratorWithKVStateId.f1));
iteratorPriorityQueue.offer(new MergeIterator(rocksIterator, rocksIteratorWithKVStateId.f1));
} else {
rocksIterator.close();
IOUtils.closeQuietly(rocksIterator);
}
}

kvStateIterators.clear();

this.heap = iteratorPriorityQueue;
this.valid = !heap.isEmpty();
this.currentSubIterator = heap.poll();
} else {
Expand Down Expand Up @@ -991,7 +1012,7 @@ public void next() {
detectNewKeyGroup(oldKey);
}
} else {
rocksIterator.close();
IOUtils.closeQuietly(rocksIterator);

if (heap.isEmpty()) {
currentSubIterator = null;
Expand Down Expand Up @@ -1082,16 +1103,10 @@ private static int compareKeyGroupsForByteArrays(byte[] a, byte[] b, int len) {

@Override
public void close() {
IOUtils.closeQuietly(currentSubIterator);
currentSubIterator = null;

if (null != currentSubIterator) {
currentSubIterator.close();
currentSubIterator = null;
}

for (MergeIterator iterator : heap) {
iterator.close();
}

IOUtils.closeAllQuietly(heap);
heap.clear();
}
}
Expand Down Expand Up @@ -1148,7 +1163,8 @@ private void restoreOldSavepointKeyedState(Collection<KeyGroupsStateHandle> rest
// the EOFException will get us out of this...
while (true) {
byte mappingByte = inputView.readByte();
ColumnFamilyHandle handle = getColumnFamily(columnFamilyMapping.get(mappingByte), null);
ColumnFamilyHandle handle = getColumnFamily(columnFamilyMapping.get(mappingByte),null);

byte[] keyAndNamespace = BytePrimitiveArraySerializer.INSTANCE.deserialize(inputView);

ByteArrayInputStreamWithPos bis = new ByteArrayInputStreamWithPos(keyAndNamespace);
Expand Down
Loading

0 comments on commit cd9115f

Please sign in to comment.