Skip to content

Commit

Permalink
Added Scala and Python examples for mllib
Browse files Browse the repository at this point in the history
  • Loading branch information
falaki committed Jan 2, 2014
1 parent 55b7e2f commit c189c83
Showing 1 changed file with 261 additions and 52 deletions.
313 changes: 261 additions & 52 deletions docs/mllib-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,56 +39,9 @@ underlying gradient descent primitive (described
parameter (*regParam*) along with various parameters associated with gradient
descent (*stepSize*, *numIterations*, *miniBatchFraction*).

The following code snippet illustrates how to load a sample dataset, execute a
training algorithm on this training data using a static method in the algorithm
object, and make predictions with the resulting model to compute the training
error.

{% highlight scala %}
import org.apache.spark.SparkContext
import org.apache.spark.mllib.classification.SVMWithSGD
import org.apache.spark.mllib.regression.LabeledPoint

// Load and parse the data file
val data = sc.textFile("mllib/data/sample_svm_data.txt")
val parsedData = data.map { line =>
val parts = line.split(' ')
LabeledPoint(parts(0).toDouble, parts.tail.map(x => x.toDouble).toArray)
}

// Run training algorithm
val numIterations = 20
val model = SVMWithSGD.train(parsedData, numIterations)

// Evaluate model on training examples and compute training error
val labelAndPreds = parsedData.map { point =>
val prediction = model.predict(point.features)
(point.label, prediction)
}
val trainErr = labelAndPreds.filter(r => r._1 != r._2).count.toDouble / parsedData.count
println("trainError = " + trainErr)
{% endhighlight %}

The `SVMWithSGD.train()` method by default performs L2 regularization with the
regularization parameter set to 1.0. If we want to configure this algorithm, we
can customize `SVMWithSGD` further by creating a new object directly and
calling setter methods. All other MLlib algorithms support customization in
this way as well. For example, the following code produces an L1 regularized
variant of SVMs with regularization parameter set to 0.1, and runs the training
algorithm for 200 iterations.

{% highlight scala %}
import org.apache.spark.mllib.optimization.L1Updater

val svmAlg = new SVMWithSGD()
svmAlg.optimizer.setNumIterations(200)
.setRegParam(0.1)
.setUpdater(new L1Updater)
val modelL1 = svmAlg.run(parsedData)
{% endhighlight %}

Both of the code snippets above can be executed in `spark-shell` to generate a
classifier for the provided dataset.

Available algorithms for binary classification:

Expand Down Expand Up @@ -121,14 +74,14 @@ of entities with one another based on some notion of similarity. Clustering is
often used for exploratory analysis and/or as a component of a hierarchical
supervised learning pipeline (in which distinct classifiers or regression
models are trained for each cluster). MLlib supports
[k-means](http://en.wikipedia.org/wiki/K-means_clustering) clustering, arguably
the most commonly used clustering approach that clusters the data points into
*k* clusters. The MLlib implementation includes a parallelized
[k-means](http://en.wikipedia.org/wiki/K-means_clustering) clustering, one of
the most commonly used clustering algorithms that clusters the data points into
predfined number of clusters. The MLlib implementation includes a parallelized
variant of the [k-means++](http://en.wikipedia.org/wiki/K-means%2B%2B) method
called [kmeans||](http://theory.stanford.edu/~sergei/papers/vldb12-kmpar.pdf).
The implementation in MLlib has the following parameters:

* *k* is the number of clusters.
* *k* is the number of desired clusters.
* *maxIterations* is the maximum number of iterations to run.
* *initializationMode* specifies either random initialization or
initialization via k-means\|\|.
Expand Down Expand Up @@ -169,7 +122,7 @@ the entries in the user-item matrix as *explicit* preferences given by the user
It is common in many real-world use cases to only have access to *implicit feedback*
(e.g. views, clicks, purchases, likes, shares etc.). The approach used in MLlib to deal with
such data is taken from
[Collaborative Filtering for Implicit Feedback Datasets](http://research.yahoo.com/pub/2433).
[Collaborative Filtering for Implicit Feedback Datasets](http://www2.research.att.com/~yifanhu/PUB/cf.pdf).
Essentially instead of trying to model the matrix of ratings directly, this approach treats the data as
a combination of binary preferences and *confidence values*. The ratings are then related
to the level of confidence in observed user preferences, rather than explicit ratings given to items.
Expand Down Expand Up @@ -210,3 +163,259 @@ at each iteration.
Available algorithms for gradient descent:

* [GradientDescent](api/mllib/index.html#org.apache.spark.mllib.optimization.GradientDescent)

# Using MLLib in Scala

Following code snippets can be executed in `spark-shell`.

## Binary Classification

The following code snippet illustrates how to load a sample dataset, execute a
training algorithm on this training data using a static method in the algorithm
object, and make predictions with the resulting model to compute the training
error.

{% highlight scala %}
import org.apache.spark.SparkContext
import org.apache.spark.mllib.classification.SVMWithSGD
import org.apache.spark.mllib.regression.LabeledPoint

// Load and parse the data file
val data = sc.textFile("mllib/data/sample_svm_data.txt")
val parsedData = data.map { line =>
val parts = line.split(' ')
LabeledPoint(parts(0).toDouble, parts.tail.map(x => x.toDouble).toArray)
}

// Run training algorithm to build the model
val numIterations = 20
val model = SVMWithSGD.train(parsedData, numIterations)

// Evaluate model on training examples and compute training error
val labelAndPreds = parsedData.map { point =>
val prediction = model.predict(point.features)
(point.label, prediction)
}
val trainErr = labelAndPreds.filter(r => r._1 != r._2).count.toDouble / parsedData.count
println("Training Error = " + trainErr)
{% endhighlight %}


The `SVMWithSGD.train()` method by default performs L2 regularization with the
regularization parameter set to 1.0. If we want to configure this algorithm, we
can customize `SVMWithSGD` further by creating a new object directly and
calling setter methods. All other MLlib algorithms support customization in
this way as well. For example, the following code produces an L1 regularized
variant of SVMs with regularization parameter set to 0.1, and runs the training
algorithm for 200 iterations.

{% highlight scala %}
import org.apache.spark.mllib.optimization.L1Updater

val svmAlg = new SVMWithSGD()
svmAlg.optimizer.setNumIterations(200)
.setRegParam(0.1)
.setUpdater(new L1Updater)
val modelL1 = svmAlg.run(parsedData)
{% endhighlight %}

## Linear Regression
The following example demonstrate how to load training data, parse it as an RDD of LabeledPoint. The
example then uses LinearRegressionWithSGD to build a simple linear model to predict label values. We
compute the Mean Squared Error at the end to evaluate
[goodness of fit](http://en.wikipedia.org/wiki/Goodness_of_fit)

{% highlight scala %}
import org.apache.spark.mllib.regression.LinearRegressionWithSGD
import org.apache.spark.mllib.regression.LabeledPoint

// Load and parse the data
val data = sc.textFile("mllib/data/ridge-data/lpsa.data")
val parsedData = data.map { line =>
val parts = line.split(',')
LabeledPoint(parts(0).toDouble, parts(1).split(' ').map(x => x.toDouble).toArray)
}

// Building the model
val numIterations = 20
val model = LinearRegressionWithSGD.train(parsedData, numIterations)

// Evaluate model on training examples and compute training error
val valuesAndPreds = parsedData.map { point =>
val prediction = model.predict(point.features)
(point.label, prediction)
}
val MSE = valuesAndPreds.map{ case(v, p) => math.pow((v - p), 2)}.reduce(_ + _)/valuesAndPreds.count
println("training Mean Squared Error = " + MSE)
{% endhighlight %}


Similarly you can use RidgeRegressionWithSGD and LassoWithSGD and compare training
[Mean Squared Errors](http://en.wikipedia.org/wiki/Mean_squared_error).

## Clustering
In the following example after loading and parsing data, we use the KMeans object to cluster the data
into two clusters. The number of desired clusters is passed to the algorithm. We then compute Within
Set Sum of Squared Error (WSSSE). You can reduce this error measure by increasing *k*. In fact the
optimal *k* is usually one where there is an "elbow" in the WSSSE graph.

{% highlight scala %}
import org.apache.spark.mllib.clustering.KMeans

// Load and parse the data
val data = sc.textFile("kmeans_data.txt")
val parsedData = data.map( _.split(' ').map(_.toDouble))

// Cluster the data into two classes using KMeans
val numIterations = 20
val numClusters = 2
val clusters = KMeans.train(parsedData, numClusters, numIterations)

// Evaluate clustering by computing Within Set Sum of Squared Errors
val WSSSE = clusters.computeCost(parsedData)
println("Within Set Sum of Squared Errors = " + WSSSE)
{% endhighlight %}


## Collaborative Filtering
In the following example we load rating data. Each row consists of a user, a product and a rating.
We use the default ALS.train() method which assumes ratings are explicit. We evaluate the recommendation
model by measuring the Mean Squared Error of rating prediction.

{% highlight scala %}
import org.apache.spark.mllib.recommendation.ALS
import org.apache.spark.mllib.recommendation.Rating

// Load and parse the data
val data = sc.textFile("mllib/data/als/test.data")
val ratings = data.map(_.split(',') match {
case Array(user, item, rate) => Rating(user.toInt, item.toInt, rate.toDouble)
})

// Build the recommendation model using ALS
val numIterations = 20
val model = ALS.train(ratings, 1, 20, 0.01)

// Evaluate the model on rating data
val ratesAndPreds = ratings.map{ case Rating(user, item, rate) => (rate, model.predict(user, item))}
val MSE = ratesAndPreds.map{ case(v, p) => math.pow((v - p), 2)}.reduce(_ + _)/ratesAndPreds.count
{% endhighlight %}

If the rating matrix is derived from other source of information (i.e., it is inferred from
other signals), you can use the trainImplicit method to get better results.

{% highlight scala %}
val model = ALS.trainImplicit(ratings, 1, 20, 0.01)
{% endhighlight %}

# Using MLLib in Python
Following examples can be tested in the PySpark shell.

## Binary Classification
The following example shows how to load a sample dataset, build Logistic Regression model,
and make predictions with the resulting model to compute the training error.

{% highlight python %}
from pyspark.mllib.classification import LogisticRegressionWithSGD
from numpy import array

# Load and parse the data
data = sc.textFile("mllib/data/sample_svm_data.txt")
parsedData = data.map(lambda line: array([float(x) for x in line.split(' ')]))
model = LogisticRegressionWithSGD.train(sc, parsedData)

# Build the model
labelsAndPreds = parsedData.map(lambda point: (int(point.item(0)),
model.predict(point.take(range(1, point.size)))))

# Evaluating the model on training data
trainErr = labelsAndPreds.filter(lambda (v, p): v != p).count() / float(parsedData.count())
print("Training Error = " + str(trainErr))
{% endhighlight %}

## Linear Regression
The following example demonstrate how to load training data, parse it as an RDD of LabeledPoint. The
example then uses LinearRegressionWithSGD to build a simple linear model to predict label values. We
compute the Mean Squared Error at the end to evaluate
[goodness of fit](http://en.wikipedia.org/wiki/Goodness_of_fit)

{% highlight python %}
from pyspark.mllib.regression import LinearRegressionWithSGD
from numpy import array

# Load and parse the data
data = sc.textFile("mllib/data/ridge-data/lpsa.data")
parsedData = data.map(lambda line: array([float(x) for x in line.replace(',', ' ').split(' ')]))

# Build the model
model = LinearRegressionWithSGD.train(sc, parsedData)

# Evaluate the model on training data
valuesAndPreds = parsedData.map(lambda point: (point.item(0),
model.predict(point.take(range(1, point.size)))))
MSE = valuesAndPreds.map(lambda (v, p): (v - p)**2).reduce(lambda x, y: x + y)/valuesAndPreds.count()
print("Mean Squared Error = " + str(MSE))
{% endhighlight %}


## Clustering
In the following example after loading and parsing data, we use the KMeans object to cluster the data
into two clusters. The number of desired clusters is passed to the algorithm. We then compute Within
Set Sum of Squared Error (WSSSE). You can reduce this error measure by increasing *k*. In fact the
optimal *k* is usually one where there is an "elbow" in the WSSSE graph.

{% highlight python %}
from pyspark.mllib.clustering import KMeans
from numpy import array
from math import sqrt

# Load and parse the data
data = sc.textFile("kmeans_data.txt")
parsedData = data.map(lambda line: array([float(x) for x in line.split(' ')]))

# Build the model (cluster the data)
clusters = KMeans.train(sc, parsedData, 2, maxIterations=10,
runs=30, initialization_mode="random")

# Evaluate clustering by computing Within Set Sum of Squared Errors
def error(point):
center = clusters.centers[clusters.predict(point)]
return sqrt(sum([x**2 for x in (point - center)]))

WSSSE = parsedData.map(lambda point: error(point)).reduce(lambda x, y: x + y)
print("Within Set Sum of Squared Error = " + str(WSSSE))
{% endhighlight %}

Similarly you can use RidgeRegressionWithSGD and LassoWithSGD and compare training Mean Squared
Errors.

## Collaborative Filtering
In the following example we load rating data. Each row consists of a user, a product and a rating.
We use the default ALS.train() method which assumes ratings are explicit. We evaluate the recommendation
model by measuring the Mean Squared Error of rating prediction.

{% highlight python %}
from pyspark.mllib.recommendation import ALS
from numpy import array

# Load and parse the data
data = sc.textFile("mllib/data/als/test.data")
ratings = data.map(lambda line: array([float(x) for x in line.split(',')]))

# Build the recommendation model using Alternating Least Squares
model = ALS.train(sc, ratings, 1, 20)

# Evaluate the model on training data
ratesAndPreds = ratings.map(lambda p: (p[2], model.predict(int(p[0]), int(p[1]))))
MSE = valuesAndPreds.map(lambda (v, p): (v - p)**2).reduce(lambda x, y: x + y)/valuesAndPreds.count()
print("Mean Squared Error = " + str(MSE))

{% endhighlight %}

If the rating matrix is derived from other source of information (i.e., it is inferred from other
signals), you can use the trainImplicit method to get better results.

{% highlight python %}
# Build the recommendation model using Alternating Least Squares based on implicit ratings
model = ALS.trainImplicit(sc, ratings, 1, 20)
{% endhighlight %}

0 comments on commit c189c83

Please sign in to comment.