Skip to content

Commit

Permalink
[SPARK-2678][Core][SQL] A workaround for SPARK-2678
Browse files Browse the repository at this point in the history
JIRA issues:

- Main: [SPARK-2678](https://issues.apache.org/jira/browse/SPARK-2678)
- Related: [SPARK-2874](https://issues.apache.org/jira/browse/SPARK-2874)

Related PR:

- apache#1715

This PR is both a fix for SPARK-2874 and a workaround for SPARK-2678. Fixing SPARK-2678 completely requires some API level changes that need further discussion, and we decided not to include it in Spark 1.1 release. As currently SPARK-2678 only affects Spark SQL scripts, this workaround is enough for Spark 1.1. Command line option handling logic in bash scripts looks somewhat dirty and duplicated, but it helps to provide a cleaner user interface as well as retain full downward compatibility for now.

Author: Cheng Lian <[email protected]>

Closes apache#1801 from liancheng/spark-2874 and squashes the following commits:

8045d7a [Cheng Lian] Make sure test suites pass
8493a9e [Cheng Lian] Using eval to retain quoted arguments
aed523f [Cheng Lian] Fixed typo in bin/spark-sql
f12a0b1 [Cheng Lian] Worked arount SPARK-2678
daee105 [Cheng Lian] Fixed usage messages of all Spark SQL related scripts
  • Loading branch information
liancheng authored and pwendell committed Aug 6, 2014
1 parent 4878911 commit a6cd311
Show file tree
Hide file tree
Showing 8 changed files with 164 additions and 75 deletions.
29 changes: 7 additions & 22 deletions bin/beeline
Original file line number Diff line number Diff line change
Expand Up @@ -17,29 +17,14 @@
# limitations under the License.
#

# Figure out where Spark is installed
FWDIR="$(cd `dirname $0`/..; pwd)"
#
# Shell script for starting BeeLine

# Find the java binary
if [ -n "${JAVA_HOME}" ]; then
RUNNER="${JAVA_HOME}/bin/java"
else
if [ `command -v java` ]; then
RUNNER="java"
else
echo "JAVA_HOME is not set" >&2
exit 1
fi
fi
# Enter posix mode for bash
set -o posix

# Compute classpath using external script
classpath_output=$($FWDIR/bin/compute-classpath.sh)
if [[ "$?" != "0" ]]; then
echo "$classpath_output"
exit 1
else
CLASSPATH=$classpath_output
fi
# Figure out where Spark is installed
FWDIR="$(cd `dirname $0`/..; pwd)"

CLASS="org.apache.hive.beeline.BeeLine"
exec "$RUNNER" -cp "$CLASSPATH" $CLASS "$@"
exec "$FWDIR/bin/spark-class" $CLASS "$@"
66 changes: 62 additions & 4 deletions bin/spark-sql
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,72 @@
# Enter posix mode for bash
set -o posix

CLASS="org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver"

# Figure out where Spark is installed
FWDIR="$(cd `dirname $0`/..; pwd)"

if [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then
echo "Usage: ./sbin/spark-sql [options]"
function usage {
echo "Usage: ./sbin/spark-sql [options] [cli option]"
pattern="usage"
pattern+="\|Spark assembly has been built with Hive"
pattern+="\|NOTE: SPARK_PREPEND_CLASSES is set"
pattern+="\|Spark Command: "
pattern+="\|--help"
pattern+="\|======="

$FWDIR/bin/spark-submit --help 2>&1 | grep -v Usage 1>&2
echo
echo "CLI options:"
$FWDIR/bin/spark-class $CLASS --help 2>&1 | grep -v "$pattern" 1>&2
}

function ensure_arg_number {
arg_number=$1
at_least=$2

if [[ $arg_number -lt $at_least ]]; then
usage
exit 1
fi
}

if [[ "$@" = --help ]] || [[ "$@" = -h ]]; then
usage
exit 0
fi

CLASS="org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver"
exec "$FWDIR"/bin/spark-submit --class $CLASS spark-internal $@
CLI_ARGS=()
SUBMISSION_ARGS=()

while (($#)); do
case $1 in
-d | --define | --database | -f | -h | --hiveconf | --hivevar | -i | -p)
ensure_arg_number $# 2
CLI_ARGS+=($1); shift
CLI_ARGS+=($1); shift
;;

-e)
ensure_arg_number $# 2
CLI_ARGS+=($1); shift
CLI_ARGS+=(\"$1\"); shift
;;

-s | --silent)
CLI_ARGS+=($1); shift
;;

-v | --verbose)
# Both SparkSubmit and SparkSQLCLIDriver recognizes -v | --verbose
CLI_ARGS+=($1)
SUBMISSION_ARGS+=($1); shift
;;

*)
SUBMISSION_ARGS+=($1); shift
;;
esac
done

eval exec "$FWDIR"/bin/spark-submit --class $CLASS ${SUBMISSION_ARGS[*]} spark-internal ${CLI_ARGS[*]}
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
/** Fill in values by parsing user options. */
private def parseOpts(opts: Seq[String]): Unit = {
var inSparkOpts = true
val EQ_SEPARATED_OPT="""(--[^=]+)=(.+)""".r

// Delineates parsing of Spark options from parsing of user options.
parse(opts)
Expand Down Expand Up @@ -322,33 +323,21 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
verbose = true
parse(tail)

case EQ_SEPARATED_OPT(opt, value) :: tail =>
parse(opt :: value :: tail)

case value :: tail if value.startsWith("-") =>
SparkSubmit.printErrorAndExit(s"Unrecognized option '$value'.")

case value :: tail =>
if (inSparkOpts) {
value match {
// convert --foo=bar to --foo bar
case v if v.startsWith("--") && v.contains("=") && v.split("=").size == 2 =>
val parts = v.split("=")
parse(Seq(parts(0), parts(1)) ++ tail)
case v if v.startsWith("-") =>
val errMessage = s"Unrecognized option '$value'."
SparkSubmit.printErrorAndExit(errMessage)
case v =>
primaryResource =
if (!SparkSubmit.isShell(v) && !SparkSubmit.isInternal(v)) {
Utils.resolveURI(v).toString
} else {
v
}
inSparkOpts = false
isPython = SparkSubmit.isPython(v)
parse(tail)
primaryResource =
if (!SparkSubmit.isShell(value) && !SparkSubmit.isInternal(value)) {
Utils.resolveURI(value).toString
} else {
value
}
} else {
if (!value.isEmpty) {
childArgs += value
}
parse(tail)
}
isPython = SparkSubmit.isPython(value)
childArgs ++= tail

case Nil =>
}
Expand Down
12 changes: 12 additions & 0 deletions core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,18 @@ class SparkSubmitSuite extends FunSuite with Matchers {
appArgs.childArgs should be (Seq("some", "--weird", "args"))
}

test("handles arguments to user program with name collision") {
val clArgs = Seq(
"--name", "myApp",
"--class", "Foo",
"userjar.jar",
"--master", "local",
"some",
"--weird", "args")
val appArgs = new SparkSubmitArguments(clArgs)
appArgs.childArgs should be (Seq("--master", "local", "some", "--weird", "args"))
}

test("handles YARN cluster mode") {
val clArgs = Seq(
"--deploy-mode", "cluster",
Expand Down
50 changes: 46 additions & 4 deletions sbin/start-thriftserver.sh
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,53 @@ set -o posix
# Figure out where Spark is installed
FWDIR="$(cd `dirname $0`/..; pwd)"

if [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then
echo "Usage: ./sbin/start-thriftserver [options]"
CLASS="org.apache.spark.sql.hive.thriftserver.HiveThriftServer2"

function usage {
echo "Usage: ./sbin/start-thriftserver [options] [thrift server options]"
pattern="usage"
pattern+="\|Spark assembly has been built with Hive"
pattern+="\|NOTE: SPARK_PREPEND_CLASSES is set"
pattern+="\|Spark Command: "
pattern+="\|======="
pattern+="\|--help"

$FWDIR/bin/spark-submit --help 2>&1 | grep -v Usage 1>&2
echo
echo "Thrift server options:"
$FWDIR/bin/spark-class $CLASS --help 2>&1 | grep -v "$pattern" 1>&2
}

function ensure_arg_number {
arg_number=$1
at_least=$2

if [[ $arg_number -lt $at_least ]]; then
usage
exit 1
fi
}

if [[ "$@" = --help ]] || [[ "$@" = -h ]]; then
usage
exit 0
fi

CLASS="org.apache.spark.sql.hive.thriftserver.HiveThriftServer2"
exec "$FWDIR"/bin/spark-submit --class $CLASS spark-internal $@
THRIFT_SERVER_ARGS=()
SUBMISSION_ARGS=()

while (($#)); do
case $1 in
--hiveconf)
ensure_arg_number $# 2
THRIFT_SERVER_ARGS+=($1); shift
THRIFT_SERVER_ARGS+=($1); shift
;;

*)
SUBMISSION_ARGS+=($1); shift
;;
esac
done

eval exec "$FWDIR"/bin/spark-submit --class $CLASS ${SUBMISSION_ARGS[*]} spark-internal ${THRIFT_SERVER_ARGS[*]}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ private[hive] object HiveThriftServer2 extends Logging {
val optionsProcessor = new ServerOptionsProcessor("HiveThriftServer2")

if (!optionsProcessor.process(args)) {
logWarning("Error starting HiveThriftServer2 with given arguments")
System.exit(-1)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,23 @@ package org.apache.spark.sql.hive.thriftserver

import java.io.{BufferedReader, InputStreamReader, PrintWriter}

import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.scalatest.{BeforeAndAfterAll, FunSuite}

class CliSuite extends FunSuite with BeforeAndAfterAll with TestUtils {
val WAREHOUSE_PATH = TestUtils.getWarehousePath("cli")
val METASTORE_PATH = TestUtils.getMetastorePath("cli")

override def beforeAll() {
val pb = new ProcessBuilder(
"../../bin/spark-sql",
"--master",
"local",
"--hiveconf",
s"javax.jdo.option.ConnectionURL=jdbc:derby:;databaseName=$METASTORE_PATH;create=true",
"--hiveconf",
"hive.metastore.warehouse.dir=" + WAREHOUSE_PATH)

val jdbcUrl = s"jdbc:derby:;databaseName=$METASTORE_PATH;create=true"
val commands =
s"""../../bin/spark-sql
| --master local
| --hiveconf ${ConfVars.METASTORECONNECTURLKEY}="$jdbcUrl"
| --hiveconf ${ConfVars.METASTOREWAREHOUSE}=$WAREHOUSE_PATH
""".stripMargin.split("\\s+")

val pb = new ProcessBuilder(commands: _*)
process = pb.start()
outputWriter = new PrintWriter(process.getOutputStream, true)
inputReader = new BufferedReader(new InputStreamReader(process.getInputStream))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import java.io.{BufferedReader, InputStreamReader}
import java.net.ServerSocket
import java.sql.{Connection, DriverManager, Statement}

import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.scalatest.{BeforeAndAfterAll, FunSuite}

import org.apache.spark.Logging
Expand Down Expand Up @@ -63,16 +64,18 @@ class HiveThriftServer2Suite extends FunSuite with BeforeAndAfterAll with TestUt
// Forking a new process to start the Hive Thrift server. The reason to do this is it is
// hard to clean up Hive resources entirely, so we just start a new process and kill
// that process for cleanup.
val defaultArgs = Seq(
"../../sbin/start-thriftserver.sh",
"--master local",
"--hiveconf",
"hive.root.logger=INFO,console",
"--hiveconf",
s"javax.jdo.option.ConnectionURL=jdbc:derby:;databaseName=$METASTORE_PATH;create=true",
"--hiveconf",
s"hive.metastore.warehouse.dir=$WAREHOUSE_PATH")
val pb = new ProcessBuilder(defaultArgs ++ args)
val jdbcUrl = s"jdbc:derby:;databaseName=$METASTORE_PATH;create=true"
val command =
s"""../../sbin/start-thriftserver.sh
| --master local
| --hiveconf hive.root.logger=INFO,console
| --hiveconf ${ConfVars.METASTORECONNECTURLKEY}="$jdbcUrl"
| --hiveconf ${ConfVars.METASTOREWAREHOUSE}=$METASTORE_PATH
| --hiveconf ${ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST}=$HOST
| --hiveconf ${ConfVars.HIVE_SERVER2_THRIFT_PORT}=$PORT
""".stripMargin.split("\\s+")

val pb = new ProcessBuilder(command ++ args: _*)
val environment = pb.environment()
environment.put("HIVE_SERVER2_THRIFT_PORT", PORT.toString)
environment.put("HIVE_SERVER2_THRIFT_BIND_HOST", HOST)
Expand Down

0 comments on commit a6cd311

Please sign in to comment.