Skip to content

Commit

Permalink
[FLINK-8472] [tests] Extend AbstractOperatorRestoreTestBases for Flin…
Browse files Browse the repository at this point in the history
…k 1.4

This closes apache#5364.
  • Loading branch information
tzulitai committed Feb 6, 2018
1 parent a12aca3 commit 130ca4e
Show file tree
Hide file tree
Showing 13 changed files with 43 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.util.migration.MigrationVersion;
import org.apache.flink.test.state.operator.restore.AbstractOperatorRestoreTestBase;
import org.apache.flink.test.state.operator.restore.ExecutionMode;

Expand All @@ -36,17 +37,15 @@
@RunWith(Parameterized.class)
public abstract class AbstractKeyedOperatorRestoreTestBase extends AbstractOperatorRestoreTestBase {

private final String savepointPath;
private final MigrationVersion migrationVersion;

@Parameterized.Parameters(name = "Migrate Savepoint: {0}")
public static Collection<String> parameters () {
return Arrays.asList(
"complexKeyed-flink1.2",
"complexKeyed-flink1.3");
public static Collection<MigrationVersion> parameters () {
return Arrays.asList(MigrationVersion.v1_2, MigrationVersion.v1_3, MigrationVersion.v1_4);
}

public AbstractKeyedOperatorRestoreTestBase(String savepointPath) {
this.savepointPath = savepointPath;
public AbstractKeyedOperatorRestoreTestBase(MigrationVersion migrationVersion) {
this.migrationVersion = migrationVersion;
}

@Override
Expand All @@ -65,6 +64,6 @@ public void createMigrationJob(StreamExecutionEnvironment env) {

@Override
protected String getMigrationSavepointName() {
return savepointPath;
return "complexKeyed-flink" + migrationVersion;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,16 @@
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.util.migration.MigrationVersion;
import org.apache.flink.test.state.operator.restore.ExecutionMode;

/**
* Test state restoration for a keyed operator restore tests.
*/
public class KeyedComplexChainTest extends AbstractKeyedOperatorRestoreTestBase {

public KeyedComplexChainTest(String savepointPath) {
super(savepointPath);
public KeyedComplexChainTest(MigrationVersion migrationVersion) {
super(migrationVersion);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,23 +98,17 @@ public static SingleOutputStreamOperator<Integer> createWindowFunction(Execution
public static SingleOutputStreamOperator<Integer> createFirstStatefulMap(ExecutionMode mode, DataStream<Integer> input) {
SingleOutputStreamOperator<Integer> map = input
.map(new StatefulStringStoringMap(mode, "first"))
.setParallelism(4);

if (mode == ExecutionMode.MIGRATE || mode == ExecutionMode.RESTORE) {
map.uid("first");
}
.setParallelism(4)
.uid("first");

return map;
}

public static SingleOutputStreamOperator<Integer> createSecondStatefulMap(ExecutionMode mode, DataStream<Integer> input) {
SingleOutputStreamOperator<Integer> map = input
.map(new StatefulStringStoringMap(mode, "second"))
.setParallelism(4);

if (mode == ExecutionMode.MIGRATE || mode == ExecutionMode.RESTORE) {
map.uid("second");
}
.setParallelism(4)
.uid("second");

return map;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.util.migration.MigrationVersion;
import org.apache.flink.test.state.operator.restore.AbstractOperatorRestoreTestBase;
import org.apache.flink.test.state.operator.restore.ExecutionMode;

Expand All @@ -42,22 +43,20 @@
@RunWith(Parameterized.class)
public abstract class AbstractNonKeyedOperatorRestoreTestBase extends AbstractOperatorRestoreTestBase {

private final String savepointPath;
private final MigrationVersion migrationVersion;

@Parameterized.Parameters(name = "Migrate Savepoint: {0}")
public static Collection<String> parameters () {
return Arrays.asList(
"nonKeyed-flink1.2",
"nonKeyed-flink1.3");
public static Collection<MigrationVersion> parameters () {
return Arrays.asList(MigrationVersion.v1_2, MigrationVersion.v1_3, MigrationVersion.v1_4);
}

protected AbstractNonKeyedOperatorRestoreTestBase(String savepointPath) {
this.savepointPath = savepointPath;
protected AbstractNonKeyedOperatorRestoreTestBase(MigrationVersion migrationVersion) {
this.migrationVersion = migrationVersion;
}

protected AbstractNonKeyedOperatorRestoreTestBase(String savepointPath, boolean allowNonRestoredState) {
protected AbstractNonKeyedOperatorRestoreTestBase(MigrationVersion migrationVersion, boolean allowNonRestoredState) {
super(allowNonRestoredState);
this.savepointPath = savepointPath;
this.migrationVersion = migrationVersion;
}

@Override
Expand All @@ -80,6 +79,6 @@ public void createMigrationJob(StreamExecutionEnvironment env) {

@Override
protected String getMigrationSavepointName() {
return savepointPath;
return "nonKeyed-flink" + migrationVersion;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.util.migration.MigrationVersion;
import org.apache.flink.test.state.operator.restore.ExecutionMode;

import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createFirstStatefulMap;
Expand All @@ -34,8 +35,8 @@
*/
public class ChainBreakTest extends AbstractNonKeyedOperatorRestoreTestBase {

public ChainBreakTest(String savepointPath) {
super(savepointPath);
public ChainBreakTest(MigrationVersion migrationVersion) {
super(migrationVersion);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.util.migration.MigrationVersion;
import org.apache.flink.test.state.operator.restore.ExecutionMode;

import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createFirstStatefulMap;
Expand All @@ -33,8 +34,8 @@
*/
public class ChainLengthDecreaseTest extends AbstractNonKeyedOperatorRestoreTestBase {

public ChainLengthDecreaseTest(String savepointPath) {
super(savepointPath);
public ChainLengthDecreaseTest(MigrationVersion migrationVersion) {
super(migrationVersion);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.util.migration.MigrationVersion;
import org.apache.flink.test.state.operator.restore.ExecutionMode;

import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createFirstStatefulMap;
Expand All @@ -34,8 +35,8 @@
*/
public class ChainLengthIncreaseTest extends AbstractNonKeyedOperatorRestoreTestBase {

public ChainLengthIncreaseTest(String savepointPath) {
super(savepointPath);
public ChainLengthIncreaseTest(MigrationVersion migrationVersion) {
super(migrationVersion);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.util.migration.MigrationVersion;
import org.apache.flink.test.state.operator.restore.ExecutionMode;

import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createFirstStatefulMap;
Expand All @@ -36,8 +37,8 @@
*/
public class ChainLengthStatelessDecreaseTest extends AbstractNonKeyedOperatorRestoreTestBase {

public ChainLengthStatelessDecreaseTest(String savepointPath) {
super(savepointPath, false);
public ChainLengthStatelessDecreaseTest(MigrationVersion migrationVersion) {
super(migrationVersion);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.util.migration.MigrationVersion;
import org.apache.flink.test.state.operator.restore.ExecutionMode;

import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createFirstStatefulMap;
Expand All @@ -34,8 +35,8 @@
*/
public class ChainOrderTest extends AbstractNonKeyedOperatorRestoreTestBase {

public ChainOrderTest(String savepointPath) {
super(savepointPath);
public ChainOrderTest(MigrationVersion migrationVersion) {
super(migrationVersion);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.util.migration.MigrationVersion;
import org.apache.flink.test.state.operator.restore.ExecutionMode;

import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createFirstStatefulMap;
Expand All @@ -34,8 +35,8 @@
*/
public class ChainUnionTest extends AbstractNonKeyedOperatorRestoreTestBase {

public ChainUnionTest(String savepointPath) {
super(savepointPath);
public ChainUnionTest(MigrationVersion migrationVersion) {
super(migrationVersion);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,12 +100,8 @@ public static SingleOutputStreamOperator<Integer> createSecondStatefulMap(Execut
public static SingleOutputStreamOperator<Integer> createThirdStatefulMap(ExecutionMode mode, DataStream<Integer> input) {
SingleOutputStreamOperator<Integer> map = input
.map(new StatefulStringStoringMap(mode, "third"))
.setParallelism(4);

// we cannot set the uid on a chained operator in 1.2
if (mode == ExecutionMode.MIGRATE || mode == ExecutionMode.RESTORE) {
map.uid("third");
}
.setParallelism(4)
.uid("third");

return map;
}
Expand Down
Binary file not shown.
Binary file not shown.

0 comments on commit 130ca4e

Please sign in to comment.