Skip to content

Commit

Permalink
[FLINK-4825] [table] Implement a RexExecutor that uses Flink's code g…
Browse files Browse the repository at this point in the history
…eneration.

This closes apache#2884
This closes apache#2874 (closing PR with Public API breaking changes)
  • Loading branch information
twalthr authored and fhueske committed Nov 29, 2016
1 parent 910f733 commit db441de
Show file tree
Hide file tree
Showing 13 changed files with 145 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ object FlinkRelBuilder {

// create context instances with Flink type factory
val planner = new VolcanoPlanner(Contexts.empty())
planner.setExecutor(config.getExecutor)
planner.addRelTraitDef(ConventionTraitDef.INSTANCE)
val cluster = RelOptCluster.create(planner, new RexBuilder(typeFactory))
val calciteSchema = CalciteSchema.from(config.getDefaultSchema)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import org.apache.calcite.config.Lex
import org.apache.calcite.plan.RelOptPlanner
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rex.RexExecutorImpl
import org.apache.calcite.schema.{Schemas, SchemaPlus}
import org.apache.calcite.schema.{SchemaPlus, Schemas}
import org.apache.calcite.schema.impl.AbstractTable
import org.apache.calcite.sql.SqlOperatorTable
import org.apache.calcite.sql.parser.SqlParser
Expand All @@ -38,6 +38,7 @@ import org.apache.flink.api.java.{ExecutionEnvironment => JavaBatchExecEnv}
import org.apache.flink.api.scala.table.{BatchTableEnvironment => ScalaBatchTableEnv, StreamTableEnvironment => ScalaStreamTableEnv}
import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
import org.apache.flink.api.scala.{ExecutionEnvironment => ScalaBatchExecEnv}
import org.apache.flink.api.table.codegen.ExpressionReducer
import org.apache.flink.api.table.expressions.{Alias, Expression, UnresolvedFieldReference}
import org.apache.flink.api.table.functions.{ScalarFunction, UserDefinedFunction}
import org.apache.flink.api.table.plan.cost.DataSetCostFactory
Expand Down Expand Up @@ -71,7 +72,7 @@ abstract class TableEnvironment(val config: TableConfig) {
.typeSystem(new FlinkTypeSystem)
.operatorTable(getSqlOperatorTable)
// set the executor to evaluate constant expressions
.executor(new RexExecutorImpl(Schemas.createDataContext(null)))
.executor(new ExpressionReducer(config))
.build

// the builder for Calcite RelNodes, Calcite's representation of a relational expression tree.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,9 @@
* limitations under the License.
*/

package org.apache.flink.api.table.runtime
package org.apache.flink.api.table.codegen

import org.apache.flink.api.common.InvalidProgramException
import org.apache.flink.api.common.functions.Function
import org.codehaus.commons.compiler.CompileException
import org.codehaus.janino.SimpleCompiler

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.api.table.codegen

import java.util

import org.apache.calcite.plan.RelOptPlanner
import org.apache.calcite.rex.{RexBuilder, RexNode}
import org.apache.calcite.sql.`type`.SqlTypeName
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.table.typeutils.{RowTypeInfo, TypeConverter}
import org.apache.flink.api.table.{FlinkTypeFactory, Row, TableConfig}

import scala.collection.JavaConverters._

/**
* Evaluates constant expressions using Flink's [[CodeGenerator]].
*/
class ExpressionReducer(config: TableConfig)
extends RelOptPlanner.Executor with Compiler[MapFunction[Row, Row]] {

private val EMPTY_ROW_INFO = TypeConverter.DEFAULT_ROW_TYPE
private val EMPTY_ROW = new Row(0)

override def reduce(
rexBuilder: RexBuilder,
constExprs: util.List[RexNode],
reducedValues: util.List[RexNode]): Unit = {

val typeFactory = rexBuilder.getTypeFactory.asInstanceOf[FlinkTypeFactory]

val literals = constExprs.asScala.map(e => (e.getType.getSqlTypeName, e)).flatMap {

// we need to cast here for RexBuilder.makeLiteral
case (SqlTypeName.DATE, e) =>
Some(
rexBuilder.makeCast(typeFactory.createTypeFromTypeInfo(BasicTypeInfo.INT_TYPE_INFO), e)
)
case (SqlTypeName.TIME, e) =>
Some(
rexBuilder.makeCast(typeFactory.createTypeFromTypeInfo(BasicTypeInfo.INT_TYPE_INFO), e)
)
case (SqlTypeName.TIMESTAMP, e) =>
Some(
rexBuilder.makeCast(typeFactory.createTypeFromTypeInfo(BasicTypeInfo.LONG_TYPE_INFO), e)
)

// we don't support object literals yet, we skip those constant expressions
case (SqlTypeName.ANY, _) | (SqlTypeName.ROW, _) => None

case (_, e) => Some(e)
}

val literalTypes = literals.map(e => FlinkTypeFactory.toTypeInfo(e.getType))
val resultType = new RowTypeInfo(literalTypes)

// generate MapFunction
val generator = new CodeGenerator(config, false, EMPTY_ROW_INFO)

val result = generator.generateResultExpression(
resultType,
resultType.getFieldNames,
literals)

val generatedFunction = generator.generateFunction[MapFunction[Row, Row]](
"ExpressionReducer",
classOf[MapFunction[Row, Row]],
s"""
|${result.code}
|return ${result.resultTerm};
|""".stripMargin,
resultType.asInstanceOf[TypeInformation[Any]])

val clazz = compile(getClass.getClassLoader, generatedFunction.name, generatedFunction.code)
val function = clazz.newInstance()

// execute
val reduced = function.map(EMPTY_ROW)

// add the reduced results or keep them unreduced
var i = 0
var reducedIdx = 0
while (i < constExprs.size()) {
val unreduced = constExprs.get(i)
unreduced.getType.getSqlTypeName match {
// we insert the original expression for object literals
case SqlTypeName.ANY | SqlTypeName.ROW =>
reducedValues.add(unreduced)
case _ =>
val literal = rexBuilder.makeLiteral(
reduced.productElement(reducedIdx),
unreduced.getType,
true)
reducedValues.add(literal)
reducedIdx += 1
}
i += 1
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,10 @@ object FlinkRuleSets {
SortRemoveRule.INSTANCE,

// simplify expressions rules
// TODO uncomment if FLINK-4825 is solved
// ReduceExpressionsRule.FILTER_INSTANCE,
// ReduceExpressionsRule.PROJECT_INSTANCE,
// ReduceExpressionsRule.CALC_INSTANCE,
// ReduceExpressionsRule.JOIN_INSTANCE,
ReduceExpressionsRule.FILTER_INSTANCE,
ReduceExpressionsRule.PROJECT_INSTANCE,
ReduceExpressionsRule.CALC_INSTANCE,
ReduceExpressionsRule.JOIN_INSTANCE,

// prune empty results rules
PruneEmptyRules.AGGREGATE_INSTANCE,
Expand Down Expand Up @@ -137,10 +136,9 @@ object FlinkRuleSets {
ProjectRemoveRule.INSTANCE,

// simplify expressions rules
// TODO uncomment if FLINK-4825 is solved
// ReduceExpressionsRule.FILTER_INSTANCE,
// ReduceExpressionsRule.PROJECT_INSTANCE,
// ReduceExpressionsRule.CALC_INSTANCE,
ReduceExpressionsRule.FILTER_INSTANCE,
ReduceExpressionsRule.PROJECT_INSTANCE,
ReduceExpressionsRule.CALC_INSTANCE,

// merge and push unions rules
UnionEliminatorRule.INSTANCE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package org.apache.flink.api.table.runtime
import org.apache.flink.api.common.functions.{FlatJoinFunction, RichFlatJoinFunction}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.ResultTypeQueryable
import org.apache.flink.api.table.codegen.Compiler
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector
import org.slf4j.LoggerFactory
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package org.apache.flink.api.table.runtime
import org.apache.flink.api.common.functions.{FlatMapFunction, RichFlatMapFunction}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.ResultTypeQueryable
import org.apache.flink.api.table.codegen.Compiler
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector
import org.slf4j.LoggerFactory
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package org.apache.flink.api.table.runtime
import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.ResultTypeQueryable
import org.apache.flink.api.table.codegen.Compiler
import org.apache.flink.configuration.Configuration
import org.slf4j.LoggerFactory

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ package org.apache.flink.api.table.runtime.io
import org.apache.flink.api.common.io.{GenericInputFormat, NonParallelInput}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.ResultTypeQueryable
import org.apache.flink.api.table.runtime.Compiler
import org.apache.flink.api.table.codegen.Compiler
import org.apache.flink.core.io.GenericInputSplit
import org.slf4j.LoggerFactory

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class SetOperatorsTest extends TableTestBase {
term("join", "b_long", "b_int", "b_string", "a_long"),
term("joinType", "InnerJoin")
),
term("select", "a_long", "true AS $f0")
term("select", "true AS $f0", "a_long")
),
term("groupBy", "a_long"),
term("select", "a_long", "MIN($f0) AS $f1")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,8 @@ import org.apache.flink.api.scala._
import org.apache.flink.api.scala.table._
import org.apache.flink.api.table.utils.TableTestBase
import org.apache.flink.api.table.utils.TableTestUtil._
import org.junit.{Ignore, Test}
import org.junit.Test

// TODO enable if FLINK-4825 is solved
@Ignore
class ExpressionReductionTest extends TableTestBase {

@Test
Expand Down Expand Up @@ -64,7 +62,7 @@ class ExpressionReductionTest extends TableTestBase {
"true AS EXPR$9",
"2 AS EXPR$10",
"true AS EXPR$11",
"'TRUEX' AS EXPR$12"
"'trueX' AS EXPR$12"
),
term("where", ">(a, 8)")
)
Expand Down Expand Up @@ -109,7 +107,7 @@ class ExpressionReductionTest extends TableTestBase {
"true AS EXPR$9",
"2 AS EXPR$10",
"true AS EXPR$11",
"'TRUEX' AS EXPR$12"
"'trueX' AS EXPR$12"
)
)

Expand Down Expand Up @@ -164,7 +162,7 @@ class ExpressionReductionTest extends TableTestBase {
"false AS _c5",
"true AS _c6",
"2E0 AS _c7",
"'TRUEX' AS _c8"
"'trueX' AS _c8"
),
term("where", ">(a, 8)")
)
Expand Down Expand Up @@ -200,7 +198,7 @@ class ExpressionReductionTest extends TableTestBase {
"false AS _c5",
"true AS _c6",
"2E0 AS _c7",
"'TRUEX' AS _c8"
"'trueX' AS _c8"
)
)

Expand Down Expand Up @@ -262,7 +260,7 @@ class ExpressionReductionTest extends TableTestBase {
"true AS EXPR$9",
"2 AS EXPR$10",
"true AS EXPR$11",
"'TRUEX' AS EXPR$12"
"'trueX' AS EXPR$12"
),
term("where", ">(a, 8)")
)
Expand Down Expand Up @@ -307,7 +305,7 @@ class ExpressionReductionTest extends TableTestBase {
"true AS EXPR$9",
"2 AS EXPR$10",
"true AS EXPR$11",
"'TRUEX' AS EXPR$12"
"'trueX' AS EXPR$12"
)
)

Expand Down Expand Up @@ -362,7 +360,7 @@ class ExpressionReductionTest extends TableTestBase {
"false AS _c5",
"true AS _c6",
"2E0 AS _c7",
"'TRUEX' AS _c8"
"'trueX' AS _c8"
),
term("where", ">(a, 8)")
)
Expand Down Expand Up @@ -398,7 +396,7 @@ class ExpressionReductionTest extends TableTestBase {
"false AS _c5",
"true AS _c6",
"2E0 AS _c7",
"'TRUEX' AS _c8"
"'trueX' AS _c8"
)
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ class ScalarOperatorsTest extends ExpressionTestBase {
testSqlApi(
"CASE 1 WHEN 1, 2 THEN '1 or 2' WHEN 2 THEN 'not possible' WHEN 3, 2 THEN '3' " +
"ELSE 'none of the above' END",
"1 or 2")
"1 or 2 ")
testSqlApi("CASE WHEN 'a'='a' THEN 1 END", "1")
testSqlApi("CASE 2 WHEN 1 THEN 'a' WHEN 2 THEN 'bcd' END", "bcd")
testSqlApi("CASE f2 WHEN 1 THEN 11 WHEN 2 THEN 4 ELSE NULL END", "11")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,11 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.{DataSet => JDataSet}
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
import org.apache.flink.api.table._
import org.apache.flink.api.table.codegen.{CodeGenerator, GeneratedFunction}
import org.apache.flink.api.table.codegen.{CodeGenerator, Compiler, GeneratedFunction}
import org.apache.flink.api.table.expressions.{Expression, ExpressionParser}
import org.apache.flink.api.table.functions.UserDefinedFunction
import org.apache.flink.api.table.plan.nodes.dataset.{DataSetCalc, DataSetConvention}
import org.apache.flink.api.table.plan.rules.FlinkRuleSets
import org.apache.flink.api.table.runtime.Compiler
import org.apache.flink.api.table.typeutils.RowTypeInfo
import org.junit.Assert._
import org.junit.{After, Before}
Expand Down

0 comments on commit db441de

Please sign in to comment.