diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala index 58e49c999a679..a8815dcca6027 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala @@ -132,7 +132,7 @@ class ClientSuite extends SparkFunSuite with Matchers { .set("spark.yarn.dist.jars", ADDED) val client = createClient(sparkConf, args = Array("--jar", USER)) doReturn(new Path("/")).when(client).copyFileToRemote(any(classOf[Path]), - any(classOf[Path]), any(), any(classOf[MutableHashMap[URI, Path]]), anyBoolean(), any()) + any(classOf[Path]), meq(None), any(classOf[MutableHashMap[URI, Path]]), anyBoolean(), any()) val tempDir = Utils.createTempDir() try { @@ -308,12 +308,12 @@ class ClientSuite extends SparkFunSuite with Matchers { assert(sparkConf.get(SPARK_JARS) === Some(Seq(s"local:${jar4.getPath()}", s"local:${single.getAbsolutePath()}/*"))) - verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(jar1.toURI())), any(), - any(classOf[MutableHashMap[URI, Path]]), anyBoolean(), any()) - verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(jar2.toURI())), any(), - any(classOf[MutableHashMap[URI, Path]]), anyBoolean(), any()) - verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(jar3.toURI())), any(), - any(classOf[MutableHashMap[URI, Path]]), anyBoolean(), any()) + verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(jar1.toURI())), + meq(None), any(classOf[MutableHashMap[URI, Path]]), anyBoolean(), any()) + verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(jar2.toURI())), + meq(None), any(classOf[MutableHashMap[URI, Path]]), anyBoolean(), any()) + verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(jar3.toURI())), + meq(None), any(classOf[MutableHashMap[URI, Path]]), anyBoolean(), any()) val cp = classpath(client) cp should contain (buildPath(PWD, LOCALIZED_LIB_DIR, "*")) @@ -330,7 +330,7 @@ class ClientSuite extends SparkFunSuite with Matchers { val client = createClient(sparkConf) client.prepareLocalResources(new Path(temp.getAbsolutePath()), Nil) - verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(archive.toURI())), any(), + verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(archive.toURI())), meq(None), any(classOf[MutableHashMap[URI, Path]]), anyBoolean(), any()) classpath(client) should contain (buildPath(PWD, LOCALIZED_LIB_DIR, "*")) @@ -340,6 +340,25 @@ class ClientSuite extends SparkFunSuite with Matchers { } } + test("SPARK-37239: distribute jars archive with set STAGING_FILE_REPLICATION") { + val temp = Utils.createTempDir() + val archive = TestUtils.createJarWithFiles(Map(), temp) + val replication = 5 + + val sparkConf = new SparkConf() + .set(SPARK_ARCHIVE, archive.getPath()) + .set(STAGING_FILE_REPLICATION, replication) + val client = createClient(sparkConf) + client.prepareLocalResources(new Path(temp.getAbsolutePath()), Nil) + + // It is difficult to assert the result of `setReplication` in UT because this method in + // `RawLocalFileSystem` always return true and not change the value of `replication`. + // So we can only assert the call of `client.copyFileToRemote` has passed in a non `None`. + verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(archive.toURI())), + meq(Some(replication.toShort)), any(classOf[MutableHashMap[URI, Path]]), anyBoolean(), any()) + classpath(client) should contain (buildPath(PWD, LOCALIZED_LIB_DIR, "*")) + } + test("distribute archive multiple times") { val libs = Utils.createTempDir() // Create jars dir and RELEASE file to avoid IllegalStateException.