diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java index 74e0509e59714..ec58a63a27e1b 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java @@ -17,195 +17,64 @@ package org.apache.flink.contrib.streaming.state; -import org.apache.commons.io.FileUtils; - import org.apache.flink.api.common.state.State; import org.apache.flink.api.common.state.StateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; -import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem; -import org.apache.flink.runtime.state.AsynchronousKvStateSnapshot; + import org.apache.flink.runtime.state.KvState; import org.apache.flink.runtime.state.KvStateSnapshot; - -import org.apache.flink.streaming.util.HDFSCopyFromLocal; -import org.apache.flink.streaming.util.HDFSCopyToLocal; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; - -import org.rocksdb.BackupEngine; -import org.rocksdb.BackupableDBOptions; -import org.rocksdb.Env; -import org.rocksdb.FlushOptions; -import org.rocksdb.Options; -import org.rocksdb.RestoreOptions; -import org.rocksdb.RocksDB; +import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.RocksDBException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.ByteArrayOutputStream; -import java.io.File; import java.io.IOException; -import java.net.URI; -import java.util.UUID; - -import static java.util.Objects.requireNonNull; /** * Base class for {@link State} implementations that store state in a RocksDB database. * - *
<p>This base class is responsible for setting up the RocksDB database, for
- * checkpointing/restoring the database and for disposal in the {@link #dispose()} method. The
- * concrete subclasses just use the RocksDB handle to store/retrieve state.
- *
- * <p>State is checkpointed asynchronously. The synchronous part is drawing the actual backup
- * from RocksDB, this is done in {@link #snapshot(long, long)}. This will return a
- * {@link AsyncRocksDBSnapshot} that will perform the copying of the backup to the remote
- * file system.
+ * <p>
State is not stored in this class but in the {@link org.rocksdb.RocksDB} instance that + * the {@link RocksDBStateBackend} manages and checkpoints. * * @param The type of the key. * @param The type of the namespace. * @param The type of {@link State}. * @param The type of {@link StateDescriptor}. */ -public abstract class AbstractRocksDBState> - implements KvState, State { +abstract class AbstractRocksDBState> + implements KvState, State { private static final Logger LOG = LoggerFactory.getLogger(AbstractRocksDBState.class); - /** Serializer for the keys */ - protected final TypeSerializer keySerializer; - /** Serializer for the namespace */ - protected final TypeSerializer namespaceSerializer; - - /** The current key, which the next value methods will refer to */ - protected K currentKey; + private final TypeSerializer namespaceSerializer; /** The current namespace, which the next value methods will refer to */ - protected N currentNamespace; - - /** Store it so that we can clean up in dispose() */ - protected final File basePath; + private N currentNamespace; - /** FileSystem path where checkpoints are stored */ - protected final String checkpointPath; + /** Backend that holds the actual RocksDB instance where we store state */ + protected RocksDBStateBackend backend; - /** Directory in "basePath" where the actual RocksDB data base instance stores its files */ - protected final File rocksDbPath; - - /** Our RocksDB instance */ - protected final RocksDB db; + /** The column family of this particular instance of state */ + ColumnFamilyHandle columnFamily; /** * Creates a new RocksDB backed state. * - * @param keySerializer The serializer for the keys. * @param namespaceSerializer The serializer for the namespace. - * @param basePath The path on the local system where RocksDB data should be stored. */ - protected AbstractRocksDBState( - TypeSerializer keySerializer, + AbstractRocksDBState(ColumnFamilyHandle columnFamily, TypeSerializer namespaceSerializer, - File basePath, - String checkpointPath, - Options options) { - - rocksDbPath = new File(basePath, "db" + UUID.randomUUID().toString()); + RocksDBStateBackend backend) { - this.keySerializer = requireNonNull(keySerializer); this.namespaceSerializer = namespaceSerializer; - this.basePath = basePath; - this.checkpointPath = checkpointPath; - - RocksDB.loadLibrary(); - - if (!basePath.exists()) { - if (!basePath.mkdirs()) { - throw new RuntimeException("Could not create RocksDB data directory."); - } - } - - // clean it, this will remove the last part of the path but RocksDB will recreate it - try { - if (rocksDbPath.exists()) { - LOG.warn("Deleting already existing db directory {}.", rocksDbPath); - FileUtils.deleteDirectory(rocksDbPath); - } - } catch (IOException e) { - throw new RuntimeException("Error cleaning RocksDB data directory.", e); - } - - try { - db = RocksDB.open(options, rocksDbPath.getAbsolutePath()); - } catch (RocksDBException e) { - throw new RuntimeException("Error while opening RocksDB instance.", e); - } + this.backend = backend; - } - - /** - * Creates a new RocksDB backed state and restores from the given backup directory. After - * restoring the backup directory is deleted. - * - * @param keySerializer The serializer for the keys. - * @param namespaceSerializer The serializer for the namespace. - * @param basePath The path on the local system where RocksDB data should be stored. - * @param restorePath The path to a backup directory from which to restore RocksDb database. 
- */ - protected AbstractRocksDBState( - TypeSerializer keySerializer, - TypeSerializer namespaceSerializer, - File basePath, - String checkpointPath, - String restorePath, - Options options) { - - rocksDbPath = new File(basePath, "db" + UUID.randomUUID().toString()); - - RocksDB.loadLibrary(); - - // clean it, this will remove the last part of the path but RocksDB will recreate it - try { - if (rocksDbPath.exists()) { - LOG.warn("Deleting already existing db directory {}.", rocksDbPath); - FileUtils.deleteDirectory(rocksDbPath); - } - } catch (IOException e) { - throw new RuntimeException("Error cleaning RocksDB data directory.", e); - } - - try (BackupEngine backupEngine = BackupEngine.open(Env.getDefault(), new BackupableDBOptions(restorePath + "/"))) { - backupEngine.restoreDbFromLatestBackup(rocksDbPath.getAbsolutePath(), rocksDbPath.getAbsolutePath(), new RestoreOptions(true)); - } catch (RocksDBException|IllegalArgumentException e) { - throw new RuntimeException("Error while restoring RocksDB state from " + restorePath, e); - } finally { - try { - FileUtils.deleteDirectory(new File(restorePath)); - } catch (IOException e) { - LOG.error("Error cleaning up local restore directory " + restorePath, e); - } - } - - this.keySerializer = requireNonNull(keySerializer); - this.namespaceSerializer = namespaceSerializer; - this.basePath = basePath; - this.checkpointPath = checkpointPath; - - if (!basePath.exists()) { - if (!basePath.mkdirs()) { - throw new RuntimeException("Could not create RocksDB data directory."); - } - } - - try { - db = RocksDB.open(options, rocksDbPath.getAbsolutePath()); - } catch (RocksDBException e) { - throw new RuntimeException("Error while opening RocksDB instance.", e); - } + this.columnFamily = columnFamily; } // ------------------------------------------------------------------------ @@ -217,234 +86,38 @@ final public void clear() { try { writeKeyAndNamespace(out); byte[] key = baos.toByteArray(); - db.remove(key); + backend.db.remove(columnFamily, key); } catch (IOException|RocksDBException e) { throw new RuntimeException("Error while removing entry from RocksDB", e); } } - protected void writeKeyAndNamespace(DataOutputView out) throws IOException { - keySerializer.serialize(currentKey, out); + void writeKeyAndNamespace(DataOutputView out) throws IOException { + backend.keySerializer().serialize(backend.currentKey(), out); out.writeByte(42); namespaceSerializer.serialize(currentNamespace, out); } - @Override - final public void setCurrentKey(K currentKey) { - this.currentKey = currentKey; - } - @Override final public void setCurrentNamespace(N namespace) { this.currentNamespace = namespace; } - protected abstract AbstractRocksDBSnapshot createRocksDBSnapshot(URI backupUri, long checkpointId); - - @Override - public final KvStateSnapshot snapshot(final long checkpointId, long timestamp) throws Exception { - - final File localBackupPath = new File(basePath, "local-chk-" + checkpointId); - final URI backupUri = new URI(checkpointPath + "/chk-" + checkpointId); - - - if (!localBackupPath.exists()) { - if (!localBackupPath.mkdirs()) { - throw new RuntimeException("Could not create local backup path " + localBackupPath); - } - } - - long startTime = System.currentTimeMillis(); - BackupableDBOptions backupOptions = new BackupableDBOptions(localBackupPath.getAbsolutePath()); - // we disabled the WAL - backupOptions.setBackupLogFiles(false); - // no need to sync since we use the backup only as intermediate data before writing to FileSystem snapshot - 
backupOptions.setSync(false); - try (BackupEngine backupEngine = BackupEngine.open(Env.getDefault(), - backupOptions)) { - // make sure to flush because we don't write to the write-ahead-log - db.flush(new FlushOptions().setWaitForFlush(true)); - backupEngine.createNewBackup(db); - } - long endTime = System.currentTimeMillis(); - LOG.info("RocksDB (" + rocksDbPath + ") backup (synchronous part) took " + (endTime - startTime) + " ms."); - - return new AsyncRocksDBSnapshot<>( - localBackupPath, - backupUri, - checkpointId, - this); - } - @Override final public void dispose() { - db.dispose(); - try { - FileUtils.deleteDirectory(basePath); - } catch (IOException e) { - throw new RuntimeException("Error disposing RocksDB data directory.", e); - } + // ignore because we don't hold any state ourselves } - protected static abstract class AbstractRocksDBSnapshot> - implements KvStateSnapshot { - private static final long serialVersionUID = 1L; - - private static final Logger LOG = LoggerFactory.getLogger(AbstractRocksDBSnapshot.class); - - // ------------------------------------------------------------------------ - // Ctor parameters for RocksDB state - // ------------------------------------------------------------------------ - - /** Store it so that we can clean up in dispose() */ - protected final File basePath; - - /** Where we should put RocksDB backups */ - protected final String checkpointPath; - - // ------------------------------------------------------------------------ - // Info about this checkpoint - // ------------------------------------------------------------------------ - - protected final URI backupUri; - - protected long checkpointId; - - // ------------------------------------------------------------------------ - // For sanity checks - // ------------------------------------------------------------------------ - - /** Key serializer */ - protected final TypeSerializer keySerializer; - - /** Namespace serializer */ - protected final TypeSerializer namespaceSerializer; - - /** Hash of the StateDescriptor, for sanity checks */ - protected final SD stateDesc; - - /** - * Creates a new snapshot from the given state parameters. - */ - public AbstractRocksDBSnapshot(File basePath, - String checkpointPath, - URI backupUri, - long checkpointId, - TypeSerializer keySerializer, - TypeSerializer namespaceSerializer, - SD stateDesc) { - - this.basePath = basePath; - this.checkpointPath = checkpointPath; - this.backupUri = backupUri; - this.checkpointId = checkpointId; - - this.stateDesc = stateDesc; - this.keySerializer = keySerializer; - this.namespaceSerializer = namespaceSerializer; - } - - /** - * Subclasses must implement this for creating a concrete RocksDB state. - */ - protected abstract KvState createRocksDBState( - TypeSerializer keySerializer, - TypeSerializer namespaceSerializer, - SD stateDesc, - File basePath, - String backupPath, - String restorePath, - Options options) throws Exception; - - @Override - public final KvState restoreState( - RocksDBStateBackend stateBackend, - TypeSerializer keySerializer, - ClassLoader classLoader, - long recoveryTimestamp) throws Exception { - - // validity checks - if (!this.keySerializer.equals(keySerializer)) { - throw new IllegalArgumentException( - "Cannot restore the state from the snapshot with the given serializers. 
" + - "State (K/V) was serialized with " + - "(" + keySerializer + ") " + - "now is (" + keySerializer + ")"); - } - - if (!basePath.exists()) { - if (!basePath.mkdirs()) { - throw new RuntimeException("Could not create RocksDB base path " + basePath); - } - } - - final File localBackupPath = new File(basePath, "chk-" + checkpointId); - - if (localBackupPath.exists()) { - try { - LOG.warn("Deleting already existing local backup directory {}.", localBackupPath); - FileUtils.deleteDirectory(localBackupPath); - } catch (IOException e) { - throw new RuntimeException("Error cleaning RocksDB local backup directory.", e); - } - } - - HDFSCopyToLocal.copyToLocal(backupUri, basePath); - return createRocksDBState(keySerializer, namespaceSerializer, stateDesc, basePath, - checkpointPath, localBackupPath.getAbsolutePath(), stateBackend.getRocksDBOptions()); - } - - @Override - public final void discardState() throws Exception { - FileSystem fs = FileSystem.get(backupUri, HadoopFileSystem.getHadoopConfiguration()); - fs.delete(new Path(backupUri), true); - } + @Override + public void setCurrentKey(K key) { + // ignore because we don't hold any state ourselves - @Override - public final long getStateSize() throws Exception { - FileSystem fs = FileSystem.get(backupUri, HadoopFileSystem.getHadoopConfiguration()); - return fs.getContentSummary(new Path(backupUri)).getLength(); - } } - /** - * Upon snapshotting the RocksDB backup is created synchronously. The asynchronous part is - * copying the backup to a (possibly) remote filesystem. This is done in {@link #materialize()} - * of this class. - */ - private static class AsyncRocksDBSnapshot> extends AsynchronousKvStateSnapshot { - private static final long serialVersionUID = 1L; - private final File localBackupPath; - private final URI backupUri; - private final long checkpointId; - private transient AbstractRocksDBState state; - - public AsyncRocksDBSnapshot(File localBackupPath, - URI backupUri, - long checkpointId, - AbstractRocksDBState state) { - this.localBackupPath = localBackupPath; - this.backupUri = backupUri; - this.checkpointId = checkpointId; - this.state = state; - } - - @Override - public KvStateSnapshot materialize() throws Exception { - try { - long startTime = System.currentTimeMillis(); - HDFSCopyFromLocal.copyFromLocal(localBackupPath, backupUri); - long endTime = System.currentTimeMillis(); - LOG.info("RocksDB materialization from " + localBackupPath + " to " + backupUri + " (asynchronous part) took " + (endTime - startTime) + " ms."); - return state.createRocksDBSnapshot(backupUri, checkpointId); - } catch (Exception e) { - FileSystem fs = FileSystem.get(backupUri, HadoopFileSystem.getHadoopConfiguration()); - fs.delete(new Path(backupUri), true); - throw e; - } finally { - FileUtils.deleteQuietly(localBackupPath); - } - } + @Override + public KvStateSnapshot snapshot(long checkpointId, + long timestamp) throws Exception { + throw new RuntimeException("Should not be called. 
Backups happen in RocksDBStateBackend."); } } diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/OptionsFactory.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/OptionsFactory.java index 3e52f1f316cf2..863c5daa7b14f 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/OptionsFactory.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/OptionsFactory.java @@ -18,10 +18,11 @@ package org.apache.flink.contrib.streaming.state; -import org.rocksdb.Options; +import org.rocksdb.ColumnFamilyOptions; +import org.rocksdb.DBOptions; /** - * A factory for {@link Options} to be passed to the {@link RocksDBStateBackend}. + * A factory for {@link DBOptions} to be passed to the {@link RocksDBStateBackend}. * Options have to be created lazily by this factory, because the {@code Options} * class is not serializable and holds pointers to native code. * @@ -55,5 +56,19 @@ public interface OptionsFactory extends java.io.Serializable { * @param currentOptions The options object with the pre-defined options. * @return The options object on which the additional options are set. */ - Options createOptions(Options currentOptions); + DBOptions createDBOptions(DBOptions currentOptions); + + /** + * This method should set the additional options on top of the current options object. + * The current options object may contain pre-defined options based on flags that have + * been configured on the state backend. + * + *
<p>
It is important to set the options on the current object and return the result from + * the setter methods, otherwise the pre-defined options may get lost. + * + * @param currentOptions The options object with the pre-defined options. + * @return The options object on which the additional options are set. + */ + ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions); + } diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java index c19b54fae2764..93aac8564637e 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java @@ -19,8 +19,10 @@ package org.apache.flink.contrib.streaming.state; import org.rocksdb.BlockBasedTableConfig; +import org.rocksdb.ColumnFamilyOptions; import org.rocksdb.CompactionStyle; -import org.rocksdb.Options; +import org.rocksdb.DBOptions; +import org.rocksdb.StringAppendOperator; /** * The {@code PredefinedOptions} are configuration settings for the {@link RocksDBStateBackend}. @@ -42,11 +44,18 @@ public enum PredefinedOptions { DEFAULT { @Override - public Options createOptions() { - return new Options() + public DBOptions createDBOptions() { + return new DBOptions() .setUseFsync(false) .setDisableDataSync(true); } + + @Override + public ColumnFamilyOptions createColumnOptions() { + return new ColumnFamilyOptions() + .setMergeOperator(new StringAppendOperator()); + } + }, /** @@ -72,16 +81,22 @@ public Options createOptions() { SPINNING_DISK_OPTIMIZED { @Override - public Options createOptions() { + public DBOptions createDBOptions() { - return new Options() - .setCompactionStyle(CompactionStyle.LEVEL) - .setLevelCompactionDynamicLevelBytes(true) + return new DBOptions() .setIncreaseParallelism(4) .setUseFsync(false) .setDisableDataSync(true) .setMaxOpenFiles(-1); } + + @Override + public ColumnFamilyOptions createColumnOptions() { + return new ColumnFamilyOptions() + .setMergeOperator(new StringAppendOperator()) + .setCompactionStyle(CompactionStyle.LEVEL) + .setLevelCompactionDynamicLevelBytes(true); + } }, /** @@ -113,25 +128,32 @@ public Options createOptions() { SPINNING_DISK_OPTIMIZED_HIGH_MEM { @Override - public Options createOptions() { + public DBOptions createDBOptions() { + + return new DBOptions() + .setIncreaseParallelism(4) + .setUseFsync(false) + .setDisableDataSync(true) + .setMaxOpenFiles(-1); + } + + @Override + public ColumnFamilyOptions createColumnOptions() { final long blockCacheSize = 256 * 1024 * 1024; final long blockSize = 128 * 1024; final long targetFileSize = 256 * 1024 * 1024; final long writeBufferSize = 64 * 1024 * 1024; - return new Options() + return new ColumnFamilyOptions() + .setMergeOperator(new StringAppendOperator()) .setCompactionStyle(CompactionStyle.LEVEL) .setLevelCompactionDynamicLevelBytes(true) .setTargetFileSizeBase(targetFileSize) .setMaxBytesForLevelBase(4 * targetFileSize) .setWriteBufferSize(writeBufferSize) - .setIncreaseParallelism(4) .setMinWriteBufferNumberToMerge(3) .setMaxWriteBufferNumber(4) - .setUseFsync(false) - .setDisableDataSync(true) - .setMaxOpenFiles(-1) .setTableFormatConfig( new BlockBasedTableConfig() .setBlockCacheSize(blockCacheSize) @@ -160,21 +182,35 @@ public Options createOptions() 
{ FLASH_SSD_OPTIMIZED { @Override - public Options createOptions() { - return new Options() + public DBOptions createDBOptions() { + return new DBOptions() .setIncreaseParallelism(4) .setUseFsync(false) .setDisableDataSync(true) .setMaxOpenFiles(-1); } + + @Override + public ColumnFamilyOptions createColumnOptions() { + return new ColumnFamilyOptions() + .setMergeOperator(new StringAppendOperator()); + } }; // ------------------------------------------------------------------------ /** - * Creates the options for this pre-defined setting. + * Creates the {@link DBOptions}for this pre-defined setting. * * @return The pre-defined options object. */ - public abstract Options createOptions(); + public abstract DBOptions createDBOptions(); + + /** + * Creates the {@link org.rocksdb.ColumnFamilyOptions}for this pre-defined setting. + * + * @return The pre-defined options object. + */ + public abstract ColumnFamilyOptions createColumnOptions(); + } diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java index 20b5181a02d28..91fa8075587dd 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java @@ -24,17 +24,14 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; -import org.apache.flink.runtime.state.KvState; -import org.rocksdb.Options; +import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.RocksDBException; import org.rocksdb.WriteOptions; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; -import java.io.File; import java.io.IOException; -import java.net.URI; import static java.util.Objects.requireNonNull; @@ -46,7 +43,7 @@ * @param The type of the values that can be folded into the state. * @param The type of the value in the folding state. */ -public class RocksDBFoldingState +class RocksDBFoldingState extends AbstractRocksDBState, FoldingStateDescriptor> implements FoldingState { @@ -54,7 +51,7 @@ public class RocksDBFoldingState private final TypeSerializer valueSerializer; /** This holds the name of the state and can create an initial default value for the state. */ - protected final FoldingStateDescriptor stateDesc; + private final FoldingStateDescriptor stateDesc; /** User-specified fold function */ private final FoldFunction foldFunction; @@ -63,59 +60,23 @@ public class RocksDBFoldingState * We disable writes to the write-ahead-log here. We can't have these in the base class * because JNI segfaults for some reason if they are. */ - protected final WriteOptions writeOptions; + private final WriteOptions writeOptions; /** * Creates a new {@code RocksDBFoldingState}. * - * @param keySerializer The serializer for the keys. * @param namespaceSerializer The serializer for the namespace. * @param stateDesc The state identifier for the state. This contains name * and can create a default state value. - * @param dbPath The path on the local system where RocksDB data should be stored. - * @param backupPath The path where to store backups. 
*/ - protected RocksDBFoldingState( - TypeSerializer keySerializer, + RocksDBFoldingState(ColumnFamilyHandle columnFamily, TypeSerializer namespaceSerializer, FoldingStateDescriptor stateDesc, - File dbPath, - String backupPath, - Options options) { - - super(keySerializer, namespaceSerializer, dbPath, backupPath, options); - - this.stateDesc = requireNonNull(stateDesc); - this.valueSerializer = stateDesc.getSerializer(); - this.foldFunction = stateDesc.getFoldFunction(); - - writeOptions = new WriteOptions(); - writeOptions.setDisableWAL(true); - } + RocksDBStateBackend backend) { - /** - * Creates a {@code RocksDBFoldingState} by restoring from a directory. - * - * @param keySerializer The serializer for the keys. - * @param namespaceSerializer The serializer for the namespace. - * @param stateDesc The state identifier for the state. This contains name - * and can create a default state value. - * @param dbPath The path on the local system where RocksDB data should be stored. - * @param backupPath The path where to store backups. - * @param restorePath The path on the local file system that we are restoring from. - */ - protected RocksDBFoldingState( - TypeSerializer keySerializer, - TypeSerializer namespaceSerializer, - FoldingStateDescriptor stateDesc, - File dbPath, - String backupPath, - String restorePath, - Options options) { - - super(keySerializer, namespaceSerializer, dbPath, backupPath, restorePath, options); + super(columnFamily, namespaceSerializer, backend); - this.stateDesc = stateDesc; + this.stateDesc = requireNonNull(stateDesc); this.valueSerializer = stateDesc.getSerializer(); this.foldFunction = stateDesc.getFoldFunction(); @@ -130,7 +91,7 @@ public ACC get() { try { writeKeyAndNamespace(out); byte[] key = baos.toByteArray(); - byte[] valueBytes = db.get(key); + byte[] valueBytes = backend.db.get(columnFamily, key); if (valueBytes == null) { return stateDesc.getDefaultValue(); } @@ -147,65 +108,22 @@ public void add(T value) throws IOException { try { writeKeyAndNamespace(out); byte[] key = baos.toByteArray(); - byte[] valueBytes = db.get(key); + byte[] valueBytes = backend.db.get(columnFamily, key); if (valueBytes == null) { baos.reset(); valueSerializer.serialize(foldFunction.fold(stateDesc.getDefaultValue(), value), out); - db.put(writeOptions, key, baos.toByteArray()); + backend.db.put(columnFamily, writeOptions, key, baos.toByteArray()); } else { ACC oldValue = valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStream(valueBytes))); ACC newValue = foldFunction.fold(oldValue, value); baos.reset(); valueSerializer.serialize(newValue, out); - db.put(writeOptions, key, baos.toByteArray()); + backend.db.put(columnFamily, writeOptions, key, baos.toByteArray()); } } catch (Exception e) { throw new RuntimeException("Error while adding data to RocksDB", e); } } - - @Override - protected AbstractRocksDBSnapshot, FoldingStateDescriptor> createRocksDBSnapshot( - URI backupUri, long checkpointId) { - - return new Snapshot<>(basePath, checkpointPath, backupUri, checkpointId, keySerializer, namespaceSerializer, stateDesc); - } - - private static class Snapshot extends AbstractRocksDBSnapshot, FoldingStateDescriptor> { - private static final long serialVersionUID = 1L; - - public Snapshot( - File dbPath, - String checkpointPath, - URI backupUri, - long checkpointId, - TypeSerializer keySerializer, - TypeSerializer namespaceSerializer, - FoldingStateDescriptor stateDesc) { - - super(dbPath, - checkpointPath, - backupUri, - checkpointId, - keySerializer, - 
namespaceSerializer, - stateDesc); - } - - @Override - protected KvState, FoldingStateDescriptor, RocksDBStateBackend> - createRocksDBState( - TypeSerializer keySerializer, - TypeSerializer namespaceSerializer, - FoldingStateDescriptor stateDesc, - File dbPath, - String backupPath, - String restorePath, - Options options) throws Exception { - - return new RocksDBFoldingState<>(keySerializer, namespaceSerializer, stateDesc, dbPath, checkpointPath, restorePath, options); - } - } } diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java index f5c589c7afc6b..dc55d1183d146 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java @@ -23,17 +23,14 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; -import org.apache.flink.runtime.state.KvState; -import org.rocksdb.Options; +import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.RocksDBException; import org.rocksdb.WriteOptions; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; -import java.io.File; import java.io.IOException; -import java.net.URI; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -43,11 +40,15 @@ /** * {@link ListState} implementation that stores state in RocksDB. * + *
<p>
{@link RocksDBStateBackend} must ensure that we set the + * {@link org.rocksdb.StringAppendOperator} on the column family that we use for our state since + * we use the {@code merge()} call. + * * @param The type of the key. * @param The type of the namespace. * @param The type of the values in the list state. */ -public class RocksDBListState +class RocksDBListState extends AbstractRocksDBState, ListStateDescriptor> implements ListState { @@ -55,59 +56,27 @@ public class RocksDBListState private final TypeSerializer valueSerializer; /** This holds the name of the state and can create an initial default value for the state. */ - protected final ListStateDescriptor stateDesc; + private final ListStateDescriptor stateDesc; /** * We disable writes to the write-ahead-log here. We can't have these in the base class * because JNI segfaults for some reason if they are. */ - protected final WriteOptions writeOptions; + private final WriteOptions writeOptions; /** * Creates a new {@code RocksDBListState}. * - * @param keySerializer The serializer for the keys. - * @param namespaceSerializer The serializer for the namespace. - * @param stateDesc The state identifier for the state. This contains name - * and can create a default state value. - * @param dbPath The path on the local system where RocksDB data should be stored. - * @param backupPath The path where to store backups. - */ - protected RocksDBListState(TypeSerializer keySerializer, - TypeSerializer namespaceSerializer, - ListStateDescriptor stateDesc, - File dbPath, - String backupPath, - Options options) { - - super(keySerializer, namespaceSerializer, dbPath, backupPath, options); - this.stateDesc = requireNonNull(stateDesc); - this.valueSerializer = stateDesc.getSerializer(); - - writeOptions = new WriteOptions(); - writeOptions.setDisableWAL(true); - } - - /** - * Creates a {@code RocksDBListState} by restoring from a directory. - * - * @param keySerializer The serializer for the keys. * @param namespaceSerializer The serializer for the namespace. * @param stateDesc The state identifier for the state. This contains name * and can create a default state value. - * @param dbPath The path on the local system where RocksDB data should be stored. - * @param backupPath The path where to store backups. - * @param restorePath The path on the local file system that we are restoring from. 
*/ - protected RocksDBListState(TypeSerializer keySerializer, + RocksDBListState(ColumnFamilyHandle columnFamily, TypeSerializer namespaceSerializer, ListStateDescriptor stateDesc, - File dbPath, - String backupPath, - String restorePath, - Options options) { + RocksDBStateBackend backend) { - super(keySerializer, namespaceSerializer, dbPath, backupPath, restorePath, options); + super(columnFamily, namespaceSerializer, backend); this.stateDesc = requireNonNull(stateDesc); this.valueSerializer = stateDesc.getSerializer(); @@ -122,7 +91,7 @@ public Iterable get() { try { writeKeyAndNamespace(out); byte[] key = baos.toByteArray(); - byte[] valueBytes = db.get(key); + byte[] valueBytes = backend.db.get(columnFamily, key); if (valueBytes == null) { return Collections.emptyList(); @@ -155,55 +124,11 @@ public void add(V value) throws IOException { baos.reset(); valueSerializer.serialize(value, out); - db.merge(writeOptions, key, baos.toByteArray()); + backend.db.merge(columnFamily, writeOptions, key, baos.toByteArray()); } catch (Exception e) { throw new RuntimeException("Error while adding data to RocksDB", e); } } - - @Override - protected AbstractRocksDBSnapshot, ListStateDescriptor> createRocksDBSnapshot( - URI backupUri, - long checkpointId) { - - return new Snapshot<>(basePath, checkpointPath, backupUri, checkpointId, keySerializer, namespaceSerializer, stateDesc); - } - - private static class Snapshot extends - AbstractRocksDBSnapshot, ListStateDescriptor> - { - private static final long serialVersionUID = 1L; - - public Snapshot(File dbPath, - String checkpointPath, - URI backupUri, - long checkpointId, - TypeSerializer keySerializer, - TypeSerializer namespaceSerializer, - ListStateDescriptor stateDesc) { - super(dbPath, - checkpointPath, - backupUri, - checkpointId, - keySerializer, - namespaceSerializer, - stateDesc); - } - - @Override - protected KvState, ListStateDescriptor, RocksDBStateBackend> createRocksDBState( - TypeSerializer keySerializer, - TypeSerializer namespaceSerializer, - ListStateDescriptor stateDesc, - File basePath, - String backupPath, - String restorePath, - Options options) throws Exception { - - return new RocksDBListState<>(keySerializer, namespaceSerializer, stateDesc, basePath, - checkpointPath, restorePath, options); - } - } } diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java index 6a965ccd0c36a..953660deb99f4 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java @@ -24,17 +24,14 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; -import org.apache.flink.runtime.state.KvState; -import org.rocksdb.Options; +import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.RocksDBException; import org.rocksdb.WriteOptions; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; -import java.io.File; import java.io.IOException; -import java.net.URI; import static java.util.Objects.requireNonNull; @@ -45,7 +42,7 @@ * @param The type of the namespace. 
 * @param <V> The type of value that the state stores.
 */
-public class RocksDBReducingState<K, N, V>
+class RocksDBReducingState<K, N, V>
	extends AbstractRocksDBState<K, N, ReducingState<V>, ReducingStateDescriptor<V>>
	implements ReducingState<V> {

@@ -53,7 +50,7 @@ public class RocksDBReducingState<K, N, V>
 	private final TypeSerializer<V> valueSerializer;
 
 	/** This holds the name of the state and can create an initial default value for the state. */
-	protected final ReducingStateDescriptor<V> stateDesc;
+	private final ReducingStateDescriptor<V> stateDesc;
 
 	/** User-specified reduce function */
 	private final ReduceFunction<V> reduceFunction;
 
@@ -62,26 +59,21 @@ public class RocksDBReducingState<K, N, V>
 	 * We disable writes to the write-ahead-log here. We can't have these in the base class
 	 * because JNI segfaults for some reason if they are.
 	 */
-	protected final WriteOptions writeOptions;
+	private final WriteOptions writeOptions;
 
 	/**
	 * Creates a new {@code RocksDBReducingState}.
	 *
-	 * @param keySerializer The serializer for the keys.
	 * @param namespaceSerializer The serializer for the namespace.
	 * @param stateDesc The state identifier for the state. This contains name
	 *                  and can create a default state value.
-	 * @param dbPath The path on the local system where RocksDB data should be stored.
-	 * @param backupPath The path where to store backups.
	 */
-	protected RocksDBReducingState(TypeSerializer<K> keySerializer,
+	RocksDBReducingState(ColumnFamilyHandle columnFamily,
			TypeSerializer<N> namespaceSerializer,
			ReducingStateDescriptor<V> stateDesc,
-			File dbPath,
-			String backupPath,
-			Options options) {
+			RocksDBStateBackend backend) {
 
-		super(keySerializer, namespaceSerializer, dbPath, backupPath, options);
+		super(columnFamily, namespaceSerializer, backend);
 		this.stateDesc = requireNonNull(stateDesc);
 		this.valueSerializer = stateDesc.getSerializer();
 		this.reduceFunction = stateDesc.getReduceFunction();
 
@@ -90,34 +82,6 @@ protected RocksDBReducingState(TypeSerializer<K> keySerializer,
 		writeOptions.setDisableWAL(true);
 	}
 
-	/**
-	 * Creates a {@code RocksDBReducingState} by restoring from a directory.
-	 *
-	 * @param keySerializer The serializer for the keys.
-	 * @param namespaceSerializer The serializer for the namespace.
-	 * @param stateDesc The state identifier for the state. This contains name
-	 *                  and can create a default state value.
-	 * @param dbPath The path on the local system where RocksDB data should be stored.
-	 * @param backupPath The path where to store backups.
-	 * @param restorePath The path on the local file system that we are restoring from.
- */ - protected RocksDBReducingState(TypeSerializer keySerializer, - TypeSerializer namespaceSerializer, - ReducingStateDescriptor stateDesc, - File dbPath, - String backupPath, - String restorePath, - Options options) { - - super(keySerializer, namespaceSerializer, dbPath, backupPath, restorePath, options); - this.stateDesc = stateDesc; - this.valueSerializer = stateDesc.getSerializer(); - this.reduceFunction = stateDesc.getReduceFunction(); - - writeOptions = new WriteOptions(); - writeOptions.setDisableWAL(true); - } - @Override public V get() { ByteArrayOutputStream baos = new ByteArrayOutputStream(); @@ -125,7 +89,7 @@ public V get() { try { writeKeyAndNamespace(out); byte[] key = baos.toByteArray(); - byte[] valueBytes = db.get(key); + byte[] valueBytes = backend.db.get(columnFamily, key); if (valueBytes == null) { return null; } @@ -142,66 +106,22 @@ public void add(V value) throws IOException { try { writeKeyAndNamespace(out); byte[] key = baos.toByteArray(); - byte[] valueBytes = db.get(key); + byte[] valueBytes = backend.db.get(columnFamily, key); if (valueBytes == null) { baos.reset(); valueSerializer.serialize(value, out); - db.put(writeOptions, key, baos.toByteArray()); + backend.db.put(columnFamily, writeOptions, key, baos.toByteArray()); } else { V oldValue = valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStream(valueBytes))); V newValue = reduceFunction.reduce(oldValue, value); baos.reset(); valueSerializer.serialize(newValue, out); - db.put(writeOptions, key, baos.toByteArray()); + backend.db.put(columnFamily, writeOptions, key, baos.toByteArray()); } } catch (Exception e) { throw new RuntimeException("Error while adding data to RocksDB", e); } } - - @Override - protected AbstractRocksDBSnapshot, ReducingStateDescriptor> createRocksDBSnapshot( - URI backupUri, - long checkpointId) { - - return new Snapshot<>(basePath, checkpointPath, backupUri, checkpointId, keySerializer, namespaceSerializer, stateDesc); - } - - private static class Snapshot extends - AbstractRocksDBSnapshot, ReducingStateDescriptor> - { - private static final long serialVersionUID = 1L; - - public Snapshot(File dbPath, - String checkpointPath, - URI backupUri, - long checkpointId, - TypeSerializer keySerializer, - TypeSerializer namespaceSerializer, - ReducingStateDescriptor stateDesc) { - super(dbPath, - checkpointPath, - backupUri, - checkpointId, - keySerializer, - namespaceSerializer, - stateDesc); - } - - @Override - protected KvState, ReducingStateDescriptor, RocksDBStateBackend> createRocksDBState( - TypeSerializer keySerializer, - TypeSerializer namespaceSerializer, - ReducingStateDescriptor stateDesc, - File basePath, - String backupPath, - String restorePath, - Options options) throws Exception { - - return new RocksDBReducingState<>(keySerializer, namespaceSerializer, stateDesc, - basePath, checkpointPath, restorePath, options); - } - } } diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java index 8f846daf495e6..3d63bd2431c3e 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java @@ -17,15 +17,21 @@ package org.apache.flink.contrib.streaming.state; +import 
java.io.EOFException;
 import java.io.File;
 import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
 import java.io.Serializable;
 import java.net.URI;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Random;
 import java.util.UUID;
 
+import org.apache.commons.io.FileUtils;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.state.FoldingState;
 import org.apache.flink.api.common.state.FoldingStateDescriptor;
@@ -33,18 +39,40 @@
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.ReducingState;
 import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
 import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.AsynchronousKvStateSnapshot;
+import org.apache.flink.runtime.state.KvState;
+import org.apache.flink.runtime.state.KvStateSnapshot;
 import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.api.common.state.StateBackend;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
-import org.rocksdb.Options;
-import org.rocksdb.StringAppendOperator;
+import org.apache.flink.streaming.util.HDFSCopyFromLocal;
+import org.apache.flink.streaming.util.HDFSCopyToLocal;
+import org.apache.hadoop.fs.FileSystem;
+import org.rocksdb.BackupEngine;
+import org.rocksdb.BackupableDBOptions;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.DBOptions;
+import org.rocksdb.Env;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RestoreOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -67,7 +95,10 @@ public class RocksDBStateBackend extends AbstractStateBackend {
 	private static final long serialVersionUID = 1L;
 
 	private static final Logger LOG = LoggerFactory.getLogger(RocksDBStateBackend.class);
-
+
+	// ------------------------------------------------------------------------
+	//  Static configuration values
+	// ------------------------------------------------------------------------
 
 	/** The checkpoint directory that we copy the RocksDB backups to. */
 	private final Path checkpointDirectory;
 
@@ -75,6 +106,9 @@ public class RocksDBStateBackend extends AbstractStateBackend {
 	/** The state backend that stores the non-partitioned state */
 	private final AbstractStateBackend nonPartitionedStateBackend;
 
+	/** Whether we do snapshots fully asynchronously */
+	private boolean fullyAsyncBackup = false;
+
 	/** Operator identifier that is used to uniquify the RocksDB storage path. */
 	private String operatorIdentifier;
 
@@ -100,8 +134,35 @@ public class RocksDBStateBackend extends AbstractStateBackend {
 	private OptionsFactory optionsFactory;
 
 	/** The options from the options factory, cached */
-	private transient Options rocksDbOptions;
-
+	private transient DBOptions dbOptions;
+	private transient ColumnFamilyOptions columnOptions;
+
+	// ------------------------------------------------------------------------
+	//  Per operator values that are set in initializeForJob
+	// ------------------------------------------------------------------------
+
+	/** Path where this configured instance stores its data directory */
+	private transient File instanceBasePath;
+
+	/** Path where this configured instance stores its RocksDB database */
+	private transient File instanceRocksDBPath;
+
+	/** Base path where this configured instance stores checkpoints */
+	private transient String instanceCheckpointPath;
+
+	/**
+	 * Our RocksDB database. This is used by the actual subclasses of {@link AbstractRocksDBState}
+	 * to store state. The different k/v states that we have don't each have their own RocksDB
+	 * instance. They all write to this instance but to their own column family.
+	 */
+	transient RocksDB db;
+
+	/**
+	 * Information about the k/v states as we create them. This is used to retrieve the
+	 * column family that is used for a state and also for sanity checks when restoring.
+	 */
+	private Map<String, Tuple2<ColumnFamilyHandle, StateDescriptor>> kvStateInformation;
+
 	// ------------------------------------------------------------------------
 
 	/**
@@ -141,15 +202,11 @@ public RocksDBStateBackend(URI checkpointDataUri) throws IOException {
 	}
 
-	public RocksDBStateBackend(
-			String checkpointDataUri, AbstractStateBackend nonPartitionedStateBackend) throws IOException {
-
+	public RocksDBStateBackend(String checkpointDataUri, AbstractStateBackend nonPartitionedStateBackend) throws IOException {
 		this(new Path(checkpointDataUri).toUri(), nonPartitionedStateBackend);
 	}
 
-	public RocksDBStateBackend(
-			URI checkpointDataUri, AbstractStateBackend nonPartitionedStateBackend) throws IOException {
-
+	public RocksDBStateBackend(URI checkpointDataUri, AbstractStateBackend nonPartitionedStateBackend) throws IOException {
 		this.nonPartitionedStateBackend = requireNonNull(nonPartitionedStateBackend);
 		this.checkpointDirectory = FsStateBackend.validateAndNormalizeUri(checkpointDataUri);
 	}
 
@@ -201,6 +258,40 @@ public void initializeForJob(
 	}
 
 		nextDirectory = new Random().nextInt(initializedDbBasePaths.length);
+
+		instanceBasePath = new File(getDbPath("dummy_state"), UUID.randomUUID().toString());
+		instanceCheckpointPath = getCheckpointPath("dummy_state");
+		instanceRocksDBPath = new File(instanceBasePath, "db");
+
+		RocksDB.loadLibrary();
+
+		if (!instanceBasePath.exists()) {
+			if (!instanceBasePath.mkdirs()) {
+				throw new RuntimeException("Could not create RocksDB data directory.");
+			}
+		}
+
+		// clean it; this will remove the last part of the path, but RocksDB will recreate it
+		try {
+			if (instanceRocksDBPath.exists()) {
+				LOG.warn("Deleting already existing db directory {}.", instanceRocksDBPath);
+				FileUtils.deleteDirectory(instanceRocksDBPath);
+			}
+		} catch (IOException e) {
+			throw new RuntimeException("Error cleaning RocksDB data directory.", e);
+		}
+
+		List<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<>(1);
+		// RocksDB seems to need this... (the "default" column family always has to be
+		// part of the descriptor list when opening a database with column families)
+		columnFamilyDescriptors.add(new ColumnFamilyDescriptor("default".getBytes()));
+		List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>(1);
+		try {
+			db = RocksDB.open(getDbOptions(), instanceRocksDBPath.getAbsolutePath(), columnFamilyDescriptors, columnFamilyHandles);
+		} catch (RocksDBException e) {
+			throw new RuntimeException("Error while opening RocksDB instance.", e);
+		}
+
+		kvStateInformation = new HashMap<>();
 	}
 
 	@Override
@@ -208,30 +299,44 @@ public void disposeAllStateForCurrentJob() throws Exception {
 		nonPartitionedStateBackend.disposeAllStateForCurrentJob();
 	}
 
+	@Override
+	public void dispose() {
+		super.dispose();
+		nonPartitionedStateBackend.dispose();
+
+		if (this.dbOptions != null) {
+			this.dbOptions.dispose();
+			this.dbOptions = null;
+		}
+		for (Tuple2<ColumnFamilyHandle, StateDescriptor> column: kvStateInformation.values()) {
+			column.f0.dispose();
+		}
+		db.dispose();
+	}
+
 	@Override
 	public void close() throws Exception {
 		nonPartitionedStateBackend.close();
 
-		Options opt = this.rocksDbOptions;
-		if (opt != null) {
-			opt.dispose();
-			this.rocksDbOptions = null;
+		if (this.dbOptions != null) {
+			this.dbOptions.dispose();
+			this.dbOptions = null;
 		}
+		for (Tuple2<ColumnFamilyHandle, StateDescriptor> column: kvStateInformation.values()) {
+			column.f0.dispose();
+		}
+		db.dispose();
 	}
 
-	File getDbPath(String stateName) {
+	private File getDbPath(String stateName) {
 		return new File(new File(new File(getNextStoragePath(), jobId.toString()), operatorIdentifier), stateName);
 	}
 
-	String getCheckpointPath(String stateName) {
+	private String getCheckpointPath(String stateName) {
 		return checkpointDirectory + "/" + jobId.toString() + "/" + operatorIdentifier + "/" + stateName;
 	}
-
-	File[] getStoragePaths() {
-		return initializedDbBasePaths;
-	}
-
-	File getNextStoragePath() {
+
+	private File getNextStoragePath() {
 		int ni = nextDirectory + 1;
 		ni = ni >= initializedDbBasePaths.length ? 0 : ni;
 		nextDirectory = ni;
@@ -239,53 +344,545 @@ File getNextStoragePath() {
 		return initializedDbBasePaths[ni];
 	}
 
+	/**
+	 * Visible for tests.
+	 */
+	public File[] getStoragePaths() {
+		return initializedDbBasePaths;
+	}
+
+	// ------------------------------------------------------------------------
+	//  Snapshot and restore
+	// ------------------------------------------------------------------------
+
+	@Override
+	public HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> snapshotPartitionedState(long checkpointId, long timestamp) throws Exception {
+		if (keyValueStatesByName == null || keyValueStatesByName.size() == 0) {
+			return new HashMap<>();
+		}
+
+		if (fullyAsyncBackup) {
+			return performFullyAsyncSnapshot(checkpointId, timestamp);
+		} else {
+			return performSemiAsyncSnapshot(checkpointId, timestamp);
+		}
+	}
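The `fullyAsyncBackup` flag that drives this dispatch only appears as a field in the patch; its setter is outside the shown hunks. Assuming a setter along the lines of `enableFullyAsyncSnapshots()` (a hypothetical name, not confirmed by this diff), user code would opt in roughly like this:

```java
// Hypothetical usage sketch. enableFullyAsyncSnapshots() is an assumed setter
// for the fullyAsyncBackup flag; it is not part of the hunks shown here.
import java.net.URI;

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FullyAsyncBackupExample {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		RocksDBStateBackend backend = new RocksDBStateBackend(new URI("hdfs:///flink/checkpoints"));
		// switch from the semi-asynchronous backup (RocksDB BackupEngine) to the
		// fully asynchronous one (RocksDB snapshot plus iteration)
		backend.enableFullyAsyncSnapshots();

		env.setStateBackend(backend);
	}
}
```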
+
+	/**
+	 * Performs a checkpoint by using the RocksDB backup feature to backup to a directory.
+	 * This backup is then asynchronously copied to the final checkpoint location.
+	 */
+	private HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> performSemiAsyncSnapshot(long checkpointId, long timestamp) throws Exception {
+		// We don't snapshot individual k/v states since everything is stored in a central
+		// RocksDB database. Create a dummy KvStateSnapshot that holds the information about
+		// that checkpoint. We use this in injectKeyValueStateSnapshots to restore.
+
+		final File localBackupPath = new File(instanceBasePath, "local-chk-" + checkpointId);
+		final URI backupUri = new URI(instanceCheckpointPath + "/chk-" + checkpointId);
+
+		if (!localBackupPath.exists()) {
+			if (!localBackupPath.mkdirs()) {
+				throw new RuntimeException("Could not create local backup path " + localBackupPath);
+			}
+		}
+
+		long startTime = System.currentTimeMillis();
+
+		BackupableDBOptions backupOptions = new BackupableDBOptions(localBackupPath.getAbsolutePath());
+		// we disabled the WAL
+		backupOptions.setBackupLogFiles(false);
+		// no need to sync since we use the backup only as intermediate data before writing to FileSystem snapshot
+		backupOptions.setSync(false);
+
+		try (BackupEngine backupEngine = BackupEngine.open(Env.getDefault(), backupOptions)) {
+			// the "true" flag flushes the memtables before the backup; we need this
+			// because we write with the WAL disabled
+			backupEngine.createNewBackup(db, true);
+		}
+
+		long endTime = System.currentTimeMillis();
+		LOG.info("RocksDB (" + instanceRocksDBPath + ") backup (synchronous part) took " + (endTime - startTime) + " ms.");
+
+		// draw a copy in case it gets changed while performing the async snapshot
+		List<StateDescriptor> kvStateInformationCopy = new ArrayList<>();
+		for (Tuple2<ColumnFamilyHandle, StateDescriptor> state: kvStateInformation.values()) {
+			kvStateInformationCopy.add(state.f1);
+		}
+		SemiAsyncSnapshot dummySnapshot = new SemiAsyncSnapshot(localBackupPath,
+				backupUri,
+				kvStateInformationCopy,
+				checkpointId);
+
+		HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> result = new HashMap<>();
+		result.put("dummy_state", dummySnapshot);
+		return result;
+	}
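Both the `RocksDB.open(getDbOptions(), ...)` call in `initializeForJob` and the column-family creation path go through `getDbOptions()` and `getColumnOptions()`, which are not part of the shown hunks. Given the `dbOptions`/`columnOptions` caches and the `OptionsFactory` contract above, they presumably layer the user's factory on top of the pre-defined options, roughly like this (a sketch that assumes a `predefinedOptions` field holding the selected `PredefinedOptions`):

```java
// Sketch of the assumed options plumbing: pre-defined options first, then the
// user's OptionsFactory on top (matching the OptionsFactory javadoc), cached
// in the transient fields declared further up.
DBOptions getDbOptions() {
	if (dbOptions == null) {
		DBOptions opt = predefinedOptions.createDBOptions();
		if (optionsFactory != null) {
			opt = optionsFactory.createDBOptions(opt);
		}
		opt.setCreateIfMissing(true);
		dbOptions = opt;
	}
	return dbOptions;
}

ColumnFamilyOptions getColumnOptions() {
	if (columnOptions == null) {
		ColumnFamilyOptions opt = predefinedOptions.createColumnOptions();
		if (optionsFactory != null) {
			opt = optionsFactory.createColumnOptions(opt);
		}
		columnOptions = opt;
	}
	return columnOptions;
}
```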
+
+	/**
+	 * Performs a checkpoint by drawing a {@link org.rocksdb.Snapshot} from RocksDB and then
+	 * iterating over all key/value pairs in RocksDB to store them in the final checkpoint
+	 * location. The only synchronous part is the drawing of the {@code Snapshot} which
+	 * is essentially free.
+	 */
+	private HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> performFullyAsyncSnapshot(long checkpointId, long timestamp) throws Exception {
+		// we draw a snapshot from RocksDB then iterate over all keys at that point
+		// and store them in the backup location
+
+		final URI backupUri = new URI(instanceCheckpointPath + "/chk-" + checkpointId);
+
+		long startTime = System.currentTimeMillis();
+
+		org.rocksdb.Snapshot snapshot = db.getSnapshot();
+
+		long endTime = System.currentTimeMillis();
+		LOG.info("Fully asynchronous RocksDB (" + instanceRocksDBPath + ") backup (synchronous part) took " + (endTime - startTime) + " ms.");
+
+		// draw a copy in case it gets changed while performing the async snapshot
+		Map<String, Tuple2<ColumnFamilyHandle, StateDescriptor>> columnFamiliesCopy = new HashMap<>();
+		columnFamiliesCopy.putAll(kvStateInformation);
+		FullyAsynSnapshot dummySnapshot = new FullyAsynSnapshot(db,
+				snapshot,
+				this,
+				backupUri,
+				columnFamiliesCopy,
+				checkpointId);
+
+		HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> result = new HashMap<>();
+		result.put("dummy_state", dummySnapshot);
+		return result;
+	}
+
+	@Override
+	public final void injectKeyValueStateSnapshots(HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> keyValueStateSnapshots, long recoveryTimestamp) throws Exception {
+		if (keyValueStateSnapshots.size() == 0) {
+			return;
+		}
+
+		KvStateSnapshot<?, ?, ?, ?, ?> dummyState = keyValueStateSnapshots.get("dummy_state");
+		if (dummyState instanceof FinalSemiAsyncSnapshot) {
+			restoreFromSemiAsyncSnapshot((FinalSemiAsyncSnapshot) dummyState);
+		} else if (dummyState instanceof FinalFullyAsyncSnapshot) {
+			restoreFromFullyAsyncSnapshot((FinalFullyAsyncSnapshot) dummyState);
+		} else {
+			throw new RuntimeException("Unknown RocksDB snapshot: " + dummyState);
+		}
+	}
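The restore paths below (and the state-creation path) rely on a `getColumnFamily(stateDescriptor)` helper that is not visible in these hunks. Judging from the `kvStateInformation` bookkeeping, it presumably performs a lazy lookup-or-create per state name, along these lines (a sketch, not the actual patch):

```java
// Sketch of the assumed lazy column-family lookup: one column family per state
// name, registered in kvStateInformation for snapshots and sanity checks.
ColumnFamilyHandle getColumnFamily(StateDescriptor descriptor) {
	Tuple2<ColumnFamilyHandle, StateDescriptor> stateInfo =
			kvStateInformation.get(descriptor.getName());

	if (stateInfo != null) {
		return stateInfo.f0;
	}

	ColumnFamilyDescriptor columnDescriptor = new ColumnFamilyDescriptor(
			descriptor.getName().getBytes(), getColumnOptions());

	try {
		ColumnFamilyHandle columnFamily = db.createColumnFamily(columnDescriptor);
		kvStateInformation.put(descriptor.getName(), new Tuple2<>(columnFamily, descriptor));
		return columnFamily;
	} catch (RocksDBException e) {
		throw new RuntimeException("Error creating ColumnFamilyHandle.", e);
	}
}
```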
+
+	private void restoreFromSemiAsyncSnapshot(FinalSemiAsyncSnapshot snapshot) throws Exception {
+		// This does mostly the same work as initializeForJob: we remove the existing RocksDB
+		// directory and create a new one from the backup.
+		// This must be refactored. The StateBackend should either be initialized from
+		// scratch or from a snapshot.
+
+		if (!instanceBasePath.exists()) {
+			if (!instanceBasePath.mkdirs()) {
+				throw new RuntimeException("Could not create RocksDB data directory.");
+			}
+		}
+
+		db.dispose();
+
+		// clean it; this will remove the last part of the path, but RocksDB will recreate it
+		try {
+			if (instanceRocksDBPath.exists()) {
+				LOG.warn("Deleting already existing db directory {}.", instanceRocksDBPath);
+				FileUtils.deleteDirectory(instanceRocksDBPath);
+			}
+		} catch (IOException e) {
+			throw new RuntimeException("Error cleaning RocksDB data directory.", e);
+		}
+
+		final File localBackupPath = new File(instanceBasePath, "chk-" + snapshot.checkpointId);
+
+		if (localBackupPath.exists()) {
+			try {
+				LOG.warn("Deleting already existing local backup directory {}.", localBackupPath);
+				FileUtils.deleteDirectory(localBackupPath);
+			} catch (IOException e) {
+				throw new RuntimeException("Error cleaning RocksDB local backup directory.", e);
+			}
+		}
+
+		HDFSCopyToLocal.copyToLocal(snapshot.backupUri, instanceBasePath);
+
+		try (BackupEngine backupEngine = BackupEngine.open(Env.getDefault(), new BackupableDBOptions(localBackupPath.getAbsolutePath()))) {
+			backupEngine.restoreDbFromLatestBackup(instanceRocksDBPath.getAbsolutePath(), instanceRocksDBPath.getAbsolutePath(), new RestoreOptions(true));
+		} catch (RocksDBException|IllegalArgumentException e) {
+			throw new RuntimeException("Error while restoring RocksDB state from " + localBackupPath, e);
+		} finally {
+			try {
+				FileUtils.deleteDirectory(localBackupPath);
+			} catch (IOException e) {
+				LOG.error("Error cleaning up local restore directory " + localBackupPath, e);
+			}
+		}
+
+		List<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<>(snapshot.stateDescriptors.size());
+		for (StateDescriptor stateDescriptor: snapshot.stateDescriptors) {
+			columnFamilyDescriptors.add(new ColumnFamilyDescriptor(stateDescriptor.getName().getBytes(), getColumnOptions()));
+		}
+
+		// RocksDB seems to need this... (see initializeForJob: the "default" column
+		// family always has to be part of the descriptor list)
+		columnFamilyDescriptors.add(new ColumnFamilyDescriptor("default".getBytes()));
+		List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>(snapshot.stateDescriptors.size());
+		try {
+			db = RocksDB.open(getDbOptions(), instanceRocksDBPath.getAbsolutePath(), columnFamilyDescriptors, columnFamilyHandles);
+			this.kvStateInformation = new HashMap<>();
+			for (int i = 0; i < snapshot.stateDescriptors.size(); i++) {
+				this.kvStateInformation.put(snapshot.stateDescriptors.get(i).getName(), new Tuple2<>(columnFamilyHandles.get(i), snapshot.stateDescriptors.get(i)));
+			}
+		} catch (RocksDBException e) {
+			throw new RuntimeException("Error while opening RocksDB instance.", e);
+		}
+	}
+
+	private void restoreFromFullyAsyncSnapshot(FinalFullyAsyncSnapshot snapshot) throws Exception {
+
+		DataInputView inputView = snapshot.stateHandle.getState(userCodeClassLoader);
+
+		// clear k/v state information before filling it
+		kvStateInformation.clear();
+
+		// first get the column family mapping
+		int numColumns = inputView.readInt();
+		Map<Byte, StateDescriptor> columnFamilyMapping = new HashMap<>(numColumns);
+		for (int i = 0; i < numColumns; i++) {
+			byte mappingByte = inputView.readByte();
+
+			ObjectInputStream ooIn = new ObjectInputStream(new DataInputViewStream(inputView));
+			StateDescriptor stateDescriptor = (StateDescriptor) ooIn.readObject();
+
+			columnFamilyMapping.put(mappingByte, stateDescriptor);
+
+			// this will fill in the k/v state information
+			getColumnFamily(stateDescriptor);
+		}
+
+		// try and read until EOF
+		try {
+			// the EOFException will get us out of this...
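+			// For reference, the layout of the stream read here (and produced by the
+			// fully asynchronous materialization) is:
+			//   int                  number of column families
+			//   numColumns times:    byte columnId, java-serialized StateDescriptor
+			//   repeated until EOF:  byte columnId, byte[] key, byte[] value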
+ while (true) {
+ byte mappingByte = inputView.readByte();
+ ColumnFamilyHandle handle = getColumnFamily(columnFamilyMapping.get(mappingByte));
+ byte[] key = BytePrimitiveArraySerializer.INSTANCE.deserialize(inputView);
+ byte[] value = BytePrimitiveArraySerializer.INSTANCE.deserialize(inputView);
+ db.put(handle, key, value);
+ }
+ } catch (EOFException e) {
+ // expected
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // Semi-asynchronous Backup Classes
+ // ------------------------------------------------------------------------
+
+ /**
+ * Upon snapshotting, the RocksDB backup is created synchronously. The asynchronous part is
+ * copying the backup to a (possibly) remote filesystem. This is done in {@link #materialize()}.
+ */
+ private static class SemiAsyncSnapshot extends AsynchronousKvStateSnapshot, ValueStateDescriptor, RocksDBStateBackend> {
+ private static final long serialVersionUID = 1L;
+ private final File localBackupPath;
+ private final URI backupUri;
+ private final List stateDescriptors;
+ private final long checkpointId;
+
+ private SemiAsyncSnapshot(File localBackupPath,
+ URI backupUri,
+ List columnFamilies,
+ long checkpointId) {
+ this.localBackupPath = localBackupPath;
+ this.backupUri = backupUri;
+ this.stateDescriptors = columnFamilies;
+ this.checkpointId = checkpointId;
+ }
+
+ @Override
+ public KvStateSnapshot, ValueStateDescriptor, RocksDBStateBackend> materialize() throws Exception {
+ try {
+ long startTime = System.currentTimeMillis();
+ HDFSCopyFromLocal.copyFromLocal(localBackupPath, backupUri);
+ long endTime = System.currentTimeMillis();
+ LOG.info("RocksDB materialization from " + localBackupPath + " to " + backupUri + " (asynchronous part) took " + (endTime - startTime) + " ms.");
+ return new FinalSemiAsyncSnapshot(backupUri, checkpointId, stateDescriptors);
+ } catch (Exception e) {
+ FileSystem fs = FileSystem.get(backupUri, HadoopFileSystem.getHadoopConfiguration());
+ fs.delete(new org.apache.hadoop.fs.Path(backupUri), true);
+ throw e;
+ } finally {
+ FileUtils.deleteQuietly(localBackupPath);
+ }
+ }
+ }
+
+ /**
+ * Dummy {@link KvStateSnapshot} that holds the state of our one RocksDB database. This
+ * also stores the column families that we had at the time of the snapshot so that we can
+ * restore these. This results from {@link SemiAsyncSnapshot}.
+ */
+ private static class FinalSemiAsyncSnapshot implements KvStateSnapshot, ValueStateDescriptor, RocksDBStateBackend> {
+ private static final long serialVersionUID = 1L;
+
+ final URI backupUri;
+ final long checkpointId;
+ private final List stateDescriptors;
+
+ /**
+ * Creates a new snapshot from the given state parameters.
+ */ + private FinalSemiAsyncSnapshot(URI backupUri, long checkpointId, List stateDescriptors) { + this.backupUri = backupUri; + this.checkpointId = checkpointId; + this.stateDescriptors = stateDescriptors; + } + + @Override + public final KvState, ValueStateDescriptor, RocksDBStateBackend> restoreState( + RocksDBStateBackend stateBackend, + TypeSerializer keySerializer, + ClassLoader classLoader, + long recoveryTimestamp) throws Exception { + throw new RuntimeException("Should never happen."); + } + + @Override + public final void discardState() throws Exception { + FileSystem fs = FileSystem.get(backupUri, HadoopFileSystem.getHadoopConfiguration()); + fs.delete(new org.apache.hadoop.fs.Path(backupUri), true); + } + + @Override + public final long getStateSize() throws Exception { + FileSystem fs = FileSystem.get(backupUri, HadoopFileSystem.getHadoopConfiguration()); + return fs.getContentSummary(new org.apache.hadoop.fs.Path(backupUri)).getLength(); + } + } + + // ------------------------------------------------------------------------ + // Fully asynchronous Backup Classes + // ------------------------------------------------------------------------ + + /** + * This does the snapshot using a RocksDB snapshot and an iterator over all keys + * at the point of that snapshot. + */ + private static class FullyAsynSnapshot extends AsynchronousKvStateSnapshot, ValueStateDescriptor, RocksDBStateBackend> { + private static final long serialVersionUID = 1L; + + private transient final RocksDB db; + private transient org.rocksdb.Snapshot snapshot; + private transient AbstractStateBackend backend; + + private final URI backupUri; + private final Map> columnFamilies; + private final long checkpointId; + + private FullyAsynSnapshot(RocksDB db, + org.rocksdb.Snapshot snapshot, + AbstractStateBackend backend, + URI backupUri, + Map> columnFamilies, + long checkpointId) { + this.db = db; + this.snapshot = snapshot; + this.backend = backend; + this.backupUri = backupUri; + this.columnFamilies = columnFamilies; + this.checkpointId = checkpointId; + } + + @Override + public KvStateSnapshot, ValueStateDescriptor, RocksDBStateBackend> materialize() throws Exception { + try { + long startTime = System.currentTimeMillis(); + + CheckpointStateOutputView outputView = backend.createCheckpointStateOutputView(checkpointId, startTime); + + outputView.writeInt(columnFamilies.size()); + + // we don't know how many key/value pairs there are in each column family. 
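// (For reference, an editorial reconstruction of the stream layout that the
// loop below produces and that restoreFromFullyAsyncSnapshot() reads back:
//
//     int                      number of column families
//     per column family:
//         byte                 column family tag
//         Object               its StateDescriptor, via ObjectOutputStream
//     then, repeated until EOF:
//         byte                 column family tag
//         byte[]               key, via BytePrimitiveArraySerializer
//         byte[]               value, via BytePrimitiveArraySerializer
// )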
+ // We prefix every written element with a byte that signifies to which
+ // column family it belongs; this way we can restore the column families
+ byte count = 0;
+ Map columnFamilyMapping = new HashMap<>();
+ for (Map.Entry> column: columnFamilies.entrySet()) {
+ columnFamilyMapping.put(column.getKey(), count);
+
+ outputView.writeByte(count);
+
+ ObjectOutputStream ooOut = new ObjectOutputStream(outputView);
+ ooOut.writeObject(column.getValue().f1);
+ ooOut.flush();
+
+ count++;
+ }
+
+ for (Map.Entry> column: columnFamilies.entrySet()) {
+ byte columnByte = columnFamilyMapping.get(column.getKey());
+ ReadOptions readOptions = new ReadOptions();
+ readOptions.setSnapshot(snapshot);
+ RocksIterator iterator = db.newIterator(column.getValue().f0, readOptions);
+ iterator.seekToFirst();
+ while (iterator.isValid()) {
+ outputView.writeByte(columnByte);
+ BytePrimitiveArraySerializer.INSTANCE.serialize(iterator.key(), outputView);
+ BytePrimitiveArraySerializer.INSTANCE.serialize(iterator.value(), outputView);
+ iterator.next();
+ }
+ }
+
+ StateHandle stateHandle = outputView.closeAndGetHandle();
+
+ long endTime = System.currentTimeMillis();
+ LOG.info("Fully asynchronous RocksDB materialization to " + backupUri + " (asynchronous part) took " + (endTime - startTime) + " ms.");
+ return new FinalFullyAsyncSnapshot(stateHandle, checkpointId);
+ } finally {
+ db.releaseSnapshot(snapshot);
+ snapshot = null;
+ }
+ }
+
+ @Override
+ protected void finalize() throws Throwable {
+ if (snapshot != null) {
+ db.releaseSnapshot(snapshot);
+ }
+ super.finalize();
+ }
+ }
+
+ /**
+ * Dummy {@link KvStateSnapshot} that holds the state of our one RocksDB database. This
+ * results from {@link FullyAsynSnapshot}.
+ */
+ private static class FinalFullyAsyncSnapshot implements KvStateSnapshot, ValueStateDescriptor, RocksDBStateBackend> {
+ private static final long serialVersionUID = 1L;
+
+ final StateHandle stateHandle;
+ final long checkpointId;
+
+ /**
+ * Creates a new snapshot from the given state parameters.
+ */
+ private FinalFullyAsyncSnapshot(StateHandle stateHandle, long checkpointId) {
+ this.stateHandle = stateHandle;
+ this.checkpointId = checkpointId;
+ }
+
+ @Override
+ public final KvState, ValueStateDescriptor, RocksDBStateBackend> restoreState(
+ RocksDBStateBackend stateBackend,
+ TypeSerializer keySerializer,
+ ClassLoader classLoader,
+ long recoveryTimestamp) throws Exception {
+ throw new RuntimeException("Should never happen.");
+ }
+
+ @Override
+ public final void discardState() throws Exception {
+ stateHandle.discardState();
+ }
+
+ @Override
+ public final long getStateSize() throws Exception {
+ return stateHandle.getStateSize();
+ }
+ }
+
 // ------------------------------------------------------------------------
 // State factories
 // ------------------------------------------------------------------------
-
+
+ /**
+ * Creates a column family handle for use with a k/v state. When restoring from a snapshot
+ * we don't restore the individual k/v states, just the global RocksDB database and the
+ * list of column families. When a k/v state is first requested we check here whether we
+ * already have a column family for that and return it or create a new one if it doesn't exist.
+ *
+ * <p>This also checks whether the {@link StateDescriptor} for a state matches the one
+ * that we checkpointed, i.e. is already in the map of column families.
+ */
+ private ColumnFamilyHandle getColumnFamily(StateDescriptor descriptor) {
+
+ Tuple2 stateInfo = kvStateInformation.get(descriptor.getName());
+
+ if (stateInfo != null) {
+ if (!stateInfo.f1.equals(descriptor)) {
+ throw new RuntimeException("Trying to access state using wrong StateDescriptor, was " + stateInfo.f1 + " but trying to access with " + descriptor);
+ }
+ return stateInfo.f0;
+ }
+
+ ColumnFamilyDescriptor columnDescriptor = new ColumnFamilyDescriptor(descriptor.getName().getBytes(), getColumnOptions());
+
+ try {
+ ColumnFamilyHandle columnFamily = db.createColumnFamily(columnDescriptor);
+ kvStateInformation.put(descriptor.getName(), new Tuple2<>(columnFamily, descriptor));
+ return columnFamily;
+ } catch (RocksDBException e) {
+ throw new RuntimeException("Error creating ColumnFamilyHandle.", e);
+ }
+ }
+
+ /**
+ * Used by k/v states to access the current key.
+ */
+ Object currentKey() {
+ return currentKey;
+ }
+
+ /**
+ * Used by k/v states to access the key serializer.
+ */
+ TypeSerializer keySerializer() {
+ return keySerializer;
+ }
+
 @Override
 protected ValueState createValueState(TypeSerializer namespaceSerializer, ValueStateDescriptor stateDesc) throws Exception {
- File dbPath = getDbPath(stateDesc.getName());
- String checkpointPath = getCheckpointPath(stateDesc.getName());
-
- return new RocksDBValueState<>(keySerializer, namespaceSerializer,
- stateDesc, dbPath, checkpointPath, getRocksDBOptions());
+ ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc);
+
+ return new RocksDBValueState<>(columnFamily, namespaceSerializer, stateDesc, this);
 }
 
 @Override
 protected ListState createListState(TypeSerializer namespaceSerializer, ListStateDescriptor stateDesc) throws Exception {
- File dbPath = getDbPath(stateDesc.getName());
- String checkpointPath = getCheckpointPath(stateDesc.getName());
-
- return new RocksDBListState<>(keySerializer, namespaceSerializer,
- stateDesc, dbPath, checkpointPath, getRocksDBOptions());
+ ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc);
+
+ return new RocksDBListState<>(columnFamily, namespaceSerializer, stateDesc, this);
 }
 
 @Override
 protected ReducingState createReducingState(TypeSerializer namespaceSerializer, ReducingStateDescriptor stateDesc) throws Exception {
- File dbPath = getDbPath(stateDesc.getName());
- String checkpointPath = getCheckpointPath(stateDesc.getName());
-
- return new RocksDBReducingState<>(keySerializer, namespaceSerializer,
- stateDesc, dbPath, checkpointPath, getRocksDBOptions());
+ ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc);
+
+ return new RocksDBReducingState<>(columnFamily, namespaceSerializer, stateDesc, this);
 }
 
 @Override
 protected FoldingState createFoldingState(TypeSerializer namespaceSerializer, FoldingStateDescriptor stateDesc) throws Exception {
- File dbPath = getDbPath(stateDesc.getName());
- String checkpointPath = getCheckpointPath(stateDesc.getName());
- return new RocksDBFoldingState<>(keySerializer, namespaceSerializer,
- stateDesc, dbPath, checkpointPath, getRocksDBOptions());
+ ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc);
+
+ return new RocksDBFoldingState<>(columnFamily, namespaceSerializer, stateDesc, this);
 }
 
+ // ------------------------------------------------------------------------
+ // Non-partitioned state
+ // ------------------------------------------------------------------------
+
 @Override
 public CheckpointStateOutputStream createCheckpointStateOutputStream(
 long checkpointID, long timestamp) throws Exception {
@@ -304,6 +901,28 @@ public StateHandle checkpointStateSerializable(
 // Parameters
 // ------------------------------------------------------------------------
 
+ /**
+ * Enables fully asynchronous snapshotting of the partitioned state held in RocksDB.
+ *
+ * <p>By default, this is disabled. This means that RocksDB state is copied in a synchronous
+ * step, during which normal processing of elements pauses, followed by an asynchronous step
+ * of copying the RocksDB backup to the final checkpoint location. Fully asynchronous
+ * snapshots take longer (the time grows linearly with the number of unique keys),
+ * but normal processing of elements is not paused.
+ */
+ public void enableFullyAsyncSnapshots() {
+ this.fullyAsyncBackup = true;
+ }
+
+ /**
+ * Disables fully asynchronous snapshotting of the partitioned state held in RocksDB.
+ *
+ * <p>By default, this is disabled.
+ */
+ public void disableFullyAsyncSnapshots() {
+ this.fullyAsyncBackup = false;
+ }
+
 /**
 * Sets the path where the RocksDB local database files should be stored on the local
 * file system. Setting this path overrides the default behavior, where the
@@ -431,24 +1050,41 @@ public OptionsFactory getOptions() {
 }
 
 /**
- * Gets the RocksDB Options to be used for all RocksDB instances.
+ * Gets the RocksDB {@link DBOptions} to be used for all RocksDB instances.
 */
- Options getRocksDBOptions() {
- if (rocksDbOptions == null) {
+ public DBOptions getDbOptions() {
+ if (dbOptions == null) {
 // initial options from pre-defined profile
- Options opt = predefinedOptions.createOptions();
+ DBOptions opt = predefinedOptions.createDBOptions();
 
 // add user-defined options, if specified
 if (optionsFactory != null) {
- opt = optionsFactory.createOptions(opt);
+ opt = optionsFactory.createDBOptions(opt);
 }
 
 // add necessary default options
 opt = opt.setCreateIfMissing(true);
- opt = opt.setMergeOperator(new StringAppendOperator());
-
- rocksDbOptions = opt;
+
+ dbOptions = opt;
+ }
+ return dbOptions;
+ }
+
+ /**
+ * Gets the RocksDB {@link ColumnFamilyOptions} to be used for all RocksDB instances.
+ */
+ public ColumnFamilyOptions getColumnOptions() {
+ if (columnOptions == null) {
+ // initial options from pre-defined profile
+ ColumnFamilyOptions opt = predefinedOptions.createColumnOptions();
+
+ // add user-defined options, if specified
+ if (optionsFactory != null) {
+ opt = optionsFactory.createColumnOptions(opt);
+ }
+
+ columnOptions = opt;
 }
- return rocksDbOptions;
+ return columnOptions;
 }
 }
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
index 15b460ada85f9..2af06adcbdb3f 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
@@ -23,17 +23,14 @@
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.runtime.state.KvState;
-import org.rocksdb.Options;
+import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.RocksDBException;
 import org.rocksdb.WriteOptions;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
-import java.io.File;
 import java.io.IOException;
-import java.net.URI;
 
 import static java.util.Objects.requireNonNull;
 
@@ -44,7 +41,7 @@
 * @param The type of the namespace.
 * @param The type of value that the state stores.
 */
-public class RocksDBValueState
+class RocksDBValueState
 extends AbstractRocksDBState, ValueStateDescriptor>
 implements ValueState {
 
@@ -52,60 +49,28 @@
 private final TypeSerializer valueSerializer;
 
 /** This holds the name of the state and can create an initial default value for the state. */
- protected final ValueStateDescriptor stateDesc;
+ private final ValueStateDescriptor stateDesc;
 
 /**
 * We disable writes to the write-ahead-log here. We can't have these in the base class
 * because JNI segfaults for some reason if they are.
*/ - protected final WriteOptions writeOptions; + private final WriteOptions writeOptions; /** * Creates a new {@code RocksDBValueState}. * - * @param keySerializer The serializer for the keys. * @param namespaceSerializer The serializer for the namespace. * @param stateDesc The state identifier for the state. This contains name * and can create a default state value. - * @param dbPath The path on the local system where RocksDB data should be stored. - * @param backupPath The path where to store backups. */ - protected RocksDBValueState(TypeSerializer keySerializer, + RocksDBValueState(ColumnFamilyHandle columnFamily, TypeSerializer namespaceSerializer, ValueStateDescriptor stateDesc, - File dbPath, - String backupPath, - Options options) { - - super(keySerializer, namespaceSerializer, dbPath, backupPath, options); - this.stateDesc = requireNonNull(stateDesc); - this.valueSerializer = stateDesc.getSerializer(); + RocksDBStateBackend backend) { - writeOptions = new WriteOptions(); - writeOptions.setDisableWAL(true); - } - - /** - * Creates a {@code RocksDBValueState} by restoring from a directory. - * - * @param keySerializer The serializer for the keys. - * @param namespaceSerializer The serializer for the namespace. - * @param stateDesc The state identifier for the state. This contains name - * and can create a default state value. - * @param dbPath The path on the local system where RocksDB data should be stored. - * @param backupPath The path where to store backups. - * @param restorePath The path on the local file system that we are restoring from. - */ - protected RocksDBValueState(TypeSerializer keySerializer, - TypeSerializer namespaceSerializer, - ValueStateDescriptor stateDesc, - File dbPath, - String backupPath, - String restorePath, - Options options) { - - super(keySerializer, namespaceSerializer, dbPath, backupPath, restorePath, options); - this.stateDesc = stateDesc; + super(columnFamily, namespaceSerializer, backend); + this.stateDesc = requireNonNull(stateDesc); this.valueSerializer = stateDesc.getSerializer(); writeOptions = new WriteOptions(); @@ -119,13 +84,13 @@ public V value() { try { writeKeyAndNamespace(out); byte[] key = baos.toByteArray(); - byte[] valueBytes = db.get(key); + byte[] valueBytes = backend.db.get(columnFamily, key); if (valueBytes == null) { return stateDesc.getDefaultValue(); } return valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStream(valueBytes))); } catch (IOException|RocksDBException e) { - throw new RuntimeException("Error while retrieving data from RocksDB", e); + throw new RuntimeException("Error while retrieving data from RocksDB.", e); } } @@ -142,54 +107,10 @@ public void update(V value) throws IOException { byte[] key = baos.toByteArray(); baos.reset(); valueSerializer.serialize(value, out); - db.put(writeOptions, key, baos.toByteArray()); + backend.db.put(columnFamily, writeOptions, key, baos.toByteArray()); } catch (Exception e) { throw new RuntimeException("Error while adding data to RocksDB", e); } } - - @Override - protected AbstractRocksDBSnapshot, ValueStateDescriptor> createRocksDBSnapshot( - URI backupUri, - long checkpointId) { - - return new Snapshot<>(basePath, checkpointPath, backupUri, checkpointId, keySerializer, namespaceSerializer, stateDesc); - } - - private static class Snapshot - extends AbstractRocksDBSnapshot, ValueStateDescriptor> - { - private static final long serialVersionUID = 1L; - - public Snapshot(File dbPath, - String checkpointPath, - URI backupUri, - long checkpointId, - 
TypeSerializer keySerializer, - TypeSerializer namespaceSerializer, - ValueStateDescriptor stateDesc) { - super(dbPath, - checkpointPath, - backupUri, - checkpointId, - keySerializer, - namespaceSerializer, - stateDesc); - } - - @Override - protected KvState, ValueStateDescriptor, RocksDBStateBackend> createRocksDBState( - TypeSerializer keySerializer, - TypeSerializer namespaceSerializer, - ValueStateDescriptor stateDesc, - File basePath, - String backupPath, - String restorePath, - Options options) throws Exception { - - return new RocksDBValueState<>(keySerializer, namespaceSerializer, stateDesc, basePath, - checkpointPath, restorePath, options); - } - } } diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/FullyAsyncRocksDBStateBackendTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/FullyAsyncRocksDBStateBackendTest.java new file mode 100644 index 0000000000000..7861542521657 --- /dev/null +++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/FullyAsyncRocksDBStateBackendTest.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.commons.io.FileUtils; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.runtime.state.StateBackendTestBase; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.util.OperatingSystem; +import org.junit.Assume; +import org.junit.Before; + +import java.io.File; +import java.io.IOException; +import java.util.UUID; + +/** + * Tests for the partitioned state part of {@link RocksDBStateBackend} with fully asynchronous + * checkpointing enabled. 
+ */
+public class FullyAsyncRocksDBStateBackendTest extends StateBackendTestBase {
+
+ private File dbDir;
+ private File chkDir;
+
+ @Before
+ public void checkOperatingSystem() {
+ Assume.assumeTrue("This test can't run successfully on Windows.", !OperatingSystem.isWindows());
+ }
+
+ @Override
+ protected RocksDBStateBackend getStateBackend() throws IOException {
+ dbDir = new File(new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString()), "state");
+ chkDir = new File(new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString()), "snapshots");
+
+ RocksDBStateBackend backend = new RocksDBStateBackend(chkDir.getAbsoluteFile().toURI(), new MemoryStateBackend());
+ backend.setDbStoragePath(dbDir.getAbsolutePath());
+ backend.enableFullyAsyncSnapshots();
+ return backend;
+ }
+
+ @Override
+ protected void cleanup() {
+ try {
+ FileUtils.deleteDirectory(dbDir);
+ FileUtils.deleteDirectory(chkDir);
+ } catch (IOException ignore) {}
+ }
+}
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncKVSnapshotTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncKVSnapshotTest.java
index 8f3c59d51baf2..8eb8dfe95dbfe 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncKVSnapshotTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncKVSnapshotTest.java
@@ -92,7 +92,7 @@ public void testAsyncCheckpoints() throws Exception {
 final OneShotLatch ensureCheckpointLatch = new OneShotLatch();
 
 final OneInputStreamTask task = new OneInputStreamTask<>();
-
+
 final OneInputStreamTaskTestHarness testHarness = new OneInputStreamTaskTestHarness<>(task, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
 
 testHarness.configureForKeyedStream(new KeySelector() {
@@ -144,7 +144,109 @@ public void acknowledgeCheckpoint(long checkpointId, StateHandle state) {
 // should be only one k/v state
 StreamTaskState taskState = stateList.getState(this.getUserClassLoader())[0];
 assertEquals(1, taskState.getKvStates().size());
- assertTrue(taskState.getKvStates().get("count") instanceof AbstractRocksDBState.AbstractRocksDBSnapshot);
+
+ // we now know that the checkpoint went through
+ ensureCheckpointLatch.trigger();
+ }
+ };
+
+ testHarness.invoke(mockEnv);
+
+ // wait for the task to be running
+ for (Field field: StreamTask.class.getDeclaredFields()) {
+ if (field.getName().equals("isRunning")) {
+ field.setAccessible(true);
+ while (!field.getBoolean(task)) {
+ Thread.sleep(10);
+ }
+
+ }
+ }
+
+ testHarness.processElement(new StreamRecord<>("Wohoo", 0));
+
+ task.triggerCheckpoint(42, 17);
+
+ // now we allow the checkpoint
+ delayCheckpointLatch.trigger();
+
+ // wait for the checkpoint to go through
+ ensureCheckpointLatch.await();
+
+ testHarness.endInput();
+ testHarness.waitForTaskCompletion();
+ }
+
+ /**
+ * This ensures that asynchronous state handles are actually materialized asynchronously.
+ *
+ * <p>We use latches to block at various stages and see if the code still continues through
+ * the parts that are not asynchronous. If the checkpoint is not done asynchronously the
+ * test will simply lock forever.
+ */
+ @Test
+ public void testFullyAsyncCheckpoints() throws Exception {
+ LocalFileSystem localFS = new LocalFileSystem();
+ localFS.initialize(new URI("file:///"), new Configuration());
+ PowerMockito.stub(PowerMockito.method(FileSystem.class, "get", URI.class, Configuration.class)).toReturn(localFS);
+
+ final OneShotLatch delayCheckpointLatch = new OneShotLatch();
+ final OneShotLatch ensureCheckpointLatch = new OneShotLatch();
+
+ final OneInputStreamTask task = new OneInputStreamTask<>();
+
+ final OneInputStreamTaskTestHarness testHarness = new OneInputStreamTaskTestHarness<>(task, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
+
+ testHarness.configureForKeyedStream(new KeySelector() {
+ @Override
+ public String getKey(String value) throws Exception {
+ return value;
+ }
+ }, BasicTypeInfo.STRING_TYPE_INFO);
+
+ StreamConfig streamConfig = testHarness.getStreamConfig();
+
+ File dbDir = new File(new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString()), "state");
+ File chkDir = new File(new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString()), "snapshots");
+
+ RocksDBStateBackend backend = new RocksDBStateBackend(chkDir.getAbsoluteFile().toURI(), new MemoryStateBackend());
+ backend.setDbStoragePath(dbDir.getAbsolutePath());
+ backend.enableFullyAsyncSnapshots();
+
+ streamConfig.setStateBackend(backend);
+
+ streamConfig.setStreamOperator(new AsyncCheckpointOperator());
+
+ StreamMockEnvironment mockEnv = new StreamMockEnvironment(
+ testHarness.jobConfig,
+ testHarness.taskConfig,
+ testHarness.memorySize,
+ new MockInputSplitProvider(),
+ testHarness.bufferSize) {
+
+ @Override
+ public void acknowledgeCheckpoint(long checkpointId) {
+ super.acknowledgeCheckpoint(checkpointId);
+ }
+
+ @Override
+ public void acknowledgeCheckpoint(long checkpointId, StateHandle state) {
+ super.acknowledgeCheckpoint(checkpointId, state);
+
+ // block on the latch, to verify that triggerCheckpoint returns below,
+ // even though the async checkpoint would not finish
+ try {
+ delayCheckpointLatch.await();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+
+ assertTrue(state instanceof StreamTaskStateList);
+ StreamTaskStateList stateList = (StreamTaskStateList) state;
+
+ // should be only one k/v state
+ StreamTaskState taskState = stateList.getState(this.getUserClassLoader())[0];
+ assertEquals(1, taskState.getKvStates().size());

 // we now know that the checkpoint went through
 ensureCheckpointLatch.trigger();

diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
index 42ba2755d4a4a..fca577369f9ab 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
@@ -33,8 +33,9 @@
 import org.junit.Before;
 import org.junit.Test;
 
+import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.CompactionStyle;
-import org.rocksdb.Options;
+import org.rocksdb.DBOptions;
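// A usage sketch (not part of the patch) showing how a job would opt into
// the fully asynchronous mode configured by the test above; the checkpoint
// URI and the local storage path are placeholders, and the factory method
// name is hypothetical:
static RocksDBStateBackend fullyAsyncBackend() throws Exception {
    RocksDBStateBackend backend = new RocksDBStateBackend(
            new URI("file:///tmp/checkpoints"), new MemoryStateBackend());
    backend.setDbStoragePath("/tmp/flink-rocksdb");  // where the local DB files live
    backend.enableFullyAsyncSnapshots();             // the semi-asynchronous mode remains the default
    return backend;
}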
import java.io.File; import java.util.UUID; @@ -162,7 +163,7 @@ public void testContinueOnSomeDbDirectoriesMissing() throws Exception { try { assertTrue(targetDir1.mkdirs()); assertTrue(targetDir2.mkdirs()); - + if (!targetDir1.setWritable(false, false)) { System.err.println("Cannot execute 'testContinueOnSomeDbDirectoriesMissing' because cannot mark directory non-writable"); return; @@ -201,13 +202,18 @@ public void testPredefinedOptions() throws Exception { rocksDbBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED); assertEquals(PredefinedOptions.SPINNING_DISK_OPTIMIZED, rocksDbBackend.getPredefinedOptions()); - - Options opt1 = rocksDbBackend.getRocksDBOptions(); - Options opt2 = rocksDbBackend.getRocksDBOptions(); + + DBOptions opt1 = rocksDbBackend.getDbOptions(); + DBOptions opt2 = rocksDbBackend.getDbOptions(); assertEquals(opt1, opt2); - - assertEquals(CompactionStyle.LEVEL, opt1.compactionStyle()); + + ColumnFamilyOptions columnOpt1 = rocksDbBackend.getColumnOptions(); + ColumnFamilyOptions columnOpt2 = rocksDbBackend.getColumnOptions(); + + assertEquals(columnOpt1, columnOpt2); + + assertEquals(CompactionStyle.LEVEL, columnOpt1.compactionStyle()); } @Test @@ -216,13 +222,18 @@ public void testOptionsFactory() throws Exception { rocksDbBackend.setOptions(new OptionsFactory() { @Override - public Options createOptions(Options currentOptions) { + public DBOptions createDBOptions(DBOptions currentOptions) { + return currentOptions; + } + + @Override + public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) { return currentOptions.setCompactionStyle(CompactionStyle.FIFO); } }); assertNotNull(rocksDbBackend.getOptions()); - assertEquals(CompactionStyle.FIFO, rocksDbBackend.getRocksDBOptions().compactionStyle()); + assertEquals(CompactionStyle.FIFO, rocksDbBackend.getColumnOptions().compactionStyle()); } @Test @@ -234,20 +245,25 @@ public void testPredefinedAndOptionsFactory() throws Exception { rocksDbBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED); rocksDbBackend.setOptions(new OptionsFactory() { @Override - public Options createOptions(Options currentOptions) { + public DBOptions createDBOptions(DBOptions currentOptions) { + return currentOptions; + } + + @Override + public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) { return currentOptions.setCompactionStyle(CompactionStyle.UNIVERSAL); } }); assertEquals(PredefinedOptions.SPINNING_DISK_OPTIMIZED, rocksDbBackend.getPredefinedOptions()); assertNotNull(rocksDbBackend.getOptions()); - assertEquals(CompactionStyle.UNIVERSAL, rocksDbBackend.getRocksDBOptions().compactionStyle()); + assertEquals(CompactionStyle.UNIVERSAL, rocksDbBackend.getColumnOptions().compactionStyle()); } @Test public void testPredefinedOptionsEnum() { for (PredefinedOptions o : PredefinedOptions.values()) { - Options opt = o.createOptions(); + DBOptions opt = o.createDBOptions(); try { assertNotNull(opt); } finally { diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java index 034e97acee753..32fa9f3ee1917 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java @@ -107,4 +107,35 @@ public FoldingState bind(StateBackend stateBackend) throws Exception { public FoldFunction getFoldFunction() { return 
foldFunction; } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + FoldingStateDescriptor that = (FoldingStateDescriptor) o; + + return serializer.equals(that.serializer) && name.equals(that.name); + + } + + @Override + public int hashCode() { + int result = serializer.hashCode(); + result = 31 * result + name.hashCode(); + return result; + } + + @Override + public String toString() { + return "FoldingStateDescriptor{" + + "serializer=" + serializer + + ", initialValue=" + defaultValue + + ", foldFunction=" + foldFunction + + '}'; + } } diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java index 8807e8e1095fb..077109cfff235 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java @@ -72,4 +72,33 @@ public ListStateDescriptor(String name, TypeSerializer typeSerializer) { public ListState bind(StateBackend stateBackend) throws Exception { return stateBackend.createListState(this); } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + ListStateDescriptor that = (ListStateDescriptor) o; + + return serializer.equals(that.serializer) && name.equals(that.name); + + } + + @Override + public int hashCode() { + int result = serializer.hashCode(); + result = 31 * result + name.hashCode(); + return result; + } + + @Override + public String toString() { + return "ListStateDescriptor{" + + "serializer=" + serializer + + '}'; + } } diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingStateDescriptor.java index 7bf1be9cd6d8b..8d79da4b74489 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingStateDescriptor.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingStateDescriptor.java @@ -96,4 +96,34 @@ public ReducingState bind(StateBackend stateBackend) throws Exception { public ReduceFunction getReduceFunction() { return reduceFunction; } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + ReducingStateDescriptor that = (ReducingStateDescriptor) o; + + return serializer.equals(that.serializer) && name.equals(that.name); + + } + + @Override + public int hashCode() { + int result = serializer.hashCode(); + result = 31 * result + name.hashCode(); + return result; + } + + @Override + public String toString() { + return "ReducingStateDescriptor{" + + "serializer=" + serializer + + ", reduceFunction=" + reduceFunction + + '}'; + } } diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java index 243ebcd20bb66..87ed71d6b1a8b 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java @@ -49,14 +49,14 @@ public abstract class StateDescriptor implements Serializabl private static final long serialVersionUID = 1L; /** Name that uniquely 
identifies state created from this StateDescriptor. */ - private final String name; + protected final String name; /** The serializer for the type. May be eagerly initialized in the constructor, * or lazily once the type is serialized or an ExecutionConfig is provided. */ - private TypeSerializer serializer; + protected TypeSerializer serializer; /** The default value returned by the state when no other value is bound to a key */ - private transient T defaultValue; + protected transient T defaultValue; /** The type information describing the value type. Only used to lazily create the serializer * and dropped during serialization */ @@ -210,23 +210,10 @@ private void ensureSerializerCreated() { // ------------------------------------------------------------------------ @Override - public int hashCode() { - return name.hashCode() + 41; - } + public abstract int hashCode(); @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - else if (o == null || getClass() != o.getClass()) { - return false; - } - else { - StateDescriptor that = (StateDescriptor) o; - return this.name.equals(that.name); - } - } + public abstract boolean equals(Object o); @Override public String toString() { diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java index 0c61bb10d7daa..10bcd589852d7 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java @@ -79,4 +79,35 @@ public ValueStateDescriptor(String name, TypeSerializer typeSerializer, T def public ValueState bind(StateBackend stateBackend) throws Exception { return stateBackend.createValueState(this); } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + ValueStateDescriptor that = (ValueStateDescriptor) o; + + return serializer.equals(that.serializer) && name.equals(that.name); + + } + + @Override + public int hashCode() { + int result = serializer.hashCode(); + result = 31 * result + name.hashCode(); + return result; + } + + @Override + public String toString() { + return "ValueStateDescriptor{" + + "name=" + name + + ", defaultValue=" + defaultValue + + ", serializer=" + serializer + + '}'; + } } diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java index 942a3c9934f1b..e32c1a64f3798 100644 --- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java +++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java @@ -119,7 +119,20 @@ protected void cleanup() throws Exception { // ------------------------------------------------------------------------ // Tests // ------------------------------------------------------------------------ - + + // disable these because the verification does not work for this state backend + @Override + @Test + public void testValueStateRestoreWithWrongSerializers() {} + + @Override + @Test + public void testListStateRestoreWithWrongSerializers() {} + + @Override + @Test + public void testReducingStateRestoreWithWrongSerializers() {} + @Test public void testSetupAndSerialization() { try { diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java index df6fcd38f1597..95ca13f11b418 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java @@ -65,7 +65,7 @@ public abstract class AbstractStateBackend implements java.io.Serializable { private transient KvState[] keyValueStates; /** So that we can give out state when the user uses the same key. */ - private transient HashMap> keyValueStatesByName; + protected transient HashMap> keyValueStatesByName; /** For caching the last accessed partitioned state */ private transient String lastName; @@ -110,11 +110,15 @@ public void initializeForJob(Environment env, public abstract void close() throws Exception; public void dispose() { + lastName = null; + lastState = null; if (keyValueStates != null) { for (KvState state : keyValueStates) { state.dispose(); } } + keyValueStates = null; + keyValueStatesByName = null; } // ------------------------------------------------------------------------ @@ -337,7 +341,7 @@ public void notifyOfCompletedCheckpoint(long checkpointId) throws Exception { * @param keyValueStateSnapshots The Map of snapshots */ @SuppressWarnings("unchecked,rawtypes") - public final void injectKeyValueStateSnapshots(HashMap keyValueStateSnapshots, long recoveryTimestamp) throws Exception { + public void injectKeyValueStateSnapshots(HashMap keyValueStateSnapshots, long recoveryTimestamp) throws Exception { if (keyValueStateSnapshots != null) { if (keyValueStatesByName == null) { keyValueStatesByName = new HashMap<>(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java index 56450389aa325..a7926d50e5137 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java @@ -54,6 +54,19 @@ protected void cleanup() throws Exception { deleteDirectorySilently(stateDir); } + // disable these because the verification does not work for this state backend + @Override + @Test + public void testValueStateRestoreWithWrongSerializers() {} + + @Override + @Test + public void testListStateRestoreWithWrongSerializers() {} + + @Override + @Test + public void testReducingStateRestoreWithWrongSerializers() {} + @Test public void testSetupAndSerialization() { File tempDir = new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString()); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java index 34354c1b0036e..73d919be6d609 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java @@ -41,6 +41,19 @@ protected MemoryStateBackend getStateBackend() throws Exception { @Override protected void cleanup() throws Exception { } + // disable these because the verification does not work for this state backend + @Override + @Test + public void testValueStateRestoreWithWrongSerializers() {} + + @Override + @Test + public void testListStateRestoreWithWrongSerializers() {} + + 
@Override + @Test + public void testReducingStateRestoreWithWrongSerializers() {} + @Test public void testSerializableState() { try { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java index 6c3930ba34f5e..cc36f4ababe69 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java @@ -36,6 +36,7 @@ import org.apache.flink.api.common.typeutils.base.FloatSerializer; import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.api.common.typeutils.base.StringSerializer; import org.apache.flink.api.common.typeutils.base.VoidSerializer; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.runtime.operators.testutils.DummyEnvironment; @@ -45,6 +46,8 @@ import org.junit.Before; import org.junit.Test; +import java.util.HashMap; + import static org.junit.Assert.*; /** @@ -79,89 +82,83 @@ public void testValueState() throws Exception { ValueState state = backend.getPartitionedState(null, VoidSerializer.INSTANCE, kvId); - @SuppressWarnings("unchecked") - KvState, ValueStateDescriptor, B> kv = - (KvState, ValueStateDescriptor, B>) state; - // some modifications to the state - kv.setCurrentKey(1); + backend.setCurrentKey(1); assertNull(state.value()); state.update("1"); - kv.setCurrentKey(2); + backend.setCurrentKey(2); assertNull(state.value()); state.update("2"); - kv.setCurrentKey(1); + backend.setCurrentKey(1); assertEquals("1", state.value()); // draw a snapshot - KvStateSnapshot, ValueStateDescriptor, B> snapshot1 = - kv.snapshot(682375462378L, 2); + HashMap> snapshot1 = backend.snapshotPartitionedState(682375462378L, 2); - if (snapshot1 instanceof AsynchronousKvStateSnapshot) { - snapshot1 = ((AsynchronousKvStateSnapshot, ValueStateDescriptor, B>) snapshot1).materialize(); + for (String key: snapshot1.keySet()) { + if (snapshot1.get(key) instanceof AsynchronousKvStateSnapshot) { + snapshot1.put(key, ((AsynchronousKvStateSnapshot) snapshot1.get(key)).materialize()); + } } // make some more modifications - kv.setCurrentKey(1); + backend.setCurrentKey(1); state.update("u1"); - kv.setCurrentKey(2); + backend.setCurrentKey(2); state.update("u2"); - kv.setCurrentKey(3); + backend.setCurrentKey(3); state.update("u3"); // draw another snapshot - KvStateSnapshot, ValueStateDescriptor, B> snapshot2 = - kv.snapshot(682375462379L, 4); + HashMap> snapshot2 = backend.snapshotPartitionedState(682375462379L, 4); - if (snapshot2 instanceof AsynchronousKvStateSnapshot) { - snapshot2 = ((AsynchronousKvStateSnapshot, ValueStateDescriptor, B>) snapshot2).materialize(); + for (String key: snapshot2.keySet()) { + if (snapshot2.get(key) instanceof AsynchronousKvStateSnapshot) { + snapshot2.put(key, ((AsynchronousKvStateSnapshot) snapshot2.get(key)).materialize()); + } } // validate the original state - kv.setCurrentKey(1); + backend.setCurrentKey(1); assertEquals("u1", state.value()); - kv.setCurrentKey(2); + backend.setCurrentKey(2); assertEquals("u2", state.value()); - kv.setCurrentKey(3); + backend.setCurrentKey(3); assertEquals("u3", state.value()); - kv.dispose(); + backend.dispose(); + backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE); - // restore the first snapshot and validate it 
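// The backend-level round trip that replaces the per-state restoreState()
// calls, condensed from the rewritten test below; checkpointId, ts, env and
// recoveryTs are stand-ins, and the raw types mirror the test code:
HashMap<String, KvStateSnapshot> snapshots = (HashMap) backend.snapshotPartitionedState(checkpointId, ts);

// asynchronous snapshots must be materialized before they can be restored
for (String name : snapshots.keySet()) {
    if (snapshots.get(name) instanceof AsynchronousKvStateSnapshot) {
        snapshots.put(name, ((AsynchronousKvStateSnapshot) snapshots.get(name)).materialize());
    }
}

backend.dispose();                                   // drop the live state
backend.initializeForJob(env, "test_op", IntSerializer.INSTANCE);
backend.injectKeyValueStateSnapshots((HashMap) snapshots, recoveryTs);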
- KvState, ValueStateDescriptor, B> restored1 = snapshot1.restoreState( - backend, - IntSerializer.INSTANCE, - this.getClass().getClassLoader(), 10); + backend.injectKeyValueStateSnapshots((HashMap) snapshot1, 100); - snapshot1.discardState(); + for (String key: snapshot1.keySet()) { + snapshot1.get(key).discardState(); + } - @SuppressWarnings("unchecked") - ValueState restored1State = (ValueState) restored1; + ValueState restored1 = backend.getPartitionedState(null, VoidSerializer.INSTANCE, kvId); - restored1.setCurrentKey(1); - assertEquals("1", restored1State.value()); - restored1.setCurrentKey(2); - assertEquals("2", restored1State.value()); + backend.setCurrentKey(1); + assertEquals("1", restored1.value()); + backend.setCurrentKey(2); + assertEquals("2", restored1.value()); - restored1.dispose(); + backend.dispose(); + backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE); - // restore the second snapshot and validate it - KvState, ValueStateDescriptor, B> restored2 = snapshot2.restoreState( - backend, - IntSerializer.INSTANCE, - this.getClass().getClassLoader(), 10); + backend.injectKeyValueStateSnapshots((HashMap) snapshot2, 100); - snapshot2.discardState(); + for (String key: snapshot2.keySet()) { + snapshot2.get(key).discardState(); + } - @SuppressWarnings("unchecked") - ValueState restored2State = (ValueState) restored2; + ValueState restored2 = backend.getPartitionedState(null, VoidSerializer.INSTANCE, kvId); - restored2.setCurrentKey(1); - assertEquals("u1", restored2State.value()); - restored2.setCurrentKey(2); - assertEquals("u2", restored2State.value()); - restored2.setCurrentKey(3); - assertEquals("u3", restored2State.value()); + backend.setCurrentKey(1); + assertEquals("u1", restored2.value()); + backend.setCurrentKey(2); + assertEquals("u2", restored2.value()); + backend.setCurrentKey(3); + assertEquals("u3", restored2.value()); } /** @@ -190,22 +187,18 @@ public void testValueStateNullUpdate() throws Exception { ValueState state = backend.getPartitionedState(null, VoidSerializer.INSTANCE, kvId); - @SuppressWarnings("unchecked") - KvState, ValueStateDescriptor, B> kv = - (KvState, ValueStateDescriptor, B>) state; - // some modifications to the state - kv.setCurrentKey(1); + backend.setCurrentKey(1); // verify default value assertEquals(42L, (long) state.value()); state.update(1L); assertEquals(1L, (long) state.value()); - kv.setCurrentKey(2); + backend.setCurrentKey(2); assertEquals(42L, (long) state.value()); - kv.setCurrentKey(1); + backend.setCurrentKey(1); state.clear(); assertEquals(42L, (long) state.value()); @@ -215,22 +208,26 @@ public void testValueStateNullUpdate() throws Exception { state.update(null); assertEquals(42L, (long) state.value()); - // draw a snapshot, this would fail with a NPE if update(null) would not act as clear() - KvStateSnapshot, ValueStateDescriptor, B> snapshot1 = - kv.snapshot(682375462378L, 2); + // draw a snapshot + HashMap> snapshot1 = backend.snapshotPartitionedState(682375462378L, 2); - if (snapshot1 instanceof AsynchronousKvStateSnapshot) { - snapshot1 = ((AsynchronousKvStateSnapshot, ValueStateDescriptor, B>) snapshot1).materialize(); + for (String key: snapshot1.keySet()) { + if (snapshot1.get(key) instanceof AsynchronousKvStateSnapshot) { + snapshot1.put(key, ((AsynchronousKvStateSnapshot) snapshot1.get(key)).materialize()); + } } - kv.dispose(); + backend.dispose(); + backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE); + + 
backend.injectKeyValueStateSnapshots((HashMap) snapshot1, 100); + + for (String key: snapshot1.keySet()) { + snapshot1.get(key).discardState(); + } - // restore the snapshot - snapshot1.restoreState( - backend, - IntSerializer.INSTANCE, - this.getClass().getClassLoader(), 10); + backend.getPartitionedState(null, VoidSerializer.INSTANCE, kvId); } @Test @@ -242,91 +239,86 @@ public void testListState() { ListStateDescriptor kvId = new ListStateDescriptor<>("id", String.class); ListState state = backend.getPartitionedState(null, VoidSerializer.INSTANCE, kvId); - @SuppressWarnings("unchecked") - KvState, ListStateDescriptor, B> kv = - (KvState, ListStateDescriptor, B>) state; - Joiner joiner = Joiner.on(","); // some modifications to the state - kv.setCurrentKey(1); + backend.setCurrentKey(1); assertEquals("", joiner.join(state.get())); state.add("1"); - kv.setCurrentKey(2); + backend.setCurrentKey(2); assertEquals("", joiner.join(state.get())); state.add("2"); - kv.setCurrentKey(1); + backend.setCurrentKey(1); assertEquals("1", joiner.join(state.get())); // draw a snapshot - KvStateSnapshot, ListStateDescriptor, B> snapshot1 = - kv.snapshot(682375462378L, 2); + HashMap> snapshot1 = backend.snapshotPartitionedState(682375462378L, 2); - if (snapshot1 instanceof AsynchronousKvStateSnapshot) { - snapshot1 = ((AsynchronousKvStateSnapshot, ListStateDescriptor, B>) snapshot1).materialize(); + for (String key: snapshot1.keySet()) { + if (snapshot1.get(key) instanceof AsynchronousKvStateSnapshot) { + snapshot1.put(key, ((AsynchronousKvStateSnapshot) snapshot1.get(key)).materialize()); + } } - // make some more modifications - kv.setCurrentKey(1); + backend.setCurrentKey(1); state.add("u1"); - kv.setCurrentKey(2); + backend.setCurrentKey(2); state.add("u2"); - kv.setCurrentKey(3); + backend.setCurrentKey(3); state.add("u3"); // draw another snapshot - KvStateSnapshot, ListStateDescriptor, B> snapshot2 = - kv.snapshot(682375462379L, 4); + HashMap> snapshot2 = backend.snapshotPartitionedState(682375462379L, 4); - if (snapshot2 instanceof AsynchronousKvStateSnapshot) { - snapshot2 = ((AsynchronousKvStateSnapshot, ListStateDescriptor, B>) snapshot2).materialize(); + for (String key: snapshot2.keySet()) { + if (snapshot2.get(key) instanceof AsynchronousKvStateSnapshot) { + snapshot2.put(key, ((AsynchronousKvStateSnapshot) snapshot2.get(key)).materialize()); + } } // validate the original state - kv.setCurrentKey(1); + backend.setCurrentKey(1); assertEquals("1,u1", joiner.join(state.get())); - kv.setCurrentKey(2); + backend.setCurrentKey(2); assertEquals("2,u2", joiner.join(state.get())); - kv.setCurrentKey(3); + backend.setCurrentKey(3); assertEquals("u3", joiner.join(state.get())); - kv.dispose(); + backend.dispose(); // restore the first snapshot and validate it - KvState, ListStateDescriptor, B> restored1 = snapshot1.restoreState( - backend, - IntSerializer.INSTANCE, - this.getClass().getClassLoader(), 10); + backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE); + backend.injectKeyValueStateSnapshots((HashMap) snapshot1, 100); - snapshot1.discardState(); + for (String key: snapshot1.keySet()) { + snapshot1.get(key).discardState(); + } - @SuppressWarnings("unchecked") - ListState restored1State = (ListState) restored1; + ListState restored1 = backend.getPartitionedState(null, VoidSerializer.INSTANCE, kvId); - restored1.setCurrentKey(1); - assertEquals("1", joiner.join(restored1State.get())); - restored1.setCurrentKey(2); - assertEquals("2", 
joiner.join(restored1State.get())); + backend.setCurrentKey(1); + assertEquals("1", joiner.join(restored1.get())); + backend.setCurrentKey(2); + assertEquals("2", joiner.join(restored1.get())); - restored1.dispose(); + backend.dispose(); // restore the second snapshot and validate it - KvState, ListStateDescriptor, B> restored2 = snapshot2.restoreState( - backend, - IntSerializer.INSTANCE, - this.getClass().getClassLoader(), 20); + backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE); + backend.injectKeyValueStateSnapshots((HashMap) snapshot2, 100); + + for (String key: snapshot2.keySet()) { + snapshot2.get(key).discardState(); + } - snapshot2.discardState(); + ListState restored2 = backend.getPartitionedState(null, VoidSerializer.INSTANCE, kvId); - @SuppressWarnings("unchecked") - ListState restored2State = (ListState) restored2; - - restored2.setCurrentKey(1); - assertEquals("1,u1", joiner.join(restored2State.get())); - restored2.setCurrentKey(2); - assertEquals("2,u2", joiner.join(restored2State.get())); - restored2.setCurrentKey(3); - assertEquals("u3", joiner.join(restored2State.get())); + backend.setCurrentKey(1); + assertEquals("1,u1", joiner.join(restored2.get())); + backend.setCurrentKey(2); + assertEquals("2,u2", joiner.join(restored2.get())); + backend.setCurrentKey(3); + assertEquals("u3", joiner.join(restored2.get())); } catch (Exception e) { e.printStackTrace(); @@ -335,107 +327,94 @@ public void testListState() { } @Test - @SuppressWarnings("unchecked,rawtypes") public void testReducingState() { try { backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE); - ReducingStateDescriptor kvId = new ReducingStateDescriptor<>("id", - new ReduceFunction() { - private static final long serialVersionUID = 1L; - - @Override - public String reduce(String value1, String value2) throws Exception { - return value1 + "," + value2; - } - }, - String.class); - ReducingState state = backend.getPartitionedState(null, VoidSerializer.INSTANCE, kvId); + ReducingStateDescriptor kvId = new ReducingStateDescriptor<>("id", new AppendingReduce(), String.class); - @SuppressWarnings("unchecked") - KvState, ReducingStateDescriptor, B> kv = - (KvState, ReducingStateDescriptor, B>) state; + ReducingState state = backend.getPartitionedState(null, VoidSerializer.INSTANCE, kvId); - Joiner joiner = Joiner.on(","); // some modifications to the state - kv.setCurrentKey(1); + backend.setCurrentKey(1); assertEquals(null, state.get()); state.add("1"); - kv.setCurrentKey(2); + backend.setCurrentKey(2); assertEquals(null, state.get()); state.add("2"); - kv.setCurrentKey(1); + backend.setCurrentKey(1); assertEquals("1", state.get()); // draw a snapshot - KvStateSnapshot, ReducingStateDescriptor, B> snapshot1 = - kv.snapshot(682375462378L, 2); + HashMap> snapshot1 = backend.snapshotPartitionedState(682375462378L, 2); - if (snapshot1 instanceof AsynchronousKvStateSnapshot) { - snapshot1 = ((AsynchronousKvStateSnapshot, ReducingStateDescriptor, B>) snapshot1).materialize(); + for (String key: snapshot1.keySet()) { + if (snapshot1.get(key) instanceof AsynchronousKvStateSnapshot) { + snapshot1.put(key, ((AsynchronousKvStateSnapshot) snapshot1.get(key)).materialize()); + } } // make some more modifications - kv.setCurrentKey(1); + backend.setCurrentKey(1); state.add("u1"); - kv.setCurrentKey(2); + backend.setCurrentKey(2); state.add("u2"); - kv.setCurrentKey(3); + backend.setCurrentKey(3); state.add("u3"); // draw another snapshot - 
-			KvStateSnapshot<Integer, Void, ReducingState<String>, ReducingStateDescriptor<String>, B> snapshot2 =
-					kv.snapshot(682375462379L, 4);
+			HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> snapshot2 = backend.snapshotPartitionedState(682375462379L, 4);
 
-			if (snapshot2 instanceof AsynchronousKvStateSnapshot) {
-				snapshot2 = ((AsynchronousKvStateSnapshot<Integer, Void, ReducingState<String>, ReducingStateDescriptor<String>, B>) snapshot2).materialize();
+			for (String key: snapshot2.keySet()) {
+				if (snapshot2.get(key) instanceof AsynchronousKvStateSnapshot) {
+					snapshot2.put(key, ((AsynchronousKvStateSnapshot<?, ?, ?, ?, ?>) snapshot2.get(key)).materialize());
+				}
 			}
 
 			// validate the original state
-			kv.setCurrentKey(1);
+			backend.setCurrentKey(1);
 			assertEquals("1,u1", state.get());
-			kv.setCurrentKey(2);
+			backend.setCurrentKey(2);
 			assertEquals("2,u2", state.get());
-			kv.setCurrentKey(3);
+			backend.setCurrentKey(3);
 			assertEquals("u3", state.get());
 
-			kv.dispose();
+			backend.dispose();
 
 			// restore the first snapshot and validate it
-			KvState<Integer, Void, ReducingState<String>, ReducingStateDescriptor<String>, B> restored1 = snapshot1.restoreState(
-					backend,
-					IntSerializer.INSTANCE,
-					this.getClass().getClassLoader(), 10);
+			backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE);
+			backend.injectKeyValueStateSnapshots((HashMap) snapshot1, 100);
 
-			snapshot1.discardState();
+			for (String key: snapshot1.keySet()) {
+				snapshot1.get(key).discardState();
+			}
 
-			@SuppressWarnings("unchecked")
-			ReducingState<String> restored1State = (ReducingState<String>) restored1;
+			ReducingState<String> restored1 = backend.getPartitionedState(null, VoidSerializer.INSTANCE, kvId);
 
-			restored1.setCurrentKey(1);
-			assertEquals("1", restored1State.get());
-			restored1.setCurrentKey(2);
-			assertEquals("2", restored1State.get());
+			backend.setCurrentKey(1);
+			assertEquals("1", restored1.get());
+			backend.setCurrentKey(2);
+			assertEquals("2", restored1.get());
 
-			restored1.dispose();
+			backend.dispose();
 
 			// restore the second snapshot and validate it
-			KvState<Integer, Void, ReducingState<String>, ReducingStateDescriptor<String>, B> restored2 = snapshot2.restoreState(
-					backend,
-					IntSerializer.INSTANCE,
-					this.getClass().getClassLoader(), 20);
+			backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE);
+			backend.injectKeyValueStateSnapshots((HashMap) snapshot2, 100);
 
-			snapshot2.discardState();
+			for (String key: snapshot2.keySet()) {
+				snapshot2.get(key).discardState();
+			}
 
-			@SuppressWarnings("unchecked")
-			ReducingState<String> restored2State = (ReducingState<String>) restored2;
-
-			restored2.setCurrentKey(1);
-			assertEquals("1,u1", restored2State.get());
-			restored2.setCurrentKey(2);
-			assertEquals("2,u2", restored2State.get());
-			restored2.setCurrentKey(3);
-			assertEquals("u3", restored2State.get());
+			ReducingState<String> restored2 = backend.getPartitionedState(null, VoidSerializer.INSTANCE, kvId);
+
+
+			backend.setCurrentKey(1);
+			assertEquals("1,u1", restored2.get());
+			backend.setCurrentKey(2);
+			assertEquals("2,u2", restored2.get());
+			backend.setCurrentKey(3);
+			assertEquals("u3", restored2.get());
 
 		} catch (Exception e) {
 			e.printStackTrace();
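For reference, the expected strings asserted in testReducingState follow directly from the comma-appending reduce function: under one key, adding "1" and later "u1" reduces to "1,u1". A minimal sketch of that accumulation in plain Java, with no Flink dependencies:

public class AppendingReduceSketch {

	// Same logic as the AppendingReduce helper introduced in this diff.
	static String reduce(String value1, String value2) {
		return value1 + "," + value2;
	}

	public static void main(String[] args) {
		String acc = null;
		for (String value : new String[] {"1", "u1"}) {
			// ReducingState.add() folds each new element into the accumulator.
			acc = (acc == null) ? value : reduce(acc, value);
		}
		System.out.println(acc); // prints "1,u1", matching the assertions above
	}
}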
@@ -450,103 +429,93 @@ public void testFoldingState() {
 			backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE);
 
 			FoldingStateDescriptor<Integer, String> kvId = new FoldingStateDescriptor<>("id",
-					"Fold-Initial:",
-					new FoldFunction<Integer, String>() {
-						private static final long serialVersionUID = 1L;
-
-						@Override
-						public String fold(String acc, Integer value) throws Exception {
-							return acc + "," + value;
-						}
-					},
+					"Fold-Initial:",
+					new AppendingFold(),
 					String.class);
-			FoldingState<Integer, String> state = backend.getPartitionedState(null, VoidSerializer.INSTANCE, kvId);
-
-			@SuppressWarnings("unchecked")
-			KvState<Integer, Void, FoldingState<Integer, String>, FoldingStateDescriptor<Integer, String>, B> kv =
-					(KvState<Integer, Void, FoldingState<Integer, String>, FoldingStateDescriptor<Integer, String>, B>) state;
+			FoldingState<Integer, String> state = backend.getPartitionedState(null, VoidSerializer.INSTANCE, kvId);
 
-			Joiner joiner = Joiner.on(",");
 			// some modifications to the state
-			kv.setCurrentKey(1);
+			backend.setCurrentKey(1);
 			assertEquals("Fold-Initial:", state.get());
 			state.add(1);
 
-			kv.setCurrentKey(2);
+			backend.setCurrentKey(2);
 			assertEquals("Fold-Initial:", state.get());
 			state.add(2);
 
-			kv.setCurrentKey(1);
+			backend.setCurrentKey(1);
 			assertEquals("Fold-Initial:,1", state.get());
 
 			// draw a snapshot
-			KvStateSnapshot<Integer, Void, FoldingState<Integer, String>, FoldingStateDescriptor<Integer, String>, B> snapshot1 =
-					kv.snapshot(682375462378L, 2);
+			HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> snapshot1 = backend.snapshotPartitionedState(682375462378L, 2);
 
-			if (snapshot1 instanceof AsynchronousKvStateSnapshot) {
-				snapshot1 = ((AsynchronousKvStateSnapshot<Integer, Void, FoldingState<Integer, String>, FoldingStateDescriptor<Integer, String>, B>) snapshot1).materialize();
+			for (String key: snapshot1.keySet()) {
+				if (snapshot1.get(key) instanceof AsynchronousKvStateSnapshot) {
+					snapshot1.put(key, ((AsynchronousKvStateSnapshot<?, ?, ?, ?, ?>) snapshot1.get(key)).materialize());
+				}
 			}
 
 			// make some more modifications
-			kv.setCurrentKey(1);
+			backend.setCurrentKey(1);
 			state.clear();
 			state.add(101);
 
-			kv.setCurrentKey(2);
+			backend.setCurrentKey(2);
 			state.add(102);
 
-			kv.setCurrentKey(3);
+			backend.setCurrentKey(3);
 			state.add(103);
 
 			// draw another snapshot
-			KvStateSnapshot<Integer, Void, FoldingState<Integer, String>, FoldingStateDescriptor<Integer, String>, B> snapshot2 =
-					kv.snapshot(682375462379L, 4);
+			HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> snapshot2 = backend.snapshotPartitionedState(682375462379L, 4);
 
-			if (snapshot2 instanceof AsynchronousKvStateSnapshot) {
-				snapshot2 = ((AsynchronousKvStateSnapshot<Integer, Void, FoldingState<Integer, String>, FoldingStateDescriptor<Integer, String>, B>) snapshot2).materialize();
+			for (String key: snapshot2.keySet()) {
+				if (snapshot2.get(key) instanceof AsynchronousKvStateSnapshot) {
+					snapshot2.put(key, ((AsynchronousKvStateSnapshot<?, ?, ?, ?, ?>) snapshot2.get(key)).materialize());
+				}
 			}
 
 			// validate the original state
-			kv.setCurrentKey(1);
+			backend.setCurrentKey(1);
 			assertEquals("Fold-Initial:,101", state.get());
-			kv.setCurrentKey(2);
+			backend.setCurrentKey(2);
 			assertEquals("Fold-Initial:,2,102", state.get());
-			kv.setCurrentKey(3);
+			backend.setCurrentKey(3);
 			assertEquals("Fold-Initial:,103", state.get());
 
-			kv.dispose();
+			backend.dispose();
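The folding-state assertions follow the same shape, except that every key starts from the initial accumulator "Fold-Initial:" and clear() resets back to it, which is why key 1 ends up as "Fold-Initial:,101" after the clear. A minimal sketch, again without Flink types:

public class AppendingFoldSketch {

	static final String INITIAL = "Fold-Initial:";

	// Same logic as the AppendingFold helper introduced in this diff.
	static String fold(String acc, Integer value) {
		return acc + "," + value;
	}

	public static void main(String[] args) {
		String acc = INITIAL;
		acc = fold(acc, 1);    // "Fold-Initial:,1"
		acc = INITIAL;         // state.clear() resets to the initial value
		acc = fold(acc, 101);  // "Fold-Initial:,101", matching the assertion above
		System.out.println(acc);
	}
}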
 
 			// restore the first snapshot and validate it
-			KvState<Integer, Void, FoldingState<Integer, String>, FoldingStateDescriptor<Integer, String>, B> restored1 = snapshot1.restoreState(
-					backend,
-					IntSerializer.INSTANCE,
-					this.getClass().getClassLoader(), 10);
+			backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE);
+			backend.injectKeyValueStateSnapshots((HashMap) snapshot1, 100);
 
-			snapshot1.discardState();
+			for (String key: snapshot1.keySet()) {
+				snapshot1.get(key).discardState();
+			}
 
-			@SuppressWarnings("unchecked")
-			FoldingState<Integer, String> restored1State = (FoldingState<Integer, String>) restored1;
+			FoldingState<Integer, String> restored1 = backend.getPartitionedState(null, VoidSerializer.INSTANCE, kvId);
 
-			restored1.setCurrentKey(1);
-			assertEquals("Fold-Initial:,1", restored1State.get());
-			restored1.setCurrentKey(2);
-			assertEquals("Fold-Initial:,2", restored1State.get());
+			backend.setCurrentKey(1);
+			assertEquals("Fold-Initial:,1", restored1.get());
+			backend.setCurrentKey(2);
+			assertEquals("Fold-Initial:,2", restored1.get());
 
-			restored1.dispose();
+			backend.dispose();
 
 			// restore the second snapshot and validate it
-			KvState<Integer, Void, FoldingState<Integer, String>, FoldingStateDescriptor<Integer, String>, B> restored2 = snapshot2.restoreState(
-					backend,
-					IntSerializer.INSTANCE,
-					this.getClass().getClassLoader(), 20);
+			backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE);
+			backend.injectKeyValueStateSnapshots((HashMap) snapshot2, 100);
 
-			snapshot2.discardState();
+			for (String key: snapshot2.keySet()) {
+				snapshot2.get(key).discardState();
+			}
 
 			@SuppressWarnings("unchecked")
-			FoldingState<Integer, String> restored2State = (FoldingState<Integer, String>) restored2;
-
-			restored2.setCurrentKey(1);
-			assertEquals("Fold-Initial:,101", restored2State.get());
-			restored2.setCurrentKey(2);
-			assertEquals("Fold-Initial:,2,102", restored2State.get());
-			restored2.setCurrentKey(3);
-			assertEquals("Fold-Initial:,103", restored2State.get());
+			FoldingState<Integer, String> restored2 = backend.getPartitionedState(null, VoidSerializer.INSTANCE, kvId);
+
+			backend.setCurrentKey(1);
+			assertEquals("Fold-Initial:,101", restored2.get());
+			backend.setCurrentKey(2);
+			assertEquals("Fold-Initial:,2,102", restored2.get());
+			backend.setCurrentKey(3);
+			assertEquals("Fold-Initial:,103", restored2.get());
 
 		} catch (Exception e) {
 			e.printStackTrace();
@@ -566,33 +535,49 @@ public void testValueStateRestoreWithWrongSerializers() {
 			ValueState<String> state = backend.getPartitionedState(null, VoidSerializer.INSTANCE, kvId);
 
-			@SuppressWarnings("unchecked")
-			KvState<Integer, Void, ValueState<String>, ValueStateDescriptor<String>, B> kv =
-					(KvState<Integer, Void, ValueState<String>, ValueStateDescriptor<String>, B>) state;
-
-			kv.setCurrentKey(1);
+			backend.setCurrentKey(1);
 			state.update("1");
-			kv.setCurrentKey(2);
+			backend.setCurrentKey(2);
 			state.update("2");
 
-			KvStateSnapshot<Integer, Void, ValueState<String>, ValueStateDescriptor<String>, B> snapshot =
-					kv.snapshot(682375462378L, System.currentTimeMillis());
+			// draw a snapshot
+			HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> snapshot1 = backend.snapshotPartitionedState(682375462378L, 2);
+
+			for (String key: snapshot1.keySet()) {
+				if (snapshot1.get(key) instanceof AsynchronousKvStateSnapshot) {
+					snapshot1.put(key, ((AsynchronousKvStateSnapshot<?, ?, ?, ?, ?>) snapshot1.get(key)).materialize());
+				}
+			}
+
+			backend.dispose();
+
+			// restore the first snapshot and validate it
+			backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE);
+			backend.injectKeyValueStateSnapshots((HashMap) snapshot1, 100);
 
-			if (snapshot instanceof AsynchronousKvStateSnapshot) {
-				snapshot = ((AsynchronousKvStateSnapshot<Integer, Void, ValueState<String>, ValueStateDescriptor<String>, B>) snapshot).materialize();
+			for (String key: snapshot1.keySet()) {
+				snapshot1.get(key).discardState();
 			}
 
 			@SuppressWarnings("unchecked")
-			TypeSerializer<String> fakeIntSerializer =
-					(TypeSerializer<String>) (TypeSerializer<?>) FloatSerializer.INSTANCE;
+			TypeSerializer<String> fakeStringSerializer =
+					(TypeSerializer<String>) (TypeSerializer<?>) FloatSerializer.INSTANCE;
 
 			try {
-				snapshot.restoreState(backend, fakeIntSerializer, getClass().getClassLoader(), 1);
+				kvId = new ValueStateDescriptor<>("id", fakeStringSerializer, null);
+
+				state = backend.getPartitionedState(null, VoidSerializer.INSTANCE, kvId);
+
+				state.value();
+
 				fail("should recognize wrong serializers");
-			} catch (IllegalArgumentException e) {
+			} catch (RuntimeException e) {
+				if (!e.getMessage().contains("Trying to access state using wrong StateDescriptor")) {
+					fail("wrong exception " + e);
+				}
 				// expected
 			} catch (Exception e) {
-				fail("wrong exception");
+				fail("wrong exception " + e);
 			}
 		} catch (Exception e) {
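The restructured wrong-serializer tests no longer call restoreState on a snapshot object; they re-register the state under the same name with a mismatched descriptor and expect the backend to reject the access with a RuntimeException whose message contains "Trying to access state using wrong StateDescriptor". A minimal sketch of that check, using a hypothetical registry class that is not Flink's actual implementation:

import java.util.HashMap;
import java.util.Map;

public class DescriptorMismatchSketch {

	// Maps each state name to the descriptor type it was first registered with.
	private final Map<String, Class<?>> registered = new HashMap<>();

	// Throws on access if the same name is re-registered with a different
	// descriptor type, mimicking the failure the tests assert on.
	Object getState(String name, Class<?> descriptorType) {
		Class<?> existing = registered.putIfAbsent(name, descriptorType);
		if (existing != null && existing != descriptorType) {
			throw new RuntimeException("Trying to access state using wrong StateDescriptor");
		}
		return new Object(); // placeholder for the actual state object
	}
}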
@@ -609,32 +594,46 @@ public void testListStateRestoreWithWrongSerializers() {
 			ListStateDescriptor<String> kvId = new ListStateDescriptor<>("id", String.class);
 			ListState<String> state = backend.getPartitionedState(null, VoidSerializer.INSTANCE, kvId);
 
-			@SuppressWarnings("unchecked")
-			KvState<Integer, Void, ListState<String>, ListStateDescriptor<String>, B> kv =
-					(KvState<Integer, Void, ListState<String>, ListStateDescriptor<String>, B>) state;
-
-			kv.setCurrentKey(1);
+			backend.setCurrentKey(1);
 			state.add("1");
-			kv.setCurrentKey(2);
+			backend.setCurrentKey(2);
 			state.add("2");
 
-			KvStateSnapshot<Integer, Void, ListState<String>, ListStateDescriptor<String>, B> snapshot =
-					kv.snapshot(682375462378L, System.currentTimeMillis());
+			// draw a snapshot
+			HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> snapshot1 = backend.snapshotPartitionedState(682375462378L, 2);
 
-			if (snapshot instanceof AsynchronousKvStateSnapshot) {
-				snapshot = ((AsynchronousKvStateSnapshot<Integer, Void, ListState<String>, ListStateDescriptor<String>, B>) snapshot).materialize();
+			for (String key: snapshot1.keySet()) {
+				if (snapshot1.get(key) instanceof AsynchronousKvStateSnapshot) {
+					snapshot1.put(key, ((AsynchronousKvStateSnapshot<?, ?, ?, ?, ?>) snapshot1.get(key)).materialize());
+				}
 			}
 
-			kv.dispose();
+			backend.dispose();
+
+			// restore the first snapshot and validate it
+			backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE);
+			backend.injectKeyValueStateSnapshots((HashMap) snapshot1, 100);
+
+			for (String key: snapshot1.keySet()) {
+				snapshot1.get(key).discardState();
+			}
 
 			@SuppressWarnings("unchecked")
-			TypeSerializer<String> fakeIntSerializer =
-					(TypeSerializer<String>) (TypeSerializer<?>) FloatSerializer.INSTANCE;
+			TypeSerializer<String> fakeStringSerializer =
+					(TypeSerializer<String>) (TypeSerializer<?>) FloatSerializer.INSTANCE;
 
 			try {
-				snapshot.restoreState(backend, fakeIntSerializer, getClass().getClassLoader(), 1);
+				kvId = new ListStateDescriptor<>("id", fakeStringSerializer);
+
+				state = backend.getPartitionedState(null, VoidSerializer.INSTANCE, kvId);
+
+				state.get();
+
 				fail("should recognize wrong serializers");
-			} catch (IllegalArgumentException e) {
+			} catch (RuntimeException e) {
+				if (!e.getMessage().contains("Trying to access state using wrong StateDescriptor")) {
+					fail("wrong exception " + e);
+				}
 				// expected
 			} catch (Exception e) {
 				fail("wrong exception " + e);
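All three wrong-serializer tests build their mismatched serializer the same way: a FloatSerializer is forced into a TypeSerializer<String> by double-casting through a wildcard, which compiles (with an unchecked warning) and pushes the type error to runtime. The same trick with plain JDK types:

import java.util.ArrayList;
import java.util.List;

public class UncheckedCastSketch {

	public static void main(String[] args) {
		List<Integer> ints = new ArrayList<>();
		ints.add(42);

		// The double cast through a wildcard defeats compile-time checking.
		@SuppressWarnings("unchecked")
		List<String> fake = (List<String>) (List<?>) ints;

		try {
			String s = fake.get(0); // implicit checkcast fails here, at runtime
			System.out.println(s);
		} catch (ClassCastException e) {
			System.out.println("runtime mismatch detected: " + e);
		}
	}
}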
@@ -652,41 +651,51 @@ public void testReducingStateRestoreWithWrongSerializers() {
 			backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE);
 
 			ReducingStateDescriptor<String> kvId = new ReducingStateDescriptor<>("id",
-					new ReduceFunction<String>() {
-						@Override
-						public String reduce(String value1, String value2) throws Exception {
-							return value1 + "," + value2;
-						}
-					},
-					String.class);
+					new AppendingReduce(),
+					StringSerializer.INSTANCE);
 			ReducingState<String> state = backend.getPartitionedState(null, VoidSerializer.INSTANCE, kvId);
 
-			@SuppressWarnings("unchecked")
-			KvState<Integer, Void, ReducingState<String>, ReducingStateDescriptor<String>, B> kv =
-					(KvState<Integer, Void, ReducingState<String>, ReducingStateDescriptor<String>, B>) state;
-
-			kv.setCurrentKey(1);
+			backend.setCurrentKey(1);
 			state.add("1");
-			kv.setCurrentKey(2);
+			backend.setCurrentKey(2);
 			state.add("2");
 
-			KvStateSnapshot<Integer, Void, ReducingState<String>, ReducingStateDescriptor<String>, B> snapshot =
-					kv.snapshot(682375462378L, System.currentTimeMillis());
-
-			if (snapshot instanceof AsynchronousKvStateSnapshot) {
-				snapshot = ((AsynchronousKvStateSnapshot<Integer, Void, ReducingState<String>, ReducingStateDescriptor<String>, B>) snapshot).materialize();
+			// draw a snapshot
+			HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> snapshot1 = backend.snapshotPartitionedState(682375462378L, 2);
+
+			for (String key: snapshot1.keySet()) {
+				if (snapshot1.get(key) instanceof AsynchronousKvStateSnapshot) {
+					snapshot1.put(key, ((AsynchronousKvStateSnapshot<?, ?, ?, ?, ?>) snapshot1.get(key)).materialize());
+				}
 			}
 
-			kv.dispose();
+			backend.dispose();
+
+			// restore the first snapshot and validate it
+			backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE);
+			backend.injectKeyValueStateSnapshots((HashMap) snapshot1, 100);
+
+			for (String key: snapshot1.keySet()) {
+				snapshot1.get(key).discardState();
+			}
 
 			@SuppressWarnings("unchecked")
-			TypeSerializer<String> fakeIntSerializer =
-					(TypeSerializer<String>) (TypeSerializer<?>) FloatSerializer.INSTANCE;
+			TypeSerializer<String> fakeStringSerializer =
+					(TypeSerializer<String>) (TypeSerializer<?>) FloatSerializer.INSTANCE;
 
 			try {
-				snapshot.restoreState(backend, fakeIntSerializer, getClass().getClassLoader(), 1);
+				kvId = new ReducingStateDescriptor<>("id", new AppendingReduce(), fakeStringSerializer);
+
+				state = backend.getPartitionedState(null, VoidSerializer.INSTANCE, kvId);
+
+				state.get();
+
 				fail("should recognize wrong serializers");
-			} catch (IllegalArgumentException e) {
+			} catch (RuntimeException e) {
+				if (!e.getMessage().contains("Trying to access state using wrong StateDescriptor")) {
+					fail("wrong exception " + e);
+				}
 				// expected
 			} catch (Exception e) {
 				fail("wrong exception " + e);
@@ -699,33 +708,39 @@ public String reduce(String value1, String value2) throws Exception {
 	}
 
 	@Test
-	public void testCopyDefaultValue() {
-		try {
-			backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE);
+	public void testCopyDefaultValue() throws Exception {
+		backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE);
 
-			ValueStateDescriptor<IntValue> kvId = new ValueStateDescriptor<>("id", IntValue.class, new IntValue(-1));
-			kvId.initializeSerializerUnlessSet(new ExecutionConfig());
-
-			ValueState<IntValue> state = backend.getPartitionedState(null, VoidSerializer.INSTANCE, kvId);
+		ValueStateDescriptor<IntValue> kvId = new ValueStateDescriptor<>("id", IntValue.class, new IntValue(-1));
+		kvId.initializeSerializerUnlessSet(new ExecutionConfig());
 
-			@SuppressWarnings("unchecked")
-			KvState<Integer, Void, ValueState<IntValue>, ValueStateDescriptor<IntValue>, B> kv =
-					(KvState<Integer, Void, ValueState<IntValue>, ValueStateDescriptor<IntValue>, B>) state;
+		ValueState<IntValue> state = backend.getPartitionedState(null, VoidSerializer.INSTANCE, kvId);
+
+		backend.setCurrentKey(1);
+		IntValue default1 = state.value();
 
-			kv.setCurrentKey(1);
-			IntValue default1 = state.value();
+		backend.setCurrentKey(2);
+		IntValue default2 = state.value();
 
-			kv.setCurrentKey(2);
-			IntValue default2 = state.value();
+		assertNotNull(default1);
+		assertNotNull(default2);
+		assertEquals(default1, default2);
+		assertFalse(default1 == default2);
+	}
 
-			assertNotNull(default1);
-			assertNotNull(default2);
-			assertEquals(default1, default2);
-			assertFalse(default1 == default2);
+	private static class AppendingReduce implements ReduceFunction<String> {
+		@Override
+		public String reduce(String value1, String value2) throws Exception {
+			return value1 + "," + value2;
 		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
+	}
+
+	private static class AppendingFold implements FoldFunction<Integer, String> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public String fold(String acc, Integer value) throws Exception {
+			return acc + "," + value;
 		}
 	}
 }
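testCopyDefaultValue pins down a subtle contract: the default value returned for each key must be equal to the configured default but must not be the same instance, because IntValue is mutable and a caller could otherwise corrupt the shared default. A minimal sketch of that copy-on-read behavior, using a hypothetical mutable holder instead of Flink's IntValue:

public class DefaultValueCopySketch {

	// Hypothetical mutable holder standing in for IntValue.
	static final class MutableInt {
		int value;
		MutableInt(int value) { this.value = value; }
	}

	private final MutableInt defaultValue = new MutableInt(-1);

	// Mirrors state.value() for a key with no stored state: the returned copy
	// has equal content but is a distinct instance, so default1 == default2
	// is false while the two objects compare equal.
	MutableInt valueOrDefault() {
		return new MutableInt(defaultValue.value);
	}
}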
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
index ce705e1b4aa25..199a6af985256 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
@@ -124,6 +124,16 @@ public void initStateBackend() throws IOException {
 				this.stateBackend = rdb;
 				break;
 			}
+			case ROCKSDB_FULLY_ASYNC: {
+				String rocksDb = tempFolder.newFolder().getAbsolutePath();
+				String rocksDbBackups = tempFolder.newFolder().toURI().toString();
+				RocksDBStateBackend rdb = new RocksDBStateBackend(rocksDbBackups, new MemoryStateBackend());
+				rdb.setDbStoragePath(rocksDb);
+				rdb.enableFullyAsyncSnapshots();
+				this.stateBackend = rdb;
+				break;
+			}
+
 		}
 	}
 
@@ -762,14 +772,14 @@ public static Collection<Object[]> parameters(){
 		return Arrays.asList(new Object[][] {
 				{StateBackendEnum.MEM},
 				{StateBackendEnum.FILE},
-//				{StateBackendEnum.DB},
-				{StateBackendEnum.ROCKSDB}
+				{StateBackendEnum.ROCKSDB},
+				{StateBackendEnum.ROCKSDB_FULLY_ASYNC}
 			}
 		);
 	}
 
 	private enum StateBackendEnum {
-		MEM, FILE, DB, ROCKSDB
+		MEM, FILE, ROCKSDB, ROCKSDB_FULLY_ASYNC
 	}
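For reference, configuring the fully asynchronous variant outside of the test harness uses exactly the three calls exercised by the new ROCKSDB_FULLY_ASYNC case. The sketch below assumes the constructor and setters shown in the hunk above, and the usual package locations for this version of Flink; the paths are placeholders:

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;

public class FullyAsyncBackendSketch {

	public static RocksDBStateBackend create() throws Exception {
		// Checkpoint/backup URI plus a backend for non-partitioned state.
		RocksDBStateBackend rdb =
				new RocksDBStateBackend("file:///tmp/rocksdb-backups", new MemoryStateBackend());
		// Local directory for the RocksDB instance's working files.
		rdb.setDbStoragePath("/tmp/rocksdb-storage");
		// Switch snapshots to the fully asynchronous mode under test.
		rdb.enableFullyAsyncSnapshots();
		return rdb;
	}
}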