Skip to content

Commit

Permalink
[SPARK-9244] Increase some memory defaults
Browse files Browse the repository at this point in the history
There are a few memory limits that people hit often and that we could
make higher, especially now that memory sizes have grown.

- spark.akka.frameSize: This defaults at 10 but is often hit for map
  output statuses in large shuffles. This memory is not fully allocated
  up-front, so we can just make this larger and still not affect jobs
  that never sent a status that large. We increase it to 128.

- spark.executor.memory: Defaults at 512m, which is really small. We
  increase it to 1g.

Author: Matei Zaharia <[email protected]>

Closes apache#7586 from mateiz/configs and squashes the following commits:

ce0038a [Matei Zaharia] [SPARK-9244] Increase some memory defaults
  • Loading branch information
mateiz committed Jul 22, 2015
1 parent 1aca9c1 commit fe26584
Show file tree
Hide file tree
Showing 27 changed files with 78 additions and 80 deletions.
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
.orElse(Option(System.getenv("SPARK_MEM"))
.map(warnSparkMem))
.map(Utils.memoryStringToMb)
.getOrElse(512)
.getOrElse(1024)

// Convert java options to env vars as a work around
// since we can't set env vars directly in sbt.
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ private[spark] object AkkaUtils extends Logging {

/** Returns the configured max frame size for Akka messages in bytes. */
def maxFrameSizeBytes(conf: SparkConf): Int = {
val frameSizeInMB = conf.getInt("spark.akka.frameSize", 10)
val frameSizeInMB = conf.getInt("spark.akka.frameSize", 128)
if (frameSizeInMB > AKKA_MAX_FRAME_SIZE_IN_MB) {
throw new IllegalArgumentException(
s"spark.akka.frameSize should not be greater than $AKKA_MAX_FRAME_SIZE_IN_MB MB")
Expand Down
2 changes: 1 addition & 1 deletion core/src/test/java/org/apache/spark/JavaAPISuite.java
Original file line number Diff line number Diff line change
Expand Up @@ -1783,7 +1783,7 @@ public void testGuavaOptional() {
// Stop the context created in setUp() and start a local-cluster one, to force usage of the
// assembly.
sc.stop();
JavaSparkContext localCluster = new JavaSparkContext("local-cluster[1,1,512]", "JavaAPISuite");
JavaSparkContext localCluster = new JavaSparkContext("local-cluster[1,1,1024]", "JavaAPISuite");
try {
JavaRDD<Integer> rdd1 = localCluster.parallelize(Arrays.asList(1, 2, null), 3);
JavaRDD<Optional<Integer>> rdd2 = rdd1.map(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase {
sc.stop()

val conf2 = new SparkConf()
.setMaster("local-cluster[2, 1, 512]")
.setMaster("local-cluster[2, 1, 1024]")
.setAppName("ContextCleanerSuite")
.set("spark.cleaner.referenceTracking.blocking", "true")
.set("spark.cleaner.referenceTracking.blocking.shuffle", "true")
Expand Down Expand Up @@ -370,7 +370,7 @@ class SortShuffleContextCleanerSuite extends ContextCleanerSuiteBase(classOf[Sor
sc.stop()

val conf2 = new SparkConf()
.setMaster("local-cluster[2, 1, 512]")
.setMaster("local-cluster[2, 1, 1024]")
.setAppName("ContextCleanerSuite")
.set("spark.cleaner.referenceTracking.blocking", "true")
.set("spark.cleaner.referenceTracking.blocking.shuffle", "true")
Expand Down
16 changes: 8 additions & 8 deletions core/src/test/scala/org/apache/spark/DistributedSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class NotSerializableExn(val notSer: NotSerializableClass) extends Throwable() {

class DistributedSuite extends SparkFunSuite with Matchers with LocalSparkContext {

val clusterUrl = "local-cluster[2,1,512]"
val clusterUrl = "local-cluster[2,1,1024]"

test("task throws not serializable exception") {
// Ensures that executors do not crash when an exn is not serializable. If executors crash,
Expand All @@ -40,7 +40,7 @@ class DistributedSuite extends SparkFunSuite with Matchers with LocalSparkContex
val numSlaves = 3
val numPartitions = 10

sc = new SparkContext("local-cluster[%s,1,512]".format(numSlaves), "test")
sc = new SparkContext("local-cluster[%s,1,1024]".format(numSlaves), "test")
val data = sc.parallelize(1 to 100, numPartitions).
map(x => throw new NotSerializableExn(new NotSerializableClass))
intercept[SparkException] {
Expand All @@ -50,16 +50,16 @@ class DistributedSuite extends SparkFunSuite with Matchers with LocalSparkContex
}

test("local-cluster format") {
sc = new SparkContext("local-cluster[2,1,512]", "test")
sc = new SparkContext("local-cluster[2,1,1024]", "test")
assert(sc.parallelize(1 to 2, 2).count() == 2)
resetSparkContext()
sc = new SparkContext("local-cluster[2 , 1 , 512]", "test")
sc = new SparkContext("local-cluster[2 , 1 , 1024]", "test")
assert(sc.parallelize(1 to 2, 2).count() == 2)
resetSparkContext()
sc = new SparkContext("local-cluster[2, 1, 512]", "test")
sc = new SparkContext("local-cluster[2, 1, 1024]", "test")
assert(sc.parallelize(1 to 2, 2).count() == 2)
resetSparkContext()
sc = new SparkContext("local-cluster[ 2, 1, 512 ]", "test")
sc = new SparkContext("local-cluster[ 2, 1, 1024 ]", "test")
assert(sc.parallelize(1 to 2, 2).count() == 2)
resetSparkContext()
}
Expand Down Expand Up @@ -276,7 +276,7 @@ class DistributedSuite extends SparkFunSuite with Matchers with LocalSparkContex
DistributedSuite.amMaster = true
// Using more than two nodes so we don't have a symmetric communication pattern and might
// cache a partially correct list of peers.
sc = new SparkContext("local-cluster[3,1,512]", "test")
sc = new SparkContext("local-cluster[3,1,1024]", "test")
for (i <- 1 to 3) {
val data = sc.parallelize(Seq(true, false, false, false), 4)
data.persist(StorageLevel.MEMORY_ONLY_2)
Expand All @@ -294,7 +294,7 @@ class DistributedSuite extends SparkFunSuite with Matchers with LocalSparkContex

test("unpersist RDDs") {
DistributedSuite.amMaster = true
sc = new SparkContext("local-cluster[3,1,512]", "test")
sc = new SparkContext("local-cluster[3,1,1024]", "test")
val data = sc.parallelize(Seq(true, false, false, false), 4)
data.persist(StorageLevel.MEMORY_ONLY_2)
data.count
Expand Down
2 changes: 1 addition & 1 deletion core/src/test/scala/org/apache/spark/DriverSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class DriverSuite extends SparkFunSuite with Timeouts {

ignore("driver should exit after finishing without cleanup (SPARK-530)") {
val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!"))
val masters = Table("master", "local", "local-cluster[2,1,512]")
val masters = Table("master", "local", "local-cluster[2,1,1024]")
forAll(masters) { (master: String) =>
val process = Utils.executeCommand(
Seq(s"$sparkHome/bin/spark-class", "org.apache.spark.DriverWithoutCleanup", master),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with BeforeAndAfterAll {

// This test ensures that the external shuffle service is actually in use for the other tests.
test("using external shuffle service") {
sc = new SparkContext("local-cluster[2,1,512]", "test", conf)
sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
sc.env.blockManager.externalShuffleServiceEnabled should equal(true)
sc.env.blockManager.shuffleClient.getClass should equal(classOf[ExternalShuffleClient])

Expand Down
6 changes: 3 additions & 3 deletions core/src/test/scala/org/apache/spark/FileServerSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ class FileServerSuite extends SparkFunSuite with LocalSparkContext {
}

test("Distributing files on a standalone cluster") {
sc = new SparkContext("local-cluster[1,1,512]", "test", newConf)
sc = new SparkContext("local-cluster[1,1,1024]", "test", newConf)
sc.addFile(tmpFile.toString)
val testData = Array((1, 1), (1, 1), (2, 1), (3, 5), (2, 2), (3, 0))
val result = sc.parallelize(testData).reduceByKey {
Expand All @@ -153,7 +153,7 @@ class FileServerSuite extends SparkFunSuite with LocalSparkContext {
}

test ("Dynamically adding JARS on a standalone cluster") {
sc = new SparkContext("local-cluster[1,1,512]", "test", newConf)
sc = new SparkContext("local-cluster[1,1,1024]", "test", newConf)
sc.addJar(tmpJarUrl)
val testData = Array((1, 1))
sc.parallelize(testData).foreach { x =>
Expand All @@ -164,7 +164,7 @@ class FileServerSuite extends SparkFunSuite with LocalSparkContext {
}

test ("Dynamically adding JARS on a standalone cluster using local: URL") {
sc = new SparkContext("local-cluster[1,1,512]", "test", newConf)
sc = new SparkContext("local-cluster[1,1,1024]", "test", newConf)
sc.addJar(tmpJarUrl.replace("file", "local"))
val testData = Array((1, 1))
sc.parallelize(testData).foreach { x =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft

test("cluster mode, FIFO scheduler") {
val conf = new SparkConf().set("spark.scheduler.mode", "FIFO")
sc = new SparkContext("local-cluster[2,1,512]", "test", conf)
sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
testCount()
testTake()
// Make sure we can still launch tasks.
Expand All @@ -75,7 +75,7 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
val conf = new SparkConf().set("spark.scheduler.mode", "FAIR")
val xmlPath = getClass.getClassLoader.getResource("fairscheduler.xml").getFile()
conf.set("spark.scheduler.allocation.file", xmlPath)
sc = new SparkContext("local-cluster[2,1,512]", "test", conf)
sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
testCount()
testTake()
// Make sure we can still launch tasks.
Expand Down
20 changes: 10 additions & 10 deletions core/src/test/scala/org/apache/spark/ShuffleSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC
}

test("shuffle non-zero block size") {
sc = new SparkContext("local-cluster[2,1,512]", "test", conf)
sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
val NUM_BLOCKS = 3

val a = sc.parallelize(1 to 10, 2)
Expand All @@ -73,7 +73,7 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC

test("shuffle serializer") {
// Use a local cluster with 2 processes to make sure there are both local and remote blocks
sc = new SparkContext("local-cluster[2,1,512]", "test", conf)
sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
val a = sc.parallelize(1 to 10, 2)
val b = a.map { x =>
(x, new NonJavaSerializableClass(x * 2))
Expand All @@ -89,7 +89,7 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC

test("zero sized blocks") {
// Use a local cluster with 2 processes to make sure there are both local and remote blocks
sc = new SparkContext("local-cluster[2,1,512]", "test", conf)
sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)

// 201 partitions (greater than "spark.shuffle.sort.bypassMergeThreshold") from 4 keys
val NUM_BLOCKS = 201
Expand All @@ -116,7 +116,7 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC

test("zero sized blocks without kryo") {
// Use a local cluster with 2 processes to make sure there are both local and remote blocks
sc = new SparkContext("local-cluster[2,1,512]", "test", conf)
sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)

// 201 partitions (greater than "spark.shuffle.sort.bypassMergeThreshold") from 4 keys
val NUM_BLOCKS = 201
Expand All @@ -141,7 +141,7 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC

test("shuffle on mutable pairs") {
// Use a local cluster with 2 processes to make sure there are both local and remote blocks
sc = new SparkContext("local-cluster[2,1,512]", "test", conf)
sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
def p[T1, T2](_1: T1, _2: T2): MutablePair[T1, T2] = MutablePair(_1, _2)
val data = Array(p(1, 1), p(1, 2), p(1, 3), p(2, 1))
val pairs: RDD[MutablePair[Int, Int]] = sc.parallelize(data, 2)
Expand All @@ -154,7 +154,7 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC
test("sorting on mutable pairs") {
// This is not in SortingSuite because of the local cluster setup.
// Use a local cluster with 2 processes to make sure there are both local and remote blocks
sc = new SparkContext("local-cluster[2,1,512]", "test", conf)
sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
def p[T1, T2](_1: T1, _2: T2): MutablePair[T1, T2] = MutablePair(_1, _2)
val data = Array(p(1, 11), p(3, 33), p(100, 100), p(2, 22))
val pairs: RDD[MutablePair[Int, Int]] = sc.parallelize(data, 2)
Expand All @@ -168,7 +168,7 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC

test("cogroup using mutable pairs") {
// Use a local cluster with 2 processes to make sure there are both local and remote blocks
sc = new SparkContext("local-cluster[2,1,512]", "test", conf)
sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
def p[T1, T2](_1: T1, _2: T2): MutablePair[T1, T2] = MutablePair(_1, _2)
val data1 = Seq(p(1, 1), p(1, 2), p(1, 3), p(2, 1))
val data2 = Seq(p(1, "11"), p(1, "12"), p(2, "22"), p(3, "3"))
Expand All @@ -195,7 +195,7 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC

test("subtract mutable pairs") {
// Use a local cluster with 2 processes to make sure there are both local and remote blocks
sc = new SparkContext("local-cluster[2,1,512]", "test", conf)
sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
def p[T1, T2](_1: T1, _2: T2): MutablePair[T1, T2] = MutablePair(_1, _2)
val data1 = Seq(p(1, 1), p(1, 2), p(1, 3), p(2, 1), p(3, 33))
val data2 = Seq(p(1, "11"), p(1, "12"), p(2, "22"))
Expand All @@ -210,7 +210,7 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC
test("sort with Java non serializable class - Kryo") {
// Use a local cluster with 2 processes to make sure there are both local and remote blocks
val myConf = conf.clone().set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
sc = new SparkContext("local-cluster[2,1,512]", "test", myConf)
sc = new SparkContext("local-cluster[2,1,1024]", "test", myConf)
val a = sc.parallelize(1 to 10, 2)
val b = a.map { x =>
(new NonJavaSerializableClass(x), x)
Expand All @@ -223,7 +223,7 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC

test("sort with Java non serializable class - Java") {
// Use a local cluster with 2 processes to make sure there are both local and remote blocks
sc = new SparkContext("local-cluster[2,1,512]", "test", conf)
sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
val a = sc.parallelize(1 to 10, 2)
val b = a.map { x =>
(new NonJavaSerializableClass(x), x)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ class SparkContextSchedulerCreationSuite
}

test("local-cluster") {
createTaskScheduler("local-cluster[3, 14, 512]").backend match {
createTaskScheduler("local-cluster[3, 14, 1024]").backend match {
case s: SparkDeploySchedulerBackend => // OK
case _ => fail()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ class BroadcastSuite extends SparkFunSuite with LocalSparkContext {
val conf = httpConf.clone
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.broadcast.compress", "true")
sc = new SparkContext("local-cluster[%d, 1, 512]".format(numSlaves), "test", conf)
sc = new SparkContext("local-cluster[%d, 1, 1024]".format(numSlaves), "test", conf)
val list = List[Int](1, 2, 3, 4)
val broadcast = sc.broadcast(list)
val results = sc.parallelize(1 to numSlaves).map(x => (x, broadcast.value.sum))
Expand Down Expand Up @@ -97,7 +97,7 @@ class BroadcastSuite extends SparkFunSuite with LocalSparkContext {
val conf = torrentConf.clone
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.broadcast.compress", "true")
sc = new SparkContext("local-cluster[%d, 1, 512]".format(numSlaves), "test", conf)
sc = new SparkContext("local-cluster[%d, 1, 1024]".format(numSlaves), "test", conf)
val list = List[Int](1, 2, 3, 4)
val broadcast = sc.broadcast(list)
val results = sc.parallelize(1 to numSlaves).map(x => (x, broadcast.value.sum))
Expand Down Expand Up @@ -125,7 +125,7 @@ class BroadcastSuite extends SparkFunSuite with LocalSparkContext {
test("Test Lazy Broadcast variables with TorrentBroadcast") {
val numSlaves = 2
val conf = torrentConf.clone
sc = new SparkContext("local-cluster[%d, 1, 512]".format(numSlaves), "test", conf)
sc = new SparkContext("local-cluster[%d, 1, 1024]".format(numSlaves), "test", conf)
val rdd = sc.parallelize(1 to numSlaves)

val results = new DummyBroadcastClass(rdd).doSomething()
Expand Down Expand Up @@ -308,7 +308,7 @@ class BroadcastSuite extends SparkFunSuite with LocalSparkContext {

sc = if (distributed) {
val _sc =
new SparkContext("local-cluster[%d, 1, 512]".format(numSlaves), "test", broadcastConf)
new SparkContext("local-cluster[%d, 1, 1024]".format(numSlaves), "test", broadcastConf)
// Wait until all salves are up
_sc.jobProgressListener.waitUntilExecutorsUp(numSlaves, 10000)
_sc
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class LogUrlsStandaloneSuite extends SparkFunSuite with LocalSparkContext {
private val WAIT_TIMEOUT_MILLIS = 10000

test("verify that correct log urls get propagated from workers") {
sc = new SparkContext("local-cluster[2,1,512]", "test")
sc = new SparkContext("local-cluster[2,1,1024]", "test")

val listener = new SaveExecutorInfo
sc.addSparkListener(listener)
Expand Down Expand Up @@ -66,7 +66,7 @@ class LogUrlsStandaloneSuite extends SparkFunSuite with LocalSparkContext {
}
val conf = new MySparkConf().set(
"spark.extraListeners", classOf[SaveExecutorInfo].getName)
sc = new SparkContext("local-cluster[2,1,512]", "test", conf)
sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)

// Trigger a job so that executors get added
sc.parallelize(1 to 100, 4).map(_.toString).count()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ class SparkSubmitSuite
val args = Seq(
"--class", JarCreationTest.getClass.getName.stripSuffix("$"),
"--name", "testApp",
"--master", "local-cluster[2,1,512]",
"--master", "local-cluster[2,1,1024]",
"--jars", jarsString,
unusedJar.toString, "SparkSubmitClassA", "SparkSubmitClassB")
runSparkSubmit(args)
Expand All @@ -352,7 +352,7 @@ class SparkSubmitSuite
val args = Seq(
"--class", JarCreationTest.getClass.getName.stripSuffix("$"),
"--name", "testApp",
"--master", "local-cluster[2,1,512]",
"--master", "local-cluster[2,1,1024]",
"--packages", Seq(main, dep).mkString(","),
"--repositories", repo,
"--conf", "spark.ui.enabled=false",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkCo
val conf = new SparkConf
conf.set("spark.akka.frameSize", "1")
conf.set("spark.default.parallelism", "1")
sc = new SparkContext("local-cluster[2 , 1 , 512]", "test", conf)
sc = new SparkContext("local-cluster[2, 1, 1024]", "test", conf)
val frameSize = AkkaUtils.maxFrameSizeBytes(sc.conf)
val buffer = new SerializableBuffer(java.nio.ByteBuffer.allocate(2 * frameSize))
val larger = sc.parallelize(Seq(buffer))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
// into SPARK-6688.
val conf = getLoggingConf(testDirPath, compressionCodec)
.set("spark.hadoop.fs.defaultFS", "unsupported://example.com")
val sc = new SparkContext("local-cluster[2,2,512]", "test", conf)
val sc = new SparkContext("local-cluster[2,2,1024]", "test", conf)
assert(sc.eventLogger.isDefined)
val eventLogger = sc.eventLogger.get
val eventLogPath = eventLogger.logPath
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter {
fileSystem.mkdirs(logDirPath)

val conf = EventLoggingListenerSuite.getLoggingConf(logDirPath, codecName)
val sc = new SparkContext("local-cluster[2,1,512]", "Test replay", conf)
val sc = new SparkContext("local-cluster[2,1,1024]", "Test replay", conf)

// Run a few jobs
sc.parallelize(1 to 100, 1).count()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class SparkListenerWithClusterSuite extends SparkFunSuite with LocalSparkContext
val WAIT_TIMEOUT_MILLIS = 10000

before {
sc = new SparkContext("local-cluster[2,1,512]", "SparkListenerSuite")
sc = new SparkContext("local-cluster[2,1,1024]", "SparkListenerSuite")
}

test("SparkListener sends executor added message") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class KryoSerializerDistributedSuite extends SparkFunSuite {
val jar = TestUtils.createJarWithClasses(List(AppJarRegistrator.customClassName))
conf.setJars(List(jar.getPath))

val sc = new SparkContext("local-cluster[2,1,512]", "test", conf)
val sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
val original = Thread.currentThread.getContextClassLoader
val loader = new java.net.URLClassLoader(Array(jar), Utils.getContextOrSparkClassLoader)
SparkEnv.get.serializer.setDefaultClassLoader(loader)
Expand Down
Loading

0 comments on commit fe26584

Please sign in to comment.