[SPARK-7853] [SQL] Fixes a class loader issue in Spark SQL
This PR is based on PR apache#6396 authored by chenghao-intel. Essentially, Spark SQL should use the context classloader to load SerDe classes.
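For illustration, a minimal sketch of the lookup pattern this fix moves to (the object and method names below are illustrative, not Spark's API; `Utils.getContextOrSparkClassLoader`, which the PR uses, follows the same prefer-context-then-fall-back idea). Jars passed via `--jars` are registered with the thread's context classloader, so SerDe classes must be resolved through it rather than through the loader that loaded Spark's own classes:

```
// A sketch only: prefer the thread's context classloader, which sees jars
// added at runtime (e.g. via --jars), and fall back to the loader that
// loaded this class. Names here are illustrative, not Spark's actual API.
object SerDeLoading {
  def contextOrDefaultLoader: ClassLoader =
    Option(Thread.currentThread().getContextClassLoader)
      .getOrElse(getClass.getClassLoader)

  // Resolve a SerDe class by name through that loader.
  def loadSerDe(className: String): Class[_] =
    Class.forName(className, true, contextOrDefaultLoader)
}
```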

yhuai helped update the test case, and I fixed a bug in the original `CliSuite`: when testing the CLI tool with `runCliWithin`, `\n` was not appended to the last query, so the last query was never executed.
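The newline issue is easy to see in a REPL; a quick sketch:

```
val queries = Seq("SHOW TABLES;", "SELECT 1;")

// mkString("\n") only inserts separators *between* elements, so the final
// statement never gets a terminating newline and is never submitted:
queries.mkString("\n")          // "SHOW TABLES;\nSELECT 1;"

// Appending "\n" to every statement terminates each one, including the last:
queries.map(_ + "\n").mkString  // "SHOW TABLES;\nSELECT 1;\n"
```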

Original PR description is pasted below.

----

```
bin/spark-sql --jars ./sql/hive/src/test/resources/hive-hcatalog-core-0.13.1.jar
CREATE TABLE t1(a string, b string) ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe';
```

This throws an exception like the following:

```
15/05/26 00:16:33 ERROR SparkSQLDriver: Failed in [CREATE TABLE t1(a string, b string) ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe']
org.apache.spark.sql.execution.QueryExecutionException: FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. Cannot validate serde: org.apache.hive.hcatalog.data.JsonSerDe
        at org.apache.spark.sql.hive.client.ClientWrapper$$anonfun$runHive$1.apply(ClientWrapper.scala:333)
        at org.apache.spark.sql.hive.client.ClientWrapper$$anonfun$runHive$1.apply(ClientWrapper.scala:310)
        at org.apache.spark.sql.hive.client.ClientWrapper.withHiveState(ClientWrapper.scala:139)
        at org.apache.spark.sql.hive.client.ClientWrapper.runHive(ClientWrapper.scala:310)
        at org.apache.spark.sql.hive.client.ClientWrapper.runSqlHive(ClientWrapper.scala:300)
        at org.apache.spark.sql.hive.HiveContext.runSqlHive(HiveContext.scala:457)
        at org.apache.spark.sql.hive.execution.HiveNativeCommand.run(HiveNativeCommand.scala:33)
        at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:57)
        at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:57)
        at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:68)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:88)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:88)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:148)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:87)
        at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:922)
        at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:922)
        at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:147)
        at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:131)
        at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:51)
        at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:727)
        at org.apache.spark.sql.hive.thriftserver.AbstractSparkSQLDriver.run(AbstractSparkSQLDriver.scala:57)
```

Author: Cheng Hao <[email protected]>
Author: Cheng Lian <[email protected]>
Author: Yin Huai <[email protected]>

Closes apache#6435 from liancheng/classLoader and squashes the following commits:

d4c4845 [Cheng Lian] Fixes CliSuite
75e80e2 [Yin Huai] Update the fix.
fd26533 [Cheng Hao] scalastyle
dd78775 [Cheng Hao] workaround for classloader of IsolatedClientLoader
chenghao-intel authored and yhuai committed May 27, 2015
1 parent b97ddff · commit db3fd05
Showing 3 changed files with 53 additions and 8 deletions.
sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala

```
@@ -25,11 +25,15 @@ import scala.concurrent.{Await, Promise}
 import scala.sys.process.{Process, ProcessLogger}
 
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
-import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FunSuite}
+import org.scalatest.{BeforeAndAfter, FunSuite}
 
 import org.apache.spark.Logging
 import org.apache.spark.util.Utils
 
+/**
+ * A test suite for the `spark-sql` CLI tool. Note that all test cases share the same temporary
+ * Hive metastore and warehouse.
+ */
 class CliSuite extends FunSuite with BeforeAndAfter with Logging {
   val warehousePath = Utils.createTempDir()
   val metastorePath = Utils.createTempDir()
@@ -58,13 +62,13 @@ class CliSuite extends FunSuite with BeforeAndAfter with Logging {
        | --master local
        | --hiveconf ${ConfVars.METASTORECONNECTURLKEY}=$jdbcUrl
        | --hiveconf ${ConfVars.METASTOREWAREHOUSE}=$warehousePath
+       | --driver-class-path ${sys.props("java.class.path")}
      """.stripMargin.split("\\s+").toSeq ++ extraArgs
   }
 
   var next = 0
   val foundAllExpectedAnswers = Promise.apply[Unit]()
-  val queryStream = new ByteArrayInputStream(queries.mkString("\n").getBytes)
+  // Explicitly adds ENTER for each statement to make sure they are actually entered into the CLI.
+  val queryStream = new ByteArrayInputStream(queries.map(_ + "\n").mkString.getBytes)
   val buffer = new ArrayBuffer[String]()
   val lock = new Object
@@ -124,7 +128,7 @@ class CliSuite extends FunSuite with BeforeAndAfter with Logging {
       "SELECT COUNT(*) FROM hive_test;"
         -> "5",
       "DROP TABLE hive_test;"
-        -> "Time taken: "
+        -> "OK"
     )
   }
@@ -151,4 +155,33 @@ class CliSuite extends FunSuite with BeforeAndAfter with Logging {
         -> "hive_test"
     )
   }
+
+  test("Commands using SerDe provided in --jars") {
+    val jarFile =
+      "../hive/src/test/resources/hive-hcatalog-core-0.13.1.jar"
+        .split("/")
+        .mkString(File.separator)
+
+    val dataFilePath =
+      Thread.currentThread().getContextClassLoader.getResource("data/files/small_kv.txt")
+
+    runCliWithin(1.minute, Seq("--jars", s"$jarFile"))(
+      """CREATE TABLE t1(key string, val string)
+        |ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe';
+      """.stripMargin
+        -> "OK",
+      "CREATE TABLE sourceTable (key INT, val STRING);"
+        -> "OK",
+      s"LOAD DATA LOCAL INPATH '$dataFilePath' OVERWRITE INTO TABLE sourceTable;"
+        -> "OK",
+      "INSERT INTO TABLE t1 SELECT key, val FROM sourceTable;"
+        -> "Time taken:",
+      "SELECT count(key) FROM t1;"
+        -> "5",
+      "DROP TABLE t1;"
+        -> "OK",
+      "DROP TABLE sourceTable;"
+        -> "OK"
+    )
+  }
 }
```
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala

```
@@ -18,14 +18,15 @@
 package org.apache.spark.sql.hive
 
 import java.io.{BufferedReader, File, InputStreamReader, PrintStream}
+import java.net.{URL, URLClassLoader}
 import java.sql.Timestamp
 import java.util.{ArrayList => JArrayList}
 
 import org.apache.hadoop.hive.ql.parse.VariableSubstitution
 import org.apache.spark.sql.catalyst.ParserDialect
 
 import scala.collection.JavaConversions._
-import scala.collection.mutable.HashMap
+import scala.collection.mutable.{ArrayBuffer, HashMap}
 import scala.language.implicitConversions
 
 import org.apache.hadoop.fs.{FileSystem, Path}
@@ -188,8 +189,19 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
           "Specify a vaild path to the correct hive jars using $HIVE_METASTORE_JARS " +
           s"or change $HIVE_METASTORE_VERSION to $hiveExecutionVersion.")
       }
-      val jars = getClass.getClassLoader match {
-        case urlClassLoader: java.net.URLClassLoader => urlClassLoader.getURLs
+      // We recursively add all jars in the class loader chain,
+      // starting from the given urlClassLoader.
+      def addJars(urlClassLoader: URLClassLoader): Array[URL] = {
+        val jarsInParent = urlClassLoader.getParent match {
+          case parent: URLClassLoader => addJars(parent)
+          case other => Array.empty[URL]
+        }
+
+        urlClassLoader.getURLs ++ jarsInParent
+      }
+
+      val jars = Utils.getContextOrSparkClassLoader match {
+        case urlClassLoader: URLClassLoader => addJars(urlClassLoader)
         case other =>
           throw new IllegalArgumentException(
             "Unable to locate hive jars to connect to metastore " +
```
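The recursive jar collection above can be exercised standalone; a minimal sketch, assuming the loaders involved are `URLClassLoader`s (true on the Java 7/8 JVMs Spark targeted at the time):

```
import java.net.{URL, URLClassLoader}

// Walk up the classloader chain, collecting jar URLs from every
// URLClassLoader encountered; a null parent or any other loader type ends the walk.
def allJarUrls(loader: ClassLoader): Array[URL] = loader match {
  case urlLoader: URLClassLoader => urlLoader.getURLs ++ allJarUrls(urlLoader.getParent)
  case _ => Array.empty[URL]
}

// Example: list every jar visible to the current thread's context classloader.
allJarUrls(Thread.currentThread().getContextClassLoader).foreach(println)
```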
sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala

```
@@ -79,7 +79,7 @@ class HadoopTableReader(
       makeRDDForTable(
         hiveTable,
         Class.forName(
-          relation.tableDesc.getSerdeClassName, true, Utils.getSparkClassLoader)
+          relation.tableDesc.getSerdeClassName, true, Utils.getContextOrSparkClassLoader)
           .asInstanceOf[Class[Deserializer]],
         filterOpt = None)
```
