Skip to content

Commit

Permalink
[FLINK-3580] [table] Implement FLOOR/CEIL for time points
Browse files Browse the repository at this point in the history
This closes apache#2391.
  • Loading branch information
twalthr committed Aug 26, 2016
1 parent b05ea69 commit 6a456c6
Show file tree
Hide file tree
Showing 9 changed files with 396 additions and 20 deletions.
66 changes: 66 additions & 0 deletions docs/dev/table_api.md
Original file line number Diff line number Diff line change
Expand Up @@ -1442,6 +1442,28 @@ TEMPORAL.extract(TIMEINTERVALUNIT)
</td>
</tr>

<tr>
<td>
{% highlight java %}
TIMEPOINT.floor(TIMEINTERVALUNIT)
{% endhighlight %}
</td>
<td>
<p>Rounds a time point down to the given unit. E.g. <code>"12:44:31".toDate.floor(MINUTE)</code> leads to 12:44:00.</p>
</td>
</tr>

<tr>
<td>
{% highlight java %}
TIMEPOINT.ceil(TIMEINTERVALUNIT)
{% endhighlight %}
</td>
<td>
<p>Rounds a time point up to the given unit. E.g. <code>"12:44:31".toTime.floor(MINUTE)</code> leads to 12:45:00.</p>
</td>
</tr>

</tbody>
</table>

Expand Down Expand Up @@ -1683,6 +1705,28 @@ TEMPORAL.extract(TimeIntervalUnit)
</td>
</tr>

<tr>
<td>
{% highlight scala %}
TIMEPOINT.floor(TimeIntervalUnit)
{% endhighlight %}
</td>
<td>
<p>Rounds a time point down to the given unit. E.g. <code>"12:44:31".toTime.floor(TimeIntervalUnit.MINUTE)</code> leads to 12:44:00.</p>
</td>
</tr>

<tr>
<td>
{% highlight scala %}
TIMEPOINT.ceil(TimeIntervalUnit)
{% endhighlight %}
</td>
<td>
<p>Rounds a time point up to the given unit. E.g. <code>"12:44:31".toTime.floor(TimeIntervalUnit.MINUTE)</code> leads to 12:45:00.</p>
</td>
</tr>

</tbody>
</table>
</div>
Expand Down Expand Up @@ -1926,6 +1970,28 @@ EXTRACT(TIMEINTERVALUNIT FROM TEMPORAL)
</td>
</tr>

<tr>
<td>
{% highlight sql %}
FLOOR(TIMEPOINT TO TIMEINTERVALUNIT)
{% endhighlight %}
</td>
<td>
<p>Rounds a time point down to the given unit. E.g. <code>FLOOR(TIME '12:44:31' TO MINUTE)</code> leads to 12:44:00.</p>
</td>
</tr>

<tr>
<td>
{% highlight sql %}
CEIL(TIMEPOINT TO TIMEINTERVALUNIT)
{% endhighlight %}
</td>
<td>
<p>Rounds a time point up to the given unit. E.g. <code>CEIL(TIME '12:44:31' TO MINUTE)</code> leads to 12:45:00.</p>
</td>
</tr>

</tbody>
</table>
</div>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@ trait ImplicitExpressionOperations {
*/
def ceil() = Ceil(expr)

// String operations

/**
* Creates a substring of the given string at given index for a given length.
*
Expand Down Expand Up @@ -216,6 +218,8 @@ trait ImplicitExpressionOperations {
*/
def similar(pattern: Expression) = Similar(expr, pattern)

// Temporal operations

/**
* Parses a date String in the form "yy-mm-dd" to a SQL Date.
*/
Expand All @@ -238,7 +242,21 @@ trait ImplicitExpressionOperations {
*/
def extract(timeIntervalUnit: TimeIntervalUnit) = Extract(timeIntervalUnit, expr)

// interval types
/**
* Rounds down a time point to the given unit.
*
* e.g. "12:44:31".toDate.floor(MINUTE) leads to 12:44:00
*/
def floor(timeIntervalUnit: TimeIntervalUnit) = TemporalFloor(timeIntervalUnit, expr)

/**
* Rounds up a time point to the given unit.
*
* e.g. "12:44:31".toDate.ceil(MINUTE) leads to 12:45:00
*/
def ceil(timeIntervalUnit: TimeIntervalUnit) = TemporalCeil(timeIntervalUnit, expr)

// Interval types

/**
* Creates an interval of the given number of years.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,17 @@ object CodeGenUtils {
s"${qualifyMethod(BuiltInMethod.TIMESTAMP_TO_LONG.method)}($resultTerm)"
}

def compareEnum(term: String, enum: Enum[_]): Boolean =
term == qualifyEnum(enum)
def compareEnum(term: String, enum: Enum[_]): Boolean = term == qualifyEnum(enum)

def getEnum(genExpr: GeneratedExpression): Enum[_] = {
val split = genExpr.resultTerm.split('.')
val value = split.last
val clazz = genExpr.resultType.getTypeClass
enumValueOf(clazz, value)
}

def enumValueOf[T <: Enum[T]](cls: Class[_], stringValue: String): Enum[_] =
Enum.valueOf(cls.asInstanceOf[Class[T]], stringValue).asInstanceOf[Enum[_]]


// ----------------------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,25 +20,54 @@ package org.apache.flink.api.table.codegen.calls

import java.lang.reflect.Method

import org.apache.flink.api.common.typeinfo.BasicTypeInfo.
{DOUBLE_TYPE_INFO, FLOAT_TYPE_INFO,BIG_DEC_TYPE_INFO}
import org.apache.calcite.avatica.util.TimeUnitRange
import org.apache.calcite.avatica.util.TimeUnitRange.{MONTH, YEAR}
import org.apache.flink.api.common.typeinfo.BasicTypeInfo.{BIG_DEC_TYPE_INFO, DOUBLE_TYPE_INFO, FLOAT_TYPE_INFO}
import org.apache.flink.api.table.codegen.CodeGenUtils.{getEnum, primitiveTypeTermForTypeInfo, qualifyMethod}
import org.apache.flink.api.table.codegen.calls.CallGenerator.generateCallIfArgsNotNull
import org.apache.flink.api.table.codegen.{CodeGenerator, GeneratedExpression}

/**
* Generates arithmetic floor/ceil function calls.
* Generates floor/ceil function calls.
*/
class FloorCeilCallGen(method: Method) extends MultiTypeMethodCallGen(method) {
class FloorCeilCallGen(
arithmeticMethod: Method,
temporalMethod: Option[Method] = None)
extends MultiTypeMethodCallGen(arithmeticMethod) {

override def generate(
codeGenerator: CodeGenerator,
operands: Seq[GeneratedExpression])
: GeneratedExpression = {
operands.head.resultType match {
case FLOAT_TYPE_INFO | DOUBLE_TYPE_INFO | BIG_DEC_TYPE_INFO =>
super.generate(codeGenerator, operands)
case _ =>
operands.head // no floor/ceil necessary
}
}
: GeneratedExpression = operands.size match {
// arithmetic
case 1 =>
operands.head.resultType match {
case FLOAT_TYPE_INFO | DOUBLE_TYPE_INFO | BIG_DEC_TYPE_INFO =>
super.generate(codeGenerator, operands)
case _ =>
operands.head // no floor/ceil necessary
}

// temporal
case 2 =>
val operand = operands.head
val unit = getEnum(operands(1)).asInstanceOf[TimeUnitRange]
val internalType = primitiveTypeTermForTypeInfo(operand.resultType)

generateCallIfArgsNotNull(codeGenerator.nullCheck, operand.resultType, operands) {
(terms) =>
unit match {
case YEAR | MONTH =>
s"""
|($internalType) ${qualifyMethod(temporalMethod.get)}(${terms(1)}, ${terms.head})
|""".stripMargin
case _ =>
s"""
|${qualifyMethod(arithmeticMethod)}(
| ($internalType) ${terms.head},
| ($internalType) ${unit.startUnit.multiplier.intValue()})
|""".stripMargin
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import org.apache.calcite.sql.fun.SqlStdOperatorTable._
import org.apache.calcite.sql.fun.SqlTrimFunction
import org.apache.calcite.util.BuiltInMethod
import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo, TypeInformation}
import org.apache.flink.api.java.typeutils.GenericTypeInfo
import org.apache.flink.api.table.functions.utils.ScalarSqlFunction

Expand Down Expand Up @@ -181,6 +181,48 @@ object ScalarFunctions {
LONG_TYPE_INFO,
BuiltInMethod.UNIX_DATE_EXTRACT.method)

addSqlFunction(
FLOOR,
Seq(SqlTimeTypeInfo.DATE, new GenericTypeInfo(classOf[TimeUnitRange])),
new FloorCeilCallGen(
BuiltInMethod.FLOOR.method,
Some(BuiltInMethod.UNIX_DATE_FLOOR.method)))

addSqlFunction(
FLOOR,
Seq(SqlTimeTypeInfo.TIME, new GenericTypeInfo(classOf[TimeUnitRange])),
new FloorCeilCallGen(
BuiltInMethod.FLOOR.method,
Some(BuiltInMethod.UNIX_DATE_FLOOR.method)))

addSqlFunction(
FLOOR,
Seq(SqlTimeTypeInfo.TIMESTAMP, new GenericTypeInfo(classOf[TimeUnitRange])),
new FloorCeilCallGen(
BuiltInMethod.FLOOR.method,
Some(BuiltInMethod.UNIX_TIMESTAMP_FLOOR.method)))

addSqlFunction(
CEIL,
Seq(SqlTimeTypeInfo.DATE, new GenericTypeInfo(classOf[TimeUnitRange])),
new FloorCeilCallGen(
BuiltInMethod.CEIL.method,
Some(BuiltInMethod.UNIX_DATE_CEIL.method)))

addSqlFunction(
CEIL,
Seq(SqlTimeTypeInfo.TIME, new GenericTypeInfo(classOf[TimeUnitRange])),
new FloorCeilCallGen(
BuiltInMethod.CEIL.method,
Some(BuiltInMethod.UNIX_DATE_CEIL.method)))

addSqlFunction(
CEIL,
Seq(SqlTimeTypeInfo.TIMESTAMP, new GenericTypeInfo(classOf[TimeUnitRange])),
new FloorCeilCallGen(
BuiltInMethod.CEIL.method,
Some(BuiltInMethod.UNIX_TIMESTAMP_CEIL.method)))

// ----------------------------------------------------------------------------------------------

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import org.apache.flink.api.table.expressions.TimePointUnit.TimePointUnit
import org.apache.flink.api.table.expressions.TrimMode.TrimMode
import org.apache.flink.api.table.typeutils.IntervalTypeInfo

import scala.language.implicitConversions
import scala.util.parsing.combinator.{JavaTokenParsers, PackratParsers}

/**
Expand Down Expand Up @@ -65,6 +66,8 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
lazy val TO_TIMESTAMP: Keyword = Keyword("toTimestamp")
lazy val TRIM: Keyword = Keyword("trim")
lazy val EXTRACT: Keyword = Keyword("extract")
lazy val FLOOR: Keyword = Keyword("floor")
lazy val CEIL: Keyword = Keyword("ceil")
lazy val YEAR: Keyword = Keyword("year")
lazy val MONTH: Keyword = Keyword("month")
lazy val DAY: Keyword = Keyword("day")
Expand Down Expand Up @@ -213,6 +216,14 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
case operand ~ _ ~ _ ~ _ ~ unit ~ _ => Extract(unit, operand)
}

lazy val suffixFloor = composite ~ "." ~ FLOOR ~ "(" ~ timeIntervalUnit ~ ")" ^^ {
case operand ~ _ ~ _ ~ _ ~ unit ~ _ => TemporalFloor(unit, operand)
}

lazy val suffixCeil = composite ~ "." ~ CEIL ~ "(" ~ timeIntervalUnit ~ ")" ^^ {
case operand ~ _ ~ _ ~ _ ~ unit ~ _ => TemporalCeil(unit, operand)
}

lazy val suffixFunctionCall =
composite ~ "." ~ functionIdent ~ "(" ~ repsep(expression, ",") ~ ")" ^^ {
case operand ~ _ ~ name ~ _ ~ args ~ _ => Call(name.toUpperCase, operand :: args)
Expand Down Expand Up @@ -255,7 +266,8 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
suffixTimeInterval | suffixIsNull | suffixIsNotNull | suffixSum | suffixMin | suffixMax |
suffixCount | suffixAvg | suffixCast | suffixAs | suffixTrim | suffixTrimWithoutArgs |
suffixIf | suffixAsc | suffixDesc | suffixToDate | suffixToTimestamp | suffixToTime |
suffixExtract | suffixFunctionCall // function call must always be at the end
suffixExtract | suffixFloor | suffixCeil |
suffixFunctionCall // function call must always be at the end

// prefix operators

Expand Down Expand Up @@ -311,10 +323,18 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
case _ ~ _ ~ operand ~ _ ~ unit ~ _ => Extract(unit, operand)
}

lazy val prefixFloor = FLOOR ~ "(" ~ expression ~ "," ~ timeIntervalUnit ~ ")" ^^ {
case _ ~ _ ~ operand ~ _ ~ unit ~ _ => TemporalFloor(unit, operand)
}

lazy val prefixCeil = CEIL ~ "(" ~ expression ~ "," ~ timeIntervalUnit ~ ")" ^^ {
case _ ~ _ ~ operand ~ _ ~ unit ~ _ => TemporalCeil(unit, operand)
}

lazy val prefixed: PackratParser[Expression] =
prefixIsNull | prefixIsNotNull | prefixSum | prefixMin | prefixMax | prefixCount | prefixAvg |
prefixCast | prefixAs | prefixTrim | prefixTrimWithoutArgs | prefixIf | prefixExtract |
prefixFunctionCall // function call must always be at the end
prefixFloor | prefixCeil | prefixFunctionCall // function call must always be at the end

// suffix/prefix composite

Expand Down
Loading

0 comments on commit 6a456c6

Please sign in to comment.