Skip to content

Commit

Permalink
use SparseMatrix everywhere
Browse files Browse the repository at this point in the history
  • Loading branch information
rezazadeh committed Jan 4, 2014
1 parent cdff9fc commit 06c0f76
Show file tree
Hide file tree
Showing 5 changed files with 89 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.spark.examples
import org.apache.spark.SparkContext
import org.apache.spark.mllib.linalg.SVD
import org.apache.spark.mllib.linalg.MatrixEntry
import org.apache.spark.mllib.linalg.SparseMatrix

/**
* Compute SVD of an example matrix
Expand Down Expand Up @@ -48,10 +49,10 @@ object SparkSVD {
val n = 4

// recover largest singular vector
val decomposed = SVD.sparseSVD(data, m, n, 1)
val u = decomposed.U
val s = decomposed.S
val v = decomposed.V
val decomposed = SVD.sparseSVD(SparseMatrix(data, m, n), 1)
val u = decomposed.U.data
val s = decomposed.S.data
val v = decomposed.V.data

println("singular values = " + s.toArray.mkString)
}
Expand Down
67 changes: 22 additions & 45 deletions mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,8 @@ import org.jblas.{DoubleMatrix, Singular, MatrixFunctions}

/**
* Class used to obtain singular value decompositions
* @param data Matrix in sparse matrix format
* @param m number of rows
* @param n number of columns
*/
class SVD(var data: RDD[MatrixEntry], var m: Int, var n: Int) {
class SVD {
private var k: Int = 1

/**
Expand All @@ -41,35 +38,11 @@ class SVD(var data: RDD[MatrixEntry], var m: Int, var n: Int) {
this
}

/**
* Set matrix to be used for SVD
*/
def setDatadata(data: RDD[MatrixEntry]): this.type = {
this.data = data
this
}

/**
* Set dimensions of matrix: rows
*/
def setNumRows(m: Int): this.type = {
this.m = m
this
}

/**
* Set dimensions of matrix: columns
*/
def setNumCols(n: Int): this.type = {
this.n = n
this
}

/**
* Compute SVD using the current set parameters
*/
def computeSVD() : SVDecomposedMatrix = {
SVD.sparseSVD(data, m, n, k)
def computeSVD(matrix: SparseMatrix) : SVDecomposedMatrix = {
SVD.sparseSVD(matrix, k)
}
}

Expand Down Expand Up @@ -103,19 +76,19 @@ object SVD {
* All input and output is expected in sparse matrix format, 1-indexed
* as tuples of the form ((i,j),value) all in RDDs
*
* @param data RDD Matrix in sparse 1-index format ((int, int), value)
* @param m number of rows
* @param n number of columns
* @param matrix sparse matrix to factorize
* @param k Recover k singular values and vectors
* @return Three sparse matrices: U, S, V such that A = USV^T
*/
def sparseSVD(
data: RDD[MatrixEntry],
m: Int,
n: Int,
matrix: SparseMatrix,
k: Int)
: SVDecomposedMatrix =
{
val data = matrix.data
val m = matrix.m
val n = matrix.n

if (m < n || m <= 0 || n <= 0) {
throw new IllegalArgumentException("Expecting a tall and skinny matrix")
}
Expand Down Expand Up @@ -153,13 +126,16 @@ object SVD {
val sc = data.sparkContext

// prepare V for returning
val retV = sc.makeRDD(
val retVdata = sc.makeRDD(
Array.tabulate(V.rows, sigma.length){ (i,j) =>
MatrixEntry(i + 1, j + 1, V.get(i,j)) }.flatten)

val retS = sc.makeRDD(Array.tabulate(sigma.length){
val retV = SparseMatrix(retVdata, V.rows, sigma.length)

val retSdata = sc.makeRDD(Array.tabulate(sigma.length){
x => MatrixEntry(x + 1, x + 1, sigma(x))})

val retS = SparseMatrix(retSdata, sigma.length, sigma.length)

// Compute U as U = A V S^-1
// turn V S^-1 into an RDD as a sparse matrix
val vsirdd = sc.makeRDD(Array.tabulate(V.rows, sigma.length)
Expand All @@ -168,10 +144,11 @@ object SVD {
// Multiply A by VS^-1
val aCols = data.map(entry => (entry.j, (entry.i, entry.mval)))
val bRows = vsirdd.map(entry => (entry._1._1, (entry._1._2, entry._2)))
val retU = aCols.join(bRows).map( {case (key, ( (rowInd, rowVal), (colInd, colVal)) )
val retUdata = aCols.join(bRows).map( {case (key, ( (rowInd, rowVal), (colInd, colVal)) )
=> ((rowInd, colInd), rowVal*colVal)}).reduceByKey(_+_)
.map{ case ((row, col), mval) => MatrixEntry(row, col, mval)}

val retU = SparseMatrix(retUdata, m, sigma.length)

SVDecomposedMatrix(retU, retS, retV)
}

Expand All @@ -195,10 +172,10 @@ object SVD {
MatrixEntry(parts(0).toInt, parts(1).toInt, parts(2).toDouble)
}

val decomposed = SVD.sparseSVD(data, m, n, k)
val u = decomposed.U
val s = decomposed.S
val v = decomposed.V
val decomposed = SVD.sparseSVD(SparseMatrix(data, m, n), k)
val u = decomposed.U.data
val s = decomposed.S.data
val v = decomposed.V.data

println("Computed " + s.toArray.length + " singular values and vectors")
u.saveAsTextFile(output_u)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,13 @@

package org.apache.spark.mllib.linalg

import org.apache.spark.rdd.RDD

/**
* Class that represents the SV decomposition of a matrix
*
* @param U such that A = USV^T
* @param S such that A = USV^T
* @param V such that A = USV^T
*/
case class SVDecomposedMatrix(val U: RDD[MatrixEntry],
val S: RDD[MatrixEntry],
val V: RDD[MatrixEntry])
case class SVDecomposedMatrix(val U: SparseMatrix,
val S: SparseMatrix,
val V: SparseMatrix)
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.mllib.linalg

import org.apache.spark.rdd.RDD


/**
* Class that represents a sparse matrix
*
* @param data RDD of nonzero entries
* @param m number of rows
* @param n numner of columns
*/
case class SparseMatrix(val data: RDD[MatrixEntry], val m: Int, val n: Int)
50 changes: 29 additions & 21 deletions mllib/src/test/scala/org/apache/spark/mllib/linalg/SVDSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,12 @@ class SVDSuite extends FunSuite with BeforeAndAfterAll {
val EPSILON = 1e-4

// Return jblas matrix from sparse matrix RDD
def getDenseMatrix(matrix:RDD[MatrixEntry], m:Int, n:Int) : DoubleMatrix = {
def getDenseMatrix(matrix:SparseMatrix) : DoubleMatrix = {
val data = matrix.data
val m = matrix.m
val n = matrix.n
val ret = DoubleMatrix.zeros(m, n)
matrix.toArray.map(x => ret.put(x.i - 1, x.j - 1, x.mval))
matrix.data.toArray.map(x => ret.put(x.i - 1, x.j - 1, x.mval))
ret
}

Expand All @@ -67,24 +70,26 @@ class SVDSuite extends FunSuite with BeforeAndAfterAll {
val data = sc.makeRDD(Array.tabulate(m,n){ (a, b) =>
MatrixEntry(a + 1, b + 1, (a + 2).toDouble * (b + 1) / (1 + a + b)) }.flatten )

val decomposed = SVD.sparseSVD(data, m, n, n)
val a = SparseMatrix(data, m, n)

val decomposed = SVD.sparseSVD(a, n)
val u = decomposed.U
val v = decomposed.V
val s = decomposed.S
val s = decomposed.S

val densea = getDenseMatrix(data, m, n)
val densea = getDenseMatrix(a)
val svd = Singular.sparseSVD(densea)

val retu = getDenseMatrix(u, m, n)
val rets = getDenseMatrix(s, n, n)
val retv = getDenseMatrix(v, n, n)
val retu = getDenseMatrix(u)
val rets = getDenseMatrix(s)
val retv = getDenseMatrix(v)

// check individual decomposition
assertMatrixEquals(retu, svd(0))
assertMatrixEquals(rets, DoubleMatrix.diag(svd(1)))
assertMatrixEquals(retv, svd(2))

// check multiplication guarantee
// check multiplication guarantee
assertMatrixEquals(retu.mmul(rets).mmul(retv.transpose), densea)
}

Expand All @@ -95,20 +100,22 @@ class SVDSuite extends FunSuite with BeforeAndAfterAll {
MatrixEntry(a + 1, b + 1, 1.0) }.flatten )
val k = 1

val decomposed = SVD.sparseSVD(data, m, n, k)
val a = SparseMatrix(data, m, n)

val decomposed = SVD.sparseSVD(a, k)
val u = decomposed.U
val s = decomposed.S
val v = decomposed.V
val retrank = s.toArray.length
val retrank = s.data.toArray.length

assert(retrank == 1, "rank returned not one")

val densea = getDenseMatrix(data, m, n)
val densea = getDenseMatrix(a)
val svd = Singular.sparseSVD(densea)

val retu = getDenseMatrix(u, m, retrank)
val rets = getDenseMatrix(s, retrank, retrank)
val retv = getDenseMatrix(v, n, retrank)
val retu = getDenseMatrix(u)
val rets = getDenseMatrix(s)
val retv = getDenseMatrix(v)

// check individual decomposition
assertMatrixEquals(retu, svd(0).getColumn(0))
Expand All @@ -124,21 +131,22 @@ class SVDSuite extends FunSuite with BeforeAndAfterAll {
val n = 3
val data = sc.makeRDD(Array.tabulate(m,n){ (a, b) =>
MatrixEntry(a + 1, b + 1, (a + 2).toDouble * (b + 1)/(1 + a + b)) }.flatten )
val a = SparseMatrix(data, m, n)

val k = 1 // only one svalue above this

val decomposed = SVD.sparseSVD(data, m, n, k)
val decomposed = SVD.sparseSVD(a, k)
val u = decomposed.U
val s = decomposed.S
val v = decomposed.V
val retrank = s.toArray.length
val retrank = s.data.toArray.length

val densea = getDenseMatrix(data, m, n)
val densea = getDenseMatrix(a)
val svd = Singular.sparseSVD(densea)

val retu = getDenseMatrix(u, m, retrank)
val rets = getDenseMatrix(s, retrank, retrank)
val retv = getDenseMatrix(v, n, retrank)
val retu = getDenseMatrix(u)
val rets = getDenseMatrix(s)
val retv = getDenseMatrix(v)

assert(retrank == 1, "rank returned not one")

Expand Down

0 comments on commit 06c0f76

Please sign in to comment.