Skip to content

Commit

Permalink
[FLINK-22861][table-planner] Fix return value deduction of TIMESTAMPA…
Browse files Browse the repository at this point in the history
…DD function

This closes apache#16511
  • Loading branch information
tsreaper authored Jul 22, 2021
1 parent 56bbd6f commit fce9c1d
Show file tree
Hide file tree
Showing 2 changed files with 228 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.calcite.sql.fun;

import org.apache.calcite.avatica.util.TimeUnit;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.sql.SqlFunction;
import org.apache.calcite.sql.SqlFunctionCategory;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.type.OperandTypes;
import org.apache.calcite.sql.type.SqlReturnTypeInference;
import org.apache.calcite.sql.type.SqlTypeFamily;
import org.apache.calcite.sql.type.SqlTypeName;

/**
* The <code>TIMESTAMPADD</code> function, which adds an interval to a datetime (TIMESTAMP, TIME or
* DATE).
*
* <p>The SQL syntax is
*
* <blockquote>
*
* <code>TIMESTAMPADD(<i>timestamp interval</i>, <i>quantity</i>,
* <i>datetime</i>)</code>
*
* </blockquote>
*
* <p>The interval time unit can one of the following literals:
*
* <ul>
* <li>NANOSECOND (and synonym SQL_TSI_FRAC_SECOND)
* <li>MICROSECOND (and synonyms SQL_TSI_MICROSECOND, FRAC_SECOND)
* <li>SECOND (and synonym SQL_TSI_SECOND)
* <li>MINUTE (and synonym SQL_TSI_MINUTE)
* <li>HOUR (and synonym SQL_TSI_HOUR)
* <li>DAY (and synonym SQL_TSI_DAY)
* <li>WEEK (and synonym SQL_TSI_WEEK)
* <li>MONTH (and synonym SQL_TSI_MONTH)
* <li>QUARTER (and synonym SQL_TSI_QUARTER)
* <li>YEAR (and synonym SQL_TSI_YEAR)
* </ul>
*
* <p>Returns modified datetime.
*
* <p>This class was copied over from Calcite to fix the return type deduction issue on timestamp
* with local time zone type (CALCITE-4698).
*/
public class SqlTimestampAddFunction extends SqlFunction {

private static final int MILLISECOND_PRECISION = 3;
private static final int MICROSECOND_PRECISION = 6;

private static final SqlReturnTypeInference RETURN_TYPE_INFERENCE =
opBinding -> {
final RelDataTypeFactory typeFactory = opBinding.getTypeFactory();
return deduceType(
typeFactory,
opBinding.getOperandLiteralValue(0, TimeUnit.class),
opBinding.getOperandType(1),
opBinding.getOperandType(2));
};

// BEGIN FLINK MODIFICATION
// Reason: this method is changed to deduce return type on timestamp with local time zone
// correctly
// Whole class should be removed after CALCITE-4698 is fixed
public static RelDataType deduceType(
RelDataTypeFactory typeFactory,
TimeUnit timeUnit,
RelDataType intervalType,
RelDataType datetimeType) {
RelDataType type;
switch (timeUnit) {
case MILLISECOND:
type =
typeFactory.createSqlType(
timestampOrTimestampLtz(datetimeType),
Math.max(MILLISECOND_PRECISION, datetimeType.getPrecision()));
break;
case MICROSECOND:
type =
typeFactory.createSqlType(
timestampOrTimestampLtz(datetimeType),
Math.max(MICROSECOND_PRECISION, datetimeType.getPrecision()));
break;
case HOUR:
case MINUTE:
case SECOND:
if (datetimeType.getFamily() == SqlTypeFamily.TIME) {
type = datetimeType;
} else if (datetimeType.getFamily() == SqlTypeFamily.TIMESTAMP) {
type =
typeFactory.createSqlType(
timestampOrTimestampLtz(datetimeType),
datetimeType.getPrecision());
} else {
type = typeFactory.createSqlType(SqlTypeName.TIMESTAMP);
}
break;
default:
type = datetimeType;
}
return typeFactory.createTypeWithNullability(
type, intervalType.isNullable() || datetimeType.isNullable());
}

private static SqlTypeName timestampOrTimestampLtz(RelDataType datetimeType) {
return datetimeType.getSqlTypeName() == SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE
? SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE
: SqlTypeName.TIMESTAMP;
}
// END FLINK MODIFICATION

/** Creates a SqlTimestampAddFunction. */
SqlTimestampAddFunction() {
super(
"TIMESTAMPADD",
SqlKind.TIMESTAMP_ADD,
RETURN_TYPE_INFERENCE,
null,
OperandTypes.family(
SqlTypeFamily.ANY, SqlTypeFamily.INTEGER, SqlTypeFamily.DATETIME),
SqlFunctionCategory.TIMEDATE);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ import org.junit.rules.ExpectedException

import java.nio.charset.StandardCharsets
import java.sql.{Date, Time, Timestamp}
import java.time.{Instant, LocalDate, ZoneId}
import java.time.{Instant, LocalDate, LocalDateTime, LocalTime, ZoneId}
import java.util

import scala.collection.Seq
Expand Down Expand Up @@ -1521,4 +1521,91 @@ class CalcITCase extends BatchTestBase {
"select cast(b as boolean) from MyTable",
Seq(row(true), row(false), row(null), row(null)))
}

@Test
def testTimestampAdd(): Unit = {
// we're not adding this test to ScalarFunctionsTest because that test
// directly uses the generated code and does not check for expression types
val dataId = TestValuesTableFactory.registerData(
Seq(row(
LocalDateTime.of(2021, 7, 15, 16, 50, 0, 123000000),
LocalDateTime.of(2021, 7, 15, 16, 50, 0, 123456789),
Instant.ofEpochMilli(1626339000123L),
Instant.ofEpochSecond(1626339000, 123456789),
LocalDate.of(2021, 7, 15),
LocalTime.of(16, 50, 0, 123000000)
)))
val ddl =
s"""
|CREATE TABLE MyTable (
| a TIMESTAMP(3),
| b TIMESTAMP(9),
| c TIMESTAMP_LTZ(3),
| d TIMESTAMP_LTZ(9),
| e DATE,
| f TIME(3)
|) WITH (
| 'connector' = 'values',
| 'data-id' = '$dataId',
| 'bounded' = 'true'
|)
|""".stripMargin
tEnv.executeSql(ddl)

checkResult(
"""
|select
| timestampadd(day, 1, a),
| timestampadd(hour, 1, a),
| timestampadd(minute, 1, a),
| timestampadd(second, 1, a),
| timestampadd(day, 1, b),
| timestampadd(hour, 1, b),
| timestampadd(minute, 1, b),
| timestampadd(second, 1, b),
| timestampadd(day, 1, c),
| timestampadd(hour, 1, c),
| timestampadd(minute, 1, c),
| timestampadd(second, 1, c),
| timestampadd(day, 1, d),
| timestampadd(hour, 1, d),
| timestampadd(minute, 1, d),
| timestampadd(second, 1, d),
| timestampadd(day, 1, e),
| timestampadd(hour, 1, e),
| timestampadd(minute, 1, e),
| timestampadd(second, 1, e),
| timestampadd(day, 1, f),
| timestampadd(hour, 1, f),
| timestampadd(minute, 1, f),
| timestampadd(second, 1, f)
|from MyTable
|""".stripMargin,
Seq(row(
LocalDateTime.of(2021, 7, 16, 16, 50, 0, 123000000),
LocalDateTime.of(2021, 7, 15, 17, 50, 0, 123000000),
LocalDateTime.of(2021, 7, 15, 16, 51, 0, 123000000),
LocalDateTime.of(2021, 7, 15, 16, 50, 1, 123000000),
LocalDateTime.of(2021, 7, 16, 16, 50, 0, 123456789),
LocalDateTime.of(2021, 7, 15, 17, 50, 0, 123456789),
LocalDateTime.of(2021, 7, 15, 16, 51, 0, 123456789),
LocalDateTime.of(2021, 7, 15, 16, 50, 1, 123456789),
Instant.ofEpochMilli(1626339000123L + 24 * 3600 * 1000L),
Instant.ofEpochMilli(1626339000123L + 3600 * 1000L),
Instant.ofEpochMilli(1626339000123L + 60 * 1000L),
Instant.ofEpochMilli(1626339000123L + 1000L),
Instant.ofEpochSecond(1626339000 + 24 * 3600, 123456789),
Instant.ofEpochSecond(1626339000 + 3600, 123456789),
Instant.ofEpochSecond(1626339000 + 60, 123456789),
Instant.ofEpochSecond(1626339000 + 1, 123456789),
LocalDate.of(2021, 7, 16),
LocalDateTime.of(2021, 7, 15, 1, 0, 0),
LocalDateTime.of(2021, 7, 15, 0, 1, 0),
LocalDateTime.of(2021, 7, 15, 0, 0, 1),
LocalTime.of(16, 50, 0, 123000000),
LocalTime.of(17, 50, 0, 123000000),
LocalTime.of(16, 51, 0, 123000000),
LocalTime.of(16, 50, 1, 123000000)
)))
}
}

0 comments on commit fce9c1d

Please sign in to comment.