Skip to content

Commit

Permalink
[FLINK-4068] [table] Move constant computations out of code-generated
Browse files Browse the repository at this point in the history
This closes apache#2560.
  • Loading branch information
wuchong authored and twalthr committed Oct 4, 2016
1 parent 171d109 commit a711339
Show file tree
Hide file tree
Showing 11 changed files with 277 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import java.util.concurrent.atomic.AtomicInteger

import org.apache.calcite.plan.RelOptPlanner.CannotPlanException
import org.apache.calcite.plan.RelOptUtil
import org.apache.calcite.rel.RelNode
import org.apache.calcite.sql2rel.RelDecorrelator
import org.apache.calcite.tools.Programs

Expand Down Expand Up @@ -228,21 +229,12 @@ abstract class BatchTableEnvironment(
}

/**
* Translates a [[Table]] into a [[DataSet]].
* Generates the optimized [[RelNode]] tree from the original relational node tree.
*
* The transformation involves optimizing the relational expression tree as defined by
* Table API calls and / or SQL queries and generating corresponding [[DataSet]] operators.
*
* @param table The root node of the relational expression tree.
* @param tpe The [[TypeInformation]] of the resulting [[DataSet]].
* @tparam A The type of the resulting [[DataSet]].
* @return The [[DataSet]] that corresponds to the translated [[Table]].
* @param relNode The original [[RelNode]] tree
* @return The optimized [[RelNode]] tree
*/
protected def translate[A](table: Table)(implicit tpe: TypeInformation[A]): DataSet[A] = {

validateType(tpe)

val relNode = table.getRelNode
private[flink] def optimize(relNode: RelNode): RelNode = {

// decorrelate
val decorPlan = RelDecorrelator.decorrelateQuery(relNode)
Expand All @@ -253,8 +245,7 @@ abstract class BatchTableEnvironment(

val dataSetPlan = try {
optProgram.run(getPlanner, decorPlan, flinkOutputProps)
}
catch {
} catch {
case e: CannotPlanException =>
throw new TableException(
s"Cannot generate a valid execution plan for the given query: \n\n" +
Expand All @@ -263,13 +254,32 @@ abstract class BatchTableEnvironment(
s"Please check the documentation for the set of currently supported SQL features.")
case t: TableException =>
throw new TableException(
s"Cannot generate a valid execution plan for the given query: \n\n" +
s"${RelOptUtil.toString(relNode)}\n" +
s"${t.msg}\n" +
s"Please check the documentation for the set of currently supported SQL features.")
s"Cannot generate a valid execution plan for the given query: \n\n" +
s"${RelOptUtil.toString(relNode)}\n" +
s"${t.msg}\n" +
s"Please check the documentation for the set of currently supported SQL features.")
case a: AssertionError =>
throw a.getCause
}
dataSetPlan
}

/**
* Translates a [[Table]] into a [[DataSet]].
*
* The transformation involves optimizing the relational expression tree as defined by
* Table API calls and / or SQL queries and generating corresponding [[DataSet]] operators.
*
* @param table The root node of the relational expression tree.
* @param tpe The [[TypeInformation]] of the resulting [[DataSet]].
* @tparam A The type of the resulting [[DataSet]].
* @return The [[DataSet]] that corresponds to the translated [[Table]].
*/
protected def translate[A](table: Table)(implicit tpe: TypeInformation[A]): DataSet[A] = {

validateType(tpe)

val dataSetPlan = optimize(table.getRelNode)

dataSetPlan match {
case node: DataSetRel =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ package org.apache.flink.api.table
import org.apache.calcite.jdbc.CalciteSchema
import org.apache.calcite.plan.{Context, RelOptCluster, RelOptPlanner, RelOptSchema}
import org.apache.calcite.prepare.CalciteCatalogReader
import org.apache.calcite.rex.RexBuilder
import org.apache.calcite.schema.SchemaPlus
import org.apache.calcite.rex.{RexExecutorImpl, RexBuilder}
import org.apache.calcite.schema.{Schemas, SchemaPlus}
import org.apache.calcite.tools.Frameworks.PlannerAction
import org.apache.calcite.tools.{FrameworkConfig, Frameworks, RelBuilder}

Expand Down Expand Up @@ -66,6 +66,7 @@ object FlinkRelBuilder {
}
})
val planner = clusters(0).getPlanner
planner.setExecutor(config.getExecutor)
val defaultRelOptSchema = relOptSchemas(0).asInstanceOf[CalciteCatalogReader]

// create Flink type factory
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@ import java.util.concurrent.atomic.AtomicInteger

import org.apache.calcite.plan.RelOptPlanner.CannotPlanException
import org.apache.calcite.plan.RelOptUtil
import org.apache.calcite.rel.RelNode
import org.apache.calcite.sql2rel.RelDecorrelator
import org.apache.calcite.tools.Programs

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.DataSet
import org.apache.flink.api.table.expressions.Expression
import org.apache.flink.api.table.plan.logical.{CatalogNode, LogicalRelNode}
import org.apache.flink.api.table.plan.nodes.datastream.{DataStreamConvention, DataStreamRel}
Expand Down Expand Up @@ -228,22 +230,12 @@ abstract class StreamTableEnvironment(
}

/**
* Translates a [[Table]] into a [[DataStream]].
*
* The transformation involves optimizing the relational expression tree as defined by
* Table API calls and / or SQL queries and generating corresponding [[DataStream]] operators.
* Generates the optimized [[RelNode]] tree from the original relational node tree.
*
* @param table The root node of the relational expression tree.
* @param tpe The [[TypeInformation]] of the resulting [[DataStream]].
* @tparam A The type of the resulting [[DataStream]].
* @return The [[DataStream]] that corresponds to the translated [[Table]].
* @param relNode The root node of the relational expression tree.
* @return The optimized [[RelNode]] tree
*/
protected def translate[A](table: Table)(implicit tpe: TypeInformation[A]): DataStream[A] = {

validateType(tpe)

val relNode = table.getRelNode

private[flink] def optimize(relNode: RelNode): RelNode = {
// decorrelate
val decorPlan = RelDecorrelator.decorrelateQuery(relNode)

Expand All @@ -262,6 +254,26 @@ abstract class StreamTableEnvironment(
s"This exception indicates that the query uses an unsupported SQL feature.\n" +
s"Please check the documentation for the set of currently supported SQL features.")
}
dataStreamPlan
}


/**
* Translates a [[Table]] into a [[DataStream]].
*
* The transformation involves optimizing the relational expression tree as defined by
* Table API calls and / or SQL queries and generating corresponding [[DataStream]] operators.
*
* @param table The root node of the relational expression tree.
* @param tpe The [[TypeInformation]] of the resulting [[DataStream]].
* @tparam A The type of the resulting [[DataStream]].
* @return The [[DataStream]] that corresponds to the translated [[Table]].
*/
protected def translate[A](table: Table)(implicit tpe: TypeInformation[A]): DataStream[A] = {

validateType(tpe)

val dataStreamPlan = optimize(table.getRelNode)

dataStreamPlan match {
case node: DataStreamRel =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ import java.util.concurrent.atomic.AtomicInteger
import org.apache.calcite.config.Lex
import org.apache.calcite.plan.RelOptPlanner
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.schema.SchemaPlus
import org.apache.calcite.rex.RexExecutorImpl
import org.apache.calcite.schema.{Schemas, SchemaPlus}
import org.apache.calcite.schema.impl.AbstractTable
import org.apache.calcite.sql.SqlOperatorTable
import org.apache.calcite.sql.parser.SqlParser
Expand Down Expand Up @@ -77,6 +78,8 @@ abstract class TableEnvironment(val config: TableConfig) {
.costFactory(new DataSetCostFactory)
.typeSystem(new FlinkTypeSystem)
.operatorTable(sqlOperatorTable)
// set the executor to evaluate constant expressions
.executor(new RexExecutorImpl(Schemas.createDataContext(null)))
.build

// the builder for Calcite RelNodes, Calcite's representation of a relational expression tree.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ case class Project(projectList: Seq[NamedExpression], child: LogicalNode) extend
// Calcite's RelBuilder does not translate identity projects even if they rename fields.
// Add a projection ourselves (will be automatically removed by translation rules).
val project = LogicalProject.create(relBuilder.peek(),
projectList.map(_.toRexNode(relBuilder)).asJava,
// avoid AS call
projectList.map(_.asInstanceOf[Alias].child.toRexNode(relBuilder)).asJava,
projectList.map(_.name).asJava)
relBuilder.build() // pop previous relNode
relBuilder.push(project)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ trait FlinkCalc {
val proj = calcProgram.getProjectList.asScala.toList
val inFields = calcProgram.getInputRowType.getFieldNames.asScala.toList
val localExprs = calcProgram.getExprList.asScala.toList
val outFields = calcProgram.getInputRowType.getFieldNames.asScala.toList
val outFields = calcProgram.getOutputRowType.getFieldNames.asScala.toList

proj
.map(expression(_, inFields, Some(localExprs)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ class ExplainTest
val result = tEnv.explain(table).replaceAll("\\r\\n", "\n")
val source = scala.io.Source.fromFile(testFilePath +
"../../src/test/scala/resources/testJoin0.out").mkString.replaceAll("\\r\\n", "\n")
assertEquals(result, source)
assertEquals(source, result)
}

@Test
Expand All @@ -87,7 +87,7 @@ class ExplainTest
val result = tEnv.explain(table, true).replaceAll("\\r\\n", "\n")
val source = scala.io.Source.fromFile(testFilePath +
"../../src/test/scala/resources/testJoin1.out").mkString.replaceAll("\\r\\n", "\n")
assertEquals(result, source)
assertEquals(source, result)
}

@Test
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.table

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala.table._
import org.apache.flink.api.scala.util.CollectionDataSets
import org.junit.Assert._
import org.junit.Test


/**
 * Checks the textual form of the plan returned by `optimize` for batch queries whose
 * projections and predicates contain constant sub-expressions.
 *
 * Each test builds a query over the 3-tuple collection data set (fields renamed to
 * `a`, `b`, `c`), runs it through `optimize` and asserts that the string rendering of
 * the resulting plan contains the already-evaluated literal for every expression.
 */
class BatchTableEnvironmentTest {

  /**
   * Asserts, in the given order, that every fragment occurs in `plan`.
   * Stops at the first missing fragment, like a sequence of plain `assertTrue` calls.
   */
  private def assertPlanContains(plan: String, fragments: Seq[String]): Unit =
    fragments.foreach(fragment => assertTrue(plan.contains(fragment)))

  /** Constant sub-expressions written in SQL show up as literals in the optimized plan. */
  @Test
  def testReduceExpressionForSQL(): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    // One entry per projected column; joined into a single SELECT statement below.
    val selectItems = Seq(
      "(3+4)+a",
      "b+(1+2)",
      "CASE 11 WHEN 1 THEN 'a' ELSE 'b' END",
      "TRIM(BOTH ' STRING ')",
      "'test' || 'string'",
      "NULLIF(1, 1)",
      "TIMESTAMP '1990-10-14 23:00:00.123' + INTERVAL '10 00:00:01' DAY TO SECOND",
      "EXTRACT(DAY FROM INTERVAL '19 12:10:10.123' DAY TO SECOND(3))",
      "1 IS NULL",
      "'TEST' LIKE '%EST'",
      "FLOOR(2.5)",
      "'TEST' IN ('west', 'TEST', 'rest')")
    val sqlQuery = selectItems.mkString("SELECT ", ", ", " FROM MyTable WHERE a>(1+7)")

    val input = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
    tEnv.registerTable("MyTable", input)

    val plan = tEnv.optimize(tEnv.sql(sqlQuery).getRelNode).toString

    // The filter first, then one fragment per projected column (EXPR$0 .. EXPR$11).
    assertPlanContains(plan, Seq(
      ">(_1, 8)",
      "+(7, _1) AS EXPR$0",
      "+(_2, 3) AS EXPR$1",
      "'b' AS EXPR$2",
      "'STRING' AS EXPR$3",
      "'teststring' AS EXPR$4",
      "null AS EXPR$5",
      "1990-10-24 23:00:01 AS EXPR$6",
      "19 AS EXPR$7",
      "false AS EXPR$8",
      "true AS EXPR$9",
      "2 AS EXPR$10",
      "true AS EXPR$11"))
  }

  /** Constant sub-expressions written with the Table API show up as literals as well. */
  @Test
  def testReduceExpressionForTableAPI(): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    val input = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)

    val filtered = input.where('a > (1+7))
    val projected = filtered.select(
      (3+4).toExpr + 6,
      (11 === 1) ? ("a", "b"),
      " STRING ".trim,
      "test" + "string",
      "1990-10-14 23:00:00.123".toTimestamp + 10.day + 1.second,
      1.isNull,
      "TEST".like("%EST"),
      2.5.toExpr.floor())

    val plan = tEnv.optimize(projected.getRelNode).toString

    // The filter first, then one fragment per projected column (_c0 .. _c7).
    assertPlanContains(plan, Seq(
      ">(_1, 8)",
      "13 AS _c0",
      "'b' AS _c1",
      "'STRING' AS _c2",
      "'teststring' AS _c3",
      "1990-10-24 23:00:01 AS _c4",
      "false AS _c5",
      "true AS _c6",
      "2E0 AS _c7"))
  }

}
Loading

0 comments on commit a711339

Please sign in to comment.