diff --git a/python/pyspark/mllib/classification.py b/python/pyspark/mllib/classification.py index 297a2bf37d2cf..5d90dddb5df1c 100644 --- a/python/pyspark/mllib/classification.py +++ b/python/pyspark/mllib/classification.py @@ -62,6 +62,7 @@ class LogisticRegressionModel(LinearModel): """ def predict(self, x): + x = _convert_to_vector(x) margin = self.weights.dot(x) + self._intercept if margin > 0: prob = 1 / (1 + exp(-margin)) @@ -79,7 +80,7 @@ def train(cls, data, iterations=100, step=1.0, miniBatchFraction=1.0, """ Train a logistic regression model on the given data. - :param data: The training data. + :param data: The training data, an RDD of LabeledPoint. :param iterations: The number of iterations (default: 100). :param step: The step parameter used in SGD (default: 1.0). @@ -136,6 +137,7 @@ class SVMModel(LinearModel): """ def predict(self, x): + x = _convert_to_vector(x) margin = self.weights.dot(x) + self.intercept return 1 if margin >= 0 else 0 @@ -148,7 +150,7 @@ def train(cls, data, iterations=100, step=1.0, regParam=1.0, """ Train a support vector machine on the given data. - :param data: The training data. + :param data: The training data, an RDD of LabeledPoint. :param iterations: The number of iterations (default: 100). :param step: The step parameter used in SGD (default: 1.0). @@ -233,11 +235,12 @@ def train(cls, data, lambda_=1.0): classification. By making every vector a 0-1 vector, it can also be used as Bernoulli NB (U{http://tinyurl.com/p7c96j6}). - :param data: RDD of NumPy vectors, one per element, where the first - coordinate is the label and the rest is the feature vector - (e.g. a count vector). + :param data: RDD of LabeledPoint. :param lambda_: The smoothing parameter """ + first = data.first() + if not isinstance(first, LabeledPoint): + raise ValueError("`data` should be an RDD of LabeledPoint") labels, pi, theta = callMLlibFunc("trainNaiveBayes", data, lambda_) return NaiveBayesModel(labels.toArray(), pi.toArray(), numpy.array(theta)) diff --git a/python/pyspark/mllib/feature.py b/python/pyspark/mllib/feature.py index 44bf6f269d7a3..9ec28079aef43 100644 --- a/python/pyspark/mllib/feature.py +++ b/python/pyspark/mllib/feature.py @@ -25,7 +25,7 @@ from pyspark import RDD, SparkContext from pyspark.mllib.common import callMLlibFunc, JavaModelWrapper -from pyspark.mllib.linalg import Vectors +from pyspark.mllib.linalg import Vectors, _convert_to_vector __all__ = ['Normalizer', 'StandardScalerModel', 'StandardScaler', 'HashingTF', 'IDFModel', 'IDF', 'Word2Vec', 'Word2VecModel'] @@ -81,12 +81,16 @@ def transform(self, vector): """ Applies unit length normalization on a vector. - :param vector: vector to be normalized. + :param vector: vector or RDD of vector to be normalized. :return: normalized vector. If the norm of the input is zero, it will return the input vector. """ sc = SparkContext._active_spark_context assert sc is not None, "SparkContext should be initialized first" + if isinstance(vector, RDD): + vector = vector.map(_convert_to_vector) + else: + vector = _convert_to_vector(vector) return callMLlibFunc("normalizeVector", self.p, vector) @@ -95,8 +99,12 @@ class JavaVectorTransformer(JavaModelWrapper, VectorTransformer): Wrapper for the model in JVM """ - def transform(self, dataset): - return self.call("transform", dataset) + def transform(self, vector): + if isinstance(vector, RDD): + vector = vector.map(_convert_to_vector) + else: + vector = _convert_to_vector(vector) + return self.call("transform", vector) class StandardScalerModel(JavaVectorTransformer): @@ -109,7 +117,7 @@ def transform(self, vector): """ Applies standardization transformation on a vector. - :param vector: Vector to be standardized. + :param vector: Vector or RDD of Vector to be standardized. :return: Standardized vector. If the variance of a column is zero, it will return default `0.0` for the column with zero variance. """ @@ -154,6 +162,7 @@ def fit(self, dataset): the transformation model. :return: a StandardScalarModel """ + dataset = dataset.map(_convert_to_vector) jmodel = callMLlibFunc("fitStandardScaler", self.withMean, self.withStd, dataset) return StandardScalerModel(jmodel) @@ -211,6 +220,8 @@ def transform(self, dataset): :param dataset: an RDD of term frequency vectors :return: an RDD of TF-IDF vectors """ + if not isinstance(dataset, RDD): + raise TypeError("dataset should be an RDD of term frequency vectors") return JavaVectorTransformer.transform(self, dataset) @@ -255,7 +266,9 @@ def fit(self, dataset): :param dataset: an RDD of term frequency vectors """ - jmodel = callMLlibFunc("fitIDF", self.minDocFreq, dataset) + if not isinstance(dataset, RDD): + raise TypeError("dataset should be an RDD of term frequency vectors") + jmodel = callMLlibFunc("fitIDF", self.minDocFreq, dataset.map(_convert_to_vector)) return IDFModel(jmodel) @@ -287,6 +300,8 @@ def findSynonyms(self, word, num): Note: local use only """ + if not isinstance(word, basestring): + word = _convert_to_vector(word) words, similarity = self.call("findSynonyms", word, num) return zip(words, similarity) @@ -374,9 +389,11 @@ def fit(self, data): """ Computes the vector representation of each word in vocabulary. - :param data: training data. RDD of subtype of Iterable[String] + :param data: training data. RDD of list of string :return: Word2VecModel instance """ + if not isinstance(data, RDD): + raise TypeError("data should be an RDD of list of string") jmodel = callMLlibFunc("trainWord2Vec", data, int(self.vectorSize), float(self.learningRate), int(self.numPartitions), int(self.numIterations), long(self.seed)) diff --git a/python/pyspark/mllib/random.py b/python/pyspark/mllib/random.py index 7eebfc6bcd894..cb4304f92152b 100644 --- a/python/pyspark/mllib/random.py +++ b/python/pyspark/mllib/random.py @@ -52,6 +52,12 @@ def uniformRDD(sc, size, numPartitions=None, seed=None): C{RandomRDDs.uniformRDD(sc, n, p, seed)\ .map(lambda v: a + (b - a) * v)} + :param sc: SparkContext used to create the RDD. + :param size: Size of the RDD. + :param numPartitions: Number of partitions in the RDD (default: `sc.defaultParallelism`). + :param seed: Random seed (default: a random long integer). + :return: RDD of float comprised of i.i.d. samples ~ `U(0.0, 1.0)`. + >>> x = RandomRDDs.uniformRDD(sc, 100).collect() >>> len(x) 100 @@ -76,6 +82,12 @@ def normalRDD(sc, size, numPartitions=None, seed=None): C{RandomRDDs.normal(sc, n, p, seed)\ .map(lambda v: mean + sigma * v)} + :param sc: SparkContext used to create the RDD. + :param size: Size of the RDD. + :param numPartitions: Number of partitions in the RDD (default: `sc.defaultParallelism`). + :param seed: Random seed (default: a random long integer). + :return: RDD of float comprised of i.i.d. samples ~ N(0.0, 1.0). + >>> x = RandomRDDs.normalRDD(sc, 1000, seed=1L) >>> stats = x.stats() >>> stats.count() @@ -93,6 +105,13 @@ def poissonRDD(sc, mean, size, numPartitions=None, seed=None): Generates an RDD comprised of i.i.d. samples from the Poisson distribution with the input mean. + :param sc: SparkContext used to create the RDD. + :param mean: Mean, or lambda, for the Poisson distribution. + :param size: Size of the RDD. + :param numPartitions: Number of partitions in the RDD (default: `sc.defaultParallelism`). + :param seed: Random seed (default: a random long integer). + :return: RDD of float comprised of i.i.d. samples ~ Pois(mean). + >>> mean = 100.0 >>> x = RandomRDDs.poissonRDD(sc, mean, 1000, seed=2L) >>> stats = x.stats() @@ -104,7 +123,7 @@ def poissonRDD(sc, mean, size, numPartitions=None, seed=None): >>> abs(stats.stdev() - sqrt(mean)) < 0.5 True """ - return callMLlibFunc("poissonRDD", sc._jsc, mean, size, numPartitions, seed) + return callMLlibFunc("poissonRDD", sc._jsc, float(mean), size, numPartitions, seed) @staticmethod @toArray @@ -113,6 +132,13 @@ def uniformVectorRDD(sc, numRows, numCols, numPartitions=None, seed=None): Generates an RDD comprised of vectors containing i.i.d. samples drawn from the uniform distribution U(0.0, 1.0). + :param sc: SparkContext used to create the RDD. + :param numRows: Number of Vectors in the RDD. + :param numCols: Number of elements in each Vector. + :param numPartitions: Number of partitions in the RDD. + :param seed: Seed for the RNG that generates the seed for the generator in each partition. + :return: RDD of Vector with vectors containing i.i.d samples ~ `U(0.0, 1.0)`. + >>> import numpy as np >>> mat = np.matrix(RandomRDDs.uniformVectorRDD(sc, 10, 10).collect()) >>> mat.shape @@ -131,6 +157,13 @@ def normalVectorRDD(sc, numRows, numCols, numPartitions=None, seed=None): Generates an RDD comprised of vectors containing i.i.d. samples drawn from the standard normal distribution. + :param sc: SparkContext used to create the RDD. + :param numRows: Number of Vectors in the RDD. + :param numCols: Number of elements in each Vector. + :param numPartitions: Number of partitions in the RDD (default: `sc.defaultParallelism`). + :param seed: Random seed (default: a random long integer). + :return: RDD of Vector with vectors containing i.i.d. samples ~ `N(0.0, 1.0)`. + >>> import numpy as np >>> mat = np.matrix(RandomRDDs.normalVectorRDD(sc, 100, 100, seed=1L).collect()) >>> mat.shape @@ -149,6 +182,14 @@ def poissonVectorRDD(sc, mean, numRows, numCols, numPartitions=None, seed=None): Generates an RDD comprised of vectors containing i.i.d. samples drawn from the Poisson distribution with the input mean. + :param sc: SparkContext used to create the RDD. + :param mean: Mean, or lambda, for the Poisson distribution. + :param numRows: Number of Vectors in the RDD. + :param numCols: Number of elements in each Vector. + :param numPartitions: Number of partitions in the RDD (default: `sc.defaultParallelism`) + :param seed: Random seed (default: a random long integer). + :return: RDD of Vector with vectors containing i.i.d. samples ~ Pois(mean). + >>> import numpy as np >>> mean = 100.0 >>> rdd = RandomRDDs.poissonVectorRDD(sc, mean, 100, 100, seed=1L) @@ -161,7 +202,7 @@ def poissonVectorRDD(sc, mean, numRows, numCols, numPartitions=None, seed=None): >>> abs(mat.std() - sqrt(mean)) < 0.5 True """ - return callMLlibFunc("poissonVectorRDD", sc._jsc, mean, numRows, numCols, + return callMLlibFunc("poissonVectorRDD", sc._jsc, float(mean), numRows, numCols, numPartitions, seed) diff --git a/python/pyspark/mllib/recommendation.py b/python/pyspark/mllib/recommendation.py index e26b152e0cdfd..41bbd9a779c70 100644 --- a/python/pyspark/mllib/recommendation.py +++ b/python/pyspark/mllib/recommendation.py @@ -32,7 +32,7 @@ def __reduce__(self): return Rating, (self.user, self.product, self.rating) def __repr__(self): - return "Rating(%d, %d, %d)" % (self.user, self.product, self.rating) + return "Rating(%d, %d, %s)" % (self.user, self.product, self.rating) class MatrixFactorizationModel(JavaModelWrapper): @@ -51,7 +51,7 @@ class MatrixFactorizationModel(JavaModelWrapper): >>> testset = sc.parallelize([(1, 2), (1, 1)]) >>> model = ALS.train(ratings, 1, seed=10) >>> model.predictAll(testset).collect() - [Rating(1, 1, 1), Rating(1, 2, 1)] + [Rating(1, 1, 1.0471...), Rating(1, 2, 1.9679...)] >>> model = ALS.train(ratings, 4, seed=10) >>> model.userFeatures().collect() @@ -79,7 +79,7 @@ class MatrixFactorizationModel(JavaModelWrapper): 0.4473... """ def predict(self, user, product): - return self._java_model.predict(user, product) + return self._java_model.predict(int(user), int(product)) def predictAll(self, user_product): assert isinstance(user_product, RDD), "user_product should be RDD of (user, product)" diff --git a/python/pyspark/mllib/regression.py b/python/pyspark/mllib/regression.py index 43c1a2fc101dd..66e25a48dfa71 100644 --- a/python/pyspark/mllib/regression.py +++ b/python/pyspark/mllib/regression.py @@ -36,7 +36,7 @@ class LabeledPoint(object): """ def __init__(self, label, features): - self.label = label + self.label = float(label) self.features = _convert_to_vector(features) def __reduce__(self): @@ -46,7 +46,7 @@ def __str__(self): return "(" + ",".join((str(self.label), str(self.features))) + ")" def __repr__(self): - return "LabeledPoint(" + ",".join((repr(self.label), repr(self.features))) + ")" + return "LabeledPoint(%s, %s)" % (self.label, self.features) class LinearModel(object): @@ -55,7 +55,7 @@ class LinearModel(object): def __init__(self, weights, intercept): self._coeff = _convert_to_vector(weights) - self._intercept = intercept + self._intercept = float(intercept) @property def weights(self): @@ -66,7 +66,7 @@ def intercept(self): return self._intercept def __repr__(self): - return "(weights=%s, intercept=%s)" % (self._coeff, self._intercept) + return "(weights=%s, intercept=%r)" % (self._coeff, self._intercept) class LinearRegressionModelBase(LinearModel): @@ -85,6 +85,7 @@ def predict(self, x): Predict the value of the dependent variable given a vector x containing values for the independent variables. """ + x = _convert_to_vector(x) return self.weights.dot(x) + self.intercept @@ -124,6 +125,9 @@ class LinearRegressionModel(LinearRegressionModelBase): # return the result of a call to the appropriate JVM stub. # _regression_train_wrapper is responsible for setup and error checking. def _regression_train_wrapper(train_func, modelClass, data, initial_weights): + first = data.first() + if not isinstance(first, LabeledPoint): + raise ValueError("data should be an RDD of LabeledPoint, but got %s" % first) initial_weights = initial_weights or [0.0] * len(data.first().features) weights, intercept = train_func(_to_java_object_rdd(data, cache=True), _convert_to_vector(initial_weights)) @@ -264,7 +268,8 @@ def train(rdd, i): def _test(): import doctest from pyspark import SparkContext - globs = globals().copy() + import pyspark.mllib.regression + globs = pyspark.mllib.regression.__dict__.copy() globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2) (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS) globs['sc'].stop() diff --git a/python/pyspark/mllib/stat.py b/python/pyspark/mllib/stat.py index 0700f8a8e5a8e..1980f5b03f430 100644 --- a/python/pyspark/mllib/stat.py +++ b/python/pyspark/mllib/stat.py @@ -22,6 +22,7 @@ from pyspark import RDD from pyspark.mllib.common import callMLlibFunc, JavaModelWrapper from pyspark.mllib.linalg import Matrix, _convert_to_vector +from pyspark.mllib.regression import LabeledPoint __all__ = ['MultivariateStatisticalSummary', 'ChiSqTestResult', 'Statistics'] @@ -107,6 +108,11 @@ def colStats(rdd): """ Computes column-wise summary statistics for the input RDD[Vector]. + :param rdd: an RDD[Vector] for which column-wise summary statistics + are to be computed. + :return: :class:`MultivariateStatisticalSummary` object containing + column-wise summary statistics. + >>> from pyspark.mllib.linalg import Vectors >>> rdd = sc.parallelize([Vectors.dense([2, 0, 0, -2]), ... Vectors.dense([4, 5, 0, 3]), @@ -140,6 +146,13 @@ def corr(x, y=None, method=None): to specify the method to be used for single RDD inout. If two RDDs of floats are passed in, a single float is returned. + :param x: an RDD of vector for which the correlation matrix is to be computed, + or an RDD of float of the same cardinality as y when y is specified. + :param y: an RDD of float of the same cardinality as x. + :param method: String specifying the method to use for computing correlation. + Supported: `pearson` (default), `spearman` + :return: Correlation matrix comparing columns in x. + >>> x = sc.parallelize([1.0, 0.0, -2.0], 2) >>> y = sc.parallelize([4.0, 5.0, 3.0], 2) >>> zeros = sc.parallelize([0.0, 0.0, 0.0], 2) @@ -242,7 +255,6 @@ def chiSqTest(observed, expected=None): >>> print round(chi.statistic, 4) 21.9958 - >>> from pyspark.mllib.regression import LabeledPoint >>> data = [LabeledPoint(0.0, Vectors.dense([0.5, 10.0])), ... LabeledPoint(0.0, Vectors.dense([1.5, 20.0])), ... LabeledPoint(1.0, Vectors.dense([1.5, 30.0])), @@ -257,6 +269,8 @@ def chiSqTest(observed, expected=None): 1.5 """ if isinstance(observed, RDD): + if not isinstance(observed.first(), LabeledPoint): + raise ValueError("observed should be an RDD of LabeledPoint") jmodels = callMLlibFunc("chiSqTest", observed) return [ChiSqTestResult(m) for m in jmodels] diff --git a/python/pyspark/mllib/util.py b/python/pyspark/mllib/util.py index 96aef8f510fa6..4ed978b45409c 100644 --- a/python/pyspark/mllib/util.py +++ b/python/pyspark/mllib/util.py @@ -161,15 +161,8 @@ def loadLabeledPoints(sc, path, minPartitions=None): >>> tempFile = NamedTemporaryFile(delete=True) >>> tempFile.close() >>> sc.parallelize(examples, 1).saveAsTextFile(tempFile.name) - >>> loaded = MLUtils.loadLabeledPoints(sc, tempFile.name).collect() - >>> type(loaded[0]) == LabeledPoint - True - >>> print examples[0] - (1.1,(3,[0,2],[-1.23,4.56e-07])) - >>> type(examples[1]) == LabeledPoint - True - >>> print examples[1] - (0.0,[1.01,2.02,3.03]) + >>> MLUtils.loadLabeledPoints(sc, tempFile.name).collect() + [LabeledPoint(1.1, (3,[0,2],[-1.23,4.56e-07])), LabeledPoint(0.0, [1.01,2.02,3.03])] """ minPartitions = minPartitions or min(sc.defaultParallelism, 2) return callMLlibFunc("loadLabeledPoints", sc, path, minPartitions)