[FLINK-5884] [table] Integrate time indicators for Table API & SQL
This closes apache#3808.
twalthr authored and fhueske committed May 5, 2017
1 parent 28ab737 commit 495f104
Showing 82 changed files with 2,368 additions and 1,921 deletions.
@@ -194,7 +194,30 @@ abstract class BatchTableEnvironment(
protected def registerDataSetInternal[T](
name: String, dataSet: DataSet[T], fields: Array[Expression]): Unit = {

val (fieldNames, fieldIndexes) = getFieldInfo[T](dataSet.getType, fields)
val (fieldNames, fieldIndexes) = getFieldInfo[T](
dataSet.getType,
fields,
ignoreTimeAttributes = true)

// validate and extract time attributes
val (rowtime, proctime) = validateAndExtractTimeAttributes(fieldNames, fieldIndexes, fields)

// don't allow proctime on batch
proctime match {
case Some(_) =>
throw new ValidationException(
"A proctime attribute is not allowed in a batch environment. " +
"Working with processing-time on batch would lead to non-deterministic results.")
case _ => // ok
}
// rowtime must not extend the schema of a batch table
rowtime match {
case Some((idx, _)) if idx >= dataSet.getType.getArity =>
throw new ValidationException(
"A rowtime attribute must be defined on an existing field in a batch environment.")
case _ => // ok
}

val dataSetTable = new DataSetTable[T](dataSet, fieldIndexes, fieldNames)
registerTableInternal(name, dataSetTable)
}
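
For context, a hedged sketch of what this batch-side validation means at the user-facing API; the environment setup, table, and field names are illustrative and not part of this commit:

import org.apache.flink.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

// Illustrative only: assumes the new expression DSL ('field.rowtime) added
// further below in this commit.
val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)
val orders: DataSet[(Long, String)] = env.fromElements((1L, "beer"), (2L, "wine"))

// OK: the rowtime attribute is defined on an existing field of the DataSet.
tEnv.registerDataSet("Orders", orders, 'ts.rowtime, 'product)

// Rejected by the check above: processing time is non-deterministic on batch.
// tEnv.registerDataSet("Orders2", orders, 'ts, 'product, 'p.proctime)
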
@@ -30,6 +30,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.GenericTypeInfo
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.table.calcite.RelTimeIndicatorConverter
import org.apache.flink.table.explain.PlanJsonParser
import org.apache.flink.table.expressions.Expression
import org.apache.flink.table.plan.nodes.FlinkConventions
@@ -86,47 +87,6 @@ abstract class StreamTableEnvironment(
/** Returns a unique table name according to the internal naming pattern. */
protected def createUniqueTableName(): String = "_DataStreamTable_" + nameCntr.getAndIncrement()

/**
* Returns field names and field positions for a given [[TypeInformation]].
*
* Field names are automatically extracted for
* [[org.apache.flink.api.common.typeutils.CompositeType]].
* The method fails if inputType is not a
* [[org.apache.flink.api.common.typeutils.CompositeType]].
*
* @param inputType The TypeInformation extract the field names and positions from.
* @tparam A The type of the TypeInformation.
* @return A tuple of two arrays holding the field names and corresponding field positions.
*/
override protected[flink] def getFieldInfo[A](inputType: TypeInformation[A])
: (Array[String], Array[Int]) = {
val fieldInfo = super.getFieldInfo(inputType)
if (fieldInfo._1.contains("rowtime")) {
throw new TableException("'rowtime' ia a reserved field name in stream environment.")
}
fieldInfo
}

/**
* Returns field names and field positions for a given [[TypeInformation]] and [[Array]] of
* [[Expression]].
*
* @param inputType The [[TypeInformation]] against which the [[Expression]]s are evaluated.
* @param exprs The expressions that define the field names.
* @tparam A The type of the TypeInformation.
* @return A tuple of two arrays holding the field names and corresponding field positions.
*/
override protected[flink] def getFieldInfo[A](
inputType: TypeInformation[A],
exprs: Array[Expression])
: (Array[String], Array[Int]) = {
val fieldInfo = super.getFieldInfo(inputType, exprs)
if (fieldInfo._1.contains("rowtime")) {
throw new TableException("'rowtime' is a reserved field name in stream environment.")
}
fieldInfo
}

/**
* Registers an external [[StreamTableSource]] in this [[TableEnvironment]]'s catalog.
* Registered tables can be referenced in SQL queries.
@@ -145,6 +105,7 @@ abstract class StreamTableEnvironment(
"StreamTableEnvironment")
}
}

/**
* Writes a [[Table]] to a [[TableSink]].
*
@@ -185,7 +146,9 @@ abstract class StreamTableEnvironment(
val dataStreamTable = new DataStreamTable[T](
dataStream,
fieldIndexes,
fieldNames
fieldNames,
None,
None
)
registerTableInternal(name, dataStreamTable)
}
@@ -200,15 +163,26 @@ abstract class StreamTableEnvironment(
* @tparam T The type of the [[DataStream]].
*/
protected def registerDataStreamInternal[T](
name: String,
dataStream: DataStream[T],
fields: Array[Expression]): Unit = {
name: String,
dataStream: DataStream[T],
fields: Array[Expression])
: Unit = {

val (fieldNames, fieldIndexes) = getFieldInfo[T](
dataStream.getType,
fields,
ignoreTimeAttributes = false)

// validate and extract time attributes
val (rowtime, proctime) = validateAndExtractTimeAttributes(fieldNames, fieldIndexes, fields)


val (fieldNames, fieldIndexes) = getFieldInfo[T](dataStream.getType, fields)
val dataStreamTable = new DataStreamTable[T](
dataStream,
fieldIndexes,
fieldNames
fieldNames,
rowtime,
proctime
)
registerTableInternal(name, dataStreamTable)
}
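
A corresponding hedged sketch for the streaming side, where time attributes may extend the schema; setup and names are illustrative, not taken from this commit:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)
val events: DataStream[(Long, String)] = env.fromElements((1L, "login"), (2L, "click"))

// 'ts.rowtime declares the event-time attribute on the first physical field;
// 'p.proctime appends a processing-time attribute with no physical field.
tEnv.registerDataStream("Events", events, 'ts.rowtime, 'action, 'p.proctime)
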
@@ -259,15 +233,18 @@ abstract class StreamTableEnvironment(
// 1. decorrelate
val decorPlan = RelDecorrelator.decorrelateQuery(relNode)

// 2. normalize the logical plan
// 2. convert time indicators
val convPlan = RelTimeIndicatorConverter.convert(decorPlan, getRelBuilder.getRexBuilder)

// 3. normalize the logical plan
val normRuleSet = getNormRuleSet
val normalizedPlan = if (normRuleSet.iterator().hasNext) {
runHepPlanner(HepMatchOrder.BOTTOM_UP, normRuleSet, decorPlan, decorPlan.getTraitSet)
} else {
decorPlan
}

// 3. optimize the logical Flink plan
// 4. optimize the logical Flink plan
val logicalOptRuleSet = getLogicalOptRuleSet
val logicalOutputProps = relNode.getTraitSet.replace(FlinkConventions.LOGICAL).simplify()
val logicalPlan = if (logicalOptRuleSet.iterator().hasNext) {
@@ -276,7 +253,7 @@ abstract class StreamTableEnvironment(
normalizedPlan
}

// 4. optimize the physical Flink plan
// 5. optimize the physical Flink plan
val physicalOptRuleSet = getPhysicalOptRuleSet
val physicalOutputProps = relNode.getTraitSet.replace(FlinkConventions.DATASTREAM).simplify()
val physicalPlan = if (physicalOptRuleSet.iterator().hasNext) {
@@ -285,7 +262,7 @@ abstract class StreamTableEnvironment(
logicalPlan
}

// 5. decorate the optimized plan
// 6. decorate the optimized plan
val decoRuleSet = getDecoRuleSet
val decoratedPlan = if (decoRuleSet.iterator().hasNext) {
runHepPlanner(HepMatchOrder.BOTTOM_UP, decoRuleSet, physicalPlan, physicalPlan.getTraitSet)
@@ -52,6 +52,9 @@ import org.apache.flink.table.codegen.{CodeGenerator, ExpressionReducer}
import org.apache.flink.table.expressions.{Alias, Expression, UnresolvedFieldReference}
import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils._
import org.apache.flink.table.functions.{ScalarFunction, TableFunction, AggregateFunction}
import org.apache.flink.table.expressions._
import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils.{checkForInstantiation, checkNotSingleton, createScalarSqlFunction, createTableSqlFunctions}
import org.apache.flink.table.functions.{ScalarFunction, TableFunction}
import org.apache.flink.table.plan.cost.DataSetCostFactory
import org.apache.flink.table.plan.logical.{CatalogNode, LogicalRelNode}
import org.apache.flink.table.plan.rules.FlinkRuleSets
@@ -598,70 +601,94 @@ abstract class TableEnvironment(val config: TableConfig) {

/**
* Returns field names and field positions for a given [[TypeInformation]] and [[Array]] of
* [[Expression]].
* [[Expression]]. Time attributes are not resolved here: if the ignore flag is
* set, they are unwrapped and handled as regular field expressions; otherwise
* they are excluded from the returned field information.
*
* @param inputType The [[TypeInformation]] against which the [[Expression]]s are evaluated.
* @param exprs The expressions that define the field names.
* @param ignoreTimeAttributes ignore time attributes and handle them as regular expressions.
* @tparam A The type of the TypeInformation.
* @return A tuple of two arrays holding the field names and corresponding field positions.
*/
protected[flink] def getFieldInfo[A](
inputType: TypeInformation[A],
exprs: Array[Expression]): (Array[String], Array[Int]) = {
inputType: TypeInformation[A],
exprs: Array[Expression],
ignoreTimeAttributes: Boolean)
: (Array[String], Array[Int]) = {

TableEnvironment.validateType(inputType)

val filteredExprs = if (ignoreTimeAttributes) {
exprs.map {
case ta: TimeAttribute => ta.expression
case e@_ => e
}
} else {
exprs
}

val indexedNames: Array[(Int, String)] = inputType match {
case g: GenericTypeInfo[A] if g.getTypeClass == classOf[Row] =>
throw new TableException(
"An input of GenericTypeInfo<Row> cannot be converted to Table. " +
"Please specify the type of the input with a RowTypeInfo.")
case a: AtomicType[A] =>
if (exprs.length != 1) {
throw new TableException("Table of atomic type can only have a single field.")
}
exprs.map {
case UnresolvedFieldReference(name) => (0, name)
filteredExprs.zipWithIndex flatMap {
case (UnresolvedFieldReference(name), idx) =>
if (idx > 0) {
throw new TableException("Table of atomic type can only have a single field.")
}
Some((0, name))
case (_: TimeAttribute, _) if ignoreTimeAttributes =>
None
case _ => throw new TableException("Field reference expression requested.")
}
case t: TupleTypeInfo[A] =>
exprs.zipWithIndex.map {
case (UnresolvedFieldReference(name), idx) => (idx, name)
filteredExprs.zipWithIndex flatMap {
case (UnresolvedFieldReference(name), idx) =>
Some((idx, name))
case (Alias(UnresolvedFieldReference(origName), name, _), _) =>
val idx = t.getFieldIndex(origName)
if (idx < 0) {
throw new TableException(s"$origName is not a field of type $t")
}
(idx, name)
Some((idx, name))
case (_: TimeAttribute, _) =>
None
case _ => throw new TableException(
"Field reference expression or alias on field expression expected.")
}
case c: CaseClassTypeInfo[A] =>
exprs.zipWithIndex.map {
case (UnresolvedFieldReference(name), idx) => (idx, name)
filteredExprs.zipWithIndex flatMap {
case (UnresolvedFieldReference(name), idx) =>
Some((idx, name))
case (Alias(UnresolvedFieldReference(origName), name, _), _) =>
val idx = c.getFieldIndex(origName)
if (idx < 0) {
throw new TableException(s"$origName is not a field of type $c")
}
(idx, name)
Some((idx, name))
case (_: TimeAttribute, _) =>
None
case _ => throw new TableException(
"Field reference expression or alias on field expression expected.")
}
case p: PojoTypeInfo[A] =>
exprs.map {
filteredExprs flatMap {
case (UnresolvedFieldReference(name)) =>
val idx = p.getFieldIndex(name)
if (idx < 0) {
throw new TableException(s"$name is not a field of type $p")
}
(idx, name)
Some((idx, name))
case Alias(UnresolvedFieldReference(origName), name, _) =>
val idx = p.getFieldIndex(origName)
if (idx < 0) {
throw new TableException(s"$origName is not a field of type $p")
}
(idx, name)
Some((idx, name))
case _: TimeAttribute =>
None
case _ => throw new TableException(
"Field reference expression or alias on field expression expected.")
}
@@ -795,6 +822,42 @@ abstract class TableEnvironment(val config: TableConfig) {
Some(mapFunction)
}

/**
* Checks for at most one rowtime and proctime attribute.
* Returns the time attributes.
*
* @return rowtime attribute and proctime attribute
*/
protected def validateAndExtractTimeAttributes(
fieldNames: Seq[String],
fieldIndices: Seq[Int],
exprs: Array[Expression])
: (Option[(Int, String)], Option[(Int, String)]) = {

var rowtime: Option[(Int, String)] = None
var proctime: Option[(Int, String)] = None

exprs.zipWithIndex.foreach {
case (RowtimeAttribute(reference@UnresolvedFieldReference(name)), idx) =>
if (rowtime.isDefined) {
throw new TableException(
"The rowtime attribute can only be defined once in a table schema.")
} else {
rowtime = Some(idx, name)
}
case (ProctimeAttribute(reference@UnresolvedFieldReference(name)), idx) =>
if (proctime.isDefined) {
throw new TableException(
"The proctime attribute can only be defined once in a table schema.")
} else {
proctime = Some(idx, name)
}
case _ =>
// do nothing
}

(rowtime, proctime)
}
}
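
For illustration, the at-most-once rule above can also be expressed as a fold; a hedged, self-contained sketch in which the (name, isRowtime) tuple encoding is a stand-in for Flink's Expression tree:

def extractRowtime(decls: Seq[(String, Boolean)]): Option[(Int, String)] =
  decls.zipWithIndex.foldLeft(Option.empty[(Int, String)]) {
    case (Some(_), ((_, true), _)) =>
      throw new IllegalStateException(
        "The rowtime attribute can only be defined once in a table schema.")
    case (None, ((name, true), idx)) => Some((idx, name))
    case (found, _) => found
  }

// extractRowtime(Seq(("ts", true), ("name", false))) == Some((0, "ts"))
// extractRowtime(Seq(("a", true), ("b", true)))      throws as above
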

/**
@@ -803,6 +866,10 @@ abstract class TableEnvironment(val config: TableConfig) {
*/
object TableEnvironment {

// default names that can be used in TableSources etc.
val DEFAULT_ROWTIME_ATTRIBUTE = "rowtime"
val DEFAULT_PROCTIME_ATTRIBUTE = "proctime"

/**
* Returns a [[JavaBatchTableEnv]] for a Java [[JavaBatchExecEnv]].
*
@@ -625,7 +625,7 @@ trait ImplicitExpressionOperations {
*/
def millis = milli

// row interval type
// Row interval type

/**
* Creates an interval of rows.
@@ -634,6 +634,8 @@
*/
def rows = toRowInterval(expr)

// Advanced type helper functions

/**
* Accesses the field of a Flink composite type (such as Tuple, POJO, etc.) by name and
* returns its value.
@@ -680,6 +682,20 @@ trait ImplicitExpressionOperations {
* @return the first and only element of an array with a single element
*/
def element() = ArrayElement(expr)

// Schema definition

/**
* Declares a field as the rowtime attribute for indicating, accessing, and working in
* Flink's event time.
*/
def rowtime = RowtimeAttribute(expr)

/**
* Declares a field as the proctime attribute for indicating, accessing, and working in
* Flink's processing time.
*/
def proctime = ProctimeAttribute(expr)
}
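
A brief usage sketch of these new schema expressions in a windowed query; the stream, names, and window size are illustrative, not taken from this commit:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.{TableEnvironment, Tumble}
import org.apache.flink.table.api.scala._

// Declare 'ts as the rowtime attribute when converting the stream, then use it
// as the time field of a group window. In a real job, timestamps and
// watermarks must be assigned on the stream for event time to work.
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)
val readings: DataStream[(Long, String, Double)] =
  env.fromElements((1000L, "s1", 20.0), (2000L, "s1", 21.0))

val sensors = tEnv.fromDataStream(readings, 'ts.rowtime, 'id, 'temp)
val avgTemp = sensors
  .window(Tumble over 10.minutes on 'ts as 'w)
  .groupBy('id, 'w)
  .select('id, 'w.end, 'temp.avg)
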
