Skip to content

Commit

Permalink
[SPARK-2734][SQL] Remove tables from cache when DROP TABLE is run.
Browse files Browse the repository at this point in the history
Author: Michael Armbrust <[email protected]>

Closes apache#1650 from marmbrus/dropCached and squashes the following commits:

e6ab80b [Michael Armbrust] Support if exists.
83426c6 [Michael Armbrust] Remove tables from cache when DROP TABLE is run.
  • Loading branch information
marmbrus committed Jul 31, 2014
1 parent 2ac37db commit 88a519d
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ private[hive] case class SourceCommand(filePath: String) extends Command

private[hive] case class AddFile(filePath: String) extends Command

/**
 * Logical command produced for a HiveQL `DROP TABLE` statement.
 *
 * @param tableName name of the table to drop
 * @param ifExists  true when the drop should be issued with an `IF EXISTS` clause
 */
private[hive] case class DropTable(tableName: String, ifExists: Boolean) extends Command

/** Provides a mapping from HiveQL statements to catalyst logical plans and expression trees. */
private[hive] object HiveQl {
protected val nativeCommands = Seq(
Expand Down Expand Up @@ -96,7 +98,6 @@ private[hive] object HiveQl {
"TOK_CREATEINDEX",
"TOK_DROPDATABASE",
"TOK_DROPINDEX",
"TOK_DROPTABLE",
"TOK_MSCK",

// TODO(marmbrus): Figure out how views are expanded by Hive, as we might need to handle this.
Expand Down Expand Up @@ -377,6 +378,12 @@ private[hive] object HiveQl {
}

protected def nodeToPlan(node: Node): LogicalPlan = node match {
// Special drop table that also uncaches.
case Token("TOK_DROPTABLE",
Token("TOK_TABNAME", tableNameParts) ::
ifExists) =>
val tableName = tableNameParts.map { case Token(p, Nil) => p }.mkString(".")
DropTable(tableName, ifExists.nonEmpty)
// Just fake explain for any of the native commands.
case Token("TOK_EXPLAIN", explainArgs)
if noExplainCommands.contains(explainArgs.head.getText) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ private[hive] trait HiveStrategies {
case logical.NativeCommand(sql) =>
NativeCommand(sql, plan.output)(context) :: Nil

case DropTable(tableName, ifExists) => execution.DropTable(tableName, ifExists) :: Nil

case describe: logical.DescribeCommand =>
val resolvedTable = context.executePlan(describe.table).analyzed
resolvedTable match {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.hive.execution

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions.Row
import org.apache.spark.sql.execution.{Command, LeafNode}
import org.apache.spark.sql.hive.HiveContext

/**
 * :: DeveloperApi ::
 * Physical command that runs a Hive `DROP TABLE` statement for `tableName` and afterwards
 * unregisters that table name from the Hive context's catalog.
 *
 * @param tableName name of the table to drop
 * @param ifExists  when true, the statement is issued as `DROP TABLE IF EXISTS ...`
 */
@DeveloperApi
case class DropTable(tableName: String, ifExists: Boolean) extends LeafNode with Command {

  /** The owning SQL context, viewed as a [[HiveContext]]. */
  def hiveContext: HiveContext = sqlContext.asInstanceOf[HiveContext]

  /** This command produces no output attributes. */
  def output = Seq.empty

  /**
   * Performs the drop. Being a `lazy val`, the body is evaluated at most once per instance,
   * no matter how many times the result is requested.
   */
  override protected[sql] lazy val sideEffectResult: Seq[Any] = {
    val dropStatement =
      if (ifExists) s"DROP TABLE IF EXISTS $tableName" else s"DROP TABLE $tableName"

    // Run the Hive statement first, then unregister the name from the catalog.
    hiveContext.runSqlHive(dropStatement)
    hiveContext.catalog.unregisterTable(None, tableName)
    Seq.empty
  }

  /** Forces the side effect and returns an empty RDD, since the command yields no rows. */
  override def execute(): RDD[Row] = {
    sideEffectResult
    sparkContext.emptyRDD[Row]
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import org.apache.spark.sql.hive.execution.HiveComparisonTest
import org.apache.spark.sql.hive.test.TestHive

class CachedTableSuite extends HiveComparisonTest {
import TestHive._

TestHive.loadTestTable("src")

test("cache table") {
Expand All @@ -32,6 +34,20 @@ class CachedTableSuite extends HiveComparisonTest {
createQueryTest("read from cached table",
"SELECT * FROM src LIMIT 1", reset = false)

// Creates a table, caches it, reads it once, then drops it; after the drop, querying the
// table must fail with Hive's InvalidTableException.
test("Drop cached table") {
hql("CREATE TABLE test(a INT)")
cacheTable("test")
// Run a query against the table while it is cached, before dropping it.
hql("SELECT * FROM test").collect()
hql("DROP TABLE test")
// The table must no longer be resolvable once it has been dropped.
intercept[org.apache.hadoop.hive.ql.metadata.InvalidTableException] {
hql("SELECT * FROM test").collect()
}
}

// Issues DROP TABLE IF EXISTS for a table this test never creates; the test passes as long
// as the statement does not throw.
test("DROP nonexistant table") {
hql("DROP TABLE IF EXISTS nonexistantTable")
}

test("check that table is cached and uncache") {
TestHive.table("src").queryExecution.analyzed match {
case _ : InMemoryRelation => // Found evidence of caching
Expand Down

0 comments on commit 88a519d

Please sign in to comment.