[SPARK-7112][Streaming][WIP] Add an InputInfoTracker to track all the input streams

Author: jerryshao <[email protected]>
Author: Saisai Shao <[email protected]>

Closes #5680 from jerryshao/SPARK-7111 and squashes the following commits:

339f854 [Saisai Shao] Add an end-to-end test
812bcaf [jerryshao] Continue address the comments
abd0036 [jerryshao] Address the comments
727264e [jerryshao] Fix comment typo
6682bef [jerryshao] Fix compile issue
8325787 [jerryshao] Fix rebase issue
17fa251 [jerryshao] Refactor to build InputInfoTracker
ee1b536 [jerryshao] Add DirectStreamTracker to track the direct streams
jerryshao authored and tdas committed May 2, 2015
1 parent ebc25a4 commit b88c275
Showing 16 changed files with 247 additions and 46 deletions.
@@ -159,7 +159,7 @@ class StreamingContext private[streaming] (
}
}

private val nextReceiverInputStreamId = new AtomicInteger(0)
private val nextInputStreamId = new AtomicInteger(0)

private[streaming] var checkpointDir: String = {
if (isCheckpointPresent) {
@@ -241,7 +241,7 @@ class StreamingContext private[streaming] (
if (isCheckpointPresent) cp_ else null
}

private[streaming] def getNewReceiverStreamId() = nextReceiverInputStreamId.getAndIncrement()
private[streaming] def getNewInputStreamId() = nextInputStreamId.getAndIncrement()

/**
* Create an input stream with any arbitrary user implemented receiver.
@@ -41,6 +41,9 @@ abstract class InputDStream[T: ClassTag] (@transient ssc_ : StreamingContext)

ssc.graph.addInputStream(this)

/** This is a unique identifier for the input stream. */
val id = ssc.getNewInputStreamId()

/**
* Checks whether the 'time' is valid wrt slideDuration for generating RDD.
* Additionally it also ensures valid times are in strictly increasing order.
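The id now lives on InputDStream itself, so every input stream — receiver-based or not — gets a unique id from the context's AtomicInteger counter at construction time, in registration order. A standalone sketch of that counter pattern; the Demo* names below are made up for illustration and are not Spark classes:

import java.util.concurrent.atomic.AtomicInteger

// Simplified stand-ins for StreamingContext / InputDStream, only to show how
// the AtomicInteger counter hands out ids in registration order.
class DemoContext {
  private val nextInputStreamId = new AtomicInteger(0)
  def getNewInputStreamId(): Int = nextInputStreamId.getAndIncrement()
}

class DemoInputStream(ctx: DemoContext) {
  // Assigned once, when the stream is constructed and registered.
  val id: Int = ctx.getNewInputStreamId()
}

object IdDemo extends App {
  val ctx = new DemoContext
  val streams = Seq.fill(5)(new DemoInputStream(ctx))
  println(streams.map(_.id).mkString(", ")) // 0, 1, 2, 3, 4 — the same ordering the new test below asserts
}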
@@ -24,7 +24,7 @@ import org.apache.spark.storage.{BlockId, StorageLevel}
import org.apache.spark.streaming._
import org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD
import org.apache.spark.streaming.receiver.{Receiver, WriteAheadLogBasedStoreResult}
import org.apache.spark.streaming.scheduler.ReceivedBlockInfo
import org.apache.spark.streaming.scheduler.{InputInfo, ReceivedBlockInfo}

/**
* Abstract class for defining any [[org.apache.spark.streaming.dstream.InputDStream]]
@@ -39,9 +39,6 @@ import org.apache.spark.streaming.scheduler.ReceivedBlockInfo
abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingContext)
extends InputDStream[T](ssc_) {

/** This is an unique identifier for the receiver input stream. */
val id = ssc.getNewReceiverStreamId()

/**
* Gets the receiver object that will be sent to the worker nodes
* to receive data. This method needs to defined by any specific implementation
@@ -72,6 +69,10 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont
val blockStoreResults = blockInfos.map { _.blockStoreResult }
val blockIds = blockStoreResults.map { _.blockId.asInstanceOf[BlockId] }.toArray

// Register the input blocks information into InputInfoTracker
val inputInfo = InputInfo(id, blockInfos.map(_.numRecords).sum)
ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)

// Check whether all the results are of the same type
val resultTypes = blockStoreResults.map { _.getClass }.distinct
if (resultTypes.size > 1) {
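The two lines registered above cover receiver-based streams. Since the tracker is keyed by the generic input-stream id, a receiver-less (direct-style) InputDStream could report its own per-batch count from compute() in the same way. A hedged sketch, not part of this commit — the class name, its placeholder data, and the package placement (needed because InputInfo and the scheduler are private[streaming]) are all illustrative:

package org.apache.spark.streaming.dstream

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{StreamingContext, Time}
import org.apache.spark.streaming.scheduler.InputInfo

// Hypothetical direct-style input stream: no receiver; each batch it decides
// what to read and reports that record count to the InputInfoTracker itself.
class SketchDirectInputDStream(ssc_ : StreamingContext)
  extends InputDStream[String](ssc_) {

  override def start(): Unit = { }
  override def stop(): Unit = { }

  override def compute(validTime: Time): Option[RDD[String]] = {
    val records = Seq("a", "b", "c") // placeholder batch data
    // Same hook as ReceiverInputDStream uses above: report this batch's count.
    ssc.scheduler.inputInfoTracker.reportInfo(validTime, InputInfo(id, records.size.toLong))
    Some(ssc.sparkContext.parallelize(records))
  }
}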
@@ -24,6 +24,7 @@ import org.apache.spark.streaming.Time
* :: DeveloperApi ::
* Class having information on completed batches.
* @param batchTime Time of the batch
* @param streamIdToNumRecords A map of input stream id to the number of records in this batch
* @param submissionTime Clock time of when jobs of this batch was submitted to
* the streaming scheduler queue
* @param processingStartTime Clock time of when the first job of this batch started processing
@@ -32,7 +33,7 @@ import org.apache.spark.streaming.Time
@DeveloperApi
case class BatchInfo(
batchTime: Time,
receivedBlockInfo: Map[Int, Array[ReceivedBlockInfo]],
streamIdToNumRecords: Map[Int, Long],
submissionTime: Long,
processingStartTime: Option[Long],
processingEndTime: Option[Long]
@@ -58,4 +59,9 @@ case class BatchInfo(
*/
def totalDelay: Option[Long] = schedulingDelay.zip(processingDelay)
.map(x => x._1 + x._2).headOption

/**
* The number of records received by the receivers in this batch.
*/
def numRecords: Long = streamIdToNumRecords.values.sum
}
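With streamIdToNumRecords carried on BatchInfo, any StreamingListener can see how many records each input stream contributed to a batch. A small illustrative listener, assuming the post-commit BatchInfo shape shown above:

import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

// Logs the total and per-stream record counts of every completed batch.
class RecordCountListener extends StreamingListener {
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    val info = batchCompleted.batchInfo
    println(s"Batch ${info.batchTime}: ${info.numRecords} records in total")
    info.streamIdToNumRecords.foreach { case (streamId, count) =>
      println(s"  input stream $streamId -> $count records")
    }
  }
}

It would be registered on the context with ssc.addStreamingListener(new RecordCountListener).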
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.streaming.scheduler

import scala.collection.mutable

import org.apache.spark.Logging
import org.apache.spark.streaming.{Time, StreamingContext}

/** Tracks the information of an input stream at a specified batch time. */
private[streaming] case class InputInfo(inputStreamId: Int, numRecords: Long)

/**
* This class manages all the input streams as well as their input data statistics. The information
* will be exposed through StreamingListener for monitoring.
*/
private[streaming] class InputInfoTracker(ssc: StreamingContext) extends Logging {

// Map to track all the InputInfo related to specific batch time and input stream.
private val batchTimeToInputInfos = new mutable.HashMap[Time, mutable.HashMap[Int, InputInfo]]

/** Report the input information with batch time to the tracker */
def reportInfo(batchTime: Time, inputInfo: InputInfo): Unit = synchronized {
val inputInfos = batchTimeToInputInfos.getOrElseUpdate(batchTime,
new mutable.HashMap[Int, InputInfo]())

if (inputInfos.contains(inputInfo.inputStreamId)) {
throw new IllegalStateException(s"Input stream ${inputInfo.inputStreamId} for batch " +
s"$batchTime is already added into InputInfoTracker, this is an illegal state")
}
inputInfos += ((inputInfo.inputStreamId, inputInfo))
}

/** Get all the input streams' information for the specified batch time */
def getInfo(batchTime: Time): Map[Int, InputInfo] = synchronized {
val inputInfos = batchTimeToInputInfos.get(batchTime)
// Convert mutable HashMap to immutable Map for the caller
inputInfos.map(_.toMap).getOrElse(Map[Int, InputInfo]())
}

/** Clean up tracked input information older than the threshold batch time */
def cleanup(batchThreshTime: Time): Unit = synchronized {
val timesToCleanup = batchTimeToInputInfos.keys.filter(_ < batchThreshTime)
logInfo(s"remove old batch metadata: ${timesToCleanup.mkString(" ")}")
batchTimeToInputInfos --= timesToCleanup
}
}
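InputInfoTracker is private[streaming] and is normally driven by the scheduler, but the intended per-batch call sequence is easy to show. A minimal sketch, placed in the org.apache.spark.streaming package so it can see the package-private classes; the ids, counts, and times below are made up:

package org.apache.spark.streaming

import org.apache.spark.streaming.scheduler.{InputInfo, InputInfoTracker}

object InputInfoTrackerSketch {
  // Illustrative only: in Spark the JobScheduler owns the tracker and makes these calls.
  def oneBatch(ssc: StreamingContext): Unit = {
    val tracker = new InputInfoTracker(ssc)
    val batchTime = Time(1000)

    // Each input stream reports its record count for the batch exactly once.
    tracker.reportInfo(batchTime, InputInfo(inputStreamId = 0, numRecords = 500L))
    tracker.reportInfo(batchTime, InputInfo(inputStreamId = 1, numRecords = 200L))
    // Reporting the same stream id twice for one batch throws IllegalStateException.

    // The JobGenerator reads the counts when it builds the JobSet for this batch.
    val counts: Map[Int, Long] = tracker.getInfo(batchTime).mapValues(_.numRecords)
    // counts == Map(0 -> 500L, 1 -> 200L)

    // Later, metadata older than the remember threshold is dropped.
    tracker.cleanup(batchTime + Duration(60000))
  }
}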
@@ -243,9 +243,9 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
graph.generateJobs(time) // generate jobs using allocated block
} match {
case Success(jobs) =>
val receivedBlockInfos =
jobScheduler.receiverTracker.getBlocksOfBatch(time).mapValues { _.toArray }
jobScheduler.submitJobSet(JobSet(time, jobs, receivedBlockInfos))
val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
val streamIdToNumRecords = streamIdToInputInfos.mapValues(_.numRecords)
jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToNumRecords))
case Failure(e) =>
jobScheduler.reportError("Error generating jobs for time " + time, e)
}
@@ -266,6 +266,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
// checkpointing of this batch to complete.
val maxRememberDuration = graph.getMaxInputStreamRememberDuration()
jobScheduler.receiverTracker.cleanupOldBlocksAndBatches(time - maxRememberDuration)
jobScheduler.inputInfoTracker.cleanup(time - maxRememberDuration)
markBatchFullyProcessed(time)
}
}
@@ -278,6 +279,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
// been saved to checkpoints, so its safe to delete block metadata and data WAL files
val maxRememberDuration = graph.getMaxInputStreamRememberDuration()
jobScheduler.receiverTracker.cleanupOldBlocksAndBatches(time - maxRememberDuration)
jobScheduler.inputInfoTracker.cleanup(time - maxRememberDuration)
markBatchFullyProcessed(time)
}

@@ -50,6 +50,9 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
// These two are created only when scheduler starts.
// eventLoop not being null means the scheduler has been started and not stopped
var receiverTracker: ReceiverTracker = null
// A tracker to track all the input stream information as well as processed record number
var inputInfoTracker: InputInfoTracker = null

private var eventLoop: EventLoop[JobSchedulerEvent] = null

def start(): Unit = synchronized {
@@ -65,6 +68,7 @@

listenerBus.start(ssc.sparkContext)
receiverTracker = new ReceiverTracker(ssc)
inputInfoTracker = new InputInfoTracker(ssc)
receiverTracker.start()
jobGenerator.start()
logInfo("Started JobScheduler")
@@ -28,7 +28,7 @@ private[streaming]
case class JobSet(
time: Time,
jobs: Seq[Job],
receivedBlockInfo: Map[Int, Array[ReceivedBlockInfo]] = Map.empty) {
streamIdToNumRecords: Map[Int, Long] = Map.empty) {

private val incompleteJobs = new HashSet[Job]()
private val submissionTime = System.currentTimeMillis() // when this jobset was submitted
@@ -64,7 +64,7 @@ case class JobSet(
def toBatchInfo: BatchInfo = {
new BatchInfo(
time,
receivedBlockInfo,
streamIdToNumRecords,
submissionTime,
if (processingStartTime >= 0 ) Some(processingStartTime) else None,
if (processingEndTime >= 0 ) Some(processingEndTime) else None
@@ -66,7 +66,7 @@ private[ui] object BatchUIData {
def apply(batchInfo: BatchInfo): BatchUIData = {
new BatchUIData(
batchInfo.batchTime,
batchInfo.receivedBlockInfo.mapValues(_.map(_.numRecords).sum),
batchInfo.streamIdToNumRecords,
batchInfo.submissionTime,
batchInfo.processingStartTime,
batchInfo.processingEndTime
@@ -188,32 +188,37 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
}

def receivedRecordsDistributions: Map[Int, Option[Distribution]] = synchronized {
val latestBatches = retainedBatches.reverse.take(batchUIDataLimit)
(0 until numReceivers).map { receiverId =>
val recordsOfParticularReceiver = latestBatches.map { batch =>
// calculate records per second for each batch
batch.receiverNumRecords.get(receiverId).sum.toDouble * 1000 / batchDuration
}
val distributionOption = Distribution(recordsOfParticularReceiver)
(receiverId, distributionOption)
val latestBatchInfos = retainedBatches.reverse.take(batchUIDataLimit)
val latestReceiverNumRecords = latestBatchInfos.map(_.receiverNumRecords)
val streamIds = ssc.graph.getInputStreams().map(_.id)
streamIds.map { id =>
val recordsOfParticularReceiver =
latestReceiverNumRecords.map(v => v.getOrElse(id, 0L).toDouble * 1000 / batchDuration)
val distribution = Distribution(recordsOfParticularReceiver)
(id, distribution)
}.toMap
}
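The per-stream distribution above is built from per-batch rates: each retained batch's record count for a stream is converted to events per second before being fed to Distribution. A tiny worked example with illustrative numbers:

// Rate used per batch in receivedRecordsDistributions (values are made up):
val batchDurationMs = 2000L          // streaming batch duration in milliseconds
val recordsInBatch  = 500L           // records one stream contributed to one batch
val eventsPerSecond = recordsInBatch.toDouble * 1000 / batchDurationMs
// eventsPerSecond == 250.0; one such value per retained batch feeds Distribution(...)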

def lastReceivedBatchRecords: Map[Int, Long] = synchronized {
val lastReceivedBlockInfoOption = lastReceivedBatch.map(_.receiverNumRecords)
lastReceivedBlockInfoOption.map { lastReceivedBlockInfo =>
(0 until numReceivers).map { receiverId =>
(receiverId, lastReceivedBlockInfo.getOrElse(receiverId, 0L))
val lastReceiverNumRecords = lastReceivedBatch.map(_.receiverNumRecords)
val streamIds = ssc.graph.getInputStreams().map(_.id)
lastReceiverNumRecords.map { receiverNumRecords =>
streamIds.map { id =>
(id, receiverNumRecords.getOrElse(id, 0L))
}.toMap
}.getOrElse {
(0 until numReceivers).map(receiverId => (receiverId, 0L)).toMap
streamIds.map(id => (id, 0L)).toMap
}
}

def receiverInfo(receiverId: Int): Option[ReceiverInfo] = synchronized {
receiverInfos.get(receiverId)
}

def receiverIds(): Iterable[Int] = synchronized {
receiverInfos.keys
}

def lastCompletedBatch: Option[BatchUIData] = synchronized {
completedBatchUIData.sortBy(_.batchTime)(Time.ordering).lastOption
}
@@ -95,7 +95,7 @@ private[ui] class StreamingPage(parent: StreamingTab)
"Maximum rate\n[events/sec]",
"Last Error"
)
val dataRows = (0 until listener.numReceivers).map { receiverId =>
val dataRows = listener.receiverIds().map { receiverId =>
val receiverInfo = listener.receiverInfo(receiverId)
val receiverName = receiverInfo.map(_.name).getOrElse(s"Receiver-$receiverId")
val receiverActive = receiverInfo.map { info =>
@@ -114,7 +114,7 @@ private[ui] class StreamingPage(parent: StreamingTab)
}.getOrElse(emptyCell)
Seq(receiverName, receiverActive, receiverLocation, receiverLastBatchRecords) ++
receivedRecordStats ++ Seq(receiverLastError)
}
}.toSeq
Some(listingTable(headerRow, dataRows))
} else {
None
@@ -27,17 +27,18 @@ import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer, SynchronizedQu
import scala.language.postfixOps

import com.google.common.io.Files
import org.apache.hadoop.io.{Text, LongWritable}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.hadoop.fs.Path
import org.scalatest.BeforeAndAfter
import org.scalatest.concurrent.Eventually._

import org.apache.spark.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.{ManualClock, Utils}
import org.apache.spark.streaming.dstream.{InputDStream, ReceiverInputDStream}
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.rdd.RDD
import org.apache.hadoop.io.{Text, LongWritable}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.hadoop.fs.Path

class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {

@@ -278,6 +279,30 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
}
}

test("test track the number of input stream") {
val ssc = new StreamingContext(conf, batchDuration)

class TestInputDStream extends InputDStream[String](ssc) {
def start() { }
def stop() { }
def compute(validTime: Time): Option[RDD[String]] = None
}

class TestReceiverInputDStream extends ReceiverInputDStream[String](ssc) {
def getReceiver: Receiver[String] = null
}

// Register input streams
val receiverInputStreams = Array(new TestReceiverInputDStream, new TestReceiverInputDStream)
val inputStreams = Array(new TestInputDStream, new TestInputDStream, new TestInputDStream)

assert(ssc.graph.getInputStreams().length == receiverInputStreams.length + inputStreams.length)
assert(ssc.graph.getReceiverInputStreams().length == receiverInputStreams.length)
assert(ssc.graph.getReceiverInputStreams() === receiverInputStreams)
assert(ssc.graph.getInputStreams().map(_.id) === Array.tabulate(5)(i => i))
assert(receiverInputStreams.map(_.id) === Array(0, 1))
}

def testFileStream(newFilesOnly: Boolean) {
val testDir: File = null
try {
@@ -57,6 +57,11 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {
info.totalDelay should be (None)
})

batchInfosSubmitted.foreach { info =>
info.numRecords should be (1L)
info.streamIdToNumRecords should be (Map(0 -> 1L))
}

isInIncreasingOrder(batchInfosSubmitted.map(_.submissionTime)) should be (true)

// SPARK-6766: processingStartTime of batch info should not be None when starting
@@ -70,6 +75,11 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {
info.totalDelay should be (None)
})

batchInfosStarted.foreach { info =>
info.numRecords should be (1L)
info.streamIdToNumRecords should be (Map(0 -> 1L))
}

isInIncreasingOrder(batchInfosStarted.map(_.submissionTime)) should be (true)
isInIncreasingOrder(batchInfosStarted.map(_.processingStartTime.get)) should be (true)

@@ -86,6 +96,11 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {
info.totalDelay.get should be >= 0L
})

batchInfosCompleted.foreach { info =>
info.numRecords should be (1L)
info.streamIdToNumRecords should be (Map(0 -> 1L))
}

isInIncreasingOrder(batchInfosCompleted.map(_.submissionTime)) should be (true)
isInIncreasingOrder(batchInfosCompleted.map(_.processingStartTime.get)) should be (true)
isInIncreasingOrder(batchInfosCompleted.map(_.processingEndTime.get)) should be (true)