@@ -39,6 +39,7 @@ import org.apache.spark.mllib.util.{Loader, Saveable}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.storage.StorageLevel
+import org.apache.spark.util.BoundedPriorityQueue

 /**
  * Model representing the result of matrix factorization.
@@ -277,17 +278,39 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] {
     val srcBlocks = blockify(rank, srcFeatures)
     val dstBlocks = blockify(rank, dstFeatures)
     val ratings = srcBlocks.cartesian(dstBlocks).flatMap {
-      case ((srcIds, srcFactors), (dstIds, dstFactors)) =>
-        val m = srcIds.length
-        val n = dstIds.length
-        val ratings = srcFactors.transpose.multiply(dstFactors)
-        val output = new Array[(Int, (Int, Double))](m * n)
-        var k = 0
-        ratings.foreachActive { (i, j, r) =>
-          output(k) = (srcIds(i), (dstIds(j), r))
-          k += 1
-        }
-        output.toSeq
+      case (users, items) =>
Contributor: Put case statement on previous line: flatMap { case (... =>

Contributor: Prefer case (srcIter, dstIter) rather than users / items (as they can be swapped depending on which recommendation method is being called).

+        val m = users.size
+        val n = math.min(items.size, num)
+        val output = new Array[(Int, (Int, Double))](m * n)
+        var j = 0
+        users.foreach (user => {
Contributor: srcIter.foreach { case (srcId, srcFactor) =>

Contributor: Will there be performance benefit to using while loop here vs foreach?

Author: I will test while here, thanks.
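Not from this PR: a self-contained Scala sketch of the trade-off being asked about. foreach invokes a function object once per element, while a hand-rolled while loop does not; whether that matters after JIT inlining is exactly what the author's measurement would settle.

// Standalone illustration; the array and the summation are made up.
val xs = Array.tabulate(1000)(_.toDouble)

// foreach: concise, but calls a closure once per element.
var s1 = 0.0
xs.foreach(x => s1 += x)

// while: more verbose, but iterates without the per-element call.
var s2 = 0.0
var i = 0
while (i < xs.length) {
  s2 += xs(i)
  i += 1
}
assert(s1 == s2)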

+          def order(a: (Int, Double)) = a._2
+          val pq: BoundedPriorityQueue[(Int, Double)] =
Contributor: We could remove the type sig from the val definition here to make it fit on one line.

+            new BoundedPriorityQueue[(Int, Double)](n)(Ordering.by(order))
Contributor: I believe you can just do Ordering.by(_._2) without needing to define def order(...) above.
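A minimal sketch of the suggested simplification. Note that BoundedPriorityQueue is private[spark], so this compiles only inside Spark's own source tree; n stands in for the capacity used in the diff.

import org.apache.spark.util.BoundedPriorityQueue

val n = 10 // queue capacity, as in the diff
// The constructor fixes the element type, so Scala can infer the
// lambda's parameter type and no helper `order` method is needed:
val pq = new BoundedPriorityQueue[(Int, Double)](n)(Ordering.by(_._2))
pq += ((42, 0.9)) // retains the n pairs with the largest second element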

+          items.foreach (item => {
Contributor: Similarly here: dstIter.foreach { case (dstId, dstFactor) =>

+            /**
+             * blas.ddot (F2jBLAS) has the same performance as the following code.
+             * The performance of blas.ddot with NativeBLAS is very bad.
+             * blas.ddot (F2jBLAS) is about a 10% improvement over linalg.dot.
+             * val rate = blas.ddot(rank, user._2, 1, item._2, 1)
Contributor: We can perhaps say here instead: "The below code is equivalent to val score = blas.ddot(rank, srcFactor, 1, dstFactor, 1)".
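A standalone sketch of the equivalence being discussed, calling netlib-java's F2jBLAS directly (the import path assumes the netlib-java dependency Spark used at the time; the vectors are made up):

import com.github.fommil.netlib.F2jBLAS

val blas = new F2jBLAS
val rank = 3
val srcFactor = Array(0.1, 0.2, 0.3)
val dstFactor = Array(0.4, 0.5, 0.6)

// Manual dot product, as written in the patch:
var score = 0.0
var k = 0
while (k < rank) {
  score += srcFactor(k) * dstFactor(k)
  k += 1
}

// The BLAS call the code comment refers to (stride 1 over both arrays):
val viaDdot = blas.ddot(rank, srcFactor, 1, dstFactor, 1)
assert(math.abs(score - viaDdot) < 1e-12)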

+             */
+            var rate: Double = 0
+            var k = 0
+            while(k < rank) {
Contributor: Space here: while (

+              rate += user._2(k) * item._2(k)
Contributor: Then we can have rate += srcFactor(k) * dstFactor(k). Also, can we call it score or prediction rather than rate?

+              k += 1
+            }
+            pq += ((item._1, rate))
Contributor: Here we can then use dstId instead.

+          })
+          val pqIter = pq.iterator
+          var i = 0
+          while(i < n) {
Contributor: Space here: while (

+            output(j + i) = (user._1, pqIter.next())
Contributor: Elsewhere when using e.g. queue.toArray.sorted the ordering is reversed. Will this be correct?

Author: Here the ordering is not important. This is just the top K for one block; we get the real top K at the end. I have tested queue.toArray here; there was no performance benefit.

Contributor: Ah right - good point. Fine to leave as it is.
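To illustrate the author's point with made-up values (again private[spark], so Spark-source-tree only): the per-block queue only has to retain the right n candidates; their iteration order is irrelevant because topByKey re-ranks the union of all blocks' candidates at the end.

import org.apache.spark.util.BoundedPriorityQueue

val pq = new BoundedPriorityQueue[(Int, Double)](2)(Ordering.by(_._2))
pq += ((1, 0.5))
pq += ((2, 0.9))
pq += ((3, 0.1)) // dropped: only the two largest scores survive
// The iterator may yield (1, 0.5) and (2, 0.9) in either order; that is
// harmless, since the global top K is re-sorted later.
assert(pq.toSet == Set((1, 0.5), (2, 0.9)))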

Contributor: And here srcId instead.

+            i += 1
+          }
+          j += n
+        })
+        output.toSeq
     }
     ratings.topByKey(num)(Ordering.by(_._2))
   }
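For context, topByKey comes from org.apache.spark.mllib.rdd.MLPairRDDFunctions and performs the final global ranking. A sketch with made-up data (sc is an existing SparkContext):

import org.apache.spark.mllib.rdd.MLPairRDDFunctions.fromPairRDD

// Shaped like the flatMap output above: RDD[(srcId, (dstId, score))].
val candidates = sc.parallelize(Seq(
  (1, (10, 0.9)), (1, (11, 0.4)), (1, (12, 0.7))))
val top2 = candidates.topByKey(2)(Ordering.by(_._2))
// top2 is RDD[(Int, Array[(Int, Double)])] with scores in descending
// order per key: (1, Array((10, 0.9), (12, 0.7)))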
@@ -297,23 +320,10 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] {
    */
Contributor: We should adjust the comment here, as we're not using Level-3 BLAS any more.

   private def blockify(
       rank: Int,
-      features: RDD[(Int, Array[Double])]): RDD[(Array[Int], DenseMatrix)] = {
+      features: RDD[(Int, Array[Double])]): RDD[Seq[(Int, Array[Double])]] = {
Contributor: We can remove the rank argument here.

Contributor: Let's match what I've done in https://github.com/apache/spark/pull/17845/files#diff-be65dd1d6adc53138156641b610fcadaR440 - i.e. blockSize as an argument, with a TODO: SPARK-20443 - expose blockSize as a param?

Member: With this change, it seems to me that the performance can be less sensitive to blockSize.

Contributor: Yes, less sensitive. See https://issues.apache.org/jira/browse/SPARK-20443. It may be that we make the block size tunable - or by experiment set a block size that seems generally optimal (2048 in those experiments seems best). But we would need to perform experiments over a wide range of data sizes (and check both recommendForAllUsers and recommendForAllItems performance).

     val blockSize = 4096 // TODO: tune the block size
-    val blockStorage = rank * blockSize
     features.mapPartitions { iter =>
-      iter.grouped(blockSize).map { grouped =>
-        val ids = mutable.ArrayBuilder.make[Int]
-        ids.sizeHint(blockSize)
-        val factors = mutable.ArrayBuilder.make[Double]
-        factors.sizeHint(blockStorage)
-        var i = 0
-        grouped.foreach { case (id, factor) =>
-          ids += id
-          factors ++= factor
-          i += 1
-        }
-        (ids.result(), new DenseMatrix(rank, i, factors.result()))
-      }
+      iter.grouped(blockSize)
     }
   }
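For reference, Iterator.grouped(n) now does all the blocking work: it yields Seqs of up to n elements, with a possibly smaller final group. A standalone example:

val groups = (1 to 10).iterator.grouped(4).toList
// groups == List(Seq(1, 2, 3, 4), Seq(5, 6, 7, 8), Seq(9, 10))
assert(groups.map(_.size) == List(4, 4, 2))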
