Skip to content

Commit

Permalink
[FLINK-3761] Refactor State Backends/Make Keyed State Key-Group Aware
Browse files Browse the repository at this point in the history
The biggest change in this is that functionality that used to be in
AbstractStateBackend is now moved to CheckpointStreamFactory and
KeyedStateBackend. The former is responsible for providing streams that
can be used to checkpoint data while the latter is responsible for
keeping keyed state. A keyed backend can checkpoint the state that it
keeps by using a CheckpointStreamFactory.

This also refactors how asynchronous keyed state snapshots work. They
are now implemented using a Future/RunnableFuture.

Also, this changes the keyed state backends to be key-group aware and to
snapshot the state in key-groups with an index for restoring.
  • Loading branch information
aljoscha committed Aug 31, 2016
1 parent 516ad01 commit 4809f53
Show file tree
Hide file tree
Showing 131 changed files with 4,955 additions and 5,810 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.state.KvState;
import org.apache.flink.runtime.state.KvStateSnapshot;
import org.apache.flink.util.Preconditions;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDBException;
Expand All @@ -46,7 +45,7 @@
* @param <SD> The type of {@link StateDescriptor}.
*/
public abstract class AbstractRocksDBState<K, N, S extends State, SD extends StateDescriptor<S, V>, V>
implements KvState<K, N, S, SD, RocksDBStateBackend>, State {
implements KvState<N>, State {

private static final Logger LOG = LoggerFactory.getLogger(AbstractRocksDBState.class);

Expand All @@ -57,7 +56,7 @@ public abstract class AbstractRocksDBState<K, N, S extends State, SD extends Sta
private N currentNamespace;

/** Backend that holds the actual RocksDB instance where we store state */
protected RocksDBStateBackend backend;
protected RocksDBKeyedStateBackend backend;

/** The column family of this particular instance of state */
protected ColumnFamilyHandle columnFamily;
Expand All @@ -77,7 +76,7 @@ public abstract class AbstractRocksDBState<K, N, S extends State, SD extends Sta
protected AbstractRocksDBState(ColumnFamilyHandle columnFamily,
TypeSerializer<N> namespaceSerializer,
SD stateDesc,
RocksDBStateBackend backend) {
RocksDBKeyedStateBackend backend) {

this.namespaceSerializer = namespaceSerializer;
this.backend = backend;
Expand Down Expand Up @@ -106,7 +105,7 @@ public void clear() {
}

protected void writeKeyAndNamespace(DataOutputView out) throws IOException {
backend.keySerializer().serialize(backend.currentKey(), out);
backend.getKeySerializer().serialize(backend.getCurrentKey(), out);
out.writeByte(42);
namespaceSerializer.serialize(currentNamespace, out);
}
Expand All @@ -116,27 +115,6 @@ public void setCurrentNamespace(N namespace) {
this.currentNamespace = Preconditions.checkNotNull(namespace, "Namespace");
}

@Override
public void dispose() {
// ignore because we don't hold any state ourselves
}

@Override
public SD getStateDescriptor() {
return stateDesc;
}

@Override
public void setCurrentKey(K key) {
// ignore because we don't hold any state ourselves
}

@Override
public KvStateSnapshot<K, N, S, SD, RocksDBStateBackend> snapshot(long checkpointId,
long timestamp) throws Exception {
throw new RuntimeException("Should not be called. Backups happen in RocksDBStateBackend.");
}

@Override
@SuppressWarnings("unchecked")
public byte[] getSerializedValue(byte[] serializedKeyAndNamespace) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public class RocksDBFoldingState<K, N, T, ACC>
public RocksDBFoldingState(ColumnFamilyHandle columnFamily,
TypeSerializer<N> namespaceSerializer,
FoldingStateDescriptor<T, ACC> stateDesc,
RocksDBStateBackend backend) {
RocksDBKeyedStateBackend backend) {

super(columnFamily, namespaceSerializer, stateDesc, backend);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,251 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.contrib.streaming.state;

import org.apache.commons.io.FileUtils;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.FoldingState;
import org.apache.flink.api.common.state.FoldingStateDescriptor;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.util.SerializableObject;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;

/**
 * A {@link KeyedStateBackend} that stores its state in {@code RocksDB} and will serialize state to
 * streams provided by a {@link org.apache.flink.runtime.state.CheckpointStreamFactory} upon
 * checkpointing. This state backend can store very large state that exceeds memory and spills
 * to disk.
 *
 * <p>All individual k/v states share a single RocksDB instance; each state is mapped to its own
 * column family (see {@link #getColumnFamily(StateDescriptor)}).
 *
 * @param <K> The type of the keys by which state is scoped.
 */
public class RocksDBKeyedStateBackend<K> extends KeyedStateBackend<K> {

	private static final Logger LOG = LoggerFactory.getLogger(RocksDBKeyedStateBackend.class);

	/** Operator identifier that is used to uniquify the RocksDB storage path. */
	private final String operatorIdentifier;

	/** JobID for uniquifying backup paths. */
	private final JobID jobId;

	/** The column family options from the options factory, cached so that new column families
	 * created in {@link #getColumnFamily(StateDescriptor)} use the same settings. */
	private final ColumnFamilyOptions columnOptions;

	/** Path where this configured instance stores its data directory. */
	private final File instanceBasePath;

	/** Path where this configured instance stores its RocksDB database
	 * (the {@code "db"} subdirectory of {@link #instanceBasePath}). */
	private final File instanceRocksDBPath;

	/**
	 * Our RocksDB database, this is used by the actual subclasses of {@link AbstractRocksDBState}
	 * to store state. The different k/v states that we have don't each have their own RocksDB
	 * instance. They all write to this instance but to their own column family.
	 *
	 * <p>Volatile because it is nulled out in {@link #close()} while asynchronous snapshots may
	 * still observe it; all cleanup is guarded by {@link #dbCleanupLock}.
	 */
	protected volatile RocksDB db;

	/**
	 * Lock for protecting cleanup of the RocksDB db. We acquire this when doing asynchronous
	 * checkpoints and when disposing the db. Otherwise, the asynchronous snapshot might try
	 * iterating over a disposed db.
	 */
	private final SerializableObject dbCleanupLock = new SerializableObject();

	/**
	 * Information about the k/v states as we create them. This is used to retrieve the
	 * column family that is used for a state and also for sanity checks when restoring.
	 * Keyed by the state name from {@link StateDescriptor#getName()}.
	 */
	private Map<String, Tuple2<ColumnFamilyHandle, StateDescriptor>> kvStateInformation;

	/**
	 * Creates a new RocksDB-backed keyed state backend and eagerly opens the RocksDB instance.
	 *
	 * <p>Any pre-existing {@code db} directory under {@code instanceBasePath} is deleted before
	 * opening, so state is NOT recovered from disk by this constructor.
	 *
	 * @param jobId ID of the job, used to uniquify backup paths.
	 * @param operatorIdentifier Identifier of the owning operator, used to uniquify storage paths.
	 * @param instanceBasePath Base directory for this backend instance; created if absent and
	 *        deleted recursively on {@link #close()}.
	 * @param dbOptions Options for opening the RocksDB instance.
	 * @param columnFamilyOptions Options applied to every column family created for a state.
	 * @param kvStateRegistry Registry for queryable state, passed to the super class.
	 * @param keySerializer Serializer for the backend's keys, passed to the super class.
	 * @param keyGroupAssigner Assigner mapping keys to key-groups, passed to the super class.
	 * @param keyGroupRange Range of key-groups handled by this backend, passed to the super class.
	 *
	 * @throws Exception Failures from the super constructor; directory creation/cleanup and
	 *         RocksDB open failures are wrapped in {@link RuntimeException}.
	 */
	public RocksDBKeyedStateBackend(
			JobID jobId,
			String operatorIdentifier,
			File instanceBasePath,
			DBOptions dbOptions,
			ColumnFamilyOptions columnFamilyOptions,
			TaskKvStateRegistry kvStateRegistry,
			TypeSerializer<K> keySerializer,
			KeyGroupAssigner<K> keyGroupAssigner,
			KeyGroupRange keyGroupRange
	) throws Exception {

		super(kvStateRegistry, keySerializer, keyGroupAssigner, keyGroupRange);

		this.operatorIdentifier = operatorIdentifier;
		this.jobId = jobId;
		this.columnOptions = columnFamilyOptions;

		this.instanceBasePath = instanceBasePath;
		this.instanceRocksDBPath = new File(instanceBasePath, "db");

		// make sure the native RocksDB library is loaded before any JNI call below
		RocksDB.loadLibrary();

		if (!instanceBasePath.exists()) {
			if (!instanceBasePath.mkdirs()) {
				throw new RuntimeException("Could not create RocksDB data directory.");
			}
		}

		// clean it, this will remove the last part of the path but RocksDB will recreate it
		try {
			if (instanceRocksDBPath.exists()) {
				LOG.warn("Deleting already existing db directory {}.", instanceRocksDBPath);
				FileUtils.deleteDirectory(instanceRocksDBPath);
			}
		} catch (IOException e) {
			throw new RuntimeException("Error cleaning RocksDB data directory.", e);
		}

		List<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<>(1);
		// RocksDB seems to need this... (the "default" column family must always be passed to open)
		// NOTE(review): getBytes() uses the platform default charset — consider an explicit charset
		columnFamilyDescriptors.add(new ColumnFamilyDescriptor("default".getBytes()));
		List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>(1);
		try {
			db = RocksDB.open(dbOptions, instanceRocksDBPath.getAbsolutePath(), columnFamilyDescriptors, columnFamilyHandles);
		} catch (RocksDBException e) {
			throw new RuntimeException("Error while opening RocksDB instance.", e);
		}

		kvStateInformation = new HashMap<>();
	}

	/**
	 * Closes this backend: disposes all column family handles and the RocksDB instance, then
	 * deletes the instance's base directory. Safe with respect to in-flight asynchronous
	 * snapshots via {@link #dbCleanupLock}; idempotent for the db (guarded by {@code db != null}).
	 */
	@Override
	public void close() throws Exception {
		super.close();

		// we have to lock because we might have an asynchronous checkpoint going on
		synchronized (dbCleanupLock) {
			if (db != null) {
				// column families must be disposed before the owning db instance
				for (Tuple2<ColumnFamilyHandle, StateDescriptor> column : kvStateInformation.values()) {
					column.f0.dispose();
				}

				db.dispose();
				db = null;
			}
		}

		FileUtils.deleteDirectory(instanceBasePath);
	}

	/**
	 * Snapshots the keyed state into the given stream factory.
	 *
	 * <p>Not yet implemented in this backend; always throws {@link RuntimeException}.
	 */
	@Override
	public Future<KeyGroupsStateHandle> snapshot(
			long checkpointId,
			long timestamp,
			CheckpointStreamFactory streamFactory) throws Exception {
		throw new RuntimeException("Not implemented.");
	}

	// ------------------------------------------------------------------------
	//  State factories
	// ------------------------------------------------------------------------

	/**
	 * Creates a column family handle for use with a k/v state. When restoring from a snapshot
	 * we don't restore the individual k/v states, just the global RocksDB database and the
	 * list of column families. When a k/v state is first requested we check here whether we
	 * already have a column family for that and return it or create a new one if it doesn't exist.
	 *
	 * <p>This also checks whether the {@link StateDescriptor} for a state matches the one
	 * that we checkpointed, i.e. is already in the map of column families.
	 *
	 * @throws RuntimeException if a state with the same name but a different (non-equal)
	 *         descriptor was registered before, or if RocksDB fails to create the column family.
	 */
	protected ColumnFamilyHandle getColumnFamily(StateDescriptor descriptor) {

		Tuple2<ColumnFamilyHandle, StateDescriptor> stateInfo = kvStateInformation.get(descriptor.getName());

		if (stateInfo != null) {
			// sanity check: the same state name must always come with an equal descriptor
			if (!stateInfo.f1.equals(descriptor)) {
				throw new RuntimeException("Trying to access state using wrong StateDescriptor, was " + stateInfo.f1 + " trying access with " + descriptor);
			}
			return stateInfo.f0;
		}

		// NOTE(review): getBytes() uses the platform default charset — consider an explicit charset
		ColumnFamilyDescriptor columnDescriptor = new ColumnFamilyDescriptor(descriptor.getName().getBytes(), columnOptions);

		try {
			ColumnFamilyHandle columnFamily = db.createColumnFamily(columnDescriptor);
			kvStateInformation.put(descriptor.getName(), new Tuple2<>(columnFamily, descriptor));
			return columnFamily;
		} catch (RocksDBException e) {
			throw new RuntimeException("Error creating ColumnFamilyHandle.", e);
		}
	}

	/** Creates a {@link RocksDBValueState}, backed by the column family for {@code stateDesc}. */
	@Override
	protected <N, T> ValueState<T> createValueState(TypeSerializer<N> namespaceSerializer,
			ValueStateDescriptor<T> stateDesc) throws Exception {

		ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc);

		return new RocksDBValueState<>(columnFamily, namespaceSerializer, stateDesc, this);
	}

	/** Creates a {@link RocksDBListState}, backed by the column family for {@code stateDesc}. */
	@Override
	protected <N, T> ListState<T> createListState(TypeSerializer<N> namespaceSerializer,
			ListStateDescriptor<T> stateDesc) throws Exception {

		ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc);

		return new RocksDBListState<>(columnFamily, namespaceSerializer, stateDesc, this);
	}

	/** Creates a {@link RocksDBReducingState}, backed by the column family for {@code stateDesc}. */
	@Override
	protected <N, T> ReducingState<T> createReducingState(TypeSerializer<N> namespaceSerializer,
			ReducingStateDescriptor<T> stateDesc) throws Exception {

		ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc);

		return new RocksDBReducingState<>(columnFamily, namespaceSerializer, stateDesc, this);
	}

	/** Creates a {@link RocksDBFoldingState}, backed by the column family for {@code stateDesc}. */
	@Override
	protected <N, T, ACC> FoldingState<T, ACC> createFoldingState(TypeSerializer<N> namespaceSerializer,
			FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {

		ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc);

		return new RocksDBFoldingState<>(columnFamily, namespaceSerializer, stateDesc, this);
	}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ public class RocksDBListState<K, N, V>
public RocksDBListState(ColumnFamilyHandle columnFamily,
TypeSerializer<N> namespaceSerializer,
ListStateDescriptor<V> stateDesc,
RocksDBStateBackend backend) {
RocksDBKeyedStateBackend backend) {

super(columnFamily, namespaceSerializer, stateDesc, backend);
this.valueSerializer = stateDesc.getSerializer();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ public class RocksDBReducingState<K, N, V>
public RocksDBReducingState(ColumnFamilyHandle columnFamily,
TypeSerializer<N> namespaceSerializer,
ReducingStateDescriptor<V> stateDesc,
RocksDBStateBackend backend) {
RocksDBKeyedStateBackend backend) {

super(columnFamily, namespaceSerializer, stateDesc, backend);
this.valueSerializer = stateDesc.getSerializer();
this.reduceFunction = stateDesc.getReduceFunction();
Expand Down
Loading

0 comments on commit 4809f53

Please sign in to comment.