Skip to content

Commit

Permalink
[FLINK-7962] Add built-in support for min/max aggregation for Timestamp
Browse files Browse the repository at this point in the history
This closes apache#4936
  • Loading branch information
dianfu authored and wuchong committed Nov 16, 2017
1 parent b6a2dc3 commit d0b2aa2
Show file tree
Hide file tree
Showing 10 changed files with 195 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@ package org.apache.flink.table.functions.aggfunctions

import java.math.BigDecimal
import java.lang.{Iterable => JIterable}
import java.sql.Timestamp

import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
import org.apache.flink.api.java.typeutils.TupleTypeInfo
import org.apache.flink.table.api.Types
import org.apache.flink.table.functions.aggfunctions.Ordering.TimestampOrdering
import org.apache.flink.table.functions.AggregateFunction

/** The initial accumulator for Max aggregate function */
Expand Down Expand Up @@ -159,3 +162,11 @@ class StringMaxAggFunction extends MaxAggFunction[String] {
override def getInitValue = ""
override def getValueTypeInfo = BasicTypeInfo.STRING_TYPE_INFO
}

/**
* Built-in Timestamp Max aggregate function
*/
class TimestampMaxAggFunction extends MaxAggFunction[Timestamp] {
override def getInitValue: Timestamp = new Timestamp(0)
override def getValueTypeInfo = Types.SQL_TIMESTAMP
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,13 @@ package org.apache.flink.table.functions.aggfunctions
import java.math.BigDecimal
import java.util.{HashMap => JHashMap}
import java.lang.{Iterable => JIterable}
import java.sql.Timestamp

import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
import org.apache.flink.api.java.typeutils.{MapTypeInfo, TupleTypeInfo}
import org.apache.flink.table.api.Types
import org.apache.flink.table.functions.aggfunctions.Ordering.TimestampOrdering
import org.apache.flink.table.functions.AggregateFunction

/** The initial accumulator for Max with retraction aggregate function */
Expand Down Expand Up @@ -216,3 +219,11 @@ class StringMaxWithRetractAggFunction extends MaxWithRetractAggFunction[String]
override def getInitValue: String = ""
override def getValueTypeInfo = BasicTypeInfo.STRING_TYPE_INFO
}

/**
* Built-in Timestamp Max with retraction aggregate function
*/
class TimestampMaxWithRetractAggFunction extends MaxWithRetractAggFunction[Timestamp] {
override def getInitValue: Timestamp = new Timestamp(0)
override def getValueTypeInfo = Types.SQL_TIMESTAMP
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@ package org.apache.flink.table.functions.aggfunctions

import java.math.BigDecimal
import java.lang.{Iterable => JIterable}
import java.sql.Timestamp

import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
import org.apache.flink.api.java.typeutils.TupleTypeInfo
import org.apache.flink.table.api.Types
import org.apache.flink.table.functions.aggfunctions.Ordering.TimestampOrdering
import org.apache.flink.table.functions.AggregateFunction

/** The initial accumulator for Min aggregate function */
Expand Down Expand Up @@ -159,3 +162,11 @@ class StringMinAggFunction extends MinAggFunction[String] {
override def getInitValue = ""
override def getValueTypeInfo = BasicTypeInfo.STRING_TYPE_INFO
}

/**
* Built-in Timestamp Min aggregate function
*/
class TimestampMinAggFunction extends MinAggFunction[Timestamp] {
override def getInitValue: Timestamp = new Timestamp(0)
override def getValueTypeInfo = Types.SQL_TIMESTAMP
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,13 @@ package org.apache.flink.table.functions.aggfunctions
import java.math.BigDecimal
import java.util.{HashMap => JHashMap}
import java.lang.{Iterable => JIterable}
import java.sql.Timestamp

import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
import org.apache.flink.api.java.typeutils.{MapTypeInfo, TupleTypeInfo}
import org.apache.flink.table.api.Types
import org.apache.flink.table.functions.aggfunctions.Ordering.TimestampOrdering
import org.apache.flink.table.functions.AggregateFunction

/** The initial accumulator for Min with retraction aggregate function */
Expand Down Expand Up @@ -216,3 +219,11 @@ class StringMinWithRetractAggFunction extends MinWithRetractAggFunction[String]
override def getInitValue: String = ""
override def getValueTypeInfo = BasicTypeInfo.STRING_TYPE_INFO
}

/**
* Built-in Timestamp Min with retraction aggregate function
*/
class TimestampMinWithRetractAggFunction extends MinWithRetractAggFunction[Timestamp] {
override def getInitValue: Timestamp = new Timestamp(0)
override def getValueTypeInfo = Types.SQL_TIMESTAMP
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.functions.aggfunctions

import java.sql.Timestamp

object Ordering {
implicit object TimestampOrdering extends Ordering[Timestamp] {
override def compare(x: Timestamp, y: Timestamp): Int = x.compareTo(y)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1267,6 +1267,8 @@ object AggregateUtil {
new BooleanMinWithRetractAggFunction
case VARCHAR | CHAR =>
new StringMinWithRetractAggFunction
case TIMESTAMP =>
new TimestampMinWithRetractAggFunction
case sqlType: SqlTypeName =>
throw new TableException(
s"Min with retract aggregate does no support type: '$sqlType'")
Expand All @@ -1291,6 +1293,8 @@ object AggregateUtil {
new BooleanMinAggFunction
case VARCHAR | CHAR =>
new StringMinAggFunction
case TIMESTAMP =>
new TimestampMinAggFunction
case sqlType: SqlTypeName =>
throw new TableException(s"Min aggregate does no support type: '$sqlType'")
}
Expand All @@ -1316,6 +1320,8 @@ object AggregateUtil {
new BooleanMaxWithRetractAggFunction
case VARCHAR | CHAR =>
new StringMaxWithRetractAggFunction
case TIMESTAMP =>
new TimestampMaxWithRetractAggFunction
case sqlType: SqlTypeName =>
throw new TableException(
s"Max with retract aggregate does no support type: '$sqlType'")
Expand All @@ -1340,6 +1346,8 @@ object AggregateUtil {
new BooleanMaxAggFunction
case VARCHAR | CHAR =>
new StringMaxAggFunction
case TIMESTAMP =>
new TimestampMaxAggFunction
case sqlType: SqlTypeName =>
throw new TableException(s"Max aggregate does no support type: '$sqlType'")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.table.runtime.aggfunctions

import java.math.BigDecimal
import java.sql.Timestamp

import org.apache.flink.table.functions.AggregateFunction
import org.apache.flink.table.functions.aggfunctions._
Expand Down Expand Up @@ -230,3 +231,29 @@ class StringMaxAggFunctionTest extends AggFunctionTestBase[String, MaxAccumulato
override def aggregator: AggregateFunction[String, MaxAccumulator[String]] =
new StringMaxAggFunction()
}

class TimestampMaxAggFunctionTest
extends AggFunctionTestBase[Timestamp, MaxAccumulator[Timestamp]] {
override def inputValueSets: Seq[Seq[_]] = Seq(
Seq(
new Timestamp(0),
new Timestamp(1000),
new Timestamp(100),
null.asInstanceOf[Timestamp],
new Timestamp(10)
),
Seq(
null.asInstanceOf[Timestamp],
null.asInstanceOf[Timestamp],
null.asInstanceOf[Timestamp]
)
)

override def expectedResults: Seq[Timestamp] = Seq(
new Timestamp(1000),
null.asInstanceOf[Timestamp]
)

override def aggregator: AggregateFunction[Timestamp, MaxAccumulator[Timestamp]] =
new TimestampMaxAggFunction()
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.table.runtime.aggfunctions

import java.math.BigDecimal
import java.sql.Timestamp

import org.apache.flink.table.functions.AggregateFunction
import org.apache.flink.table.functions.aggfunctions._
Expand Down Expand Up @@ -242,3 +243,32 @@ class StringMaxWithRetractAggFunctionTest

override def retractFunc = aggregator.getClass.getMethod("retract", accType, classOf[Any])
}

class TimestampMaxWithRetractAggFunctionTest
extends AggFunctionTestBase[Timestamp, MaxWithRetractAccumulator[Timestamp]] {

override def inputValueSets: Seq[Seq[_]] = Seq(
Seq(
new Timestamp(0),
new Timestamp(1000),
new Timestamp(100),
null.asInstanceOf[Timestamp],
new Timestamp(10)
),
Seq(
null.asInstanceOf[Timestamp],
null.asInstanceOf[Timestamp],
null.asInstanceOf[Timestamp]
)
)

override def expectedResults: Seq[Timestamp] = Seq(
new Timestamp(1000),
null.asInstanceOf[Timestamp]
)

override def aggregator: AggregateFunction[Timestamp, MaxWithRetractAccumulator[Timestamp]] =
new TimestampMaxWithRetractAggFunction()

override def retractFunc = aggregator.getClass.getMethod("retract", accType, classOf[Any])
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.table.runtime.aggfunctions

import java.math.BigDecimal
import java.sql.Timestamp

import org.apache.flink.table.functions.AggregateFunction
import org.apache.flink.table.functions.aggfunctions._
Expand Down Expand Up @@ -231,3 +232,29 @@ class StringMinAggFunctionTest
override def aggregator: AggregateFunction[String, MinAccumulator[String]] =
new StringMinAggFunction()
}

class TimestampMinAggFunctionTest
extends AggFunctionTestBase[Timestamp, MinAccumulator[Timestamp]] {
override def inputValueSets: Seq[Seq[_]] = Seq(
Seq(
new Timestamp(0),
new Timestamp(1000),
new Timestamp(100),
null.asInstanceOf[Timestamp],
new Timestamp(10)
),
Seq(
null.asInstanceOf[Timestamp],
null.asInstanceOf[Timestamp],
null.asInstanceOf[Timestamp]
)
)

override def expectedResults: Seq[Timestamp] = Seq(
new Timestamp(0),
null.asInstanceOf[Timestamp]
)

override def aggregator: AggregateFunction[Timestamp, MinAccumulator[Timestamp]] =
new TimestampMinAggFunction()
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.table.runtime.aggfunctions

import java.math.BigDecimal
import java.sql.Timestamp

import org.apache.flink.table.functions.AggregateFunction
import org.apache.flink.table.functions.aggfunctions._
Expand Down Expand Up @@ -242,3 +243,34 @@ class StringMinWithRetractAggFunctionTest

override def retractFunc = aggregator.getClass.getMethod("retract", accType, classOf[Any])
}

class TimestampMinWithRetractAggFunctionTest
extends AggFunctionTestBase[Timestamp, MinWithRetractAccumulator[Timestamp]] {

override def inputValueSets: Seq[Seq[_]] = Seq(
Seq(
new Timestamp(0),
new Timestamp(1000),
new Timestamp(100),
null.asInstanceOf[Timestamp],
new Timestamp(10)
),
Seq(
null,
null,
null,
null,
null
)
)

override def expectedResults: Seq[Timestamp] = Seq(
new Timestamp(0),
null
)

override def aggregator: AggregateFunction[Timestamp, MinWithRetractAccumulator[Timestamp]] =
new TimestampMinWithRetractAggFunction()

override def retractFunc = aggregator.getClass.getMethod("retract", accType, classOf[Any])
}

0 comments on commit d0b2aa2

Please sign in to comment.