[SPARK-9763][SQL] Minimize exposure of internal SQL classes.
There are a few changes in this pull request:

1. Moved all data sources to execution.datasources, except the public JDBC APIs.
2. To maintain backward compatibility with change 1, added a backward-compatibility translation map to data source resolution (a sketch follows this list).
3. Moved ui and metric package into execution.
4. Added more documentation on some internal classes.
5. Renamed DataSourceRegister.format -> shortName.
6. Added "override" modifier on shortName.
7. Removed IntSQLMetric.
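A minimal sketch of what the backward-compatibility translation in change 2 could look like. The names used here (DataSourceResolution, backwardCompatibilityMap, resolveProvider) are illustrative assumptions rather than the identifiers the patch actually uses; only the old-to-new class mapping follows the relocations shown in the diff below.

object DataSourceResolution {

  // Pre-move provider names that user code may still pass to .format(...),
  // mapped to the relocated DefaultSource classes under execution.datasources.
  // Illustrative sketch; not necessarily the map used by the patch.
  private val backwardCompatibilityMap: Map[String, String] = Map(
    "org.apache.spark.sql.jdbc" ->
      "org.apache.spark.sql.execution.datasources.jdbc.DefaultSource",
    "org.apache.spark.sql.json" ->
      "org.apache.spark.sql.execution.datasources.json.DefaultSource",
    "org.apache.spark.sql.parquet" ->
      "org.apache.spark.sql.execution.datasources.parquet.DefaultSource")

  // Rewrite a user-supplied provider name to its post-move class name, if needed.
  def resolveProvider(provider: String): String =
    backwardCompatibilityMap.getOrElse(provider, provider)
}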

Author: Reynold Xin <[email protected]>

Closes apache#8056 from rxin/SPARK-9763 and squashes the following commits:

9df4801 [Reynold Xin] Removed hardcoded name in test cases.
d9babc6 [Reynold Xin] Shorten.
e484419 [Reynold Xin] Removed VisibleForTesting.
171b812 [Reynold Xin] MimaExcludes.
2041389 [Reynold Xin] Compile ...
79dda42 [Reynold Xin] Compile.
0818ba3 [Reynold Xin] Removed IntSQLMetric.
c46884f [Reynold Xin] Two more fixes.
f9aa88d [Reynold Xin] [SPARK-9763][SQL] Minimize exposure of internal SQL classes.

(cherry picked from commit 40ed2af)
Signed-off-by: Reynold Xin <[email protected]>
rxin committed Aug 10, 2015
1 parent d251d9f commit c1838e4
Showing 76 changed files with 1,114 additions and 966 deletions.
24 changes: 21 additions & 3 deletions project/MimaExcludes.scala
@@ -62,8 +62,6 @@ object MimaExcludes {
"org.apache.spark.ml.classification.LogisticCostFun.this"),
// SQL execution is considered private.
excludePackage("org.apache.spark.sql.execution"),
// Parquet support is considered private.
excludePackage("org.apache.spark.sql.parquet"),
// The old JSON RDD is removed in favor of streaming Jackson
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.json.JsonRDD$"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.json.JsonRDD"),
@@ -155,7 +153,27 @@ object MimaExcludes {
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.SqlNewHadoopRDD$NewHadoopMapPartitionsWithSplitRDD$"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.PartitionSpec$"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.DescribeCommand"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.DDLException")
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.DDLException"),
// SPARK-9763 Minimize exposure of internal SQL classes
excludePackage("org.apache.spark.sql.parquet"),
excludePackage("org.apache.spark.sql.json"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.JDBCRDD$DecimalConversion$"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.JDBCPartition"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.JdbcUtils$"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.JDBCRDD$DecimalConversion"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.JDBCPartitioningInfo$"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.JDBCPartition$"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.package"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.JDBCRDD$JDBCConversion"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.JDBCRDD$"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.package$DriverWrapper"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.JDBCRDD"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.JDBCPartitioningInfo"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.JdbcUtils"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.DefaultSource"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.JDBCRelation$"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.package$"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.JDBCRelation")
) ++ Seq(
// SPARK-4751 Dynamic allocation for standalone mode
ProblemFilters.exclude[MissingMethodProblem](
@@ -1,3 +1,3 @@
org.apache.spark.sql.jdbc.DefaultSource
org.apache.spark.sql.json.DefaultSource
org.apache.spark.sql.parquet.DefaultSource
org.apache.spark.sql.execution.datasources.jdbc.DefaultSource
org.apache.spark.sql.execution.datasources.json.DefaultSource
org.apache.spark.sql.execution.datasources.parquet.DefaultSource
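The registration file above now points at the relocated built-in DefaultSource classes. External data sources register the same way and implement the shortName method introduced by changes 5 and 6. Below is a minimal sketch of such an implementation; CsvExampleSource and the "example-csv" alias are made up for illustration, and the relation body is only a stub.

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, RelationProvider}
import org.apache.spark.sql.types.StructType

class CsvExampleSource extends RelationProvider with DataSourceRegister {

  // Formerly DataSourceRegister.format; now shortName, carrying an explicit override.
  override def shortName(): String = "example-csv"

  override def createRelation(
      ctx: SQLContext,
      parameters: Map[String, String]): BaseRelation = {
    // Stub relation: a real source would build its schema and scan logic from `parameters`.
    new BaseRelation {
      override val sqlContext: SQLContext = ctx
      override val schema: StructType = StructType(Nil)
    }
  }
}

Listing the class in a META-INF services entry for DataSourceRegister, as the file above appears to do for the built-in sources, is what lets users refer to the source by its short name instead of its fully qualified class name.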
@@ -39,7 +39,7 @@ import org.apache.spark.sql.catalyst.plans.{Inner, JoinType}
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, ScalaReflection, SqlParser}
import org.apache.spark.sql.execution.{EvaluatePython, ExplainCommand, LogicalRDD, SQLExecution}
import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, LogicalRelation}
import org.apache.spark.sql.json.JacksonGenerator
import org.apache.spark.sql.execution.datasources.json.JacksonGenerator
import org.apache.spark.sql.sources.HadoopFsRelation
import org.apache.spark.sql.types._
import org.apache.spark.storage.StorageLevel
@@ -25,10 +25,10 @@ import org.apache.spark.annotation.Experimental
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.execution.datasources.jdbc.{JDBCPartition, JDBCPartitioningInfo, JDBCRelation}
import org.apache.spark.sql.execution.datasources.json.JSONRelation
import org.apache.spark.sql.execution.datasources.parquet.ParquetRelation
import org.apache.spark.sql.execution.datasources.{LogicalRelation, ResolvedDataSource}
import org.apache.spark.sql.jdbc.{JDBCPartition, JDBCPartitioningInfo, JDBCRelation}
import org.apache.spark.sql.json.JSONRelation
import org.apache.spark.sql.parquet.ParquetRelation
import org.apache.spark.sql.types.StructType
import org.apache.spark.{Logging, Partition}

@@ -23,8 +23,8 @@ import org.apache.spark.annotation.Experimental
import org.apache.spark.sql.catalyst.{SqlParser, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.plans.logical.InsertIntoTable
import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, ResolvedDataSource}
import org.apache.spark.sql.jdbc.{JDBCWriteDetails, JdbcUtils}
import org.apache.spark.sql.sources.HadoopFsRelation


@@ -264,15 +264,15 @@ final class DataFrameWriter private[sql](df: DataFrame) {

// Create the table if the table didn't exist.
if (!tableExists) {
val schema = JDBCWriteDetails.schemaString(df, url)
val schema = JdbcUtils.schemaString(df, url)
val sql = s"CREATE TABLE $table ($schema)"
conn.prepareStatement(sql).executeUpdate()
}
} finally {
conn.close()
}

JDBCWriteDetails.saveTable(df, url, table, connectionProperties)
JdbcUtils.saveTable(df, url, table, connectionProperties)
}

/**
@@ -43,7 +43,7 @@ import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.sql.types._
import org.apache.spark.sql.ui.{SQLListener, SQLTab}
import org.apache.spark.sql.execution.ui.{SQLListener, SQLTab}
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.Utils

@@ -21,7 +21,7 @@ import java.util.concurrent.atomic.AtomicLong

import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.ui.SparkPlanGraph
import org.apache.spark.sql.execution.ui.SparkPlanGraph
import org.apache.spark.util.Utils

private[sql] object SQLExecution {
@@ -32,7 +32,7 @@ import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.metric.{IntSQLMetric, LongSQLMetric, SQLMetric, SQLMetrics}
import org.apache.spark.sql.execution.metric.{LongSQLMetric, SQLMetric, SQLMetrics}
import org.apache.spark.sql.types.DataType

object SparkPlan {
@@ -98,12 +98,6 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
*/
private[sql] def metrics: Map[String, SQLMetric[_, _]] = defaultMetrics

/**
* Return a IntSQLMetric according to the name.
*/
private[sql] def intMetric(name: String): IntSQLMetric =
metrics(name).asInstanceOf[IntSQLMetric]

/**
* Return a LongSQLMetric according to the name.
*/
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.CatalystTypeConverters
import org.apache.spark.sql.catalyst.errors._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.metric.SQLMetrics
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.collection.ExternalSorter
import org.apache.spark.util.collection.unsafe.sort.PrefixComparator
@@ -0,0 +1,185 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.datasources

import scala.language.implicitConversions
import scala.util.matching.Regex

import org.apache.spark.Logging
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.catalyst.{TableIdentifier, AbstractSparkSQLParser}
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.types._


/**
* A parser for foreign DDL commands.
*/
class DDLParser(parseQuery: String => LogicalPlan)
extends AbstractSparkSQLParser with DataTypeParser with Logging {

def parse(input: String, exceptionOnError: Boolean): LogicalPlan = {
try {
parse(input)
} catch {
case ddlException: DDLException => throw ddlException
case _ if !exceptionOnError => parseQuery(input)
case x: Throwable => throw x
}
}

// Keyword is a convention of AbstractSparkSQLParser, which scans all of the `Keyword`
// properties of this class via reflection at runtime to construct the SqlLexical object.
protected val CREATE = Keyword("CREATE")
protected val TEMPORARY = Keyword("TEMPORARY")
protected val TABLE = Keyword("TABLE")
protected val IF = Keyword("IF")
protected val NOT = Keyword("NOT")
protected val EXISTS = Keyword("EXISTS")
protected val USING = Keyword("USING")
protected val OPTIONS = Keyword("OPTIONS")
protected val DESCRIBE = Keyword("DESCRIBE")
protected val EXTENDED = Keyword("EXTENDED")
protected val AS = Keyword("AS")
protected val COMMENT = Keyword("COMMENT")
protected val REFRESH = Keyword("REFRESH")

protected lazy val ddl: Parser[LogicalPlan] = createTable | describeTable | refreshTable

protected def start: Parser[LogicalPlan] = ddl

/**
* `CREATE [TEMPORARY] TABLE avroTable [IF NOT EXISTS]
* USING org.apache.spark.sql.avro
* OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")`
* or
* `CREATE [TEMPORARY] TABLE avroTable(intField int, stringField string...) [IF NOT EXISTS]
* USING org.apache.spark.sql.avro
* OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")`
* or
* `CREATE [TEMPORARY] TABLE avroTable [IF NOT EXISTS]
* USING org.apache.spark.sql.avro
* OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")`
* AS SELECT ...
*/
protected lazy val createTable: Parser[LogicalPlan] = {
// TODO: Support database.table.
(CREATE ~> TEMPORARY.? <~ TABLE) ~ (IF ~> NOT <~ EXISTS).? ~ ident ~
tableCols.? ~ (USING ~> className) ~ (OPTIONS ~> options).? ~ (AS ~> restInput).? ^^ {
case temp ~ allowExisting ~ tableName ~ columns ~ provider ~ opts ~ query =>
if (temp.isDefined && allowExisting.isDefined) {
throw new DDLException(
"a CREATE TEMPORARY TABLE statement does not allow IF NOT EXISTS clause.")
}

val options = opts.getOrElse(Map.empty[String, String])
if (query.isDefined) {
if (columns.isDefined) {
throw new DDLException(
"a CREATE TABLE AS SELECT statement does not allow column definitions.")
}
// When the IF NOT EXISTS clause appears in the query, the save mode will be Ignore.
val mode = if (allowExisting.isDefined) {
SaveMode.Ignore
} else if (temp.isDefined) {
SaveMode.Overwrite
} else {
SaveMode.ErrorIfExists
}

val queryPlan = parseQuery(query.get)
CreateTableUsingAsSelect(tableName,
provider,
temp.isDefined,
Array.empty[String],
mode,
options,
queryPlan)
} else {
val userSpecifiedSchema = columns.flatMap(fields => Some(StructType(fields)))
CreateTableUsing(
tableName,
userSpecifiedSchema,
provider,
temp.isDefined,
options,
allowExisting.isDefined,
managedIfNoPath = false)
}
}
}

protected lazy val tableCols: Parser[Seq[StructField]] = "(" ~> repsep(column, ",") <~ ")"

/*
* describe [extended] table avroTable
* This will display all columns of table `avroTable`, including column_name, column_type and comment.
*/
protected lazy val describeTable: Parser[LogicalPlan] =
(DESCRIBE ~> opt(EXTENDED)) ~ (ident <~ ".").? ~ ident ^^ {
case e ~ db ~ tbl =>
val tblIdentifier = db match {
case Some(dbName) =>
Seq(dbName, tbl)
case None =>
Seq(tbl)
}
DescribeCommand(UnresolvedRelation(tblIdentifier, None), e.isDefined)
}

protected lazy val refreshTable: Parser[LogicalPlan] =
REFRESH ~> TABLE ~> (ident <~ ".").? ~ ident ^^ {
case maybeDatabaseName ~ tableName =>
RefreshTable(TableIdentifier(tableName, maybeDatabaseName))
}

protected lazy val options: Parser[Map[String, String]] =
"(" ~> repsep(pair, ",") <~ ")" ^^ { case s: Seq[(String, String)] => s.toMap }

protected lazy val className: Parser[String] = repsep(ident, ".") ^^ { case s => s.mkString(".")}

override implicit def regexToParser(regex: Regex): Parser[String] = acceptMatch(
s"identifier matching regex $regex", {
case lexical.Identifier(str) if regex.unapplySeq(str).isDefined => str
case lexical.Keyword(str) if regex.unapplySeq(str).isDefined => str
}
)

protected lazy val optionPart: Parser[String] = "[_a-zA-Z][_a-zA-Z0-9]*".r ^^ {
case name => name
}

protected lazy val optionName: Parser[String] = repsep(optionPart, ".") ^^ {
case parts => parts.mkString(".")
}

protected lazy val pair: Parser[(String, String)] =
optionName ~ stringLit ^^ { case k ~ v => (k, v) }

protected lazy val column: Parser[StructField] =
ident ~ dataType ~ (COMMENT ~> stringLit).? ^^ { case columnName ~ typ ~ cm =>
val meta = cm match {
case Some(comment) =>
new MetadataBuilder().putString(COMMENT.str.toLowerCase, comment).build()
case None => Metadata.empty
}

StructField(columnName, typ, nullable = true, meta)
}
}
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.datasources

import java.util.Properties

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.execution.datasources.jdbc.{JDBCRelation, JDBCPartitioningInfo, DriverRegistry}
import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, RelationProvider}


class DefaultSource extends RelationProvider with DataSourceRegister {

override def shortName(): String = "jdbc"

/** Returns a new base relation with the given parameters. */
override def createRelation(
sqlContext: SQLContext,
parameters: Map[String, String]): BaseRelation = {
val url = parameters.getOrElse("url", sys.error("Option 'url' not specified"))
val driver = parameters.getOrElse("driver", null)
val table = parameters.getOrElse("dbtable", sys.error("Option 'dbtable' not specified"))
val partitionColumn = parameters.getOrElse("partitionColumn", null)
val lowerBound = parameters.getOrElse("lowerBound", null)
val upperBound = parameters.getOrElse("upperBound", null)
val numPartitions = parameters.getOrElse("numPartitions", null)

if (driver != null) DriverRegistry.register(driver)

if (partitionColumn != null
&& (lowerBound == null || upperBound == null || numPartitions == null)) {
sys.error("Partitioning incompletely specified")
}

val partitionInfo = if (partitionColumn == null) {
null
} else {
JDBCPartitioningInfo(
partitionColumn,
lowerBound.toLong,
upperBound.toLong,
numPartitions.toInt)
}
val parts = JDBCRelation.columnPartition(partitionInfo)
val properties = new Properties() // Additional properties that we will pass to getConnection
parameters.foreach(kv => properties.setProperty(kv._1, kv._2))
JDBCRelation(url, table, parts, properties)(sqlContext)
}
}
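Because the class above registers the "jdbc" short name, callers can stay on the public DataFrameReader/DataFrameWriter API and never reference the relocated internal classes. A usage sketch follows; the connection URL, table names, and credentials are placeholders, and sqlContext is assumed to be an existing SQLContext.

import java.util.Properties
import org.apache.spark.sql.SQLContext

def jdbcRoundTrip(sqlContext: SQLContext): Unit = {
  val url = "jdbc:postgresql://localhost/testdb"  // placeholder connection string

  // Read through the short name; resolution ends up at
  // org.apache.spark.sql.execution.datasources.jdbc.DefaultSource.
  val df = sqlContext.read
    .format("jdbc")
    .option("url", url)
    .option("dbtable", "people")
    .load()

  // Write back through the public JDBC API, which (per the DataFrameWriter diff
  // above) now delegates to JdbcUtils.schemaString and JdbcUtils.saveTable.
  val props = new Properties()
  props.setProperty("user", "test")        // placeholder credentials
  props.setProperty("password", "secret")
  df.write.jdbc(url, "people_copy", props)
}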