[SPARK-17926][SQL][STREAMING] Added json for statuses
## What changes were proposed in this pull request?

The `StreamingQueryStatus` exposed through `StreamingQueryListener` often needs to be recorded (similar to `SparkListener` events). This PR adds `.json` and `.prettyJson` methods to `StreamingQueryStatus`, `SourceStatus`, and `SinkStatus`.
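
As a rough usage sketch (not part of this change): a listener can now persist each status update as one JSON line, much like `SparkListener` event logs. The listener class below is hypothetical, and it assumes the listener events of this version expose the status as `queryStatus` and that an active `SparkSession` named `spark` is available.

```scala
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

// Hypothetical listener: writes one compact JSON line per trigger, assuming the
// progress event carries the status in a `queryStatus` field (as in this version).
class JsonStatusLogger extends StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = ()
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    println(event.queryStatus.json)    // compact form, one line per update
    // event.queryStatus.prettyJson    // indented form, easier to read ad hoc
  }
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
}

// Registration (assuming an active SparkSession `spark`):
// spark.streams.addListener(new JsonStatusLogger)
```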

## How was this patch tested?
New unit tests

Author: Tathagata Das <[email protected]>

Closes apache#15476 from tdas/SPARK-17926.
tdas authored and yhuai committed Oct 21, 2016
1 parent e371040 commit 7a531e3
Showing 5 changed files with 187 additions and 25 deletions.
11 changes: 5 additions & 6 deletions python/pyspark/sql/streaming.py
@@ -205,8 +205,7 @@ def __str__(self):
Pretty string of this query status.
>>> print(sqs)
StreamingQueryStatus:
Query name: query
Status of query 'query'
Query id: 1
Status timestamp: 123
Input rate: 15.5 rows/sec
@@ -220,15 +219,15 @@ def __str__(self):
numRows.input.total: 100
triggerId: 5
Source statuses [1 source]:
Source 1: MySource1
Source 1 - MySource1
Available offset: #0
Input rate: 15.5 rows/sec
Processing rate: 23.5 rows/sec
Trigger details:
numRows.input.source: 100
latency.getOffset.source: 10
latency.getBatch.source: 20
Sink status: MySink
Sink status - MySink
Committed offsets: [#1, -]
"""
return self._jsqs.toString()
@@ -366,7 +365,7 @@ def __str__(self):
Pretty string of this source status.
>>> print(sqs.sourceStatuses[0])
SourceStatus: MySource1
Status of source MySource1
Available offset: #0
Input rate: 15.5 rows/sec
Processing rate: 23.5 rows/sec
@@ -457,7 +456,7 @@ def __str__(self):
Pretty string of this sink status.
>>> print(sqs.sinkStatus)
SinkStatus: MySink
Status of sink MySink
Committed offsets: [#1, -]
"""
return self._jss.toString()
SinkStatus.scala
@@ -17,6 +17,11 @@

package org.apache.spark.sql.streaming

import org.json4s._
import org.json4s.JsonAST.JValue
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._

import org.apache.spark.annotation.Experimental
import org.apache.spark.sql.streaming.StreamingQueryStatus.indent

@@ -34,8 +39,19 @@ class SinkStatus private(
val description: String,
val offsetDesc: String) {

/** The compact JSON representation of this status. */
def json: String = compact(render(jsonValue))

/** The pretty (i.e. indented) JSON representation of this status. */
def prettyJson: String = pretty(render(jsonValue))

override def toString: String =
"SinkStatus:" + indent(prettyString)
"Status of sink " + indent(prettyString).trim

private[sql] def jsonValue: JValue = {
("description" -> JString(description)) ~
("offsetDesc" -> JString(offsetDesc))
}

private[sql] def prettyString: String = {
s"""$description
SourceStatus.scala
@@ -21,8 +21,14 @@ import java.{util => ju}

import scala.collection.JavaConverters._

import org.json4s._
import org.json4s.JsonAST.JValue
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._

import org.apache.spark.annotation.Experimental
import org.apache.spark.sql.streaming.StreamingQueryStatus.indent
import org.apache.spark.util.JsonProtocol

/**
* :: Experimental ::
@@ -47,8 +53,22 @@ class SourceStatus private(
val processingRate: Double,
val triggerDetails: ju.Map[String, String]) {

/** The compact JSON representation of this status. */
def json: String = compact(render(jsonValue))

/** The pretty (i.e. indented) JSON representation of this status. */
def prettyJson: String = pretty(render(jsonValue))

override def toString: String =
"SourceStatus:" + indent(prettyString)
"Status of source " + indent(prettyString).trim

private[sql] def jsonValue: JValue = {
("description" -> JString(description)) ~
("offsetDesc" -> JString(offsetDesc)) ~
("inputRate" -> JDouble(inputRate)) ~
("processingRate" -> JDouble(processingRate)) ~
("triggerDetails" -> JsonProtocol.mapToJson(triggerDetails.asScala))
}

private[sql] def prettyString: String = {
val triggerDetailsLines =
@@ -59,7 +79,6 @@
|Processing rate: $processingRate rows/sec
|Trigger details:
|""".stripMargin + indent(triggerDetailsLines)

}
}

StreamingQueryStatus.scala
@@ -21,8 +21,14 @@ import java.{util => ju}

import scala.collection.JavaConverters._

import org.json4s._
import org.json4s.JsonAST.JValue
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._

import org.apache.spark.annotation.Experimental
import org.apache.spark.sql.execution.streaming.{CompositeOffset, LongOffset}
import org.apache.spark.util.JsonProtocol

/**
* :: Experimental ::
@@ -59,29 +65,46 @@ class StreamingQueryStatus private(

import StreamingQueryStatus._

/** The compact JSON representation of this status. */
def json: String = compact(render(jsonValue))

/** The pretty (i.e. indented) JSON representation of this status. */
def prettyJson: String = pretty(render(jsonValue))

override def toString: String = {
val sourceStatusLines = sourceStatuses.zipWithIndex.map { case (s, i) =>
s"Source ${i + 1}:" + indent(s.prettyString)
s"Source ${i + 1} - " + indent(s.prettyString).trim
}
val sinkStatusLines = sinkStatus.prettyString
val sinkStatusLines = sinkStatus.prettyString.trim
val triggerDetailsLines = triggerDetails.asScala.map { case (k, v) => s"$k: $v" }.toSeq.sorted
val numSources = sourceStatuses.length
val numSourcesString = s"$numSources source" + { if (numSources > 1) "s" else "" }

val allLines = s"""
|Query name: $name
|Query id: $id
|Status timestamp: $timestamp
|Input rate: $inputRate rows/sec
|Processing rate $processingRate rows/sec
|Latency: ${latency.getOrElse("-")} ms
|Trigger details:
|${indent(triggerDetailsLines)}
|Source statuses [$numSourcesString]:
|${indent(sourceStatusLines)}
|Sink status: ${indent(sinkStatusLines)}""".stripMargin

s"StreamingQueryStatus:${indent(allLines)}"
val allLines =
s"""|Query id: $id
|Status timestamp: $timestamp
|Input rate: $inputRate rows/sec
|Processing rate $processingRate rows/sec
|Latency: ${latency.getOrElse("-")} ms
|Trigger details:
|${indent(triggerDetailsLines)}
|Source statuses [$numSourcesString]:
|${indent(sourceStatusLines)}
|Sink status - ${indent(sinkStatusLines).trim}""".stripMargin

s"Status of query '$name'\n${indent(allLines)}"
}

private[sql] def jsonValue: JValue = {
("name" -> JString(name)) ~
("id" -> JInt(id)) ~
("timestamp" -> JInt(timestamp)) ~
("inputRate" -> JDouble(inputRate)) ~
("processingRate" -> JDouble(processingRate)) ~
("latency" -> latency.map(JDouble).getOrElse(JNothing)) ~
("triggerDetails" -> JsonProtocol.mapToJson(triggerDetails.asScala))
("sourceStatuses" -> JArray(sourceStatuses.map(_.jsonValue).toList)) ~
("sinkStatus" -> sinkStatus.jsonValue)
}
}
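
For reference, a standalone sketch (not part of the diff) of the json4s `JsonDSL` pattern that these `jsonValue` methods rely on; the field values below are made up to mirror `SinkStatus`.

```scala
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._

object JsonDslSketch extends App {
  // `~` chains key -> value pairs into a single JObject; `render` plus
  // `compact` or `pretty` turn that JValue into the strings returned by
  // `.json` and `.prettyJson`.
  val status =
    ("description" -> "MySink") ~
    ("offsetDesc" -> "[#1, -]")

  println(compact(render(status)))  // {"description":"MySink","offsetDesc":"[#1, -]"}
  println(pretty(render(status)))   // same object, indented across multiple lines
}
```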

StreamingQueryStatusSuite.scala
@@ -0,0 +1,105 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.streaming

import org.apache.spark.SparkFunSuite

class StreamingQueryStatusSuite extends SparkFunSuite {
test("toString") {
assert(StreamingQueryStatus.testStatus.sourceStatuses(0).toString ===
"""
|Status of source MySource1
| Available offset: #0
| Input rate: 15.5 rows/sec
| Processing rate: 23.5 rows/sec
| Trigger details:
| numRows.input.source: 100
| latency.getOffset.source: 10
| latency.getBatch.source: 20
""".stripMargin.trim, "SourceStatus.toString does not match")

assert(StreamingQueryStatus.testStatus.sinkStatus.toString ===
"""
|Status of sink MySink
| Committed offsets: [#1, -]
""".stripMargin.trim, "SinkStatus.toString does not match")

assert(StreamingQueryStatus.testStatus.toString ===
"""
|Status of query 'query'
| Query id: 1
| Status timestamp: 123
| Input rate: 15.5 rows/sec
| Processing rate 23.5 rows/sec
| Latency: 345.0 ms
| Trigger details:
| isDataPresentInTrigger: true
| isTriggerActive: true
| latency.getBatch.total: 20
| latency.getOffset.total: 10
| numRows.input.total: 100
| triggerId: 5
| Source statuses [1 source]:
| Source 1 - MySource1
| Available offset: #0
| Input rate: 15.5 rows/sec
| Processing rate: 23.5 rows/sec
| Trigger details:
| numRows.input.source: 100
| latency.getOffset.source: 10
| latency.getBatch.source: 20
| Sink status - MySink
| Committed offsets: [#1, -]
""".stripMargin.trim, "StreamingQueryStatus.toString does not match")

}

test("json") {
assert(StreamingQueryStatus.testStatus.json ===
"""
|{"sourceStatuses":[{"description":"MySource1","offsetDesc":"#0","inputRate":15.5,
|"processingRate":23.5,"triggerDetails":{"numRows.input.source":"100",
|"latency.getOffset.source":"10","latency.getBatch.source":"20"}}],
|"sinkStatus":{"description":"MySink","offsetDesc":"[#1, -]"}}
""".stripMargin.replace("\n", "").trim)
}

test("prettyJson") {
assert(
StreamingQueryStatus.testStatus.prettyJson ===
"""
|{
| "sourceStatuses" : [ {
| "description" : "MySource1",
| "offsetDesc" : "#0",
| "inputRate" : 15.5,
| "processingRate" : 23.5,
| "triggerDetails" : {
| "numRows.input.source" : "100",
| "latency.getOffset.source" : "10",
| "latency.getBatch.source" : "20"
| }
| } ],
| "sinkStatus" : {
| "description" : "MySink",
| "offsetDesc" : "[#1, -]"
| }
|}
""".stripMargin.trim)
}
}
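
A possible read-back sketch (not from this PR): a recorded compact line, such as the one asserted in the `json` test above, can be parsed back with json4s to recover individual fields.

```scala
import org.json4s._
import org.json4s.jackson.JsonMethods._

object StatusReadBackSketch extends App {
  implicit val formats: Formats = DefaultFormats

  // The compact JSON line from the "json" test above.
  val line =
    """
      |{"sourceStatuses":[{"description":"MySource1","offsetDesc":"#0","inputRate":15.5,
      |"processingRate":23.5,"triggerDetails":{"numRows.input.source":"100",
      |"latency.getOffset.source":"10","latency.getBatch.source":"20"}}],
      |"sinkStatus":{"description":"MySink","offsetDesc":"[#1, -]"}}
    """.stripMargin.replace("\n", "").trim

  val parsed = parse(line)

  // Pull out a single field with the `\` selector ...
  println((parsed \ "sinkStatus" \ "description").extract[String])  // MySink
  // ... or re-render the whole record in the indented form.
  println(pretty(render(parsed)))
}
```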
