Skip to content

Commit

Permalink
FutureAction result tests
Browse files Browse the repository at this point in the history
  • Loading branch information
markhamstra committed Dec 6, 2013
1 parent aebb123 commit ee888f6
Showing 1 changed file with 26 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package org.apache.spark.rdd

import java.util.concurrent.Semaphore

import scala.concurrent.{Await, TimeoutException}
import scala.concurrent.duration.Duration
import scala.concurrent.ExecutionContext.Implicits.global

import org.scalatest.{BeforeAndAfterAll, FunSuite}
Expand Down Expand Up @@ -173,4 +175,28 @@ class AsyncRDDActionsSuite extends FunSuite with BeforeAndAfterAll with Timeouts
sem.acquire(2)
}
}

/**
* Awaiting FutureAction results
*/
test("FutureAction result, infinite wait") {
val f = sc.parallelize(1 to 100, 4)
.countAsync()
assert(Await.result(f, Duration.Inf) === 100)
}

test("FutureAction result, finite wait") {
val f = sc.parallelize(1 to 100, 4)
.countAsync()
assert(Await.result(f, Duration(30, "seconds")) === 100)
}

test("FutureAction result, timeout") {
val f = sc.parallelize(1 to 100, 4)
.mapPartitions(itr => { Thread.sleep(20); itr })
.countAsync()
intercept[TimeoutException] {
Await.result(f, Duration(20, "milliseconds"))
}
}
}

0 comments on commit ee888f6

Please sign in to comment.