diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala index 550369d18cfec..647226a0d1a6e 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala @@ -17,22 +17,31 @@ package org.apache.spark.ml.classification +import scala.collection.mutable + +import breeze.linalg.{norm => brzNorm, DenseVector => BDV} +import breeze.optimize.{LBFGS => BreezeLBFGS, OWLQN => BreezeOWLQN} +import breeze.optimize.{CachedDiffFunction, DiffFunction} + import org.apache.spark.annotation.AlphaComponent import org.apache.spark.ml.param._ import org.apache.spark.ml.param.shared._ -import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS import org.apache.spark.mllib.linalg._ +import org.apache.spark.mllib.linalg.BLAS._ +import org.apache.spark.mllib.regression.LabeledPoint +import org.apache.spark.mllib.stat.MultivariateOnlineSummarizer +import org.apache.spark.mllib.util.MLUtils +import org.apache.spark.rdd.RDD import org.apache.spark.sql.DataFrame import org.apache.spark.storage.StorageLevel +import org.apache.spark.{SparkException, Logging} /** * Params for logistic regression. */ private[classification] trait LogisticRegressionParams extends ProbabilisticClassifierParams - with HasRegParam with HasMaxIter with HasFitIntercept with HasThreshold { - - setDefault(regParam -> 0.1, maxIter -> 100, threshold -> 0.5) -} + with HasRegParam with HasElasticNetParam with HasMaxIter with HasFitIntercept with HasTol + with HasThreshold /** * :: AlphaComponent :: @@ -43,45 +52,171 @@ private[classification] trait LogisticRegressionParams extends ProbabilisticClas @AlphaComponent class LogisticRegression extends ProbabilisticClassifier[Vector, LogisticRegression, LogisticRegressionModel] - with LogisticRegressionParams { + with LogisticRegressionParams with Logging { - /** @group setParam */ + /** + * Set the regularization parameter. + * Default is 0.0. + * @group setParam + */ def setRegParam(value: Double): this.type = set(regParam, value) + setDefault(regParam -> 0.0) - /** @group setParam */ + /** + * Set the ElasticNet mixing parameter. + * For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty. + * For 0 < alpha < 1, the penalty is a combination of L1 and L2. + * Default is 0.0 which is an L2 penalty. + * @group setParam + */ + def setElasticNetParam(value: Double): this.type = set(elasticNetParam, value) + setDefault(elasticNetParam -> 0.0) + + /** + * Set the maximal number of iterations. + * Default is 100. + * @group setParam + */ def setMaxIter(value: Int): this.type = set(maxIter, value) + setDefault(maxIter -> 100) + + /** + * Set the convergence tolerance of iterations. + * Smaller value will lead to higher accuracy with the cost of more iterations. + * Default is 1E-6. + * @group setParam + */ + def setTol(value: Double): this.type = set(tol, value) + setDefault(tol -> 1E-6) /** @group setParam */ def setFitIntercept(value: Boolean): this.type = set(fitIntercept, value) + setDefault(fitIntercept -> true) /** @group setParam */ def setThreshold(value: Double): this.type = set(threshold, value) + setDefault(threshold -> 0.5) override protected def train(dataset: DataFrame): LogisticRegressionModel = { // Extract columns from data. If dataset is persisted, do not persist oldDataset. - val oldDataset = extractLabeledPoints(dataset) + val instances = extractLabeledPoints(dataset).map { + case LabeledPoint(label: Double, features: Vector) => (label, features) + } val handlePersistence = dataset.rdd.getStorageLevel == StorageLevel.NONE - if (handlePersistence) { - oldDataset.persist(StorageLevel.MEMORY_AND_DISK) + if (handlePersistence) instances.persist(StorageLevel.MEMORY_AND_DISK) + + val (summarizer, labelSummarizer) = instances.treeAggregate( + (new MultivariateOnlineSummarizer, new MultiClassSummarizer))( + seqOp = (c, v) => (c, v) match { + case ((summarizer: MultivariateOnlineSummarizer, labelSummarizer: MultiClassSummarizer), + (label: Double, features: Vector)) => + (summarizer.add(features), labelSummarizer.add(label)) + }, + combOp = (c1, c2) => (c1, c2) match { + case ((summarizer1: MultivariateOnlineSummarizer, + classSummarizer1: MultiClassSummarizer), (summarizer2: MultivariateOnlineSummarizer, + classSummarizer2: MultiClassSummarizer)) => + (summarizer1.merge(summarizer2), classSummarizer1.merge(classSummarizer2)) + }) + + val histogram = labelSummarizer.histogram + val numInvalid = labelSummarizer.countInvalid + val numClasses = histogram.length + val numFeatures = summarizer.mean.size + + if (numInvalid != 0) { + val msg = s"Classification labels should be in {0 to ${numClasses - 1} " + + s"Found $numInvalid invalid labels." + logError(msg) + throw new SparkException(msg) + } + + if (numClasses > 2) { + val msg = s"Currently, LogisticRegression with ElasticNet in ML package only supports " + + s"binary classification. Found $numClasses in the input dataset." + logError(msg) + throw new SparkException(msg) } - // Train model - val lr = new LogisticRegressionWithLBFGS() - .setIntercept($(fitIntercept)) - lr.optimizer - .setRegParam($(regParam)) - .setNumIterations($(maxIter)) - val oldModel = lr.run(oldDataset) - val lrm = new LogisticRegressionModel(this, oldModel.weights, oldModel.intercept) + val featuresMean = summarizer.mean.toArray + val featuresStd = summarizer.variance.toArray.map(math.sqrt) - if (handlePersistence) { - oldDataset.unpersist() + val regParamL1 = $(elasticNetParam) * $(regParam) + val regParamL2 = (1.0 - $(elasticNetParam)) * $(regParam) + + val costFun = new LogisticCostFun(instances, numClasses, $(fitIntercept), + featuresStd, featuresMean, regParamL2) + + val optimizer = if ($(elasticNetParam) == 0.0 || $(regParam) == 0.0) { + new BreezeLBFGS[BDV[Double]]($(maxIter), 10, $(tol)) + } else { + // Remove the L1 penalization on the intercept + def regParamL1Fun = (index: Int) => { + if (index == numFeatures) 0.0 else regParamL1 + } + new BreezeOWLQN[Int, BDV[Double]]($(maxIter), 10, regParamL1Fun, $(tol)) } - copyValues(lrm) + + val initialWeightsWithIntercept = + Vectors.zeros(if ($(fitIntercept)) numFeatures + 1 else numFeatures) + + if ($(fitIntercept)) { + /** + * For binary logistic regression, when we initialize the weights as zeros, + * it will converge faster if we initialize the intercept such that + * it follows the distribution of the labels. + * + * {{{ + * P(0) = 1 / (1 + \exp(b)), and + * P(1) = \exp(b) / (1 + \exp(b)) + * }}}, hence + * {{{ + * b = \log{P(1) / P(0)} = \log{count_1 / count_0} + * }}} + */ + initialWeightsWithIntercept.toArray(numFeatures) + = Math.log(histogram(1).toDouble / histogram(0).toDouble) + } + + val states = optimizer.iterations(new CachedDiffFunction(costFun), + initialWeightsWithIntercept.toBreeze.toDenseVector) + + var state = states.next() + val lossHistory = mutable.ArrayBuilder.make[Double] + + while (states.hasNext) { + lossHistory += state.value + state = states.next() + } + lossHistory += state.value + + // The weights are trained in the scaled space; we're converting them back to + // the original space. + val weightsWithIntercept = { + val rawWeights = state.x.toArray.clone() + var i = 0 + // Note that the intercept in scaled space and original space is the same; + // as a result, no scaling is needed. + while (i < numFeatures) { + rawWeights(i) *= { if (featuresStd(i) != 0.0) 1.0 / featuresStd(i) else 0.0 } + i += 1 + } + Vectors.dense(rawWeights) + } + + if (handlePersistence) instances.unpersist() + + val (weights, intercept) = if ($(fitIntercept)) { + (Vectors.dense(weightsWithIntercept.toArray.slice(0, weightsWithIntercept.size - 1)), + weightsWithIntercept(weightsWithIntercept.size - 1)) + } else { + (weightsWithIntercept, 0.0) + } + + new LogisticRegressionModel(this, weights.compressed, intercept) } } - /** * :: AlphaComponent :: * @@ -159,3 +294,258 @@ class LogisticRegressionModel private[ml] ( if (probability(1) > getThreshold) 1 else 0 } } + +/** + * MultiClassSummarizer computes the number of distinct labels and corresponding counts, + * and validates the data to see if the labels used for k class multi-label classification + * are in the range of {0, 1, ..., k - 1} in a online fashion. + * + * Two MultilabelSummarizer can be merged together to have a statistical summary of the + * corresponding joint dataset. + */ +private[classification] class MultiClassSummarizer extends Serializable { + private val distinctMap = new mutable.HashMap[Int, Long] + private var totalInvalidCnt: Long = 0L + + /** + * Add a new label into this MultilabelSummarizer, and update the distinct map. + * @param label The label for this data point. + * @return This MultilabelSummarizer + */ + def add(label: Double): this.type = { + if (label - label.toInt != 0.0 || label < 0) { + totalInvalidCnt += 1 + this + } + else { + val counts: Long = distinctMap.getOrElse(label.toInt, 0L) + distinctMap.put(label.toInt, counts + 1) + this + } + } + + /** + * Merge another MultilabelSummarizer, and update the distinct map. + * (Note that it will merge the smaller distinct map into the larger one using in-place + * merging, so either `this` or `other` object will be modified and returned.) + * + * @param other The other MultilabelSummarizer to be merged. + * @return Merged MultilabelSummarizer object. + */ + def merge(other: MultiClassSummarizer): MultiClassSummarizer = { + val (largeMap, smallMap) = if (this.distinctMap.size > other.distinctMap.size) { + (this, other) + } else { + (other, this) + } + smallMap.distinctMap.foreach { + case (key, value) => + val counts = largeMap.distinctMap.getOrElse(key, 0L) + largeMap.distinctMap.put(key, counts + value) + } + largeMap.totalInvalidCnt += smallMap.totalInvalidCnt + largeMap + } + + /** @return The total invalid input counts. */ + def countInvalid: Long = totalInvalidCnt + + /** @return The number of distinct labels in the input dataset. */ + def numClasses: Int = distinctMap.keySet.max + 1 + + /** @return The counts of each label in the input dataset. */ + def histogram: Array[Long] = { + val result = Array.ofDim[Long](numClasses) + var i = 0 + while (i < result.length) { + result(i) = distinctMap.getOrElse(i, 0L) + i += 1 + } + result + } +} + +/** + * LogisticAggregator computes the gradient and loss for binary logistic loss function, as used + * in binary classification for samples in sparse or dense vector in a online fashion. + * + * Note that multinomial logistic loss is not supported yet! + * + * Two LogisticAggregator can be merged together to have a summary of loss and gradient of + * the corresponding joint dataset. + * + * @param weights The weights/coefficients corresponding to the features. + * @param numClasses the number of possible outcomes for k classes classification problem in + * Multinomial Logistic Regression. + * @param fitIntercept Whether to fit an intercept term. + * @param featuresStd The standard deviation values of the features. + * @param featuresMean The mean values of the features. + */ +private class LogisticAggregator( + weights: Vector, + numClasses: Int, + fitIntercept: Boolean, + featuresStd: Array[Double], + featuresMean: Array[Double]) extends Serializable { + + private var totalCnt: Long = 0L + private var lossSum = 0.0 + + private val weightsArray = weights match { + case dv: DenseVector => dv.values + case _ => + throw new IllegalArgumentException( + s"weights only supports dense vector but got type ${weights.getClass}.") + } + + private val dim = if (fitIntercept) weightsArray.length - 1 else weightsArray.length + + private val gradientSumArray = Array.ofDim[Double](weightsArray.length) + + /** + * Add a new training data to this LogisticAggregator, and update the loss and gradient + * of the objective function. + * + * @param label The label for this data point. + * @param data The features for one data point in dense/sparse vector format to be added + * into this aggregator. + * @return This LogisticAggregator object. + */ + def add(label: Double, data: Vector): this.type = { + require(dim == data.size, s"Dimensions mismatch when adding new sample." + + s" Expecting $dim but got ${data.size}.") + + val dataSize = data.size + + val localWeightsArray = weightsArray + val localGradientSumArray = gradientSumArray + + numClasses match { + case 2 => + /** + * For Binary Logistic Regression. + */ + val margin = - { + var sum = 0.0 + data.foreachActive { (index, value) => + if (featuresStd(index) != 0.0 && value != 0.0) { + sum += localWeightsArray(index) * (value / featuresStd(index)) + } + } + sum + { if (fitIntercept) localWeightsArray(dim) else 0.0 } + } + + val multiplier = (1.0 / (1.0 + math.exp(margin))) - label + + data.foreachActive { (index, value) => + if (featuresStd(index) != 0.0 && value != 0.0) { + localGradientSumArray(index) += multiplier * (value / featuresStd(index)) + } + } + + if (fitIntercept) { + localGradientSumArray(dim) += multiplier + } + + if (label > 0) { + // The following is equivalent to log(1 + exp(margin)) but more numerically stable. + lossSum += MLUtils.log1pExp(margin) + } else { + lossSum += MLUtils.log1pExp(margin) - margin + } + case _ => + new NotImplementedError("LogisticRegression with ElasticNet in ML package only supports " + + "binary classification for now.") + } + totalCnt += 1 + this + } + + /** + * Merge another LogisticAggregator, and update the loss and gradient + * of the objective function. + * (Note that it's in place merging; as a result, `this` object will be modified.) + * + * @param other The other LogisticAggregator to be merged. + * @return This LogisticAggregator object. + */ + def merge(other: LogisticAggregator): this.type = { + require(dim == other.dim, s"Dimensions mismatch when merging with another " + + s"LeastSquaresAggregator. Expecting $dim but got ${other.dim}.") + + if (other.totalCnt != 0) { + totalCnt += other.totalCnt + lossSum += other.lossSum + + var i = 0 + val localThisGradientSumArray = this.gradientSumArray + val localOtherGradientSumArray = other.gradientSumArray + while (i < localThisGradientSumArray.length) { + localThisGradientSumArray(i) += localOtherGradientSumArray(i) + i += 1 + } + } + this + } + + def count: Long = totalCnt + + def loss: Double = lossSum / totalCnt + + def gradient: Vector = { + val result = Vectors.dense(gradientSumArray.clone()) + scal(1.0 / totalCnt, result) + result + } +} + +/** + * LogisticCostFun implements Breeze's DiffFunction[T] for a multinomial logistic loss function, + * as used in multi-class classification (it is also used in binary logistic regression). + * It returns the loss and gradient with L2 regularization at a particular point (weights). + * It's used in Breeze's convex optimization routines. + */ +private class LogisticCostFun( + data: RDD[(Double, Vector)], + numClasses: Int, + fitIntercept: Boolean, + featuresStd: Array[Double], + featuresMean: Array[Double], + regParamL2: Double) extends DiffFunction[BDV[Double]] { + + override def calculate(weights: BDV[Double]): (Double, BDV[Double]) = { + val w = Vectors.fromBreeze(weights) + + val logisticAggregator = data.treeAggregate(new LogisticAggregator(w, numClasses, fitIntercept, + featuresStd, featuresMean))( + seqOp = (c, v) => (c, v) match { + case (aggregator, (label, features)) => aggregator.add(label, features) + }, + combOp = (c1, c2) => (c1, c2) match { + case (aggregator1, aggregator2) => aggregator1.merge(aggregator2) + }) + + // regVal is the sum of weight squares for L2 regularization + val norm = if (regParamL2 == 0.0) { + 0.0 + } else if (fitIntercept) { + brzNorm(Vectors.dense(weights.toArray.slice(0, weights.size -1)).toBreeze, 2.0) + } else { + brzNorm(weights, 2.0) + } + val regVal = 0.5 * regParamL2 * norm * norm + + val loss = logisticAggregator.loss + regVal + val gradient = logisticAggregator.gradient + + if (fitIntercept) { + val wArray = w.toArray.clone() + wArray(wArray.length - 1) = 0.0 + axpy(regParamL2, Vectors.dense(wArray), gradient) + } else { + axpy(regParamL2, w, gradient) + } + + (loss, gradient.toBreeze.asInstanceOf[BDV[Double]]) + } +} diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala index e63c9a3eead52..6377923afc0c4 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala @@ -106,14 +106,16 @@ class LinearRegression extends Regressor[Vector, LinearRegression, LinearRegress if (handlePersistence) instances.persist(StorageLevel.MEMORY_AND_DISK) val (summarizer, statCounter) = instances.treeAggregate( - (new MultivariateOnlineSummarizer, new StatCounter))( { - case ((summarizer: MultivariateOnlineSummarizer, statCounter: StatCounter), - (label: Double, features: Vector)) => - (summarizer.add(features), statCounter.merge(label)) - }, { - case ((summarizer1: MultivariateOnlineSummarizer, statCounter1: StatCounter), - (summarizer2: MultivariateOnlineSummarizer, statCounter2: StatCounter)) => - (summarizer1.merge(summarizer2), statCounter1.merge(statCounter2)) + (new MultivariateOnlineSummarizer, new StatCounter))( + seqOp = (c, v) => (c, v) match { + case ((summarizer: MultivariateOnlineSummarizer, statCounter: StatCounter), + (label: Double, features: Vector)) => + (summarizer.add(features), statCounter.merge(label)) + }, + combOp = (c1, c2) => (c1, c2) match { + case ((summarizer1: MultivariateOnlineSummarizer, statCounter1: StatCounter), + (summarizer2: MultivariateOnlineSummarizer, statCounter2: StatCounter)) => + (summarizer1.merge(summarizer2), statCounter1.merge(statCounter2)) }) val numFeatures = summarizer.mean.size diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala index 5d6ddd47f67d6..681f4c618d302 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala @@ -331,7 +331,7 @@ object MLUtils { * @param x a floating-point value as input. * @return the result of `math.log(1 + math.exp(x))`. */ - private[mllib] def log1pExp(x: Double): Double = { + private[spark] def log1pExp(x: Double): Double = { if (x > 0) { x + math.log1p(math.exp(-x)) } else { diff --git a/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala index 6dd1fdf05514e..78cdd471859b7 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala @@ -19,7 +19,7 @@ package org.apache.spark.ml.classification import org.scalatest.FunSuite -import org.apache.spark.mllib.classification.LogisticRegressionSuite.generateLogisticInput +import org.apache.spark.mllib.classification.LogisticRegressionSuite import org.apache.spark.mllib.linalg.Vector import org.apache.spark.mllib.util.MLlibTestSparkContext import org.apache.spark.mllib.util.TestingUtils._ @@ -30,13 +30,42 @@ class LogisticRegressionSuite extends FunSuite with MLlibTestSparkContext { @transient var sqlContext: SQLContext = _ @transient var dataset: DataFrame = _ + @transient var binaryDataset: DataFrame = _ private val eps: Double = 1e-5 override def beforeAll(): Unit = { super.beforeAll() sqlContext = new SQLContext(sc) - dataset = sqlContext.createDataFrame( - sc.parallelize(generateLogisticInput(1.0, 1.0, nPoints = 100, seed = 42), 2)) + + dataset = sqlContext.createDataFrame(sc.parallelize(LogisticRegressionSuite + .generateLogisticInput(1.0, 1.0, nPoints = 100, seed = 42), 4)) + + /** + * Here is the instruction describing how to export the test data into CSV format + * so we can validate the training accuracy compared with R's glmnet package. + * + * import org.apache.spark.mllib.classification.LogisticRegressionSuite + * val nPoints = 10000 + * val weights = Array(-0.57997, 0.912083, -0.371077, -0.819866, 2.688191) + * val xMean = Array(5.843, 3.057, 3.758, 1.199) + * val xVariance = Array(0.6856, 0.1899, 3.116, 0.581) + * val data = sc.parallelize(LogisticRegressionSuite.generateMultinomialLogisticInput( + * weights, xMean, xVariance, true, nPoints, 42), 1) + * data.map(x=> x.label + ", " + x.features(0) + ", " + x.features(1) + ", " + * + x.features(2) + ", " + x.features(3)).saveAsTextFile("path") + */ + binaryDataset = { + val nPoints = 10000 + val weights = Array(-0.57997, 0.912083, -0.371077, -0.819866, 2.688191) + val xMean = Array(5.843, 3.057, 3.758, 1.199) + val xVariance = Array(0.6856, 0.1899, 3.116, 0.581) + + val testData = LogisticRegressionSuite.generateMultinomialLogisticInput( + weights, xMean, xVariance, true, nPoints, 42) + + sqlContext.createDataFrame(sc.parallelize(LogisticRegressionSuite + .generateMultinomialLogisticInput(weights, xMean, xVariance, true, nPoints, 42), 4)) + } } test("logistic regression: default params") { @@ -135,4 +164,364 @@ class LogisticRegressionSuite extends FunSuite with MLlibTestSparkContext { assert(pred == predFromProb) } } + + test("MultiClassSummarizer") { + val summarizer1 = (new MultiClassSummarizer) + .add(0.0).add(3.0).add(4.0).add(3.0).add(6.0) + assert(summarizer1.histogram.zip(Array[Long](1, 0, 0, 2, 1, 0, 1)).forall(x => x._1 === x._2)) + assert(summarizer1.countInvalid === 0) + assert(summarizer1.numClasses === 7) + + val summarizer2 = (new MultiClassSummarizer) + .add(1.0).add(5.0).add(3.0).add(0.0).add(4.0).add(1.0) + assert(summarizer2.histogram.zip(Array[Long](1, 2, 0, 1, 1, 1)).forall(x => x._1 === x._2)) + assert(summarizer2.countInvalid === 0) + assert(summarizer2.numClasses === 6) + + val summarizer3 = (new MultiClassSummarizer) + .add(0.0).add(1.3).add(5.2).add(2.5).add(2.0).add(4.0).add(4.0).add(4.0).add(1.0) + assert(summarizer3.histogram.zip(Array[Long](1, 1, 1, 0, 3)).forall(x => x._1 === x._2)) + assert(summarizer3.countInvalid === 3) + assert(summarizer3.numClasses === 5) + + val summarizer4 = (new MultiClassSummarizer) + .add(3.1).add(4.3).add(2.0).add(1.0).add(3.0) + assert(summarizer4.histogram.zip(Array[Long](0, 1, 1, 1)).forall(x => x._1 === x._2)) + assert(summarizer4.countInvalid === 2) + assert(summarizer4.numClasses === 4) + + // small map merges large one + val summarizerA = summarizer1.merge(summarizer2) + assert(summarizerA.hashCode() === summarizer2.hashCode()) + assert(summarizerA.histogram.zip(Array[Long](2, 2, 0, 3, 2, 1, 1)).forall(x => x._1 === x._2)) + assert(summarizerA.countInvalid === 0) + assert(summarizerA.numClasses === 7) + + // large map merges small one + val summarizerB = summarizer3.merge(summarizer4) + assert(summarizerB.hashCode() === summarizer3.hashCode()) + assert(summarizerB.histogram.zip(Array[Long](1, 2, 2, 1, 3)).forall(x => x._1 === x._2)) + assert(summarizerB.countInvalid === 5) + assert(summarizerB.numClasses === 5) + } + + test("binary logistic regression with intercept without regularization") { + val trainer = (new LogisticRegression).setFitIntercept(true) + val model = trainer.fit(binaryDataset) + + /** + * Using the following R code to load the data and train the model using glmnet package. + * + * > library("glmnet") + * > data <- read.csv("path", header=FALSE) + * > label = factor(data$V1) + * > features = as.matrix(data.frame(data$V2, data$V3, data$V4, data$V5)) + * > weights = coef(glmnet(features,label, family="binomial", alpha = 0, lambda = 0)) + * > weights + * 5 x 1 sparse Matrix of class "dgCMatrix" + * s0 + * (Intercept) 2.8366423 + * data.V2 -0.5895848 + * data.V3 0.8931147 + * data.V4 -0.3925051 + * data.V5 -0.7996864 + */ + val interceptR = 2.8366423 + val weightsR = Array(-0.5895848, 0.8931147, -0.3925051, -0.7996864) + + assert(model.intercept ~== interceptR relTol 1E-3) + assert(model.weights(0) ~== weightsR(0) relTol 1E-3) + assert(model.weights(1) ~== weightsR(1) relTol 1E-3) + assert(model.weights(2) ~== weightsR(2) relTol 1E-3) + assert(model.weights(3) ~== weightsR(3) relTol 1E-3) + } + + test("binary logistic regression without intercept without regularization") { + val trainer = (new LogisticRegression).setFitIntercept(false) + val model = trainer.fit(binaryDataset) + + /** + * Using the following R code to load the data and train the model using glmnet package. + * + * > library("glmnet") + * > data <- read.csv("path", header=FALSE) + * > label = factor(data$V1) + * > features = as.matrix(data.frame(data$V2, data$V3, data$V4, data$V5)) + * > weights = + * coef(glmnet(features,label, family="binomial", alpha = 0, lambda = 0, intercept=FALSE)) + * > weights + * 5 x 1 sparse Matrix of class "dgCMatrix" + * s0 + * (Intercept) . + * data.V2 -0.3534996 + * data.V3 1.2964482 + * data.V4 -0.3571741 + * data.V5 -0.7407946 + */ + val interceptR = 0.0 + val weightsR = Array(-0.3534996, 1.2964482, -0.3571741, -0.7407946) + + assert(model.intercept ~== interceptR relTol 1E-3) + assert(model.weights(0) ~== weightsR(0) relTol 1E-2) + assert(model.weights(1) ~== weightsR(1) relTol 1E-2) + assert(model.weights(2) ~== weightsR(2) relTol 1E-3) + assert(model.weights(3) ~== weightsR(3) relTol 1E-3) + } + + test("binary logistic regression with intercept with L1 regularization") { + val trainer = (new LogisticRegression).setFitIntercept(true) + .setElasticNetParam(1.0).setRegParam(0.12) + val model = trainer.fit(binaryDataset) + + /** + * Using the following R code to load the data and train the model using glmnet package. + * + * > library("glmnet") + * > data <- read.csv("path", header=FALSE) + * > label = factor(data$V1) + * > features = as.matrix(data.frame(data$V2, data$V3, data$V4, data$V5)) + * > weights = coef(glmnet(features,label, family="binomial", alpha = 1, lambda = 0.12)) + * > weights + * 5 x 1 sparse Matrix of class "dgCMatrix" + * s0 + * (Intercept) -0.05627428 + * data.V2 . + * data.V3 . + * data.V4 -0.04325749 + * data.V5 -0.02481551 + */ + val interceptR = -0.05627428 + val weightsR = Array(0.0, 0.0, -0.04325749, -0.02481551) + + assert(model.intercept ~== interceptR relTol 1E-2) + assert(model.weights(0) ~== weightsR(0) relTol 1E-3) + assert(model.weights(1) ~== weightsR(1) relTol 1E-3) + assert(model.weights(2) ~== weightsR(2) relTol 1E-2) + assert(model.weights(3) ~== weightsR(3) relTol 2E-2) + } + + test("binary logistic regression without intercept with L1 regularization") { + val trainer = (new LogisticRegression).setFitIntercept(false) + .setElasticNetParam(1.0).setRegParam(0.12) + val model = trainer.fit(binaryDataset) + + /** + * Using the following R code to load the data and train the model using glmnet package. + * + * > library("glmnet") + * > data <- read.csv("path", header=FALSE) + * > label = factor(data$V1) + * > features = as.matrix(data.frame(data$V2, data$V3, data$V4, data$V5)) + * > weights = coef(glmnet(features,label, family="binomial", alpha = 1, lambda = 0.12, + * intercept=FALSE)) + * > weights + * 5 x 1 sparse Matrix of class "dgCMatrix" + * s0 + * (Intercept) . + * data.V2 . + * data.V3 . + * data.V4 -0.05189203 + * data.V5 -0.03891782 + */ + val interceptR = 0.0 + val weightsR = Array(0.0, 0.0, -0.05189203, -0.03891782) + + assert(model.intercept ~== interceptR relTol 1E-3) + assert(model.weights(0) ~== weightsR(0) relTol 1E-3) + assert(model.weights(1) ~== weightsR(1) relTol 1E-3) + assert(model.weights(2) ~== weightsR(2) relTol 1E-2) + assert(model.weights(3) ~== weightsR(3) relTol 1E-2) + } + + test("binary logistic regression with intercept with L2 regularization") { + val trainer = (new LogisticRegression).setFitIntercept(true) + .setElasticNetParam(0.0).setRegParam(1.37) + val model = trainer.fit(binaryDataset) + + /** + * Using the following R code to load the data and train the model using glmnet package. + * + * > library("glmnet") + * > data <- read.csv("path", header=FALSE) + * > label = factor(data$V1) + * > features = as.matrix(data.frame(data$V2, data$V3, data$V4, data$V5)) + * > weights = coef(glmnet(features,label, family="binomial", alpha = 0, lambda = 1.37)) + * > weights + * 5 x 1 sparse Matrix of class "dgCMatrix" + * s0 + * (Intercept) 0.15021751 + * data.V2 -0.07251837 + * data.V3 0.10724191 + * data.V4 -0.04865309 + * data.V5 -0.10062872 + */ + val interceptR = 0.15021751 + val weightsR = Array(-0.07251837, 0.10724191, -0.04865309, -0.10062872) + + assert(model.intercept ~== interceptR relTol 1E-3) + assert(model.weights(0) ~== weightsR(0) relTol 1E-3) + assert(model.weights(1) ~== weightsR(1) relTol 1E-3) + assert(model.weights(2) ~== weightsR(2) relTol 1E-3) + assert(model.weights(3) ~== weightsR(3) relTol 1E-3) + } + + test("binary logistic regression without intercept with L2 regularization") { + val trainer = (new LogisticRegression).setFitIntercept(false) + .setElasticNetParam(0.0).setRegParam(1.37) + val model = trainer.fit(binaryDataset) + + /** + * Using the following R code to load the data and train the model using glmnet package. + * + * > library("glmnet") + * > data <- read.csv("path", header=FALSE) + * > label = factor(data$V1) + * > features = as.matrix(data.frame(data$V2, data$V3, data$V4, data$V5)) + * > weights = coef(glmnet(features,label, family="binomial", alpha = 0, lambda = 1.37, + * intercept=FALSE)) + * > weights + * 5 x 1 sparse Matrix of class "dgCMatrix" + * s0 + * (Intercept) . + * data.V2 -0.06099165 + * data.V3 0.12857058 + * data.V4 -0.04708770 + * data.V5 -0.09799775 + */ + val interceptR = 0.0 + val weightsR = Array(-0.06099165, 0.12857058, -0.04708770, -0.09799775) + + assert(model.intercept ~== interceptR relTol 1E-3) + assert(model.weights(0) ~== weightsR(0) relTol 1E-2) + assert(model.weights(1) ~== weightsR(1) relTol 1E-2) + assert(model.weights(2) ~== weightsR(2) relTol 1E-3) + assert(model.weights(3) ~== weightsR(3) relTol 1E-3) + } + + test("binary logistic regression with intercept with ElasticNet regularization") { + val trainer = (new LogisticRegression).setFitIntercept(true) + .setElasticNetParam(0.38).setRegParam(0.21) + val model = trainer.fit(binaryDataset) + + /** + * Using the following R code to load the data and train the model using glmnet package. + * + * > library("glmnet") + * > data <- read.csv("path", header=FALSE) + * > label = factor(data$V1) + * > features = as.matrix(data.frame(data$V2, data$V3, data$V4, data$V5)) + * > weights = coef(glmnet(features,label, family="binomial", alpha = 0.38, lambda = 0.21)) + * > weights + * 5 x 1 sparse Matrix of class "dgCMatrix" + * s0 + * (Intercept) 0.57734851 + * data.V2 -0.05310287 + * data.V3 . + * data.V4 -0.08849250 + * data.V5 -0.15458796 + */ + val interceptR = 0.57734851 + val weightsR = Array(-0.05310287, 0.0, -0.08849250, -0.15458796) + + assert(model.intercept ~== interceptR relTol 6E-3) + assert(model.weights(0) ~== weightsR(0) relTol 5E-3) + assert(model.weights(1) ~== weightsR(1) relTol 1E-3) + assert(model.weights(2) ~== weightsR(2) relTol 5E-3) + assert(model.weights(3) ~== weightsR(3) relTol 1E-3) + } + + test("binary logistic regression without intercept with ElasticNet regularization") { + val trainer = (new LogisticRegression).setFitIntercept(false) + .setElasticNetParam(0.38).setRegParam(0.21) + val model = trainer.fit(binaryDataset) + + /** + * Using the following R code to load the data and train the model using glmnet package. + * + * > library("glmnet") + * > data <- read.csv("path", header=FALSE) + * > label = factor(data$V1) + * > features = as.matrix(data.frame(data$V2, data$V3, data$V4, data$V5)) + * > weights = coef(glmnet(features,label, family="binomial", alpha = 0.38, lambda = 0.21, + * intercept=FALSE)) + * > weights + * 5 x 1 sparse Matrix of class "dgCMatrix" + * s0 + * (Intercept) . + * data.V2 -0.001005743 + * data.V3 0.072577857 + * data.V4 -0.081203769 + * data.V5 -0.142534158 + */ + val interceptR = 0.0 + val weightsR = Array(-0.001005743, 0.072577857, -0.081203769, -0.142534158) + + assert(model.intercept ~== interceptR relTol 1E-3) + assert(model.weights(0) ~== weightsR(0) absTol 1E-3) + assert(model.weights(1) ~== weightsR(1) absTol 1E-2) + assert(model.weights(2) ~== weightsR(2) relTol 1E-3) + assert(model.weights(3) ~== weightsR(3) relTol 1E-2) + } + + test("binary logistic regression with intercept with strong L1 regularization") { + val trainer = (new LogisticRegression).setFitIntercept(true) + .setElasticNetParam(1.0).setRegParam(6.0) + val model = trainer.fit(binaryDataset) + + val histogram = binaryDataset.map { case Row(label: Double, features: Vector) => label } + .treeAggregate(new MultiClassSummarizer)( + seqOp = (c, v) => (c, v) match { + case (classSummarizer: MultiClassSummarizer, label: Double) => classSummarizer.add(label) + }, + combOp = (c1, c2) => (c1, c2) match { + case (classSummarizer1: MultiClassSummarizer, classSummarizer2: MultiClassSummarizer) => + classSummarizer1.merge(classSummarizer2) + }).histogram + + /** + * For binary logistic regression with strong L1 regularization, all the weights will be zeros. + * As a result, + * {{{ + * P(0) = 1 / (1 + \exp(b)), and + * P(1) = \exp(b) / (1 + \exp(b)) + * }}}, hence + * {{{ + * b = \log{P(1) / P(0)} = \log{count_1 / count_0} + * }}} + */ + val interceptTheory = Math.log(histogram(1).toDouble / histogram(0).toDouble) + val weightsTheory = Array(0.0, 0.0, 0.0, 0.0) + + assert(model.intercept ~== interceptTheory relTol 1E-5) + assert(model.weights(0) ~== weightsTheory(0) absTol 1E-6) + assert(model.weights(1) ~== weightsTheory(1) absTol 1E-6) + assert(model.weights(2) ~== weightsTheory(2) absTol 1E-6) + assert(model.weights(3) ~== weightsTheory(3) absTol 1E-6) + + /** + * Using the following R code to load the data and train the model using glmnet package. + * + * > library("glmnet") + * > data <- read.csv("path", header=FALSE) + * > label = factor(data$V1) + * > features = as.matrix(data.frame(data$V2, data$V3, data$V4, data$V5)) + * > weights = coef(glmnet(features,label, family="binomial", alpha = 1.0, lambda = 6.0)) + * > weights + * 5 x 1 sparse Matrix of class "dgCMatrix" + * s0 + * (Intercept) -0.2480643 + * data.V2 0.0000000 + * data.V3 . + * data.V4 . + * data.V5 . + */ + val interceptR = -0.248065 + val weightsR = Array(0.0, 0.0, 0.0, 0.0) + + assert(model.intercept ~== interceptR relTol 1E-5) + assert(model.weights(0) ~== weightsR(0) absTol 1E-6) + assert(model.weights(1) ~== weightsR(1) absTol 1E-6) + assert(model.weights(2) ~== weightsR(2) absTol 1E-6) + assert(model.weights(3) ~== weightsR(3) absTol 1E-6) + } } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/classification/LogisticRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/classification/LogisticRegressionSuite.scala index a26c52852c4d7..fb0a194718802 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/classification/LogisticRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/classification/LogisticRegressionSuite.scala @@ -91,21 +91,21 @@ object LogisticRegressionSuite { seed: Int): Seq[LabeledPoint] = { val rnd = new Random(seed) - val xDim = xMean.size + val xDim = xMean.length val xWithInterceptsDim = if (addIntercept) xDim + 1 else xDim - val nClasses = weights.size / xWithInterceptsDim + 1 + val nClasses = weights.length / xWithInterceptsDim + 1 val x = Array.fill[Vector](nPoints)(Vectors.dense(Array.fill[Double](xDim)(rnd.nextGaussian()))) - x.map(vector => { + x.foreach { vector => // This doesn't work if `vector` is a sparse vector. val vectorArray = vector.toArray var i = 0 - while (i < vectorArray.size) { + while (i < vectorArray.length) { vectorArray(i) = vectorArray(i) * math.sqrt(xVariance(i)) + xMean(i) i += 1 } - }) + } val y = (0 until nPoints).map { idx => val xArray = x(idx).toArray