[SPARK-1237, 1238] Improve the computation of YtY for implicit ALS
Computing YtY can be implemented with BLAS's DSPR operation, accumulating each rank-one update y_i y_i^T directly into a packed lower-triangular array, instead of materializing every y_i y_i^T and then combining them. The latter allocates many temporary k-by-k matrices. On the MovieLens data, this change improves performance by 10-20%. The algorithm itself is unchanged, as verified by computing the RMSE on the MovieLens data.
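
As a minimal illustrative sketch of the idea (not the patched Spark code itself; the object name DsprSketch and the toy vectors are made up), each factor vector y contributes alpha * y * y^T to the lower triangle of YtY, packed row-major into a flat array of k*(k+1)/2 doubles, so no k-by-k temporaries are allocated:

    // Illustrative sketch only: packed lower-triangular DSPR accumulation.
    object DsprSketch {
      // L holds the lower triangle packed row-major: (0,0), (1,0), (1,1), (2,0), ...
      def dspr(alpha: Double, x: Array[Double], L: Array[Double]): Unit = {
        var i = 0
        var idx = 0
        while (i < x.length) {
          val axi = alpha * x(i)
          var j = 0
          while (j <= i) {
            L(idx) += axi * x(j)  // L(i, j) += alpha * x(i) * x(j), for j <= i
            j += 1
            idx += 1
          }
          i += 1
        }
      }

      def main(args: Array[String]): Unit = {
        val rank = 3
        val L = new Array[Double](rank * (rank + 1) / 2)  // one flat buffer
        val factors = Seq(Array(1.0, 2.0, 3.0), Array(0.5, -1.0, 2.0))
        factors.foreach(y => dspr(1.0, y, L))  // no k-by-k temporaries created
        println(L.mkString(", "))  // packed lower triangle of the summed y * y^T
      }
    }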

To compare the results, I also added an option to set a random seed in ALS.
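
A usage sketch of the seed option (the local-mode setup and toy ratings here are invented for illustration; the six-argument ALS.train overload is the one added in this commit). Two runs with the same seed should produce identical factors, while a different seed should not, which is what the new 'pseudorandomness' test below asserts:

    import org.apache.spark.SparkContext
    import org.apache.spark.mllib.recommendation.{ALS, Rating}

    object SeedSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext("local[2]", "SeedSketch")
        val ratings = sc.parallelize(Seq(
          Rating(0, 0, 1.0), Rating(0, 1, 2.0),
          Rating(1, 0, 2.0), Rating(1, 1, 4.0)))
        // Train with an explicit seed and collect the user factors.
        def userFactors(seed: Long) =
          ALS.train(ratings, 2, 5, 0.01, 2, seed)
            .userFeatures.values.flatMap(_.toList).collect().toList
        assert(userFactors(42L) == userFactors(42L))  // same seed, same factors
        assert(userFactors(42L) != userFactors(7L))   // different seed, (almost surely) different
        sc.stop()
      }
    }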

JIRA:
1. https://spark-project.atlassian.net/browse/SPARK-1237
2. https://spark-project.atlassian.net/browse/SPARK-1238

Author: Xiangrui Meng <[email protected]>

Closes apache#131 from mengxr/als and squashes the following commits:

ed00432 [Xiangrui Meng] minor changes
d984623 [Xiangrui Meng] minor changes
2fc1641 [Xiangrui Meng] remove commented code
4c7cde2 [Xiangrui Meng] allow specifying a random seed in ALS
200bef0 [Xiangrui Meng] optimize computeYtY and updateBlock
mengxr authored and rxin committed Mar 13, 2014
1 parent 4ea23db commit e4e8d8f
Showing 2 changed files with 134 additions and 55 deletions.
174 changes: 120 additions & 54 deletions mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
@@ -89,10 +89,15 @@ case class Rating(val user: Int, val product: Int, val rating: Double)
 * indicated user
 * preferences rather than explicit ratings given to items.
 */
-class ALS private (var numBlocks: Int, var rank: Int, var iterations: Int, var lambda: Double,
-    var implicitPrefs: Boolean, var alpha: Double)
-  extends Serializable with Logging
-{
+class ALS private (
+    var numBlocks: Int,
+    var rank: Int,
+    var iterations: Int,
+    var lambda: Double,
+    var implicitPrefs: Boolean,
+    var alpha: Double,
+    var seed: Long = System.nanoTime()
+  ) extends Serializable with Logging {
  def this() = this(-1, 10, 10, 0.01, false, 1.0)

  /**
@@ -132,6 +137,12 @@ class ALS private (var numBlocks: Int, var rank: Int, var iterations: Int, var l
    this
  }

+  /** Sets a random seed to have deterministic results. */
+  def setSeed(seed: Long): ALS = {
+    this.seed = seed
+    this
+  }
+
  /**
   * Run ALS with the configured parameters on an input RDD of (user, product, rating) triples.
   * Returns a MatrixFactorizationModel with feature vectors for each user and product.
@@ -155,7 +166,7 @@ class ALS private (var numBlocks: Int, var rank: Int, var iterations: Int, var l

    // Initialize user and product factors randomly, but use a deterministic seed for each
    // partition so that fault recovery works
-    val seedGen = new Random()
+    val seedGen = new Random(seed)
    val seed1 = seedGen.nextInt()
    val seed2 = seedGen.nextInt()
    // Hash an integer to propagate random bits at all positions, similar to java.util.HashTable
@@ -210,21 +221,46 @@ class ALS private (var numBlocks: Int, var rank: Int, var iterations: Int, var l
   */
  def computeYtY(factors: RDD[(Int, Array[Array[Double]])]) = {
    if (implicitPrefs) {
-      Option(
-        factors.flatMapValues { case factorArray =>
-          factorArray.view.map { vector =>
-            val x = new DoubleMatrix(vector)
-            x.mmul(x.transpose())
-          }
-        }.reduceByKeyLocally((a, b) => a.addi(b))
-         .values
-         .reduce((a, b) => a.addi(b))
-      )
+      val n = rank * (rank + 1) / 2
+      val LYtY = factors.values.aggregate(new DoubleMatrix(n))(seqOp = (L, Y) => {
+        Y.foreach(y => dspr(1.0, new DoubleMatrix(y), L))
+        L
+      }, combOp = (L1, L2) => {
+        L1.addi(L2)
+      })
+      val YtY = new DoubleMatrix(rank, rank)
+      fillFullMatrix(LYtY, YtY)
+      Option(YtY)
    } else {
      None
    }
  }

+  /**
+   * Adds alpha * x * x.t to a matrix in-place. This is the same as BLAS's DSPR.
+   *
+   * @param L the lower triangular part of the matrix packed in an array (row major)
+   */
+  private def dspr(alpha: Double, x: DoubleMatrix, L: DoubleMatrix) = {
+    val n = x.length
+    var i = 0
+    var j = 0
+    var idx = 0
+    var axi = 0.0
+    val xd = x.data
+    val Ld = L.data
+    while (i < n) {
+      axi = alpha * xd(i)
+      j = 0
+      while (j <= i) {
+        Ld(idx) += axi * xd(j)
+        j += 1
+        idx += 1
+      }
+      i += 1
+    }
+  }
+
  /**
   * Flatten out blocked user or product factors into an RDD of (id, factor vector) pairs
   */
@@ -376,7 +412,8 @@ class ALS private (var numBlocks: Int, var rank: Int, var iterations: Int, var l
    for (productBlock <- 0 until numBlocks) {
      for (p <- 0 until blockFactors(productBlock).length) {
        val x = new DoubleMatrix(blockFactors(productBlock)(p))
-        fillXtX(x, tempXtX)
+        tempXtX.fill(0.0)
+        dspr(1.0, x, tempXtX)
        val (us, rs) = inLinkBlock.ratingsForBlock(productBlock)(p)
        for (i <- 0 until us.length) {
          implicitPrefs match {
@@ -387,7 +424,7 @@ class ALS private (var numBlocks: Int, var rank: Int, var iterations: Int, var l
              // Extension to the original paper to handle rs(i) < 0. confidence is a function
              // of |rs(i)| instead so that it is never negative:
              val confidence = 1 + alpha * abs(rs(i))
-              userXtX(us(i)).addi(tempXtX.mul(confidence - 1))
+              SimpleBlas.axpy(confidence - 1.0, tempXtX, userXtX(us(i)))
              // For rs(i) < 0, the corresponding entry in P is 0 now, not 1 -- negative rs(i)
              // means we try to reconstruct 0. We add terms only where P = 1, so, term below
              // is now only added for rs(i) > 0:
@@ -400,38 +437,19 @@ class ALS private (var numBlocks: Int, var rank: Int, var iterations: Int, var l
    }

    // Solve the least-squares problem for each user and return the new feature vectors
-    userXtX.zipWithIndex.map{ case (triangularXtX, index) =>
+    Array.range(0, numUsers).map { index =>
      // Compute the full XtX matrix from the lower-triangular part we got above
-      fillFullMatrix(triangularXtX, fullXtX)
+      fillFullMatrix(userXtX(index), fullXtX)
      // Add regularization
      (0 until rank).foreach(i => fullXtX.data(i*rank + i) += lambda)
      // Solve the resulting matrix, which is symmetric and positive-definite
      implicitPrefs match {
        case false => Solve.solvePositive(fullXtX, userXy(index)).data
-        case true => Solve.solvePositive(fullXtX.add(YtY.value.get), userXy(index)).data
+        case true => Solve.solvePositive(fullXtX.addi(YtY.value.get), userXy(index)).data
      }
    }
  }

-  /**
-   * Set xtxDest to the lower-triangular part of x transpose * x. For efficiency in summing
-   * these matrices, we store xtxDest as only rank * (rank+1) / 2 values, namely the values
-   * at (0,0), (1,0), (1,1), (2,0), (2,1), (2,2), etc in that order.
-   */
-  private def fillXtX(x: DoubleMatrix, xtxDest: DoubleMatrix) {
-    var i = 0
-    var pos = 0
-    while (i < x.length) {
-      var j = 0
-      while (j <= i) {
-        xtxDest.data(pos) = x.data(i) * x.data(j)
-        pos += 1
-        j += 1
-      }
-      i += 1
-    }
-  }
-
  /**
   * Given a triangular matrix in the order of fillXtX above, compute the full symmetric square
   * matrix that it represents, storing it into destMatrix.
@@ -455,9 +473,10 @@ class ALS private (var numBlocks: Int, var rank: Int, var iterations: Int, var l


/**
- * Top-level methods for calling Alternating Least Squares (ALS) matrix factorizaton.
+ * Top-level methods for calling Alternating Least Squares (ALS) matrix factorization.
 */
object ALS {
+
  /**
   * Train a matrix factorization model given an RDD of ratings given by users to some products,
   * in the form of (userID, productID, rating) pairs. We approximate the ratings matrix as the
@@ -470,15 +489,39 @@ object ALS {
   * @param iterations number of iterations of ALS (recommended: 10-20)
   * @param lambda regularization factor (recommended: 0.01)
   * @param blocks level of parallelism to split computation into
+   * @param seed random seed
   */
  def train(
      ratings: RDD[Rating],
      rank: Int,
      iterations: Int,
      lambda: Double,
-      blocks: Int)
-    : MatrixFactorizationModel =
-  {
+      blocks: Int,
+      seed: Long
+    ): MatrixFactorizationModel = {
+    new ALS(blocks, rank, iterations, lambda, false, 1.0, seed).run(ratings)
+  }
+
+  /**
+   * Train a matrix factorization model given an RDD of ratings given by users to some products,
+   * in the form of (userID, productID, rating) pairs. We approximate the ratings matrix as the
+   * product of two lower-rank matrices of a given rank (number of features). To solve for these
+   * features, we run a given number of iterations of ALS. This is done using a level of
+   * parallelism given by `blocks`.
+   *
+   * @param ratings RDD of (userID, productID, rating) pairs
+   * @param rank number of features to use
+   * @param iterations number of iterations of ALS (recommended: 10-20)
+   * @param lambda regularization factor (recommended: 0.01)
+   * @param blocks level of parallelism to split computation into
+   */
+  def train(
+      ratings: RDD[Rating],
+      rank: Int,
+      iterations: Int,
+      lambda: Double,
+      blocks: Int
+    ): MatrixFactorizationModel = {
    new ALS(blocks, rank, iterations, lambda, false, 1.0).run(ratings)
  }

@@ -495,8 +538,7 @@ object ALS {
   * @param lambda regularization factor (recommended: 0.01)
   */
  def train(ratings: RDD[Rating], rank: Int, iterations: Int, lambda: Double)
-    : MatrixFactorizationModel =
-  {
+    : MatrixFactorizationModel = {
    train(ratings, rank, iterations, lambda, -1)
  }

@@ -512,8 +554,7 @@ object ALS {
   * @param iterations number of iterations of ALS (recommended: 10-20)
   */
  def train(ratings: RDD[Rating], rank: Int, iterations: Int)
-    : MatrixFactorizationModel =
-  {
+    : MatrixFactorizationModel = {
    train(ratings, rank, iterations, 0.01, -1)
  }

@@ -530,16 +571,42 @@ object ALS {
   * @param lambda regularization factor (recommended: 0.01)
   * @param blocks level of parallelism to split computation into
   * @param alpha confidence parameter (only applies when implicitPrefs = true)
+   * @param seed random seed
   */
  def trainImplicit(
      ratings: RDD[Rating],
      rank: Int,
      iterations: Int,
      lambda: Double,
      blocks: Int,
-      alpha: Double)
-    : MatrixFactorizationModel =
-  {
+      alpha: Double,
+      seed: Long
+    ): MatrixFactorizationModel = {
+    new ALS(blocks, rank, iterations, lambda, true, alpha, seed).run(ratings)
+  }
+
+  /**
+   * Train a matrix factorization model given an RDD of 'implicit preferences' given by users
+   * to some products, in the form of (userID, productID, preference) pairs. We approximate the
+   * ratings matrix as the product of two lower-rank matrices of a given rank (number of features).
+   * To solve for these features, we run a given number of iterations of ALS. This is done using
+   * a level of parallelism given by `blocks`.
+   *
+   * @param ratings RDD of (userID, productID, rating) pairs
+   * @param rank number of features to use
+   * @param iterations number of iterations of ALS (recommended: 10-20)
+   * @param lambda regularization factor (recommended: 0.01)
+   * @param blocks level of parallelism to split computation into
+   * @param alpha confidence parameter (only applies when implicitPrefs = true)
+   */
+  def trainImplicit(
+      ratings: RDD[Rating],
+      rank: Int,
+      iterations: Int,
+      lambda: Double,
+      blocks: Int,
+      alpha: Double
+    ): MatrixFactorizationModel = {
    new ALS(blocks, rank, iterations, lambda, true, alpha).run(ratings)
  }

@@ -555,8 +622,8 @@ object ALS {
   * @param iterations number of iterations of ALS (recommended: 10-20)
   * @param lambda regularization factor (recommended: 0.01)
   */
-  def trainImplicit(ratings: RDD[Rating], rank: Int, iterations: Int, lambda: Double,
-    alpha: Double): MatrixFactorizationModel = {
+  def trainImplicit(ratings: RDD[Rating], rank: Int, iterations: Int, lambda: Double, alpha: Double)
+    : MatrixFactorizationModel = {
    trainImplicit(ratings, rank, iterations, lambda, -1, alpha)
  }

@@ -573,8 +640,7 @@ object ALS {
   * @param iterations number of iterations of ALS (recommended: 10-20)
   */
  def trainImplicit(ratings: RDD[Rating], rank: Int, iterations: Int)
-    : MatrixFactorizationModel =
-  {
+    : MatrixFactorizationModel = {
    trainImplicit(ratings, rank, iterations, 0.01, -1, 1.0)
  }

15 changes: 14 additions & 1 deletion mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala
@@ -23,9 +23,10 @@ import scala.util.Random

import org.scalatest.FunSuite

-import org.jblas._
+import org.jblas.DoubleMatrix

import org.apache.spark.mllib.util.LocalSparkContext
+import org.apache.spark.SparkContext._

object ALSSuite {

@@ -115,6 +116,18 @@ class ALSSuite extends FunSuite with LocalSparkContext {
    testALS(100, 200, 2, 15, 0.7, 0.4, true, false, true)
  }

+  test("pseudorandomness") {
+    val ratings = sc.parallelize(ALSSuite.generateRatings(10, 20, 5, 0.5, false, false)._1, 2)
+    val model11 = ALS.train(ratings, 5, 1, 1.0, 2, 1)
+    val model12 = ALS.train(ratings, 5, 1, 1.0, 2, 1)
+    val u11 = model11.userFeatures.values.flatMap(_.toList).collect().toList
+    val u12 = model12.userFeatures.values.flatMap(_.toList).collect().toList
+    val model2 = ALS.train(ratings, 5, 1, 1.0, 2, 2)
+    val u2 = model2.userFeatures.values.flatMap(_.toList).collect().toList
+    assert(u11 == u12)
+    assert(u11 != u2)
+  }
+
  /**
   * Test if we can correctly factorize R = U * P where U and P are of known rank.
   *