
Commit adb7aab

[FLINK-12969][table-planner-blink] Remove dependencies on RelNode from TableImpl in blink planner (apache#8866)

This closes apache#8866
godfreyhe authored and KurtYoung committed Jun 25, 2019
1 parent ab0ef7b commit adb7aab
Showing 16 changed files with 56 additions and 49 deletions.
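Every file below applies the same substitution: calls to the removed TableImpl.getRelNode are replaced by an explicit translation of the table's QueryOperation through the planner's RelBuilder. A minimal sketch of the pattern, assuming the blink planner's TableEnvironment whose getRelBuilder returns the FlinkRelBuilder used in the hunks below (relNodeOf is a hypothetical name, for illustration only):

// Sketch only; assumes the blink planner's TableEnvironment and FlinkRelBuilder.
import org.apache.calcite.rel.RelNode
import org.apache.flink.table.api.{Table, TableEnvironment}

def relNodeOf(tableEnv: TableEnvironment, table: Table): RelNode = {
  // Before: table.asInstanceOf[TableImpl].getRelNode
  // After: build a Calcite RelNode from the planner-agnostic operation tree.
  tableEnv.getRelBuilder.queryOperation(table.getQueryOperation).build()
}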
========== changed file ==========
@@ -41,6 +41,7 @@ import org.apache.flink.table.sources._
import org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType
import org.apache.flink.table.util.PlanUtil
import org.apache.flink.util.InstantiationUtil
+
import org.apache.calcite.plan.{ConventionTraitDef, RelTrait, RelTraitDef}
import org.apache.calcite.rel.{RelCollationTraitDef, RelNode}
import org.apache.calcite.sql.SqlExplainLevel
@@ -201,7 +202,7 @@ abstract class BatchTableEnvironment(
* @param extended Flag to include detailed optimizer estimates.
*/
def explain(table: Table, extended: Boolean): String = {
- val ast = table.asInstanceOf[TableImpl].getRelNode
+ val ast = getRelBuilder.queryOperation(table.getQueryOperation).build()
val execNodeDag = compileToExecNodePlan(ast)
val transformations = translateToPlan(execNodeDag)
val streamGraph = translateStreamGraph(transformations)
========== changed file ==========
@@ -214,7 +214,8 @@ abstract class StreamTableEnvironment(
resultType: TypeInformation[T]): DataStream[T] = {
val sink = new DataStreamTableSink[T](table, resultType, updatesAsRetraction, withChangeFlag)
val sinkName = createUniqueTableName()
- val sinkNode = LogicalSink.create(table.asInstanceOf[TableImpl].getRelNode, sink, sinkName)
+ val input = getRelBuilder.queryOperation(table.getQueryOperation).build()
+ val sinkNode = LogicalSink.create(input, sink, sinkName)
val transformation = translateSink(sinkNode)
new DataStream(execEnv, transformation).asInstanceOf[DataStream[T]]
}
@@ -262,7 +263,7 @@ abstract class StreamTableEnvironment(
* @param extended Flag to include detailed optimizer estimates.
*/
def explain(table: Table, extended: Boolean): String = {
- val ast = table.asInstanceOf[TableImpl].getRelNode
+ val ast = getRelBuilder.queryOperation(table.getQueryOperation).build()
val execNodeDag = compileToExecNodePlan(ast)
val transformations = translateToPlan(execNodeDag)
val streamGraph = translateStreamGraph(transformations)
========== changed file ==========
@@ -248,7 +248,8 @@ abstract class TableEnvironment(
table: Table,
sink: TableSink[T],
sinkName: String = null): Unit = {
- sinkNodes += LogicalSink.create(table.asInstanceOf[TableImpl].getRelNode, sink, sinkName)
+ val input = getRelBuilder.queryOperation(table.getQueryOperation).build()
+ sinkNodes += LogicalSink.create(input, sink, sinkName)
}

/**
========== changed file ==========
@@ -22,8 +22,6 @@ import org.apache.flink.table.expressions.Expression
import org.apache.flink.table.functions.TemporalTableFunction
import org.apache.flink.table.operations.QueryOperation

- import org.apache.calcite.rel.RelNode
-
/**
* The implementation of the [[Table]].
*
@@ -39,13 +37,6 @@ class TableImpl(val tableEnv: TableEnvironment, operationTree: QueryOperation) e

private lazy val tableSchema: TableSchema = operationTree.getTableSchema

- /**
- * Returns the Calcite RelNode represent this Table.
- */
- def getRelNode: RelNode = {
- tableEnv.getRelBuilder.queryOperation(operationTree).build()
- }
-
override def getQueryOperation: QueryOperation = operationTree

override def getSchema: TableSchema = tableSchema
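With getRelNode removed, TableImpl no longer needs any org.apache.calcite import; it only carries the QueryOperation tree, and translation to a RelNode happens wherever a RelBuilder is available. A pared-down sketch of the resulting split, using hypothetical simplified types rather than the full Flink interfaces:

// Simplified illustration of the decoupling; the real interfaces carry more members.
trait QueryOperation
trait Table { def getQueryOperation: QueryOperation }

// API layer: free of Calcite types.
class SketchTableImpl(operationTree: QueryOperation) extends Table {
  override def getQueryOperation: QueryOperation = operationTree
}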
========== changed file ==========
@@ -22,6 +22,7 @@ import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.scala.{StreamTableEnvironment, _}
+ import org.apache.flink.table.util.TableTestUtil

import org.apache.calcite.plan.RelOptUtil
import org.junit.Assert.assertEquals
@@ -52,7 +53,8 @@ class TableEnvironmentTest {
val table = env.fromElements[(Int, Long, String, Boolean)]().toTable(tableEnv, 'a, 'b, 'c, 'd)
tableEnv.registerTable("MyTable", table)
val scanTable = tableEnv.scan("MyTable")
- val actual = RelOptUtil.toString(scanTable.asInstanceOf[TableImpl].getRelNode)
+ val relNode = TableTestUtil.toRelNode(scanTable)
+ val actual = RelOptUtil.toString(relNode)
val expected = "LogicalTableScan(table=[[default_catalog, default_database, MyTable]])\n"
assertEquals(expected, actual)

@@ -68,7 +70,8 @@
val table = env.fromElements[(Int, Long, String, Boolean)]().toTable(tableEnv, 'a, 'b, 'c, 'd)
tableEnv.registerTable("MyTable", table)
val queryTable = tableEnv.sqlQuery("SELECT a, c, d FROM MyTable")
- val actual = RelOptUtil.toString(queryTable.asInstanceOf[TableImpl].getRelNode)
+ val relNode = TableTestUtil.toRelNode(queryTable)
+ val actual = RelOptUtil.toString(relNode)
val expected = "LogicalProject(a=[$0], c=[$2], d=[$3])\n" +
" LogicalTableScan(table=[[default_catalog, default_database, MyTable]])\n"
assertEquals(expected, actual)
========== changed file ==========
@@ -19,9 +19,8 @@
package org.apache.flink.table.plan.batch.sql.agg

import org.apache.flink.api.scala._
- import org.apache.flink.table.api.TableImpl
import org.apache.flink.table.plan.util.FlinkRelOptUtil
- import org.apache.flink.table.util.TableTestBase
+ import org.apache.flink.table.util.{TableTestBase, TableTestUtil}

import org.junit.Assert.assertEquals
import org.junit.Test
@@ -450,8 +449,8 @@ class GroupingSetsTest extends TableTestBase {
def verifyPlanIdentical(sql1: String, sql2: String): Unit = {
val table1 = util.tableEnv.sqlQuery(sql1)
val table2 = util.tableEnv.sqlQuery(sql2)
- val optimized1 = util.tableEnv.optimize(table1.asInstanceOf[TableImpl].getRelNode)
- val optimized2 = util.tableEnv.optimize(table2.asInstanceOf[TableImpl].getRelNode)
+ val optimized1 = util.tableEnv.optimize(TableTestUtil.toRelNode(table1))
+ val optimized2 = util.tableEnv.optimize(TableTestUtil.toRelNode(table2))
assertEquals(FlinkRelOptUtil.toString(optimized1), FlinkRelOptUtil.toString(optimized2))
}

========== changed file ==========
@@ -19,12 +19,12 @@
package org.apache.flink.table.plan.stream.sql

import org.apache.flink.api.scala._
- import org.apache.flink.table.api.{TableConfigOptions, TableImpl}
+ import org.apache.flink.table.api.TableConfigOptions
import org.apache.flink.table.functions.ScalarFunction
import org.apache.flink.table.plan.`trait`.RelModifiedMonotonicity
import org.apache.flink.table.plan.metadata.FlinkRelMetadataQuery
import org.apache.flink.table.plan.util.JavaUserDefinedAggFunctions.WeightedAvgWithMerge
- import org.apache.flink.table.util.{StreamTableTestUtil, TableTestBase}
+ import org.apache.flink.table.util.{StreamTableTestUtil, TableTestBase, TableTestUtil}

import org.apache.calcite.sql.validate.SqlMonotonicity.{CONSTANT, DECREASING, INCREASING, NOT_MONOTONIC}
import org.junit.Assert.assertEquals
@@ -247,7 +247,7 @@ class ModifiedMonotonicityTest extends TableTestBase {

def verifyMonotonicity(sql: String, expect: RelModifiedMonotonicity): Unit = {
val table = util.tableEnv.sqlQuery(sql)
- val relNode = table.asInstanceOf[TableImpl].getRelNode
+ val relNode = TableTestUtil.toRelNode(table)
val optimized = util.tableEnv.optimize(relNode)

val actualMono = FlinkRelMetadataQuery.reuseOrCreate(optimized.getCluster.getMetadataQuery)
========== changed file ==========
@@ -19,9 +19,8 @@
package org.apache.flink.table.plan.stream.sql.agg

import org.apache.flink.api.scala._
- import org.apache.flink.table.api.{TableException, TableImpl}
import org.apache.flink.table.plan.util.FlinkRelOptUtil
- import org.apache.flink.table.util.TableTestBase
+ import org.apache.flink.table.util.{TableTestBase, TableTestUtil}

import org.junit.Assert.assertEquals
import org.junit.Test
@@ -450,8 +449,8 @@ class GroupingSetsTest extends TableTestBase {
def verifyPlanIdentical(sql1: String, sql2: String): Unit = {
val table1 = util.tableEnv.sqlQuery(sql1)
val table2 = util.tableEnv.sqlQuery(sql2)
- val optimized1 = util.tableEnv.optimize(table1.asInstanceOf[TableImpl].getRelNode)
- val optimized2 = util.tableEnv.optimize(table2.asInstanceOf[TableImpl].getRelNode)
+ val optimized1 = util.tableEnv.optimize(TableTestUtil.toRelNode(table1))
+ val optimized2 = util.tableEnv.optimize(TableTestUtil.toRelNode(table2))
assertEquals(FlinkRelOptUtil.toString(optimized1), FlinkRelOptUtil.toString(optimized2))
}

========== changed file ==========
@@ -19,10 +19,10 @@
package org.apache.flink.table.plan.stream.sql.agg

import org.apache.flink.api.scala._
- import org.apache.flink.table.api.{TableException, TableImpl, ValidationException}
+ import org.apache.flink.table.api.{TableException, ValidationException}
import org.apache.flink.table.plan.util.FlinkRelOptUtil
import org.apache.flink.table.runtime.utils.JavaUserDefinedScalarFunctions.OverAgg0
- import org.apache.flink.table.util.TableTestBase
+ import org.apache.flink.table.util.{TableTestBase, TableTestUtil}

import org.junit.Assert.assertEquals
import org.junit.Test
@@ -35,8 +35,8 @@ class OverAggregateTest extends TableTestBase {
def verifyPlanIdentical(sql1: String, sql2: String): Unit = {
val table1 = util.tableEnv.sqlQuery(sql1)
val table2 = util.tableEnv.sqlQuery(sql2)
- val optimized1 = util.tableEnv.optimize(table1.asInstanceOf[TableImpl].getRelNode)
- val optimized2 = util.tableEnv.optimize(table2.asInstanceOf[TableImpl].getRelNode)
+ val optimized1 = util.tableEnv.optimize(TableTestUtil.toRelNode(table1))
+ val optimized2 = util.tableEnv.optimize(TableTestUtil.toRelNode(table2))
assertEquals(FlinkRelOptUtil.toString(optimized1), FlinkRelOptUtil.toString(optimized2))
}

========== changed file ==========
@@ -21,7 +21,7 @@ package org.apache.flink.table.plan.stream.sql.join
import org.apache.flink.api.scala._
import org.apache.flink.table.api.{TableException, TableImpl}
import org.apache.flink.table.plan.util.WindowJoinUtil
- import org.apache.flink.table.util.{StreamTableTestUtil, TableTestBase}
+ import org.apache.flink.table.util.{StreamTableTestUtil, TableTestBase, TableTestUtil}

import org.apache.calcite.rel.logical.LogicalJoin
import org.junit.Assert.assertEquals
@@ -411,7 +411,7 @@ class WindowJoinTest extends TableTestBase {
""".stripMargin

val table = util.tableEnv.sqlQuery(query)
- val relNode = table.asInstanceOf[TableImpl].getRelNode
+ val relNode = TableTestUtil.toRelNode(table)
val joinNode = relNode.getInput(0).asInstanceOf[LogicalJoin]
val rexNode = joinNode.getCondition
val (windowBounds, _) = WindowJoinUtil.extractWindowBoundsFromPredicate(
@@ -432,7 +432,7 @@
expectConditionStr: String): Unit = {

val table = util.tableEnv.sqlQuery(sqlQuery)
- val relNode = table.asInstanceOf[TableImpl].getRelNode
+ val relNode = TableTestUtil.toRelNode(table)
val joinNode = relNode.getInput(0).asInstanceOf[LogicalJoin]
val joinInfo = joinNode.analyzeCondition
val rexNode = joinInfo.getRemaining(joinNode.getCluster.getRexBuilder)
========== changed file ==========
@@ -19,8 +19,10 @@ package org.apache.flink.table.plan.util

import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
- import org.apache.flink.table.api.{TableConfig, TableImpl}
+ import org.apache.flink.table.api.TableConfig
import org.apache.flink.table.api.scala.{StreamTableEnvironment, _}
+ import org.apache.flink.table.util.TableTestUtil
+
import org.apache.calcite.sql.SqlExplainLevel
import org.junit.Assert.assertEquals
import org.junit.Test
@@ -43,7 +45,7 @@ class FlinkRelOptUtilTest {
|SELECT * FROM t1 JOIN t2 ON t1.a = t2.a
""".stripMargin
val result = tableEnv.sqlQuery(sqlQuery)
- val rel = result.asInstanceOf[TableImpl].getRelNode
+ val rel = TableTestUtil.toRelNode(result)

val expected1 =
"""
========== changed file ==========
@@ -20,9 +20,10 @@ package org.apache.flink.table.plan.util
import org.apache.flink.api.common.typeinfo.BasicTypeInfo.{DOUBLE_TYPE_INFO, INT_TYPE_INFO, STRING_TYPE_INFO}
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
- import org.apache.flink.table.api.{TableConfig, TableEnvironment, TableImpl}
+ import org.apache.flink.table.api.{TableConfig, TableEnvironment}
import org.apache.flink.table.runtime.utils.BatchTableEnvUtil
import org.apache.flink.table.runtime.utils.BatchTestBase.row
+ import org.apache.flink.table.util.TableTestUtil

import org.junit.Assert.assertEquals
import org.junit.{Before, Test}
@@ -58,7 +59,7 @@ class RelDigestUtilTest {
|INTERSECT
|(SELECT id AS random FROM MyTable ORDER BY rand() LIMIT 1)
""".stripMargin)
- val rel = table.asInstanceOf[TableImpl].getRelNode
+ val rel = TableTestUtil.toRelNode(table)
val expected = readFromResource("testGetDigestWithDynamicFunction.out")
assertEquals(expected, RelDigestUtil.getDigest(rel))
}
@@ -75,7 +76,7 @@
|INTERSECT
|(SELECT * FROM MyView)
""".stripMargin)
- val rel = table.asInstanceOf[TableImpl].getRelNode.accept(new ExpandTableScanShuttle())
+ val rel = TableTestUtil.toRelNode(table).accept(new ExpandTableScanShuttle())
val expected = readFromResource("testGetDigestWithDynamicFunctionView.out")
assertEquals(expected, RelDigestUtil.getDigest(rel))
}
========== changed file ==========
@@ -32,7 +32,7 @@ import org.apache.flink.table.plan.stats.FlinkStatistic
import org.apache.flink.table.plan.util.FlinkRelOptUtil
import org.apache.flink.table.runtime.utils.BatchAbstractTestBase.DEFAULT_PARALLELISM
import org.apache.flink.table.types.logical.LogicalType
- import org.apache.flink.table.util.{BaseRowTestUtil, DiffRepository}
+ import org.apache.flink.table.util.{BaseRowTestUtil, DiffRepository, TableTestUtil}
import org.apache.flink.types.Row

import org.apache.calcite.rel.RelNode
@@ -79,7 +79,7 @@ class BatchTestBase extends BatchAbstractTestBase {
* @return string presentation of of explaining
*/
def explainLogical(table: Table): String = {
- val ast = table.asInstanceOf[TableImpl].getRelNode
+ val ast = TableTestUtil.toRelNode(table)
val logicalPlan = getPlan(ast)

s"== Abstract Syntax Tree ==" +
@@ -133,7 +133,7 @@ class BatchTestBase extends BatchAbstractTestBase {
def verifyPlan(sqlQuery: String): Unit = verifyPlan(parseQuery(sqlQuery))

def verifyPlan(table: Table): Unit = {
- val relNode = table.asInstanceOf[TableImpl].getRelNode
+ val relNode = TableTestUtil.toRelNode(table)
val actual = SystemUtils.LINE_SEPARATOR + getPlan(relNode)
assertEqualsOrExpand("planAfter", actual.toString, expand = false)
}
========== changed file ==========
@@ -23,6 +23,7 @@ import org.apache.flink.table.api.{BatchTableEnvironment, TableImpl}
import org.apache.flink.table.calcite.FlinkTypeFactory
import org.apache.flink.table.sinks.{CollectRowTableSink, CollectTableSink}
import org.apache.flink.table.types.TypeInfoLogicalTypeConverter
+ import org.apache.flink.table.util.TableTestUtil
import org.apache.flink.types.Row

import _root_.scala.collection.JavaConversions._
@@ -51,7 +52,7 @@ object TableUtil {
def collectSink[T](
table: TableImpl, sink: CollectTableSink[T], jobName: Option[String] = None): Seq[T] = {
// get schema information of table
- val rowType = table.getRelNode.getRowType
+ val rowType = TableTestUtil.toRelNode(table).getRowType
val fieldNames = rowType.getFieldNames.asScala.toArray
val fieldTypes = rowType.getFieldList
.map(field => FlinkTypeFactory.toLogicalType(field.getType)).toArray
========== changed file ==========
@@ -18,12 +18,13 @@

package org.apache.flink.table.runtime.utils

- import org.apache.flink.table.api.{Table, TableException, TableImpl}
+ import org.apache.flink.table.api.{Table, TableException}
import org.apache.flink.table.calcite.FlinkTypeFactory
import org.apache.flink.table.dataformat.GenericRow
import org.apache.flink.table.runtime.utils.JavaPojos.Pojo1
import org.apache.flink.table.sinks.TableSink
import org.apache.flink.table.types.TypeInfoLogicalTypeConverter
+ import org.apache.flink.table.util.TableTestUtil
import org.apache.flink.types.Row
import org.apache.flink.util.StringUtils

@@ -37,7 +38,7 @@ import scala.collection.JavaConverters._
object TestSinkUtil {

def configureSink[T <: TableSink[_]](table: Table, sink: T): T = {
- val rowType = table.asInstanceOf[TableImpl].getRelNode.getRowType
+ val rowType = TableTestUtil.toRelNode(table).getRowType
val fieldNames = rowType.getFieldNames.asScala.toArray
val fieldTypes = rowType.getFieldList.asScala
.map(field => FlinkTypeFactory.toLogicalType(field.getType))
========== changed file ==========
@@ -78,8 +78,8 @@ abstract class TableTestBase {
def batchTestUtil(): BatchTableTestUtil = BatchTableTestUtil(this)

def verifyTableEquals(expected: Table, actual: Table): Unit = {
- val expectedString = FlinkRelOptUtil.toString(expected.asInstanceOf[TableImpl].getRelNode)
- val actualString = FlinkRelOptUtil.toString(actual.asInstanceOf[TableImpl].getRelNode)
+ val expectedString = FlinkRelOptUtil.toString(TableTestUtil.toRelNode(expected))
+ val actualString = FlinkRelOptUtil.toString(TableTestUtil.toRelNode(actual))
assertEquals(
"Logical plans do not match",
LogicalPlanFormatUtils.formatTempTableId(expectedString),
@@ -259,7 +259,7 @@ abstract class TableTestUtil(test: TableTestBase) {

def verifyPlanNotExpected(table: Table, notExpected: String*): Unit = {
require(notExpected.nonEmpty)
- val relNode = table.asInstanceOf[TableImpl].getRelNode
+ val relNode = TableTestUtil.toRelNode(table)
val optimizedPlan = getOptimizedPlan(
Array(relNode),
explainLevel = SqlExplainLevel.EXPPLAN_ATTRIBUTES,
@@ -307,7 +307,7 @@
withRowType: Boolean,
printPlanBefore: Boolean): Unit = {
val table = getTableEnv.sqlQuery(sql)
- val relNode = table.asInstanceOf[TableImpl].getRelNode
+ val relNode = TableTestUtil.toRelNode(table)
val optimizedPlan = getOptimizedPlan(
Array(relNode),
explainLevel,
@@ -361,7 +361,7 @@ abstract class TableTestUtil(test: TableTestBase) {
withRetractTraits: Boolean,
printPlanBefore: Boolean,
printResource: Boolean = false): Unit = {
- val relNode = table.asInstanceOf[TableImpl].getRelNode
+ val relNode = TableTestUtil.toRelNode(table)
val optimizedPlan = getOptimizedPlan(
Array(relNode),
explainLevel,
@@ -715,4 +715,11 @@ object TableTestUtil {
config
}

+ /**
+ * Converts operation tree in the given table to a RelNode tree.
+ */
+ def toRelNode(table: Table): RelNode = {
+ table.asInstanceOf[TableImpl].tableEnv
+ .getRelBuilder.queryOperation(table.getQueryOperation).build()
+ }
}
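Tests that still need a Calcite plan now go through this helper instead of casting to TableImpl themselves. A typical usage, mirroring the test changes above (util and MyTable are assumed to be set up as in those tests):

// Hypothetical test snippet; util.tableEnv is a blink-planner TableEnvironment.
val table = util.tableEnv.sqlQuery("SELECT a, b FROM MyTable")
val relNode = TableTestUtil.toRelNode(table)     // Table -> Calcite RelNode
val optimized = util.tableEnv.optimize(relNode)  // run the blink optimizer
val plan = FlinkRelOptUtil.toString(optimized)   // printable plan string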
