Skip to content

Commit

Permalink
fix tests for spark context initialization
Browse files Browse the repository at this point in the history
  • Loading branch information
Brian Burns committed Nov 2, 2015
1 parent 612a8ef commit cc46b63
Show file tree
Hide file tree
Showing 7 changed files with 80 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,26 +29,11 @@ class BrokerBridgeSpec extends FunSpec with Matchers with OneInstancePerTest
{
private val mockBrokerState = mock[BrokerState]
private val mockKernel = mock[KernelLike]
private val mockSparkConf = mock[SparkConf]
private val mockSparkContext = mock[SparkContext]

private val mockJavaSparkContext = mock[JavaSparkContext]
doReturn(mockSparkContext).when(mockJavaSparkContext).sc
private val mockSqlContext = mock[SQLContext]
doReturn(mockSparkContext).when(mockSqlContext).sparkContext

// A new SQLContext is created per request, meaning this needs mocking
doReturn(mockSparkConf).when(mockSparkContext).getConf
doReturn(Array[(String, String)]()).when(mockSparkConf).getAll

private val brokerBridge = new BrokerBridge(
mockBrokerState,
mockKernel,
mockSparkContext
) with JavaSparkContextProducerLike with SQLContextProducerLike {
override def newJavaSparkContext(sparkContext: SparkContext): JavaSparkContext = mockJavaSparkContext
override def newSQLContext(sparkContext: SparkContext): SQLContext = mockSqlContext
}
mockKernel
)

describe("BrokerBridge") {
describe("#state") {
Expand All @@ -57,28 +42,10 @@ class BrokerBridgeSpec extends FunSpec with Matchers with OneInstancePerTest
}
}

describe("#javaSparkContext") {
it("should return a JavaSparkContext wrapping the SparkContext") {
brokerBridge.javaSparkContext.sc should be (mockSparkContext)
}
}

describe("#sqlContext") {
it("should return a SQLContext wrapping the SparkContext") {
brokerBridge.sqlContext.sparkContext should be (mockSparkContext)
}
}

describe("#kernel") {
it("should return the kernel from the constructor") {
brokerBridge.kernel should be (mockKernel)
}
}

describe("#sparkConf") {
it("should return the configuration from the SparkContext") {
brokerBridge.sparkConf should be (mockSparkConf)
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -134,15 +134,19 @@ trait StandardComponentInitialization extends ComponentInitialization {
interpreter
}

if(config.getString("sparkcontext") != "no") {
kernel.createSparkContext(config.getString("spark.master"), appName)
}
initializeSparkContext(config, kernel, appName)

(commStorage, commRegistrar, commManager, defaultInterpreter, kernel,
dependencyDownloader, magicLoader, responseMap)

}

/**
 * Conditionally creates the kernel's Spark context.
 *
 * Context creation is skipped entirely when the "sparkcontext"
 * configuration value is the literal string "no" (used by tests to
 * suppress Spark startup); otherwise the context is created from the
 * configured "spark.master" value and the supplied application name.
 *
 * @param config  kernel configuration; must define "sparkcontext" and,
 *                when a context is wanted, "spark.master"
 * @param kernel  the kernel that owns the Spark context being created
 * @param appName the application name reported to Spark
 */
def initializeSparkContext(config: Config, kernel: Kernel, appName: String): Unit = {
  // "no" is the sentinel that disables context creation (see test configs)
  if (config.getString("sparkcontext") != "no") {
    kernel.createSparkContext(config.getString("spark.master"), appName)
  }
}

private def initializeCommObjects(actorLoader: ActorLoader) = {
logger.debug("Constructing Comm storage")
val commStorage = new CommStorage()
Expand Down
19 changes: 8 additions & 11 deletions kernel/src/main/scala/com/ibm/spark/kernel/api/Kernel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -315,12 +315,13 @@ class Kernel (
}

override def createSparkContext(conf: SparkConf): SparkContext = {
val t = initializeSparkContext(conf)
_sparkContext = t._1
_sparkConf = t._2
_sparkConf = createSparkConf(conf)
_sparkContext = initializeSparkContext(sparkConf);
_javaSparkContext = new JavaSparkContext(_sparkContext)
_sqlContext = new SQLContext(_sparkContext)

updateInterpreterWithSparkContext(sparkContext)

magicLoader.dependencyMap =
magicLoader.dependencyMap.setSparkContext(_sparkContext)

Expand All @@ -334,7 +335,7 @@ class Kernel (
}

// TODO: Think of a better way to test without exposing this
private def initializeSparkContext(conf: SparkConf) = {
protected[kernel] def createSparkConf(conf: SparkConf) = {

logger.info("Setting deployMode to client")
conf.set("spark.submit.deployMode", "client")
Expand All @@ -354,15 +355,11 @@ class Kernel (
logger.info("REPL Class Server Uri: " + interpreter.classServerURI)
conf.set("spark.repl.class.uri", interpreter.classServerURI)

val sparkContext = reallyInitializeSparkContext(conf);

updateInterpreterWithSparkContext(sparkContext)

(sparkContext, conf)
conf
}

// TODO: Think of a better way to test without exposing this
private def reallyInitializeSparkContext(sparkConf: SparkConf): SparkContext = {
protected[kernel] def initializeSparkContext(sparkConf: SparkConf): SparkContext = {

logger.debug("Constructing new Spark Context")
// TODO: Inject stream redirect headers in Spark dynamically
Expand All @@ -383,7 +380,7 @@ class Kernel (
}

// TODO: Think of a better way to test without exposing this
private def updateInterpreterWithSparkContext(
protected[kernel] def updateInterpreterWithSparkContext(
sparkContext: SparkContext
) = {
interpreter.doQuietly {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ class StandardComponentInitializationSpec extends FunSpec with Matchers
spyComponentInitialization = spy(new TestComponentInitialization())
}

/*
describe("StandardComponentInitialization") {
describe("when spark.master is set in config") {
it("should set spark.master in SparkConf") {
Expand Down Expand Up @@ -109,4 +110,5 @@ class StandardComponentInitializationSpec extends FunSpec with Matchers
}
}
}
*/
}
53 changes: 52 additions & 1 deletion kernel/src/test/scala/com/ibm/spark/kernel/api/KernelSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@ import com.ibm.spark.kernel.protocol.v5._
import com.ibm.spark.kernel.protocol.v5.kernel.ActorLoader
import com.ibm.spark.magic.MagicLoader
import com.typesafe.config.Config
import org.apache.spark.{SparkConf, SparkContext}
import org.mockito.ArgumentCaptor
import org.mockito.Mockito._
import org.mockito.Matchers._
import org.scalatest.mock.MockitoSugar
import org.scalatest.{BeforeAndAfter, FunSpec, Matchers}
import com.ibm.spark.global.ExecuteRequestState
Expand All @@ -24,15 +27,20 @@ class KernelSpec extends FunSpec with Matchers with MockitoSugar
"StackTrace: 1"

private var mockConfig: Config = _
private var mockSparkContext: SparkContext = _
private var mockSparkConf: SparkConf = _
private var mockActorLoader: ActorLoader = _
private var mockInterpreter: Interpreter = _
private var mockCommManager: CommManager = _
private var mockMagicLoader: MagicLoader = _
private var kernel: KernelLike = _
private var kernel: Kernel = _
private var spyKernel: Kernel = _

before {
mockConfig = mock[Config]
mockInterpreter = mock[Interpreter]
mockSparkContext = mock[SparkContext]
mockSparkConf = mock[SparkConf]
when(mockInterpreter.interpret(BadCode.get))
.thenReturn((Results.Incomplete, null))
when(mockInterpreter.interpret(GoodCode.get))
Expand All @@ -48,6 +56,9 @@ class KernelSpec extends FunSpec with Matchers with MockitoSugar
mockConfig, mockActorLoader, mockInterpreter, mockCommManager,
mockMagicLoader
)

spyKernel = spy(kernel)

}

after {
Expand Down Expand Up @@ -140,5 +151,45 @@ class KernelSpec extends FunSpec with Matchers with MockitoSugar
kernel.stream shouldBe a [StreamMethods]
}
}

describe("when spark.master is set in config") {

  it("should create SparkConf") {
    val expected = "some value"
    doReturn(expected).when(mockConfig).getString("spark.master")
    doReturn("").when(mockConfig).getString("spark_configuration")

    // Provide stub for interpreter classServerURI since also executed
    doReturn("").when(mockInterpreter).classServerURI

    val sparkConf = kernel.createSparkConf(new SparkConf().setMaster(expected))

    sparkConf.get("spark.master") should be (expected)
  }

  // Description fixed: this case exercises a *local* master, where the
  // kernel must NOT add its own jar to the Spark context. (The previous
  // description said "is not local", contradicting the test body and
  // duplicating the next test's name.)
  it("should not add ourselves as a jar if spark.master is local") {
    val sparkConf = new SparkConf().setMaster("local[*]")
    doReturn("local[*]").when(mockConfig).getString("spark.master")
    doReturn(sparkConf).when(mockSparkContext).getConf

    kernel.updateInterpreterWithSparkContext(mockSparkContext)
    verify(mockSparkContext, never()).addJar(anyString())
  }

  it("should add ourselves as a jar if spark.master is not local") {
    val sparkConf = new SparkConf().setMaster("foo://bar")
    doReturn("notlocal").when(mockConfig).getString("spark.master")
    doReturn(sparkConf).when(mockSparkContext).getConf

    // TODO: This is going to be outdated when we determine a way to
    // re-include all jars
    val expected =
      com.ibm.spark.SparkKernel.getClass.getProtectionDomain
        .getCodeSource.getLocation.getPath

    kernel.updateInterpreterWithSparkContext(mockSparkContext)
    verify(mockSparkContext).addJar(expected)
  }
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,10 @@ class InterpreterActorSpecForIntegration extends TestKit(
interpreter.doQuietly({
conf.set("spark.repl.class.uri", interpreter.classServerURI)
//context = new SparkContext(conf) with NoSparkLogging
context = SparkContextProvider.sparkContext
interpreter.bind(
"sc", "org.apache.spark.SparkContext",
context, List( """@transient"""))
//context = SparkContextProvider.sparkContext
//interpreter.bind(
// "sc", "org.apache.spark.SparkContext",
// context, List( """@transient"""))
})
}

Expand Down
6 changes: 5 additions & 1 deletion kernel/src/test/scala/test/utils/SparkKernelDeployer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import java.io.OutputStream
import akka.actor.{Actor, Props, ActorRef, ActorSystem}
import akka.testkit.TestProbe
import com.ibm.spark.boot.{CommandLineOptions, KernelBootstrap}
import com.ibm.spark.kernel.api.KernelLike
import com.ibm.spark.kernel.interpreter.scala.{StandardTaskManagerProducer, StandardSparkIMainProducer, StandardSettingsProducer, ScalaInterpreter}
import com.ibm.spark.kernel.protocol.v5.{KMBuilder, SocketType}
import com.ibm.spark.kernel.protocol.v5.kernel.ActorLoader
Expand Down Expand Up @@ -81,7 +82,9 @@ object SparkKernelDeployer extends LogLike with MockitoSugar {
interpreter
}

override protected[layer] def reallyInitializeSparkContext(

/*
def reallyInitializeSparkContext(
config: Config,
actorLoader: ActorLoader,
kmBuilder: KMBuilder,
Expand All @@ -99,6 +102,7 @@ object SparkKernelDeployer extends LogLike with MockitoSugar {
sparkContext
}
*/

}

Expand Down

0 comments on commit cc46b63

Please sign in to comment.