[LIVY-613][REPL] Livy can't handle the java.sql.Date type correctly.
## What changes were proposed in this pull request?

When a Spark table has a `java.sql.Date` type column, Livy can't handle the `java.sql.Date` type correctly, e.g.:
```
create table test(
    name string,
    birthday date
);

insert into test values ('Livy', '2019-07-24')

curl -H "Content-Type:application/json" -X POST -d '{"code":"select * from test", "kind":"sql"}' 192.168.1.6:8998/sessions/48/statements
{"id":1,"code":"select * from test","state":"waiting","output":null,"progress":0.0}

curl 192.168.1.6:8998/sessions/48/statements/1
{"id":1,"code":"select * from test","state":"available","output":{"status":"ok","execution_count":1,"data":{"application/json":{"schema":{"type":"struct","fields":[{"name":"name","type":"string","nullable":true,"metadata":{}},{"name":"birthday","type":"date","nullable":true,"metadata":{}}]},"data":[["Livy",{}]]}}},"progress":1.0}
```
As you can see, the output of `select * from test` is `["Livy",{}]`; the `birthday` column's value isn't handled correctly.

The reason is that json4s can't handle `java.sql.Date` out of the box, so we should define a `CustomSerializer` for `java.sql.Date`.

This PR adds a `DateSerializer` so that `java.sql.Date` values are serialized and parsed correctly.
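
For reference, here is a minimal, self-contained sketch of how such a json4s `CustomSerializer` behaves; the `Person` case class and the `DateSerializerDemo` object are hypothetical names used only for illustration:

```
import java.sql.Date

import org.json4s._
import org.json4s.jackson.Serialization.{read, write}

// Same shape as the serializer added in this PR: map JSON strings to
// java.sql.Date on the way in, and dates to "yyyy-mm-dd" strings on the way out.
case object DateSerializer extends CustomSerializer[Date](_ => ( {
  case JString(s) => Date.valueOf(s)
  case JNull => null
}, {
  case d: Date => JString(d.toString)
}))

// Hypothetical case class, used only for this demo.
case class Person(name: String, birthday: Date)

object DateSerializerDemo {
  def main(args: Array[String]): Unit = {
    // Without `+ DateSerializer`, json4s falls back to reflection and
    // renders the date field as an empty object, matching the `{}` seen
    // in the curl output above.
    implicit val formats: Formats = DefaultFormats + DateSerializer

    val json = write(Person("Livy", Date.valueOf("2019-07-24")))
    println(json)               // {"name":"Livy","birthday":"2019-07-24"}

    println(read[Person](json)) // Person(Livy,2019-07-24)
  }
}
```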

## How was this patch tested?

Added unit tests in `SQLInterpreterSpec`: one covering a `java.sql.Date` column with normal values and one covering a null value.


Author: yangping.wyp <[email protected]>

Closes apache#186 from 397090770/master.
wypb authored and jerryshao committed Jul 26, 2019
1 parent 7dee3cc commit 92062e1
Showing 2 changed files with 78 additions and 2 deletions.
11 changes: 10 additions & 1 deletion repl/src/main/scala/org/apache/livy/repl/SQLInterpreter.scala
@@ -18,13 +18,15 @@
package org.apache.livy.repl

import java.lang.reflect.InvocationTargetException
import java.sql.Date

import scala.util.control.NonFatal

import org.apache.spark.SparkConf
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession
import org.json4s._
import org.json4s.JsonAST.{JNull, JString}
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._

@@ -66,7 +68,14 @@ class SQLInterpreter(
rscConf: RSCConf,
sparkEntries: SparkEntries) extends Interpreter with Logging {

private implicit def formats = DefaultFormats
case object DateSerializer extends CustomSerializer[Date](_ => ( {
case JString(s) => Date.valueOf(s)
case JNull => null
}, {
case d: Date => JString(d.toString)
}))

private implicit def formats: Formats = DefaultFormats + DateSerializer

private var spark: SparkSession = null

69 changes: 68 additions & 1 deletion repl/src/test/scala/org/apache/livy/repl/SQLInterpreterSpec.scala
@@ -17,17 +17,20 @@

package org.apache.livy.repl

import java.sql.Date

import scala.util.Try

import org.apache.spark.SparkConf
import org.json4s.{DefaultFormats, JValue}
import org.json4s.JsonAST.JArray
import org.json4s.JsonAST.{JArray, JNull}
import org.json4s.JsonDSL._

import org.apache.livy.rsc.RSCConf
import org.apache.livy.rsc.driver.SparkEntries

case class People(name: String, age: Int)
case class Person(name: String, birthday: Date)

class SQLInterpreterSpec extends BaseInterpreterSpec {

@@ -43,6 +46,70 @@ class SQLInterpreterSpec extends BaseInterpreterSpec {
new SQLInterpreter(conf, new RSCConf(), sparkEntries)
}

it should "handle java.sql.Date tpye" in withInterpreter { interpreter =>
val personList = Seq(Person("Jerry", Date.valueOf("2019-07-24")),
Person("Michael", Date.valueOf("2019-07-23")))

val rdd = sparkEntries.sc().parallelize(personList)
val df = sparkEntries.sqlctx().createDataFrame(rdd)
df.createOrReplaceTempView("person")

// Test normal behavior
val resp1 = interpreter.execute("SELECT * FROM person")

val expectedResult = Interpreter.ExecuteSuccess(
APPLICATION_JSON -> (("schema" ->
(("type" -> "struct") ~
("fields" -> List(
("name" -> "name") ~ ("type" -> "string") ~ ("nullable" -> true) ~
("metadata" -> List()),
("name" -> "birthday") ~ ("type" -> "date") ~ ("nullable" -> true) ~
("metadata" -> List())
)))) ~
("data" -> List(
List[JValue]("Jerry", "2019-07-24"),
List[JValue]("Michael", "2019-07-23")
)))
)

val result = Try { resp1 should equal(expectedResult)}
if (result.isFailure) {
fail(s"$resp1 doesn't equal to expected result")
}
}

it should "test java.sql.Date null" in withInterpreter { interpreter =>
val personList = Seq(Person("Jerry", null),
Person("Michael", Date.valueOf("2019-07-23")))

val rdd = sparkEntries.sc().parallelize(personList)
val df = sparkEntries.sqlctx().createDataFrame(rdd)
df.createOrReplaceTempView("person")

// Test null date handling
val resp1 = interpreter.execute("SELECT * FROM person")

val expectedResult = Interpreter.ExecuteSuccess(
APPLICATION_JSON -> (("schema" ->
(("type" -> "struct") ~
("fields" -> List(
("name" -> "name") ~ ("type" -> "string") ~ ("nullable" -> true) ~
("metadata" -> List()),
("name" -> "birthday") ~ ("type" -> "date") ~ ("nullable" -> true) ~
("metadata" -> List())
)))) ~
("data" -> List(
List[JValue]("Jerry", JNull),
List[JValue]("Michael", "2019-07-23")
)))
)

val result = Try { resp1 should equal(expectedResult)}
if (result.isFailure) {
fail(s"$resp1 doesn't equal to expected result")
}
}

it should "execute sql queries" in withInterpreter { interpreter =>
val rdd = sparkEntries.sc().parallelize(Seq(People("Jerry", 20), People("Michael", 21)))
val df = sparkEntries.sqlctx().createDataFrame(rdd)
