[SPARK-16037][SQL] Follow-up: add DataFrameWriter.insertInto() test cases for by position resolution

## What changes were proposed in this pull request?

This PR migrates some test cases introduced in apache#12313 as a follow-up of apache#13754 and apache#13766. These test cases cover `DataFrameWriter.insertInto()`, while the former two only cover SQL `INSERT` statements.
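
Both write paths resolve input columns by position rather than by name; a minimal sketch of the behavior under test (the table `t`, view `v`, and column names are illustrative, not taken from this patch):

```scala
// Assume a two-column target table `t(a INT, b INT)`. Both writes below bind
// the input's first column to `t.a` and the second to `t.b`, regardless of
// the input's own column names.
val df = Seq((1, 2)).toDF("x", "y")
df.write.insertInto("t")                    // DataFrameWriter API
df.createOrReplaceTempView("v")
sql("INSERT INTO TABLE t SELECT * FROM v")  // equivalent SQL INSERT statement
```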

Note that the `testPartitionedTable` utility method tests both Hive SerDe tables and data source tables.
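
The helper itself is defined earlier in the suite; below is a rough sketch of its shape, assuming the table schema (data columns `a` and `d`, partition columns `b` and `c`) inferred from the expected rows in the tests below. Table names and format clauses are likewise assumptions:

```scala
// A sketch only, not the actual helper: run the same test body against a Hive
// SerDe table and a data source table with the same logical schema.
private def testPartitionedTable(testName: String)(f: String => Unit): Unit = {
  test(s"Hive SerDe table - $testName") {
    withTable("hive_part_tbl") {
      sql("CREATE TABLE hive_part_tbl (a INT, d INT) PARTITIONED BY (b INT, c INT)")
      f("hive_part_tbl")
    }
  }

  test(s"data source table - $testName") {
    withTable("ds_part_tbl") {
      // Partition columns of a data source table are moved to the end of the
      // schema, so the effective column order is (a, d, b, c).
      sql("CREATE TABLE ds_part_tbl (a INT, b INT, c INT, d INT) USING PARQUET PARTITIONED BY (b, c)")
      f("ds_part_tbl")
    }
  }
}
```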

## How was this patch tested?

N/A

Author: Cheng Lian <[email protected]>

Closes apache#13810 from liancheng/spark-16037-follow-up-tests.
liancheng authored and yhuai committed Jun 21, 2016
Commit f4a3d45 (parent: b76e355)
Showing 1 changed file with 48 additions and 0 deletions.
@@ -469,4 +469,52 @@ class InsertIntoHiveTableSuite extends QueryTest with TestHiveSingleton with Bef
      )
    }
  }

testPartitionedTable("insertInto() should match columns by position and ignore column names") {
tableName =>
withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") {
// Columns `df.c` and `df.d` are resolved by position, and thus mapped to partition columns
// `b` and `c` of the target table.
val df = Seq((1, 2, 3, 4)).toDF("a", "b", "c", "d")
df.write.insertInto(tableName)

checkAnswer(
sql(s"SELECT a, b, c, d FROM $tableName"),
Row(1, 3, 4, 2)
)
}
}
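
  // Worked mapping for the test above, assuming the target table's on-write
  // column order is (a, d, b, c), i.e. data columns first, then partition
  // columns: df columns (a=1, b=2, c=3, d=4) bind by position to
  // (a=1, d=2, b=3, c=4), so `SELECT a, b, c, d` returns Row(1, 3, 4, 2).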

testPartitionedTable("insertInto() should match unnamed columns by position") {
tableName =>
withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") {
// Columns `c + 1` and `d + 1` are resolved by position, and thus mapped to partition
// columns `b` and `c` of the target table.
val df = Seq((1, 2, 3, 4)).toDF("a", "b", "c", "d")
df.select('a + 1, 'b + 1, 'c + 1, 'd + 1).write.insertInto(tableName)

checkAnswer(
sql(s"SELECT a, b, c, d FROM $tableName"),
Row(2, 4, 5, 3)
)
}
}

testPartitionedTable("insertInto() should reject missing columns") {
tableName =>
sql("CREATE TABLE t (a INT, b INT)")

intercept[AnalysisException] {
spark.table("t").write.insertInto(tableName)
}
}

testPartitionedTable("insertInto() should reject extra columns") {
tableName =>
sql("CREATE TABLE t (a INT, b INT, c INT, d INT, e INT)")

intercept[AnalysisException] {
spark.table("t").write.insertInto(tableName)
}
}
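
  // Note: both rejection tests assume the target table expects exactly four
  // input columns (data plus partition columns, as inferred above); writing a
  // two-column or five-column DataFrame therefore fails during analysis.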
}
