[FLINK-3587] Bump Calcite version to 1.7.0
- Add DataSetValues and DataStreamValues due to changed Calcite RelNode generation.
- Pass TableEnvironment instead of TableConfig for DataSet and DataStream translation.
- Add methods to create new DataSources to BatchTableEnvironment and StreamTableEnvironment.
- Remove a copied Calcite rule that has been fixed upstream.

This closes apache#1897
fhueske authored and zentol committed Apr 18, 2016
1 parent 7eb5877 commit 367687d
Showing 29 changed files with 476 additions and 199 deletions.
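Two themes run through the diffs below: translateToPlan now receives the TableEnvironment instead of only its TableConfig, and several Calcite 1.7 API changes surface (computeSelfCost takes a RelMetadataQuery; getRows becomes estimateRowCount). A simplified sketch of the first, central signature change follows; these trait shapes are reductions for orientation, not the full DataSetRel definition:

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.DataSet
import org.apache.flink.api.table.{BatchTableEnvironment, TableConfig}

// Before (Calcite 1.5-era): only the config was threaded through translation.
trait DataSetRelBefore {
  def translateToPlan(
      config: TableConfig,
      expectedType: Option[TypeInformation[Any]]): DataSet[Any]
}

// After: the whole environment is passed; nodes read the config via
// tableEnv.getConfig and can create new sources via createDataSetSource.
trait DataSetRelAfter {
  def translateToPlan(
      tableEnv: BatchTableEnvironment,
      expectedType: Option[TypeInformation[Any]]): DataSet[Any]
}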
2 changes: 1 addition & 1 deletion flink-libraries/flink-table/pom.xml
@@ -111,7 +111,7 @@ under the License.
 		<dependency>
 			<groupId>org.apache.calcite</groupId>
 			<artifactId>calcite-core</artifactId>
-			<version>1.5.0</version>
+			<version>1.7.0</version>
 		</dependency>
 
 		<dependency>
@@ -17,11 +17,12 @@
  */
 package org.apache.flink.api.java.table
 
+import org.apache.flink.api.common.io.InputFormat
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.{ExecutionEnvironment, DataSet}
 import org.apache.flink.api.java.typeutils.TypeExtractor
 import org.apache.flink.api.table.expressions.ExpressionParser
-import org.apache.flink.api.table.{TableConfig, Table}
+import org.apache.flink.api.table.{Row, TableConfig, Table}
 
 /**
  * The [[org.apache.flink.api.table.TableEnvironment]] for a Java batch [[DataSet]]
@@ -162,4 +163,18 @@ class BatchTableEnvironment(
     translate[T](table)(typeInfo)
   }
 
+  /**
+   * Creates a [[Row]] [[DataSet]] from an [[InputFormat]].
+   *
+   * @param inputFormat [[InputFormat]] from which the [[DataSet]] is created.
+   * @param typeInfo [[TypeInformation]] of the type of the [[DataSet]].
+   * @return A [[Row]] [[DataSet]] created from the [[InputFormat]].
+   */
+  override private[flink] def createDataSetSource(
+      inputFormat: InputFormat[Row, _],
+      typeInfo: TypeInformation[Row]): DataSet[Row] = {
+
+    execEnv.createInput(inputFormat, typeInfo)
+  }
+
 }
@@ -17,9 +17,10 @@
  */
 package org.apache.flink.api.java.table
 
+import org.apache.flink.api.common.io.InputFormat
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.typeutils.TypeExtractor
-import org.apache.flink.api.table.{TableConfig, Table}
+import org.apache.flink.api.table.{Row, TableConfig, Table}
 import org.apache.flink.api.table.expressions.ExpressionParser
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
@@ -120,4 +121,18 @@ class StreamTableEnvironment(
     translate[T](table)(typeInfo)
   }
 
+  /**
+   * Creates a [[Row]] [[DataStream]] from an [[InputFormat]].
+   *
+   * @param inputFormat [[InputFormat]] from which the [[DataStream]] is created.
+   * @param typeInfo [[TypeInformation]] of the type of the [[DataStream]].
+   * @return A [[Row]] [[DataStream]] created from the [[InputFormat]].
+   */
+  override private[flink] def createDataStreamSource(
+      inputFormat: InputFormat[Row, _],
+      typeInfo: TypeInformation[Row]): DataStream[Row] = {
+
+    execEnv.createInput(inputFormat, typeInfo)
+  }
+
 }
@@ -17,10 +17,12 @@
  */
 package org.apache.flink.api.scala.table
 
+import org.apache.flink.api.common.io.InputFormat
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.scala._
+import org.apache.flink.api.java.{DataSet => JavaSet}
 import org.apache.flink.api.table.expressions.Expression
-import org.apache.flink.api.table.{TableConfig, Table}
+import org.apache.flink.api.table.{Row, TableConfig, Table}
 
 import scala.reflect.ClassTag
 
@@ -139,4 +141,18 @@ class BatchTableEnvironment(
     wrap[T](translate(table))(ClassTag.AnyRef.asInstanceOf[ClassTag[T]])
   }
 
+  /**
+   * Creates a [[Row]] [[JavaSet]] from an [[InputFormat]].
+   *
+   * @param inputFormat [[InputFormat]] from which the [[JavaSet]] is created.
+   * @param typeInfo [[TypeInformation]] of the type of the [[JavaSet]].
+   * @return A [[Row]] [[JavaSet]] created from the [[InputFormat]].
+   */
+  override private[flink] def createDataSetSource(
+      inputFormat: InputFormat[Row, _],
+      typeInfo: TypeInformation[Row]): JavaSet[Row] = {
+
+    execEnv.createInput(inputFormat).javaSet
+  }
+
 }
@@ -17,9 +17,12 @@
  */
 package org.apache.flink.api.scala.table
 
+import org.apache.flink.api.common.io.InputFormat
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.table.{TableConfig, Table}
+import org.apache.flink.api.table.typeutils.RowTypeInfo
+import org.apache.flink.api.table.{Row, TableConfig, Table}
 import org.apache.flink.api.table.expressions.Expression
+import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream}
 import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, DataStream}
 
 import org.apache.flink.streaming.api.scala.asScalaStream
@@ -99,4 +102,18 @@ class StreamTableEnvironment(
     asScalaStream(translate(table))
   }
 
+  /**
+   * Creates a [[Row]] [[JavaStream]] from an [[InputFormat]].
+   *
+   * @param inputFormat [[InputFormat]] from which the [[JavaStream]] is created.
+   * @param typeInfo [[TypeInformation]] of the type of the [[JavaStream]].
+   * @return A [[Row]] [[JavaStream]] created from the [[InputFormat]].
+   */
+  override private[flink] def createDataStreamSource(
+      inputFormat: InputFormat[Row, _],
+      typeInfo: TypeInformation[Row]): JavaStream[Row] = {
+
+    execEnv.createInput(inputFormat)(typeInfo).javaStream
+  }
+
 }
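Worth noting across the four implementations above: the Java environments pass typeInfo to createInput explicitly, the Scala batch variant leaves it to implicit resolution, and the Scala stream variant supplies it through the curried implicit parameter list. A sketch of the assumed Scala API shape that makes the curried call compile (an assumption for illustration, not the actual Flink source):

import org.apache.flink.api.common.io.InputFormat
import org.apache.flink.api.common.typeinfo.TypeInformation

object ContextBoundSketch {
  // A TypeInformation context bound desugars to an implicit parameter list,
  // i.e. equivalent to
  //   def createInput[T](format: InputFormat[T, _])(implicit ti: TypeInformation[T])
  def createInput[T: TypeInformation](format: InputFormat[T, _]): Unit = ()
}

// Hence both call styles seen above:
//   createInput(inputFormat)            // compiler finds a TypeInformation[Row] in scope
//   createInput(inputFormat)(typeInfo)  // the implicit supplied explicitly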
@@ -24,6 +24,7 @@ import org.apache.calcite.plan.RelOptPlanner.CannotPlanException
 import org.apache.calcite.plan.RelOptUtil
 import org.apache.calcite.sql2rel.RelDecorrelator
 import org.apache.calcite.tools.Programs
+import org.apache.flink.api.common.io.InputFormat
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.DataSet
 import org.apache.flink.api.java.io.DiscardingOutputFormat
@@ -34,6 +35,7 @@ import org.apache.flink.api.table.plan.PlanGenException
 import org.apache.flink.api.table.plan.nodes.dataset.{DataSetRel, DataSetConvention}
 import org.apache.flink.api.table.plan.rules.FlinkRuleSets
 import org.apache.flink.api.table.plan.schema.DataSetTable
+import org.apache.flink.streaming.api.datastream.DataStream
 
 /**
  * The abstract base class for batch TableEnvironments.
@@ -226,11 +228,22 @@ abstract class BatchTableEnvironment(config: TableConfig) extends TableEnvironment
     dataSetPlan match {
       case node: DataSetRel =>
         node.translateToPlan(
-          config,
+          this,
           Some(tpe.asInstanceOf[TypeInformation[Any]])
         ).asInstanceOf[DataSet[A]]
       case _ => ???
     }
   }
 
+  /**
+   * Creates a [[Row]] [[DataSet]] from an [[InputFormat]].
+   *
+   * @param inputFormat [[InputFormat]] from which the [[DataSet]] is created.
+   * @param typeInfo [[TypeInformation]] of the type of the [[DataSet]].
+   * @return A [[Row]] [[DataSet]] created from the [[InputFormat]].
+   */
+  private[flink] def createDataSetSource(
+      inputFormat: InputFormat[Row, _],
+      typeInfo: TypeInformation[Row]): DataSet[Row]
+
 }
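The abstract createDataSetSource declared above is the hook the new DataSetValues node (see the commit message) can use during translation. The following is a hypothetical caller for illustration only; ValuesSketch and its wiring are invented here, are not part of the commit, and the private[flink] visibility is glossed over:

import org.apache.flink.api.common.io.InputFormat
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.DataSet
import org.apache.flink.api.table.{BatchTableEnvironment, Row}

// Hypothetical: a plan node holding an InputFormat of literal rows asks the
// table environment (not a raw ExecutionEnvironment) for the source DataSet,
// so the same node code works for both the Java and the Scala environments.
class ValuesSketch(
    inputFormat: InputFormat[Row, _],
    rowType: TypeInformation[Row]) {

  def translate(tableEnv: BatchTableEnvironment): DataSet[Row] =
    tableEnv.createDataSetSource(inputFormat, rowType) // private[flink] in reality
}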
@@ -24,6 +24,7 @@ import org.apache.calcite.plan.RelOptPlanner.CannotPlanException
 import org.apache.calcite.plan.RelOptUtil
 import org.apache.calcite.sql2rel.RelDecorrelator
 import org.apache.calcite.tools.Programs
+import org.apache.flink.api.common.io.InputFormat
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.table.expressions.Expression
 import org.apache.flink.api.table.plan.PlanGenException
@@ -185,12 +186,23 @@ abstract class StreamTableEnvironment(config: TableConfig) extends TableEnvironment
     dataStreamPlan match {
       case node: DataStreamRel =>
         node.translateToPlan(
-          config,
+          this,
           Some(tpe.asInstanceOf[TypeInformation[Any]])
         ).asInstanceOf[DataStream[A]]
       case _ => ???
     }
 
   }
 
+  /**
+   * Creates a [[Row]] [[DataStream]] from an [[InputFormat]].
+   *
+   * @param inputFormat [[InputFormat]] from which the [[DataStream]] is created.
+   * @param typeInfo [[TypeInformation]] of the type of the [[DataStream]].
+   * @return A [[Row]] [[DataStream]] created from the [[InputFormat]].
+   */
+  private[flink] def createDataStreamSource(
+      inputFormat: InputFormat[Row, _],
+      typeInfo: TypeInformation[Row]): DataStream[Row]
+
 }
@@ -580,7 +580,14 @@ class CodeGenerator(
       case VARCHAR | CHAR =>
         generateNonNullLiteral(resultType, "\"" + value.toString + "\"")
       case SYMBOL =>
-        val symbolOrdinal = value.asInstanceOf[SqlLiteral.SqlSymbol].ordinal()
+
+        val symbolOrdinal =
+          if (classOf[Enum[_]].isAssignableFrom(value.getClass)) {
+            value.asInstanceOf[Enum[_]].ordinal()
+          } else {
+            value.asInstanceOf[SqlLiteral.SqlSymbol].ordinal()
+          }
+
         generateNonNullLiteral(resultType, symbolOrdinal.toString)
       case _ => ??? // TODO more types
     }
@@ -747,6 +754,8 @@
 
   override def visitOver(over: RexOver): GeneratedExpression = ???
 
+  override def visitSubQuery(subQuery: RexSubQuery): GeneratedExpression = ???
+
   // ----------------------------------------------------------------------------------------------
   // generator helping methods
   // ----------------------------------------------------------------------------------------------
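Background on the SYMBOL case above: Calcite 1.7 represents symbol literals (time units, trim flags, and the like) as plain Java enum values rather than only SqlLiteral.SqlSymbol instances, which is what the new isAssignableFrom guard handles. The same dispatch in isolation, as a minimal sketch using only names that appear in the diff:

import org.apache.calcite.sql.SqlLiteral

object SymbolOrdinalSketch {
  // Mirrors the generated-literal branch above: enum-valued symbols
  // (Calcite 1.7) are encoded by ordinal; older SqlSymbol values keep working.
  def symbolOrdinal(value: AnyRef): Int = value match {
    case e: Enum[_]              => e.ordinal()
    case s: SqlLiteral.SqlSymbol => s.ordinal()
  }
}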
@@ -31,7 +31,7 @@ import org.apache.flink.api.table.runtime.MapRunner
 import org.apache.flink.api.table.runtime.aggregate.AggregateUtil
 import org.apache.flink.api.table.runtime.aggregate.AggregateUtil.CalcitePair
 import org.apache.flink.api.table.typeutils.{TypeConverter, RowTypeInfo}
-import org.apache.flink.api.table.{Row, TableConfig}
+import org.apache.flink.api.table.{BatchTableEnvironment, Row, TableConfig}
 
 import scala.collection.JavaConverters._
 
@@ -76,26 +76,28 @@ class DataSetAggregate(
       .item("select", aggregationToString)
   }
 
-  override def computeSelfCost(planner: RelOptPlanner): RelOptCost = {
+  override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
 
     val child = this.getInput
-    val rowCnt = RelMetadataQuery.getRowCount(child)
+    val rowCnt = metadata.getRowCount(child)
     val rowSize = this.estimateRowSize(child.getRowType)
     val aggCnt = this.namedAggregates.size
     planner.getCostFactory.makeCost(rowCnt, rowCnt * aggCnt, rowCnt * rowSize)
   }
 
   override def translateToPlan(
-      config: TableConfig,
+      tableEnv: BatchTableEnvironment,
       expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
 
+    val config = tableEnv.getConfig
+
     val groupingKeys = grouping.indices.toArray
     // add grouping fields, position keys in the input, and input type
     val aggregateResult = AggregateUtil.createOperatorFunctionsForAggregates(namedAggregates,
       inputType, rowType, grouping, config)
 
     val inputDS = input.asInstanceOf[DataSetRel].translateToPlan(
-      config,
+      tableEnv,
       // tell the input operator that this operator currently only supports Rows as input
       Some(TypeConverter.DEFAULT_ROW_TYPE))
 
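The computeSelfCost change here (and in DataSetCalc and DataSetJoin below) tracks Calcite 1.7's metadata API: the planner hands in a RelMetadataQuery instance, and row counts are read from it instead of through the former static RelMetadataQuery.getRowCount. Condensed to its pattern, under the same (rowCount, cpu, io) cost convention used above:

import org.apache.calcite.plan.{RelOptCost, RelOptPlanner}
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.metadata.RelMetadataQuery

object CostSketch {
  // rowCnt now comes from the RelMetadataQuery instance (Calcite >= 1.7);
  // the cost factory builds a (rowCount, cpu, io) cost from it.
  def selfCost(
      planner: RelOptPlanner,
      metadata: RelMetadataQuery,
      child: RelNode,
      cpuPerRow: Double,
      bytesPerRow: Double): RelOptCost = {
    val rowCnt: Double = metadata.getRowCount(child)
    planner.getCostFactory.makeCost(rowCnt, rowCnt * cpuPerRow, rowCnt * bytesPerRow)
  }
}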
@@ -29,7 +29,7 @@ import org.apache.flink.api.table.codegen.CodeGenerator
 import org.apache.flink.api.table.plan.nodes.FlinkCalc
 import org.apache.flink.api.table.typeutils.TypeConverter
 import TypeConverter._
-import org.apache.flink.api.table.TableConfig
+import org.apache.flink.api.table.BatchTableEnvironment
 import org.apache.calcite.rex._
 
 /**
@@ -69,17 +69,17 @@ class DataSetCalc(
     calcProgram.getCondition != null)
   }
 
-  override def computeSelfCost(planner: RelOptPlanner): RelOptCost = {
+  override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
 
     val child = this.getInput
-    val rowCnt = RelMetadataQuery.getRowCount(child)
+    val rowCnt = metadata.getRowCount(child)
     val exprCnt = calcProgram.getExprCount
     planner.getCostFactory.makeCost(rowCnt, rowCnt * exprCnt, 0)
   }
 
-  override def getRows: Double = {
+  override def estimateRowCount(metadata: RelMetadataQuery): Double = {
     val child = this.getInput
-    val rowCnt = RelMetadataQuery.getRowCount(child)
+    val rowCnt = metadata.getRowCount(child)
 
     if (calcProgram.getCondition != null) {
       // we reduce the result card to push filters down
@@ -89,10 +89,13 @@
     }
   }
 
-  override def translateToPlan(config: TableConfig,
+  override def translateToPlan(
+      tableEnv: BatchTableEnvironment,
       expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
 
-    val inputDS = input.asInstanceOf[DataSetRel].translateToPlan(config)
+    val config = tableEnv.getConfig
+
+    val inputDS = input.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
 
     val returnType = determineReturnType(
       getRowType,
@@ -21,7 +21,6 @@ package org.apache.flink.api.table.plan.nodes.dataset
 import org.apache.calcite.plan._
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.core.JoinInfo
-import org.apache.calcite.rel.logical.LogicalJoin
 import org.apache.calcite.rel.metadata.RelMetadataQuery
 import org.apache.calcite.rel.{RelWriter, BiRel, RelNode}
 import org.apache.calcite.sql.fun.SqlStdOperatorTable
@@ -31,10 +30,9 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.DataSet
 import org.apache.flink.api.java.operators.join.JoinType
 import org.apache.flink.api.table.codegen.CodeGenerator
-import org.apache.flink.api.table.plan.PlanGenException
 import org.apache.flink.api.table.runtime.FlatJoinRunner
 import org.apache.flink.api.table.typeutils.TypeConverter
-import org.apache.flink.api.table.{TableException, TableConfig}
+import org.apache.flink.api.table.{BatchTableEnvironment, TableException}
 import org.apache.flink.api.common.functions.FlatJoinFunction
 import TypeConverter.determineReturnType
 import scala.collection.mutable.ArrayBuffer
@@ -92,7 +90,7 @@ class DataSetJoin(
       .item("join", joinSelectionToString)
   }
 
-  override def computeSelfCost(planner: RelOptPlanner): RelOptCost = {
+  override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
 
     if (!translatable) {
       // join cannot be translated. Make huge costs
@@ -101,7 +99,7 @@
       // join can be translated. Compute cost estimate
       val children = this.getInputs
       children.foldLeft(planner.getCostFactory.makeCost(0, 0, 0)) { (cost, child) =>
-        val rowCnt = RelMetadataQuery.getRowCount(child)
+        val rowCnt = metadata.getRowCount(child)
         val rowSize = this.estimateRowSize(child.getRowType)
         cost.plus(planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * rowSize))
       }
@@ -110,9 +108,11 @@
     }
   }
 
   override def translateToPlan(
-      config: TableConfig,
+      tableEnv: BatchTableEnvironment,
       expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
 
+    val config = tableEnv.getConfig
+
     val returnType = determineReturnType(
       getRowType,
       expectedType,
@@ -156,8 +156,8 @@
       })
     }
 
-    val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(config)
-    val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(config)
+    val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+    val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
 
     val generator = new CodeGenerator(config, leftDataSet.getType, Some(rightDataSet.getType))
     val conversion = generator.generateConverterResultExpression(
[Diff truncated: 11 of the 29 changed files are shown above; the remaining 18 were not loaded.]
