Skip to content

Commit

Permalink
[FLINK-8097] [table] Add built-in support for min/max aggregation for…
Browse files Browse the repository at this point in the history
… Date/Time

This closes apache#5027.
  • Loading branch information
dianfu authored and twalthr committed Nov 21, 2017
1 parent 9e3439c commit 44c603d
Show file tree
Hide file tree
Showing 10 changed files with 325 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@ package org.apache.flink.table.functions.aggfunctions

import java.math.BigDecimal
import java.lang.{Iterable => JIterable}
import java.sql.Timestamp
import java.sql.{Date, Time, Timestamp}

import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
import org.apache.flink.api.java.typeutils.TupleTypeInfo
import org.apache.flink.table.api.Types
import org.apache.flink.table.functions.aggfunctions.Ordering.TimestampOrdering
import org.apache.flink.table.functions.aggfunctions.Ordering._
import org.apache.flink.table.functions.AggregateFunction

/** The initial accumulator for Max aggregate function */
Expand Down Expand Up @@ -170,3 +170,19 @@ class TimestampMaxAggFunction extends MaxAggFunction[Timestamp] {
override def getInitValue: Timestamp = new Timestamp(0)
override def getValueTypeInfo = Types.SQL_TIMESTAMP
}

/**
* Built-in Date Max aggregate function
*/
class DateMaxAggFunction extends MaxAggFunction[Date] {
override def getInitValue: Date = new Date(0)
override def getValueTypeInfo = Types.SQL_DATE
}

/**
* Built-in Time Max aggregate function
*/
class TimeMaxAggFunction extends MaxAggFunction[Time] {
override def getInitValue: Time = new Time(0)
override def getValueTypeInfo = Types.SQL_TIME
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@ package org.apache.flink.table.functions.aggfunctions
import java.math.BigDecimal
import java.util.{HashMap => JHashMap}
import java.lang.{Iterable => JIterable}
import java.sql.Timestamp
import java.sql.{Date, Time, Timestamp}

import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
import org.apache.flink.api.java.typeutils.{MapTypeInfo, TupleTypeInfo}
import org.apache.flink.table.api.Types
import org.apache.flink.table.functions.aggfunctions.Ordering.TimestampOrdering
import org.apache.flink.table.functions.aggfunctions.Ordering._
import org.apache.flink.table.functions.AggregateFunction

/** The initial accumulator for Max with retraction aggregate function */
Expand Down Expand Up @@ -227,3 +227,19 @@ class TimestampMaxWithRetractAggFunction extends MaxWithRetractAggFunction[Times
override def getInitValue: Timestamp = new Timestamp(0)
override def getValueTypeInfo = Types.SQL_TIMESTAMP
}

/**
* Built-in Date Max with retraction aggregate function
*/
class DateMaxWithRetractAggFunction extends MaxWithRetractAggFunction[Date] {
override def getInitValue: Date = new Date(0)
override def getValueTypeInfo = Types.SQL_DATE
}

/**
* Built-in Time Max with retraction aggregate function
*/
class TimeMaxWithRetractAggFunction extends MaxWithRetractAggFunction[Time] {
override def getInitValue: Time = new Time(0)
override def getValueTypeInfo = Types.SQL_TIME
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@ package org.apache.flink.table.functions.aggfunctions

import java.math.BigDecimal
import java.lang.{Iterable => JIterable}
import java.sql.Timestamp
import java.sql.{Date, Time, Timestamp}

import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
import org.apache.flink.api.java.typeutils.TupleTypeInfo
import org.apache.flink.table.api.Types
import org.apache.flink.table.functions.aggfunctions.Ordering.TimestampOrdering
import org.apache.flink.table.functions.aggfunctions.Ordering._
import org.apache.flink.table.functions.AggregateFunction

/** The initial accumulator for Min aggregate function */
Expand Down Expand Up @@ -170,3 +170,19 @@ class TimestampMinAggFunction extends MinAggFunction[Timestamp] {
override def getInitValue: Timestamp = new Timestamp(0)
override def getValueTypeInfo = Types.SQL_TIMESTAMP
}

/**
* Built-in Date Min aggregate function
*/
class DateMinAggFunction extends MinAggFunction[Date] {
override def getInitValue: Date = new Date(0)
override def getValueTypeInfo = Types.SQL_DATE
}

/**
* Built-in Time Min aggregate function
*/
class TimeMinAggFunction extends MinAggFunction[Time] {
override def getInitValue: Time = new Time(0)
override def getValueTypeInfo = Types.SQL_TIME
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@ package org.apache.flink.table.functions.aggfunctions
import java.math.BigDecimal
import java.util.{HashMap => JHashMap}
import java.lang.{Iterable => JIterable}
import java.sql.Timestamp
import java.sql.{Date, Time, Timestamp}

import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
import org.apache.flink.api.java.typeutils.{MapTypeInfo, TupleTypeInfo}
import org.apache.flink.table.api.Types
import org.apache.flink.table.functions.aggfunctions.Ordering.TimestampOrdering
import org.apache.flink.table.functions.aggfunctions.Ordering._
import org.apache.flink.table.functions.AggregateFunction

/** The initial accumulator for Min with retraction aggregate function */
Expand Down Expand Up @@ -227,3 +227,19 @@ class TimestampMinWithRetractAggFunction extends MinWithRetractAggFunction[Times
override def getInitValue: Timestamp = new Timestamp(0)
override def getValueTypeInfo = Types.SQL_TIMESTAMP
}

/**
* Built-in Date Min with retraction aggregate function
*/
class DateMinWithRetractAggFunction extends MinWithRetractAggFunction[Date] {
override def getInitValue: Date = new Date(0)
override def getValueTypeInfo = Types.SQL_DATE
}

/**
* Built-in Time Min with retraction aggregate function
*/
class TimeMinWithRetractAggFunction extends MinWithRetractAggFunction[Time] {
override def getInitValue: Time = new Time(0)
override def getValueTypeInfo = Types.SQL_TIME
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,18 @@

package org.apache.flink.table.functions.aggfunctions

import java.sql.Timestamp
import java.sql.{Date, Time, Timestamp}

object Ordering {
implicit object TimestampOrdering extends Ordering[Timestamp] {
override def compare(x: Timestamp, y: Timestamp): Int = x.compareTo(y)
}

implicit object DateOrdering extends Ordering[Date] {
override def compare(x: Date, y: Date): Int = x.compareTo(y)
}

implicit object TimeOrdering extends Ordering[Time] {
override def compare(x: Time, y: Time): Int = x.compareTo(y)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1269,6 +1269,10 @@ object AggregateUtil {
new StringMinWithRetractAggFunction
case TIMESTAMP =>
new TimestampMinWithRetractAggFunction
case DATE =>
new DateMinWithRetractAggFunction
case TIME =>
new TimeMinWithRetractAggFunction
case sqlType: SqlTypeName =>
throw new TableException(
s"Min with retract aggregate does no support type: '$sqlType'")
Expand All @@ -1295,6 +1299,10 @@ object AggregateUtil {
new StringMinAggFunction
case TIMESTAMP =>
new TimestampMinAggFunction
case DATE =>
new DateMinAggFunction
case TIME =>
new TimeMinAggFunction
case sqlType: SqlTypeName =>
throw new TableException(s"Min aggregate does no support type: '$sqlType'")
}
Expand Down Expand Up @@ -1322,6 +1330,10 @@ object AggregateUtil {
new StringMaxWithRetractAggFunction
case TIMESTAMP =>
new TimestampMaxWithRetractAggFunction
case DATE =>
new DateMaxWithRetractAggFunction
case TIME =>
new TimeMaxWithRetractAggFunction
case sqlType: SqlTypeName =>
throw new TableException(
s"Max with retract aggregate does no support type: '$sqlType'")
Expand All @@ -1348,6 +1360,10 @@ object AggregateUtil {
new StringMaxAggFunction
case TIMESTAMP =>
new TimestampMaxAggFunction
case DATE =>
new DateMaxAggFunction
case TIME =>
new TimeMaxAggFunction
case sqlType: SqlTypeName =>
throw new TableException(s"Max aggregate does no support type: '$sqlType'")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
package org.apache.flink.table.runtime.aggfunctions

import java.math.BigDecimal
import java.sql.Timestamp
import java.sql.{Date, Time, Timestamp}

import org.apache.flink.table.functions.AggregateFunction
import org.apache.flink.table.functions.aggfunctions._
Expand Down Expand Up @@ -257,3 +257,55 @@ class TimestampMaxAggFunctionTest
override def aggregator: AggregateFunction[Timestamp, MaxAccumulator[Timestamp]] =
new TimestampMaxAggFunction()
}

class DateMaxAggFunctionTest
extends AggFunctionTestBase[Date, MaxAccumulator[Date]] {
override def inputValueSets: Seq[Seq[_]] = Seq(
Seq(
new Date(0),
new Date(1000),
new Date(100),
null.asInstanceOf[Date],
new Date(10)
),
Seq(
null.asInstanceOf[Date],
null.asInstanceOf[Date],
null.asInstanceOf[Date]
)
)

override def expectedResults: Seq[Date] = Seq(
new Date(1000),
null.asInstanceOf[Date]
)

override def aggregator: AggregateFunction[Date, MaxAccumulator[Date]] =
new DateMaxAggFunction()
}

class TimeMaxAggFunctionTest
extends AggFunctionTestBase[Time, MaxAccumulator[Time]] {
override def inputValueSets: Seq[Seq[_]] = Seq(
Seq(
new Time(0),
new Time(1000),
new Time(100),
null.asInstanceOf[Time],
new Time(10)
),
Seq(
null.asInstanceOf[Time],
null.asInstanceOf[Time],
null.asInstanceOf[Time]
)
)

override def expectedResults: Seq[Time] = Seq(
new Time(1000),
null.asInstanceOf[Time]
)

override def aggregator: AggregateFunction[Time, MaxAccumulator[Time]] =
new TimeMaxAggFunction()
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
package org.apache.flink.table.runtime.aggfunctions

import java.math.BigDecimal
import java.sql.Timestamp
import java.sql.{Date, Time, Timestamp}

import org.apache.flink.table.functions.AggregateFunction
import org.apache.flink.table.functions.aggfunctions._
Expand Down Expand Up @@ -272,3 +272,61 @@ class TimestampMaxWithRetractAggFunctionTest

override def retractFunc = aggregator.getClass.getMethod("retract", accType, classOf[Any])
}

class DateMaxWithRetractAggFunctionTest
extends AggFunctionTestBase[Date, MaxWithRetractAccumulator[Date]] {

override def inputValueSets: Seq[Seq[_]] = Seq(
Seq(
new Date(0),
new Date(1000),
new Date(100),
null.asInstanceOf[Date],
new Date(10)
),
Seq(
null.asInstanceOf[Date],
null.asInstanceOf[Date],
null.asInstanceOf[Date]
)
)

override def expectedResults: Seq[Date] = Seq(
new Date(1000),
null.asInstanceOf[Date]
)

override def aggregator: AggregateFunction[Date, MaxWithRetractAccumulator[Date]] =
new DateMaxWithRetractAggFunction()

override def retractFunc = aggregator.getClass.getMethod("retract", accType, classOf[Any])
}

class TimeMaxWithRetractAggFunctionTest
extends AggFunctionTestBase[Time, MaxWithRetractAccumulator[Time]] {

override def inputValueSets: Seq[Seq[_]] = Seq(
Seq(
new Time(0),
new Time(1000),
new Time(100),
null.asInstanceOf[Time],
new Time(10)
),
Seq(
null.asInstanceOf[Time],
null.asInstanceOf[Time],
null.asInstanceOf[Time]
)
)

override def expectedResults: Seq[Time] = Seq(
new Time(1000),
null.asInstanceOf[Time]
)

override def aggregator: AggregateFunction[Time, MaxWithRetractAccumulator[Time]] =
new TimeMaxWithRetractAggFunction()

override def retractFunc = aggregator.getClass.getMethod("retract", accType, classOf[Any])
}
Loading

0 comments on commit 44c603d

Please sign in to comment.