From d0b2aa28db529a1c982b3cf61242520fb67efd6a Mon Sep 17 00:00:00 2001 From: Dian Fu Date: Thu, 2 Nov 2017 11:26:25 +0800 Subject: [PATCH] [FLINK-7962] Add built-in support for min/max aggregation for Timestamp This closes #4936 --- .../aggfunctions/MaxAggFunction.scala | 11 +++++++ .../MaxAggFunctionWithRetract.scala | 11 +++++++ .../aggfunctions/MinAggFunction.scala | 11 +++++++ .../MinAggFunctionWithRetract.scala | 11 +++++++ .../functions/aggfunctions/Ordering.scala | 27 ++++++++++++++++ .../runtime/aggregate/AggregateUtil.scala | 8 +++++ .../aggfunctions/MaxAggFunctionTest.scala | 27 ++++++++++++++++ .../MaxWithRetractAggFunctionTest.scala | 30 +++++++++++++++++ .../aggfunctions/MinAggFunctionTest.scala | 27 ++++++++++++++++ .../MinWithRetractAggFunctionTest.scala | 32 +++++++++++++++++++ 10 files changed, 195 insertions(+) create mode 100644 flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/Ordering.scala diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunction.scala index 0789bee149afd..9097eba8eb1c0 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunction.scala @@ -19,10 +19,13 @@ package org.apache.flink.table.functions.aggfunctions import java.math.BigDecimal import java.lang.{Iterable => JIterable} +import java.sql.Timestamp import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} import org.apache.flink.api.java.typeutils.TupleTypeInfo +import org.apache.flink.table.api.Types +import org.apache.flink.table.functions.aggfunctions.Ordering.TimestampOrdering import org.apache.flink.table.functions.AggregateFunction /** The initial accumulator for Max aggregate function */ @@ -159,3 +162,11 @@ class StringMaxAggFunction extends MaxAggFunction[String] { override def getInitValue = "" override def getValueTypeInfo = BasicTypeInfo.STRING_TYPE_INFO } + +/** + * Built-in Timestamp Max aggregate function + */ +class TimestampMaxAggFunction extends MaxAggFunction[Timestamp] { + override def getInitValue: Timestamp = new Timestamp(0) + override def getValueTypeInfo = Types.SQL_TIMESTAMP +} diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunctionWithRetract.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunctionWithRetract.scala index c79c06a66e6c7..fdbfef3ad5978 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunctionWithRetract.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunctionWithRetract.scala @@ -20,10 +20,13 @@ package org.apache.flink.table.functions.aggfunctions import java.math.BigDecimal import java.util.{HashMap => JHashMap} import java.lang.{Iterable => JIterable} +import java.sql.Timestamp import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} import org.apache.flink.api.java.typeutils.{MapTypeInfo, TupleTypeInfo} +import org.apache.flink.table.api.Types +import org.apache.flink.table.functions.aggfunctions.Ordering.TimestampOrdering import org.apache.flink.table.functions.AggregateFunction /** The initial accumulator for Max with retraction aggregate function */ @@ -216,3 +219,11 @@ class StringMaxWithRetractAggFunction extends MaxWithRetractAggFunction[String] override def getInitValue: String = "" override def getValueTypeInfo = BasicTypeInfo.STRING_TYPE_INFO } + +/** + * Built-in Timestamp Max with retraction aggregate function + */ +class TimestampMaxWithRetractAggFunction extends MaxWithRetractAggFunction[Timestamp] { + override def getInitValue: Timestamp = new Timestamp(0) + override def getValueTypeInfo = Types.SQL_TIMESTAMP +} diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunction.scala index d2132c29e5992..1cb1ab00f515d 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunction.scala @@ -19,10 +19,13 @@ package org.apache.flink.table.functions.aggfunctions import java.math.BigDecimal import java.lang.{Iterable => JIterable} +import java.sql.Timestamp import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} import org.apache.flink.api.java.typeutils.TupleTypeInfo +import org.apache.flink.table.api.Types +import org.apache.flink.table.functions.aggfunctions.Ordering.TimestampOrdering import org.apache.flink.table.functions.AggregateFunction /** The initial accumulator for Min aggregate function */ @@ -159,3 +162,11 @@ class StringMinAggFunction extends MinAggFunction[String] { override def getInitValue = "" override def getValueTypeInfo = BasicTypeInfo.STRING_TYPE_INFO } + +/** + * Built-in Timestamp Min aggregate function + */ +class TimestampMinAggFunction extends MinAggFunction[Timestamp] { + override def getInitValue: Timestamp = new Timestamp(0) + override def getValueTypeInfo = Types.SQL_TIMESTAMP +} diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunctionWithRetract.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunctionWithRetract.scala index faa672560a56e..44fa37f8c7de8 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunctionWithRetract.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunctionWithRetract.scala @@ -20,10 +20,13 @@ package org.apache.flink.table.functions.aggfunctions import java.math.BigDecimal import java.util.{HashMap => JHashMap} import java.lang.{Iterable => JIterable} +import java.sql.Timestamp import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} import org.apache.flink.api.java.typeutils.{MapTypeInfo, TupleTypeInfo} +import org.apache.flink.table.api.Types +import org.apache.flink.table.functions.aggfunctions.Ordering.TimestampOrdering import org.apache.flink.table.functions.AggregateFunction /** The initial accumulator for Min with retraction aggregate function */ @@ -216,3 +219,11 @@ class StringMinWithRetractAggFunction extends MinWithRetractAggFunction[String] override def getInitValue: String = "" override def getValueTypeInfo = BasicTypeInfo.STRING_TYPE_INFO } + +/** + * Built-in Timestamp Min with retraction aggregate function + */ +class TimestampMinWithRetractAggFunction extends MinWithRetractAggFunction[Timestamp] { + override def getInitValue: Timestamp = new Timestamp(0) + override def getValueTypeInfo = Types.SQL_TIMESTAMP +} diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/Ordering.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/Ordering.scala new file mode 100644 index 0000000000000..15ea2e30444c0 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/Ordering.scala @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.functions.aggfunctions + +import java.sql.Timestamp + +object Ordering { + implicit object TimestampOrdering extends Ordering[Timestamp] { + override def compare(x: Timestamp, y: Timestamp): Int = x.compareTo(y) + } +} diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala index a867b1cd8b013..79cc258d9ab40 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala @@ -1267,6 +1267,8 @@ object AggregateUtil { new BooleanMinWithRetractAggFunction case VARCHAR | CHAR => new StringMinWithRetractAggFunction + case TIMESTAMP => + new TimestampMinWithRetractAggFunction case sqlType: SqlTypeName => throw new TableException( s"Min with retract aggregate does no support type: '$sqlType'") @@ -1291,6 +1293,8 @@ object AggregateUtil { new BooleanMinAggFunction case VARCHAR | CHAR => new StringMinAggFunction + case TIMESTAMP => + new TimestampMinAggFunction case sqlType: SqlTypeName => throw new TableException(s"Min aggregate does no support type: '$sqlType'") } @@ -1316,6 +1320,8 @@ object AggregateUtil { new BooleanMaxWithRetractAggFunction case VARCHAR | CHAR => new StringMaxWithRetractAggFunction + case TIMESTAMP => + new TimestampMaxWithRetractAggFunction case sqlType: SqlTypeName => throw new TableException( s"Max with retract aggregate does no support type: '$sqlType'") @@ -1340,6 +1346,8 @@ object AggregateUtil { new BooleanMaxAggFunction case VARCHAR | CHAR => new StringMaxAggFunction + case TIMESTAMP => + new TimestampMaxAggFunction case sqlType: SqlTypeName => throw new TableException(s"Max aggregate does no support type: '$sqlType'") } diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/MaxAggFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/MaxAggFunctionTest.scala index 8a46ec54b1cff..03faa80137352 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/MaxAggFunctionTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/MaxAggFunctionTest.scala @@ -19,6 +19,7 @@ package org.apache.flink.table.runtime.aggfunctions import java.math.BigDecimal +import java.sql.Timestamp import org.apache.flink.table.functions.AggregateFunction import org.apache.flink.table.functions.aggfunctions._ @@ -230,3 +231,29 @@ class StringMaxAggFunctionTest extends AggFunctionTestBase[String, MaxAccumulato override def aggregator: AggregateFunction[String, MaxAccumulator[String]] = new StringMaxAggFunction() } + +class TimestampMaxAggFunctionTest + extends AggFunctionTestBase[Timestamp, MaxAccumulator[Timestamp]] { + override def inputValueSets: Seq[Seq[_]] = Seq( + Seq( + new Timestamp(0), + new Timestamp(1000), + new Timestamp(100), + null.asInstanceOf[Timestamp], + new Timestamp(10) + ), + Seq( + null.asInstanceOf[Timestamp], + null.asInstanceOf[Timestamp], + null.asInstanceOf[Timestamp] + ) + ) + + override def expectedResults: Seq[Timestamp] = Seq( + new Timestamp(1000), + null.asInstanceOf[Timestamp] + ) + + override def aggregator: AggregateFunction[Timestamp, MaxAccumulator[Timestamp]] = + new TimestampMaxAggFunction() +} diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/MaxWithRetractAggFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/MaxWithRetractAggFunctionTest.scala index 246d964fc3ffa..eb620b42d7177 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/MaxWithRetractAggFunctionTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/MaxWithRetractAggFunctionTest.scala @@ -19,6 +19,7 @@ package org.apache.flink.table.runtime.aggfunctions import java.math.BigDecimal +import java.sql.Timestamp import org.apache.flink.table.functions.AggregateFunction import org.apache.flink.table.functions.aggfunctions._ @@ -242,3 +243,32 @@ class StringMaxWithRetractAggFunctionTest override def retractFunc = aggregator.getClass.getMethod("retract", accType, classOf[Any]) } + +class TimestampMaxWithRetractAggFunctionTest + extends AggFunctionTestBase[Timestamp, MaxWithRetractAccumulator[Timestamp]] { + + override def inputValueSets: Seq[Seq[_]] = Seq( + Seq( + new Timestamp(0), + new Timestamp(1000), + new Timestamp(100), + null.asInstanceOf[Timestamp], + new Timestamp(10) + ), + Seq( + null.asInstanceOf[Timestamp], + null.asInstanceOf[Timestamp], + null.asInstanceOf[Timestamp] + ) + ) + + override def expectedResults: Seq[Timestamp] = Seq( + new Timestamp(1000), + null.asInstanceOf[Timestamp] + ) + + override def aggregator: AggregateFunction[Timestamp, MaxWithRetractAccumulator[Timestamp]] = + new TimestampMaxWithRetractAggFunction() + + override def retractFunc = aggregator.getClass.getMethod("retract", accType, classOf[Any]) +} diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/MinAggFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/MinAggFunctionTest.scala index 80fcacecae74b..992d1fc6808b7 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/MinAggFunctionTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/MinAggFunctionTest.scala @@ -19,6 +19,7 @@ package org.apache.flink.table.runtime.aggfunctions import java.math.BigDecimal +import java.sql.Timestamp import org.apache.flink.table.functions.AggregateFunction import org.apache.flink.table.functions.aggfunctions._ @@ -231,3 +232,29 @@ class StringMinAggFunctionTest override def aggregator: AggregateFunction[String, MinAccumulator[String]] = new StringMinAggFunction() } + +class TimestampMinAggFunctionTest + extends AggFunctionTestBase[Timestamp, MinAccumulator[Timestamp]] { + override def inputValueSets: Seq[Seq[_]] = Seq( + Seq( + new Timestamp(0), + new Timestamp(1000), + new Timestamp(100), + null.asInstanceOf[Timestamp], + new Timestamp(10) + ), + Seq( + null.asInstanceOf[Timestamp], + null.asInstanceOf[Timestamp], + null.asInstanceOf[Timestamp] + ) + ) + + override def expectedResults: Seq[Timestamp] = Seq( + new Timestamp(0), + null.asInstanceOf[Timestamp] + ) + + override def aggregator: AggregateFunction[Timestamp, MinAccumulator[Timestamp]] = + new TimestampMinAggFunction() +} diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/MinWithRetractAggFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/MinWithRetractAggFunctionTest.scala index c4273f69cd59d..323a651b34374 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/MinWithRetractAggFunctionTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/MinWithRetractAggFunctionTest.scala @@ -19,6 +19,7 @@ package org.apache.flink.table.runtime.aggfunctions import java.math.BigDecimal +import java.sql.Timestamp import org.apache.flink.table.functions.AggregateFunction import org.apache.flink.table.functions.aggfunctions._ @@ -242,3 +243,34 @@ class StringMinWithRetractAggFunctionTest override def retractFunc = aggregator.getClass.getMethod("retract", accType, classOf[Any]) } + +class TimestampMinWithRetractAggFunctionTest + extends AggFunctionTestBase[Timestamp, MinWithRetractAccumulator[Timestamp]] { + + override def inputValueSets: Seq[Seq[_]] = Seq( + Seq( + new Timestamp(0), + new Timestamp(1000), + new Timestamp(100), + null.asInstanceOf[Timestamp], + new Timestamp(10) + ), + Seq( + null, + null, + null, + null, + null + ) + ) + + override def expectedResults: Seq[Timestamp] = Seq( + new Timestamp(0), + null + ) + + override def aggregator: AggregateFunction[Timestamp, MinWithRetractAccumulator[Timestamp]] = + new TimestampMinWithRetractAggFunction() + + override def retractFunc = aggregator.getClass.getMethod("retract", accType, classOf[Any]) +}