Commit 6c69508

[SPARK-22322][CORE] Update FutureAction for compatibility with Scala 2.12 Future

## What changes were proposed in this pull request?

Scala 2.12's `Future` defines two new methods to implement, `transform` and `transformWith`. These can be implemented naturally in Spark's `FutureAction` extension and its subclasses, but only in terms of new methods that don't exist in Scala 2.11. To support both versions at the same time, reflection is used to implement these.
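
For context, the two members that Scala 2.12 adds to `scala.concurrent.Future` have the signatures shown below; the short usage sketch that follows is illustrative only, not code from this patch:

```scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.Success

// New in Scala 2.12's Future:
//   def transform[S](f: Try[T] => Try[S])(implicit executor: ExecutionContext): Future[S]
//   def transformWith[S](f: Try[T] => Future[S])(implicit executor: ExecutionContext): Future[S]

val f: Future[Int] = Future(41)

// transform rewrites the completed Try, handling success and failure in one place
val g: Future[Int] = f.transform {
  case Success(n) => Success(n + 1)
  case failure => failure
}

// transformWith is the flatMap-like variant: the continuation returns a Future
val h: Future[Int] = f.transformWith {
  case Success(n) => Future.successful(n + 1)
  case failure => Future.fromTry(failure)
}
```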

## How was this patch tested?

Existing tests.

Author: Sean Owen <[email protected]>

Closes apache#19561 from srowen/SPARK-22322.
srowen committed Oct 25, 2017
1 parent 427359f commit 6c69508
Showing 5 changed files with 62 additions and 6 deletions.
59 changes: 58 additions & 1 deletion core/src/main/scala/org/apache/spark/FutureAction.scala
@@ -89,7 +89,11 @@ trait FutureAction[T] extends Future[T] {
*/
override def value: Option[Try[T]]

-  // These two methods must be implemented in Scala 2.12, but won't be used by Spark
+  // These two methods must be implemented in Scala 2.12. They're implemented as a no-op here
+  // and then filled in with a real implementation in the two subclasses below. The no-op exists
+  // here so that those implementations can declare "override", necessary in 2.12, while working
+  // in 2.11, where the method doesn't exist in the superclass.
+  // After 2.11 support goes away, remove these two:

  def transform[S](f: (Try[T]) => Try[S])(implicit executor: ExecutionContext): Future[S] =
    throw new UnsupportedOperationException()
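
To see the trick that new comment describes in isolation: a concrete stub in a parent trait is what makes a subclass's `override` modifier legal for a method that no 2.11 ancestor declares. A minimal sketch with hypothetical names, not code from the patch:

```scala
trait Parent {
  // Concrete stub standing in for the member that only the 2.12 superclass
  // declares; under 2.11 this definition alone justifies the child's `override`.
  def doIt(): String = throw new UnsupportedOperationException()
}

class Child extends Parent {
  override def doIt(): String = "real implementation" // legal in 2.11 and 2.12
}
```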
@@ -113,6 +117,42 @@ trait FutureAction[T] extends Future[T] {

}

+/**
+ * Scala 2.12 defines the two new transform/transformWith methods mentioned above. Implementing
+ * these for 2.12 in the Spark class here requires delegating to these same methods on an
+ * underlying Future object, which only exists in 2.12; fortunately, these methods are also
+ * only called in 2.12. So define helper shims to access these methods on a Future by reflection.
+ */
+private[spark] object FutureAction {
+
+  private val transformTryMethod =
+    try {
+      classOf[Future[_]].getMethod("transform", classOf[(_) => _], classOf[ExecutionContext])
+    } catch {
+      case _: NoSuchMethodException => null // Would fail later in 2.11, but not called in 2.11
+    }
+
+  private val transformWithTryMethod =
+    try {
+      classOf[Future[_]].getMethod("transformWith", classOf[(_) => _], classOf[ExecutionContext])
+    } catch {
+      case _: NoSuchMethodException => null // Would fail later in 2.11, but not called in 2.11
+    }
+
+  private[spark] def transform[T, S](
+      future: Future[T],
+      f: (Try[T]) => Try[S],
+      executor: ExecutionContext): Future[S] =
+    transformTryMethod.invoke(future, f, executor).asInstanceOf[Future[S]]
+
+  private[spark] def transformWith[T, S](
+      future: Future[T],
+      f: (Try[T]) => Future[S],
+      executor: ExecutionContext): Future[S] =
+    transformWithTryMethod.invoke(future, f, executor).asInstanceOf[Future[S]]
+
+}
+

/**
* A [[FutureAction]] holding the result of an action that triggers a single job. Examples include
@@ -153,6 +193,18 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc:
jobWaiter.completionFuture.value.map {res => res.map(_ => resultFunc)}

def jobIds: Seq[Int] = Seq(jobWaiter.jobId)

+  override def transform[S](f: (Try[T]) => Try[S])(implicit e: ExecutionContext): Future[S] =
+    FutureAction.transform(
+      jobWaiter.completionFuture,
+      (u: Try[Unit]) => f(u.map(_ => resultFunc)),
+      e)
+
+  override def transformWith[S](f: (Try[T]) => Future[S])(implicit e: ExecutionContext): Future[S] =
+    FutureAction.transformWith(
+      jobWaiter.completionFuture,
+      (u: Try[Unit]) => f(u.map(_ => resultFunc)),
+      e)
}


@@ -246,6 +298,11 @@ class ComplexFutureAction[T](run : JobSubmitter => Future[T])

def jobIds: Seq[Int] = subActions.flatMap(_.jobIds)

+  override def transform[S](f: (Try[T]) => Try[S])(implicit e: ExecutionContext): Future[S] =
+    FutureAction.transform(p.future, f, e)
+
+  override def transformWith[S](f: (Try[T]) => Future[S])(implicit e: ExecutionContext): Future[S] =
+    FutureAction.transformWith(p.future, f, e)
}


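The shim pattern in `FutureAction` above (resolve a possibly-absent method once, then invoke it reflectively only where it exists) is general. A standalone sketch of the same idea, using `String.strip` from Java 11+ purely as a hypothetical stand-in:

```scala
import java.lang.reflect.Method

object ReflectiveShim {
  // Resolved once at class-load time; null simply means the method is absent
  // on this runtime, which is fine as long as the shim is never called there.
  private val stripMethod: Method =
    try {
      classOf[String].getMethod("strip") // exists on Java 11+, absent earlier
    } catch {
      case _: NoSuchMethodException => null
    }

  // Callers are responsible for invoking this only where the method exists,
  // mirroring how Spark's shims are only reached under Scala 2.12.
  def strip(s: String): String =
    stripMethod.invoke(s).asInstanceOf[String]
}
```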
2 changes: 1 addition & 1 deletion pom.xml
@@ -2692,7 +2692,7 @@
    <profile>
      <id>scala-2.12</id>
      <properties>
-       <scala.version>2.12.3</scala.version>
+       <scala.version>2.12.4</scala.version>
        <scala.binary.version>2.12</scala.binary.version>
      </properties>
      <build>
sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
@@ -33,7 +33,6 @@ import org.apache.spark.sql.catalyst.streaming.InternalOutputModes._
import org.apache.spark.sql.execution.RDDScanExec
import org.apache.spark.sql.execution.streaming.{FlatMapGroupsWithStateExec, GroupStateImpl, MemoryStream}
import org.apache.spark.sql.execution.streaming.state.{StateStore, StateStoreId, StateStoreMetrics, UnsafeRowPair}
-import org.apache.spark.sql.streaming.FlatMapGroupsWithStateSuite.MemoryStateStore
import org.apache.spark.sql.streaming.util.StreamManualClock
import org.apache.spark.sql.types.{DataType, IntegerType}

@@ -1201,7 +1200,7 @@ object FlatMapGroupsWithStateSuite {
} catch {
case u: UnsupportedOperationException =>
return
-      case _ =>
+      case _: Throwable =>
throw new TestFailedException("Unexpected exception when trying to get watermark", 20)
}
throw new TestFailedException("Could get watermark when not expected", 20)
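The `case _: Throwable` rewrite above doesn't change behavior: a bare `case _` in a catch block already matches every `Throwable`. The explicit type satisfies scalac's catch-all lint (a hard failure when warnings are fatal), presumably what the 2.12 build flags here. A small sketch:

```scala
def risky(): Int = throw new IllegalStateException("boom")

// Both handlers match every Throwable at runtime; only the second says so
// explicitly, which is what silences scalac's "catches all Throwables" warning.
val viaWildcard = try risky() catch { case _ => -1 }            // warns
val viaExplicit = try risky() catch { case _: Throwable => -1 } // clean
```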
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -744,7 +744,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
assert(returnedValue === expectedReturnValue, "Returned value does not match expected")
}
}
-        AwaitTerminationTester.test(expectedBehavior, awaitTermFunc)
+        AwaitTerminationTester.test(expectedBehavior, () => awaitTermFunc())
true // If the control reached here, then everything worked as expected
}
}
streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
@@ -139,7 +139,7 @@ private[streaming] class FileBasedWriteAheadLog(
def readFile(file: String): Iterator[ByteBuffer] = {
logDebug(s"Creating log reader with $file")
val reader = new FileBasedWriteAheadLogReader(file, hadoopConf)
-      CompletionIterator[ByteBuffer, Iterator[ByteBuffer]](reader, reader.close _)
+      CompletionIterator[ByteBuffer, Iterator[ByteBuffer]](reader, () => reader.close())
}
if (!closeFileAfterWrite) {
logFilesToRead.iterator.map(readFile).flatten.asJava
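The last two changes (`awaitTermFunc` and `reader.close _`) share a motivation: both replace a reliance on eta-expansion with an explicit function literal. Scala 2.12's new function encoding and SAM conversion changed some corner cases of how method references become function values, so `() => m()` is the spelling that means the same thing under both 2.11 and 2.12. A tiny illustration with a hypothetical method:

```scala
def close(): Unit = println("closed")

// `close _` asks the compiler to eta-expand the method; the explicit literal
// below constructs the () => Unit value directly and is unambiguous in both
// Scala 2.11 and 2.12.
val cleanup: () => Unit = () => close()
cleanup()
```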
