[SPARK-18482][SQL] make sure Spark can access the table metadata created by older version of spark

## What changes were proposed in this pull request?

In Spark 2.1, we did a lot of refactoring of `HiveExternalCatalog` and related code paths. This refactoring may introduce external behavior changes and break backward compatibility, e.g. http://issues.apache.org/jira/browse/SPARK-18464

To avoid future compatibility problems with `HiveExternalCatalog`, this PR dumps some typical raw table metadata from tables created by Spark 2.0, and tests whether the current version of Spark can still recognize it.
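
As a rough illustration (not part of this patch), metadata like the dumps in the new suite would come from tables created on a Spark 2.0 cluster with DDL along these lines; the table names match the suite, and the exact statements are hypothetical:

```scala
// Hypothetical sketch: statements like these, run in a Spark 2.0 spark-shell,
// would produce raw metastore entries similar to the dumps in the new suite.
spark.sql("CREATE TABLE tbl1 (i INT)")             // Hive serde table
spark.sql("CREATE TABLE tbl4 (i INT) USING json")  // data source table
spark.sql("CREATE TABLE tbl6 (i INT, j INT) USING json PARTITIONED BY (j)")
```

The suite then reads those dumps back through the current `HiveExternalCatalog` and checks schema, table location, and rename handling.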

## How was this patch tested?

Test-only change.

Author: Wenchen Fan <[email protected]>

Closes apache#16003 from cloud-fan/test.
cloud-fan authored and rxin committed Nov 28, 2016
1 parent 07f32c2 commit fc2c13b
Showing 2 changed files with 251 additions and 43 deletions.
sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogBackwardCompatibilitySuite.scala
@@ -0,0 +1,251 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.hive

import java.net.URI

import org.scalatest.BeforeAndAfterEach

import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
import org.apache.spark.sql.hive.client.HiveClient
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.test.SQLTestUtils
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.Utils


class HiveExternalCatalogBackwardCompatibilitySuite extends QueryTest
  with SQLTestUtils with TestHiveSingleton with BeforeAndAfterEach {

  // To test `HiveExternalCatalog`, we need to read/write the raw table meta from/to hive client.
  val hiveClient: HiveClient =
    spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client

  val tempDir = Utils.createTempDir().getCanonicalFile

  override def beforeEach(): Unit = {
    sql("CREATE DATABASE test_db")
    for ((tbl, _) <- rawTablesAndExpectations) {
      hiveClient.createTable(tbl, ignoreIfExists = false)
    }
  }

  override def afterEach(): Unit = {
    Utils.deleteRecursively(tempDir)
    hiveClient.dropDatabase("test_db", ignoreIfNotExists = false, cascade = true)
  }

  private def getTableMetadata(tableName: String): CatalogTable = {
    spark.sharedState.externalCatalog.getTable("test_db", tableName)
  }

  private def defaultTablePath(tableName: String): String = {
    spark.sessionState.catalog.defaultTablePath(TableIdentifier(tableName, Some("test_db")))
  }


  // Raw table metadata that is dumped from tables created by Spark 2.0. Note that all Spark
  // versions prior to 2.1 would generate almost the same raw table metadata for a given table.
  val simpleSchema = new StructType().add("i", "int")
  val partitionedSchema = new StructType().add("i", "int").add("j", "int")

  lazy val hiveTable = CatalogTable(
    identifier = TableIdentifier("tbl1", Some("test_db")),
    tableType = CatalogTableType.MANAGED,
    storage = CatalogStorageFormat.empty.copy(
      inputFormat = Some("org.apache.hadoop.mapred.TextInputFormat"),
      outputFormat = Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")),
    schema = simpleSchema)

  lazy val externalHiveTable = CatalogTable(
    identifier = TableIdentifier("tbl2", Some("test_db")),
    tableType = CatalogTableType.EXTERNAL,
    storage = CatalogStorageFormat.empty.copy(
      locationUri = Some(tempDir.getAbsolutePath),
      inputFormat = Some("org.apache.hadoop.mapred.TextInputFormat"),
      outputFormat = Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")),
    schema = simpleSchema)

  lazy val partitionedHiveTable = CatalogTable(
    identifier = TableIdentifier("tbl3", Some("test_db")),
    tableType = CatalogTableType.MANAGED,
    storage = CatalogStorageFormat.empty.copy(
      inputFormat = Some("org.apache.hadoop.mapred.TextInputFormat"),
      outputFormat = Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")),
    schema = partitionedSchema,
    partitionColumnNames = Seq("j"))


  val simpleSchemaJson =
    """
      |{
      |  "type": "struct",
      |  "fields": [{
      |    "name": "i",
      |    "type": "integer",
      |    "nullable": true,
      |    "metadata": {}
      |  }]
      |}
    """.stripMargin

  val partitionedSchemaJson =
    """
      |{
      |  "type": "struct",
      |  "fields": [{
      |    "name": "i",
      |    "type": "integer",
      |    "nullable": true,
      |    "metadata": {}
      |  },
      |  {
      |    "name": "j",
      |    "type": "integer",
      |    "nullable": true,
      |    "metadata": {}
      |  }]
      |}
    """.stripMargin

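  // Note: Spark stores the JSON-serialized schema of a data source table in the table
  // properties, split into `spark.sql.sources.schema.numParts` chunks, because a single
  // Hive metastore property value has a limited length.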
  lazy val dataSourceTable = CatalogTable(
    identifier = TableIdentifier("tbl4", Some("test_db")),
    tableType = CatalogTableType.MANAGED,
    storage = CatalogStorageFormat.empty.copy(properties = Map("path" -> defaultTablePath("tbl4"))),
    schema = new StructType(),
    properties = Map(
      "spark.sql.sources.provider" -> "json",
      "spark.sql.sources.schema.numParts" -> "1",
      "spark.sql.sources.schema.part.0" -> simpleSchemaJson))

  lazy val hiveCompatibleDataSourceTable = CatalogTable(
    identifier = TableIdentifier("tbl5", Some("test_db")),
    tableType = CatalogTableType.MANAGED,
    storage = CatalogStorageFormat.empty.copy(properties = Map("path" -> defaultTablePath("tbl5"))),
    schema = simpleSchema,
    properties = Map(
      "spark.sql.sources.provider" -> "parquet",
      "spark.sql.sources.schema.numParts" -> "1",
      "spark.sql.sources.schema.part.0" -> simpleSchemaJson))

  lazy val partitionedDataSourceTable = CatalogTable(
    identifier = TableIdentifier("tbl6", Some("test_db")),
    tableType = CatalogTableType.MANAGED,
    storage = CatalogStorageFormat.empty.copy(properties = Map("path" -> defaultTablePath("tbl6"))),
    schema = new StructType(),
    properties = Map(
      "spark.sql.sources.provider" -> "json",
      "spark.sql.sources.schema.numParts" -> "1",
      "spark.sql.sources.schema.part.0" -> partitionedSchemaJson,
      "spark.sql.sources.schema.numPartCols" -> "1",
      "spark.sql.sources.schema.partCol.0" -> "j"))

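  // For an external data source table, the raw metadata records a dummy `-__PLACEHOLDER__`
  // location in the metastore (Hive requires every table to have a location), while the real
  // data path is kept in the storage properties under the `path` key.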
  lazy val externalDataSourceTable = CatalogTable(
    identifier = TableIdentifier("tbl7", Some("test_db")),
    tableType = CatalogTableType.EXTERNAL,
    storage = CatalogStorageFormat.empty.copy(
      locationUri = Some(defaultTablePath("tbl7") + "-__PLACEHOLDER__"),
      properties = Map("path" -> tempDir.getAbsolutePath)),
    schema = new StructType(),
    properties = Map(
      "spark.sql.sources.provider" -> "json",
      "spark.sql.sources.schema.numParts" -> "1",
      "spark.sql.sources.schema.part.0" -> simpleSchemaJson))

  lazy val hiveCompatibleExternalDataSourceTable = CatalogTable(
    identifier = TableIdentifier("tbl8", Some("test_db")),
    tableType = CatalogTableType.EXTERNAL,
    storage = CatalogStorageFormat.empty.copy(
      locationUri = Some(tempDir.getAbsolutePath),
      properties = Map("path" -> tempDir.getAbsolutePath)),
    schema = simpleSchema,
    properties = Map(
      "spark.sql.sources.provider" -> "parquet",
      "spark.sql.sources.schema.numParts" -> "1",
      "spark.sql.sources.schema.part.0" -> simpleSchemaJson))

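  // Regression case for SPARK-18464: very old tables may not store the schema in the table
  // properties at all, so Spark has to infer the schema from the data files at read time.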
  lazy val dataSourceTableWithoutSchema = CatalogTable(
    identifier = TableIdentifier("tbl9", Some("test_db")),
    tableType = CatalogTableType.EXTERNAL,
    storage = CatalogStorageFormat.empty.copy(
      locationUri = Some(defaultTablePath("tbl9") + "-__PLACEHOLDER__"),
      properties = Map("path" -> tempDir.getAbsolutePath)),
    schema = new StructType(),
    properties = Map("spark.sql.sources.provider" -> "json"))

  // A list of all raw tables we want to test, with their expected schemas.
  lazy val rawTablesAndExpectations = Seq(
    hiveTable -> simpleSchema,
    externalHiveTable -> simpleSchema,
    partitionedHiveTable -> partitionedSchema,
    dataSourceTable -> simpleSchema,
    hiveCompatibleDataSourceTable -> simpleSchema,
    partitionedDataSourceTable -> partitionedSchema,
    externalDataSourceTable -> simpleSchema,
    hiveCompatibleExternalDataSourceTable -> simpleSchema,
    dataSourceTableWithoutSchema -> new StructType())

  test("make sure we can read table created by old version of Spark") {
    for ((tbl, expectedSchema) <- rawTablesAndExpectations) {
      val readBack = getTableMetadata(tbl.identifier.table)
      assert(readBack.schema == expectedSchema)

      if (tbl.tableType == CatalogTableType.EXTERNAL) {
        // trim the URI prefix
        val tableLocation = new URI(readBack.storage.locationUri.get).getPath
        assert(tableLocation == tempDir.getAbsolutePath)
      }
    }
  }

  test("make sure we can alter table location created by old version of Spark") {
    withTempDir { dir =>
      for ((tbl, _) <- rawTablesAndExpectations if tbl.tableType == CatalogTableType.EXTERNAL) {
        sql(s"ALTER TABLE ${tbl.identifier} SET LOCATION '${dir.getAbsolutePath}'")

        val readBack = getTableMetadata(tbl.identifier.table)

        // trim the URI prefix
        val actualTableLocation = new URI(readBack.storage.locationUri.get).getPath
        assert(actualTableLocation == dir.getAbsolutePath)
      }
    }
  }

  test("make sure we can rename table created by old version of Spark") {
    for ((tbl, expectedSchema) <- rawTablesAndExpectations) {
      val newName = tbl.identifier.table + "_renamed"
      sql(s"ALTER TABLE ${tbl.identifier} RENAME TO $newName")

      val readBack = getTableMetadata(newName)
      assert(readBack.schema == expectedSchema)

      // trim the URI prefix
      val actualTableLocation = new URI(readBack.storage.locationUri.get).getPath
      val expectedLocation = if (tbl.tableType == CatalogTableType.EXTERNAL) {
        tempDir.getAbsolutePath
      } else {
        // trim the URI prefix
        new URI(defaultTablePath(newName)).getPath
      }
      assert(actualTableLocation == expectedLocation)
    }
  }
}
sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -1370,47 +1370,4 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
      sparkSession.sparkContext.conf.set(DEBUG_MODE, previousValue)
    }
  }

  test("SPARK-17470: support old table that stores table location in storage properties") {
    withTable("old") {
      withTempPath { path =>
        Seq(1 -> "a").toDF("i", "j").write.parquet(path.getAbsolutePath)
        val tableDesc = CatalogTable(
          identifier = TableIdentifier("old", Some("default")),
          tableType = CatalogTableType.EXTERNAL,
          storage = CatalogStorageFormat.empty.copy(
            properties = Map("path" -> path.getAbsolutePath)
          ),
          schema = new StructType(),
          properties = Map(
            HiveExternalCatalog.DATASOURCE_PROVIDER -> "parquet",
            HiveExternalCatalog.DATASOURCE_SCHEMA ->
              new StructType().add("i", "int").add("j", "string").json))
        hiveClient.createTable(tableDesc, ignoreIfExists = false)
        checkAnswer(spark.table("old"), Row(1, "a"))
      }
    }
  }

  test("SPARK-18464: support old table which doesn't store schema in table properties") {
    withTable("old") {
      withTempPath { path =>
        Seq(1 -> "a").toDF("i", "j").write.parquet(path.getAbsolutePath)
        val tableDesc = CatalogTable(
          identifier = TableIdentifier("old", Some("default")),
          tableType = CatalogTableType.EXTERNAL,
          storage = CatalogStorageFormat.empty.copy(
            properties = Map("path" -> path.getAbsolutePath)
          ),
          schema = new StructType(),
          properties = Map(
            HiveExternalCatalog.DATASOURCE_PROVIDER -> "parquet"))
        hiveClient.createTable(tableDesc, ignoreIfExists = false)

        checkAnswer(spark.table("old"), Row(1, "a"))

        checkAnswer(sql("DESC old"), Row("i", "int", null) :: Row("j", "string", null) :: Nil)
      }
    }
  }
}
