Skip to content

Commit

Permalink
[SPARK-36536][SQL] Use CAST for datetime in CSV/JSON by default
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?
In the PR, I propose to split the `dateFormat` and `timestampFormat` options in CSV/JSON datasources to:
- In write (`dateFormatInWrite`/`timestampFormatInWrite`). CSV/JSON datasources will use it when formatting dates/timestamps. If a user doesn't initialize it, it will be set to a default value.
- In read (`dateFormatInRead`/`timestampFormatInRead`). The datasources will use it while parsing input date/timestamp strings. If a user doesn't set it, we will keep it uninitialized (None) and use CAST to parse the input date/timestamp strings.

### Why are the changes needed?
This should improve user experience with Spark SQL, and make the default parsing behavior more flexible.

### Does this PR introduce _any_ user-facing change?
Potentially, it can.

### How was this patch tested?
By existing test suites, and by new tests that are added to `JsonSuite` and to `CSVSuite`:
```
$ build/sbt "sql/test:testOnly *CSVLegacyTimeParserSuite"
$ build/sbt "sql/test:testOnly *JsonFunctionsSuite"
$ build/sbt "sql/test:testOnly *CSVv1Suite"
$ build/sbt "sql/test:testOnly *JsonV2Suite"
```

Closes apache#33769 from MaxGekk/split-datetime-ds-options.

Authored-by: Max Gekk <[email protected]>
Signed-off-by: Max Gekk <[email protected]>
  • Loading branch information
MaxGekk committed Aug 19, 2021
1 parent 013f2b7 commit 1235bd2
Show file tree
Hide file tree
Showing 13 changed files with 139 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import org.apache.spark.sql.types._
class CSVInferSchema(val options: CSVOptions) extends Serializable {

private val timestampParser = TimestampFormatter(
options.timestampFormat,
options.timestampFormatInRead,
options.zoneId,
options.locale,
legacyFormat = FAST_DATE_FORMAT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,9 +147,17 @@ class CSVOptions(
// A language tag in IETF BCP 47 format
val locale: Locale = parameters.get("locale").map(Locale.forLanguageTag).getOrElse(Locale.US)

val dateFormat: String = parameters.getOrElse("dateFormat", DateFormatter.defaultPattern)
val dateFormatInRead: Option[String] = parameters.get("dateFormat")
val dateFormatInWrite: String = parameters.getOrElse("dateFormat", DateFormatter.defaultPattern)

val timestampFormat: String = parameters.getOrElse("timestampFormat",
val timestampFormatInRead: Option[String] =
if (SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY) {
Some(parameters.getOrElse("timestampFormat",
s"${DateFormatter.defaultPattern}'T'HH:mm:ss.SSSXXX"))
} else {
parameters.get("timestampFormat")
}
val timestampFormatInWrite: String = parameters.getOrElse("timestampFormat",
if (SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY) {
s"${DateFormatter.defaultPattern}'T'HH:mm:ss.SSSXXX"
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,19 +43,19 @@ class UnivocityGenerator(
schema.map(_.dataType).map(makeConverter).toArray

private val timestampFormatter = TimestampFormatter(
options.timestampFormat,
options.timestampFormatInWrite,
options.zoneId,
options.locale,
legacyFormat = FAST_DATE_FORMAT,
isParsing = false)
private val timestampNTZFormatter = TimestampFormatter(
options.timestampFormat,
options.timestampFormatInWrite,
options.zoneId,
legacyFormat = FAST_DATE_FORMAT,
isParsing = false,
forTimestampNTZ = true)
private val dateFormatter = DateFormatter(
options.dateFormat,
options.dateFormatInWrite,
options.locale,
legacyFormat = FAST_DATE_FORMAT,
isParsing = false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,19 +88,19 @@ class UnivocityParser(
private val noRows = None

private lazy val timestampFormatter = TimestampFormatter(
options.timestampFormat,
options.timestampFormatInRead,
options.zoneId,
options.locale,
legacyFormat = FAST_DATE_FORMAT,
isParsing = true)
private lazy val timestampNTZFormatter = TimestampFormatter(
options.timestampFormat,
options.timestampFormatInRead,
options.zoneId,
legacyFormat = FAST_DATE_FORMAT,
isParsing = true,
forTimestampNTZ = true)
private lazy val dateFormatter = DateFormatter(
options.dateFormat,
options.dateFormatInRead,
options.locale,
legacyFormat = FAST_DATE_FORMAT,
isParsing = true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,17 @@ private[sql] class JSONOptions(
val zoneId: ZoneId = DateTimeUtils.getZoneId(
parameters.getOrElse(DateTimeUtils.TIMEZONE_OPTION, defaultTimeZoneId))

val dateFormat: String = parameters.getOrElse("dateFormat", DateFormatter.defaultPattern)
val dateFormatInRead: Option[String] = parameters.get("dateFormat")
val dateFormatInWrite: String = parameters.getOrElse("dateFormat", DateFormatter.defaultPattern)

val timestampFormat: String = parameters.getOrElse("timestampFormat",
val timestampFormatInRead: Option[String] =
if (SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY) {
Some(parameters.getOrElse("timestampFormat",
s"${DateFormatter.defaultPattern}'T'HH:mm:ss.SSSXXX"))
} else {
parameters.get("timestampFormat")
}
val timestampFormatInWrite: String = parameters.getOrElse("timestampFormat",
if (SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY) {
s"${DateFormatter.defaultPattern}'T'HH:mm:ss.SSSXXX"
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,19 +85,19 @@ private[sql] class JacksonGenerator(
private val lineSeparator: String = options.lineSeparatorInWrite

private val timestampFormatter = TimestampFormatter(
options.timestampFormat,
options.timestampFormatInWrite,
options.zoneId,
options.locale,
legacyFormat = FAST_DATE_FORMAT,
isParsing = false)
private val timestampNTZFormatter = TimestampFormatter(
options.timestampFormat,
options.timestampFormatInWrite,
options.zoneId,
legacyFormat = FAST_DATE_FORMAT,
isParsing = false,
forTimestampNTZ = true)
private val dateFormatter = DateFormatter(
options.dateFormat,
options.dateFormatInWrite,
options.locale,
legacyFormat = FAST_DATE_FORMAT,
isParsing = false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,19 +60,19 @@ class JacksonParser(
private val factory = options.buildJsonFactory()

private lazy val timestampFormatter = TimestampFormatter(
options.timestampFormat,
options.timestampFormatInRead,
options.zoneId,
options.locale,
legacyFormat = FAST_DATE_FORMAT,
isParsing = true)
private lazy val timestampNTZFormatter = TimestampFormatter(
options.timestampFormat,
options.timestampFormatInRead,
options.zoneId,
legacyFormat = FAST_DATE_FORMAT,
isParsing = true,
forTimestampNTZ = true)
private lazy val dateFormatter = DateFormatter(
options.dateFormat,
options.dateFormatInRead,
options.locale,
legacyFormat = FAST_DATE_FORMAT,
isParsing = true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ private[sql] class JsonInferSchema(options: JSONOptions) extends Serializable {
private val decimalParser = ExprUtils.getDecimalParser(options.locale)

private val timestampFormatter = TimestampFormatter(
options.timestampFormat,
options.timestampFormatInRead,
options.zoneId,
options.locale,
legacyFormat = FAST_DATE_FORMAT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,14 @@ object DateFormatter {
}
}

def apply(
format: Option[String],
locale: Locale,
legacyFormat: LegacyDateFormat,
isParsing: Boolean): DateFormatter = {
getFormatter(format, locale, legacyFormat, isParsing)
}

def apply(
format: String,
locale: Locale,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,12 @@ class DefaultTimestampFormatter(
DateTimeUtils.stringToTimestampAnsi(UTF8String.fromString(s), zoneId)
} catch checkParsedDiff(s, legacyFormatter.parse)
}

override def parseWithoutTimeZone(s: String): Long = {
try {
DateTimeUtils.stringToTimestampWithoutTimeZoneAnsi(UTF8String.fromString(s))
} catch checkParsedDiff(s, legacyFormatter.parse)
}
}

/**
Expand Down Expand Up @@ -393,6 +399,15 @@ object TimestampFormatter {
}
}

def apply(
format: Option[String],
zoneId: ZoneId,
locale: Locale,
legacyFormat: LegacyDateFormat,
isParsing: Boolean): TimestampFormatter = {
getFormatter(format, zoneId, locale, legacyFormat, isParsing)
}

def apply(
format: String,
zoneId: ZoneId,
Expand All @@ -410,6 +425,15 @@ object TimestampFormatter {
getFormatter(Some(format), zoneId, defaultLocale, legacyFormat, isParsing)
}

def apply(
format: Option[String],
zoneId: ZoneId,
legacyFormat: LegacyDateFormat,
isParsing: Boolean,
forTimestampNTZ: Boolean): TimestampFormatter = {
getFormatter(format, zoneId, defaultLocale, legacyFormat, isParsing, forTimestampNTZ)
}

def apply(
format: String,
zoneId: ZoneId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ class UnivocityParserSuite extends SparkFunSuite with SQLHelper {
parser = new UnivocityParser(StructType(Seq.empty), timestampsOptions)
val customTimestamp = "31/01/2015 00:00"
var format = FastDateFormat.getInstance(
timestampsOptions.timestampFormat,
timestampsOptions.timestampFormatInRead.get,
TimeZone.getTimeZone(timestampsOptions.zoneId),
timestampsOptions.locale)
val expectedTime = format.parse(customTimestamp).getTime
Expand All @@ -132,7 +132,7 @@ class UnivocityParserSuite extends SparkFunSuite with SQLHelper {
val dateOptions = new CSVOptions(Map("dateFormat" -> "dd/MM/yyyy"), false, "UTC")
parser = new UnivocityParser(StructType(Seq.empty), dateOptions)
format = FastDateFormat.getInstance(
dateOptions.dateFormat,
dateOptions.dateFormatInRead.get,
TimeZone.getTimeZone(dateOptions.zoneId),
dateOptions.locale)
val expectedDate = DateTimeUtils.millisToMicros(format.parse(customDate).getTime)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import java.nio.charset.{Charset, StandardCharsets, UnsupportedCharsetException}
import java.nio.file.{Files, StandardOpenOption}
import java.sql.{Date, Timestamp}
import java.text.SimpleDateFormat
import java.time.{Instant, LocalDate, LocalDateTime}
import java.util.Locale
import java.util.zip.GZIPOutputStream

Expand All @@ -35,7 +36,7 @@ import org.apache.hadoop.io.compress.GzipCodec

import org.apache.spark.{SparkConf, SparkException, TestUtils}
import org.apache.spark.sql.{AnalysisException, Column, DataFrame, Encoders, QueryTest, Row}
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.catalyst.util.{DateTimeTestUtils, DateTimeUtils}
import org.apache.spark.sql.execution.datasources.CommonFileDataSourceSuite
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
Expand Down Expand Up @@ -2483,6 +2484,40 @@ abstract class CSVSuite
checkAnswer(df, Row(1, null) :: Nil)
}
}

test("SPARK-36536: use casting when datetime pattern is not set") {
def isLegacy: Boolean = {
spark.conf.get(SQLConf.LEGACY_TIME_PARSER_POLICY).toUpperCase(Locale.ROOT) ==
SQLConf.LegacyBehaviorPolicy.LEGACY.toString
}
withSQLConf(
SQLConf.DATETIME_JAVA8API_ENABLED.key -> "true",
SQLConf.SESSION_LOCAL_TIMEZONE.key -> DateTimeTestUtils.UTC.getId) {
withTempPath { path =>
Seq(
"""d,ts_ltz,ts_ntz""",
"""2021,2021,2021""",
"""2021-01,2021-01 ,2021-01""",
""" 2021-2-1,2021-3-02,2021-10-1""",
"""2021-8-18 00:00:00,2021-8-18 21:44:30Z,2021-8-18T21:44:30.123"""
).toDF().repartition(1).write.text(path.getCanonicalPath)
val readback = spark.read.schema("d date, ts_ltz timestamp_ltz, ts_ntz timestamp_ntz")
.option("header", true)
.csv(path.getCanonicalPath)
checkAnswer(
readback,
Seq(
Row(LocalDate.of(2021, 1, 1), Instant.parse("2021-01-01T00:00:00Z"),
if (isLegacy) null else LocalDateTime.of(2021, 1, 1, 0, 0, 0)),
Row(LocalDate.of(2021, 1, 1), Instant.parse("2021-01-01T00:00:00Z"),
if (isLegacy) null else LocalDateTime.of(2021, 1, 1, 0, 0, 0)),
Row(LocalDate.of(2021, 2, 1), Instant.parse("2021-03-02T00:00:00Z"),
if (isLegacy) null else LocalDateTime.of(2021, 10, 1, 0, 0, 0)),
Row(LocalDate.of(2021, 8, 18), Instant.parse("2021-08-18T21:44:30Z"),
if (isLegacy) null else LocalDateTime.of(2021, 8, 18, 21, 44, 30, 123000000))))
}
}
}
}

class CSVv1Suite extends CSVSuite {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import java.io._
import java.nio.charset.{Charset, StandardCharsets, UnsupportedCharsetException}
import java.nio.file.Files
import java.sql.{Date, Timestamp}
import java.time.ZoneId
import java.time.{Instant, LocalDate, LocalDateTime, ZoneId}
import java.util.Locale

import com.fasterxml.jackson.core.JsonFactory
Expand All @@ -33,7 +33,7 @@ import org.apache.spark.{SparkConf, SparkException, TestUtils}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{functions => F, _}
import org.apache.spark.sql.catalyst.json._
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.catalyst.util.{DateTimeTestUtils, DateTimeUtils}
import org.apache.spark.sql.execution.ExternalRDD
import org.apache.spark.sql.execution.datasources.{CommonFileDataSourceSuite, DataSource, InMemoryFileIndex, NoopCache}
import org.apache.spark.sql.execution.datasources.v2.json.JsonScanBuilder
Expand Down Expand Up @@ -2958,6 +2958,39 @@ abstract class JsonSuite
.json(Seq("""[{"a": "str"}, null, {"a": "str"}]""").toDS),
Row(null) :: Nil)
}

test("SPARK-36536: use casting when datetime pattern is not set") {
def isLegacy: Boolean = {
spark.conf.get(SQLConf.LEGACY_TIME_PARSER_POLICY).toUpperCase(Locale.ROOT) ==
SQLConf.LegacyBehaviorPolicy.LEGACY.toString
}
withSQLConf(
SQLConf.DATETIME_JAVA8API_ENABLED.key -> "true",
SQLConf.SESSION_LOCAL_TIMEZONE.key -> DateTimeTestUtils.UTC.getId) {
withTempPath { path =>
Seq(
"""{"d":"2021","ts_ltz":"2021","ts_ntz": "2021"}""",
"""{"d":"2021-01","ts_ltz":"2021-01 ","ts_ntz":"2021-01"}""",
"""{"d":" 2021-2-1","ts_ltz":"2021-3-02","ts_ntz": "2021-10-1"}""",
"""{"d":"2021-8-18 00:00:00","ts_ltz":"2021-8-18 21:44:30Z"""" +
""","ts_ntz":"2021-8-18T21:44:30.123"}"""
).toDF().repartition(1).write.text(path.getCanonicalPath)
val readback = spark.read.schema("d date, ts_ltz timestamp_ltz, ts_ntz timestamp_ntz")
.json(path.getCanonicalPath)
checkAnswer(
readback,
Seq(
Row(LocalDate.of(2021, 1, 1), Instant.parse("2021-01-01T00:00:00Z"),
if (isLegacy) null else LocalDateTime.of(2021, 1, 1, 0, 0, 0)),
Row(LocalDate.of(2021, 1, 1), Instant.parse("2021-01-01T00:00:00Z"),
if (isLegacy) null else LocalDateTime.of(2021, 1, 1, 0, 0, 0)),
Row(LocalDate.of(2021, 2, 1), Instant.parse("2021-03-02T00:00:00Z"),
if (isLegacy) null else LocalDateTime.of(2021, 10, 1, 0, 0, 0)),
Row(LocalDate.of(2021, 8, 18), Instant.parse("2021-08-18T21:44:30Z"),
if (isLegacy) null else LocalDateTime.of(2021, 8, 18, 21, 44, 30, 123000000))))
}
}
}
}

class JsonV1Suite extends JsonSuite {
Expand Down

0 comments on commit 1235bd2

Please sign in to comment.