diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index efdf873c34556..d93f30b1e5cc3 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -12,130 +12,129 @@ title: Spark SQL and DataFrames Spark SQL is a Spark module for structured data processing. Unlike the basic Spark RDD API, the interfaces provided by Spark SQL provide Spark with more information about the structure of both the data and the computation being performed. Internally, Spark SQL uses this extra information to perform extra optimizations. There are several ways to -interact with Spark SQL including SQL, the DataFrames API and the Datasets API. When computing a result +interact with Spark SQL including SQL and the Dataset API. When computing a result the same execution engine is used, independent of which API/language you are using to express the -computation. This unification means that developers can easily switch back and forth between the -various APIs based on which provides the most natural way to express a given transformation. +computation. This unification means that developers can easily switch back and forth between +different APIs based on which provides the most natural way to express a given transformation. All of the examples on this page use sample data included in the Spark distribution and can be run in the `spark-shell`, `pyspark` shell, or `sparkR` shell. ## SQL -One use of Spark SQL is to execute SQL queries written using either a basic SQL syntax or HiveQL. +One use of Spark SQL is to execute SQL queries. Spark SQL can also be used to read data from an existing Hive installation. For more on how to configure this feature, please refer to the [Hive Tables](#hive-tables) section. When running -SQL from within another programming language the results will be returned as a [DataFrame](#DataFrames). +SQL from within another programming language the results will be returned as a [DataFrame](#datasets-and-dataframes). 
You can also interact with the SQL interface using the [command-line](#running-the-spark-sql-cli) or over [JDBC/ODBC](#running-the-thrift-jdbcodbc-server). -## DataFrames +## Datasets and DataFrames -A DataFrame is a distributed collection of data organized into named columns. It is conceptually -equivalent to a table in a relational database or a data frame in R/Python, but with richer -optimizations under the hood. DataFrames can be constructed from a wide array of [sources](#data-sources) such -as: structured data files, tables in Hive, external databases, or existing RDDs. +A Dataset is a new interface added in Spark 1.6 that tries to provide the benefits of RDDs (strong +typing, ability to use powerful lambda functions) with the benefits of Spark SQL's optimized +execution engine. A Dataset can be [constructed](#creating-datasets) from JVM objects and then +manipulated using functional transformations (`map`, `flatMap`, `filter`, etc.). -The DataFrame API is available in [Scala](api/scala/index.html#org.apache.spark.sql.DataFrame), -[Java](api/java/index.html?org/apache/spark/sql/DataFrame.html), -[Python](api/python/pyspark.sql.html#pyspark.sql.DataFrame), and [R](api/R/index.html). +The Dataset API is the successor of the DataFrame API, which was introduced in Spark 1.3. In Spark +2.0, Datasets and DataFrames are unified, and DataFrames are now equivalent to Datasets of `Row`s. +In fact, `DataFrame` is simply a type alias of `Dataset[Row]` in [the Scala API][scala-datasets]. +However, [Java API][java-datasets] users must use `Dataset` instead. -## Datasets +[scala-datasets]: api/scala/index.html#org.apache.spark.sql.Dataset +[java-datasets]: api/java/index.html?org/apache/spark/sql/Dataset.html -A Dataset is a new experimental interface added in Spark 1.6 that tries to provide the benefits of -RDDs (strong typing, ability to use powerful lambda functions) with the benefits of Spark SQL's -optimized execution engine. 
A Dataset can be [constructed](#creating-datasets) from JVM objects and then manipulated -using functional transformations (map, flatMap, filter, etc.). +Python does not have support for the Dataset API, but due to its dynamic nature many of the +benefits are already available (i.e. you can access the field of a row by name naturally +`row.columnName`). The case for R is similar. -The unified Dataset API can be used both in [Scala](api/scala/index.html#org.apache.spark.sql.Dataset) and -[Java](api/java/index.html?org/apache/spark/sql/Dataset.html). Python does not yet have support for -the Dataset API, but due to its dynamic nature many of the benefits are already available (i.e. you can -access the field of a row by name naturally `row.columnName`). Full python support will be added -in a future release. +Throughout this document, we will often refer to Scala/Java Datasets of `Row`s as DataFrames. # Getting Started -## Starting Point: SQLContext +## Starting Point: SparkSession
-The entry point into all functionality in Spark SQL is the
-[`SQLContext`](api/scala/index.html#org.apache.spark.sql.SQLContext) class, or one of its
-descendants. To create a basic `SQLContext`, all you need is a SparkContext.
+The entry point into all functionality in Spark is the [`SparkSession`](api/scala/index.html#org.apache.spark.sql.SparkSession) class. To create a basic `SparkSession`, just use `SparkSession.builder()`:
 
 {% highlight scala %}
-val sc: SparkContext // An existing SparkContext.
-val sqlContext = new org.apache.spark.sql.SQLContext(sc)
+import org.apache.spark.sql.SparkSession
+
+val spark = SparkSession.builder()
+  .master("local")
+  .appName("Word Count")
+  .config("spark.some.config.option", "some-value")
+  .getOrCreate()
 
 // this is used to implicitly convert an RDD to a DataFrame.
-import sqlContext.implicits._
+import spark.implicits._
 
 {% endhighlight %}
-The entry point into all functionality in Spark SQL is the
-[`SQLContext`](api/java/index.html#org.apache.spark.sql.SQLContext) class, or one of its
-descendants. To create a basic `SQLContext`, all you need is a SparkContext.
+The entry point into all functionality in Spark is the [`SparkSession`](api/java/index.html#org.apache.spark.sql.SparkSession) class. To create a basic `SparkSession`, just use `SparkSession.builder()`:
 
 {% highlight java %}
-JavaSparkContext sc = ...; // An existing JavaSparkContext.
-SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);
-{% endhighlight %}
+import org.apache.spark.sql.SparkSession;
+SparkSession spark = SparkSession.builder()
+  .master("local")
+  .appName("Word Count")
+  .config("spark.some.config.option", "some-value")
+  .getOrCreate();
+{% endhighlight %}
-The entry point into all relational functionality in Spark is the
-[`SQLContext`](api/python/pyspark.sql.html#pyspark.sql.SQLContext) class, or one
-of its decedents. To create a basic `SQLContext`, all you need is a SparkContext.
+The entry point into all functionality in Spark is the [`SparkSession`](api/python/pyspark.sql.html#pyspark.sql.SparkSession) class. To create a basic `SparkSession`, just use `SparkSession.builder`:
 
 {% highlight python %}
-from pyspark.sql import SQLContext
-sqlContext = SQLContext(sc)
+from pyspark.sql import SparkSession
+
+spark = SparkSession.builder \
+  .master("local") \
+  .appName("Word Count") \
+  .config("spark.some.config.option", "some-value") \
+  .getOrCreate()
 {% endhighlight %}
-The entry point into all relational functionality in Spark is the
-`SQLContext` class, or one of its decedents. To create a basic `SQLContext`, all you need is a SparkContext.
+Unlike the Scala, Java, and Python APIs, SparkR has not yet been migrated from `SQLContext` to `SparkSession`, so
+the entry point into all relational functionality in SparkR is still the
+`SQLContext` class in Spark 2.0. To create a basic `SQLContext`, all you need is a `SparkContext`.
 
 {% highlight r %}
-sqlContext <- sparkRSQL.init(sc)
+spark <- sparkRSQL.init(sc)
 {% endhighlight %}
 
+Note that when invoked for the first time, `sparkRSQL.init()` initializes a global `SQLContext` singleton instance, and returns a reference to that same instance on every subsequent invocation. Users therefore only need to initialize the `SQLContext` once; SparkR functions like `read.df` can then access the global instance implicitly, so the `SQLContext` instance does not need to be passed around.
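The initialize-once behavior described in the note on `sparkRSQL.init()` can be modeled in a few lines. The sketch below is plain Python written purely for illustration; it does not use SparkR or any Spark API. `ToyContext`, `init_context`, and `read_rows` are hypothetical names standing in for `SQLContext`, `sparkRSQL.init()`, and a function like `read.df`.

```python
from typing import List, Optional, Tuple


class ToyContext:
    """Hypothetical stand-in for a SQLContext; not a Spark class."""

    def __init__(self, app_name: str) -> None:
        self.app_name = app_name


# Module-level slot that holds the single shared instance.
_global_context: Optional[ToyContext] = None


def init_context(app_name: str = "demo") -> ToyContext:
    """Create the context on the first call; return the same one afterwards."""
    global _global_context
    if _global_context is None:
        _global_context = ToyContext(app_name)
    return _global_context


def read_rows(lines: List[str]) -> List[Tuple[str, str]]:
    """Looks up the global context itself, so callers never pass it in."""
    ctx = init_context()
    return [(ctx.app_name, line) for line in lines]


first = init_context("first")
second = init_context("second")  # argument is ignored: the instance already exists
```

Here `first` and `second` refer to the same object, which is the property that lets helper functions find the context without it being threaded through every call.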
-In addition to the basic `SQLContext`, you can also create a `HiveContext`, which provides a
-superset of the functionality provided by the basic `SQLContext`. Additional features include
-the ability to write queries using the more complete HiveQL parser, access to Hive UDFs, and the
-ability to read data from Hive tables. To use a `HiveContext`, you do not need to have an
-existing Hive setup, and all of the data sources available to a `SQLContext` are still available.
-`HiveContext` is only packaged separately to avoid including all of Hive's dependencies in the default
-Spark build. If these dependencies are not a problem for your application then using `HiveContext`
-is recommended for the 1.3 release of Spark. Future releases will focus on bringing `SQLContext` up
-to feature parity with a `HiveContext`.
-
+`SparkSession` (or `SQLContext` for SparkR) in Spark 2.0 provides built-in support for Hive features including the ability to
+write queries using HiveQL, access to Hive UDFs, and the ability to read data from Hive tables.
+To use these features, you do not need to have an existing Hive setup.
 
 ## Creating DataFrames
 
-With a `SQLContext`, applications can create `DataFrame`s from an existing `RDD`, from a Hive table, or from data sources.
-
-As an example, the following creates a `DataFrame` based on the content of a JSON file:
-
-{% highlight scala %} -val sc: SparkContext // An existing SparkContext. -val sqlContext = new org.apache.spark.sql.SQLContext(sc) +With a `SparkSession`, applications can create DataFrames from an [existing `RDD`](#interoperating-with-rdds), +from a Hive table, or from [Spark data sources](#data-sources). + +As an example, the following creates a DataFrame based on the content of a JSON file: -val df = sqlContext.read.json("examples/src/main/resources/people.json") +{% highlight scala %} +val spark: SparkSession // An existing SparkSession. +val df = spark.read.json("examples/src/main/resources/people.json") // Displays the content of the DataFrame to stdout df.show() @@ -144,11 +143,14 @@ df.show()
-{% highlight java %} -JavaSparkContext sc = ...; // An existing JavaSparkContext. -SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc); +With a `SparkSession`, applications can create DataFrames from an [existing `RDD`](#interoperating-with-rdds), +from a Hive table, or from [Spark data sources](#data-sources). + +As an example, the following creates a DataFrame based on the content of a JSON file: -DataFrame df = sqlContext.read().json("examples/src/main/resources/people.json"); +{% highlight java %} +SparkSession spark = ...; // An existing SparkSession. +Dataset df = spark.read().json("examples/src/main/resources/people.json"); // Displays the content of the DataFrame to stdout df.show(); @@ -157,11 +159,14 @@ df.show();
-{% highlight python %} -from pyspark.sql import SQLContext -sqlContext = SQLContext(sc) +With a `SparkSession`, applications can create DataFrames from an [existing `RDD`](#interoperating-with-rdds), +from a Hive table, or from [Spark data sources](#data-sources). -df = sqlContext.read.json("examples/src/main/resources/people.json") +As an example, the following creates a DataFrame based on the content of a JSON file: + +{% highlight python %} +# spark is an existing SparkSession +df = spark.read.json("examples/src/main/resources/people.json") # Displays the content of the DataFrame to stdout df.show() @@ -170,34 +175,37 @@ df.show()
-{% highlight r %} -sqlContext <- SQLContext(sc) +With a `SQLContext`, applications can create DataFrames from an [existing `RDD`](#interoperating-with-rdds), +from a Hive table, or from [Spark data sources](#data-sources). + +As an example, the following creates a DataFrame based on the content of a JSON file: -df <- read.json(sqlContext, "examples/src/main/resources/people.json") +{% highlight r %} +df <- read.json("examples/src/main/resources/people.json") # Displays the content of the DataFrame to stdout showDF(df) {% endhighlight %}
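To make the JSON example more concrete, the following sketch shows, in plain Python without Spark, roughly what turning line-delimited JSON into rows with named columns involves. The sample records are an assumption modeled on `people.json`, and `rows_from_json_lines` is a hypothetical helper, not part of any Spark API. Spark's own reader additionally infers column types and distributes the work.

```python
import json

# Assumed sample input: one JSON object per line, with a field missing in one record.
SAMPLE = """\
{"name": "Michael"}
{"name": "Andy", "age": 30}
{"name": "Justin", "age": 19}
"""


def rows_from_json_lines(text):
    """Parse line-delimited JSON into (columns, rows) with one tuple per record."""
    records = [json.loads(line) for line in text.splitlines() if line.strip()]
    # The column set is the union of keys across all records, sorted for a stable order.
    columns = sorted({key for record in records for key in record})
    # A field that is absent from a record becomes None in that row.
    rows = [tuple(record.get(col) for col in columns) for record in records]
    return columns, rows


columns, rows = rows_from_json_lines(SAMPLE)
for row in rows:
    print(dict(zip(columns, row)))
```

The record without an `age` key still yields a full-width row, with `None` in the missing position, which mirrors how a tabular view needs every row to have the same columns.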
-
-## DataFrame Operations
+## Untyped Dataset Operations (aka DataFrame Operations)
+
+DataFrames provide a domain-specific language for structured data manipulation in [Scala](api/scala/index.html#org.apache.spark.sql.Dataset), [Java](api/java/index.html?org/apache/spark/sql/Dataset.html), [Python](api/python/pyspark.sql.html#pyspark.sql.DataFrame) and [R](api/R/DataFrame.html).
 
-DataFrames provide a domain-specific language for structured data manipulation in [Scala](api/scala/index.html#org.apache.spark.sql.DataFrame), [Java](api/java/index.html?org/apache/spark/sql/DataFrame.html), [Python](api/python/pyspark.sql.html#pyspark.sql.DataFrame) and [R](api/R/DataFrame.html).
+As mentioned above, in Spark 2.0, DataFrames are just Datasets of `Row`s in the Scala and Java APIs. These operations are also referred to as "untyped transformations", in contrast to the "typed transformations" that come with strongly typed Scala/Java Datasets.
 
-Here we include some basic examples of structured data processing using DataFrames:
+Here we include some basic examples of structured data processing using Datasets:
{% highlight scala %} -val sc: SparkContext // An existing SparkContext. -val sqlContext = new org.apache.spark.sql.SQLContext(sc) +val spark: SparkSession // An existing SparkSession // Create the DataFrame -val df = sqlContext.read.json("examples/src/main/resources/people.json") +val df = spark.read.json("examples/src/main/resources/people.json") // Show the content of the DataFrame df.show() @@ -239,20 +247,19 @@ df.groupBy("age").count().show() // 30 1 {% endhighlight %} -For a complete list of the types of operations that can be performed on a DataFrame refer to the [API Documentation](api/scala/index.html#org.apache.spark.sql.DataFrame). +For a complete list of the types of operations that can be performed on a Dataset refer to the [API Documentation](api/scala/index.html#org.apache.spark.sql.Dataset). -In addition to simple column references and expressions, DataFrames also have a rich library of functions including string manipulation, date arithmetic, common math operations and more. The complete list is available in the [DataFrame Function Reference](api/scala/index.html#org.apache.spark.sql.functions$). +In addition to simple column references and expressions, Datasets also have a rich library of functions including string manipulation, date arithmetic, common math operations and more. The complete list is available in the [DataFrame Function Reference](api/scala/index.html#org.apache.spark.sql.functions$).
{% highlight java %} -JavaSparkContext sc // An existing SparkContext. -SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc) +SparkSession spark = ...; // An existing SparkSession // Create the DataFrame -DataFrame df = sqlContext.read().json("examples/src/main/resources/people.json"); +Dataset df = spark.read().json("examples/src/main/resources/people.json"); // Show the content of the DataFrame df.show(); @@ -294,9 +301,9 @@ df.groupBy("age").count().show(); // 30 1 {% endhighlight %} -For a complete list of the types of operations that can be performed on a DataFrame refer to the [API Documentation](api/java/org/apache/spark/sql/DataFrame.html). +For a complete list of the types of operations that can be performed on a Dataset refer to the [API Documentation](api/java/org/apache/spark/sql/Dataset.html). -In addition to simple column references and expressions, DataFrames also have a rich library of functions including string manipulation, date arithmetic, common math operations and more. The complete list is available in the [DataFrame Function Reference](api/java/org/apache/spark/sql/functions.html). +In addition to simple column references and expressions, Datasets also have a rich library of functions including string manipulation, date arithmetic, common math operations and more. The complete list is available in the [DataFrame Function Reference](api/java/org/apache/spark/sql/functions.html).
@@ -308,11 +315,10 @@ latter form, which is future proof and won't break with column names that are also attributes on the DataFrame class. {% highlight python %} -from pyspark.sql import SQLContext -sqlContext = SQLContext(sc) +# spark is an existing SparkSession # Create the DataFrame -df = sqlContext.read.json("examples/src/main/resources/people.json") +df = spark.read.json("examples/src/main/resources/people.json") # Show the content of the DataFrame df.show() @@ -363,10 +369,8 @@ In addition to simple column references and expressions, DataFrames also have a
{% highlight r %} -sqlContext <- sparkRSQL.init(sc) - # Create the DataFrame -df <- read.json(sqlContext, "examples/src/main/resources/people.json") +df <- read.json("examples/src/main/resources/people.json") # Show the content of the DataFrame showDF(df) @@ -419,35 +423,39 @@ In addition to simple column references and expressions, DataFrames also have a ## Running SQL Queries Programmatically -The `sql` function on a `SQLContext` enables applications to run SQL queries programmatically and returns the result as a `DataFrame`. -
+The `sql` function on a `SparkSession` enables applications to run SQL queries programmatically and returns the result as a `DataFrame`. + {% highlight scala %} -val sqlContext = ... // An existing SQLContext -val df = sqlContext.sql("SELECT * FROM table") +val spark = ... // An existing SparkSession +val df = spark.sql("SELECT * FROM table") {% endhighlight %}
+The `sql` function on a `SparkSession` enables applications to run SQL queries programmatically and returns the result as a `Dataset<Row>`.
+
 {% highlight java %}
-SQLContext sqlContext = ... // An existing SQLContext
-DataFrame df = sqlContext.sql("SELECT * FROM table")
+SparkSession spark = ...; // An existing SparkSession
+Dataset<Row> df = spark.sql("SELECT * FROM table");
 {% endhighlight %}
+The `sql` function on a `SparkSession` enables applications to run SQL queries programmatically and returns the result as a `DataFrame`. + {% highlight python %} -from pyspark.sql import SQLContext -sqlContext = SQLContext(sc) -df = sqlContext.sql("SELECT * FROM table") +# spark is an existing SparkSession +df = spark.sql("SELECT * FROM table") {% endhighlight %}
+The `sql` function enables applications to run SQL queries programmatically and returns the result as a `DataFrame`. + {% highlight r %} -sqlContext <- sparkRSQL.init(sc) -df <- sql(sqlContext, "SELECT * FROM table") +df <- sql("SELECT * FROM table") {% endhighlight %}
@@ -456,7 +464,7 @@ df <- sql(sqlContext, "SELECT * FROM table") ## Creating Datasets -Datasets are similar to RDDs, however, instead of using Java Serialization or Kryo they use +Datasets are similar to RDDs, however, instead of using Java serialization or Kryo they use a specialized [Encoder](api/scala/index.html#org.apache.spark.sql.Encoder) to serialize the objects for processing or transmitting over the network. While both encoders and standard serialization are responsible for turning an object into bytes, encoders are code generated dynamically and use a format @@ -467,7 +475,7 @@ the bytes back into an object.
{% highlight scala %} -// Encoders for most common types are automatically provided by importing sqlContext.implicits._ +// Encoders for most common types are automatically provided by importing spark.implicits._ val ds = Seq(1, 2, 3).toDS() ds.map(_ + 1).collect() // Returns: Array(2, 3, 4) @@ -477,7 +485,7 @@ val ds = Seq(Person("Andy", 32)).toDS() // DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name. val path = "examples/src/main/resources/people.json" -val people = sqlContext.read.json(path).as[Person] +val people = spark.read.json(path).as[Person] {% endhighlight %} @@ -486,8 +494,30 @@ val people = sqlContext.read.json(path).as[Person]
 {% highlight java %}
-JavaSparkContext sc = ...; // An existing JavaSparkContext.
-SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);
+SparkSession spark = ...; // An existing SparkSession
+
+// Encoders for most common types are provided in class Encoders.
+Dataset<Integer> ds = spark.createDataset(Arrays.asList(1, 2, 3), Encoders.INT());
+ds.map(new MapFunction<Integer, Integer>() {
+  @Override
+  public Integer call(Integer value) throws Exception {
+    return value + 1;
+  }
+}, Encoders.INT()); // Returns: [2, 3, 4]
+
+Person person = new Person();
+person.setName("Andy");
+person.setAge(32);
+
+// Encoders are also created for Java beans.
+Dataset<Person> personDS = spark.createDataset(
+  Collections.singletonList(person),
+  Encoders.bean(Person.class)
+);
+
+// DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name.
+String path = "examples/src/main/resources/people.json";
+Dataset<Person> people = spark.read().json(path).as(Encoders.bean(Person.class));
 {% endhighlight %}
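The Creating Datasets section contrasts encoders with generic serialization such as Java serialization or Kryo. Assuming the point of the encoder format is that some operations can run against the encoded bytes without rebuilding the object, the sketch below illustrates that idea in plain Python. It is not Spark's encoder implementation: the fixed byte layout, the field widths, and the helper names `encode`, `decode`, and `age_of` are all assumptions made for this example.

```python
import struct

# Assumed layout: 4-byte little-endian int (age) followed by a 16-byte padded name.
RECORD = struct.Struct("<i16s")
AGE_FIELD = struct.Struct("<i")


def encode(name: str, age: int) -> bytes:
    """Pack one record into the fixed layout (names shorter than 16 bytes are zero-padded)."""
    return RECORD.pack(age, name.encode("utf-8"))


def decode(buf: bytes):
    """Rebuild the full (name, age) tuple from its bytes."""
    age, raw_name = RECORD.unpack(buf)
    return raw_name.rstrip(b"\x00").decode("utf-8"), age


def age_of(buf: bytes) -> int:
    """Read only the age field; the name bytes are never decoded."""
    return AGE_FIELD.unpack_from(buf, 0)[0]


encoded = [encode("Andy", 32), encode("Justin", 19), encode("Michael", 45)]

# Filter on the encoded form, then decode only the records that survive.
teen_bytes = [buf for buf in encoded if 13 <= age_of(buf) <= 19]
teens = [decode(buf) for buf in teen_bytes]
```

Because the position of `age` is known from the layout, the filter touches four bytes per record, whereas an opaque serialized blob would have to be fully deserialized before the same comparison could be made.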
@@ -495,14 +525,14 @@ SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc); ## Interoperating with RDDs -Spark SQL supports two different methods for converting existing RDDs into DataFrames. The first +Spark SQL supports two different methods for converting existing RDDs into Datasets. The first method uses reflection to infer the schema of an RDD that contains specific types of objects. This reflection based approach leads to more concise code and works well when you already know the schema while writing your Spark application. -The second method for creating DataFrames is through a programmatic interface that allows you to +The second method for creating Datasets is through a programmatic interface that allows you to construct a schema and then apply it to an existing RDD. While this method is more verbose, it allows -you to construct DataFrames when the columns and their types are not known until runtime. +you to construct Datasets when the columns and their types are not known until runtime. ### Inferring the Schema Using Reflection
@@ -513,28 +543,30 @@ The Scala interface for Spark SQL supports automatically converting an RDD conta to a DataFrame. The case class defines the schema of the table. The names of the arguments to the case class are read using reflection and become the names of the columns. Case classes can also be nested or contain complex -types such as Sequences or Arrays. This RDD can be implicitly converted to a DataFrame and then be +types such as `Seq`s or `Array`s. This RDD can be implicitly converted to a DataFrame and then be registered as a table. Tables can be used in subsequent SQL statements. {% highlight scala %} -// sc is an existing SparkContext. -val sqlContext = new org.apache.spark.sql.SQLContext(sc) +val spark: SparkSession // An existing SparkSession // this is used to implicitly convert an RDD to a DataFrame. -import sqlContext.implicits._ +import spark.implicits._ // Define the schema using a case class. // Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit, // you can use custom classes that implement the Product interface. case class Person(name: String, age: Int) -// Create an RDD of Person objects and register it as a table. -val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF() +// Create an RDD of Person objects and register it as a temporary view. +val people = sc + .textFile("examples/src/main/resources/people.txt") + .map(_.split(",")) + .map(p => Person(p(0), p(1).trim.toInt)) + .toDF() people.createOrReplaceTempView("people") -// SQL statements can be run by using the sql methods provided by sqlContext. -val teenagers = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19") +// SQL statements can be run by using the sql methods provided by spark. 
+val teenagers = spark.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19") -// The results of SQL queries are DataFrames and support all the normal RDD operations. // The columns of a row in the result can be accessed by field index: teenagers.map(t => "Name: " + t(0)).collect().foreach(println) @@ -550,11 +582,12 @@ teenagers.map(_.getValuesMap[Any](List("name", "age"))).collect().foreach(printl
-Spark SQL supports automatically converting an RDD of [JavaBeans](http://stackoverflow.com/questions/3295496/what-is-a-javabean-exactly)
-into a DataFrame. The BeanInfo, obtained using reflection, defines the schema of the table.
-Currently, Spark SQL does not support JavaBeans that contain
-nested or contain complex types such as Lists or Arrays. You can create a JavaBean by creating a
-class that implements Serializable and has getters and setters for all of its fields.
+Spark SQL supports automatically converting an RDD of
+[JavaBeans](http://stackoverflow.com/questions/3295496/what-is-a-javabean-exactly) into a DataFrame.
+The `BeanInfo`, obtained using reflection, defines the schema of the table. Currently, Spark SQL
+does not support JavaBeans that contain `Map` fields. Nested JavaBeans and `List` or `Array`
+fields are supported, though. You can create a JavaBean by creating a class that implements
+`Serializable` and has getters and setters for all of its fields.
 
 {% highlight java %}
 
@@ -586,11 +619,10 @@ A schema can be applied to an existing RDD by calling `createDataFrame` and prov
 for the JavaBean.
 
 {% highlight java %}
-// sc is an existing JavaSparkContext.
-SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);
+SparkSession spark = ...; // An existing SparkSession
 
 // Load a text file and convert each line to a JavaBean.
-JavaRDD<Person> people = sc.textFile("examples/src/main/resources/people.txt").map(
+JavaRDD<Person> people = spark.read().textFile("examples/src/main/resources/people.txt").javaRDD().map(
   new Function<String, Person>() {
     public Person call(String line) throws Exception {
       String[] parts = line.split(",");
@@ -604,19 +636,18 @@ JavaRDD people = sc.textFile("examples/src/main/resources/people.txt").m
   });
 
 // Apply a schema to an RDD of JavaBeans and register it as a table.
-DataFrame schemaPeople = sqlContext.createDataFrame(people, Person.class);
+Dataset<Row> schemaPeople = spark.createDataFrame(people, Person.class);
 schemaPeople.createOrReplaceTempView("people");
 
 // SQL can be run over RDDs that have been registered as tables.
-DataFrame teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
+Dataset<Row> teenagers = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19");
 
-// The results of SQL queries are DataFrames and support all the normal RDD operations.
 // The columns of a row in the result can be accessed by ordinal.
-List<String> teenagerNames = teenagers.javaRDD().map(new Function<Row, String>() {
+List<String> teenagerNames = teenagers.map(new MapFunction<Row, String>() {
   public String call(Row row) {
     return "Name: " + row.getString(0);
   }
-}).collect();
+}, Encoders.STRING()).collectAsList();
 
 {% endhighlight %}
 
@@ -626,15 +657,12 @@ List teenagerNames = teenagers.javaRDD().map(new Function()
 Spark SQL can convert an RDD of Row objects to a DataFrame, inferring the datatypes. Rows are constructed by passing a list of
 key/value pairs as kwargs to the Row class. The keys of this list define the column names of the table,
-and the types are inferred by looking at the first row. Since we currently only look at the first
-row, it is important that there is no missing data in the first row of the RDD. In future versions we
-plan to more completely infer the schema by looking at more data, similar to the inference that is
-performed on JSON files.
+and the types are inferred by sampling the whole dataset, similar to the inference that is performed on JSON files.
 
 {% highlight python %}
-# sc is an existing SparkContext.
-from pyspark.sql import SQLContext, Row
-sqlContext = SQLContext(sc)
+# spark is an existing SparkSession.
+from pyspark.sql import Row
+sc = spark.sparkContext
 
 # Load a text file and convert each line to a Row.
lines = sc.textFile("examples/src/main/resources/people.txt") @@ -642,11 +670,11 @@ parts = lines.map(lambda l: l.split(",")) people = parts.map(lambda p: Row(name=p[0], age=int(p[1]))) # Infer the schema, and register the DataFrame as a table. -schemaPeople = sqlContext.createDataFrame(people) +schemaPeople = spark.createDataFrame(people) schemaPeople.createOrReplaceTempView("people") # SQL can be run over DataFrames that have been registered as a table. -teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19") +teenagers = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19") # The results of SQL queries are RDDs and support all the normal RDD operations. teenNames = teenagers.map(lambda p: "Name: " + p.name) @@ -673,12 +701,11 @@ a `DataFrame` can be created programmatically with three steps. 2. Create the schema represented by a `StructType` matching the structure of `Row`s in the RDD created in Step 1. 3. Apply the schema to the RDD of `Row`s via `createDataFrame` method provided -by `SQLContext`. +by `SparkSession`. For example: {% highlight scala %} -// sc is an existing SparkContext. -val sqlContext = new org.apache.spark.sql.SQLContext(sc) +val spark: SparkSession // An existing SparkSession // Create an RDD val people = sc.textFile("examples/src/main/resources/people.txt") @@ -690,26 +717,25 @@ val schemaString = "name age" import org.apache.spark.sql.Row; // Import Spark SQL data types -import org.apache.spark.sql.types.{StructType,StructField,StringType}; +import org.apache.spark.sql.types.{StructType, StructField, StringType}; // Generate the schema based on the string of schema -val schema = - StructType( - schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true))) +val schema = StructType(schemaString.split(" ").map { fieldName => + StructField(fieldName, StringType, true) +}) // Convert records of the RDD (people) to Rows. 
 val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))
 
 // Apply the schema to the RDD.
-val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema)
+val peopleDataFrame = spark.createDataFrame(rowRDD, schema)
 
 // Creates a temporary view using the DataFrame.
 peopleDataFrame.createOrReplaceTempView("people")
 
-// SQL statements can be run by using the sql methods provided by sqlContext.
-val results = sqlContext.sql("SELECT name FROM people")
+// SQL statements can be run by using the sql methods provided by spark.
+val results = spark.sql("SELECT name FROM people")
 
-// The results of SQL queries are DataFrames and support all the normal RDD operations.
 // The columns of a row in the result can be accessed by field index or by field name.
 results.map(t => "Name: " + t(0)).collect().foreach(println)
 {% endhighlight %}
@@ -722,13 +748,13 @@ results.map(t => "Name: " + t(0)).collect().foreach(println)
 When JavaBean classes cannot be defined ahead of time (for example,
 the structure of records is encoded in a string, or a text dataset will be parsed and
 fields will be projected differently for different users),
-a `DataFrame` can be created programmatically with three steps.
+a `Dataset<Row>` can be created programmatically with three steps.
 
 1. Create an RDD of `Row`s from the original RDD;
 2. Create the schema represented by a `StructType` matching the structure of
 `Row`s in the RDD created in Step 1.
 3. Apply the schema to the RDD of `Row`s via `createDataFrame` method provided
-by `SQLContext`.
+by `SparkSession`.
 
 For example:
 {% highlight java %}
@@ -743,8 +769,8 @@ import org.apache.spark.sql.Row;
 // Import RowFactory.
 import org.apache.spark.sql.RowFactory;
 
-// sc is an existing JavaSparkContext.
-SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);
+SparkSession spark = ...; // An existing SparkSession.
+JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
 
 // Load a text file and convert each line to a JavaBean.
JavaRDD people = sc.textFile("examples/src/main/resources/people.txt"); @@ -769,13 +795,13 @@ JavaRDD rowRDD = people.map( }); // Apply the schema to the RDD. -DataFrame peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema); +Dataset peopleDataFrame = spark.createDataFrame(rowRDD, schema); // Creates a temporary view using the DataFrame. peopleDataFrame.createOrReplaceTempView("people"); // SQL can be run over a temporary view created using DataFrames. -DataFrame results = sqlContext.sql("SELECT name FROM people"); +Dataset results = spark.sql("SELECT name FROM people"); // The results of SQL queries are DataFrames and support all the normal RDD operations. // The columns of a row in the result can be accessed by ordinal. @@ -799,16 +825,15 @@ a `DataFrame` can be created programmatically with three steps. 1. Create an RDD of tuples or lists from the original RDD; 2. Create the schema represented by a `StructType` matching the structure of tuples or lists in the RDD created in the step 1. -3. Apply the schema to the RDD via `createDataFrame` method provided by `SQLContext`. +3. Apply the schema to the RDD via `createDataFrame` method provided by `SparkSession`. For example: {% highlight python %} -# Import SQLContext and data types -from pyspark.sql import SQLContext +# Import SparkSession and data types from pyspark.sql.types import * -# sc is an existing SparkContext. -sqlContext = SQLContext(sc) +# spark is an existing SparkSession. +sc = spark.sparkContext # Load a text file and convert each line to a tuple. lines = sc.textFile("examples/src/main/resources/people.txt") @@ -822,13 +847,13 @@ fields = [StructField(field_name, StringType(), True) for field_name in schemaSt schema = StructType(fields) # Apply the schema to the RDD. 
-schemaPeople = sqlContext.createDataFrame(people, schema) +schemaPeople = spark.createDataFrame(people, schema) # Creates a temporary view using the DataFrame schemaPeople.createOrReplaceTempView("people") # SQL can be run over DataFrames that have been registered as a table. -results = sqlContext.sql("SELECT name FROM people") +results = spark.sql("SELECT name FROM people") # The results of SQL queries are RDDs and support all the normal RDD operations. names = results.map(lambda p: "Name: " + p.name) @@ -843,9 +868,9 @@ for name in names.collect(): # Data Sources -Spark SQL supports operating on a variety of data sources through the `DataFrame` interface. -A DataFrame can be operated on as normal RDDs and can also be used to create a temporary view. -Registering a DataFrame as a table allows you to run SQL queries over its data. This section +Spark SQL supports operating on a variety of data sources through the DataFrame interface. +A DataFrame can be operated on using relational transformations and can also be used to create a temporary view. +Registering a DataFrame as a temporary view allows you to run SQL queries over its data. This section describes the general methods for loading and saving data using the Spark Data Sources and then goes into specific options that are available for the built-in data sources. @@ -858,7 +883,7 @@ In the simplest form, the default data source (`parquet` unless otherwise config
{% highlight scala %} -val df = sqlContext.read.load("examples/src/main/resources/users.parquet") +val df = spark.read.load("examples/src/main/resources/users.parquet") df.select("name", "favorite_color").write.save("namesAndFavColors.parquet") {% endhighlight %} @@ -868,7 +893,7 @@ df.select("name", "favorite_color").write.save("namesAndFavColors.parquet") {% highlight java %} -DataFrame df = sqlContext.read().load("examples/src/main/resources/users.parquet"); +Dataset df = spark.read().load("examples/src/main/resources/users.parquet"); df.select("name", "favorite_color").write().save("namesAndFavColors.parquet"); {% endhighlight %} @@ -879,7 +904,7 @@ df.select("name", "favorite_color").write().save("namesAndFavColors.parquet"); {% highlight python %} -df = sqlContext.read.load("examples/src/main/resources/users.parquet") +df = spark.read.load("examples/src/main/resources/users.parquet") df.select("name", "favorite_color").write.save("namesAndFavColors.parquet") {% endhighlight %} @@ -889,7 +914,7 @@ df.select("name", "favorite_color").write.save("namesAndFavColors.parquet")
{% highlight r %} -df <- read.df(sqlContext, "examples/src/main/resources/users.parquet") +df <- read.df("examples/src/main/resources/users.parquet") write.df(select(df, "name", "favorite_color"), "namesAndFavColors.parquet") {% endhighlight %} @@ -901,14 +926,14 @@ write.df(select(df, "name", "favorite_color"), "namesAndFavColors.parquet") You can also manually specify the data source that will be used along with any extra options that you would like to pass to the data source. Data sources are specified by their fully qualified name (i.e., `org.apache.spark.sql.parquet`), but for built-in sources you can also use their short -names (`json`, `parquet`, `jdbc`). DataFrames of any type can be converted into other types +names (`json`, `parquet`, `jdbc`). DataFrames loaded from any data source type can be converted into other types using this syntax.
{% highlight scala %} -val df = sqlContext.read.format("json").load("examples/src/main/resources/people.json") +val df = spark.read.format("json").load("examples/src/main/resources/people.json") df.select("name", "age").write.format("parquet").save("namesAndAges.parquet") {% endhighlight %} @@ -918,7 +943,7 @@ df.select("name", "age").write.format("parquet").save("namesAndAges.parquet") {% highlight java %} -DataFrame df = sqlContext.read().format("json").load("examples/src/main/resources/people.json"); +Dataset df = spark.read().format("json").load("examples/src/main/resources/people.json"); df.select("name", "age").write().format("parquet").save("namesAndAges.parquet"); {% endhighlight %} @@ -929,7 +954,7 @@ df.select("name", "age").write().format("parquet").save("namesAndAges.parquet"); {% highlight python %} -df = sqlContext.read.load("examples/src/main/resources/people.json", format="json") +df = spark.read.load("examples/src/main/resources/people.json", format="json") df.select("name", "age").write.save("namesAndAges.parquet", format="parquet") {% endhighlight %} @@ -939,7 +964,7 @@ df.select("name", "age").write.save("namesAndAges.parquet", format="parquet") {% highlight r %} -df <- read.df(sqlContext, "examples/src/main/resources/people.json", "json") +df <- read.df("examples/src/main/resources/people.json", "json") write.df(select(df, "name", "age"), "namesAndAges.parquet", "parquet") {% endhighlight %} @@ -956,7 +981,7 @@ file directly with SQL.
{% highlight scala %} -val df = sqlContext.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`") +val df = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`") {% endhighlight %}
@@ -964,14 +989,14 @@ val df = sqlContext.sql("SELECT * FROM parquet.`examples/src/main/resources/user
{% highlight java %} -DataFrame df = sqlContext.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`"); +Dataset<Row> df = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`"); {% endhighlight %}
{% highlight python %} -df = sqlContext.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`") +df = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`") {% endhighlight %}
@@ -979,7 +1004,7 @@ df = sqlContext.sql("SELECT * FROM parquet.`examples/src/main/resources/users.pa
{% highlight r %} -df <- sql(sqlContext, "SELECT * FROM parquet.`examples/src/main/resources/users.parquet`") +df <- sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`") {% endhighlight %}
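The query-on-file examples above all follow one pattern: a data source short name, a dot, and a backtick-quoted path. As a minimal sketch in plain Python (no Spark required), the string can be assembled as below. `file_query` is a hypothetical helper introduced only for illustration; it is not part of Spark.

```python
def file_query(source, path):
    """Build a SQL query that reads a file directly, e.g. parquet.`some/path`.

    Hypothetical helper for illustration only; not a Spark API.
    """
    # The path is wrapped in backticks, so a backtick inside it would break the quoting.
    if "`" in path:
        raise ValueError("path must not contain backticks")
    return "SELECT * FROM {}.`{}`".format(source, path)


query = file_query("parquet", "examples/src/main/resources/users.parquet")
print(query)
```

Assuming `spark` is an existing `SparkSession`, the resulting string would be passed to `spark.sql(query)` exactly as in the examples above.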
@@ -989,7 +1014,7 @@ df <- sql(sqlContext, "SELECT * FROM parquet.`examples/src/main/resources/users. Save operations can optionally take a `SaveMode`, that specifies how to handle existing data if present. It is important to realize that these save modes do not utilize any locking and are not -atomic. Additionally, when performing a `Overwrite`, the data will be deleted before writing out the +atomic. Additionally, when performing an `Overwrite`, the data will be deleted before writing out the new data. @@ -1032,12 +1057,13 @@ new data. ### Saving to Persistent Tables -When working with a `HiveContext`, `DataFrames` can also be saved as persistent tables using the -`saveAsTable` command. Unlike the `registerTempTable` command, `saveAsTable` will materialize the -contents of the dataframe and create a pointer to the data in the HiveMetastore. Persistent tables -will still exist even after your Spark program has restarted, as long as you maintain your connection -to the same metastore. A DataFrame for a persistent table can be created by calling the `table` -method on a `SQLContext` with the name of the table. +`DataFrames` can also be saved as persistent tables in the Hive metastore using the `saveAsTable` +command. Notice that an existing Hive deployment is not necessary to use this feature. Spark will create a +default local Hive metastore (using Derby) for you. Unlike the `createOrReplaceTempView` command, +`saveAsTable` will materialize the contents of the DataFrame and create a pointer to the data in the +Hive metastore. Persistent tables will still exist even after your Spark program has restarted, as +long as you maintain your connection to the same metastore. A DataFrame for a persistent table can +be created by calling the `table` method on a `SparkSession` with the name of the table. By default `saveAsTable` will create a "managed table", meaning that the location of the data will be controlled by the metastore.
Managed tables will also have their data deleted automatically @@ -1047,7 +1073,7 @@ when a table is dropped. [Parquet](http://parquet.io) is a columnar format that is supported by many other data processing systems. Spark SQL provides support for both reading and writing Parquet files that automatically preserves the schema -of the original data. When writing Parquet files, all columns are automatically converted to be nullable for +of the original data. When writing Parquet files, all columns are automatically converted to be nullable for compatibility reasons. ### Loading Data Programmatically @@ -1059,9 +1085,9 @@ Using the data from the above example:
{% highlight scala %} -// sqlContext from the previous example is used in this example. +// spark from the previous example is used in this example. // This is used to implicitly convert an RDD to a DataFrame. -import sqlContext.implicits._ +import spark.implicits._ val people: RDD[Person] = ... // An RDD of case class objects, from the previous example. @@ -1070,11 +1096,11 @@ people.write.parquet("people.parquet") // Read in the parquet file created above. Parquet files are self-describing so the schema is preserved. // The result of loading a Parquet file is also a DataFrame. -val parquetFile = sqlContext.read.parquet("people.parquet") +val parquetFile = spark.read.parquet("people.parquet") // Parquet files can also be used to create a temporary view and then used in SQL statements. parquetFile.createOrReplaceTempView("parquetFile") -val teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19") +val teenagers = spark.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19") teenagers.map(t => "Name: " + t(0)).collect().foreach(println) {% endhighlight %} @@ -1083,20 +1109,20 @@ teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
{% highlight java %} -// sqlContext from the previous example is used in this example. +// spark from the previous example is used in this example. -DataFrame schemaPeople = ... // The DataFrame from the previous example. +Dataset schemaPeople = ... // The DataFrame from the previous example. // DataFrames can be saved as Parquet files, maintaining the schema information. schemaPeople.write().parquet("people.parquet"); // Read in the Parquet file created above. Parquet files are self-describing so the schema is preserved. // The result of loading a parquet file is also a DataFrame. -DataFrame parquetFile = sqlContext.read().parquet("people.parquet"); +Dataset parquetFile = spark.read().parquet("people.parquet"); // Parquet files can also be used to create a temporary view and then used in SQL statements. parquetFile.createOrReplaceTempView("parquetFile"); -DataFrame teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19"); +Dataset teenagers = spark.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19"); List teenagerNames = teenagers.javaRDD().map(new Function() { public String call(Row row) { return "Name: " + row.getString(0); @@ -1109,7 +1135,7 @@ List teenagerNames = teenagers.javaRDD().map(new Function()
{% highlight python %} -# sqlContext from the previous example is used in this example. +# spark from the previous example is used in this example. schemaPeople # The DataFrame from the previous example. @@ -1118,11 +1144,11 @@ schemaPeople.write.parquet("people.parquet") # Read in the Parquet file created above. Parquet files are self-describing so the schema is preserved. # The result of loading a parquet file is also a DataFrame. -parquetFile = sqlContext.read.parquet("people.parquet") +parquetFile = spark.read.parquet("people.parquet") # Parquet files can also be used to create a temporary view and then used in SQL statements. parquetFile.createOrReplaceTempView("parquetFile"); -teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19") +teenagers = spark.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19") teenNames = teenagers.map(lambda p: "Name: " + p.name) for teenName in teenNames.collect(): print(teenName) @@ -1133,7 +1159,7 @@ for teenName in teenNames.collect():
{% highlight r %} -# sqlContext from the previous example is used in this example. +# spark from the previous example is used in this example. schemaPeople # The DataFrame from the previous example. @@ -1142,11 +1168,11 @@ write.parquet(schemaPeople, "people.parquet") # Read in the Parquet file created above. Parquet files are self-describing so the schema is preserved. # The result of loading a parquet file is also a DataFrame. -parquetFile <- read.parquet(sqlContext, "people.parquet") +parquetFile <- read.parquet("people.parquet") # Parquet files can also be used to create a temporary view and then used in SQL statements. -registerTempTable(parquetFile, "parquetFile") -teenagers <- sql(sqlContext, "SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19") +createOrReplaceTempView(parquetFile, "parquetFile") +teenagers <- sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19") schema <- structType(structField("name", "string")) teenNames <- dapply(df, function(p) { cbind(paste("Name:", p$name)) }, schema) for (teenName in collect(teenNames)$name) { @@ -1160,7 +1186,7 @@ for (teenName in collect(teenNames)$name) { {% highlight sql %} -CREATE TEMPORARY TABLE parquetTable +CREATE TEMPORARY VIEW parquetTable USING org.apache.spark.sql.parquet OPTIONS ( path "examples/src/main/resources/people.parquet" @@ -1207,7 +1233,7 @@ path {% endhighlight %} -By passing `path/to/table` to either `SQLContext.read.parquet` or `SQLContext.read.load`, Spark SQL +By passing `path/to/table` to either `SparkSession.read.parquet` or `SparkSession.read.load`, Spark SQL will automatically extract the partitioning information from the paths. Now the schema of the returned DataFrame becomes: @@ -1228,8 +1254,8 @@ can be configured by `spark.sql.sources.partitionColumnTypeInference.enabled`, w `true`. When type inference is disabled, string type will be used for the partitioning columns. 
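To make the partition discovery described above more concrete, here is a minimal sketch in plain Python of how `key=value` directory names in a path can be turned into partition columns. It is an illustration under simplifying assumptions, not Spark's implementation: `parse_partition_path` is a hypothetical helper, the example paths are illustrative, and the type inference only tries integers before falling back to strings.

```python
def parse_partition_path(path, infer_types=True):
    """Collect key=value directory segments of a path into a dict.

    Hypothetical helper for illustration; Spark's real logic is more involved.
    """
    partitions = {}
    for segment in path.split("/"):
        if "=" not in segment:
            continue  # ordinary directory or file name, not a partition
        key, _, raw = segment.partition("=")
        value = raw
        if infer_types:
            # Simplified inference: try an integer, otherwise keep the string.
            try:
                value = int(raw)
            except ValueError:
                pass
        partitions[key] = value
    return partitions


print(parse_partition_path("path/to/table/gender=male/country=US/data.parquet"))
print(parse_partition_path("data/test_table/key=1"))
print(parse_partition_path("data/test_table/key=1", infer_types=False))
```

With `infer_types=False` every partition value stays a string, which mirrors the behaviour the text describes for disabled type inference.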
Starting from Spark 1.6.0, partition discovery only finds partitions under the given paths -by default. For the above example, if users pass `path/to/table/gender=male` to either -`SQLContext.read.parquet` or `SQLContext.read.load`, `gender` will not be considered as a +by default. For the above example, if users pass `path/to/table/gender=male` to either +`SparkSession.read.parquet` or `SparkSession.read.load`, `gender` will not be considered as a partitioning column. If users need to specify the base path that partition discovery should start with, they can set `basePath` in the data source options. For example, when `path/to/table/gender=male` is the path of the data and @@ -1254,9 +1280,9 @@ turned it off by default starting from 1.5.0. You may enable it by
{% highlight scala %} -// sqlContext from the previous example is used in this example. +// spark from the previous example is used in this example. // This is used to implicitly convert an RDD to a DataFrame. -import sqlContext.implicits._ +import spark.implicits._ // Create a simple DataFrame, stored into a partition directory val df1 = sc.makeRDD(1 to 5).map(i => (i, i * 2)).toDF("single", "double") @@ -1268,7 +1294,7 @@ val df2 = sc.makeRDD(6 to 10).map(i => (i, i * 3)).toDF("single", "triple") df2.write.parquet("data/test_table/key=2") // Read the partitioned table -val df3 = sqlContext.read.option("mergeSchema", "true").parquet("data/test_table") +val df3 = spark.read.option("mergeSchema", "true").parquet("data/test_table") df3.printSchema() // The final schema consists of all 3 columns in the Parquet files together @@ -1285,21 +1311,21 @@ df3.printSchema()
{% highlight python %} -# sqlContext from the previous example is used in this example. +# spark from the previous example is used in this example. # Create a simple DataFrame, stored into a partition directory -df1 = sqlContext.createDataFrame(sc.parallelize(range(1, 6))\ +df1 = spark.createDataFrame(sc.parallelize(range(1, 6))\ .map(lambda i: Row(single=i, double=i * 2))) df1.write.parquet("data/test_table/key=1") # Create another DataFrame in a new partition directory, # adding a new column and dropping an existing column -df2 = sqlContext.createDataFrame(sc.parallelize(range(6, 11)) +df2 = spark.createDataFrame(sc.parallelize(range(6, 11)) .map(lambda i: Row(single=i, triple=i * 3))) df2.write.parquet("data/test_table/key=2") # Read the partitioned table -df3 = sqlContext.read.option("mergeSchema", "true").parquet("data/test_table") +df3 = spark.read.option("mergeSchema", "true").parquet("data/test_table") df3.printSchema() # The final schema consists of all 3 columns in the Parquet files together @@ -1316,7 +1342,7 @@ df3.printSchema()
{% highlight r %} -# sqlContext from the previous example is used in this example. +# spark from the previous example is used in this example. # Create a simple DataFrame, stored into a partition directory write.df(df1, "data/test_table/key=1", "parquet", "overwrite") @@ -1326,7 +1352,7 @@ write.df(df1, "data/test_table/key=1", "parquet", "overwrite") write.df(df2, "data/test_table/key=2", "parquet", "overwrite") # Read the partitioned table -df3 <- read.df(sqlContext, "data/test_table", "parquet", mergeSchema="true") +df3 <- read.df("data/test_table", "parquet", mergeSchema="true") printSchema(df3) # The final schema consists of all 3 columns in the Parquet files together @@ -1381,8 +1407,8 @@ metadata.
{% highlight scala %} -// sqlContext is an existing HiveContext -sqlContext.refreshTable("my_table") +// spark is an existing SparkSession +spark.catalog.refreshTable("my_table") {% endhighlight %}
@@ -1390,8 +1416,8 @@ sqlContext.refreshTable("my_table")
{% highlight java %} -// sqlContext is an existing HiveContext -sqlContext.refreshTable("my_table") +// spark is an existing SparkSession +spark.catalog().refreshTable("my_table"); {% endhighlight %}
@@ -1399,8 +1425,8 @@ sqlContext.refreshTable("my_table")
{% highlight python %} -# sqlContext is an existing HiveContext -sqlContext.refreshTable("my_table") +# spark is an existing SparkSession +spark.catalog.refreshTable("my_table") {% endhighlight %}
@@ -1417,7 +1443,7 @@ REFRESH TABLE my_table; ### Configuration -Configuration of Parquet can be done using the `setConf` method on `SQLContext` or by running +Configuration of Parquet can be done using the `conf.set` method on a `SparkSession` (for example `spark.conf.set(key, value)`) or by running `SET key=value` commands using SQL.
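As a small illustration of the `SET key=value` form mentioned above, the following plain-Python sketch renders a dictionary of settings as SQL commands. `to_set_statements` is a hypothetical helper, not a Spark API; the only configuration key used is `spark.sql.parquet.mergeSchema`, which this guide documents.

```python
def to_set_statements(options):
    """Render a dict of settings as `SET key=value` SQL commands.

    Hypothetical helper for illustration only; not part of Spark.
    """
    statements = []
    for key in sorted(options):
        value = options[key]
        # Write booleans as true/false, the spelling used in this guide.
        text = str(value).lower() if isinstance(value, bool) else str(value)
        statements.append("SET {}={}".format(key, text))
    return statements


statements = to_set_statements({"spark.sql.parquet.mergeSchema": True})
for statement in statements:
    print(statement)
```

Assuming `spark` is an existing `SparkSession`, each statement could then be run with `spark.sql(statement)`.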
@@ -1469,7 +1495,7 @@ Configuration of Parquet can be done using the `setConf` method on `SQLContext` - + - + - +
`spark.sql.parquet.mergeSchema` (default: `false`)

When true, the Parquet data source merges schemas collected from all data files, otherwise the @@ -1483,8 +1509,8 @@ Configuration of Parquet can be done using the `setConf` method on `SQLContext`

-Spark SQL can automatically infer the schema of a JSON dataset and load it as a DataFrame. -This conversion can be done using `SQLContext.read.json()` on either an RDD of String, +Spark SQL can automatically infer the schema of a JSON dataset and load it as a `Dataset[Row]`. +This conversion can be done using `SparkSession.read.json()` on either an RDD of String, or a JSON file. Note that the file that is offered as _a json file_ is not a typical JSON file. Each @@ -1492,13 +1518,12 @@ line must contain a separate, self-contained valid JSON object. As a consequence a regular multi-line JSON file will most often fail. {% highlight scala %} -// sc is an existing SparkContext. -val sqlContext = new org.apache.spark.sql.SQLContext(sc) +val spark: SparkSession // An existing SparkSession // A JSON dataset is pointed to by path. // The path can be either a single text file or a directory storing text files. val path = "examples/src/main/resources/people.json" -val people = sqlContext.read.json(path) +val people = spark.read.json(path) // The inferred schema can be visualized using the printSchema() method. people.printSchema() @@ -1509,21 +1534,21 @@ people.printSchema() // Creates a temporary view using the DataFrame people.createOrReplaceTempView("people") -// SQL statements can be run by using the sql methods provided by sqlContext. -val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19") +// SQL statements can be run by using the sql methods provided by spark. +val teenagers = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19") // Alternatively, a DataFrame can be created for a JSON dataset represented by // an RDD[String] storing one JSON object per string. val anotherPeopleRDD = sc.parallelize( """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil) -val anotherPeople = sqlContext.read.json(anotherPeopleRDD) +val anotherPeople = spark.read.json(anotherPeopleRDD) {% endhighlight %}
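The one-object-per-line requirement described above can be illustrated without Spark. The sketch below, in plain Python, parses text line by line the way a line-oriented JSON reader would, and shows that a pretty-printed multi-line document fails under the same treatment. The helper name and the sample records are made up for illustration.

```python
import json


def parse_json_lines(text):
    """Parse text that holds one self-contained JSON object per line."""
    return [json.loads(line) for line in text.splitlines() if line.strip()]


# Each line is a complete JSON object, so line-by-line parsing succeeds.
one_per_line = '{"name":"Michael"}\n{"name":"Andy","age":30}\n'
records = parse_json_lines(one_per_line)

# A pretty-printed object spans several lines; no single line is valid JSON.
pretty_printed = '{\n  "name": "Michael"\n}\n'
try:
    parse_json_lines(pretty_printed)
    multi_line_ok = True
except json.JSONDecodeError:
    multi_line_ok = False

print(records)
print(multi_line_ok)
```

This is only a sketch of the file format constraint; it says nothing about how Spark itself reports or handles malformed lines.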
-Spark SQL can automatically infer the schema of a JSON dataset and load it as a DataFrame. -This conversion can be done using `SQLContext.read().json()` on either an RDD of String, +Spark SQL can automatically infer the schema of a JSON dataset and load it as a `Dataset<Row>`. +This conversion can be done using `SparkSession.read().json()` on either an RDD of String, or a JSON file. Note that the file that is offered as _a json file_ is not a typical JSON file. Each @@ -1532,11 +1557,11 @@ a regular multi-line JSON file will most often fail. {% highlight java %} // sc is an existing JavaSparkContext. -SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc); +SparkSession spark = SparkSession.builder().getOrCreate(); // A JSON dataset is pointed to by path. // The path can be either a single text file or a directory storing text files. -DataFrame people = sqlContext.read().json("examples/src/main/resources/people.json"); +Dataset<Row> people = spark.read().json("examples/src/main/resources/people.json"); // The inferred schema can be visualized using the printSchema() method. people.printSchema(); @@ -1547,34 +1572,32 @@ people.printSchema(); // Creates a temporary view using the DataFrame people.createOrReplaceTempView("people"); -// SQL statements can be run by using the sql methods provided by sqlContext. -DataFrame teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19"); +// SQL statements can be run by using the sql methods provided by spark. +Dataset<Row> teenagers = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19"); // Alternatively, a DataFrame can be created for a JSON dataset represented by // an RDD[String] storing one JSON object per string.
List<String> jsonData = Arrays.asList( "{\"name\":\"Yin\",\"address\":{\"city\":\"Columbus\",\"state\":\"Ohio\"}}"); JavaRDD<String> anotherPeopleRDD = sc.parallelize(jsonData); -DataFrame anotherPeople = sqlContext.read().json(anotherPeopleRDD); +Dataset<Row> anotherPeople = spark.read().json(anotherPeopleRDD); {% endhighlight %}
Spark SQL can automatically infer the schema of a JSON dataset and load it as a DataFrame. -This conversion can be done using `SQLContext.read.json` on a JSON file. +This conversion can be done using `SparkSession.read.json` on a JSON file. Note that the file that is offered as _a json file_ is not a typical JSON file. Each line must contain a separate, self-contained valid JSON object. As a consequence, a regular multi-line JSON file will most often fail. {% highlight python %} -# sc is an existing SparkContext. -from pyspark.sql import SQLContext -sqlContext = SQLContext(sc) +# spark is an existing SparkSession. # A JSON dataset is pointed to by path. # The path can be either a single text file or a directory storing text files. -people = sqlContext.read.json("examples/src/main/resources/people.json") +people = spark.read.json("examples/src/main/resources/people.json") # The inferred schema can be visualized using the printSchema() method. people.printSchema() @@ -1585,14 +1608,14 @@ people.printSchema() # Creates a temporary view using the DataFrame. people.createOrReplaceTempView("people") -# SQL statements can be run by using the sql methods provided by `sqlContext`. -teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19") +# SQL statements can be run by using the sql methods provided by `spark`. +teenagers = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19") # Alternatively, a DataFrame can be created for a JSON dataset represented by # an RDD[String] storing one JSON object per string. anotherPeopleRDD = sc.parallelize([ '{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}']) -anotherPeople = sqlContext.jsonRDD(anotherPeopleRDD) +anotherPeople = spark.read.json(anotherPeopleRDD) {% endhighlight %}
@@ -1606,14 +1629,11 @@ line must contain a separate, self-contained valid JSON object. As a consequence a regular multi-line JSON file will most often fail. {% highlight r %} -# sc is an existing SparkContext. -sqlContext <- sparkRSQL.init(sc) - # A JSON dataset is pointed to by path. # The path can be either a single text file or a directory storing text files. path <- "examples/src/main/resources/people.json" # Create a DataFrame from the file(s) pointed to by path -people <- read.json(sqlContext, path) +people <- read.json(path) # The inferred schema can be visualized using the printSchema() method. printSchema(people) @@ -1622,10 +1642,10 @@ printSchema(people) # |-- name: string (nullable = true) # Register this DataFrame as a table. -registerTempTable(people, "people") +createOrReplaceTempView(people, "people") -# SQL statements can be run by using the sql methods provided by `sqlContext`. -teenagers <- sql(sqlContext, "SELECT name FROM people WHERE age >= 13 AND age <= 19") +# SQL statements can be run by using the sql methods provided by `spark`. +teenagers <- sql("SELECT name FROM people WHERE age >= 13 AND age <= 19") {% endhighlight %}
@@ -1633,7 +1653,7 @@ teenagers <- sql(sqlContext, "SELECT name FROM people WHERE age >= 13 AND age <= {% highlight sql %} -CREATE TEMPORARY TABLE jsonTable +CREATE TEMPORARY VIEW jsonTable USING org.apache.spark.sql.json OPTIONS ( path "examples/src/main/resources/people.json" @@ -1650,14 +1670,14 @@ SELECT * FROM jsonTable ## Hive Tables Spark SQL also supports reading and writing data stored in [Apache Hive](http://hive.apache.org/). -However, since Hive has a large number of dependencies, it is not included in the default Spark assembly. -Hive support is enabled by adding the `-Phive` and `-Phive-thriftserver` flags to Spark's build. -This command builds a new assembly directory that includes Hive. Note that this Hive assembly directory must also be present -on all of the worker nodes, as they will need access to the Hive serialization and deserialization libraries -(SerDes) in order to access data stored in Hive. +However, since Hive has a large number of dependencies, these dependencies are not included in the +default Spark distribution. If Hive dependencies can be found on the classpath, Spark will load them +automatically. Note that these Hive dependencies must also be present on all of the worker nodes, as +they will need access to the Hive serialization and deserialization libraries (SerDes) in order to +access data stored in Hive. Configuration of Hive is done by placing your `hive-site.xml`, `core-site.xml` (for security configuration), -`hdfs-site.xml` (for HDFS configuration) file in `conf/`. +and `hdfs-site.xml` (for HDFS configuration) file in `conf/`.
@@ -1668,7 +1688,7 @@ connectivity to a persistent Hive metastore, support for Hive serdes, and Hive u Users who do not have an existing Hive deployment can still enable Hive support. When not configured by the `hive-site.xml`, the context automatically creates `metastore_db` in the current directory and creates a directory configured by `spark.sql.warehouse.dir`, which defaults to the directory -`spark-warehouse` in the current directory that the spark application is started. Note that +`spark-warehouse` in the current directory that the spark application is started. Note that the `hive.metastore.warehouse.dir` property in `hive-site.xml` is deprecated since Spark 2.0.0. Instead, use `spark.sql.warehouse.dir` to specify the default location of database in warehouse. You may need to grant write privilege to the user who starts the spark application. @@ -1694,7 +1714,7 @@ connectivity to a persistent Hive metastore, support for Hive serdes, and Hive u Users who do not have an existing Hive deployment can still enable Hive support. When not configured by the `hive-site.xml`, the context automatically creates `metastore_db` in the current directory and creates a directory configured by `spark.sql.warehouse.dir`, which defaults to the directory -`spark-warehouse` in the current directory that the spark application is started. Note that +`spark-warehouse` in the current directory that the spark application is started. Note that the `hive.metastore.warehouse.dir` property in `hive-site.xml` is deprecated since Spark 2.0.0. Instead, use `spark.sql.warehouse.dir` to specify the default location of database in warehouse. You may need to grant write privilege to the user who starts the spark application. @@ -1706,7 +1726,7 @@ spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)"); spark.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src"); // Queries are expressed in HiveQL. 
-Row[] results = spark.sql("FROM src SELECT key, value").collect(); +List results = spark.sql("FROM src SELECT key, value").collectAsList(); {% endhighlight %} @@ -1719,14 +1739,13 @@ connectivity to a persistent Hive metastore, support for Hive serdes, and Hive u Users who do not have an existing Hive deployment can still enable Hive support. When not configured by the `hive-site.xml`, the context automatically creates `metastore_db` in the current directory and creates a directory configured by `spark.sql.warehouse.dir`, which defaults to the directory -`spark-warehouse` in the current directory that the spark application is started. Note that +`spark-warehouse` in the current directory that the spark application is started. Note that the `hive.metastore.warehouse.dir` property in `hive-site.xml` is deprecated since Spark 2.0.0. Instead, use `spark.sql.warehouse.dir` to specify the default location of database in warehouse. You may need to grant write privilege to the user who starts the spark application. {% highlight python %} -from pyspark.sql import SparkSession -spark = SparkSession.builder.enableHiveSupport().getOrCreate() +# spark is an existing SparkSession spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)") spark.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src") @@ -1740,17 +1759,14 @@ results = spark.sql("FROM src SELECT key, value").collect()
-When working with Hive one must construct a `HiveContext`, which inherits from `SQLContext`, and +When working with Hive one must instantiate a `SparkSession` with Hive support enabled, which adds support for finding tables in the MetaStore and writing queries using HiveQL. {% highlight r %} -# sc is an existing SparkContext. -sqlContext <- sparkRHive.init(sc) - -sql(sqlContext, "CREATE TABLE IF NOT EXISTS src (key INT, value STRING)") -sql(sqlContext, "LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src") +sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)") +sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src") # Queries can be expressed in HiveQL. -results <- collect(sql(sqlContext, "FROM src SELECT key, value")) +results <- collect(sql("FROM src SELECT key, value")) {% endhighlight %} @@ -1870,7 +1886,7 @@ the Data Sources API. The following options are supported: The class name of the JDBC driver to use to connect to this URL.
partitionColumn, lowerBound, upperBound, numPartitions @@ -1882,7 +1898,7 @@ the Data Sources API. The following options are supported: partitioned and returned.
fetchSize @@ -1896,7 +1912,7 @@ the Data Sources API. The following options are supported:
{% highlight scala %} -val jdbcDF = sqlContext.read.format("jdbc").options( +val jdbcDF = spark.read.format("jdbc").options( Map("url" -> "jdbc:postgresql:dbserver", "dbtable" -> "schema.tablename")).load() {% endhighlight %} @@ -1911,7 +1927,7 @@ Map<String, String> options = new HashMap<>(); options.put("url", "jdbc:postgresql:dbserver"); options.put("dbtable", "schema.tablename"); -DataFrame jdbcDF = sqlContext.read().format("jdbc"). options(options).load(); +Dataset<Row> jdbcDF = spark.read().format("jdbc"). options(options).load(); {% endhighlight %} @@ -1921,7 +1937,7 @@ DataFrame jdbcDF = sqlContext.read().format("jdbc"). options(options).load(); {% highlight python %} -df = sqlContext.read.format('jdbc').options(url='jdbc:postgresql:dbserver', dbtable='schema.tablename').load() +df = spark.read.format('jdbc').options(url='jdbc:postgresql:dbserver', dbtable='schema.tablename').load() {% endhighlight %} @@ -1931,7 +1947,7 @@ df = sqlContext.read.format('jdbc').options(url='jdbc:postgresql:dbserver', dbta {% highlight r %} -df <- loadDF(sqlContext, source="jdbc", url="jdbc:postgresql:dbserver", dbtable="schema.tablename") +df <- loadDF(spark, source="jdbc", url="jdbc:postgresql:dbserver", dbtable="schema.tablename") {% endhighlight %} @@ -1941,7 +1957,7 @@ df <- loadDF(sqlContext, source="jdbc", url="jdbc:postgresql:dbserver", dbtable= {% highlight sql %} -CREATE TEMPORARY TABLE jdbcTable +CREATE TEMPORARY VIEW jdbcTable USING org.apache.spark.sql.jdbc OPTIONS ( url "jdbc:postgresql:dbserver", @@ -1966,11 +1982,11 @@ turning on some experimental options. ## Caching Data In Memory -Spark SQL can cache tables using an in-memory columnar format by calling `sqlContext.cacheTable("tableName")` or `dataFrame.cache()`. +Spark SQL can cache tables using an in-memory columnar format by calling `spark.catalog.cacheTable("tableName")` or `dataFrame.cache()`. Then Spark SQL will scan only required columns and will automatically tune compression to minimize -memory usage and GC pressure.
You can call `sqlContext.uncacheTable("tableName")` to remove the table from memory. +memory usage and GC pressure. You can call `spark.catalog.uncacheTable("tableName")` to remove the table from memory. -Configuration of in-memory caching can be done using the `setConf` method on `SQLContext` or by running +Configuration of in-memory caching can be done using the `setConf` method on `SparkSession` or by running `SET key=value` commands using SQL. @@ -2109,6 +2125,19 @@ options. # Migration Guide +## Upgrading From Spark SQL 1.6 to 2.0 + + - `SparkSession` is now the new entry point of Spark that replaces the old `SQLContext` and + `HiveContext`. Note that the old `SQLContext` and `HiveContext` are kept for backward compatibility. + + - Dataset API and DataFrame API are unified. In Scala, `DataFrame` becomes a type alias for + `Dataset[Row]`, while Java API users must replace `DataFrame` with `Dataset<Row>`. Both the typed + transformations (e.g. `map`, `filter`, and `groupByKey`) and untyped transformations (e.g. + `select` and `groupBy`) are available on the Dataset class. Since compile-time type-safety in + Python and R is not a language feature, the concept of Dataset does not apply to these languages’ + APIs. Instead, `DataFrame` remains the primary programming abstraction, which is analogous to the + single-node data frame notion in these languages. + ## Upgrading From Spark SQL 1.5 to 1.6 - From Spark 1.6, by default the Thrift server runs in multi-session mode. Which means each JDBC/ODBC @@ -2139,7 +2168,7 @@ options. `spark.sql.parquet.mergeSchema` to `true`. - Resolution of strings to columns in python now supports using dots (`.`) to qualify the column or access nested values. For example `df['table.column.nestedField']`. However, this means that if - your column name contains any dots you must now escape them using backticks (e.g., ``table.`column.with.dots`.nested``).
+ your column name contains any dots you must now escape them using backticks (e.g., ``table.`column.with.dots`.nested``). - In-memory columnar storage partition pruning is on by default. It can be disabled by setting `spark.sql.inMemoryColumnarStorage.partitionPruning` to `false`. - Unlimited precision decimal columns are no longer supported, instead Spark SQL enforces a maximum