Skip to content

Commit

Permalink
[SPARK-18182] Expose ReplayListenerBus.read() overload which takes st…
Browse files Browse the repository at this point in the history
…ring iterator

The `ReplayListenerBus.read()` method is used when implementing a custom `ApplicationHistoryProvider`. The current interface only exposes a `read()` method which takes an `InputStream` and performs stream-to-lines conversion itself, but it would also be useful to expose an overloaded method which accepts an iterator of strings, thereby enabling events to be provided from non-`InputStream` sources.

Author: Josh Rosen <[email protected]>

Closes apache#15698 from JoshRosen/replay-listener-bus-interface.
  • Loading branch information
JoshRosen authored and rxin committed Nov 1, 2016
1 parent 6e62981 commit b929537
Showing 1 changed file with 13 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,24 @@ private[spark] class ReplayListenerBus extends SparkListenerBus with Logging {
sourceName: String,
maybeTruncated: Boolean = false,
eventsFilter: ReplayEventsFilter = SELECT_ALL_FILTER): Unit = {
val lines = Source.fromInputStream(logData).getLines()
replay(lines, sourceName, maybeTruncated, eventsFilter)
}

/**
* Overloaded variant of [[replay()]] which accepts an iterator of lines instead of an
* [[InputStream]]. Exposed for use by custom ApplicationHistoryProvider implementations.
*/
def replay(
lines: Iterator[String],
sourceName: String,
maybeTruncated: Boolean,
eventsFilter: ReplayEventsFilter): Unit = {
var currentLine: String = null
var lineNumber: Int = 0

try {
val lineEntries = Source.fromInputStream(logData)
.getLines()
val lineEntries = lines
.zipWithIndex
.filter { case (line, _) => eventsFilter(line) }

Expand Down

0 comments on commit b929537

Please sign in to comment.