Skip to content

Commit

Permalink
[STREAMING] [TEST] [HOTFIX] Fixed Kinesis test to not throw weird err…
Browse files Browse the repository at this point in the history
…ors when Kinesis tests are enabled without AWS keys

If Kinesis tests are enabled by env ENABLE_KINESIS_TESTS = 1 but no AWS credentials are found, the desired behavior is the fail the test using with
```
Exception encountered when attempting to run a suite with class name: org.apache.spark.streaming.kinesis.KinesisBackedBlockRDDSuite *** ABORTED *** (3 seconds, 5 milliseconds)
[info]   java.lang.Exception: Kinesis tests enabled, but could get not AWS credentials
```

Instead KinesisStreamSuite fails with

```
[info] - basic operation *** FAILED *** (3 seconds, 35 milliseconds)
[info]   java.lang.IllegalArgumentException: requirement failed: Stream not yet created, call createStream() to create one
[info]   at scala.Predef$.require(Predef.scala:233)
[info]   at org.apache.spark.streaming.kinesis.KinesisTestUtils.streamName(KinesisTestUtils.scala:77)
[info]   at org.apache.spark.streaming.kinesis.KinesisTestUtils$$anonfun$deleteStream$1.apply(KinesisTestUtils.scala:150)
[info]   at org.apache.spark.streaming.kinesis.KinesisTestUtils$$anonfun$deleteStream$1.apply(KinesisTestUtils.scala:150)
[info]   at org.apache.spark.Logging$class.logWarning(Logging.scala:71)
[info]   at org.apache.spark.streaming.kinesis.KinesisTestUtils.logWarning(KinesisTestUtils.scala:39)
[info]   at org.apache.spark.streaming.kinesis.KinesisTestUtils.deleteStream(KinesisTestUtils.scala:150)
[info]   at org.apache.spark.streaming.kinesis.KinesisStreamSuite$$anonfun$3.apply$mcV$sp(KinesisStreamSuite.scala:111)
[info]   at org.apache.spark.streaming.kinesis.KinesisStreamSuite$$anonfun$3.apply(KinesisStreamSuite.scala:86)
[info]   at org.apache.spark.streaming.kinesis.KinesisStreamSuite$$anonfun$3.apply(KinesisStreamSuite.scala:86)
```
This is because attempting to delete a non-existent Kinesis stream throws uncaught exception. This PR fixes it.

Author: Tathagata Das <[email protected]>

Closes #7809 from tdas/kinesis-test-hotfix and squashes the following commits:

7c372e6 [Tathagata Das] Fixed test
  • Loading branch information
tdas committed Jul 30, 2015
1 parent 04c8409 commit 1afdeb7
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ private class KinesisTestUtils(

@volatile
private var streamCreated = false

@volatile
private var _streamName: String = _

private lazy val kinesisClient = {
Expand Down Expand Up @@ -115,21 +117,9 @@ private class KinesisTestUtils(
shardIdToSeqNumbers.toMap
}

def describeStream(streamNameToDescribe: String = streamName): Option[StreamDescription] = {
try {
val describeStreamRequest = new DescribeStreamRequest().withStreamName(streamNameToDescribe)
val desc = kinesisClient.describeStream(describeStreamRequest).getStreamDescription()
Some(desc)
} catch {
case rnfe: ResourceNotFoundException =>
None
}
}

def deleteStream(): Unit = {
try {
if (describeStream().nonEmpty) {
val deleteStreamRequest = new DeleteStreamRequest()
if (streamCreated) {
kinesisClient.deleteStream(streamName)
}
} catch {
Expand All @@ -149,6 +139,17 @@ private class KinesisTestUtils(
}
}

private def describeStream(streamNameToDescribe: String): Option[StreamDescription] = {
try {
val describeStreamRequest = new DescribeStreamRequest().withStreamName(streamNameToDescribe)
val desc = kinesisClient.describeStream(describeStreamRequest).getStreamDescription()
Some(desc)
} catch {
case rnfe: ResourceNotFoundException =>
None
}
}

private def findNonExistentStreamName(): String = {
var testStreamName: String = null
do {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,11 @@ class KinesisStreamSuite extends KinesisFunSuite
try {
kinesisTestUtils.createStream()
ssc = new StreamingContext(sc, Seconds(1))
val aWSCredentials = KinesisTestUtils.getAWSCredentials()
val awsCredentials = KinesisTestUtils.getAWSCredentials()
val stream = KinesisUtils.createStream(ssc, kinesisAppName, kinesisTestUtils.streamName,
kinesisTestUtils.endpointUrl, kinesisTestUtils.regionName, InitialPositionInStream.LATEST,
Seconds(10), StorageLevel.MEMORY_ONLY,
aWSCredentials.getAWSAccessKeyId, aWSCredentials.getAWSSecretKey)
awsCredentials.getAWSAccessKeyId, awsCredentials.getAWSSecretKey)

val collected = new mutable.HashSet[Int] with mutable.SynchronizedSet[Int]
stream.map { bytes => new String(bytes).toInt }.foreachRDD { rdd =>
Expand Down

0 comments on commit 1afdeb7

Please sign in to comment.