Commit 2dff05d

[FLINK-24417] Update MigrationVersion with 1.14

dawidwys committed Oct 11, 2021
1 parent 6243723 commit 2dff05d

Showing 15 changed files with 68 additions and 31 deletions.
@@ -104,7 +104,8 @@ public static Collection<MigrationVersion> parameters() {
                 MigrationVersion.v1_10,
                 MigrationVersion.v1_11,
                 MigrationVersion.v1_12,
-                MigrationVersion.v1_13);
+                MigrationVersion.v1_13,
+                MigrationVersion.v1_14);
     }
 
     public FlinkKafkaConsumerBaseMigrationTest(MigrationVersion testMigrateVersion) {
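All of the parameters() hunks in this commit follow the same JUnit 4 Parameterized pattern: the runner instantiates the test class once per element returned by parameters(), so adding v1_14 to the list automatically re-runs every restore test against a 1.14 savepoint. A minimal sketch of that pattern, with a hypothetical test class name (MigrationVersion is the Flink enum touched by this commit):

import java.util.Arrays;
import java.util.Collection;

import org.apache.flink.testutils.migration.MigrationVersion;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

// Sketch only: ExampleMigrationTest is hypothetical, not part of this commit.
@RunWith(Parameterized.class)
public class ExampleMigrationTest {

    // One test-class instance is created per element of this collection.
    @Parameterized.Parameters(name = "Migration Savepoint: {0}")
    public static Collection<MigrationVersion> parameters() {
        return Arrays.asList(MigrationVersion.v1_13, MigrationVersion.v1_14);
    }

    private final MigrationVersion testMigrateVersion;

    public ExampleMigrationTest(MigrationVersion testMigrateVersion) {
        this.testMigrateVersion = testMigrateVersion;
    }

    @Test
    public void testRestore() {
        // A real migration test restores a savepoint generated by
        // testMigrateVersion and verifies the recovered state.
    }
}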
@@ -49,7 +49,8 @@ public static Collection<MigrationVersion> parameters() {
                 MigrationVersion.v1_10,
                 MigrationVersion.v1_11,
                 MigrationVersion.v1_12,
-                MigrationVersion.v1_13);
+                MigrationVersion.v1_13,
+                MigrationVersion.v1_14);
     }
 
     public FlinkKafkaProducerMigrationTest(MigrationVersion testMigrateVersion) {
@@ -107,7 +107,8 @@ public static Collection<MigrationVersion> parameters() {
                 MigrationVersion.v1_10,
                 MigrationVersion.v1_11,
                 MigrationVersion.v1_12,
-                MigrationVersion.v1_13);
+                MigrationVersion.v1_13,
+                MigrationVersion.v1_14);
     }
 
     public FlinkKinesisConsumerMigrationTest(MigrationVersion testMigrateVersion) {
@@ -54,7 +54,7 @@ public abstract class TypeSerializerUpgradeTestBase<PreviousElementT, UpgradedElementT>
     public static final MigrationVersion[] MIGRATION_VERSIONS =
             MigrationVersion.v1_11.orHigher().toArray(new MigrationVersion[0]);
 
-    public static final MigrationVersion CURRENT_VERSION = MigrationVersion.v1_13;
+    public static final MigrationVersion CURRENT_VERSION = MigrationVersion.v1_14;
 
     private final TestSpecification<PreviousElementT, UpgradedElementT> testSpecification;
 
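Note the split of responsibilities here: MIGRATION_VERSIONS is derived from the enum, so adding v1_14 to MigrationVersion widens it automatically, while CURRENT_VERSION must be bumped by hand. Assuming orHigher() returns versions in ascending declaration order, the constant now evaluates as sketched below:

// After this commit, with v1_14 declared in the enum:
// MigrationVersion.v1_11.orHigher() -> [v1_11, v1_12, v1_13, v1_14]
MigrationVersion[] versions =
        MigrationVersion.v1_11.orHigher().toArray(new MigrationVersion[0]);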
@@ -45,7 +45,8 @@ public enum MigrationVersion {
     v1_10("1.10"),
     v1_11("1.11"),
     v1_12("1.12"),
-    v1_13("1.13");
+    v1_13("1.13"),
+    v1_14("1.14");
 
     private final String versionStr;
 
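The orHigher() helper used above (and in LinkedListSerializerUpgradeTest below) is not part of this diff; a plausible sketch of it, assuming the enum constants are declared in ascending version order:

import java.util.ArrayList;
import java.util.List;

// Hypothetical implementation of MigrationVersion.orHigher(); the real body
// is not shown in this commit. It relies on Enum.compareTo(), which follows
// declaration order, which is why v1_14 must be declared last in the enum.
public List<MigrationVersion> orHigher() {
    List<MigrationVersion> higherVersions = new ArrayList<>();
    for (MigrationVersion version : MigrationVersion.values()) {
        if (version.compareTo(this) >= 0) {
            higherVersions.add(version);
        }
    }
    return higherVersions;
}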
@@ -80,7 +80,8 @@ public static Collection<MigrationVersion> parameters() {
                 MigrationVersion.v1_10,
                 MigrationVersion.v1_11,
                 MigrationVersion.v1_12,
-                MigrationVersion.v1_13);
+                MigrationVersion.v1_13,
+                MigrationVersion.v1_14);
     }
 
     public CEPMigrationTest(MigrationVersion migrateVersion) {
@@ -96,7 +96,8 @@ public static Collection<MigrationVersion> parameters() {
                 MigrationVersion.v1_10,
                 MigrationVersion.v1_11,
                 MigrationVersion.v1_12,
-                MigrationVersion.v1_13);
+                MigrationVersion.v1_13,
+                MigrationVersion.v1_14);
     }
 
     private static final TypeInformation<Tuple2<String, Integer>> STRING_INT_TUPLE =
@@ -24,14 +24,15 @@
 import org.apache.flink.api.common.typeutils.TypeSerializerUpgradeTestBase;
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.testutils.migration.MigrationVersion;
+import org.apache.flink.util.FlinkRuntimeException;
 
 import org.hamcrest.Matcher;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
 import java.util.Collection;
-import java.util.Collections;
 import java.util.LinkedList;
+import java.util.stream.Collectors;
 
 import static org.hamcrest.Matchers.is;
@@ -47,12 +48,20 @@ public LinkedListSerializerUpgradeTest(
 
     @Parameterized.Parameters(name = "Test Specification = {0}")
     public static Collection<TestSpecification<?, ?>> testSpecifications() throws Exception {
-        return Collections.singletonList(
-                new TestSpecification<>(
-                        "linked-list-serializer",
-                        MigrationVersion.v1_13,
-                        LinkedListSerializerSetup.class,
-                        LinkedListSerializerVerifier.class));
+        return MigrationVersion.v1_13.orHigher().stream()
+                .map(
+                        version -> {
+                            try {
+                                return new TestSpecification<>(
+                                        "linked-list-serializer",
+                                        version,
+                                        LinkedListSerializerSetup.class,
+                                        LinkedListSerializerVerifier.class);
+                            } catch (Exception e) {
+                                throw new FlinkRuntimeException(e);
+                            }
+                        })
+                .collect(Collectors.toList());
     }
 
     public static TypeSerializer<LinkedList<Long>> createLinkedListSerializer() {
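The try/catch inside the map lambda above is needed because the TestSpecification constructor declares a checked exception, and java.util.function.Function.apply cannot throw one; wrapping in FlinkRuntimeException keeps the stream pipeline compiling. A self-contained illustration of the same pattern (load and the version numbers are made up for the example):

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class CheckedExceptionInStreamExample {

    // Stand-in for the TestSpecification constructor: declares a checked exception.
    static String load(int version) throws Exception {
        return "spec-for-1." + version;
    }

    public static void main(String[] args) {
        // Function.apply cannot throw checked exceptions, so the lambda must
        // catch and rethrow as an unchecked exception.
        List<String> specs =
                Arrays.asList(13, 14).stream()
                        .map(
                                version -> {
                                    try {
                                        return load(version);
                                    } catch (Exception e) {
                                        throw new RuntimeException(e);
                                    }
                                })
                        .collect(Collectors.toList());
        System.out.println(specs); // [spec-for-1.13, spec-for-1.14]
    }
}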
@@ -97,7 +97,9 @@ public static Collection<Tuple2<MigrationVersion, String>> parameters() {
                 Tuple2.of(MigrationVersion.v1_12, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
                 Tuple2.of(MigrationVersion.v1_12, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
                 Tuple2.of(MigrationVersion.v1_13, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-                Tuple2.of(MigrationVersion.v1_13, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME));
+                Tuple2.of(MigrationVersion.v1_13, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
+                Tuple2.of(MigrationVersion.v1_14, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
+                Tuple2.of(MigrationVersion.v1_14, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME));
     }
 
     private final MigrationVersion testMigrateVersion;
@@ -81,7 +81,9 @@ public static Collection<Tuple2<MigrationVersion, String>> parameters() {
                 Tuple2.of(MigrationVersion.v1_12, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
                 Tuple2.of(MigrationVersion.v1_12, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
                 Tuple2.of(MigrationVersion.v1_13, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-                Tuple2.of(MigrationVersion.v1_13, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME));
+                Tuple2.of(MigrationVersion.v1_13, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
+                Tuple2.of(MigrationVersion.v1_14, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
+                Tuple2.of(MigrationVersion.v1_14, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME));
     }
 
     private final MigrationVersion testMigrateVersion;
@@ -99,7 +99,9 @@ public static Collection<Tuple2<MigrationVersion, String>> parameters() {
                 Tuple2.of(MigrationVersion.v1_12, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
                 Tuple2.of(MigrationVersion.v1_12, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
                 Tuple2.of(MigrationVersion.v1_13, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-                Tuple2.of(MigrationVersion.v1_13, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME));
+                Tuple2.of(MigrationVersion.v1_13, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
+                Tuple2.of(MigrationVersion.v1_14, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
+                Tuple2.of(MigrationVersion.v1_14, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME));
     }
 
     private final MigrationVersion testMigrateVersion;
@@ -50,7 +50,8 @@ public static Collection<MigrationVersion> parameters() {
                 MigrationVersion.v1_10,
                 MigrationVersion.v1_11,
                 MigrationVersion.v1_12,
-                MigrationVersion.v1_13);
+                MigrationVersion.v1_13,
+                MigrationVersion.v1_14);
     }
 
     public AbstractKeyedOperatorRestoreTestBase(MigrationVersion migrationVersion) {
@@ -57,7 +57,8 @@ public static Collection<MigrationVersion> parameters() {
                 MigrationVersion.v1_10,
                 MigrationVersion.v1_11,
                 MigrationVersion.v1_12,
-                MigrationVersion.v1_13);
+                MigrationVersion.v1_13,
+                MigrationVersion.v1_14);
     }
 
     protected AbstractNonKeyedOperatorRestoreTestBase(MigrationVersion migrationVersion) {
@@ -25,6 +25,8 @@ import org.apache.flink.api.common.functions.RichFlatMapFunction
 import org.apache.flink.api.common.state.{ListState, ListStateDescriptor, ValueState, ValueStateDescriptor}
 import org.apache.flink.api.java.functions.KeySelector
 import org.apache.flink.configuration.Configuration
+import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend
+import org.apache.flink.runtime.state.hashmap.HashMapStateBackend
 import org.apache.flink.runtime.state.memory.MemoryStateBackend
 import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
@@ -38,7 +40,6 @@ import org.apache.flink.api.java.tuple.Tuple2
 import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext, StateBackendLoader}
 import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.migration.CustomEnum.CustomEnum
-import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
 import org.apache.flink.testutils.migration.MigrationVersion
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
@@ -70,15 +71,17 @@ object StatefulJobSavepointMigrationITCase {
       (MigrationVersion.v1_12, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
       (MigrationVersion.v1_12, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
       (MigrationVersion.v1_13, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-      (MigrationVersion.v1_13, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME))
+      (MigrationVersion.v1_13, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
+      (MigrationVersion.v1_14, StateBackendLoader.HASHMAP_STATE_BACKEND_NAME),
+      (MigrationVersion.v1_14, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME))
   }
 
   // TODO to generate savepoints for a specific Flink version / backend type,
   // TODO change these values accordingly, e.g. to generate for 1.3 with RocksDB,
   // TODO set as (MigrationVersion.v1_3, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME)
   // TODO Note: You should generate the savepoint based on the release branch instead of the master.
-  val GENERATE_SAVEPOINT_VER: MigrationVersion = MigrationVersion.v1_9
-  val GENERATE_SAVEPOINT_BACKEND_TYPE: String = StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME
+  val GENERATE_SAVEPOINT_VER: MigrationVersion = MigrationVersion.v1_14
+  val GENERATE_SAVEPOINT_BACKEND_TYPE: String = StateBackendLoader.HASHMAP_STATE_BACKEND_NAME
 
   val NUM_ELEMENTS = 4
 }
@@ -99,9 +102,11 @@ class StatefulJobSavepointMigrationITCase(
 
     StatefulJobSavepointMigrationITCase.GENERATE_SAVEPOINT_BACKEND_TYPE match {
       case StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME =>
-        env.setStateBackend(new RocksDBStateBackend(new MemoryStateBackend()))
+        env.setStateBackend(new EmbeddedRocksDBStateBackend())
       case StateBackendLoader.MEMORY_STATE_BACKEND_NAME =>
        env.setStateBackend(new MemoryStateBackend())
+      case StateBackendLoader.HASHMAP_STATE_BACKEND_NAME =>
+        env.setStateBackend(new HashMapStateBackend())
       case _ => throw new UnsupportedOperationException
     }
 
@@ -140,9 +145,11 @@
 
     migrationVersionAndBackend._2 match {
       case StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME =>
-        env.setStateBackend(new RocksDBStateBackend(new MemoryStateBackend()))
+        env.setStateBackend(new EmbeddedRocksDBStateBackend())
       case StateBackendLoader.MEMORY_STATE_BACKEND_NAME =>
         env.setStateBackend(new MemoryStateBackend())
+      case StateBackendLoader.HASHMAP_STATE_BACKEND_NAME =>
+        env.setStateBackend(new HashMapStateBackend())
       case _ => throw new UnsupportedOperationException
     }
 
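The RocksDBStateBackend(new MemoryStateBackend()) construction disappears here because Flink's reworked state backend API separates where working state lives from where checkpoints are written: HashMapStateBackend and EmbeddedRocksDBStateBackend are the successors of the memory/RocksDB wiring above and no longer wrap a second backend. A minimal Java sketch of the new-style setup (the checkpoint path is a placeholder):

import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class NewStateBackendSetup {
    public static void main(String[] args) {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Heap-based state: keeps working state as objects on the JVM heap.
        env.setStateBackend(new HashMapStateBackend());
        // Alternatively, RocksDB-based state; unlike the legacy
        // RocksDBStateBackend, it is not constructed around another backend.
        // env.setStateBackend(new EmbeddedRocksDBStateBackend());

        // Checkpoint storage is now configured separately from the backend.
        env.getCheckpointConfig().setCheckpointStorage("file:///tmp/checkpoints");
    }
}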
@@ -29,7 +29,8 @@ import org.apache.flink.api.java.tuple.Tuple2
 import org.apache.flink.api.scala.createTypeInformation
 import org.apache.flink.api.scala.migration.CustomEnum.CustomEnum
 import org.apache.flink.configuration.Configuration
-import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
+import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend
+import org.apache.flink.runtime.state.hashmap.HashMapStateBackend
 import org.apache.flink.runtime.state.memory.MemoryStateBackend
 import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext, StateBackendLoader}
 import org.apache.flink.streaming.api.TimeCharacteristic
@@ -70,15 +71,17 @@ object StatefulJobWBroadcastStateMigrationITCase {
       (MigrationVersion.v1_12, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
       (MigrationVersion.v1_12, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
       (MigrationVersion.v1_13, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-      (MigrationVersion.v1_13, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME))
+      (MigrationVersion.v1_13, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
+      (MigrationVersion.v1_14, StateBackendLoader.HASHMAP_STATE_BACKEND_NAME),
+      (MigrationVersion.v1_14, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME))
   }
 
   // TODO to generate savepoints for a specific Flink version / backend type,
   // TODO change these values accordingly, e.g. to generate for 1.3 with RocksDB,
   // TODO set as (MigrationVersion.v1_3, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME)
   // TODO Note: You should generate the savepoint based on the release branch instead of the master.
-  val GENERATE_SAVEPOINT_VER: MigrationVersion = MigrationVersion.v1_9
-  val GENERATE_SAVEPOINT_BACKEND_TYPE: String = StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME
+  val GENERATE_SAVEPOINT_VER: MigrationVersion = MigrationVersion.v1_14
+  val GENERATE_SAVEPOINT_BACKEND_TYPE: String = StateBackendLoader.HASHMAP_STATE_BACKEND_NAME
 
   val NUM_ELEMENTS = 4
 }
@@ -99,9 +102,11 @@ class StatefulJobWBroadcastStateMigrationITCase(
 
     StatefulJobWBroadcastStateMigrationITCase.GENERATE_SAVEPOINT_BACKEND_TYPE match {
       case StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME =>
-        env.setStateBackend(new RocksDBStateBackend(new MemoryStateBackend()))
+        env.setStateBackend(new EmbeddedRocksDBStateBackend())
       case StateBackendLoader.MEMORY_STATE_BACKEND_NAME =>
         env.setStateBackend(new MemoryStateBackend())
+      case StateBackendLoader.HASHMAP_STATE_BACKEND_NAME =>
+        env.setStateBackend(new HashMapStateBackend())
       case _ => throw new UnsupportedOperationException
     }
 
@@ -166,9 +171,11 @@ class StatefulJobWBroadcastStateMigrationITCase(
 
     migrationVersionAndBackend._2 match {
       case StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME =>
-        env.setStateBackend(new RocksDBStateBackend(new MemoryStateBackend()))
+        env.setStateBackend(new EmbeddedRocksDBStateBackend())
       case StateBackendLoader.MEMORY_STATE_BACKEND_NAME =>
         env.setStateBackend(new MemoryStateBackend())
+      case StateBackendLoader.HASHMAP_STATE_BACKEND_NAME =>
+        env.setStateBackend(new HashMapStateBackend())
       case _ => throw new UnsupportedOperationException
     }
 
