Skip to content

Commit

Permalink
[SPARK-27948][CORE][TEST] Use ResourceName to represent resource names
Browse files Browse the repository at this point in the history
## What changes were proposed in this pull request?

Use objects in `ResourceName` to represent resource names.

## How was this patch tested?

Existing tests.

Closes apache#24799 from jiangxb1987/ResourceName.

Authored-by: Xingbo Jiang <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
  • Loading branch information
jiangxb1987 authored and dongjoon-hyun committed Jun 5, 2019
1 parent ac808e2 commit fcb3fb0
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 48 deletions.
41 changes: 21 additions & 20 deletions core/src/test/scala/org/apache/spark/ResourceDiscovererSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import java.util.EnumSet

import com.google.common.io.Files

import org.apache.spark.ResourceName._
import org.apache.spark.internal.config._
import org.apache.spark.util.Utils

Expand All @@ -43,7 +44,7 @@ class ResourceDiscovererSuite extends SparkFunSuite
val resources =
ResourceDiscoverer.discoverResourcesInformation(sparkconf, SPARK_EXECUTOR_RESOURCE_PREFIX)
assert(resources.size === 0)
assert(resources.get("gpu").isEmpty,
assert(resources.get(GPU).isEmpty,
"Should have a gpus entry that is empty")
}

Expand All @@ -54,11 +55,11 @@ class ResourceDiscovererSuite extends SparkFunSuite
val gpuFile = new File(dir, "gpuDiscoverScript")
val scriptPath = mockDiscoveryScript(gpuFile,
"""'{"name": "gpu","addresses":["0", "1"]}'""")
sparkconf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + "gpu" +
sparkconf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + GPU +
SPARK_RESOURCE_DISCOVERY_SCRIPT_SUFFIX, scriptPath)
val resources =
ResourceDiscoverer.discoverResourcesInformation(sparkconf, SPARK_EXECUTOR_RESOURCE_PREFIX)
val gpuValue = resources.get("gpu")
val gpuValue = resources.get(GPU)
assert(gpuValue.nonEmpty, "Should have a gpu entry")
assert(gpuValue.get.name == "gpu", "name should be gpu")
assert(gpuValue.get.addresses.size == 2, "Should have 2 indexes")
Expand All @@ -76,16 +77,16 @@ class ResourceDiscovererSuite extends SparkFunSuite
val fpgaFile = new File(dir, "fpgaDiscoverScript")
val fpgaScript = mockDiscoveryScript(fpgaFile,
"""'{"name": "fpga","addresses":["f0", "f1"]}'""")
sparkconf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + "gpu" +
sparkconf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + GPU +
SPARK_RESOURCE_DISCOVERY_SCRIPT_SUFFIX, gpuScript)
sparkconf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + "fpga" +
sparkconf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + FPGA +
SPARK_RESOURCE_DISCOVERY_SCRIPT_SUFFIX, fpgaScript)
// it should only look at the resources passed in and ignore fpga conf
val resources =
ResourceDiscoverer.discoverResourcesInformation(sparkconf,
SPARK_EXECUTOR_RESOURCE_PREFIX, Some(Set("gpu")))
SPARK_EXECUTOR_RESOURCE_PREFIX, Some(Set(GPU)))
assert(resources.size === 1, "should only have the gpu resource")
val gpuValue = resources.get("gpu")
val gpuValue = resources.get(GPU)
assert(gpuValue.nonEmpty, "Should have a gpu entry")
assert(gpuValue.get.name == "gpu", "name should be gpu")
assert(gpuValue.get.addresses.size == 2, "Should have 2 indexes")
Expand All @@ -100,11 +101,11 @@ class ResourceDiscovererSuite extends SparkFunSuite
val gpuFile = new File(dir, "gpuDiscoverScript")
val scriptPath = mockDiscoveryScript(gpuFile,
"""'{"name": "gpu"}'""")
sparkconf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + "gpu" +
sparkconf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + GPU +
SPARK_RESOURCE_DISCOVERY_SCRIPT_SUFFIX, scriptPath)
val resources =
ResourceDiscoverer.discoverResourcesInformation(sparkconf, SPARK_EXECUTOR_RESOURCE_PREFIX)
val gpuValue = resources.get("gpu")
val gpuValue = resources.get(GPU)
assert(gpuValue.nonEmpty, "Should have a gpu entry")
assert(gpuValue.get.name == "gpu", "name should be gpu")
assert(gpuValue.get.addresses.size == 0, "Should have 0 indexes")
Expand All @@ -118,25 +119,25 @@ class ResourceDiscovererSuite extends SparkFunSuite
val gpuFile = new File(dir, "gpuDiscoverScript")
val gpuDiscovery = mockDiscoveryScript(gpuFile,
"""'{"name": "gpu", "addresses": ["0", "1"]}'""")
sparkconf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + "gpu" +
sparkconf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + GPU +
SPARK_RESOURCE_DISCOVERY_SCRIPT_SUFFIX, gpuDiscovery)

val fpgaFile = new File(dir, "fpgaDiscoverScript")
val fpgaDiscovery = mockDiscoveryScript(fpgaFile,
"""'{"name": "fpga", "addresses": ["f1", "f2", "f3"]}'""")
sparkconf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + "fpga" +
sparkconf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + FPGA +
SPARK_RESOURCE_DISCOVERY_SCRIPT_SUFFIX, fpgaDiscovery)

val resources =
ResourceDiscoverer.discoverResourcesInformation(sparkconf, SPARK_EXECUTOR_RESOURCE_PREFIX)
assert(resources.size === 2)
val gpuValue = resources.get("gpu")
val gpuValue = resources.get(GPU)
assert(gpuValue.nonEmpty, "Should have a gpu entry")
assert(gpuValue.get.name == "gpu", "name should be gpu")
assert(gpuValue.get.addresses.size == 2, "Should have 2 indexes")
assert(gpuValue.get.addresses.deep == Array("0", "1").deep, "should have 0,1 entries")

val fpgaValue = resources.get("fpga")
val fpgaValue = resources.get(FPGA)
assert(fpgaValue.nonEmpty, "Should have a gpu entry")
assert(fpgaValue.get.name == "fpga", "name should be fpga")
assert(fpgaValue.get.addresses.size == 3, "Should have 3 indexes")
Expand All @@ -152,14 +153,14 @@ class ResourceDiscovererSuite extends SparkFunSuite
val gpuFile = new File(dir, "gpuDiscoverScript")
val gpuDiscovery = mockDiscoveryScript(gpuFile,
"""'{"name": "gpu", "addresses": ["0", "1"]}'""")
sparkconf.set(SPARK_DRIVER_RESOURCE_PREFIX + "gpu" +
sparkconf.set(SPARK_DRIVER_RESOURCE_PREFIX + GPU +
SPARK_RESOURCE_DISCOVERY_SCRIPT_SUFFIX, gpuDiscovery)
sparkconf set(SPARK_EXECUTOR_RESOURCE_PREFIX + "gpu" +
sparkconf set(SPARK_EXECUTOR_RESOURCE_PREFIX + GPU +
SPARK_RESOURCE_DISCOVERY_SCRIPT_SUFFIX, "boguspath")
// make sure it reads from correct config, here it should use driver
val resources =
ResourceDiscoverer.discoverResourcesInformation(sparkconf, SPARK_DRIVER_RESOURCE_PREFIX)
val gpuValue = resources.get("gpu")
val gpuValue = resources.get(GPU)
assert(gpuValue.nonEmpty, "Should have a gpu entry")
assert(gpuValue.get.name == "gpu", "name should be gpu")
assert(gpuValue.get.addresses.size == 2, "Should have 2 indexes")
Expand All @@ -174,7 +175,7 @@ class ResourceDiscovererSuite extends SparkFunSuite
val gpuFile = new File(dir, "gpuDiscoverScript")
val gpuDiscovery = mockDiscoveryScript(gpuFile,
"""'{"name": "fpga", "addresses": ["0", "1"]}'""")
sparkconf.set(SPARK_DRIVER_RESOURCE_PREFIX + "gpu" +
sparkconf.set(SPARK_DRIVER_RESOURCE_PREFIX + GPU +
SPARK_RESOURCE_DISCOVERY_SCRIPT_SUFFIX, gpuDiscovery)
val error = intercept[SparkException] {
ResourceDiscoverer.discoverResourcesInformation(sparkconf, SPARK_DRIVER_RESOURCE_PREFIX)
Expand All @@ -191,7 +192,7 @@ class ResourceDiscovererSuite extends SparkFunSuite
val gpuFile = new File(dir, "gpuDiscoverScript")
val gpuDiscovery = mockDiscoveryScript(gpuFile,
"""'{"addresses": ["0", "1"]}'""")
sparkconf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + "gpu" +
sparkconf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + GPU +
SPARK_RESOURCE_DISCOVERY_SCRIPT_SUFFIX, gpuDiscovery)
val error = intercept[SparkException] {
ResourceDiscoverer.discoverResourcesInformation(sparkconf, SPARK_EXECUTOR_RESOURCE_PREFIX)
Expand All @@ -206,7 +207,7 @@ class ResourceDiscovererSuite extends SparkFunSuite
withTempDir { dir =>
val file1 = new File(dir, "bogusfilepath")
try {
sparkconf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + "gpu" +
sparkconf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + GPU +
SPARK_RESOURCE_DISCOVERY_SCRIPT_SUFFIX, file1.getPath())

val error = intercept[SparkException] {
Expand All @@ -222,7 +223,7 @@ class ResourceDiscovererSuite extends SparkFunSuite

test("gpu's specified but not discovery script") {
val sparkconf = new SparkConf
sparkconf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + "gpu" +
sparkconf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + GPU +
SPARK_RESOURCE_COUNT_SUFFIX, "2")

val error = intercept[SparkException] {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.spark.executor


import java.io.File
import java.net.URL
import java.nio.ByteBuffer
Expand All @@ -39,7 +38,7 @@ import org.scalatest.mockito.MockitoSugar

import org.apache.spark._
import org.apache.spark.ResourceInformation
import org.apache.spark.ResourceName.GPU
import org.apache.spark.ResourceName._
import org.apache.spark.internal.config._
import org.apache.spark.rpc.RpcEnv
import org.apache.spark.scheduler.TaskDescription
Expand All @@ -58,7 +57,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite

test("parsing no resources") {
val conf = new SparkConf
conf.set(SPARK_TASK_RESOURCE_PREFIX + "gpu" + SPARK_RESOURCE_COUNT_SUFFIX, "2")
conf.set(SPARK_TASK_RESOURCE_PREFIX + GPU + SPARK_RESOURCE_COUNT_SUFFIX, "2")
val serializer = new JavaSerializer(conf)
val env = createMockEnv(conf, serializer)

Expand All @@ -80,8 +79,8 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite

test("parsing one resources") {
val conf = new SparkConf
conf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + "gpu" + SPARK_RESOURCE_COUNT_SUFFIX, "2")
conf.set(SPARK_TASK_RESOURCE_PREFIX + "gpu" + SPARK_RESOURCE_COUNT_SUFFIX, "2")
conf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + GPU + SPARK_RESOURCE_COUNT_SUFFIX, "2")
conf.set(SPARK_TASK_RESOURCE_PREFIX + GPU + SPARK_RESOURCE_COUNT_SUFFIX, "2")
val serializer = new JavaSerializer(conf)
val env = createMockEnv(conf, serializer)
// we don't really use this, just need it to get at the parser function
Expand All @@ -96,18 +95,18 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
val parsedResources = backend.parseOrFindResources(Some(f1))

assert(parsedResources.size === 1)
assert(parsedResources.get("gpu").nonEmpty)
assert(parsedResources.get("gpu").get.name === "gpu")
assert(parsedResources.get("gpu").get.addresses.deep === Array("0", "1").deep)
assert(parsedResources.get(GPU).nonEmpty)
assert(parsedResources.get(GPU).get.name === "gpu")
assert(parsedResources.get(GPU).get.addresses.deep === Array("0", "1").deep)
}
}

test("parsing multiple resources") {
val conf = new SparkConf
conf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + "fpga" + SPARK_RESOURCE_COUNT_SUFFIX, "3")
conf.set(SPARK_TASK_RESOURCE_PREFIX + "fpga" + SPARK_RESOURCE_COUNT_SUFFIX, "3")
conf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + "gpu" + SPARK_RESOURCE_COUNT_SUFFIX, "2")
conf.set(SPARK_TASK_RESOURCE_PREFIX + "gpu" + SPARK_RESOURCE_COUNT_SUFFIX, "2")
conf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + FPGA + SPARK_RESOURCE_COUNT_SUFFIX, "3")
conf.set(SPARK_TASK_RESOURCE_PREFIX + FPGA + SPARK_RESOURCE_COUNT_SUFFIX, "3")
conf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + GPU + SPARK_RESOURCE_COUNT_SUFFIX, "2")
conf.set(SPARK_TASK_RESOURCE_PREFIX + GPU + SPARK_RESOURCE_COUNT_SUFFIX, "2")
val serializer = new JavaSerializer(conf)
val env = createMockEnv(conf, serializer)
// we don't really use this, just need it to get at the parser function
Expand All @@ -126,19 +125,19 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
val parsedResources = backend.parseOrFindResources(Some(f1))

assert(parsedResources.size === 2)
assert(parsedResources.get("gpu").nonEmpty)
assert(parsedResources.get("gpu").get.name === "gpu")
assert(parsedResources.get("gpu").get.addresses.deep === Array("0", "1").deep)
assert(parsedResources.get("fpga").nonEmpty)
assert(parsedResources.get("fpga").get.name === "fpga")
assert(parsedResources.get("fpga").get.addresses.deep === Array("f1", "f2", "f3").deep)
assert(parsedResources.get(GPU).nonEmpty)
assert(parsedResources.get(GPU).get.name === "gpu")
assert(parsedResources.get(GPU).get.addresses.deep === Array("0", "1").deep)
assert(parsedResources.get(FPGA).nonEmpty)
assert(parsedResources.get(FPGA).get.name === "fpga")
assert(parsedResources.get(FPGA).get.addresses.deep === Array("f1", "f2", "f3").deep)
}
}

test("error checking parsing resources and executor and task configs") {
val conf = new SparkConf
conf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + "gpu" + SPARK_RESOURCE_COUNT_SUFFIX, "2")
conf.set(SPARK_TASK_RESOURCE_PREFIX + "gpu" + SPARK_RESOURCE_COUNT_SUFFIX, "2")
conf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + GPU + SPARK_RESOURCE_COUNT_SUFFIX, "2")
conf.set(SPARK_TASK_RESOURCE_PREFIX + GPU + SPARK_RESOURCE_COUNT_SUFFIX, "2")
val serializer = new JavaSerializer(conf)
val env = createMockEnv(conf, serializer)
// we don't really use this, just need it to get at the parser function
Expand Down Expand Up @@ -179,8 +178,8 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite

test("executor resource found less than required") {
val conf = new SparkConf
conf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + "gpu" + SPARK_RESOURCE_COUNT_SUFFIX, "4")
conf.set(SPARK_TASK_RESOURCE_PREFIX + "gpu" + SPARK_RESOURCE_COUNT_SUFFIX, "1")
conf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + GPU + SPARK_RESOURCE_COUNT_SUFFIX, "4")
conf.set(SPARK_TASK_RESOURCE_PREFIX + GPU + SPARK_RESOURCE_COUNT_SUFFIX, "1")
val serializer = new JavaSerializer(conf)
val env = createMockEnv(conf, serializer)
// we don't really use this, just need it to get at the parser function
Expand All @@ -205,16 +204,16 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite

test("use discoverer") {
val conf = new SparkConf
conf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + "fpga" + SPARK_RESOURCE_COUNT_SUFFIX, "3")
conf.set(SPARK_TASK_RESOURCE_PREFIX + "fpga" + SPARK_RESOURCE_COUNT_SUFFIX, "3")
conf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + FPGA + SPARK_RESOURCE_COUNT_SUFFIX, "3")
conf.set(SPARK_TASK_RESOURCE_PREFIX + FPGA + SPARK_RESOURCE_COUNT_SUFFIX, "3")
assume(!(Utils.isWindows))
withTempDir { dir =>
val fpgaDiscovery = new File(dir, "resourceDiscoverScriptfpga")
Files.write("""echo '{"name": "fpga","addresses":["f1", "f2", "f3"]}'""",
fpgaDiscovery, StandardCharsets.UTF_8)
JavaFiles.setPosixFilePermissions(fpgaDiscovery.toPath(),
EnumSet.of(OWNER_READ, OWNER_EXECUTE, OWNER_WRITE))
conf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + "fpga" +
conf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + FPGA +
SPARK_RESOURCE_DISCOVERY_SCRIPT_SUFFIX, fpgaDiscovery.getPath())

val serializer = new JavaSerializer(conf)
Expand All @@ -227,9 +226,9 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
val parsedResources = backend.parseOrFindResources(None)

assert(parsedResources.size === 1)
assert(parsedResources.get("fpga").nonEmpty)
assert(parsedResources.get("fpga").get.name === "fpga")
assert(parsedResources.get("fpga").get.addresses.deep === Array("f1", "f2", "f3").deep)
assert(parsedResources.get(FPGA).nonEmpty)
assert(parsedResources.get(FPGA).get.name === "fpga")
assert(parsedResources.get(FPGA).get.addresses.deep === Array("f1", "f2", "f3").deep)
}
}

Expand Down

0 comments on commit fcb3fb0

Please sign in to comment.