[FLINK-6773] [checkpoint] Introduce compression (snappy) for keyed state in full checkpoints and savepoints
StefanRRichter committed Jul 4, 2017
1 parent d17a4b9 commit 5171513
Showing 20 changed files with 759 additions and 121 deletions.
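
The user-facing switch for this feature is the new ExecutionConfig#setUseSnapshotCompression added below (default: false). A minimal usage sketch, assuming a standard StreamExecutionEnvironment setup:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class EnableSnapshotCompression {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // opt in to snappy compression of keyed state in full checkpoints and savepoints
        env.getConfig().setUseSnapshotCompression(true);
        // ... define sources/operators and call env.execute() as usual ...
    }
}

Compression applies to the keyed-state data of full checkpoints and savepoints; RocksDB sst files in incremental snapshots are left alone, since they are already compressed (see materializeMetaData below).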
RocksDBKeyedStateBackend.java
@@ -64,10 +64,13 @@
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.PlaceholderStreamStateHandle;
 import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
+import org.apache.flink.runtime.state.SnappyStreamCompressionDecorator;
 import org.apache.flink.runtime.state.StateHandleID;
 import org.apache.flink.runtime.state.StateObject;
 import org.apache.flink.runtime.state.StateUtil;
+import org.apache.flink.runtime.state.StreamCompressionDecorator;
 import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator;
 import org.apache.flink.runtime.state.internal.InternalAggregatingState;
 import org.apache.flink.runtime.state.internal.InternalFoldingState;
 import org.apache.flink.runtime.state.internal.InternalListState;
@@ -97,7 +100,9 @@
 import java.io.EOFException;
 import java.io.File;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.ObjectInputStream;
+import java.io.OutputStream;
 import java.nio.file.Files;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -106,6 +111,7 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.PriorityQueue;
 import java.util.Set;
 import java.util.SortedMap;
@@ -603,7 +609,10 @@ private void writeKVStateMetaData() throws IOException {
         }

         KeyedBackendSerializationProxy<K> serializationProxy =
-            new KeyedBackendSerializationProxy<>(stateBackend.getKeySerializer(), metaInfoSnapshots);
+            new KeyedBackendSerializationProxy<>(
+                stateBackend.getKeySerializer(),
+                metaInfoSnapshots,
+                !Objects.equals(UncompressedStreamCompressionDecorator.INSTANCE, stateBackend.keyGroupCompressionDecorator));

         serializationProxy.write(outputView);
     }
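
Not shown in this excerpt is how stateBackend.keyGroupCompressionDecorator gets populated. Judging from the restore path further down, it is presumably derived from the new ExecutionConfig flag roughly as in this sketch (an illustrative helper, not the verbatim Flink code):

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.runtime.state.SnappyStreamCompressionDecorator;
import org.apache.flink.runtime.state.StreamCompressionDecorator;
import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator;

final class CompressionSelectionSketch {
    // maps the user-facing flag to the decorator used by the snapshot path
    static StreamCompressionDecorator fromExecutionConfig(ExecutionConfig config) {
        return config.isUseSnapshotCompression()
            ? SnappyStreamCompressionDecorator.INSTANCE
            : UncompressedStreamCompressionDecorator.INSTANCE;
    }
}

Writing the resulting flag into the serialization proxy (rather than trusting the configuration at restore time) keeps old snapshots readable after the setting is toggled.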
Expand All @@ -612,71 +621,88 @@ private void writeKVStateData() throws IOException, InterruptedException {

byte[] previousKey = null;
byte[] previousValue = null;
OutputStream kgOutStream = null;
DataOutputView kgOutView = null;

// Here we transfer ownership of RocksIterators to the RocksDBMergeIterator
try (RocksDBMergeIterator mergeIterator = new RocksDBMergeIterator(
try {
// Here we transfer ownership of RocksIterators to the RocksDBMergeIterator
try (RocksDBMergeIterator mergeIterator = new RocksDBMergeIterator(
kvStateIterators, stateBackend.keyGroupPrefixBytes)) {

// handover complete, null out to prevent double close
kvStateIterators = null;
// handover complete, null out to prevent double close
kvStateIterators = null;

//preamble: setup with first key-group as our lookahead
if (mergeIterator.isValid()) {
//begin first key-group by recording the offset
keyGroupRangeOffsets.setKeyGroupOffset(mergeIterator.keyGroup(), outStream.getPos());
//write the k/v-state id as metadata
//TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
outputView.writeShort(mergeIterator.kvStateId());
previousKey = mergeIterator.key();
previousValue = mergeIterator.value();
mergeIterator.next();
}
//preamble: setup with first key-group as our lookahead
if (mergeIterator.isValid()) {
//begin first key-group by recording the offset
keyGroupRangeOffsets.setKeyGroupOffset(mergeIterator.keyGroup(), outStream.getPos());
//write the k/v-state id as metadata
kgOutStream = stateBackend.keyGroupCompressionDecorator.decorateWithCompression(outStream);
kgOutView = new DataOutputViewStreamWrapper(kgOutStream);
//TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
kgOutView.writeShort(mergeIterator.kvStateId());
previousKey = mergeIterator.key();
previousValue = mergeIterator.value();
mergeIterator.next();
}

//main loop: write k/v pairs ordered by (key-group, kv-state), thereby tracking key-group offsets.
while (mergeIterator.isValid()) {
//main loop: write k/v pairs ordered by (key-group, kv-state), thereby tracking key-group offsets.
while (mergeIterator.isValid()) {

assert (!hasMetaDataFollowsFlag(previousKey));
assert (!hasMetaDataFollowsFlag(previousKey));

//set signal in first key byte that meta data will follow in the stream after this k/v pair
if (mergeIterator.isNewKeyGroup() || mergeIterator.isNewKeyValueState()) {
//set signal in first key byte that meta data will follow in the stream after this k/v pair
if (mergeIterator.isNewKeyGroup() || mergeIterator.isNewKeyValueState()) {

//be cooperative and check for interruption from time to time in the hot loop
checkInterrupted();
//be cooperative and check for interruption from time to time in the hot loop
checkInterrupted();

setMetaDataFollowsFlagInKey(previousKey);
}
setMetaDataFollowsFlagInKey(previousKey);
}

writeKeyValuePair(previousKey, previousValue);
writeKeyValuePair(previousKey, previousValue, kgOutView);

//write meta data if we have to
if (mergeIterator.isNewKeyGroup()) {
//TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
outputView.writeShort(END_OF_KEY_GROUP_MARK);
//begin new key-group
keyGroupRangeOffsets.setKeyGroupOffset(mergeIterator.keyGroup(), outStream.getPos());
//write the kev-state
//TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
outputView.writeShort(mergeIterator.kvStateId());
} else if (mergeIterator.isNewKeyValueState()) {
//write the k/v-state
//TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
outputView.writeShort(mergeIterator.kvStateId());
//write meta data if we have to
if (mergeIterator.isNewKeyGroup()) {
//TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
kgOutView.writeShort(END_OF_KEY_GROUP_MARK);
// this will just close the outer stream
kgOutStream.close();
//begin new key-group
keyGroupRangeOffsets.setKeyGroupOffset(mergeIterator.keyGroup(), outStream.getPos());
//write the kev-state
//TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
kgOutStream = stateBackend.keyGroupCompressionDecorator.decorateWithCompression(outStream);
kgOutView = new DataOutputViewStreamWrapper(kgOutStream);
kgOutView.writeShort(mergeIterator.kvStateId());
} else if (mergeIterator.isNewKeyValueState()) {
//write the k/v-state
//TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
kgOutView.writeShort(mergeIterator.kvStateId());
}

//request next k/v pair
previousKey = mergeIterator.key();
previousValue = mergeIterator.value();
mergeIterator.next();
}
}

//request next k/v pair
previousKey = mergeIterator.key();
previousValue = mergeIterator.value();
mergeIterator.next();
//epilogue: write last key-group
if (previousKey != null) {
assert (!hasMetaDataFollowsFlag(previousKey));
setMetaDataFollowsFlagInKey(previousKey);
writeKeyValuePair(previousKey, previousValue, kgOutView);
//TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
kgOutView.writeShort(END_OF_KEY_GROUP_MARK);
// this will just close the outer stream
kgOutStream.close();
kgOutStream = null;
}
}

//epilogue: write last key-group
if (previousKey != null) {
assert (!hasMetaDataFollowsFlag(previousKey));
setMetaDataFollowsFlagInKey(previousKey);
writeKeyValuePair(previousKey, previousValue);
//TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
outputView.writeShort(END_OF_KEY_GROUP_MARK);
} finally {
// this will just close the outer stream
IOUtils.closeQuietly(kgOutStream);
}
}
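The loop above opens a fresh compression stream per key-group: keyGroupRangeOffsets records positions in the uncompressed outer stream, so each key-group must form an independently decodable compressed frame. A self-contained sketch of that framing with snappy-java (class and payload names are illustrative, not Flink code):

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.xerial.snappy.SnappyFramedOutputStream;

public class PerKeyGroupFramingDemo {
    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream raw = new ByteArrayOutputStream();
        long[] keyGroupOffsets = new long[2];
        for (int keyGroup = 0; keyGroup < 2; keyGroup++) {
            // record the offset in the outer, uncompressed stream, like
            // keyGroupRangeOffsets.setKeyGroupOffset(..., outStream.getPos()) above
            keyGroupOffsets[keyGroup] = raw.size();
            // decorateWithCompression(outStream) plays this role in the diff
            SnappyFramedOutputStream kgOut = new SnappyFramedOutputStream(raw);
            kgOut.write(("key-group " + keyGroup + " payload").getBytes(StandardCharsets.UTF_8));
            // finish this key-group's frame; per the in-diff comment, Flink's
            // decorator only closes the outer compression layer, not the
            // checkpoint stream underneath
            kgOut.close();
        }
        System.out.println("key-group offsets: " + keyGroupOffsets[0] + ", " + keyGroupOffsets[1]);
    }
}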

Expand All @@ -687,9 +713,9 @@ private KeyGroupsStateHandle closeSnapshotStreamAndGetHandle() throws IOExceptio
return stateHandle != null ? new KeyGroupsStateHandle(keyGroupRangeOffsets, stateHandle) : null;
}

private void writeKeyValuePair(byte[] key, byte[] value) throws IOException {
BytePrimitiveArraySerializer.INSTANCE.serialize(key, outputView);
BytePrimitiveArraySerializer.INSTANCE.serialize(value, outputView);
private void writeKeyValuePair(byte[] key, byte[] value, DataOutputView out) throws IOException {
BytePrimitiveArraySerializer.INSTANCE.serialize(key, out);
BytePrimitiveArraySerializer.INSTANCE.serialize(value, out);
}

static void setMetaDataFollowsFlagInKey(byte[] key) {
@@ -808,8 +834,13 @@ private StreamStateHandle materializeMetaData() throws Exception {
                 .createCheckpointStateOutputStream(checkpointId, checkpointTimestamp);
             closeableRegistry.registerClosable(outputStream);

+            //no need for compression scheme support because sst-files are already compressed
             KeyedBackendSerializationProxy<K> serializationProxy =
-                new KeyedBackendSerializationProxy<>(stateBackend.keySerializer, stateMetaInfoSnapshots);
+                new KeyedBackendSerializationProxy<>(
+                    stateBackend.keySerializer,
+                    stateMetaInfoSnapshots,
+                    false);

             DataOutputView out = new DataOutputViewStreamWrapper(outputStream);

             serializationProxy.write(out);
@@ -1044,6 +1075,8 @@ static final class RocksDBFullRestoreOperation<K> {
         private DataInputView currentStateHandleInView;
         /** Current list of ColumnFamilyHandles for all column families we restore from currentKeyGroupsStateHandle. */
         private List<ColumnFamilyHandle> currentStateHandleKVStateColumnFamilies;
+        /** The compression decorator that was used for writing the state, as determined by the meta data. */
+        private StreamCompressionDecorator keygroupStreamCompressionDecorator;

         /**
          * Creates a restore operation object for the given state backend instance.
@@ -1132,6 +1165,9 @@ private void restoreKVStateMetaData() throws IOException, StateMigrationException {
                     "Aborting now since state migration is currently not available");
             }

+            this.keygroupStreamCompressionDecorator = serializationProxy.isUsingKeyGroupCompression() ?
+                SnappyStreamCompressionDecorator.INSTANCE : UncompressedStreamCompressionDecorator.INSTANCE;
+
             List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> restoredMetaInfos =
                 serializationProxy.getStateMetaInfoSnapshots();
             currentStateHandleKVStateColumnFamilies = new ArrayList<>(restoredMetaInfos.size());
@@ -1188,27 +1224,30 @@ private void restoreKVStateData() throws IOException, RocksDBException {
             if (0L != offset) {
                 currentStateHandleInStream.seek(offset);
                 boolean keyGroupHasMoreKeys = true;
-                //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
-                int kvStateId = currentStateHandleInView.readShort();
-                ColumnFamilyHandle handle = currentStateHandleKVStateColumnFamilies.get(kvStateId);
-                //insert all k/v pairs into DB
-                while (keyGroupHasMoreKeys) {
-                    byte[] key = BytePrimitiveArraySerializer.INSTANCE.deserialize(currentStateHandleInView);
-                    byte[] value = BytePrimitiveArraySerializer.INSTANCE.deserialize(currentStateHandleInView);
-                    if (RocksDBFullSnapshotOperation.hasMetaDataFollowsFlag(key)) {
-                        //clear the signal bit in the key to make it ready for insertion again
-                        RocksDBFullSnapshotOperation.clearMetaDataFollowsFlag(key);
-                        rocksDBKeyedStateBackend.db.put(handle, key, value);
-                        //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
-                        kvStateId = RocksDBFullSnapshotOperation.END_OF_KEY_GROUP_MARK
-                            & currentStateHandleInView.readShort();
-                        if (RocksDBFullSnapshotOperation.END_OF_KEY_GROUP_MARK == kvStateId) {
-                            keyGroupHasMoreKeys = false;
-                        } else {
-                            handle = currentStateHandleKVStateColumnFamilies.get(kvStateId);
-                        }
-                    } else {
-                        rocksDBKeyedStateBackend.db.put(handle, key, value);
-                    }
-                }
+                try (InputStream compressedKgIn = keygroupStreamCompressionDecorator.decorateWithCompression(currentStateHandleInStream)) {
+                    DataInputViewStreamWrapper compressedKgInputView = new DataInputViewStreamWrapper(compressedKgIn);
+                    //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
+                    int kvStateId = compressedKgInputView.readShort();
+                    ColumnFamilyHandle handle = currentStateHandleKVStateColumnFamilies.get(kvStateId);
+                    //insert all k/v pairs into DB
+                    while (keyGroupHasMoreKeys) {
+                        byte[] key = BytePrimitiveArraySerializer.INSTANCE.deserialize(compressedKgInputView);
+                        byte[] value = BytePrimitiveArraySerializer.INSTANCE.deserialize(compressedKgInputView);
+                        if (RocksDBFullSnapshotOperation.hasMetaDataFollowsFlag(key)) {
+                            //clear the signal bit in the key to make it ready for insertion again
+                            RocksDBFullSnapshotOperation.clearMetaDataFollowsFlag(key);
+                            rocksDBKeyedStateBackend.db.put(handle, key, value);
+                            //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
+                            kvStateId = RocksDBFullSnapshotOperation.END_OF_KEY_GROUP_MARK
+                                & compressedKgInputView.readShort();
+                            if (RocksDBFullSnapshotOperation.END_OF_KEY_GROUP_MARK == kvStateId) {
+                                keyGroupHasMoreKeys = false;
+                            } else {
+                                handle = currentStateHandleKVStateColumnFamilies.get(kvStateId);
+                            }
+                        } else {
+                            rocksDBKeyedStateBackend.db.put(handle, key, value);
+                        }
+                    }
+                }
             }
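
The restore side mirrors the write path: seek the raw handle stream to a recorded key-group offset, then decorate only that region for reading. A minimal counterpart to the framing demo above (illustrative names; Flink itself stops at END_OF_KEY_GROUP_MARK rather than at a known length):

import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.IOException;

import org.xerial.snappy.SnappyFramedInputStream;

public class PerKeyGroupReadDemo {
    // reads `payloadLength` decompressed bytes of the key-group frame starting
    // at `offset` in the raw snapshot bytes, mirroring seek(offset) followed by
    // decorateWithCompression(currentStateHandleInStream)
    static byte[] readKeyGroup(byte[] snapshot, int offset, int payloadLength) throws IOException {
        ByteArrayInputStream raw = new ByteArrayInputStream(snapshot, offset, snapshot.length - offset);
        try (SnappyFramedInputStream kgIn = new SnappyFramedInputStream(raw)) {
            byte[] payload = new byte[payloadLength];
            int read = 0;
            while (read < payloadLength) {
                int n = kgIn.read(payload, read, payloadLength - read);
                if (n == -1) {
                    throw new EOFException("truncated key-group frame");
                }
                read += n;
            }
            return payload;
        }
    }
}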
flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -18,13 +18,14 @@

 package org.apache.flink.api.common;

-import com.esotericsoftware.kryo.Serializer;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.configuration.TaskManagerOptions;

+import com.esotericsoftware.kryo.Serializer;
+
 import java.io.Serializable;
 import java.util.Collections;
 import java.util.LinkedHashMap;
@@ -146,6 +147,9 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecutionConfig> {
      */
     private long taskCancellationTimeoutMillis = -1;

+    /** This flag defines if we use compression for the state snapshot data or not. Default: false */
+    private boolean useSnapshotCompression = false;
+
     // ------------------------------- User code values --------------------------------------------

     private GlobalJobParameters globalJobParameters;
@@ -840,6 +844,14 @@ public void disableAutoTypeRegistration() {
         this.autoTypeRegistrationEnabled = false;
     }

+    public boolean isUseSnapshotCompression() {
+        return useSnapshotCompression;
+    }
+
+    public void setUseSnapshotCompression(boolean useSnapshotCompression) {
+        this.useSnapshotCompression = useSnapshotCompression;
+    }
+
     @Override
     public boolean equals(Object obj) {
         if (obj instanceof ExecutionConfig) {
@@ -864,7 +876,8 @@ public boolean equals(Object obj) {
                 defaultKryoSerializerClasses.equals(other.defaultKryoSerializerClasses) &&
                 registeredKryoTypes.equals(other.registeredKryoTypes) &&
                 registeredPojoTypes.equals(other.registeredPojoTypes) &&
-                taskCancellationIntervalMillis == other.taskCancellationIntervalMillis;
+                taskCancellationIntervalMillis == other.taskCancellationIntervalMillis &&
+                useSnapshotCompression == other.useSnapshotCompression;

             } else {
                 return false;
Expand All @@ -891,7 +904,8 @@ public int hashCode() {
defaultKryoSerializerClasses,
registeredKryoTypes,
registeredPojoTypes,
taskCancellationIntervalMillis);
taskCancellationIntervalMillis,
useSnapshotCompression);
}

public boolean canEqual(Object obj) {
6 changes: 6 additions & 0 deletions flink-runtime/pom.xml
@@ -144,6 +144,12 @@ under the License.
             <artifactId>zookeeper</artifactId>
         </dependency>

+        <dependency>
+            <groupId>org.xerial.snappy</groupId>
+            <artifactId>snappy-java</artifactId>
+            <version>1.1.4</version>
+        </dependency>
+
         <!--
         The KryoSerializer dynamically loads Kryo instances via Chill and requires that Chill
         is in the classpath. Because we do not want to have transitive Scala dependencies
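
This is the dependency behind the new SnappyStreamCompressionDecorator used in the backend changes above. A hedged sketch of what such a decorator looks like on top of snappy-java's framed streams (the actual flink-runtime class may differ, for example in buffer sizing and in how it shields the inner stream from close()):

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import org.xerial.snappy.SnappyFramedInputStream;
import org.xerial.snappy.SnappyFramedOutputStream;

public class SnappyDecoratorSketch {
    public OutputStream decorateWithCompression(OutputStream stream) throws IOException {
        return new SnappyFramedOutputStream(stream);
    }

    public InputStream decorateWithCompression(InputStream stream) throws IOException {
        return new SnappyFramedInputStream(stream);
    }
}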
