Skip to content

Commit

Permalink
[FLINK-17339][table-planner-blink] Simplify test cases due to default…
Browse files Browse the repository at this point in the history
… planner changing.
  • Loading branch information
KurtYoung committed Apr 26, 2020
1 parent 2421f94 commit cb523d3
Show file tree
Hide file tree
Showing 21 changed files with 32 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,7 @@ public class EnvironmentTest {
@Test
public void testPassingExecutionParameters() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(
env,
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

tEnv.getConfig().addConfiguration(
new Configuration()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public void before() {
env.setParallelism(4);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
tEnv = StreamTableEnvironment.create(env, settings);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public class CatalogConstraintTest {

@Before
public void setup() {
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
tEnv = TableEnvironment.create(settings);
catalog = tEnv.getCatalog(tEnv.getCurrentCatalog()).orElse(null);
assertNotNull(catalog);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public void testCreateCatalog() {
}

private TableEnvironment getTableEnvironment() {
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
return StreamTableEnvironment.create(env, settings);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public class CatalogStatisticsTest {

@Before
public void setup() {
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
tEnv = TableEnvironment.create(settings);
catalog = tEnv.getCatalog(tEnv.getCurrentCatalog()).orElse(null);
assertNotNull(catalog);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,9 @@ class TableEnvironmentITCase(tableEnvName: String, isStreaming: Boolean) extends
var tEnv: TableEnvironment = _

private val settings = if (isStreaming) {
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
EnvironmentSettings.newInstance().inStreamingMode().build()
} else {
EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
EnvironmentSettings.newInstance().inBatchMode().build()
}

@Before
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ class TableEnvironmentTest {
@Test
def testStreamTableEnvironmentExplain(): Unit = {
val execEnv = StreamExecutionEnvironment.getExecutionEnvironment
val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val settings = EnvironmentSettings.newInstance().inStreamingMode().build()
val tEnv = StreamTableEnvironment.create(execEnv, settings)

tEnv.registerTableSource("MyTable", TestTableSourceSinks.getPersonCsvTableSource)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,9 @@ class CatalogTableITCase(isStreamingMode: Boolean) extends AbstractTestBase {
//~ Instance fields --------------------------------------------------------

private val settings = if (isStreamingMode) {
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
EnvironmentSettings.newInstance().inStreamingMode().build()
} else {
EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
EnvironmentSettings.newInstance().inBatchMode().build()
}

private val tableEnv: TableEnvironment = TableEnvironmentImpl.create(settings)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import org.junit.Assert.assertEquals
class CatalogTableTest {

val tEnv: TableEnvironment = TableEnvironmentImpl.create(
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build())
EnvironmentSettings.newInstance().inStreamingMode().build())

@Test
def testDDLSchema(): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ class CatalogViewITCase(isStreamingMode: Boolean) extends AbstractTestBase {
//~ Instance fields --------------------------------------------------------

private val settings = if (isStreamingMode) {
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
EnvironmentSettings.newInstance().inStreamingMode().build()
} else {
EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
EnvironmentSettings.newInstance().inBatchMode().build()
}

private val tableEnv: TableEnvironment = TableEnvironmentImpl.create(settings)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,12 @@ abstract class AggTestBase(isBatchMode: Boolean) {
val typeFactory: FlinkTypeFactory = new FlinkTypeFactory(new FlinkTypeSystem())
val env = new ScalaStreamExecEnv(new LocalStreamEnvironment)
private val tEnv = if (isBatchMode) {
val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
val settings = EnvironmentSettings.newInstance().inBatchMode().build()
// use impl class instead of interface class to avoid
// "Static methods in interface require -target:jvm-1.8"
TableEnvironmentImpl.create(settings)
} else {
val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val settings = EnvironmentSettings.newInstance().inStreamingMode().build()
StreamTableEnvironment.create(env, settings)
}
private val planner = tEnv.asInstanceOf[TableEnvironmentImpl].getPlanner.asInstanceOf[PlannerBase]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,7 @@ abstract class ExpressionTestBase {
// (originalExpr, optimizedExpr, expectedResult)
private val testExprs = mutable.ArrayBuffer[(String, RexNode, String)]()
private val env = StreamExecutionEnvironment.createLocalEnvironment(4)
private val setting = EnvironmentSettings.newInstance()
.useBlinkPlanner().inStreamingMode().build()
private val setting = EnvironmentSettings.newInstance().inStreamingMode().build()
// use impl class instead of interface class to avoid
// "Static methods in interface require -target:jvm-1.8"
private val tEnv = StreamTableEnvironmentImpl.create(env, setting, config)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ class JoinValidationTest extends TableTestBase {

@Test(expected = classOf[ValidationException])
def testJoinTablesFromDifferentEnvs(): Unit = {
val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
val settings = EnvironmentSettings.newInstance().inBatchMode().build()
val tEnv1 = TableEnvironmentImpl.create(settings)
val tEnv2 = TableEnvironmentImpl.create(settings)
val ds1 = CollectionBatchExecTable.getSmall3TupleDataSet(tEnv1, "a, b, c")
Expand All @@ -108,7 +108,7 @@ class JoinValidationTest extends TableTestBase {

@Test(expected = classOf[ValidationException])
def testJoinTablesFromDifferentEnvsJava() {
val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
val settings = EnvironmentSettings.newInstance().inBatchMode().build()
val tEnv1 = TableEnvironmentImpl.create(settings)
val tEnv2 = TableEnvironmentImpl.create(settings)
val ds1 = CollectionBatchExecTable.getSmall3TupleDataSet(tEnv1, "a, b, c")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class SetOperatorsValidationTest extends TableTestBase {

@Test(expected = classOf[ValidationException])
def testUnionTablesFromDifferentEnvs(): Unit = {
val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
val settings = EnvironmentSettings.newInstance().inBatchMode().build()
val tEnv1 = TableEnvironmentImpl.create(settings)
val tEnv2 = TableEnvironmentImpl.create(settings)

Expand All @@ -76,7 +76,7 @@ class SetOperatorsValidationTest extends TableTestBase {

@Test(expected = classOf[ValidationException])
def testMinusAllTablesFromDifferentEnvs(): Unit = {
val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
val settings = EnvironmentSettings.newInstance().inBatchMode().build()
val tEnv1 = TableEnvironmentImpl.create(settings)
val tEnv2 = TableEnvironmentImpl.create(settings)

Expand All @@ -100,7 +100,7 @@ class SetOperatorsValidationTest extends TableTestBase {

@Test(expected = classOf[ValidationException])
def testIntersectTablesFromDifferentEnvs(): Unit = {
val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
val settings = EnvironmentSettings.newInstance().inBatchMode().build()
val tEnv1 = TableEnvironmentImpl.create(settings)
val tEnv2 = TableEnvironmentImpl.create(settings)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class FlinkRelOptUtilTest {

@Before
def before(): Unit = {
val settings = EnvironmentSettings.newInstance().useBlinkPlanner().build()
val settings = EnvironmentSettings.newInstance().build()
val tEnv = TableEnvironmentImpl.create(settings)
BatchTableEnvUtil.registerCollection(
tEnv,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class GroupAggregateHarnessTest(mode: StateBackendMode) extends HarnessTestBase(
@Before
override def before(): Unit = {
super.before()
val setting = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val setting = EnvironmentSettings.newInstance().inStreamingMode().build()
val config = new TestTableConfig
this.tEnv = StreamTableEnvironmentImpl.create(env, setting, config)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class OverWindowHarnessTest(mode: StateBackendMode) extends HarnessTestBase(mode
@Before
override def before(): Unit = {
super.before()
val setting = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val setting = EnvironmentSettings.newInstance().inStreamingMode().build()
val config = new TestTableConfig
this.tEnv = StreamTableEnvironmentImpl.create(env, setting, config)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class TableAggregateHarnessTest(mode: StateBackendMode) extends HarnessTestBase(
@Before
override def before(): Unit = {
super.before()
val setting = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val setting = EnvironmentSettings.newInstance().inStreamingMode().build()
val config = new TestTableConfig
this.tEnv = StreamTableEnvironmentImpl.create(env, setting, config)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ import scala.util.Sorting

class BatchTestBase extends BatchAbstractTestBase {

private val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
private val settings = EnvironmentSettings.newInstance().inBatchMode().build()
private val testingTableEnv: TestingTableEnvironment = TestingTableEnvironment
.create(settings, catalogManager = None, new TableConfig)
val tEnv: TableEnvironment = testingTableEnv
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class StreamingTestBase extends AbstractTestBase {
if (enableObjectReuse) {
this.env.getConfig.enableObjectReuse()
}
val setting = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val setting = EnvironmentSettings.newInstance().inStreamingMode().build()
this.tEnv = StreamTableEnvironment.create(env, setting)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,9 @@ abstract class TableTestUtilBase(test: TableTestBase, isStreamingMode: Boolean)
protected lazy val diffRepository: DiffRepository = DiffRepository.lookup(test.getClass)

protected val setting: EnvironmentSettings = if (isStreamingMode) {
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
EnvironmentSettings.newInstance().inStreamingMode().build()
} else {
EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
EnvironmentSettings.newInstance().inBatchMode().build()
}

// a counter for unique table names
Expand Down Expand Up @@ -1117,10 +1117,9 @@ object TestingTableEnvironment {

object TableTestUtil {

val STREAM_SETTING: EnvironmentSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner().inStreamingMode().build()
val BATCH_SETTING: EnvironmentSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner().inBatchMode().build()
val STREAM_SETTING: EnvironmentSettings =
EnvironmentSettings.newInstance().inStreamingMode().build()
val BATCH_SETTING: EnvironmentSettings = EnvironmentSettings.newInstance().inBatchMode().build()

/**
* Converts operation tree in the given table to a RelNode tree.
Expand Down

0 comments on commit cb523d3

Please sign in to comment.