[SPARK-20189][DSTREAM] Fix Spark Kinesis test cases to remove deprecated createStream and use builders

## What changes were proposed in this pull request?

The spark-kinesis test cases use KinesisUtils.createStream, which is now deprecated. Modify the test cases to use the recommended KinesisInputDStream.builder instead.
This change also enables the test cases to pick up session tokens automatically.
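
For reference, the before/after shape of the migration looks roughly like the following. This is a minimal sketch, not part of the patch: `ssc` is assumed to exist in the surrounding code, and the app/stream/endpoint/region values are hypothetical placeholders; the builder calls mirror the test changes below.

```scala
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kinesis.KinesisInputDStream

val ssc: StreamingContext = ???   // assumed to exist in the surrounding app/test
val appName = "my-kinesis-app"    // hypothetical checkpoint application name

// Before (deprecated): KinesisUtils.createStream with explicit access/secret keys.
// val stream = KinesisUtils.createStream(ssc, appName, "my-stream",
//   "https://kinesis.us-west-2.amazonaws.com", "us-west-2",
//   InitialPositionInStream.LATEST, Seconds(10), StorageLevel.MEMORY_ONLY,
//   awsAccessKeyId, awsSecretKey)

// After: the builder. When no credentials are supplied it falls back to the
// default credential provider chain, which is what lets the tests pick up
// session tokens automatically.
val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .checkpointAppName(appName)
  .streamName("my-stream")
  .endpointUrl("https://kinesis.us-west-2.amazonaws.com")
  .regionName("us-west-2")
  .initialPositionInStream(InitialPositionInStream.LATEST)
  .checkpointInterval(Seconds(10))
  .storageLevel(StorageLevel.MEMORY_ONLY)
  .build()
```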

## How was this patch tested?

All the existing test cases pass as expected with the changes.

https://issues.apache.org/jira/browse/SPARK-20189

Author: Yash Sharma <[email protected]>

Closes apache#17506 from yssharma/ysharma/cleanup_kinesis_testcases.
yashs360 authored and srowen committed Apr 13, 2017
1 parent c5f1cc3 commit ec68d8f
Showing 2 changed files with 38 additions and 22 deletions.
@@ -267,7 +267,7 @@ object KinesisInputDStream {
       getRequiredParam(checkpointAppName, "checkpointAppName"),
       checkpointInterval.getOrElse(ssc.graph.batchDuration),
       storageLevel.getOrElse(DEFAULT_STORAGE_LEVEL),
-      handler,
+      ssc.sc.clean(handler),
       kinesisCredsProvider.getOrElse(DefaultCredentials),
       dynamoDBCredsProvider,
       cloudWatchCredsProvider)
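
For context on the one non-test change above: `ssc.sc.clean(handler)` runs Spark's closure cleaner over the user-supplied message handler and verifies it is serializable, so a bad handler fails fast when the stream is built rather than later at task serialization time. The deprecated `KinesisUtils.createStream` path already cleaned the handler this way, so this keeps the builder path consistent (my reading of the change; the motivation is not stated in the PR).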
@@ -22,7 +22,6 @@ import scala.concurrent.duration._
 import scala.language.postfixOps
 import scala.util.Random
 
-import com.amazonaws.regions.RegionUtils
 import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
 import com.amazonaws.services.kinesis.model.Record
 import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
@@ -173,11 +172,15 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
    * and you have to set the system environment variable RUN_KINESIS_TESTS=1 .
    */
   testIfEnabled("basic operation") {
-    val awsCredentials = KinesisTestUtils.getAWSCredentials()
-    val stream = KinesisUtils.createStream(ssc, appName, testUtils.streamName,
-      testUtils.endpointUrl, testUtils.regionName, InitialPositionInStream.LATEST,
-      Seconds(10), StorageLevel.MEMORY_ONLY,
-      awsCredentials.getAWSAccessKeyId, awsCredentials.getAWSSecretKey)
+    val stream = KinesisInputDStream.builder.streamingContext(ssc)
+      .checkpointAppName(appName)
+      .streamName(testUtils.streamName)
+      .endpointUrl(testUtils.endpointUrl)
+      .regionName(testUtils.regionName)
+      .initialPositionInStream(InitialPositionInStream.LATEST)
+      .checkpointInterval(Seconds(10))
+      .storageLevel(StorageLevel.MEMORY_ONLY)
+      .build()
 
     val collected = new mutable.HashSet[Int]
     stream.map { bytes => new String(bytes).toInt }.foreachRDD { rdd =>
@@ -198,12 +201,17 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
   }
 
   testIfEnabled("custom message handling") {
-    val awsCredentials = KinesisTestUtils.getAWSCredentials()
     def addFive(r: Record): Int = JavaUtils.bytesToString(r.getData).toInt + 5
-    val stream = KinesisUtils.createStream(ssc, appName, testUtils.streamName,
-      testUtils.endpointUrl, testUtils.regionName, InitialPositionInStream.LATEST,
-      Seconds(10), StorageLevel.MEMORY_ONLY, addFive(_),
-      awsCredentials.getAWSAccessKeyId, awsCredentials.getAWSSecretKey)
+
+    val stream = KinesisInputDStream.builder.streamingContext(ssc)
+      .checkpointAppName(appName)
+      .streamName(testUtils.streamName)
+      .endpointUrl(testUtils.endpointUrl)
+      .regionName(testUtils.regionName)
+      .initialPositionInStream(InitialPositionInStream.LATEST)
+      .checkpointInterval(Seconds(10))
+      .storageLevel(StorageLevel.MEMORY_ONLY)
+      .buildWithMessageHandler(addFive(_))
 
     stream shouldBe a [ReceiverInputDStream[_]]
 
@@ -233,11 +241,15 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
     val localTestUtils = new KPLBasedKinesisTestUtils(1)
     localTestUtils.createStream()
     try {
-      val awsCredentials = KinesisTestUtils.getAWSCredentials()
-      val stream = KinesisUtils.createStream(ssc, localAppName, localTestUtils.streamName,
-        localTestUtils.endpointUrl, localTestUtils.regionName, InitialPositionInStream.LATEST,
-        Seconds(10), StorageLevel.MEMORY_ONLY,
-        awsCredentials.getAWSAccessKeyId, awsCredentials.getAWSSecretKey)
+      val stream = KinesisInputDStream.builder.streamingContext(ssc)
+        .checkpointAppName(localAppName)
+        .streamName(localTestUtils.streamName)
+        .endpointUrl(localTestUtils.endpointUrl)
+        .regionName(localTestUtils.regionName)
+        .initialPositionInStream(InitialPositionInStream.LATEST)
+        .checkpointInterval(Seconds(10))
+        .storageLevel(StorageLevel.MEMORY_ONLY)
+        .build()
 
       val collected = new mutable.HashSet[Int]
       stream.map { bytes => new String(bytes).toInt }.foreachRDD { rdd =>
@@ -303,13 +315,17 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
     ssc = new StreamingContext(sc, Milliseconds(1000))
     ssc.checkpoint(checkpointDir)
 
-    val awsCredentials = KinesisTestUtils.getAWSCredentials()
     val collectedData = new mutable.HashMap[Time, (Array[SequenceNumberRanges], Seq[Int])]
 
-    val kinesisStream = KinesisUtils.createStream(ssc, appName, testUtils.streamName,
-      testUtils.endpointUrl, testUtils.regionName, InitialPositionInStream.LATEST,
-      Seconds(10), StorageLevel.MEMORY_ONLY,
-      awsCredentials.getAWSAccessKeyId, awsCredentials.getAWSSecretKey)
+    val kinesisStream = KinesisInputDStream.builder.streamingContext(ssc)
+      .checkpointAppName(appName)
+      .streamName(testUtils.streamName)
+      .endpointUrl(testUtils.endpointUrl)
+      .regionName(testUtils.regionName)
+      .initialPositionInStream(InitialPositionInStream.LATEST)
+      .checkpointInterval(Seconds(10))
+      .storageLevel(StorageLevel.MEMORY_ONLY)
+      .build()
 
     // Verify that the generated RDDs are KinesisBackedBlockRDDs, and collect the data in each batch
     kinesisStream.foreachRDD((rdd: RDD[Array[Byte]], time: Time) => {
