Skip to content

Commit

Permalink
[SPARK-33898][SQL] Support SHOW CREATE TABLE In V2
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?
1. Implement V2 execution node `ShowCreateTableExec` similar to V1 `ShowCreateTableCommand`
2. No support `SHOW CREATE TABLE XXX AS SERDE`

### Why are the changes needed?
[SPARK-33898](https://issues.apache.org/jira/browse/SPARK-33898)

### Does this PR introduce _any_ user-facing change?
Yes. Support the user to execute `SHOW CREATE TABLE` command in V2 table

### How was this patch tested?
Add two UT tests
1. ./dev/scalastyle
2. run test DataSourceV2SQLSuite

Closes apache#32931 from Peng-Lei/SPARK-33898.

Lead-authored-by: PengLei <[email protected]>
Co-authored-by: PengLei <[email protected]>
Signed-off-by: Kent Yao <[email protected]>
  • Loading branch information
2 people authored and yaooqinn committed Jun 29, 2021
1 parent 5f0113e commit 8fbbd2e
Show file tree
Hide file tree
Showing 4 changed files with 230 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -934,8 +934,8 @@ private[spark] object QueryCompilationErrors {
notSupportedForV2TablesError("LOAD DATA")
}

def showCreateTableNotSupportedForV2TablesError(): Throwable = {
notSupportedForV2TablesError("SHOW CREATE TABLE")
def showCreateTableAsSerdeNotSupportedForV2TablesError(): Throwable = {
notSupportedForV2TablesError("SHOW CREATE TABLE AS SERDE")
}

def showColumnsNotSupportedForV2TablesError(): Throwable = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -377,8 +377,11 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
case LoadData(_: ResolvedTable, _, _, _, _) =>
throw QueryCompilationErrors.loadDataNotSupportedForV2TablesError()

case ShowCreateTable(_: ResolvedTable, _, _) =>
throw QueryCompilationErrors.showCreateTableNotSupportedForV2TablesError()
case ShowCreateTable(rt: ResolvedTable, asSerde, output) =>
if (asSerde) {
throw QueryCompilationErrors.showCreateTableAsSerdeNotSupportedForV2TablesError()
}
ShowCreateTableExec(output, rt.table) :: Nil

case TruncateTable(r: ResolvedTable) =>
TruncateTableExec(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.datasources.v2

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.util.escapeSingleQuotedString
import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Table, TableCatalog}
import org.apache.spark.sql.execution.LeafExecNode
import org.apache.spark.unsafe.types.UTF8String

/**
* Physical plan node for show create table.
*/
case class ShowCreateTableExec(
output: Seq[Attribute],
table: Table) extends V2CommandExec with LeafExecNode {
override protected def run(): Seq[InternalRow] = {
val builder = StringBuilder.newBuilder
showCreateTable(table, builder)
Seq(InternalRow(UTF8String.fromString(builder.toString)))
}

private def showCreateTable(table: Table, builder: StringBuilder): Unit = {
builder ++= s"CREATE TABLE ${table.name()} "

showTableDataColumns(table, builder)
showTableUsing(table, builder)

val tableOptions = table.properties.asScala
.filterKeys(_.startsWith(TableCatalog.OPTION_PREFIX)).map {
case (k, v) => k.drop(TableCatalog.OPTION_PREFIX.length) -> v
}.toMap
showTableOptions(builder, tableOptions)
showTablePartitioning(table, builder)
showTableComment(table, builder)
showTableLocation(table, builder)
showTableProperties(table, builder, tableOptions)
}

private def showTableDataColumns(table: Table, builder: StringBuilder): Unit = {
val columns = table.schema().fields.map(_.toDDL)
builder ++= concatByMultiLines(columns)
}

private def showTableUsing(table: Table, builder: StringBuilder): Unit = {
Option(table.properties.get(TableCatalog.PROP_PROVIDER))
.map("USING " + escapeSingleQuotedString(_) + "\n")
.foreach(builder.append)
}

private def showTableOptions(
builder: StringBuilder,
tableOptions: Map[String, String]): Unit = {
if (tableOptions.nonEmpty) {
val props = tableOptions.map { case (key, value) =>
s"'${escapeSingleQuotedString(key)}' = '${escapeSingleQuotedString(value)}'"
}
builder ++= "OPTIONS"
builder ++= concatByMultiLines(props)
}
}

private def showTablePartitioning(table: Table, builder: StringBuilder): Unit = {
if (!table.partitioning.isEmpty) {
val transforms = new ArrayBuffer[String]
table.partitioning.foreach(t => transforms += t.describe())
builder ++= s"PARTITIONED BY ${transforms.mkString("(", ", ", ")")}\n"
}
}

private def showTableLocation(table: Table, builder: StringBuilder): Unit = {
Option(table.properties.get(TableCatalog.PROP_LOCATION))
.map("LOCATION '" + escapeSingleQuotedString(_) + "'\n")
.foreach(builder.append)
}

private def showTableProperties(
table: Table,
builder: StringBuilder,
tableOptions: Map[String, String]): Unit = {


val showProps = table.properties.asScala
.filterKeys(key => !CatalogV2Util.TABLE_RESERVED_PROPERTIES.contains(key)
&& !key.startsWith(TableCatalog.OPTION_PREFIX)
&& !tableOptions.contains(key))
if (showProps.nonEmpty) {
val props = showProps.map {
case (key, value) =>
s"'${escapeSingleQuotedString(key)}' = '${escapeSingleQuotedString(value)}'"
}

builder ++= "TBLPROPERTIES"
builder ++= concatByMultiLines(props)
}
}

private def showTableComment(table: Table, builder: StringBuilder): Unit = {
Option(table.properties.get(TableCatalog.PROP_COMMENT))
.map("COMMENT '" + escapeSingleQuotedString(_) + "'\n")
.foreach(builder.append)
}

private def concatByMultiLines(iter: Iterable[String]): String = {
iter.mkString("(\n ", ",\n ", ")\n")
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -1961,12 +1961,101 @@ class DataSourceV2SQLSuite
}
}

test("SHOW CREATE TABLE") {
test("SPARK-33898: SHOW CREATE TABLE AS SERDE") {
val t = "testcat.ns1.ns2.tbl"
withTable(t) {
spark.sql(s"CREATE TABLE $t (id bigint, data string) USING foo")
testNotSupportedV2Command("SHOW CREATE TABLE", t)
testNotSupportedV2Command("SHOW CREATE TABLE", s"$t AS SERDE")
val e = intercept[AnalysisException] {
sql(s"SHOW CREATE TABLE $t AS SERDE")
}
assert(e.message.contains(s"SHOW CREATE TABLE AS SERDE is not supported for v2 tables."))
}
}

test("SPARK-33898: SHOW CREATE TABLE") {
val t = "testcat.ns1.ns2.tbl"
withTable(t) {
sql(
s"""
|CREATE TABLE $t (
| a bigint,
| b bigint,
| c bigint,
| `extra col` ARRAY<INT>,
| `<another>` STRUCT<x: INT, y: ARRAY<BOOLEAN>>
|)
|USING foo
|OPTIONS (
| from = 0,
| to = 1)
|COMMENT 'This is a comment'
|TBLPROPERTIES ('prop1' = '1')
|PARTITIONED BY (a)
|LOCATION '/tmp'
""".stripMargin)
val showDDL = getShowCreateDDL(s"SHOW CREATE TABLE $t")
assert(showDDL === Array(
"CREATE TABLE testcat.ns1.ns2.tbl (",
"`a` BIGINT,",
"`b` BIGINT,",
"`c` BIGINT,",
"`extra col` ARRAY<INT>,",
"`<another>` STRUCT<`x`: INT, `y`: ARRAY<BOOLEAN>>)",
"USING foo",
"OPTIONS(",
"'from' = '0',",
"'to' = '1')",
"PARTITIONED BY (a)",
"COMMENT 'This is a comment'",
"LOCATION '/tmp'",
"TBLPROPERTIES(",
"'prop1' = '1')"
))
}
}

test("SPARK-33898: SHOW CREATE TABLE WITH AS SELECT") {
val t = "testcat.ns1.ns2.tbl"
withTable(t) {
sql(
s"""
|CREATE TABLE $t
|USING foo
|AS SELECT 1 AS a, "foo" AS b
""".stripMargin)
val showDDL = getShowCreateDDL(s"SHOW CREATE TABLE $t")
assert(showDDL === Array(
"CREATE TABLE testcat.ns1.ns2.tbl (",
"`a` INT,",
"`b` STRING)",
"USING foo"
))
}
}

test("SPARK-33898: SHOW CREATE TABLE PARTITIONED BY Transforms") {
val t = "testcat.ns1.ns2.tbl"
withTable(t) {
sql(
s"""
|CREATE TABLE $t (a INT, b STRING, ts TIMESTAMP) USING foo
|PARTITIONED BY (
| a,
| bucket(16, b),
| years(ts),
| months(ts),
| days(ts),
| hours(ts))
""".stripMargin)
val showDDL = getShowCreateDDL(s"SHOW CREATE TABLE $t")
assert(showDDL === Array(
"CREATE TABLE testcat.ns1.ns2.tbl (",
"`a` INT,",
"`b` STRING,",
"`ts` TIMESTAMP)",
"USING foo",
"PARTITIONED BY (a, bucket(16, b), years(ts), months(ts), days(ts), hours(ts))"
))
}
}

Expand Down Expand Up @@ -2835,6 +2924,10 @@ class DataSourceV2SQLSuite
}.getMessage
assert(errMsg.contains(expectedError))
}

private def getShowCreateDDL(showCreateTableSql: String): Array[String] = {
sql(showCreateTableSql).head().getString(0).split("\n").map(_.trim)
}
}


Expand Down

0 comments on commit 8fbbd2e

Please sign in to comment.