[SPARK-3048][MLLIB] add LabeledPoint.parse and remove loadStreamingLabeledPoints

Move `parse()` from `LabeledPointParser` to `LabeledPoint` and make it public. This breaks binary compatibility only for user code that calls synthesized companion-object methods such as `tupled` and `curried`, which is rare.
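
For context (an illustration, not part of the patch): the compiler-generated companion of a case class extends the matching FunctionN trait, so `LabeledPoint.tupled` used to resolve directly; a hand-written companion object does not extend Function2 unless declared to, which is what the MiMa excludes at the bottom of this diff account for. A minimal sketch:

    import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.mllib.regression.LabeledPoint

    // Eta-expanding apply keeps working against both the old and the new jar:
    val fromTuple = (LabeledPoint.apply _).tupled
    val p = fromTuple((1.0, Vectors.dense(1.0, 0.0)))

    // Code compiled against the old jar as `LabeledPoint.tupled` relied on the
    // synthesized companion extending Function2 and breaks against the new one.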

`LabeledPoint.parse` is more consistent with `Vectors.parse`, which is why `LabeledPointParser` is not preferred.
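
A minimal usage sketch of the now-parallel parsers (the string literals assume the `toString` formats current at the time of this commit):

    import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.mllib.regression.LabeledPoint

    val v = Vectors.parse("[1.0,0.0,-2.0]")            // inverse of Vector#toString
    val p = LabeledPoint.parse("(1.0,[1.0,0.0,-2.0])") // inverse of LabeledPoint#toString
    assert(p == LabeledPoint(1.0, v))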

@freeman-lab @tdas

Author: Xiangrui Meng <[email protected]>

Closes apache#1952 from mengxr/labelparser and squashes the following commits:

c818fb2 [Xiangrui Meng] merge master
ce20e6f [Xiangrui Meng] update mima excludes
b386b8d [Xiangrui Meng] fix tests
2436b3d [Xiangrui Meng] add parse() to LabeledPoint
mengxr committed Aug 16, 2014
1 parent 76fa0ea commit 7e70708
Showing 7 changed files with 17 additions and 26 deletions.
@@ -18,8 +18,7 @@
package org.apache.spark.examples.mllib

import org.apache.spark.mllib.linalg.Vectors
-import org.apache.spark.mllib.util.MLUtils
-import org.apache.spark.mllib.regression.StreamingLinearRegressionWithSGD
+import org.apache.spark.mllib.regression.{LabeledPoint, StreamingLinearRegressionWithSGD}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

@@ -56,8 +55,8 @@ object StreamingLinearRegression {
val conf = new SparkConf().setMaster("local").setAppName("StreamingLinearRegression")
val ssc = new StreamingContext(conf, Seconds(args(2).toLong))

-val trainingData = MLUtils.loadStreamingLabeledPoints(ssc, args(0))
-val testData = MLUtils.loadStreamingLabeledPoints(ssc, args(1))
+val trainingData = ssc.textFileStream(args(0)).map(LabeledPoint.parse)
+val testData = ssc.textFileStream(args(1)).map(LabeledPoint.parse)

val model = new StreamingLinearRegressionWithSGD()
.setInitialWeights(Vectors.dense(Array.fill[Double](args(3).toInt)(0)))
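For callers of the removed `MLUtils.loadStreamingLabeledPoints`, the migration is the same one-liner the example above now uses. A minimal sketch (the directory path is hypothetical):

    import org.apache.spark.SparkConf
    import org.apache.spark.mllib.regression.LabeledPoint
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val conf = new SparkConf().setMaster("local[2]").setAppName("MigrationSketch")
    val ssc = new StreamingContext(conf, Seconds(1))

    // Before: MLUtils.loadStreamingLabeledPoints(ssc, "/data/points")
    val points = ssc.textFileStream("/data/points").map(LabeledPoint.parse)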
@@ -36,7 +36,7 @@ case class LabeledPoint(label: Double, features: Vector) {
/**
* Parser for [[org.apache.spark.mllib.regression.LabeledPoint]].
*/
-private[mllib] object LabeledPointParser {
+object LabeledPoint {
/**
* Parses a string resulting from `LabeledPoint#toString` into
* an [[org.apache.spark.mllib.regression.LabeledPoint]].
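The public `parse` has to accept both the current `LabeledPoint#toString` output and the pre-1.0 comma/space format exercised by the test suite below. A self-contained sketch of that dispatch, deliberately simplified to dense vectors only (the real parser also handles sparse vectors):

    import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.mllib.regression.LabeledPoint

    def parseSketch(s: String): LabeledPoint =
      if (s.startsWith("(")) {
        // current format, e.g. "(1.0,[1.0,0.0,-2.0])"
        val inner = s.stripPrefix("(").stripSuffix(")")
        val i = inner.indexOf(',')
        val values = inner.substring(i + 1).stripPrefix("[").stripSuffix("]")
          .split(',').map(_.toDouble)
        LabeledPoint(inner.substring(0, i).toDouble, Vectors.dense(values))
      } else {
        // v0.9 format, e.g. "1.0,1.0 0.0 -2.0"
        val Array(label, features) = s.split(',')
        LabeledPoint(label.toDouble,
          Vectors.dense(features.trim.split(' ').map(_.toDouble)))
      }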
@@ -18,7 +18,7 @@
package org.apache.spark.mllib.regression

import org.apache.spark.annotation.Experimental
-import org.apache.spark.mllib.linalg.{Vector, Vectors}
+import org.apache.spark.mllib.linalg.Vector

/**
* Train or predict a linear regression model on streaming data. Training uses
mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala (17 changes: 2 additions & 15 deletions)
@@ -27,7 +27,7 @@ import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.PartitionwiseSampledRDD
import org.apache.spark.util.random.BernoulliSampler
-import org.apache.spark.mllib.regression.{LabeledPointParser, LabeledPoint}
+import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
@@ -185,7 +185,7 @@ object MLUtils {
* @return labeled points stored as an RDD[LabeledPoint]
*/
def loadLabeledPoints(sc: SparkContext, path: String, minPartitions: Int): RDD[LabeledPoint] =
-sc.textFile(path, minPartitions).map(LabeledPointParser.parse)
+sc.textFile(path, minPartitions).map(LabeledPoint.parse)

/**
* Loads labeled points saved using `RDD[LabeledPoint].saveAsTextFile` with the default number of
@@ -194,19 +194,6 @@
def loadLabeledPoints(sc: SparkContext, dir: String): RDD[LabeledPoint] =
loadLabeledPoints(sc, dir, sc.defaultMinPartitions)

-/**
- * Loads streaming labeled points from a stream of text files
- * where points are in the same format as used in `RDD[LabeledPoint].saveAsTextFile`.
- * See `StreamingContext.textFileStream` for more details on how to
- * generate a stream from files
- *
- * @param ssc Streaming context
- * @param dir Directory path in any Hadoop-supported file system URI
- * @return Labeled points stored as a DStream[LabeledPoint]
- */
-def loadStreamingLabeledPoints(ssc: StreamingContext, dir: String): DStream[LabeledPoint] =
-  ssc.textFileStream(dir).map(LabeledPointParser.parse)

/**
* Load labeled data from a file. The data format used here is
* <L>, <f1> <f2> ...
@@ -28,12 +28,12 @@ class LabeledPointSuite extends FunSuite {
LabeledPoint(1.0, Vectors.dense(1.0, 0.0)),
LabeledPoint(0.0, Vectors.sparse(2, Array(1), Array(-1.0))))
points.foreach { p =>
-assert(p === LabeledPointParser.parse(p.toString))
+assert(p === LabeledPoint.parse(p.toString))
}
}

test("parse labeled points with v0.9 format") {
-val point = LabeledPointParser.parse("1.0,1.0 0.0 -2.0")
+val point = LabeledPoint.parse("1.0,1.0 0.0 -2.0")
assert(point === LabeledPoint(1.0, Vectors.dense(1.0, 0.0, -2.0)))
}
}
@@ -26,7 +26,7 @@ import com.google.common.io.Files
import org.scalatest.FunSuite

import org.apache.spark.mllib.linalg.Vectors
-import org.apache.spark.mllib.util.{LinearDataGenerator, LocalSparkContext, MLUtils}
+import org.apache.spark.mllib.util.{LinearDataGenerator, LocalSparkContext}
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.util.Utils

Expand Down Expand Up @@ -55,7 +55,7 @@ class StreamingLinearRegressionSuite extends FunSuite with LocalSparkContext {
val numBatches = 10
val batchDuration = Milliseconds(1000)
val ssc = new StreamingContext(sc, batchDuration)
-val data = MLUtils.loadStreamingLabeledPoints(ssc, testDir.toString)
+val data = ssc.textFileStream(testDir.toString).map(LabeledPoint.parse)
val model = new StreamingLinearRegressionWithSGD()
.setInitialWeights(Vectors.dense(0.0, 0.0))
.setStepSize(0.1)
@@ -97,7 +97,7 @@ class StreamingLinearRegressionSuite extends FunSuite with LocalSparkContext {
val batchDuration = Milliseconds(2000)
val ssc = new StreamingContext(sc, batchDuration)
val numBatches = 5
-val data = MLUtils.loadStreamingLabeledPoints(ssc, testDir.toString)
+val data = ssc.textFileStream(testDir.toString()).map(LabeledPoint.parse)
val model = new StreamingLinearRegressionWithSGD()
.setInitialWeights(Vectors.dense(0.0))
.setStepSize(0.1)
project/MimaExcludes.scala (5 changes: 5 additions & 0 deletions)
@@ -129,6 +129,11 @@ object MimaExcludes {
Seq( // new Vector methods in MLlib (binary compatible assuming users do not implement Vector)
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.mllib.linalg.Vector.copy")
) ++
+Seq( // synthetic methods generated in LabeledPoint
+  ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.mllib.regression.LabeledPoint$"),
+  ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.mllib.regression.LabeledPoint.apply"),
+  ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.mllib.regression.LabeledPoint.toString")
+) ++
Seq ( // Scala 2.11 compatibility fix
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.StreamingContext.<init>$default$2")
)
