[SPARK-17729][SQL] Enable creating hive bucketed tables
## What changes were proposed in this pull request?

Hive allows inserting data into a bucketed table without guaranteeing that the result is actually bucketed and sorted; this behavior is controlled by two configs: `hive.enforce.bucketing` and `hive.enforce.sorting`.

What does this PR achieve?
- Spark will disallow users from writing output to Hive bucketed tables by default, since the output would not adhere to Hive's bucketing semantics.
- If a user still wants to write to a Hive bucketed table, the only resort is to set `hive.enforce.bucketing=false` and `hive.enforce.sorting=false`, which means the user does NOT care about bucketing guarantees. A minimal sketch of the resulting behavior follows this list.
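A minimal sketch of the behavior described above, assuming a Hive-enabled `SparkSession` named `spark`; the table and column names are illustrative, not part of this PR:

```scala
// Illustrative only: `spark` is an assumed Hive-enabled SparkSession.
spark.sql(
  """CREATE TABLE bucketed_t (a INT, b INT)
    |CLUSTERED BY (a) SORTED BY (a) INTO 4 BUCKETS
    |STORED AS TEXTFILE""".stripMargin)

// With the defaults (both enforce configs are treated as true), the insert is
// rejected with an AnalysisException: the table is bucketed but Spark does not
// populate Hive-compatible bucketed output.
spark.sql("INSERT INTO TABLE bucketed_t SELECT 1, 2")

// Explicitly opting out of Hive's bucketing guarantees lets the insert through;
// Spark logs a warning instead of failing.
spark.sql("SET hive.enforce.bucketing=false")
spark.sql("SET hive.enforce.sorting=false")
spark.sql("INSERT INTO TABLE bucketed_t SELECT 1, 2")
```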

Changes done in this PR:
- Extract the table's bucketing information in `HiveClientImpl`
- While writing table info to the metastore, `HiveClientImpl` now populates the bucketing information in the Hive `Table` object
- `InsertIntoHiveTable` allows inserts to a bucketed table only if both `hive.enforce.bucketing` and `hive.enforce.sorting` are `false`

The ability to create bucketed tables will enable adding test cases to Spark while I add more changes related to Hive bucketing support. Design doc for Hive bucketing support: https://docs.google.com/document/d/1a8IDh23RAkrkg9YYAeO51F4aGO8-xAlupKwdshve2fc/edit#

## How was this patch tested?
- Added a test for creating a bucketed and sorted table.
- Added a test to ensure that INSERTs fail if strict bucketing / sorting is enforced.
- Added a test to ensure that INSERTs go through if strict bucketing / sorting is NOT enforced.
- Added a test to validate that bucketing information shows up in the output of DESC FORMATTED.
- Added a test to ensure that `SHOW CREATE TABLE` works for Hive bucketed tables.

Author: Tejas Patil <[email protected]>

Closes apache#17644 from tejasapatil/SPARK-17729_create_bucketed_table.
tejasapatil authored and cloud-fan committed May 15, 2017
1 parent 271175e commit d241692
Showing 10 changed files with 174 additions and 33 deletions.
@@ -247,7 +247,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEach
val catalog = newBasicCatalog()
val tbl1 = catalog.getTable("db2", "tbl1")
val newSchema = StructType(Seq(
StructField("new_field_1", IntegerType),
StructField("col1", IntegerType),
StructField("new_field_2", StringType),
StructField("a", IntegerType),
StructField("b", StringType)))
@@ -1072,13 +1072,12 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
if (ctx.skewSpec != null) {
operationNotAllowed("CREATE TABLE ... SKEWED BY", ctx)
}
-if (ctx.bucketSpec != null) {
-operationNotAllowed("CREATE TABLE ... CLUSTERED BY", ctx)
-}

val dataCols = Option(ctx.columns).map(visitColTypeList).getOrElse(Nil)
val partitionCols = Option(ctx.partitionColumns).map(visitColTypeList).getOrElse(Nil)
val properties = Option(ctx.tablePropertyList).map(visitPropertyKeyValues).getOrElse(Map.empty)
val selectQuery = Option(ctx.query).map(plan)
+val bucketSpec = Option(ctx.bucketSpec()).map(visitBucketSpec)

// Note: Hive requires partition columns to be distinct from the schema, so we need
// to include the partition columns here explicitly
@@ -1119,6 +1118,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
tableType = tableType,
storage = storage,
schema = schema,
+bucketSpec = bucketSpec,
provider = Some(DDLUtils.HIVE_PROVIDER),
partitionColumnNames = partitionCols.map(_.name),
properties = properties,
@@ -903,8 +903,13 @@ case class ShowCreateTableCommand(table: TableIdentifier) extends RunnableCommand
}

if (metadata.bucketSpec.isDefined) {
-throw new UnsupportedOperationException(
-"Creating Hive table with bucket spec is not supported yet.")
+val bucketSpec = metadata.bucketSpec.get
+builder ++= s"CLUSTERED BY (${bucketSpec.bucketColumnNames.mkString(",")})\n"
+
+if (bucketSpec.sortColumnNames.nonEmpty) {
+builder ++= s"SORTED BY (${bucketSpec.sortColumnNames.map(_ + " ASC").mkString(", ")})\n"
+}
+builder ++= s"INTO ${bucketSpec.numBuckets} BUCKETS\n"
}
}

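For context, a hedged sketch of what the builder above now emits for a bucketed table; the exact output shape is assumed from the code, and `t1` matches the `ShowCreateTableSuite` test further down:

```scala
// SHOW CREATE TABLE returns a single row whose only column holds the DDL text.
val ddl = spark.sql("SHOW CREATE TABLE t1").head.getString(0)
// For a table clustered by `a`, sorted by `b`, with 2 buckets, `ddl` should
// contain, among its other clauses:
//   CLUSTERED BY (a)
//   SORTED BY (b ASC)
//   INTO 2 BUCKETS
```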
@@ -171,7 +171,6 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Logging
location = fileIndex,
partitionSchema = partitionSchema,
dataSchema = dataSchema,
-// We don't support hive bucketed tables, only ones we write out.
bucketSpec = None,
fileFormat = fileFormat,
options = options)(sparkSession = sparkSession)
@@ -199,7 +198,6 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Logging
sparkSession = sparkSession,
paths = rootPath.toString :: Nil,
userSpecifiedSchema = Option(dataSchema),
-// We don't support hive bucketed tables, only ones we write out.
bucketSpec = None,
options = options,
className = fileType).resolveRelation(),
@@ -27,10 +27,11 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.metastore.{TableType => HiveTableType}
-import org.apache.hadoop.hive.metastore.api.{Database => HiveDatabase, FieldSchema}
+import org.apache.hadoop.hive.metastore.api.{Database => HiveDatabase, FieldSchema, Order}
import org.apache.hadoop.hive.metastore.api.{SerDeInfo, StorageDescriptor}
import org.apache.hadoop.hive.ql.Driver
import org.apache.hadoop.hive.ql.metadata.{Hive, Partition => HivePartition, Table => HiveTable}
+import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.HIVE_COLUMN_ORDER_ASC
import org.apache.hadoop.hive.ql.processors._
import org.apache.hadoop.hive.ql.session.SessionState
import org.apache.hadoop.security.UserGroupInformation
@@ -373,10 +374,30 @@ private[hive] class HiveClientImpl(
Option(client.getTable(dbName, tableName, false)).map { h =>
// Note: Hive separates partition columns and the schema, but for us the
// partition columns are part of the schema
+val cols = h.getCols.asScala.map(fromHiveColumn)
val partCols = h.getPartCols.asScala.map(fromHiveColumn)
-val schema = StructType(h.getCols.asScala.map(fromHiveColumn) ++ partCols)
+val schema = StructType(cols ++ partCols)

+val bucketSpec = if (h.getNumBuckets > 0) {
+val sortColumnOrders = h.getSortCols.asScala
+// Currently Spark only supports columns sorted in ascending order, while Hive
+// supports both ascending and descending order. Propagate the sortedness
+// information to downstream processing / optimizations in Spark only if all
+// the columns are sorted in ascending order.
+// TODO: In future we can have Spark support columns sorted in descending order
+val allAscendingSorted = sortColumnOrders.forall(_.getOrder == HIVE_COLUMN_ORDER_ASC)
+
+val sortColumnNames = if (allAscendingSorted) {
+sortColumnOrders.map(_.getCol)
+} else {
+Seq()
+}
+Option(BucketSpec(h.getNumBuckets, h.getBucketCols.asScala, sortColumnNames))
+} else {
+None
+}

-// Skew spec, storage handler, and bucketing info can't be mapped to CatalogTable (yet)
+// Skew spec and storage handler can't be mapped to CatalogTable (yet)
val unsupportedFeatures = ArrayBuffer.empty[String]

if (!h.getSkewedColNames.isEmpty) {
@@ -387,10 +408,6 @@ private[hive] class HiveClientImpl(
unsupportedFeatures += "storage handler"
}

-if (!h.getBucketCols.isEmpty) {
-unsupportedFeatures += "bucketing"
-}

if (h.getTableType == HiveTableType.VIRTUAL_VIEW && partCols.nonEmpty) {
unsupportedFeatures += "partitioned view"
}
@@ -408,9 +425,11 @@ private[hive] class HiveClientImpl(
},
schema = schema,
partitionColumnNames = partCols.map(_.name),
-// We can not populate bucketing information for Hive tables as Spark SQL has a different
-// implementation of hash function from Hive.
-bucketSpec = None,
+// If the table is written by Spark, we will put the bucketing information in table
+// properties, and will always override the bucket spec in the hive metastore with the
+// bucketing information in table properties. This means, if we have a bucket spec in
+// both the hive metastore and table properties, we will trust the one in table properties.
+bucketSpec = bucketSpec,
owner = h.getOwner,
createTime = h.getTTable.getCreateTime.toLong * 1000,
lastAccessTime = h.getLastAccessTime.toLong * 1000,
@@ -870,6 +889,23 @@ private[hive] object HiveClientImpl {
hiveTable.setViewOriginalText(t)
hiveTable.setViewExpandedText(t)
}

+table.bucketSpec match {
+case Some(bucketSpec) if DDLUtils.isHiveTable(table) =>
+hiveTable.setNumBuckets(bucketSpec.numBuckets)
+hiveTable.setBucketCols(bucketSpec.bucketColumnNames.toList.asJava)
+
+if (bucketSpec.sortColumnNames.nonEmpty) {
+hiveTable.setSortCols(
+bucketSpec.sortColumnNames
+.map(col => new Order(col, HIVE_COLUMN_ORDER_ASC))
+.toList
+.asJava
+)
+}
+case _ =>
+}

hiveTable
}

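A minimal round-trip sketch of the two `HiveClientImpl` changes above; `BucketSpec` is Spark's catalog API, and the comments restate the rules from the diff:

```scala
import org.apache.spark.sql.catalyst.catalog.BucketSpec

// What toHiveTable now writes for a Hive table carrying this spec:
//   hiveTable.setNumBuckets(8)
//   hiveTable.setBucketCols(List("id").asJava)
//   hiveTable.setSortCols(List(new Order("id", HIVE_COLUMN_ORDER_ASC)).asJava)
val spec = BucketSpec(8, Seq("id"), Seq("id"))

// getTable reads the same BucketSpec back, with one caveat: sort columns are
// kept only when every Hive Order is ascending; any descending sort column
// makes sortColumnNames come back empty.
```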
@@ -307,6 +307,27 @@ case class InsertIntoHiveTable(
}
}

+table.bucketSpec match {
+case Some(bucketSpec) =>
+// Writes to bucketed Hive tables are allowed only if the user does not care about
+// maintaining the table's bucketing, i.e. both "hive.enforce.bucketing" and
+// "hive.enforce.sorting" are set to false.
+val enforceBucketingConfig = "hive.enforce.bucketing"
+val enforceSortingConfig = "hive.enforce.sorting"
+
+val message = s"Output Hive table ${table.identifier} is bucketed but Spark " +
+"currently does NOT populate bucketed output which is compatible with Hive."
+
+if (hadoopConf.get(enforceBucketingConfig, "true").toBoolean ||
+hadoopConf.get(enforceSortingConfig, "true").toBoolean) {
+throw new AnalysisException(message)
+} else {
+logWarning(message + s" Inserting data anyway since both $enforceBucketingConfig and " +
+s"$enforceSortingConfig are set to false.")
+}
+case _ => // do nothing since table has no bucketing
+}

val committer = FileCommitProtocol.instantiate(
sparkSession.sessionState.conf.fileCommitProtocolClass,
jobId = java.util.UUID.randomUUID().toString,
@@ -367,13 +367,32 @@ class HiveDDLCommandSuite extends PlanTest with SQLTestUtils with TestHiveSingleton
}

test("create table - clustered by") {
-val baseQuery = "CREATE TABLE my_table (id int, name string) CLUSTERED BY(id)"
-val query1 = s"$baseQuery INTO 10 BUCKETS"
-val query2 = s"$baseQuery SORTED BY(id) INTO 10 BUCKETS"
-val e1 = intercept[ParseException] { parser.parsePlan(query1) }
-val e2 = intercept[ParseException] { parser.parsePlan(query2) }
-assert(e1.getMessage.contains("Operation not allowed"))
-assert(e2.getMessage.contains("Operation not allowed"))
+val numBuckets = 10
+val bucketedColumn = "id"
+val sortColumn = "id"
+val baseQuery =
+s"""
+CREATE TABLE my_table (
+$bucketedColumn int,
+name string)
+CLUSTERED BY($bucketedColumn)
+"""
+
+val query1 = s"$baseQuery INTO $numBuckets BUCKETS"
+val (desc1, _) = extractTableDesc(query1)
+assert(desc1.bucketSpec.isDefined)
+val bucketSpec1 = desc1.bucketSpec.get
+assert(bucketSpec1.numBuckets == numBuckets)
+assert(bucketSpec1.bucketColumnNames.head.equals(bucketedColumn))
+assert(bucketSpec1.sortColumnNames.isEmpty)
+
+val query2 = s"$baseQuery SORTED BY($sortColumn) INTO $numBuckets BUCKETS"
+val (desc2, _) = extractTableDesc(query2)
+assert(desc2.bucketSpec.isDefined)
+val bucketSpec2 = desc2.bucketSpec.get
+assert(bucketSpec2.numBuckets == numBuckets)
+assert(bucketSpec2.bucketColumnNames.head.equals(bucketedColumn))
+assert(bucketSpec2.sortColumnNames.head.equals(sortColumn))
}

test("create table - skewed by") {
@@ -495,6 +495,53 @@ class InsertIntoHiveTableSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter
}
}

+private def testBucketedTable(testName: String)(f: String => Unit): Unit = {
+test(s"Hive SerDe table - $testName") {
+val hiveTable = "hive_table"
+
+withTable(hiveTable) {
+withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") {
+sql(
+s"""
+|CREATE TABLE $hiveTable (a INT, d INT)
+|PARTITIONED BY (b INT, c INT)
+|CLUSTERED BY(a)
+|SORTED BY(a, d) INTO 256 BUCKETS
+|STORED AS TEXTFILE
+""".stripMargin)
+f(hiveTable)
+}
+}
+}
+}
+
+testBucketedTable("INSERT should NOT fail if strict bucketing is NOT enforced") {
+tableName =>
+withSQLConf("hive.enforce.bucketing" -> "false", "hive.enforce.sorting" -> "false") {
+sql(s"INSERT INTO TABLE $tableName SELECT 1, 4, 2 AS c, 3 AS b")
+checkAnswer(sql(s"SELECT a, b, c, d FROM $tableName"), Row(1, 2, 3, 4))
+}
+}
+
+testBucketedTable("INSERT should fail if strict bucketing / sorting is enforced") {
+tableName =>
+withSQLConf("hive.enforce.bucketing" -> "true", "hive.enforce.sorting" -> "false") {
+intercept[AnalysisException] {
+sql(s"INSERT INTO TABLE $tableName SELECT 1, 2, 3, 4")
+}
+}
+withSQLConf("hive.enforce.bucketing" -> "false", "hive.enforce.sorting" -> "true") {
+intercept[AnalysisException] {
+sql(s"INSERT INTO TABLE $tableName SELECT 1, 2, 3, 4")
+}
+}
+withSQLConf("hive.enforce.bucketing" -> "true", "hive.enforce.sorting" -> "true") {
+intercept[AnalysisException] {
+sql(s"INSERT INTO TABLE $tableName SELECT 1, 2, 3, 4")
+}
+}
+}

test("SPARK-20594: hive.exec.stagingdir was deleted by Hive") {
// Set hive.exec.stagingdir under the table directory without start with ".".
withSQLConf("hive.exec.stagingdir" -> "./test") {
@@ -247,21 +247,16 @@ class ShowCreateTableSuite extends QueryTest with SQLTestUtils with TestHiveSingleton
}
}

test("hive bucketing is not supported") {
test("hive bucketing is supported") {
withTable("t1") {
createRawHiveTable(
sql(
s"""CREATE TABLE t1 (a INT, b STRING)
|CLUSTERED BY (a)
|SORTED BY (b)
|INTO 2 BUCKETS
""".stripMargin
)

-val cause = intercept[AnalysisException] {
-sql("SHOW CREATE TABLE t1")
-}
-
-assert(cause.getMessage.contains(" - bucketing"))
+checkCreateTable("t1")
}
}

@@ -777,6 +777,26 @@ class HiveDDLSuite
}
}

test("desc table for Hive table - bucketed + sorted table") {
withTable("tbl") {
sql(s"""
CREATE TABLE tbl (id int, name string)
PARTITIONED BY (ds string)
CLUSTERED BY(id)
SORTED BY(id, name) INTO 1024 BUCKETS
""")

val x = sql("DESC FORMATTED tbl").collect()
assert(x.containsSlice(
Seq(
Row("Num Buckets", "1024", ""),
Row("Bucket Columns", "[`id`]", ""),
Row("Sort Columns", "[`id`, `name`]", "")
)
))
}
}

test("desc table for data source table using Hive Metastore") {
assume(spark.sparkContext.conf.get(CATALOG_IMPLEMENTATION) == "hive")
val tabName = "tab1"
