109 changes: 70 additions & 39 deletions mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
@@ -148,8 +148,10 @@ class ALS private (
   * Returns a MatrixFactorizationModel with feature vectors for each user and product.
   */
  def run(ratings: RDD[Rating]): MatrixFactorizationModel = {
+   val sc = ratings.context
+
    val numBlocks = if (this.numBlocks == -1) {
-     math.max(ratings.context.defaultParallelism, ratings.partitions.size / 2)
+     math.max(sc.defaultParallelism, ratings.partitions.size / 2)
    } else {
      this.numBlocks
    }
@@ -187,53 +189,79 @@ class ALS private (
}
}

-    for (iter <- 1 to iterations) {
-      // perform ALS update
-      logInfo("Re-computing I given U (Iteration %d/%d)".format(iter, iterations))
-      // YtY / XtX is an Option[DoubleMatrix] and is only required for the implicit feedback model
-      val YtY = computeYtY(users)
-      val YtYb = ratings.context.broadcast(YtY)
-      products = updateFeatures(users, userOutLinks, productInLinks, partitioner, rank, lambda,
-        alpha, YtYb)
-      logInfo("Re-computing U given I (Iteration %d/%d)".format(iter, iterations))
-      val XtX = computeYtY(products)
-      val XtXb = ratings.context.broadcast(XtX)
-      users = updateFeatures(products, productOutLinks, userInLinks, partitioner, rank, lambda,
-        alpha, XtXb)
+    if (implicitPrefs) {
+      for (iter <- 1 to iterations) {
+        // perform ALS update
+        logInfo("Re-computing I given U (Iteration %d/%d)".format(iter, iterations))
+        // Persist users because it will be called twice.
+        users.persist()
+        val YtY = Some(sc.broadcast(computeYtY(users)))
+        val previousProducts = products
+        products = updateFeatures(users, userOutLinks, productInLinks, partitioner, rank, lambda,
+          alpha, YtY)
+        previousProducts.unpersist()
+        logInfo("Re-computing U given I (Iteration %d/%d)".format(iter, iterations))
+        products.persist()
+        val XtX = Some(sc.broadcast(computeYtY(products)))
+        val previousUsers = users
+        users = updateFeatures(products, productOutLinks, userInLinks, partitioner, rank, lambda,
+          alpha, XtX)
+        previousUsers.unpersist()
+      }
+    } else {
+      for (iter <- 1 to iterations) {
+        // perform ALS update
+        logInfo("Re-computing I given U (Iteration %d/%d)".format(iter, iterations))
+        products = updateFeatures(users, userOutLinks, productInLinks, partitioner, rank, lambda,
+          alpha, YtY = None)
+        logInfo("Re-computing U given I (Iteration %d/%d)".format(iter, iterations))
+        users = updateFeatures(products, productOutLinks, userInLinks, partitioner, rank, lambda,
+          alpha, YtY = None)
+      }
}

+    products.persist()
Contributor:

Why is this here?

Contributor Author:

When the final factors get materialized, both users and products are called:

    val usersOut = unblockFactors(users, userOutLinks)
    val productsOut = unblockFactors(products, productOutLinks)

However, the last users depends on the last products, so the last products will be used twice.

Contributor Author:

Added a comment.
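
To make the recomputation concrete, here is a minimal, self-contained sketch of the same lineage pattern (expensiveStep is a hypothetical stand-in for updateFeatures; this is not code from the patch). Without the persist, each of the two actions recomputes products from users:

    import org.apache.spark.{SparkConf, SparkContext}

    // Minimal sketch of the lineage issue described above.
    object PersistSketch {
      // Hypothetical stand-in for updateFeatures.
      def expensiveStep(x: Int): Int = x + 1

      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(
          new SparkConf().setAppName("persist-sketch").setMaster("local[2]"))
        val users = sc.parallelize(1 to 1000)
        val products = users.map(expensiveStep)      // the "last products"
        val finalUsers = products.map(expensiveStep) // the "last users"

        products.persist() // without this, both actions below recompute
                           // `products` from `users`
        finalUsers.count() // first use of the products lineage
        products.count()   // second use, served from the cache
        sc.stop()
      }
    }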


// Flatten and cache the two final RDDs to un-block them
val usersOut = unblockFactors(users, userOutLinks)
val productsOut = unblockFactors(products, productOutLinks)

usersOut.persist()
productsOut.persist()

+    // Materialize usersOut and productsOut.
+    usersOut.count()
+    productsOut.count()
+
+    products.unpersist()
+
+    // Clean up.
+    userInLinks.unpersist()
+    userOutLinks.unpersist()
+    productInLinks.unpersist()
+    productOutLinks.unpersist()

new MatrixFactorizationModel(rank, usersOut, productsOut)
}

/**
* Computes the (`rank x rank`) matrix `YtY`, where `Y` is the (`nui x rank`) matrix of factors
-   * for each user (or product), in a distributed fashion. Here `reduceByKeyLocally` is used as
-   * the driver program requires `YtY` to broadcast it to the slaves
+   * for each user (or product), in a distributed fashion.
*
* @param factors the (block-distributed) user or product factor vectors
-   * @return Option[YtY] - whose value is only used in the implicit preference model
+   * @return YtY - whose value is only used in the implicit preference model
*/
-  def computeYtY(factors: RDD[(Int, Array[Array[Double]])]) = {
-    if (implicitPrefs) {
-      val n = rank * (rank + 1) / 2
-      val LYtY = factors.values.aggregate(new DoubleMatrix(n))( seqOp = (L, Y) => {
-        Y.foreach(y => dspr(1.0, new DoubleMatrix(y), L))
-        L
-      }, combOp = (L1, L2) => {
-        L1.addi(L2)
-      })
-      val YtY = new DoubleMatrix(rank, rank)
-      fillFullMatrix(LYtY, YtY)
-      Option(YtY)
-    } else {
-      None
-    }
+  private def computeYtY(factors: RDD[(Int, Array[Array[Double]])]) = {
+    val n = rank * (rank + 1) / 2
+    val LYtY = factors.values.aggregate(new DoubleMatrix(n))( seqOp = (L, Y) => {
+      Y.foreach(y => dspr(1.0, new DoubleMatrix(y), L))
+      L
+    }, combOp = (L1, L2) => {
+      L1.addi(L2)
+    })
+    val YtY = new DoubleMatrix(rank, rank)
+    fillFullMatrix(LYtY, YtY)
+    YtY
}
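
As a local sanity check, the packed dspr-based aggregate above computes the same matrix as this dense, single-machine version (a sketch using jblas, assuming the factors fit on one machine; denseYtY is a hypothetical helper, not part of the patch):

    import org.jblas.DoubleMatrix

    // Dense equivalent of the aggregate: YtY = sum over factor rows y
    // of the rank-1 update y * y^T.
    def denseYtY(factors: Array[Array[Double]], rank: Int): DoubleMatrix = {
      val YtY = DoubleMatrix.zeros(rank, rank)
      for (y <- factors) {
        val v = new DoubleMatrix(y) // column vector of length `rank`
        YtY.addi(v.mmul(v.transpose()))
      }
      YtY
    }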

/**
@@ -264,7 +292,7 @@ class ALS private (
/**
* Flatten out blocked user or product factors into an RDD of (id, factor vector) pairs
*/
-  def unblockFactors(blockedFactors: RDD[(Int, Array[Array[Double]])],
+  private def unblockFactors(blockedFactors: RDD[(Int, Array[Array[Double]])],
outLinks: RDD[(Int, OutLinkBlock)]) = {
blockedFactors.join(outLinks).flatMap{ case (b, (factors, outLinkBlock)) =>
for (i <- 0 until factors.length) yield (outLinkBlock.elementIds(i), factors(i))
@@ -332,8 +360,11 @@ class ALS private (
val outLinkBlock = makeOutLinkBlock(numBlocks, ratings)
Iterator.single((blockId, (inLinkBlock, outLinkBlock)))
}, true)
-    links.persist(StorageLevel.MEMORY_AND_DISK)
-    (links.mapValues(_._1), links.mapValues(_._2))
+    val inLinks = links.mapValues(_._1)
+    val outLinks = links.mapValues(_._2)
+    inLinks.persist(StorageLevel.MEMORY_AND_DISK)
+    outLinks.persist(StorageLevel.MEMORY_AND_DISK)
+    (inLinks, outLinks)
}

/**
@@ -365,7 +396,7 @@ class ALS private (
rank: Int,
lambda: Double,
alpha: Double,
-      YtY: Broadcast[Option[DoubleMatrix]])
+      YtY: Option[Broadcast[DoubleMatrix]])
: RDD[(Int, Array[Array[Double]])] =
{
val numBlocks = products.partitions.size
@@ -388,8 +419,8 @@ class ALS private (
* Compute the new feature vectors for a block of the users matrix given the list of factors
* it received from each product and its InLinkBlock.
*/
-  def updateBlock(messages: Seq[(Int, Array[Array[Double]])], inLinkBlock: InLinkBlock,
-    rank: Int, lambda: Double, alpha: Double, YtY: Broadcast[Option[DoubleMatrix]])
+  private def updateBlock(messages: Seq[(Int, Array[Array[Double]])], inLinkBlock: InLinkBlock,
+    rank: Int, lambda: Double, alpha: Double, YtY: Option[Broadcast[DoubleMatrix]])
: Array[Array[Double]] =
{
// Sort the incoming block factor messages by block ID and make them an array
@@ -445,7 +476,7 @@ class ALS private (
// Solve the resulting matrix, which is symmetric and positive-definite
implicitPrefs match {
Contributor:

This is a minor style thing, but I originally used a match statement all around, and I did change most of them to if (implicitPrefs) ... else .... I seem to have missed this one; perhaps you can make that change while you're at it? :)

Contributor Author:

Yes.
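
For reference, the if-form being requested would look roughly like this (a sketch of the suggested style change, not part of this commit):

    if (implicitPrefs) {
      Solve.solvePositive(fullXtX.addi(YtY.get.value), userXy(index)).data
    } else {
      Solve.solvePositive(fullXtX, userXy(index)).data
    }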

case false => Solve.solvePositive(fullXtX, userXy(index)).data
-      case true => Solve.solvePositive(fullXtX.addi(YtY.value.get), userXy(index)).data
+      case true => Solve.solvePositive(fullXtX.addi(YtY.get.value), userXy(index)).data
}
}
}