Skip to content

Commit

Permalink
[SPARK-28459][SQL] Add make_timestamp function
Browse files Browse the repository at this point in the history
## What changes were proposed in this pull request?

The new function `make_timestamp()` takes 6 columns `year`, `month`, `day`, `hour`, `min`, `sec` and, optionally, `timezone`, and produces a new column of the `TIMESTAMP` type. If values in the input columns are `null` or out of their valid ranges, the function returns `null`. The valid ranges are:
- `year` - `[1, 9999]`
- `month` - `[1, 12]`
- `day` - `[1, 31]`
- `hour` - `[0, 23]`
- `min` - `[0, 59]`
- `sec` - `[0, 60]`. If the `sec` argument equals 60, the seconds field is set to 0 and 1 minute is added to the final timestamp.
- `timezone` - a time zone identifier. The current time zone database can be found at https://www.iana.org/time-zones.

Also, the constructed timestamp must be valid; otherwise `make_timestamp` returns `null`.

The function is implemented similarly to `make_timestamp` in PostgreSQL: https://www.postgresql.org/docs/11/functions-datetime.html to maintain feature parity with it.

Here is an example:
```sql
select make_timestamp(2014, 12, 28, 6, 30, 45.887);
  2014-12-28 06:30:45.887
select make_timestamp(2014, 12, 28, 6, 30, 45.887, 'CET');
  2014-12-28 10:30:45.887
select make_timestamp(2019, 6, 30, 23, 59, 60)
  2019-07-01 00:00:00
```

Returned value has Spark Catalyst type `TIMESTAMP` which is similar to Oracle's `TIMESTAMP WITH LOCAL TIME ZONE` (see https://docs.oracle.com/cd/B28359_01/server.111/b28298/ch4datetime.htm#i1006169) where data is stored in the session time zone, and the time zone offset is not stored as part of the column data. When users retrieve the data, Spark returns it in the session time zone specified by the SQL config `spark.sql.session.timeZone`.

## How was this patch tested?

Added new tests to `DateExpressionsSuite`, and uncommented a test for `make_timestamp` in `pgSQL/timestamp.sql`.

Closes apache#25220 from MaxGekk/make_timestamp.

Authored-by: Maxim Gekk <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
  • Loading branch information
MaxGekk authored and dongjoon-hyun committed Jul 29, 2019
1 parent 946aef0 commit caa23e3
Show file tree
Hide file tree
Showing 6 changed files with 351 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,7 @@ object FunctionRegistry {
expression[Year]("year"),
expression[TimeWindow]("window"),
expression[MakeDate]("make_date"),
expression[MakeTimestamp]("make_timestamp"),

// collection functions
expression[CreateArray]("array"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -863,6 +863,157 @@ abstract class QuaternaryExpression extends Expression {
}
}

/**
 * Base class for expressions that take six mandatory inputs plus an optional seventh input
 * and produce a single output.
 * Unless a subclass changes it, the output is null as soon as any input evaluates to null.
 */
abstract class SeptenaryExpression extends Expression {

  override def foldable: Boolean = children.forall(_.foldable)

  override def nullable: Boolean = children.exists(_.nullable)

  /**
   * Interpreted evaluation that follows the default nullability of SeptenaryExpression:
   * children are evaluated from left to right and evaluation stops at the first null.
   * A subclass that overrides `nullable` should most likely override this method as well.
   */
  override def eval(input: InternalRow): Any = {
    val exprs = children
    // The six mandatory inputs: bail out on the first null without evaluating the rest.
    val mandatory = new Array[Any](6)
    var idx = 0
    while (idx < 6) {
      val value = exprs(idx).eval(input)
      if (value == null) {
        return null
      }
      mandatory(idx) = value
      idx += 1
    }
    // The seventh input is evaluated only when it is present.
    val optional: Option[Any] = if (exprs.length > 6) {
      val value = exprs(6).eval(input)
      if (value == null) {
        return null
      }
      Some(value)
    } else {
      None
    }
    nullSafeEval(
      mandatory(0), mandatory(1), mandatory(2), mandatory(3), mandatory(4), mandatory(5), optional)
  }

  /**
   * Invoked by the default [[eval]] once all present inputs are known to be non-null.
   * Subclasses that keep the default nullability can override this method and skip the
   * null checks; subclasses that need full control over evaluation should override [[eval]].
   */
  protected def nullSafeEval(
      input1: Any,
      input2: Any,
      input3: Any,
      input4: Any,
      input5: Any,
      input6: Any,
      input7: Option[Any]): Any = {
    sys.error("SeptenaryExpression must override either eval or nullSafeEval")
  }

  /**
   * Convenience wrapper around [[nullSafeCodeGen]] for the case where the result is a single
   * Java expression: the expression returned by `f` is assigned to the result variable.
   * If any of the sub-expressions is null, the result is assumed to be null.
   *
   * @param f accepts the seven variable names (the last one optional) and returns the Java
   *          expression that computes the output.
   */
  protected def defineCodeGen(
      ctx: CodegenContext,
      ev: ExprCode,
      f: (String, String, String, String, String, String, Option[String]) => String
    ): ExprCode = {
    nullSafeCodeGen(ctx, ev, (in1, in2, in3, in4, in5, in6, in7) => {
      val expression = f(in1, in2, in3, in4, in5, in6, in7)
      s"${ev.value} = $expression;"
    })
  }

  /**
   * Generates evaluation code in which the code returned by `f` runs only when every present
   * child is non-null. If any of the sub-expressions is null, the result is assumed to be null.
   *
   * @param f function that accepts the non-null evaluation result names of the children (the
   *          seventh one only if that child exists) and returns Java code to compute the output.
   */
  protected def nullSafeCodeGen(
      ctx: CodegenContext,
      ev: ExprCode,
      f: (String, String, String, String, String, String, Option[String]) => String
    ): ExprCode = {
    // Child code is generated strictly in child order: six mandatory, then the optional one.
    val gen1 = children(0).genCode(ctx)
    val gen2 = children(1).genCode(ctx)
    val gen3 = children(2).genCode(ctx)
    val gen4 = children(3).genCode(ctx)
    val gen5 = children(4).genCode(ctx)
    val gen6 = children(5).genCode(ctx)
    val gen7 = children.lift(6).map(_.genCode(ctx))
    val resultCode = f(
      gen1.value,
      gen2.value,
      gen3.value,
      gen4.value,
      gen5.value,
      gen6.value,
      gen7.map(_.value))

    if (nullable) {
      val assignNonNull =
        s"""
          ${ev.isNull} = false; // resultCode could change nullability.
          $resultCode
        """
      // Innermost level: the optional seventh child, if any, guards the result code.
      val innermost = gen7 match {
        case Some(gen) =>
          gen.code + ctx.nullSafeExec(children(6).nullable, gen.isNull) {
            assignNonNull
          }
        case None =>
          assignNonNull
      }
      // Wrap from the sixth child outwards so that the first child's check ends up outermost.
      val nullSafeEval = Seq(gen1, gen2, gen3, gen4, gen5, gen6).zipWithIndex
        .foldRight(innermost) { case ((gen, i), inner) =>
          gen.code + ctx.nullSafeExec(children(i).nullable, gen.isNull) {
            inner
          }
        }

      ev.copy(code = code"""
        boolean ${ev.isNull} = true;
        ${CodeGenerator.javaType(dataType)} ${ev.value} = ${CodeGenerator.defaultValue(dataType)};
        $nullSafeEval""")
    } else {
      // No input can be null: evaluate all children unconditionally, then compute the result.
      ev.copy(code = code"""
        ${gen1.code}
        ${gen2.code}
        ${gen3.code}
        ${gen4.code}
        ${gen5.code}
        ${gen6.code}
        ${gen7.map(_.code).getOrElse("")}
        ${CodeGenerator.javaType(dataType)} ${ev.value} = ${CodeGenerator.defaultValue(dataType)};
        $resultCode""", isNull = FalseLiteral)
    }
  }
}

/**
* A trait used for resolving nullable flags, including `nullable`, `containsNull` of [[ArrayType]]
* and `valueContainsNull` of [[MapType]], containsNull, valueContainsNull flags of the output date
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.spark.sql.catalyst.expressions

import java.sql.Timestamp
import java.time.{Instant, LocalDate, ZoneId}
import java.time.{DateTimeException, Instant, LocalDate, LocalDateTime, ZoneId}
import java.time.temporal.IsoFields
import java.util.{Locale, TimeZone}

Expand Down Expand Up @@ -1657,3 +1657,156 @@ case class MakeDate(year: Expression, month: Expression, day: Expression)

override def prettyName: String = "make_date"
}

// scalastyle:off line.size.limit
/**
 * Creates a TIMESTAMP value from separate year, month, day, hour, minute and second inputs.
 *
 * The local date-time built from the fields is interpreted in the zone given by the optional
 * `timezone` child or, when that child is absent, in the session zone (`zoneId`). The result is
 * null when any input is null, when the fields do not form a valid local date-time, or when
 * the explicit `timezone` value is not a valid zone identifier. Interpreted and generated code
 * agree on all of these cases.
 */
@ExpressionDescription(
  usage = "_FUNC_(year, month, day, hour, min, sec[, timezone]) - Create timestamp from year, month, day, hour, min, sec and timezone fields.",
  arguments = """
    Arguments:
      * year - the year to represent, from 1 to 9999
      * month - the month-of-year to represent, from 1 (January) to 12 (December)
      * day - the day-of-month to represent, from 1 to 31
      * hour - the hour-of-day to represent, from 0 to 23
      * min - the minute-of-hour to represent, from 0 to 59
      * sec - the second-of-minute and its micro-fraction to represent, from
              0 to 60. If the sec argument equals to 60, the seconds field is set
              to 0 and 1 minute is added to the final timestamp.
      * timezone - the time zone identifier. For example, CET, UTC and etc.
  """,
  examples = """
    Examples:
      > SELECT _FUNC_(2014, 12, 28, 6, 30, 45.887);
       2014-12-28 06:30:45.887
      > SELECT _FUNC_(2014, 12, 28, 6, 30, 45.887, 'CET');
       2014-12-28 10:30:45.887
      > SELECT _FUNC_(2019, 6, 30, 23, 59, 60);
       2019-07-01 00:00:00
      > SELECT _FUNC_(2019, 13, 1, 10, 11, 12, 13);
       NULL
      > SELECT _FUNC_(null, 7, 22, 15, 30, 0);
       NULL
  """,
  since = "3.0.0")
// scalastyle:on line.size.limit
case class MakeTimestamp(
    year: Expression,
    month: Expression,
    day: Expression,
    hour: Expression,
    min: Expression,
    sec: Expression,
    timezone: Option[Expression] = None,
    timeZoneId: Option[String] = None)
  extends SeptenaryExpression with TimeZoneAwareExpression with ImplicitCastInputTypes {

  /** Six-argument form: no explicit time zone, the session zone is used. */
  def this(
      year: Expression,
      month: Expression,
      day: Expression,
      hour: Expression,
      min: Expression,
      sec: Expression) = {
    this(year, month, day, hour, min, sec, None, None)
  }

  /** Seven-argument form: the fields are interpreted in the given time zone. */
  def this(
      year: Expression,
      month: Expression,
      day: Expression,
      hour: Expression,
      min: Expression,
      sec: Expression,
      timezone: Expression) = {
    this(year, month, day, hour, min, sec, Some(timezone), None)
  }

  override def children: Seq[Expression] = Seq(year, month, day, hour, min, sec) ++ timezone
  override def inputTypes: Seq[AbstractDataType] =
    Seq(IntegerType, IntegerType, IntegerType, IntegerType, IntegerType, DoubleType) ++
    timezone.map(_ => StringType)
  override def dataType: DataType = TimestampType
  // Always nullable: non-null inputs can still be out of range and produce null.
  override def nullable: Boolean = true

  override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression =
    copy(timeZoneId = Option(timeZoneId))

  /**
   * Picks the zone used to interpret the local date-time.
   *
   * @param timezone the evaluated, non-null value of the optional `timezone` child, if any
   * @return the session zone when there is no explicit zone; the parsed zone when the explicit
   *         identifier is valid; `None` when the explicit identifier cannot be parsed
   */
  private def resolveZone(timezone: Option[Any]): Option[ZoneId] = timezone match {
    case None => Some(zoneId)
    case Some(tz) =>
      // The generated code resolves the explicit zone inside its try block and yields NULL on
      // failure; catching the same exception here keeps interpreted evaluation consistent.
      try {
        Some(DateTimeUtils.getZoneId(tz.asInstanceOf[UTF8String].toString))
      } catch {
        case _: DateTimeException => None
      }
  }

  /**
   * Converts the given fields to a timestamp value via `instantToMicros`.
   *
   * @param secAndNanos seconds with the fraction of second in the fractional part
   * @return the timestamp value, or null when a `DateTimeException` is raised while building
   *         the local date-time (including a non-zero fraction together with 60 seconds)
   */
  private def toMicros(
      year: Int,
      month: Int,
      day: Int,
      hour: Int,
      min: Int,
      secAndNanos: Double,
      zoneId: ZoneId): Any = {
    try {
      val seconds = secAndNanos.toInt
      val nanos = ((secAndNanos - seconds) * NANOS_PER_SECOND).toInt
      val ldt = if (seconds == 60) {
        if (nanos == 0) {
          // This case of sec = 60 and nanos = 0 is supported for compatibility with PostgreSQL
          LocalDateTime.of(year, month, day, hour, min, 0, 0).plusMinutes(1)
        } else {
          throw new DateTimeException("The fraction of sec must be zero. Valid range is [0, 60].")
        }
      } else {
        LocalDateTime.of(year, month, day, hour, min, seconds, nanos)
      }
      instantToMicros(ldt.atZone(zoneId).toInstant)
    } catch {
      case _: DateTimeException => null
    }
  }

  /**
   * Interpreted evaluation on non-null inputs.
   *
   * @return the timestamp value, or null when the explicit time zone is invalid or the fields
   *         do not form a valid timestamp
   */
  override def nullSafeEval(
      year: Any,
      month: Any,
      day: Any,
      hour: Any,
      min: Any,
      sec: Any,
      timezone: Option[Any]): Any = {
    resolveZone(timezone) match {
      case Some(zid) =>
        toMicros(
          year.asInstanceOf[Int],
          month.asInstanceOf[Int],
          day.asInstanceOf[Int],
          hour.asInstanceOf[Int],
          min.asInstanceOf[Int],
          sec.asInstanceOf[Double],
          zid)
      case None => null
    }
  }

  /**
   * Generates Java code that mirrors [[nullSafeEval]]: the whole computation, including the
   * resolution of an explicit time zone, runs inside one try block, and any
   * `java.time.DateTimeException` turns the result into NULL.
   */
  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
    val dtu = DateTimeUtils.getClass.getName.stripSuffix("$")
    // The session zone is passed to the generated code as a reference object.
    val zid = ctx.addReferenceObj("zoneId", zoneId, classOf[ZoneId].getName)
    nullSafeCodeGen(ctx, ev, (year, month, day, hour, min, secAndNanos, timezone) => {
      // Java expression for the zone: parse the explicit value if present, else the session zone.
      val zoneExpr = timezone.map(tz => s"$dtu.getZoneId(${tz}.toString())").getOrElse(zid)
      // `nullable` is always true here, so ev.isNull is a declared variable that can be reset.
      s"""
      try {
        int seconds = (int)$secAndNanos;
        int nanos = (int)(($secAndNanos - seconds) * 1000000000L);
        java.time.LocalDateTime ldt;
        if (seconds == 60) {
          if (nanos == 0) {
            ldt = java.time.LocalDateTime.of(
              $year, $month, $day, $hour, $min, 0, 0).plusMinutes(1);
          } else {
            throw new java.time.DateTimeException(
              "The fraction of sec must be zero. Valid range is [0, 60].");
          }
        } else {
          ldt = java.time.LocalDateTime.of(
            $year, $month, $day, $hour, $min, seconds, nanos);
        }
        java.time.Instant instant = ldt.atZone($zoneExpr).toInstant();
        ${ev.value} = $dtu.instantToMicros(instant);
      } catch (java.time.DateTimeException e) {
        ${ev.isNull} = true;
      }"""
    })
  }

  override def prettyName: String = "make_timestamp"
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.expressions

import java.sql.{Date, Timestamp}
import java.text.SimpleDateFormat
import java.time.ZoneOffset
import java.time.{ZoneId, ZoneOffset}
import java.util.{Calendar, Locale, TimeZone}
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeUnit._
Expand Down Expand Up @@ -928,4 +928,36 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
checkEvaluation(MakeDate(Literal(2019), Literal(13), Literal(19)), null)
checkEvaluation(MakeDate(Literal(2019), Literal(7), Literal(32)), null)
}

test("creating values of TimestampType via make_timestamp") {
  // Reference expression with an explicit time zone equal to the JVM default zone.
  val withZone = MakeTimestamp(
    Literal(2013), Literal(7), Literal(15), Literal(8), Literal(15), Literal(23.5),
    Some(Literal(ZoneId.systemDefault().getId)))
  val expected = Timestamp.valueOf("2013-7-15 8:15:23.5")
  checkEvaluation(withZone, expected)
  checkEvaluation(withZone.copy(timezone = None), expected)

  // For every field: first a null input, then an out-of-range input. Each must yield null.
  val invalidVariants = Seq(
    withZone.copy(year = Literal.create(null, IntegerType)),
    withZone.copy(year = Literal(Int.MaxValue)),
    withZone.copy(month = Literal.create(null, IntegerType)),
    withZone.copy(month = Literal(13)),
    withZone.copy(day = Literal.create(null, IntegerType)),
    withZone.copy(day = Literal(32)),
    withZone.copy(hour = Literal.create(null, IntegerType)),
    withZone.copy(hour = Literal(25)),
    withZone.copy(min = Literal.create(null, IntegerType)),
    withZone.copy(min = Literal(65)),
    withZone.copy(sec = Literal.create(null, DoubleType)),
    withZone.copy(sec = Literal(70.0)))
  invalidVariants.foreach { invalid =>
    checkEvaluation(invalid, null)
  }

  // sec = 60 rolls over to the next minute, but only when the fraction is zero.
  val sixtySeconds = MakeTimestamp(Literal(2019), Literal(6), Literal(30),
    Literal(23), Literal(59), Literal(60.0))
  checkEvaluation(sixtySeconds, Timestamp.valueOf("2019-07-01 00:00:00"))
  checkEvaluation(sixtySeconds.copy(sec = Literal(60.5)), null)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -240,8 +240,7 @@ SELECT '' AS date_trunc_week, date_trunc( 'week', timestamp '2004-02-29 15:44:17
-- FROM TIMESTAMP_TBL;


--[SPARK-28432] Missing Date/Time Functions: make_timestamp
-- timestamp numeric fields constructor
-- SELECT make_timestamp(2014,12,28,6,30,45.887);
SELECT make_timestamp(2014,12,28,6,30,45.887);

DROP TABLE TIMESTAMP_TBL;
Loading

0 comments on commit caa23e3

Please sign in to comment.