[SPARK-16445][MLLIB][SPARKR] Multilayer Perceptron Classifier wrapper in SparkR

https://issues.apache.org/jira/browse/SPARK-16445

## What changes were proposed in this pull request?

Create Multilayer Perceptron Classifier wrapper in SparkR

## How was this patch tested?

Tested manually on a local machine.
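
For reference, a manual check along these lines exercises the new API end to end (a sketch; the data file ships with the Spark distribution, and the hyperparameters mirror the documentation example below):

```r
# assumes a running SparkR session (sparkR.session())
df <- read.df("data/mllib/sample_multiclass_classification_data.txt", source = "libsvm")
model <- spark.mlp(df, blockSize = 128, layers = c(4, 5, 4, 3), solver = "l-bfgs",
                   maxIter = 100, tol = 0.5, stepSize = 1, seed = 1)
summary(model)
head(collect(select(predict(model, df), "prediction")))
```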

Author: Xin Ren <[email protected]>

Closes apache#14447 from keypointt/SPARK-16445.
keypointt authored and Felix Cheung committed Aug 24, 2016
1 parent d2932a0 commit 2fbdb60
Showing 6 changed files with 293 additions and 5 deletions.
1 change: 1 addition & 0 deletions R/pkg/NAMESPACE
@@ -27,6 +27,7 @@ exportMethods("glm",
"summary",
"spark.kmeans",
"fitted",
"spark.mlp",
"spark.naiveBayes",
"spark.survreg",
"spark.lda",
4 changes: 4 additions & 0 deletions R/pkg/R/generics.R
@@ -1330,6 +1330,10 @@ setGeneric("spark.kmeans", function(data, formula, ...) { standardGeneric("spark.kmeans") })
#' @export
setGeneric("fitted")

#' @rdname spark.mlp
#' @export
setGeneric("spark.mlp", function(data, ...) { standardGeneric("spark.mlp") })

#' @rdname spark.naiveBayes
#' @export
setGeneric("spark.naiveBayes", function(data, formula, ...) { standardGeneric("spark.naiveBayes") })
125 changes: 120 additions & 5 deletions R/pkg/R/mllib.R
@@ -60,6 +60,13 @@ setClass("AFTSurvivalRegressionModel", representation(jobj = "jobj"))
#' @note KMeansModel since 2.0.0
setClass("KMeansModel", representation(jobj = "jobj"))

#' S4 class that represents a MultilayerPerceptronClassificationModel
#'
#' @param jobj a Java object reference to the backing Scala MultilayerPerceptronClassifierWrapper
#' @export
#' @note MultilayerPerceptronClassificationModel since 2.1.0
setClass("MultilayerPerceptronClassificationModel", representation(jobj = "jobj"))

#' S4 class that represents an IsotonicRegressionModel
#'
#' @param jobj a Java object reference to the backing Scala IsotonicRegressionModel
@@ -90,7 +97,7 @@ setClass("ALSModel", representation(jobj = "jobj"))
#' @export
#' @seealso \link{spark.glm}, \link{glm},
#' @seealso \link{spark.als}, \link{spark.gaussianMixture}, \link{spark.isoreg}, \link{spark.kmeans},
#' @seealso \link{spark.lda}, \link{spark.naiveBayes}, \link{spark.survreg},
#' @seealso \link{spark.lda}, \link{spark.mlp}, \link{spark.naiveBayes}, \link{spark.survreg}
#' @seealso \link{read.ml}
NULL

@@ -103,7 +110,7 @@ NULL
#' @export
#' @seealso \link{spark.glm}, \link{glm},
#' @seealso \link{spark.als}, \link{spark.gaussianMixture}, \link{spark.isoreg}, \link{spark.kmeans},
#' @seealso \link{spark.naiveBayes}, \link{spark.survreg},
#' @seealso \link{spark.mlp}, \link{spark.naiveBayes}, \link{spark.survreg}
NULL

write_internal <- function(object, path, overwrite = FALSE) {
@@ -631,6 +638,95 @@ setMethod("predict", signature(object = "KMeansModel"),
predict_internal(object, newData)
})

#' Multilayer Perceptron Classification Model
#'
#' \code{spark.mlp} fits a multi-layer perceptron neural network model against a SparkDataFrame.
#' Users can call \code{summary} to print a summary of the fitted model, \code{predict} to make
#' predictions on new data, and \code{write.ml}/\code{read.ml} to save/load fitted models.
#' Only categorical data is supported.
#' For more details, see
#' \href{http://spark.apache.org/docs/latest/ml-classification-regression.html}{
#' Multilayer Perceptron}
#'
#' @param data a \code{SparkDataFrame} of observations and labels for model fitting.
#' @param blockSize block size for stacking input data in matrices to speed up computation.
#'        Default is 128.
#' @param layers integer vector containing the number of nodes for each layer; the first
#'        element must match the number of input features and the last the number of classes.
#' @param solver solver parameter, supported options: "gd" (minibatch gradient descent) or "l-bfgs".
#' @param maxIter maximum number of iterations.
#' @param tol convergence tolerance of iterations.
#' @param stepSize step size to be used for each iteration of optimization, applied when the
#'        solver is "gd".
#' @param seed seed parameter for weights initialization.
#' @param ... additional arguments passed to the method.
#' @return \code{spark.mlp} returns a fitted Multilayer Perceptron Classification Model.
#' @rdname spark.mlp
#' @aliases spark.mlp,SparkDataFrame-method
#' @name spark.mlp
#' @seealso \link{read.ml}
#' @export
#' @examples
#' \dontrun{
#' df <- read.df("data/mllib/sample_multiclass_classification_data.txt", source = "libsvm")
#'
#' # fit a Multilayer Perceptron Classification Model
#' model <- spark.mlp(df, blockSize = 128, layers = c(4, 5, 4, 3), solver = "l-bfgs",
#' maxIter = 100, tol = 0.5, stepSize = 1, seed = 1)
#'
#' # get the summary of the model
#' summary(model)
#'
#' # make predictions
#' predictions <- predict(model, df)
#'
#' # save and load the model
#' path <- "path/to/model"
#' write.ml(model, path)
#' savedModel <- read.ml(path)
#' summary(savedModel)
#' }
#' @note spark.mlp since 2.1.0
setMethod("spark.mlp", signature(data = "SparkDataFrame"),
function(data, blockSize = 128, layers = c(3, 5, 2), solver = "l-bfgs", maxIter = 100,
tol = 0.5, stepSize = 1, seed = 1) {
jobj <- callJStatic("org.apache.spark.ml.r.MultilayerPerceptronClassifierWrapper",
"fit", data@sdf, as.integer(blockSize), as.array(layers),
as.character(solver), as.integer(maxIter), as.numeric(tol),
as.numeric(stepSize), as.integer(seed))
new("MultilayerPerceptronClassificationModel", jobj = jobj)
})
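
Note that the first element of `layers` must equal the number of input features and the last element the number of output classes, so the default `c(3, 5, 2)` only fits data with exactly three features and two classes. A sketch against the four-feature, three-class sample data from the example above (the hidden layer size is illustrative):

```r
# 4 input features -> one hidden layer of 8 units -> 3 output classes
model <- spark.mlp(df, layers = c(4, 8, 3))
```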

# Makes predictions from a model produced by spark.mlp().

#' @param newData a SparkDataFrame for testing.
#' @return \code{predict} returns a SparkDataFrame containing predicted labels in a column named
#'         "prediction".
#' @rdname spark.mlp
#' @aliases predict,MultilayerPerceptronClassificationModel-method
#' @export
#' @note predict(MultilayerPerceptronClassificationModel) since 2.1.0
setMethod("predict", signature(object = "MultilayerPerceptronClassificationModel"),
function(object, newData) {
predict_internal(object, newData)
})

# Returns the summary of a Multilayer Perceptron Classification Model produced by \code{spark.mlp}

#' @param object a Multilayer Perceptron Classification Model fitted by \code{spark.mlp}.
#' @return \code{summary} returns a list containing \code{labelCount} (the number of distinct
#'         labels), \code{layers} (the layer sizes), and \code{weights} (the fitted connection
#'         weights as a one-column matrix).
#' @rdname spark.mlp
#' @export
#' @aliases summary,MultilayerPerceptronClassificationModel-method
#' @note summary(MultilayerPerceptronClassificationModel) since 2.1.0
setMethod("summary", signature(object = "MultilayerPerceptronClassificationModel"),
function(object) {
jobj <- object@jobj
labelCount <- callJMethod(jobj, "labelCount")
layers <- unlist(callJMethod(jobj, "layers"))
weights <- callJMethod(jobj, "weights")
# expose the flat weight vector as a one-column matrix
weights <- matrix(weights, nrow = length(weights))
list(labelCount = labelCount, layers = layers, weights = weights)
})
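
The length of the weight vector returned by `summary` follows from the layer sizes: each consecutive pair of layers contributes (n_i + 1) * n_{i+1} weights, the +1 being the bias unit. For `layers = c(4, 5, 4, 3)` this gives the 64 asserted in the tests below; a quick check:

```r
layers <- c(4, 5, 4, 3)
# (4+1)*5 + (5+1)*4 + (4+1)*3 = 25 + 24 + 15 = 64
sum((head(layers, -1) + 1) * tail(layers, -1))
```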

#' Naive Bayes Models
#'
#' \code{spark.naiveBayes} fits a Bernoulli naive Bayes model against a SparkDataFrame.
@@ -685,7 +781,7 @@ setMethod("spark.naiveBayes", signature(data = "SparkDataFrame", formula = "formula"),
#'
#' @rdname spark.naiveBayes
#' @export
#' @seealso \link{read.ml}
#' @seealso \link{write.ml}
#' @note write.ml(NaiveBayesModel, character) since 2.0.0
setMethod("write.ml", signature(object = "NaiveBayesModel", path = "character"),
function(object, path, overwrite = FALSE) {
@@ -700,7 +796,7 @@ setMethod("write.ml", signature(object = "NaiveBayesModel", path = "character"),
#' @rdname spark.survreg
#' @export
#' @note write.ml(AFTSurvivalRegressionModel, character) since 2.0.0
#' @seealso \link{read.ml}
#' @seealso \link{write.ml}
setMethod("write.ml", signature(object = "AFTSurvivalRegressionModel", path = "character"),
function(object, path, overwrite = FALSE) {
write_internal(object, path, overwrite)
@@ -734,6 +830,23 @@ setMethod("write.ml", signature(object = "KMeansModel", path = "character"),
write_internal(object, path, overwrite)
})

# Saves the Multilayer Perceptron Classification Model to the input path.

#' @param path the directory where the model is saved.
#' @param overwrite logical; whether to overwrite the output path if it already exists.
#'        Default is FALSE, which raises an error if the path exists.
#'
#' @rdname spark.mlp
#' @aliases write.ml,MultilayerPerceptronClassificationModel,character-method
#' @export
#' @seealso \link{write.ml}
#' @note write.ml(MultilayerPerceptronClassificationModel, character) since 2.1.0
setMethod("write.ml", signature(object = "MultilayerPerceptronClassificationModel",
path = "character"),
function(object, path, overwrite = FALSE) {
write_internal(object, path, overwrite)
})
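
As the tests below confirm, writing to an existing path fails unless overwrite = TRUE (a sketch, reusing the model from the earlier example):

```r
path <- tempfile(pattern = "spark-mlp")
write.ml(model, path)
# a second write.ml(model, path) would error: the path already exists
write.ml(model, path, overwrite = TRUE)
```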

# Save fitted IsotonicRegressionModel to the input path

#' @param path The directory where the model is saved
@@ -791,14 +904,16 @@ read.ml <- function(path) {
new("KMeansModel", jobj = jobj)
} else if (isInstanceOf(jobj, "org.apache.spark.ml.r.LDAWrapper")) {
new("LDAModel", jobj = jobj)
} else if (isInstanceOf(jobj, "org.apache.spark.ml.r.MultilayerPerceptronClassifierWrapper")) {
new("MultilayerPerceptronClassificationModel", jobj = jobj)
} else if (isInstanceOf(jobj, "org.apache.spark.ml.r.IsotonicRegressionWrapper")) {
new("IsotonicRegressionModel", jobj = jobj)
} else if (isInstanceOf(jobj, "org.apache.spark.ml.r.GaussianMixtureWrapper")) {
new("GaussianMixtureModel", jobj = jobj)
} else if (isInstanceOf(jobj, "org.apache.spark.ml.r.ALSWrapper")) {
new("ALSModel", jobj = jobj)
} else {
stop(paste("Unsupported model: ", jobj))
stop("Unsupported model: ", jobj)
}
}
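
Since read.ml dispatches on the stored wrapper class, a saved MLP model comes back as the matching S4 class (continuing the sketch above):

```r
model2 <- read.ml(path)
class(model2)  # "MultilayerPerceptronClassificationModel"
summary(model2)
```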

32 changes: 32 additions & 0 deletions R/pkg/inst/tests/testthat/test_mllib.R
@@ -347,6 +347,38 @@ test_that("spark.kmeans", {
unlink(modelPath)
})

test_that("spark.mlp", {
df <- read.df("data/mllib/sample_multiclass_classification_data.txt", source = "libsvm")
model <- spark.mlp(df, blockSize = 128, layers = c(4, 5, 4, 3), solver = "l-bfgs", maxIter = 100,
tol = 0.5, stepSize = 1, seed = 1)

# Test summary method
summary <- summary(model)
expect_equal(summary$labelCount, 3)
expect_equal(summary$layers, c(4, 5, 4, 3))
expect_equal(length(summary$weights), 64)

# Test predict method
mlpTestDF <- df
mlpPredictions <- collect(select(predict(model, mlpTestDF), "prediction"))
expect_equal(head(mlpPredictions$prediction, 6), c(0, 1, 1, 1, 1, 1))

# Test model save/load
modelPath <- tempfile(pattern = "spark-mlp", fileext = ".tmp")
write.ml(model, modelPath)
expect_error(write.ml(model, modelPath))
write.ml(model, modelPath, overwrite = TRUE)
model2 <- read.ml(modelPath)
summary2 <- summary(model2)

expect_equal(summary2$labelCount, 3)
expect_equal(summary2$layers, c(4, 5, 4, 3))
expect_equal(length(summary2$weights), 64)

unlink(modelPath)
})

test_that("spark.naiveBayes", {
# R code to reproduce the result.
# We do not support instance weights yet. So we ignore the frequencies.
134 changes: 134 additions & 0 deletions mllib/src/main/scala/org/apache/spark/ml/r/MultilayerPerceptronClassifierWrapper.scala
@@ -0,0 +1,134 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.ml.r

import org.apache.hadoop.fs.Path
import org.json4s._
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._

import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.{MultilayerPerceptronClassificationModel, MultilayerPerceptronClassifier}
import org.apache.spark.ml.util.{MLReadable, MLReader, MLWritable, MLWriter}
import org.apache.spark.sql.{DataFrame, Dataset}

private[r] class MultilayerPerceptronClassifierWrapper private (
val pipeline: PipelineModel,
val labelCount: Long,
val layers: Array[Int],
val weights: Array[Double]
) extends MLWritable {

def transform(dataset: Dataset[_]): DataFrame = {
pipeline.transform(dataset)
}

/**
* Returns an [[MLWriter]] instance for this ML instance.
*/
override def write: MLWriter =
new MultilayerPerceptronClassifierWrapper.MultilayerPerceptronClassifierWrapperWriter(this)
}

private[r] object MultilayerPerceptronClassifierWrapper
extends MLReadable[MultilayerPerceptronClassifierWrapper] {

val PREDICTED_LABEL_COL = "prediction"

// Called from R via callJStatic; `layers` arrives as Array[Double] because the R side
// passes a numeric vector (as.array(layers)), hence the layers.map(_.toInt) below.
def fit(
data: DataFrame,
blockSize: Int,
layers: Array[Double],
solver: String,
maxIter: Int,
tol: Double,
stepSize: Double,
seed: Int
): MultilayerPerceptronClassifierWrapper = {
// get labels and feature names from output schema
val schema = data.schema

// assemble and fit the pipeline
val mlp = new MultilayerPerceptronClassifier()
.setLayers(layers.map(_.toInt))
.setBlockSize(blockSize)
.setSolver(solver)
.setMaxIter(maxIter)
.setTol(tol)
.setStepSize(stepSize)
.setSeed(seed)
.setPredictionCol(PREDICTED_LABEL_COL)
val pipeline = new Pipeline()
.setStages(Array(mlp))
.fit(data)

val multilayerPerceptronClassificationModel: MultilayerPerceptronClassificationModel =
pipeline.stages.head.asInstanceOf[MultilayerPerceptronClassificationModel]

val weights = multilayerPerceptronClassificationModel.weights.toArray
val layersFromPipeline = multilayerPerceptronClassificationModel.layers
val labelCount = data.select("label").distinct().count()

new MultilayerPerceptronClassifierWrapper(pipeline, labelCount, layersFromPipeline, weights)
}

/**
* Returns an [[MLReader]] instance for this class.
*/
override def read: MLReader[MultilayerPerceptronClassifierWrapper] =
new MultilayerPerceptronClassifierWrapperReader

override def load(path: String): MultilayerPerceptronClassifierWrapper = super.load(path)

class MultilayerPerceptronClassifierWrapperReader
extends MLReader[MultilayerPerceptronClassifierWrapper] {

override def load(path: String): MultilayerPerceptronClassifierWrapper = {
implicit val format = DefaultFormats
val rMetadataPath = new Path(path, "rMetadata").toString
val pipelinePath = new Path(path, "pipeline").toString

val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
val rMetadata = parse(rMetadataStr)
val labelCount = (rMetadata \ "labelCount").extract[Long]
val layers = (rMetadata \ "layers").extract[Array[Int]]
val weights = (rMetadata \ "weights").extract[Array[Double]]

val pipeline = PipelineModel.load(pipelinePath)
new MultilayerPerceptronClassifierWrapper(pipeline, labelCount, layers, weights)
}
}

class MultilayerPerceptronClassifierWrapperWriter(instance: MultilayerPerceptronClassifierWrapper)
extends MLWriter {

override protected def saveImpl(path: String): Unit = {
val rMetadataPath = new Path(path, "rMetadata").toString
val pipelinePath = new Path(path, "pipeline").toString

val rMetadata = ("class" -> instance.getClass.getName) ~
("labelCount" -> instance.labelCount) ~
("layers" -> instance.layers.toSeq) ~
("weights" -> instance.weights.toArray.toSeq)
val rMetadataJson: String = compact(render(rMetadata))
sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)

instance.pipeline.save(pipelinePath)
}
}
}
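
For orientation, saveImpl above writes two artifacts under the target directory: a one-line JSON file of R-side metadata and the fitted PipelineModel. Listing a saved model from R therefore looks roughly like this (illustrative file names):

```r
list.files(path, recursive = TRUE)
# e.g. "pipeline/metadata/part-00000", "pipeline/stages/...", "rMetadata/part-00000"
```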
2 changes: 2 additions & 0 deletions mllib/src/main/scala/org/apache/spark/ml/r/RWrappers.scala
@@ -44,6 +44,8 @@ private[r] object RWrappers extends MLReader[Object] {
GeneralizedLinearRegressionWrapper.load(path)
case "org.apache.spark.ml.r.KMeansWrapper" =>
KMeansWrapper.load(path)
case "org.apache.spark.ml.r.MultilayerPerceptronClassifierWrapper" =>
MultilayerPerceptronClassifierWrapper.load(path)
case "org.apache.spark.ml.r.LDAWrapper" =>
LDAWrapper.load(path)
case "org.apache.spark.ml.r.IsotonicRegressionWrapper" =>
