[SPARK-2739][SQL] Rename registerAsTable to registerTempTable
There have been user complaints that the difference between `registerAsTable` and `saveAsTable` is too subtle.  This PR addresses this by renaming `registerAsTable` to `registerTempTable`, which more clearly reflects what is happening.  `registerAsTable` remains, but will cause a deprecation warning.

Author: Michael Armbrust <[email protected]>

Closes apache#1743 from marmbrus/registerTempTable and squashes the following commits:

d031348 [Michael Armbrust] Merge remote-tracking branch 'apache/master' into registerTempTable
4dff086 [Michael Armbrust] Fix .java files too
89a2f12 [Michael Armbrust] Merge remote-tracking branch 'apache/master' into registerTempTable
0b7b71e [Michael Armbrust] Rename registerAsTable to registerTempTable
marmbrus committed Aug 3, 2014
1 parent d210022 commit 1a80437
Showing 25 changed files with 103 additions and 96 deletions.
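
For context, a minimal sketch of the distinction the commit message draws, under stated assumptions: Spark 1.1-era APIs, an illustrative Record case class and table names, and a HiveContext (saveAsTable at this point is only supported with Hive support enabled). This sketch is not part of the commit itself.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

case class Record(key: Int, value: String)

object RenameDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("RenameDemo"))
    val hiveContext = new HiveContext(sc)
    import hiveContext._

    val rdd = sc.parallelize((1 to 10).map(i => Record(i, s"val_$i")))

    // registerTempTable: a catalog entry tied to this context's lifetime;
    // no data is written anywhere.
    rdd.registerTempTable("recordsTemp")

    // saveAsTable: materializes the data as a real table in the Hive
    // metastore, which outlives this context.
    rdd.saveAsTable("recordsSaved")

    sql("SELECT COUNT(*) FROM recordsTemp").collect().foreach(println)
  }
}
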
2 changes: 1 addition & 1 deletion dev/audit-release/sbt_app_sql/src/main/scala/SqlApp.scala
@@ -38,7 +38,7 @@ object SparkSqlExample {

import sqlContext._
val people = sc.makeRDD(1 to 100, 10).map(x => Person(s"Name$x", x))
-people.registerAsTable("people")
+people.registerTempTable("people")
val teenagers = sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
val teenagerNames = teenagers.map(t => "Name: " + t(0)).collect()
teenagerNames.foreach(println)
18 changes: 9 additions & 9 deletions docs/sql-programming-guide.md
@@ -142,7 +142,7 @@ case class Person(name: String, age: Int)

// Create an RDD of Person objects and register it as a table.
val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt))
-people.registerAsTable("people")
+people.registerTempTable("people")

// SQL statements can be run by using the sql methods provided by sqlContext.
val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
@@ -210,7 +210,7 @@ JavaRDD<Person> people = sc.textFile("examples/src/main/resources/people.txt").m

// Apply a schema to an RDD of JavaBeans and register it as a table.
JavaSchemaRDD schemaPeople = sqlContext.applySchema(people, Person.class);
-schemaPeople.registerAsTable("people");
+schemaPeople.registerTempTable("people");

// SQL can be run over RDDs that have been registered as tables.
JavaSchemaRDD teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
@@ -248,7 +248,7 @@ people = parts.map(lambda p: {"name": p[0], "age": int(p[1])})
# In future versions of PySpark we would like to add support for registering RDDs with other
# datatypes as tables
schemaPeople = sqlContext.inferSchema(people)
-schemaPeople.registerAsTable("people")
+schemaPeople.registerTempTable("people")

# SQL can be run over SchemaRDDs that have been registered as a table.
teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
@@ -292,7 +292,7 @@ people.saveAsParquetFile("people.parquet")
val parquetFile = sqlContext.parquetFile("people.parquet")

//Parquet files can also be registered as tables and then used in SQL statements.
-parquetFile.registerAsTable("parquetFile")
+parquetFile.registerTempTable("parquetFile")
val teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
{% endhighlight %}
@@ -314,7 +314,7 @@ schemaPeople.saveAsParquetFile("people.parquet");
JavaSchemaRDD parquetFile = sqlContext.parquetFile("people.parquet");

//Parquet files can also be registered as tables and then used in SQL statements.
-parquetFile.registerAsTable("parquetFile");
+parquetFile.registerTempTable("parquetFile");
JavaSchemaRDD teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19");
List<String> teenagerNames = teenagers.map(new Function<Row, String>() {
public String call(Row row) {
@@ -340,7 +340,7 @@ schemaPeople.saveAsParquetFile("people.parquet")
parquetFile = sqlContext.parquetFile("people.parquet")

# Parquet files can also be registered as tables and then used in SQL statements.
-parquetFile.registerAsTable("parquetFile");
+parquetFile.registerTempTable("parquetFile");
teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
teenNames = teenagers.map(lambda p: "Name: " + p.name)
for teenName in teenNames.collect():
@@ -378,7 +378,7 @@ people.printSchema()
// |-- name: StringType

// Register this SchemaRDD as a table.
-people.registerAsTable("people")
+people.registerTempTable("people")

// SQL statements can be run by using the sql methods provided by sqlContext.
val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
@@ -416,7 +416,7 @@ people.printSchema();
// |-- name: StringType

// Register this JavaSchemaRDD as a table.
-people.registerAsTable("people");
+people.registerTempTable("people");

// SQL statements can be run by using the sql methods provided by sqlContext.
JavaSchemaRDD teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19");
@@ -455,7 +455,7 @@ people.printSchema()
# |-- name: StringType

# Register this SchemaRDD as a table.
-people.registerAsTable("people")
+people.registerTempTable("people")

# SQL statements can be run by using the sql methods provided by sqlContext.
teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
@@ -74,7 +74,7 @@ public Person call(String line) throws Exception {

// Apply a schema to an RDD of Java Beans and register it as a table.
JavaSchemaRDD schemaPeople = sqlCtx.applySchema(people, Person.class);
-schemaPeople.registerAsTable("people");
+schemaPeople.registerTempTable("people");

// SQL can be run over RDDs that have been registered as tables.
JavaSchemaRDD teenagers = sqlCtx.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19");
@@ -100,7 +100,7 @@ public String call(Row row) {
JavaSchemaRDD parquetFile = sqlCtx.parquetFile("people.parquet");

//Parquet files can also be registered as tables and then used in SQL statements.
-parquetFile.registerAsTable("parquetFile");
+parquetFile.registerTempTable("parquetFile");
JavaSchemaRDD teenagers2 =
sqlCtx.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19");
teenagerNames = teenagers2.map(new Function<Row, String>() {
@@ -128,7 +128,7 @@ public String call(Row row) {
// |-- name: StringType

// Register this JavaSchemaRDD as a table.
-peopleFromJsonFile.registerAsTable("people");
+peopleFromJsonFile.registerTempTable("people");

// SQL statements can be run by using the sql methods provided by sqlCtx.
JavaSchemaRDD teenagers3 = sqlCtx.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19");
@@ -158,7 +158,7 @@ public String call(Row row) {
// | |-- state: StringType
// |-- name: StringType

-peopleFromJsonRDD.registerAsTable("people2");
+peopleFromJsonRDD.registerTempTable("people2");

JavaSchemaRDD peopleWithCity = sqlCtx.sql("SELECT name, address.city FROM people2");
List<String> nameAndCity = peopleWithCity.map(new Function<Row, String>() {
@@ -36,7 +36,7 @@ object RDDRelation {
val rdd = sc.parallelize((1 to 100).map(i => Record(i, s"val_$i")))
// Any RDD containing case classes can be registered as a table. The schema of the table is
// automatically inferred using scala reflection.
-rdd.registerAsTable("records")
+rdd.registerTempTable("records")

// Once tables have been registered, you can run SQL queries over them.
println("Result of SELECT *:")
@@ -66,7 +66,7 @@ object RDDRelation {
parquetFile.where('key === 1).select('value as 'a).collect().foreach(println)

// These files can also be registered as tables.
-parquetFile.registerAsTable("parquetFile")
+parquetFile.registerTempTable("parquetFile")
sql("SELECT * FROM parquetFile").collect().foreach(println)
}
}
@@ -56,7 +56,7 @@ object HiveFromSpark {

// You can also register RDDs as temporary tables within a HiveContext.
val rdd = sc.parallelize((1 to 100).map(i => Record(i, s"val_$i")))
-rdd.registerAsTable("records")
+rdd.registerTempTable("records")

// Queries can then join RDD data with data stored in Hive.
println("Result of SELECT *:")
12 changes: 8 additions & 4 deletions python/pyspark/sql.py
@@ -909,7 +909,7 @@ def __init__(self, sparkContext, sqlContext=None):
... b=True, list=[1, 2, 3], dict={"s": 0}, row=Row(a=1),
... time=datetime(2014, 8, 1, 14, 1, 5))])
>>> srdd = sqlCtx.inferSchema(allTypes)
->>> srdd.registerAsTable("allTypes")
+>>> srdd.registerTempTable("allTypes")
>>> sqlCtx.sql('select i+1, d+1, not b, list[1], dict["s"], time, row.a '
... 'from allTypes where b and i > 0').collect()
[Row(c0=2, c1=2.0, c2=False, c3=2, c4=0...8, 1, 14, 1, 5), a=1)]
@@ -1486,19 +1486,23 @@ def saveAsParquetFile(self, path):
"""
self._jschema_rdd.saveAsParquetFile(path)

-def registerAsTable(self, name):
+def registerTempTable(self, name):
"""Registers this RDD as a temporary table using the given name.
The lifetime of this temporary table is tied to the L{SQLContext}
that was used to create this SchemaRDD.
>>> srdd = sqlCtx.inferSchema(rdd)
->>> srdd.registerAsTable("test")
+>>> srdd.registerTempTable("test")
>>> srdd2 = sqlCtx.sql("select * from test")
>>> sorted(srdd.collect()) == sorted(srdd2.collect())
True
"""
-self._jschema_rdd.registerAsTable(name)
+self._jschema_rdd.registerTempTable(name)

+def registerAsTable(self, name):
+    warnings.warn("Use registerTempTable instead of registerAsTable.", DeprecationWarning)
+    self.registerTempTable(name)

def insertInto(self, tableName, overwrite=False):
"""Inserts the contents of this SchemaRDD into the specified table.
4 changes: 2 additions & 2 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -116,7 +116,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
* // |-- name: string (nullable = false)
* // |-- age: integer (nullable = true)
*
-* peopleSchemaRDD.registerAsTable("people")
+* peopleSchemaRDD.registerTempTable("people")
* sqlContext.sql("select name from people").collect.foreach(println)
* }}}
*
@@ -212,7 +212,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
* import sqlContext._
*
* case class Person(name: String, age: Int)
-* createParquetFile[Person]("path/to/file.parquet").registerAsTable("people")
+* createParquetFile[Person]("path/to/file.parquet").registerTempTable("people")
* sql("INSERT INTO people SELECT 'michael', 29")
* }}}
*
@@ -67,7 +67,7 @@ import org.apache.spark.api.java.JavaRDD
* val rdd = sc.parallelize((1 to 100).map(i => Record(i, s"val_$i")))
* // Any RDD containing case classes can be registered as a table. The schema of the table is
* // automatically inferred using scala reflection.
-* rdd.registerAsTable("records")
+* rdd.registerTempTable("records")
*
* val results: SchemaRDD = sql("SELECT * FROM records")
* }}}
@@ -83,10 +83,13 @@ private[sql] trait SchemaRDDLike {
*
* @group schema
*/
-def registerAsTable(tableName: String): Unit = {
+def registerTempTable(tableName: String): Unit = {
sqlContext.registerRDDAsTable(baseSchemaRDD, tableName)
}

+@deprecated("Use registerTempTable instead of registerAsTable.", "1.1")
+def registerAsTable(tableName: String): Unit = registerTempTable(tableName)

/**
* :: Experimental ::
* Adds the rows from this RDD to the specified table, optionally overwriting the existing data.
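
The SchemaRDDLike change above keeps the old name as a thin forwarder tagged @deprecated, mirroring the Python shim in sql.py. A standalone sketch of that deprecate-and-forward pattern (an illustration, not the actual SchemaRDDLike trait) might look like this:

trait TableRegistration {
  def registerTempTable(tableName: String): Unit =
    println(s"registered temp table: $tableName")

  // Old name kept for source compatibility; forwards to the new method and
  // makes scalac emit a deprecation warning at every call site.
  @deprecated("Use registerTempTable instead of registerAsTable.", "1.1")
  def registerAsTable(tableName: String): Unit = registerTempTable(tableName)
}

object DeprecationDemo extends TableRegistration {
  def main(args: Array[String]): Unit = {
    registerTempTable("records") // preferred spelling
    registerAsTable("records")   // still compiles, but warns with -deprecation
  }
}

Compiling call sites of registerAsTable with -deprecation surfaces the warning while existing code keeps working.
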
@@ -52,7 +52,7 @@ class JavaSQLContext(val sqlContext: SQLContext) extends UDFRegistration {
* {{{
* JavaSQLContext sqlCtx = new JavaSQLContext(...)
*
-* sqlCtx.createParquetFile(Person.class, "path/to/file.parquet").registerAsTable("people")
+* sqlCtx.createParquetFile(Person.class, "path/to/file.parquet").registerTempTable("people")
* sqlCtx.sql("INSERT INTO people SELECT 'michael', 29")
* }}}
*
@@ -98,7 +98,7 @@ public Row call(Person person) throws Exception {
StructType schema = DataType.createStructType(fields);

JavaSchemaRDD schemaRDD = javaSqlCtx.applySchema(rowRDD, schema);
-schemaRDD.registerAsTable("people");
+schemaRDD.registerTempTable("people");
List<Row> actual = javaSqlCtx.sql("SELECT * FROM people").collect();

List<Row> expected = new ArrayList<Row>(2);
@@ -149,14 +149,14 @@ public void applySchemaToJSON() {
JavaSchemaRDD schemaRDD1 = javaSqlCtx.jsonRDD(jsonRDD);
StructType actualSchema1 = schemaRDD1.schema();
Assert.assertEquals(expectedSchema, actualSchema1);
-schemaRDD1.registerAsTable("jsonTable1");
+schemaRDD1.registerTempTable("jsonTable1");
List<Row> actual1 = javaSqlCtx.sql("select * from jsonTable1").collect();
Assert.assertEquals(expectedResult, actual1);

JavaSchemaRDD schemaRDD2 = javaSqlCtx.jsonRDD(jsonRDD, expectedSchema);
StructType actualSchema2 = schemaRDD2.schema();
Assert.assertEquals(expectedSchema, actualSchema2);
-schemaRDD1.registerAsTable("jsonTable2");
+schemaRDD1.registerTempTable("jsonTable2");
List<Row> actual2 = javaSqlCtx.sql("select * from jsonTable2").collect();
Assert.assertEquals(expectedResult, actual2);
}
@@ -78,7 +78,7 @@ class CachedTableSuite extends QueryTest {
}

test("SELECT Star Cached Table") {
-TestSQLContext.sql("SELECT * FROM testData").registerAsTable("selectStar")
+TestSQLContext.sql("SELECT * FROM testData").registerTempTable("selectStar")
TestSQLContext.cacheTable("selectStar")
TestSQLContext.sql("SELECT * FROM selectStar WHERE key = 1").collect()
TestSQLContext.uncacheTable("selectStar")
@@ -31,7 +31,7 @@ class InsertIntoSuite extends QueryTest {
testFilePath.delete()
testFilePath.deleteOnExit()
val testFile = createParquetFile[TestData](testFilePath.getCanonicalPath)
-testFile.registerAsTable("createAndInsertTest")
+testFile.registerTempTable("createAndInsertTest")

// Add some data.
testData.insertInto("createAndInsertTest")
@@ -86,7 +86,7 @@ class InsertIntoSuite extends QueryTest {
testFilePath.delete()
testFilePath.deleteOnExit()
val testFile = createParquetFile[TestData](testFilePath.getCanonicalPath)
-testFile.registerAsTable("createAndInsertSQLTest")
+testFile.registerTempTable("createAndInsertSQLTest")

sql("INSERT INTO createAndInsertSQLTest SELECT * FROM testData")

4 changes: 2 additions & 2 deletions sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -285,8 +285,8 @@ class JoinSuite extends QueryTest with BeforeAndAfterEach {
}

test("full outer join") {
-upperCaseData.where('N <= 4).registerAsTable("left")
-upperCaseData.where('N >= 3).registerAsTable("right")
+upperCaseData.where('N <= 4).registerTempTable("left")
+upperCaseData.where('N >= 3).registerTempTable("right")

val left = UnresolvedRelation(None, "left", None)
val right = UnresolvedRelation(None, "right", None)
@@ -461,7 +461,7 @@ class SQLQuerySuite extends QueryTest {
}

val schemaRDD1 = applySchema(rowRDD1, schema1)
-schemaRDD1.registerAsTable("applySchema1")
+schemaRDD1.registerTempTable("applySchema1")
checkAnswer(
sql("SELECT * FROM applySchema1"),
(1, "A1", true, null) ::
@@ -491,7 +491,7 @@
}

val schemaRDD2 = applySchema(rowRDD2, schema2)
-schemaRDD2.registerAsTable("applySchema2")
+schemaRDD2.registerTempTable("applySchema2")
checkAnswer(
sql("SELECT * FROM applySchema2"),
(Seq(1, true), Map("A1" -> null)) ::
@@ -516,7 +516,7 @@
}

val schemaRDD3 = applySchema(rowRDD3, schema2)
-schemaRDD3.registerAsTable("applySchema3")
+schemaRDD3.registerTempTable("applySchema3")

checkAnswer(
sql("SELECT f1.f11, f2['D4'] FROM applySchema3"),
@@ -61,31 +61,31 @@ class ScalaReflectionRelationSuite extends FunSuite {
val data = ReflectData("a", 1, 1L, 1.toFloat, 1.toDouble, 1.toShort, 1.toByte, true,
BigDecimal(1), new Timestamp(12345), Seq(1,2,3))
val rdd = sparkContext.parallelize(data :: Nil)
-rdd.registerAsTable("reflectData")
+rdd.registerTempTable("reflectData")

assert(sql("SELECT * FROM reflectData").collect().head === data.productIterator.toSeq)
}

test("query case class RDD with nulls") {
val data = NullReflectData(null, null, null, null, null, null, null)
val rdd = sparkContext.parallelize(data :: Nil)
-rdd.registerAsTable("reflectNullData")
+rdd.registerTempTable("reflectNullData")

assert(sql("SELECT * FROM reflectNullData").collect().head === Seq.fill(7)(null))
}

test("query case class RDD with Nones") {
val data = OptionalReflectData(None, None, None, None, None, None, None)
val rdd = sparkContext.parallelize(data :: Nil)
-rdd.registerAsTable("reflectOptionalData")
+rdd.registerTempTable("reflectOptionalData")

assert(sql("SELECT * FROM reflectOptionalData").collect().head === Seq.fill(7)(null))
}

// Equality is broken for Arrays, so we test that separately.
test("query binary data") {
val rdd = sparkContext.parallelize(ReflectBinary(Array[Byte](1)) :: Nil)
-rdd.registerAsTable("reflectBinary")
+rdd.registerTempTable("reflectBinary")

val result = sql("SELECT data FROM reflectBinary").collect().head(0).asInstanceOf[Array[Byte]]
assert(result.toSeq === Seq[Byte](1))