Skip to content

Commit

Permalink
[SPARK-17811] SparkR cannot parallelize data.frame with NA or NULL in Date columns
Browse files Browse the repository at this point in the history

## What changes were proposed in this pull request?
NA date values are serialized as "NA" and NA time values are serialized as NaN from R. In the backend we did not have proper logic to deal with them. As a result we got an IllegalArgumentException for Date and wrong value for time. This PR adds support for deserializing NA as Date and Time.

## How was this patch tested?
* [x] Added a unit test in `R/pkg/inst/tests/testthat/test_sparkSQL.R` that round-trips a data.frame containing NA Date and POSIXlt values through `createDataFrame`/`collect`.

Author: Hossein <[email protected]>

Closes apache#15421 from falaki/SPARK-17811.
  • Loading branch information
falaki authored and Felix Cheung committed Oct 21, 2016
1 parent e21e1c9 commit e371040
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 6 deletions.
13 changes: 13 additions & 0 deletions R/pkg/inst/tests/testthat/test_sparkSQL.R
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,19 @@ test_that("create DataFrame with different data types", {
expect_equal(collect(df), data.frame(l, stringsAsFactors = FALSE))
})

test_that("SPARK-17811: can create DataFrame containing NA as date and time", {
  # Local data.frame with NA entries in both the time (POSIXlt) and date columns.
  input <- data.frame(
    id = 1:2,
    time = c(as.POSIXlt("2016-01-10"), NA),
    date = c(as.Date("2016-10-01"), NA))

  # Round-trip through Spark: serialize to a Spark DataFrame, then collect back.
  result <- collect(createDataFrame(input))

  # NA values must survive the round trip, and non-NA values must be unchanged.
  expect_true(is.na(result$date[2]))
  expect_equal(result$date[1], as.Date("2016-10-01"))
  expect_true(is.na(result$time[2]))
  expect_equal(result$time[1], as.POSIXlt("2016-01-10"))
})

test_that("create DataFrame with complex types", {
e <- new.env()
assign("n", 3L, envir = e)
Expand Down
31 changes: 25 additions & 6 deletions core/src/main/scala/org/apache/spark/api/r/SerDe.scala
Original file line number Diff line number Diff line change
Expand Up @@ -125,15 +125,34 @@ private[spark] object SerDe {
}

def readDate(in: DataInputStream): Date = {
  // R serializes an NA date as the literal string "NA"; map it to null
  // (which Spark treats as a null Date value) instead of letting
  // Date.valueOf throw IllegalArgumentException.
  try {
    val inStr = readString(in)
    if (inStr == "NA") {
      null
    } else {
      Date.valueOf(inStr)
    }
  } catch {
    // TODO: SPARK-18011 with some versions of R deserializing NA from R results in NASE
    case _: NegativeArraySizeException => null
  }
}

def readTime(in: DataInputStream): Timestamp = {
  // R serializes an NA time as NaN (seconds since the epoch, as a double);
  // map it to null instead of producing a garbage Timestamp.
  try {
    val seconds = in.readDouble()
    if (java.lang.Double.isNaN(seconds)) {
      null
    } else {
      // Split fractional seconds into whole milliseconds plus nanoseconds.
      val sec = Math.floor(seconds).toLong
      val t = new Timestamp(sec * 1000L)
      t.setNanos(((seconds - sec) * 1e9).toInt)
      t
    }
  } catch {
    // TODO: SPARK-18011 with some versions of R deserializing NA from R results in NASE
    case _: NegativeArraySizeException => null
  }
}

def readBytesArr(in: DataInputStream): Array[Array[Byte]] = {
Expand Down

0 comments on commit e371040

Please sign in to comment.