diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkSVD.scala b/examples/src/main/scala/org/apache/spark/examples/SparkSVD.scala index 5590ee728a843..4b9e674c68c4e 100644 --- a/examples/src/main/scala/org/apache/spark/examples/SparkSVD.scala +++ b/examples/src/main/scala/org/apache/spark/examples/SparkSVD.scala @@ -20,6 +20,7 @@ package org.apache.spark.examples import org.apache.spark.SparkContext import org.apache.spark.mllib.linalg.SVD import org.apache.spark.mllib.linalg.MatrixEntry +import org.apache.spark.mllib.linalg.SparseMatrix /** * Compute SVD of an example matrix @@ -48,10 +49,10 @@ object SparkSVD { val n = 4 // recover largest singular vector - val decomposed = SVD.sparseSVD(data, m, n, 1) - val u = decomposed.U - val s = decomposed.S - val v = decomposed.V + val decomposed = SVD.sparseSVD(SparseMatrix(data, m, n), 1) + val u = decomposed.U.data + val s = decomposed.S.data + val v = decomposed.V.data println("singular values = " + s.toArray.mkString) } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala index 31990b0223c42..a8efdc787e270 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala @@ -26,11 +26,8 @@ import org.jblas.{DoubleMatrix, Singular, MatrixFunctions} /** * Class used to obtain singular value decompositions - * @param data Matrix in sparse matrix format - * @param m number of rows - * @param n number of columns */ -class SVD(var data: RDD[MatrixEntry], var m: Int, var n: Int) { +class SVD { private var k: Int = 1 /** @@ -41,35 +38,11 @@ class SVD(var data: RDD[MatrixEntry], var m: Int, var n: Int) { this } - /** - * Set matrix to be used for SVD - */ - def setDatadata(data: RDD[MatrixEntry]): this.type = { - this.data = data - this - } - - /** - * Set dimensions of matrix: rows - */ - def setNumRows(m: Int): this.type = { - this.m = m - this - } - - /** - * Set dimensions of matrix: columns - */ - def setNumCols(n: Int): this.type = { - this.n = n - this - } - /** * Compute SVD using the current set parameters */ - def computeSVD() : SVDecomposedMatrix = { - SVD.sparseSVD(data, m, n, k) + def computeSVD(matrix: SparseMatrix) : SVDecomposedMatrix = { + SVD.sparseSVD(matrix, k) } } @@ -103,19 +76,19 @@ object SVD { * All input and output is expected in sparse matrix format, 1-indexed * as tuples of the form ((i,j),value) all in RDDs * - * @param data RDD Matrix in sparse 1-index format ((int, int), value) - * @param m number of rows - * @param n number of columns + * @param matrix sparse matrix to factorize * @param k Recover k singular values and vectors * @return Three sparse matrices: U, S, V such that A = USV^T */ def sparseSVD( - data: RDD[MatrixEntry], - m: Int, - n: Int, + matrix: SparseMatrix, k: Int) : SVDecomposedMatrix = { + val data = matrix.data + val m = matrix.m + val n = matrix.n + if (m < n || m <= 0 || n <= 0) { throw new IllegalArgumentException("Expecting a tall and skinny matrix") } @@ -153,13 +126,16 @@ object SVD { val sc = data.sparkContext // prepare V for returning - val retV = sc.makeRDD( + val retVdata = sc.makeRDD( Array.tabulate(V.rows, sigma.length){ (i,j) => MatrixEntry(i + 1, j + 1, V.get(i,j)) }.flatten) - - val retS = sc.makeRDD(Array.tabulate(sigma.length){ + val retV = SparseMatrix(retVdata, V.rows, sigma.length) + + val retSdata = sc.makeRDD(Array.tabulate(sigma.length){ x => MatrixEntry(x + 1, x + 1, sigma(x))}) + val retS = SparseMatrix(retSdata, sigma.length, sigma.length) + // Compute U as U = A V S^-1 // turn V S^-1 into an RDD as a sparse matrix val vsirdd = sc.makeRDD(Array.tabulate(V.rows, sigma.length) @@ -168,10 +144,11 @@ object SVD { // Multiply A by VS^-1 val aCols = data.map(entry => (entry.j, (entry.i, entry.mval))) val bRows = vsirdd.map(entry => (entry._1._1, (entry._1._2, entry._2))) - val retU = aCols.join(bRows).map( {case (key, ( (rowInd, rowVal), (colInd, colVal)) ) + val retUdata = aCols.join(bRows).map( {case (key, ( (rowInd, rowVal), (colInd, colVal)) ) => ((rowInd, colInd), rowVal*colVal)}).reduceByKey(_+_) .map{ case ((row, col), mval) => MatrixEntry(row, col, mval)} - + val retU = SparseMatrix(retUdata, m, sigma.length) + SVDecomposedMatrix(retU, retS, retV) } @@ -195,10 +172,10 @@ object SVD { MatrixEntry(parts(0).toInt, parts(1).toInt, parts(2).toDouble) } - val decomposed = SVD.sparseSVD(data, m, n, k) - val u = decomposed.U - val s = decomposed.S - val v = decomposed.V + val decomposed = SVD.sparseSVD(SparseMatrix(data, m, n), k) + val u = decomposed.U.data + val s = decomposed.S.data + val v = decomposed.V.data println("Computed " + s.toArray.length + " singular values and vectors") u.saveAsTextFile(output_u) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVDecomposedMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVDecomposedMatrix.scala index e0bcdab2d2856..622003576d474 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVDecomposedMatrix.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVDecomposedMatrix.scala @@ -17,8 +17,6 @@ package org.apache.spark.mllib.linalg -import org.apache.spark.rdd.RDD - /** * Class that represents the SV decomposition of a matrix * @@ -26,6 +24,6 @@ import org.apache.spark.rdd.RDD * @param S such that A = USV^T * @param V such that A = USV^T */ -case class SVDecomposedMatrix(val U: RDD[MatrixEntry], - val S: RDD[MatrixEntry], - val V: RDD[MatrixEntry]) +case class SVDecomposedMatrix(val U: SparseMatrix, + val S: SparseMatrix, + val V: SparseMatrix) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/SparseMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SparseMatrix.scala new file mode 100644 index 0000000000000..cbd1a2a5a4bd8 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SparseMatrix.scala @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.linalg + +import org.apache.spark.rdd.RDD + + +/** + * Class that represents a sparse matrix + * + * @param data RDD of nonzero entries + * @param m number of rows + * @param n numner of columns + */ +case class SparseMatrix(val data: RDD[MatrixEntry], val m: Int, val n: Int) diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/SVDSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/SVDSuite.scala index 4126e819e3296..f239e8505ff1a 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/SVDSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/SVDSuite.scala @@ -45,9 +45,12 @@ class SVDSuite extends FunSuite with BeforeAndAfterAll { val EPSILON = 1e-4 // Return jblas matrix from sparse matrix RDD - def getDenseMatrix(matrix:RDD[MatrixEntry], m:Int, n:Int) : DoubleMatrix = { + def getDenseMatrix(matrix:SparseMatrix) : DoubleMatrix = { + val data = matrix.data + val m = matrix.m + val n = matrix.n val ret = DoubleMatrix.zeros(m, n) - matrix.toArray.map(x => ret.put(x.i - 1, x.j - 1, x.mval)) + matrix.data.toArray.map(x => ret.put(x.i - 1, x.j - 1, x.mval)) ret } @@ -67,24 +70,26 @@ class SVDSuite extends FunSuite with BeforeAndAfterAll { val data = sc.makeRDD(Array.tabulate(m,n){ (a, b) => MatrixEntry(a + 1, b + 1, (a + 2).toDouble * (b + 1) / (1 + a + b)) }.flatten ) - val decomposed = SVD.sparseSVD(data, m, n, n) + val a = SparseMatrix(data, m, n) + + val decomposed = SVD.sparseSVD(a, n) val u = decomposed.U val v = decomposed.V - val s = decomposed.S + val s = decomposed.S - val densea = getDenseMatrix(data, m, n) + val densea = getDenseMatrix(a) val svd = Singular.sparseSVD(densea) - val retu = getDenseMatrix(u, m, n) - val rets = getDenseMatrix(s, n, n) - val retv = getDenseMatrix(v, n, n) + val retu = getDenseMatrix(u) + val rets = getDenseMatrix(s) + val retv = getDenseMatrix(v) // check individual decomposition assertMatrixEquals(retu, svd(0)) assertMatrixEquals(rets, DoubleMatrix.diag(svd(1))) assertMatrixEquals(retv, svd(2)) - // check multiplication guarantee + // check multiplication guarantee assertMatrixEquals(retu.mmul(rets).mmul(retv.transpose), densea) } @@ -95,20 +100,22 @@ class SVDSuite extends FunSuite with BeforeAndAfterAll { MatrixEntry(a + 1, b + 1, 1.0) }.flatten ) val k = 1 - val decomposed = SVD.sparseSVD(data, m, n, k) + val a = SparseMatrix(data, m, n) + + val decomposed = SVD.sparseSVD(a, k) val u = decomposed.U val s = decomposed.S val v = decomposed.V - val retrank = s.toArray.length + val retrank = s.data.toArray.length assert(retrank == 1, "rank returned not one") - val densea = getDenseMatrix(data, m, n) + val densea = getDenseMatrix(a) val svd = Singular.sparseSVD(densea) - val retu = getDenseMatrix(u, m, retrank) - val rets = getDenseMatrix(s, retrank, retrank) - val retv = getDenseMatrix(v, n, retrank) + val retu = getDenseMatrix(u) + val rets = getDenseMatrix(s) + val retv = getDenseMatrix(v) // check individual decomposition assertMatrixEquals(retu, svd(0).getColumn(0)) @@ -124,21 +131,22 @@ class SVDSuite extends FunSuite with BeforeAndAfterAll { val n = 3 val data = sc.makeRDD(Array.tabulate(m,n){ (a, b) => MatrixEntry(a + 1, b + 1, (a + 2).toDouble * (b + 1)/(1 + a + b)) }.flatten ) + val a = SparseMatrix(data, m, n) val k = 1 // only one svalue above this - val decomposed = SVD.sparseSVD(data, m, n, k) + val decomposed = SVD.sparseSVD(a, k) val u = decomposed.U val s = decomposed.S val v = decomposed.V - val retrank = s.toArray.length + val retrank = s.data.toArray.length - val densea = getDenseMatrix(data, m, n) + val densea = getDenseMatrix(a) val svd = Singular.sparseSVD(densea) - val retu = getDenseMatrix(u, m, retrank) - val rets = getDenseMatrix(s, retrank, retrank) - val retv = getDenseMatrix(v, n, retrank) + val retu = getDenseMatrix(u) + val rets = getDenseMatrix(s) + val retv = getDenseMatrix(v) assert(retrank == 1, "rank returned not one")