diff --git a/docs/monitoring/large_state_tuning.md b/docs/monitoring/large_state_tuning.md index a356970121cc5..d8b52ee23e2dc 100644 --- a/docs/monitoring/large_state_tuning.md +++ b/docs/monitoring/large_state_tuning.md @@ -217,7 +217,21 @@ parallelism when re-scaling the program (via a savepoint). Flink's internal bookkeeping tracks parallel state in the granularity of max-parallelism-many *key groups*. Flink's design strives to make it efficient to have a very high value for the maximum parallelism, even if -executing the program with a low parallelism. +executing the program with a low parallelism. +## Compression +Flink offers optional compression (default: off) for all checkpoints and savepoints. Currently, compression always uses +the [snappy compression algorithm (version 1.1.4)](https://github.com/xerial/snappy-java), but we are planning to support +custom compression algorithms in the future. Compression works on the granularity of key-groups in keyed state, i.e. +each key-group can be decompressed individually, which is important for rescaling. +Compression can be activated through the `ExecutionConfig`: + +{% highlight java %} + ExecutionConfig executionConfig = new ExecutionConfig(); + executionConfig.setUseSnapshotCompression(true); +{% endhighlight %} + +**Notice:** The compression option has no impact on incremental snapshots, because they use RocksDB's internal +format, which always uses snappy compression out of the box. 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java index b932cb94c000d..63d2453689f8f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java @@ -27,6 +27,7 @@ import org.apache.flink.runtime.state.heap.HeapReducingStateTest; import org.apache.flink.runtime.state.internal.InternalValueState; import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory; +import org.apache.flink.util.TestLogger; import org.apache.commons.io.IOUtils; import org.junit.Assert; @@ -37,7 +38,7 @@ import static org.mockito.Mockito.mock; -public class StateSnapshotCompressionTest { +public class StateSnapshotCompressionTest extends TestLogger { @Test public void testCompressionConfiguration() throws Exception { @@ -55,7 +56,8 @@ public void testCompressionConfiguration() throws Exception { executionConfig); try { - Assert.assertTrue(SnappyStreamCompressionDecorator.INSTANCE.equals(stateBackend.getKeyGroupCompressionDecorator())); + Assert.assertTrue( + SnappyStreamCompressionDecorator.INSTANCE.equals(stateBackend.getKeyGroupCompressionDecorator())); } finally { IOUtils.closeQuietly(stateBackend); @@ -75,7 +77,8 @@ public void testCompressionConfiguration() throws Exception { executionConfig); try { - Assert.assertTrue(UncompressedStreamCompressionDecorator.INSTANCE.equals(stateBackend.getKeyGroupCompressionDecorator())); + Assert.assertTrue( + UncompressedStreamCompressionDecorator.INSTANCE.equals(stateBackend.getKeyGroupCompressionDecorator())); } finally { IOUtils.closeQuietly(stateBackend); @@ -132,7 +135,8 @@ private void snapshotRestoreRoundtrip(boolean useCompression) throws Exception { state.setCurrentNamespace(VoidNamespace.INSTANCE); state.update("45"); CheckpointStreamFactory 
streamFactory = new MemCheckpointStreamFactory(4 * 1024 * 1024); - RunnableFuture snapshot = stateBackend.snapshot(0L, 0L, streamFactory, CheckpointOptions.forFullCheckpoint()); + RunnableFuture snapshot = + stateBackend.snapshot(0L, 0L, streamFactory, CheckpointOptions.forFullCheckpoint()); snapshot.run(); stateHandle = snapshot.get();