Skip to content

Commit

Permalink
Revert "[SPARK-23715][SQL] the input of to/from_utc_timestamp can not…
Browse files Browse the repository at this point in the history
… have timezone

## What changes were proposed in this pull request?

This reverts commit 417ad92.

We decided to keep the current behaviors unchanged and will consider whether we will deprecate the  these functions in 3.0. For more details, see the discussion in https://issues.apache.org/jira/browse/SPARK-23715

## How was this patch tested?

The existing tests.

Closes apache#22505 from gatorsmile/revertSpark-23715.

Authored-by: gatorsmile <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
  • Loading branch information
gatorsmile authored and cloud-fan committed Sep 21, 2018
1 parent 950ab79 commit 5d25e15
Show file tree
Hide file tree
Showing 9 changed files with 13 additions and 278 deletions.
1 change: 0 additions & 1 deletion docs/sql-programming-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -1955,7 +1955,6 @@ working with timestamps in `pandas_udf`s to get the best performance, see
- Since Spark 2.4, renaming a managed table to existing location is not allowed. An exception is thrown when attempting to rename a managed table to existing location.
- Since Spark 2.4, the type coercion rules can automatically promote the argument types of the variadic SQL functions (e.g., IN/COALESCE) to the widest common type, no matter how the input arguments order. In prior Spark versions, the promotion could fail in some specific orders (e.g., TimestampType, IntegerType and StringType) and throw an exception.
- Since Spark 2.4, Spark has enabled non-cascading SQL cache invalidation in addition to the traditional cache invalidation mechanism. The non-cascading cache invalidation mechanism allows users to remove a cache without impacting its dependent caches. This new cache invalidation mechanism is used in scenarios where the data of the cache to be removed is still valid, e.g., calling unpersist() on a Dataset, or dropping a temporary view. This allows users to free up memory and keep the desired caches valid at the same time.
- In version 2.3 and earlier, `to_utc_timestamp` and `from_utc_timestamp` respect the timezone in the input timestamp string, which breaks the assumption that the input timestamp is in a specific timezone. Therefore, these 2 functions can return unexpected results. In version 2.4 and later, this problem has been fixed. `to_utc_timestamp` and `from_utc_timestamp` will return null if the input timestamp string contains timezone. As an example, `from_utc_timestamp('2000-10-10 00:00:00', 'GMT+1')` will return `2000-10-10 01:00:00` in both Spark 2.3 and 2.4. However, `from_utc_timestamp('2000-10-10 00:00:00+00:00', 'GMT+1')`, assuming a local timezone of GMT+8, will return `2000-10-10 09:00:00` in Spark 2.3 but `null` in 2.4. For people who don't care about this problem and want to retain the previous behavior to keep their query unchanged, you can set `spark.sql.function.rejectTimezoneInString` to false. This option will be removed in Spark 3.0 and should only be used as a temporary workaround.
- In version 2.3 and earlier, Spark converts Parquet Hive tables by default but ignores table properties like `TBLPROPERTIES (parquet.compression 'NONE')`. This happens for ORC Hive table properties like `TBLPROPERTIES (orc.compress 'NONE')` in case of `spark.sql.hive.convertMetastoreOrc=true`, too. Since Spark 2.4, Spark respects Parquet/ORC specific table properties while converting Parquet/ORC Hive tables. As an example, `CREATE TABLE t(id int) STORED AS PARQUET TBLPROPERTIES (parquet.compression 'NONE')` would generate Snappy parquet files during insertion in Spark 2.3, and in Spark 2.4, the result would be uncompressed parquet files.
- Since Spark 2.0, Spark converts Parquet Hive tables by default for better performance. Since Spark 2.4, Spark converts ORC Hive tables by default, too. It means Spark uses its own ORC support by default instead of Hive SerDe. As an example, `CREATE TABLE t(id int) STORED AS ORC` would be handled with Hive SerDe in Spark 2.3, and in Spark 2.4, it would be converted into Spark's ORC data source table and ORC vectorization would be applied. To set `false` to `spark.sql.hive.convertMetastoreOrc` restores the previous behavior.
- In version 2.3 and earlier, CSV rows are considered as malformed if at least one column value in the row is malformed. CSV parser dropped such rows in the DROPMALFORMED mode or outputs an error in the FAILFAST mode. Since Spark 2.4, CSV row is considered as malformed only when it contains malformed column values requested from CSV datasource, other values can be ignored. As an example, CSV file contains the "id,name" header and one row "1234". In Spark 2.4, selection of the id column consists of a row with one column value 1234 but in Spark 2.3 and earlier it is empty in the DROPMALFORMED mode. To restore the previous behavior, set `spark.sql.csv.parser.columnPruning.enabled` to `false`.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ object TypeCoercion {
IfCoercion ::
StackCoercion ::
Division ::
new ImplicitTypeCasts(conf) ::
ImplicitTypeCasts ::
DateTimeOperations ::
WindowFrameCoercion ::
Nil
Expand Down Expand Up @@ -841,33 +841,12 @@ object TypeCoercion {
/**
* Casts types according to the expected input types for [[Expression]]s.
*/
class ImplicitTypeCasts(conf: SQLConf) extends TypeCoercionRule {

private def rejectTzInString = conf.getConf(SQLConf.REJECT_TIMEZONE_IN_STRING)

object ImplicitTypeCasts extends TypeCoercionRule {
override protected def coerceTypes(
plan: LogicalPlan): LogicalPlan = plan resolveExpressions {
// Skip nodes who's children have not been resolved yet.
case e if !e.childrenResolved => e

// Special rules for `from/to_utc_timestamp`. These 2 functions assume the input timestamp
// string is in a specific timezone, so the string itself should not contain timezone.
// TODO: We should move the type coercion logic to expressions instead of a central
// place to put all the rules.
case e: FromUTCTimestamp if e.left.dataType == StringType =>
if (rejectTzInString) {
e.copy(left = StringToTimestampWithoutTimezone(e.left))
} else {
e.copy(left = Cast(e.left, TimestampType))
}

case e: ToUTCTimestamp if e.left.dataType == StringType =>
if (rejectTzInString) {
e.copy(left = StringToTimestampWithoutTimezone(e.left))
} else {
e.copy(left = Cast(e.left, TimestampType))
}

case b @ BinaryOperator(left, right) if left.dataType != right.dataType =>
findTightestCommonType(left.dataType, right.dataType).map { commonType =>
if (b.inputType.acceptsType(commonType)) {
Expand All @@ -884,7 +863,7 @@ object TypeCoercion {
case e: ImplicitCastInputTypes if e.inputTypes.nonEmpty =>
val children: Seq[Expression] = e.children.zip(e.inputTypes).map { case (in, expected) =>
// If we cannot do the implicit cast, just use the original input.
ImplicitTypeCasts.implicitCast(in, expected).getOrElse(in)
implicitCast(in, expected).getOrElse(in)
}
e.withNewChildren(children)

Expand All @@ -900,9 +879,6 @@ object TypeCoercion {
}
e.withNewChildren(children)
}
}

object ImplicitTypeCasts {

/**
* Given an expected data type, try to cast the expression and return the cast expression.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1017,48 +1017,6 @@ case class TimeAdd(start: Expression, interval: Expression, timeZoneId: Option[S
}
}

/**
* A special expression used to convert the string input of `to/from_utc_timestamp` to timestamp,
* which requires the timestamp string to not have timezone information, otherwise null is returned.
*/
case class StringToTimestampWithoutTimezone(child: Expression, timeZoneId: Option[String] = None)
extends UnaryExpression with TimeZoneAwareExpression with ExpectsInputTypes {

override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression =
copy(timeZoneId = Option(timeZoneId))

override def inputTypes: Seq[AbstractDataType] = Seq(StringType)
override def dataType: DataType = TimestampType
override def nullable: Boolean = true
override def toString: String = child.toString
override def sql: String = child.sql

override def nullSafeEval(input: Any): Any = {
DateTimeUtils.stringToTimestamp(
input.asInstanceOf[UTF8String], timeZone, rejectTzInString = true).orNull
}

override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
val dtu = DateTimeUtils.getClass.getName.stripSuffix("$")
val tz = ctx.addReferenceObj("timeZone", timeZone)
val longOpt = ctx.freshName("longOpt")
val eval = child.genCode(ctx)
val code = code"""
|${eval.code}
|${CodeGenerator.JAVA_BOOLEAN} ${ev.isNull} = true;
|${CodeGenerator.JAVA_LONG} ${ev.value} = ${CodeGenerator.defaultValue(TimestampType)};
|if (!${eval.isNull}) {
| scala.Option<Long> $longOpt = $dtu.stringToTimestamp(${eval.value}, $tz, true);
| if ($longOpt.isDefined()) {
| ${ev.value} = ((Long) $longOpt.get()).longValue();
| ${ev.isNull} = false;
| }
|}
""".stripMargin
ev.copy(code = code)
}
}

/**
* Given a timestamp like '2017-07-14 02:40:00.0', interprets it as a time in UTC, and renders
* that time as a timestamp in the given time zone. For example, 'GMT+1' would yield
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,28 +300,10 @@ object DateTimeUtils {
* `T[h]h:[m]m:[s]s.[ms][ms][ms][us][us][us]+[h]h:[m]m`
*/
def stringToTimestamp(s: UTF8String): Option[SQLTimestamp] = {
stringToTimestamp(s, defaultTimeZone(), rejectTzInString = false)
stringToTimestamp(s, defaultTimeZone())
}

def stringToTimestamp(s: UTF8String, timeZone: TimeZone): Option[SQLTimestamp] = {
stringToTimestamp(s, timeZone, rejectTzInString = false)
}

/**
* Converts a timestamp string to microseconds from the unix epoch, w.r.t. the given timezone.
* Returns None if the input string is not a valid timestamp format.
*
* @param s the input timestamp string.
* @param timeZone the timezone of the timestamp string, will be ignored if the timestamp string
* already contains timezone information and `forceTimezone` is false.
* @param rejectTzInString if true, rejects timezone in the input string, i.e., if the
* timestamp string contains timezone, like `2000-10-10 00:00:00+00:00`,
* return None.
*/
def stringToTimestamp(
s: UTF8String,
timeZone: TimeZone,
rejectTzInString: Boolean): Option[SQLTimestamp] = {
if (s == null) {
return None
}
Expand Down Expand Up @@ -439,8 +421,6 @@ object DateTimeUtils {
return None
}

if (tz.isDefined && rejectTzInString) return None

val c = if (tz.isEmpty) {
Calendar.getInstance(timeZone)
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1408,13 +1408,6 @@ object SQLConf {
.stringConf
.createWithDefault("")

val REJECT_TIMEZONE_IN_STRING = buildConf("spark.sql.function.rejectTimezoneInString")
.internal()
.doc("If true, `to_utc_timestamp` and `from_utc_timestamp` return null if the input string " +
"contains a timezone part, e.g. `2000-10-10 00:00:00+00:00`.")
.booleanConf
.createWithDefault(true)

object PartitionOverwriteMode extends Enumeration {
val STATIC, DYNAMIC = Value
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -680,23 +680,23 @@ class TypeCoercionSuite extends AnalysisTest {
test("cast NullType for expressions that implement ExpectsInputTypes") {
import TypeCoercionSuite._

ruleTest(new TypeCoercion.ImplicitTypeCasts(conf),
ruleTest(TypeCoercion.ImplicitTypeCasts,
AnyTypeUnaryExpression(Literal.create(null, NullType)),
AnyTypeUnaryExpression(Literal.create(null, NullType)))

ruleTest(new TypeCoercion.ImplicitTypeCasts(conf),
ruleTest(TypeCoercion.ImplicitTypeCasts,
NumericTypeUnaryExpression(Literal.create(null, NullType)),
NumericTypeUnaryExpression(Literal.create(null, DoubleType)))
}

test("cast NullType for binary operators") {
import TypeCoercionSuite._

ruleTest(new TypeCoercion.ImplicitTypeCasts(conf),
ruleTest(TypeCoercion.ImplicitTypeCasts,
AnyTypeBinaryOperator(Literal.create(null, NullType), Literal.create(null, NullType)),
AnyTypeBinaryOperator(Literal.create(null, NullType), Literal.create(null, NullType)))

ruleTest(new TypeCoercion.ImplicitTypeCasts(conf),
ruleTest(TypeCoercion.ImplicitTypeCasts,
NumericTypeBinaryOperator(Literal.create(null, NullType), Literal.create(null, NullType)),
NumericTypeBinaryOperator(Literal.create(null, DoubleType), Literal.create(null, DoubleType)))
}
Expand Down Expand Up @@ -976,7 +976,7 @@ class TypeCoercionSuite extends AnalysisTest {
}

test("type coercion for CaseKeyWhen") {
ruleTest(new TypeCoercion.ImplicitTypeCasts(conf),
ruleTest(TypeCoercion.ImplicitTypeCasts,
CaseKeyWhen(Literal(1.toShort), Seq(Literal(1), Literal("a"))),
CaseKeyWhen(Cast(Literal(1.toShort), IntegerType), Seq(Literal(1), Literal("a")))
)
Expand Down Expand Up @@ -1436,7 +1436,7 @@ class TypeCoercionSuite extends AnalysisTest {
}

test("SPARK-17117 null type coercion in divide") {
val rules = Seq(FunctionArgumentConversion, Division, new ImplicitTypeCasts(conf))
val rules = Seq(FunctionArgumentConversion, Division, ImplicitTypeCasts)
val nullLit = Literal.create(null, NullType)
ruleTest(rules, Divide(1L, nullLit), Divide(Cast(1L, DoubleType), Cast(nullLit, DoubleType)))
ruleTest(rules, Divide(nullLit, 1L), Divide(Cast(nullLit, DoubleType), Cast(1L, DoubleType)))
Expand Down
33 changes: 0 additions & 33 deletions sql/core/src/test/resources/sql-tests/inputs/datetime.sql
Original file line number Diff line number Diff line change
Expand Up @@ -27,36 +27,3 @@ select current_date = current_date(), current_timestamp = current_timestamp(), a
select a, b from ttf2 order by a, current_date;

select weekday('2007-02-03'), weekday('2009-07-30'), weekday('2017-05-27'), weekday(null), weekday('1582-10-15 13:10:15');

select from_utc_timestamp('2015-07-24 00:00:00', 'PST');

select from_utc_timestamp('2015-01-24 00:00:00', 'PST');

select from_utc_timestamp(null, 'PST');

select from_utc_timestamp('2015-07-24 00:00:00', null);

select from_utc_timestamp(null, null);

select from_utc_timestamp(cast(0 as timestamp), 'PST');

select from_utc_timestamp(cast('2015-01-24' as date), 'PST');

select to_utc_timestamp('2015-07-24 00:00:00', 'PST');

select to_utc_timestamp('2015-01-24 00:00:00', 'PST');

select to_utc_timestamp(null, 'PST');

select to_utc_timestamp('2015-07-24 00:00:00', null);

select to_utc_timestamp(null, null);

select to_utc_timestamp(cast(0 as timestamp), 'PST');

select to_utc_timestamp(cast('2015-01-24' as date), 'PST');

-- SPARK-23715: the input of to/from_utc_timestamp can not have timezone
select from_utc_timestamp('2000-10-10 00:00:00+00:00', 'PST');

select to_utc_timestamp('2000-10-10 00:00:00+00:00', 'PST');
Loading

0 comments on commit 5d25e15

Please sign in to comment.