Skip to content

Commit

Permalink
[SPARK-7838] [STREAMING] Set scope for kinesis stream
Browse files Browse the repository at this point in the history
Author: Tathagata Das <[email protected]>

Closes apache#6369 from tdas/SPARK-7838 and squashes the following commits:

87d1c7f [Tathagata Das] Addressed comment
37775d8 [Tathagata Das] set scope for kinesis stream
  • Loading branch information
tdas authored and Andrew Or committed May 23, 2015
1 parent 017b340 commit baa8983
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,12 @@ object KinesisUtils {
checkpointInterval: Duration,
storageLevel: StorageLevel
): ReceiverInputDStream[Array[Byte]] = {
ssc.receiverStream(
new KinesisReceiver(kinesisAppName, streamName, endpointUrl, validateRegion(regionName),
initialPositionInStream, checkpointInterval, storageLevel, None))
// Setting scope to override receiver stream's scope of "receiver stream"
ssc.withNamedScope("kinesis stream") {
ssc.receiverStream(
new KinesisReceiver(kinesisAppName, streamName, endpointUrl, validateRegion(regionName),
initialPositionInStream, checkpointInterval, storageLevel, None))
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ class StreamingContext private[streaming] (
*
* Note: Return statements are NOT allowed in the given body.
*/
private def withNamedScope[U](name: String)(body: => U): U = {
private[streaming] def withNamedScope[U](name: String)(body: => U): U = {
RDDOperationScope.withScope(sc, name, allowNesting = false, ignoreParent = false)(body)
}

Expand Down

0 comments on commit baa8983

Please sign in to comment.