Skip to content

Commit

Permalink
[SPARK-37239][YARN][TESTS][FOLLOWUP] Add UT to cover `Client.prepareL…
Browse files Browse the repository at this point in the history
…ocalResources` with custom `STAGING_FILE_REPLICATION`

### What changes were proposed in this pull request?
This pr add a new UT to cover `o.a.s.deploy.yarn.Client.prepareLocalResources` method with custom `STAGING_FILE_REPLICATION` configuration and change other related UTs to verify that the `replication` passed into the `copyFileToRemote` method is `None` explicitly.

### Why are the changes needed?
Add new UT.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Pass the Jenkins or GitHub Action

Closes apache#34531 from LuciferYang/SPARK-37239-followup.

Authored-by: yangjie01 <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
  • Loading branch information
LuciferYang authored and dongjoon-hyun committed Nov 9, 2021
1 parent 06175c0 commit f6a044c
Showing 1 changed file with 27 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ class ClientSuite extends SparkFunSuite with Matchers {
.set("spark.yarn.dist.jars", ADDED)
val client = createClient(sparkConf, args = Array("--jar", USER))
doReturn(new Path("/")).when(client).copyFileToRemote(any(classOf[Path]),
any(classOf[Path]), any(), any(classOf[MutableHashMap[URI, Path]]), anyBoolean(), any())
any(classOf[Path]), meq(None), any(classOf[MutableHashMap[URI, Path]]), anyBoolean(), any())

val tempDir = Utils.createTempDir()
try {
Expand Down Expand Up @@ -308,12 +308,12 @@ class ClientSuite extends SparkFunSuite with Matchers {
assert(sparkConf.get(SPARK_JARS) ===
Some(Seq(s"local:${jar4.getPath()}", s"local:${single.getAbsolutePath()}/*")))

verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(jar1.toURI())), any(),
any(classOf[MutableHashMap[URI, Path]]), anyBoolean(), any())
verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(jar2.toURI())), any(),
any(classOf[MutableHashMap[URI, Path]]), anyBoolean(), any())
verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(jar3.toURI())), any(),
any(classOf[MutableHashMap[URI, Path]]), anyBoolean(), any())
verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(jar1.toURI())),
meq(None), any(classOf[MutableHashMap[URI, Path]]), anyBoolean(), any())
verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(jar2.toURI())),
meq(None), any(classOf[MutableHashMap[URI, Path]]), anyBoolean(), any())
verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(jar3.toURI())),
meq(None), any(classOf[MutableHashMap[URI, Path]]), anyBoolean(), any())

val cp = classpath(client)
cp should contain (buildPath(PWD, LOCALIZED_LIB_DIR, "*"))
Expand All @@ -330,7 +330,7 @@ class ClientSuite extends SparkFunSuite with Matchers {
val client = createClient(sparkConf)
client.prepareLocalResources(new Path(temp.getAbsolutePath()), Nil)

verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(archive.toURI())), any(),
verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(archive.toURI())), meq(None),
any(classOf[MutableHashMap[URI, Path]]), anyBoolean(), any())
classpath(client) should contain (buildPath(PWD, LOCALIZED_LIB_DIR, "*"))

Expand All @@ -340,6 +340,25 @@ class ClientSuite extends SparkFunSuite with Matchers {
}
}

test("SPARK-37239: distribute jars archive with set STAGING_FILE_REPLICATION") {
val temp = Utils.createTempDir()
val archive = TestUtils.createJarWithFiles(Map(), temp)
val replication = 5

val sparkConf = new SparkConf()
.set(SPARK_ARCHIVE, archive.getPath())
.set(STAGING_FILE_REPLICATION, replication)
val client = createClient(sparkConf)
client.prepareLocalResources(new Path(temp.getAbsolutePath()), Nil)

// It is difficult to assert the result of `setReplication` in UT because this method in
// `RawLocalFileSystem` always return true and not change the value of `replication`.
// So we can only assert the call of `client.copyFileToRemote` has passed in a non `None`.
verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(archive.toURI())),
meq(Some(replication.toShort)), any(classOf[MutableHashMap[URI, Path]]), anyBoolean(), any())
classpath(client) should contain (buildPath(PWD, LOCALIZED_LIB_DIR, "*"))
}

test("distribute archive multiple times") {
val libs = Utils.createTempDir()
// Create jars dir and RELEASE file to avoid IllegalStateException.
Expand Down

0 comments on commit f6a044c

Please sign in to comment.