[SPARK-5752][SQL] Don't implicitly convert RDDs directly to DataFrames
- The old implicit would convert RDDs directly to DataFrames, and that added too many methods.
- toDataFrame -> toDF
- Dsl -> functions
- implicits moved into SQLContext.implicits
- addColumn -> withColumn
- renameColumn -> withColumnRenamed
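
A minimal before/after sketch of the renamed Scala API (the Record case class, object name, and sample data below are illustrative only, not part of this patch):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext
    import org.apache.spark.sql.functions._        // was: import org.apache.spark.sql.Dsl._

    case class Record(key: Int, value: String)

    object ToDFMigrationSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("ToDFMigrationSketch"))
        val sqlContext = new SQLContext(sc)
        import sqlContext.implicits._              // RDD-to-DataFrame conversion is now opt-in

        val df = sc.parallelize(1 to 10).map(i => Record(i, s"val_$i")).toDF  // was: toDataFrame
        df.withColumn("doubled", col("key") * 2)   // was: addColumn
          .withColumnRenamed("value", "label")     // was: renameColumn
          .show()
      }
    }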

Python changes:
- toDataFrame -> toDF
- Dsl -> functions package
- addColumn -> withColumn
- renameColumn -> withColumnRenamed
- add toDF functions to RDD on SQLContext init
- add flatMap to DataFrame

Author: Reynold Xin <[email protected]>
Author: Davies Liu <[email protected]>

Closes #4556 from rxin/SPARK-5752 and squashes the following commits:

5ef9910 [Reynold Xin] More fix
61d3fca [Reynold Xin] Merge branch 'df5' of github.com:davies/spark into SPARK-5752
ff5832c [Reynold Xin] Fix python
749c675 [Reynold Xin] count(*) fixes.
5806df0 [Reynold Xin] Fix build break again.
d941f3d [Reynold Xin] Fixed explode compilation break.
fe1267a [Davies Liu] flatMap
c4afb8e [Reynold Xin] style
d9de47f [Davies Liu] add comment
b783994 [Davies Liu] add comment for toDF
e2154e5 [Davies Liu] schema() -> schema
3a1004f [Davies Liu] Dsl -> functions, toDF()
fb256af [Reynold Xin] - toDataFrame -> toDF - Dsl -> functions - implicits moved into SQLContext.implicits - addColumn -> withColumn - renameColumn -> withColumnRenamed
0dd74eb [Reynold Xin] [SPARK-5752][SQL] Don't implicitly convert RDDs directly to DataFrames
97dd47c [Davies Liu] fix mistake
6168f74 [Davies Liu] fix test
1fc0199 [Davies Liu] fix test
a075cd5 [Davies Liu] clean up, toPandas
663d314 [Davies Liu] add test for agg('*')
9e214d5 [Reynold Xin] count(*) fixes.
1ed7136 [Reynold Xin] Fix build break again.
921b2e3 [Reynold Xin] Fixed explode compilation break.
14698d4 [Davies Liu] flatMap
ba3e12d [Reynold Xin] style
d08c92d [Davies Liu] add comment
5c8b524 [Davies Liu] add comment for toDF
a4e5e66 [Davies Liu] schema() -> schema
d377fc9 [Davies Liu] Dsl -> functions, toDF()
6b3086c [Reynold Xin] - toDataFrame -> toDF - Dsl -> functions - implicits moved into SQLContext.implicits - addColumn -> withColumn - renameColumn -> withColumnRenamed
807e8b1 [Reynold Xin] [SPARK-5752][SQL] Don't implicitly convert RDDs directly to DataFrames
rxin committed Feb 14, 2015
1 parent 0ce4e43 commit e98dfe6
Showing 70 changed files with 596 additions and 456 deletions.
@@ -90,7 +90,7 @@ object CrossValidatorExample {
crossval.setNumFolds(2) // Use 3+ in practice

// Run cross-validation, and choose the best set of parameters.
val cvModel = crossval.fit(training)
val cvModel = crossval.fit(training.toDF)

// Prepare test documents, which are unlabeled.
val test = sc.parallelize(Seq(
@@ -100,7 +100,7 @@ object CrossValidatorExample {
Document(7L, "apache hadoop")))

// Make predictions on test documents. cvModel uses the best model found (lrModel).
cvModel.transform(test)
cvModel.transform(test.toDF)
.select("id", "text", "probability", "prediction")
.collect()
.foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) =>
@@ -58,7 +58,7 @@ object DeveloperApiExample {
lr.setMaxIter(10)

// Learn a LogisticRegression model. This uses the parameters stored in lr.
val model = lr.fit(training)
val model = lr.fit(training.toDF)

// Prepare test data.
val test = sc.parallelize(Seq(
@@ -67,7 +67,7 @@ object DeveloperApiExample {
LabeledPoint(1.0, Vectors.dense(0.0, 2.2, -1.5))))

// Make predictions on test data.
val sumPredictions: Double = model.transform(test)
val sumPredictions: Double = model.transform(test.toDF)
.select("features", "label", "prediction")
.collect()
.map { case Row(features: Vector, label: Double, prediction: Double) =>
@@ -137,9 +137,9 @@ object MovieLensALS {
.setRegParam(params.regParam)
.setNumBlocks(params.numBlocks)

val model = als.fit(training)
val model = als.fit(training.toDF)

val predictions = model.transform(test).cache()
val predictions = model.transform(test.toDF).cache()

// Evaluate the model.
// TODO: Create an evaluator to compute RMSE.
@@ -158,7 +158,7 @@

// Inspect false positives.
predictions.registerTempTable("prediction")
sc.textFile(params.movies).map(Movie.parseMovie).registerTempTable("movie")
sc.textFile(params.movies).map(Movie.parseMovie).toDF.registerTempTable("movie")
sqlContext.sql(
"""
|SELECT userId, prediction.movieId, title, rating, prediction
@@ -58,7 +58,7 @@ object SimpleParamsExample {
.setRegParam(0.01)

// Learn a LogisticRegression model. This uses the parameters stored in lr.
val model1 = lr.fit(training)
val model1 = lr.fit(training.toDF)
// Since model1 is a Model (i.e., a Transformer produced by an Estimator),
// we can view the parameters it used during fit().
// This prints the parameter (name: value) pairs, where names are unique IDs for this
@@ -77,7 +77,7 @@ object SimpleParamsExample {

// Now learn a new model using the paramMapCombined parameters.
// paramMapCombined overrides all parameters set earlier via lr.set* methods.
val model2 = lr.fit(training, paramMapCombined)
val model2 = lr.fit(training.toDF, paramMapCombined)
println("Model 2 was fit using parameters: " + model2.fittingParamMap)

// Prepare test data.
@@ -90,7 +90,7 @@ object SimpleParamsExample {
// LogisticRegression.transform will only use the 'features' column.
// Note that model2.transform() outputs a 'myProbability' column instead of the usual
// 'probability' column since we renamed the lr.probabilityCol parameter previously.
model2.transform(test)
model2.transform(test.toDF)
.select("features", "label", "myProbability", "prediction")
.collect()
.foreach { case Row(features: Vector, label: Double, prob: Vector, prediction: Double) =>
@@ -69,7 +69,7 @@ object SimpleTextClassificationPipeline {
.setStages(Array(tokenizer, hashingTF, lr))

// Fit the pipeline to training documents.
val model = pipeline.fit(training)
val model = pipeline.fit(training.toDF)

// Prepare test documents, which are unlabeled.
val test = sc.parallelize(Seq(
@@ -79,7 +79,7 @@ object SimpleTextClassificationPipeline {
Document(7L, "apache hadoop")))

// Make predictions on test documents.
model.transform(test)
model.transform(test.toDF)
.select("id", "text", "probability", "prediction")
.collect()
.foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) =>
@@ -81,18 +81,18 @@ object DatasetExample {
println(s"Loaded ${origData.count()} instances from file: ${params.input}")

// Convert input data to DataFrame explicitly.
val df: DataFrame = origData.toDataFrame
val df: DataFrame = origData.toDF
println(s"Inferred schema:\n${df.schema.prettyJson}")
println(s"Converted to DataFrame with ${df.count()} records")

// Select columns, using implicit conversion to DataFrames.
val labelsDf: DataFrame = origData.select("label")
// Select columns
val labelsDf: DataFrame = df.select("label")
val labels: RDD[Double] = labelsDf.map { case Row(v: Double) => v }
val numLabels = labels.count()
val meanLabel = labels.fold(0.0)(_ + _) / numLabels
println(s"Selected label column with average value $meanLabel")

val featuresDf: DataFrame = origData.select("features")
val featuresDf: DataFrame = df.select("features")
val features: RDD[Vector] = featuresDf.map { case Row(v: Vector) => v }
val featureSummary = features.aggregate(new MultivariateOnlineSummarizer())(
(summary, feat) => summary.add(feat),
@@ -19,7 +19,7 @@ package org.apache.spark.examples.sql

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.Dsl._
import org.apache.spark.sql.functions._

// One method for defining the schema of an RDD is to make a case class with the desired column
// names and types.
@@ -34,10 +34,10 @@ object RDDRelation {
// Importing the SQL context gives access to all the SQL functions and implicit conversions.
import sqlContext.implicits._

val rdd = sc.parallelize((1 to 100).map(i => Record(i, s"val_$i")))
val df = sc.parallelize((1 to 100).map(i => Record(i, s"val_$i"))).toDF
// Any RDD containing case classes can be registered as a table. The schema of the table is
// automatically inferred using scala reflection.
rdd.registerTempTable("records")
df.registerTempTable("records")

// Once tables have been registered, you can run SQL queries over them.
println("Result of SELECT *:")
@@ -55,10 +55,10 @@ object RDDRelation {
rddFromSql.map(row => s"Key: ${row(0)}, Value: ${row(1)}").collect().foreach(println)

// Queries can also be written using a LINQ-like Scala DSL.
rdd.where($"key" === 1).orderBy($"value".asc).select($"key").collect().foreach(println)
df.where($"key" === 1).orderBy($"value".asc).select($"key").collect().foreach(println)

// Write out an RDD as a parquet file.
rdd.saveAsParquetFile("pair.parquet")
df.saveAsParquetFile("pair.parquet")

// Read in parquet file. Parquet files are self-describing so the schema is preserved.
val parquetFile = sqlContext.parquetFile("pair.parquet")
@@ -68,7 +68,7 @@ object HiveFromSpark {

// You can also register RDDs as temporary tables within a HiveContext.
val rdd = sc.parallelize((1 to 100).map(i => Record(i, s"val_$i")))
rdd.registerTempTable("records")
rdd.toDF.registerTempTable("records")

// Queries can then join RDD data with data stored in Hive.
println("Result of SELECT *:")
6 changes: 3 additions & 3 deletions mllib/src/main/scala/org/apache/spark/ml/Transformer.scala
@@ -23,7 +23,7 @@ import org.apache.spark.Logging
import org.apache.spark.annotation.AlphaComponent
import org.apache.spark.ml.param._
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.Dsl._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

/**
@@ -100,7 +100,7 @@ private[ml] abstract class UnaryTransformer[IN, OUT, T <: UnaryTransformer[IN, O
override def transform(dataset: DataFrame, paramMap: ParamMap): DataFrame = {
transformSchema(dataset.schema, paramMap, logging = true)
val map = this.paramMap ++ paramMap
dataset.select($"*", callUDF(
this.createTransformFunc(map), outputDataType, dataset(map(inputCol))).as(map(outputCol)))
dataset.withColumn(map(outputCol),
callUDF(this.createTransformFunc(map), outputDataType, dataset(map(inputCol))))
}
}
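
The UnaryTransformer change above is the pattern repeated throughout the ML code in this commit: a derived column is appended with withColumn instead of select($"*", callUDF(...).as(...)). A standalone sketch of the same idea using a plain udf (the column names and helper below are illustrative, not from the patch):

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.functions._

    object WithColumnSketch {
      // Roughly equivalent to the older df.select($"*", doubled(col("key")).as("key2"))
      def addDoubledKey(df: DataFrame): DataFrame = {
        val doubled = udf { (key: Int) => key * 2 }
        df.withColumn("key2", doubled(col("key")))
      }
    }
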
@@ -21,7 +21,7 @@ import org.apache.spark.annotation.{DeveloperApi, AlphaComponent}
import org.apache.spark.ml.impl.estimator.{PredictionModel, Predictor, PredictorParams}
import org.apache.spark.ml.param.{Params, ParamMap, HasRawPredictionCol}
import org.apache.spark.mllib.linalg.{Vector, VectorUDT}
import org.apache.spark.sql.Dsl._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.{DataType, DoubleType, StructType}

@@ -182,24 +182,22 @@ private[ml] object ClassificationModel {
if (map(model.rawPredictionCol) != "") {
// output raw prediction
val features2raw: FeaturesType => Vector = model.predictRaw
tmpData = tmpData.select($"*",
callUDF(features2raw, new VectorUDT,
col(map(model.featuresCol))).as(map(model.rawPredictionCol)))
tmpData = tmpData.withColumn(map(model.rawPredictionCol),
callUDF(features2raw, new VectorUDT, col(map(model.featuresCol))))
numColsOutput += 1
if (map(model.predictionCol) != "") {
val raw2pred: Vector => Double = (rawPred) => {
rawPred.toArray.zipWithIndex.maxBy(_._1)._2
}
tmpData = tmpData.select($"*", callUDF(raw2pred, DoubleType,
col(map(model.rawPredictionCol))).as(map(model.predictionCol)))
tmpData = tmpData.withColumn(map(model.predictionCol),
callUDF(raw2pred, DoubleType, col(map(model.rawPredictionCol))))
numColsOutput += 1
}
} else if (map(model.predictionCol) != "") {
// output prediction
val features2pred: FeaturesType => Double = model.predict
tmpData = tmpData.select($"*",
callUDF(features2pred, DoubleType,
col(map(model.featuresCol))).as(map(model.predictionCol)))
tmpData = tmpData.withColumn(map(model.predictionCol),
callUDF(features2pred, DoubleType, col(map(model.featuresCol))))
numColsOutput += 1
}
(numColsOutput, tmpData)
@@ -22,7 +22,7 @@ import org.apache.spark.ml.param._
import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
import org.apache.spark.mllib.linalg.{VectorUDT, BLAS, Vector, Vectors}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.Dsl._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.DoubleType
import org.apache.spark.storage.StorageLevel

@@ -130,44 +130,39 @@ class LogisticRegressionModel private[ml] (
var numColsOutput = 0
if (map(rawPredictionCol) != "") {
val features2raw: Vector => Vector = (features) => predictRaw(features)
tmpData = tmpData.select($"*",
callUDF(features2raw, new VectorUDT, col(map(featuresCol))).as(map(rawPredictionCol)))
tmpData = tmpData.withColumn(map(rawPredictionCol),
callUDF(features2raw, new VectorUDT, col(map(featuresCol))))
numColsOutput += 1
}
if (map(probabilityCol) != "") {
if (map(rawPredictionCol) != "") {
val raw2prob: Vector => Vector = { (rawPreds: Vector) =>
val raw2prob = udf { (rawPreds: Vector) =>
val prob1 = 1.0 / (1.0 + math.exp(-rawPreds(1)))
Vectors.dense(1.0 - prob1, prob1)
Vectors.dense(1.0 - prob1, prob1): Vector
}
tmpData = tmpData.select($"*",
callUDF(raw2prob, new VectorUDT, col(map(rawPredictionCol))).as(map(probabilityCol)))
tmpData = tmpData.withColumn(map(probabilityCol), raw2prob(col(map(rawPredictionCol))))
} else {
val features2prob: Vector => Vector = (features: Vector) => predictProbabilities(features)
tmpData = tmpData.select($"*",
callUDF(features2prob, new VectorUDT, col(map(featuresCol))).as(map(probabilityCol)))
val features2prob = udf { (features: Vector) => predictProbabilities(features) : Vector }
tmpData = tmpData.withColumn(map(probabilityCol), features2prob(col(map(featuresCol))))
}
numColsOutput += 1
}
if (map(predictionCol) != "") {
val t = map(threshold)
if (map(probabilityCol) != "") {
val predict: Vector => Double = { probs: Vector =>
val predict = udf { probs: Vector =>
if (probs(1) > t) 1.0 else 0.0
}
tmpData = tmpData.select($"*",
callUDF(predict, DoubleType, col(map(probabilityCol))).as(map(predictionCol)))
tmpData = tmpData.withColumn(map(predictionCol), predict(col(map(probabilityCol))))
} else if (map(rawPredictionCol) != "") {
val predict: Vector => Double = { rawPreds: Vector =>
val predict = udf { rawPreds: Vector =>
val prob1 = 1.0 / (1.0 + math.exp(-rawPreds(1)))
if (prob1 > t) 1.0 else 0.0
}
tmpData = tmpData.select($"*",
callUDF(predict, DoubleType, col(map(rawPredictionCol))).as(map(predictionCol)))
tmpData = tmpData.withColumn(map(predictionCol), predict(col(map(rawPredictionCol))))
} else {
val predict: Vector => Double = (features: Vector) => this.predict(features)
tmpData = tmpData.select($"*",
callUDF(predict, DoubleType, col(map(featuresCol))).as(map(predictionCol)))
val predict = udf { features: Vector => this.predict(features) }
tmpData = tmpData.withColumn(map(predictionCol), predict(col(map(featuresCol))))
}
numColsOutput += 1
}
@@ -21,7 +21,7 @@ import org.apache.spark.annotation.{AlphaComponent, DeveloperApi}
import org.apache.spark.ml.param.{HasProbabilityCol, ParamMap, Params}
import org.apache.spark.mllib.linalg.{Vector, VectorUDT}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.Dsl._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{DataType, StructType}


@@ -122,8 +122,8 @@ private[spark] abstract class ProbabilisticClassificationModel[
val features2probs: FeaturesType => Vector = (features) => {
tmpModel.predictProbabilities(features)
}
outputData.select($"*",
callUDF(features2probs, new VectorUDT, col(map(featuresCol))).as(map(probabilityCol)))
outputData.withColumn(map(probabilityCol),
callUDF(features2probs, new VectorUDT, col(map(featuresCol))))
} else {
if (numColsOutput == 0) {
this.logWarning(s"$uid: ProbabilisticClassificationModel.transform() was called as NOOP" +
@@ -23,7 +23,7 @@ import org.apache.spark.ml.param._
import org.apache.spark.mllib.feature
import org.apache.spark.mllib.linalg.{Vector, VectorUDT}
import org.apache.spark.sql._
import org.apache.spark.sql.Dsl._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{StructField, StructType}

/**
@@ -88,7 +88,7 @@ class StandardScalerModel private[ml] (
transformSchema(dataset.schema, paramMap, logging = true)
val map = this.paramMap ++ paramMap
val scale = udf((v: Vector) => { scaler.transform(v) } : Vector)
dataset.select($"*", scale(col(map(inputCol))).as(map(outputCol)))
dataset.withColumn(map(outputCol), scale(col(map(inputCol))))
}

private[ml] override def transformSchema(schema: StructType, paramMap: ParamMap): StructType = {
@@ -24,7 +24,7 @@ import org.apache.spark.mllib.linalg.{VectorUDT, Vector}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.Dsl._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{DataType, DoubleType, StructType}


@@ -216,7 +216,7 @@ private[spark] abstract class PredictionModel[FeaturesType, M <: PredictionModel
val pred: FeaturesType => Double = (features) => {
tmpModel.predict(features)
}
dataset.select($"*", callUDF(pred, DoubleType, col(map(featuresCol))).as(map(predictionCol)))
dataset.withColumn(map(predictionCol), callUDF(pred, DoubleType, col(map(featuresCol))))
} else {
this.logWarning(s"$uid: Predictor.transform() was called as NOOP" +
" since no output columns were set.")
@@ -36,7 +36,7 @@ import org.apache.spark.ml.param._
import org.apache.spark.mllib.optimization.NNLS
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.Dsl._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{DoubleType, FloatType, IntegerType, StructField, StructType}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.Utils
@@ -170,8 +170,8 @@ class ALSModel private[ml] (
override def transform(dataset: DataFrame, paramMap: ParamMap): DataFrame = {
import dataset.sqlContext.implicits._
val map = this.paramMap ++ paramMap
val users = userFactors.toDataFrame("id", "features")
val items = itemFactors.toDataFrame("id", "features")
val users = userFactors.toDF("id", "features")
val items = itemFactors.toDF("id", "features")

// Register a UDF for DataFrame, and then
// create a new column named map(predictionCol) by running the predict UDF.
@@ -102,7 +102,7 @@ object NaiveBayesModel extends Loader[NaiveBayesModel] {
sc.parallelize(Seq(metadata), 1).saveAsTextFile(metadataPath(path))

// Create Parquet data.
val dataRDD: DataFrame = sc.parallelize(Seq(data), 1)
val dataRDD: DataFrame = sc.parallelize(Seq(data), 1).toDF
dataRDD.saveAsParquetFile(dataPath(path))
}

@@ -62,7 +62,7 @@ private[classification] object GLMClassificationModel {

// Create Parquet data.
val data = Data(weights, intercept, threshold)
sc.parallelize(Seq(data), 1).saveAsParquetFile(Loader.dataPath(path))
sc.parallelize(Seq(data), 1).toDF.saveAsParquetFile(Loader.dataPath(path))
}

/**
@@ -187,8 +187,8 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] {
val metadata = compact(render(
("class" -> thisClassName) ~ ("version" -> thisFormatVersion) ~ ("rank" -> model.rank)))
sc.parallelize(Seq(metadata), 1).saveAsTextFile(metadataPath(path))
model.userFeatures.toDataFrame("id", "features").saveAsParquetFile(userPath(path))
model.productFeatures.toDataFrame("id", "features").saveAsParquetFile(productPath(path))
model.userFeatures.toDF("id", "features").saveAsParquetFile(userPath(path))
model.productFeatures.toDF("id", "features").saveAsParquetFile(productPath(path))
}

def load(sc: SparkContext, path: String): MatrixFactorizationModel = {