diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
index ef3d038cf289f..696d25f8ed484 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.types._
 class CSVInferSchema(val options: CSVOptions) extends Serializable {
 
   private val timestampParser = TimestampFormatter(
-    options.timestampFormat,
+    options.timestampFormatInRead,
     options.zoneId,
     options.locale,
     legacyFormat = FAST_DATE_FORMAT,
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
index 9aa4bf43898a7..79624b9a608a4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
@@ -147,9 +147,17 @@ class CSVOptions(
   // A language tag in IETF BCP 47 format
   val locale: Locale = parameters.get("locale").map(Locale.forLanguageTag).getOrElse(Locale.US)
 
-  val dateFormat: String = parameters.getOrElse("dateFormat", DateFormatter.defaultPattern)
+  val dateFormatInRead: Option[String] = parameters.get("dateFormat")
+  val dateFormatInWrite: String = parameters.getOrElse("dateFormat", DateFormatter.defaultPattern)
 
-  val timestampFormat: String = parameters.getOrElse("timestampFormat",
+  val timestampFormatInRead: Option[String] =
+    if (SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY) {
+      Some(parameters.getOrElse("timestampFormat",
+        s"${DateFormatter.defaultPattern}'T'HH:mm:ss.SSSXXX"))
+    } else {
+      parameters.get("timestampFormat")
+    }
+  val timestampFormatInWrite: String = parameters.getOrElse("timestampFormat",
     if (SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY) {
       s"${DateFormatter.defaultPattern}'T'HH:mm:ss.SSSXXX"
     } else {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala
index 322779681163e..8a04e4ca56c5d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala
@@ -43,19 +43,19 @@ class UnivocityGenerator(
     schema.map(_.dataType).map(makeConverter).toArray
 
   private val timestampFormatter = TimestampFormatter(
-    options.timestampFormat,
+    options.timestampFormatInWrite,
     options.zoneId,
     options.locale,
     legacyFormat = FAST_DATE_FORMAT,
     isParsing = false)
   private val timestampNTZFormatter = TimestampFormatter(
-    options.timestampFormat,
+    options.timestampFormatInWrite,
     options.zoneId,
     legacyFormat = FAST_DATE_FORMAT,
     isParsing = false,
     forTimestampNTZ = true)
   private val dateFormatter = DateFormatter(
-    options.dateFormat,
+    options.dateFormatInWrite,
     options.locale,
     legacyFormat = FAST_DATE_FORMAT,
     isParsing = false)
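The `CSVOptions` hunk above is the heart of the patch: each datetime option splits into an optional read-side pattern (absence now means "resolve by casting") and an always-resolved write-side pattern. A minimal standalone sketch of that resolution rule, using a plain `Map` instead of Spark's `CaseInsensitiveMap` and made-up names (`ResolutionSketch`, `resolveTimestampFormats`); the non-legacy write default is elided in the hunk, so a placeholder stands in for it:

```scala
// Standalone sketch of the option-resolution rule; names, the plain Map, and
// the placeholder default are illustrative, not Spark's actual code.
object ResolutionSketch {
  val defaultDatePattern = "yyyy-MM-dd" // stands in for DateFormatter.defaultPattern

  def resolveTimestampFormats(
      parameters: Map[String, String],
      legacy: Boolean): (Option[String], String) = {
    val legacyDefault = s"$defaultDatePattern'T'HH:mm:ss.SSSXXX"
    // Read side: a missing option now means "parse by casting" (None).
    val inRead =
      if (legacy) Some(parameters.getOrElse("timestampFormat", legacyDefault))
      else parameters.get("timestampFormat")
    // Write side: formatting always needs a concrete pattern, so a default
    // is kept (the non-legacy default is elided in the hunk above).
    val inWrite = parameters.getOrElse(
      "timestampFormat",
      if (legacy) legacyDefault else "<non-legacy-default-pattern>")
    (inRead, inWrite)
  }

  def main(args: Array[String]): Unit = {
    // No option set, non-legacy: the reader gets None and falls back to casting.
    assert(resolveTimestampFormats(Map.empty, legacy = false)._1.isEmpty)
    // Explicit option: both read and write sides see the same pattern.
    val p = Map("timestampFormat" -> "dd/MM/yyyy HH:mm")
    assert(resolveTimestampFormats(p, legacy = false) ==
      (Some("dd/MM/yyyy HH:mm"), "dd/MM/yyyy HH:mm"))
  }
}
```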
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
index 7eafd72a2efcd..cd5621bbb7856 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
@@ -88,19 +88,19 @@ class UnivocityParser(
   private val noRows = None
 
   private lazy val timestampFormatter = TimestampFormatter(
-    options.timestampFormat,
+    options.timestampFormatInRead,
     options.zoneId,
     options.locale,
     legacyFormat = FAST_DATE_FORMAT,
     isParsing = true)
   private lazy val timestampNTZFormatter = TimestampFormatter(
-    options.timestampFormat,
+    options.timestampFormatInRead,
     options.zoneId,
     legacyFormat = FAST_DATE_FORMAT,
     isParsing = true,
     forTimestampNTZ = true)
   private lazy val dateFormatter = DateFormatter(
-    options.dateFormat,
+    options.dateFormatInRead,
     options.locale,
     legacyFormat = FAST_DATE_FORMAT,
     isParsing = true)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
index 47be83a41d613..029c014fedc90 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
@@ -89,9 +89,17 @@ private[sql] class JSONOptions(
   val zoneId: ZoneId = DateTimeUtils.getZoneId(
     parameters.getOrElse(DateTimeUtils.TIMEZONE_OPTION, defaultTimeZoneId))
 
-  val dateFormat: String = parameters.getOrElse("dateFormat", DateFormatter.defaultPattern)
+  val dateFormatInRead: Option[String] = parameters.get("dateFormat")
+  val dateFormatInWrite: String = parameters.getOrElse("dateFormat", DateFormatter.defaultPattern)
 
-  val timestampFormat: String = parameters.getOrElse("timestampFormat",
+  val timestampFormatInRead: Option[String] =
+    if (SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY) {
+      Some(parameters.getOrElse("timestampFormat",
+        s"${DateFormatter.defaultPattern}'T'HH:mm:ss.SSSXXX"))
+    } else {
+      parameters.get("timestampFormat")
+    }
+  val timestampFormatInWrite: String = parameters.getOrElse("timestampFormat",
     if (SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY) {
       s"${DateFormatter.defaultPattern}'T'HH:mm:ss.SSSXXX"
     } else {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
index 3d0f9be371f66..d00065b19c6a8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
@@ -85,19 +85,19 @@ private[sql] class JacksonGenerator(
   private val lineSeparator: String = options.lineSeparatorInWrite
 
   private val timestampFormatter = TimestampFormatter(
-    options.timestampFormat,
+    options.timestampFormatInWrite,
     options.zoneId,
     options.locale,
     legacyFormat = FAST_DATE_FORMAT,
     isParsing = false)
   private val timestampNTZFormatter = TimestampFormatter(
-    options.timestampFormat,
+    options.timestampFormatInWrite,
     options.zoneId,
     legacyFormat = FAST_DATE_FORMAT,
     isParsing = false,
     forTimestampNTZ = true)
   private val dateFormatter = DateFormatter(
-    options.dateFormat,
+    options.dateFormatInWrite,
     options.locale,
     legacyFormat = FAST_DATE_FORMAT,
     isParsing = false)
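The parsers above now pass an `Option` to their formatters; when it is `None`, the read path falls back to cast-style parsing (Spark's `DateTimeUtils.stringToDate` / `stringToTimestamp`), which accepts partial inputs such as `2021` or `2021-01`. A toy emulation of that leniency, not the real implementation:

```scala
import java.time.LocalDate

// Toy emulation of cast-style leniency; Spark's real fallback lives in
// DateTimeUtils.stringToDate / stringToTimestamp, not in this function.
object LenientDateSketch {
  def lenientDate(s: String): Option[LocalDate] =
    s.trim.split("[-T ]").toList match {
      case y :: Nil         => Some(LocalDate.of(y.toInt, 1, 1))     // "2021"
      case y :: m :: Nil    => Some(LocalDate.of(y.toInt, m.toInt, 1)) // "2021-01"
      case y :: m :: d :: _ => Some(LocalDate.of(y.toInt, m.toInt, d.toInt))
      case _                => None
    }

  def main(args: Array[String]): Unit = {
    assert(lenientDate("2021").contains(LocalDate.of(2021, 1, 1)))
    assert(lenientDate("2021-01").contains(LocalDate.of(2021, 1, 1)))
    assert(lenientDate("2021-8-18 00:00:00").contains(LocalDate.of(2021, 8, 18)))
  }
}
```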
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
index 56907cbfb827b..cb6a079aacc8a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
@@ -60,19 +60,19 @@ class JacksonParser(
   private val factory = options.buildJsonFactory()
 
   private lazy val timestampFormatter = TimestampFormatter(
-    options.timestampFormat,
+    options.timestampFormatInRead,
     options.zoneId,
     options.locale,
     legacyFormat = FAST_DATE_FORMAT,
     isParsing = true)
   private lazy val timestampNTZFormatter = TimestampFormatter(
-    options.timestampFormat,
+    options.timestampFormatInRead,
     options.zoneId,
     legacyFormat = FAST_DATE_FORMAT,
     isParsing = true,
     forTimestampNTZ = true)
   private lazy val dateFormatter = DateFormatter(
-    options.dateFormat,
+    options.dateFormatInRead,
     options.locale,
     legacyFormat = FAST_DATE_FORMAT,
     isParsing = true)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
index eebb9a404257b..3b17cde9f0684 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
@@ -39,7 +39,7 @@ private[sql] class JsonInferSchema(options: JSONOptions) extends Serializable {
   private val decimalParser = ExprUtils.getDecimalParser(options.locale)
 
   private val timestampFormatter = TimestampFormatter(
-    options.timestampFormat,
+    options.timestampFormatInRead,
     options.zoneId,
     options.locale,
     legacyFormat = FAST_DATE_FORMAT,
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala
index d9ccf3091f839..15e04049da4a6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala
@@ -198,6 +198,14 @@ object DateFormatter {
     }
   }
 
+  def apply(
+      format: Option[String],
+      locale: Locale,
+      legacyFormat: LegacyDateFormat,
+      isParsing: Boolean): DateFormatter = {
+    getFormatter(format, locale, legacyFormat, isParsing)
+  }
+
   def apply(
       format: String,
       locale: Locale,
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala
index fb8502adeae9e..e12653c26a7ec 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala
@@ -184,6 +184,12 @@ class DefaultTimestampFormatter(
       DateTimeUtils.stringToTimestampAnsi(UTF8String.fromString(s), zoneId)
     } catch checkParsedDiff(s, legacyFormatter.parse)
   }
+
+  override def parseWithoutTimeZone(s: String): Long = {
+    try {
+      DateTimeUtils.stringToTimestampWithoutTimeZoneAnsi(UTF8String.fromString(s))
+    } catch checkParsedDiff(s, legacyFormatter.parse)
+  }
 }
 
 /**
@@ -393,6 +399,15 @@ object TimestampFormatter {
     }
   }
 
+  def apply(
+      format: Option[String],
+      zoneId: ZoneId,
+      locale: Locale,
+      legacyFormat: LegacyDateFormat,
+      isParsing: Boolean): TimestampFormatter = {
+    getFormatter(format, zoneId, locale, legacyFormat, isParsing)
+  }
+
   def apply(
       format: String,
       zoneId: ZoneId,
@@ -410,6 +425,15 @@
     getFormatter(Some(format), zoneId, defaultLocale, legacyFormat, isParsing)
   }
 
+  def apply(
+      format: Option[String],
+      zoneId: ZoneId,
+      legacyFormat: LegacyDateFormat,
+      isParsing: Boolean,
+      forTimestampNTZ: Boolean): TimestampFormatter = {
+    getFormatter(format, zoneId, defaultLocale, legacyFormat, isParsing, forTimestampNTZ)
+  }
+
   def apply(
       format: String,
       zoneId: ZoneId,
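The new `apply` overloads are deliberately thin: they forward an already-optional pattern to the same `getFormatter` factory that the `String` overloads wrap in `Some(...)`, so existing call sites keep compiling unchanged. A reduced sketch of that delegation pattern, with illustrative names and placeholder bodies rather than Spark's API:

```scala
import java.util.Locale

// Reduced sketch of the overload-delegation pattern used above. Formatter,
// FormatterFactory, and the parse bodies are placeholders, not Spark's types.
trait Formatter {
  def parse(s: String): Long
}

object FormatterFactory {
  private def getFormatter(format: Option[String], locale: Locale): Formatter =
    format match {
      case Some(pattern) => (s: String) => 0L // stand-in for pattern-based parsing
      case None          => (s: String) => 0L // stand-in for cast-style parsing
    }

  // New overload: call sites pass the possibly-absent pattern straight through.
  def apply(format: Option[String], locale: Locale): Formatter =
    getFormatter(format, locale)

  // Pre-existing style: a concrete pattern is wrapped before delegation.
  def apply(format: String, locale: Locale): Formatter =
    getFormatter(Some(format), locale)
}
```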
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala
index 822008007ebbc..4166401d040f1 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala
@@ -120,7 +120,7 @@ class UnivocityParserSuite extends SparkFunSuite with SQLHelper {
     parser = new UnivocityParser(StructType(Seq.empty), timestampsOptions)
     val customTimestamp = "31/01/2015 00:00"
     var format = FastDateFormat.getInstance(
-      timestampsOptions.timestampFormat,
+      timestampsOptions.timestampFormatInRead.get,
       TimeZone.getTimeZone(timestampsOptions.zoneId),
       timestampsOptions.locale)
     val expectedTime = format.parse(customTimestamp).getTime
@@ -132,7 +132,7 @@ class UnivocityParserSuite extends SparkFunSuite with SQLHelper {
     val dateOptions = new CSVOptions(Map("dateFormat" -> "dd/MM/yyyy"), false, "UTC")
     parser = new UnivocityParser(StructType(Seq.empty), dateOptions)
     format = FastDateFormat.getInstance(
-      dateOptions.dateFormat,
+      dateOptions.dateFormatInRead.get,
       TimeZone.getTimeZone(dateOptions.zoneId),
       dateOptions.locale)
     val expectedDate = DateTimeUtils.millisToMicros(format.parse(customDate).getTime)
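The bare `.get` calls above are safe only because these suites construct their options with an explicit pattern, so the read-side `Option` is always defined. A REPL-sized illustration, with a plain `Map` standing in for the options:

```scala
// Why .get cannot throw in these tests: the pattern is set explicitly, so the
// read-side lookup always yields Some(...).
val params = Map("dateFormat" -> "dd/MM/yyyy")
val dateFormatInRead: Option[String] = params.get("dateFormat")
assert(dateFormatInRead.contains("dd/MM/yyyy")) // .get would return "dd/MM/yyyy"
```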
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index b58911fb946ca..c46e84ab26d89 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -22,6 +22,7 @@ import java.nio.charset.{Charset, StandardCharsets, UnsupportedCharsetException}
 import java.nio.file.{Files, StandardOpenOption}
 import java.sql.{Date, Timestamp}
 import java.text.SimpleDateFormat
+import java.time.{Instant, LocalDate, LocalDateTime}
 import java.util.Locale
 import java.util.zip.GZIPOutputStream
 
@@ -35,7 +36,7 @@ import org.apache.hadoop.io.compress.GzipCodec
 
 import org.apache.spark.{SparkConf, SparkException, TestUtils}
 import org.apache.spark.sql.{AnalysisException, Column, DataFrame, Encoders, QueryTest, Row}
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.catalyst.util.{DateTimeTestUtils, DateTimeUtils}
 import org.apache.spark.sql.execution.datasources.CommonFileDataSourceSuite
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
@@ -2483,6 +2484,40 @@ abstract class CSVSuite
       checkAnswer(df, Row(1, null) :: Nil)
     }
   }
+
+  test("SPARK-36536: use casting when datetime pattern is not set") {
+    def isLegacy: Boolean = {
+      spark.conf.get(SQLConf.LEGACY_TIME_PARSER_POLICY).toUpperCase(Locale.ROOT) ==
+        SQLConf.LegacyBehaviorPolicy.LEGACY.toString
+    }
+    withSQLConf(
+      SQLConf.DATETIME_JAVA8API_ENABLED.key -> "true",
+      SQLConf.SESSION_LOCAL_TIMEZONE.key -> DateTimeTestUtils.UTC.getId) {
+      withTempPath { path =>
+        Seq(
+          """d,ts_ltz,ts_ntz""",
+          """2021,2021,2021""",
+          """2021-01,2021-01 ,2021-01""",
+          """ 2021-2-1,2021-3-02,2021-10-1""",
+          """2021-8-18 00:00:00,2021-8-18 21:44:30Z,2021-8-18T21:44:30.123"""
+        ).toDF().repartition(1).write.text(path.getCanonicalPath)
+        val readback = spark.read.schema("d date, ts_ltz timestamp_ltz, ts_ntz timestamp_ntz")
+          .option("header", true)
+          .csv(path.getCanonicalPath)
+        checkAnswer(
+          readback,
+          Seq(
+            Row(LocalDate.of(2021, 1, 1), Instant.parse("2021-01-01T00:00:00Z"),
+              if (isLegacy) null else LocalDateTime.of(2021, 1, 1, 0, 0, 0)),
+            Row(LocalDate.of(2021, 1, 1), Instant.parse("2021-01-01T00:00:00Z"),
+              if (isLegacy) null else LocalDateTime.of(2021, 1, 1, 0, 0, 0)),
+            Row(LocalDate.of(2021, 2, 1), Instant.parse("2021-03-02T00:00:00Z"),
+              if (isLegacy) null else LocalDateTime.of(2021, 10, 1, 0, 0, 0)),
+            Row(LocalDate.of(2021, 8, 18), Instant.parse("2021-08-18T21:44:30Z"),
+              if (isLegacy) null else LocalDateTime.of(2021, 8, 18, 21, 44, 30, 123000000))))
+      }
+    }
+  }
 }
 
 class CSVv1Suite extends CSVSuite {
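The behavior the test above pins down can be reproduced outside the suite; a standalone sketch, where the local master, runtime conf keys, and input path are assumptions for illustration rather than part of the patch:

```scala
import org.apache.spark.sql.SparkSession

// Standalone sketch of reading datetime columns without setting any pattern.
object CastingCsvExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("csv-datetime-casting")
      .getOrCreate()
    spark.conf.set("spark.sql.datetime.java8API.enabled", "true")
    spark.conf.set("spark.sql.session.timeZone", "UTC")

    // No dateFormat/timestampFormat option: values are resolved by casting,
    // so partial dates such as "2021" or "2021-01" still load.
    val df = spark.read
      .schema("d date, ts_ltz timestamp_ltz, ts_ntz timestamp_ntz")
      .option("header", true)
      .csv("/tmp/dates.csv") // hypothetical input shaped like the test rows above

    df.show(truncate = false)
    spark.stop()
  }
}
```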
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 0910ff8a65c43..e5c82603d8893 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -21,7 +21,7 @@ import java.io._
 import java.nio.charset.{Charset, StandardCharsets, UnsupportedCharsetException}
 import java.nio.file.Files
 import java.sql.{Date, Timestamp}
-import java.time.ZoneId
+import java.time.{Instant, LocalDate, LocalDateTime, ZoneId}
 import java.util.Locale
 
 import com.fasterxml.jackson.core.JsonFactory
@@ -33,7 +33,7 @@ import org.apache.spark.{SparkConf, SparkException, TestUtils}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{functions => F, _}
 import org.apache.spark.sql.catalyst.json._
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.catalyst.util.{DateTimeTestUtils, DateTimeUtils}
 import org.apache.spark.sql.execution.ExternalRDD
 import org.apache.spark.sql.execution.datasources.{CommonFileDataSourceSuite, DataSource, InMemoryFileIndex, NoopCache}
 import org.apache.spark.sql.execution.datasources.v2.json.JsonScanBuilder
@@ -2958,6 +2958,39 @@ abstract class JsonSuite
         .json(Seq("""[{"a": "str"}, null, {"a": "str"}]""").toDS),
       Row(null) :: Nil)
   }
+
+  test("SPARK-36536: use casting when datetime pattern is not set") {
+    def isLegacy: Boolean = {
+      spark.conf.get(SQLConf.LEGACY_TIME_PARSER_POLICY).toUpperCase(Locale.ROOT) ==
+        SQLConf.LegacyBehaviorPolicy.LEGACY.toString
+    }
+    withSQLConf(
+      SQLConf.DATETIME_JAVA8API_ENABLED.key -> "true",
+      SQLConf.SESSION_LOCAL_TIMEZONE.key -> DateTimeTestUtils.UTC.getId) {
+      withTempPath { path =>
+        Seq(
+          """{"d":"2021","ts_ltz":"2021","ts_ntz": "2021"}""",
+          """{"d":"2021-01","ts_ltz":"2021-01 ","ts_ntz":"2021-01"}""",
+          """{"d":" 2021-2-1","ts_ltz":"2021-3-02","ts_ntz": "2021-10-1"}""",
+          """{"d":"2021-8-18 00:00:00","ts_ltz":"2021-8-18 21:44:30Z"""" +
+            ""","ts_ntz":"2021-8-18T21:44:30.123"}"""
+        ).toDF().repartition(1).write.text(path.getCanonicalPath)
+        val readback = spark.read.schema("d date, ts_ltz timestamp_ltz, ts_ntz timestamp_ntz")
+          .json(path.getCanonicalPath)
+        checkAnswer(
+          readback,
+          Seq(
+            Row(LocalDate.of(2021, 1, 1), Instant.parse("2021-01-01T00:00:00Z"),
+              if (isLegacy) null else LocalDateTime.of(2021, 1, 1, 0, 0, 0)),
+            Row(LocalDate.of(2021, 1, 1), Instant.parse("2021-01-01T00:00:00Z"),
+              if (isLegacy) null else LocalDateTime.of(2021, 1, 1, 0, 0, 0)),
+            Row(LocalDate.of(2021, 2, 1), Instant.parse("2021-03-02T00:00:00Z"),
+              if (isLegacy) null else LocalDateTime.of(2021, 10, 1, 0, 0, 0)),
+            Row(LocalDate.of(2021, 8, 18), Instant.parse("2021-08-18T21:44:30Z"),
+              if (isLegacy) null else LocalDateTime.of(2021, 8, 18, 21, 44, 30, 123000000))))
+      }
+    }
+  }
 }
 
 class JsonV1Suite extends JsonSuite {
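Both suites key their expected `ts_ntz` values off the time-parser policy, since the legacy parser path cannot produce TIMESTAMP_NTZ values and the column comes back null under LEGACY. Given a `SparkSession` in scope (e.g. in spark-shell), an equivalent check using the public string conf key rather than the internal `SQLConf` handle:

```scala
// Hedged equivalent of the suites' isLegacy helper.
val isLegacy = spark.conf
  .get("spark.sql.legacy.timeParserPolicy")
  .equalsIgnoreCase("LEGACY")
```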