205 changes: 173 additions & 32 deletions mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
@@ -774,6 +774,28 @@ object ALS extends DefaultParamsReadable[ALS] with Logging {
/**
* :: DeveloperApi ::
* Implementation of the ALS algorithm.
*
* This implementation of the ALS factorization algorithm partitions the two sets of factors among
* Spark workers so as to reduce network communication by only sending one copy of each factor
* vector to each Spark worker on each iteration, and only if needed. This is achieved by
* precomputing some information about the ratings matrix to determine which users require which
* item factors and vice versa. See the Scaladoc for [[InBlock]] for a detailed explanation of
* how the precomputation is done.
*
* In addition, since each iteration of calculating the factor matrices depends on the known
* ratings, which are spread across Spark partitions, a naive implementation would incur
* significant network communication overhead between Spark workers, as the ratings RDD would be
* repeatedly shuffled during each iteration. This implementation reduces that overhead by
* performing the shuffling operation up front, precomputing each partition's ratings dependencies
* and duplicating those values to the appropriate workers before starting iterations to solve for
* the factor matrices. See the Scaladoc for [[OutBlock]] for a detailed explanation of how the
* precomputation is done.
*
* Note that the term "rating block" is a bit of a misnomer, as the ratings are not partitioned
* into contiguous blocks of the ratings matrix but by a hash function on each rating's location
* in the matrix. When visualizing the partitions, it is more accurate to think of a "block" as
* a subset of an RDD containing the ratings than as a contiguous submatrix of the ratings
* matrix.
*/
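To make the hashing scheme concrete, here is a small sketch of how a single rating's block key is derived (ALSPartitioner is a type alias for Spark's HashPartitioner; the IDs and block counts below are made up for illustration):

    import org.apache.spark.HashPartitioner

    val userPart = new HashPartitioner(10) // 10 user blocks
    val itemPart = new HashPartitioner(10) // 10 item blocks

    // A rating by user 12345 of item 678 lands in the "block" keyed by this pair
    // of partition IDs, not in a contiguous submatrix of the ratings matrix.
    val blockKey = (userPart.getPartition(12345), itemPart.getPartition(678))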
@DeveloperApi
def train[ID: ClassTag]( // scalastyle:ignore
@@ -791,32 +813,43 @@ object ALS extends DefaultParamsReadable[ALS] with Logging {
checkpointInterval: Int = 10,
seed: Long = 0L)(
implicit ord: Ordering[ID]): (RDD[(ID, Array[Float])], RDD[(ID, Array[Float])]) = {

require(!ratings.isEmpty(), s"No ratings available from $ratings")
require(intermediateRDDStorageLevel != StorageLevel.NONE,
"ALS is not designed to run without persisting intermediate RDDs.")

val sc = ratings.sparkContext

// Precompute the rating dependencies of each partition
val userPart = new ALSPartitioner(numUserBlocks)
val itemPart = new ALSPartitioner(numItemBlocks)
val userLocalIndexEncoder = new LocalIndexEncoder(userPart.numPartitions)
val itemLocalIndexEncoder = new LocalIndexEncoder(itemPart.numPartitions)
val solver = if (nonnegative) new NNLSSolver else new CholeskySolver
val blockRatings = partitionRatings(ratings, userPart, itemPart)
.persist(intermediateRDDStorageLevel)
val (userInBlocks, userOutBlocks) =
makeBlocks("user", blockRatings, userPart, itemPart, intermediateRDDStorageLevel)
// materialize blockRatings and user blocks
userOutBlocks.count()
userOutBlocks.count() // materialize blockRatings and user blocks
Member: It's a nit, but I wouldn't make changes like this. It doesn't add anything.

Contributor Author: I moved the comment because the only other comment that has its own line, // Precompute the rating dependencies of each partition, is serving as the heading for this entire block of code, and having other whole-line comments in this block is a bit of a mismatch. If you still feel reversion is necessary though, just let me know.
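For readers unfamiliar with the idiom under discussion, a minimal sketch of persisting an RDD and forcing its materialization with count() (the RDD and numbers here are illustrative, not from ALS.scala):

    import org.apache.spark.storage.StorageLevel

    // `sc` is an existing SparkContext.
    val expensive = sc.parallelize(1 to 1000000)
      .map(i => i.toLong * i)                 // stands in for a costly transformation
      .persist(StorageLevel.MEMORY_AND_DISK)
    expensive.count() // count() is an action, so it forces evaluation and fills the cache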

val swappedBlockRatings = blockRatings.map {
case ((userBlockId, itemBlockId), RatingBlock(userIds, itemIds, localRatings)) =>
((itemBlockId, userBlockId), RatingBlock(itemIds, userIds, localRatings))
}
val (itemInBlocks, itemOutBlocks) =
makeBlocks("item", swappedBlockRatings, itemPart, userPart, intermediateRDDStorageLevel)
// materialize item blocks
itemOutBlocks.count()
itemOutBlocks.count() // materialize item blocks

// Encoders for storing each user/item's partition ID and index within its partition using a
Member: This is probably fine but I tend to avoid moving code around unless it really helps -- this minimizes things like back-port merge conflict problems.

Contributor Author: I moved the code because otherwise the comment on L823 (// Precompute the rating dependencies of each partition) would reference the LocalIndexEncoders and the solver. Agreed that otherwise it would be unnecessary to move.

MLnick (Contributor), May 2, 2017: Why not add the comment before the encoder vals are defined (and not move this code around)? You could add a space in between the solver if you want to disambiguate the comment.

// single integer; used as an optimization
val userLocalIndexEncoder = new LocalIndexEncoder(userPart.numPartitions)
val itemLocalIndexEncoder = new LocalIndexEncoder(itemPart.numPartitions)
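For context on what these encoders do, a sketch of the bit-packing approach, mirroring the private LocalIndexEncoder in this file (the class name below is made up, and this is illustrative rather than authoritative):

    // Packs a (blockId, localIndex) pair into one Int: the high bits hold the
    // block ID and the low bits hold the index within that block.
    class PackedIndexEncoder(numBlocks: Int) extends Serializable {
      require(numBlocks > 0, s"numBlocks must be positive but found $numBlocks.")
      private val numLocalIndexBits =
        math.min(java.lang.Integer.numberOfLeadingZeros(numBlocks - 1), 31)
      private val localIndexMask = (1 << numLocalIndexBits) - 1

      def encode(blockId: Int, localIndex: Int): Int = {
        require(blockId < numBlocks)
        require((localIndex & ~localIndexMask) == 0)
        (blockId << numLocalIndexBits) | localIndex
      }
      def blockId(encoded: Int): Int = encoded >>> numLocalIndexBits
      def localIndex(encoded: Int): Int = encoded & localIndexMask
    }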

// These are the user and item factor matrices that, once trained, are multiplied together to
// estimate the rating matrix. The two matrices are stored in RDDs, partitioned by column such
// that each factor column resides on the same Spark worker as its corresponding user or item.
val seedGen = new XORShiftRandom(seed)
var userFactors = initialize(userInBlocks, rank, seedGen.nextLong())
var itemFactors = initialize(itemInBlocks, rank, seedGen.nextLong())
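To ground the comment above, a minimal sketch of how a trained user factor and item factor combine to estimate a single rating (the function is illustrative; the actual prediction code lives elsewhere in this file):

    // Estimated rating = dot product of the two factor vectors, each of length `rank`.
    def estimateRating(userFactor: Array[Float], itemFactor: Array[Float]): Float = {
      var sum = 0f
      var k = 0
      while (k < userFactor.length) {
        sum += userFactor(k) * itemFactor(k)
        k += 1
      }
      sum
    }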

val solver = if (nonnegative) new NNLSSolver else new CholeskySolver

var previousCheckpointFile: Option[String] = None
val shouldCheckpoint: Int => Boolean = (iter) =>
sc.checkpointDir.isDefined && checkpointInterval != -1 && (iter % checkpointInterval == 0)
@@ -830,6 +863,7 @@ object ALS extends DefaultParamsReadable[ALS] with Logging {
logWarning(s"Cannot delete checkpoint file $file:", e)
}
}

if (implicitPrefs) {
for (iter <- 1 to maxIter) {
userFactors.setName(s"userFactors-$iter").persist(intermediateRDDStorageLevel)
@@ -910,26 +944,127 @@ object ALS extends DefaultParamsReadable[ALS] with Logging {
private type FactorBlock = Array[Array[Float]]

/**
* Out-link block that stores, for each dst (item/user) block, which src (user/item) factors to
* send. For example, outLinkBlock(0) contains the local indices (not the original src IDs) of the
* src factors in this block to send to dst block 0.
* Out-link blocks that store information about which columns of the items factor matrix are
Contributor: Is this any clearer? "For each user in each block, a mapping of which item blocks that user's factors must be sent to in order to compute the updated item factors, and vice versa."

Referring to user rows or item columns seems unnecessary since you can transpose the ratings matrix and get opposite mappings. There may be some standard convention though.

Also, how about adding

   /**
   * Say user block 0 corresponds to users 1, 42, 29575. Then a corresponding outblock of:
   *
   * {{{
   *   [[0, 15, 42],
   *    [12, 43],
   *    [314]]
   * }}}
   *  means that user 1 factors must be sent to item blocks 0, 15, and 42; user 42 factors must be
   *  sent to item blocks 12 and 43; user 29575 factors must be sent to item block 314.
   */

Contributor Author: I like this. I'll add something to this effect in a bit.
* required to calculate which columns of the users factor matrix, and vice versa.
*
* Specifically, when calculating a user factor vector, since only those columns of the items
* factor matrix that correspond to the items that that user has rated are needed, we can avoid
* having to repeatedly copy the entire items factor matrix to each worker later in the algorithm
* by precomputing these dependencies for all users, storing them in an RDD of `OutBlock`s. The
* items' dependencies on the columns of the users factor matrix are computed similarly.
*/
private type OutBlock = Array[Array[Int]]
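To make the indexing concrete, a hypothetical pair of values using the ratings example from the InBlock Scaladoc below, where user block 0 holds users 0 and 6 (local indices 0 and 1), user block 1 holds user 3 (local index 0), and every user rated items in both item blocks:

    // outBlock(dstBlockId) lists the local indices of the users in this block
    // whose factors must be sent to item block dstBlockId.
    val userOutBlock0: Array[Array[Int]] = Array(
      Array(0, 1), // item block 0 needs the factors of local users 0 and 1 (IDs 0 and 6)
      Array(0, 1)  // item block 1 needs the same users' factors
    )
    val userOutBlock1: Array[Array[Int]] = Array(
      Array(0), // item block 0 needs the factor of local user 0 (ID 3)
      Array(0)  // item block 1 likewise
    )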

/**
* In-link block for computing src (user/item) factors. This includes the original src IDs
* of the elements within this block as well as encoded dst (item/user) indices and corresponding
* ratings. The dst indices are in the form of (blockId, localIndex), which are not the original
* dst IDs. To compute src factors, we expect receiving dst factors that match the dst indices.
* For example, if we have an in-link record
* In-link block for computing user and item factor matrices.
*
* {srcId: 0, dstBlockId: 2, dstLocalIndex: 3, rating: 5.0},
* The ALS algorithm partitions the columns of the users factor matrix evenly among Spark workers.
* Since each column of the factor matrix is calculated using the known ratings of the corresponding
* user, and since the ratings don't change across iterations, the ALS algorithm preshuffles
* the ratings to the appropriate partitions, storing them in `InBlock` objects.
*
* and assume that the dst factors are stored as dstFactors: Map[Int, Array[Array[Float]]], which
* is a blockId to dst factors map, the corresponding dst factor of the record is dstFactor(2)(3).
* The ratings shuffled by item ID are computed similarly and also stored in `InBlock` objects.
* Note that this means every rating is stored twice, once shuffled by user ID and once shuffled
* by item ID. This is a necessary tradeoff, since in general a rating will not land on the same
* worker when partitioned by user ID as when partitioned by item ID.
*
* We use a CSC-like (compressed sparse column) format to store the in-link information. So we can
* compute src factors one after another using only one normal equation instance.
* =Example=
*
* Say we have a small collection of eight items to offer the seven users in our application. We
* have some known ratings given by the users, as seen in the matrix below:
*
* {{{
* Items
* 0 1 2 3 4 5 6 7
* +---+---+---+---+---+---+---+---+
* 0 | |0.1| | |0.4| | |0.7|
* +---+---+---+---+---+---+---+---+
* 1 | | | | | | | | |
* +---+---+---+---+---+---+---+---+
* U 2 | | | | | | | | |
* s +---+---+---+---+---+---+---+---+
* e 3 | |3.1| | |3.4| | |3.7|
* r +---+---+---+---+---+---+---+---+
* s 4 | | | | | | | | |
* +---+---+---+---+---+---+---+---+
* 5 | | | | | | | | |
* +---+---+---+---+---+---+---+---+
* 6 | |6.1| | |6.4| | |6.7|
* +---+---+---+---+---+---+---+---+
* }}}
*
* The ratings are represented as an RDD, passed to the `partitionRatings` method as the `ratings`
* parameter:
*
* {{{
* ratings.collect() == Seq(
* Rating(0, 1, 0.1f),
* Rating(0, 4, 0.4f),
* Rating(0, 7, 0.7f),
* Rating(3, 1, 3.1f),
* Rating(3, 4, 3.4f),
* Rating(3, 7, 3.7f),
* Rating(6, 1, 6.1f),
* Rating(6, 4, 6.4f),
* Rating(6, 7, 6.7f)
* )
* }}}
*
* (In this contrived example, the rating values are chosen specifically for clarity and are in
Contributor: This part seems unnecessary. Definitely the last sentence.

Contributor Author: You're right, the first sentence is probably overkill. I'll remove it.

The second one I would say should be included, since for someone new to the code, he/she might have some confusion as to why users' ratings aren't whole numbers (like star ratings). I'm always in favor of reducing any possible ambiguity.

Contributor Author: Actually, on second thought, the first clause of the first sentence clarifies why, if ratings are usually whole numbers, we're using floats; the first sentence justifies the second sentence. I would err on keeping the whole thing in as is.

Contributor: I don't see why anyone would assume ratings have to be whole numbers. If anything it seems misleading to say that ratings "are usually whole numbers." "Ratings" need not be given by users - they could be computed in many ways, such as business rules for inferring numeric measures of preference based on user-item interactions.

Contributor Author: Great point. Thanks for pointing out to me what I missed. Removed; updated PR coming soon.

* the form ''x''.''y'', where ''x'' is the user ID and ''y'' is the item ID. Note that in a real
* use case, the ratings given by users would more likely be whole numbers.)
*
* Say that we are using two partitions to calculate each factor matrix:
*
* {{{
* val userPart = new ALSPartitioner(2)
* val itemPart = new ALSPartitioner(2)
* val blockRatings = partitionRatings(ratings, userPart, itemPart)
* }}}
*
* Ratings with even-valued user IDs are shuffled to partition 0 while those with odd-valued user
Contributor: I'm not sure I understand why the partitioner separates based on even/odd here.

Contributor Author: Good catch. I'll update.

* IDs are shuffled to partition 1 (`ALSPartitioner` is a hash partitioner, and an integer ID
* hashes to itself, so with two partitions the assignment reduces to the ID modulo 2):
*
* {{{
* userInBlocks.collect() == Seq(
* 0 -> Seq(
* // Internally, the class stores the ratings in a more optimized format than
* // a sequence of `Rating`s, but for clarity we show it as such here.
* Rating(0, 1, 0.1f),
* Rating(0, 4, 0.4f),
* Rating(0, 7, 0.7f),
* Rating(6, 1, 6.1f),
* Rating(6, 4, 6.4f),
* Rating(6, 7, 6.7f)
* ),
* 1 -> Seq(
* Rating(3, 1, 3.1f),
* Rating(3, 4, 3.4f),
* Rating(3, 7, 3.7f)
* )
* )
* }}}
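As a sketch of the "more optimized format" mentioned in the comment just above: the block is stored in the CSC-like layout described by the srcIds/dstPtrs fields documented below. Assuming item block 0 holds item 4 (local index 0) and item block 1 holds items 1 and 7 (local indices 0 and 1), user partition 0 could plausibly be stored as:

    // Illustrative encoder: with 2 item blocks, 31 bits remain for the local index.
    def encode(itemBlockId: Int, localIndex: Int): Int = (itemBlockId << 31) | localIndex

    val srcIds  = Array(0, 6)    // sorted user IDs in this block
    val dstPtrs = Array(0, 3, 6) // user i's ratings occupy [dstPtrs(i), dstPtrs(i+1))
    val dstEncodedIndices = Array(
      encode(1, 0), encode(0, 0), encode(1, 1), // user 0: items 1, 4, 7
      encode(1, 0), encode(0, 0), encode(1, 1)  // user 6: items 1, 4, 7
    )
    val ratings = Array(0.1f, 0.4f, 0.7f, 6.1f, 6.4f, 6.7f)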
*
* Similarly, ratings with even-valued item IDs are shuffled to partition 0 while those with
* odd-valued item IDs are shuffled to partition 1:
*
* {{{
* itemInBlocks.collect() == Seq(
* 0 -> Seq(
* Rating(0, 4, 0.4f),
* Rating(3, 4, 3.4f),
* Rating(6, 4, 6.4f)
* ),
* 1 -> Seq(
* Rating(0, 1, 0.1f),
* Rating(0, 7, 0.7f),
* Rating(3, 1, 3.1f),
* Rating(3, 7, 3.7f),
* Rating(6, 1, 6.1f),
* Rating(6, 7, 6.7f)
* )
* )
* }}}
*
* @param srcIds src ids (ordered)
* @param dstPtrs dst pointers. Elements in range [dstPtrs(i), dstPtrs(i+1)) of dst indices and
Expand Down Expand Up @@ -1026,7 +1161,24 @@ object ALS extends DefaultParamsReadable[ALS] with Logging {
}

/**
* Partitions raw ratings into blocks.
* Groups an RDD of `Rating`s by the user partition and item partition to which each `Rating` maps
Contributor: [[Rating]]

Contributor Author: Agreed.

* according to the given partitioners. The returned pair RDD holds the ratings, encoded in a
* memory-efficient format but otherwise unchanged, keyed by the (user partition ID, item
* partition ID) pair.
*
* Performance note: This is an expensive operation that performs an RDD shuffle.
*
* Implementation note: This implementation produces the same result as the following but
* generates fewer intermediate objects:
*
* {{{
* ratings.map { r =>
* ((srcPart.getPartition(r.user), dstPart.getPartition(r.item)), r)
* }.aggregateByKey(new RatingBlockBuilder)(
* seqOp = (b, r) => b.add(r),
* combOp = (b0, b1) => b0.merge(b1.build()))
* .mapValues(_.build())
* }}}
*
* @param ratings raw ratings
* @param srcPart partitioner for src IDs
@@ -1037,17 +1189,6 @@ object ALS extends DefaultParamsReadable[ALS] with Logging {
ratings: RDD[Rating[ID]],
srcPart: Partitioner,
dstPart: Partitioner): RDD[((Int, Int), RatingBlock[ID])] = {

/* The implementation produces the same result as the following but generates less objects.

ratings.map { r =>
((srcPart.getPartition(r.user), dstPart.getPartition(r.item)), r)
}.aggregateByKey(new RatingBlockBuilder)(
seqOp = (b, r) => b.add(r),
combOp = (b0, b1) => b0.merge(b1.build()))
.mapValues(_.build())
*/

val numPartitions = srcPart.numPartitions * dstPart.numPartitions
ratings.mapPartitions { iter =>
val builders = Array.fill(numPartitions)(new RatingBlockBuilder[ID])