Skip to content

Commit

Permalink
[FLINK-4068] [table] Reduce expression also for filter/project
Browse files Browse the repository at this point in the history
  • Loading branch information
twalthr committed Oct 4, 2016
1 parent a711339 commit f00e1e7
Show file tree
Hide file tree
Showing 6 changed files with 427 additions and 209 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import org.apache.calcite.avatica.util.TimeUnit
import org.apache.calcite.jdbc.JavaTypeFactoryImpl
import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeSystem}
import org.apache.calcite.sql.SqlIntervalQualifier
import org.apache.calcite.sql.`type`.SqlTypeName
import org.apache.calcite.sql.`type`.{BasicSqlType, SqlTypeName, SqlTypeUtil}
import org.apache.calcite.sql.`type`.SqlTypeName._
import org.apache.calcite.sql.parser.SqlParserPos
import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
Expand Down Expand Up @@ -68,6 +68,16 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp
}
}

override def createSqlType(typeName: SqlTypeName, precision: Int): RelDataType = {
// it might happen that inferred VARCHAR types overflow as we set them to Int.MaxValue
// always set those to default value
if (typeName == VARCHAR && precision < 0) {
createSqlType(typeName, getTypeSystem.getDefaultPrecision(typeName))
} else {
super.createSqlType(typeName, precision)
}
}

private def createAdvancedType(typeInfo: TypeInformation[_]): RelDataType = typeInfo match {
// TODO add specific RelDataTypes
// for PrimitiveArrayTypeInfo, ObjectArrayTypeInfo, CompositeType
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.api.table

import org.apache.calcite.rel.`type`.RelDataTypeSystemImpl
import org.apache.calcite.sql.`type`.SqlTypeName

/**
* Custom type system for Flink.
Expand All @@ -33,4 +34,12 @@ class FlinkTypeSystem extends RelDataTypeSystemImpl {
// half should be enough for all use cases
override def getMaxNumericPrecision: Int = Int.MaxValue / 2

override def getDefaultPrecision(typeName: SqlTypeName): Int = typeName match {
// by default all VARCHARs can have the Java default length
case SqlTypeName.VARCHAR =>
Int.MaxValue
case _ =>
super.getDefaultPrecision(typeName)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ object FlinkRuleSets {
SortRemoveRule.INSTANCE,

// simplify expressions rules
ReduceExpressionsRule.FILTER_INSTANCE,
ReduceExpressionsRule.PROJECT_INSTANCE,
ReduceExpressionsRule.CALC_INSTANCE,
ReduceExpressionsRule.JOIN_INSTANCE,

Expand Down Expand Up @@ -113,6 +115,9 @@ object FlinkRuleSets {
val DATASTREAM_OPT_RULES: RuleSet = RuleSets.ofList(

RemoveDeltaRule.INSTANCE,

// convert a logical table scan to a relational expression
TableScanRule.INSTANCE,
EnumerableToLogicalTableScan.INSTANCE,

// calc rules
Expand All @@ -133,6 +138,8 @@ object FlinkRuleSets {
ProjectRemoveRule.INSTANCE,

// simplify expressions rules
ReduceExpressionsRule.FILTER_INSTANCE,
ReduceExpressionsRule.PROJECT_INSTANCE,
ReduceExpressionsRule.CALC_INSTANCE,

// merge and push unions rules
Expand Down

This file was deleted.

Loading

0 comments on commit f00e1e7

Please sign in to comment.