From 2dff05df22d540f82afe375fe39b0782f7855f01 Mon Sep 17 00:00:00 2001 From: Dawid Wysakowicz Date: Fri, 1 Oct 2021 10:34:56 +0200 Subject: [PATCH] [FLINK-24417] Update MigrationVersion with 1.14 --- .../FlinkKafkaConsumerBaseMigrationTest.java | 3 ++- .../FlinkKafkaProducerMigrationTest.java | 3 ++- .../FlinkKinesisConsumerMigrationTest.java | 3 ++- .../TypeSerializerUpgradeTestBase.java | 2 +- .../testutils/migration/MigrationVersion.java | 3 ++- .../flink/cep/operator/CEPMigrationTest.java | 3 ++- .../WindowOperatorMigrationTest.java | 3 ++- .../LinkedListSerializerUpgradeTest.java | 23 +++++++++++++------ .../StatefulJobSavepointMigrationITCase.java | 4 +++- ...efulJobWBroadcastStateMigrationITCase.java | 4 +++- ...TypeSerializerSnapshotMigrationITCase.java | 4 +++- .../AbstractKeyedOperatorRestoreTestBase.java | 3 ++- ...stractNonKeyedOperatorRestoreTestBase.java | 3 ++- .../StatefulJobSavepointMigrationITCase.scala | 19 ++++++++++----- ...fulJobWBroadcastStateMigrationITCase.scala | 19 ++++++++++----- 15 files changed, 68 insertions(+), 31 deletions(-) diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java index cffbff5035d3f..8eae1b5bbe8d1 100644 --- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java +++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java @@ -104,7 +104,8 @@ public static Collection parameters() { MigrationVersion.v1_10, MigrationVersion.v1_11, MigrationVersion.v1_12, - MigrationVersion.v1_13); + MigrationVersion.v1_13, + MigrationVersion.v1_14); } public FlinkKafkaConsumerBaseMigrationTest(MigrationVersion testMigrateVersion) { diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerMigrationTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerMigrationTest.java index 1fa1bc6ac393a..433a6e96df17f 100644 --- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerMigrationTest.java +++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerMigrationTest.java @@ -49,7 +49,8 @@ public static Collection parameters() { MigrationVersion.v1_10, MigrationVersion.v1_11, MigrationVersion.v1_12, - MigrationVersion.v1_13); + MigrationVersion.v1_13, + MigrationVersion.v1_14); } public FlinkKafkaProducerMigrationTest(MigrationVersion testMigrateVersion) { diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java index 7a8dca4a25152..09290566c621f 100644 --- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java +++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java @@ -107,7 +107,8 @@ public static Collection parameters() { MigrationVersion.v1_10, MigrationVersion.v1_11, MigrationVersion.v1_12, - MigrationVersion.v1_13); + MigrationVersion.v1_13, + MigrationVersion.v1_14); } public FlinkKinesisConsumerMigrationTest(MigrationVersion testMigrateVersion) { diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerUpgradeTestBase.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerUpgradeTestBase.java index 9751e2df8294d..a69d15403039b 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerUpgradeTestBase.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerUpgradeTestBase.java @@ -54,7 +54,7 @@ public abstract class TypeSerializerUpgradeTestBase testSpecification; diff --git a/flink-core/src/test/java/org/apache/flink/testutils/migration/MigrationVersion.java b/flink-core/src/test/java/org/apache/flink/testutils/migration/MigrationVersion.java index d0243821e26f8..ec5d03121f6aa 100644 --- a/flink-core/src/test/java/org/apache/flink/testutils/migration/MigrationVersion.java +++ b/flink-core/src/test/java/org/apache/flink/testutils/migration/MigrationVersion.java @@ -45,7 +45,8 @@ public enum MigrationVersion { v1_10("1.10"), v1_11("1.11"), v1_12("1.12"), - v1_13("1.13"); + v1_13("1.13"), + v1_14("1.14"); private final String versionStr; diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigrationTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigrationTest.java index f7aba18ce262d..a1da4a60a4910 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigrationTest.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigrationTest.java @@ -80,7 +80,8 @@ public static Collection parameters() { MigrationVersion.v1_10, MigrationVersion.v1_11, MigrationVersion.v1_12, - MigrationVersion.v1_13); + MigrationVersion.v1_13, + MigrationVersion.v1_14); } public CEPMigrationTest(MigrationVersion migrateVersion) { diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorMigrationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorMigrationTest.java index 1b1293fe74dac..88a612ebaa205 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorMigrationTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorMigrationTest.java @@ -96,7 +96,8 @@ public static Collection parameters() { MigrationVersion.v1_10, MigrationVersion.v1_11, MigrationVersion.v1_12, - MigrationVersion.v1_13); + MigrationVersion.v1_13, + MigrationVersion.v1_14); } private static final TypeInformation> STRING_INT_TUPLE = diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/typeutils/LinkedListSerializerUpgradeTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/typeutils/LinkedListSerializerUpgradeTest.java index ef34e4a178064..f683d4eb82db0 100644 --- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/typeutils/LinkedListSerializerUpgradeTest.java +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/typeutils/LinkedListSerializerUpgradeTest.java @@ -24,14 +24,15 @@ import org.apache.flink.api.common.typeutils.TypeSerializerUpgradeTestBase; import org.apache.flink.api.common.typeutils.base.LongSerializer; import org.apache.flink.testutils.migration.MigrationVersion; +import org.apache.flink.util.FlinkRuntimeException; import org.hamcrest.Matcher; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import java.util.Collection; -import java.util.Collections; import java.util.LinkedList; +import java.util.stream.Collectors; import static org.hamcrest.Matchers.is; @@ -47,12 +48,20 @@ public LinkedListSerializerUpgradeTest( @Parameterized.Parameters(name = "Test Specification = {0}") public static Collection> testSpecifications() throws Exception { - return Collections.singletonList( - new TestSpecification<>( - "linked-list-serializer", - MigrationVersion.v1_13, - LinkedListSerializerSetup.class, - LinkedListSerializerVerifier.class)); + return MigrationVersion.v1_13.orHigher().stream() + .map( + version -> { + try { + return new TestSpecification<>( + "linked-list-serializer", + version, + LinkedListSerializerSetup.class, + LinkedListSerializerVerifier.class); + } catch (Exception e) { + throw new FlinkRuntimeException(e); + } + }) + .collect(Collectors.toList()); } public static TypeSerializer> createLinkedListSerializer() { diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointMigrationITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointMigrationITCase.java index c115be2353eb2..135e9e5eda85f 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointMigrationITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointMigrationITCase.java @@ -97,7 +97,9 @@ public static Collection> parameters() { Tuple2.of(MigrationVersion.v1_12, StateBackendLoader.MEMORY_STATE_BACKEND_NAME), Tuple2.of(MigrationVersion.v1_12, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME), Tuple2.of(MigrationVersion.v1_13, StateBackendLoader.MEMORY_STATE_BACKEND_NAME), - Tuple2.of(MigrationVersion.v1_13, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME)); + Tuple2.of(MigrationVersion.v1_13, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME), + Tuple2.of(MigrationVersion.v1_14, StateBackendLoader.MEMORY_STATE_BACKEND_NAME), + Tuple2.of(MigrationVersion.v1_14, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME)); } private final MigrationVersion testMigrateVersion; diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobWBroadcastStateMigrationITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobWBroadcastStateMigrationITCase.java index 78612694e8076..794f40e3e41ae 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobWBroadcastStateMigrationITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobWBroadcastStateMigrationITCase.java @@ -81,7 +81,9 @@ public static Collection> parameters() { Tuple2.of(MigrationVersion.v1_12, StateBackendLoader.MEMORY_STATE_BACKEND_NAME), Tuple2.of(MigrationVersion.v1_12, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME), Tuple2.of(MigrationVersion.v1_13, StateBackendLoader.MEMORY_STATE_BACKEND_NAME), - Tuple2.of(MigrationVersion.v1_13, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME)); + Tuple2.of(MigrationVersion.v1_13, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME), + Tuple2.of(MigrationVersion.v1_14, StateBackendLoader.MEMORY_STATE_BACKEND_NAME), + Tuple2.of(MigrationVersion.v1_14, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME)); } private final MigrationVersion testMigrateVersion; diff --git a/flink-tests/src/test/java/org/apache/flink/test/migration/TypeSerializerSnapshotMigrationITCase.java b/flink-tests/src/test/java/org/apache/flink/test/migration/TypeSerializerSnapshotMigrationITCase.java index f4a7a2d8ca71a..3f654f050223c 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/migration/TypeSerializerSnapshotMigrationITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/migration/TypeSerializerSnapshotMigrationITCase.java @@ -99,7 +99,9 @@ public static Collection> parameters() { Tuple2.of(MigrationVersion.v1_12, StateBackendLoader.MEMORY_STATE_BACKEND_NAME), Tuple2.of(MigrationVersion.v1_12, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME), Tuple2.of(MigrationVersion.v1_13, StateBackendLoader.MEMORY_STATE_BACKEND_NAME), - Tuple2.of(MigrationVersion.v1_13, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME)); + Tuple2.of(MigrationVersion.v1_13, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME), + Tuple2.of(MigrationVersion.v1_14, StateBackendLoader.MEMORY_STATE_BACKEND_NAME), + Tuple2.of(MigrationVersion.v1_14, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME)); } private final MigrationVersion testMigrateVersion; diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/AbstractKeyedOperatorRestoreTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/AbstractKeyedOperatorRestoreTestBase.java index 2b0b290e6b8b6..6eab80fe0f247 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/AbstractKeyedOperatorRestoreTestBase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/AbstractKeyedOperatorRestoreTestBase.java @@ -50,7 +50,8 @@ public static Collection parameters() { MigrationVersion.v1_10, MigrationVersion.v1_11, MigrationVersion.v1_12, - MigrationVersion.v1_13); + MigrationVersion.v1_13, + MigrationVersion.v1_14); } public AbstractKeyedOperatorRestoreTestBase(MigrationVersion migrationVersion) { diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/AbstractNonKeyedOperatorRestoreTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/AbstractNonKeyedOperatorRestoreTestBase.java index fdfa1afb4688b..9acaca4c982d7 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/AbstractNonKeyedOperatorRestoreTestBase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/AbstractNonKeyedOperatorRestoreTestBase.java @@ -57,7 +57,8 @@ public static Collection parameters() { MigrationVersion.v1_10, MigrationVersion.v1_11, MigrationVersion.v1_12, - MigrationVersion.v1_13); + MigrationVersion.v1_13, + MigrationVersion.v1_14); } protected AbstractNonKeyedOperatorRestoreTestBase(MigrationVersion migrationVersion) { diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobSavepointMigrationITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobSavepointMigrationITCase.scala index 916a3193d81c7..30bc48a80263a 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobSavepointMigrationITCase.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobSavepointMigrationITCase.scala @@ -25,6 +25,8 @@ import org.apache.flink.api.common.functions.RichFlatMapFunction import org.apache.flink.api.common.state.{ListState, ListStateDescriptor, ValueState, ValueStateDescriptor} import org.apache.flink.api.java.functions.KeySelector import org.apache.flink.configuration.Configuration +import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend +import org.apache.flink.runtime.state.hashmap.HashMapStateBackend import org.apache.flink.runtime.state.memory.MemoryStateBackend import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction @@ -38,7 +40,6 @@ import org.apache.flink.api.java.tuple.Tuple2 import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext, StateBackendLoader} import org.apache.flink.api.scala._ import org.apache.flink.api.scala.migration.CustomEnum.CustomEnum -import org.apache.flink.contrib.streaming.state.RocksDBStateBackend import org.apache.flink.testutils.migration.MigrationVersion import org.junit.runner.RunWith import org.junit.runners.Parameterized @@ -70,15 +71,17 @@ object StatefulJobSavepointMigrationITCase { (MigrationVersion.v1_12, StateBackendLoader.MEMORY_STATE_BACKEND_NAME), (MigrationVersion.v1_12, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME), (MigrationVersion.v1_13, StateBackendLoader.MEMORY_STATE_BACKEND_NAME), - (MigrationVersion.v1_13, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME)) + (MigrationVersion.v1_13, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME), + (MigrationVersion.v1_14, StateBackendLoader.HASHMAP_STATE_BACKEND_NAME), + (MigrationVersion.v1_14, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME)) } // TODO to generate savepoints for a specific Flink version / backend type, // TODO change these values accordingly, e.g. to generate for 1.3 with RocksDB, // TODO set as (MigrationVersion.v1_3, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME) // TODO Note: You should generate the savepoint based on the release branch instead of the master. - val GENERATE_SAVEPOINT_VER: MigrationVersion = MigrationVersion.v1_9 - val GENERATE_SAVEPOINT_BACKEND_TYPE: String = StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME + val GENERATE_SAVEPOINT_VER: MigrationVersion = MigrationVersion.v1_14 + val GENERATE_SAVEPOINT_BACKEND_TYPE: String = StateBackendLoader.HASHMAP_STATE_BACKEND_NAME val NUM_ELEMENTS = 4 } @@ -99,9 +102,11 @@ class StatefulJobSavepointMigrationITCase( StatefulJobSavepointMigrationITCase.GENERATE_SAVEPOINT_BACKEND_TYPE match { case StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME => - env.setStateBackend(new RocksDBStateBackend(new MemoryStateBackend())) + env.setStateBackend(new EmbeddedRocksDBStateBackend()) case StateBackendLoader.MEMORY_STATE_BACKEND_NAME => env.setStateBackend(new MemoryStateBackend()) + case StateBackendLoader.HASHMAP_STATE_BACKEND_NAME => + env.setStateBackend(new HashMapStateBackend()) case _ => throw new UnsupportedOperationException } @@ -140,9 +145,11 @@ class StatefulJobSavepointMigrationITCase( migrationVersionAndBackend._2 match { case StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME => - env.setStateBackend(new RocksDBStateBackend(new MemoryStateBackend())) + env.setStateBackend(new EmbeddedRocksDBStateBackend()) case StateBackendLoader.MEMORY_STATE_BACKEND_NAME => env.setStateBackend(new MemoryStateBackend()) + case StateBackendLoader.HASHMAP_STATE_BACKEND_NAME => + env.setStateBackend(new HashMapStateBackend()) case _ => throw new UnsupportedOperationException } diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobWBroadcastStateMigrationITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobWBroadcastStateMigrationITCase.scala index 1aba6583d699c..1ea1df196f74c 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobWBroadcastStateMigrationITCase.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobWBroadcastStateMigrationITCase.scala @@ -29,7 +29,8 @@ import org.apache.flink.api.java.tuple.Tuple2 import org.apache.flink.api.scala.createTypeInformation import org.apache.flink.api.scala.migration.CustomEnum.CustomEnum import org.apache.flink.configuration.Configuration -import org.apache.flink.contrib.streaming.state.RocksDBStateBackend +import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend +import org.apache.flink.runtime.state.hashmap.HashMapStateBackend import org.apache.flink.runtime.state.memory.MemoryStateBackend import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext, StateBackendLoader} import org.apache.flink.streaming.api.TimeCharacteristic @@ -70,15 +71,17 @@ object StatefulJobWBroadcastStateMigrationITCase { (MigrationVersion.v1_12, StateBackendLoader.MEMORY_STATE_BACKEND_NAME), (MigrationVersion.v1_12, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME), (MigrationVersion.v1_13, StateBackendLoader.MEMORY_STATE_BACKEND_NAME), - (MigrationVersion.v1_13, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME)) + (MigrationVersion.v1_13, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME), + (MigrationVersion.v1_14, StateBackendLoader.HASHMAP_STATE_BACKEND_NAME), + (MigrationVersion.v1_14, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME)) } // TODO to generate savepoints for a specific Flink version / backend type, // TODO change these values accordingly, e.g. to generate for 1.3 with RocksDB, // TODO set as (MigrationVersion.v1_3, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME) // TODO Note: You should generate the savepoint based on the release branch instead of the master. - val GENERATE_SAVEPOINT_VER: MigrationVersion = MigrationVersion.v1_9 - val GENERATE_SAVEPOINT_BACKEND_TYPE: String = StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME + val GENERATE_SAVEPOINT_VER: MigrationVersion = MigrationVersion.v1_14 + val GENERATE_SAVEPOINT_BACKEND_TYPE: String = StateBackendLoader.HASHMAP_STATE_BACKEND_NAME val NUM_ELEMENTS = 4 } @@ -99,9 +102,11 @@ class StatefulJobWBroadcastStateMigrationITCase( StatefulJobWBroadcastStateMigrationITCase.GENERATE_SAVEPOINT_BACKEND_TYPE match { case StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME => - env.setStateBackend(new RocksDBStateBackend(new MemoryStateBackend())) + env.setStateBackend(new EmbeddedRocksDBStateBackend()) case StateBackendLoader.MEMORY_STATE_BACKEND_NAME => env.setStateBackend(new MemoryStateBackend()) + case StateBackendLoader.HASHMAP_STATE_BACKEND_NAME => + env.setStateBackend(new HashMapStateBackend()) case _ => throw new UnsupportedOperationException } @@ -166,9 +171,11 @@ class StatefulJobWBroadcastStateMigrationITCase( migrationVersionAndBackend._2 match { case StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME => - env.setStateBackend(new RocksDBStateBackend(new MemoryStateBackend())) + env.setStateBackend(new EmbeddedRocksDBStateBackend()) case StateBackendLoader.MEMORY_STATE_BACKEND_NAME => env.setStateBackend(new MemoryStateBackend()) + case StateBackendLoader.HASHMAP_STATE_BACKEND_NAME => + env.setStateBackend(new HashMapStateBackend()) case _ => throw new UnsupportedOperationException }