[SPARK-4324] [PySpark] [MLlib] support numpy.array for all MLlib API
This PR checks all of the existing Python MLlib APIs to make sure that numpy.array is supported as a Vector (and also as an RDD of numpy.array).

It also improves some docstrings and doctests.
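
A minimal sketch of the intended usage (not part of the patch; it assumes a live SparkContext `sc` and Spark 1.2-era MLlib), showing a plain numpy.array accepted both in training data and in predict:

    import numpy as np
    from pyspark.mllib.classification import LogisticRegressionWithSGD
    from pyspark.mllib.regression import LabeledPoint

    # features can be numpy arrays; LabeledPoint converts them via _convert_to_vector
    points = sc.parallelize([LabeledPoint(0.0, np.array([0.0, 1.0])),
                             LabeledPoint(1.0, np.array([1.0, 0.0]))])
    model = LogisticRegressionWithSGD.train(points, iterations=10)
    model.predict(np.array([1.0, 0.0]))  # numpy.array accepted directly, no Vector wrapping needed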

cc mateiz mengxr

Author: Davies Liu <[email protected]>

Closes apache#3189 from davies/numpy and squashes the following commits:

d5057c4 [Davies Liu] fix tests
6987611 [Davies Liu] support numpy.array for all MLlib API
Davies Liu authored and mengxr committed Nov 11, 2014
1 parent 3c07b8f commit 65083e9
Showing 7 changed files with 105 additions and 32 deletions.
13 changes: 8 additions & 5 deletions python/pyspark/mllib/classification.py
@@ -62,6 +62,7 @@ class LogisticRegressionModel(LinearModel):
"""

def predict(self, x):
x = _convert_to_vector(x)
margin = self.weights.dot(x) + self._intercept
if margin > 0:
prob = 1 / (1 + exp(-margin))
@@ -79,7 +80,7 @@ def train(cls, data, iterations=100, step=1.0, miniBatchFraction=1.0,
"""
Train a logistic regression model on the given data.
:param data: The training data.
:param data: The training data, an RDD of LabeledPoint.
:param iterations: The number of iterations (default: 100).
:param step: The step parameter used in SGD
(default: 1.0).
@@ -136,6 +137,7 @@ class SVMModel(LinearModel):
"""

def predict(self, x):
x = _convert_to_vector(x)
margin = self.weights.dot(x) + self.intercept
return 1 if margin >= 0 else 0

@@ -148,7 +150,7 @@ def train(cls, data, iterations=100, step=1.0, regParam=1.0,
"""
Train a support vector machine on the given data.
:param data: The training data.
:param data: The training data, an RDD of LabeledPoint.
:param iterations: The number of iterations (default: 100).
:param step: The step parameter used in SGD
(default: 1.0).
@@ -233,11 +235,12 @@ def train(cls, data, lambda_=1.0):
classification. By making every vector a 0-1 vector, it can also be
used as Bernoulli NB (U{http://tinyurl.com/p7c96j6}).
:param data: RDD of NumPy vectors, one per element, where the first
coordinate is the label and the rest is the feature vector
(e.g. a count vector).
:param data: RDD of LabeledPoint.
:param lambda_: The smoothing parameter
"""
first = data.first()
if not isinstance(first, LabeledPoint):
raise ValueError("`data` should be an RDD of LabeledPoint")
labels, pi, theta = callMLlibFunc("trainNaiveBayes", data, lambda_)
return NaiveBayesModel(labels.toArray(), pi.toArray(), numpy.array(theta))

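For illustration (a hedged sketch, not from the patch; assumes an active SparkContext `sc`), the classifier trainers now fail fast on non-LabeledPoint input and the fitted models accept numpy arrays in predict():

    import numpy as np
    from pyspark.mllib.classification import NaiveBayes, SVMWithSGD
    from pyspark.mllib.regression import LabeledPoint

    data = sc.parallelize([LabeledPoint(0.0, np.array([1.0, 0.0])),
                           LabeledPoint(1.0, np.array([0.0, 1.0]))])
    nb = NaiveBayes.train(data, lambda_=1.0)   # raises ValueError if elements are not LabeledPoint
    svm = SVMWithSGD.train(data, iterations=10)
    nb.predict(np.array([0.0, 1.0]))           # converted through _convert_to_vector
    svm.predict(np.array([0.0, 1.0]))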
31 changes: 24 additions & 7 deletions python/pyspark/mllib/feature.py
@@ -25,7 +25,7 @@

from pyspark import RDD, SparkContext
from pyspark.mllib.common import callMLlibFunc, JavaModelWrapper
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.linalg import Vectors, _convert_to_vector

__all__ = ['Normalizer', 'StandardScalerModel', 'StandardScaler',
'HashingTF', 'IDFModel', 'IDF', 'Word2Vec', 'Word2VecModel']
@@ -81,12 +81,16 @@ def transform(self, vector):
"""
Applies unit length normalization on a vector.
:param vector: vector to be normalized.
:param vector: vector or RDD of vector to be normalized.
:return: normalized vector. If the norm of the input is zero, it
will return the input vector.
"""
sc = SparkContext._active_spark_context
assert sc is not None, "SparkContext should be initialized first"
if isinstance(vector, RDD):
vector = vector.map(_convert_to_vector)
else:
vector = _convert_to_vector(vector)
return callMLlibFunc("normalizeVector", self.p, vector)


@@ -95,8 +99,12 @@ class JavaVectorTransformer(JavaModelWrapper, VectorTransformer):
Wrapper for the model in JVM
"""

def transform(self, dataset):
return self.call("transform", dataset)
def transform(self, vector):
if isinstance(vector, RDD):
vector = vector.map(_convert_to_vector)
else:
vector = _convert_to_vector(vector)
return self.call("transform", vector)


class StandardScalerModel(JavaVectorTransformer):
@@ -109,7 +117,7 @@ def transform(self, vector):
"""
Applies standardization transformation on a vector.
:param vector: Vector to be standardized.
:param vector: Vector or RDD of Vector to be standardized.
:return: Standardized vector. If the variance of a column is zero,
it will return default `0.0` for the column with zero variance.
"""
@@ -154,6 +162,7 @@ def fit(self, dataset):
the transformation model.
:return: a StandardScalarModel
"""
dataset = dataset.map(_convert_to_vector)
jmodel = callMLlibFunc("fitStandardScaler", self.withMean, self.withStd, dataset)
return StandardScalerModel(jmodel)

@@ -211,6 +220,8 @@ def transform(self, dataset):
:param dataset: an RDD of term frequency vectors
:return: an RDD of TF-IDF vectors
"""
if not isinstance(dataset, RDD):
raise TypeError("dataset should be an RDD of term frequency vectors")
return JavaVectorTransformer.transform(self, dataset)


@@ -255,7 +266,9 @@ def fit(self, dataset):
:param dataset: an RDD of term frequency vectors
"""
jmodel = callMLlibFunc("fitIDF", self.minDocFreq, dataset)
if not isinstance(dataset, RDD):
raise TypeError("dataset should be an RDD of term frequency vectors")
jmodel = callMLlibFunc("fitIDF", self.minDocFreq, dataset.map(_convert_to_vector))
return IDFModel(jmodel)


@@ -287,6 +300,8 @@ def findSynonyms(self, word, num):
Note: local use only
"""
if not isinstance(word, basestring):
word = _convert_to_vector(word)
words, similarity = self.call("findSynonyms", word, num)
return zip(words, similarity)

@@ -374,9 +389,11 @@ def fit(self, data):
"""
Computes the vector representation of each word in vocabulary.
:param data: training data. RDD of subtype of Iterable[String]
:param data: training data. RDD of list of string
:return: Word2VecModel instance
"""
if not isinstance(data, RDD):
raise TypeError("data should be an RDD of list of string")
jmodel = callMLlibFunc("trainWord2Vec", data, int(self.vectorSize),
float(self.learningRate), int(self.numPartitions),
int(self.numIterations), long(self.seed))
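A short usage sketch of the updated feature transformers (hypothetical example, not in the diff; assumes `sc` exists):

    import numpy as np
    from pyspark.mllib.feature import Normalizer, StandardScaler

    vecs = sc.parallelize([np.array([1.0, 2.0, 2.0]), np.array([3.0, 4.0, 0.0])])

    norm = Normalizer(p=2.0)
    norm.transform(np.array([3.0, 4.0]))       # single numpy vector -> dense [0.6, 0.8]
    norm.transform(vecs).collect()             # an RDD of numpy vectors is also accepted

    scaler = StandardScaler(withMean=True, withStd=True).fit(vecs)  # fit() maps _convert_to_vector first
    scaler.transform(vecs).collect()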
45 changes: 43 additions & 2 deletions python/pyspark/mllib/random.py
@@ -52,6 +52,12 @@ def uniformRDD(sc, size, numPartitions=None, seed=None):
C{RandomRDDs.uniformRDD(sc, n, p, seed)\
.map(lambda v: a + (b - a) * v)}
:param sc: SparkContext used to create the RDD.
:param size: Size of the RDD.
:param numPartitions: Number of partitions in the RDD (default: `sc.defaultParallelism`).
:param seed: Random seed (default: a random long integer).
:return: RDD of float comprised of i.i.d. samples ~ `U(0.0, 1.0)`.
>>> x = RandomRDDs.uniformRDD(sc, 100).collect()
>>> len(x)
100
@@ -76,6 +82,12 @@ def normalRDD(sc, size, numPartitions=None, seed=None):
C{RandomRDDs.normal(sc, n, p, seed)\
.map(lambda v: mean + sigma * v)}
:param sc: SparkContext used to create the RDD.
:param size: Size of the RDD.
:param numPartitions: Number of partitions in the RDD (default: `sc.defaultParallelism`).
:param seed: Random seed (default: a random long integer).
:return: RDD of float comprised of i.i.d. samples ~ N(0.0, 1.0).
>>> x = RandomRDDs.normalRDD(sc, 1000, seed=1L)
>>> stats = x.stats()
>>> stats.count()
@@ -93,6 +105,13 @@ def poissonRDD(sc, mean, size, numPartitions=None, seed=None):
Generates an RDD comprised of i.i.d. samples from the Poisson
distribution with the input mean.
:param sc: SparkContext used to create the RDD.
:param mean: Mean, or lambda, for the Poisson distribution.
:param size: Size of the RDD.
:param numPartitions: Number of partitions in the RDD (default: `sc.defaultParallelism`).
:param seed: Random seed (default: a random long integer).
:return: RDD of float comprised of i.i.d. samples ~ Pois(mean).
>>> mean = 100.0
>>> x = RandomRDDs.poissonRDD(sc, mean, 1000, seed=2L)
>>> stats = x.stats()
@@ -104,7 +123,7 @@ def poissonRDD(sc, mean, size, numPartitions=None, seed=None):
>>> abs(stats.stdev() - sqrt(mean)) < 0.5
True
"""
return callMLlibFunc("poissonRDD", sc._jsc, mean, size, numPartitions, seed)
return callMLlibFunc("poissonRDD", sc._jsc, float(mean), size, numPartitions, seed)

@staticmethod
@toArray
@@ -113,6 +132,13 @@ def uniformVectorRDD(sc, numRows, numCols, numPartitions=None, seed=None):
Generates an RDD comprised of vectors containing i.i.d. samples drawn
from the uniform distribution U(0.0, 1.0).
:param sc: SparkContext used to create the RDD.
:param numRows: Number of Vectors in the RDD.
:param numCols: Number of elements in each Vector.
:param numPartitions: Number of partitions in the RDD.
:param seed: Seed for the RNG that generates the seed for the generator in each partition.
:return: RDD of Vector with vectors containing i.i.d samples ~ `U(0.0, 1.0)`.
>>> import numpy as np
>>> mat = np.matrix(RandomRDDs.uniformVectorRDD(sc, 10, 10).collect())
>>> mat.shape
@@ -131,6 +157,13 @@ def normalVectorRDD(sc, numRows, numCols, numPartitions=None, seed=None):
Generates an RDD comprised of vectors containing i.i.d. samples drawn
from the standard normal distribution.
:param sc: SparkContext used to create the RDD.
:param numRows: Number of Vectors in the RDD.
:param numCols: Number of elements in each Vector.
:param numPartitions: Number of partitions in the RDD (default: `sc.defaultParallelism`).
:param seed: Random seed (default: a random long integer).
:return: RDD of Vector with vectors containing i.i.d. samples ~ `N(0.0, 1.0)`.
>>> import numpy as np
>>> mat = np.matrix(RandomRDDs.normalVectorRDD(sc, 100, 100, seed=1L).collect())
>>> mat.shape
@@ -149,6 +182,14 @@ def poissonVectorRDD(sc, mean, numRows, numCols, numPartitions=None, seed=None):
Generates an RDD comprised of vectors containing i.i.d. samples drawn
from the Poisson distribution with the input mean.
:param sc: SparkContext used to create the RDD.
:param mean: Mean, or lambda, for the Poisson distribution.
:param numRows: Number of Vectors in the RDD.
:param numCols: Number of elements in each Vector.
:param numPartitions: Number of partitions in the RDD (default: `sc.defaultParallelism`)
:param seed: Random seed (default: a random long integer).
:return: RDD of Vector with vectors containing i.i.d. samples ~ Pois(mean).
>>> import numpy as np
>>> mean = 100.0
>>> rdd = RandomRDDs.poissonVectorRDD(sc, mean, 100, 100, seed=1L)
@@ -161,7 +202,7 @@ def poissonVectorRDD(sc, mean, numRows, numCols, numPartitions=None, seed=None):
>>> abs(mat.std() - sqrt(mean)) < 0.5
True
"""
return callMLlibFunc("poissonVectorRDD", sc._jsc, mean, numRows, numCols,
return callMLlibFunc("poissonVectorRDD", sc._jsc, float(mean), numRows, numCols,
numPartitions, seed)


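A sketch of the generators with the relaxed argument type (hypothetical; `sc` assumed, Python 2 long literal as in the doctests). The mean may now be passed as an int because it is cast with float(mean) before the JVM call:

    from pyspark.mllib.random import RandomRDDs

    pois = RandomRDDs.poissonRDD(sc, 100, 1000, seed=1L)         # int mean is fine
    mat = RandomRDDs.poissonVectorRDD(sc, 100, 50, 10, seed=1L)  # 50 vectors of 10 samples each
    pois.mean()          # roughly 100.0
    len(mat.first())     # 10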
6 changes: 3 additions & 3 deletions python/pyspark/mllib/recommendation.py
@@ -32,7 +32,7 @@ def __reduce__(self):
return Rating, (self.user, self.product, self.rating)

def __repr__(self):
return "Rating(%d, %d, %d)" % (self.user, self.product, self.rating)
return "Rating(%d, %d, %s)" % (self.user, self.product, self.rating)


class MatrixFactorizationModel(JavaModelWrapper):
@@ -51,7 +51,7 @@ class MatrixFactorizationModel(JavaModelWrapper):
>>> testset = sc.parallelize([(1, 2), (1, 1)])
>>> model = ALS.train(ratings, 1, seed=10)
>>> model.predictAll(testset).collect()
[Rating(1, 1, 1), Rating(1, 2, 1)]
[Rating(1, 1, 1.0471...), Rating(1, 2, 1.9679...)]
>>> model = ALS.train(ratings, 4, seed=10)
>>> model.userFeatures().collect()
@@ -79,7 +79,7 @@ class MatrixFactorizationModel(JavaModelWrapper):
0.4473...
"""
def predict(self, user, product):
return self._java_model.predict(user, product)
return self._java_model.predict(int(user), int(product))

def predictAll(self, user_product):
assert isinstance(user_product, RDD), "user_product should be RDD of (user, product)"
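A sketch of the effect on ALS (hypothetical; `sc` assumed): user and product ids arriving as numpy integer types are coerced with int() before the JVM call, and predicted ratings print as floats:

    import numpy as np
    from pyspark.mllib.recommendation import ALS, Rating

    ratings = sc.parallelize([Rating(1, 1, 1.0), Rating(1, 2, 2.0), Rating(2, 1, 2.0)])
    model = ALS.train(ratings, 4, seed=10)
    model.predict(np.int64(1), np.int64(2))    # numpy ints are accepted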
15 changes: 10 additions & 5 deletions python/pyspark/mllib/regression.py
@@ -36,7 +36,7 @@ class LabeledPoint(object):
"""

def __init__(self, label, features):
self.label = label
self.label = float(label)
self.features = _convert_to_vector(features)

def __reduce__(self):
@@ -46,7 +46,7 @@ def __str__(self):
return "(" + ",".join((str(self.label), str(self.features))) + ")"

def __repr__(self):
return "LabeledPoint(" + ",".join((repr(self.label), repr(self.features))) + ")"
return "LabeledPoint(%s, %s)" % (self.label, self.features)


class LinearModel(object):
@@ -55,7 +55,7 @@ class LinearModel(object):

def __init__(self, weights, intercept):
self._coeff = _convert_to_vector(weights)
self._intercept = intercept
self._intercept = float(intercept)

@property
def weights(self):
@@ -66,7 +66,7 @@ def intercept(self):
return self._intercept

def __repr__(self):
return "(weights=%s, intercept=%s)" % (self._coeff, self._intercept)
return "(weights=%s, intercept=%r)" % (self._coeff, self._intercept)


class LinearRegressionModelBase(LinearModel):
@@ -85,6 +85,7 @@ def predict(self, x):
Predict the value of the dependent variable given a vector x
containing values for the independent variables.
"""
x = _convert_to_vector(x)
return self.weights.dot(x) + self.intercept


@@ -124,6 +125,9 @@ class LinearRegressionModel(LinearRegressionModelBase):
# return the result of a call to the appropriate JVM stub.
# _regression_train_wrapper is responsible for setup and error checking.
def _regression_train_wrapper(train_func, modelClass, data, initial_weights):
first = data.first()
if not isinstance(first, LabeledPoint):
raise ValueError("data should be an RDD of LabeledPoint, but got %s" % first)
initial_weights = initial_weights or [0.0] * len(data.first().features)
weights, intercept = train_func(_to_java_object_rdd(data, cache=True),
_convert_to_vector(initial_weights))
@@ -264,7 +268,8 @@ def train(rdd, i):
def _test():
import doctest
from pyspark import SparkContext
globs = globals().copy()
import pyspark.mllib.regression
globs = pyspark.mllib.regression.__dict__.copy()
globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
(failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
globs['sc'].stop()
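An illustrative sketch (hypothetical; `sc` assumed) of the stricter input checking and numpy support in regression:

    import numpy as np
    from pyspark.mllib.regression import LabeledPoint, LinearRegressionWithSGD

    data = sc.parallelize([LabeledPoint(0.0, np.array([0.0])),
                           LabeledPoint(1.0, np.array([1.0])),
                           LabeledPoint(3.0, np.array([2.0])),
                           LabeledPoint(2.0, np.array([3.0]))])
    model = LinearRegressionWithSGD.train(data, iterations=100, initialWeights=np.array([1.0]))
    model.predict(np.array([2.5]))   # x goes through _convert_to_vector first

    # An RDD of raw tuples instead of LabeledPoint now fails fast in _regression_train_wrapper
    # with ValueError("data should be an RDD of LabeledPoint, but got ...").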
16 changes: 15 additions & 1 deletion python/pyspark/mllib/stat.py
@@ -22,6 +22,7 @@
from pyspark import RDD
from pyspark.mllib.common import callMLlibFunc, JavaModelWrapper
from pyspark.mllib.linalg import Matrix, _convert_to_vector
from pyspark.mllib.regression import LabeledPoint


__all__ = ['MultivariateStatisticalSummary', 'ChiSqTestResult', 'Statistics']
@@ -107,6 +108,11 @@ def colStats(rdd):
"""
Computes column-wise summary statistics for the input RDD[Vector].
:param rdd: an RDD[Vector] for which column-wise summary statistics
are to be computed.
:return: :class:`MultivariateStatisticalSummary` object containing
column-wise summary statistics.
>>> from pyspark.mllib.linalg import Vectors
>>> rdd = sc.parallelize([Vectors.dense([2, 0, 0, -2]),
... Vectors.dense([4, 5, 0, 3]),
@@ -140,6 +146,13 @@ def corr(x, y=None, method=None):
to specify the method to be used for single RDD inout.
If two RDDs of floats are passed in, a single float is returned.
:param x: an RDD of vector for which the correlation matrix is to be computed,
or an RDD of float of the same cardinality as y when y is specified.
:param y: an RDD of float of the same cardinality as x.
:param method: String specifying the method to use for computing correlation.
Supported: `pearson` (default), `spearman`
:return: Correlation matrix comparing columns in x.
>>> x = sc.parallelize([1.0, 0.0, -2.0], 2)
>>> y = sc.parallelize([4.0, 5.0, 3.0], 2)
>>> zeros = sc.parallelize([0.0, 0.0, 0.0], 2)
@@ -242,7 +255,6 @@ def chiSqTest(observed, expected=None):
>>> print round(chi.statistic, 4)
21.9958
>>> from pyspark.mllib.regression import LabeledPoint
>>> data = [LabeledPoint(0.0, Vectors.dense([0.5, 10.0])),
... LabeledPoint(0.0, Vectors.dense([1.5, 20.0])),
... LabeledPoint(1.0, Vectors.dense([1.5, 30.0])),
@@ -257,6 +269,8 @@
1.5
"""
if isinstance(observed, RDD):
if not isinstance(observed.first(), LabeledPoint):
raise ValueError("observed should be an RDD of LabeledPoint")
jmodels = callMLlibFunc("chiSqTest", observed)
return [ChiSqTestResult(m) for m in jmodels]

Expand Down