Skip to content

Commit

Permalink
[FLINK-26463][table-planner] Use MiniCluster for TableEnvironmentITCa…
Browse files Browse the repository at this point in the history
…se and moved some tests to TableEnvironmentTest

This closes apache#18968.
  • Loading branch information
slinkydeveloper authored and twalthr committed Mar 7, 2022
1 parent 8746017 commit 898f2d3
Show file tree
Hide file tree
Showing 2 changed files with 147 additions and 150 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,27 +18,25 @@

package org.apache.flink.table.api

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.api.common.typeinfo.Types.STRING
import org.apache.flink.api.scala._
import org.apache.flink.configuration.ExecutionOptions
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment => ScalaStreamExecutionEnvironment}
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment
import org.apache.flink.table.api.bridge.scala.{StreamTableEnvironment => ScalaStreamTableEnvironment, _}
import org.apache.flink.table.api.config.TableConfigOptions
import org.apache.flink.table.api.internal.{TableEnvironmentImpl, TableEnvironmentInternal}
import org.apache.flink.table.catalog._
import org.apache.flink.table.functions.TestGenericUDF
import org.apache.flink.table.planner.factories.utils.TestCollectionTableFactory
import org.apache.flink.table.planner.runtime.utils.TestingAppendSink
import org.apache.flink.table.planner.utils.TableTestUtil.{readFromResource, replaceStageId}
import org.apache.flink.table.planner.utils.{TableTestUtil, TestTableSourceSinks, TestTableSourceWithTime}
import org.apache.flink.test.util.AbstractTestBase
import org.apache.flink.types.{Row, RowKind}
import org.apache.flink.util.{CollectionUtil, FileUtils, TestLogger}
import org.apache.flink.util.{CollectionUtil, FileUtils}

import org.junit.Assert.{assertEquals, assertFalse, assertTrue, fail}
import org.junit.rules.{ExpectedException, TemporaryFolder}
import org.junit.Assert.{assertEquals, assertFalse, assertTrue}
import org.junit.rules.TemporaryFolder
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
import org.junit.{Assert, Before, Rule, Test}
Expand All @@ -47,21 +45,14 @@ import java.io.{File, FileFilter}
import java.lang.{Long => JLong}
import java.util

import scala.annotation.meta.getter
import scala.collection.mutable

@RunWith(classOf[Parameterized])
class TableEnvironmentITCase(tableEnvName: String, isStreaming: Boolean) extends TestLogger {
class TableEnvironmentITCase(tableEnvName: String, isStreaming: Boolean) extends AbstractTestBase {

// used for accurate exception information checking.
val expectedException: ExpectedException = ExpectedException.none()

@Rule
def thrown: ExpectedException = expectedException

private val _tempFolder = new TemporaryFolder()

@Rule
def tempFolder: TemporaryFolder = _tempFolder
@(Rule @getter)
val tempFolder: TemporaryFolder = new TemporaryFolder()

var tEnv: TableEnvironment = _

Expand All @@ -84,26 +75,6 @@ class TableEnvironmentITCase(tableEnvName: String, isStreaming: Boolean) extends
TestTableSourceSinks.createPersonCsvTemporaryTable(tEnv, "MyTable")
}

@Test
def testSetExecutionMode(): Unit = {
if (isStreaming) {
tEnv.getConfig.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH)
} else {
tEnv.getConfig.set(ExecutionOptions.RUNTIME_MODE,
RuntimeExecutionMode.STREAMING)
}

thrown.expect(classOf[IllegalArgumentException])
thrown.expectMessage(
"Mismatch between configured runtime mode and actual runtime mode. " +
"Currently, the 'execution.runtime-mode' can only be set when instantiating the " +
"table environment. Subsequent changes are not supported. " +
"Please instantiate a new TableEnvironment if necessary."
)

tEnv.explainSql("select first from MyTable")
}

@Test
def testExecuteTwiceUsingSameTableEnv(): Unit = {
val sink1Path = TestTableSourceSinks.createCsvTemporarySinkTable(
Expand Down Expand Up @@ -158,7 +129,7 @@ class TableEnvironmentITCase(tableEnvName: String, isStreaming: Boolean) extends
return
}

val sinkPath = _tempFolder.newFolder().toString
val sinkPath = tempFolder.newFolder().toString
tEnv.executeSql(
s"""
|create table MySink (
Expand Down Expand Up @@ -335,7 +306,7 @@ class TableEnvironmentITCase(tableEnvName: String, isStreaming: Boolean) extends
// Streaming mode not support overwrite for FileSystemTableSink.
return
}
val sinkPath = _tempFolder.newFolder().toString
val sinkPath = tempFolder.newFolder().toString
tEnv.executeSql(
s"""
|create table MySink (
Expand All @@ -359,7 +330,7 @@ class TableEnvironmentITCase(tableEnvName: String, isStreaming: Boolean) extends
@Test
def testTableDMLSync(): Unit = {
tEnv.getConfig.set(TableConfigOptions.TABLE_DML_SYNC, Boolean.box(true))
val sink1Path = _tempFolder.newFolder().toString
val sink1Path = tempFolder.newFolder().toString
tEnv.executeSql(
s"""
|create table MySink1 (
Expand All @@ -373,7 +344,7 @@ class TableEnvironmentITCase(tableEnvName: String, isStreaming: Boolean) extends
""".stripMargin
)

val sink2Path = _tempFolder.newFolder().toString
val sink2Path = tempFolder.newFolder().toString
tEnv.executeSql(
s"""
|create table MySink2 (
Expand All @@ -386,7 +357,7 @@ class TableEnvironmentITCase(tableEnvName: String, isStreaming: Boolean) extends
""".stripMargin
)

val sink3Path = _tempFolder.newFolder().toString
val sink3Path = tempFolder.newFolder().toString
tEnv.executeSql(
s"""
|create table MySink3 (
Expand Down Expand Up @@ -477,7 +448,7 @@ class TableEnvironmentITCase(tableEnvName: String, isStreaming: Boolean) extends
// Streaming mode not support overwrite for FileSystemTableSink.
return
}
val sink1Path = _tempFolder.newFolder().toString
val sink1Path = tempFolder.newFolder().toString
tEnv.executeSql(
s"""
|create table MySink1 (
Expand All @@ -490,7 +461,7 @@ class TableEnvironmentITCase(tableEnvName: String, isStreaming: Boolean) extends
""".stripMargin
)

val sink2Path = _tempFolder.newFolder().toString
val sink2Path = tempFolder.newFolder().toString
tEnv.executeSql(
s"""
|create table MySink2 (
Expand Down Expand Up @@ -536,7 +507,7 @@ class TableEnvironmentITCase(tableEnvName: String, isStreaming: Boolean) extends
// Streaming mode not support overwrite for FileSystemTableSink.
return
}
val sinkPath = _tempFolder.newFolder().toString
val sinkPath = tempFolder.newFolder().toString
tEnv.executeSql(
s"""
|create table MySink (
Expand Down Expand Up @@ -680,77 +651,6 @@ class TableEnvironmentITCase(tableEnvName: String, isStreaming: Boolean) extends
tableEnv.executeSql("insert into dest2 select x from src").await()
}

@Test
def testTemporaryOperationListener(): Unit = {
val listener = new ListenerCatalog("listener_cat")
val currentCat = tEnv.getCurrentCatalog
tEnv.registerCatalog(listener.getName, listener)
// test temporary table
tEnv.executeSql("create temporary table tbl1 (x int)")
assertEquals(0, listener.numTempTable)
tEnv.executeSql(s"create temporary table ${listener.getName}.`default`.tbl1 (x int)")
assertEquals(1, listener.numTempTable)
val tableResult = tEnv.asInstanceOf[TableEnvironmentInternal].getCatalogManager
.getTable(ObjectIdentifier.of(listener.getName, "default", "tbl1"))
assertTrue(tableResult.isPresent)
assertEquals(listener.tableComment, tableResult.get().getTable[CatalogBaseTable].getComment)
tEnv.executeSql("drop temporary table tbl1")
assertEquals(1, listener.numTempTable)
tEnv.executeSql(s"drop temporary table ${listener.getName}.`default`.tbl1")
assertEquals(0, listener.numTempTable)
tEnv.useCatalog(listener.getName)
tEnv.executeSql("create temporary table tbl1 (x int)")
assertEquals(1, listener.numTempTable)
tEnv.executeSql("drop temporary table tbl1")
assertEquals(0, listener.numTempTable)
tEnv.useCatalog(currentCat)

// test temporary view
tEnv.executeSql("create temporary view v1 as select 1")
assertEquals(0, listener.numTempTable)
tEnv.executeSql(s"create temporary view ${listener.getName}.`default`.v1 as select 1")
assertEquals(1, listener.numTempTable)
val viewResult = tEnv.asInstanceOf[TableEnvironmentInternal].getCatalogManager
.getTable(ObjectIdentifier.of(listener.getName, "default", "v1"))
assertTrue(viewResult.isPresent)
assertEquals(listener.tableComment, viewResult.get().getTable[CatalogBaseTable].getComment)
tEnv.executeSql("drop temporary view v1")
assertEquals(1, listener.numTempTable)
tEnv.executeSql(s"drop temporary view ${listener.getName}.`default`.v1")
assertEquals(0, listener.numTempTable)
tEnv.useCatalog(listener.getName)
tEnv.executeSql("create temporary view v1 as select 1")
assertEquals(1, listener.numTempTable)
tEnv.executeSql("drop temporary view v1")
assertEquals(0, listener.numTempTable)
tEnv.useCatalog(currentCat)

// test temporary function
val clzName = "foo.class.name"
try {
tEnv.executeSql(s"create temporary function func1 as '${clzName}'")
fail("Creating a temporary function with invalid class should fail")
} catch {
case _: Exception => //expected
}
assertEquals(0, listener.numTempFunc)
tEnv.executeSql(
s"create temporary function ${listener.getName}.`default`.func1 as '${clzName}'")
assertEquals(1, listener.numTempFunc)
tEnv.executeSql("drop temporary function if exists func1")
assertEquals(1, listener.numTempFunc)
tEnv.executeSql(s"drop temporary function ${listener.getName}.`default`.func1")
assertEquals(0, listener.numTempFunc)
tEnv.useCatalog(listener.getName)
tEnv.executeSql(s"create temporary function func1 as '${clzName}'")
assertEquals(1, listener.numTempFunc)
tEnv.executeSql("drop temporary function func1")
assertEquals(0, listener.numTempFunc)
tEnv.useCatalog(currentCat)

listener.close()
}

def getPersonData: List[(String, Int, Double, String)] = {
val data = new mutable.MutableList[(String, Int, Double, String)]
data.+=(("Mike", 1, 12.3, "Smith"))
Expand Down Expand Up @@ -817,38 +717,6 @@ class TableEnvironmentITCase(tableEnvName: String, isStreaming: Boolean) extends
}
}

class ListenerCatalog(name: String)
extends GenericInMemoryCatalog(name) with TemporaryOperationListener {

val tableComment: String = "listener_comment"
val funcClzName: String = classOf[TestGenericUDF].getName

var numTempTable = 0
var numTempFunc = 0

override def onCreateTemporaryTable(tablePath: ObjectPath, table: CatalogBaseTable)
: CatalogBaseTable = {
numTempTable += 1
if (table.isInstanceOf[CatalogTable]) {
new CatalogTableImpl(table.getSchema, table.getOptions, tableComment)
} else {
val view = table.asInstanceOf[CatalogView]
new CatalogViewImpl(view.getOriginalQuery, view.getExpandedQuery,
view.getSchema, view.getOptions, tableComment)
}
}

override def onDropTemporaryTable(tablePath: ObjectPath): Unit = numTempTable -= 1

override def onCreateTemporaryFunction(functionPath: ObjectPath, function: CatalogFunction)
: CatalogFunction = {
numTempFunc += 1
new CatalogFunctionImpl(funcClzName, function.getFunctionLanguage)
}

override def onDropTemporaryFunction(functionPath: ObjectPath): Unit = numTempFunc -= 1
}

}

object TableEnvironmentITCase {
Expand Down
Loading

0 comments on commit 898f2d3

Please sign in to comment.