Skip to content

Commit

Permalink
[SPARK-17772][ML][TEST] Add test functions for ML sample weights
Browse files Browse the repository at this point in the history
## What changes were proposed in this pull request?

More and more ML algos are accepting sample weights, and they have been tested rather heterogeneously and with code duplication. This patch adds extensible helper methods to `MLTestingUtils` that can be reused by various algorithms accepting sample weights. Up to now, there seems to be a few tests that have been implemented commonly:

* Check that oversampling is the same as giving the instances sample weights proportional to the number of samples
* Check that outliers with tiny sample weights do not affect the algorithm's performance

This patch adds an additional test:

* Check that algorithms are invariant to constant scaling of the sample weights. i.e. uniform sample weights with `w_i = 1.0` is effectively the same as uniform sample weights with `w_i = 10000` or `w_i = 0.0001`

The instances of these tests occurred in LinearRegression, NaiveBayes, and LogisticRegression. Those tests have been removed/modified to use the new helper methods. These helper functions will be of use when [SPARK-9478](https://issues.apache.org/jira/browse/SPARK-9478) is implemented.

## How was this patch tested?

This patch only involves modifying test suites.

## Other notes

Both IsotonicRegression and GeneralizedLinearRegression also extend `HasWeightCol`. I did not modify these test suites because it will make this patch easier to review, and because they did not duplicate the same tests as the three suites that were modified. If we want to change them later, we can create a JIRA for it now, but it's open for debate.

Author: sethah <[email protected]>

Closes apache#15721 from sethah/SPARK-17772.
  • Loading branch information
sethah authored and yanboliang committed Dec 28, 2016
1 parent d7bce3b commit 6a475ae
Show file tree
Hide file tree
Showing 4 changed files with 154 additions and 218 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1836,52 +1836,24 @@ class LogisticRegressionSuite
.forall(x => x(0) >= x(1)))
}

test("binary logistic regression with weighted data") {
val numClasses = 2
val numPoints = 40
val outlierData = MLTestingUtils.genClassificationInstancesWithWeightedOutliers(spark,
numClasses, numPoints)
val testData = Array.tabulate[LabeledPoint](numClasses) { i =>
LabeledPoint(i.toDouble, Vectors.dense(i.toDouble))
}.toSeq.toDF()
val lr = new LogisticRegression().setFamily("binomial").setWeightCol("weight")
val model = lr.fit(outlierData)
val results = model.transform(testData).select("label", "prediction").collect()

// check that the predictions are the one to one mapping
results.foreach { case Row(label: Double, pred: Double) =>
assert(label === pred)
test("logistic regression with sample weights") {
def modelEquals(m1: LogisticRegressionModel, m2: LogisticRegressionModel): Unit = {
assert(m1.coefficientMatrix ~== m2.coefficientMatrix absTol 0.05)
assert(m1.interceptVector ~== m2.interceptVector absTol 0.05)
}
val (overSampledData, weightedData) =
MLTestingUtils.genEquivalentOversampledAndWeightedInstances(outlierData, "label", "features",
42L)
val weightedModel = lr.fit(weightedData)
val overSampledModel = lr.setWeightCol("").fit(overSampledData)
assert(weightedModel.coefficientMatrix ~== overSampledModel.coefficientMatrix relTol 0.01)
}

test("multinomial logistic regression with weighted data") {
val numClasses = 5
val numPoints = 40
val outlierData = MLTestingUtils.genClassificationInstancesWithWeightedOutliers(spark,
numClasses, numPoints)
val testData = Array.tabulate[LabeledPoint](numClasses) { i =>
LabeledPoint(i.toDouble, Vectors.dense(i.toDouble))
}.toSeq.toDF()
val mlr = new LogisticRegression().setFamily("multinomial").setWeightCol("weight")
val model = mlr.fit(outlierData)
val results = model.transform(testData).select("label", "prediction").collect()

// check that the predictions are the one to one mapping
results.foreach { case Row(label: Double, pred: Double) =>
assert(label === pred)
val testParams = Seq(
("binomial", smallBinaryDataset, 2),
("multinomial", smallMultinomialDataset, 3)
)
testParams.foreach { case (family, dataset, numClasses) =>
val estimator = new LogisticRegression().setFamily(family)
MLTestingUtils.testArbitrarilyScaledWeights[LogisticRegressionModel, LogisticRegression](
dataset.as[LabeledPoint], estimator, modelEquals)
MLTestingUtils.testOutliersWithSmallWeights[LogisticRegressionModel, LogisticRegression](
dataset.as[LabeledPoint], estimator, numClasses, modelEquals)
MLTestingUtils.testOversamplingVsWeighting[LogisticRegressionModel, LogisticRegression](
dataset.as[LabeledPoint], estimator, modelEquals, seed)
}
val (overSampledData, weightedData) =
MLTestingUtils.genEquivalentOversampledAndWeightedInstances(outlierData, "label", "features",
42L)
val weightedModel = mlr.fit(weightedData)
val overSampledModel = mlr.setWeightCol("").fit(overSampledData)
assert(weightedModel.coefficientMatrix ~== overSampledModel.coefficientMatrix relTol 0.01)
}

test("set family") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,18 +38,22 @@ class NaiveBayesSuite extends SparkFunSuite with MLlibTestSparkContext with Defa
import testImplicits._

@transient var dataset: Dataset[_] = _
@transient var bernoulliDataset: Dataset[_] = _

private val seed = 42

override def beforeAll(): Unit = {
super.beforeAll()

val pi = Array(0.5, 0.1, 0.4).map(math.log)
val pi = Array(0.3, 0.3, 0.4).map(math.log)
val theta = Array(
Array(0.70, 0.10, 0.10, 0.10), // label 0
Array(0.10, 0.70, 0.10, 0.10), // label 1
Array(0.10, 0.10, 0.70, 0.10) // label 2
Array(0.30, 0.30, 0.30, 0.30), // label 0
Array(0.30, 0.30, 0.30, 0.30), // label 1
Array(0.40, 0.40, 0.40, 0.40) // label 2
).map(_.map(math.log))

dataset = generateNaiveBayesInput(pi, theta, 100, 42).toDF()
dataset = generateNaiveBayesInput(pi, theta, 100, seed).toDF()
bernoulliDataset = generateNaiveBayesInput(pi, theta, 100, seed, "bernoulli").toDF()
}

def validatePrediction(predictionAndLabels: DataFrame): Unit = {
Expand Down Expand Up @@ -139,7 +143,7 @@ class NaiveBayesSuite extends SparkFunSuite with MLlibTestSparkContext with Defa
val theta = new DenseMatrix(3, 4, thetaArray.flatten, true)

val testDataset =
generateNaiveBayesInput(piArray, thetaArray, nPoints, 42, "multinomial").toDF()
generateNaiveBayesInput(piArray, thetaArray, nPoints, seed, "multinomial").toDF()
val nb = new NaiveBayes().setSmoothing(1.0).setModelType("multinomial")
val model = nb.fit(testDataset)

Expand All @@ -157,50 +161,27 @@ class NaiveBayesSuite extends SparkFunSuite with MLlibTestSparkContext with Defa
validateProbabilities(featureAndProbabilities, model, "multinomial")
}

test("Naive Bayes Multinomial with weighted samples") {
val nPoints = 1000
val piArray = Array(0.5, 0.1, 0.4).map(math.log)
val thetaArray = Array(
Array(0.70, 0.10, 0.10, 0.10), // label 0
Array(0.10, 0.70, 0.10, 0.10), // label 1
Array(0.10, 0.10, 0.70, 0.10) // label 2
).map(_.map(math.log))

val testData = generateNaiveBayesInput(piArray, thetaArray, nPoints, 42, "multinomial").toDF()
val (overSampledData, weightedData) =
MLTestingUtils.genEquivalentOversampledAndWeightedInstances(testData,
"label", "features", 42L)
val nb = new NaiveBayes().setModelType("multinomial")
val unweightedModel = nb.fit(weightedData)
val overSampledModel = nb.fit(overSampledData)
val weightedModel = nb.setWeightCol("weight").fit(weightedData)
assert(weightedModel.theta ~== overSampledModel.theta relTol 0.001)
assert(weightedModel.pi ~== overSampledModel.pi relTol 0.001)
assert(unweightedModel.theta !~= overSampledModel.theta relTol 0.001)
assert(unweightedModel.pi !~= overSampledModel.pi relTol 0.001)
}

test("Naive Bayes Bernoulli with weighted samples") {
val nPoints = 10000
val piArray = Array(0.5, 0.3, 0.2).map(math.log)
val thetaArray = Array(
Array(0.50, 0.02, 0.02, 0.02, 0.02, 0.02, 0.02, 0.02, 0.02, 0.02, 0.02, 0.40), // label 0
Array(0.02, 0.70, 0.10, 0.02, 0.02, 0.02, 0.02, 0.02, 0.02, 0.02, 0.02, 0.02), // label 1
Array(0.02, 0.02, 0.60, 0.02, 0.02, 0.02, 0.02, 0.02, 0.02, 0.02, 0.02, 0.30) // label 2
).map(_.map(math.log))

val testData = generateNaiveBayesInput(piArray, thetaArray, nPoints, 42, "bernoulli").toDF()
val (overSampledData, weightedData) =
MLTestingUtils.genEquivalentOversampledAndWeightedInstances(testData,
"label", "features", 42L)
val nb = new NaiveBayes().setModelType("bernoulli")
val unweightedModel = nb.fit(weightedData)
val overSampledModel = nb.fit(overSampledData)
val weightedModel = nb.setWeightCol("weight").fit(weightedData)
assert(weightedModel.theta ~== overSampledModel.theta relTol 0.001)
assert(weightedModel.pi ~== overSampledModel.pi relTol 0.001)
assert(unweightedModel.theta !~= overSampledModel.theta relTol 0.001)
assert(unweightedModel.pi !~= overSampledModel.pi relTol 0.001)
test("Naive Bayes with weighted samples") {
val numClasses = 3
def modelEquals(m1: NaiveBayesModel, m2: NaiveBayesModel): Unit = {
assert(m1.pi ~== m2.pi relTol 0.01)
assert(m1.theta ~== m2.theta relTol 0.01)
}
val testParams = Seq(
("bernoulli", bernoulliDataset),
("multinomial", dataset)
)
testParams.foreach { case (family, dataset) =>
// NaiveBayes is sensitive to constant scaling of the weights unless smoothing is set to 0
val estimatorNoSmoothing = new NaiveBayes().setSmoothing(0.0).setModelType(family)
val estimatorWithSmoothing = new NaiveBayes().setModelType(family)
MLTestingUtils.testArbitrarilyScaledWeights[NaiveBayesModel, NaiveBayes](
dataset.as[LabeledPoint], estimatorNoSmoothing, modelEquals)
MLTestingUtils.testOutliersWithSmallWeights[NaiveBayesModel, NaiveBayes](
dataset.as[LabeledPoint], estimatorWithSmoothing, numClasses, modelEquals)
MLTestingUtils.testOversamplingVsWeighting[NaiveBayesModel, NaiveBayes](
dataset.as[LabeledPoint], estimatorWithSmoothing, modelEquals, seed)
}
}

test("Naive Bayes Bernoulli") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class LinearRegressionSuite

private val seed: Int = 42
@transient var datasetWithDenseFeature: DataFrame = _
@transient var datasetWithStrongNoise: DataFrame = _
@transient var datasetWithDenseFeatureWithoutIntercept: DataFrame = _
@transient var datasetWithSparseFeature: DataFrame = _
@transient var datasetWithWeight: DataFrame = _
Expand All @@ -47,6 +48,11 @@ class LinearRegressionSuite
datasetWithDenseFeature = sc.parallelize(LinearDataGenerator.generateLinearInput(
intercept = 6.3, weights = Array(4.7, 7.2), xMean = Array(0.9, -1.3),
xVariance = Array(0.7, 1.2), nPoints = 10000, seed, eps = 0.1), 2).map(_.asML).toDF()

datasetWithStrongNoise = sc.parallelize(LinearDataGenerator.generateLinearInput(
intercept = 6.3, weights = Array(4.7, 7.2), xMean = Array(0.9, -1.3),
xVariance = Array(0.7, 1.2), nPoints = 100, seed, eps = 5.0), 2).map(_.asML).toDF()

/*
datasetWithDenseFeatureWithoutIntercept is not needed for correctness testing
but is useful for illustrating training model without intercept
Expand Down Expand Up @@ -95,6 +101,7 @@ class LinearRegressionSuite
Instance(17.0, 3.0, Vectors.dense(2.0, 11.0)),
Instance(17.0, 4.0, Vectors.dense(3.0, 13.0))
), 2).toDF()

datasetWithWeightZeroLabel = sc.parallelize(Seq(
Instance(0.0, 1.0, Vectors.dense(0.0, 5.0).toSparse),
Instance(0.0, 2.0, Vectors.dense(1.0, 7.0)),
Expand Down Expand Up @@ -810,91 +817,34 @@ class LinearRegressionSuite
}

test("linear regression with weighted samples") {
Seq("auto", "l-bfgs", "normal").foreach { solver =>
val (data, weightedData) = {
val activeData = LinearDataGenerator.generateLinearInput(
6.3, Array(4.7, 7.2), Array(0.9, -1.3), Array(0.7, 1.2), 500, 1, 0.1).map(_.asML)

val rnd = new Random(8392)
val signedData = activeData.map { case p: LabeledPoint =>
(rnd.nextGaussian() > 0.0, p)
}

val data1 = signedData.flatMap {
case (true, p) => Iterator(p, p)
case (false, p) => Iterator(p)
}

val weightedSignedData = signedData.flatMap {
case (true, LabeledPoint(label, features)) =>
Iterator(
Instance(label, weight = 1.2, features),
Instance(label, weight = 0.8, features)
)
case (false, LabeledPoint(label, features)) =>
Iterator(
Instance(label, weight = 0.3, features),
Instance(label, weight = 0.1, features),
Instance(label, weight = 0.6, features)
)
}

val noiseData = LinearDataGenerator.generateLinearInput(
2, Array(1, 3), Array(0.9, -1.3), Array(0.7, 1.2), 500, 1, 0.1).map(_.asML)
val weightedNoiseData = noiseData.map {
case LabeledPoint(label, features) => Instance(label, weight = 0, features)
}
val data2 = weightedSignedData ++ weightedNoiseData

(sc.parallelize(data1, 4).toDF(), sc.parallelize(data2, 4).toDF())
}

val trainer1a = (new LinearRegression).setFitIntercept(true)
.setElasticNetParam(0.0).setRegParam(0.21).setStandardization(true).setSolver(solver)
val trainer1b = (new LinearRegression).setFitIntercept(true).setWeightCol("weight")
.setElasticNetParam(0.0).setRegParam(0.21).setStandardization(true).setSolver(solver)

// Normal optimizer is not supported with non-zero elasticnet parameter.
val model1a0 = trainer1a.fit(data)
val model1a1 = trainer1a.fit(weightedData)
val model1b = trainer1b.fit(weightedData)

assert(model1a0.coefficients !~= model1a1.coefficients absTol 1E-3)
assert(model1a0.intercept !~= model1a1.intercept absTol 1E-3)
assert(model1a0.coefficients ~== model1b.coefficients absTol 1E-3)
assert(model1a0.intercept ~== model1b.intercept absTol 1E-3)

val trainer2a = (new LinearRegression).setFitIntercept(true)
.setElasticNetParam(0.0).setRegParam(0.21).setStandardization(false).setSolver(solver)
val trainer2b = (new LinearRegression).setFitIntercept(true).setWeightCol("weight")
.setElasticNetParam(0.0).setRegParam(0.21).setStandardization(false).setSolver(solver)
val model2a0 = trainer2a.fit(data)
val model2a1 = trainer2a.fit(weightedData)
val model2b = trainer2b.fit(weightedData)
assert(model2a0.coefficients !~= model2a1.coefficients absTol 1E-3)
assert(model2a0.intercept !~= model2a1.intercept absTol 1E-3)
assert(model2a0.coefficients ~== model2b.coefficients absTol 1E-3)
assert(model2a0.intercept ~== model2b.intercept absTol 1E-3)

val trainer3a = (new LinearRegression).setFitIntercept(false)
.setElasticNetParam(0.0).setRegParam(0.21).setStandardization(true).setSolver(solver)
val trainer3b = (new LinearRegression).setFitIntercept(false).setWeightCol("weight")
.setElasticNetParam(0.0).setRegParam(0.21).setStandardization(true).setSolver(solver)
val model3a0 = trainer3a.fit(data)
val model3a1 = trainer3a.fit(weightedData)
val model3b = trainer3b.fit(weightedData)
assert(model3a0.coefficients !~= model3a1.coefficients absTol 1E-3)
assert(model3a0.coefficients ~== model3b.coefficients absTol 1E-3)

val trainer4a = (new LinearRegression).setFitIntercept(false)
.setElasticNetParam(0.0).setRegParam(0.21).setStandardization(false).setSolver(solver)
val trainer4b = (new LinearRegression).setFitIntercept(false).setWeightCol("weight")
.setElasticNetParam(0.0).setRegParam(0.21).setStandardization(false).setSolver(solver)
val model4a0 = trainer4a.fit(data)
val model4a1 = trainer4a.fit(weightedData)
val model4b = trainer4b.fit(weightedData)
assert(model4a0.coefficients !~= model4a1.coefficients absTol 1E-3)
assert(model4a0.coefficients ~== model4b.coefficients absTol 1E-3)
val sqlContext = spark.sqlContext
import sqlContext.implicits._
val numClasses = 0
def modelEquals(m1: LinearRegressionModel, m2: LinearRegressionModel): Unit = {
assert(m1.coefficients ~== m2.coefficients relTol 0.01)
assert(m1.intercept ~== m2.intercept relTol 0.01)
}
val testParams = Seq(
// (elasticNetParam, regParam, fitIntercept, standardization)
(0.0, 0.21, true, true),
(0.0, 0.21, true, false),
(0.0, 0.21, false, false),
(1.0, 0.21, true, true)
)

for (solver <- Seq("auto", "l-bfgs", "normal");
(elasticNetParam, regParam, fitIntercept, standardization) <- testParams) {
val estimator = new LinearRegression()
.setFitIntercept(fitIntercept)
.setStandardization(standardization)
.setRegParam(regParam)
.setElasticNetParam(elasticNetParam)
MLTestingUtils.testArbitrarilyScaledWeights[LinearRegressionModel, LinearRegression](
datasetWithStrongNoise.as[LabeledPoint], estimator, modelEquals)
MLTestingUtils.testOutliersWithSmallWeights[LinearRegressionModel, LinearRegression](
datasetWithStrongNoise.as[LabeledPoint], estimator, numClasses, modelEquals)
MLTestingUtils.testOversamplingVsWeighting[LinearRegressionModel, LinearRegression](
datasetWithStrongNoise.as[LabeledPoint], estimator, modelEquals, seed)
}
}

Expand Down
Loading

0 comments on commit 6a475ae

Please sign in to comment.