Skip to content

Commit

Permalink
[FLINK-5884] [table] Integrate time indicators for Table API & SQL. C…
Browse files Browse the repository at this point in the history
…ontinued
  • Loading branch information
fhueske committed May 5, 2017
1 parent 495f104 commit 24bf61c
Show file tree
Hide file tree
Showing 22 changed files with 628 additions and 227 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import org.apache.flink.api.java.io.DiscardingOutputFormat
import org.apache.flink.api.java.typeutils.GenericTypeInfo
import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
import org.apache.flink.table.explain.PlanJsonParser
import org.apache.flink.table.expressions.Expression
import org.apache.flink.table.expressions.{Expression, RowtimeAttribute, TimeAttribute}
import org.apache.flink.table.plan.nodes.FlinkConventions
import org.apache.flink.table.plan.nodes.dataset.DataSetRel
import org.apache.flink.table.plan.rules.FlinkRuleSets
Expand Down Expand Up @@ -196,26 +196,11 @@ abstract class BatchTableEnvironment(

val (fieldNames, fieldIndexes) = getFieldInfo[T](
dataSet.getType,
fields,
ignoreTimeAttributes = true)
fields)

// validate and extract time attributes
val (rowtime, proctime) = validateAndExtractTimeAttributes(fieldNames, fieldIndexes, fields)

// don't allow proctime on batch
proctime match {
case Some(_) =>
throw new ValidationException(
"A proctime attribute is not allowed in a batch environment. " +
"Working with processing-time on batch would lead to non-deterministic results.")
case _ => // ok
}
// rowtime must not extend the schema of a batch table
rowtime match {
case Some((idx, _)) if idx >= dataSet.getType.getArity =>
throw new ValidationException(
"A rowtime attribute must be defined on an existing field in a batch environment.")
case _ => // ok
if (fields.exists(_.isInstanceOf[TimeAttribute])) {
throw new ValidationException(
".rowtime and .proctime time indicators are not allowed in a batch environment.")
}

val dataSetTable = new DataSetTable[T](dataSet, fieldIndexes, fieldNames)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,21 @@ import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.sql2rel.RelDecorrelator
import org.apache.calcite.tools.{RuleSet, RuleSets}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
import org.apache.flink.api.common.typeutils.CompositeType
import org.apache.flink.api.java.typeutils.GenericTypeInfo
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.table.calcite.RelTimeIndicatorConverter
import org.apache.flink.table.explain.PlanJsonParser
import org.apache.flink.table.expressions.Expression
import org.apache.flink.table.expressions.{Expression, ProctimeAttribute, RowtimeAttribute, UnresolvedFieldReference}
import org.apache.flink.table.plan.nodes.FlinkConventions
import org.apache.flink.table.plan.nodes.datastream.DataStreamRel
import org.apache.flink.table.plan.rules.FlinkRuleSets
import org.apache.flink.table.plan.schema.{DataStreamTable, TableSourceTable}
import org.apache.flink.table.plan.schema.{DataStreamTable, StreamTableSourceTable, TableSourceTable}
import org.apache.flink.table.sinks.{StreamTableSink, TableSink}
import org.apache.flink.table.sources.{StreamTableSource, TableSource}
import org.apache.flink.table.typeutils.TypeCheckUtils
import org.apache.flink.types.Row

import _root_.scala.collection.JavaConverters._
Expand Down Expand Up @@ -99,7 +101,7 @@ abstract class StreamTableEnvironment(

tableSource match {
case streamTableSource: StreamTableSource[_] =>
registerTableInternal(name, new TableSourceTable(streamTableSource))
registerTableInternal(name, new StreamTableSourceTable(streamTableSource))
case _ =>
throw new TableException("Only StreamTableSource can be registered in " +
"StreamTableEnvironment")
Expand Down Expand Up @@ -168,14 +170,13 @@ abstract class StreamTableEnvironment(
fields: Array[Expression])
: Unit = {

val (fieldNames, fieldIndexes) = getFieldInfo[T](
dataStream.getType,
fields,
ignoreTimeAttributes = false)
val streamType = dataStream.getType

// validate and extract time attributes
val (rowtime, proctime) = validateAndExtractTimeAttributes(fieldNames, fieldIndexes, fields)
// get field names and types for all non-replaced fields
val (fieldNames, fieldIndexes) = getFieldInfo[T](streamType, fields)

// validate and extract time attributes
val (rowtime, proctime) = validateAndExtractTimeAttributes(streamType, fields)

val dataStreamTable = new DataStreamTable[T](
dataStream,
Expand All @@ -187,6 +188,71 @@ abstract class StreamTableEnvironment(
registerTableInternal(name, dataStreamTable)
}

/**
* Checks for at most one rowtime and proctime attribute.
* Returns the time attributes.
*
* @return rowtime attribute and proctime attribute
*/
private def validateAndExtractTimeAttributes(
streamType: TypeInformation[_],
exprs: Array[Expression])
: (Option[(Int, String)], Option[(Int, String)]) = {

val fieldTypes: Array[TypeInformation[_]] = streamType match {
case c: CompositeType[_] => (0 until c.getArity).map(i => c.getTypeAt(i)).toArray
case a: AtomicType[_] => Array(a)
}

var fieldNames: List[String] = Nil
var rowtime: Option[(Int, String)] = None
var proctime: Option[(Int, String)] = None

exprs.zipWithIndex.foreach {
case (RowtimeAttribute(reference@UnresolvedFieldReference(name)), idx) =>
if (rowtime.isDefined) {
throw new TableException(
"The rowtime attribute can only be defined once in a table schema.")
} else {
// check type of field that is replaced
if (idx < fieldTypes.length &&
!(TypeCheckUtils.isLong(fieldTypes(idx)) ||
TypeCheckUtils.isTimePoint(fieldTypes(idx)))) {
throw new TableException(
"The rowtime attribute can only be replace a field with a valid time type, such as " +
"Timestamp or Long.")
}
rowtime = Some(idx, name)
}
case (ProctimeAttribute(reference@UnresolvedFieldReference(name)), idx) =>
if (proctime.isDefined) {
throw new TableException(
"The proctime attribute can only be defined once in a table schema.")
} else {
// check that proctime is only appended
if (idx < fieldTypes.length) {
throw new TableException(
"The proctime attribute can only be appended to the table schema and not replace " +
"an existing field. Please move it to the end of the schema.")
}
proctime = Some(idx, name)
}
case (u: UnresolvedFieldReference, _) => fieldNames = u.name :: fieldNames
}

if (rowtime.isDefined && fieldNames.contains(rowtime.get._2)) {
throw new TableException(
"The rowtime attribute may not have the same name as an another field.")
}

if (proctime.isDefined && fieldNames.contains(proctime.get._2)) {
throw new TableException(
"The proctime attribute may not have the same name as an another field.")
}

(rowtime, proctime)
}

/**
* Returns the decoration rule set for this environment
* including a custom RuleSet configuration.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -601,50 +601,36 @@ abstract class TableEnvironment(val config: TableConfig) {

/**
* Returns field names and field positions for a given [[TypeInformation]] and [[Array]] of
* [[Expression]]. It does not handle time attributes but considers them in indices, if
* ignore flag is not false.
* [[Expression]]. It does not handle time attributes but considers them in indices.
*
* @param inputType The [[TypeInformation]] against which the [[Expression]]s are evaluated.
* @param exprs The expressions that define the field names.
* @param ignoreTimeAttributes ignore time attributes and handle them as regular expressions.
* @param exprs The expressions that define the field names.
* @tparam A The type of the TypeInformation.
* @return A tuple of two arrays holding the field names and corresponding field positions.
*/
protected[flink] def getFieldInfo[A](
inputType: TypeInformation[A],
exprs: Array[Expression],
ignoreTimeAttributes: Boolean)
exprs: Array[Expression])
: (Array[String], Array[Int]) = {

TableEnvironment.validateType(inputType)

val filteredExprs = if (ignoreTimeAttributes) {
exprs.map {
case ta: TimeAttribute => ta.expression
case e@_ => e
}
} else {
exprs
}

val indexedNames: Array[(Int, String)] = inputType match {
case g: GenericTypeInfo[A] if g.getTypeClass == classOf[Row] =>
throw new TableException(
"An input of GenericTypeInfo<Row> cannot be converted to Table. " +
"Please specify the type of the input with a RowTypeInfo.")
case a: AtomicType[A] =>
filteredExprs.zipWithIndex flatMap {
exprs.zipWithIndex flatMap {
case (UnresolvedFieldReference(name), idx) =>
if (idx > 0) {
throw new TableException("Table of atomic type can only have a single field.")
}
Some((0, name))
case (_: TimeAttribute, _) if ignoreTimeAttributes =>
None
case _ => throw new TableException("Field reference expression requested.")
}
case t: TupleTypeInfo[A] =>
filteredExprs.zipWithIndex flatMap {
exprs.zipWithIndex flatMap {
case (UnresolvedFieldReference(name), idx) =>
Some((idx, name))
case (Alias(UnresolvedFieldReference(origName), name, _), _) =>
Expand All @@ -659,7 +645,7 @@ abstract class TableEnvironment(val config: TableConfig) {
"Field reference expression or alias on field expression expected.")
}
case c: CaseClassTypeInfo[A] =>
filteredExprs.zipWithIndex flatMap {
exprs.zipWithIndex flatMap {
case (UnresolvedFieldReference(name), idx) =>
Some((idx, name))
case (Alias(UnresolvedFieldReference(origName), name, _), _) =>
Expand All @@ -674,7 +660,7 @@ abstract class TableEnvironment(val config: TableConfig) {
"Field reference expression or alias on field expression expected.")
}
case p: PojoTypeInfo[A] =>
filteredExprs flatMap {
exprs flatMap {
case (UnresolvedFieldReference(name)) =>
val idx = p.getFieldIndex(name)
if (idx < 0) {
Expand Down Expand Up @@ -822,42 +808,6 @@ abstract class TableEnvironment(val config: TableConfig) {
Some(mapFunction)
}

/**
* Checks for at most one rowtime and proctime attribute.
* Returns the time attributes.
*
* @return rowtime attribute and proctime attribute
*/
protected def validateAndExtractTimeAttributes(
fieldNames: Seq[String],
fieldIndices: Seq[Int],
exprs: Array[Expression])
: (Option[(Int, String)], Option[(Int, String)]) = {

var rowtime: Option[(Int, String)] = None
var proctime: Option[(Int, String)] = None

exprs.zipWithIndex.foreach {
case (RowtimeAttribute(reference@UnresolvedFieldReference(name)), idx) =>
if (rowtime.isDefined) {
throw new TableException(
"The rowtime attribute can only be defined once in a table schema.")
} else {
rowtime = Some(idx, name)
}
case (ProctimeAttribute(reference@UnresolvedFieldReference(name)), idx) =>
if (proctime.isDefined) {
throw new TableException(
"The proctime attribute can only be defined once in a table schema.")
} else {
proctime = Some(idx, name)
}
case _ =>
// do nothing
}

(rowtime, proctime)
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,12 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp
createSqlIntervalType(
new SqlIntervalQualifier(TimeUnit.DAY, TimeUnit.SECOND, SqlParserPos.ZERO))

case TimeIndicatorTypeInfo.ROWTIME_INDICATOR =>
createRowtimeIndicatorType()

case TimeIndicatorTypeInfo.PROCTIME_INDICATOR =>
createProctimeIndicatorType()
case TIMESTAMP if typeInfo.isInstanceOf[TimeIndicatorTypeInfo] =>
if (typeInfo.asInstanceOf[TimeIndicatorTypeInfo].isEventTime) {
createRowtimeIndicatorType()
} else {
createProctimeIndicatorType()
}

case _ =>
createSqlType(sqlType)
Expand Down Expand Up @@ -114,9 +115,11 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp
* @param fieldNames field names
* @param fieldTypes field types, every element is Flink's [[TypeInformation]]
* @param rowtime optional system field to indicate event-time; the index determines the index
* in the final record and might replace an existing field
* in the final record. If the index is smaller than the number of specified
* fields, it shifts all following fields.
* @param proctime optional system field to indicate processing-time; the index determines the
* index in the final record and might replace an existing field
* index in the final record. If the index is smaller than the number of
* specified fields, it shifts all following fields.
* @return a struct type with the input fieldNames, input fieldTypes, and system fields
*/
def buildLogicalRowType(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,11 +98,14 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
lazy val UNBOUNDED_ROW: Keyword = Keyword("unbounded_row")
lazy val UNBOUNDED_RANGE: Keyword = Keyword("unbounded_range")
lazy val ASIN: Keyword = Keyword("asin")
lazy val ROWTIME: Keyword = Keyword("rowtime")
lazy val PROCTIME: Keyword = Keyword("proctime")

def functionIdent: ExpressionParser.Parser[String] =
not(ARRAY) ~ not(AS) ~ not(COUNT) ~ not(AVG) ~ not(MIN) ~ not(MAX) ~
not(SUM0) ~ not(STDDEV_POP) ~ not(STDDEV_SAMP) ~ not(VAR_POP) ~ not(VAR_SAMP) ~
not(SUM) ~ not(START) ~ not(END)~ not(CAST) ~ not(NULL) ~ not(IF) ~
not(ROWTIME) ~ not(PROCTIME) ~
not(CURRENT_ROW) ~ not(UNBOUNDED_ROW) ~ not(CURRENT_RANGE) ~ not(UNBOUNDED_RANGE) ~>
super.ident

Expand Down Expand Up @@ -532,12 +535,25 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {

// alias

lazy val alias: PackratParser[Expression] = logic ~ AS ~ fieldReference ^^ {
lazy val alias: PackratParser[Expression] = timeIndicator |
logic ~ AS ~ fieldReference ^^ {
case e ~ _ ~ name => Alias(e, name.name)
} | logic ~ AS ~ "(" ~ rep1sep(fieldReference, ",") ~ ")" ^^ {
case e ~ _ ~ _ ~ names ~ _ => Alias(e, names.head.name, names.tail.map(_.name))
} | logic

// time indicators

lazy val timeIndicator: PackratParser[Expression] = procTime | rowTime

lazy val procTime: PackratParser[Expression] = fieldReference ~ "." ~ PROCTIME ^^ {
case f ~ _ ~ _ => ProctimeAttribute(f)
}

lazy val rowTime: PackratParser[Expression] = fieldReference ~ "." ~ ROWTIME ^^ {
case f ~ _ ~ _ => RowtimeAttribute(f)
}

lazy val expression: PackratParser[Expression] = alias |
failure("Invalid expression.")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import org.apache.flink.table.expressions.{Expression, WindowReference}
import org.apache.flink.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess}

/**
* Logical super class for all types of windows (group-windows and row-windows).
* Logical super class for group windows.
*
* @param aliasAttribute window alias
* @param timeAttribute time field indicating event-time or processing-time
Expand Down
Loading

0 comments on commit 24bf61c

Please sign in to comment.