---
layout: global
title: Linear Methods - MLlib
displayTitle: <a href="mllib-guide.html">MLlib</a> - Linear Methods
---
- Table of contents
{:toc}
\[ \newcommand{\R}{\mathbb{R}} \newcommand{\E}{\mathbb{E}} \newcommand{\x}{\mathbf{x}} \newcommand{\y}{\mathbf{y}} \newcommand{\wv}{\mathbf{w}} \newcommand{\av}{\mathbf{\alpha}} \newcommand{\bv}{\mathbf{b}} \newcommand{\N}{\mathbb{N}} \newcommand{\id}{\mathbf{I}} \newcommand{\ind}{\mathbf{1}} \newcommand{\0}{\mathbf{0}} \newcommand{\unit}{\mathbf{e}} \newcommand{\one}{\mathbf{1}} \newcommand{\zero}{\mathbf{0}} \]
Many standard machine learning methods can be formulated as a convex optimization problem, i.e. the task of finding a minimizer of a convex function $f$ that depends on a variable vector $\wv$ (called `weights` in the code), which has $d$ entries. Formally, we can write this as the optimization problem $\min_{\wv \in\R^d} \; f(\wv)$, where the objective function is of the form

\begin{equation}
f(\wv) := \lambda\, R(\wv) + \frac1n \sum_{i=1}^n L(\wv;\x_i,y_i) \label{eq:regPrimal} \ .
\end{equation}
Here the vectors $\x_i\in\R^d$ are the training data examples, for $1\le i\le n$, and $y_i\in\R$ are their corresponding labels, which we want to predict. We call the method *linear* if $L(\wv; \x, y)$ can be expressed as a function of $\wv^T \x$ and $y$.
The objective function $f$ has two parts: the regularizer that controls the complexity of the model, and the loss that measures the error of the model on the training data. The loss function $L(\wv;.)$ is typically a convex function in $\wv$. The fixed regularization parameter $\lambda \ge 0$ (`regParam` in the code) defines the trade-off between the two goals of minimizing the loss (i.e., training error) and minimizing model complexity (i.e., avoiding overfitting).
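
As an illustration of how $\lambda$, $R$ and $L$ combine in $\eqref{eq:regPrimal}$, the following minimal pure-Python sketch (no Spark) evaluates $f(\wv)$ on a tiny in-memory dataset. The helper names `dot`, `objective`, `squared_loss` and `l2_reg` are hypothetical and not part of the MLlib API; the squared loss and L2 regularizer are just one possible choice of $L$ and $R$.

```python
from typing import Callable, Sequence, Tuple

# Hypothetical helpers for illustration only; not part of MLlib.
Vector = Sequence[float]
Example = Tuple[Vector, float]  # (features x_i, label y_i)


def dot(w: Vector, x: Vector) -> float:
    """Inner product w^T x."""
    return sum(wi * xi for wi, xi in zip(w, x))


def objective(
    w: Vector,
    data: Sequence[Example],
    loss: Callable[[Vector, Vector, float], float],
    reg: Callable[[Vector], float],
    lam: float,
) -> float:
    """f(w) = lam * R(w) + (1/n) * sum_i L(w; x_i, y_i)."""
    avg_loss = sum(loss(w, x, y) for x, y in data) / len(data)
    return lam * reg(w) + avg_loss


def squared_loss(w: Vector, x: Vector, y: float) -> float:
    """L(w; x, y) = 1/2 (w^T x - y)^2."""
    return 0.5 * (dot(w, x) - y) ** 2


def l2_reg(w: Vector) -> float:
    """R(w) = 1/2 ||w||_2^2."""
    return 0.5 * dot(w, w)


if __name__ == "__main__":
    data = [([1.0, 0.0], 1.0), ([0.0, 1.0], -1.0)]
    # Average loss is (0 + 2) / 2 and R(w) = 1, so f = 0.1 * 1 + 1.
    print(objective([1.0, 1.0], data, squared_loss, l2_reg, lam=0.1))
```

Swapping `loss` or `reg` for another function changes the method without touching `objective`, which mirrors how the loss and regularizer are treated as independent parts of $f$.
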
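The following table summarizes the loss functions and their gradients or sub-gradients for the methods MLlib supports:

| | loss function $L(\wv; \x, y)$ | gradient or sub-gradient |
|---|---|---|
| hinge loss | $\max \{0, 1-y \wv^T \x \}, \quad y \in \{-1, +1\}$ | $\begin{cases}-y \cdot \x & \text{if } y \wv^T \x < 1, \\ \0 & \text{otherwise}.\end{cases}$ |
| logistic loss | $\log(1+\exp( -y \wv^T \x)), \quad y \in \{-1, +1\}$ | $-y \left(1-\frac1{1+\exp(-y \wv^T \x)} \right) \cdot \x$ |
| squared loss | $\frac{1}{2} (\wv^T \x - y)^2, \quad y \in \R$ | $(\wv^T \x - y) \cdot \x$ |
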
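
The loss formulas and their (sub-)gradients translate directly into code. The sketch below is plain Python, independent of Spark, and all function names are hypothetical. It assumes labels $y \in \{-1, +1\}$ for the hinge and logistic losses and makes no attempt at the numerical safeguards a production implementation would need (for example, `math.exp` can overflow for margins far below zero).

```python
import math
from typing import List, Sequence

Vector = Sequence[float]


def dot(w: Vector, x: Vector) -> float:
    """Inner product w^T x."""
    return sum(wi * xi for wi, xi in zip(w, x))


def hinge_loss(w: Vector, x: Vector, y: float) -> float:
    """max{0, 1 - y w^T x}, for y in {-1, +1}."""
    return max(0.0, 1.0 - y * dot(w, x))


def hinge_subgradient(w: Vector, x: Vector, y: float) -> List[float]:
    """-y x if y w^T x < 1, otherwise the zero vector."""
    if y * dot(w, x) < 1.0:
        return [-y * xi for xi in x]
    return [0.0] * len(x)


def logistic_loss(w: Vector, x: Vector, y: float) -> float:
    """log(1 + exp(-y w^T x)), for y in {-1, +1}."""
    return math.log1p(math.exp(-y * dot(w, x)))


def logistic_gradient(w: Vector, x: Vector, y: float) -> List[float]:
    """-y (1 - 1 / (1 + exp(-y w^T x))) x."""
    coeff = -y * (1.0 - 1.0 / (1.0 + math.exp(-y * dot(w, x))))
    return [coeff * xi for xi in x]


def squared_loss(w: Vector, x: Vector, y: float) -> float:
    """1/2 (w^T x - y)^2, for real-valued y."""
    return 0.5 * (dot(w, x) - y) ** 2


def squared_gradient(w: Vector, x: Vector, y: float) -> List[float]:
    """(w^T x - y) x."""
    residual = dot(w, x) - y
    return [residual * xi for xi in x]
```

A quick way to check such formulas is to compare each gradient with a central finite difference of the corresponding loss at a few points.
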
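The purpose of the regularizer is to encourage simple models and avoid overfitting. We support the following regularizers in MLlib:

| | regularizer $R(\wv)$ | gradient or sub-gradient |
|---|---|---|
| zero (unregularized) | $0$ | $\0$ |
| L2 | $\frac{1}{2}\lVert\wv\rVert_2^2$ | $\wv$ |
| L1 | $\lVert\wv\rVert_1$ | $\mathrm{sign}(\wv)$ |
| elastic net | $\alpha \lVert\wv\rVert_1 + (1-\alpha)\frac{1}{2}\lVert\wv\rVert_2^2$ | $\alpha\, \mathrm{sign}(\wv) + (1-\alpha) \wv$ |

In the elastic net row, $\alpha \in [0, 1]$ sets the mix between the L1 and L2 terms.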
Here $\mathrm{sign}(\wv)$ is the vector consisting of the signs ($\pm1$) of all the entries of $\wv$.
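
The regularizers and their (sub-)gradients can be sketched the same way in plain Python, with hypothetical function names. The definition of $\mathrm{sign}(\wv)$ only mentions $\pm1$; this sketch assumes the sign of a zero entry is $0$, which is one valid sub-gradient choice at the kink of $\lvert w_j \rvert$ (any value in $[-1, 1]$ would do).

```python
from typing import List, Sequence

Vector = Sequence[float]


def sign(v: float) -> float:
    """Sign of a scalar; 0 at v == 0 (an assumed sub-gradient choice)."""
    if v > 0:
        return 1.0
    if v < 0:
        return -1.0
    return 0.0


def l2_value(w: Vector) -> float:
    """R(w) = 1/2 ||w||_2^2."""
    return 0.5 * sum(wi * wi for wi in w)


def l2_gradient(w: Vector) -> List[float]:
    """The gradient of the L2 regularizer is w itself."""
    return list(w)


def l1_value(w: Vector) -> float:
    """R(w) = ||w||_1."""
    return sum(abs(wi) for wi in w)


def l1_subgradient(w: Vector) -> List[float]:
    """sign(w), applied entry by entry."""
    return [sign(wi) for wi in w]


def elastic_net_value(w: Vector, alpha: float) -> float:
    """alpha ||w||_1 + (1 - alpha) 1/2 ||w||_2^2, with alpha in [0, 1]."""
    return alpha * l1_value(w) + (1.0 - alpha) * l2_value(w)


def elastic_net_subgradient(w: Vector, alpha: float) -> List[float]:
    """alpha sign(w) + (1 - alpha) w."""
    return [alpha * s + (1.0 - alpha) * wi for s, wi in zip(l1_subgradient(w), w)]
```

Setting `alpha=1.0` recovers L1 and `alpha=0.0` recovers L2, which is a convenient sanity check.
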
L2-regularized problems are generally easier to solve than L1-regularized ones due to smoothness. However, L1 regularization can help promote sparsity in weights, leading to smaller and more interpretable models, the latter of which can be useful for feature selection. Elastic net is a combination of L1 and L2 regularization. We do not recommend training models without any regularization, especially when the number of training examples is small.
Under the hood, linear methods use convex optimization methods to optimize the objective functions. MLlib uses two methods, stochastic gradient descent (SGD) and L-BFGS, described in the optimization section. Currently, most algorithm APIs support SGD, and a few support L-BFGS. Refer to the optimization section for guidelines on choosing between optimization methods.
Classification aims to divide items into categories. The most common classification type is binary classification, where there are two categories, usually named positive and negative. If there are more than two categories, it is called multiclass classification. MLlib supports two linear methods for classification: linear Support Vector Machines (SVMs) and logistic regression. Linear SVMs support only binary classification, while logistic regression supports both binary and multiclass classification problems. For both methods, MLlib supports L1 and L2 regularized variants.
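The training data set is represented by an RDD of `LabeledPoint` in MLlib, where labels are class indices starting from zero: $0, 1, 2, \ldots$. Note that the mathematical formulation in this guide denotes a binary label $y$ as either $+1$ (positive) or $-1$ (negative), which is convenient for the formulation. However, MLlib represents the negative label by $0$ instead of $-1$, to be consistent with multiclass labeling.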
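
The hinge and logistic losses are written for labels $y \in \{-1, +1\}$, whereas the training labels are class indices starting from zero, so code that implements the formulas directly needs a conversion for the binary case. A minimal sketch with hypothetical helper names:

```python
def to_signed_label(label: float) -> float:
    """Map a binary class index {0, 1} to the {-1, +1} convention of the formulas."""
    if label not in (0.0, 1.0):
        raise ValueError(f"expected binary label 0 or 1, got {label}")
    return 2.0 * label - 1.0


def to_class_index(y: float) -> float:
    """Inverse mapping: {-1, +1} back to the class indices {0, 1}."""
    if y not in (-1.0, 1.0):
        raise ValueError(f"expected label -1 or +1, got {y}")
    return (y + 1.0) / 2.0
```

The affine map $y = 2\,\text{label} - 1$ is the simplest choice; rejecting other values early avoids silently training on multiclass labels with a binary loss.
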
The linear SVM is a standard method for large-scale classification tasks. It is a linear method as described above in equation $\eqref{eq:regPrimal}$, with the loss function in the formulation given by the hinge loss:

\[ L(\wv;\x,y) := \max \{0, 1-y \wv^T \x \}. \]

By default, linear SVMs are trained with L2 regularization. We also support alternative L1 regularization. In this case, the problem becomes a linear program.
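The linear SVMs algorithm outputs an SVM model. Given a new data point, denoted by $\x$, the model makes predictions based on the value of $\wv^T \x$. By default, if $\wv^T \x \geq 0$ then the outcome is positive, and negative otherwise.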
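
A linear SVM model predicts from the margin $\wv^T \x$ alone, which makes the prediction step a few lines of plain Python. In this sketch `svm_predict` is a hypothetical name, there is no intercept term, a tie at exactly the threshold is counted as positive (an assumption), and `threshold=None` returns the raw margin, similar in spirit to calling `clearThreshold()` on the model before predicting.

```python
from typing import Optional, Sequence

Vector = Sequence[float]


def dot(w: Vector, x: Vector) -> float:
    """Inner product w^T x."""
    return sum(wi * xi for wi, xi in zip(w, x))


def svm_predict(w: Vector, x: Vector, threshold: Optional[float] = 0.0) -> float:
    """Return 1.0 (positive) or 0.0 (negative) from the margin w^T x.

    With threshold=None the raw margin is returned instead, which is what
    score-based metrics such as the area under the ROC curve need.
    """
    margin = dot(w, x)
    if threshold is None:
        return margin
    return 1.0 if margin >= threshold else 0.0
```

Returning `0.0` for the negative class keeps the output consistent with the $0/1$ label encoding used for training data.
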
Examples
{% highlight scala %}
import org.apache.spark.mllib.classification.{SVMModel, SVMWithSGD}
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.mllib.util.MLUtils

// Load training data in LIBSVM format.
val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")

// Split data into training (60%) and test (40%).
val splits = data.randomSplit(Array(0.6, 0.4), seed = 11L)
val training = splits(0).cache()
val test = splits(1)

// Run training algorithm to build the model
val numIterations = 100
val model = SVMWithSGD.train(training, numIterations)

// Clear the default threshold.
model.clearThreshold()

// Compute raw scores on the test set.
val scoreAndLabels = test.map { point =>
  val score = model.predict(point.features)
  (score, point.label)
}

// Get evaluation metrics.
val metrics = new BinaryClassificationMetrics(scoreAndLabels)
val auROC = metrics.areaUnderROC()

println("Area under ROC = " + auROC)

// Save and load model
model.save(sc, "myModelPath")
val sameModel = SVMModel.load(sc, "myModelPath")
{% endhighlight %}
The `SVMWithSGD.train()` method by default performs L2 regularization with the regularization parameter set to 1.0. If we want to configure this algorithm, we can customize `SVMWithSGD` further by creating a new object directly and calling setter methods. All other MLlib algorithms support customization in this way as well. For example, the following code produces an L1-regularized variant of SVMs with the regularization parameter set to 0.1, and runs the training algorithm for 200 iterations.
{% highlight scala %}
import org.apache.spark.mllib.optimization.L1Updater

val svmAlg = new SVMWithSGD()
svmAlg.optimizer.
  setNumIterations(200).
  setRegParam(0.1).
  setUpdater(new L1Updater)
val modelL1 = svmAlg.run(training)
{% endhighlight %}
{% highlight java %}
import scala.Tuple2;

import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.mllib.classification.*;
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics;

import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.util.MLUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;

public class SVMClassifier {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("SVM Classifier Example");
    SparkContext sc = new SparkContext(conf);
    String path = "data/mllib/sample_libsvm_data.txt";
    JavaRDD<LabeledPoint> data = MLUtils.loadLibSVMFile(sc, path).toJavaRDD();

    // Split initial RDD into two: 60% training data, 40% testing data.
    JavaRDD<LabeledPoint> training = data.sample(false, 0.6, 11L);
    training.cache();
    JavaRDD<LabeledPoint> test = data.subtract(training);

    // Run training algorithm to build the model.
    int numIterations = 100;
    final SVMModel model = SVMWithSGD.train(training.rdd(), numIterations);

    // Clear the default threshold.
    model.clearThreshold();

    // Compute raw scores on the test set.
    JavaRDD<Tuple2<Object, Object>> scoreAndLabels = test.map(
      new Function<LabeledPoint, Tuple2<Object, Object>>() {
        public Tuple2<Object, Object> call(LabeledPoint p) {
          Double score = model.predict(p.features());
          return new Tuple2<Object, Object>(score, p.label());
        }
      }
    );

    // Get evaluation metrics.
    BinaryClassificationMetrics metrics =
      new BinaryClassificationMetrics(JavaRDD.toRDD(scoreAndLabels));
    double auROC = metrics.areaUnderROC();

    System.out.println("Area under ROC = " + auROC);

    // Save and load model
    model.save(sc, "myModelPath");
    SVMModel sameModel = SVMModel.load(sc, "myModelPath");
  }
}
{% endhighlight %}
The `SVMWithSGD.train()` method by default performs L2 regularization with the regularization parameter set to 1.0. If we want to configure this algorithm, we can customize `SVMWithSGD` further by creating a new object directly and calling setter methods. All other MLlib algorithms support customization in this way as well. For example, the following code produces an L1-regularized variant of SVMs with the regularization parameter set to 0.1, and runs the training algorithm for 200 iterations.
{% highlight java %}
import org.apache.spark.mllib.optimization.L1Updater;

SVMWithSGD svmAlg = new SVMWithSGD();
svmAlg.optimizer()
  .setNumIterations(200)
  .setRegParam(0.1)
  .setUpdater(new L1Updater());
final SVMModel modelL1 = svmAlg.run(training.rdd());
{% endhighlight %}
In order to run the above application, follow the instructions provided in the Self-Contained Applications section of the Spark quick-start guide. Be sure to also add *spark-mllib* to your build file as a dependency.
{% highlight python %}
from pyspark.mllib.classification import SVMWithSGD, SVMModel
from pyspark.mllib.regression import LabeledPoint

# Load and parse the data
def parsePoint(line):
    values = [float(x) for x in line.split(' ')]
    return LabeledPoint(values[0], values[1:])

data = sc.textFile("data/mllib/sample_svm_data.txt")
parsedData = data.map(parsePoint)

# Build the model
model = SVMWithSGD.train(parsedData, iterations=100)

# Evaluate the model on training data
labelsAndPreds = parsedData.map(lambda p: (p.label, model.predict(p.features)))
trainErr = labelsAndPreds.filter(lambda lp: lp[0] != lp[1]).count() / float(parsedData.count())
print("Training Error = " + str(trainErr))

# Save and load model
model.save(sc, "myModelPath")
sameModel = SVMModel.load(sc, "myModelPath")
{% endhighlight %}
Logistic regression is widely used to predict a binary response. It is a linear method as described above in equation $\eqref{eq:regPrimal}$, with the loss function in the formulation given by the logistic loss:

\[ L(\wv;\x,y) := \log(1+\exp( -y \wv^T \x)). \]
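For binary classification problems, the algorithm outputs a binary logistic regression model. Given a new data point, denoted by $\x$, the model makes predictions by applying the logistic function

\[ \mathrm{f}(z) = \frac{1}{1 + e^{-z}} \]

where $z = \wv^T \x$. By default, if $\mathrm{f}(\wv^T \x) > 0.5$, the outcome is positive, or negative otherwise. Unlike linear SVMs, the raw output of the logistic regression model, $\mathrm{f}(z)$, has a probabilistic interpretation (i.e., the probability that $\x$ is positive).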
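
A plain-Python sketch of the logistic prediction rule, with hypothetical function names. The two-branch form of the logistic function is a standard way to keep `math.exp` from overflowing for inputs of large magnitude; the decision threshold is a parameter with 0.5 as its default.

```python
import math
from typing import Sequence

Vector = Sequence[float]


def dot(w: Vector, x: Vector) -> float:
    """Inner product w^T x."""
    return sum(wi * xi for wi, xi in zip(w, x))


def sigmoid(z: float) -> float:
    """Logistic function 1 / (1 + e^{-z}), arranged so math.exp never overflows."""
    if z >= 0:
        return 1.0 / (1.0 + math.exp(-z))
    e = math.exp(z)  # z < 0, so e lies in (0, 1)
    return e / (1.0 + e)


def predict_probability(w: Vector, x: Vector) -> float:
    """f(w^T x): estimated probability that x belongs to the positive class."""
    return sigmoid(dot(w, x))


def predict_label(w: Vector, x: Vector, threshold: float = 0.5) -> float:
    """1.0 if the probability exceeds the threshold, else 0.0."""
    return 1.0 if predict_probability(w, x) > threshold else 0.0
```

Because $\mathrm{f}(z) > 0.5$ exactly when $z > 0$, the default rule is equivalent to checking the sign of the margin; raising or lowering `threshold` trades precision against recall.
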
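Binary logistic regression can be generalized into multinomial logistic regression to train and predict multiclass classification problems. For example, for $K$ possible outcomes, one of the outcomes can be chosen as a "pivot", and the other $K - 1$ outcomes can be separately regressed against the pivot outcome. In MLlib, the first class $0$ is chosen as the "pivot" class. For multiclass classification problems, the algorithm will output a multinomial logistic regression model, which contains $K - 1$ binary logistic regression models regressed against the first class. Given a new data point, the $K - 1$ models are evaluated, and the class with the largest probability is chosen as the predicted class.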
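
To make the pivot construction concrete, here is a hypothetical plain-Python sketch, assuming class $0$ is the pivot (the reference class whose margin is fixed at $0$). It takes the $K - 1$ weight vectors, converts the margins into class probabilities, and predicts the most probable class. It illustrates the idea only and is not MLlib's implementation.

```python
import math
from typing import List, Sequence

Vector = Sequence[float]


def dot(w: Vector, x: Vector) -> float:
    """Inner product w^T x."""
    return sum(wi * xi for wi, xi in zip(w, x))


def class_probabilities(weights: Sequence[Vector], x: Vector) -> List[float]:
    """Probabilities for K = len(weights) + 1 classes, with class 0 as the pivot.

    weights[k - 1] is the weight vector of class k regressed against class 0.
    """
    margins = [0.0] + [dot(w, x) for w in weights]  # the pivot's margin is fixed at 0
    top = max(margins)  # subtract the largest margin before exponentiating, for stability
    exps = [math.exp(m - top) for m in margins]
    total = sum(exps)
    return [e / total for e in exps]


def predict_class(weights: Sequence[Vector], x: Vector) -> int:
    """Index of the most probable class."""
    probs = class_probabilities(weights, x)
    return max(range(len(probs)), key=lambda k: probs[k])
```

With $K = 2$ the probability of class $1$ is $e^{m}/(1 + e^{m})$, which is exactly the binary logistic function of the margin $m = \wv^T \x$.
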
We implemented two algorithms to solve logistic regression: mini-batch gradient descent and L-BFGS. We recommend L-BFGS over mini-batch gradient descent for faster convergence.
Examples
{% highlight scala %}
import org.apache.spark.SparkContext
import org.apache.spark.mllib.classification.{LogisticRegressionWithLBFGS, LogisticRegressionModel}
import org.apache.spark.mllib.evaluation.MulticlassMetrics
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.util.MLUtils

// Load training data in LIBSVM format.
val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")

// Split data into training (60%) and test (40%).
val splits = data.randomSplit(Array(0.6, 0.4), seed = 11L)
val training = splits(0).cache()
val test = splits(1)

// Run training algorithm to build the model
val model = new LogisticRegressionWithLBFGS()
  .setNumClasses(10)
  .run(training)

// Compute predicted labels on the test set.
val predictionAndLabels = test.map { case LabeledPoint(label, features) =>
  val prediction = model.predict(features)
  (prediction, label)
}

// Get evaluation metrics.
val metrics = new MulticlassMetrics(predictionAndLabels)
val precision = metrics.precision
println("Precision = " + precision)

// Save and load model
model.save(sc, "myModelPath")
val sameModel = LogisticRegressionModel.load(sc, "myModelPath")
{% endhighlight %}
{% highlight java %}
import scala.Tuple2;

import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.mllib.classification.LogisticRegressionModel;
import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS;
import org.apache.spark.mllib.evaluation.MulticlassMetrics;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.util.MLUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;

public class MultinomialLogisticRegressionExample {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("Multinomial Logistic Regression Example");
    SparkContext sc = new SparkContext(conf);
    String path = "data/mllib/sample_libsvm_data.txt";
    JavaRDD<LabeledPoint> data = MLUtils.loadLibSVMFile(sc, path).toJavaRDD();

    // Split initial RDD into two: 60% training data, 40% testing data.
    JavaRDD<LabeledPoint>[] splits = data.randomSplit(new double[] {0.6, 0.4}, 11L);
    JavaRDD<LabeledPoint> training = splits[0].cache();
    JavaRDD<LabeledPoint> test = splits[1];

    // Run training algorithm to build the model.
    final LogisticRegressionModel model = new LogisticRegressionWithLBFGS()
      .setNumClasses(10)
      .run(training.rdd());

    // Compute predicted labels on the test set.
    JavaRDD<Tuple2<Object, Object>> predictionAndLabels = test.map(
      new Function<LabeledPoint, Tuple2<Object, Object>>() {
        public Tuple2<Object, Object> call(LabeledPoint p) {
          Double prediction = model.predict(p.features());
          return new Tuple2<Object, Object>(prediction, p.label());
        }
      }
    );

    // Get evaluation metrics.
    MulticlassMetrics metrics = new MulticlassMetrics(predictionAndLabels.rdd());
    double precision = metrics.precision();
    System.out.println("Precision = " + precision);

    // Save and load model
    model.save(sc, "myModelPath");
    LogisticRegressionModel sameModel = LogisticRegressionModel.load(sc, "myModelPath");
  }
}
{% endhighlight %}
Note that the Python API does not yet support multiclass classification.
{% highlight python %}
from pyspark.mllib.classification import LogisticRegressionWithLBFGS, LogisticRegressionModel
from pyspark.mllib.regression import LabeledPoint

# Load and parse the data
def parsePoint(line):
    values = [float(x) for x in line.split(' ')]
    return LabeledPoint(values[0], values[1:])

data = sc.textFile("data/mllib/sample_svm_data.txt")
parsedData = data.map(parsePoint)

# Build the model
model = LogisticRegressionWithLBFGS.train(parsedData)

# Evaluate the model on training data
labelsAndPreds = parsedData.map(lambda p: (p.label, model.predict(p.features)))
trainErr = labelsAndPreds.filter(lambda lp: lp[0] != lp[1]).count() / float(parsedData.count())
print("Training Error = " + str(trainErr))

# Save and load model
model.save(sc, "myModelPath")
sameModel = LogisticRegressionModel.load(sc, "myModelPath")
{% endhighlight %}
Linear least squares is the most common formulation for regression problems. It is a linear method as described above in equation $\eqref{eq:regPrimal}$, with the loss function in the formulation given by the squared loss:

\[ L(\wv;\x,y) := \frac{1}{2} (\wv^T \x - y)^2. \]

Various related regression methods are derived by using different types of regularization: ordinary least squares or linear least squares uses no regularization; ridge regression uses L2 regularization; and Lasso uses L1 regularization. For all of these models, the training error $\frac{1}{n} \sum_{i=1}^n (\wv^T \x_i - y_i)^2$ is known as the mean squared error.
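
The three variants differ only in the regularizer added to the same average squared loss. The hypothetical helpers below evaluate the mean squared error and the corresponding ridge and Lasso objectives in plain Python, using $R(\wv) = \frac12\lVert\wv\rVert_2^2$ for ridge and $R(\wv) = \lVert\wv\rVert_1$ for Lasso; setting `lam` to $0$ gives ordinary least squares in both cases.

```python
from typing import Sequence, Tuple

Vector = Sequence[float]
Example = Tuple[Vector, float]  # (features x_i, label y_i)


def dot(w: Vector, x: Vector) -> float:
    """Inner product w^T x."""
    return sum(wi * xi for wi, xi in zip(w, x))


def mean_squared_error(w: Vector, data: Sequence[Example]) -> float:
    """(1/n) * sum_i (w^T x_i - y_i)^2."""
    return sum((dot(w, x) - y) ** 2 for x, y in data) / len(data)


def ridge_objective(w: Vector, data: Sequence[Example], lam: float) -> float:
    """lam * 1/2 ||w||_2^2 + average squared loss.

    The average of 1/2 (w^T x_i - y_i)^2 is half the mean squared error.
    """
    return lam * 0.5 * dot(w, w) + 0.5 * mean_squared_error(w, data)


def lasso_objective(w: Vector, data: Sequence[Example], lam: float) -> float:
    """lam * ||w||_1 + average squared loss."""
    return lam * sum(abs(wi) for wi in w) + 0.5 * mean_squared_error(w, data)
```

Note the factor $\frac12$: the objective averages the squared loss $\frac12(\wv^T\x - y)^2$, so its loss term is half the mean squared error reported in the examples below.
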
Examples
{% highlight scala %}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.regression.LinearRegressionModel
import org.apache.spark.mllib.regression.LinearRegressionWithSGD
import org.apache.spark.mllib.linalg.Vectors

// Load and parse the data
val data = sc.textFile("data/mllib/ridge-data/lpsa.data")
val parsedData = data.map { line =>
  val parts = line.split(',')
  LabeledPoint(parts(0).toDouble, Vectors.dense(parts(1).split(' ').map(_.toDouble)))
}.cache()

// Building the model
val numIterations = 100
val model = LinearRegressionWithSGD.train(parsedData, numIterations)

// Evaluate model on training examples and compute training error
val valuesAndPreds = parsedData.map { point =>
  val prediction = model.predict(point.features)
  (point.label, prediction)
}
val MSE = valuesAndPreds.map { case (v, p) => math.pow((v - p), 2) }.mean()
println("training Mean Squared Error = " + MSE)

// Save and load model
model.save(sc, "myModelPath")
val sameModel = LinearRegressionModel.load(sc, "myModelPath")
{% endhighlight %}
`RidgeRegressionWithSGD` and `LassoWithSGD` can be used in a similar fashion to `LinearRegressionWithSGD`.
{% highlight java %}
import scala.Tuple2;

import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.regression.LinearRegressionModel;
import org.apache.spark.mllib.regression.LinearRegressionWithSGD;
import org.apache.spark.SparkConf;

public class LinearRegression {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("Linear Regression Example");
    JavaSparkContext sc = new JavaSparkContext(conf);

    // Load and parse the data
    String path = "data/mllib/ridge-data/lpsa.data";
    JavaRDD<String> data = sc.textFile(path);
    JavaRDD<LabeledPoint> parsedData = data.map(
      new Function<String, LabeledPoint>() {
        public LabeledPoint call(String line) {
          String[] parts = line.split(",");
          String[] features = parts[1].split(" ");
          double[] v = new double[features.length];
          // Parse every feature value on the line.
          for (int i = 0; i < features.length; i++) {
            v[i] = Double.parseDouble(features[i]);
          }
          return new LabeledPoint(Double.parseDouble(parts[0]), Vectors.dense(v));
        }
      }
    );
    parsedData.cache();

    // Building the model
    int numIterations = 100;
    final LinearRegressionModel model =
      LinearRegressionWithSGD.train(JavaRDD.toRDD(parsedData), numIterations);

    // Evaluate model on training examples and compute training error
    JavaRDD<Tuple2<Double, Double>> valuesAndPreds = parsedData.map(
      new Function<LabeledPoint, Tuple2<Double, Double>>() {
        public Tuple2<Double, Double> call(LabeledPoint point) {
          double prediction = model.predict(point.features());
          return new Tuple2<Double, Double>(prediction, point.label());
        }
      }
    );
    double MSE = new JavaDoubleRDD(valuesAndPreds.map(
      new Function<Tuple2<Double, Double>, Object>() {
        public Object call(Tuple2<Double, Double> pair) {
          return Math.pow(pair._1() - pair._2(), 2.0);
        }
      }
    ).rdd()).mean();
    System.out.println("training Mean Squared Error = " + MSE);

    // Save and load model
    model.save(sc.sc(), "myModelPath");
    LinearRegressionModel sameModel = LinearRegressionModel.load(sc.sc(), "myModelPath");
  }
}
{% endhighlight %}
{% highlight python %}
from pyspark.mllib.regression import LabeledPoint, LinearRegressionWithSGD, LinearRegressionModel

# Load and parse the data
def parsePoint(line):
    values = [float(x) for x in line.replace(',', ' ').split(' ')]
    return LabeledPoint(values[0], values[1:])

data = sc.textFile("data/mllib/ridge-data/lpsa.data")
parsedData = data.map(parsePoint)

# Build the model
model = LinearRegressionWithSGD.train(parsedData)

# Evaluate the model on training data
valuesAndPreds = parsedData.map(lambda p: (p.label, model.predict(p.features)))
MSE = valuesAndPreds.map(lambda vp: (vp[0] - vp[1])**2).reduce(lambda x, y: x + y) / valuesAndPreds.count()
print("Mean Squared Error = " + str(MSE))

# Save and load model
model.save(sc, "myModelPath")
sameModel = LinearRegressionModel.load(sc, "myModelPath")
{% endhighlight %}
In order to run the above application, follow the instructions provided in the Self-Contained Applications section of the Spark quick-start guide. Be sure to also add *spark-mllib* to your build file as a dependency.
### Streaming linear regression
When data arrive in a streaming fashion, it is useful to fit regression models online, updating the parameters of the model as new data arrive. MLlib currently supports streaming linear regression using ordinary least squares. The fitting is similar to that performed offline, except that fitting occurs on each batch of data, so that the model continually updates to reflect the data from the stream.
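
The per-batch update can be pictured with the following plain-Python sketch. It is not the MLlib class: `StreamingLeastSquaresSketch` is a hypothetical name, and it simplifies by running a fixed number of full-batch gradient-descent steps with a constant step size on each incoming batch, starting from the weights left by the previous batch. The default step size and iteration count are illustrative assumptions.

```python
from typing import List, Sequence, Tuple

Vector = Sequence[float]
Batch = Sequence[Tuple[Vector, float]]  # (features, label) pairs from one interval


def dot(w: Vector, x: Vector) -> float:
    """Inner product w^T x."""
    return sum(wi * xi for wi, xi in zip(w, x))


class StreamingLeastSquaresSketch:
    """Hypothetical online least-squares model (not the MLlib class).

    Each call to train_on_batch continues from the current weights, so the
    model keeps adapting as new batches arrive.
    """

    def __init__(self, num_features: int, step_size: float = 0.1, num_iterations: int = 50):
        self.weights: List[float] = [0.0] * num_features  # start from zero weights
        self.step_size = step_size
        self.num_iterations = num_iterations

    def train_on_batch(self, batch: Batch) -> None:
        n = len(batch)
        if n == 0:
            return  # nothing arrived in this interval; keep the current model
        for _ in range(self.num_iterations):
            # Gradient of the average squared loss 1/(2n) * sum (w^T x - y)^2.
            grad = [0.0] * len(self.weights)
            for x, y in batch:
                residual = dot(self.weights, x) - y
                for j, xj in enumerate(x):
                    grad[j] += residual * xj / n
            self.weights = [w - self.step_size * g for w, g in zip(self.weights, grad)]

    def predict(self, x: Vector) -> float:
        return dot(self.weights, x)


if __name__ == "__main__":
    model = StreamingLeastSquaresSketch(num_features=2)
    # Simulate ten batches drawn from y = 2*x1 - x2.
    batch = [([1.0, 0.0], 2.0), ([0.0, 1.0], -1.0), ([1.0, 1.0], 1.0)]
    for _ in range(10):
        model.train_on_batch(batch)
    print(model.weights)
```

Carrying the weights over between batches is what makes the fit online: no past data is stored, yet each batch refines the model left by the previous ones.
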
Examples
The following example demonstrates how to load training and testing data from two different input streams of text files, parse the streams as labeled points, fit a linear regression model online to the first stream, and make predictions on the second stream.
First, we import the necessary classes for parsing our input data and creating the model.
{% highlight scala %}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.regression.StreamingLinearRegressionWithSGD
{% endhighlight %}
Then we make input streams for training and testing data. We assume a StreamingContext `ssc` has already been created; see the Spark Streaming Programming Guide for more info. For this example, we use labeled points in training and testing streams, but in practice you will likely want to use unlabeled vectors for test data.
{% highlight scala %}
val trainingData = ssc.textFileStream("/training/data/dir").map(LabeledPoint.parse).cache()
val testData = ssc.textFileStream("/testing/data/dir").map(LabeledPoint.parse)
{% endhighlight %}
We create our model by initializing the weights to zero.
{% highlight scala %}
val numFeatures = 3
val model = new StreamingLinearRegressionWithSGD()
  .setInitialWeights(Vectors.zeros(numFeatures))
{% endhighlight %}
Now we register the streams for training and testing and start the job. Printing predictions alongside true labels lets us easily see the result.
{% highlight scala %}
model.trainOn(trainingData)
model.predictOnValues(testData.map(lp => (lp.label, lp.features))).print()

ssc.start()
ssc.awaitTermination()
{% endhighlight %}
We can now save text files with data to the training or testing folders. Each line should be a data point formatted as `(y,[x1,x2,x3])` where `y` is the label and `x1,x2,x3` are the features. Anytime a text file is placed in `/training/data/dir`, the model will update. Anytime a text file is placed in `/testing/data/dir`, you will see predictions. As you feed more data to the training directory, the predictions will get better!
First, we import the necessary classes for parsing our input data and creating the model.
{% highlight python %}
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.regression import StreamingLinearRegressionWithSGD
{% endhighlight %}
Then we make input streams for training and testing data. We assume a StreamingContext `ssc` has already been created; see the Spark Streaming Programming Guide for more info. For this example, we use labeled points in training and testing streams, but in practice you will likely want to use unlabeled vectors for test data.
{% highlight python %}
# Parse a line of the form (y,[x1,x2,x3]) into a LabeledPoint
def parse(lp):
    label = float(lp[lp.find('(') + 1: lp.find(',')])
    vec = Vectors.dense(lp[lp.find('[') + 1: lp.find(']')].split(','))
    return LabeledPoint(label, vec)

trainingData = ssc.textFileStream("/training/data/dir").map(parse).cache()
testData = ssc.textFileStream("/testing/data/dir").map(parse)
{% endhighlight %}
We create our model by initializing the weights to zero.
{% highlight python %}
numFeatures = 3
model = StreamingLinearRegressionWithSGD()
model.setInitialWeights([0.0] * numFeatures)
{% endhighlight %}
Now we register the streams for training and testing and start the job.
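{% highlight python %}
model.trainOn(trainingData)
# pprint() prints the first elements of each batch; print() on a DStream would only show the object.
model.predictOnValues(testData.map(lambda lp: (lp.label, lp.features))).pprint()

ssc.start()
ssc.awaitTermination()
{% endhighlight %}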
We can now save text files with data to the training or testing folders. Each line should be a data point formatted as `(y,[x1,x2,x3])` where `y` is the label and `x1,x2,x3` are the features. Anytime a text file is placed in `/training/data/dir`, the model will update. Anytime a text file is placed in `/testing/data/dir`, you will see predictions. As you feed more data to the training directory, the predictions will get better!
Behind the scenes, MLlib implements a simple distributed version of stochastic gradient descent (SGD), building on the underlying gradient descent primitive (as described in the optimization section). All provided algorithms take as input a regularization parameter (`regParam`) along with various parameters associated with stochastic gradient descent (`stepSize`, `numIterations`, `miniBatchFraction`). For each of them, we support all three possible regularizations (none, L1 or L2).
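
The sketch below shows, in plain Python, how `stepSize`, `numIterations`, `miniBatchFraction` and `regParam` interact in a mini-batch gradient descent loop with L2 regularization. It runs on a single machine and is not MLlib's distributed implementation; the function name `minibatch_sgd` is hypothetical, and the decaying step size $\text{stepSize}/\sqrt{t}$ is an assumption of this sketch.

```python
import math
import random
from typing import Callable, List, Sequence, Tuple

Vector = Sequence[float]
Gradient = Callable[[Vector, Vector, float], List[float]]


def dot(w: Vector, x: Vector) -> float:
    """Inner product w^T x."""
    return sum(wi * xi for wi, xi in zip(w, x))


def squared_gradient(w: Vector, x: Vector, y: float) -> List[float]:
    """Gradient of 1/2 (w^T x - y)^2 with respect to w."""
    residual = dot(w, x) - y
    return [residual * xi for xi in x]


def minibatch_sgd(
    data: Sequence[Tuple[Vector, float]],
    gradient: Gradient,
    num_features: int,
    step_size: float = 1.0,            # analogous to stepSize
    num_iterations: int = 100,         # analogous to numIterations
    reg_param: float = 0.0,            # analogous to regParam, with R(w) = 1/2 ||w||_2^2
    mini_batch_fraction: float = 1.0,  # analogous to miniBatchFraction
    seed: int = 42,
) -> List[float]:
    rng = random.Random(seed)
    w = [0.0] * num_features
    for t in range(1, num_iterations + 1):
        # Sample roughly mini_batch_fraction of the examples for this iteration.
        batch = [ex for ex in data if rng.random() < mini_batch_fraction]
        if not batch:
            continue  # an empty sample leaves the weights unchanged
        # Average the loss (sub-)gradients over the mini-batch.
        g = [0.0] * num_features
        for x, y in batch:
            for j, gj in enumerate(gradient(w, x, y)):
                g[j] += gj / len(batch)
        eta = step_size / math.sqrt(t)
        # Step along the gradient of the regularized objective: g + reg_param * w.
        w = [wj - eta * (gj + reg_param * wj) for wj, gj in zip(w, g)]
    return w
```

With `mini_batch_fraction=1.0` every iteration uses the full dataset, which makes the run deterministic; smaller fractions make each step cheaper but noisier.
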
For logistic regression, the L-BFGS version is implemented under `LogisticRegressionWithLBFGS`, and it supports both binary and multinomial logistic regression, while the SGD version only supports binary logistic regression. However, the L-BFGS version doesn't support L1 regularization, whereas the SGD version does. When L1 regularization is not required, the L-BFGS version is strongly recommended, since it converges faster and more accurately than SGD by approximating the inverse Hessian matrix with a quasi-Newton method.
Algorithms are all implemented in Scala:
- SVMWithSGD
- LogisticRegressionWithLBFGS
- LogisticRegressionWithSGD
- LinearRegressionWithSGD
- RidgeRegressionWithSGD
- LassoWithSGD
Python calls the Scala implementation via PythonMLLibAPI.