[FLINK-6773] [documentation] Added documentation for compressed checkpoints
StefanRRichter committed Jul 4, 2017
1 parent 5171513 commit 41806ba
Showing 2 changed files with 23 additions and 5 deletions.
16 changes: 15 additions & 1 deletion docs/monitoring/large_state_tuning.md
@@ -217,7 +217,21 @@ parallelism when re-scaling the program (via a savepoint).

Flink's internal bookkeeping tracks parallel state in the granularity of max-parallelism-many *key groups*.
Flink's design strives to make it efficient to have a very high value for the maximum parallelism, even if
executing the program with a low parallelism.

## Compression

Flink offers optional compression (default: off) for all checkpoints and savepoints. Currently, compression always uses
the [snappy compression algorithm (version 1.1.4)](https://github.com/xerial/snappy-java), but we are planning to support
custom compression algorithms in the future. Compression works at the granularity of key-groups in keyed state, i.e.,
each key-group can be decompressed individually, which is important for rescaling.

Compression can be activated through the `ExecutionConfig`:

{% highlight java %}
ExecutionConfig executionConfig = new ExecutionConfig();
executionConfig.setUseSnapshotCompression(true);
{% endhighlight %}
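In a running job, the `ExecutionConfig` is typically obtained from the execution environment rather than constructed directly, so the same switch can be set there. A minimal sketch (assuming the standard `StreamExecutionEnvironment` API; the surrounding job definition is omitted):

{% highlight java %}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Enable snappy compression for the keyed state in full checkpoints and savepoints.
env.getConfig().setUseSnapshotCompression(true);
{% endhighlight %}

Note that this flag affects the job that is built from this environment; a standalone `new ExecutionConfig()` that is not passed to the runtime has no effect on snapshots.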

**Notice:** The compression option has no impact on incremental snapshots, because they use RocksDB's internal
format, which always uses snappy compression out of the box.
12 changes: 8 additions & 4 deletions StateSnapshotCompressionTest.java
@@ -27,6 +27,7 @@
import org.apache.flink.runtime.state.heap.HeapReducingStateTest;
import org.apache.flink.runtime.state.internal.InternalValueState;
import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
+import org.apache.flink.util.TestLogger;

import org.apache.commons.io.IOUtils;
import org.junit.Assert;
@@ -37,7 +38,7 @@

import static org.mockito.Mockito.mock;

-public class StateSnapshotCompressionTest {
+public class StateSnapshotCompressionTest extends TestLogger {

@Test
public void testCompressionConfiguration() throws Exception {
@@ -55,7 +56,8 @@ public void testCompressionConfiguration() throws Exception {
executionConfig);

try {
-Assert.assertTrue(SnappyStreamCompressionDecorator.INSTANCE.equals(stateBackend.getKeyGroupCompressionDecorator()));
+Assert.assertTrue(
+    SnappyStreamCompressionDecorator.INSTANCE.equals(stateBackend.getKeyGroupCompressionDecorator()));

} finally {
IOUtils.closeQuietly(stateBackend);
@@ -75,7 +77,8 @@ public void testCompressionConfiguration() throws Exception {
executionConfig);

try {
-Assert.assertTrue(UncompressedStreamCompressionDecorator.INSTANCE.equals(stateBackend.getKeyGroupCompressionDecorator()));
+Assert.assertTrue(
+    UncompressedStreamCompressionDecorator.INSTANCE.equals(stateBackend.getKeyGroupCompressionDecorator()));

} finally {
IOUtils.closeQuietly(stateBackend);
@@ -132,7 +135,8 @@ private void snapshotRestoreRoundtrip(boolean useCompression) throws Exception {
state.setCurrentNamespace(VoidNamespace.INSTANCE);
state.update("45");
CheckpointStreamFactory streamFactory = new MemCheckpointStreamFactory(4 * 1024 * 1024);
-RunnableFuture<KeyedStateHandle> snapshot = stateBackend.snapshot(0L, 0L, streamFactory, CheckpointOptions.forFullCheckpoint());
+RunnableFuture<KeyedStateHandle> snapshot =
+    stateBackend.snapshot(0L, 0L, streamFactory, CheckpointOptions.forFullCheckpoint());
snapshot.run();
stateHandle = snapshot.get();

