[SPARK-29347][SQL] Add JSON serialization for external Rows
### What changes were proposed in this pull request?
This PR adds JSON serialization for Spark external Rows.

### Why are the changes needed?
This is needed for observable metrics, where the `StreamingQueryProgress` contains a map of observed-metrics rows that needs to be serialized in some cases.

### Does this PR introduce any user-facing change?
Yes, a user can call `json` or `prettyJson` on the rows returned when collecting a DataFrame to the driver.
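
For illustration, a minimal usage sketch (assuming an active `SparkSession` named `spark`; the column names are made up for this example):

```scala
// Collect rows to the driver, then serialize each one with the new API.
val rows = spark.range(3).selectExpr("id", "id * 2 AS doubled").collect()
rows.foreach(row => println(row.json)) // compact, e.g. {"id":0,"doubled":0}
println(rows.head.prettyJson)          // the same document, indented
```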

### How was this patch tested?
Added a new test suite, `RowJsonSuite`, that covers the new serialization logic.

Closes apache#26013 from hvanhovell/SPARK-29347.

Authored-by: herman <[email protected]>
Signed-off-by: herman <[email protected]>
hvanhovell committed Oct 14, 2019
1 parent ff9fcd5 commit 1f1443e
Showing 2 changed files with 246 additions and 2 deletions.
108 changes: 106 additions & 2 deletions sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala
@@ -17,12 +17,24 @@

package org.apache.spark.sql

import java.sql.{Date, Timestamp}
import java.time.{Instant, LocalDate}
import java.util.{Base64, TimeZone}

import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.util.hashing.MurmurHash3

import org.json4s._
import org.json4s.JsonAST.JValue
import org.json4s.jackson.JsonMethods._

import org.apache.spark.annotation.{Private, Stable, Unstable}
import org.apache.spark.sql.catalyst.CatalystTypeConverters
import org.apache.spark.sql.catalyst.expressions.GenericRow
import org.apache.spark.sql.catalyst.util.{DateFormatter, DateTimeUtils, TimestampFormatter}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{ArrayType, BinaryType, DataType, Decimal, MapType, StringType, StructType, UserDefinedType}

/**
 * @since 1.3.0
@@ -501,4 +513,96 @@ trait Row extends Serializable {
  private def getAnyValAs[T <: AnyVal](i: Int): T =
    if (isNullAt(i)) throw new NullPointerException(s"Value at index $i is null")
    else getAs[T](i)

  /**
   * The compact JSON representation of this row.
   * @since 3.0
   */
  @Unstable
  def json: String = compact(jsonValue)

  /**
   * The pretty (i.e. indented) JSON representation of this row.
   * @since 3.0
   */
  @Unstable
  def prettyJson: String = pretty(render(jsonValue))

  /**
   * JSON representation of the row.
   *
   * Note that this only supports the data types that are also supported by
   * [[org.apache.spark.sql.catalyst.encoders.RowEncoder]].
   *
   * @return the JSON representation of the row.
   */
  private[sql] def jsonValue: JValue = {
    require(schema != null, "JSON serialization requires a non-null schema.")

    lazy val zoneId = DateTimeUtils.getZoneId(SQLConf.get.sessionLocalTimeZone)
    lazy val dateFormatter = DateFormatter(zoneId)
    lazy val timestampFormatter = TimestampFormatter(zoneId)

    // Convert an iterator of values to a JSON array.
    def iteratorToJsonArray(iterator: Iterator[_], elementType: DataType): JArray = {
      JArray(iterator.map(toJson(_, elementType)).toList)
    }

    // Convert a value to JSON.
    def toJson(value: Any, dataType: DataType): JValue = (value, dataType) match {
      case (null, _) => JNull
      case (b: Boolean, _) => JBool(b)
      case (b: Byte, _) => JLong(b)
      case (s: Short, _) => JLong(s)
      case (i: Int, _) => JLong(i)
      case (l: Long, _) => JLong(l)
      case (f: Float, _) => JDouble(f)
      case (d: Double, _) => JDouble(d)
      case (d: BigDecimal, _) => JDecimal(d)
      case (d: java.math.BigDecimal, _) => JDecimal(d)
      case (d: Decimal, _) => JDecimal(d.toBigDecimal)
      case (s: String, _) => JString(s)
      case (b: Array[Byte], BinaryType) =>
        JString(Base64.getEncoder.encodeToString(b))
      case (d: LocalDate, _) =>
        JString(dateFormatter.format(DateTimeUtils.localDateToDays(d)))
      case (d: Date, _) =>
        JString(dateFormatter.format(DateTimeUtils.fromJavaDate(d)))
      case (i: Instant, _) =>
        JString(timestampFormatter.format(DateTimeUtils.instantToMicros(i)))
      case (t: Timestamp, _) =>
        JString(timestampFormatter.format(DateTimeUtils.fromJavaTimestamp(t)))
      case (a: Array[_], ArrayType(elementType, _)) =>
        iteratorToJsonArray(a.iterator, elementType)
      case (s: Seq[_], ArrayType(elementType, _)) =>
        iteratorToJsonArray(s.iterator, elementType)
      case (m: Map[String @unchecked, _], MapType(StringType, valueType, _)) =>
        new JObject(m.toList.sortBy(_._1).map {
          case (k, v) => k -> toJson(v, valueType)
        })
      case (m: Map[_, _], MapType(keyType, valueType, _)) =>
        new JArray(m.iterator.map {
          case (k, v) =>
            new JObject("key" -> toJson(k, keyType) :: "value" -> toJson(v, valueType) :: Nil)
        }.toList)
      case (r: Row, _) => r.jsonValue
      case (v: Any, udt: UserDefinedType[Any @unchecked]) =>
        val dataType = udt.sqlType
        toJson(CatalystTypeConverters.convertToScala(udt.serialize(v), dataType), dataType)
      case _ =>
        throw new IllegalArgumentException(s"Failed to convert value $value " +
          s"(class of ${value.getClass}) with the type of $dataType to JSON.")
    }

    // Convert the row fields to JSON.
    var n = 0
    val elements = new mutable.ListBuffer[JField]
    val len = length
    while (n < len) {
      val field = schema(n)
      elements += (field.name -> toJson(apply(n), field.dataType))
      n += 1
    }
    new JObject(elements.toList)
  }
}
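
To make the map encoding above concrete, here is a brief sketch (hand-built with `GenericRowWithSchema`, mirroring the test suite below): string-keyed maps serialize as JSON objects with keys sorted, while maps with any other key type fall back to an array of key/value objects:

```scala
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.types._

val mapSchema = new StructType()
  .add("byName", MapType(StringType, IntegerType))
  .add("byId", MapType(IntegerType, StringType))
val row = new GenericRowWithSchema(Array(Map("a" -> 1), Map(7 -> "x")), mapSchema)

println(row.json)
// {"byName":{"a":1},"byId":[{"key":7,"value":"x"}]}
```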
140 changes: 140 additions & 0 deletions sql/catalyst/src/test/scala/org/apache/spark/sql/RowJsonSuite.scala
@@ -0,0 +1,140 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.sql

import java.sql.{Date, Timestamp}
import java.time.{Instant, LocalDate}

import org.json4s.JsonAST.{JArray, JBool, JDecimal, JDouble, JLong, JNull, JObject, JString, JValue}

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.encoders.{ExamplePoint, ExamplePointUDT}
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._

/**
 * Test suite for [[Row]] JSON serialization.
 */
class RowJsonSuite extends SparkFunSuite {
  private val schema = new StructType()
    .add("c1", "string")
    .add("c2", IntegerType)

  private def testJson(name: String, value: Any, dt: DataType, expected: JValue): Unit = {
    test(name) {
      val row = new GenericRowWithSchema(Array(value), new StructType().add("a", dt))
      assert(row.jsonValue === JObject("a" -> expected))
    }
  }

  private def testJson(value: Any, dt: DataType, expected: JValue): Unit = {
    testJson(s"$dt $value", value, dt, expected)
  }

  // Nulls
  private def testJsonNull(dt: DataType, expected: JValue): Unit = {
    testJson(null, dt, expected)
  }
  testJsonNull(IntegerType, JNull)
  testJsonNull(FloatType, JNull)
  testJsonNull(ArrayType(DoubleType, containsNull = true), JNull)

  // Primitives
  testJson(true, BooleanType, JBool(true))
  testJson(false, BooleanType, JBool(false))
  testJson(23.toByte, ByteType, JLong(23))
  testJson(-126.toByte, ByteType, JLong(-126))
  testJson(20281.toShort, ShortType, JLong(20281))
  testJson(-8752.toShort, ShortType, JLong(-8752))
  testJson(1078231987, IntegerType, JLong(1078231987))
  testJson(-10, IntegerType, JLong(-10))
  testJson(139289832109874199L, LongType, JLong(139289832109874199L))
  testJson(-7873748239973488L, LongType, JLong(-7873748239973488L))
  testJson(10.232e10f, FloatType, JDouble(10.232e10f))
  testJson(9.7e-13f, FloatType, JDouble(9.7e-13f))
  testJson(3.891e98d, DoubleType, JDouble(3.891e98d))
  testJson(-7.8e5d, DoubleType, JDouble(-7.8e5d))
  testJson(BigDecimal("1092.88"), DecimalType(10, 2), JDecimal(BigDecimal("1092.88")))
  testJson(Decimal("782.0003"), DecimalType(7, 4), JDecimal(BigDecimal("782.0003")))
  testJson(new java.math.BigDecimal("-77.89"), DecimalType(4, 2), JDecimal(BigDecimal("-77.89")))
  testJson("hello world", StringType, JString("hello world"))
  testJson("BinaryType", Array('a'.toByte, 'b'.toByte), BinaryType, JString("YWI="))
  testJson(Date.valueOf("2019-04-22"), DateType, JString("2019-04-22"))
  testJson(LocalDate.of(2018, 5, 14), DateType, JString("2018-05-14"))
  testJson(
    Timestamp.valueOf("2017-01-06 10:22:03.00"),
    TimestampType,
    JString("2017-01-06 10:22:03"))
  testJson(
    Timestamp.valueOf("2017-05-30 10:22:03.00").toInstant,
    TimestampType,
    JString("2017-05-30 10:22:03"))

  // Complex types
  testJson(
    "ArrayType(LongType,true)",
    Array(1L, null, 77L),
    ArrayType(LongType, containsNull = true),
    JArray(JLong(1L) :: JNull :: JLong(77L) :: Nil))

  testJson(
    Seq(1, -2, 3),
    ArrayType(IntegerType, containsNull = false),
    JArray(JLong(1) :: JLong(-2) :: JLong(3) :: Nil))

  testJson(
    Map("a" -> "b", "c" -> "d", "e" -> null),
    MapType(StringType, StringType, valueContainsNull = true),
    JObject("a" -> JString("b"), "c" -> JString("d"), "e" -> JNull))

  testJson(
    Map(1 -> "b", 2 -> "d", 3 -> null),
    MapType(IntegerType, StringType, valueContainsNull = true),
    JArray(
      JObject("key" -> JLong(1), "value" -> JString("b")) ::
        JObject("key" -> JLong(2), "value" -> JString("d")) ::
        JObject("key" -> JLong(3), "value" -> JNull) :: Nil))

  testJson(
    new GenericRowWithSchema(Array("1", 2), schema),
    schema,
    JObject("c1" -> JString("1"), "c2" -> JLong(2)))

  testJson(
    "UDT",
    new ExamplePoint(3.4d, 8.98d),
    new ExamplePointUDT,
    JArray(JDouble(3.4d) :: JDouble(8.98d) :: Nil))

  test("no schema") {
    val e = intercept[IllegalArgumentException] {
      Row("a").jsonValue
    }
    assert(e.getMessage.contains("requires a non-null schema"))
  }

  test("unsupported type") {
    val e = intercept[IllegalArgumentException] {
      val row = new GenericRowWithSchema(
        Array((1, 2)),
        new StructType().add("a", ObjectType(classOf[(Int, Int)])))
      row.jsonValue
    }
    assert(e.getMessage.contains("Failed to convert value"))
  }
}
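
As a quick illustration of the failure mode exercised by the "no schema" test above, a `Row` constructed without a schema fails fast rather than producing partial JSON (a sketch using `scala.util.Try`):

```scala
import scala.util.Try

import org.apache.spark.sql.Row

// Row(...) alone carries no schema, so serialization is rejected up front.
Try(Row("a").json).failed.foreach(e => println(e.getMessage))
// requirement failed: JSON serialization requires a non-null schema.
```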
